diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index d86b2ebf4..c2aa54d82 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -52,6 +52,7 @@ type commandVolumeFsck struct { bucketsPath string collection *string volumeIds map[uint32]bool + scopedFilerPath string tempFolder string verbose *bool forcePurging *bool @@ -185,6 +186,11 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. } } + c.scopedFilerPath = c.resolveScopedFilerPath(dataNodeVolumeIdToVInfo) + if *c.verbose && c.scopedFilerPath != "/" { + fmt.Fprintf(c.writer, "scoping filer walk to %s\n", c.scopedFilerPath) + } + var collectCutoffFromAtNs int64 = 0 if cutoffTimeAgo.Seconds() != 0 { collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano() @@ -854,10 +860,65 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds [] } func (c *commandVolumeFsck) getCollectFilerFilePath() string { + return c.scopedFilerPath +} + +func (c *commandVolumeFsck) resolveScopedFilerPath(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo) string { if *c.collection != "" { return fmt.Sprintf("%s/%s", c.bucketsPath, *c.collection) } - return "/" + if len(c.volumeIds) == 0 { + return "/" + } + collections := make(map[string]struct{}) + for _, vidMap := range dataNodeVolumeIdToVInfo { + for vid, vinfo := range vidMap { + if _, ok := c.volumeIds[vid]; !ok { + continue + } + // empty collection: volume can be referenced from anywhere + if vinfo.collection == "" { + return "/" + } + collections[vinfo.collection] = struct{}{} + if len(collections) > 1 { + return "/" + } + } + } + if len(collections) != 1 { + return "/" + } + var collection string + for col := range collections { + collection = col + } + exists, err := c.bucketDirExists(collection) + if err != nil || !exists { + return "/" + } + return fmt.Sprintf("%s/%s", c.bucketsPath, collection) +} + +func (c *commandVolumeFsck) bucketDirExists(name string) (bool, error) { + var found bool + err := c.env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: c.bucketsPath, + Name: name, + }) + if err != nil { + if strings.Contains(err.Error(), filer_pb.ErrNotFound.Error()) { + return nil + } + return err + } + if resp.Entry != nil && resp.Entry.IsDirectory { + found = true + } + return nil + }) + return found, err } func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {