diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 983f20756..7a8fbac24 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -274,6 +274,8 @@ message WriteCondition { bool allow_weak = 4; // compare ETags ignoring the weak (W/) marker string ext_key = 5; // extended attribute name for IF_EXTENDED_* kinds string ext_value = 6; // blocking value for IF_EXTENDED_NOT_EQUAL + string gate_key = 7; // IF_EXTENDED_TIME_ELAPSED: only enforce when extended[gate_key] == gate_value + string gate_value = 8; // gate value (e.g. retention mode COMPLIANCE for governance bypass) } repeated Clause clauses = 1; // all must hold (logical AND) } @@ -333,6 +335,7 @@ message Recompute { string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes bytes demote_value = 8; // value for demote_key + string exclude_name = 9; // if set, skip this child when scanning (e.g. a version about to be deleted) } // ObjectTransactionRequest applies an ordered list of mutations atomically with @@ -346,6 +349,7 @@ message ObjectTransactionRequest { repeated ObjectMutation mutations = 3; bool is_from_other_cluster = 4; repeated int32 signatures = 5; + string condition_key = 6; } message ObjectTransactionResponse { diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 983f20756..b29f72aa9 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -274,6 +274,8 @@ message WriteCondition { bool allow_weak = 4; // compare ETags ignoring the weak (W/) marker string ext_key = 5; // extended attribute name for IF_EXTENDED_* kinds string ext_value = 6; // blocking value for IF_EXTENDED_NOT_EQUAL + string gate_key = 7; // IF_EXTENDED_TIME_ELAPSED: only enforce when extended[gate_key] == gate_value + string gate_value = 8; // gate value (e.g. retention mode COMPLIANCE for governance bypass) } repeated Clause clauses = 1; // all must hold (logical AND) } @@ -333,6 +335,7 @@ message Recompute { string mtime_to_key = 6; // if set, store the chosen child's Mtime (decimal) under this pointer key string demote_key = 7; // if set, stamp demote_value on the prior name_to_key target when it changes bytes demote_value = 8; // value for demote_key + string exclude_name = 9; // if set, skip this child when scanning (e.g. a version about to be deleted) } // ObjectTransactionRequest applies an ordered list of mutations atomically with @@ -346,6 +349,7 @@ message ObjectTransactionRequest { repeated ObjectMutation mutations = 3; bool is_from_other_cluster = 4; repeated int32 signatures = 5; + string condition_key = 6; // if set, evaluate the condition against this entry instead of lock_key (still locking lock_key) } message ObjectTransactionResponse { diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index bcec04359..6f494c701 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1559,6 +1559,7 @@ type Recompute struct { MtimeToKey string `protobuf:"bytes,6,opt,name=mtime_to_key,json=mtimeToKey,proto3" json:"mtime_to_key,omitempty"` // if set, store the chosen child's Mtime (decimal) under this pointer key DemoteKey string `protobuf:"bytes,7,opt,name=demote_key,json=demoteKey,proto3" json:"demote_key,omitempty"` // if set, stamp demote_value on the prior name_to_key target when it changes DemoteValue []byte `protobuf:"bytes,8,opt,name=demote_value,json=demoteValue,proto3" json:"demote_value,omitempty"` // value for demote_key + ExcludeName string `protobuf:"bytes,9,opt,name=exclude_name,json=excludeName,proto3" json:"exclude_name,omitempty"` // if set, skip this child when scanning (e.g. a version about to be deleted) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1649,6 +1650,13 @@ func (x *Recompute) GetDemoteValue() []byte { return nil } +func (x *Recompute) GetExcludeName() string { + if x != nil { + return x.ExcludeName + } + return "" +} + // ObjectTransactionRequest applies an ordered list of mutations atomically with // respect to other writers of the same object, by holding the filer's per-path // lock on lock_key for the whole transaction. The optional condition is checked @@ -1661,6 +1669,7 @@ type ObjectTransactionRequest struct { Mutations []*ObjectMutation `protobuf:"bytes,3,rep,name=mutations,proto3" json:"mutations,omitempty"` IsFromOtherCluster bool `protobuf:"varint,4,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` Signatures []int32 `protobuf:"varint,5,rep,packed,name=signatures,proto3" json:"signatures,omitempty"` + ConditionKey string `protobuf:"bytes,6,opt,name=condition_key,json=conditionKey,proto3" json:"condition_key,omitempty"` // if set, evaluate the condition against this entry instead of lock_key (still locking lock_key) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1730,6 +1739,13 @@ func (x *ObjectTransactionRequest) GetSignatures() []int32 { return nil } +func (x *ObjectTransactionRequest) GetConditionKey() string { + if x != nil { + return x.ConditionKey + } + return "" +} + type ObjectTransactionResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` @@ -5935,6 +5951,8 @@ type WriteCondition_Clause struct { AllowWeak bool `protobuf:"varint,4,opt,name=allow_weak,json=allowWeak,proto3" json:"allow_weak,omitempty"` // compare ETags ignoring the weak (W/) marker ExtKey string `protobuf:"bytes,5,opt,name=ext_key,json=extKey,proto3" json:"ext_key,omitempty"` // extended attribute name for IF_EXTENDED_* kinds ExtValue string `protobuf:"bytes,6,opt,name=ext_value,json=extValue,proto3" json:"ext_value,omitempty"` // blocking value for IF_EXTENDED_NOT_EQUAL + GateKey string `protobuf:"bytes,7,opt,name=gate_key,json=gateKey,proto3" json:"gate_key,omitempty"` // IF_EXTENDED_TIME_ELAPSED: only enforce when extended[gate_key] == gate_value + GateValue string `protobuf:"bytes,8,opt,name=gate_value,json=gateValue,proto3" json:"gate_value,omitempty"` // gate value (e.g. retention mode COMPLIANCE for governance bypass) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -6011,6 +6029,20 @@ func (x *WriteCondition_Clause) GetExtValue() string { return "" } +func (x *WriteCondition_Clause) GetGateKey() string { + if x != nil { + return x.GateKey + } + return "" +} + +func (x *WriteCondition_Clause) GetGateValue() string { + if x != nil { + return x.GateValue + } + return "" +} + // if found, send the exact address // if not found, send the full list of existing brokers type LocateBrokerResponse_Resource struct { @@ -6343,9 +6375,9 @@ const file_filer_proto_rawDesc = "" + "signatures\x18\x05 \x03(\x05R\n" + "signatures\x12=\n" + "\x1bskip_check_parent_directory\x18\x06 \x01(\bR\x18skipCheckParentDirectory\x126\n" + - "\tcondition\x18\a \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\"\xd9\x03\n" + + "\tcondition\x18\a \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\"\x93\x04\n" + "\x0eWriteCondition\x129\n" + - "\aclauses\x18\x01 \x03(\v2\x1f.filer_pb.WriteCondition.ClauseR\aclauses\x1a\xc3\x01\n" + + "\aclauses\x18\x01 \x03(\v2\x1f.filer_pb.WriteCondition.ClauseR\aclauses\x1a\xfd\x01\n" + "\x06Clause\x121\n" + "\x04kind\x18\x01 \x01(\x0e2\x1d.filer_pb.WriteCondition.KindR\x04kind\x12\x14\n" + "\x05etags\x18\x02 \x03(\tR\x05etags\x12\x1b\n" + @@ -6353,7 +6385,10 @@ const file_filer_proto_rawDesc = "" + "\n" + "allow_weak\x18\x04 \x01(\bR\tallowWeak\x12\x17\n" + "\aext_key\x18\x05 \x01(\tR\x06extKey\x12\x1b\n" + - "\text_value\x18\x06 \x01(\tR\bextValue\"\xc5\x01\n" + + "\text_value\x18\x06 \x01(\tR\bextValue\x12\x19\n" + + "\bgate_key\x18\a \x01(\tR\agateKey\x12\x1d\n" + + "\n" + + "gate_value\x18\b \x01(\tR\tgateValue\"\xc5\x01\n" + "\x04Kind\x12\b\n" + "\x04NONE\x10\x00\x12\x11\n" + "\rIF_NOT_EXISTS\x10\x01\x12\r\n" + @@ -6386,7 +6421,7 @@ const file_filer_proto_rawDesc = "" + "\n" + "\x06DELETE\x10\x01\x12\x12\n" + "\x0ePATCH_EXTENDED\x10\x02\x12\x14\n" + - "\x10RECOMPUTE_LATEST\x10\x03\"\xf7\x02\n" + + "\x10RECOMPUTE_LATEST\x10\x03\"\x9a\x03\n" + "\tRecompute\x12\x19\n" + "\bscan_dir\x18\x01 \x01(\tR\ascanDir\x12\x1e\n" + "\n" + @@ -6399,10 +6434,11 @@ const file_filer_proto_rawDesc = "" + "mtimeToKey\x12\x1d\n" + "\n" + "demote_key\x18\a \x01(\tR\tdemoteKey\x12!\n" + - "\fdemote_value\x18\b \x01(\fR\vdemoteValue\x1a?\n" + + "\fdemote_value\x18\b \x01(\fR\vdemoteValue\x12!\n" + + "\fexclude_name\x18\t \x01(\tR\vexcludeName\x1a?\n" + "\x11CopyExtendedEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xf8\x01\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x9d\x02\n" + "\x18ObjectTransactionRequest\x12\x19\n" + "\block_key\x18\x01 \x01(\tR\alockKey\x126\n" + "\tcondition\x18\x02 \x01(\v2\x18.filer_pb.WriteConditionR\tcondition\x126\n" + @@ -6410,7 +6446,8 @@ const file_filer_proto_rawDesc = "" + "\x15is_from_other_cluster\x18\x04 \x01(\bR\x12isFromOtherCluster\x12\x1e\n" + "\n" + "signatures\x18\x05 \x03(\x05R\n" + - "signatures\"f\n" + + "signatures\x12#\n" + + "\rcondition_key\x18\x06 \x01(\tR\fconditionKey\"f\n" + "\x19ObjectTransactionResponse\x12\x14\n" + "\x05error\x18\x01 \x01(\tR\x05error\x123\n" + "\n" + diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 9200e3fb7..8c334a7a6 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -239,16 +239,42 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque } } // Versioned/suspended delete with no specific version: route off the lock when - // the bucket has an owner. createDeleteMarker routes its own pointer flip; - // object-lock buckets are excluded by routableWriteOwner and stay on the lock, - // and the If-Match precondition was already checked above. A specific-version - // delete keeps the lock (its recompute-after-delete is a separate change). + // the bucket has an owner. createDeleteMarker routes its own pointer flip; a + // delete marker never removes a locked version, so object-lock buckets route + // here too. The If-Match precondition was already checked above. if !deleteHandled && versioningConfigured && versionId == "" { if owner := s3a.routableWriteOwner(bucket, object); owner != "" { deleteResult, deleteCode = s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState) deleteHandled = true } } + // Specific-version delete: route off the lock. A real version recomputes the + // .versions pointer excluding it and deletes the version file; the null + // version is the regular object entry, deleted directly. Object-lock buckets + // gate the delete on the version's WORM guards, evaluated on the owner — for + // governance bypass the retention guard is scoped to COMPLIANCE so the filer + // allows a governance-mode delete while still denying compliance and legal + // hold, without the gateway reading the version. + if !deleteHandled && versionId != "" { + worm, lockErr := s3a.isObjectLockEnabled(bucket) + bypass := worm && s3a.evaluateGovernanceBypassRequest(r, bucket, object) + if lockErr == nil { + if owner := s3a.objectWriteOwner(bucket, object); owner != "" { + deleteResult.versionId = versionId + if ve, vErr := s3a.getSpecificObjectVersion(bucket, object, versionId); vErr == nil && ve != nil && ve.Extended != nil { + if dm, ok := ve.Extended[s3_constants.ExtDeleteMarkerKey]; ok && string(dm) == "true" { + deleteResult.deleteMarker = true + } + } + if versionId == "null" { + deleteCode = s3a.routedDeleteNullVersion(owner, bucket, object, worm, bypass) + } else { + deleteCode = s3a.routedDeleteSpecificVersion(owner, bucket, object, versionId, worm, bypass) + } + deleteHandled = true + } + } + } if !deleteHandled { deleteCode = s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode { return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed) diff --git a/weed/s3api/s3api_object_versioned_finalize.go b/weed/s3api/s3api_object_versioned_finalize.go index 874817aae..41627b965 100644 --- a/weed/s3api/s3api_object_versioned_finalize.go +++ b/weed/s3api/s3api_object_versioned_finalize.go @@ -23,37 +23,49 @@ func (s3a *S3ApiServer) objectWriteOwner(bucket, object string) pb.ServerAddress return s3a.objectWriteLockClient.PrimaryForKey("s3.object.write:" + s3a.toFilerPath(bucket, object)) } +// latestPointerRecompute builds the RECOMPUTE_LATEST mutation that re-derives an +// object's .versions pointer. excludeName, when set, omits a version about to be +// deleted (so the pointer is repointed before the blob is removed); demote, when +// set, stamps the displaced prior latest with NoncurrentSinceNs. +func (s3a *S3ApiServer) latestPointerRecompute(bucket, object string, useInvertedFormat bool, excludeName string, demote bool) *filer_pb.ObjectMutation { + versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder) + vdir, vname := util.FullPath(versionsPath).DirAndName() + rc := &filer_pb.Recompute{ + ScanDir: versionsPath, + // Inverted ids sort newest-first, so the newest is the first ascending + // entry; legacy ids sort oldest-first (scan to the last). + Descending: !useInvertedFormat, + NameToKey: s3_constants.ExtLatestVersionFileNameKey, + SizeToKey: s3_constants.ExtLatestVersionSizeKey, + MtimeToKey: s3_constants.ExtLatestVersionMtimeKey, + CopyExtended: map[string]string{ + s3_constants.ExtLatestVersionIdKey: s3_constants.ExtVersionIdKey, + s3_constants.ExtLatestVersionETagKey: s3_constants.ExtETagKey, + s3_constants.ExtLatestVersionOwnerKey: s3_constants.ExtAmzOwnerKey, + s3_constants.ExtLatestVersionIsDeleteMarker: s3_constants.ExtDeleteMarkerKey, + }, + ExcludeName: excludeName, + } + if demote { + rc.DemoteKey = s3_constants.ExtNoncurrentSinceNsKey + rc.DemoteValue = []byte(strconv.FormatInt(time.Now().UnixNano(), 10)) + } + return &filer_pb.ObjectMutation{ + Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, + Directory: vdir, + Name: vname, + Recompute: rc, + } +} + // routedVersionedFinalize flips the .versions pointer to the newest version and // demotes the prior latest, atomically under the object's per-path lock on the // owner filer, via a single RECOMPUTE_LATEST. The version file is already // written; the owner re-derives the pointer by scanning the directory. func (s3a *S3ApiServer) routedVersionedFinalize(owner pb.ServerAddress, bucket, object string, useInvertedFormat bool) s3err.ErrorCode { - versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder) - vdir, vname := util.FullPath(versionsPath).DirAndName() req := &filer_pb.ObjectTransactionRequest{ - LockKey: s3a.toFilerPath(bucket, object), - Mutations: []*filer_pb.ObjectMutation{{ - Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, - Directory: vdir, - Name: vname, - Recompute: &filer_pb.Recompute{ - ScanDir: versionsPath, - // Inverted ids sort newest-first, so the newest is the first - // ascending entry; legacy ids sort oldest-first (scan to the last). - Descending: !useInvertedFormat, - NameToKey: s3_constants.ExtLatestVersionFileNameKey, - SizeToKey: s3_constants.ExtLatestVersionSizeKey, - MtimeToKey: s3_constants.ExtLatestVersionMtimeKey, - CopyExtended: map[string]string{ - s3_constants.ExtLatestVersionIdKey: s3_constants.ExtVersionIdKey, - s3_constants.ExtLatestVersionETagKey: s3_constants.ExtETagKey, - s3_constants.ExtLatestVersionOwnerKey: s3_constants.ExtAmzOwnerKey, - s3_constants.ExtLatestVersionIsDeleteMarker: s3_constants.ExtDeleteMarkerKey, - }, - DemoteKey: s3_constants.ExtNoncurrentSinceNsKey, - DemoteValue: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), - }, - }}, + LockKey: s3a.toFilerPath(bucket, object), + Mutations: []*filer_pb.ObjectMutation{s3a.latestPointerRecompute(bucket, object, useInvertedFormat, "", true)}, } resp, err := s3a.objectTxnOnFiler(owner, req) switch { @@ -68,6 +80,93 @@ func (s3a *S3ApiServer) routedVersionedFinalize(owner pb.ServerAddress, bucket, } } +// wormDeleteCondition returns the object-lock guards for a delete, or nil when +// the bucket has no object lock. Legal hold always blocks. Retention blocks +// while not elapsed; with governance bypass the retention guard is gated to +// COMPLIANCE mode, so a governance-mode version becomes deletable while a +// compliance-mode one stays protected — the filer decides from the version's +// mode under the lock, so the gateway never has to read it. +func wormDeleteCondition(worm, bypass bool) *filer_pb.WriteCondition { + if !worm { + return nil + } + retention := &filer_pb.WriteCondition_Clause{ + Kind: filer_pb.WriteCondition_IF_EXTENDED_TIME_ELAPSED, + ExtKey: s3_constants.ExtRetentionUntilDateKey, + } + if bypass { + retention.GateKey = s3_constants.ExtObjectLockModeKey + retention.GateValue = s3_constants.RetentionModeCompliance + } + return &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{ + {Kind: filer_pb.WriteCondition_IF_EXTENDED_NOT_EQUAL, ExtKey: s3_constants.ExtLegalHoldKey, ExtValue: s3_constants.LegalHoldOn}, + retention, + }} +} + +// routedDeleteSpecificVersion deletes one version off the distributed lock: in a +// single transaction on the owner it recomputes the .versions pointer excluding +// the version (repoint-before-delete, so a crash leaves a recoverable orphan +// rather than a dangling pointer) and deletes the version file. lock_key is the +// object (serializing the pointer recompute); for object-lock buckets the +// condition gates the delete on the version's WORM guards evaluated on the owner. +func (s3a *S3ApiServer) routedDeleteSpecificVersion(owner pb.ServerAddress, bucket, object, versionId string, worm, bypass bool) s3err.ErrorCode { + versionFileName := s3a.getVersionFileName(versionId) + versionsPath := s3a.toFilerPath(bucket, object+s3_constants.VersionsFolder) + cond := wormDeleteCondition(worm, bypass) + req := &filer_pb.ObjectTransactionRequest{ + LockKey: s3a.toFilerPath(bucket, object), + ConditionKey: versionsPath + "/" + versionFileName, + Condition: cond, + Mutations: []*filer_pb.ObjectMutation{ + s3a.latestPointerRecompute(bucket, object, isNewFormatVersionId(versionId), versionFileName, false), + {Type: filer_pb.ObjectMutation_DELETE, Directory: versionsPath, Name: versionFileName, IsDeleteData: true}, + }, + } + resp, err := s3a.objectTxnOnFiler(owner, req) + switch { + case err != nil: + glog.Errorf("routedDeleteSpecificVersion: %s/%s %s on %s: %v", bucket, object, versionId, owner, err) + return s3err.ErrInternalError + case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED: + // Legal hold or retention in force on the version. + return s3err.ErrAccessDenied + case resp.Error != "": + glog.Errorf("routedDeleteSpecificVersion: %s/%s %s: %s", bucket, object, versionId, resp.Error) + return s3err.ErrInternalError + default: + return s3err.ErrNone + } +} + +// routedDeleteNullVersion deletes the null version (the regular object entry, not +// a .versions file) off the distributed lock. There is no pointer to recompute; +// the WORM guards, when present, gate the delete on the object entry itself +// (condition defaults to lock_key). +func (s3a *S3ApiServer) routedDeleteNullVersion(owner pb.ServerAddress, bucket, object string, worm, bypass bool) s3err.ErrorCode { + fullpath := util.NewFullPath(s3a.bucketDir(bucket), object) + dir, name := fullpath.DirAndName() + resp, err := s3a.objectTxnOnFiler(owner, &filer_pb.ObjectTransactionRequest{ + LockKey: string(fullpath), + Condition: wormDeleteCondition(worm, bypass), + Mutations: []*filer_pb.ObjectMutation{ + {Type: filer_pb.ObjectMutation_DELETE, Directory: dir, Name: name, IsDeleteData: true}, + }, + }) + switch { + case err != nil: + glog.Errorf("routedDeleteNullVersion: %s/%s on %s: %v", bucket, object, owner, err) + return s3err.ErrInternalError + case resp.ErrorCode == filer_pb.FilerError_PRECONDITION_FAILED: + return s3err.ErrAccessDenied + case resp.Error != "": + glog.Errorf("routedDeleteNullVersion: %s/%s: %s", bucket, object, resp.Error) + return s3err.ErrInternalError + default: + return s3err.ErrNone + } +} + // versionedAfterCreate returns the putToFiler hook that finalizes a versioned // write: the routed RECOMPUTE_LATEST when the owner is known, else the existing // lock-free updateLatestVersionInDirectory. diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 8a02c7d85..2bf466882 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -261,15 +261,22 @@ func (fs *FilerServer) ObjectTransaction(ctx context.Context, req *filer_pb.Obje defer fs.entryLockTable.ReleaseLock(lockPath, pathLock) if conditionIsSet(req.Condition) { - current, findErr := fs.filer.FindEntry(ctx, lockPath) + // The condition is evaluated against condition_key when set (e.g. a + // version entry whose WORM guards gate the delete), while the lock stays + // on lock_key (the object, serializing the pointer recompute). + conditionPath := lockPath + if req.ConditionKey != "" { + conditionPath = util.FullPath(req.ConditionKey) + } + current, findErr := fs.filer.FindEntry(ctx, conditionPath) if findErr != nil && findErr != filer_pb.ErrNotFound { - return &filer_pb.ObjectTransactionResponse{}, fmt.Errorf("ObjectTransaction condition %s: %w", lockPath, findErr) + return &filer_pb.ObjectTransactionResponse{}, fmt.Errorf("ObjectTransaction condition %s: %w", conditionPath, findErr) } if findErr == filer_pb.ErrNotFound { current = nil } if !writeConditionSatisfied(req.Condition, current) { - glog.V(3).InfofCtx(ctx, "ObjectTransaction %s: precondition failed", lockPath) + glog.V(3).InfofCtx(ctx, "ObjectTransaction %s: precondition failed", conditionPath) return &filer_pb.ObjectTransactionResponse{ Error: "precondition failed", ErrorCode: filer_pb.FilerError_PRECONDITION_FAILED, @@ -421,12 +428,17 @@ func (fs *FilerServer) applyRecomputeLatest(ctx context.Context, m *filer_pb.Obj // The store streams entries ascending by name. For the lowest-name pick we // only need the first entry, so cap the listing at one; for the highest-name // pick we must scan all and keep the last (the store has no reverse order). + // With exclude_name set the first child may be the excluded one, so the cap + // is lifted to find the first non-excluded entry. limit := int64(math.MaxInt32) - if !rc.Descending { + if !rc.Descending && rc.ExcludeName == "" { limit = 1 } var chosen *filer.Entry _, listErr := fs.filer.StreamListDirectoryEntries(ctx, util.FullPath(rc.ScanDir), "", false, limit, "", "", "", func(entry *filer.Entry) (bool, error) { + if rc.ExcludeName != "" && entry.Name() == rc.ExcludeName { + return true, nil + } chosen = entry return rc.Descending, nil }) diff --git a/weed/server/filer_grpc_server_condition.go b/weed/server/filer_grpc_server_condition.go index 809dfd4e1..e832bc3d2 100644 --- a/weed/server/filer_grpc_server_condition.go +++ b/weed/server/filer_grpc_server_condition.go @@ -59,6 +59,16 @@ func clauseSatisfied(c *filer_pb.WriteCondition_Clause, current *filer.Entry) bo if !exists { return true } + // An optional gate scopes the guard: when gate_key is set, the time check + // only applies if extended[gate_key] == gate_value. This lets the gateway + // express governance bypass (enforce retention only for COMPLIANCE mode) + // without reading the entry — the filer decides under the lock. + if c.GateKey != "" { + gv, gok := current.Extended[c.GateKey] + if !gok || string(gv) != c.GateValue { + return true + } + } v, ok := current.Extended[c.ExtKey] if !ok { return true diff --git a/weed/server/filer_grpc_server_condition_test.go b/weed/server/filer_grpc_server_condition_test.go index ce9007a55..3dcb0f792 100644 --- a/weed/server/filer_grpc_server_condition_test.go +++ b/weed/server/filer_grpc_server_condition_test.go @@ -124,6 +124,12 @@ func TestWriteConditionObjectLockGuards(t *testing.T) { retention := &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{ {Kind: filer_pb.WriteCondition_IF_EXTENDED_TIME_ELAPSED, ExtKey: "retain-until"}, }} + // Governance bypass: the retention guard is gated to COMPLIANCE mode, so a + // governance-mode (or unmoded) entry is deletable while compliance stays + // protected. + gatedRetention := &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{ + {Kind: filer_pb.WriteCondition_IF_EXTENDED_TIME_ELAPSED, ExtKey: "retain-until", GateKey: "lock-mode", GateValue: "COMPLIANCE"}, + }} future := strconv.FormatInt(now.Add(time.Hour).Unix(), 10) past := strconv.FormatInt(now.Add(-time.Hour).Unix(), 10) @@ -141,6 +147,11 @@ func TestWriteConditionObjectLockGuards(t *testing.T) { {"retain-past-allows", retention, withExt(map[string]string{"retain-until": past}), true}, {"retain-absent-allows", retention, withExt(nil), true}, {"retain-malformed-blocks", retention, withExt(map[string]string{"retain-until": "soon"}), false}, + // Governance bypass (gated to COMPLIANCE): compliance still blocks, but + // governance and unmoded entries become deletable despite future retention. + {"bypass-compliance-blocks", gatedRetention, withExt(map[string]string{"retain-until": future, "lock-mode": "COMPLIANCE"}), false}, + {"bypass-governance-allows", gatedRetention, withExt(map[string]string{"retain-until": future, "lock-mode": "GOVERNANCE"}), true}, + {"bypass-no-mode-allows", gatedRetention, withExt(map[string]string{"retain-until": future}), true}, // Composed WORM guard: legal hold AND retention, both clear -> allowed. {"worm-both-clear", &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{ {Kind: filer_pb.WriteCondition_IF_EXTENDED_NOT_EQUAL, ExtKey: "lock-hold", ExtValue: "ON"}, diff --git a/weed/server/filer_grpc_server_object_txn_test.go b/weed/server/filer_grpc_server_object_txn_test.go index 496d0cb60..ddf5a42b2 100644 --- a/weed/server/filer_grpc_server_object_txn_test.go +++ b/weed/server/filer_grpc_server_object_txn_test.go @@ -490,3 +490,73 @@ func TestObjectTransactionRecomputeDemoteAndAttrs(t *testing.T) { t.Errorf("new latest v2.ver should not be demoted") } } + +// A version-specific delete locks the object (condition_key checks WORM on the +// version), recomputes the pointer excluding the version (repoint-before-delete), +// then deletes it. A legal-hold guard blocks the delete and preserves the entry. +func TestObjectTransactionVersionDeleteWithWorm(t *testing.T) { + now := time.Unix(1700000000, 0) + ver := func(inode uint64, ext map[string][]byte) *filer.Entry { + return &filer.Entry{Attr: filer.Attr{Inode: inode, Mtime: now, Crtime: now, Mode: 0644}, Extended: ext} + } + seed := func(latestLocked bool) map[string]*filer.Entry { + vcExt := map[string][]byte{"vid": []byte("v3")} + if latestLocked { + vcExt["legalhold"] = []byte("ON") + } + return map[string]*filer.Entry{ + "/buckets/b/obj/.versions": { + Attr: filer.Attr{Inode: 2, Mtime: now, Crtime: now, Mode: 0755 | (1 << 31)}, + Extended: map[string][]byte{"latestName": []byte("v_c"), "latestVid": []byte("v3")}, + }, + "/buckets/b/obj/.versions/v_a": ver(10, map[string][]byte{"vid": []byte("v1")}), + "/buckets/b/obj/.versions/v_b": ver(11, map[string][]byte{"vid": []byte("v2")}), + "/buckets/b/obj/.versions/v_c": ver(12, vcExt), + } + } + mkReq := func() *filer_pb.ObjectTransactionRequest { + return &filer_pb.ObjectTransactionRequest{ + LockKey: "/buckets/b/obj", + ConditionKey: "/buckets/b/obj/.versions/v_c", + Condition: &filer_pb.WriteCondition{Clauses: []*filer_pb.WriteCondition_Clause{ + {Kind: filer_pb.WriteCondition_IF_EXTENDED_NOT_EQUAL, ExtKey: "legalhold", ExtValue: "ON"}, + }}, + Mutations: []*filer_pb.ObjectMutation{ + {Type: filer_pb.ObjectMutation_RECOMPUTE_LATEST, Directory: "/buckets/b/obj", Name: ".versions", + Recompute: &filer_pb.Recompute{ScanDir: "/buckets/b/obj/.versions", Descending: true, ExcludeName: "v_c", + NameToKey: "latestName", CopyExtended: map[string]string{"latestVid": "vid"}}}, + {Type: filer_pb.ObjectMutation_DELETE, Directory: "/buckets/b/obj/.versions", Name: "v_c"}, + }, + } + } + + // Legal hold ON: the WORM guard blocks; version and pointer untouched. + fs, store := newTxnTestServer(seed(true)) + resp, err := fs.ObjectTransaction(context.Background(), mkReq()) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp.ErrorCode != filer_pb.FilerError_PRECONDITION_FAILED { + t.Fatalf("locked version delete should fail precondition, got code=%v err=%q", resp.ErrorCode, resp.Error) + } + if _, ok := store.entries["/buckets/b/obj/.versions/v_c"]; !ok { + t.Errorf("locked version must not be deleted") + } + if got := string(store.entries["/buckets/b/obj/.versions"].Extended["latestName"]); got != "v_c" { + t.Errorf("pointer must be unchanged when delete is blocked, got %s", got) + } + + // No legal hold: pointer recomputes to v_b (excluding v_c), then v_c is deleted. + fs, store = newTxnTestServer(seed(false)) + resp, err = fs.ObjectTransaction(context.Background(), mkReq()) + if err != nil || resp.Error != "" { + t.Fatalf("unlocked delete failed: err=%v resp=%q", err, resp.Error) + } + if _, ok := store.entries["/buckets/b/obj/.versions/v_c"]; ok { + t.Errorf("unlocked version should be deleted") + } + ptr := store.entries["/buckets/b/obj/.versions"].Extended + if string(ptr["latestName"]) != "v_b" || string(ptr["latestVid"]) != "v2" { + t.Errorf("pointer should recompute to v_b/v2, got name=%s vid=%s", ptr["latestName"], ptr["latestVid"]) + } +}