fix(ec): bring ec.encode worker and EC/volume helpers to parity with shell (#9599)

* refactor(volume): extract replica sync/select into shared volume_replica package

Move the volume replica reconciliation helpers (status, union builder,
SyncAndSelectBestReplica, ReadNeedleMeta) out of the shell into a new
weed/storage/volume_replica package so both the shell (ec.encode, volume.tier.move,
volume.check.disk) and the EC encode worker can reuse them. No behavior change.

* fix(ec): bring ec.encode worker to parity with the shell

- Sync replicas and encode the most-complete one (via the shared
  volume_replica.SyncAndSelectBestReplica) instead of a possibly-stale replica,
  marking all replicas readonly first. Prevents silent data loss when a stale
  replica is encoded and the originals deleted.
- Skip remote/tiered volumes in detection (shell ec.encode excludes them).
- Min-node safety gate: refuse to encode when cluster nodes < parity shards.
- Align default thresholds with the shell (fullness 0.95, quiet 1h).

* fix(vacuum): plugin path honors min_volume_age_seconds override

deriveVacuumConfig hard-coded MinVolumeAgeSeconds=0, dropping any configured
value. Read it from worker config (default 0, matching the shell/master vacuum
which has no age gate) so an explicit override is honored.

* address review feedback

- config.go: align GetConfigSpec schema defaults (quiet_for_seconds=3600,
  fullness_ratio=0.95) with the runtime defaults so UI/bootstrap flows match the
  shell (coderabbitai).
- ec_task.go: roll back readonly when markReplicasReadonly fails partway, so
  already-marked replicas don't stay readonly (coderabbitai).
- volume_replica: pass the caller's replica statuses into buildUnionReplica instead
  of re-fetching them, and skip the per-needle ReadNeedleMeta RPC when the source
  replica is read-only (gemini-code-assist).

* test(plugin_workers/ec): make fixtures eligible under the new defaults

The default EC encode thresholds were raised to match the shell (fullness 0.95,
quiet 1h), but the plugin-worker integration fixtures still used 90%-full /
10-minute-old volumes, so detection found no eligible volumes and the tests failed
in CI. Bump the eligible fixtures to 96% full and 2h old.
This commit is contained in:
Chris Lu
2026-05-21 02:16:28 -07:00
committed by GitHub
parent 3f6410fdc3
commit cd15ae1395
15 changed files with 238 additions and 134 deletions

View File

@@ -207,8 +207,10 @@ func buildVolumeListResponse(t *testing.T, spec topologySpec, volumeID uint32) *
t.Helper()
volumeSizeLimitMB := uint64(100)
volumeSize := uint64(90) * 1024 * 1024
volumeModifiedAt := time.Now().Add(-10 * time.Minute).Unix()
// Exceed the default fullness (0.95) and quiet (1h) thresholds so volumes are
// EC-eligible.
volumeSize := uint64(96) * 1024 * 1024
volumeModifiedAt := time.Now().Add(-2 * time.Hour).Unix()
diskTypes := spec.diskTypes
if len(diskTypes) == 0 {

View File

@@ -29,9 +29,11 @@ func TestErasureCodingDetectionLargeTopology(t *testing.T) {
}
nodesPerRack := serverCount / rackCount
eligibleSize := uint64(90) * 1024 * 1024
// Eligible volumes must exceed the default fullness (0.95) and quiet (1h)
// thresholds; ineligible ones fall below the fullness threshold.
eligibleSize := uint64(96) * 1024 * 1024
ineligibleSize := uint64(10) * 1024 * 1024
modifiedAt := time.Now().Add(-10 * time.Minute).Unix()
modifiedAt := time.Now().Add(-2 * time.Hour).Unix()
volumeID := uint32(1)
dataCenters := make([]*master_pb.DataCenterInfo, 0, 1)

View File

@@ -22,6 +22,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_replica"
)
func init() {
@@ -295,7 +296,7 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m
collection := volumeIdToCollection[vid]
// Sync missing entries between replicas, then select the best one
bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocations[vid], "", writer)
bestLoc, selectErr := volume_replica.SyncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocations[vid], "", writer)
if selectErr != nil {
return fmt.Errorf("failed to sync and select replica for volume %d: %v", vid, selectErr)
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/server/constants"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_replica"
"google.golang.org/grpc"
)
@@ -521,7 +522,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
return nil
}
if doCutoffOfLastNeedle {
if needleMeta, err := readNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil {
if needleMeta, err := volume_replica.ReadNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil {
// needles older than the cutoff time are not missing yet
if needleMeta.AppendAtNs > cutoffFromAtNs {
return nil

View File

@@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_replica"
)
func init() {
@@ -221,7 +222,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
// Sync replicas and select the best one (with highest file count) for multi-replica volumes
// This addresses data inconsistency risk in multi-replica volumes (issue #7797)
// by syncing missing entries between replicas before moving
sourceLoc, selectErr := syncAndSelectBestReplica(
sourceLoc, selectErr := volume_replica.SyncAndSelectBestReplica(
commandEnv.option.GrpcDialOption, vid, collection, locations, dst.dataNode.Id, writer)
if selectErr != nil {
fmt.Fprintf(writer, "failed to sync and select source replica for volume %d: %v\n", vid, selectErr)

View File

@@ -179,23 +179,6 @@ func handleHelpRequest(c command, args []string, writer io.Writer) bool {
return false
}
func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) {
err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
if resp, err = client.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
Offset: needleValue.Offset.ToActualOffset(),
Size: int32(needleValue.Size),
}); err != nil {
return err
}
return nil
},
)
return
}
func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) {
err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {

View File

@@ -1,4 +1,9 @@
package shell
// Package volume_replica reconciles regular (non-EC) volume replicas: it reads
// per-replica status, builds the union of all live entries onto the most-complete
// replica, and returns that replica. It is shared by the shell (volume.tier.move,
// ec.encode, replica check) and the EC encode worker so a stale replica is never
// used as the basis of an operation.
package volume_replica
import (
"bytes"
@@ -19,8 +24,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
// VolumeReplicaStatus represents the status of a volume replica
type VolumeReplicaStatus struct {
// ReplicaStatus is the observed state of one volume replica.
type ReplicaStatus struct {
Location wdclient.Location
FileCount uint64
FileDeletedCount uint64
@@ -29,12 +34,9 @@ type VolumeReplicaStatus struct {
Error error
}
// getVolumeReplicaStatus retrieves the current status of a volume replica
func getVolumeReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location) VolumeReplicaStatus {
status := VolumeReplicaStatus{
Location: location,
}
// GetReplicaStatus retrieves the current status of a single volume replica.
func GetReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location) ReplicaStatus {
status := ReplicaStatus{Location: location}
err := operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(vid),
@@ -54,34 +56,52 @@ func getVolumeReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId,
return status
}
// getVolumeReplicaStatuses retrieves status for all replicas of a volume in parallel
func getVolumeReplicaStatuses(grpcDialOption grpc.DialOption, vid needle.VolumeId, locations []wdclient.Location) []VolumeReplicaStatus {
statuses := make([]VolumeReplicaStatus, len(locations))
// GetReplicaStatuses retrieves status for all replicas of a volume in parallel.
func GetReplicaStatuses(grpcDialOption grpc.DialOption, vid needle.VolumeId, locations []wdclient.Location) []ReplicaStatus {
statuses := make([]ReplicaStatus, len(locations))
var wg sync.WaitGroup
for i, location := range locations {
wg.Add(1)
go func(i int, location wdclient.Location) {
defer wg.Done()
statuses[i] = getVolumeReplicaStatus(grpcDialOption, vid, location)
statuses[i] = GetReplicaStatus(grpcDialOption, vid, location)
}(i, location)
}
wg.Wait()
return statuses
}
// replicaUnionBuilder builds a union replica by copying missing entries from other replicas
type replicaUnionBuilder struct {
// ReadNeedleMeta asks volumeServer for the metadata of a single needle (for
// example its AppendAtNs timestamp) in volume volumeId.
//
// The needle is identified by needleValue: its key, its actual byte offset and
// its size are forwarded in the request. The server's response is returned
// as-is; err is non-nil when either the client call wrapper or the RPC fails.
func ReadNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) {
	request := &volume_server_pb.ReadNeedleMetaRequest{
		VolumeId: volumeId,
		NeedleId: uint64(needleValue.Key),
		Offset:   needleValue.Offset.ToActualOffset(),
		Size:     int32(needleValue.Size),
	}

	err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		var rpcErr error
		// Capture the response in the named result so it outlives the callback.
		resp, rpcErr = client.ReadNeedleMeta(context.Background(), request)
		return rpcErr
	})
	return resp, err
}
// unionBuilder builds a union replica by copying missing entries from other replicas.
// It bundles the per-volume state shared by buildUnionReplica and the RPC helper
// methods defined on it.
type unionBuilder struct {
	// grpcDialOption is the dial option handed to the builder's volume-server RPCs.
	grpcDialOption grpc.DialOption
	// writer receives the human-readable progress and skip messages.
	writer io.Writer
	// vid is the volume being reconciled; it is sent as the VolumeId of RPC requests.
	vid needle.VolumeId
	// collection is supplied by the caller of SyncAndSelectBestReplica.
	// NOTE(review): no read of this field is visible in this excerpt — confirm it is used.
	collection string
}
// buildUnionReplica finds the largest replica and copies missing entries from other replicas into it.
// If excludeFromSelection is non-empty, that server won't be selected as the target but will still
// be used as a source for missing entries.
// buildUnionReplica finds the largest replica and copies missing entries from other
// replicas into it. If excludeFromSelection is non-empty, that server won't be
// selected as the target but will still be used as a source for missing entries.
// Returns the location of the union replica (the one that now has all entries).
func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, excludeFromSelection string) (wdclient.Location, int, error) {
func (rub *unionBuilder) buildUnionReplica(locations []wdclient.Location, statuses []ReplicaStatus, excludeFromSelection string) (wdclient.Location, int, error) {
if len(locations) == 0 {
return wdclient.Location{}, 0, fmt.Errorf("no replicas available")
}
@@ -92,9 +112,8 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location,
return locations[0], 0, nil
}
// Step 1: Find the largest replica (highest file count) that's not excluded
statuses := getVolumeReplicaStatuses(rub.grpcDialOption, rub.vid, locations)
// Step 1: Find the largest replica (highest file count) that's not excluded.
// statuses are supplied by the caller to avoid a redundant status RPC sweep.
bestIdx := -1
var bestFileCount uint64
for i, s := range statuses {
@@ -138,7 +157,6 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location,
continue
}
// Read this replica's index
otherDB := needle_map.NewMemDb()
if otherDB == nil {
fmt.Fprintf(rub.writer, " skipping %s: failed to allocate DB\n", loc.Url)
@@ -151,19 +169,21 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location,
continue
}
// Find entries in other that are missing from best
var missingNeedles []needle_map.NeedleValue
otherDB.AscendingVisit(func(nv needle_map.NeedleValue) error {
if nv.Size.IsDeleted() {
return nil
}
if _, found := bestDB.Get(nv.Key); !found {
// Check if this entry was written too recently (after sync started)
// Skip entries written after sync started to avoid copying in-flight writes
if needleMeta, err := readNeedleMeta(rub.grpcDialOption, loc.ServerAddress(), uint32(rub.vid), nv); err == nil {
if needleMeta.AppendAtNs > cutoffFromAtNs {
return nil // Skip entries written after sync started
// Skip entries written after sync started to avoid copying in-flight
// writes. A read-only replica accepts no new writes, so the per-needle
// metadata RPC is unnecessary then (ec.encode/tier.move mark readonly
// before syncing).
if !statuses[i].IsReadOnly {
if needleMeta, err := ReadNeedleMeta(rub.grpcDialOption, loc.ServerAddress(), uint32(rub.vid), nv); err == nil {
if needleMeta.AppendAtNs > cutoffFromAtNs {
return nil
}
}
}
missingNeedles = append(missingNeedles, nv)
@@ -176,7 +196,6 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location,
continue
}
// Copy missing entries from this replica to best replica
syncedFromThis := 0
for _, nv := range missingNeedles {
needleBlob, err := rub.readNeedleBlob(loc.ServerAddress(), nv)
@@ -190,7 +209,6 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location,
continue
}
// Also add to bestDB so we don't copy duplicates from other replicas
bestDB.Set(nv.Key, nv.Offset, nv.Size)
syncedFromThis++
}
@@ -205,7 +223,7 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location,
return bestLocation, totalSynced, nil
}
func (rub *replicaUnionBuilder) readIndexDatabase(db *needle_map.MemDb, server pb.ServerAddress) error {
func (rub *unionBuilder) readIndexDatabase(db *needle_map.MemDb, server pb.ServerAddress) error {
var buf bytes.Buffer
err := operation.WithVolumeServerClient(true, server, rub.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -241,7 +259,7 @@ func (rub *replicaUnionBuilder) readIndexDatabase(db *needle_map.MemDb, server p
return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false)
}
func (rub *replicaUnionBuilder) readNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue) ([]byte, error) {
func (rub *unionBuilder) readNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue) ([]byte, error) {
var needleBlob []byte
err := operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
@@ -258,7 +276,7 @@ func (rub *replicaUnionBuilder) readNeedleBlob(server pb.ServerAddress, nv needl
return needleBlob, err
}
func (rub *replicaUnionBuilder) writeNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue, needleBlob []byte) error {
func (rub *unionBuilder) writeNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue, needleBlob []byte) error {
return operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
VolumeId: uint32(rub.vid),
@@ -270,20 +288,15 @@ func (rub *replicaUnionBuilder) writeNeedleBlob(server pb.ServerAddress, nv need
})
}
// syncAndSelectBestReplica finds the largest replica, copies missing entries from other replicas
// into it to create a union, then returns this union replica for the operation.
// If excludeFromSelection is non-empty, that server won't be selected but will still contribute entries.
//
// The process:
// 1. Find the replica with the highest file count (the "best" one), excluding excludeFromSelection
// 2. For each other replica, find entries missing from best and copy them to best
// 3. Return the best replica which now contains the union of all entries
func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, locations []wdclient.Location, excludeFromSelection string, writer io.Writer) (wdclient.Location, error) {
// SyncAndSelectBestReplica finds the largest replica, copies missing entries from
// other replicas into it to create a union, then returns this union replica for the
// operation. If excludeFromSelection is non-empty, that server won't be selected
// but will still contribute entries. Already-consistent replica sets skip the sync.
func SyncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, locations []wdclient.Location, excludeFromSelection string, writer io.Writer) (wdclient.Location, error) {
if len(locations) == 0 {
return wdclient.Location{}, fmt.Errorf("no replicas available for volume %d", vid)
}
// Filter for checking consistency (exclude the excluded server)
var checkLocations []wdclient.Location
for _, loc := range locations {
if loc.Url != excludeFromSelection {
@@ -299,14 +312,12 @@ func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeI
return checkLocations[0], nil
}
// Check if replicas are already consistent (skip sync if so)
statuses := getVolumeReplicaStatuses(grpcDialOption, vid, locations)
var validStatuses []VolumeReplicaStatus
for i, s := range statuses {
// Check if replicas are already consistent (skip sync if so).
statuses := GetReplicaStatuses(grpcDialOption, vid, locations)
var validStatuses []ReplicaStatus
for _, s := range statuses {
if s.Error == nil {
// Include all for consistency check
validStatuses = append(validStatuses, s)
_ = i
}
}
@@ -319,7 +330,6 @@ func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeI
}
}
if allSame {
// All replicas are consistent, return the best non-excluded one
for _, s := range validStatuses {
if s.Location.Url != excludeFromSelection {
fmt.Fprintf(writer, "volume %d: all %d replicas are consistent (file count: %d)\n",
@@ -330,17 +340,16 @@ func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeI
}
}
// Replicas are inconsistent, build union on the best replica
fmt.Fprintf(writer, "volume %d: replicas are inconsistent, building union...\n", vid)
builder := &replicaUnionBuilder{
builder := &unionBuilder{
grpcDialOption: grpcDialOption,
writer: writer,
vid: vid,
collection: collection,
}
unionLocation, totalSynced, err := builder.buildUnionReplica(locations, excludeFromSelection)
unionLocation, totalSynced, err := builder.buildUnionReplica(locations, statuses, excludeFromSelection)
if err != nil {
return wdclient.Location{}, fmt.Errorf("failed to build union replica: %w", err)
}

View File

@@ -1,4 +1,4 @@
package shell
package volume_replica
import (
"bytes"
@@ -201,12 +201,12 @@ func TestDeletedEntriesAreSkipped(t *testing.T) {
func TestReplicaUnionBuilder_EmptyLocations(t *testing.T) {
// Test handling of empty locations slice
builder := &replicaUnionBuilder{
builder := &unionBuilder{
writer: &bytes.Buffer{},
vid: 1,
}
_, count, err := builder.buildUnionReplica(nil, "")
_, count, err := builder.buildUnionReplica(nil, nil, "")
if err == nil {
t.Error("Expected error for empty locations")
}

View File

@@ -27,8 +27,8 @@ func NewDefaultConfig() *Config {
ScanIntervalSeconds: 60 * 60, // 1 hour
MaxConcurrent: 1,
},
QuietForSeconds: 300, // 5 minutes
FullnessRatio: 0.8, // 80%
QuietForSeconds: 3600, // 1 hour, matching the shell ec.encode -quietFor default
FullnessRatio: 0.95, // 95%, matching the shell ec.encode -fullPercent default
CollectionFilter: "",
MinSizeMB: 30, // 30MB (more reasonable than 100MB)
PreferredTags: nil,
@@ -87,14 +87,14 @@ func GetConfigSpec() base.ConfigSpec {
Name: "quiet_for_seconds",
JSONName: "quiet_for_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 300,
DefaultValue: 3600,
MinValue: 1,
MaxValue: 3600,
Required: true,
DisplayName: "Quiet Period",
Description: "Minimum time volume must be quiet before erasure coding",
HelpText: "Volume must not be modified for this duration before erasure coding",
Placeholder: "5",
Placeholder: "60",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
@@ -103,7 +103,7 @@ func GetConfigSpec() base.ConfigSpec {
Name: "fullness_ratio",
JSONName: "fullness_ratio",
Type: config.FieldTypeFloat,
DefaultValue: 0.8,
DefaultValue: 0.95,
MinValue: 0.0001,
MaxValue: 1.0,
Required: true,

View File

@@ -52,12 +52,19 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
skippedCollectionFilter := 0
skippedQuietTime := 0
skippedFullness := 0
skippedRemote := 0
skippedTooFewNodes := 0
consecutivePlanningFailures := 0
var planner *ecPlacementPlanner
allowedCollections := wildcard.CompileWildcardMatchers(ecConfig.CollectionFilter)
// Cluster node count for the min-node safety gate (mirrors the shell ec.encode
// guard that refuses to encode when nodes < parity shards, so shards cannot be
// spread for fault tolerance).
clusterNodeCount := countTopologyNodes(clusterInfo.ActiveTopology)
// Group metrics by VolumeID to handle replicas and select canonical server
volumeGroups := make(map[uint32][]*types.VolumeHealthMetrics)
for _, metric := range metrics {
@@ -163,6 +170,21 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
continue
}
// Skip remote/tiered volumes: encoding them would lose the tiering. The
// shell ec.encode excludes remote volumes for the same reason.
if metric.HasRemoteCopy {
skippedRemote++
continue
}
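// Min-node safety gate: don't encode when the cluster has fewer nodes than
// erasure_coding.ParityShardsCount (the package-level parity shard count, not a
// per-collection value) — shards could not be spread to tolerate failures.
// Mirrors the shell ec.encode guard (node count < parity shards). A node count
// of 0 means the topology was unavailable, so the gate is not applied.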
if clusterNodeCount > 0 && clusterNodeCount < erasure_coding.ParityShardsCount {
skippedTooFewNodes++
continue
}
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
if ctx != nil {
@@ -373,8 +395,8 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
// Log debug summary if no tasks were created
if len(results) == 0 && len(metrics) > 0 && !stoppedEarly {
totalVolumes := len(metrics)
glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)
glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full, %d remote, %d too few nodes)",
totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness, skippedRemote, skippedTooFewNodes)
// Show details for first few volumes
for i, metric := range metrics {
@@ -658,6 +680,25 @@ func (p *ecPlacementPlanner) buildCandidateSets(shardsNeeded int) [][]*placement
// countTopologyNodes returns the number of data nodes listed in the active
// topology, summed over every rack of every data center. It is used by the
// min-node safety gate in Detection.
//
// It returns 0 when at is nil or when it carries no topology info; the gate
// only applies to a positive count, so 0 effectively means "unknown".
func countTopologyNodes(at *topology.ActiveTopology) int {
	if at == nil {
		return 0
	}
	topo := at.GetTopologyInfo()
	if topo == nil {
		return 0
	}
	nodes := 0
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			nodes += len(rack.DataNodeInfos)
		}
	}
	return nodes
}

// planECDestinations plans the destinations for erasure coding operation.
// dataShards/parityShards are parameters so callers can drive non-10+4 ratios.
func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthMetrics, ecConfig *Config, dataShards, parityShards int) (*topology.MultiDestinationPlan, error) {
if planner == nil || planner.activeTopology == nil {
return nil, fmt.Errorf("active topology not available for EC placement")

View File

@@ -233,13 +233,13 @@ func buildStuckSourceTopology(t *testing.T, volumeID uint32, presentShardCount i
// criteria (Age, FullnessRatio, Size), with `Age` derived from `LastModified`
// so the two fields stay consistent for any reader.
func buildStuckSourceMetrics(volumeID uint32, server string) []*types.VolumeHealthMetrics {
lastModified := time.Now().Add(-time.Hour)
lastModified := time.Now().Add(-2 * time.Hour)
return []*types.VolumeHealthMetrics{{
VolumeID: volumeID,
Server: server,
Size: 200 * 1024 * 1024,
Collection: "",
FullnessRatio: 0.9,
FullnessRatio: 0.96,
LastModified: lastModified,
Age: time.Since(lastModified),
}}
@@ -482,9 +482,9 @@ func buildVolumeMetricsForIDs(count int) []*types.VolumeHealthMetrics {
Server: "10.0.0.1:8080",
Size: 200 * 1024 * 1024,
Collection: "",
FullnessRatio: 0.9,
LastModified: now.Add(-time.Hour),
Age: 10 * time.Minute,
FullnessRatio: 0.96,
LastModified: now.Add(-2 * time.Hour),
Age: 2 * time.Hour,
})
}
return metrics

View File

@@ -1,6 +1,7 @@
package erasure_coding
import (
"bytes"
"context"
"fmt"
"io"
@@ -19,6 +20,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
storagetypes "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_replica"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
"google.golang.org/grpc"
@@ -35,12 +38,13 @@ type ErasureCodingTask struct {
grpcDialOption grpc.DialOption
// EC parameters
dataShards int32
parityShards int32
sourceDiskType string // source volume's disk type, forwarded to Mount RPC (#9423)
targets []*worker_pb.TaskTarget // Unified targets for EC shards
sources []*worker_pb.TaskSource // Unified sources for cleanup
shardAssignment map[string][]string // destination -> assigned shard types
dataShards int32
parityShards int32
sourceDiskType string // source volume's disk type, forwarded to Mount RPC (#9423)
targets []*worker_pb.TaskTarget // Unified targets for EC shards
sources []*worker_pb.TaskSource // Unified sources for cleanup
shardAssignment map[string][]string // destination -> assigned shard types
readonlyReplicas []pb.ServerAddress // replicas marked readonly, for rollback
}
// NewErasureCodingTask creates a new unified EC task instance
@@ -146,12 +150,22 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
}
}()
// Step 1: Mark volume readonly
// Step 1: Mark all replicas readonly, then reconcile them and select the most
// complete replica as the encode source. Encoding a stale replica and then
// deleting the originals would silently lose entries that exist only on another
// replica; SyncAndSelectBestReplica builds the union onto the best replica first
// (mirrors the shell ec.encode best-replica selection).
t.ReportProgressWithStage(10.0, "Marking volume readonly")
t.GetLogger().Info("Marking volume readonly")
if err := t.markVolumeReadonly(ctx); err != nil {
if err := t.markReplicasReadonly(ctx); err != nil {
// Marking can fail partway; restore the replicas already marked readonly.
t.rollbackReadonly(ctx)
return fmt.Errorf("failed to mark volume readonly: %v", err)
}
if err := t.syncAndSelectSourceReplica(); err != nil {
t.rollbackReadonly(ctx)
return fmt.Errorf("failed to sync and select source replica: %v", err)
}
// Step 2: Copy volume files to worker
// The .idx and .dat are copied as separate network transfers, with .idx
@@ -277,39 +291,87 @@ func (t *ErasureCodingTask) GetProgress() float64 {
// Helper methods for actual EC operations
// markVolumeReadonly marks the volume as readonly on the source server
func (t *ErasureCodingTask) markVolumeReadonly(ctx context.Context) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: t.volumeID,
})
return err
})
// replicaLocations lists where the regular (non-EC) replicas of the task's
// volume live, derived from t.sources.
//
// A source counts as a regular replica when it is non-nil, names a node and
// carries no shard ids (sources with shard ids describe EC shards). When no
// source qualifies, the assigned source server t.server is returned as the only
// location, so the result is never empty.
func (t *ErasureCodingTask) replicaLocations() []wdclient.Location {
	var replicas []wdclient.Location
	for _, src := range t.sources {
		isRegularReplica := src != nil && len(src.ShardIds) == 0 && src.Node != ""
		if isRegularReplica {
			replicas = append(replicas, wdclient.Location{Url: src.Node, DataCenter: src.DataCenter})
		}
	}
	if len(replicas) > 0 {
		return replicas
	}
	// No replica sources were provided: fall back to the assigned server.
	return []wdclient.Location{{Url: t.server}}
}
// rollbackReadonly is a best-effort rollback of markVolumeReadonly, used when the
// EC task fails before any shards are distributed. Logs but does not return errors.
// Uses a fresh context with timeout since the caller's ctx may already be cancelled.
// markReplicasReadonly sends a VolumeMarkReadonly request for the task's volume
// to every location returned by replicaLocations, in order.
//
// Each replica that was marked successfully is appended to t.readonlyReplicas so
// rollbackReadonly can later restore exactly those servers. The record is reset
// at the start of every call. Marking stops at the first failure and the wrapped
// error is returned; replicas marked before the failure stay recorded.
func (t *ErasureCodingTask) markReplicasReadonly(ctx context.Context) error {
	// Forget earlier attempts but keep the slice's capacity.
	t.readonlyReplicas = t.readonlyReplicas[:0]

	markReadonly := func(client volume_server_pb.VolumeServerClient) error {
		_, rpcErr := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{VolumeId: t.volumeID})
		return rpcErr
	}

	for _, replica := range t.replicaLocations() {
		server := replica.ServerAddress()
		if err := operation.WithVolumeServerClient(false, server, t.grpcDialOption, markReadonly); err != nil {
			return fmt.Errorf("mark volume %d readonly on %s: %w", t.volumeID, server, err)
		}
		t.readonlyReplicas = append(t.readonlyReplicas, server)
	}
	return nil
}
// syncAndSelectSourceReplica hands the volume's replica locations to
// volume_replica.SyncAndSelectBestReplica and, when that returns a different
// server than the current one, switches t.server to it so the encode reads from
// the selected replica.
//
// With fewer than two replica locations there is nothing to reconcile and the
// method returns nil immediately. Any text the sync wrote is logged before the
// error is checked, so the output of a failed sync is not lost. The sync error
// is returned unchanged.
func (t *ErasureCodingTask) syncAndSelectSourceReplica() error {
	replicas := t.replicaLocations()
	if len(replicas) < 2 {
		return nil // single replica: nothing to reconcile
	}

	var syncLog bytes.Buffer
	selected, syncErr := volume_replica.SyncAndSelectBestReplica(
		t.grpcDialOption, needle.VolumeId(t.volumeID), t.collection, replicas, "", &syncLog)

	if report := strings.TrimSpace(syncLog.String()); report != "" {
		glog.Infof("EC encode replica sync for volume %d:\n%s", t.volumeID, report)
	}
	if syncErr != nil {
		return syncErr
	}

	// Keep the current source when the sync returned no URL or the same server.
	if selected.Url == "" || selected.Url == t.server {
		return nil
	}
	glog.Infof("EC encode: using best replica %s as source for volume %d (was %s)", selected.Url, t.volumeID, t.server)
	t.server = selected.Url
	return nil
}
// rollbackReadonly is a best-effort restore of every replica markReplicasReadonly
// touched, used when the EC task fails before the originals are deleted. Logs but
// does not return errors; uses a fresh context since the caller's may be cancelled.
func (t *ErasureCodingTask) rollbackReadonly(_ context.Context) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := t.markVolumeWritable(ctx); err != nil {
glog.Warningf("failed to restore volume %d to writable after EC task failure: %v", t.volumeID, err)
} else {
glog.V(0).Infof("restored volume %d to writable after EC task failure", t.volumeID)
servers := t.readonlyReplicas
if len(servers) == 0 {
servers = []pb.ServerAddress{pb.ServerAddress(t.server)}
}
}
// markVolumeWritable restores the volume to writable on the source server.
func (t *ErasureCodingTask) markVolumeWritable(ctx context.Context) error {
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeMarkWritable(ctx, &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: t.volumeID,
for _, addr := range servers {
err := operation.WithVolumeServerClient(false, addr, t.grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
_, e := client.VolumeMarkWritable(ctx, &volume_server_pb.VolumeMarkWritableRequest{VolumeId: t.volumeID})
return e
})
return err
})
if err != nil {
glog.Warningf("failed to restore volume %d to writable on %s after EC task failure: %v", t.volumeID, addr, err)
} else {
glog.V(0).Infof("restored volume %d to writable on %s after EC task failure", t.volumeID, addr)
}
}
}
// copyVolumeFilesToWorker copies .idx and .dat files from source server to local worker.

View File

@@ -59,7 +59,7 @@ func TestCopyVolumeFilesToWorkerUsesCurrentCompactionRevision(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
require.NoError(t, task.markVolumeReadonly(ctx))
require.NoError(t, task.markReplicasReadonly(ctx))
fileStatus, err := task.readSourceVolumeFileStatus(ctx)
require.NoError(t, err)

View File

@@ -143,10 +143,10 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3600},
},
"fullness_ratio": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.95},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
@@ -170,10 +170,10 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
},
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3600},
},
"fullness_ratio": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.95},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},

View File

@@ -487,7 +487,9 @@ func (h *VacuumHandler) collectVolumeMetrics(
func deriveVacuumConfig(values map[string]*plugin_pb.ConfigValue) *Config {
config := NewDefaultConfig()
config.GarbageThreshold = pluginworker.ReadDoubleConfig(values, "garbage_threshold", config.GarbageThreshold)
config.MinVolumeAgeSeconds = 0 // plugin worker does not filter by volume age
// Match the shell/master vacuum, which applies no volume-age gate (default 0),
// but honor an explicit override instead of silently dropping it.
config.MinVolumeAgeSeconds = pluginworker.ReadIntConfig(values, "min_volume_age_seconds", 0)
return config
}