mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-18 07:41:31 +00:00
* fix(ec): blanket-clean every destination over the full shard range The previous cleanup pass walked t.sources only, with the shard ids the topology had reported at detection time. In the wild, a destination can end up with EC shards mounted that the topology snapshot didn't list — shards on a sibling disk that hadn't heartbeated, or shards left over from a concurrent attempt's mount step. FindEcVolume still returns true, so the next ReceiveFile trips the mounted-volume guard. Cleanup now unions t.sources (with ShardIds) and t.targets and issues unmount + delete over [0..totalShards-1] on each. Both RPCs are idempotent on missing shards, so the wider sweep is free. Two new tests cover the gap: shards mounted beyond what t.sources lists, and a target-only destination with no source row. * log(ec): include disk_id in EC unmount/delete/refusal log lines The current logs identify the volume and shard but leave disk_id off, which makes the cross-server cleanup story hard to follow when multiple disks of one server hold pieces of the same volume: UnmountEcShards 4121.1 -> add disk_id ec volume video-recordings_4121 shard delete [1 5] -> add per-loc disk_id volume server X:Y deletes ec shards from 4121 [...] -> add disk_id ReceiveFile: ec volume 4121 is mounted; refusing... -> add disk_ids ReceiveFile's refusal now names the disk_ids actually holding the mount so operators can see whether the next cleanup pass needs to target a sibling disk. Added Store.FindEcVolumeDiskIds / Store::find_ec_volume_disk_ids as the supporting primitive. Mirrored in seaweed-volume/src/ (unmount log in Store::unmount_ec_shard, heartbeat delete log in diff_ec_shard_delta_messages, refusal in the ReceiveFile handler). * test(ec): stub VolumeEcShardsUnmount/Delete on the fake volume server The plugin-worker EC tests boot a fake volume server that embeds UnimplementedVolumeServerServer. After the worker started calling VolumeEcShardsUnmount + VolumeEcShardsDelete pre-distribute, the default Unimplemented response surfaced as fourteen "method not implemented" errors and TestErasureCodingExecutionEncodesShards failed. Both RPCs are no-ops here — nothing on the fake server has mounted state or persisted shard files to remove.
855 lines
30 KiB
Go
855 lines
30 KiB
Go
package erasure_coding
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
storagetypes "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// ErasureCodingTask implements the Task interface
|
|
type ErasureCodingTask struct {
|
|
*base.BaseTask
|
|
server string
|
|
volumeID uint32
|
|
collection string
|
|
workDir string
|
|
progress float64
|
|
grpcDialOption grpc.DialOption
|
|
|
|
// EC parameters
|
|
dataShards int32
|
|
parityShards int32
|
|
sourceDiskType string // source volume's disk type, forwarded to Mount RPC (#9423)
|
|
targets []*worker_pb.TaskTarget // Unified targets for EC shards
|
|
sources []*worker_pb.TaskSource // Unified sources for cleanup
|
|
shardAssignment map[string][]string // destination -> assigned shard types
|
|
}
|
|
|
|
// NewErasureCodingTask creates a new unified EC task instance
|
|
func NewErasureCodingTask(id string, server string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *ErasureCodingTask {
|
|
return &ErasureCodingTask{
|
|
BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding),
|
|
server: server,
|
|
volumeID: volumeID,
|
|
collection: collection,
|
|
dataShards: erasure_coding.DataShardsCount, // Default values
|
|
parityShards: erasure_coding.ParityShardsCount, // Default values
|
|
grpcDialOption: grpcDialOption,
|
|
}
|
|
}
|
|
|
|
// Execute implements the UnifiedTask interface
|
|
func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
|
|
if params == nil {
|
|
return fmt.Errorf("task parameters are required")
|
|
}
|
|
|
|
ecParams := params.GetErasureCodingParams()
|
|
if ecParams == nil {
|
|
return fmt.Errorf("erasure coding parameters are required")
|
|
}
|
|
|
|
t.dataShards = ecParams.DataShards
|
|
t.parityShards = ecParams.ParityShards
|
|
t.sourceDiskType = ecParams.SourceDiskType
|
|
t.workDir = ecParams.WorkingDir
|
|
t.targets = params.Targets // Get unified targets
|
|
t.sources = params.Sources // Get unified sources
|
|
|
|
// Log detailed task information
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"server": t.server,
|
|
"collection": t.collection,
|
|
"data_shards": t.dataShards,
|
|
"parity_shards": t.parityShards,
|
|
"total_shards": t.dataShards + t.parityShards,
|
|
"targets": len(t.targets),
|
|
"sources": len(t.sources),
|
|
}).Info("Starting erasure coding task")
|
|
|
|
// Log detailed target server assignments
|
|
for i, target := range t.targets {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"target_index": i,
|
|
"server": target.Node,
|
|
"shard_ids": target.ShardIds,
|
|
"shard_count": len(target.ShardIds),
|
|
}).Info("Target server shard assignment")
|
|
}
|
|
|
|
// Log source information
|
|
for i, source := range t.sources {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"source_index": i,
|
|
"server": source.Node,
|
|
"volume_id": source.VolumeId,
|
|
"disk_id": source.DiskId,
|
|
"rack": source.Rack,
|
|
"data_center": source.DataCenter,
|
|
}).Info("Source server information")
|
|
}
|
|
|
|
// Use the working directory from task parameters, or fall back to a default
|
|
baseWorkDir := ecParams.WorkingDir
|
|
if baseWorkDir == "" {
|
|
baseWorkDir = t.GetWorkingDir()
|
|
}
|
|
taskWorkDir := filepath.Join(baseWorkDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
|
|
if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
|
|
}
|
|
glog.V(1).Infof("Created working directory: %s", taskWorkDir)
|
|
|
|
// Update the task's working directory to the specific instance directory
|
|
t.workDir = taskWorkDir
|
|
glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)
|
|
|
|
// Ensure cleanup of working directory
|
|
defer func() {
|
|
// Clean up volume files and EC shards
|
|
patterns := []string{"*.dat", "*.idx", "*.ec*", "*.vif"}
|
|
for _, pattern := range patterns {
|
|
matches, err := filepath.Glob(filepath.Join(taskWorkDir, pattern))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, match := range matches {
|
|
if err := os.Remove(match); err != nil {
|
|
glog.V(2).Infof("Could not remove %s: %v", match, err)
|
|
}
|
|
}
|
|
}
|
|
// Remove the entire working directory
|
|
if err := os.RemoveAll(taskWorkDir); err != nil {
|
|
glog.V(2).Infof("Could not remove working directory %s: %v", taskWorkDir, err)
|
|
} else {
|
|
glog.V(1).Infof("Cleaned up working directory: %s", taskWorkDir)
|
|
}
|
|
}()
|
|
|
|
// Step 1: Mark volume readonly
|
|
t.ReportProgressWithStage(10.0, "Marking volume readonly")
|
|
t.GetLogger().Info("Marking volume readonly")
|
|
if err := t.markVolumeReadonly(ctx); err != nil {
|
|
return fmt.Errorf("failed to mark volume readonly: %v", err)
|
|
}
|
|
|
|
// Step 2: Copy volume files to worker
|
|
// The .idx and .dat are copied as separate network transfers, with .idx
|
|
// copied first. If a write lands on the source after the .idx copy, the
|
|
// .dat will include extra data not referenced by .idx (harmless).
|
|
// verifyDatIdxConsistency() in generateEcShardsLocally catches the reverse
|
|
// case where .idx references data past .dat.
|
|
t.ReportProgressWithStage(25.0, "Copying volume files to worker")
|
|
t.GetLogger().Info("Copying volume files to worker")
|
|
localFiles, err := t.copyVolumeFilesToWorker(ctx, taskWorkDir)
|
|
if err != nil {
|
|
t.rollbackReadonly(ctx)
|
|
return fmt.Errorf("failed to copy volume files: %v", err)
|
|
}
|
|
|
|
// Step 3: Generate EC shards locally
|
|
t.ReportProgressWithStage(40.0, "Generating EC shards locally")
|
|
t.GetLogger().Info("Generating EC shards locally")
|
|
shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
|
|
if err != nil {
|
|
t.rollbackReadonly(ctx)
|
|
return fmt.Errorf("failed to generate EC shards: %v", err)
|
|
}
|
|
|
|
// Clear partial EC shards left over on destinations from a prior failed
|
|
// encode so distributeEcShards' ReceiveFile is not refused by the
|
|
// mounted-volume guard.
|
|
t.ReportProgressWithStage(55.0, "Clearing stale EC shards on destinations")
|
|
t.GetLogger().Info("Clearing stale EC shards on destinations")
|
|
if err := t.cleanupStaleEcShards(ctx); err != nil {
|
|
t.rollbackReadonly(ctx)
|
|
return fmt.Errorf("failed to clear stale EC shards on destinations: %v", err)
|
|
}
|
|
|
|
// Step 4: Distribute shards to destinations
|
|
t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations")
|
|
t.GetLogger().Info("Distributing EC shards to destinations")
|
|
if err := t.distributeEcShards(shardFiles); err != nil {
|
|
return fmt.Errorf("failed to distribute EC shards: %v", err)
|
|
}
|
|
|
|
// Step 5: Mount EC shards
|
|
t.ReportProgressWithStage(80.0, "Mounting EC shards")
|
|
t.GetLogger().Info("Mounting EC shards")
|
|
if err := t.mountEcShards(); err != nil {
|
|
return fmt.Errorf("failed to mount EC shards: %v", err)
|
|
}
|
|
|
|
// Without this gate, a partial distribute/mount lets the next step
|
|
// zero the only intact .dat while the cluster is missing shards.
|
|
t.ReportProgressWithStage(85.0, "Verifying EC shards across destinations")
|
|
t.GetLogger().Info("Verifying EC shards across destinations")
|
|
if err := t.verifyEcShardsBeforeDelete(ctx); err != nil {
|
|
return fmt.Errorf("EC shard verification failed; refusing to delete source volume %d: %w", t.volumeID, err)
|
|
}
|
|
|
|
// Step 7: Delete original volume
|
|
t.ReportProgressWithStage(90.0, "Deleting original volume")
|
|
t.GetLogger().Info("Deleting original volume")
|
|
if err := t.deleteOriginalVolume(ctx); err != nil {
|
|
return fmt.Errorf("failed to delete original volume: %v", err)
|
|
}
|
|
|
|
t.ReportProgressWithStage(100.0, "EC processing complete")
|
|
glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
|
|
t.volumeID, t.server, len(shardFiles))
|
|
|
|
return nil
|
|
}
|
|
|
|
// Validate implements the UnifiedTask interface
|
|
func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
|
|
if params == nil {
|
|
return fmt.Errorf("task parameters are required")
|
|
}
|
|
|
|
ecParams := params.GetErasureCodingParams()
|
|
if ecParams == nil {
|
|
return fmt.Errorf("erasure coding parameters are required")
|
|
}
|
|
|
|
if params.VolumeId != t.volumeID {
|
|
return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
|
|
}
|
|
|
|
// Validate that at least one source matches our server
|
|
found := false
|
|
for _, source := range params.Sources {
|
|
if source.Node == t.server {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
return fmt.Errorf("no source matches expected server %s", t.server)
|
|
}
|
|
|
|
if ecParams.DataShards < 1 {
|
|
return fmt.Errorf("invalid data shards: %d (must be >= 1)", ecParams.DataShards)
|
|
}
|
|
|
|
if ecParams.ParityShards < 1 {
|
|
return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards)
|
|
}
|
|
|
|
if len(params.Targets) < int(ecParams.DataShards+ecParams.ParityShards) {
|
|
return fmt.Errorf("insufficient targets: got %d, need %d", len(params.Targets), ecParams.DataShards+ecParams.ParityShards)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// EstimateTime implements the UnifiedTask interface
|
|
func (t *ErasureCodingTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
|
|
// Basic estimate based on simulated steps
|
|
return 20 * time.Second // Sum of all step durations
|
|
}
|
|
|
|
// GetProgress returns current progress
|
|
func (t *ErasureCodingTask) GetProgress() float64 {
|
|
return t.progress
|
|
}
|
|
|
|
// Helper methods for actual EC operations
|
|
|
|
// markVolumeReadonly marks the volume as readonly on the source server
|
|
func (t *ErasureCodingTask) markVolumeReadonly(ctx context.Context) error {
|
|
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
_, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
return err
|
|
})
|
|
}
|
|
|
|
// rollbackReadonly is a best-effort rollback of markVolumeReadonly, used when the
|
|
// EC task fails before any shards are distributed. Logs but does not return errors.
|
|
// Uses a fresh context with timeout since the caller's ctx may already be cancelled.
|
|
func (t *ErasureCodingTask) rollbackReadonly(_ context.Context) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
if err := t.markVolumeWritable(ctx); err != nil {
|
|
glog.Warningf("failed to restore volume %d to writable after EC task failure: %v", t.volumeID, err)
|
|
} else {
|
|
glog.V(0).Infof("restored volume %d to writable after EC task failure", t.volumeID)
|
|
}
|
|
}
|
|
|
|
// markVolumeWritable restores the volume to writable on the source server.
|
|
func (t *ErasureCodingTask) markVolumeWritable(ctx context.Context) error {
|
|
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
_, err := client.VolumeMarkWritable(ctx, &volume_server_pb.VolumeMarkWritableRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
return err
|
|
})
|
|
}
|
|
|
|
// copyVolumeFilesToWorker copies .idx and .dat files from source server to local worker.
|
|
// The .idx is copied first, then .dat. Both copies are capped to the sizes reported by
|
|
// ReadVolumeFileStatus. If a write lands after .idx is copied, .dat may include extra
|
|
// data not referenced by .idx (harmless). The reverse (idx referencing data past .dat)
|
|
// is caught by verifyDatIdxConsistency in generateEcShardsLocally.
|
|
func (t *ErasureCodingTask) copyVolumeFilesToWorker(ctx context.Context, workDir string) (map[string]string, error) {
|
|
localFiles := make(map[string]string)
|
|
|
|
fileStatus, err := t.readSourceVolumeFileStatus(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read source volume file status: %v", err)
|
|
}
|
|
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"source": t.server,
|
|
"working_dir": workDir,
|
|
"compaction_revision": fileStatus.GetCompactionRevision(),
|
|
"dat_file_size_bytes": fileStatus.GetDatFileSize(),
|
|
"idx_file_size_bytes": fileStatus.GetIdxFileSize(),
|
|
}).Info("Starting volume file copy from source server")
|
|
|
|
// Copy .idx file FIRST — if a write lands on the source after this copy,
|
|
// the .dat copy will include the new data but .idx won't reference it.
|
|
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
|
|
if err := t.copyFileFromSource(ctx, ".idx", idxFile, fileStatus.GetCompactionRevision(), fileStatus.GetIdxFileSize()); err != nil {
|
|
return nil, fmt.Errorf("failed to copy .idx file: %v", err)
|
|
}
|
|
localFiles["idx"] = idxFile
|
|
|
|
if info, err := os.Stat(idxFile); err == nil {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"file_type": ".idx",
|
|
"file_path": idxFile,
|
|
"size_bytes": info.Size(),
|
|
"size_mb": float64(info.Size()) / (1024 * 1024),
|
|
}).Info("Volume index file copied successfully")
|
|
}
|
|
|
|
// Copy .dat file SECOND — guaranteed to have at least as much data as .idx references.
|
|
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
|
|
if err := t.copyFileFromSource(ctx, ".dat", datFile, fileStatus.GetCompactionRevision(), fileStatus.GetDatFileSize()); err != nil {
|
|
return nil, fmt.Errorf("failed to copy .dat file: %v", err)
|
|
}
|
|
localFiles["dat"] = datFile
|
|
|
|
if info, err := os.Stat(datFile); err == nil {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"file_type": ".dat",
|
|
"file_path": datFile,
|
|
"size_bytes": info.Size(),
|
|
"size_mb": float64(info.Size()) / (1024 * 1024),
|
|
}).Info("Volume data file copied successfully")
|
|
}
|
|
|
|
return localFiles, nil
|
|
}
|
|
|
|
func (t *ErasureCodingTask) readSourceVolumeFileStatus(ctx context.Context) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
|
|
var statusResp *volume_server_pb.ReadVolumeFileStatusResponse
|
|
err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
var readErr error
|
|
statusResp, readErr = client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
|
|
VolumeId: t.volumeID,
|
|
})
|
|
return readErr
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if statusResp.GetDatFileSize() == 0 {
|
|
return nil, fmt.Errorf("volume %d on %s reports zero dat file size", t.volumeID, t.server)
|
|
}
|
|
if statusResp.GetIdxFileSize() == 0 {
|
|
return nil, fmt.Errorf("volume %d on %s reports zero idx file size with non-empty dat", t.volumeID, t.server)
|
|
}
|
|
return statusResp, nil
|
|
}
|
|
|
|
// copyFileFromSource copies a file from source server to local path using gRPC streaming
|
|
func (t *ErasureCodingTask) copyFileFromSource(ctx context.Context, ext, localPath string, compactionRevision uint32, stopOffset uint64) error {
|
|
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
|
VolumeId: t.volumeID,
|
|
Collection: t.collection,
|
|
Ext: ext,
|
|
CompactionRevision: compactionRevision,
|
|
StopOffset: stopOffset,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initiate file copy: %v", err)
|
|
}
|
|
|
|
// Create local file
|
|
localFile, err := os.Create(localPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create local file %s: %v", localPath, err)
|
|
}
|
|
defer localFile.Close()
|
|
|
|
// Stream data and write to local file
|
|
totalBytes := int64(0)
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to receive file data: %v", err)
|
|
}
|
|
|
|
if len(resp.FileContent) > 0 {
|
|
written, writeErr := localFile.Write(resp.FileContent)
|
|
if writeErr != nil {
|
|
return fmt.Errorf("failed to write to local file: %v", writeErr)
|
|
}
|
|
totalBytes += int64(written)
|
|
}
|
|
}
|
|
|
|
if totalBytes != int64(stopOffset) {
|
|
return fmt.Errorf("short copy of %s: got %d bytes, expected %d", ext, totalBytes, stopOffset)
|
|
}
|
|
glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// generateEcShardsLocally generates EC shards from local volume files
|
|
func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
|
|
datFile := localFiles["dat"]
|
|
idxFile := localFiles["idx"]
|
|
|
|
if datFile == "" || idxFile == "" {
|
|
return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
|
|
}
|
|
|
|
// Get base name without extension for EC operations
|
|
baseName := strings.TrimSuffix(datFile, ".dat")
|
|
shardFiles := make(map[string]string)
|
|
|
|
glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
|
|
|
|
// Verify .dat and .idx are consistent before EC encoding.
|
|
// Since they were copied as separate network transfers, the .idx may have
|
|
// entries pointing past the end of .dat if a write landed between the copies.
|
|
if err := verifyDatIdxConsistency(datFile, idxFile); err != nil {
|
|
return nil, fmt.Errorf("dat/idx consistency check failed: %v", err)
|
|
}
|
|
|
|
// Generate .ecx file from .idx BEFORE EC shards to prevent inconsistency.
|
|
if err := erasure_coding.WriteSortedFileFromIdx(baseName, ".ecx"); err != nil {
|
|
return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
|
|
}
|
|
|
|
// Generate EC shard files (.ec00 ~ .ec13)
|
|
if err := erasure_coding.WriteEcFiles(baseName); err != nil {
|
|
return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
|
|
}
|
|
|
|
// Collect generated shard file paths and log details
|
|
var generatedShards []string
|
|
var totalShardSize int64
|
|
|
|
// Check up to MaxShardCount (32) to support custom EC ratios
|
|
for i := 0; i < erasure_coding.MaxShardCount; i++ {
|
|
shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
|
|
if info, err := os.Stat(shardFile); err == nil {
|
|
shardKey := fmt.Sprintf("ec%02d", i)
|
|
shardFiles[shardKey] = shardFile
|
|
generatedShards = append(generatedShards, shardKey)
|
|
totalShardSize += info.Size()
|
|
|
|
// Log individual shard details
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"shard_id": i,
|
|
"shard_type": shardKey,
|
|
"file_path": shardFile,
|
|
"size_bytes": info.Size(),
|
|
"size_kb": float64(info.Size()) / 1024,
|
|
}).Info("EC shard generated")
|
|
}
|
|
}
|
|
|
|
// Add metadata files
|
|
ecxFile := baseName + ".ecx"
|
|
if info, err := os.Stat(ecxFile); err == nil {
|
|
shardFiles["ecx"] = ecxFile
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"file_type": "ecx",
|
|
"file_path": ecxFile,
|
|
"size_bytes": info.Size(),
|
|
}).Info("EC index file generated")
|
|
}
|
|
|
|
ecjFile := baseName + ".ecj"
|
|
if info, err := os.Stat(ecjFile); err == nil {
|
|
shardFiles["ecj"] = ecjFile
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"file_type": "ecj",
|
|
"file_path": ecjFile,
|
|
"size_bytes": info.Size(),
|
|
}).Info("EC journal file generated")
|
|
}
|
|
|
|
// Generate .vif file (volume info)
|
|
vifFile := baseName + ".vif"
|
|
volumeInfo := &volume_server_pb.VolumeInfo{
|
|
Version: uint32(needle.GetCurrentVersion()),
|
|
}
|
|
if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
|
|
glog.Warningf("Failed to create .vif file: %v", err)
|
|
} else {
|
|
shardFiles["vif"] = vifFile
|
|
if info, err := os.Stat(vifFile); err == nil {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"file_type": "vif",
|
|
"file_path": vifFile,
|
|
"size_bytes": info.Size(),
|
|
}).Info("Volume info file generated")
|
|
}
|
|
}
|
|
|
|
// Log summary of generation
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"total_files": len(shardFiles),
|
|
"ec_shards": len(generatedShards),
|
|
"generated_shards": generatedShards,
|
|
"total_shard_size_mb": float64(totalShardSize) / (1024 * 1024),
|
|
}).Info("EC shard generation completed")
|
|
return shardFiles, nil
|
|
}
|
|
|
|
// distributeEcShards distributes locally generated EC shards to destination servers
|
|
// using pre-assigned shard IDs from planning phase
|
|
func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
|
|
assignment, err := erasure_coding.DistributeEcShards(t.volumeID, t.collection, t.targets, shardFiles, t.grpcDialOption, t.GetLogger())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.shardAssignment = assignment
|
|
return nil
|
|
}
|
|
|
|
// mountEcShards mounts EC shards on destination servers
|
|
func (t *ErasureCodingTask) mountEcShards() error {
|
|
return erasure_coding.MountEcShards(t.volumeID, t.collection, t.shardAssignment, t.sourceDiskType, t.grpcDialOption, t.GetLogger())
|
|
}
|
|
|
|
func (t *ErasureCodingTask) verifyEcShardsBeforeDelete(ctx context.Context) error {
|
|
servers := make([]string, 0, len(t.shardAssignment))
|
|
for node := range t.shardAssignment {
|
|
servers = append(servers, node)
|
|
}
|
|
if len(servers) == 0 {
|
|
return fmt.Errorf("no destinations to verify; shardAssignment is empty")
|
|
}
|
|
|
|
totalShards := int(t.dataShards + t.parityShards)
|
|
union, perServer := erasure_coding.VerifyShardsAcrossServers(ctx, t.volumeID, servers, t.grpcDialOption)
|
|
|
|
summary := erasure_coding.SummarizeShardInventory(perServer)
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"shards_seen": union.Count(),
|
|
"shards_needed": totalShards,
|
|
"per_server": summary,
|
|
}).Info("EC shard inventory before source deletion")
|
|
|
|
if err := erasure_coding.RequireFullShardSet(t.volumeID, union, totalShards); err != nil {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"per_server": summary,
|
|
"error": err.Error(),
|
|
}).Error("EC shard verification failed — source volume will be kept")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deleteOriginalVolume deletes the original volume and all its replicas from all servers
|
|
func (t *ErasureCodingTask) deleteOriginalVolume(ctx context.Context) error {
|
|
// Get replicas from task parameters (set during detection)
|
|
replicas := t.getReplicas()
|
|
|
|
if len(replicas) == 0 {
|
|
glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID)
|
|
replicas = []string{t.server}
|
|
}
|
|
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"replica_count": len(replicas),
|
|
"replica_servers": replicas,
|
|
}).Info("Starting original volume deletion from replica servers")
|
|
|
|
// Delete volume from all replica locations
|
|
var deleteErrors []string
|
|
successCount := 0
|
|
|
|
for i, replicaServer := range replicas {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"replica_index": i + 1,
|
|
"total_replicas": len(replicas),
|
|
"server": replicaServer,
|
|
"volume_id": t.volumeID,
|
|
}).Info("Deleting volume from replica server")
|
|
|
|
err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), t.grpcDialOption,
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
_, err := client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
|
VolumeId: t.volumeID,
|
|
OnlyEmpty: false, // Force delete since we've created EC shards
|
|
})
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err))
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"server": replicaServer,
|
|
"volume_id": t.volumeID,
|
|
"error": err.Error(),
|
|
}).Error("Failed to delete volume from replica server")
|
|
} else {
|
|
successCount++
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"server": replicaServer,
|
|
"volume_id": t.volumeID,
|
|
}).Info("Successfully deleted volume from replica server")
|
|
}
|
|
}
|
|
|
|
if len(deleteErrors) > 0 {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"successful": successCount,
|
|
"failed": len(deleteErrors),
|
|
"total_replicas": len(replicas),
|
|
"success_rate": float64(successCount) / float64(len(replicas)) * 100,
|
|
"errors": deleteErrors,
|
|
}).Error("Failed to delete some original volume replicas after EC encoding")
|
|
// A surviving source replica lets a later detection scan re-propose
|
|
// EC on the same volume, which retries over mounted shards.
|
|
return fmt.Errorf("failed to delete %d of %d original volume replicas for volume %d: %s",
|
|
len(deleteErrors), len(replicas), t.volumeID, strings.Join(deleteErrors, "; "))
|
|
}
|
|
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"replica_count": len(replicas),
|
|
"replica_servers": replicas,
|
|
}).Info("Successfully deleted volume from all replica servers")
|
|
|
|
return nil
|
|
}
|
|
|
|
// getReplicas extracts regular .dat replica servers from unified sources.
|
|
// Sources with ShardIds set are EC-shard cleanup targets and must be skipped.
|
|
// Per-disk source rows are deduped to one server entry — VolumeDelete is a
|
|
// server-wide call.
|
|
func (t *ErasureCodingTask) getReplicas() []string {
|
|
var replicas []string
|
|
seen := make(map[string]struct{})
|
|
for _, source := range t.sources {
|
|
if source.VolumeId == 0 || len(source.ShardIds) > 0 {
|
|
continue
|
|
}
|
|
if _, ok := seen[source.Node]; ok {
|
|
continue
|
|
}
|
|
seen[source.Node] = struct{}{}
|
|
replicas = append(replicas, source.Node)
|
|
}
|
|
return replicas
|
|
}
|
|
|
|
// cleanupStaleEcShards unmounts and deletes any EC shards still mounted on
|
|
// destinations from a previous failed encode of this volume. Targets every
|
|
// node we plan to write to (t.targets) plus every node detection saw EC
|
|
// shards on (t.sources with ShardIds set), and issues the cleanup over the
|
|
// full shard range so a stale topology snapshot — or shards landed by a
|
|
// prior attempt that haven't heartbeated yet — cannot leave the
|
|
// mounted-volume guard tripped during distributeEcShards. Safe by ordering:
|
|
// runs after the source .dat is in the worker's workdir and a full local
|
|
// shard set is generated. Per-destination errors are aggregated, not
|
|
// short-circuited.
|
|
func (t *ErasureCodingTask) cleanupStaleEcShards(ctx context.Context) error {
|
|
nodes := make(map[string]struct{})
|
|
for _, source := range t.sources {
|
|
if source == nil || source.Node == "" || len(source.ShardIds) == 0 {
|
|
continue
|
|
}
|
|
nodes[source.Node] = struct{}{}
|
|
}
|
|
for _, target := range t.targets {
|
|
if target == nil || target.Node == "" {
|
|
continue
|
|
}
|
|
nodes[target.Node] = struct{}{}
|
|
}
|
|
if len(nodes) == 0 {
|
|
return nil
|
|
}
|
|
|
|
allShards := fullShardIdRange(t.dataShards, t.parityShards)
|
|
|
|
var cleanupErrors []string
|
|
for node := range nodes {
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"destination": node,
|
|
"shard_ids": allShards,
|
|
}).Info("Clearing stale EC shards on destination before re-distribute")
|
|
|
|
if err := unmountAndDeleteEcShards(ctx, t.grpcDialOption, node, t.volumeID, t.collection, allShards); err != nil {
|
|
cleanupErrors = append(cleanupErrors, fmt.Sprintf("%s: %v", node, err))
|
|
t.GetLogger().WithFields(map[string]interface{}{
|
|
"volume_id": t.volumeID,
|
|
"destination": node,
|
|
"error": err.Error(),
|
|
}).Error("Failed to clear stale EC shards on destination")
|
|
}
|
|
}
|
|
|
|
if len(cleanupErrors) > 0 {
|
|
return fmt.Errorf("stale EC shard cleanup failed on %d destination(s): %s",
|
|
len(cleanupErrors), strings.Join(cleanupErrors, "; "))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// fullShardIdRange builds [0..total-1] for unmount/delete RPCs. Falls back
|
|
// to erasure_coding.TotalShardsCount when the task's ratio is unset (early
|
|
// callers, tests); the helper never returns an empty slice.
|
|
func fullShardIdRange(dataShards, parityShards int32) []uint32 {
|
|
total := int(dataShards + parityShards)
|
|
if total <= 0 {
|
|
total = erasure_coding.TotalShardsCount
|
|
}
|
|
if total > erasure_coding.MaxShardCount {
|
|
total = erasure_coding.MaxShardCount
|
|
}
|
|
ids := make([]uint32, total)
|
|
for i := range ids {
|
|
ids[i] = uint32(i)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
// unmountAndDeleteEcShards unmounts then deletes the named shards on one
|
|
// destination. Unmount must precede delete (delete requires the shard be
|
|
// unmounted); both RPCs are idempotent against missing shards.
|
|
func unmountAndDeleteEcShards(
|
|
ctx context.Context,
|
|
dialOption grpc.DialOption,
|
|
destination string,
|
|
volumeID uint32,
|
|
collection string,
|
|
shardIds []uint32,
|
|
) error {
|
|
return operation.WithVolumeServerClient(false, pb.ServerAddress(destination), dialOption,
|
|
func(client volume_server_pb.VolumeServerClient) error {
|
|
if _, err := client.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
|
|
VolumeId: volumeID,
|
|
ShardIds: shardIds,
|
|
}); err != nil {
|
|
return fmt.Errorf("unmount: %w", err)
|
|
}
|
|
if _, err := client.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
|
|
VolumeId: volumeID,
|
|
Collection: collection,
|
|
ShardIds: shardIds,
|
|
}); err != nil {
|
|
return fmt.Errorf("delete: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// verifyDatIdxConsistency checks that all .idx entries reference data within the
|
|
// .dat file. Since .dat and .idx are copied as separate network transfers, the
|
|
// .idx may have entries from writes that landed after the .dat was copied.
|
|
func verifyDatIdxConsistency(datFile, idxFile string) error {
|
|
datInfo, err := os.Stat(datFile)
|
|
if err != nil {
|
|
return fmt.Errorf("stat dat file: %v", err)
|
|
}
|
|
datSize := datInfo.Size()
|
|
|
|
// Read volume version from superblock to compute actual needle sizes
|
|
df, err := os.Open(datFile)
|
|
if err != nil {
|
|
return fmt.Errorf("open dat file: %v", err)
|
|
}
|
|
defer df.Close()
|
|
|
|
versionBytes := make([]byte, 1)
|
|
if _, err := df.ReadAt(versionBytes, 0); err != nil {
|
|
return fmt.Errorf("read version byte: %v", err)
|
|
}
|
|
version := needle.Version(versionBytes[0])
|
|
|
|
idxF, err := os.Open(idxFile)
|
|
if err != nil {
|
|
return fmt.Errorf("open idx file: %v", err)
|
|
}
|
|
defer idxF.Close()
|
|
|
|
var maxEnd int64
|
|
var maxEndNeedleId storagetypes.NeedleId
|
|
var entryCount int64
|
|
err = idx.WalkIndexFile(idxF, 0, func(key storagetypes.NeedleId, offset storagetypes.Offset, size storagetypes.Size) error {
|
|
entryCount++
|
|
if size.IsDeleted() {
|
|
return nil
|
|
}
|
|
end := offset.ToActualOffset() + needle.GetActualSize(size, version)
|
|
if end > maxEnd {
|
|
maxEnd = end
|
|
maxEndNeedleId = key
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("walk idx file: %v", err)
|
|
}
|
|
|
|
if maxEnd > datSize {
|
|
return fmt.Errorf(
|
|
"idx references data beyond dat file: needle %d ends at offset %d but dat file is only %d bytes (%d entries total)",
|
|
maxEndNeedleId, maxEnd, datSize, entryCount,
|
|
)
|
|
}
|
|
|
|
glog.V(1).Infof("dat/idx consistency check passed: %d entries, max offset %d, dat size %d", entryCount, maxEnd, datSize)
|
|
return nil
|
|
}
|