mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-16 23:01:30 +00:00
* test: add integration tests for volume and EC volume scrubbing Add scrub integration tests covering normal volumes (full data scrub, corrupt .dat detection, mixed healthy/broken batches, missing volume error) and EC volumes (INDEX/LOCAL modes on healthy volumes, corrupt shard detection with broken shard info reporting, corrupt .ecx index, auto-select, unsupported mode error). Also adds framework helpers: CorruptDatFile, CorruptEcxFile, CorruptEcShardFile for fault injection in scrub tests. * fix: correct dat/ecx corruption helpers and ecx test setup - CorruptDatFile: truncate .dat to superblock size instead of overwriting bytes (ensures scrub detects data file size mismatch) - TestScrubEcVolumeIndexCorruptEcx: corrupt .ecx before mount so the corrupted size is loaded into memory (EC volumes cache ecx size at mount)
128 lines
4.1 KiB
Go
128 lines
4.1 KiB
Go
package framework
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
)
|
|
|
|
func AllocateVolume(t testing.TB, client volume_server_pb.VolumeServerClient, volumeID uint32, collection string) {
|
|
t.Helper()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := client.AllocateVolume(ctx, &volume_server_pb.AllocateVolumeRequest{
|
|
VolumeId: volumeID,
|
|
Collection: collection,
|
|
Replication: "000",
|
|
Version: uint32(needle.GetCurrentVersion()),
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("allocate volume %d: %v", volumeID, err)
|
|
}
|
|
}
|
|
|
|
func NewFileID(volumeID uint32, key uint64, cookie uint32) string {
|
|
return needle.NewFileId(needle.VolumeId(volumeID), key, cookie).String()
|
|
}
|
|
|
|
func UploadBytes(t testing.TB, client *http.Client, volumeURL, fid string, data []byte) *http.Response {
|
|
t.Helper()
|
|
|
|
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/%s", volumeURL, fid), bytes.NewReader(data))
|
|
if err != nil {
|
|
t.Fatalf("build upload request: %v", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/octet-stream")
|
|
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data)))
|
|
return DoRequest(t, client, req)
|
|
}
|
|
|
|
// CorruptIndexFile appends garbage bytes to a volume's .idx file on disk so
|
|
// that CheckIndexFile detects a size mismatch during scrub.
|
|
func CorruptIndexFile(t testing.TB, baseDir string, volumeID uint32) {
|
|
t.Helper()
|
|
idxPath := filepath.Join(baseDir, "volume", fmt.Sprintf("%d.idx", volumeID))
|
|
f, err := os.OpenFile(idxPath, os.O_WRONLY|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
t.Fatalf("open idx file for corruption: %v", err)
|
|
}
|
|
defer f.Close()
|
|
if _, err := f.Write([]byte{0xDE, 0xAD}); err != nil {
|
|
t.Fatalf("corrupt idx file: %v", err)
|
|
}
|
|
}
|
|
|
|
// EnableMaintenanceMode puts the volume server into maintenance mode.
|
|
func EnableMaintenanceMode(t testing.TB, ctx context.Context, client volume_server_pb.VolumeServerClient) {
|
|
t.Helper()
|
|
stateResp, err := client.GetState(ctx, &volume_server_pb.GetStateRequest{})
|
|
if err != nil {
|
|
t.Fatalf("GetState failed: %v", err)
|
|
}
|
|
_, err = client.SetState(ctx, &volume_server_pb.SetStateRequest{
|
|
State: &volume_server_pb.VolumeServerState{
|
|
Maintenance: true,
|
|
Version: stateResp.GetState().GetVersion(),
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("SetState maintenance=true failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// CorruptDatFile truncates a volume's .dat file to just the superblock (8 bytes)
|
|
// so that needle reads fail during a full scrub due to data file size mismatch.
|
|
func CorruptDatFile(t testing.TB, baseDir string, volumeID uint32) {
|
|
t.Helper()
|
|
datPath := filepath.Join(baseDir, "volume", fmt.Sprintf("%d.dat", volumeID))
|
|
// Truncate to superblock size only, removing all needle data.
|
|
if err := os.Truncate(datPath, 8); err != nil {
|
|
t.Fatalf("truncate dat file for corruption: %v", err)
|
|
}
|
|
}
|
|
|
|
// CorruptEcxFile appends garbage bytes to a volume's .ecx file on disk so
|
|
// that CheckIndexFile detects a size mismatch during EC index scrub.
|
|
func CorruptEcxFile(t testing.TB, baseDir string, volumeID uint32) {
|
|
t.Helper()
|
|
ecxPath := filepath.Join(baseDir, "volume", fmt.Sprintf("%d.ecx", volumeID))
|
|
f, err := os.OpenFile(ecxPath, os.O_WRONLY|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
t.Fatalf("open ecx file for corruption: %v", err)
|
|
}
|
|
defer f.Close()
|
|
if _, err := f.Write([]byte{0xDE, 0xAD}); err != nil {
|
|
t.Fatalf("corrupt ecx file: %v", err)
|
|
}
|
|
}
|
|
|
|
// CorruptEcShardFile truncates an EC shard file to 1 byte so that local shard
|
|
// reads fail during an EC scrub.
|
|
func CorruptEcShardFile(t testing.TB, baseDir string, volumeID uint32, shardID int) {
|
|
t.Helper()
|
|
shardPath := filepath.Join(baseDir, "volume", fmt.Sprintf("%d.ec%02d", volumeID, shardID))
|
|
if err := os.Truncate(shardPath, 1); err != nil {
|
|
t.Fatalf("truncate EC shard file %s: %v", shardPath, err)
|
|
}
|
|
}
|
|
|
|
func ReadBytes(t testing.TB, client *http.Client, volumeURL, fid string) *http.Response {
|
|
t.Helper()
|
|
|
|
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", volumeURL, fid), nil)
|
|
if err != nil {
|
|
t.Fatalf("build read request: %v", err)
|
|
}
|
|
return DoRequest(t, client, req)
|
|
}
|