Add support for Remote Buckets and Replication (#287)

Co-authored-by: Benjamin Perez <benjamin@bexsoft.net>
Co-authored-by: Daniel Valdivia <hola@danielvaldivia.com>
This commit is contained in:
Alex
2020-09-28 12:46:08 -05:00
committed by GitHub
parent 459e2bf61c
commit 8313a62f17
70 changed files with 7470 additions and 323 deletions

View File

@@ -20,9 +20,12 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/minio/minio-go/v7/pkg/replication"
"github.com/go-openapi/runtime/middleware"
"github.com/go-openapi/swag"
"github.com/minio/console/models"
@@ -73,6 +76,150 @@ func registerBucketsHandlers(api *operations.ConsoleAPI) {
}
return user_api.NewBucketSetPolicyOK().WithPayload(bucketSetPolicyResp)
})
// get bucket versioning
api.UserAPIGetBucketVersioningHandler = user_api.GetBucketVersioningHandlerFunc(func(params user_api.GetBucketVersioningParams, session *models.Principal) middleware.Responder {
getBucketVersioning, err := getBucketVersionedResponse(session, params.BucketName)
if err != nil {
return user_api.NewGetBucketVersioningDefault(500).WithPayload(&models.Error{Code: 500, Message: swag.String(err.Error())})
}
return user_api.NewGetBucketVersioningOK().WithPayload(getBucketVersioning)
})
// get bucket replication
api.UserAPIGetBucketReplicationHandler = user_api.GetBucketReplicationHandlerFunc(func(params user_api.GetBucketReplicationParams, session *models.Principal) middleware.Responder {
getBucketReplication, err := getBucketReplicationdResponse(session, params.BucketName)
if err != nil {
return user_api.NewGetBucketReplicationDefault(500).WithPayload(&models.Error{Code: 500, Message: swag.String(err.Error())})
}
return user_api.NewGetBucketReplicationOK().WithPayload(getBucketReplication)
})
// get bucket replication
api.UserAPIAddBucketReplicationHandler = user_api.AddBucketReplicationHandlerFunc(func(params user_api.AddBucketReplicationParams, session *models.Principal) middleware.Responder {
err := getAddBucketReplicationdResponse(session, params.BucketName, &params)
if err != nil {
return user_api.NewAddBucketReplicationDefault(500).WithPayload(&models.Error{Code: 500, Message: swag.String(err.Error())})
}
return user_api.NewAddBucketReplicationCreated()
})
}
func getAddBucketReplicationdResponse(session *models.Principal, bucketName string, params *user_api.AddBucketReplicationParams) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
mClient, err := newMinioClient(session)
if err != nil {
log.Println("error creating MinIO Client:", err)
return err
}
// create a minioClient interface implementation
// defining the client to be used
minioClient := minioClient{client: mClient}
// we will tolerate this call failing
cfg, err := minioClient.getBucketReplication(ctx, bucketName)
if err != nil {
log.Println("error versioning bucket:", err)
}
// add rule
maxPrio := 0
for _, r := range cfg.Rules {
if r.Priority > maxPrio {
maxPrio = r.Priority
}
}
maxPrio++
s3Client, err := newS3BucketClient(session, bucketName)
if err != nil {
log.Println("error creating S3Client:", err)
return err
}
// create a mc S3Client interface implementation
// defining the client to be used
mcClient := mcClient{client: s3Client}
opts := replication.Options{
RoleArn: params.Body.Arn,
Priority: fmt.Sprintf("%d", maxPrio),
RuleStatus: "enable",
//ID: cliCtx.String("id"),
DestBucket: params.Body.DestinationBucket,
Op: replication.AddOption,
}
err2 := mcClient.setReplication(ctx, &cfg, opts)
if err2 != nil {
log.Println("error creating replication for bucket:", err2.Cause)
return err2.Cause
}
return nil
}
func getBucketReplicationdResponse(session *models.Principal, bucketName string) (*models.BucketReplicationResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
mClient, err := newMinioClient(session)
if err != nil {
log.Println("error creating MinIO Client:", err)
return nil, err
}
// create a minioClient interface implementation
// defining the client to be used
minioClient := minioClient{client: mClient}
// we will tolerate this call failing
res, err := minioClient.getBucketReplication(ctx, bucketName)
if err != nil {
log.Println("error versioning bucket:", err)
}
var rules []*models.BucketReplicationRule
for _, rule := range res.Rules {
rules = append(rules, &models.BucketReplicationRule{
DeleteMarkerReplication: &models.BucketReplicationRuleMarker{Status: string(rule.DeleteMarkerReplication.Status)},
Destination: &models.BucketReplicationDestination{Bucket: rule.Destination.Bucket},
ID: rule.ID,
Priority: int32(rule.Priority),
Status: string(rule.Status),
})
}
// serialize output
listBucketsResponse := &models.BucketReplicationResponse{
Rules: rules,
}
return listBucketsResponse, nil
}
func getBucketVersionedResponse(session *models.Principal, bucketName string) (*models.BucketVersioningResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
mClient, err := newMinioClient(session)
if err != nil {
log.Println("error creating MinIO Client:", err)
return nil, err
}
// create a minioClient interface implementation
// defining the client to be used
minioClient := minioClient{client: mClient}
// we will tolerate this call failing
res, err := minioClient.getBucketVersioning(ctx, bucketName)
if err != nil {
log.Println("error versioning bucket:", err)
}
// serialize output
listBucketsResponse := &models.BucketVersioningResponse{
IsVersioned: res.Status == "Enabled",
}
return listBucketsResponse, nil
}
// getaAcountUsageInfo fetches a list of all buckets allowed to that particular client from MinIO Servers
@@ -142,6 +289,13 @@ func getMakeBucketResponse(session *models.Principal, br *models.MakeBucketReque
if err := makeBucket(ctx, minioClient, *br.Name); err != nil {
return prepareError(err)
}
// if versioned
if br.Versioning {
// we will tolerate this call failing
if err := minioClient.enableVersioning(ctx, *br.Name); err != nil {
log.Println("error versioning bucket:", err)
}
}
return nil
}