mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-22 09:41:28 +00:00
* feat(s3/lifecycle): filer-backed cursor Persister FilerPersister persists per-shard cursor maps as JSON to /etc/s3/lifecycle/cursors/shard-NN.json via filer.SaveInsideFiler. One file per shard keeps Save atomic — the filer writes the entry in a single mutation, so a crash mid-write doesn't leak partial state. Pipeline.Run loads on start; the periodic checkpoint and graceful-shutdown save go through this implementation. A small FilerStore interface wraps the SeaweedFilerClient surface the persister needs, so tests inject an in-memory fake instead of mocking the full gRPC client. * refactor(s3/lifecycle): drop BlockerStore — durable cursor IS the block A frozen cursor doesn't advance, so the durable cursor (FilerPersister) encodes the blocked state on its own. On worker restart the reader re-encounters the poison event at MinTsNs, the dispatcher walks the same retry budget to BLOCKED, and the cursor freezes at the same EventTs. Other in-flight events between freeze tsNs and prior cursor positions self-resolve via NOOP_RESOLVED (STALE_IDENTITY) since the underlying objects were already deleted on the prior pass. Removed: - BlockerStore interface + InMemoryBlockerStore + BlockerRecord - Dispatcher.Blockers + Dispatcher.ReplayBlockers - the BlockerStore.Put call in handleBlocked - Pipeline.Blockers field + the ReplayBlockers call on startup Added a TestDispatchRestartReFreezesNaturally that pins the self-recovery property: a fresh Dispatcher with a fresh Cursor, fed the same poison event, reaches the same frozen state at the same EventTs without any durable blocker store. Operator visibility: a cursor whose MinTsNs hasn't advanced is the signal — surfaced via the durable cursor file. * refactor(filer): SaveInsideFiler accepts ctx ReadInsideFiler already takes ctx; SaveInsideFiler used context.Background() internally and silently dropped the caller's ctx. Symmetric API now; cancellation/deadlines propagate through LookupEntry / CreateEntry / UpdateEntry. Mechanical update of all callers — most pass context.Background() since the existing call sites have no ctx in scope. * fix(s3/lifecycle): deterministic order in cursor save Iterating Go maps yields random order, so json.Encode produced a different byte sequence on each save even when the state hadn't changed. Sort entries by (Bucket, ActionKind, RuleHash) before encoding so the on-disk file diffs cleanly. New test pins byte-identical output across two saves of the same map. * fix(s3/lifecycle): log reason when freezing cursor in handleBlocked handleBlocked dropped the reason via _ = reason with a comment claiming the caller logged it; none of the three callers do. A frozen cursor is the only surface where the operator finds out something stuck, so the reason has to land somewhere. glog.Warningf with shard, key, eventTs, and the original reason — same shape the rest of the package uses.
126 lines
3.8 KiB
Go
126 lines
3.8 KiB
Go
package filer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
|
|
var oldContent []byte
|
|
if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
|
oldContent, readErr = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
|
|
return readErr
|
|
}); readErr != nil {
|
|
if readErr != filer_pb.ErrNotFound {
|
|
return nil, fmt.Errorf("read existing mapping: %w", readErr)
|
|
}
|
|
oldContent = nil
|
|
}
|
|
mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("unmarshal mappings: %w", readErr)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error) {
|
|
|
|
// read current mapping
|
|
var oldContent, newContent []byte
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
oldContent, err = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
if err != filer_pb.ErrNotFound {
|
|
return fmt.Errorf("read existing mapping: %w", err)
|
|
}
|
|
}
|
|
|
|
// add new mapping
|
|
newContent, err = addRemoteStorageMapping(oldContent, dir, remoteStorageLocation)
|
|
if err != nil {
|
|
return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err)
|
|
}
|
|
|
|
// save back
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return SaveInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("save mapping: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error) {
|
|
|
|
// read current mapping
|
|
var oldContent, newContent []byte
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
oldContent, err = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
if err != filer_pb.ErrNotFound {
|
|
return fmt.Errorf("read existing mapping: %w", err)
|
|
}
|
|
}
|
|
|
|
// add new mapping
|
|
newContent, err = removeRemoteStorageMapping(oldContent, dir)
|
|
if err != nil {
|
|
return fmt.Errorf("delete mount %s: %v", dir, err)
|
|
}
|
|
|
|
// save back
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return SaveInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("save mapping: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func addRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) {
|
|
mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
|
|
if unmarshalErr != nil {
|
|
// skip
|
|
}
|
|
|
|
// set the new mapping
|
|
mappings.Mappings[dir] = storageLocation
|
|
|
|
if newContent, err = proto.Marshal(mappings); err != nil {
|
|
return oldContent, fmt.Errorf("marshal mappings: %w", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func removeRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
|
|
mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
|
|
if unmarshalErr != nil {
|
|
return nil, unmarshalErr
|
|
}
|
|
|
|
// set the new mapping
|
|
delete(mappings.Mappings, dir)
|
|
|
|
if newContent, err = proto.Marshal(mappings); err != nil {
|
|
return oldContent, fmt.Errorf("marshal mappings: %w", err)
|
|
}
|
|
|
|
return
|
|
}
|