mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-22 01:31:34 +00:00
but: 1. one volume should not be repeatedly worked on. 2. ec shards needs to be distributed and source data should be deleted.
1520 lines
44 KiB
Go
1520 lines
44 KiB
Go
package dash
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
|
|
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
|
"github.com/seaweedfs/seaweedfs/weed/credential"
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api"
|
|
)
|
|
|
|
type AdminServer struct {
|
|
masterClient *wdclient.MasterClient
|
|
templateFS http.FileSystem
|
|
dataDir string
|
|
grpcDialOption grpc.DialOption
|
|
cacheExpiration time.Duration
|
|
lastCacheUpdate time.Time
|
|
cachedTopology *ClusterTopology
|
|
|
|
// Filer discovery and caching
|
|
cachedFilers []string
|
|
lastFilerUpdate time.Time
|
|
filerCacheExpiration time.Duration
|
|
|
|
// Credential management
|
|
credentialManager *credential.CredentialManager
|
|
|
|
// Configuration persistence
|
|
configPersistence *ConfigPersistence
|
|
|
|
// Maintenance system
|
|
maintenanceManager *maintenance.MaintenanceManager
|
|
|
|
// Topic retention purger
|
|
topicRetentionPurger *TopicRetentionPurger
|
|
|
|
// Worker gRPC server
|
|
workerGrpcServer *WorkerGrpcServer
|
|
}
|
|
|
|
// Type definitions moved to types.go
|
|
|
|
func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string) *AdminServer {
|
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
|
|
|
|
// Create master client with multiple master support
|
|
masterClient := wdclient.NewMasterClient(
|
|
grpcDialOption,
|
|
"", // filerGroup - not needed for admin
|
|
"admin", // clientType
|
|
"", // clientHost - not needed for admin
|
|
"", // dataCenter - not needed for admin
|
|
"", // rack - not needed for admin
|
|
*pb.ServerAddresses(masters).ToServiceDiscovery(),
|
|
)
|
|
|
|
// Start master client connection process (like shell and filer do)
|
|
ctx := context.Background()
|
|
go masterClient.KeepConnectedToMaster(ctx)
|
|
|
|
server := &AdminServer{
|
|
masterClient: masterClient,
|
|
templateFS: templateFS,
|
|
dataDir: dataDir,
|
|
grpcDialOption: grpcDialOption,
|
|
cacheExpiration: 10 * time.Second,
|
|
filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds
|
|
configPersistence: NewConfigPersistence(dataDir),
|
|
}
|
|
|
|
// Initialize topic retention purger
|
|
server.topicRetentionPurger = NewTopicRetentionPurger(server)
|
|
|
|
// Initialize credential manager with defaults
|
|
credentialManager, err := credential.NewCredentialManagerWithDefaults("")
|
|
if err != nil {
|
|
glog.Warningf("Failed to initialize credential manager: %v", err)
|
|
// Continue without credential manager - will fall back to legacy approach
|
|
} else {
|
|
// For stores that need filer client details, set them
|
|
if store := credentialManager.GetStore(); store != nil {
|
|
if filerClientSetter, ok := store.(interface {
|
|
SetFilerClient(string, grpc.DialOption)
|
|
}); ok {
|
|
// We'll set the filer client later when we discover filers
|
|
// For now, just store the credential manager
|
|
server.credentialManager = credentialManager
|
|
|
|
// Set up a goroutine to set filer client once we discover filers
|
|
go func() {
|
|
for {
|
|
filerAddr := server.GetFilerAddress()
|
|
if filerAddr != "" {
|
|
filerClientSetter.SetFilerClient(filerAddr, server.grpcDialOption)
|
|
glog.V(1).Infof("Set filer client for credential manager: %s", filerAddr)
|
|
break
|
|
}
|
|
glog.V(1).Infof("Waiting for filer discovery for credential manager...")
|
|
time.Sleep(5 * time.Second) // Retry every 5 seconds
|
|
}
|
|
}()
|
|
} else {
|
|
server.credentialManager = credentialManager
|
|
}
|
|
} else {
|
|
server.credentialManager = credentialManager
|
|
}
|
|
}
|
|
|
|
// Initialize maintenance system with persistent configuration
|
|
if server.configPersistence.IsConfigured() {
|
|
maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig()
|
|
if err != nil {
|
|
glog.Errorf("Failed to load maintenance configuration: %v", err)
|
|
maintenanceConfig = maintenance.DefaultMaintenanceConfig()
|
|
}
|
|
server.InitMaintenanceManager(maintenanceConfig)
|
|
|
|
// Start maintenance manager if enabled
|
|
if maintenanceConfig.Enabled {
|
|
go func() {
|
|
if err := server.StartMaintenanceManager(); err != nil {
|
|
glog.Errorf("Failed to start maintenance manager: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
} else {
|
|
glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode")
|
|
}
|
|
|
|
return server
|
|
}
|
|
|
|
// GetCredentialManager returns the credential manager
|
|
func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
|
|
return s.credentialManager
|
|
}
|
|
|
|
// Filer discovery methods moved to client_management.go
|
|
|
|
// Client management methods moved to client_management.go
|
|
|
|
// WithFilerClient and WithVolumeServerClient methods moved to client_management.go
|
|
|
|
// Cluster topology methods moved to cluster_topology.go
|
|
|
|
// getTopologyViaGRPC method moved to cluster_topology.go
|
|
|
|
// InvalidateCache method moved to cluster_topology.go
|
|
|
|
// GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections
|
|
func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
|
|
var buckets []S3Bucket
|
|
|
|
// Build a map of collection name to collection data
|
|
collectionMap := make(map[string]struct {
|
|
Size int64
|
|
FileCount int64
|
|
})
|
|
|
|
// Collect volume information by collection
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if resp.TopologyInfo != nil {
|
|
for _, dc := range resp.TopologyInfo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
for _, node := range rack.DataNodeInfos {
|
|
for _, diskInfo := range node.DiskInfos {
|
|
for _, volInfo := range diskInfo.VolumeInfos {
|
|
collection := volInfo.Collection
|
|
if collection == "" {
|
|
collection = "default"
|
|
}
|
|
|
|
if _, exists := collectionMap[collection]; !exists {
|
|
collectionMap[collection] = struct {
|
|
Size int64
|
|
FileCount int64
|
|
}{}
|
|
}
|
|
|
|
data := collectionMap[collection]
|
|
data.Size += int64(volInfo.Size)
|
|
data.FileCount += int64(volInfo.FileCount)
|
|
collectionMap[collection] = data
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get volume information: %w", err)
|
|
}
|
|
|
|
// Get filer configuration to determine FilerGroup
|
|
var filerGroup string
|
|
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
configResp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
|
|
if err != nil {
|
|
glog.Warningf("Failed to get filer configuration: %v", err)
|
|
// Continue without filer group
|
|
return nil
|
|
}
|
|
filerGroup = configResp.FilerGroup
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get filer configuration: %w", err)
|
|
}
|
|
|
|
// Now list buckets from the filer and match with collection data
|
|
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// List buckets by looking at the /buckets directory
|
|
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
|
Directory: "/buckets",
|
|
Prefix: "",
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: false,
|
|
Limit: 1000,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if err.Error() == "EOF" {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
|
|
if resp.Entry.IsDirectory {
|
|
bucketName := resp.Entry.Name
|
|
|
|
// Determine collection name for this bucket
|
|
var collectionName string
|
|
if filerGroup != "" {
|
|
collectionName = fmt.Sprintf("%s_%s", filerGroup, bucketName)
|
|
} else {
|
|
collectionName = bucketName
|
|
}
|
|
|
|
// Get size and object count from collection data
|
|
var size int64
|
|
var objectCount int64
|
|
if collectionData, exists := collectionMap[collectionName]; exists {
|
|
size = collectionData.Size
|
|
objectCount = collectionData.FileCount
|
|
}
|
|
|
|
// Get quota information from entry
|
|
quota := resp.Entry.Quota
|
|
quotaEnabled := quota > 0
|
|
if quota < 0 {
|
|
// Negative quota means disabled
|
|
quota = -quota
|
|
quotaEnabled = false
|
|
}
|
|
|
|
// Get versioning and object lock information from extended attributes
|
|
versioningEnabled := false
|
|
objectLockEnabled := false
|
|
objectLockMode := ""
|
|
var objectLockDuration int32 = 0
|
|
|
|
if resp.Entry.Extended != nil {
|
|
// Use shared utility to extract versioning information
|
|
versioningEnabled = extractVersioningFromEntry(resp.Entry)
|
|
|
|
// Use shared utility to extract Object Lock information
|
|
objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(resp.Entry)
|
|
}
|
|
|
|
bucket := S3Bucket{
|
|
Name: bucketName,
|
|
CreatedAt: time.Unix(resp.Entry.Attributes.Crtime, 0),
|
|
Size: size,
|
|
ObjectCount: objectCount,
|
|
LastModified: time.Unix(resp.Entry.Attributes.Mtime, 0),
|
|
Quota: quota,
|
|
QuotaEnabled: quotaEnabled,
|
|
VersioningEnabled: versioningEnabled,
|
|
ObjectLockEnabled: objectLockEnabled,
|
|
ObjectLockMode: objectLockMode,
|
|
ObjectLockDuration: objectLockDuration,
|
|
}
|
|
buckets = append(buckets, bucket)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list Object Store buckets: %w", err)
|
|
}
|
|
|
|
return buckets, nil
|
|
}
|
|
|
|
// GetBucketDetails retrieves detailed information about a specific bucket
|
|
func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) {
|
|
bucketPath := fmt.Sprintf("/buckets/%s", bucketName)
|
|
|
|
details := &BucketDetails{
|
|
Bucket: S3Bucket{
|
|
Name: bucketName,
|
|
},
|
|
Objects: []S3Object{},
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
|
|
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// Get bucket info
|
|
bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: "/buckets",
|
|
Name: bucketName,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("bucket not found: %w", err)
|
|
}
|
|
|
|
details.Bucket.CreatedAt = time.Unix(bucketResp.Entry.Attributes.Crtime, 0)
|
|
details.Bucket.LastModified = time.Unix(bucketResp.Entry.Attributes.Mtime, 0)
|
|
|
|
// Get quota information from entry
|
|
quota := bucketResp.Entry.Quota
|
|
quotaEnabled := quota > 0
|
|
if quota < 0 {
|
|
// Negative quota means disabled
|
|
quota = -quota
|
|
quotaEnabled = false
|
|
}
|
|
details.Bucket.Quota = quota
|
|
details.Bucket.QuotaEnabled = quotaEnabled
|
|
|
|
// Get versioning and object lock information from extended attributes
|
|
versioningEnabled := false
|
|
objectLockEnabled := false
|
|
objectLockMode := ""
|
|
var objectLockDuration int32 = 0
|
|
|
|
if bucketResp.Entry.Extended != nil {
|
|
// Use shared utility to extract versioning information
|
|
versioningEnabled = extractVersioningFromEntry(bucketResp.Entry)
|
|
|
|
// Use shared utility to extract Object Lock information
|
|
objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(bucketResp.Entry)
|
|
}
|
|
|
|
details.Bucket.VersioningEnabled = versioningEnabled
|
|
details.Bucket.ObjectLockEnabled = objectLockEnabled
|
|
details.Bucket.ObjectLockMode = objectLockMode
|
|
details.Bucket.ObjectLockDuration = objectLockDuration
|
|
|
|
// List objects in bucket (recursively)
|
|
return s.listBucketObjects(client, bucketPath, "", details)
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return details, nil
|
|
}
|
|
|
|
// listBucketObjects recursively lists all objects in a bucket
|
|
func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, directory, prefix string, details *BucketDetails) error {
|
|
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
|
Directory: directory,
|
|
Prefix: prefix,
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: false,
|
|
Limit: 1000,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if err.Error() == "EOF" {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
|
|
entry := resp.Entry
|
|
if entry.IsDirectory {
|
|
// Recursively list subdirectories
|
|
subDir := fmt.Sprintf("%s/%s", directory, entry.Name)
|
|
err := s.listBucketObjects(client, subDir, "", details)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// Add file object
|
|
objectKey := entry.Name
|
|
if directory != fmt.Sprintf("/buckets/%s", details.Bucket.Name) {
|
|
// Remove bucket prefix to get relative path
|
|
relativePath := directory[len(fmt.Sprintf("/buckets/%s", details.Bucket.Name))+1:]
|
|
objectKey = fmt.Sprintf("%s/%s", relativePath, entry.Name)
|
|
}
|
|
|
|
obj := S3Object{
|
|
Key: objectKey,
|
|
Size: int64(entry.Attributes.FileSize),
|
|
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
|
ETag: "", // Could be calculated from chunks if needed
|
|
StorageClass: "STANDARD",
|
|
}
|
|
|
|
details.Objects = append(details.Objects, obj)
|
|
details.TotalSize += obj.Size
|
|
details.TotalCount++
|
|
}
|
|
}
|
|
|
|
// Update bucket totals
|
|
details.Bucket.Size = details.TotalSize
|
|
details.Bucket.ObjectCount = details.TotalCount
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateS3Bucket creates a new S3 bucket
|
|
func (s *AdminServer) CreateS3Bucket(bucketName string) error {
|
|
return s.CreateS3BucketWithQuota(bucketName, 0, false)
|
|
}
|
|
|
|
// DeleteS3Bucket deletes an S3 bucket and all its contents
|
|
func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
|
|
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
// Delete bucket directory recursively
|
|
_, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
|
|
Directory: "/buckets",
|
|
Name: bucketName,
|
|
IsDeleteData: true,
|
|
IsRecursive: true,
|
|
IgnoreRecursiveError: false,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete bucket: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// GetObjectStoreUsers retrieves object store users from identity.json
|
|
func (s *AdminServer) GetObjectStoreUsers() ([]ObjectStoreUser, error) {
|
|
s3cfg := &iam_pb.S3ApiConfiguration{}
|
|
|
|
// Load IAM configuration from filer
|
|
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
var buf bytes.Buffer
|
|
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
|
|
if err == filer_pb.ErrNotFound {
|
|
// If file doesn't exist, return empty configuration
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
if buf.Len() > 0 {
|
|
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
glog.Errorf("Failed to load IAM configuration: %v", err)
|
|
return []ObjectStoreUser{}, nil // Return empty list instead of error for UI
|
|
}
|
|
|
|
var users []ObjectStoreUser
|
|
|
|
// Convert IAM identities to ObjectStoreUser format
|
|
for _, identity := range s3cfg.Identities {
|
|
// Skip anonymous identity
|
|
if identity.Name == "anonymous" {
|
|
continue
|
|
}
|
|
|
|
user := ObjectStoreUser{
|
|
Username: identity.Name,
|
|
Permissions: identity.Actions,
|
|
}
|
|
|
|
// Set email from account if available
|
|
if identity.Account != nil {
|
|
user.Email = identity.Account.EmailAddress
|
|
}
|
|
|
|
// Get first access key for display
|
|
if len(identity.Credentials) > 0 {
|
|
user.AccessKey = identity.Credentials[0].AccessKey
|
|
user.SecretKey = identity.Credentials[0].SecretKey
|
|
}
|
|
|
|
users = append(users, user)
|
|
}
|
|
|
|
return users, nil
|
|
}
|
|
|
|
// Volume server methods moved to volume_management.go
|
|
|
|
// Volume methods moved to volume_management.go
|
|
|
|
// sortVolumes method moved to volume_management.go
|
|
|
|
// GetClusterCollections method moved to collection_management.go
|
|
|
|
// GetClusterMasters retrieves cluster masters data
|
|
func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) {
|
|
var masters []MasterInfo
|
|
var leaderCount int
|
|
|
|
// First, get master information from topology
|
|
topology, err := s.GetClusterTopology()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create a map to merge topology and raft data
|
|
masterMap := make(map[string]*MasterInfo)
|
|
|
|
// Add masters from topology
|
|
for _, master := range topology.Masters {
|
|
masterInfo := &MasterInfo{
|
|
Address: master.Address,
|
|
IsLeader: master.IsLeader,
|
|
Suffrage: "",
|
|
}
|
|
|
|
if master.IsLeader {
|
|
leaderCount++
|
|
}
|
|
|
|
masterMap[master.Address] = masterInfo
|
|
}
|
|
|
|
// Then, get additional master information from Raft cluster
|
|
err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each raft server
|
|
for _, server := range resp.ClusterServers {
|
|
address := server.Address
|
|
|
|
// Update existing master info or create new one
|
|
if masterInfo, exists := masterMap[address]; exists {
|
|
// Update existing master with raft data
|
|
masterInfo.IsLeader = server.IsLeader
|
|
masterInfo.Suffrage = server.Suffrage
|
|
} else {
|
|
// Create new master info from raft data
|
|
masterInfo := &MasterInfo{
|
|
Address: address,
|
|
IsLeader: server.IsLeader,
|
|
Suffrage: server.Suffrage,
|
|
}
|
|
masterMap[address] = masterInfo
|
|
}
|
|
|
|
if server.IsLeader {
|
|
// Update leader count based on raft data
|
|
leaderCount = 1 // There should only be one leader
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
// If gRPC call fails, log the error but continue with topology data
|
|
currentMaster := s.masterClient.GetMaster(context.Background())
|
|
glog.Errorf("Failed to get raft cluster servers from master %s: %v", currentMaster, err)
|
|
}
|
|
|
|
// Convert map to slice
|
|
for _, masterInfo := range masterMap {
|
|
masters = append(masters, *masterInfo)
|
|
}
|
|
|
|
// If no masters found at all, add the current master as fallback
|
|
if len(masters) == 0 {
|
|
currentMaster := s.masterClient.GetMaster(context.Background())
|
|
if currentMaster != "" {
|
|
masters = append(masters, MasterInfo{
|
|
Address: string(currentMaster),
|
|
IsLeader: true,
|
|
Suffrage: "Voter",
|
|
})
|
|
leaderCount = 1
|
|
}
|
|
}
|
|
|
|
return &ClusterMastersData{
|
|
Masters: masters,
|
|
TotalMasters: len(masters),
|
|
LeaderCount: leaderCount,
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetClusterFilers retrieves cluster filers data
|
|
func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) {
|
|
var filers []FilerInfo
|
|
|
|
// Get filer information from master using ListClusterNodes
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.FilerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each filer node
|
|
for _, node := range resp.ClusterNodes {
|
|
createdAt := time.Unix(0, node.CreatedAtNs)
|
|
|
|
filerInfo := FilerInfo{
|
|
Address: node.Address,
|
|
DataCenter: node.DataCenter,
|
|
Rack: node.Rack,
|
|
Version: node.Version,
|
|
CreatedAt: createdAt,
|
|
}
|
|
|
|
filers = append(filers, filerInfo)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get filer nodes from master: %w", err)
|
|
}
|
|
|
|
return &ClusterFilersData{
|
|
Filers: filers,
|
|
TotalFilers: len(filers),
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetClusterBrokers retrieves cluster message brokers data
|
|
func (s *AdminServer) GetClusterBrokers() (*ClusterBrokersData, error) {
|
|
var brokers []MessageBrokerInfo
|
|
|
|
// Get broker information from master using ListClusterNodes
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.BrokerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Process each broker node
|
|
for _, node := range resp.ClusterNodes {
|
|
createdAt := time.Unix(0, node.CreatedAtNs)
|
|
|
|
brokerInfo := MessageBrokerInfo{
|
|
Address: node.Address,
|
|
DataCenter: node.DataCenter,
|
|
Rack: node.Rack,
|
|
Version: node.Version,
|
|
CreatedAt: createdAt,
|
|
}
|
|
|
|
brokers = append(brokers, brokerInfo)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get broker nodes from master: %w", err)
|
|
}
|
|
|
|
return &ClusterBrokersData{
|
|
Brokers: brokers,
|
|
TotalBrokers: len(brokers),
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetAllFilers method moved to client_management.go
|
|
|
|
// GetVolumeDetails method moved to volume_management.go
|
|
|
|
// VacuumVolume method moved to volume_management.go
|
|
|
|
// ShowMaintenanceQueue displays the maintenance queue page
|
|
func (as *AdminServer) ShowMaintenanceQueue(c *gin.Context) {
|
|
data, err := as.getMaintenanceQueueData()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// This should not render HTML template, it should use the component approach
|
|
c.JSON(http.StatusOK, data)
|
|
}
|
|
|
|
// ShowMaintenanceWorkers displays the maintenance workers page
|
|
func (as *AdminServer) ShowMaintenanceWorkers(c *gin.Context) {
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// Create worker details data
|
|
workersData := make([]*WorkerDetailsData, 0, len(workers))
|
|
for _, worker := range workers {
|
|
details, err := as.getMaintenanceWorkerDetails(worker.ID)
|
|
if err != nil {
|
|
// Create basic worker details if we can't get full details
|
|
details = &WorkerDetailsData{
|
|
Worker: worker,
|
|
CurrentTasks: []*MaintenanceTask{},
|
|
RecentTasks: []*MaintenanceTask{},
|
|
Performance: &WorkerPerformance{
|
|
TasksCompleted: 0,
|
|
TasksFailed: 0,
|
|
AverageTaskTime: 0,
|
|
Uptime: 0,
|
|
SuccessRate: 0,
|
|
},
|
|
LastUpdated: time.Now(),
|
|
}
|
|
}
|
|
workersData = append(workersData, details)
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"workers": workersData,
|
|
"title": "Maintenance Workers",
|
|
})
|
|
}
|
|
|
|
// ShowMaintenanceConfig displays the maintenance configuration page
|
|
func (as *AdminServer) ShowMaintenanceConfig(c *gin.Context) {
|
|
config, err := as.getMaintenanceConfig()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// This should not render HTML template, it should use the component approach
|
|
c.JSON(http.StatusOK, config)
|
|
}
|
|
|
|
// UpdateMaintenanceConfig updates maintenance configuration from form
|
|
func (as *AdminServer) UpdateMaintenanceConfig(c *gin.Context) {
|
|
var config MaintenanceConfig
|
|
if err := c.ShouldBind(&config); err != nil {
|
|
c.HTML(http.StatusBadRequest, "error.html", gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
err := as.updateMaintenanceConfig(&config)
|
|
if err != nil {
|
|
c.HTML(http.StatusInternalServerError, "error.html", gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.Redirect(http.StatusSeeOther, "/maintenance/config")
|
|
}
|
|
|
|
// TriggerMaintenanceScan triggers a maintenance scan
|
|
func (as *AdminServer) TriggerMaintenanceScan(c *gin.Context) {
|
|
err := as.triggerMaintenanceScan()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Maintenance scan triggered"})
|
|
}
|
|
|
|
// GetMaintenanceTasks returns all maintenance tasks
|
|
func (as *AdminServer) GetMaintenanceTasks(c *gin.Context) {
|
|
tasks, err := as.getMaintenanceTasks()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, tasks)
|
|
}
|
|
|
|
// GetMaintenanceTask returns a specific maintenance task
|
|
func (as *AdminServer) GetMaintenanceTask(c *gin.Context) {
|
|
taskID := c.Param("id")
|
|
task, err := as.getMaintenanceTask(taskID)
|
|
if err != nil {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "Task not found"})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, task)
|
|
}
|
|
|
|
// CancelMaintenanceTask cancels a pending maintenance task
|
|
func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
|
|
taskID := c.Param("id")
|
|
err := as.cancelMaintenanceTask(taskID)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"})
|
|
}
|
|
|
|
// cancelMaintenanceTask cancels a pending maintenance task
|
|
func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
|
|
if as.maintenanceManager == nil {
|
|
return fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
return as.maintenanceManager.CancelTask(taskID)
|
|
}
|
|
|
|
// GetMaintenanceWorkersAPI returns all maintenance workers
|
|
func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) {
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, workers)
|
|
}
|
|
|
|
// GetMaintenanceWorker returns a specific maintenance worker
|
|
func (as *AdminServer) GetMaintenanceWorker(c *gin.Context) {
|
|
workerID := c.Param("id")
|
|
worker, err := as.getMaintenanceWorkerDetails(workerID)
|
|
if err != nil {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "Worker not found"})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, worker)
|
|
}
|
|
|
|
// GetMaintenanceStats returns maintenance statistics
|
|
func (as *AdminServer) GetMaintenanceStats(c *gin.Context) {
|
|
stats, err := as.getMaintenanceStats()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, stats)
|
|
}
|
|
|
|
// GetMaintenanceConfigAPI returns maintenance configuration
|
|
func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) {
|
|
config, err := as.getMaintenanceConfig()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, config)
|
|
}
|
|
|
|
// UpdateMaintenanceConfigAPI updates maintenance configuration via API
|
|
func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) {
|
|
var config MaintenanceConfig
|
|
if err := c.ShouldBindJSON(&config); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
err := as.updateMaintenanceConfig(&config)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Configuration updated"})
|
|
}
|
|
|
|
// GetMaintenanceConfigData returns maintenance configuration data (public wrapper)
|
|
func (as *AdminServer) GetMaintenanceConfigData() (*maintenance.MaintenanceConfigData, error) {
|
|
return as.getMaintenanceConfig()
|
|
}
|
|
|
|
// UpdateMaintenanceConfigData updates maintenance configuration (public wrapper)
|
|
func (as *AdminServer) UpdateMaintenanceConfigData(config *maintenance.MaintenanceConfig) error {
|
|
return as.updateMaintenanceConfig(config)
|
|
}
|
|
|
|
// Helper methods for maintenance operations
|
|
|
|
// getMaintenanceQueueData returns data for the maintenance queue UI
|
|
func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueData, error) {
|
|
tasks, err := as.getMaintenanceTasks()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stats, err := as.getMaintenanceQueueStats()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &maintenance.MaintenanceQueueData{
|
|
Tasks: tasks,
|
|
Workers: workers,
|
|
Stats: stats,
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// GetMaintenanceQueueStats returns statistics for the maintenance queue (exported for handlers)
|
|
func (as *AdminServer) GetMaintenanceQueueStats() (*maintenance.QueueStats, error) {
|
|
return as.getMaintenanceQueueStats()
|
|
}
|
|
|
|
// getMaintenanceQueueStats returns statistics for the maintenance queue
|
|
func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) {
|
|
if as.maintenanceManager == nil {
|
|
return &maintenance.QueueStats{
|
|
PendingTasks: 0,
|
|
RunningTasks: 0,
|
|
CompletedToday: 0,
|
|
FailedToday: 0,
|
|
TotalTasks: 0,
|
|
}, nil
|
|
}
|
|
|
|
// Get real statistics from maintenance manager
|
|
stats := as.maintenanceManager.GetStats()
|
|
|
|
// Convert MaintenanceStats to QueueStats
|
|
queueStats := &maintenance.QueueStats{
|
|
PendingTasks: stats.TasksByStatus[maintenance.TaskStatusPending],
|
|
RunningTasks: stats.TasksByStatus[maintenance.TaskStatusAssigned] + stats.TasksByStatus[maintenance.TaskStatusInProgress],
|
|
CompletedToday: stats.CompletedToday,
|
|
FailedToday: stats.FailedToday,
|
|
TotalTasks: stats.TotalTasks,
|
|
}
|
|
|
|
return queueStats, nil
|
|
}
|
|
|
|
// getMaintenanceTasks returns all maintenance tasks
|
|
func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
|
|
if as.maintenanceManager == nil {
|
|
return []*MaintenanceTask{}, nil
|
|
}
|
|
return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil
|
|
}
|
|
|
|
// getMaintenanceTask returns a specific maintenance task
|
|
func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) {
|
|
if as.maintenanceManager == nil {
|
|
return nil, fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
// Search for the task across all statuses since we don't know which status it has
|
|
statuses := []MaintenanceTaskStatus{
|
|
TaskStatusPending,
|
|
TaskStatusAssigned,
|
|
TaskStatusInProgress,
|
|
TaskStatusCompleted,
|
|
TaskStatusFailed,
|
|
TaskStatusCancelled,
|
|
}
|
|
|
|
for _, status := range statuses {
|
|
tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status
|
|
for _, task := range tasks {
|
|
if task.ID == taskID {
|
|
return task, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("task %s not found", taskID)
|
|
}
|
|
|
|
// getMaintenanceWorkers returns all maintenance workers
|
|
func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
|
|
if as.maintenanceManager == nil {
|
|
return []*MaintenanceWorker{}, nil
|
|
}
|
|
return as.maintenanceManager.GetWorkers(), nil
|
|
}
|
|
|
|
// getMaintenanceWorkerDetails returns detailed information about a worker
|
|
func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDetailsData, error) {
|
|
if as.maintenanceManager == nil {
|
|
return nil, fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
workers := as.maintenanceManager.GetWorkers()
|
|
var targetWorker *MaintenanceWorker
|
|
for _, worker := range workers {
|
|
if worker.ID == workerID {
|
|
targetWorker = worker
|
|
break
|
|
}
|
|
}
|
|
|
|
if targetWorker == nil {
|
|
return nil, fmt.Errorf("worker %s not found", workerID)
|
|
}
|
|
|
|
// Get current tasks for this worker
|
|
currentTasks := as.maintenanceManager.GetTasks(TaskStatusInProgress, "", 0)
|
|
var workerCurrentTasks []*MaintenanceTask
|
|
for _, task := range currentTasks {
|
|
if task.WorkerID == workerID {
|
|
workerCurrentTasks = append(workerCurrentTasks, task)
|
|
}
|
|
}
|
|
|
|
// Get recent tasks for this worker
|
|
recentTasks := as.maintenanceManager.GetTasks(TaskStatusCompleted, "", 10)
|
|
var workerRecentTasks []*MaintenanceTask
|
|
for _, task := range recentTasks {
|
|
if task.WorkerID == workerID {
|
|
workerRecentTasks = append(workerRecentTasks, task)
|
|
}
|
|
}
|
|
|
|
// Calculate performance metrics
|
|
var totalDuration time.Duration
|
|
var completedTasks, failedTasks int
|
|
for _, task := range workerRecentTasks {
|
|
if task.Status == TaskStatusCompleted {
|
|
completedTasks++
|
|
if task.StartedAt != nil && task.CompletedAt != nil {
|
|
totalDuration += task.CompletedAt.Sub(*task.StartedAt)
|
|
}
|
|
} else if task.Status == TaskStatusFailed {
|
|
failedTasks++
|
|
}
|
|
}
|
|
|
|
var averageTaskTime time.Duration
|
|
var successRate float64
|
|
if completedTasks+failedTasks > 0 {
|
|
if completedTasks > 0 {
|
|
averageTaskTime = totalDuration / time.Duration(completedTasks)
|
|
}
|
|
successRate = float64(completedTasks) / float64(completedTasks+failedTasks) * 100
|
|
}
|
|
|
|
return &WorkerDetailsData{
|
|
Worker: targetWorker,
|
|
CurrentTasks: workerCurrentTasks,
|
|
RecentTasks: workerRecentTasks,
|
|
Performance: &WorkerPerformance{
|
|
TasksCompleted: completedTasks,
|
|
TasksFailed: failedTasks,
|
|
AverageTaskTime: averageTaskTime,
|
|
Uptime: time.Since(targetWorker.LastHeartbeat), // This should be tracked properly
|
|
SuccessRate: successRate,
|
|
},
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// getMaintenanceStats returns maintenance statistics
|
|
func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) {
|
|
if as.maintenanceManager == nil {
|
|
return &MaintenanceStats{
|
|
TotalTasks: 0,
|
|
TasksByStatus: make(map[MaintenanceTaskStatus]int),
|
|
TasksByType: make(map[MaintenanceTaskType]int),
|
|
ActiveWorkers: 0,
|
|
}, nil
|
|
}
|
|
return as.maintenanceManager.GetStats(), nil
|
|
}
|
|
|
|
// getMaintenanceConfig returns maintenance configuration
|
|
func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigData, error) {
|
|
// Load configuration from persistent storage
|
|
config, err := as.configPersistence.LoadMaintenanceConfig()
|
|
if err != nil {
|
|
glog.Errorf("Failed to load maintenance configuration: %v", err)
|
|
// Fallback to default configuration
|
|
config = DefaultMaintenanceConfig()
|
|
}
|
|
|
|
// Get system stats from maintenance manager if available
|
|
var systemStats *MaintenanceStats
|
|
if as.maintenanceManager != nil {
|
|
systemStats = as.maintenanceManager.GetStats()
|
|
} else {
|
|
// Fallback stats
|
|
systemStats = &MaintenanceStats{
|
|
TotalTasks: 0,
|
|
TasksByStatus: map[MaintenanceTaskStatus]int{
|
|
TaskStatusPending: 0,
|
|
TaskStatusInProgress: 0,
|
|
TaskStatusCompleted: 0,
|
|
TaskStatusFailed: 0,
|
|
},
|
|
TasksByType: make(map[MaintenanceTaskType]int),
|
|
ActiveWorkers: 0,
|
|
CompletedToday: 0,
|
|
FailedToday: 0,
|
|
AverageTaskTime: 0,
|
|
LastScanTime: time.Now().Add(-time.Hour),
|
|
NextScanTime: time.Now().Add(time.Duration(config.ScanIntervalSeconds) * time.Second),
|
|
}
|
|
}
|
|
|
|
return &MaintenanceConfigData{
|
|
Config: config,
|
|
IsEnabled: config.Enabled,
|
|
LastScanTime: systemStats.LastScanTime,
|
|
NextScanTime: systemStats.NextScanTime,
|
|
SystemStats: systemStats,
|
|
MenuItems: maintenance.BuildMaintenanceMenuItems(),
|
|
}, nil
|
|
}
|
|
|
|
// updateMaintenanceConfig updates maintenance configuration
|
|
func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error {
|
|
// Save configuration to persistent storage
|
|
if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil {
|
|
return fmt.Errorf("failed to save maintenance configuration: %w", err)
|
|
}
|
|
|
|
// Update maintenance manager if available
|
|
if as.maintenanceManager != nil {
|
|
if err := as.maintenanceManager.UpdateConfig(config); err != nil {
|
|
glog.Errorf("Failed to update maintenance manager config: %v", err)
|
|
// Don't return error here, just log it
|
|
}
|
|
}
|
|
|
|
glog.V(1).Infof("Updated maintenance configuration (enabled: %v, scan interval: %ds)",
|
|
config.Enabled, config.ScanIntervalSeconds)
|
|
return nil
|
|
}
|
|
|
|
// triggerMaintenanceScan triggers a maintenance scan
|
|
func (as *AdminServer) triggerMaintenanceScan() error {
|
|
if as.maintenanceManager == nil {
|
|
return fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
|
|
return as.maintenanceManager.TriggerScan()
|
|
}
|
|
|
|
// TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API
|
|
func (as *AdminServer) TriggerTopicRetentionPurgeAPI(c *gin.Context) {
|
|
err := as.TriggerTopicRetentionPurge()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{"message": "Topic retention purge triggered successfully"})
|
|
}
|
|
|
|
// GetConfigInfo returns information about the admin configuration
|
|
func (as *AdminServer) GetConfigInfo(c *gin.Context) {
|
|
configInfo := as.configPersistence.GetConfigInfo()
|
|
|
|
// Add additional admin server info
|
|
currentMaster := as.masterClient.GetMaster(context.Background())
|
|
configInfo["master_address"] = string(currentMaster)
|
|
configInfo["cache_expiration"] = as.cacheExpiration.String()
|
|
configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String()
|
|
|
|
// Add maintenance system info
|
|
if as.maintenanceManager != nil {
|
|
configInfo["maintenance_enabled"] = true
|
|
configInfo["maintenance_running"] = as.maintenanceManager.IsRunning()
|
|
} else {
|
|
configInfo["maintenance_enabled"] = false
|
|
configInfo["maintenance_running"] = false
|
|
}
|
|
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"config_info": configInfo,
|
|
"title": "Configuration Information",
|
|
})
|
|
}
|
|
|
|
// GetMaintenanceWorkersData returns workers data for the maintenance workers page
|
|
func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, error) {
|
|
workers, err := as.getMaintenanceWorkers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create worker details data
|
|
workersData := make([]*WorkerDetailsData, 0, len(workers))
|
|
activeWorkers := 0
|
|
busyWorkers := 0
|
|
totalLoad := 0
|
|
|
|
for _, worker := range workers {
|
|
details, err := as.getMaintenanceWorkerDetails(worker.ID)
|
|
if err != nil {
|
|
// Create basic worker details if we can't get full details
|
|
details = &WorkerDetailsData{
|
|
Worker: worker,
|
|
CurrentTasks: []*MaintenanceTask{},
|
|
RecentTasks: []*MaintenanceTask{},
|
|
Performance: &WorkerPerformance{
|
|
TasksCompleted: 0,
|
|
TasksFailed: 0,
|
|
AverageTaskTime: 0,
|
|
Uptime: 0,
|
|
SuccessRate: 0,
|
|
},
|
|
LastUpdated: time.Now(),
|
|
}
|
|
}
|
|
workersData = append(workersData, details)
|
|
|
|
if worker.Status == "active" {
|
|
activeWorkers++
|
|
} else if worker.Status == "busy" {
|
|
busyWorkers++
|
|
}
|
|
totalLoad += worker.CurrentLoad
|
|
}
|
|
|
|
return &MaintenanceWorkersData{
|
|
Workers: workersData,
|
|
ActiveWorkers: activeWorkers,
|
|
BusyWorkers: busyWorkers,
|
|
TotalLoad: totalLoad,
|
|
LastUpdated: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// StartWorkerGrpcServer starts the worker gRPC server
|
|
func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error {
|
|
if s.workerGrpcServer != nil {
|
|
return fmt.Errorf("worker gRPC server is already running")
|
|
}
|
|
|
|
s.workerGrpcServer = NewWorkerGrpcServer(s)
|
|
return s.workerGrpcServer.StartWithTLS(grpcPort)
|
|
}
|
|
|
|
// StopWorkerGrpcServer stops the worker gRPC server
|
|
func (s *AdminServer) StopWorkerGrpcServer() error {
|
|
if s.workerGrpcServer != nil {
|
|
err := s.workerGrpcServer.Stop()
|
|
s.workerGrpcServer = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetWorkerGrpcServer returns the worker gRPC server
|
|
func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
|
|
return s.workerGrpcServer
|
|
}
|
|
|
|
// Maintenance system integration methods
|
|
|
|
// InitMaintenanceManager initializes the maintenance manager
|
|
func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
|
|
s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)
|
|
glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
|
|
}
|
|
|
|
// GetMaintenanceManager returns the maintenance manager
|
|
func (s *AdminServer) GetMaintenanceManager() *maintenance.MaintenanceManager {
|
|
return s.maintenanceManager
|
|
}
|
|
|
|
// StartMaintenanceManager starts the maintenance manager
|
|
func (s *AdminServer) StartMaintenanceManager() error {
|
|
if s.maintenanceManager == nil {
|
|
return fmt.Errorf("maintenance manager not initialized")
|
|
}
|
|
return s.maintenanceManager.Start()
|
|
}
|
|
|
|
// StopMaintenanceManager stops the maintenance manager
|
|
func (s *AdminServer) StopMaintenanceManager() {
|
|
if s.maintenanceManager != nil {
|
|
s.maintenanceManager.Stop()
|
|
}
|
|
}
|
|
|
|
// TriggerTopicRetentionPurge triggers topic data purging based on retention policies
|
|
func (s *AdminServer) TriggerTopicRetentionPurge() error {
|
|
if s.topicRetentionPurger == nil {
|
|
return fmt.Errorf("topic retention purger not initialized")
|
|
}
|
|
|
|
glog.V(0).Infof("Triggering topic retention purge")
|
|
return s.topicRetentionPurger.PurgeExpiredTopicData()
|
|
}
|
|
|
|
// GetTopicRetentionPurger returns the topic retention purger
|
|
func (s *AdminServer) GetTopicRetentionPurger() *TopicRetentionPurger {
|
|
return s.topicRetentionPurger
|
|
}
|
|
|
|
// CreateTopicWithRetention creates a new topic with optional retention configuration
|
|
func (s *AdminServer) CreateTopicWithRetention(namespace, name string, partitionCount int32, retentionEnabled bool, retentionSeconds int64) error {
|
|
// Find broker leader to create the topic
|
|
brokerLeader, err := s.findBrokerLeader()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find broker leader: %w", err)
|
|
}
|
|
|
|
// Create retention configuration
|
|
var retention *mq_pb.TopicRetention
|
|
if retentionEnabled {
|
|
retention = &mq_pb.TopicRetention{
|
|
Enabled: true,
|
|
RetentionSeconds: retentionSeconds,
|
|
}
|
|
} else {
|
|
retention = &mq_pb.TopicRetention{
|
|
Enabled: false,
|
|
RetentionSeconds: 0,
|
|
}
|
|
}
|
|
|
|
// Create the topic via broker
|
|
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
PartitionCount: partitionCount,
|
|
Retention: retention,
|
|
})
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create topic: %w", err)
|
|
}
|
|
|
|
glog.V(0).Infof("Created topic %s.%s with %d partitions (retention: enabled=%v, seconds=%d)",
|
|
namespace, name, partitionCount, retentionEnabled, retentionSeconds)
|
|
return nil
|
|
}
|
|
|
|
// UpdateTopicRetention updates the retention configuration for an existing topic
|
|
func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, retentionSeconds int64) error {
|
|
// Get broker information from master
|
|
var brokerAddress string
|
|
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
|
|
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
|
ClientType: cluster.BrokerType,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Find the first available broker
|
|
for _, node := range resp.ClusterNodes {
|
|
brokerAddress = node.Address
|
|
break
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get broker nodes from master: %w", err)
|
|
}
|
|
|
|
if brokerAddress == "" {
|
|
return fmt.Errorf("no active brokers found")
|
|
}
|
|
|
|
// Create gRPC connection
|
|
conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to broker: %w", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
client := mq_pb.NewSeaweedMessagingClient(conn)
|
|
|
|
// First, get the current topic configuration to preserve existing settings
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
currentConfig, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current topic configuration: %w", err)
|
|
}
|
|
|
|
// Create the topic configuration request, preserving all existing settings
|
|
configRequest := &mq_pb.ConfigureTopicRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
// Preserve existing partition count - this is critical!
|
|
PartitionCount: currentConfig.PartitionCount,
|
|
// Preserve existing record type if it exists
|
|
RecordType: currentConfig.RecordType,
|
|
}
|
|
|
|
// Update only the retention configuration
|
|
if enabled {
|
|
configRequest.Retention = &mq_pb.TopicRetention{
|
|
RetentionSeconds: retentionSeconds,
|
|
Enabled: true,
|
|
}
|
|
} else {
|
|
// Set retention to disabled
|
|
configRequest.Retention = &mq_pb.TopicRetention{
|
|
RetentionSeconds: 0,
|
|
Enabled: false,
|
|
}
|
|
}
|
|
|
|
// Send the configuration request with preserved settings
|
|
_, err = client.ConfigureTopic(ctx, configRequest)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update topic retention: %w", err)
|
|
}
|
|
|
|
glog.V(0).Infof("Updated topic %s.%s retention (enabled: %v, seconds: %d) while preserving %d partitions",
|
|
namespace, name, enabled, retentionSeconds, currentConfig.PartitionCount)
|
|
return nil
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the admin server
|
|
func (s *AdminServer) Shutdown() {
|
|
glog.V(1).Infof("Shutting down admin server...")
|
|
|
|
// Stop maintenance manager
|
|
s.StopMaintenanceManager()
|
|
|
|
// Stop worker gRPC server
|
|
if err := s.StopWorkerGrpcServer(); err != nil {
|
|
glog.Errorf("Failed to stop worker gRPC server: %v", err)
|
|
}
|
|
|
|
glog.V(1).Infof("Admin server shutdown complete")
|
|
}
|
|
|
|
// Function to extract Object Lock information from bucket entry using shared utilities
|
|
func extractObjectLockInfoFromEntry(entry *filer_pb.Entry) (bool, string, int32) {
|
|
// Try to load Object Lock configuration using shared utility
|
|
if config, found := s3api.LoadObjectLockConfigurationFromExtended(entry); found {
|
|
return s3api.ExtractObjectLockInfoFromConfig(config)
|
|
}
|
|
|
|
return false, "", 0
|
|
}
|
|
|
|
// Function to extract versioning information from bucket entry using shared utilities
|
|
func extractVersioningFromEntry(entry *filer_pb.Entry) bool {
|
|
enabled, _ := s3api.LoadVersioningFromExtended(entry)
|
|
return enabled
|
|
}
|