* fix(s3/shell): include EC volumes in bucket size metrics and collection.list

S3 bucket size metrics exported to Prometheus (and fed through stats.UpdateBucketSizeMetrics) are computed by collectCollectionInfoFromTopology, which only walked diskInfo.VolumeInfos. As soon as a volume was encoded to EC, it dropped out of every aggregate, so Grafana showed bucket sizes shrinking while physical disk usage kept climbing. The shell helper collectCollectionInfo (used by collection.list and s3.bucket.quota.enforce) had the same gap, with the EC branch left as a commented-out TODO.

Fold EC shards into both paths using the same approach the admin dashboard already uses (PR #9093):

- PhysicalSize / Size sum across shard holders: EC shards are node-local (not replicas), so per-node TotalSize() and MinusParityShards().TotalSize() sum to the whole-volume physical and logical sizes respectively.
- FileCount is deduped via max across reporters (every shard holder reports the same .ecx count; a slow node with a not-yet-loaded .ecx reports 0 and must not pin the aggregate).
- DeleteCount is summed (each delete tombstones exactly one node's .ecj).
- VolumeCount increments once per unique EC volume id.

Adds regression tests covering pure-EC, mixed regular+EC, and the slow-reporter FileCount dedupe case.

Refs #9086

* Address PR review feedback: EC size helpers, composite key, VolumeCount dedupe

- Add EcShardsTotalSize / EcShardsDataSize helpers in the erasure_coding package that walk the shard bitmap directly instead of materializing a ShardsInfo and copying it via MinusParityShards(). This keeps the DataShardsCount dependency encapsulated in one place and avoids the per-shard allocation/copy overhead in the metrics hot path.
- Switch the shell collectCollectionInfo ecVolumes map to a composite {collection, volumeId} key, matching the bucket_size_metrics collector and defending against any cross-collection volume id aliasing.
- Dedupe VolumeCount in shell addToCollection by volume id so regular volumes aren't counted once per replica presence. This aligns the shell's collection.list output with the S3 metrics collector and the EC branch, all of which now report logical volume counts.
- Add unit tests for the new helpers and for the regular-volume VolumeCount dedupe.

* Parameterize EcShardsDataSize with dataShards for custom EC ratios

Add a dataShards parameter to EcShardsDataSize so forks with per-volume ratio metadata (e.g. the enterprise data_shards field carried on an extended VolumeEcShardInformationMessage) can pass the configured value and get accurate logical sizes under custom EC policies like 6+3 or 16+6. Passing 0 or a negative value falls back to the upstream DataShardsCount default, which is correct for the fixed 10+4 layout, so OSS callers in s3api and shell pass 0 and keep their current behavior. Added table cases covering the custom 6+3 and 16+6 paths so the parameterization is pinned by tests.
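
To make the size math concrete, here is a minimal, self-contained sketch of the bitmap walk an EcShardsDataSize-style helper performs. The ecShardInfo type and its shardSizes field are invented for this illustration (the real input is a master_pb.VolumeEcShardInformationMessage); only the 10+4 constants and the dataShards <= 0 fallback come from the commit description above.

```go
package main

import "fmt"

const (
	// Upstream SeaweedFS uses a fixed erasure coding layout of 10 data
	// shards plus 4 parity shards.
	DataShardsCount   = 10
	ParityShardsCount = 4
)

// ecShardInfo is an invented stand-in for one node's EC shard report; the
// real type is master_pb.VolumeEcShardInformationMessage.
type ecShardInfo struct {
	ecIndexBits uint32         // bit i set => this node holds shard i
	shardSizes  map[int]uint64 // per-shard on-disk sizes (invented for the demo)
}

// ecShardsDataSize walks the shard bitmap and sums only the data shards this
// node holds, skipping parity shards. Passing dataShards <= 0 falls back to
// the fixed 10+4 default, mirroring the fallback described above.
func ecShardsDataSize(esi ecShardInfo, dataShards int) (size uint64) {
	if dataShards <= 0 {
		dataShards = DataShardsCount
	}
	for shardId := 0; shardId < dataShards; shardId++ {
		if esi.ecIndexBits&(1<<uint(shardId)) != 0 {
			size += esi.shardSizes[shardId]
		}
	}
	return size
}

func main() {
	// This node holds data shards 0, 1, 2 plus parity shard 10; the parity
	// shard must not count toward the logical (data) size.
	esi := ecShardInfo{
		ecIndexBits: 1<<0 | 1<<1 | 1<<2 | 1<<10,
		shardSizes:  map[int]uint64{0: 100, 1: 100, 2: 100, 10: 100},
	}
	fmt.Println(ecShardsDataSize(esi, 0)) // 300: parity shard 10 is excluded
	// A fork with a custom 6+3 policy would call ecShardsDataSize(esi, 6).
}
```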
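
The FileCount/DeleteCount semantics can be illustrated the same way. This sketch isolates the max-vs-sum aggregation described above, including the slow-reporter case the regression tests cover; holderReport and aggregate are invented names for this demo, not SeaweedFS APIs.

```go
package main

import "fmt"

// holderReport is an invented stand-in for one shard holder's report about an
// EC volume: the .ecx file count is volume-wide, the .ecj delete count is
// local to that node.
type holderReport struct {
	fileCount   uint64
	deleteCount uint64
}

// aggregate applies the semantics from the commit message: FileCount is
// deduped via max across reporters, DeleteCount is summed.
func aggregate(reports []holderReport) (fileCount, deleteCount uint64) {
	for _, r := range reports {
		if r.fileCount > fileCount {
			fileCount = r.fileCount
		}
		deleteCount += r.deleteCount
	}
	return
}

func main() {
	// Three shard holders; the second is a slow node whose .ecx is not yet
	// loaded, so it reports fileCount 0 and must not pin the aggregate.
	reports := []holderReport{
		{fileCount: 42, deleteCount: 1},
		{fileCount: 0, deleteCount: 0},
		{fileCount: 42, deleteCount: 2},
	}
	fc, dc := aggregate(reports)
	fmt.Println(fc, dc) // 42 3
}
```
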
package shell

import (
	"context"
	"fmt"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)

func init() {
	Commands = append(Commands, &commandCollectionList{})
}

type commandCollectionList struct {
}

func (c *commandCollectionList) Name() string {
	return "collection.list"
}

func (c *commandCollectionList) Help() string {
	return `list all collections`
}

func (c *commandCollectionList) HasTag(CommandTag) bool {
	return false
}

// CollectionInfo aggregates per-collection volume statistics. The counters
// are float64 because addToCollection divides each regular-volume
// contribution by its replication factor, so per-replica shares are
// fractional.
type CollectionInfo struct {
	FileCount        float64
	DeleteCount      float64
	DeletedByteCount float64
	Size             float64
	VolumeCount      int
}

func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	collections, err := ListCollectionNames(commandEnv, true, true)

	if err != nil {
		return err
	}

	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}

	collectionInfos := make(map[string]*CollectionInfo)

	collectCollectionInfo(topologyInfo, collectionInfos)

	for _, c := range collections {
		cif, found := collectionInfos[c]
		if !found {
			continue
		}
		fmt.Fprintf(writer, "collection:\"%s\"\tvolumeCount:%d\tsize:%.0f\tfileCount:%.0f\tdeletedBytes:%.0f\tdeletion:%.0f\n", c, cif.VolumeCount, cif.Size, cif.FileCount, cif.DeletedByteCount, cif.DeleteCount)
	}

	fmt.Fprintf(writer, "Total %d collections.\n", len(collections))

	return nil
}

// ListCollectionNames fetches collection names from the master, optionally
// including collections that contain normal volumes and/or EC volumes.
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
	var resp *master_pb.CollectionListResponse
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
			IncludeNormalVolumes: includeNormalVolumes,
			IncludeEcVolumes:     includeEcVolumes,
		})
		return err
	})
	if err != nil {
		return
	}
	for _, c := range resp.Collections {
		collections = append(collections, c.Name)
	}
	return
}

// volumeKey uniquely identifies a volume for per-collection dedupe. Volume
// IDs are scoped to a collection, so we key by (collection, volumeId) to
// avoid cross-collection aliasing if the same numeric ID is ever reused.
type volumeKey struct {
	collection string
	volumeId   uint32
}

// addToCollection folds one replica of a regular volume into the collection
// totals. Size/FileCount/DeleteCount/DeletedByteCount are divided by the
// replication factor so that summing over all replicas yields the whole-
// volume value. VolumeCount is deduped across replicas via seenVolumes so
// it reports logical volumes (same semantics as the S3 bucket metrics
// collector and the EC branch below), not shard/replica presences.
func addToCollection(collectionInfos map[string]*CollectionInfo, seenVolumes map[volumeKey]bool, vif *master_pb.VolumeInformationMessage) {
	c := vif.Collection
	cif, found := collectionInfos[c]
	if !found {
		cif = &CollectionInfo{}
		collectionInfos[c] = cif
	}
	replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement))
	copyCount := float64(replicaPlacement.GetCopyCount())
	cif.Size += float64(vif.Size) / copyCount
	cif.DeleteCount += float64(vif.DeleteCount) / copyCount
	cif.FileCount += float64(vif.FileCount) / copyCount
	cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount

	key := volumeKey{collection: c, volumeId: vif.Id}
	if !seenVolumes[key] {
		seenVolumes[key] = true
		cif.VolumeCount++
	}
}

// ecCollectionAgg accumulates per-EC-volume counts across the shard holders.
// fileCount is volume-wide (every holder reports the same .ecx count) so it
// is deduped via max; deleteCount is node-local to each .ecj and summed.
type ecCollectionAgg struct {
	collection  string
	fileCount   uint64
	deleteCount uint64
}

func collectCollectionInfo(t *master_pb.TopologyInfo, collectionInfos map[string]*CollectionInfo) {
	seenVolumes := make(map[volumeKey]bool)
	ecVolumes := make(map[volumeKey]*ecCollectionAgg)
	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				for _, diskInfo := range dn.DiskInfos {
					for _, vi := range diskInfo.VolumeInfos {
						addToCollection(collectionInfos, seenVolumes, vi)
					}
					for _, esi := range diskInfo.EcShardInfos {
						c := esi.Collection
						cif, found := collectionInfos[c]
						if !found {
							cif = &CollectionInfo{}
							collectionInfos[c] = cif
						}

						// EC shards are node-local, so data-shard sizes sum
						// across nodes to give the logical volume size.
						// Upstream OSS uses the fixed 10+4 ratio; forks with
						// per-volume ratio metadata should pass the
						// configured dataShards value here.
						cif.Size += float64(erasure_coding.EcShardsDataSize(esi, 0))

						key := volumeKey{collection: c, volumeId: esi.Id}
						agg, ok := ecVolumes[key]
						if !ok {
							agg = &ecCollectionAgg{collection: c}
							ecVolumes[key] = agg
							cif.VolumeCount++
						}
						if esi.FileCount > agg.fileCount {
							agg.fileCount = esi.FileCount
						}
						agg.deleteCount += esi.DeleteCount
					}
				}
			}
		}
	}

	for _, agg := range ecVolumes {
		cif := collectionInfos[agg.collection]
		if cif == nil {
			continue
		}
		cif.FileCount += float64(agg.fileCount)
		cif.DeleteCount += float64(agg.deleteCount)
	}
}