mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* fix(ec): verify full shard set before deleting source volume (#9490) Before this change, both the worker EC task and the shell ec.encode command would delete the source .dat as soon as MountEcShards returned — even if distribute/mount failed partway, leaving fewer than 14 shards in the cluster. The deletion was logged at V(2), so by the time someone noticed missing data the only trace was a 0-byte .dat synthesized by disk_location at next restart. - Worker path adds Step 6: poll VolumeEcShardsInfo on every destination, union the bitmaps, and refuse to call deleteOriginalVolume unless all TotalShardsCount distinct shard ids are observed. A failed gate leaves the source readonly so the next detection scan can retry. - Shell ec.encode adds the same gate after EcBalance, walking the master topology with collectEcNodeShardsInfo. - VolumeDelete RPC success and .dat/.idx unlinks now log at V(0) so any source destruction is traceable in default-verbosity production logs. The EC-balance-vs-in-flight-encode race is intentionally left for a follow-up; balance should refuse to move shards for a volume whose encode job is not in Completed state. * fix(ec): trim doc comments on the new shard-verification path Drop WHAT-describing godoc on freshly added helpers; keep only the WHY notes (query-error policy in VerifyShardsAcrossServers, the #9490 reference at the call sites). * fix(ec): drop issue-number anchors from new comments Issue references age poorly — the why behind each comment already stands on its own. * fix(ec): parametrize RequireFullShardSet on totalShards Take totalShards as an argument instead of reading the package-level TotalShardsCount constant. The OSS callers continue to pass 14, but the helper is now usable with any DataShards+ParityShards ratio. 
* test(plugin_workers): make fake volume server respond to VolumeEcShardsInfo The new pre-delete verification gate calls VolumeEcShardsInfo on every destination after mount, and the fake server's UnimplementedVolumeServer returns Unimplemented — the verifier read that as zero shards on every node and aborted source deletion. Build the response from recorded mount requests so the integration test exercises the gate end-to-end. * fix(rust/volume): log .dat/.idx unlink with size in remove_volume_files Mirror the Go-side change in weed/storage/volume_write.go: stat each file before removing and emit an info-level log for .dat/.idx so a destructive call is always traceable. The OSS Rust crate previously unlinked them silently. * fix(ec/decode): verify regenerated .dat before deleting EC shards After mountDecodedVolume succeeds, the previous code immediately unmounts and deletes every EC shard. A silent failure in generate or mount could leave the cluster with neither shards nor a valid normal volume. Probe ReadVolumeFileStatus on the target and refuse to proceed if dat or idx is 0 bytes. Also make the fake volume server's VolumeEcShardsInfo reflect whichever shard files exist on disk (seeded for tests as well as mounted via RPC), so the new gate can be exercised end-to-end. * fix(ec): address PR review nits in verification + fake server - Drop unused ServerShardInventory.Sizes field. - Skip shard ids >= MaxShardCount before bitmap Set so the ShardBits bound is explicit (Set already no-ops on overflow, this is for clarity). - Nil-guard the fake server's VolumeEcShardsInfo so a malformed call doesn't panic the test process.
386 lines
12 KiB
Go
386 lines
12 KiB
Go
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"syscall"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
)
|
|
|
|
// Package-level sentinel errors. Compare against them with errors.Is rather
// than by message text.
var (
	ErrorNotFound     = errors.New("not found")
	ErrorDeleted      = errors.New("already deleted")
	ErrorSizeMismatch = errors.New("size mismatch")
)

// IoErrorTolerance is how many consecutive EIOs a volume must see before
// CollectHeartbeat treats the replica as broken. A single transient error is
// forgiven, so a brief NFS / fabric / power blip that hits several replicas
// at once does not cascade into removal of the last healthy copy.
const IoErrorTolerance = 3
|
|
|
|
func (v *Volume) checkReadWriteError(err error) {
|
|
if err == nil {
|
|
v.clearIoError()
|
|
return
|
|
}
|
|
if errors.Is(err, syscall.EIO) {
|
|
v.noteIoError(err)
|
|
return
|
|
}
|
|
// non-EIO error breaks the EIO streak — only sustained EIOs should
|
|
// be treated as a failing volume.
|
|
v.clearIoError()
|
|
}
|
|
|
|
// isFileUnchanged checks whether this needle to write is same as last one.
|
|
// It requires serialized access in the same volume.
|
|
func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
|
|
if v.Ttl.String() != "" {
|
|
return false
|
|
}
|
|
|
|
nv, ok := v.nm.Get(n.Id)
|
|
if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
|
|
oldNeedle := new(needle.Needle)
|
|
err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
|
|
if err != nil {
|
|
glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToActualOffset(), nv.Size, err)
|
|
return false
|
|
}
|
|
if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
|
|
n.DataSize = oldNeedle.DataSize
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ErrVolumeNotEmpty is returned by Destroy when onlyEmpty is requested but the
// volume is not empty.
var ErrVolumeNotEmpty = errors.New("volume not empty")
|
|
|
|
// Destroy removes everything related to this volume. When keepRemoteData is
|
|
// true the cloud-tier object backing the volume is left intact — used by
|
|
// moves where another server is taking over the same .vif.
|
|
func (v *Volume) Destroy(onlyEmpty bool, keepRemoteData bool) (err error) {
|
|
v.dataFileAccessLock.Lock()
|
|
defer v.dataFileAccessLock.Unlock()
|
|
|
|
if onlyEmpty {
|
|
isEmpty, e := v.doIsEmpty()
|
|
if e != nil {
|
|
err = fmt.Errorf("failed to read isEmpty %v", e)
|
|
return
|
|
}
|
|
if !isEmpty {
|
|
err = ErrVolumeNotEmpty
|
|
return
|
|
}
|
|
}
|
|
if !v.isCompactionInProgress.CompareAndSwap(false, true) {
|
|
err = fmt.Errorf("volume %d is compacting", v.Id)
|
|
return
|
|
}
|
|
close(v.asyncRequestsChan)
|
|
if !keepRemoteData {
|
|
storageName, storageKey := v.RemoteStorageNameKey()
|
|
if v.HasRemoteFile() && storageName != "" && storageKey != "" {
|
|
if backendStorage, found := backend.BackendStorages[storageName]; found {
|
|
backendStorage.DeleteFile(storageKey)
|
|
}
|
|
}
|
|
}
|
|
v.doClose()
|
|
removeVolumeFiles(v.DataFileName())
|
|
removeVolumeFiles(v.IndexFileName())
|
|
return
|
|
}
|
|
|
|
func removeVolumeFiles(filename string) {
|
|
// .dat/.idx removals log at V(0) so destructive calls are traceable.
|
|
deleteAndLog := func(ext string) {
|
|
fullFilename := filename + "." + ext
|
|
st, statErr := os.Stat(fullFilename)
|
|
err := os.RemoveAll(fullFilename)
|
|
if err != nil {
|
|
glog.V(0).Infof("failed to remove volume file %s: %s", fullFilename, err)
|
|
return
|
|
}
|
|
if statErr == nil && (ext == "dat" || ext == "idx") {
|
|
glog.Infof("removed volume file %s (size=%d)", fullFilename, st.Size())
|
|
}
|
|
}
|
|
deleteAndLog("dat")
|
|
deleteAndLog("idx")
|
|
deleteAndLog("vif")
|
|
// sorted index file
|
|
deleteAndLog("sdx")
|
|
// compaction
|
|
deleteAndLog("cpd")
|
|
deleteAndLog("cpx")
|
|
// level db index file
|
|
deleteAndLog("ldb")
|
|
// redb index file (Rust volume server)
|
|
deleteAndLog("rdb")
|
|
// marker for damaged or incomplete volume
|
|
deleteAndLog("note")
|
|
}
|
|
|
|
func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) {
|
|
v.asyncRequestsChan <- request
|
|
}
|
|
|
|
func (v *Volume) syncWrite(n *needle.Needle, checkCookie bool) (offset uint64, size Size, isUnchanged bool, err error) {
|
|
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
|
v.dataFileAccessLock.Lock()
|
|
defer v.dataFileAccessLock.Unlock()
|
|
|
|
return v.doWriteRequest(n, checkCookie)
|
|
}
|
|
|
|
func (v *Volume) writeNeedle2(n *needle.Needle, checkCookie bool, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
|
|
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
|
if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL {
|
|
n.SetHasTtl()
|
|
n.Ttl = v.Ttl
|
|
}
|
|
|
|
if !fsync {
|
|
return v.syncWrite(n, checkCookie)
|
|
} else {
|
|
asyncRequest := needle.NewAsyncRequest(n, true)
|
|
// using len(n.Data) here instead of n.Size before n.Size is populated in n.Append()
|
|
asyncRequest.ActualSize = needle.GetActualSize(Size(len(n.Data)), v.Version())
|
|
|
|
v.asyncRequestAppend(asyncRequest)
|
|
offset, _, isUnchanged, err = asyncRequest.WaitComplete()
|
|
|
|
return
|
|
}
|
|
}
|
|
|
|
func (v *Volume) doWriteRequest(n *needle.Needle, checkCookie bool) (offset uint64, size Size, isUnchanged bool, err error) {
|
|
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
|
if v.isFileUnchanged(n) {
|
|
size = Size(n.DataSize)
|
|
isUnchanged = true
|
|
return
|
|
}
|
|
|
|
// check whether existing needle cookie matches
|
|
nv, ok := v.nm.Get(n.Id)
|
|
if ok {
|
|
existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
|
|
if existingNeedleReadErr != nil {
|
|
err = fmt.Errorf("reading existing needle: %w", existingNeedleReadErr)
|
|
return
|
|
}
|
|
if n.Cookie == 0 && !checkCookie {
|
|
// this is from batch deletion, and read back again when tailing a remote volume
|
|
// which only happens when checkCookie == false and fsync == false
|
|
n.Cookie = existingNeedle.Cookie
|
|
}
|
|
if existingNeedle.Cookie != n.Cookie {
|
|
glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
|
|
needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
|
|
err = fmt.Errorf("mismatching cookie %x", n.Cookie)
|
|
return
|
|
}
|
|
}
|
|
|
|
// append to dat file
|
|
n.UpdateAppendAtNs(v.lastAppendAtNs)
|
|
var actualSize int64
|
|
offset, size, actualSize, err = n.Append(v.DataBackend, v.Version())
|
|
v.checkReadWriteError(err)
|
|
if err != nil {
|
|
err = fmt.Errorf("append to volume %d size %d actualSize %d: %v", v.Id, size, actualSize, err)
|
|
return
|
|
}
|
|
v.lastAppendAtNs = n.AppendAtNs
|
|
|
|
// add to needle map
|
|
if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
|
|
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
|
|
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
|
|
}
|
|
}
|
|
if v.lastModifiedTsSeconds < n.LastModified {
|
|
v.lastModifiedTsSeconds = n.LastModified
|
|
}
|
|
return
|
|
}
|
|
|
|
func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
|
|
// glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
|
v.dataFileAccessLock.Lock()
|
|
defer v.dataFileAccessLock.Unlock()
|
|
|
|
if v.nm == nil {
|
|
return 0, nil
|
|
}
|
|
|
|
return v.doDeleteRequest(n)
|
|
}
|
|
|
|
func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
|
|
// todo: delete info is always appended no fsync, it may need fsync in future
|
|
fsync := false
|
|
|
|
if !fsync {
|
|
return v.syncDelete(n)
|
|
} else {
|
|
asyncRequest := needle.NewAsyncRequest(n, false)
|
|
asyncRequest.ActualSize = needle.GetActualSize(0, v.Version())
|
|
|
|
v.asyncRequestAppend(asyncRequest)
|
|
_, size, _, err := asyncRequest.WaitComplete()
|
|
|
|
return Size(size), err
|
|
}
|
|
}
|
|
|
|
func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
|
|
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
|
nv, ok := v.nm.Get(n.Id)
|
|
// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
|
|
if ok && !nv.Size.IsDeleted() {
|
|
var offset uint64
|
|
var err error
|
|
size := nv.Size
|
|
if !v.hasRemoteFile {
|
|
n.Data = nil
|
|
n.UpdateAppendAtNs(v.lastAppendAtNs)
|
|
offset, _, _, err = n.Append(v.DataBackend, v.Version())
|
|
v.checkReadWriteError(err)
|
|
if err != nil {
|
|
return size, err
|
|
}
|
|
}
|
|
v.lastAppendAtNs = n.AppendAtNs
|
|
if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
|
|
return size, err
|
|
}
|
|
return size, err
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
// startWorker launches the goroutine that serves requests queued on
// v.asyncRequestsChan by asyncRequestAppend (writeNeedle2 with fsync, and the
// fsync branch of deleteNeedle2). The goroutine exits once the channel has
// been closed, which Destroy does.
//
// Each pass of the outer loop does the following:
//  1. Collect a batch. It blocks for the first request, then keeps taking
//     requests until 128 are batched, their ActualSize totals at least 4 MiB,
//     or the channel is momentarily empty.
//  2. Under dataFileAccessLock, record the current data-file size, apply the
//     batch in order with doWriteRequest / doDeleteRequest, and sync the data
//     file once for the whole batch.
//  3. If the sync fails, truncate the data file back to the recorded size and
//     turn every request that had succeeded into a failure carrying the sync
//     error.
//  4. Release every result with Submit, still holding the lock.
func (v *Volume) startWorker() {
	go func() {
		chanClosed := false
		for {
			// chan closed. go thread will exit
			if chanClosed {
				break
			}
			currentRequests := make([]*needle.AsyncRequest, 0, 128)
			currentBytesToWrite := int64(0)
			for {
				request, ok := <-v.asyncRequestsChan
				// volume may be closed
				if !ok {
					chanClosed = true
					break
				}
				// Admitting this request could push the volume past
				// MaxPossibleVolumeSize: fail just this request and stop
				// collecting. Requests already batched are still applied below.
				if MaxPossibleVolumeSize < v.ContentSize()+uint64(currentBytesToWrite+request.ActualSize) {
					request.Complete(0, 0, false,
						fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.ContentSize()))
					break
				}
				currentRequests = append(currentRequests, request)
				currentBytesToWrite += request.ActualSize
				// submit at most 4M bytes or 128 requests at one time to decrease request delay.
				// it also need to break if there is no data in channel to avoid io hang.
				if currentBytesToWrite >= 4*1024*1024 || len(currentRequests) >= 128 || len(v.asyncRequestsChan) == 0 {
					break
				}
			}
			// The batch can be empty when the channel closed or the first
			// request was rejected by the size check above.
			if len(currentRequests) == 0 {
				continue
			}
			v.dataFileAccessLock.Lock()
			// end is the data-file size before this batch; it is the rollback
			// point if the sync below fails.
			end, _, e := v.DataBackend.GetStat()
			if e != nil {
				// Without a rollback point the batch cannot be applied safely,
				// so every request in it is failed.
				for i := 0; i < len(currentRequests); i++ {
					currentRequests[i].Complete(0, 0, false,
						fmt.Errorf("cannot read current volume position: %v", e))
				}
				v.dataFileAccessLock.Unlock()
				continue
			}

			for i := 0; i < len(currentRequests); i++ {
				if currentRequests[i].IsWriteRequest {
					// NOTE: queued writes are always applied with checkCookie=true;
					// AsyncRequest does not carry the caller's checkCookie.
					offset, size, isUnchanged, err := v.doWriteRequest(currentRequests[i].N, true)
					currentRequests[i].UpdateResult(offset, uint64(size), isUnchanged, err)
				} else {
					size, err := v.doDeleteRequest(currentRequests[i].N)
					currentRequests[i].UpdateResult(0, uint64(size), false, err)
				}
			}

			// if sync error, data is not reliable, we should mark the completed request as fail and rollback
			if err := v.DataBackend.Sync(); err != nil {
				// todo: this may generate dirty data or cause data inconsistent, may be weed need to panic?
				if te := v.DataBackend.Truncate(end); te != nil {
					glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", v.DataBackend.Name(), end, te)
				}
				for i := 0; i < len(currentRequests); i++ {
					if currentRequests[i].IsSucceed() {
						currentRequests[i].UpdateResult(0, 0, false, err)
					}
				}
			}

			// Results are released only now, after the sync outcome is known.
			for i := 0; i < len(currentRequests); i++ {
				currentRequests[i].Submit()
			}
			v.dataFileAccessLock.Unlock()
		}
	}()
}
|
|
|
|
func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error {
|
|
|
|
v.dataFileAccessLock.Lock()
|
|
defer v.dataFileAccessLock.Unlock()
|
|
|
|
if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) {
|
|
return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
|
|
}
|
|
|
|
nv, ok := v.nm.Get(needleId)
|
|
if ok && nv.Size == size {
|
|
oldNeedle := new(needle.Needle)
|
|
err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
|
|
if err == nil {
|
|
newNeedle := new(needle.Needle)
|
|
err = newNeedle.ReadBytes(needleBlob, nv.Offset.ToActualOffset(), size, v.Version())
|
|
if err == nil && oldNeedle.Cookie == newNeedle.Cookie && oldNeedle.Checksum == newNeedle.Checksum && bytes.Equal(oldNeedle.Data, newNeedle.Data) {
|
|
glog.V(0).Infof("needle %v already exists", needleId)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
appendAtNs := needle.GetAppendAtNs(v.lastAppendAtNs)
|
|
offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
|
|
|
|
v.checkReadWriteError(err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v.lastAppendAtNs = appendAtNs
|
|
|
|
// add to needle map
|
|
if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil {
|
|
glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err)
|
|
}
|
|
|
|
return err
|
|
}
|