mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-25 03:01:47 +00:00
feat(shell): fs.mergeVolumes deletes source needles after filer update (#9160)
* feat(shell): fs.mergeVolumes deletes source needles after filer update Before this change, mergeVolumes only copied chunks to the destination volume and updated the filer — the source needle sat untouched on its original volume as a silent orphan. Operators had to run a separate volume.fsck + volume.vacuum pass to actually reclaim the space, and #9116 (comment 4282692876) showed how that pipeline can look exactly like "mergeVolumes did nothing": the source volume keeps reporting its original size even though every chunk has been logically moved out. Clean up the source inline. For each entry, track the pre-move fids as they're captured, and after the UpdateEntry RPC commits, issue BatchDelete on every replica of each source volume. Key invariants: - Source fids are only deleted AFTER UpdateEntry succeeds; if the filer write fails we skip the cleanup for that entry so we never delete data the filer still references. - rewriteManifestChunk grew a fourth return value so nested manifest and sub-chunk moves propagate their moved-source list back to the top-level callsite. The outer manifest itself is recorded at the callsite, since only the callsite sees the pre-rewrite fid. - deleteMovedSourceNeedles logs errors but never returns them. Propagating would abort TraverseBfs mid-merge, stranding remaining entries; logging leaves the fallback path (fsck reconciles later) intact. - StatusNotModified from the volume server is expected whenever a concurrent fsck purge beat us to the delete or a replica already reconciled — don't warn on it. Readonly source volumes are already rejected up front by createMergePlan, so by the time we reach the delete the source is writable. If a replica's readonly bit has flipped since then the delete will fail and get logged; the user can re-run once they've fixed the replica (same failure mode as today's fsck purge). Fixes the space-not-reclaimed half of #9116. Related design discussion: #8589. * address review: cast r.Status to int in StatusNotModified compare http.StatusNotModified is an untyped constant so the compare works as written, but the int32/int mixed-type signal trips static analyzers and PR tooling. Cast explicitly and note why.
This commit is contained in:
@@ -120,11 +120,21 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
|
||||
}
|
||||
entryPath := parentPath.Child(entry.Name)
|
||||
entryChanged := false
|
||||
// Every successful moveChunk or rewriteManifestChunk leaves the old
|
||||
// needle sitting on its source volume as a silent orphan — until
|
||||
// now the source only shrank after a separate volume.fsck +
|
||||
// volume.vacuum cycle, which is what made #9116 (comment 4282692876)
|
||||
// look like mergeVolumes hadn't done anything. Track the old fids
|
||||
// and delete them below after the filer update commits, so the
|
||||
// filer never points at a fid we already deleted.
|
||||
var movedSources []movedSourceNeedle
|
||||
for i, chunk := range entry.Chunks {
|
||||
if chunk.IsChunkManifest {
|
||||
newChunk, changed, mErr := c.rewriteManifestChunk(context.Background(), commandEnv, lookupFn, plan, entryPath, chunk, *apply)
|
||||
oldManifestFid := chunk.GetFileIdString()
|
||||
oldManifestVid := chunk.Fid.VolumeId
|
||||
newChunk, changed, subSources, mErr := c.rewriteManifestChunk(context.Background(), commandEnv, lookupFn, plan, entryPath, chunk, *apply)
|
||||
if mErr != nil {
|
||||
fmt.Printf("failed to rewrite manifest %s(%s): %v\n", entryPath, chunk.GetFileIdString(), mErr)
|
||||
fmt.Printf("failed to rewrite manifest %s(%s): %v\n", entryPath, oldManifestFid, mErr)
|
||||
continue
|
||||
}
|
||||
if !changed || !*apply {
|
||||
@@ -132,6 +142,12 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
|
||||
}
|
||||
entry.Chunks[i] = newChunk
|
||||
entryChanged = true
|
||||
movedSources = append(movedSources, subSources...)
|
||||
// The old manifest needle is always orphaned when we
|
||||
// replace it with a freshly uploaded one, even when the
|
||||
// rewrite was triggered by sub-chunk moves rather than the
|
||||
// manifest volume itself being in the plan.
|
||||
movedSources = append(movedSources, movedSourceNeedle{volumeId: oldManifestVid, fileId: oldManifestFid})
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -141,15 +157,18 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Printf("move %s(%s)\n", entryPath, chunk.GetFileIdString())
|
||||
oldFid := chunk.GetFileIdString()
|
||||
oldVid := chunk.Fid.VolumeId
|
||||
fmt.Printf("move %s(%s)\n", entryPath, oldFid)
|
||||
if !*apply {
|
||||
continue
|
||||
}
|
||||
if mvErr := moveChunk(chunk, toVolumeId, commandEnv.MasterClient); mvErr != nil {
|
||||
fmt.Printf("failed to move %s(%s): %v\n", entryPath, chunk.GetFileIdString(), mvErr)
|
||||
fmt.Printf("failed to move %s(%s): %v\n", entryPath, oldFid, mvErr)
|
||||
continue
|
||||
}
|
||||
entryChanged = true
|
||||
movedSources = append(movedSources, movedSourceNeedle{volumeId: oldVid, fileId: oldFid})
|
||||
}
|
||||
if entryChanged {
|
||||
if uErr := filer_pb.UpdateEntry(context.Background(), filerClient, &filer_pb.UpdateEntryRequest{
|
||||
@@ -157,13 +176,63 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
|
||||
Entry: entry,
|
||||
}); uErr != nil {
|
||||
fmt.Printf("failed to update %s: %v\n", entryPath, uErr)
|
||||
// Filer still references the source fids. Deleting them
|
||||
// now would lose data — abandon the cleanup for this
|
||||
// entry and let fsck reconcile later.
|
||||
return nil
|
||||
}
|
||||
c.deleteMovedSourceNeedles(commandEnv, entryPath, movedSources)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// movedSourceNeedle is a needle that was copied out of its source volume by
|
||||
// a move/rewrite operation and is safe to delete once the filer update that
|
||||
// re-pointed references to the new location has committed.
|
||||
type movedSourceNeedle struct {
|
||||
volumeId uint32
|
||||
fileId string
|
||||
}
|
||||
|
||||
// deleteMovedSourceNeedles fans out BatchDelete RPCs to every replica of each
|
||||
// source volume. Errors are logged but never returned — the source data is
|
||||
// already orphan at this point, so a failed cleanup just leaves work for a
|
||||
// later fsck. Propagating an error here would abort TraverseBfs and strand
|
||||
// the remaining entries mid-merge, which is strictly worse.
|
||||
func (c *commandFsMergeVolumes) deleteMovedSourceNeedles(commandEnv *CommandEnv, entryPath util.FullPath, sources []movedSourceNeedle) {
|
||||
if len(sources) == 0 {
|
||||
return
|
||||
}
|
||||
byVolume := make(map[uint32][]string)
|
||||
for _, s := range sources {
|
||||
byVolume[s.volumeId] = append(byVolume[s.volumeId], s.fileId)
|
||||
}
|
||||
for vid, fids := range byVolume {
|
||||
locations, found := commandEnv.MasterClient.GetLocations(vid)
|
||||
if !found {
|
||||
fmt.Printf("source cleanup %s: no locations for volume %d\n", entryPath, vid)
|
||||
continue
|
||||
}
|
||||
for _, loc := range locations {
|
||||
results := operation.DeleteFileIdsAtOneVolumeServer(loc.ServerAddress(), commandEnv.option.GrpcDialOption, fids, false)
|
||||
for _, r := range results {
|
||||
// StatusNotModified means the needle was already deleted
|
||||
// (e.g. a concurrent fsck purge or a replica that had
|
||||
// already reconciled). That's the desired end state, so
|
||||
// don't warn about it. Cast to int because r.Status is
|
||||
// an int32 protobuf field and linters flag the mixed-type
|
||||
// compare even though Go's untyped-constant rules make it
|
||||
// valid.
|
||||
if r.Error != "" && int(r.Status) != http.StatusNotModified {
|
||||
fmt.Printf("source cleanup %s: delete %s on %v: %s\n", entryPath, r.FileId, loc.ServerAddress(), r.Error)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
|
||||
info := c.volumes[vid]
|
||||
var err error
|
||||
@@ -322,8 +391,15 @@ func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.Volume
|
||||
// rewriteManifestChunk walks the sub-chunks referenced by a manifest chunk and
|
||||
// moves any that live in a source volume from the merge plan. If any sub-chunk
|
||||
// moves, or the manifest chunk itself lives in a source volume, the manifest
|
||||
// blob is re-serialized and uploaded to a freshly assigned file id. The old
|
||||
// manifest needle becomes orphaned and is later reclaimed by vacuum.
|
||||
// blob is re-serialized and uploaded to a freshly assigned file id.
|
||||
//
|
||||
// The returned movedSourceNeedle slice lists every source needle the caller
|
||||
// should delete once the filer update commits — sub-chunks that were moved and
|
||||
// nested manifest chunks that got rewritten. The OUTER manifest needle is the
|
||||
// caller's responsibility to record, since only the caller knows its pre-move
|
||||
// fid (this function's own chunk argument still reports the old fid on return,
|
||||
// but that couples manifest-nesting logic to a fact that is easier to capture
|
||||
// at the top-level callsite).
|
||||
func (c *commandFsMergeVolumes) rewriteManifestChunk(
|
||||
ctx context.Context,
|
||||
commandEnv *CommandEnv,
|
||||
@@ -332,26 +408,35 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk(
|
||||
entryPath util.FullPath,
|
||||
chunk *filer_pb.FileChunk,
|
||||
apply bool,
|
||||
) (*filer_pb.FileChunk, bool, error) {
|
||||
) (*filer_pb.FileChunk, bool, []movedSourceNeedle, error) {
|
||||
if !chunk.IsChunkManifest {
|
||||
return chunk, false, fmt.Errorf("not a manifest chunk: %s", chunk.GetFileIdString())
|
||||
return chunk, false, nil, fmt.Errorf("not a manifest chunk: %s", chunk.GetFileIdString())
|
||||
}
|
||||
|
||||
subChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFn, chunk)
|
||||
if err != nil {
|
||||
return chunk, false, err
|
||||
return chunk, false, nil, err
|
||||
}
|
||||
|
||||
var movedSources []movedSourceNeedle
|
||||
anySubChanged := false
|
||||
for i, sub := range subChunks {
|
||||
if sub.IsChunkManifest {
|
||||
newSub, changed, rErr := c.rewriteManifestChunk(ctx, commandEnv, lookupFn, plan, entryPath, sub, apply)
|
||||
oldSubManifestFid := sub.GetFileIdString()
|
||||
oldSubManifestVid := sub.Fid.VolumeId
|
||||
newSub, changed, nestedSources, rErr := c.rewriteManifestChunk(ctx, commandEnv, lookupFn, plan, entryPath, sub, apply)
|
||||
if rErr != nil {
|
||||
return chunk, false, rErr
|
||||
return chunk, false, nil, rErr
|
||||
}
|
||||
if changed {
|
||||
subChunks[i] = newSub
|
||||
anySubChanged = true
|
||||
if apply {
|
||||
movedSources = append(movedSources, nestedSources...)
|
||||
// Nested manifest got replaced — its old needle is now
|
||||
// orphan on the same volume it used to live on.
|
||||
movedSources = append(movedSources, movedSourceNeedle{volumeId: oldSubManifestVid, fileId: oldSubManifestFid})
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -360,23 +445,26 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk(
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
fmt.Printf("move %s(%s) [inside manifest %s]\n", entryPath, sub.GetFileIdString(), chunk.GetFileIdString())
|
||||
oldSubFid := sub.GetFileIdString()
|
||||
oldSubVid := sub.Fid.VolumeId
|
||||
fmt.Printf("move %s(%s) [inside manifest %s]\n", entryPath, oldSubFid, chunk.GetFileIdString())
|
||||
if !apply {
|
||||
anySubChanged = true
|
||||
continue
|
||||
}
|
||||
if mErr := moveChunk(sub, toVid, commandEnv.MasterClient); mErr != nil {
|
||||
fmt.Printf("failed to move %s(%s): %v\n", entryPath, sub.GetFileIdString(), mErr)
|
||||
fmt.Printf("failed to move %s(%s): %v\n", entryPath, oldSubFid, mErr)
|
||||
continue
|
||||
}
|
||||
anySubChanged = true
|
||||
movedSources = append(movedSources, movedSourceNeedle{volumeId: oldSubVid, fileId: oldSubFid})
|
||||
}
|
||||
|
||||
manifestVid := needle.VolumeId(chunk.Fid.VolumeId)
|
||||
_, manifestMustMove := plan[manifestVid]
|
||||
|
||||
if !anySubChanged && !manifestMustMove {
|
||||
return chunk, false, nil
|
||||
return chunk, false, nil, nil
|
||||
}
|
||||
|
||||
fmt.Printf("rewrite manifest %s(%s)\n", entryPath, chunk.GetFileIdString())
|
||||
@@ -384,14 +472,14 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk(
|
||||
// Propagate "would change" so nested callers also announce their
|
||||
// rewrites in dry-run mode. The top-level caller gates any actual
|
||||
// filer writes on *apply, so returning true here is safe.
|
||||
return chunk, true, nil
|
||||
return chunk, true, nil, nil
|
||||
}
|
||||
|
||||
filer_pb.BeforeEntrySerialization(subChunks)
|
||||
defer filer_pb.AfterEntryDeserialization(subChunks)
|
||||
data, err := proto.Marshal(&filer_pb.FileChunkManifest{Chunks: subChunks})
|
||||
if err != nil {
|
||||
return chunk, false, fmt.Errorf("marshal manifest: %w", err)
|
||||
return chunk, false, nil, fmt.Errorf("marshal manifest: %w", err)
|
||||
}
|
||||
|
||||
collection := ""
|
||||
@@ -400,7 +488,7 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk(
|
||||
}
|
||||
newChunk, err := c.uploadManifestChunk(ctx, commandEnv, entryPath, collection, plan, data)
|
||||
if err != nil {
|
||||
return chunk, false, fmt.Errorf("upload new manifest: %w", err)
|
||||
return chunk, false, nil, fmt.Errorf("upload new manifest: %w", err)
|
||||
}
|
||||
|
||||
newChunk.IsChunkManifest = true
|
||||
@@ -411,7 +499,7 @@ func (c *commandFsMergeVolumes) rewriteManifestChunk(
|
||||
}
|
||||
newChunk.FileId = ""
|
||||
|
||||
return newChunk, true, nil
|
||||
return newChunk, true, movedSources, nil
|
||||
}
|
||||
|
||||
// uploadManifestChunk assigns a fresh file id via the filer and uploads the
|
||||
|
||||
Reference in New Issue
Block a user