mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-23 02:01:32 +00:00
fix(shell): scope volume.fsck filer walk when -volumeId selects one bucketed collection (#9347)
* fix(shell): scope volume.fsck filer walk to the bucket when -volumeId selects one bucketed collection Closes #9345. -volumeId only filtered which volume .idx files were pulled; the filer-side BFS still walked from "/", printing every directory under -v and making it look like the flag was ignored. When all requested volumes share a single non-empty collection that maps to an existing <bucketsPath>/<collection> directory, restrict the BFS root to that bucket. Empty-collection volumes or multi-collection selections fall back to the full walk, since chunks for those can live anywhere. * trim comments * address review: collapse getCollectFilerFilePath; unshadow receiver in loop
This commit is contained in:
@@ -52,6 +52,7 @@ type commandVolumeFsck struct {
|
||||
bucketsPath string
|
||||
collection *string
|
||||
volumeIds map[uint32]bool
|
||||
scopedFilerPath string
|
||||
tempFolder string
|
||||
verbose *bool
|
||||
forcePurging *bool
|
||||
@@ -185,6 +186,11 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
|
||||
}
|
||||
}
|
||||
|
||||
c.scopedFilerPath = c.resolveScopedFilerPath(dataNodeVolumeIdToVInfo)
|
||||
if *c.verbose && c.scopedFilerPath != "/" {
|
||||
fmt.Fprintf(c.writer, "scoping filer walk to %s\n", c.scopedFilerPath)
|
||||
}
|
||||
|
||||
var collectCutoffFromAtNs int64 = 0
|
||||
if cutoffTimeAgo.Seconds() != 0 {
|
||||
collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano()
|
||||
@@ -854,10 +860,65 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
|
||||
}
|
||||
|
||||
func (c *commandVolumeFsck) getCollectFilerFilePath() string {
|
||||
return c.scopedFilerPath
|
||||
}
|
||||
|
||||
func (c *commandVolumeFsck) resolveScopedFilerPath(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo) string {
|
||||
if *c.collection != "" {
|
||||
return fmt.Sprintf("%s/%s", c.bucketsPath, *c.collection)
|
||||
}
|
||||
return "/"
|
||||
if len(c.volumeIds) == 0 {
|
||||
return "/"
|
||||
}
|
||||
collections := make(map[string]struct{})
|
||||
for _, vidMap := range dataNodeVolumeIdToVInfo {
|
||||
for vid, vinfo := range vidMap {
|
||||
if _, ok := c.volumeIds[vid]; !ok {
|
||||
continue
|
||||
}
|
||||
// empty collection: volume can be referenced from anywhere
|
||||
if vinfo.collection == "" {
|
||||
return "/"
|
||||
}
|
||||
collections[vinfo.collection] = struct{}{}
|
||||
if len(collections) > 1 {
|
||||
return "/"
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(collections) != 1 {
|
||||
return "/"
|
||||
}
|
||||
var collection string
|
||||
for col := range collections {
|
||||
collection = col
|
||||
}
|
||||
exists, err := c.bucketDirExists(collection)
|
||||
if err != nil || !exists {
|
||||
return "/"
|
||||
}
|
||||
return fmt.Sprintf("%s/%s", c.bucketsPath, collection)
|
||||
}
|
||||
|
||||
func (c *commandVolumeFsck) bucketDirExists(name string) (bool, error) {
|
||||
var found bool
|
||||
err := c.env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: c.bucketsPath,
|
||||
Name: name,
|
||||
})
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), filer_pb.ErrNotFound.Error()) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
if resp.Entry != nil && resp.Entry.IsDirectory {
|
||||
found = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return found, err
|
||||
}
|
||||
|
||||
func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {
|
||||
|
||||
Reference in New Issue
Block a user