mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-17 07:11:30 +00:00
* fix(s3/lifecycle): reader handles bare /buckets parent and pre-normalizes prefix
  extractBucketKey accepted /buckets/ but rejected /buckets (no trailing slash); some delete events emit the bare form, so bucket-root events were silently dropped. Pre-normalize BucketsPath once on Run instead of recomputing per event.
* perf(s3/lifecycle): pool sha256 hashers in ShardID
  ShardID runs on every meta-log event before the shard filter; a fresh sha256.New per call produces measurable allocator pressure under load. sync.Pool reuses hashers across calls.
* fix(s3/lifecycle): router skips hard deletes and missing-attribute events
  A hard delete carries no schedule-relevant state — Expiration would hit NOOP_RESOLVED at dispatch and ExpiredObjectDeleteMarker fires from a Create on the latest version. Skip rather than burn a schedule slot. Missing Attributes leaves ModTime at year 0001, which makes ExpirationDays fire immediately at dispatch. Skip the event instead. Drop the unused 'versioned' parameter from buildObjectInfo; the dispatcher's identity-CAS handles version drift in Phase 5.
* fix(s3/lifecycle): EntryIdentity.MtimeNs holds true nanoseconds
  Both computeEntryIdentity (server) and buildIdentity (router) wrote entry.Attributes.Mtime (seconds) into a field named MtimeNs. The CAS worked because both sides agreed, but the encoding contradicted the field name and would break if either side later started using true nanoseconds. Combine Mtime*1e9 + the FuseAttributes.MtimeNs nanosecond component on both sides; the test was updated to match.
* fix(s3/lifecycle): dispatcher distinguishes ctx cancel from transport errors
  A canceled or deadline-exceeded RPC is shutdown, not a transport failure: re-queue the Match at its original DueTime with no retry-budget burn so a quick restart can't escalate it to BLOCKED.
* fix(s3/lifecycle): reader fallback prefix normalization mirrors Run
  The fallback path that builds prefix from r.BucketsPath when bucketsPathSlash is empty (test-only entry into extractBucketKey) was appending an unconditional '/', producing '//' if BucketsPath already ended with one. Use the same normalization Run does.
* fix(s3/lifecycle): ObjectInfo.ModTime carries the nanosecond component
  ModTime dropped FuseAttributes.MtimeNs, leaving ExpirationDays one nanosecond off relative to EntryIdentity.MtimeNs. Pass both to time.Unix so the precision matches the CAS witness.
185 lines
5.7 KiB
Go
185 lines
5.7 KiB
Go
package reader
|
|
|
|
import (
	"context"
	"errors"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
)
|
|
|
|
func TestExtractBucketKeyCreate(t *testing.T) {
|
|
r := &Reader{BucketsPath: "/buckets"}
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/buckets/mybucket/path/to",
|
|
NewEntry: &filer_pb.Entry{Name: "obj.txt"},
|
|
},
|
|
}
|
|
b, k, ok := r.extractBucketKey(resp)
|
|
if !ok || b != "mybucket" || k != "path/to/obj.txt" {
|
|
t.Fatalf("got (%q,%q,%v), want (mybucket,path/to/obj.txt,true)", b, k, ok)
|
|
}
|
|
}
|
|
|
|
func TestExtractBucketKeyTopLevelObject(t *testing.T) {
|
|
r := &Reader{BucketsPath: "/buckets"}
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/buckets/mybucket",
|
|
NewEntry: &filer_pb.Entry{Name: "a.txt"},
|
|
},
|
|
}
|
|
b, k, ok := r.extractBucketKey(resp)
|
|
if !ok || b != "mybucket" || k != "a.txt" {
|
|
t.Fatalf("got (%q,%q,%v), want (mybucket,a.txt,true)", b, k, ok)
|
|
}
|
|
}
|
|
|
|
func TestExtractBucketKeyDeleteUsesOldEntry(t *testing.T) {
|
|
r := &Reader{BucketsPath: "/buckets"}
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/buckets/b/dir",
|
|
OldEntry: &filer_pb.Entry{Name: "gone.txt"},
|
|
},
|
|
}
|
|
b, k, ok := r.extractBucketKey(resp)
|
|
if !ok || b != "b" || k != "dir/gone.txt" {
|
|
t.Fatalf("got (%q,%q,%v), want (b,dir/gone.txt,true)", b, k, ok)
|
|
}
|
|
}
|
|
|
|
func TestExtractBucketKeyDeleteWithEmptyParent(t *testing.T) {
|
|
// Some delete events carry the directory in resp.Directory rather than
|
|
// EventNotification.NewParentPath.
|
|
r := &Reader{BucketsPath: "/buckets"}
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
Directory: "/buckets/b/dir",
|
|
EventNotification: &filer_pb.EventNotification{
|
|
OldEntry: &filer_pb.Entry{Name: "gone.txt"},
|
|
},
|
|
}
|
|
b, k, ok := r.extractBucketKey(resp)
|
|
if !ok || b != "b" || k != "dir/gone.txt" {
|
|
t.Fatalf("got (%q,%q,%v), want (b,dir/gone.txt,true)", b, k, ok)
|
|
}
|
|
}
|
|
|
|
func TestExtractBucketKeyOutsideBucketsSkipped(t *testing.T) {
|
|
r := &Reader{BucketsPath: "/buckets"}
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/etc/something",
|
|
NewEntry: &filer_pb.Entry{Name: "config"},
|
|
},
|
|
}
|
|
if _, _, ok := r.extractBucketKey(resp); ok {
|
|
t.Fatal("expected skip for non-bucket path")
|
|
}
|
|
}
|
|
|
|
func TestExtractBucketKeyBucketCreateAtRoot(t *testing.T) {
|
|
// Creating a new bucket emits an event at /buckets/ with the bucket as
|
|
// the new entry's Name.
|
|
r := &Reader{BucketsPath: "/buckets"}
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/buckets/",
|
|
NewEntry: &filer_pb.Entry{Name: "newbucket"},
|
|
},
|
|
}
|
|
b, k, ok := r.extractBucketKey(resp)
|
|
if !ok || b != "newbucket" || k != "" {
|
|
t.Fatalf("got (%q,%q,%v), want (newbucket,,true)", b, k, ok)
|
|
}
|
|
}
|
|
|
|
func TestExtractBucketKeyBucketCreateBareRoot(t *testing.T) {
|
|
// Some delete and create events emit the parent without a trailing
|
|
// slash. /buckets (no trailing slash) must also resolve to a bucket-
|
|
// root event with the entry name as the bucket.
|
|
r := &Reader{BucketsPath: "/buckets"}
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/buckets",
|
|
NewEntry: &filer_pb.Entry{Name: "newbucket"},
|
|
},
|
|
}
|
|
b, k, ok := r.extractBucketKey(resp)
|
|
if !ok || b != "newbucket" || k != "" {
|
|
t.Fatalf("bare /buckets parent: got (%q,%q,%v), want (newbucket,,true)", b, k, ok)
|
|
}
|
|
}
|
|
|
|
func TestDispatchOneFiltersByShard(t *testing.T) {
|
|
// Pick a bucket+key whose shard is known, set Reader to a different
|
|
// shard, expect skip; same shard, expect emit.
|
|
bucket, key := "mybucket", "thing.txt"
|
|
target := s3lifecycle.ShardID(bucket, key)
|
|
other := (target + 1) % s3lifecycle.ShardCount
|
|
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
TsNs: 12345,
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/buckets/" + bucket,
|
|
NewEntry: &filer_pb.Entry{Name: key},
|
|
},
|
|
}
|
|
|
|
out := make(chan *Event, 1)
|
|
r := &Reader{
|
|
ShardID: other,
|
|
BucketsPath: "/buckets",
|
|
Cursor: NewCursor(),
|
|
Events: out,
|
|
}
|
|
processed := 0
|
|
if err := r.dispatchOne(context.Background(), resp, &processed); err != nil {
|
|
t.Fatalf("dispatchOne err: %v", err)
|
|
}
|
|
if processed != 0 || len(out) != 0 {
|
|
t.Fatalf("wrong-shard event should be skipped: processed=%d emitted=%d", processed, len(out))
|
|
}
|
|
|
|
r.ShardID = target
|
|
if err := r.dispatchOne(context.Background(), resp, &processed); err != nil {
|
|
t.Fatalf("dispatchOne err: %v", err)
|
|
}
|
|
if processed != 1 || len(out) != 1 {
|
|
t.Fatalf("matching-shard event should be emitted: processed=%d emitted=%d", processed, len(out))
|
|
}
|
|
ev := <-out
|
|
if ev.Bucket != bucket || ev.Key != key || ev.TsNs != 12345 {
|
|
t.Fatalf("emitted event mismatch: %+v", ev)
|
|
}
|
|
}
|
|
|
|
func TestDispatchOneRespectsCtxCancel(t *testing.T) {
|
|
// Buffered channel of size 0 (unbuffered) blocks the send; ctx cancel
|
|
// should unblock and return ctx.Err().
|
|
bucket, key := "mybucket", "thing.txt"
|
|
target := s3lifecycle.ShardID(bucket, key)
|
|
resp := &filer_pb.SubscribeMetadataResponse{
|
|
TsNs: 1,
|
|
EventNotification: &filer_pb.EventNotification{
|
|
NewParentPath: "/buckets/" + bucket,
|
|
NewEntry: &filer_pb.Entry{Name: key},
|
|
},
|
|
}
|
|
out := make(chan *Event)
|
|
r := &Reader{
|
|
ShardID: target,
|
|
BucketsPath: "/buckets",
|
|
Cursor: NewCursor(),
|
|
Events: out,
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
processed := 0
|
|
err := r.dispatchOne(ctx, resp, &processed)
|
|
if err != context.Canceled {
|
|
t.Fatalf("expected ctx.Canceled, got %v", err)
|
|
}
|
|
}
|