diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index ef771de7d..10653cd23 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -120,11 +120,21 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer } entryPath := parentPath.Child(entry.Name) entryChanged := false + // Every successful moveChunk or rewriteManifestChunk leaves the old + // needle sitting on its source volume as a silent orphan — until + // now the source only shrank after a separate volume.fsck + + // volume.vacuum cycle, which is what made #9116 (comment 4282692876) + // look like mergeVolumes hadn't done anything. Track the old fids + // and delete them below after the filer update commits, so the + // filer never points at a fid we already deleted. + var movedSources []movedSourceNeedle for i, chunk := range entry.Chunks { if chunk.IsChunkManifest { - newChunk, changed, mErr := c.rewriteManifestChunk(context.Background(), commandEnv, lookupFn, plan, entryPath, chunk, *apply) + oldManifestFid := chunk.GetFileIdString() + oldManifestVid := chunk.Fid.VolumeId + newChunk, changed, subSources, mErr := c.rewriteManifestChunk(context.Background(), commandEnv, lookupFn, plan, entryPath, chunk, *apply) if mErr != nil { - fmt.Printf("failed to rewrite manifest %s(%s): %v\n", entryPath, chunk.GetFileIdString(), mErr) + fmt.Printf("failed to rewrite manifest %s(%s): %v\n", entryPath, oldManifestFid, mErr) continue } if !changed || !*apply { @@ -132,6 +142,12 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer } entry.Chunks[i] = newChunk entryChanged = true + movedSources = append(movedSources, subSources...) + // The old manifest needle is always orphaned when we + // replace it with a freshly uploaded one, even when the + // rewrite was triggered by sub-chunk moves rather than the + // manifest volume itself being in the plan. + movedSources = append(movedSources, movedSourceNeedle{volumeId: oldManifestVid, fileId: oldManifestFid}) continue } @@ -141,15 +157,18 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer continue } - fmt.Printf("move %s(%s)\n", entryPath, chunk.GetFileIdString()) + oldFid := chunk.GetFileIdString() + oldVid := chunk.Fid.VolumeId + fmt.Printf("move %s(%s)\n", entryPath, oldFid) if !*apply { continue } if mvErr := moveChunk(chunk, toVolumeId, commandEnv.MasterClient); mvErr != nil { - fmt.Printf("failed to move %s(%s): %v\n", entryPath, chunk.GetFileIdString(), mvErr) + fmt.Printf("failed to move %s(%s): %v\n", entryPath, oldFid, mvErr) continue } entryChanged = true + movedSources = append(movedSources, movedSourceNeedle{volumeId: oldVid, fileId: oldFid}) } if entryChanged { if uErr := filer_pb.UpdateEntry(context.Background(), filerClient, &filer_pb.UpdateEntryRequest{ @@ -157,13 +176,63 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer Entry: entry, }); uErr != nil { fmt.Printf("failed to update %s: %v\n", entryPath, uErr) + // Filer still references the source fids. Deleting them + // now would lose data — abandon the cleanup for this + // entry and let fsck reconcile later. + return nil } + c.deleteMovedSourceNeedles(commandEnv, entryPath, movedSources) } return nil }) }) } +// movedSourceNeedle is a needle that was copied out of its source volume by +// a move/rewrite operation and is safe to delete once the filer update that +// re-pointed references to the new location has committed. +type movedSourceNeedle struct { + volumeId uint32 + fileId string +} + +// deleteMovedSourceNeedles fans out BatchDelete RPCs to every replica of each +// source volume. Errors are logged but never returned — the source data is +// already orphan at this point, so a failed cleanup just leaves work for a +// later fsck. Propagating an error here would abort TraverseBfs and strand +// the remaining entries mid-merge, which is strictly worse. +func (c *commandFsMergeVolumes) deleteMovedSourceNeedles(commandEnv *CommandEnv, entryPath util.FullPath, sources []movedSourceNeedle) { + if len(sources) == 0 { + return + } + byVolume := make(map[uint32][]string) + for _, s := range sources { + byVolume[s.volumeId] = append(byVolume[s.volumeId], s.fileId) + } + for vid, fids := range byVolume { + locations, found := commandEnv.MasterClient.GetLocations(vid) + if !found { + fmt.Printf("source cleanup %s: no locations for volume %d\n", entryPath, vid) + continue + } + for _, loc := range locations { + results := operation.DeleteFileIdsAtOneVolumeServer(loc.ServerAddress(), commandEnv.option.GrpcDialOption, fids, false) + for _, r := range results { + // StatusNotModified means the needle was already deleted + // (e.g. a concurrent fsck purge or a replica that had + // already reconciled). That's the desired end state, so + // don't warn about it. Cast to int because r.Status is + // an int32 protobuf field and linters flag the mixed-type + // compare even though Go's untyped-constant rules make it + // valid. + if r.Error != "" && int(r.Status) != http.StatusNotModified { + fmt.Printf("source cleanup %s: delete %s on %v: %s\n", entryPath, r.FileId, loc.ServerAddress(), r.Error) + } + } + } + } +} + func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) { info := c.volumes[vid] var err error @@ -322,8 +391,15 @@ func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.Volume // rewriteManifestChunk walks the sub-chunks referenced by a manifest chunk and // moves any that live in a source volume from the merge plan. If any sub-chunk // moves, or the manifest chunk itself lives in a source volume, the manifest -// blob is re-serialized and uploaded to a freshly assigned file id. The old -// manifest needle becomes orphaned and is later reclaimed by vacuum. +// blob is re-serialized and uploaded to a freshly assigned file id. +// +// The returned movedSourceNeedle slice lists every source needle the caller +// should delete once the filer update commits — sub-chunks that were moved and +// nested manifest chunks that got rewritten. The OUTER manifest needle is the +// caller's responsibility to record, since only the caller knows its pre-move +// fid (this function's own chunk argument still reports the old fid on return, +// but that couples manifest-nesting logic to a fact that is easier to capture +// at the top-level callsite). func (c *commandFsMergeVolumes) rewriteManifestChunk( ctx context.Context, commandEnv *CommandEnv, @@ -332,26 +408,35 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk( entryPath util.FullPath, chunk *filer_pb.FileChunk, apply bool, -) (*filer_pb.FileChunk, bool, error) { +) (*filer_pb.FileChunk, bool, []movedSourceNeedle, error) { if !chunk.IsChunkManifest { - return chunk, false, fmt.Errorf("not a manifest chunk: %s", chunk.GetFileIdString()) + return chunk, false, nil, fmt.Errorf("not a manifest chunk: %s", chunk.GetFileIdString()) } subChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFn, chunk) if err != nil { - return chunk, false, err + return chunk, false, nil, err } + var movedSources []movedSourceNeedle anySubChanged := false for i, sub := range subChunks { if sub.IsChunkManifest { - newSub, changed, rErr := c.rewriteManifestChunk(ctx, commandEnv, lookupFn, plan, entryPath, sub, apply) + oldSubManifestFid := sub.GetFileIdString() + oldSubManifestVid := sub.Fid.VolumeId + newSub, changed, nestedSources, rErr := c.rewriteManifestChunk(ctx, commandEnv, lookupFn, plan, entryPath, sub, apply) if rErr != nil { - return chunk, false, rErr + return chunk, false, nil, rErr } if changed { subChunks[i] = newSub anySubChanged = true + if apply { + movedSources = append(movedSources, nestedSources...) + // Nested manifest got replaced — its old needle is now + // orphan on the same volume it used to live on. + movedSources = append(movedSources, movedSourceNeedle{volumeId: oldSubManifestVid, fileId: oldSubManifestFid}) + } } continue } @@ -360,23 +445,26 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk( if !ok { continue } - fmt.Printf("move %s(%s) [inside manifest %s]\n", entryPath, sub.GetFileIdString(), chunk.GetFileIdString()) + oldSubFid := sub.GetFileIdString() + oldSubVid := sub.Fid.VolumeId + fmt.Printf("move %s(%s) [inside manifest %s]\n", entryPath, oldSubFid, chunk.GetFileIdString()) if !apply { anySubChanged = true continue } if mErr := moveChunk(sub, toVid, commandEnv.MasterClient); mErr != nil { - fmt.Printf("failed to move %s(%s): %v\n", entryPath, sub.GetFileIdString(), mErr) + fmt.Printf("failed to move %s(%s): %v\n", entryPath, oldSubFid, mErr) continue } anySubChanged = true + movedSources = append(movedSources, movedSourceNeedle{volumeId: oldSubVid, fileId: oldSubFid}) } manifestVid := needle.VolumeId(chunk.Fid.VolumeId) _, manifestMustMove := plan[manifestVid] if !anySubChanged && !manifestMustMove { - return chunk, false, nil + return chunk, false, nil, nil } fmt.Printf("rewrite manifest %s(%s)\n", entryPath, chunk.GetFileIdString()) @@ -384,14 +472,14 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk( // Propagate "would change" so nested callers also announce their // rewrites in dry-run mode. The top-level caller gates any actual // filer writes on *apply, so returning true here is safe. - return chunk, true, nil + return chunk, true, nil, nil } filer_pb.BeforeEntrySerialization(subChunks) defer filer_pb.AfterEntryDeserialization(subChunks) data, err := proto.Marshal(&filer_pb.FileChunkManifest{Chunks: subChunks}) if err != nil { - return chunk, false, fmt.Errorf("marshal manifest: %w", err) + return chunk, false, nil, fmt.Errorf("marshal manifest: %w", err) } collection := "" @@ -400,7 +488,7 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk( } newChunk, err := c.uploadManifestChunk(ctx, commandEnv, entryPath, collection, plan, data) if err != nil { - return chunk, false, fmt.Errorf("upload new manifest: %w", err) + return chunk, false, nil, fmt.Errorf("upload new manifest: %w", err) } newChunk.IsChunkManifest = true @@ -411,7 +499,7 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk( } newChunk.FileId = "" - return newChunk, true, nil + return newChunk, true, movedSources, nil } // uploadManifestChunk assigns a fresh file id via the filer and uploads the