Added replication bulk modal to buckets list (#665)
* Added replication bulk modal to buckets list * Disabled buttons & inputs in loading state Co-authored-by: Benjamin Perez <benjamin@bexsoft.net>
This commit is contained in:
@@ -19,19 +19,28 @@ package restapi
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/runtime/middleware"
|
||||
"github.com/go-openapi/swag"
|
||||
"github.com/minio/console/models"
|
||||
"github.com/minio/console/restapi/operations"
|
||||
"github.com/minio/console/restapi/operations/user_api"
|
||||
"github.com/minio/minio-go/v7/pkg/replication"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
)
|
||||
|
||||
type RemoteBucketResult struct {
|
||||
OriginBucket string
|
||||
TargetBucket string
|
||||
Error string
|
||||
}
|
||||
|
||||
func registerAdminBucketRemoteHandlers(api *operations.ConsoleAPI) {
|
||||
// return list of remote buckets
|
||||
api.UserAPIListRemoteBucketsHandler = user_api.ListRemoteBucketsHandlerFunc(func(params user_api.ListRemoteBucketsParams, session *models.Principal) middleware.Responder {
|
||||
@@ -69,6 +78,38 @@ func registerAdminBucketRemoteHandlers(api *operations.ConsoleAPI) {
|
||||
return user_api.NewAddRemoteBucketCreated()
|
||||
})
|
||||
|
||||
// set multi-bucket replication
|
||||
api.UserAPISetMultiBucketReplicationHandler = user_api.SetMultiBucketReplicationHandlerFunc(func(params user_api.SetMultiBucketReplicationParams, session *models.Principal) middleware.Responder {
|
||||
response, err := setMultiBucketReplicationResponse(session, params)
|
||||
|
||||
if err != nil {
|
||||
return user_api.NewSetMultiBucketReplicationDefault(500).WithPayload(err)
|
||||
}
|
||||
|
||||
return user_api.NewSetMultiBucketReplicationOK().WithPayload(response)
|
||||
})
|
||||
|
||||
// list external buckets
|
||||
api.UserAPIListExternalBucketsHandler = user_api.ListExternalBucketsHandlerFunc(func(params user_api.ListExternalBucketsParams, session *models.Principal) middleware.Responder {
|
||||
response, err := listExternalBucketsResponse(params)
|
||||
|
||||
if err != nil {
|
||||
return user_api.NewListExternalBucketsDefault(500).WithPayload(err)
|
||||
}
|
||||
|
||||
return user_api.NewListExternalBucketsOK().WithPayload(response)
|
||||
})
|
||||
|
||||
// delete replication rule
|
||||
api.UserAPIDeleteBucketReplicationRuleHandler = user_api.DeleteBucketReplicationRuleHandlerFunc(func(params user_api.DeleteBucketReplicationRuleParams, session *models.Principal) middleware.Responder {
|
||||
err := deleteReplicationRuleResponse(session, params)
|
||||
|
||||
if err != nil {
|
||||
return user_api.NewDeleteBucketReplicationRuleDefault(500).WithPayload(err)
|
||||
}
|
||||
|
||||
return user_api.NewDeleteBucketReplicationRuleNoContent()
|
||||
})
|
||||
}
|
||||
|
||||
func getListRemoteBucketsResponse(session *models.Principal) (*models.ListRemoteBucketsResponse, error) {
|
||||
@@ -130,7 +171,7 @@ func getAddRemoteBucketResponse(session *models.Principal, params user_api.AddRe
|
||||
return err
|
||||
}
|
||||
adminClient := adminClient{client: mAdmin}
|
||||
err = addRemoteBucket(ctx, adminClient, *params.Body)
|
||||
_, err = addRemoteBucket(ctx, adminClient, *params.Body)
|
||||
if err != nil {
|
||||
log.Println("error adding remote bucket: ", err)
|
||||
return err
|
||||
@@ -184,13 +225,13 @@ func deleteRemoteBucket(ctx context.Context, client MinioAdmin, sourceBucketName
|
||||
return client.removeRemoteBucket(ctx, sourceBucketName, arn)
|
||||
}
|
||||
|
||||
func addRemoteBucket(ctx context.Context, client MinioAdmin, params models.CreateRemoteBucket) error {
|
||||
func addRemoteBucket(ctx context.Context, client MinioAdmin, params models.CreateRemoteBucket) (string, error) {
|
||||
TargetURL := *params.TargetURL
|
||||
accessKey := *params.AccessKey
|
||||
secretKey := *params.SecretKey
|
||||
u, err := url.Parse(TargetURL)
|
||||
if err != nil {
|
||||
return errors.New("malformed Remote target URL")
|
||||
return "", errors.New("malformed Remote target URL")
|
||||
}
|
||||
secure := u.Scheme == "https"
|
||||
host := u.Host
|
||||
@@ -212,6 +253,225 @@ func addRemoteBucket(ctx context.Context, client MinioAdmin, params models.Creat
|
||||
Type: "replication",
|
||||
Region: params.Region,
|
||||
}
|
||||
_, err = client.addRemoteBucket(ctx, *params.SourceBucket, remoteBucket)
|
||||
return err
|
||||
bucketARN, err := client.addRemoteBucket(ctx, *params.SourceBucket, remoteBucket)
|
||||
|
||||
return bucketARN, err
|
||||
}
|
||||
|
||||
func addBucketReplicationItem(ctx context.Context, session *models.Principal, minClient minioClient, bucketName, arn, destinationBucket string) error {
|
||||
// we will tolerate this call failing
|
||||
cfg, err := minClient.getBucketReplication(ctx, bucketName)
|
||||
if err != nil {
|
||||
log.Println("error versioning bucket:", err)
|
||||
}
|
||||
|
||||
// add rule
|
||||
maxPrio := 0
|
||||
for _, r := range cfg.Rules {
|
||||
if r.Priority > maxPrio {
|
||||
maxPrio = r.Priority
|
||||
}
|
||||
}
|
||||
maxPrio++
|
||||
|
||||
s3Client, err := newS3BucketClient(session, bucketName, "")
|
||||
if err != nil {
|
||||
log.Println("error creating S3Client:", err)
|
||||
return err
|
||||
}
|
||||
// create a mc S3Client interface implementation
|
||||
// defining the client to be used
|
||||
mcClient := mcClient{client: s3Client}
|
||||
|
||||
opts := replication.Options{
|
||||
RoleArn: arn,
|
||||
Priority: fmt.Sprintf("%d", maxPrio),
|
||||
RuleStatus: "enable",
|
||||
DestBucket: destinationBucket,
|
||||
Op: replication.AddOption,
|
||||
}
|
||||
|
||||
err2 := mcClient.setReplication(ctx, &cfg, opts)
|
||||
if err2 != nil {
|
||||
log.Println("error creating replication for bucket:", err2.Cause)
|
||||
return err2.Cause
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func setMultiBucketReplication(ctx context.Context, session *models.Principal, client MinioAdmin, minClient minioClient, params user_api.SetMultiBucketReplicationParams) []RemoteBucketResult {
|
||||
bucketsRelation := params.Body.BucketsRelation
|
||||
|
||||
// Parallel remote bucket adding
|
||||
parallelRemoteBucket := func(bucketRelationData *models.MultiBucketsRelation) chan RemoteBucketResult {
|
||||
remoteProc := make(chan RemoteBucketResult)
|
||||
sourceBucket := bucketRelationData.OriginBucket
|
||||
targetBucket := bucketRelationData.DestinationBucket
|
||||
|
||||
go func() {
|
||||
defer close(remoteProc)
|
||||
|
||||
createRemoteBucketParams := models.CreateRemoteBucket{
|
||||
AccessKey: params.Body.AccessKey,
|
||||
SecretKey: params.Body.SecretKey,
|
||||
SourceBucket: &sourceBucket,
|
||||
TargetBucket: &targetBucket,
|
||||
Region: params.Body.Region,
|
||||
TargetURL: params.Body.TargetURL,
|
||||
}
|
||||
|
||||
// We add the remote bucket reference & store the arn or errors returned
|
||||
arn, err := addRemoteBucket(ctx, client, createRemoteBucketParams)
|
||||
|
||||
if err == nil {
|
||||
err = addBucketReplicationItem(ctx, session, minClient, sourceBucket, arn, targetBucket)
|
||||
}
|
||||
|
||||
var errorReturn = ""
|
||||
|
||||
if err != nil {
|
||||
errorReturn = err.Error()
|
||||
}
|
||||
|
||||
retParams := RemoteBucketResult{
|
||||
OriginBucket: sourceBucket,
|
||||
TargetBucket: targetBucket,
|
||||
Error: errorReturn,
|
||||
}
|
||||
|
||||
remoteProc <- retParams
|
||||
}()
|
||||
return remoteProc
|
||||
}
|
||||
|
||||
var bucketsManagement []chan RemoteBucketResult
|
||||
|
||||
for _, bucketName := range bucketsRelation {
|
||||
// We generate the ARNs for each bucket
|
||||
rBucket := parallelRemoteBucket(bucketName)
|
||||
bucketsManagement = append(bucketsManagement, rBucket)
|
||||
}
|
||||
|
||||
resultsList := []RemoteBucketResult{}
|
||||
for _, result := range bucketsManagement {
|
||||
res := <-result
|
||||
resultsList = append(resultsList, res)
|
||||
}
|
||||
|
||||
return resultsList
|
||||
}
|
||||
|
||||
func setMultiBucketReplicationResponse(session *models.Principal, params user_api.SetMultiBucketReplicationParams) (*models.MultiBucketResponseState, *models.Error) {
|
||||
ctx := context.Background()
|
||||
|
||||
mAdmin, err := newMAdminClient(session)
|
||||
if err != nil {
|
||||
log.Println("error creating Madmin Client:", err)
|
||||
return nil, prepareError(err)
|
||||
}
|
||||
adminClient := adminClient{client: mAdmin}
|
||||
|
||||
mClient, err := newMinioClient(session)
|
||||
if err != nil {
|
||||
log.Println("error creating MinIO Client:", err)
|
||||
return nil, prepareError(err)
|
||||
}
|
||||
// create a minioClient interface implementation
|
||||
// defining the client to be used
|
||||
mnClient := minioClient{client: mClient}
|
||||
|
||||
replicationResults := setMultiBucketReplication(ctx, session, adminClient, mnClient, params)
|
||||
|
||||
if replicationResults == nil {
|
||||
err = errors.New("error setting buckets replication")
|
||||
return nil, prepareError(err)
|
||||
}
|
||||
|
||||
resParsed := []*models.MultiBucketResponseItem{}
|
||||
|
||||
for _, repResult := range replicationResults {
|
||||
responseItem := models.MultiBucketResponseItem{
|
||||
ErrorString: repResult.Error,
|
||||
OriginBucket: repResult.OriginBucket,
|
||||
TargetBucket: repResult.TargetBucket,
|
||||
}
|
||||
|
||||
resParsed = append(resParsed, &responseItem)
|
||||
}
|
||||
|
||||
resultsParsed := models.MultiBucketResponseState{
|
||||
ReplicationState: resParsed,
|
||||
}
|
||||
|
||||
return &resultsParsed, nil
|
||||
}
|
||||
|
||||
func listExternalBucketsResponse(params user_api.ListExternalBucketsParams) (*models.ListBucketsResponse, *models.Error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
|
||||
defer cancel()
|
||||
remoteAdmin, err := newAdminFromCreds(*params.Body.AccessKey, *params.Body.SecretKey, *params.Body.TargetURL, *params.Body.UseTLS)
|
||||
if err != nil {
|
||||
return nil, prepareError(err)
|
||||
}
|
||||
// create a minioClient interface implementation
|
||||
// defining the client to be used
|
||||
remoteClient := adminClient{client: remoteAdmin}
|
||||
buckets, err := getAccountInfo(ctx, remoteClient)
|
||||
if err != nil {
|
||||
return nil, prepareError(err)
|
||||
}
|
||||
|
||||
// serialize output
|
||||
listBucketsResponse := &models.ListBucketsResponse{
|
||||
Buckets: buckets,
|
||||
Total: int64(len(buckets)),
|
||||
}
|
||||
return listBucketsResponse, nil
|
||||
}
|
||||
|
||||
func deleteReplicationRule(ctx context.Context, session *models.Principal, bucketName, ruleID string) error {
|
||||
mClient, err := newMinioClient(session)
|
||||
if err != nil {
|
||||
log.Println("error creating MinIO Client:", err)
|
||||
return err
|
||||
}
|
||||
// create a minioClient interface implementation
|
||||
// defining the client to be used
|
||||
minClient := minioClient{client: mClient}
|
||||
|
||||
cfg, err := minClient.getBucketReplication(ctx, bucketName)
|
||||
if err != nil {
|
||||
log.Println("error versioning bucket:", err)
|
||||
}
|
||||
|
||||
s3Client, err := newS3BucketClient(session, bucketName, "")
|
||||
if err != nil {
|
||||
log.Println("error creating S3Client:", err)
|
||||
return err
|
||||
}
|
||||
// create a mc S3Client interface implementation
|
||||
// defining the client to be used
|
||||
mcClient := mcClient{client: s3Client}
|
||||
|
||||
opts := replication.Options{
|
||||
ID: ruleID,
|
||||
Op: replication.RemoveOption,
|
||||
}
|
||||
|
||||
err2 := mcClient.setReplication(ctx, &cfg, opts)
|
||||
if err2 != nil {
|
||||
return err2.Cause
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteReplicationRuleResponse(session *models.Principal, params user_api.DeleteBucketReplicationRuleParams) *models.Error {
|
||||
ctx := context.Background()
|
||||
|
||||
err := deleteReplicationRule(ctx, session, params.BucketName, params.RuleID)
|
||||
|
||||
if err != nil {
|
||||
return prepareError(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user