diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index 1b7f3a79e..b52dde9cb 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -89,6 +89,38 @@ func (e *metaCacheEntry) matches(other *metaCacheEntry, bucket string) bool { return eFi.ModTime.Equal(oFi.ModTime) && eFi.Size == oFi.Size && eFi.VersionID == oFi.VersionID } +// resolveEntries returns if the entries match by comparing their latest version fileinfo. +func resolveEntries(a, b *metaCacheEntry, bucket string) *metaCacheEntry { + if b == nil { + return a + } + if a == nil { + return b + } + + aFi, err := a.fileInfo(bucket) + if err != nil { + return b + } + bFi, err := b.fileInfo(bucket) + if err != nil { + return a + } + + if !aFi.ModTime.Equal(bFi.ModTime) { + if aFi.ModTime.After(bFi.ModTime) { + return a + } + return b + } + + if aFi.NumVersions > bFi.NumVersions { + return a + } + + return b +} + // isInDir returns whether the entry is in the dir when considering the separator. func (e metaCacheEntry) isInDir(dir, separator string) bool { if len(dir) == 0 { @@ -199,15 +231,29 @@ type metadataResolutionParams struct { dirQuorum int // Number if disks needed for a directory to 'exist'. objQuorum int // Number of disks needed for an object to 'exist'. bucket string // Name of the bucket. Used for generating cached fileinfo. + + // Reusable slice for resolution + candidates []struct { + n int + e *metaCacheEntry + } } +// resolve multiple entries. +// entries are resolved by majority, then if tied by mod-time and versions. func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCacheEntry, ok bool) { if len(m) == 0 { return nil, false } dirExists := 0 - objExists := 0 + if cap(r.candidates) < len(m) { + r.candidates = make([]struct { + n int + e *metaCacheEntry + }, 0, len(m)) + } + r.candidates = r.candidates[0:] for i := range m { entry := &m[i] if entry.name == "" { @@ -225,28 +271,56 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa continue } + found := false + for i, c := range r.candidates { + if c.e.matches(entry, r.bucket) { + c.n++ + r.candidates[i] = c + found = true + break + } + } + if !found { + r.candidates = append(r.candidates, struct { + n int + e *metaCacheEntry + }{n: 1, e: entry}) + } + } + if selected != nil && selected.isDir() && dirExists > r.dirQuorum { + return selected, true + } + + switch len(r.candidates) { + case 0: if selected == nil { - objExists++ - selected = entry - continue + return nil, false } - - if selected.matches(entry, r.bucket) { - objExists++ - continue + if !selected.isDir() || dirExists < r.dirQuorum { + return nil, false } + return selected, true + case 1: + cand := r.candidates[0] + if cand.n < r.objQuorum { + return nil, false + } + return cand.e, true + default: + // Sort by matches.... + sort.Slice(r.candidates, func(i, j int) bool { + return r.candidates[i].n > r.candidates[j].n + }) + // Check if we have enough. + if r.candidates[0].n < r.objQuorum { + return nil, false + } + if r.candidates[0].n > r.candidates[1].n { + return r.candidates[0].e, true + } + // Tie between two, resolve using modtime+versions. + return resolveEntries(r.candidates[0].e, r.candidates[1].e, r.bucket), true } - - if selected == nil { - return nil, false - } - - if selected.isDir() && dirExists < r.dirQuorum { - return nil, false - } else if !selected.isDir() && objExists < r.objQuorum { - return nil, false - } - return selected, true } // firstFound returns the first found and the number of set entries. diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index 7829cf926..831b66860 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -182,7 +182,11 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e // Resolve non-trivial conflicts entries.deduplicate(func(existing, other *metaCacheEntry) (replace bool) { - if existing.isDir() { + // Pick object over directory + if existing.isDir() && !other.isDir() { + return true + } + if !existing.isDir() && other.isDir() { return false } eFIV, err := existing.fileInfo(o.Bucket)