Files
seaweedfs/weed/filer/remote_mapping.go
Chris Lu 35e3fe89bc feat(s3/lifecycle): filer-backed cursor Persister + drop BlockerStore (#9358)
* feat(s3/lifecycle): filer-backed cursor Persister

FilerPersister persists per-shard cursor maps as JSON to
/etc/s3/lifecycle/cursors/shard-NN.json via filer.SaveInsideFiler.
One file per shard keeps Save atomic — the filer writes the entry
in a single mutation, so a crash mid-write doesn't leak partial
state. Pipeline.Run loads on start; the periodic checkpoint and
graceful-shutdown save go through this implementation.

A small FilerStore interface wraps the SeaweedFilerClient surface
the persister needs, so tests inject an in-memory fake instead of
mocking the full gRPC client.

* refactor(s3/lifecycle): drop BlockerStore — durable cursor IS the block

A frozen cursor doesn't advance, so the durable cursor (FilerPersister)
encodes the blocked state on its own. On worker restart the reader
re-encounters the poison event at MinTsNs, the dispatcher walks the
same retry budget to BLOCKED, and the cursor freezes at the same
EventTs. Other in-flight events between freeze tsNs and prior cursor
positions self-resolve via NOOP_RESOLVED (STALE_IDENTITY) since the
underlying objects were already deleted on the prior pass.

Removed:
  - BlockerStore interface + InMemoryBlockerStore + BlockerRecord
  - Dispatcher.Blockers + Dispatcher.ReplayBlockers
  - the BlockerStore.Put call in handleBlocked
  - Pipeline.Blockers field + the ReplayBlockers call on startup

Added a TestDispatchRestartReFreezesNaturally that pins the
self-recovery property: a fresh Dispatcher with a fresh Cursor, fed
the same poison event, reaches the same frozen state at the same
EventTs without any durable blocker store.

Operator visibility: a cursor whose MinTsNs hasn't advanced is the
signal — surfaced via the durable cursor file.

* refactor(filer): SaveInsideFiler accepts ctx

ReadInsideFiler already takes ctx; SaveInsideFiler used context.Background()
internally and silently dropped the caller's ctx. Symmetric API now;
cancellation/deadlines propagate through LookupEntry / CreateEntry /
UpdateEntry. Mechanical update of all callers — most pass
context.Background() since the existing call sites have no ctx in scope.

* fix(s3/lifecycle): deterministic order in cursor save

Iterating Go maps yields random order, so json.Encode produced a different
byte sequence on each save even when the state hadn't changed. Sort
entries by (Bucket, ActionKind, RuleHash) before encoding so the on-disk
file diffs cleanly. New test pins byte-identical output across two saves
of the same map.

* fix(s3/lifecycle): log reason when freezing cursor in handleBlocked

handleBlocked dropped the reason via _ = reason with a comment claiming
the caller logged it; none of the three callers do. A frozen cursor is
the only surface where the operator finds out something stuck, so the
reason has to land somewhere. glog.Warningf with shard, key, eventTs,
and the original reason — same shape the rest of the package uses.
2026-05-07 17:45:04 -07:00

126 lines
3.8 KiB
Go

package filer
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return readErr
}); readErr != nil {
if readErr != filer_pb.ErrNotFound {
return nil, fmt.Errorf("read existing mapping: %w", readErr)
}
oldContent = nil
}
mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
if readErr != nil {
return nil, fmt.Errorf("unmarshal mappings: %w", readErr)
}
return
}
func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error) {
// read current mapping
var oldContent, newContent []byte
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
oldContent, err = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return err
})
if err != nil {
if err != filer_pb.ErrNotFound {
return fmt.Errorf("read existing mapping: %w", err)
}
}
// add new mapping
newContent, err = addRemoteStorageMapping(oldContent, dir, remoteStorageLocation)
if err != nil {
return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err)
}
// save back
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return SaveInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
})
if err != nil {
return fmt.Errorf("save mapping: %w", err)
}
return nil
}
func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error) {
// read current mapping
var oldContent, newContent []byte
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
oldContent, err = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return err
})
if err != nil {
if err != filer_pb.ErrNotFound {
return fmt.Errorf("read existing mapping: %w", err)
}
}
// add new mapping
newContent, err = removeRemoteStorageMapping(oldContent, dir)
if err != nil {
return fmt.Errorf("delete mount %s: %v", dir, err)
}
// save back
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return SaveInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
})
if err != nil {
return fmt.Errorf("save mapping: %w", err)
}
return nil
}
func addRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) {
mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
if unmarshalErr != nil {
// skip
}
// set the new mapping
mappings.Mappings[dir] = storageLocation
if newContent, err = proto.Marshal(mappings); err != nil {
return oldContent, fmt.Errorf("marshal mappings: %w", err)
}
return
}
func removeRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
if unmarshalErr != nil {
return nil, unmarshalErr
}
// set the new mapping
delete(mappings.Mappings, dir)
if newContent, err = proto.Marshal(mappings); err != nil {
return oldContent, fmt.Errorf("marshal mappings: %w", err)
}
return
}