mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* feat(s3/lifecycle): filer-backed cursor Persister FilerPersister persists per-shard cursor maps as JSON to /etc/s3/lifecycle/cursors/shard-NN.json via filer.SaveInsideFiler. One file per shard keeps Save atomic — the filer writes the entry in a single mutation, so a crash mid-write doesn't leak partial state. Pipeline.Run loads on start; the periodic checkpoint and graceful-shutdown save go through this implementation. A small FilerStore interface wraps the SeaweedFilerClient surface the persister needs, so tests inject an in-memory fake instead of mocking the full gRPC client. * refactor(s3/lifecycle): drop BlockerStore — durable cursor IS the block A frozen cursor doesn't advance, so the durable cursor (FilerPersister) encodes the blocked state on its own. On worker restart the reader re-encounters the poison event at MinTsNs, the dispatcher walks the same retry budget to BLOCKED, and the cursor freezes at the same EventTs. Other in-flight events between freeze tsNs and prior cursor positions self-resolve via NOOP_RESOLVED (STALE_IDENTITY) since the underlying objects were already deleted on the prior pass. Removed: - BlockerStore interface + InMemoryBlockerStore + BlockerRecord - Dispatcher.Blockers + Dispatcher.ReplayBlockers - the BlockerStore.Put call in handleBlocked - Pipeline.Blockers field + the ReplayBlockers call on startup Added a TestDispatchRestartReFreezesNaturally that pins the self-recovery property: a fresh Dispatcher with a fresh Cursor, fed the same poison event, reaches the same frozen state at the same EventTs without any durable blocker store. Operator visibility: a cursor whose MinTsNs hasn't advanced is the signal — surfaced via the durable cursor file. * refactor(filer): SaveInsideFiler accepts ctx ReadInsideFiler already takes ctx; SaveInsideFiler used context.Background() internally and silently dropped the caller's ctx. Symmetric API now; cancellation/deadlines propagate through LookupEntry / CreateEntry / UpdateEntry. Mechanical update of all callers — most pass context.Background() since the existing call sites have no ctx in scope. * fix(s3/lifecycle): deterministic order in cursor save Iterating Go maps yields random order, so json.Encode produced a different byte sequence on each save even when the state hadn't changed. Sort entries by (Bucket, ActionKind, RuleHash) before encoding so the on-disk file diffs cleanly. New test pins byte-identical output across two saves of the same map. * fix(s3/lifecycle): log reason when freezing cursor in handleBlocked handleBlocked dropped the reason via _ = reason with a comment claiming the caller logged it; none of the three callers do. A frozen cursor is the only surface where the operator finds out something stuck, so the reason has to land somewhere. glog.Warningf with shard, key, eventTs, and the original reason — same shape the rest of the package uses.
80 lines
2.1 KiB
Go
80 lines
2.1 KiB
Go
package filer
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
)
|
|
|
|
func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.SeaweedFilerClient, dir, name string, byteBuffer *bytes.Buffer) error {
|
|
|
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: dir,
|
|
Name: name,
|
|
}
|
|
respLookupEntry, err := filer_pb.LookupEntry(context.Background(), filerClient, request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(respLookupEntry.Entry.Content) > 0 {
|
|
_, err = byteBuffer.Write(respLookupEntry.Entry.Content)
|
|
return err
|
|
}
|
|
|
|
return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.GetChunks(), 0, int64(FileSize(respLookupEntry.Entry)))
|
|
|
|
}
|
|
|
|
func ReadInsideFiler(ctx context.Context, filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) {
|
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: dir,
|
|
Name: name,
|
|
}
|
|
respLookupEntry, err := filer_pb.LookupEntry(ctx, filerClient, request)
|
|
if err != nil {
|
|
return
|
|
}
|
|
content = respLookupEntry.Entry.Content
|
|
return
|
|
}
|
|
|
|
func SaveInsideFiler(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
|
|
|
|
resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: dir,
|
|
Name: name,
|
|
})
|
|
|
|
if err == filer_pb.ErrNotFound {
|
|
err = filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{
|
|
Directory: dir,
|
|
Entry: &filer_pb.Entry{
|
|
Name: name,
|
|
IsDirectory: false,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
Mtime: time.Now().Unix(),
|
|
Crtime: time.Now().Unix(),
|
|
FileMode: uint32(0644),
|
|
FileSize: uint64(len(content)),
|
|
},
|
|
Content: content,
|
|
},
|
|
SkipCheckParentDirectory: false,
|
|
})
|
|
} else if err == nil {
|
|
entry := resp.Entry
|
|
entry.Content = content
|
|
entry.Attributes.Mtime = time.Now().Unix()
|
|
entry.Attributes.FileSize = uint64(len(content))
|
|
err = filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
|
|
Directory: dir,
|
|
Entry: entry,
|
|
})
|
|
}
|
|
|
|
return err
|
|
}
|