ilm: Make per-tier stats available via admin-tier-info (#13381)

This commit is contained in:
Krishnan Parthasarathi
2021-10-23 21:38:33 -04:00
committed by GitHub
parent 3b9dfa9d29
commit 939fbb3c38
12 changed files with 2029 additions and 386 deletions

View File

@@ -31,6 +31,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/klauspost/compress/zstd"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
@@ -45,16 +46,70 @@ type dataUsageHash string
// sizeHistogram is a size histogram.
type sizeHistogram [dataUsageBucketLen]uint64
//msgp:tuple dataUsageEntry
type dataUsageEntry struct {
Children dataUsageHashMap
Children dataUsageHashMap `msg:"ch"`
// These fields do no include any children.
Size int64
Objects uint64
Versions uint64 // Versions that are not delete markers.
ObjSizes sizeHistogram
ReplicationStats *replicationAllStats
Compacted bool
Size int64 `msg:"sz"`
Objects uint64 `msg:"os"`
Versions uint64 `msg:"vs"` // Versions that are not delete markers.
ObjSizes sizeHistogram `msg:"szs"`
ReplicationStats *replicationAllStats `msg:"rs,omitempty"`
AllTierStats *allTierStats `msg:"ats,omitempty"`
Compacted bool `msg:"c"`
}
// allTierStats is a collection of per-tier stats across all configured remote
// tiers.
type allTierStats struct {
Tiers map[string]tierStats `msg:"ts"`
}
func newAllTierStats() *allTierStats {
return &allTierStats{
Tiers: make(map[string]tierStats),
}
}
func (ats *allTierStats) addSizes(sz sizeSummary) {
for tier, st := range sz.tiers {
ats.Tiers[tier] = ats.Tiers[tier].add(st)
}
}
func (ats *allTierStats) merge(other *allTierStats) {
for tier, st := range other.Tiers {
ats.Tiers[tier] = ats.Tiers[tier].add(st)
}
}
func (ats *allTierStats) adminStats(stats map[string]madmin.TierStats) map[string]madmin.TierStats {
if ats == nil {
return stats
}
// Update stats for tiers as they become available.
for tier, st := range ats.Tiers {
stats[tier] = madmin.TierStats{
TotalSize: st.TotalSize,
NumVersions: st.NumVersions,
NumObjects: st.NumObjects,
}
}
return stats
}
// tierStats holds per-tier stats of a remote tier.
type tierStats struct {
TotalSize uint64 `msg:"ts"`
NumVersions int `msg:"nv"`
NumObjects int `msg:"no"`
}
func (ts tierStats) add(u tierStats) tierStats {
ts.TotalSize += u.TotalSize
ts.NumVersions += u.NumVersions
ts.NumObjects += u.NumObjects
return ts
}
//msgp:tuple replicationStatsV1
@@ -96,14 +151,19 @@ func (rs replicationStats) Empty() bool {
rs.FailedCount == 0
}
//msgp:tuple replicationAllStats
type replicationAllStats struct {
Targets map[string]replicationStats `msg:"t,omitempty"`
ReplicaSize uint64 `msg:"r,omitempty"`
}
//msgp:tuple replicationAllStatsV1
type replicationAllStatsV1 struct {
Targets map[string]replicationStats
ReplicaSize uint64 `msg:"ReplicaSize,omitempty"`
}
//msgp:encode ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4
//msgp:marshal ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4
//msgp:encode ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4 dataUsageEntryV5 dataUsageEntryV6
//msgp:marshal ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4 dataUsageEntryV5 dataUsageEntryV6
//msgp:tuple dataUsageEntryV2
type dataUsageEntryV2 struct {
@@ -149,6 +209,18 @@ type dataUsageEntryV5 struct {
Compacted bool
}
//msgp:tuple dataUsageEntryV6
type dataUsageEntryV6 struct {
Children dataUsageHashMap
// These fields do no include any children.
Size int64
Objects uint64
Versions uint64 // Versions that are not delete markers.
ObjSizes sizeHistogram
ReplicationStats *replicationAllStatsV1
Compacted bool
}
// dataUsageCache contains a cache of data usage entries latest version.
type dataUsageCache struct {
Info dataUsageCacheInfo
@@ -156,8 +228,8 @@ type dataUsageCache struct {
Disks []string
}
//msgp:encode ignore dataUsageCacheV2 dataUsageCacheV3 dataUsageCacheV4 dataUsageCacheV5
//msgp:marshal ignore dataUsageCacheV2 dataUsageCacheV3 dataUsageCacheV4 dataUsageCacheV5
//msgp:encode ignore dataUsageCacheV2 dataUsageCacheV3 dataUsageCacheV4 dataUsageCacheV5 dataUsageCacheV6
//msgp:marshal ignore dataUsageCacheV2 dataUsageCacheV3 dataUsageCacheV4 dataUsageCacheV5 dataUsageCacheV6
// dataUsageCacheV2 contains a cache of data usage entries version 2.
type dataUsageCacheV2 struct {
@@ -166,27 +238,34 @@ type dataUsageCacheV2 struct {
Cache map[string]dataUsageEntryV2
}
// dataUsageCache contains a cache of data usage entries version 3.
// dataUsageCacheV3 contains a cache of data usage entries version 3.
type dataUsageCacheV3 struct {
Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntryV3
}
// dataUsageCache contains a cache of data usage entries version 4.
// dataUsageCacheV4 contains a cache of data usage entries version 4.
type dataUsageCacheV4 struct {
Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntryV4
}
// dataUsageCache contains a cache of data usage entries version 5.
// dataUsageCacheV5 contains a cache of data usage entries version 5.
type dataUsageCacheV5 struct {
Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntryV5
}
// dataUsageCacheV6 contains a cache of data usage entries version 6.
type dataUsageCacheV6 struct {
Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntryV6
}
//msgp:ignore dataUsageEntryInfo
type dataUsageEntryInfo struct {
Name string
@@ -242,6 +321,12 @@ func (e *dataUsageEntry) addSizes(summary sizeSummary) {
e.ReplicationStats.Targets[arn] = tgtStat
}
}
if summary.tiers != nil {
if e.AllTierStats == nil {
e.AllTierStats = newAllTierStats()
}
e.AllTierStats.addSizes(summary)
}
}
// merge other data usage entry into this, excluding children.
@@ -271,6 +356,13 @@ func (e *dataUsageEntry) merge(other dataUsageEntry) {
for i, v := range other.ObjSizes[:] {
e.ObjSizes[i] += v
}
if other.AllTierStats != nil {
if e.AllTierStats == nil {
e.AllTierStats = newAllTierStats()
}
e.AllTierStats.merge(other.AllTierStats)
}
}
// mod returns true if the hash mod cycles == cycle.
@@ -317,6 +409,11 @@ func (e dataUsageEntry) clone() dataUsageEntry {
r := *e.ReplicationStats
e.ReplicationStats = &r
}
if e.AllTierStats != nil {
ats := newAllTierStats()
ats.merge(e.AllTierStats)
e.AllTierStats = ats
}
return e
}
@@ -438,6 +535,7 @@ func (d *dataUsageCache) dui(path string, buckets []BucketInfo) DataUsageInfo {
ObjectsTotalSize: uint64(flat.Size),
BucketsCount: uint64(len(e.Children)),
BucketsUsage: d.bucketsUsageInfo(buckets),
TierStats: d.tiersUsageInfo(buckets),
}
return dui
}
@@ -654,6 +752,25 @@ func (h *sizeHistogram) toMap() map[string]uint64 {
return res
}
func (d *dataUsageCache) tiersUsageInfo(buckets []BucketInfo) *allTierStats {
dst := newAllTierStats()
for _, bucket := range buckets {
e := d.find(bucket.Name)
if e == nil {
continue
}
flat := d.flatten(*e)
if flat.AllTierStats == nil {
continue
}
dst.merge(flat.AllTierStats)
}
if len(dst.Tiers) == 0 {
return nil
}
return dst
}
// bucketsUsageInfo returns the buckets usage info as a map, with
// key as bucket name
func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]BucketUsageInfo {
@@ -857,7 +974,8 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
// Bumping the cache version will drop data from previous versions
// and write new data with the new version.
const (
dataUsageCacheVerCurrent = 6
dataUsageCacheVerCurrent = 7
dataUsageCacheVerV6 = 6
dataUsageCacheVerV5 = 5
dataUsageCacheVerV4 = 4
dataUsageCacheVerV3 = 3
@@ -1086,6 +1204,40 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
d.Cache[k] = e
}
return nil
case dataUsageCacheVerV6:
// Zstd compressed.
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))
if err != nil {
return err
}
defer dec.Close()
dold := &dataUsageCacheV6{}
if err = dold.DecodeMsg(msgp.NewReader(dec)); err != nil {
return err
}
d.Info = dold.Info
d.Disks = dold.Disks
d.Cache = make(map[string]dataUsageEntry, len(dold.Cache))
for k, v := range dold.Cache {
var replicationStats *replicationAllStats
if v.ReplicationStats != nil {
replicationStats = &replicationAllStats{
Targets: v.ReplicationStats.Targets,
ReplicaSize: v.ReplicationStats.ReplicaSize,
}
}
due := dataUsageEntry{
Children: v.Children,
Size: v.Size,
Objects: v.Objects,
Versions: v.Versions,
ObjSizes: v.ObjSizes,
ReplicationStats: replicationStats,
Compacted: v.Compacted,
}
d.Cache[k] = due
}
return nil
case dataUsageCacheVerCurrent:
// Zstd compressed.
dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(2))