|
|
|
|
@@ -257,13 +257,29 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
|
|
|
|
|
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
|
|
|
|
|
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
|
|
|
|
|
// on target.
|
|
|
|
|
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) {
|
|
|
|
|
func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, trigger string) {
|
|
|
|
|
var versionPurgeStatus VersionPurgeStatusType
|
|
|
|
|
var replicationStatus string
|
|
|
|
|
|
|
|
|
|
bucket := dobj.Bucket
|
|
|
|
|
versionID := dobj.DeleteMarkerVersionID
|
|
|
|
|
if versionID == "" {
|
|
|
|
|
versionID = dobj.VersionID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
replStatus := replicationStatus
|
|
|
|
|
if !versionPurgeStatus.Empty() {
|
|
|
|
|
replStatus = string(versionPurgeStatus)
|
|
|
|
|
}
|
|
|
|
|
auditLogInternal(context.Background(), bucket, dobj.ObjectName, AuditLogOptions{
|
|
|
|
|
Trigger: trigger,
|
|
|
|
|
APIName: ReplicateDeleteAPI,
|
|
|
|
|
VersionID: versionID,
|
|
|
|
|
Status: replStatus,
|
|
|
|
|
})
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
|
|
|
if err != nil || rcfg == nil {
|
|
|
|
|
logger.LogIf(ctx, err)
|
|
|
|
|
@@ -308,8 +324,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
replicationStatus := dobj.DeleteMarkerReplicationStatus
|
|
|
|
|
versionPurgeStatus := dobj.VersionPurgeStatus
|
|
|
|
|
replicationStatus = dobj.DeleteMarkerReplicationStatus
|
|
|
|
|
versionPurgeStatus = dobj.VersionPurgeStatus
|
|
|
|
|
|
|
|
|
|
if rmErr != nil {
|
|
|
|
|
if dobj.VersionID == "" {
|
|
|
|
|
@@ -332,7 +348,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
|
|
|
|
|
currStatus = string(versionPurgeStatus)
|
|
|
|
|
}
|
|
|
|
|
// to decrement pending count later.
|
|
|
|
|
globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType)
|
|
|
|
|
globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus),
|
|
|
|
|
replication.StatusType(prevStatus), replication.DeleteReplicationType)
|
|
|
|
|
|
|
|
|
|
var eventName = event.ObjectReplicationComplete
|
|
|
|
|
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
|
|
|
|
|
@@ -606,13 +623,24 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio
|
|
|
|
|
|
|
|
|
|
// replicateObject replicates the specified version of the object to destination bucket
|
|
|
|
|
// The source object is then updated to reflect the replication status.
|
|
|
|
|
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
|
|
|
|
|
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
|
|
|
|
|
Trigger: ReplicationIncomingActivity,
|
|
|
|
|
APIName: "s3:ReplicateObject",
|
|
|
|
|
VersionID: ri.VersionID,
|
|
|
|
|
Status: ri.ReplicationStatus.String(),
|
|
|
|
|
})
|
|
|
|
|
func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, trigger string) {
|
|
|
|
|
var replicationStatus replication.StatusType
|
|
|
|
|
defer func() {
|
|
|
|
|
if replicationStatus.Empty() {
|
|
|
|
|
// replication status is empty means
|
|
|
|
|
// replication was not attempted for some
|
|
|
|
|
// reason, notify the state of the object
|
|
|
|
|
// on disk.
|
|
|
|
|
replicationStatus = ri.ReplicationStatus
|
|
|
|
|
}
|
|
|
|
|
auditLogInternal(ctx, ri.Bucket, ri.Name, AuditLogOptions{
|
|
|
|
|
Trigger: trigger,
|
|
|
|
|
APIName: ReplicateObjectAPI,
|
|
|
|
|
VersionID: ri.VersionID,
|
|
|
|
|
Status: replicationStatus.String(),
|
|
|
|
|
})
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
objInfo := ri.ObjectInfo
|
|
|
|
|
bucket := objInfo.Bucket
|
|
|
|
|
object := objInfo.Name
|
|
|
|
|
@@ -697,8 +725,11 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|
|
|
|
// object with same VersionID already exists, replication kicked off by
|
|
|
|
|
// PutObject might have completed
|
|
|
|
|
if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed {
|
|
|
|
|
// if metadata is not updated for some reason after replication, such as 503 encountered while updating metadata - make sure
|
|
|
|
|
// to set ReplicationStatus as Completed.Note that replication Stats would have been updated despite metadata update failure.
|
|
|
|
|
// if metadata is not updated for some reason after replication, such as
|
|
|
|
|
// 503 encountered while updating metadata - make sure to set ReplicationStatus
|
|
|
|
|
// as Completed.
|
|
|
|
|
//
|
|
|
|
|
// Note: Replication Stats would have been updated despite metadata update failure.
|
|
|
|
|
gr.Close()
|
|
|
|
|
closeOnDefer = false
|
|
|
|
|
popts := ObjectOptions{
|
|
|
|
|
@@ -715,19 +746,13 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|
|
|
|
}
|
|
|
|
|
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
|
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
|
|
|
} else {
|
|
|
|
|
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
|
|
|
|
|
Trigger: ReplicationIncomingActivity,
|
|
|
|
|
APIName: "s3:ReplicateObject",
|
|
|
|
|
VersionID: ri.VersionID,
|
|
|
|
|
Status: ri.ReplicationStatus.String(),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
replicationStatus := replication.Completed
|
|
|
|
|
|
|
|
|
|
replicationStatus = replication.Completed
|
|
|
|
|
// use core client to avoid doing multipart on PUT
|
|
|
|
|
c := &miniogo.Core{Client: tgt.Client}
|
|
|
|
|
if rtype != replicateAll {
|
|
|
|
|
@@ -774,7 +799,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|
|
|
|
if uploadID, err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object, r, objInfo, putOpts); err != nil {
|
|
|
|
|
replicationStatus = replication.Failed
|
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
|
|
|
defer c.AbortMultipartUpload(ctx, dest.Bucket, object, uploadID)
|
|
|
|
|
// block and abort remote upload upon failure.
|
|
|
|
|
c.AbortMultipartUpload(ctx, dest.Bucket, object, uploadID)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
|
|
|
|
@@ -818,16 +844,9 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|
|
|
|
popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
|
|
|
|
}
|
|
|
|
|
if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil {
|
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
|
|
|
|
|
} else {
|
|
|
|
|
auditLogInternal(context.Background(), objInfo.Bucket, objInfo.Name, AuditLogOptions{
|
|
|
|
|
Trigger: ReplicationIncomingActivity,
|
|
|
|
|
APIName: "s3:ReplicateObject",
|
|
|
|
|
VersionID: objInfo.VersionID,
|
|
|
|
|
Status: replicationStatus.String(),
|
|
|
|
|
})
|
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w",
|
|
|
|
|
bucket, objInfo.Name, objInfo.VersionID, err))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
opType := replication.MetadataReplicationType
|
|
|
|
|
if rtype == replicateAll {
|
|
|
|
|
opType = replication.ObjectReplicationType
|
|
|
|
|
@@ -914,21 +933,29 @@ type DeletedObjectReplicationInfo struct {
|
|
|
|
|
ResetID string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Replication specific APIName
|
|
|
|
|
const (
|
|
|
|
|
// ReplicationQueuedActivity - replication being queued activity trail
|
|
|
|
|
ReplicationQueuedActivity = "replication:queue"
|
|
|
|
|
// ReplicationExistingActivity - activity trail for existing objects replication
|
|
|
|
|
ReplicationExistingActivity = "replication:existing"
|
|
|
|
|
// ReplicationMRFActivity - activity trail for replication from Most Recent Failures (MRF) queue
|
|
|
|
|
ReplicationMRFActivity = "replication:mrf"
|
|
|
|
|
// ReplicationIncomingActivity - activity trail indicating replication started [could be from incoming/existing/heal activity]
|
|
|
|
|
ReplicationIncomingActivity = "replication:incoming"
|
|
|
|
|
// ReplicationHealActivity - activity trail for healing of failed/pending replications
|
|
|
|
|
ReplicationHealActivity = "replication:heal"
|
|
|
|
|
// ReplicationDeleteActivity - activity trail for delete replication
|
|
|
|
|
ReplicationDeleteActivity = "replication:delete"
|
|
|
|
|
// ReplicationExistingDeleteActivity - activity trail for delete replication triggered for existing delete markers
|
|
|
|
|
ReplicationExistingDeleteActivity = "replication:delete:existing"
|
|
|
|
|
ReplicateObjectAPI = "ReplicateObject"
|
|
|
|
|
ReplicateDeleteAPI = "ReplicateDelete"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// ReplicateQueued - replication being queued trail
|
|
|
|
|
ReplicateQueued = "replicate:queue"
|
|
|
|
|
|
|
|
|
|
// ReplicateExisting - audit trail for existing objects replication
|
|
|
|
|
ReplicateExisting = "replicate:existing"
|
|
|
|
|
// ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
|
|
|
|
|
ReplicateExistingDelete = "replicate:existing:delete"
|
|
|
|
|
|
|
|
|
|
// ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
|
|
|
|
|
ReplicateMRF = "replicate:mrf"
|
|
|
|
|
// ReplicateIncoming - audit trail indicating replication started [could be from incoming/existing/heal activity]
|
|
|
|
|
ReplicateIncoming = "replicate:incoming"
|
|
|
|
|
// ReplicateHeal - audit trail for healing of failed/pending replications
|
|
|
|
|
ReplicateHeal = "replicate:heal"
|
|
|
|
|
// ReplicateDelete - audit trail for delete replication
|
|
|
|
|
ReplicateDelete = "replicate:delete"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
@@ -986,7 +1013,7 @@ func (p *ReplicationPool) AddMRFWorker() {
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
replicateObject(p.ctx, oi, p.objLayer)
|
|
|
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateMRF)
|
|
|
|
|
case <-p.mrfWorkerKillCh:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
@@ -1004,12 +1031,12 @@ func (p *ReplicationPool) AddWorker() {
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
replicateObject(p.ctx, oi, p.objLayer)
|
|
|
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateIncoming)
|
|
|
|
|
case doi, ok := <-p.replicaDeleteCh:
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
replicateDelete(p.ctx, doi, p.objLayer)
|
|
|
|
|
replicateDelete(p.ctx, doi, p.objLayer, ReplicateDelete)
|
|
|
|
|
case <-p.workerKillCh:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
@@ -1027,12 +1054,12 @@ func (p *ReplicationPool) AddExistingObjectReplicateWorker() {
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
replicateObject(p.ctx, oi, p.objLayer)
|
|
|
|
|
replicateObject(p.ctx, oi, p.objLayer, ReplicateExisting)
|
|
|
|
|
case doi, ok := <-p.existingReplicaDeleteCh:
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
replicateDelete(p.ctx, doi, p.objLayer)
|
|
|
|
|
replicateDelete(p.ctx, doi, p.objLayer, ReplicateExistingDelete)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1081,12 +1108,6 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) {
|
|
|
|
|
close(p.existingReplicaCh)
|
|
|
|
|
})
|
|
|
|
|
case p.mrfReplicaCh <- ri:
|
|
|
|
|
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
|
|
|
|
|
Trigger: ReplicationMRFActivity,
|
|
|
|
|
APIName: "s3:ReplicateObject",
|
|
|
|
|
VersionID: ri.VersionID,
|
|
|
|
|
Status: ri.ReplicationStatus.String(),
|
|
|
|
|
})
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1096,14 +1117,11 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var ch chan ReplicateObjectInfo
|
|
|
|
|
trigger := ReplicationQueuedActivity
|
|
|
|
|
switch ri.OpType {
|
|
|
|
|
case replication.ExistingObjectReplicationType:
|
|
|
|
|
ch = p.existingReplicaCh
|
|
|
|
|
trigger = ReplicationExistingActivity
|
|
|
|
|
case replication.HealReplicationType:
|
|
|
|
|
ch = p.replicaCh
|
|
|
|
|
trigger = ReplicationHealActivity
|
|
|
|
|
fallthrough
|
|
|
|
|
default:
|
|
|
|
|
ch = p.replicaCh
|
|
|
|
|
}
|
|
|
|
|
@@ -1115,12 +1133,6 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
|
|
|
|
|
close(p.existingReplicaCh)
|
|
|
|
|
})
|
|
|
|
|
case ch <- ri:
|
|
|
|
|
auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{
|
|
|
|
|
Trigger: trigger,
|
|
|
|
|
APIName: "s3:ReplicateObject",
|
|
|
|
|
VersionID: ri.VersionID,
|
|
|
|
|
Status: string(ri.ReplicationStatus),
|
|
|
|
|
})
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1129,15 +1141,13 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
|
|
|
|
|
if p == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
trigger := ReplicationDeleteActivity
|
|
|
|
|
|
|
|
|
|
var ch chan DeletedObjectReplicationInfo
|
|
|
|
|
switch doi.OpType {
|
|
|
|
|
case replication.ExistingObjectReplicationType:
|
|
|
|
|
ch = p.existingReplicaDeleteCh
|
|
|
|
|
trigger = ReplicationExistingDeleteActivity
|
|
|
|
|
case replication.HealReplicationType:
|
|
|
|
|
ch = p.replicaDeleteCh
|
|
|
|
|
trigger = ReplicationHealActivity
|
|
|
|
|
fallthrough
|
|
|
|
|
default:
|
|
|
|
|
ch = p.replicaDeleteCh
|
|
|
|
|
}
|
|
|
|
|
@@ -1149,16 +1159,6 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf
|
|
|
|
|
close(p.existingReplicaDeleteCh)
|
|
|
|
|
})
|
|
|
|
|
case ch <- doi:
|
|
|
|
|
replStatus := doi.DeleteMarkerReplicationStatus
|
|
|
|
|
if doi.VersionPurgeStatus != "" {
|
|
|
|
|
replStatus = string(doi.VersionPurgeStatus)
|
|
|
|
|
}
|
|
|
|
|
auditLogInternal(context.Background(), doi.Bucket, doi.ObjectName, AuditLogOptions{
|
|
|
|
|
Trigger: trigger,
|
|
|
|
|
APIName: "s3:ReplicateDelete",
|
|
|
|
|
VersionID: doi.VersionID,
|
|
|
|
|
Status: replStatus,
|
|
|
|
|
})
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1314,7 +1314,7 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, op
|
|
|
|
|
|
|
|
|
|
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) {
|
|
|
|
|
if sync {
|
|
|
|
|
replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o)
|
|
|
|
|
replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o, ReplicateIncoming)
|
|
|
|
|
} else {
|
|
|
|
|
globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType})
|
|
|
|
|
}
|
|
|
|
|
|