mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-21 01:01:29 +00:00
* admin: report file and delete counts for EC volumes The admin bucket size fix (#9058) left object counts at zero for EC-encoded data because VolumeEcShardInformationMessage carried no file count. Billing/monitoring dashboards therefore still under-report objects once a bucket is EC-encoded. Thread file_count and delete_count end-to-end: - Add file_count/delete_count to VolumeEcShardInformationMessage (proto fields 8 and 9) and regenerate master_pb. - Compute them lazily on volume servers by walking the .ecx index once per EcVolume, cache on the struct, and keep the cache in sync inside DeleteNeedleFromEcx (distinguishing live vs already-tombstoned entries so idempotent deletes do not drift the counts). - Populate the new proto fields from EcVolume.ToVolumeEcShardInformationMessage and carry them through the master-side EcVolumeInfo / topology sync. - Aggregate in admin collectCollectionStats, deduping per volume id: every node holding shards of an EC volume reports the same counts, so summing across nodes would otherwise multiply the object count by the number of shard holders. Regression tests cover the initial .ecx walk, live/tombstoned delete bookkeeping (including idempotent and missing-key cases), and the admin dedup path for an EC volume reported by multiple nodes. * ec: include .ecj journal in EcVolume delete count The initial delete count only reflected .ecx tombstones, missing any needle that was journaled in .ecj but not yet folded into .ecx — e.g. on partial recovery. Expand initCountsLocked to take the union of .ecx tombstones and .ecj journal entries, deduped by needle id, so: - an id that is both tombstoned in .ecx and listed in .ecj counts once - a duplicate .ecj entry counts once - an .ecj id with a live .ecx entry is counted as deleted (not live) - an .ecj id with no matching .ecx entry is still counted Covered by TestEcVolumeFileAndDeleteCountEcjUnion. * ec: report delete count authoritatively and tombstone once per delete Address two issues with the previous EcVolume file/delete count work: 1. The delete count was computed lazily on first heartbeat and mixed in a .ecj-union fallback to "recover" partial state. That diverged from how regular volumes report counts (always live from the needle map) and had drift cases when .ecj got reconciled. Replace with an eager walk of .ecx at NewEcVolume time, maintained incrementally on every DeleteNeedleFromEcx call. Semantics now match needle_map_metric: FileCount is the total number of needles ever recorded in .ecx (live + tombstoned), DeleteCount is the tombstones — so live = FileCount - DeleteCount. Drop the .ecj-union logic entirely. 2. A single EC needle delete fanned out to every node holding a replica of the primary data shard and called DeleteNeedleFromEcx on each, which inflated the per-volume delete total by the replica factor. Rewrite doDeleteNeedleFromRemoteEcShardServers to try replicas in order and stop at the first success (one tombstone per delete), and only fall back to other shards when the primary shard has no home (ErrEcShardMissing sentinel), not on transient RPC errors. Admin aggregation now folds EC counts correctly: FileCount is deduped per volume id (every shard holder has an identical .ecx) and DeleteCount is summed across nodes (each delete tombstones exactly one node). Live object count = deduped FileCount - summed DeleteCount. Tests updated to match the new semantics: - EC volume counts seed FileCount as total .ecx entries (live + tombstoned), DeleteCount as tombstones. - DeleteNeedleFromEcx keeps FileCount constant and increments DeleteCount only on live->tombstone transitions. - Admin dedup test uses distinct per-node delete counts (5 + 3 + 2) to prove they're summed, while FileCount=100 is applied once. * ec: test fixture uses real vid; admin warns on skewed ec counts - writeFixture now builds the .ecx/.ecj/.ec00/.vif filenames from the actual vid passed in, instead of hardcoding "_1". The existing tests all use vid=1 so behaviour is unchanged, but the helper no longer silently diverges from its documented parameter. - collectCollectionStats logs a glog warning when an EC volume's summed delete count exceeds its deduped file count, surfacing the anomaly (stale heartbeat, counter drift, etc.) instead of silently dropping the volume from the object count. * ec: derive file/delete counts from .ecx/.ecj file sizes seedCountsFromEcx walked the full .ecx index at volume load, which is wasted work: .ecx has fixed-size entries (NeedleMapEntrySize) and .ecj has fixed-size deletion records (NeedleIdSize), so both counts are pure file-size arithmetic. fileCount = ecxFileSize / NeedleMapEntrySize deleteCount = ecjFileSize / NeedleIdSize Rip out the cached counters, countsLock, seedCountsFromEcx, and the recordDelete helper. Track ecjFileSize directly on the EcVolume struct, seed it from Stat() at load, and bump it on every successful .ecj append inside DeleteNeedleFromEcx under ecjFileAccessLock. Skip the .ecj write entirely when the needle is already tombstoned so the derived delete count stays idempotent on repeat deletes. Heartbeats now compute counts in O(1). Tests updated: the initial fixture pre-populates .ecj with two ids to verify the file-size derivation end-to-end, and the delete test keeps its idempotent-re-delete / missing-needle invariants (unchanged externally, now enforced by the early return rather than a cache guard). * ec: sync Rust volume server with Go file/delete count semantics Mirror the Go-side EC file/delete count work in the Rust volume server so mixed Go/Rust clusters report consistent bucket object counts in the admin dashboard. - Add file_count (8) and delete_count (9) to the Rust copy of VolumeEcShardInformationMessage (seaweed-volume/proto/master.proto). - EcVolume gains ecj_file_size, seeded from the journal's metadata on open and bumped inside journal_delete on every successful append. - file_and_delete_count() returns counts derived in O(1) from ecx_file_size / NEEDLE_MAP_ENTRY_SIZE and ecj_file_size / NEEDLE_ID_SIZE, matching Go's FileAndDeleteCount. - to_volume_ec_shard_information_messages populates the new proto fields instead of defaulting them to zero. - mark_needle_deleted_in_ecx now returns a DeleteOutcome enum (NotFound / AlreadyDeleted / Tombstoned) so journal_delete can skip both the .ecj append and the size bump when the needle is missing or already tombstoned, keeping the derived delete_count idempotent on repeat or no-op deletes. - Rust's EcVolume::new no longer replays .ecj into .ecx on load. Go's RebuildEcxFile is only called from specific decode/rebuild gRPC handlers, not on volume open, and replaying on load was hiding the deletion journal from the new file-size-derived delete counter. rebuild_ecx_from_journal is kept as dead_code for future decode paths that may want the same replay semantics. Also clean up the Go FileAndDeleteCount to drop unnecessary runtime guards against zero constants — NeedleMapEntrySize and NeedleIdSize are compile-time non-zero. test_ec_volume_journal updated to pre-populate the .ecx with the needles it deletes, and extended to verify that repeat and missing-id deletes do not drift the derived counts. * ec: document enterprise-reserved proto field range on ec shard info Both OSS master.proto copies now note that fields 10-19 are reserved for future upstream additions while 20+ are owned by the enterprise fork. Enterprise already pins data_shards/parity_shards at 20/21, so keeping OSS additions inside 8-19 avoids wire-level collisions for mixed deployments. * ec(rust): resolve .ecx/.ecj helpers from ecx_actual_dir ecx_file_name() and ecj_file_name() resolved from self.dir_idx, but new() opens the actual files from ecx_actual_dir (which may fall back to the data dir when the idx dir does not contain the index). After a fallback, read_deleted_needles() and rebuild_ecx_from_journal() would read/rebuild the wrong (nonexistent) path while heartbeats reported counts from the file actually in use — silently dropping deletes. Point idx_base_name() at ecx_actual_dir, which is initialized to dir_idx and only diverges after a successful fallback, so every call site agrees with the file new() has open. The pre-fallback call in new() (line 142) still returns the dir_idx path because ecx_actual_dir == dir_idx at that point. Update the destroy() sweep to build the dir_idx cleanup paths explicitly instead of leaning on the helpers, so post-fallback stale files in the idx dir are still removed. * ec: reset ecj size after rebuild; rollback ecx tombstone on ecj failure Two EC delete-count correctness fixes applied symmetrically to Go and Rust volume servers. 1. rebuild_ecx_from_journal (Rust) now sets ecj_file_size = 0 after recreating the empty journal, matching the on-disk truth. Previously the cached size still reflected the pre-rebuild journal and file_and_delete_count() would keep reporting stale delete counts. The Go side has no equivalent bug because RebuildEcxFile runs in an offline helper that does not touch an EcVolume struct. 2. DeleteNeedleFromEcx / journal_delete used to tombstone the .ecx entry before writing the .ecj record. If the .ecj append then failed, the needle was permanently marked deleted but the heartbeat-reported delete_count never advanced (it is derived from .ecj file size), and a retry would see AlreadyDeleted and early- return, leaving the drift permanent. Both languages now capture the entry's file offset and original size bytes during the mark step, attempt the .ecj append, and on failure roll the .ecx tombstone back by writing the original size bytes at the known offset. A rollback that itself errors is logged (glog / tracing) but cannot re-sync the files — this is the same failure mode a double disk error would produce, and is unavoidable without a full on-disk transaction log. Go: wrap MarkNeedleDeleted in a closure that captures the file offset into an outer variable, then pass the offset + oldSize to the new rollbackEcxTombstone helper on .ecj seek/write errors. Rust: DeleteOutcome::Tombstoned now carries the size_offset and a [u8; SIZE_SIZE] copy of the pre-tombstone size field. journal_delete destructures on Tombstoned and calls restore_ecx_size on .ecj append failure. * test(ec): widen admin /health wait to 180s for cold CI TestEcEndToEnd starts master, 14 volume servers, filer, 2 workers and admin in sequence, then waited only 60s for admin's HTTP server to come up. On cold GitHub runners the tail of the earlier subprocess startups eats most of that budget and the wait occasionally times out (last hit on run 24374773031). The local fast path is still ~20s total, so the bump only extends the timeout ceiling, not the happy path. * test(ec): fork volume servers in parallel in TestEcEndToEnd startWeed is non-blocking (just cmd.Start()), so the per-process fork + mkdir + log-file-open overhead for 14 volume servers was serialized for no reason. On cold CI disks that overhead stacks up and eats into the subsequent admin /health wait, which is how run 24374773031 flaked. Wrap the volume-server loop in a sync.WaitGroup and guard runningCmds with a mutex so concurrent appends are safe. startWeed still calls t.Fatalf on failure, which is fine from a goroutine for a fatal test abort; the fail-fast isn't something we rely on for precise ordering. * ec: fsync ecx before ecj, truncate on failure, harden rebuild Four correctness fixes covering both volume servers. 1. Durability ordering (Go + Rust). After marking the .ecx tombstone we now fsync .ecx before touching .ecj, so a crash between the two files cannot leave the journal with an entry for a needle whose tombstone is still sitting in page cache. Once the fsync returns, the tombstone is the source of truth: reads see "deleted", delete_count may under-count by one (benign, idempotent retries) but never over-reports. If the fsync itself fails we restore the original size bytes and surface the error. The .ecj append is then followed by its own Sync so the reported delete_count matches the on-disk journal once the write returns. 2. .ecj truncation on append failure. write_all may have extended the journal on disk before sync_all / Sync errors out, leaving the cached ecj_file_size out of sync with the physical length and drifting delete_count permanently after restart. Both languages now capture the pre-append size, truncate the file back via set_len / Truncate on any write or sync failure, and only then restore the .ecx tombstone. Truncation errors are logged — same-fd length resets cannot realistically fail — but cannot themselves re-sync the files. 3. Atomic rebuild_ecx_from_journal (Rust, dead code today but wired up on any future decode path). Previously a failed mark_needle_deleted_in_ecx call was swallowed with `let _ = ...` and the journal was still removed, silently losing tombstones. We now bubble up any non-NotFound error, fsync .ecx after the whole replay succeeds, and only then drop and recreate .ecj. NotFound is still ignored (expected race between delete and encode). 4. Missing-.ecx hardening (Rust). mark_needle_deleted_in_ecx used to return Ok(NotFound) when self.ecx_file was None, hiding a closed or corrupt volume behind what looks like an idempotent no-op. It now returns an io::Error carrying the volume id so callers (e.g. journal_delete) fail loudly instead. Existing Go and Rust EC test suites stay green. * ec: make .ecx immutable at runtime; track deletes in memory + .ecj Refactors both volume servers so the sealed sorted .ecx index is never mutated during normal operation. Runtime deletes are committed to the .ecj deletion journal and tracked in an in-memory deleted-needle set; read-path lookups consult that set to mask out deleted ids on top of the immutable .ecx record. Mirrors the intended design on both Go and Rust sides. EcVolume gains a `deletedNeedles` / `deleted_needles` set seeded from .ecj in NewEcVolume / EcVolume::new. DeleteNeedleFromEcx / journal_delete: 1. Looks the needle up read-only in .ecx. 2. Missing needle -> no-op. 3. Pre-existing .ecx tombstone (from a prior decode/rebuild) -> mirror into the in-memory set, no .ecj append. 4. Otherwise append the id to .ecj, fsync, and only then publish the id into the set. A partial write is truncated back to the pre-append length so the on-disk journal and the in-memory set cannot drift. FindNeedleFromEcx / find_needle_from_ecx now return TombstoneFileSize when the id is in the in-memory set, even though the bytes on disk still show the original size. FileAndDeleteCount: fileCount = .ecx size / NeedleMapEntrySize (unchanged) deleteCount = len(deletedNeedles) (was: .ecj size / NeedleIdSize) The RebuildEcxFile / rebuild_ecx_from_journal decode-time helpers still fold .ecj into .ecx — that is the one place tombstones land in the physical index, and it runs offline on closed files. Rust's rebuild helper now also clears the in-memory set when it succeeds. Dead code removed on the Rust side: `DeleteOutcome`, `mark_needle_deleted_in_ecx`, `restore_ecx_size`. Go drops the runtime `rollbackEcxTombstone` path. Neither helper was needed once .ecx stopped being a runtime mutation target. TestEcVolumeSyncEnsuresDeletionsVisible (issue #7751) is rewritten as TestEcVolumeDeleteDurableToJournal, which exercises the full durability chain: delete -> .ecj fsync -> FindNeedleFromEcx masks via the in-memory set -> raw .ecx bytes are *unchanged* -> Close + RebuildEcxFile folds the journal into .ecx -> raw bytes now show the tombstone, as CopyFile in the decode path expects.
1780 lines
55 KiB
Go
1780 lines
55 KiB
Go
package dash
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
|
|
adminplugin "github.com/seaweedfs/seaweedfs/weed/admin/plugin"
|
|
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
|
clustermaintenance "github.com/seaweedfs/seaweedfs/weed/cluster/maintenance"
|
|
"github.com/seaweedfs/seaweedfs/weed/credential"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
|
|
|
|
_ "github.com/seaweedfs/seaweedfs/weed/credential/grpc" // Register gRPC credential store
|
|
)
|
|
|
|
const (
|
|
defaultCacheTimeout = 10 * time.Second
|
|
defaultFilerCacheTimeout = 30 * time.Second
|
|
defaultStatsCacheTimeout = 30 * time.Second
|
|
)
|
|
|
|
// FilerConfig holds filer configuration needed for bucket operations
|
|
type FilerConfig struct {
|
|
BucketsPath string
|
|
FilerGroup string
|
|
}
|
|
|
|
// getFilerConfig retrieves the filer configuration (buckets path and filer group)
|
|
func (s *AdminServer) getFilerConfig() (*FilerConfig, error) {
|
|
config := &FilerConfig{
|
|
BucketsPath: s3_constants.DefaultBucketsPath,
|
|
FilerGroup: "",
|
|
}
|
|
|
|
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
|
|
if err != nil {
|
|
return fmt.Errorf("get filer configuration: %w", err)
|
|
}
|
|
if resp.DirBuckets != "" {
|
|
config.BucketsPath = resp.DirBuckets
|
|
}
|
|
config.FilerGroup = resp.FilerGroup
|
|
return nil
|
|
})
|
|
|
|
return config, err
|
|
}
|
|
|
|
// getCollectionName returns the collection name for a bucket, prefixed with filer group if configured
|
|
func getCollectionName(filerGroup, bucketName string) string {
|
|
if filerGroup != "" {
|
|
return fmt.Sprintf("%s_%s", filerGroup, bucketName)
|
|
}
|
|
return bucketName
|
|
}
|
|
|
|
type AdminServer struct {
|
|
masterClient *wdclient.MasterClient
|
|
templateFS http.FileSystem
|
|
dataDir string
|
|
grpcDialOption grpc.DialOption
|
|
cacheExpiration time.Duration
|
|
lastCacheUpdate time.Time
|
|
cachedTopology *ClusterTopology
|
|
|
|
// Filer discovery and caching
|
|
cachedFilers []string
|
|
lastFilerUpdate time.Time
|
|
filerCacheExpiration time.Duration
|
|
|
|
// Credential management
|
|
credentialManager *credential.CredentialManager
|
|
|
|
// Configuration persistence
|
|
configPersistence *ConfigPersistence
|
|
|
|
// Maintenance system
|
|
maintenanceManager *maintenance.MaintenanceManager
|
|
plugin *adminplugin.Plugin
|
|
pluginLock *AdminLockManager
|
|
adminPresenceLock *adminPresenceLock
|
|
expireJobHandler func(jobID string, reason string) (*adminplugin.TrackedJob, bool, error)
|
|
|
|
// Topic retention purger
|
|
topicRetentionPurger *TopicRetentionPurger
|
|
|
|
// Worker gRPC server
|
|
workerGrpcServer *WorkerGrpcServer
|
|
|
|
// Background goroutine lifecycle
|
|
bgCancel context.CancelFunc
|
|
|
|
// Collection statistics caching
|
|
collectionStatsCache map[string]collectionStats
|
|
lastCollectionStatsUpdate time.Time
|
|
collectionStatsCacheThreshold time.Duration
|
|
|
|
s3TablesManager *s3tables.Manager
|
|
icebergPort int
|
|
}
|
|
|
|
// Type definitions moved to types.go
|
|
|
|
func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, icebergPort int) *AdminServer {
|
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.admin")
|
|
|
|
// Create master client with multiple master support
|
|
masterClient := wdclient.NewMasterClient(
|
|
grpcDialOption,
|
|
"", // filerGroup - not needed for admin
|
|
"admin", // clientType
|
|
"", // clientHost - not needed for admin
|
|
"", // dataCenter - not needed for admin
|
|
"", // rack - not needed for admin
|
|
*pb.ServerAddresses(masters).ToServiceDiscovery(),
|
|
)
|
|
|
|
// Start master client connection process (like shell and filer do)
|
|
bgCtx, bgCancel := context.WithCancel(context.Background())
|
|
go masterClient.KeepConnectedToMaster(bgCtx)
|
|
|
|
lockManager := NewAdminLockManager(masterClient, adminLockClientName)
|
|
presenceLock := newAdminPresenceLock(masterClient)
|
|
if presenceLock != nil {
|
|
presenceLock.Start()
|
|
}
|
|
|
|
server := &AdminServer{
|
|
masterClient: masterClient,
|
|
templateFS: templateFS,
|
|
dataDir: dataDir,
|
|
grpcDialOption: grpcDialOption,
|
|
cacheExpiration: defaultCacheTimeout,
|
|
filerCacheExpiration: defaultFilerCacheTimeout,
|
|
configPersistence: NewConfigPersistence(dataDir),
|
|
collectionStatsCacheThreshold: defaultStatsCacheTimeout,
|
|
s3TablesManager: newS3TablesManager(),
|
|
icebergPort: icebergPort,
|
|
pluginLock: lockManager,
|
|
adminPresenceLock: presenceLock,
|
|
bgCancel: bgCancel,
|
|
}
|
|
|
|
// Initialize topic retention purger
|
|
server.topicRetentionPurger = NewTopicRetentionPurger(server)
|
|
|
|
// Initialize credential manager with defaults
|
|
credentialManager, err := credential.NewCredentialManagerWithDefaults(credential.StoreTypeGrpc)
|
|
if err != nil {
|
|
glog.Warningf("Failed to initialize credential manager: %v", err)
|
|
// Continue without credential manager - will fall back to legacy approach
|
|
} else {
|
|
server.credentialManager = credentialManager
|
|
glog.V(0).Infof("Credential manager initialized with store type: %s", credentialManager.GetStore().GetName())
|
|
|
|
// For stores that need filer address function, configure them
|
|
if store := credentialManager.GetStore(); store != nil {
|
|
if filerFuncSetter, ok := store.(interface {
|
|
SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
|
|
}); ok {
|
|
// Configure the filer address function to dynamically return the current active filer
|
|
// This function will be called each time credentials need to be loaded/saved,
|
|
// so it will automatically use whatever filer is currently available (HA-aware)
|
|
filerFuncSetter.SetFilerAddressFunc(func() pb.ServerAddress {
|
|
return pb.ServerAddress(server.GetFilerAddress())
|
|
}, server.grpcDialOption)
|
|
glog.V(0).Infof("Credential store configured with dynamic filer address function")
|
|
} else {
|
|
glog.V(0).Infof("Credential store %s does not support filer address function", store.GetName())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initialize maintenance system - always initialize even without persistent storage
|
|
var maintenanceConfig *maintenance.MaintenanceConfig
|
|
if server.configPersistence.IsConfigured() {
|
|
var err error
|
|
maintenanceConfig, err = server.configPersistence.LoadMaintenanceConfig()
|
|
if err != nil {
|
|
glog.Errorf("Failed to load maintenance configuration: %v", err)
|
|
maintenanceConfig = maintenance.DefaultMaintenanceConfig()
|
|
}
|
|
|
|
// Apply new defaults to handle schema changes (like enabling by default)
|
|
schema := maintenance.GetMaintenanceConfigSchema()
|
|
if err := schema.ApplyDefaultsToProtobuf(maintenanceConfig); err != nil {
|
|
glog.Warningf("Failed to apply schema defaults to loaded config: %v", err)
|
|
}
|
|
|
|
// Force enable maintenance system for new default behavior
|
|
// This handles the case where old configs had Enabled=false as default
|
|
if !maintenanceConfig.Enabled {
|
|
glog.V(1).Infof("Enabling maintenance system (new default behavior)")
|
|
maintenanceConfig.Enabled = true
|
|
}
|
|
|
|
glog.V(1).Infof("Maintenance system initialized with persistent configuration (enabled: %v)", maintenanceConfig.Enabled)
|
|
} else {
|
|
maintenanceConfig = maintenance.DefaultMaintenanceConfig()
|
|
glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode (enabled: %v)", maintenanceConfig.Enabled)
|
|
}
|
|
|
|
// Always initialize maintenance manager
|
|
server.InitMaintenanceManager(maintenanceConfig)
|
|
|
|
// Load saved task configurations from persistence
|
|
server.loadTaskConfigurationsFromPersistence()
|
|
|
|
// Start maintenance manager if enabled
|
|
if maintenanceConfig.Enabled {
|
|
go func() {
|
|
// Give master client a bit of time to connect before starting scans
|
|
time.Sleep(2 * time.Second)
|
|
if err := server.StartMaintenanceManager(); err != nil {
|
|
glog.Errorf("Failed to start maintenance manager: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
pluginOpts := adminplugin.Options{
|
|
DataDir: dataDir,
|
|
ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) {
|
|
return server.buildDefaultPluginClusterContext(), nil
|
|
},
|
|
LockManager: lockManager,
|
|
ConfigDefaultsProvider: server.enrichConfigDefaults,
|
|
}
|
|
plugin, err := adminplugin.New(pluginOpts)
|
|
if err != nil && dataDir != "" {
|
|
glog.Warningf("Failed to initialize plugin with dataDir=%q: %v. Falling back to in-memory plugin state.", dataDir, err)
|
|
pluginOpts.DataDir = ""
|
|
plugin, err = adminplugin.New(pluginOpts)
|
|
}
|
|
if err != nil {
|
|
glog.Errorf("Failed to initialize plugin: %v", err)
|
|
} else {
|
|
server.plugin = plugin
|
|
glog.V(0).Infof("Plugin enabled")
|
|
go server.monitorVacuumWorker(bgCtx)
|
|
}
|
|
|
|
return server
|
|
}
|
|
|
|
// vacuumToggler abstracts the master's vacuum enable/disable for testing.
|
|
type vacuumToggler interface {
|
|
disableVacuum() error
|
|
enableVacuum() error
|
|
}
|
|
|
|
// masterVacuumToggler implements vacuumToggler via gRPC calls to the master.
|
|
type masterVacuumToggler struct {
|
|
server *AdminServer
|
|
}
|
|
|
|
func (m *masterVacuumToggler) disableVacuum() error {
|
|
return m.server.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_, err := client.DisableVacuum(ctx, &master_pb.DisableVacuumRequest{ByPlugin: true})
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (m *masterVacuumToggler) enableVacuum() error {
|
|
return m.server.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_, err := client.EnableVacuum(ctx, &master_pb.EnableVacuumRequest{ByPlugin: true})
|
|
return err
|
|
})
|
|
}
|
|
|
|
// syncVacuumState performs a single sync step: checks if a vacuum-capable worker
|
|
// is present and calls disable/enable accordingly. Returns the updated state
|
|
// and whether the call failed (for log dedup on retries).
|
|
func syncVacuumState(hasWorker bool, previouslyActive bool, toggler vacuumToggler, retrying bool) (active bool, failed bool) {
|
|
if hasWorker == previouslyActive {
|
|
return previouslyActive, false
|
|
}
|
|
if hasWorker {
|
|
if !retrying {
|
|
glog.V(0).Infof("Vacuum plugin worker connected, disabling master automatic vacuum")
|
|
}
|
|
if err := toggler.disableVacuum(); err != nil {
|
|
glog.Warningf("Failed to disable vacuum on master: %v", err)
|
|
return false, true // retry next tick
|
|
}
|
|
return true, false
|
|
}
|
|
if !retrying {
|
|
glog.V(0).Infof("Vacuum plugin worker disconnected, re-enabling master automatic vacuum")
|
|
}
|
|
if err := toggler.enableVacuum(); err != nil {
|
|
glog.Warningf("Failed to enable vacuum on master: %v", err)
|
|
return true, true // retry next tick
|
|
}
|
|
return false, false
|
|
}
|
|
|
|
// monitorVacuumWorker polls the plugin registry for vacuum-capable workers and
|
|
// disables/enables the master's automatic scheduled vacuum accordingly.
|
|
func (s *AdminServer) monitorVacuumWorker(ctx context.Context) {
|
|
const pollInterval = 30 * time.Second
|
|
ticker := time.NewTicker(pollInterval)
|
|
defer ticker.Stop()
|
|
|
|
toggler := &masterVacuumToggler{server: s}
|
|
vacuumWorkerActive := false
|
|
retrying := false
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if s.plugin == nil {
|
|
continue
|
|
}
|
|
hasWorker := s.plugin.HasCapableWorker("vacuum")
|
|
vacuumWorkerActive, retrying = syncVacuumState(hasWorker, vacuumWorkerActive, toggler, retrying)
|
|
}
|
|
}
|
|
}
|
|
|
|
// loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files
|
|
func (s *AdminServer) loadTaskConfigurationsFromPersistence() {
|
|
if s.configPersistence == nil || !s.configPersistence.IsConfigured() {
|
|
glog.V(1).Infof("Config persistence not available, using default task configurations")
|
|
return
|
|
}
|
|
|
|
// Load task configurations dynamically using the config update registry
|
|
configUpdateRegistry := tasks.GetGlobalConfigUpdateRegistry()
|
|
configUpdateRegistry.UpdateAllConfigs(s.configPersistence)
|
|
}
|
|
|
|
// enrichConfigDefaults is called by the plugin when bootstrapping a job type's
|
|
// default config from its descriptor. For admin_script, it fetches maintenance
|
|
// scripts from the master and uses them as the script default.
|
|
//
|
|
// MIGRATION: This exists to help users migrate from master.toml [master.maintenance]
|
|
// to the admin script plugin worker. Remove after March 2027.
|
|
func (s *AdminServer) enrichConfigDefaults(cfg *plugin_pb.PersistedJobTypeConfig) *plugin_pb.PersistedJobTypeConfig {
|
|
if cfg.JobType != "admin_script" {
|
|
return cfg
|
|
}
|
|
|
|
var maintenanceScripts string
|
|
var sleepMinutes uint32
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
maintenanceScripts = resp.MaintenanceScripts
|
|
sleepMinutes = resp.MaintenanceSleepMinutes
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
glog.V(1).Infof("Could not fetch master configuration for admin_script defaults: %v", err)
|
|
return cfg
|
|
}
|
|
|
|
script := cleanMaintenanceScript(maintenanceScripts)
|
|
if script == "" {
|
|
return cfg
|
|
}
|
|
|
|
interval := int64(sleepMinutes)
|
|
if interval <= 0 {
|
|
interval = clustermaintenance.DefaultMaintenanceSleepMinutes
|
|
}
|
|
|
|
glog.V(0).Infof("Enriching admin_script defaults from master maintenance scripts (interval=%dm)", interval)
|
|
|
|
if cfg.AdminConfigValues == nil {
|
|
cfg.AdminConfigValues = make(map[string]*plugin_pb.ConfigValue)
|
|
}
|
|
cfg.AdminConfigValues["script"] = &plugin_pb.ConfigValue{
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: script},
|
|
}
|
|
cfg.AdminConfigValues["run_interval_minutes"] = &plugin_pb.ConfigValue{
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: interval},
|
|
}
|
|
cfg.UpdatedBy = "master_migration"
|
|
|
|
return cfg
|
|
}
|
|
|
|
// cleanMaintenanceScript strips lock/unlock commands and normalizes a
|
|
// maintenance script string for use with the admin script plugin worker.
|
|
//
|
|
// MIGRATION: Used by enrichConfigDefaults. Remove after March 2027.
|
|
func cleanMaintenanceScript(script string) string {
|
|
script = strings.ReplaceAll(script, "\r\n", "\n")
|
|
var lines []string
|
|
for _, line := range strings.Split(script, "\n") {
|
|
trimmed := strings.TrimSpace(line)
|
|
if trimmed == "" || strings.HasPrefix(trimmed, "#") {
|
|
continue
|
|
}
|
|
// Strip inline comments (e.g., "lock # migration note")
|
|
if idx := strings.Index(trimmed, "#"); idx >= 0 {
|
|
trimmed = strings.TrimSpace(trimmed[:idx])
|
|
if trimmed == "" {
|
|
continue
|
|
}
|
|
}
|
|
firstToken := strings.ToLower(strings.Fields(trimmed)[0])
|
|
if firstToken == "lock" || firstToken == "unlock" {
|
|
continue
|
|
}
|
|
lines = append(lines, trimmed)
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
}
|
|
|
|
// GetCredentialManager returns the credential manager
|
|
func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
|
|
return s.credentialManager
|
|
}
|
|
|
|
// Filer discovery methods moved to client_management.go
|
|
|
|
// Client management methods moved to client_management.go
|
|
|
|
// WithFilerClient and WithVolumeServerClient methods moved to client_management.go
|
|
|
|
// Cluster topology methods moved to cluster_topology.go
|
|
|
|
// getTopologyViaGRPC method moved to cluster_topology.go
|
|
|
|
// InvalidateCache method moved to cluster_topology.go
|
|
|
|
// GetS3BucketsData retrieves Object Store buckets with pagination and sorting
|
|
func (s *AdminServer) GetS3BucketsData(page, pageSize int, sortBy, sortOrder string) (S3BucketsData, error) {
|
|
if page < 1 {
|
|
page = 1
|
|
}
|
|
if pageSize < 1 || pageSize > 1000 {
|
|
pageSize = 100
|
|
}
|
|
if sortBy == "" {
|
|
sortBy = "name"
|
|
}
|
|
if sortOrder == "" {
|
|
sortOrder = "asc"
|
|
}
|
|
|
|
buckets, err := s.GetS3Buckets()
|
|
if err != nil {
|
|
return S3BucketsData{}, err
|
|
}
|
|
|
|
var totalSize int64
|
|
for _, bucket := range buckets {
|
|
totalSize += bucket.PhysicalSize
|
|
}
|
|
|
|
totalBuckets := len(buckets)
|
|
|
|
// Sort buckets
|
|
s.sortBuckets(buckets, sortBy, sortOrder)
|
|
|
|
// Calculate pagination
|
|
totalPages := (totalBuckets + pageSize - 1) / pageSize
|
|
if totalPages == 0 {
|
|
totalPages = 1
|
|
}
|
|
if page > totalPages {
|
|
page = totalPages
|
|
}
|
|
|
|
startIndex := (page - 1) * pageSize
|
|
endIndex := startIndex + pageSize
|
|
if startIndex >= totalBuckets {
|
|
buckets = []S3Bucket{}
|
|
} else {
|
|
if endIndex > totalBuckets {
|
|
endIndex = totalBuckets
|
|
}
|
|
buckets = buckets[startIndex:endIndex]
|
|
}
|
|
|
|
return S3BucketsData{
|
|
Buckets: buckets,
|
|
TotalBuckets: totalBuckets,
|
|
TotalSize: totalSize,
|
|
LastUpdated: time.Now(),
|
|
CurrentPage: page,
|
|
TotalPages: totalPages,
|
|
PageSize: pageSize,
|
|
SortBy: sortBy,
|
|
SortOrder: sortOrder,
|
|
}, nil
|
|
}
|
|
|
|
// sortBuckets sorts the bucket slice in place by the given field and order
|
|
func (s *AdminServer) sortBuckets(buckets []S3Bucket, sortBy, sortOrder string) {
|
|
desc := sortOrder == "desc"
|
|
sort.Slice(buckets, func(i, j int) bool {
|
|
a, b := buckets[i], buckets[j]
|
|
switch sortBy {
|
|
case "owner":
|
|
if a.Owner != b.Owner {
|
|
if desc {
|
|
return a.Owner > b.Owner
|
|
}
|
|
return a.Owner < b.Owner
|
|
}
|
|
case "created":
|
|
if !a.CreatedAt.Equal(b.CreatedAt) {
|
|
if desc {
|
|
return a.CreatedAt.After(b.CreatedAt)
|
|
}
|
|
return a.CreatedAt.Before(b.CreatedAt)
|
|
}
|
|
case "objects":
|
|
if a.ObjectCount != b.ObjectCount {
|
|
if desc {
|
|
return a.ObjectCount > b.ObjectCount
|
|
}
|
|
return a.ObjectCount < b.ObjectCount
|
|
}
|
|
case "logical_size":
|
|
if a.LogicalSize != b.LogicalSize {
|
|
if desc {
|
|
return a.LogicalSize > b.LogicalSize
|
|
}
|
|
return a.LogicalSize < b.LogicalSize
|
|
}
|
|
case "physical_size":
|
|
if a.PhysicalSize != b.PhysicalSize {
|
|
if desc {
|
|
return a.PhysicalSize > b.PhysicalSize
|
|
}
|
|
return a.PhysicalSize < b.PhysicalSize
|
|
}
|
|
}
|
|
// Tie-breaker: sort by name (also the default/primary for sortBy=="name")
|
|
if a.Name != b.Name {
|
|
if desc {
|
|
return a.Name > b.Name
|
|
}
|
|
return a.Name < b.Name
|
|
}
|
|
return false
|
|
})
|
|
}
|
|
|
|
// GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections
|
|
func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
|
|
var buckets []S3Bucket
|
|
|
|
// Collect volume information by collection with caching
|
|
collectionMap, _ := s.getCollectionStats()
|
|
|
|
// Get filer configuration (buckets path and filer group)
|
|
filerConfig, err := s.getFilerConfig()
|
|
if err != nil {
|
|
glog.Warningf("Failed to get filer configuration, using defaults: %v", err)
|
|
}
|
|
|
|
// Now list buckets from the filer and match with collection data
|
|
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// Paginate through all buckets in the buckets directory
|
|
const listPageSize = 1000
|
|
startFrom := ""
|
|
var snapshotTsNs int64
|
|
for {
|
|
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
|
Directory: filerConfig.BucketsPath,
|
|
Prefix: "",
|
|
StartFromFileName: startFrom,
|
|
InclusiveStartFrom: false,
|
|
Limit: listPageSize,
|
|
SnapshotTsNs: snapshotTsNs,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pageCount := 0
|
|
lastName := ""
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
|
|
if snapshotTsNs == 0 && resp.SnapshotTsNs != 0 {
|
|
snapshotTsNs = resp.SnapshotTsNs
|
|
}
|
|
|
|
if resp.Entry == nil {
|
|
continue
|
|
}
|
|
lastName = resp.Entry.Name
|
|
pageCount++
|
|
|
|
if !resp.Entry.IsDirectory {
|
|
continue
|
|
}
|
|
|
|
bucketName := resp.Entry.Name
|
|
if strings.HasPrefix(bucketName, ".") {
|
|
// Skip internal/system directories from Object Store bucket listing.
|
|
continue
|
|
}
|
|
if s3tables.IsTableBucketEntry(resp.Entry) || strings.HasSuffix(bucketName, "--table-s3") {
|
|
// Keep table buckets in the S3 Tables pages, not regular Object Store buckets.
|
|
continue
|
|
}
|
|
|
|
// Determine collection name for this bucket
|
|
collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
|
|
|
|
// Get size and object count from collection data
|
|
var physicalSize int64
|
|
var logicalSize int64
|
|
var objectCount int64
|
|
if collectionData, exists := collectionMap[collectionName]; exists {
|
|
physicalSize = collectionData.PhysicalSize
|
|
logicalSize = collectionData.LogicalSize
|
|
objectCount = collectionData.FileCount
|
|
}
|
|
|
|
// Get quota information from entry
|
|
quota := resp.Entry.Quota
|
|
quotaEnabled := quota > 0
|
|
if quota < 0 {
|
|
// Negative quota means disabled
|
|
quota = -quota
|
|
quotaEnabled = false
|
|
}
|
|
|
|
// Get versioning, object lock, and owner information from extended attributes
|
|
versioningStatus := ""
|
|
objectLockEnabled := false
|
|
objectLockMode := ""
|
|
var objectLockDuration int32 = 0
|
|
var owner string
|
|
|
|
if resp.Entry.Extended != nil {
|
|
// Use shared utility to extract versioning information
|
|
versioningStatus = extractVersioningFromEntry(resp.Entry)
|
|
|
|
// Use shared utility to extract Object Lock information
|
|
objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(resp.Entry)
|
|
|
|
// Extract owner information
|
|
if ownerBytes, ok := resp.Entry.Extended[s3_constants.AmzIdentityId]; ok {
|
|
owner = string(ownerBytes)
|
|
}
|
|
}
|
|
|
|
var createdAt, lastModified time.Time
|
|
if resp.Entry.Attributes != nil {
|
|
createdAt = time.Unix(resp.Entry.Attributes.Crtime, 0)
|
|
lastModified = time.Unix(resp.Entry.Attributes.Mtime, 0)
|
|
}
|
|
bucket := S3Bucket{
|
|
Name: bucketName,
|
|
CreatedAt: createdAt,
|
|
LogicalSize: logicalSize,
|
|
PhysicalSize: physicalSize,
|
|
ObjectCount: objectCount,
|
|
LastModified: lastModified,
|
|
Quota: quota,
|
|
QuotaEnabled: quotaEnabled,
|
|
VersioningStatus: versioningStatus,
|
|
ObjectLockEnabled: objectLockEnabled,
|
|
ObjectLockMode: objectLockMode,
|
|
ObjectLockDuration: objectLockDuration,
|
|
Owner: owner,
|
|
}
|
|
buckets = append(buckets, bucket)
|
|
}
|
|
|
|
// If we received fewer entries than the page size, we've listed everything
|
|
if pageCount < listPageSize {
|
|
break
|
|
}
|
|
startFrom = lastName
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list Object Store buckets: %w", err)
|
|
}
|
|
|
|
return buckets, nil
|
|
}
|
|
|
|
// GetBucketDetails retrieves detailed information about a specific bucket
|
|
// Note: This no longer lists objects for performance reasons. Use GetS3Buckets for size/count data.
|
|
func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) {
|
|
// Get filer configuration (buckets path)
|
|
filerConfig, err := s.getFilerConfig()
|
|
if err != nil {
|
|
glog.Warningf("Failed to get filer configuration, using defaults: %v", err)
|
|
}
|
|
|
|
details := &BucketDetails{
|
|
Bucket: S3Bucket{
|
|
Name: bucketName,
|
|
},
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
|
|
// Get collection data for size and object count with caching
|
|
collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
|
|
stats, err := s.getCollectionStats()
|
|
if err != nil {
|
|
glog.Warningf("Failed to get collection data: %v", err)
|
|
// Continue without collection data - use zero values
|
|
} else if data, ok := stats[collectionName]; ok {
|
|
details.Bucket.LogicalSize = data.LogicalSize
|
|
details.Bucket.PhysicalSize = data.PhysicalSize
|
|
details.Bucket.ObjectCount = data.FileCount
|
|
}
|
|
|
|
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// Get bucket info
|
|
bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: filerConfig.BucketsPath,
|
|
Name: bucketName,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("bucket not found: %w", err)
|
|
}
|
|
|
|
details.Bucket.CreatedAt = time.Unix(bucketResp.Entry.Attributes.Crtime, 0)
|
|
details.Bucket.LastModified = time.Unix(bucketResp.Entry.Attributes.Mtime, 0)
|
|
|
|
// Get quota information from entry
|
|
quota := bucketResp.Entry.Quota
|
|
quotaEnabled := quota > 0
|
|
if quota < 0 {
|
|
// Negative quota means disabled
|
|
quota = -quota
|
|
quotaEnabled = false
|
|
}
|
|
details.Bucket.Quota = quota
|
|
details.Bucket.QuotaEnabled = quotaEnabled
|
|
|
|
// Get versioning, object lock, and owner information from extended attributes
|
|
versioningStatus := ""
|
|
objectLockEnabled := false
|
|
objectLockMode := ""
|
|
var objectLockDuration int32 = 0
|
|
var owner string
|
|
|
|
if bucketResp.Entry.Extended != nil {
|
|
// Use shared utility to extract versioning information
|
|
versioningStatus = extractVersioningFromEntry(bucketResp.Entry)
|
|
|
|
// Use shared utility to extract Object Lock information
|
|
objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(bucketResp.Entry)
|
|
|
|
// Extract owner information
|
|
if ownerBytes, ok := bucketResp.Entry.Extended[s3_constants.AmzIdentityId]; ok {
|
|
owner = string(ownerBytes)
|
|
}
|
|
}
|
|
|
|
details.Bucket.VersioningStatus = versioningStatus
|
|
details.Bucket.ObjectLockEnabled = objectLockEnabled
|
|
details.Bucket.ObjectLockMode = objectLockMode
|
|
details.Bucket.ObjectLockDuration = objectLockDuration
|
|
details.Bucket.Owner = owner
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return details, nil
|
|
}
|
|
|
|
// CreateS3Bucket creates a new S3 bucket
|
|
func (s *AdminServer) CreateS3Bucket(bucketName string) error {
|
|
return s.CreateS3BucketWithQuota(bucketName, 0, false)
|
|
}
|
|
|
|
// DeleteS3Bucket deletes an S3 bucket and all its contents
|
|
func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
|
|
ctx := context.Background()
|
|
|
|
// Get filer configuration (buckets path and filer group)
|
|
filerConfig, err := s.getFilerConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get filer configuration: %w", err)
|
|
}
|
|
|
|
// Check if bucket has Object Lock enabled and if there are locked objects
|
|
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
return s3api.CheckBucketForLockedObjects(ctx, client, filerConfig.BucketsPath, bucketName)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete the collection first (same as s3.bucket.delete shell command)
|
|
// This ensures volume data is cleaned up properly
|
|
// Collection name must be prefixed with filer group if configured
|
|
collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
|
|
err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
_, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
|
|
Name: collectionName,
|
|
})
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete collection %s: %w", collectionName, err)
|
|
}
|
|
|
|
// Then delete bucket directory recursively from filer
|
|
// Use same parameters as s3.bucket.delete shell command and S3 API
|
|
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
|
|
Directory: filerConfig.BucketsPath,
|
|
Name: bucketName,
|
|
IsDeleteData: false, // Collection already deleted, just remove metadata
|
|
IsRecursive: true,
|
|
IgnoreRecursiveError: true, // Same as S3 API and shell command
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete bucket: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// IsStaticUser checks if a user is a static identity by loading the
|
|
// configuration from the credential manager and checking the IsStatic flag.
|
|
func (s *AdminServer) IsStaticUser(username string) bool {
|
|
if s.credentialManager == nil {
|
|
return false
|
|
}
|
|
s3cfg, err := s.credentialManager.LoadConfiguration(context.Background())
|
|
if err != nil {
|
|
return false
|
|
}
|
|
for _, ident := range s3cfg.Identities {
|
|
if ident.Name == username {
|
|
return ident.IsStatic
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// GetObjectStoreUsers retrieves object store users from identity.json
|
|
func (s *AdminServer) GetObjectStoreUsers(ctx context.Context) ([]ObjectStoreUser, error) {
|
|
if s.credentialManager == nil {
|
|
return []ObjectStoreUser{}, nil
|
|
}
|
|
|
|
s3cfg, err := s.credentialManager.LoadConfiguration(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load IAM configuration: %w", err)
|
|
}
|
|
|
|
var users []ObjectStoreUser
|
|
|
|
// Convert IAM identities to ObjectStoreUser format
|
|
for _, identity := range s3cfg.Identities {
|
|
// Skip service accounts - they should not be parent users
|
|
if strings.HasPrefix(identity.Name, serviceAccountPrefix) {
|
|
continue
|
|
}
|
|
|
|
user := ObjectStoreUser{
|
|
Username: identity.Name,
|
|
Permissions: identity.Actions,
|
|
IsStatic: identity.IsStatic,
|
|
}
|
|
|
|
// Set email from account if available
|
|
if identity.Account != nil {
|
|
user.Email = identity.Account.EmailAddress
|
|
}
|
|
|
|
// Get first access key for display
|
|
if len(identity.Credentials) > 0 {
|
|
user.AccessKey = identity.Credentials[0].AccessKey
|
|
user.SecretKey = identity.Credentials[0].SecretKey
|
|
}
|
|
|
|
users = append(users, user)
|
|
}
|
|
|
|
return users, nil
|
|
}
|
|
|
|
// Volume server methods moved to volume_management.go
|
|
|
|
// Volume methods moved to volume_management.go
|
|
|
|
// sortVolumes method moved to volume_management.go
|
|
|
|
// GetClusterCollections method moved to collection_management.go
|
|
|
|
// GetClusterMasters retrieves cluster masters data
|
|
func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) {
|
|
var masters []MasterInfo
|
|
var leaderCount int
|
|
|
|
// First, get master information from topology
|
|
topology, err := s.GetClusterTopology()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create a map to merge topology and raft data
|
|
masterMap := make(map[string]*MasterInfo)
|
|
|
|
// Add masters from topology
|
|
for _, master := range topology.Masters {
|
|
masterInfo := &MasterInfo{
|
|
Address: pb.ServerAddress(master.Address).ToHttpAddress(),
|
|
IsLeader: master.IsLeader,
|
|
Suffrage: "",
|
|
}
|
|
|
|
if master.IsLeader {
|
|
leaderCount++
|
|
}
|
|
|
|
masterMap[masterInfo.Address] = masterInfo
|
|
}
|
|
|
|
// Then, get additional master information from Raft cluster
|
|
err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each raft server
|
|
for _, server := range resp.ClusterServers {
|
|
// Raft stores gRPC addresses, convert to HTTP address
|
|
httpAddress := pb.GrpcAddressToServerAddress(server.Address)
|
|
|
|
// Update existing master info or create new one
|
|
if masterInfo, exists := masterMap[httpAddress]; exists {
|
|
// Update existing master with raft data
|
|
masterInfo.IsLeader = server.IsLeader
|
|
masterInfo.Suffrage = server.Suffrage
|
|
} else {
|
|
// Create new master info from raft data
|
|
masterInfo := &MasterInfo{
|
|
Address: httpAddress,
|
|
IsLeader: server.IsLeader,
|
|
Suffrage: server.Suffrage,
|
|
}
|
|
masterMap[httpAddress] = masterInfo
|
|
}
|
|
|
|
if server.IsLeader {
|
|
// Update leader count based on raft data
|
|
leaderCount = 1 // There should only be one leader
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
// If gRPC call fails, log the error but continue with topology data
|
|
currentMaster := s.masterClient.GetMaster(context.Background())
|
|
glog.Errorf("Failed to get raft cluster servers from master %s: %v", currentMaster, err)
|
|
}
|
|
|
|
// Convert map to slice
|
|
for _, masterInfo := range masterMap {
|
|
masters = append(masters, *masterInfo)
|
|
}
|
|
|
|
// Sort masters by address for consistent ordering on page refresh
|
|
sort.Slice(masters, func(i, j int) bool {
|
|
return masters[i].Address < masters[j].Address
|
|
})
|
|
|
|
// If no masters found at all, add the current master as fallback
|
|
if len(masters) == 0 {
|
|
currentMaster := s.masterClient.GetMaster(context.Background())
|
|
if currentMaster != "" {
|
|
masters = append(masters, MasterInfo{
|
|
Address: pb.ServerAddress(currentMaster).ToHttpAddress(),
|
|
IsLeader: true,
|
|
Suffrage: "Voter",
|
|
})
|
|
leaderCount = 1
|
|
}
|
|
}
|
|
|
|
return &ClusterMastersData{
|
|
Masters: masters,
|
|
TotalMasters: len(masters),
|
|
LeaderCount: leaderCount,
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetClusterFilers retrieves cluster filers data
|
|
func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) {
|
|
var filers []FilerInfo
|
|
|
|
// Get filer information from master using ListClusterNodes
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.FilerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each filer node
|
|
for _, node := range resp.ClusterNodes {
|
|
createdAt := time.Unix(0, node.CreatedAtNs)
|
|
|
|
filerInfo := FilerInfo{
|
|
Address: pb.ServerAddress(node.Address).ToHttpAddress(),
|
|
DataCenter: node.DataCenter,
|
|
Rack: node.Rack,
|
|
Version: node.Version,
|
|
CreatedAt: createdAt,
|
|
}
|
|
|
|
filers = append(filers, filerInfo)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get filer nodes from master: %w", err)
|
|
}
|
|
|
|
// Sort filers by address for consistent ordering on page refresh
|
|
sort.Slice(filers, func(i, j int) bool {
|
|
return filers[i].Address < filers[j].Address
|
|
})
|
|
|
|
return &ClusterFilersData{
|
|
Filers: filers,
|
|
TotalFilers: len(filers),
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetClusterBrokers retrieves cluster message brokers data
|
|
func (s *AdminServer) GetClusterBrokers() (*ClusterBrokersData, error) {
|
|
var brokers []MessageBrokerInfo
|
|
|
|
// Get broker information from master using ListClusterNodes
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.BrokerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each broker node
|
|
for _, node := range resp.ClusterNodes {
|
|
createdAt := time.Unix(0, node.CreatedAtNs)
|
|
|
|
brokerInfo := MessageBrokerInfo{
|
|
Address: pb.ServerAddress(node.Address).ToHttpAddress(),
|
|
DataCenter: node.DataCenter,
|
|
Rack: node.Rack,
|
|
Version: node.Version,
|
|
CreatedAt: createdAt,
|
|
}
|
|
|
|
brokers = append(brokers, brokerInfo)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get broker nodes from master: %w", err)
|
|
}
|
|
|
|
// Sort brokers by address for consistent ordering on page refresh
|
|
sort.Slice(brokers, func(i, j int) bool {
|
|
return brokers[i].Address < brokers[j].Address
|
|
})
|
|
|
|
return &ClusterBrokersData{
|
|
Brokers: brokers,
|
|
TotalBrokers: len(brokers),
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetAllFilers method moved to client_management.go
|
|
|
|
// GetVolumeDetails method moved to volume_management.go
|
|
|
|
// VacuumVolume method moved to volume_management.go
|
|
|
|
// TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API
|
|
func (as *AdminServer) TriggerTopicRetentionPurgeAPI(w http.ResponseWriter, r *http.Request) {
|
|
err := as.TriggerTopicRetentionPurge()
|
|
if err != nil {
|
|
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]interface{}{"message": "Topic retention purge triggered successfully"})
|
|
}
|
|
|
|
// GetConfigInfo returns information about the admin configuration
|
|
func (as *AdminServer) GetConfigInfo(w http.ResponseWriter, r *http.Request) {
|
|
configInfo := as.configPersistence.GetConfigInfo()
|
|
|
|
// Add additional admin server info
|
|
currentMaster := as.masterClient.GetMaster(context.Background())
|
|
configInfo["master_address"] = string(currentMaster)
|
|
configInfo["cache_expiration"] = as.cacheExpiration.String()
|
|
configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String()
|
|
|
|
// Add maintenance system info
|
|
if as.maintenanceManager != nil {
|
|
configInfo["maintenance_enabled"] = true
|
|
configInfo["maintenance_running"] = as.maintenanceManager.IsRunning()
|
|
} else {
|
|
configInfo["maintenance_enabled"] = false
|
|
configInfo["maintenance_running"] = false
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]interface{}{
|
|
"config_info": configInfo,
|
|
"title": "Configuration Information",
|
|
})
|
|
}
|
|
|
|
// StartWorkerGrpcServer starts the worker gRPC server
|
|
func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error {
|
|
if s.workerGrpcServer != nil {
|
|
return fmt.Errorf("worker gRPC server is already running")
|
|
}
|
|
|
|
s.workerGrpcServer = NewWorkerGrpcServer(s)
|
|
return s.workerGrpcServer.StartWithTLS(grpcPort)
|
|
}
|
|
|
|
// StopWorkerGrpcServer stops the worker gRPC server
|
|
func (s *AdminServer) StopWorkerGrpcServer() error {
|
|
if s.workerGrpcServer != nil {
|
|
err := s.workerGrpcServer.Stop()
|
|
s.workerGrpcServer = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetWorkerGrpcServer returns the worker gRPC server
|
|
func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
|
|
return s.workerGrpcServer
|
|
}
|
|
|
|
// GetWorkerGrpcPort returns the worker gRPC listen port, or 0 when unavailable.
|
|
func (s *AdminServer) GetWorkerGrpcPort() int {
|
|
if s.workerGrpcServer == nil {
|
|
return 0
|
|
}
|
|
return s.workerGrpcServer.ListenPort()
|
|
}
|
|
|
|
// GetPlugin returns the plugin instance when enabled.
|
|
func (s *AdminServer) GetPlugin() *adminplugin.Plugin {
|
|
return s.plugin
|
|
}
|
|
|
|
func (s *AdminServer) acquirePluginLock(reason string) (func(), error) {
|
|
if s == nil || s.pluginLock == nil {
|
|
return func() {}, nil
|
|
}
|
|
return s.pluginLock.Acquire(reason)
|
|
}
|
|
|
|
// RequestPluginJobTypeDescriptor asks one worker for job type schema and returns the descriptor.
|
|
func (s *AdminServer) RequestPluginJobTypeDescriptor(ctx context.Context, jobType string, forceRefresh bool) (*plugin_pb.JobTypeDescriptor, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.RequestConfigSchema(ctx, jobType, forceRefresh)
|
|
}
|
|
|
|
// LoadPluginJobTypeDescriptor loads persisted descriptor for one job type.
|
|
func (s *AdminServer) LoadPluginJobTypeDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.LoadDescriptor(jobType)
|
|
}
|
|
|
|
// SavePluginJobTypeConfig persists plugin job type config in admin data dir.
|
|
func (s *AdminServer) SavePluginJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig) error {
|
|
if s.plugin == nil {
|
|
return fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.SaveJobTypeConfig(config)
|
|
}
|
|
|
|
// LoadPluginJobTypeConfig loads plugin job type config from persistence.
|
|
func (s *AdminServer) LoadPluginJobTypeConfig(jobType string) (*plugin_pb.PersistedJobTypeConfig, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.LoadJobTypeConfig(jobType)
|
|
}
|
|
|
|
// RunPluginDetection triggers one detection pass for a job type and returns proposed jobs.
|
|
func (s *AdminServer) RunPluginDetection(
|
|
ctx context.Context,
|
|
jobType string,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
maxResults int32,
|
|
) ([]*plugin_pb.JobProposal, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if releaseLock != nil {
|
|
defer releaseLock()
|
|
}
|
|
return s.plugin.RunDetection(ctx, jobType, clusterContext, maxResults)
|
|
}
|
|
|
|
// FilterPluginProposalsWithActiveJobs drops proposals already represented by assigned/running jobs.
|
|
func (s *AdminServer) FilterPluginProposalsWithActiveJobs(
|
|
jobType string,
|
|
proposals []*plugin_pb.JobProposal,
|
|
) ([]*plugin_pb.JobProposal, int, error) {
|
|
if s.plugin == nil {
|
|
return nil, 0, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
filtered, skipped := s.plugin.FilterProposalsWithActiveJobs(jobType, proposals)
|
|
return filtered, skipped, nil
|
|
}
|
|
|
|
// RunPluginDetectionWithReport triggers one detection pass and returns request metadata and proposals.
|
|
func (s *AdminServer) RunPluginDetectionWithReport(
|
|
ctx context.Context,
|
|
jobType string,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
maxResults int32,
|
|
) (*adminplugin.DetectionReport, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if releaseLock != nil {
|
|
defer releaseLock()
|
|
}
|
|
return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults)
|
|
}
|
|
|
|
// DispatchPluginProposals dispatches a batch of proposals using the same
|
|
// capacity-aware dispatch logic as the scheduler loop (executor reservation with
|
|
// backoff, per-job retry on transient errors). The plugin lock must already be
|
|
// held by the caller.
|
|
func (s *AdminServer) DispatchPluginProposals(
|
|
ctx context.Context,
|
|
jobType string,
|
|
proposals []*plugin_pb.JobProposal,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
) (successCount, errorCount, canceledCount int, err error) {
|
|
if s.plugin == nil {
|
|
return 0, 0, 0, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
sc, ec, cc := s.plugin.DispatchProposals(ctx, jobType, proposals, clusterContext)
|
|
return sc, ec, cc, nil
|
|
}
|
|
|
|
// ExecutePluginJob dispatches one job to a capable worker and waits for completion.
|
|
func (s *AdminServer) ExecutePluginJob(
|
|
ctx context.Context,
|
|
job *plugin_pb.JobSpec,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
attempt int32,
|
|
) (*plugin_pb.JobCompleted, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
jobType := ""
|
|
if job != nil {
|
|
jobType = strings.TrimSpace(job.JobType)
|
|
}
|
|
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin execution %s", jobType))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if releaseLock != nil {
|
|
defer releaseLock()
|
|
}
|
|
return s.plugin.ExecuteJob(ctx, job, clusterContext, attempt)
|
|
}
|
|
|
|
// GetPluginRunHistory returns the bounded run history (last 10 success + last 10 error).
|
|
func (s *AdminServer) GetPluginRunHistory(jobType string) (*adminplugin.JobTypeRunHistory, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.LoadRunHistory(jobType)
|
|
}
|
|
|
|
// ListPluginJobTypes returns known plugin job types from connected worker registry and persisted data.
|
|
func (s *AdminServer) ListPluginJobTypes() ([]adminplugin.JobTypeInfo, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.ListKnownJobTypes()
|
|
}
|
|
|
|
// GetPluginWorkers returns currently connected plugin workers.
|
|
func (s *AdminServer) GetPluginWorkers() []*adminplugin.WorkerSession {
|
|
if s.plugin == nil {
|
|
return nil
|
|
}
|
|
return s.plugin.ListWorkers()
|
|
}
|
|
|
|
// ListPluginJobs returns tracked plugin jobs for monitoring.
|
|
func (s *AdminServer) ListPluginJobs(jobType, state string, limit int) []adminplugin.TrackedJob {
|
|
if s.plugin == nil {
|
|
return nil
|
|
}
|
|
return s.plugin.ListTrackedJobs(jobType, state, limit)
|
|
}
|
|
|
|
// GetPluginJob returns one tracked plugin job by ID.
|
|
func (s *AdminServer) GetPluginJob(jobID string) (*adminplugin.TrackedJob, bool) {
|
|
if s.plugin == nil {
|
|
return nil, false
|
|
}
|
|
return s.plugin.GetTrackedJob(jobID)
|
|
}
|
|
|
|
// GetPluginJobDetail returns detailed plugin job information with activity timeline.
|
|
func (s *AdminServer) GetPluginJobDetail(jobID string, activityLimit, relatedLimit int) (*adminplugin.JobDetail, bool, error) {
|
|
if s.plugin == nil {
|
|
return nil, false, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.BuildJobDetail(jobID, activityLimit, relatedLimit)
|
|
}
|
|
|
|
// ExpirePluginJob marks an active plugin job as failed so it no longer blocks scheduling.
|
|
func (s *AdminServer) ExpirePluginJob(jobID, reason string) (*adminplugin.TrackedJob, bool, error) {
|
|
if handler := s.expireJobHandler; handler != nil {
|
|
return handler(jobID, reason)
|
|
}
|
|
if s.plugin == nil {
|
|
return nil, false, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.ExpireJob(jobID, reason)
|
|
}
|
|
|
|
// ListPluginActivities returns plugin job activities for monitoring.
|
|
func (s *AdminServer) ListPluginActivities(jobType string, limit int) []adminplugin.JobActivity {
|
|
if s.plugin == nil {
|
|
return nil
|
|
}
|
|
return s.plugin.ListActivities(jobType, limit)
|
|
}
|
|
|
|
// ListPluginSchedulerStates returns per-job-type scheduler state.
|
|
func (s *AdminServer) ListPluginSchedulerStates() ([]adminplugin.SchedulerJobTypeState, error) {
|
|
if s.plugin == nil {
|
|
return nil, fmt.Errorf("plugin is not enabled")
|
|
}
|
|
return s.plugin.ListSchedulerStates()
|
|
}
|
|
|
|
// Maintenance system integration methods
|
|
|
|
// InitMaintenanceManager initializes the maintenance manager
|
|
func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
|
|
s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)
|
|
|
|
// Set up task persistence if config persistence is available
|
|
if s.configPersistence != nil {
|
|
queue := s.maintenanceManager.GetQueue()
|
|
if queue != nil {
|
|
queue.SetPersistence(s.configPersistence)
|
|
|
|
// Load tasks from persistence on startup
|
|
if err := queue.LoadTasksFromPersistence(); err != nil {
|
|
glog.Errorf("Failed to load tasks from persistence: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
|
|
}
|
|
|
|
// GetMaintenanceManager returns the maintenance manager
|
|
func (s *AdminServer) GetMaintenanceManager() *maintenance.MaintenanceManager {
|
|
return s.maintenanceManager
|
|
}
|
|
|
|
// StartMaintenanceManager starts the maintenance manager
|
|
func (s *AdminServer) StartMaintenanceManager() error {
|
|
if s.maintenanceManager == nil {
|
|
return fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
return s.maintenanceManager.Start()
|
|
}
|
|
|
|
// StopMaintenanceManager stops the maintenance manager
|
|
func (s *AdminServer) StopMaintenanceManager() {
|
|
if s.maintenanceManager != nil {
|
|
s.maintenanceManager.Stop()
|
|
}
|
|
}
|
|
|
|
// TriggerTopicRetentionPurge triggers topic data purging based on retention policies
|
|
func (s *AdminServer) TriggerTopicRetentionPurge() error {
|
|
if s.topicRetentionPurger == nil {
|
|
return fmt.Errorf("topic retention purger not initialized")
|
|
}
|
|
|
|
glog.V(0).Infof("Triggering topic retention purge")
|
|
return s.topicRetentionPurger.PurgeExpiredTopicData()
|
|
}
|
|
|
|
// GetTopicRetentionPurger returns the topic retention purger
|
|
func (s *AdminServer) GetTopicRetentionPurger() *TopicRetentionPurger {
|
|
return s.topicRetentionPurger
|
|
}
|
|
|
|
// CreateTopicWithRetention creates a new topic with optional retention configuration
|
|
func (s *AdminServer) CreateTopicWithRetention(namespace, name string, partitionCount int32, retentionEnabled bool, retentionSeconds int64) error {
|
|
// Find broker leader to create the topic
|
|
brokerLeader, err := s.findBrokerLeader()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find broker leader: %w", err)
|
|
}
|
|
|
|
// Create retention configuration
|
|
var retention *mq_pb.TopicRetention
|
|
if retentionEnabled {
|
|
retention = &mq_pb.TopicRetention{
|
|
Enabled: true,
|
|
RetentionSeconds: retentionSeconds,
|
|
}
|
|
} else {
|
|
retention = &mq_pb.TopicRetention{
|
|
Enabled: false,
|
|
RetentionSeconds: 0,
|
|
}
|
|
}
|
|
|
|
// Create the topic via broker
|
|
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
PartitionCount: partitionCount,
|
|
Retention: retention,
|
|
})
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create topic: %w", err)
|
|
}
|
|
|
|
glog.V(0).Infof("Created topic %s.%s with %d partitions (retention: enabled=%v, seconds=%d)",
|
|
namespace, name, partitionCount, retentionEnabled, retentionSeconds)
|
|
return nil
|
|
}
|
|
|
|
// UpdateTopicRetention updates the retention configuration for an existing topic
|
|
func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, retentionSeconds int64) error {
|
|
// Get broker information from master
|
|
var brokerAddress string
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.BrokerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Find the first available broker
|
|
for _, node := range resp.ClusterNodes {
|
|
brokerAddress = node.Address
|
|
break
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get broker nodes from master: %w", err)
|
|
}
|
|
|
|
if brokerAddress == "" {
|
|
return fmt.Errorf("no active brokers found")
|
|
}
|
|
|
|
// Create gRPC connection
|
|
conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to broker: %w", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
client := mq_pb.NewSeaweedMessagingClient(conn)
|
|
|
|
// First, get the current topic configuration to preserve existing settings
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
currentConfig, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current topic configuration: %w", err)
|
|
}
|
|
|
|
// Create the topic configuration request, preserving all existing settings
|
|
configRequest := &mq_pb.ConfigureTopicRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
// Preserve existing partition count - this is critical!
|
|
PartitionCount: currentConfig.PartitionCount,
|
|
// Preserve existing schema if it exists
|
|
MessageRecordType: currentConfig.MessageRecordType,
|
|
KeyColumns: currentConfig.KeyColumns,
|
|
}
|
|
|
|
// Update only the retention configuration
|
|
if enabled {
|
|
configRequest.Retention = &mq_pb.TopicRetention{
|
|
RetentionSeconds: retentionSeconds,
|
|
Enabled: true,
|
|
}
|
|
} else {
|
|
// Set retention to disabled
|
|
configRequest.Retention = &mq_pb.TopicRetention{
|
|
RetentionSeconds: 0,
|
|
Enabled: false,
|
|
}
|
|
}
|
|
|
|
// Send the configuration request with preserved settings
|
|
_, err = client.ConfigureTopic(ctx, configRequest)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update topic retention: %w", err)
|
|
}
|
|
|
|
glog.V(0).Infof("Updated topic %s.%s retention (enabled: %v, seconds: %d) while preserving %d partitions",
|
|
namespace, name, enabled, retentionSeconds, currentConfig.PartitionCount)
|
|
return nil
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the admin server
|
|
func (s *AdminServer) Shutdown() {
|
|
glog.V(1).Infof("Shutting down admin server...")
|
|
|
|
// Cancel background goroutines (vacuum monitor, etc.)
|
|
if s.bgCancel != nil {
|
|
s.bgCancel()
|
|
}
|
|
|
|
// Stop maintenance manager
|
|
s.StopMaintenanceManager()
|
|
if s.adminPresenceLock != nil {
|
|
s.adminPresenceLock.Stop()
|
|
}
|
|
|
|
if s.plugin != nil {
|
|
s.plugin.Shutdown()
|
|
}
|
|
|
|
// Stop worker gRPC server
|
|
if err := s.StopWorkerGrpcServer(); err != nil {
|
|
glog.Errorf("Failed to stop worker gRPC server: %v", err)
|
|
}
|
|
|
|
// Shutdown credential manager
|
|
if s.credentialManager != nil {
|
|
s.credentialManager.Shutdown()
|
|
}
|
|
|
|
glog.V(1).Infof("Admin server shutdown complete")
|
|
}
|
|
|
|
// Function to extract Object Lock information from bucket entry using shared utilities
|
|
func extractObjectLockInfoFromEntry(entry *filer_pb.Entry) (bool, string, int32) {
|
|
// Try to load Object Lock configuration using shared utility
|
|
if config, found := s3api.LoadObjectLockConfigurationFromExtended(entry); found {
|
|
return s3api.ExtractObjectLockInfoFromConfig(config)
|
|
}
|
|
|
|
return false, "", 0
|
|
}
|
|
|
|
// Function to extract versioning information from bucket entry using shared utilities
|
|
func extractVersioningFromEntry(entry *filer_pb.Entry) string {
|
|
return s3api.GetVersioningStatus(entry)
|
|
}
|
|
|
|
// GetConfigPersistence returns the config persistence manager
|
|
func (as *AdminServer) GetConfigPersistence() *ConfigPersistence {
|
|
return as.configPersistence
|
|
}
|
|
|
|
type collectionStats struct {
|
|
PhysicalSize int64
|
|
LogicalSize int64
|
|
FileCount int64
|
|
}
|
|
|
|
// ecVolumeCounts is used to correctly combine EC volume counts reported by
|
|
// multiple nodes. Every node holding any shard of an EC volume reports the
|
|
// same file_count (total entries in the replicated .ecx), so we dedupe it
|
|
// per volume id. In contrast, a needle delete is applied on exactly one
|
|
// shard holder, so each node reports its own local tombstone count and the
|
|
// true delete total is the sum across nodes.
|
|
type ecVolumeCounts struct {
|
|
collection string
|
|
fileCount uint64
|
|
deleteCount uint64
|
|
}
|
|
|
|
func collectCollectionStats(topologyInfo *master_pb.TopologyInfo) map[string]collectionStats {
|
|
collectionMap := make(map[string]collectionStats)
|
|
ecVolumeAgg := make(map[uint32]*ecVolumeCounts)
|
|
for _, dc := range topologyInfo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
for _, node := range rack.DataNodeInfos {
|
|
for _, diskInfo := range node.DiskInfos {
|
|
for _, volInfo := range diskInfo.VolumeInfos {
|
|
collection := volInfo.Collection
|
|
if collection == "" {
|
|
collection = "default"
|
|
}
|
|
|
|
data := collectionMap[collection]
|
|
data.PhysicalSize += int64(volInfo.Size)
|
|
rp, _ := super_block.NewReplicaPlacementFromByte(byte(volInfo.ReplicaPlacement))
|
|
// NewReplicaPlacementFromByte never returns a nil rp. If there's an error,
|
|
// it returns a zero-valued ReplicaPlacement, for which GetCopyCount() is 1.
|
|
// This provides a safe fallback, so we can ignore the error.
|
|
replicaCount := int64(rp.GetCopyCount())
|
|
if volInfo.Size >= volInfo.DeletedByteCount {
|
|
data.LogicalSize += int64(volInfo.Size-volInfo.DeletedByteCount) / replicaCount
|
|
}
|
|
if volInfo.FileCount >= volInfo.DeleteCount {
|
|
data.FileCount += int64(volInfo.FileCount-volInfo.DeleteCount) / replicaCount
|
|
}
|
|
collectionMap[collection] = data
|
|
}
|
|
for _, ecShardInfo := range diskInfo.EcShardInfos {
|
|
collection := ecShardInfo.Collection
|
|
if collection == "" {
|
|
collection = "default"
|
|
}
|
|
shards := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo)
|
|
data := collectionMap[collection]
|
|
data.PhysicalSize += int64(shards.TotalSize())
|
|
data.LogicalSize += int64(shards.MinusParityShards().TotalSize())
|
|
collectionMap[collection] = data
|
|
|
|
agg, ok := ecVolumeAgg[ecShardInfo.Id]
|
|
if !ok {
|
|
agg = &ecVolumeCounts{collection: collection, fileCount: ecShardInfo.FileCount}
|
|
ecVolumeAgg[ecShardInfo.Id] = agg
|
|
}
|
|
agg.deleteCount += ecShardInfo.DeleteCount
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fold EC per-volume counts into the collection totals. fileCount is
|
|
// already deduped (set once per volume), deleteCount is the sum of
|
|
// local tombstones across every node holding shards of the volume.
|
|
for vid, agg := range ecVolumeAgg {
|
|
data := collectionMap[agg.collection]
|
|
if agg.fileCount >= agg.deleteCount {
|
|
data.FileCount += int64(agg.fileCount - agg.deleteCount)
|
|
} else {
|
|
// Should not happen in steady state — indicates a node reporting
|
|
// a stale fileCount, a skewed heartbeat, or a delete-counter bug.
|
|
// Defend the UI by skipping the add and surface the anomaly.
|
|
glog.Warningf("ec volume %d in collection %q: summed delete_count=%d exceeds file_count=%d; skipping object count",
|
|
vid, agg.collection, agg.deleteCount, agg.fileCount)
|
|
}
|
|
collectionMap[agg.collection] = data
|
|
}
|
|
|
|
return collectionMap
|
|
}
|
|
|
|
// getCollectionStats returns current collection statistics with caching
|
|
func (s *AdminServer) getCollectionStats() (map[string]collectionStats, error) {
|
|
now := time.Now()
|
|
if s.collectionStatsCache != nil && now.Sub(s.lastCollectionStatsUpdate) < s.collectionStatsCacheThreshold {
|
|
return s.collectionStatsCache, nil
|
|
}
|
|
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if resp.TopologyInfo != nil {
|
|
s.collectionStatsCache = collectCollectionStats(resp.TopologyInfo)
|
|
s.lastCollectionStatsUpdate = now
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return s.collectionStatsCache, err
|
|
}
|