From 4f6c74a257e88029f198b6943fa796f8e5b616b1 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 1 Jul 2021 14:02:44 -0700 Subject: [PATCH] simplify audit logging for replication and ILM (#12610) auditLog should be attempted right before the return of the function and not multiple times per function; this ensures that we only trigger it once per function call. --- cmd/bucket-lifecycle.go | 2 +- cmd/bucket-replication.go | 162 +++++++++++++++++++------------------- cmd/data-scanner.go | 26 +++--- internal/logger/audit.go | 9 ++- 4 files changed, 102 insertions(+), 97 deletions(-) diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 019947aab..1ba30c846 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -235,7 +235,7 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *Ob } // Send audit for the lifecycle delete operation - auditLogLifecycle(ctx, oi.Bucket, oi.Name, oi.VersionID, ILMExpiryActivity) + auditLogLifecycle(ctx, *oi, ILMExpiry) eventName := event.ObjectRemovedDelete if lcOpts.DeleteMarker { diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index c7e1e4f82..85899514a 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -257,13 +257,29 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet // target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently // deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds // on target. 
-func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) { +func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, trigger string) { + var versionPurgeStatus VersionPurgeStatusType + var replicationStatus string + bucket := dobj.Bucket versionID := dobj.DeleteMarkerVersionID if versionID == "" { versionID = dobj.VersionID } + defer func() { + replStatus := replicationStatus + if !versionPurgeStatus.Empty() { + replStatus = string(versionPurgeStatus) + } + auditLogInternal(context.Background(), bucket, dobj.ObjectName, AuditLogOptions{ + Trigger: trigger, + APIName: ReplicateDeleteAPI, + VersionID: versionID, + Status: replStatus, + }) + }() + rcfg, err := getReplicationConfig(ctx, bucket) if err != nil || rcfg == nil { logger.LogIf(ctx, err) @@ -308,8 +324,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj }, }) - replicationStatus := dobj.DeleteMarkerReplicationStatus - versionPurgeStatus := dobj.VersionPurgeStatus + replicationStatus = dobj.DeleteMarkerReplicationStatus + versionPurgeStatus = dobj.VersionPurgeStatus if rmErr != nil { if dobj.VersionID == "" { @@ -332,7 +348,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj currStatus = string(versionPurgeStatus) } // to decrement pending count later. 
- globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType) + globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus), + replication.StatusType(prevStatus), replication.DeleteReplicationType) var eventName = event.ObjectReplicationComplete if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed { @@ -606,13 +623,24 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. -func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) { - auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ - Trigger: ReplicationIncomingActivity, - APIName: "s3:ReplicateObject", - VersionID: ri.VersionID, - Status: ri.ReplicationStatus.String(), - }) +func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, trigger string) { + var replicationStatus replication.StatusType + defer func() { + if replicationStatus.Empty() { + // replication status is empty means + // replication was not attempted for some + // reason, notify the state of the object + // on disk. 
+ replicationStatus = ri.ReplicationStatus + } + auditLogInternal(ctx, ri.Bucket, ri.Name, AuditLogOptions{ + Trigger: trigger, + APIName: ReplicateObjectAPI, + VersionID: ri.VersionID, + Status: replicationStatus.String(), + }) + }() + objInfo := ri.ObjectInfo bucket := objInfo.Bucket object := objInfo.Name @@ -697,8 +725,11 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje // object with same VersionID already exists, replication kicked off by // PutObject might have completed if objInfo.ReplicationStatus == replication.Pending || objInfo.ReplicationStatus == replication.Failed { - // if metadata is not updated for some reason after replication, such as 503 encountered while updating metadata - make sure - // to set ReplicationStatus as Completed.Note that replication Stats would have been updated despite metadata update failure. + // if metadata is not updated for some reason after replication, such as + // 503 encountered while updating metadata - make sure to set ReplicationStatus + // as Completed. + // + // Note: Replication Stats would have been updated despite metadata update failure. 
gr.Close() closeOnDefer = false popts := ObjectOptions{ @@ -715,19 +746,13 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje } if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) - } else { - auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ - Trigger: ReplicationIncomingActivity, - APIName: "s3:ReplicateObject", - VersionID: ri.VersionID, - Status: ri.ReplicationStatus.String(), - }) } } return } } - replicationStatus := replication.Completed + + replicationStatus = replication.Completed // use core client to avoid doing multipart on PUT c := &miniogo.Core{Client: tgt.Client} if rtype != replicateAll { @@ -774,7 +799,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje if uploadID, err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object, r, objInfo, putOpts); err != nil { replicationStatus = replication.Failed logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) - defer c.AbortMultipartUpload(ctx, dest.Bucket, object, uploadID) + // block and abort remote upload upon failure. 
+ c.AbortMultipartUpload(ctx, dest.Bucket, object, uploadID) } } else { if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil { @@ -818,16 +844,9 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje popts.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags } if _, err = objectAPI.PutObjectMetadata(ctx, bucket, object, popts); err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) - } else { - auditLogInternal(context.Background(), objInfo.Bucket, objInfo.Name, AuditLogOptions{ - Trigger: ReplicationIncomingActivity, - APIName: "s3:ReplicateObject", - VersionID: objInfo.VersionID, - Status: replicationStatus.String(), - }) + logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", + bucket, objInfo.Name, objInfo.VersionID, err)) } - opType := replication.MetadataReplicationType if rtype == replicateAll { opType = replication.ObjectReplicationType @@ -914,21 +933,29 @@ type DeletedObjectReplicationInfo struct { ResetID string } +// Replication specific APIName const ( - // ReplicationQueuedActivity - replication being queued activity trail - ReplicationQueuedActivity = "replication:queue" - // ReplicationExistingActivity - activity trail for existing objects replication - ReplicationExistingActivity = "replication:existing" - // ReplicationMRFActivity - activity trail for replication from Most Recent Failures (MRF) queue - ReplicationMRFActivity = "replication:mrf" - // ReplicationIncomingActivity - activity trail indicating replication started [could be from incoming/existing/heal activity] - ReplicationIncomingActivity = "replication:incoming" - // ReplicationHealActivity - activity trail for healing of failed/pending replications - ReplicationHealActivity = "replication:heal" - // ReplicationDeleteActivity - activity trail for delete replication - ReplicationDeleteActivity 
= "replication:delete" - // ReplicationExistingDeleteActivity - activity trail for delete replication triggered for existing delete markers - ReplicationExistingDeleteActivity = "replication:delete:existing" + ReplicateObjectAPI = "ReplicateObject" + ReplicateDeleteAPI = "ReplicateDelete" +) + +const ( + // ReplicateQueued - replication being queued trail + ReplicateQueued = "replicate:queue" + + // ReplicateExisting - audit trail for existing objects replication + ReplicateExisting = "replicate:existing" + // ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers + ReplicateExistingDelete = "replicate:existing:delete" + + // ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue + ReplicateMRF = "replicate:mrf" + // ReplicateIncoming - audit trail indicating replication started [could be from incoming/existing/heal activity] + ReplicateIncoming = "replicate:incoming" + // ReplicateHeal - audit trail for healing of failed/pending replications + ReplicateHeal = "replicate:heal" + // ReplicateDelete - audit trail for delete replication + ReplicateDelete = "replicate:delete" ) var ( @@ -986,7 +1013,7 @@ func (p *ReplicationPool) AddMRFWorker() { if !ok { return } - replicateObject(p.ctx, oi, p.objLayer) + replicateObject(p.ctx, oi, p.objLayer, ReplicateMRF) case <-p.mrfWorkerKillCh: return } @@ -1004,12 +1031,12 @@ func (p *ReplicationPool) AddWorker() { if !ok { return } - replicateObject(p.ctx, oi, p.objLayer) + replicateObject(p.ctx, oi, p.objLayer, ReplicateIncoming) case doi, ok := <-p.replicaDeleteCh: if !ok { return } - replicateDelete(p.ctx, doi, p.objLayer) + replicateDelete(p.ctx, doi, p.objLayer, ReplicateDelete) case <-p.workerKillCh: return } @@ -1027,12 +1054,12 @@ func (p *ReplicationPool) AddExistingObjectReplicateWorker() { if !ok { return } - replicateObject(p.ctx, oi, p.objLayer) + replicateObject(p.ctx, oi, p.objLayer, ReplicateExisting) case doi, ok := 
<-p.existingReplicaDeleteCh: if !ok { return } - replicateDelete(p.ctx, doi, p.objLayer) + replicateDelete(p.ctx, doi, p.objLayer, ReplicateExistingDelete) } } } @@ -1081,12 +1108,6 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { close(p.existingReplicaCh) }) case p.mrfReplicaCh <- ri: - auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ - Trigger: ReplicationMRFActivity, - APIName: "s3:ReplicateObject", - VersionID: ri.VersionID, - Status: ri.ReplicationStatus.String(), - }) default: } } @@ -1096,14 +1117,11 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { return } var ch chan ReplicateObjectInfo - trigger := ReplicationQueuedActivity switch ri.OpType { case replication.ExistingObjectReplicationType: ch = p.existingReplicaCh - trigger = ReplicationExistingActivity case replication.HealReplicationType: - ch = p.replicaCh - trigger = ReplicationHealActivity + fallthrough default: ch = p.replicaCh } @@ -1115,12 +1133,6 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { close(p.existingReplicaCh) }) case ch <- ri: - auditLogInternal(context.Background(), ri.Bucket, ri.Name, AuditLogOptions{ - Trigger: trigger, - APIName: "s3:ReplicateObject", - VersionID: ri.VersionID, - Status: string(ri.ReplicationStatus), - }) default: } } @@ -1129,15 +1141,13 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf if p == nil { return } - trigger := ReplicationDeleteActivity + var ch chan DeletedObjectReplicationInfo switch doi.OpType { case replication.ExistingObjectReplicationType: ch = p.existingReplicaDeleteCh - trigger = ReplicationExistingDeleteActivity case replication.HealReplicationType: - ch = p.replicaDeleteCh - trigger = ReplicationHealActivity + fallthrough default: ch = p.replicaDeleteCh } @@ -1149,16 +1159,6 @@ func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInf close(p.existingReplicaDeleteCh) }) case ch <- doi: 
- replStatus := doi.DeleteMarkerReplicationStatus - if doi.VersionPurgeStatus != "" { - replStatus = string(doi.VersionPurgeStatus) - } - auditLogInternal(context.Background(), doi.Bucket, doi.ObjectName, AuditLogOptions{ - Trigger: trigger, - APIName: "s3:ReplicateDelete", - VersionID: doi.VersionID, - Status: replStatus, - }) default: } } @@ -1314,7 +1314,7 @@ func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, op func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool, opType replication.Type) { if sync { - replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o) + replicateObject(ctx, ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}, o, ReplicateIncoming) } else { globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: objInfo, OpType: opType}) } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 1d284e486..213ab177e 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -990,7 +990,7 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta opts.VersionID = meta.oi.VersionID _, err = o.DeleteObject(ctx, meta.oi.Bucket, meta.oi.Name, opts) if err == nil { - auditLogLifecycle(ctx, meta.oi.Bucket, meta.oi.Name, meta.oi.VersionID, ILMFreeVersionDeleteActivity) + auditLogLifecycle(ctx, meta.oi, ILMFreeVersionDelete) } if ignoreNotFoundErr(err) != nil { logger.LogIf(ctx, err) @@ -1136,7 +1136,7 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay } // Send audit for the lifecycle delete operation - auditLogLifecycle(ctx, obj.Bucket, obj.Name, obj.VersionID, ILMExpiryActivity) + auditLogLifecycle(ctx, obj, ILMExpiry) eventName := event.ObjectRemovedDelete if obj.DeleteMarker { @@ -1379,23 +1379,23 @@ func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error { } const ( - // ILMExpiryActivity - activity trail for ILM expiry - ILMExpiryActivity = "ilm:expiry" - // 
ILMFreeVersionDeleteActivity - activity trail for ILM free-version delete - ILMFreeVersionDeleteActivity = "ilm:free-version-delete" + // ILMExpiry - audit trail for ILM expiry + ILMExpiry = "ilm:expiry" + // ILMFreeVersionDelete - audit trail for ILM free-version delete + ILMFreeVersionDelete = "ilm:free-version-delete" ) -func auditLogLifecycle(ctx context.Context, bucket, object, versionID string, trigger string) { +func auditLogLifecycle(ctx context.Context, oi ObjectInfo, trigger string) { var apiName string switch trigger { - case ILMExpiryActivity: - apiName = "s3:ExpireObject" - case ILMFreeVersionDeleteActivity: - apiName = "s3:DeleteFreeVersion" + case ILMExpiry: + apiName = "ILMExpiry" + case ILMFreeVersionDelete: + apiName = "ILMFreeVersionDelete" } - auditLogInternal(ctx, bucket, object, AuditLogOptions{ + auditLogInternal(ctx, oi.Bucket, oi.Name, AuditLogOptions{ Trigger: trigger, APIName: apiName, - VersionID: versionID, + VersionID: oi.VersionID, }) } diff --git a/internal/logger/audit.go b/internal/logger/audit.go index cd607f541..13dee06c9 100644 --- a/internal/logger/audit.go +++ b/internal/logger/audit.go @@ -142,7 +142,11 @@ func GetAuditEntry(ctx context.Context) *audit.Entry { if ok { return r } - r = &audit.Entry{} + r = &audit.Entry{ + Version: audit.Version, + DeploymentID: globalDeploymentID, + Time: time.Now().UTC().Format(time.RFC3339Nano), + } SetAuditEntry(ctx, r) return r } @@ -165,7 +169,8 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl } entry = audit.ToEntry(w, r, reqClaims, globalDeploymentID) - entry.Trigger = "external-request" + // indicates all requests for this API call are inbound + entry.Trigger = "incoming" for _, filterKey := range filterKeys { delete(entry.ReqClaims, filterKey)