From cd15ae13957b727402bc50a809e6f2aa6acf4c2a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 21 May 2026 02:16:28 -0700 Subject: [PATCH] fix(ec): bring ec.encode worker and EC/volume helpers to parity with shell (#9599) * refactor(volume): extract replica sync/select into shared volume_replica package Move the volume replica reconciliation helpers (status, union builder, SyncAndSelectBestReplica, ReadNeedleMeta) out of the shell into a new weed/storage/volume_replica package so both the shell (ec.encode, volume.tier.move, volume.check.disk) and the EC encode worker can reuse them. No behavior change. * fix(ec): bring ec.encode worker to parity with the shell - Sync replicas and encode the most-complete one (via the shared volume_replica.SyncAndSelectBestReplica) instead of a possibly-stale replica, marking all replicas readonly first. Prevents silent data loss when a stale replica is encoded and the originals deleted. - Skip remote/tiered volumes in detection (shell ec.encode excludes them). - Min-node safety gate: refuse to encode when cluster nodes < parity shards. - Align default thresholds with the shell (fullness 0.95, quiet 1h). * fix(vacuum): plugin path honors min_volume_age_seconds override deriveVacuumConfig hard-coded MinVolumeAgeSeconds=0, dropping any configured value. Read it from worker config (default 0, matching the shell/master vacuum which has no age gate) so an explicit override is honored. * address review feedback - config.go: align GetConfigSpec schema defaults (quiet_for_seconds=3600, fullness_ratio=0.95) with the runtime defaults so UI/bootstrap flows match the shell (coderabbitai). - ec_task.go: roll back readonly when markReplicasReadonly fails partway, so already-marked replicas don't stay readonly (coderabbitai). - volume_replica: pass the caller's replica statuses into buildUnionReplica instead of re-fetching them, and skip the per-needle ReadNeedleMeta RPC when the source replica is read-only (gemini-code-assist). * test(plugin_workers/ec): make fixtures eligible under the new defaults The default EC encode thresholds were raised to match the shell (fullness 0.95, quiet 1h), but the plugin-worker integration fixtures still used 90%-full / 10-minute-old volumes, so detection found no eligible volumes and the tests failed in CI. Bump the eligible fixtures to 96% full and 2h old. --- .../erasure_coding/detection_test.go | 6 +- .../erasure_coding/large_topology_test.go | 6 +- weed/shell/command_ec_encode.go | 3 +- weed/shell/command_volume_check_disk.go | 3 +- weed/shell/command_volume_tier_move.go | 3 +- weed/shell/commands.go | 17 --- .../volume_replica/replica_sync.go} | 119 ++++++++-------- .../volume_replica/replica_sync_test.go} | 6 +- weed/worker/tasks/erasure_coding/config.go | 10 +- weed/worker/tasks/erasure_coding/detection.go | 45 +++++- .../tasks/erasure_coding/detection_test.go | 10 +- weed/worker/tasks/erasure_coding/ec_task.go | 130 +++++++++++++----- .../tasks/erasure_coding/ec_task_test.go | 2 +- .../tasks/erasure_coding/plugin_handler.go | 8 +- weed/worker/tasks/vacuum/plugin_handler.go | 4 +- 15 files changed, 238 insertions(+), 134 deletions(-) rename weed/{shell/command_volume_replica_check.go => storage/volume_replica/replica_sync.go} (69%) rename weed/{shell/command_volume_replica_check_test.go => storage/volume_replica/replica_sync_test.go} (98%) diff --git a/test/plugin_workers/erasure_coding/detection_test.go b/test/plugin_workers/erasure_coding/detection_test.go index 2fe27eda3..d41e1ea00 100644 --- a/test/plugin_workers/erasure_coding/detection_test.go +++ b/test/plugin_workers/erasure_coding/detection_test.go @@ -207,8 +207,10 @@ func buildVolumeListResponse(t *testing.T, spec topologySpec, volumeID uint32) * t.Helper() volumeSizeLimitMB := uint64(100) - volumeSize := uint64(90) * 1024 * 1024 - volumeModifiedAt := time.Now().Add(-10 * time.Minute).Unix() + // Exceed the default fullness (0.95) and quiet (1h) thresholds so volumes are + // EC-eligible. + volumeSize := uint64(96) * 1024 * 1024 + volumeModifiedAt := time.Now().Add(-2 * time.Hour).Unix() diskTypes := spec.diskTypes if len(diskTypes) == 0 { diff --git a/test/plugin_workers/erasure_coding/large_topology_test.go b/test/plugin_workers/erasure_coding/large_topology_test.go index 504602870..bf108b3c0 100644 --- a/test/plugin_workers/erasure_coding/large_topology_test.go +++ b/test/plugin_workers/erasure_coding/large_topology_test.go @@ -29,9 +29,11 @@ func TestErasureCodingDetectionLargeTopology(t *testing.T) { } nodesPerRack := serverCount / rackCount - eligibleSize := uint64(90) * 1024 * 1024 + // Eligible volumes must exceed the default fullness (0.95) and quiet (1h) + // thresholds; ineligible ones fall below the fullness threshold. + eligibleSize := uint64(96) * 1024 * 1024 ineligibleSize := uint64(10) * 1024 * 1024 - modifiedAt := time.Now().Add(-10 * time.Minute).Unix() + modifiedAt := time.Now().Add(-2 * time.Hour).Unix() volumeID := uint32(1) dataCenters := make([]*master_pb.DataCenterInfo, 0, 1) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 6d6d9eb3d..a81ef2650 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -22,6 +22,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_replica" ) func init() { @@ -295,7 +296,7 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m collection := volumeIdToCollection[vid] // Sync missing entries between replicas, then select the best one - bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocations[vid], "", writer) + bestLoc, selectErr := volume_replica.SyncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocations[vid], "", writer) if selectErr != nil { return fmt.Errorf("failed to sync and select replica for volume %d: %v", vid, selectErr) } diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 4d775000f..af5b806a3 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -22,6 +22,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/server/constants" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_replica" "google.golang.org/grpc" ) @@ -521,7 +522,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me return nil } if doCutoffOfLastNeedle { - if needleMeta, err := readNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { + if needleMeta, err := volume_replica.ReadNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { // needles older than the cutoff time are not missing yet if needleMeta.AppendAtNs > cutoffFromAtNs { return nil diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 222354f7a..2c1386756 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -16,6 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_replica" ) func init() { @@ -221,7 +222,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer // Sync replicas and select the best one (with highest file count) for multi-replica volumes // This addresses data inconsistency risk in multi-replica volumes (issue #7797) // by syncing missing entries between replicas before moving - sourceLoc, selectErr := syncAndSelectBestReplica( + sourceLoc, selectErr := volume_replica.SyncAndSelectBestReplica( commandEnv.option.GrpcDialOption, vid, collection, locations, dst.dataNode.Id, writer) if selectErr != nil { fmt.Fprintf(writer, "failed to sync and select source replica for volume %d: %v\n", vid, selectErr) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 9973ee650..5cc7e704f 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -179,23 +179,6 @@ func handleHelpRequest(c command, args []string, writer io.Writer) bool { return false } -func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) { - err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, - func(client volume_server_pb.VolumeServerClient) error { - if resp, err = client.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ - VolumeId: volumeId, - NeedleId: uint64(needleValue.Key), - Offset: needleValue.Offset.ToActualOffset(), - Size: int32(needleValue.Size), - }); err != nil { - return err - } - return nil - }, - ) - return -} - func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) { err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { diff --git a/weed/shell/command_volume_replica_check.go b/weed/storage/volume_replica/replica_sync.go similarity index 69% rename from weed/shell/command_volume_replica_check.go rename to weed/storage/volume_replica/replica_sync.go index cbc18e7f7..3997569c8 100644 --- a/weed/shell/command_volume_replica_check.go +++ b/weed/storage/volume_replica/replica_sync.go @@ -1,4 +1,9 @@ -package shell +// Package volume_replica reconciles regular (non-EC) volume replicas: it reads +// per-replica status, builds the union of all live entries onto the most-complete +// replica, and returns that replica. It is shared by the shell (volume.tier.move, +// ec.encode, replica check) and the EC encode worker so a stale replica is never +// used as the basis of an operation. +package volume_replica import ( "bytes" @@ -19,8 +24,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/wdclient" ) -// VolumeReplicaStatus represents the status of a volume replica -type VolumeReplicaStatus struct { +// ReplicaStatus is the observed state of one volume replica. +type ReplicaStatus struct { Location wdclient.Location FileCount uint64 FileDeletedCount uint64 @@ -29,12 +34,9 @@ type VolumeReplicaStatus struct { Error error } -// getVolumeReplicaStatus retrieves the current status of a volume replica -func getVolumeReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location) VolumeReplicaStatus { - status := VolumeReplicaStatus{ - Location: location, - } - +// GetReplicaStatus retrieves the current status of a single volume replica. +func GetReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location) ReplicaStatus { + status := ReplicaStatus{Location: location} err := operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ VolumeId: uint32(vid), @@ -54,34 +56,52 @@ func getVolumeReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId, return status } -// getVolumeReplicaStatuses retrieves status for all replicas of a volume in parallel -func getVolumeReplicaStatuses(grpcDialOption grpc.DialOption, vid needle.VolumeId, locations []wdclient.Location) []VolumeReplicaStatus { - statuses := make([]VolumeReplicaStatus, len(locations)) +// GetReplicaStatuses retrieves status for all replicas of a volume in parallel. +func GetReplicaStatuses(grpcDialOption grpc.DialOption, vid needle.VolumeId, locations []wdclient.Location) []ReplicaStatus { + statuses := make([]ReplicaStatus, len(locations)) var wg sync.WaitGroup for i, location := range locations { wg.Add(1) go func(i int, location wdclient.Location) { defer wg.Done() - statuses[i] = getVolumeReplicaStatus(grpcDialOption, vid, location) + statuses[i] = GetReplicaStatus(grpcDialOption, vid, location) }(i, location) } wg.Wait() return statuses } -// replicaUnionBuilder builds a union replica by copying missing entries from other replicas -type replicaUnionBuilder struct { +// ReadNeedleMeta reads a needle's metadata (e.g. AppendAtNs) from a volume server. +func ReadNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) { + err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + if resp, err = client.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ + VolumeId: volumeId, + NeedleId: uint64(needleValue.Key), + Offset: needleValue.Offset.ToActualOffset(), + Size: int32(needleValue.Size), + }); err != nil { + return err + } + return nil + }, + ) + return +} + +// unionBuilder builds a union replica by copying missing entries from other replicas. +type unionBuilder struct { grpcDialOption grpc.DialOption writer io.Writer vid needle.VolumeId collection string } -// buildUnionReplica finds the largest replica and copies missing entries from other replicas into it. -// If excludeFromSelection is non-empty, that server won't be selected as the target but will still -// be used as a source for missing entries. +// buildUnionReplica finds the largest replica and copies missing entries from other +// replicas into it. If excludeFromSelection is non-empty, that server won't be +// selected as the target but will still be used as a source for missing entries. // Returns the location of the union replica (the one that now has all entries). -func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, excludeFromSelection string) (wdclient.Location, int, error) { +func (rub *unionBuilder) buildUnionReplica(locations []wdclient.Location, statuses []ReplicaStatus, excludeFromSelection string) (wdclient.Location, int, error) { if len(locations) == 0 { return wdclient.Location{}, 0, fmt.Errorf("no replicas available") } @@ -92,9 +112,8 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, return locations[0], 0, nil } - // Step 1: Find the largest replica (highest file count) that's not excluded - statuses := getVolumeReplicaStatuses(rub.grpcDialOption, rub.vid, locations) - + // Step 1: Find the largest replica (highest file count) that's not excluded. + // statuses are supplied by the caller to avoid a redundant status RPC sweep. bestIdx := -1 var bestFileCount uint64 for i, s := range statuses { @@ -138,7 +157,6 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, continue } - // Read this replica's index otherDB := needle_map.NewMemDb() if otherDB == nil { fmt.Fprintf(rub.writer, " skipping %s: failed to allocate DB\n", loc.Url) @@ -151,19 +169,21 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, continue } - // Find entries in other that are missing from best var missingNeedles []needle_map.NeedleValue - otherDB.AscendingVisit(func(nv needle_map.NeedleValue) error { if nv.Size.IsDeleted() { return nil } if _, found := bestDB.Get(nv.Key); !found { - // Check if this entry was written too recently (after sync started) - // Skip entries written after sync started to avoid copying in-flight writes - if needleMeta, err := readNeedleMeta(rub.grpcDialOption, loc.ServerAddress(), uint32(rub.vid), nv); err == nil { - if needleMeta.AppendAtNs > cutoffFromAtNs { - return nil // Skip entries written after sync started + // Skip entries written after sync started to avoid copying in-flight + // writes. A read-only replica accepts no new writes, so the per-needle + // metadata RPC is unnecessary then (ec.encode/tier.move mark readonly + // before syncing). + if !statuses[i].IsReadOnly { + if needleMeta, err := ReadNeedleMeta(rub.grpcDialOption, loc.ServerAddress(), uint32(rub.vid), nv); err == nil { + if needleMeta.AppendAtNs > cutoffFromAtNs { + return nil + } } } missingNeedles = append(missingNeedles, nv) @@ -176,7 +196,6 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, continue } - // Copy missing entries from this replica to best replica syncedFromThis := 0 for _, nv := range missingNeedles { needleBlob, err := rub.readNeedleBlob(loc.ServerAddress(), nv) @@ -190,7 +209,6 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, continue } - // Also add to bestDB so we don't copy duplicates from other replicas bestDB.Set(nv.Key, nv.Offset, nv.Size) syncedFromThis++ } @@ -205,7 +223,7 @@ func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, return bestLocation, totalSynced, nil } -func (rub *replicaUnionBuilder) readIndexDatabase(db *needle_map.MemDb, server pb.ServerAddress) error { +func (rub *unionBuilder) readIndexDatabase(db *needle_map.MemDb, server pb.ServerAddress) error { var buf bytes.Buffer err := operation.WithVolumeServerClient(true, server, rub.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -241,7 +259,7 @@ func (rub *replicaUnionBuilder) readIndexDatabase(db *needle_map.MemDb, server p return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } -func (rub *replicaUnionBuilder) readNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue) ([]byte, error) { +func (rub *unionBuilder) readNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue) ([]byte, error) { var needleBlob []byte err := operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ @@ -258,7 +276,7 @@ func (rub *replicaUnionBuilder) readNeedleBlob(server pb.ServerAddress, nv needl return needleBlob, err } -func (rub *replicaUnionBuilder) writeNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue, needleBlob []byte) error { +func (rub *unionBuilder) writeNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue, needleBlob []byte) error { return operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: uint32(rub.vid), @@ -270,20 +288,15 @@ func (rub *replicaUnionBuilder) writeNeedleBlob(server pb.ServerAddress, nv need }) } -// syncAndSelectBestReplica finds the largest replica, copies missing entries from other replicas -// into it to create a union, then returns this union replica for the operation. -// If excludeFromSelection is non-empty, that server won't be selected but will still contribute entries. -// -// The process: -// 1. Find the replica with the highest file count (the "best" one), excluding excludeFromSelection -// 2. For each other replica, find entries missing from best and copy them to best -// 3. Return the best replica which now contains the union of all entries -func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, locations []wdclient.Location, excludeFromSelection string, writer io.Writer) (wdclient.Location, error) { +// SyncAndSelectBestReplica finds the largest replica, copies missing entries from +// other replicas into it to create a union, then returns this union replica for the +// operation. If excludeFromSelection is non-empty, that server won't be selected +// but will still contribute entries. Already-consistent replica sets skip the sync. +func SyncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, locations []wdclient.Location, excludeFromSelection string, writer io.Writer) (wdclient.Location, error) { if len(locations) == 0 { return wdclient.Location{}, fmt.Errorf("no replicas available for volume %d", vid) } - // Filter for checking consistency (exclude the excluded server) var checkLocations []wdclient.Location for _, loc := range locations { if loc.Url != excludeFromSelection { @@ -299,14 +312,12 @@ func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeI return checkLocations[0], nil } - // Check if replicas are already consistent (skip sync if so) - statuses := getVolumeReplicaStatuses(grpcDialOption, vid, locations) - var validStatuses []VolumeReplicaStatus - for i, s := range statuses { + // Check if replicas are already consistent (skip sync if so). + statuses := GetReplicaStatuses(grpcDialOption, vid, locations) + var validStatuses []ReplicaStatus + for _, s := range statuses { if s.Error == nil { - // Include all for consistency check validStatuses = append(validStatuses, s) - _ = i } } @@ -319,7 +330,6 @@ func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeI } } if allSame { - // All replicas are consistent, return the best non-excluded one for _, s := range validStatuses { if s.Location.Url != excludeFromSelection { fmt.Fprintf(writer, "volume %d: all %d replicas are consistent (file count: %d)\n", @@ -330,17 +340,16 @@ func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeI } } - // Replicas are inconsistent, build union on the best replica fmt.Fprintf(writer, "volume %d: replicas are inconsistent, building union...\n", vid) - builder := &replicaUnionBuilder{ + builder := &unionBuilder{ grpcDialOption: grpcDialOption, writer: writer, vid: vid, collection: collection, } - unionLocation, totalSynced, err := builder.buildUnionReplica(locations, excludeFromSelection) + unionLocation, totalSynced, err := builder.buildUnionReplica(locations, statuses, excludeFromSelection) if err != nil { return wdclient.Location{}, fmt.Errorf("failed to build union replica: %w", err) } diff --git a/weed/shell/command_volume_replica_check_test.go b/weed/storage/volume_replica/replica_sync_test.go similarity index 98% rename from weed/shell/command_volume_replica_check_test.go rename to weed/storage/volume_replica/replica_sync_test.go index fde9d22f7..1d734d9af 100644 --- a/weed/shell/command_volume_replica_check_test.go +++ b/weed/storage/volume_replica/replica_sync_test.go @@ -1,4 +1,4 @@ -package shell +package volume_replica import ( "bytes" @@ -201,12 +201,12 @@ func TestDeletedEntriesAreSkipped(t *testing.T) { func TestReplicaUnionBuilder_EmptyLocations(t *testing.T) { // Test handling of empty locations slice - builder := &replicaUnionBuilder{ + builder := &unionBuilder{ writer: &bytes.Buffer{}, vid: 1, } - _, count, err := builder.buildUnionReplica(nil, "") + _, count, err := builder.buildUnionReplica(nil, nil, "") if err == nil { t.Error("Expected error for empty locations") } diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go index e9dce5cbd..9d03fdaf3 100644 --- a/weed/worker/tasks/erasure_coding/config.go +++ b/weed/worker/tasks/erasure_coding/config.go @@ -27,8 +27,8 @@ func NewDefaultConfig() *Config { ScanIntervalSeconds: 60 * 60, // 1 hour MaxConcurrent: 1, }, - QuietForSeconds: 300, // 5 minutes - FullnessRatio: 0.8, // 80% + QuietForSeconds: 3600, // 1 hour, matching the shell ec.encode -quietFor default + FullnessRatio: 0.95, // 95%, matching the shell ec.encode -fullPercent default CollectionFilter: "", MinSizeMB: 30, // 30MB (more reasonable than 100MB) PreferredTags: nil, @@ -87,14 +87,14 @@ func GetConfigSpec() base.ConfigSpec { Name: "quiet_for_seconds", JSONName: "quiet_for_seconds", Type: config.FieldTypeInterval, - DefaultValue: 300, + DefaultValue: 3600, MinValue: 1, MaxValue: 3600, Required: true, DisplayName: "Quiet Period", Description: "Minimum time volume must be quiet before erasure coding", HelpText: "Volume must not be modified for this duration before erasure coding", - Placeholder: "5", + Placeholder: "60", Unit: config.UnitMinutes, InputType: "interval", CSSClasses: "form-control", @@ -103,7 +103,7 @@ func GetConfigSpec() base.ConfigSpec { Name: "fullness_ratio", JSONName: "fullness_ratio", Type: config.FieldTypeFloat, - DefaultValue: 0.8, + DefaultValue: 0.95, MinValue: 0.0001, MaxValue: 1.0, Required: true, diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index a9134022c..9b70e7ffc 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -52,12 +52,19 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste skippedCollectionFilter := 0 skippedQuietTime := 0 skippedFullness := 0 + skippedRemote := 0 + skippedTooFewNodes := 0 consecutivePlanningFailures := 0 var planner *ecPlacementPlanner allowedCollections := wildcard.CompileWildcardMatchers(ecConfig.CollectionFilter) + // Cluster node count for the min-node safety gate (mirrors the shell ec.encode + // guard that refuses to encode when nodes < parity shards, so shards cannot be + // spread for fault tolerance). + clusterNodeCount := countTopologyNodes(clusterInfo.ActiveTopology) + // Group metrics by VolumeID to handle replicas and select canonical server volumeGroups := make(map[uint32][]*types.VolumeHealthMetrics) for _, metric := range metrics { @@ -163,6 +170,21 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste continue } + // Skip remote/tiered volumes: encoding them would lose the tiering. The + // shell ec.encode excludes remote volumes for the same reason. + if metric.HasRemoteCopy { + skippedRemote++ + continue + } + + // Min-node safety gate: don't encode when the cluster has fewer nodes than + // this collection's parity shards — shards could not be spread to tolerate + // failures. Mirrors the shell ec.encode guard (node count < parity shards). + if clusterNodeCount > 0 && clusterNodeCount < erasure_coding.ParityShardsCount { + skippedTooFewNodes++ + continue + } + // Check quiet duration and fullness criteria if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { if ctx != nil { @@ -373,8 +395,8 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste // Log debug summary if no tasks were created if len(results) == 0 && len(metrics) > 0 && !stoppedEarly { totalVolumes := len(metrics) - glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", - totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness) + glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full, %d remote, %d too few nodes)", + totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness, skippedRemote, skippedTooFewNodes) // Show details for first few volumes for i, metric := range metrics { @@ -658,6 +680,25 @@ func (p *ecPlacementPlanner) buildCandidateSets(shardsNeeded int) [][]*placement // planECDestinations plans the destinations for erasure coding operation. // dataShards/parityShards are parameters so callers can drive non-10+4 ratios. +// countTopologyNodes counts volume-server nodes in the active topology, used by +// the min-node safety gate. +func countTopologyNodes(at *topology.ActiveTopology) int { + if at == nil { + return 0 + } + topo := at.GetTopologyInfo() + if topo == nil { + return 0 + } + n := 0 + for _, dc := range topo.DataCenterInfos { + for _, rack := range dc.RackInfos { + n += len(rack.DataNodeInfos) + } + } + return n +} + func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthMetrics, ecConfig *Config, dataShards, parityShards int) (*topology.MultiDestinationPlan, error) { if planner == nil || planner.activeTopology == nil { return nil, fmt.Errorf("active topology not available for EC placement") diff --git a/weed/worker/tasks/erasure_coding/detection_test.go b/weed/worker/tasks/erasure_coding/detection_test.go index 11e5676f4..df2e2edaa 100644 --- a/weed/worker/tasks/erasure_coding/detection_test.go +++ b/weed/worker/tasks/erasure_coding/detection_test.go @@ -233,13 +233,13 @@ func buildStuckSourceTopology(t *testing.T, volumeID uint32, presentShardCount i // criteria (Age, FullnessRatio, Size), with `Age` derived from `LastModified` // so the two fields stay consistent for any reader. func buildStuckSourceMetrics(volumeID uint32, server string) []*types.VolumeHealthMetrics { - lastModified := time.Now().Add(-time.Hour) + lastModified := time.Now().Add(-2 * time.Hour) return []*types.VolumeHealthMetrics{{ VolumeID: volumeID, Server: server, Size: 200 * 1024 * 1024, Collection: "", - FullnessRatio: 0.9, + FullnessRatio: 0.96, LastModified: lastModified, Age: time.Since(lastModified), }} @@ -482,9 +482,9 @@ func buildVolumeMetricsForIDs(count int) []*types.VolumeHealthMetrics { Server: "10.0.0.1:8080", Size: 200 * 1024 * 1024, Collection: "", - FullnessRatio: 0.9, - LastModified: now.Add(-time.Hour), - Age: 10 * time.Minute, + FullnessRatio: 0.96, + LastModified: now.Add(-2 * time.Hour), + Age: 2 * time.Hour, }) } return metrics diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index 5c80e6712..89ca6830c 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -1,6 +1,7 @@ package erasure_coding import ( + "bytes" "context" "fmt" "io" @@ -19,6 +20,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" storagetypes "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_replica" + "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/seaweedfs/seaweedfs/weed/worker/types/base" "google.golang.org/grpc" @@ -35,12 +38,13 @@ type ErasureCodingTask struct { grpcDialOption grpc.DialOption // EC parameters - dataShards int32 - parityShards int32 - sourceDiskType string // source volume's disk type, forwarded to Mount RPC (#9423) - targets []*worker_pb.TaskTarget // Unified targets for EC shards - sources []*worker_pb.TaskSource // Unified sources for cleanup - shardAssignment map[string][]string // destination -> assigned shard types + dataShards int32 + parityShards int32 + sourceDiskType string // source volume's disk type, forwarded to Mount RPC (#9423) + targets []*worker_pb.TaskTarget // Unified targets for EC shards + sources []*worker_pb.TaskSource // Unified sources for cleanup + shardAssignment map[string][]string // destination -> assigned shard types + readonlyReplicas []pb.ServerAddress // replicas marked readonly, for rollback } // NewErasureCodingTask creates a new unified EC task instance @@ -146,12 +150,22 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP } }() - // Step 1: Mark volume readonly + // Step 1: Mark all replicas readonly, then reconcile them and select the most + // complete replica as the encode source. Encoding a stale replica and then + // deleting the originals would silently lose entries that exist only on another + // replica; SyncAndSelectBestReplica builds the union onto the best replica first + // (mirrors the shell ec.encode best-replica selection). t.ReportProgressWithStage(10.0, "Marking volume readonly") t.GetLogger().Info("Marking volume readonly") - if err := t.markVolumeReadonly(ctx); err != nil { + if err := t.markReplicasReadonly(ctx); err != nil { + // Marking can fail partway; restore the replicas already marked readonly. + t.rollbackReadonly(ctx) return fmt.Errorf("failed to mark volume readonly: %v", err) } + if err := t.syncAndSelectSourceReplica(); err != nil { + t.rollbackReadonly(ctx) + return fmt.Errorf("failed to sync and select source replica: %v", err) + } // Step 2: Copy volume files to worker // The .idx and .dat are copied as separate network transfers, with .idx @@ -277,39 +291,87 @@ func (t *ErasureCodingTask) GetProgress() float64 { // Helper methods for actual EC operations -// markVolumeReadonly marks the volume as readonly on the source server -func (t *ErasureCodingTask) markVolumeReadonly(ctx context.Context) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ - VolumeId: t.volumeID, - }) - return err - }) +// replicaLocations returns the regular (non-EC) volume replica locations from the +// task sources. EC-shard sources carry shard ids; regular replicas do not. Falls +// back to the assigned source server when no replica sources are present. +func (t *ErasureCodingTask) replicaLocations() []wdclient.Location { + var locs []wdclient.Location + for _, s := range t.sources { + if s == nil || len(s.ShardIds) > 0 || s.Node == "" { + continue + } + locs = append(locs, wdclient.Location{Url: s.Node, DataCenter: s.DataCenter}) + } + if len(locs) == 0 { + locs = append(locs, wdclient.Location{Url: t.server}) + } + return locs } -// rollbackReadonly is a best-effort rollback of markVolumeReadonly, used when the -// EC task fails before any shards are distributed. Logs but does not return errors. -// Uses a fresh context with timeout since the caller's ctx may already be cancelled. +// markReplicasReadonly marks every regular replica readonly so no writes land +// during encoding, recording them so rollbackReadonly can restore them all. +func (t *ErasureCodingTask) markReplicasReadonly(ctx context.Context) error { + t.readonlyReplicas = t.readonlyReplicas[:0] + for _, loc := range t.replicaLocations() { + addr := loc.ServerAddress() + err := operation.WithVolumeServerClient(false, addr, t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + _, e := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{VolumeId: t.volumeID}) + return e + }) + if err != nil { + return fmt.Errorf("mark volume %d readonly on %s: %w", t.volumeID, addr, err) + } + t.readonlyReplicas = append(t.readonlyReplicas, addr) + } + return nil +} + +// syncAndSelectSourceReplica reconciles the volume's replicas (building the union +// of all live entries onto the most complete one) and switches the encode source +// to that replica, so a stale replica is never the basis of the encode. +func (t *ErasureCodingTask) syncAndSelectSourceReplica() error { + locs := t.replicaLocations() + if len(locs) <= 1 { + return nil // single replica: nothing to reconcile + } + var buf bytes.Buffer + best, err := volume_replica.SyncAndSelectBestReplica(t.grpcDialOption, needle.VolumeId(t.volumeID), t.collection, locs, "", &buf) + if out := strings.TrimSpace(buf.String()); out != "" { + glog.Infof("EC encode replica sync for volume %d:\n%s", t.volumeID, out) + } + if err != nil { + return err + } + if best.Url != "" && best.Url != t.server { + glog.Infof("EC encode: using best replica %s as source for volume %d (was %s)", best.Url, t.volumeID, t.server) + t.server = best.Url + } + return nil +} + +// rollbackReadonly is a best-effort restore of every replica markReplicasReadonly +// touched, used when the EC task fails before the originals are deleted. Logs but +// does not return errors; uses a fresh context since the caller's may be cancelled. func (t *ErasureCodingTask) rollbackReadonly(_ context.Context) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if err := t.markVolumeWritable(ctx); err != nil { - glog.Warningf("failed to restore volume %d to writable after EC task failure: %v", t.volumeID, err) - } else { - glog.V(0).Infof("restored volume %d to writable after EC task failure", t.volumeID) + servers := t.readonlyReplicas + if len(servers) == 0 { + servers = []pb.ServerAddress{pb.ServerAddress(t.server)} } -} - -// markVolumeWritable restores the volume to writable on the source server. -func (t *ErasureCodingTask) markVolumeWritable(ctx context.Context) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMarkWritable(ctx, &volume_server_pb.VolumeMarkWritableRequest{ - VolumeId: t.volumeID, + for _, addr := range servers { + err := operation.WithVolumeServerClient(false, addr, t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + _, e := client.VolumeMarkWritable(ctx, &volume_server_pb.VolumeMarkWritableRequest{VolumeId: t.volumeID}) + return e }) - return err - }) + if err != nil { + glog.Warningf("failed to restore volume %d to writable on %s after EC task failure: %v", t.volumeID, addr, err) + } else { + glog.V(0).Infof("restored volume %d to writable on %s after EC task failure", t.volumeID, addr) + } + } } // copyVolumeFilesToWorker copies .idx and .dat files from source server to local worker. diff --git a/weed/worker/tasks/erasure_coding/ec_task_test.go b/weed/worker/tasks/erasure_coding/ec_task_test.go index a6a4ab5d6..ff2ec755c 100644 --- a/weed/worker/tasks/erasure_coding/ec_task_test.go +++ b/weed/worker/tasks/erasure_coding/ec_task_test.go @@ -59,7 +59,7 @@ func TestCopyVolumeFilesToWorkerUsesCurrentCompactionRevision(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - require.NoError(t, task.markVolumeReadonly(ctx)) + require.NoError(t, task.markReplicasReadonly(ctx)) fileStatus, err := task.readSourceVolumeFileStatus(ctx) require.NoError(t, err) diff --git a/weed/worker/tasks/erasure_coding/plugin_handler.go b/weed/worker/tasks/erasure_coding/plugin_handler.go index f29026bf1..ef1991bb5 100644 --- a/weed/worker/tasks/erasure_coding/plugin_handler.go +++ b/weed/worker/tasks/erasure_coding/plugin_handler.go @@ -143,10 +143,10 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor { }, DefaultValues: map[string]*plugin_pb.ConfigValue{ "quiet_for_seconds": { - Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300}, + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3600}, }, "fullness_ratio": { - Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8}, + Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.95}, }, "min_size_mb": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30}, @@ -170,10 +170,10 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor { }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "quiet_for_seconds": { - Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300}, + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3600}, }, "fullness_ratio": { - Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8}, + Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.95}, }, "min_size_mb": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30}, diff --git a/weed/worker/tasks/vacuum/plugin_handler.go b/weed/worker/tasks/vacuum/plugin_handler.go index c533ddcc2..e753e9f15 100644 --- a/weed/worker/tasks/vacuum/plugin_handler.go +++ b/weed/worker/tasks/vacuum/plugin_handler.go @@ -487,7 +487,9 @@ func (h *VacuumHandler) collectVolumeMetrics( func deriveVacuumConfig(values map[string]*plugin_pb.ConfigValue) *Config { config := NewDefaultConfig() config.GarbageThreshold = pluginworker.ReadDoubleConfig(values, "garbage_threshold", config.GarbageThreshold) - config.MinVolumeAgeSeconds = 0 // plugin worker does not filter by volume age + // Match the shell/master vacuum, which applies no volume-age gate (default 0), + // but honor an explicit override instead of silently dropping it. + config.MinVolumeAgeSeconds = pluginworker.ReadIntConfig(values, "min_volume_age_seconds", 0) return config }