mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-06 00:42:38 +00:00
s3: route object-lock version-specific deletes off the distributed lock (#9657)
A version-specific DELETE (real version or the null version, including object-lock WORM-checked ones and governance-bypass) now runs as one routed transaction on the object's owner instead of holding the distributed lock. For a real version: recompute the .versions pointer excluding the version (repoint-before-delete, so a crash leaves a recoverable orphan rather than a dangling pointer), then delete the version file, under the object's per-path lock. The null version is the regular object entry, deleted directly (no pointer). Object-lock buckets gate the delete on the version's WORM guards evaluated on the owner: legal hold (always) + retention (while not elapsed). Governance bypass scopes the retention guard to COMPLIANCE mode, so the filer allows a governance-mode delete while still denying compliance and legal hold — the gateway never reads the version. Three primitives make this expressible: - ObjectTransaction.condition_key: evaluate the condition against a named entry (the version) while the lock stays on lock_key (the object). - Recompute.exclude_name: omit a child from the scan, to repoint before delete. - WriteCondition.Clause gate_key/gate_value: scope IF_EXTENDED_TIME_ELAPSED to a mode, expressing governance bypass without a gateway-side read.
This commit is contained in:
@@ -274,6 +274,8 @@ message WriteCondition {
|
||||
bool allow_weak = 4; // compare ETags ignoring the weak (W/) marker
|
||||
string ext_key = 5; // extended attribute name for IF_EXTENDED_* kinds
|
||||
string ext_value = 6; // blocking value for IF_EXTENDED_NOT_EQUAL
|
||||
string gate_key = 7; // IF_EXTENDED_TIME_ELAPSED: only enforce when extended[gate_key] == gate_value
|
||||
string gate_value = 8; // gate value (e.g. retention mode COMPLIANCE for governance bypass)
|
||||
}
|
||||
repeated Clause clauses = 1; // all must hold (logical AND)
|
||||
}
|
||||
@@ -333,6 +335,7 @@ message Recompute {
|
||||
string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key
|
||||
string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes
|
||||
bytes demote_value = 8; // value for demote_key
|
||||
string exclude_name = 9; // if set, skip this child when scanning (e.g. a version about to be deleted)
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
@@ -346,6 +349,7 @@ message ObjectTransactionRequest {
|
||||
repeated ObjectMutation mutations = 3;
|
||||
bool is_from_other_cluster = 4;
|
||||
repeated int32 signatures = 5;
|
||||
string condition_key = 6;
|
||||
}
|
||||
|
||||
message ObjectTransactionResponse {
|
||||
|
||||
@@ -274,6 +274,8 @@ message WriteCondition {
|
||||
bool allow_weak = 4; // compare ETags ignoring the weak (W/) marker
|
||||
string ext_key = 5; // extended attribute name for IF_EXTENDED_* kinds
|
||||
string ext_value = 6; // blocking value for IF_EXTENDED_NOT_EQUAL
|
||||
string gate_key = 7; // IF_EXTENDED_TIME_ELAPSED: only enforce when extended[gate_key] == gate_value
|
||||
string gate_value = 8; // gate value (e.g. retention mode COMPLIANCE for governance bypass)
|
||||
}
|
||||
repeated Clause clauses = 1; // all must hold (logical AND)
|
||||
}
|
||||
@@ -333,6 +335,7 @@ message Recompute {
|
||||
string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key
|
||||
string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes
|
||||
bytes demote_value = 8; // value for demote_key
|
||||
string exclude_name = 9; // if set, skip this child when scanning (e.g. a version about to be deleted)
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
@@ -346,6 +349,7 @@ message ObjectTransactionRequest {
|
||||
repeated ObjectMutation mutations = 3;
|
||||
bool is_from_other_cluster = 4;
|
||||
repeated int32 signatures = 5;
|
||||
string condition_key = 6; // if set, evaluate the condition against this entry instead of lock_key (still locking lock_key)
|
||||
}
|
||||
|
||||
message ObjectTransactionResponse {
|
||||
|
||||
@@ -1559,6 +1559,7 @@ type Recompute struct {
|
||||
MtimeToKey string `protobuf:"bytes,6,opt,name=mtime_to_key,json=mtimeToKey,proto3" json:"mtime_to_key,omitempty"` // if set, store the chosen child's Mtime (decimal) under this pointer key
|
||||
DemoteKey string `protobuf:"bytes,7,opt,name=demote_key,json=demoteKey,proto3" json:"demote_key,omitempty"` // if set, stamp demote_value on the prior name_to_key target when it changes
|
||||
DemoteValue []byte `protobuf:"bytes,8,opt,name=demote_value,json=demoteValue,proto3" json:"demote_value,omitempty"` // value for demote_key
|
||||
ExcludeName string `protobuf:"bytes,9,opt,name=exclude_name,json=excludeName,proto3" json:"exclude_name,omitempty"` // if set, skip this child when scanning (e.g. a version about to be deleted)
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -1649,6 +1650,13 @@ func (x *Recompute) GetDemoteValue() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Recompute) GetExcludeName() string {
|
||||
if x != nil {
|
||||
return x.ExcludeName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// ObjectTransactionRequest applies an ordered list of mutations atomically with
|
||||
// respect to other writers of the same object, by holding the filer's per-path
|
||||
// lock on lock_key for the whole transaction. The optional condition is checked
|
||||
@@ -1661,6 +1669,7 @@ type ObjectTransactionRequest struct {
|
||||
Mutations []*ObjectMutation `protobuf:"bytes,3,rep,name=mutations,proto3" json:"mutations,omitempty"`
|
||||
IsFromOtherCluster bool `protobuf:"varint,4,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"`
|
||||
Signatures []int32 `protobuf:"varint,5,rep,packed,name=signatures,proto3" json:"signatures,omitempty"`
|
||||
ConditionKey string `protobuf:"bytes,6,opt,name=condition_key,json=conditionKey,proto3" json:"condition_key,omitempty"` // if set, evaluate the condition against this entry instead of lock_key (still locking lock_key)
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -1730,6 +1739,13 @@ func (x *ObjectTransactionRequest) GetSignatures() []int32 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *ObjectTransactionRequest) GetConditionKey() string {
|
||||
if x != nil {
|
||||
return x.ConditionKey
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type ObjectTransactionResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
|
||||
@@ -5935,6 +5951,8 @@ type WriteCondition_Clause struct {
|
||||
AllowWeak bool `protobuf:"varint,4,opt,name=allow_weak,json=allowWeak,proto3" json:"allow_weak,omitempty"` // compare ETags ignoring the weak (W/) marker
|
||||
ExtKey string `protobuf:"bytes,5,opt,name=ext_key,json=extKey,proto3" json:"ext_key,omitempty"` // extended attribute name for IF_EXTENDED_* kinds
|
||||
ExtValue string `protobuf:"bytes,6,opt,name=ext_value,json=extValue,proto3" json:"ext_value,omitempty"` // blocking value for IF_EXTENDED_NOT_EQUAL
|
||||
GateKey string `protobuf:"bytes,7,opt,name=gate_key,json=gateKey,proto3" json:"gate_key,omitempty"` // IF_EXTENDED_TIME_ELAPSED: only enforce when extended[gate_key] == gate_value
|
||||
GateValue string `protobuf:"bytes,8,opt,name=gate_value,json=gateValue,proto3" json:"gate_value,omitempty"` // gate value (e.g. retention mode COMPLIANCE for governance bypass)
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -6011,6 +6029,20 @@ func (x *WriteCondition_Clause) GetExtValue() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *WriteCondition_Clause) GetGateKey() string {
|
||||
if x != nil {
|
||||
return x.GateKey
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *WriteCondition_Clause) GetGateValue() string {
|
||||
if x != nil {
|
||||
return x.GateValue
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// if found, send the exact address
|
||||
// if not found, send the full list of existing brokers
|
||||
type LocateBrokerResponse_Resource struct {
|
||||
@@ -6343,9 +6375,9 @@ const file_filer_proto_rawDesc = "" +
|
||||
"signatures\x18\x05 \x03(\x05R\n" +
|
||||
"signatures\x12=\n" +
|
||||
"\x1bskip_check_parent_directory\x18\x06 \x01(\bR\x18skipCheckParentDirectory\x126\n" +
|
||||
"\tcondition\x18\a \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\"\xd9\x03\n" +
|
||||
"\tcondition\x18\a \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\"\x93\x04\n" +
|
||||
"\x0eWriteCondition\x129\n" +
|
||||
"\aclauses\x18\x01 \x03(\v2\x1f.filer_pb.WriteCondition.ClauseR\aclauses\x1a\xc3\x01\n" +
|
||||
"\aclauses\x18\x01 \x03(\v2\x1f.filer_pb.WriteCondition.ClauseR\aclauses\x1a\xfd\x01\n" +
|
||||
"\x06Clause\x121\n" +
|
||||
"\x04kind\x18\x01 \x01(\x0e2\x1d.filer_pb.WriteCondition.KindR\x04kind\x12\x14\n" +
|
||||
"\x05etags\x18\x02 \x03(\tR\x05etags\x12\x1b\n" +
|
||||
@@ -6353,7 +6385,10 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"allow_weak\x18\x04 \x01(\bR\tallowWeak\x12\x17\n" +
|
||||
"\aext_key\x18\x05 \x01(\tR\x06extKey\x12\x1b\n" +
|
||||
"\text_value\x18\x06 \x01(\tR\bextValue\"\xc5\x01\n" +
|
||||
"\text_value\x18\x06 \x01(\tR\bextValue\x12\x19\n" +
|
||||
"\bgate_key\x18\a \x01(\tR\agateKey\x12\x1d\n" +
|
||||
"\n" +
|
||||
"gate_value\x18\b \x01(\tR\tgateValue\"\xc5\x01\n" +
|
||||
"\x04Kind\x12\b\n" +
|
||||
"\x04NONE\x10\x00\x12\x11\n" +
|
||||
"\rIF_NOT_EXISTS\x10\x01\x12\r\n" +
|
||||
@@ -6386,7 +6421,7 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"\x06DELETE\x10\x01\x12\x12\n" +
|
||||
"\x0ePATCH_EXTENDED\x10\x02\x12\x14\n" +
|
||||
"\x10RECOMPUTE_LATEST\x10\x03\"\xf7\x02\n" +
|
||||
"\x10RECOMPUTE_LATEST\x10\x03\"\x9a\x03\n" +
|
||||
"\tRecompute\x12\x19\n" +
|
||||
"\bscan_dir\x18\x01 \x01(\tR\ascanDir\x12\x1e\n" +
|
||||
"\n" +
|
||||
@@ -6399,10 +6434,11 @@ const file_filer_proto_rawDesc = "" +
|
||||
"mtimeToKey\x12\x1d\n" +
|
||||
"\n" +
|
||||
"demote_key\x18\a \x01(\tR\tdemoteKey\x12!\n" +
|
||||
"\fdemote_value\x18\b \x01(\fR\vdemoteValue\x1a?\n" +
|
||||
"\fdemote_value\x18\b \x01(\fR\vdemoteValue\x12!\n" +
|
||||
"\fexclude_name\x18\t \x01(\tR\vexcludeName\x1a?\n" +
|
||||
"\x11CopyExtendedEntry\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
|
||||
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xf8\x01\n" +
|
||||
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x9d\x02\n" +
|
||||
"\x18ObjectTransactionRequest\x12\x19\n" +
|
||||
"\block_key\x18\x01 \x01(\tR\alockKey\x126\n" +
|
||||
"\tcondition\x18\x02 \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\x126\n" +
|
||||
@@ -6410,7 +6446,8 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\x15is_from_other_cluster\x18\x04 \x01(\bR\x12isFromOtherCluster\x12\x1e\n" +
|
||||
"\n" +
|
||||
"signatures\x18\x05 \x03(\x05R\n" +
|
||||
"signatures\"f\n" +
|
||||
"signatures\x12#\n" +
|
||||
"\rcondition_key\x18\x06 \x01(\tR\fconditionKey\"f\n" +
|
||||
"\x19ObjectTransactionResponse\x12\x14\n" +
|
||||
"\x05error\x18\x01 \x01(\tR\x05error\x123\n" +
|
||||
"\n" +
|
||||
|
||||
@@ -239,16 +239,42 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
}
|
||||
// Versioned/suspended delete with no specific version: route off the lock when
|
||||
// the bucket has an owner. createDeleteMarker routes its own pointer flip;
|
||||
// object-lock buckets are excluded by routableWriteOwner and stay on the lock,
|
||||
// and the If-Match precondition was already checked above. A specific-version
|
||||
// delete keeps the lock (its recompute-after-delete is a separate change).
|
||||
// the bucket has an owner. createDeleteMarker routes its own pointer flip; a
|
||||
// delete marker never removes a locked version, so object-lock buckets route
|
||||
// here too. The If-Match precondition was already checked above.
|
||||
if !deleteHandled && versioningConfigured && versionId == "" {
|
||||
if owner := s3a.routableWriteOwner(bucket, object); owner != "" {
|
||||
deleteResult, deleteCode = s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState)
|
||||
deleteHandled = true
|
||||
}
|
||||
}
|
||||
// Specific-version delete: route off the lock. A real version recomputes the
|
||||
// .versions pointer excluding it and deletes the version file; the null
|
||||
// version is the regular object entry, deleted directly. Object-lock buckets
|
||||
// gate the delete on the version's WORM guards, evaluated on the owner — for
|
||||
// governance bypass the retention guard is scoped to COMPLIANCE so the filer
|
||||
// allows a governance-mode delete while still denying compliance and legal
|
||||
// hold, without the gateway reading the version.
|
||||
if !deleteHandled && versionId != "" {
|
||||
worm, lockErr := s3a.isObjectLockEnabled(bucket)
|
||||
bypass := worm && s3a.evaluateGovernanceBypassRequest(r, bucket, object)
|
||||
if lockErr == nil {
|
||||
if owner := s3a.objectWriteOwner(bucket, object); owner != "" {
|
||||
deleteResult.versionId = versionId
|
||||
if ve, vErr := s3a.getSpecificObjectVersion(bucket, object, versionId); vErr == nil && ve != nil && ve.Extended != nil {
|
||||
if dm, ok := ve.Extended[s3_constants.ExtDeleteMarkerKey]; ok && string(dm) == "true" {
|
||||
deleteResult.deleteMarker = true
|
||||
}
|
||||
}
|
||||
if versionId == "null" {
|
||||
deleteCode = s3a.routedDeleteNullVersion(owner, bucket, object, worm, bypass)
|
||||
} else {
|
||||
deleteCode = s3a.routedDeleteSpecificVersion(owner, bucket, object, versionId, worm, bypass)
|
||||
}
|
||||
deleteHandled = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !deleteHandled {
|
||||
deleteCode = s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode {
|
||||
return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed)
|
||||
|
||||
@@ -23,37 +23,49 @@ func (s3a *S3ApiServer) objectWriteOwner(bucket, object string) pb.ServerAddress
|
||||
return s3a.objectWriteLockClient.PrimaryForKey("s3.object.write:" + s3a.toFilerPath(bucket, object))
|
||||
}
|
||||
|
||||
// latestPointerRecompute builds the RECOMPUTE_LATEST mutation that re-derives an
|
||||
// object's .versions pointer. excludeName, when set, omits a version about to be
|
||||
// deleted (so the pointer is repointed before the blob is removed); demote, when
|
||||
// set, stamps the displaced prior latest with NoncurrentSinceNs.
|
||||
func (s3a *S3ApiServer) latestPointerRecompute(bucket, object string, useInvertedFormat bool, excludeName string, demote bool) *filer_pb.ObjectMutation {
|
||||
versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder)
|
||||
vdir, vname := util.FullPath(versionsPath).DirAndName()
|
||||
rc := &filer_pb.Recompute{
|
||||
ScanDir: versionsPath,
|
||||
// Inverted ids sort newest-first, so the newest is the first ascending
|
||||
// entry; legacy ids sort oldest-first (scan to the last).
|
||||
Descending: !useInvertedFormat,
|
||||
NameToKey: s3_constants.ExtLatestVersionFileNameKey,
|
||||
SizeToKey: s3_constants.ExtLatestVersionSizeKey,
|
||||
MtimeToKey: s3_constants.ExtLatestVersionMtimeKey,
|
||||
CopyExtended: map[string]string{
|
||||
s3_constants.ExtLatestVersionIdKey: s3_constants.ExtVersionIdKey,
|
||||
s3_constants.ExtLatestVersionETagKey: s3_constants.ExtETagKey,
|
||||
s3_constants.ExtLatestVersionOwnerKey: s3_constants.ExtAmzOwnerKey,
|
||||
s3_constants.ExtLatestVersionIsDeleteMarker: s3_constants.ExtDeleteMarkerKey,
|
||||
},
|
||||
ExcludeName: excludeName,
|
||||
}
|
||||
if demote {
|
||||
rc.DemoteKey = s3_constants.ExtNoncurrentSinceNsKey
|
||||
rc.DemoteValue = []byte(strconv.FormatInt(time.Now().UnixNano(), 10))
|
||||
}
|
||||
return &filer_pb.ObjectMutation{
|
||||
Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST,
|
||||
Directory: vdir,
|
||||
Name: vname,
|
||||
Recompute: rc,
|
||||
}
|
||||
}
|
||||
|
||||
// routedVersionedFinalize flips the .versions pointer to the newest version and
|
||||
// demotes the prior latest, atomically under the object's per-path lock on the
|
||||
// owner filer, via a single RECOMPUTE_LATEST. The version file is already
|
||||
// written; the owner re-derives the pointer by scanning the directory.
|
||||
func (s3a *S3ApiServer) routedVersionedFinalize(owner pb.ServerAddress, bucket, object string, useInvertedFormat bool) s3err.ErrorCode {
|
||||
versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder)
|
||||
vdir, vname := util.FullPath(versionsPath).DirAndName()
|
||||
req := &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: s3a.toFilerPath(bucket, object),
|
||||
Mutations: []*filer_pb.ObjectMutation{{
|
||||
Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST,
|
||||
Directory: vdir,
|
||||
Name: vname,
|
||||
Recompute: &filer_pb.Recompute{
|
||||
ScanDir: versionsPath,
|
||||
// Inverted ids sort newest-first, so the newest is the first
|
||||
// ascending entry; legacy ids sort oldest-first (scan to the last).
|
||||
Descending: !useInvertedFormat,
|
||||
NameToKey: s3_constants.ExtLatestVersionFileNameKey,
|
||||
SizeToKey: s3_constants.ExtLatestVersionSizeKey,
|
||||
MtimeToKey: s3_constants.ExtLatestVersionMtimeKey,
|
||||
CopyExtended: map[string]string{
|
||||
s3_constants.ExtLatestVersionIdKey: s3_constants.ExtVersionIdKey,
|
||||
s3_constants.ExtLatestVersionETagKey: s3_constants.ExtETagKey,
|
||||
s3_constants.ExtLatestVersionOwnerKey: s3_constants.ExtAmzOwnerKey,
|
||||
s3_constants.ExtLatestVersionIsDeleteMarker: s3_constants.ExtDeleteMarkerKey,
|
||||
},
|
||||
DemoteKey: s3_constants.ExtNoncurrentSinceNsKey,
|
||||
DemoteValue: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
|
||||
},
|
||||
}},
|
||||
LockKey: s3a.toFilerPath(bucket, object),
|
||||
Mutations: []*filer_pb.ObjectMutation{s3a.latestPointerRecompute(bucket, object, useInvertedFormat, "", true)},
|
||||
}
|
||||
resp, err := s3a.objectTxnOnFiler(owner, req)
|
||||
switch {
|
||||
@@ -68,6 +80,93 @@ func (s3a *S3ApiServer) routedVersionedFinalize(owner pb.ServerAddress, bucket,
|
||||
}
|
||||
}
|
||||
|
||||
// wormDeleteCondition returns the object-lock guards for a delete, or nil when
|
||||
// the bucket has no object lock. Legal hold always blocks. Retention blocks
|
||||
// while not elapsed; with governance bypass the retention guard is gated to
|
||||
// COMPLIANCE mode, so a governance-mode version becomes deletable while a
|
||||
// compliance-mode one stays protected — the filer decides from the version's
|
||||
// mode under the lock, so the gateway never has to read it.
|
||||
func wormDeleteCondition(worm, bypass bool) *filer_pb.WriteCondition {
|
||||
if !worm {
|
||||
return nil
|
||||
}
|
||||
retention := &filer_pb.WriteCondition_Clause{
|
||||
Kind: filer_pb.WriteCondition_IF_EXTENDED_TIME_ELAPSED,
|
||||
ExtKey: s3_constants.ExtRetentionUntilDateKey,
|
||||
}
|
||||
if bypass {
|
||||
retention.GateKey = s3_constants.ExtObjectLockModeKey
|
||||
retention.GateValue = s3_constants.RetentionModeCompliance
|
||||
}
|
||||
return &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
|
||||
{Kind: filer_pb.WriteCondition_IF_EXTENDED_NOT_EQUAL, ExtKey: s3_constants.ExtLegalHoldKey, ExtValue: s3_constants.LegalHoldOn},
|
||||
retention,
|
||||
}}
|
||||
}
|
||||
|
||||
// routedDeleteSpecificVersion deletes one version off the distributed lock: in a
|
||||
// single transaction on the owner it recomputes the .versions pointer excluding
|
||||
// the version (repoint-before-delete, so a crash leaves a recoverable orphan
|
||||
// rather than a dangling pointer) and deletes the version file. lock_key is the
|
||||
// object (serializing the pointer recompute); for object-lock buckets the
|
||||
// condition gates the delete on the version's WORM guards evaluated on the owner.
|
||||
func (s3a *S3ApiServer) routedDeleteSpecificVersion(owner pb.ServerAddress, bucket, object, versionId string, worm, bypass bool) s3err.ErrorCode {
|
||||
versionFileName := s3a.getVersionFileName(versionId)
|
||||
versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder)
|
||||
cond := wormDeleteCondition(worm, bypass)
|
||||
req := &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: s3a.toFilerPath(bucket, object),
|
||||
ConditionKey: versionsPath + "/" + versionFileName,
|
||||
Condition: cond,
|
||||
Mutations: []*filer_pb.ObjectMutation{
|
||||
s3a.latestPointerRecompute(bucket, object, isNewFormatVersionId(versionId), versionFileName, false),
|
||||
{Type: filer_pb.ObjectMutation_DELETE, Directory: versionsPath, Name: versionFileName, IsDeleteData: true},
|
||||
},
|
||||
}
|
||||
resp, err := s3a.objectTxnOnFiler(owner, req)
|
||||
switch {
|
||||
case err != nil:
|
||||
glog.Errorf("routedDeleteSpecificVersion: %s/%s %s on %s: %v", bucket, object, versionId, owner, err)
|
||||
return s3err.ErrInternalError
|
||||
case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED:
|
||||
// Legal hold or retention in force on the version.
|
||||
return s3err.ErrAccessDenied
|
||||
case resp.Error != "":
|
||||
glog.Errorf("routedDeleteSpecificVersion: %s/%s %s: %s", bucket, object, versionId, resp.Error)
|
||||
return s3err.ErrInternalError
|
||||
default:
|
||||
return s3err.ErrNone
|
||||
}
|
||||
}
|
||||
|
||||
// routedDeleteNullVersion deletes the null version (the regular object entry, not
|
||||
// a .versions file) off the distributed lock. There is no pointer to recompute;
|
||||
// the WORM guards, when present, gate the delete on the object entry itself
|
||||
// (condition defaults to lock_key).
|
||||
func (s3a *S3ApiServer) routedDeleteNullVersion(owner pb.ServerAddress, bucket, object string, worm, bypass bool) s3err.ErrorCode {
|
||||
fullpath := util.NewFullPath(s3a.bucketDir(bucket), object)
|
||||
dir, name := fullpath.DirAndName()
|
||||
resp, err := s3a.objectTxnOnFiler(owner, &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: string(fullpath),
|
||||
Condition: wormDeleteCondition(worm, bypass),
|
||||
Mutations: []*filer_pb.ObjectMutation{
|
||||
{Type: filer_pb.ObjectMutation_DELETE, Directory: dir, Name: name, IsDeleteData: true},
|
||||
},
|
||||
})
|
||||
switch {
|
||||
case err != nil:
|
||||
glog.Errorf("routedDeleteNullVersion: %s/%s on %s: %v", bucket, object, owner, err)
|
||||
return s3err.ErrInternalError
|
||||
case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED:
|
||||
return s3err.ErrAccessDenied
|
||||
case resp.Error != "":
|
||||
glog.Errorf("routedDeleteNullVersion: %s/%s: %s", bucket, object, resp.Error)
|
||||
return s3err.ErrInternalError
|
||||
default:
|
||||
return s3err.ErrNone
|
||||
}
|
||||
}
|
||||
|
||||
// versionedAfterCreate returns the putToFiler hook that finalizes a versioned
|
||||
// write: the routed RECOMPUTE_LATEST when the owner is known, else the existing
|
||||
// lock-free updateLatestVersionInDirectory.
|
||||
|
||||
@@ -261,15 +261,22 @@ func (fs *FilerServer) ObjectTransaction(ctx context.Context, req *filer_pb.Obje
|
||||
defer fs.entryLockTable.ReleaseLock(lockPath, pathLock)
|
||||
|
||||
if conditionIsSet(req.Condition) {
|
||||
current, findErr := fs.filer.FindEntry(ctx, lockPath)
|
||||
// The condition is evaluated against condition_key when set (e.g. a
|
||||
// version entry whose WORM guards gate the delete), while the lock stays
|
||||
// on lock_key (the object, serializing the pointer recompute).
|
||||
conditionPath := lockPath
|
||||
if req.ConditionKey != "" {
|
||||
conditionPath = util.FullPath(req.ConditionKey)
|
||||
}
|
||||
current, findErr := fs.filer.FindEntry(ctx, conditionPath)
|
||||
if findErr != nil && findErr != filer_pb.ErrNotFound {
|
||||
return &filer_pb.ObjectTransactionResponse{}, fmt.Errorf("ObjectTransaction condition %s: %w", lockPath, findErr)
|
||||
return &filer_pb.ObjectTransactionResponse{}, fmt.Errorf("ObjectTransaction condition %s: %w", conditionPath, findErr)
|
||||
}
|
||||
if findErr == filer_pb.ErrNotFound {
|
||||
current = nil
|
||||
}
|
||||
if !writeConditionSatisfied(req.Condition, current) {
|
||||
glog.V(3).InfofCtx(ctx, "ObjectTransaction %s: precondition failed", lockPath)
|
||||
glog.V(3).InfofCtx(ctx, "ObjectTransaction %s: precondition failed", conditionPath)
|
||||
return &filer_pb.ObjectTransactionResponse{
|
||||
Error: "precondition failed",
|
||||
ErrorCode: filer_pb.FilerError_PRECONDITION_FAILED,
|
||||
@@ -421,12 +428,17 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj
|
||||
// The store streams entries ascending by name. For the lowest-name pick we
|
||||
// only need the first entry, so cap the listing at one; for the highest-name
|
||||
// pick we must scan all and keep the last (the store has no reverse order).
|
||||
// With exclude_name set the first child may be the excluded one, so the cap
|
||||
// is lifted to find the first non-excluded entry.
|
||||
limit := int64(math.MaxInt32)
|
||||
if !rc.Descending {
|
||||
if !rc.Descending && rc.ExcludeName == "" {
|
||||
limit = 1
|
||||
}
|
||||
var chosen *filer.Entry
|
||||
_, listErr := fs.filer.StreamListDirectoryEntries(ctx, util.FullPath(rc.ScanDir), "", false, limit, "", "", "", func(entry *filer.Entry) (bool, error) {
|
||||
if rc.ExcludeName != "" && entry.Name() == rc.ExcludeName {
|
||||
return true, nil
|
||||
}
|
||||
chosen = entry
|
||||
return rc.Descending, nil
|
||||
})
|
||||
|
||||
@@ -59,6 +59,16 @@ func clauseSatisfied(c *filer_pb.WriteCondition_Clause, current *filer.Entry) bo
|
||||
if !exists {
|
||||
return true
|
||||
}
|
||||
// An optional gate scopes the guard: when gate_key is set, the time check
|
||||
// only applies if extended[gate_key] == gate_value. This lets the gateway
|
||||
// express governance bypass (enforce retention only for COMPLIANCE mode)
|
||||
// without reading the entry — the filer decides under the lock.
|
||||
if c.GateKey != "" {
|
||||
gv, gok := current.Extended[c.GateKey]
|
||||
if !gok || string(gv) != c.GateValue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
v, ok := current.Extended[c.ExtKey]
|
||||
if !ok {
|
||||
return true
|
||||
|
||||
@@ -124,6 +124,12 @@ func TestWriteConditionObjectLockGuards(t *testing.T) {
|
||||
retention := &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
|
||||
{Kind: filer_pb.WriteCondition_IF_EXTENDED_TIME_ELAPSED, ExtKey: "retain-until"},
|
||||
}}
|
||||
// Governance bypass: the retention guard is gated to COMPLIANCE mode, so a
|
||||
// governance-mode (or unmoded) entry is deletable while compliance stays
|
||||
// protected.
|
||||
gatedRetention := &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
|
||||
{Kind: filer_pb.WriteCondition_IF_EXTENDED_TIME_ELAPSED, ExtKey: "retain-until", GateKey: "lock-mode", GateValue: "COMPLIANCE"},
|
||||
}}
|
||||
future := strconv.FormatInt(now.Add(time.Hour).Unix(), 10)
|
||||
past := strconv.FormatInt(now.Add(-time.Hour).Unix(), 10)
|
||||
|
||||
@@ -141,6 +147,11 @@ func TestWriteConditionObjectLockGuards(t *testing.T) {
|
||||
{"retain-past-allows", retention, withExt(map[string]string{"retain-until": past}), true},
|
||||
{"retain-absent-allows", retention, withExt(nil), true},
|
||||
{"retain-malformed-blocks", retention, withExt(map[string]string{"retain-until": "soon"}), false},
|
||||
// Governance bypass (gated to COMPLIANCE): compliance still blocks, but
|
||||
// governance and unmoded entries become deletable despite future retention.
|
||||
{"bypass-compliance-blocks", gatedRetention, withExt(map[string]string{"retain-until": future, "lock-mode": "COMPLIANCE"}), false},
|
||||
{"bypass-governance-allows", gatedRetention, withExt(map[string]string{"retain-until": future, "lock-mode": "GOVERNANCE"}), true},
|
||||
{"bypass-no-mode-allows", gatedRetention, withExt(map[string]string{"retain-until": future}), true},
|
||||
// Composed WORM guard: legal hold AND retention, both clear -> allowed.
|
||||
{"worm-both-clear", &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
|
||||
{Kind: filer_pb.WriteCondition_IF_EXTENDED_NOT_EQUAL, ExtKey: "lock-hold", ExtValue: "ON"},
|
||||
|
||||
@@ -490,3 +490,73 @@ func TestObjectTransactionRecomputeDemoteAndAttrs(t *testing.T) {
|
||||
t.Errorf("new latest v2.ver should not be demoted")
|
||||
}
|
||||
}
|
||||
|
||||
// A version-specific delete locks the object (condition_key checks WORM on the
|
||||
// version), recomputes the pointer excluding the version (repoint-before-delete),
|
||||
// then deletes it. A legal-hold guard blocks the delete and preserves the entry.
|
||||
func TestObjectTransactionVersionDeleteWithWorm(t *testing.T) {
|
||||
now := time.Unix(1700000000, 0)
|
||||
ver := func(inode uint64, ext map[string][]byte) *filer.Entry {
|
||||
return &filer.Entry{Attr: filer.Attr{Inode: inode, Mtime: now, Crtime: now, Mode: 0644}, Extended: ext}
|
||||
}
|
||||
seed := func(latestLocked bool) map[string]*filer.Entry {
|
||||
vcExt := map[string][]byte{"vid": []byte("v3")}
|
||||
if latestLocked {
|
||||
vcExt["legalhold"] = []byte("ON")
|
||||
}
|
||||
return map[string]*filer.Entry{
|
||||
"/buckets/b/obj/.versions": {
|
||||
Attr: filer.Attr{Inode: 2, Mtime: now, Crtime: now, Mode: 0755 | (1 << 31)},
|
||||
Extended: map[string][]byte{"latestName": []byte("v_c"), "latestVid": []byte("v3")},
|
||||
},
|
||||
"/buckets/b/obj/.versions/v_a": ver(10, map[string][]byte{"vid": []byte("v1")}),
|
||||
"/buckets/b/obj/.versions/v_b": ver(11, map[string][]byte{"vid": []byte("v2")}),
|
||||
"/buckets/b/obj/.versions/v_c": ver(12, vcExt),
|
||||
}
|
||||
}
|
||||
mkReq := func() *filer_pb.ObjectTransactionRequest {
|
||||
return &filer_pb.ObjectTransactionRequest{
|
||||
LockKey: "/buckets/b/obj",
|
||||
ConditionKey: "/buckets/b/obj/.versions/v_c",
|
||||
Condition: &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{
|
||||
{Kind: filer_pb.WriteCondition_IF_EXTENDED_NOT_EQUAL, ExtKey: "legalhold", ExtValue: "ON"},
|
||||
}},
|
||||
Mutations: []*filer_pb.ObjectMutation{
|
||||
{Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions",
|
||||
Recompute: &filer_pb.Recompute{ScanDir: "/buckets/b/obj/.versions", Descending: true, ExcludeName: "v_c",
|
||||
NameToKey: "latestName", CopyExtended: map[string]string{"latestVid": "vid"}}},
|
||||
{Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b/obj/.versions", Name: "v_c"},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Legal hold ON: the WORM guard blocks; version and pointer untouched.
|
||||
fs, store := newTxnTestServer(seed(true))
|
||||
resp, err := fs.ObjectTransaction(context.Background(), mkReq())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.ErrorCode != filer_pb.FilerError_PRECONDITION_FAILED {
|
||||
t.Fatalf("locked version delete should fail precondition, got code=%v err=%q", resp.ErrorCode, resp.Error)
|
||||
}
|
||||
if _, ok := store.entries["/buckets/b/obj/.versions/v_c"]; !ok {
|
||||
t.Errorf("locked version must not be deleted")
|
||||
}
|
||||
if got := string(store.entries["/buckets/b/obj/.versions"].Extended["latestName"]); got != "v_c" {
|
||||
t.Errorf("pointer must be unchanged when delete is blocked, got %s", got)
|
||||
}
|
||||
|
||||
// No legal hold: pointer recomputes to v_b (excluding v_c), then v_c is deleted.
|
||||
fs, store = newTxnTestServer(seed(false))
|
||||
resp, err = fs.ObjectTransaction(context.Background(), mkReq())
|
||||
if err != nil || resp.Error != "" {
|
||||
t.Fatalf("unlocked delete failed: err=%v resp=%q", err, resp.Error)
|
||||
}
|
||||
if _, ok := store.entries["/buckets/b/obj/.versions/v_c"]; ok {
|
||||
t.Errorf("unlocked version should be deleted")
|
||||
}
|
||||
ptr := store.entries["/buckets/b/obj/.versions"].Extended
|
||||
if string(ptr["latestName"]) != "v_b" || string(ptr["latestVid"]) != "v2" {
|
||||
t.Errorf("pointer should recompute to v_b/v2, got name=%s vid=%s", ptr["latestName"], ptr["latestVid"])
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user