Files
seaweedfs/weed/shell/command_collection_list.go
Chris Lu 7364f148bd fix(s3/shell): factor EC volumes into bucket size metrics and collection.list (#9182)
* fix(s3/shell): include EC volumes in bucket size metrics and collection.list

S3 bucket size metrics exported to Prometheus (and fed through
stats.UpdateBucketSizeMetrics) are computed by
collectCollectionInfoFromTopology, which only walked diskInfo.VolumeInfos.
As soon as a volume was encoded to EC it dropped out of every aggregate,
so Grafana showed bucket sizes shrinking while physical disk usage kept
climbing. The shell helper collectCollectionInfo — used by collection.list
and s3.bucket.quota.enforce — had the same gap, with the EC branch left as
a commented-out TODO.

Fold EC shards into both paths using the same approach the admin dashboard
already uses (PR #9093):

- PhysicalSize / Size sum across shard holders: EC shards are node-local
  (not replicas), so per-node TotalSize() and MinusParityShards().TotalSize()
  sum to the whole-volume physical and logical sizes respectively.
- FileCount is deduped via max across reporters (every shard holder reports
  the same .ecx count; a slow node with a not-yet-loaded .ecx reports 0 and
  must not pin the aggregate).
- DeleteCount is summed (each delete tombstones exactly one node's .ecj).
- VolumeCount increments once per unique EC volume id.

Adds regression tests covering pure-EC, mixed regular+EC, and the
slow-reporter FileCount dedupe case.

Refs #9086

* Address PR review feedback: EC size helpers, composite key, VolumeCount dedupe

- Add EcShardsTotalSize / EcShardsDataSize helpers in the erasure_coding
  package that walk the shard bitmap directly instead of materializing a
  ShardsInfo and copying it via MinusParityShards(). Keeps the
  DataShardsCount dependency encapsulated in one place and avoids the
  per-shard allocation/copy overhead in the metrics hot path.
- Switch shell collectCollectionInfo ecVolumes map to a composite
  {collection, volumeId} key, matching the bucket_size_metrics collector
  and defending against any cross-collection volume id aliasing.
- Dedupe VolumeCount in shell addToCollection by volume id so regular
  volumes aren't counted once per replica presence. Aligns the shell's
  collection.list output with the S3 metrics collector and the EC branch,
  all of which now report logical volume counts.
- Add unit tests for the new helpers and for the regular-volume
  VolumeCount dedupe.

* Parameterize EcShardsDataSize with dataShards for custom EC ratios

Add a dataShards parameter to EcShardsDataSize so forks with per-volume
ratio metadata (e.g. the enterprise data_shards field carried on an
extended VolumeEcShardInformationMessage) can pass the configured value
and get accurate logical sizes under custom EC policies like 6+3 or 16+6.
Passing 0 or a negative value falls back to the upstream DataShardsCount
default, which is correct for the fixed 10+4 layout — so OSS callers in
s3api and shell pass 0 and keep their current behavior.

Added table cases covering the custom 6+3 and 16+6 paths so the
parameterization is pinned by tests.
2026-04-21 20:17:42 -07:00

183 lines
5.4 KiB
Go

package shell
import (
"context"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)
// init registers the collection.list command with the shell's global
// command table so it is discoverable at runtime.
func init() {
	Commands = append(Commands, &commandCollectionList{})
}
// commandCollectionList implements the `collection.list` shell command,
// which prints per-collection volume counts and size/file/deletion totals.
type commandCollectionList struct {
}
// Name returns the shell command name used to invoke this command.
func (c *commandCollectionList) Name() string {
	const commandName = "collection.list"
	return commandName
}
// Help returns the one-line usage description shown by the shell.
func (c *commandCollectionList) Help() string {
	helpText := `list all collections`
	return helpText
}
// HasTag reports whether this command carries the given tag; collection.list
// has no tags, so it always answers false regardless of the tag asked about.
func (c *commandCollectionList) HasTag(_ CommandTag) bool {
	return false
}
// CollectionInfo aggregates per-collection totals across the whole topology.
// The float64 counters are accumulated as per-replica fractions (see
// addToCollection) plus whole-volume EC contributions, so they represent
// logical (deduplicated) values rather than per-replica sums.
type CollectionInfo struct {
	FileCount        float64 // logical file count (replica-averaged + EC max-per-volume)
	DeleteCount      float64 // logical delete count (replica-averaged + EC per-node sum)
	DeletedByteCount float64 // logical deleted bytes (replica-averaged; regular volumes only)
	Size             float64 // logical data size in bytes (replica-averaged + EC data-shard sizes)
	VolumeCount      int     // unique volume ids, deduped across replicas and EC shard holders
}
// Do lists every collection known to the master, printing each collection's
// logical volume count, size, file count, deleted bytes, and deletion count
// as derived from the current topology, followed by a total line.
func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	// Ask the master for all collection names, covering both normal and EC volumes.
	collectionNames, listErr := ListCollectionNames(commandEnv, true, true)
	if listErr != nil {
		return listErr
	}

	// Snapshot the topology once; all per-collection aggregates derive from it.
	topology, _, topoErr := collectTopologyInfo(commandEnv, 0)
	if topoErr != nil {
		return topoErr
	}

	infoByCollection := make(map[string]*CollectionInfo)
	collectCollectionInfo(topology, infoByCollection)

	for _, name := range collectionNames {
		info, ok := infoByCollection[name]
		if !ok {
			// A collection may exist by name with no volumes in the topology yet.
			continue
		}
		fmt.Fprintf(writer, "collection:\"%s\"\tvolumeCount:%d\tsize:%.0f\tfileCount:%.0f\tdeletedBytes:%.0f\tdeletion:%.0f\n", name, info.VolumeCount, info.Size, info.FileCount, info.DeletedByteCount, info.DeleteCount)
	}

	fmt.Fprintf(writer, "Total %d collections.\n", len(collectionNames))

	return nil
}
// ListCollectionNames queries the master for collection names, optionally
// including collections that only have normal volumes and/or EC volumes.
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
	var listResponse *master_pb.CollectionListResponse
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		var rpcErr error
		listResponse, rpcErr = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
			IncludeNormalVolumes: includeNormalVolumes,
			IncludeEcVolumes:     includeEcVolumes,
		})
		return rpcErr
	})
	if err != nil {
		return nil, err
	}
	for _, collection := range listResponse.Collections {
		collections = append(collections, collection.Name)
	}
	return collections, nil
}
// volumeKey uniquely identifies a volume for per-collection dedupe. Volume
// IDs are scoped to a collection, so we key by (collection, volumeId) to
// avoid cross-collection aliasing if the same numeric ID is ever reused.
type volumeKey struct {
	collection string // owning collection name
	volumeId   uint32 // numeric volume id within that collection
}
// addToCollection folds one replica of a regular volume into its collection's
// totals. Each byte/count metric is divided by the replication factor so that
// summing the contributions of all replicas reconstructs the whole-volume
// value. VolumeCount is incremented only the first time a given
// (collection, volumeId) pair is seen, so it counts logical volumes — the
// same semantics as the S3 bucket metrics collector and the EC path in
// collectCollectionInfo — rather than replica presences.
func addToCollection(collectionInfos map[string]*CollectionInfo, seenVolumes map[volumeKey]bool, vif *master_pb.VolumeInformationMessage) {
	collection := vif.Collection
	info, ok := collectionInfos[collection]
	if !ok {
		info = &CollectionInfo{}
		collectionInfos[collection] = info
	}

	// GetCopyCount yields the replication factor; dividing here lets the
	// per-replica contributions sum back to whole-volume values.
	rp, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
	replicas := float64(rp.GetCopyCount())

	info.Size += float64(vif.Size) / replicas
	info.FileCount += float64(vif.FileCount) / replicas
	info.DeleteCount += float64(vif.DeleteCount) / replicas
	info.DeletedByteCount += float64(vif.DeletedByteCount) / replicas

	id := volumeKey{collection: collection, volumeId: vif.Id}
	if seenVolumes[id] {
		return
	}
	seenVolumes[id] = true
	info.VolumeCount++
}
// ecCollectionAgg accumulates per-EC-volume counts across the shard holders.
// fileCount is volume-wide (every holder reports the same .ecx count) so it
// is deduped via max; deleteCount is node-local to each .ecj and summed.
type ecCollectionAgg struct {
	collection  string // owning collection, used to locate the CollectionInfo at flush time
	fileCount   uint64 // max across reporters (a lagging node reporting 0 must not pin the aggregate)
	deleteCount uint64 // sum across reporters (each delete tombstones exactly one node's .ecj)
}
// collectCollectionInfo walks the topology and folds both regular volumes and
// EC shards into per-collection totals. Regular volumes are attributed via
// addToCollection (replica-averaged, volume-count deduped). EC shards
// contribute their data-shard sizes directly, while their file/delete counts
// are first gathered per EC volume (fileCount deduped via max across shard
// holders, deleteCount summed) and flushed into the totals at the end.
func collectCollectionInfo(t *master_pb.TopologyInfo, collectionInfos map[string]*CollectionInfo) {
	seenVolumes := make(map[volumeKey]bool)
	ecVolumes := make(map[volumeKey]*ecCollectionAgg)

	for _, dataCenter := range t.DataCenterInfos {
		for _, rack := range dataCenter.RackInfos {
			for _, node := range rack.DataNodeInfos {
				for _, disk := range node.DiskInfos {
					for _, volume := range disk.VolumeInfos {
						addToCollection(collectionInfos, seenVolumes, volume)
					}
					for _, shard := range disk.EcShardInfos {
						collection := shard.Collection
						info, ok := collectionInfos[collection]
						if !ok {
							info = &CollectionInfo{}
							collectionInfos[collection] = info
						}

						// EC shards are node-local, so data-shard sizes sum
						// across nodes to give the logical volume size.
						// Upstream OSS uses the fixed 10+4 ratio; forks with
						// per-volume ratio metadata should pass the
						// configured dataShards value here.
						info.Size += float64(erasure_coding.EcShardsDataSize(shard, 0))

						id := volumeKey{collection: collection, volumeId: shard.Id}
						agg := ecVolumes[id]
						if agg == nil {
							// First shard seen for this EC volume: count it once.
							agg = &ecCollectionAgg{collection: collection}
							ecVolumes[id] = agg
							info.VolumeCount++
						}
						if shard.FileCount > agg.fileCount {
							agg.fileCount = shard.FileCount
						}
						agg.deleteCount += shard.DeleteCount
					}
				}
			}
		}
	}

	// Flush the per-EC-volume aggregates into the collection totals.
	for _, agg := range ecVolumes {
		if info, ok := collectionInfos[agg.collection]; ok && info != nil {
			info.FileCount += float64(agg.fileCount)
			info.DeleteCount += float64(agg.deleteCount)
		}
	}
}