Files
seaweedfs/weed/s3api/s3lifecycle/reader/reader_test.go
Chris Lu 3a192c6c57 fix(s3/lifecycle): address Phase 3 post-merge review (#9354 #9355 #9356) (#9357)
* fix(s3/lifecycle): reader handles bare /buckets parent and pre-normalizes prefix

extractBucketKey accepted /buckets/ but rejected /buckets (no trailing
slash); some delete events emit the bare form, so bucket-root events
were silently dropped. Pre-normalize BucketsPath once on Run instead
of recomputing per event.

* perf(s3/lifecycle): pool sha256 hashers in ShardID

ShardID runs on every meta-log event before the shard filter; a fresh
sha256.New per call produces measurable allocator pressure under load.
sync.Pool reuses hashers across calls.

* fix(s3/lifecycle): router skips hard deletes and missing-attribute events

A hard delete carries no schedule-relevant state — Expiration would hit
NOOP_RESOLVED at dispatch and ExpiredObjectDeleteMarker fires from a
Create on the latest version. Skip rather than burn a schedule slot.

Missing Attributes leaves ModTime at year 0001, which makes
ExpirationDays fire immediately at dispatch. Skip the event instead.

Drop the unused 'versioned' parameter from buildObjectInfo; the
dispatcher's identity-CAS handles version drift in Phase 5.

* fix(s3/lifecycle): EntryIdentity.MtimeNs holds true nanoseconds

Both computeEntryIdentity (server) and buildIdentity (router) wrote
entry.Attributes.Mtime (seconds) into a field named MtimeNs. The CAS
worked because both sides agreed, but the encoding contradicted the
field name and would break if either side later started using true
nanoseconds. Combine Mtime*1e9 + the FuseAttributes.MtimeNs nanosecond
component on both sides; the test was updated to match.

* fix(s3/lifecycle): dispatcher distinguishes ctx cancel from transport errors

A canceled or deadline-exceeded RPC is shutdown, not a transport
failure: re-queue the Match at its original DueTime with no retry-budget
burn so a quick restart can't escalate it to BLOCKED.

* fix(s3/lifecycle): reader fallback prefix normalization mirrors Run

The fallback path that builds prefix from r.BucketsPath when
bucketsPathSlash is empty (test-only entry into extractBucketKey) was
appending an unconditional '/', producing '//' if BucketsPath already
ended with one. Use the same normalization Run does.

* fix(s3/lifecycle): ObjectInfo.ModTime carries the nanosecond component

ModTime dropped FuseAttributes.MtimeNs, leaving ExpirationDays one
nanosecond off relative to EntryIdentity.MtimeNs. Pass both to
time.Unix so the precision matches the CAS witness.
2026-05-07 16:54:24 -07:00

185 lines
5.7 KiB
Go

package reader
import (
	"context"
	"errors"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
)
// TestExtractBucketKeyCreate verifies that a create event under a nested
// directory resolves to the bucket name plus the slash-joined object key.
func TestExtractBucketKeyCreate(t *testing.T) {
	rdr := &Reader{BucketsPath: "/buckets"}
	msg := &filer_pb.SubscribeMetadataResponse{
		EventNotification: &filer_pb.EventNotification{
			NewEntry:      &filer_pb.Entry{Name: "obj.txt"},
			NewParentPath: "/buckets/mybucket/path/to",
		},
	}
	bucket, key, matched := rdr.extractBucketKey(msg)
	if !matched || bucket != "mybucket" || key != "path/to/obj.txt" {
		t.Fatalf("got (%q,%q,%v), want (mybucket,path/to/obj.txt,true)", bucket, key, matched)
	}
}
// TestExtractBucketKeyTopLevelObject verifies an object created directly at
// the bucket root yields a key with no directory component.
func TestExtractBucketKeyTopLevelObject(t *testing.T) {
	rdr := &Reader{BucketsPath: "/buckets"}
	msg := &filer_pb.SubscribeMetadataResponse{
		EventNotification: &filer_pb.EventNotification{
			NewEntry:      &filer_pb.Entry{Name: "a.txt"},
			NewParentPath: "/buckets/mybucket",
		},
	}
	bucket, key, matched := rdr.extractBucketKey(msg)
	if !matched || bucket != "mybucket" || key != "a.txt" {
		t.Fatalf("got (%q,%q,%v), want (mybucket,a.txt,true)", bucket, key, matched)
	}
}
// TestExtractBucketKeyDeleteUsesOldEntry verifies a delete event (OldEntry
// set, NewEntry absent) still resolves to a bucket and key.
func TestExtractBucketKeyDeleteUsesOldEntry(t *testing.T) {
	rdr := &Reader{BucketsPath: "/buckets"}
	msg := &filer_pb.SubscribeMetadataResponse{
		EventNotification: &filer_pb.EventNotification{
			OldEntry:      &filer_pb.Entry{Name: "gone.txt"},
			NewParentPath: "/buckets/b/dir",
		},
	}
	bucket, key, matched := rdr.extractBucketKey(msg)
	if !matched || bucket != "b" || key != "dir/gone.txt" {
		t.Fatalf("got (%q,%q,%v), want (b,dir/gone.txt,true)", bucket, key, matched)
	}
}
// TestExtractBucketKeyDeleteWithEmptyParent verifies the fallback to
// resp.Directory when the notification carries no NewParentPath.
func TestExtractBucketKeyDeleteWithEmptyParent(t *testing.T) {
	// Some delete events carry the directory in resp.Directory rather than
	// EventNotification.NewParentPath.
	rdr := &Reader{BucketsPath: "/buckets"}
	msg := &filer_pb.SubscribeMetadataResponse{
		Directory: "/buckets/b/dir",
		EventNotification: &filer_pb.EventNotification{
			OldEntry: &filer_pb.Entry{Name: "gone.txt"},
		},
	}
	bucket, key, matched := rdr.extractBucketKey(msg)
	if !matched || bucket != "b" || key != "dir/gone.txt" {
		t.Fatalf("got (%q,%q,%v), want (b,dir/gone.txt,true)", bucket, key, matched)
	}
}
// TestExtractBucketKeyOutsideBucketsSkipped verifies events outside the
// configured BucketsPath are rejected (ok == false).
func TestExtractBucketKeyOutsideBucketsSkipped(t *testing.T) {
	rdr := &Reader{BucketsPath: "/buckets"}
	msg := &filer_pb.SubscribeMetadataResponse{
		EventNotification: &filer_pb.EventNotification{
			NewEntry:      &filer_pb.Entry{Name: "config"},
			NewParentPath: "/etc/something",
		},
	}
	if _, _, matched := rdr.extractBucketKey(msg); matched {
		t.Fatal("expected skip for non-bucket path")
	}
}
// TestExtractBucketKeyBucketCreateAtRoot verifies a bucket-creation event at
// "/buckets/" (trailing slash) yields the bucket name and an empty key.
func TestExtractBucketKeyBucketCreateAtRoot(t *testing.T) {
	// Creating a new bucket emits an event at /buckets/ with the bucket as
	// the new entry's Name.
	rdr := &Reader{BucketsPath: "/buckets"}
	msg := &filer_pb.SubscribeMetadataResponse{
		EventNotification: &filer_pb.EventNotification{
			NewEntry:      &filer_pb.Entry{Name: "newbucket"},
			NewParentPath: "/buckets/",
		},
	}
	bucket, key, matched := rdr.extractBucketKey(msg)
	if !matched || bucket != "newbucket" || key != "" {
		t.Fatalf("got (%q,%q,%v), want (newbucket,,true)", bucket, key, matched)
	}
}
// TestExtractBucketKeyBucketCreateBareRoot verifies the bare "/buckets"
// parent (no trailing slash) also resolves to a bucket-root event.
func TestExtractBucketKeyBucketCreateBareRoot(t *testing.T) {
	// Some delete and create events emit the parent without a trailing
	// slash. /buckets (no trailing slash) must also resolve to a bucket-
	// root event with the entry name as the bucket.
	rdr := &Reader{BucketsPath: "/buckets"}
	msg := &filer_pb.SubscribeMetadataResponse{
		EventNotification: &filer_pb.EventNotification{
			NewEntry:      &filer_pb.Entry{Name: "newbucket"},
			NewParentPath: "/buckets",
		},
	}
	bucket, key, matched := rdr.extractBucketKey(msg)
	if !matched || bucket != "newbucket" || key != "" {
		t.Fatalf("bare /buckets parent: got (%q,%q,%v), want (newbucket,,true)", bucket, key, matched)
	}
}
// TestDispatchOneFiltersByShard verifies the shard filter: an event whose
// bucket+key hashes to a different shard is silently dropped, while one on
// the Reader's own shard is counted and emitted on the Events channel.
func TestDispatchOneFiltersByShard(t *testing.T) {
	// Pick a bucket+key whose shard is known, set Reader to a different
	// shard, expect skip; same shard, expect emit.
	const bucket, key = "mybucket", "thing.txt"
	want := s3lifecycle.ShardID(bucket, key)
	wrong := (want + 1) % s3lifecycle.ShardCount
	msg := &filer_pb.SubscribeMetadataResponse{
		TsNs: 12345,
		EventNotification: &filer_pb.EventNotification{
			NewEntry:      &filer_pb.Entry{Name: key},
			NewParentPath: "/buckets/" + bucket,
		},
	}
	sink := make(chan *Event, 1)
	rdr := &Reader{
		ShardID:     wrong,
		BucketsPath: "/buckets",
		Cursor:      NewCursor(),
		Events:      sink,
	}
	seen := 0
	// Wrong shard: the event must be neither counted nor emitted.
	if err := rdr.dispatchOne(context.Background(), msg, &seen); err != nil {
		t.Fatalf("dispatchOne err: %v", err)
	}
	if seen != 0 || len(sink) != 0 {
		t.Fatalf("wrong-shard event should be skipped: processed=%d emitted=%d", seen, len(sink))
	}
	// Matching shard: the same event must be counted and emitted.
	rdr.ShardID = want
	if err := rdr.dispatchOne(context.Background(), msg, &seen); err != nil {
		t.Fatalf("dispatchOne err: %v", err)
	}
	if seen != 1 || len(sink) != 1 {
		t.Fatalf("matching-shard event should be emitted: processed=%d emitted=%d", seen, len(sink))
	}
	got := <-sink
	if got.Bucket != bucket || got.Key != key || got.TsNs != 12345 {
		t.Fatalf("emitted event mismatch: %+v", got)
	}
}
// TestDispatchOneRespectsCtxCancel verifies that a send blocked on the
// Events channel is unblocked by context cancellation and that dispatchOne
// surfaces the context's error rather than hanging.
func TestDispatchOneRespectsCtxCancel(t *testing.T) {
	// The Events channel is unbuffered, so the send inside dispatchOne
	// blocks; a pre-canceled ctx must unblock it and return ctx.Err().
	bucket, key := "mybucket", "thing.txt"
	target := s3lifecycle.ShardID(bucket, key)
	resp := &filer_pb.SubscribeMetadataResponse{
		TsNs: 1,
		EventNotification: &filer_pb.EventNotification{
			NewParentPath: "/buckets/" + bucket,
			NewEntry:      &filer_pb.Entry{Name: key},
		},
	}
	out := make(chan *Event) // unbuffered: forces the send to block
	r := &Reader{
		ShardID:     target,
		BucketsPath: "/buckets",
		Cursor:      NewCursor(),
		Events:      out,
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front so dispatchOne observes it on the blocked send
	processed := 0
	err := r.dispatchOne(ctx, resp, &processed)
	// errors.Is instead of == keeps the assertion valid if dispatchOne ever
	// wraps ctx.Err() (e.g. with fmt.Errorf("...: %w", err)).
	if !errors.Is(err, context.Canceled) {
		t.Fatalf("expected ctx.Canceled, got %v", err)
	}
}