mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-23 02:01:32 +00:00
* fix(ec): off-by-one in nLargeBlockRows causes EC read corruption (#8947) The nLargeBlockRows formula in locateOffset used (shardDatSize-1)/largeBlockLength, which produces an off-by-one error when shardDatSize is an exact multiple of largeBlockLength (e.g. a 30GB volume with 10 data shards = 3GB per shard). This causes needles in the last large block row to be mislocated as small blocks, reading from completely wrong shard positions and returning garbage data. Fix: remove the -1 from locateOffset and only apply it in the ecdFileSize fallback path (old volumes without datFileSize in .vif), where it's needed to handle the ambiguous case conservatively. Also fix ReadEcShardNeedle to pass offset=0 to ReadBytes, consistent with the scrub path, since the bytes buffer already starts at position 0. * fix: add volume context to EC read errors, remove contextless glog The glog.Errorf in ReadBytes logged "entry not found" without any volume ID, making it impossible to identify which volume was affected. Remove this contextless log and instead add volume ID, needle ID, offset, and size to the error returned from the EC read path. The EC scrub callers already wrap errors with volume context.
275 lines
8.3 KiB
Go
275 lines
8.3 KiB
Go
package erasure_coding
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/klauspost/reedsolomon"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
)
|
|
|
|
// Block sizes used by the encoding and locate tests in this file.
const (
	// largeBlockSize is the large-block length these tests pass to
	// generateEcFiles, LocateData and Interval.ToShardIdAndOffset.
	largeBlockSize = 10000

	// smallBlockSize is the small-block length passed alongside largeBlockSize.
	smallBlockSize = 100
)
|
|
|
|
func TestEncodingDecoding(t *testing.T) {
|
|
bufferSize := 50
|
|
baseFileName := "1"
|
|
|
|
// Create default EC context for testing
|
|
ctx := NewDefaultECContext("", 0)
|
|
|
|
err := generateEcFiles(baseFileName, bufferSize, largeBlockSize, smallBlockSize, ctx)
|
|
if err != nil {
|
|
t.Logf("generateEcFiles: %v", err)
|
|
}
|
|
|
|
err = WriteSortedFileFromIdx(baseFileName, ".ecx")
|
|
if err != nil {
|
|
t.Logf("WriteSortedFileFromIdx: %v", err)
|
|
}
|
|
|
|
err = validateFiles(baseFileName, ctx)
|
|
if err != nil {
|
|
t.Logf("WriteSortedFileFromIdx: %v", err)
|
|
}
|
|
|
|
removeGeneratedFiles(baseFileName, ctx)
|
|
|
|
}
|
|
|
|
func validateFiles(baseFileName string, ctx *ECContext) error {
|
|
nm, err := readNeedleMap(baseFileName)
|
|
if err != nil {
|
|
return fmt.Errorf("readNeedleMap: %v", err)
|
|
}
|
|
defer nm.Close()
|
|
|
|
datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open dat file: %v", err)
|
|
}
|
|
defer datFile.Close()
|
|
|
|
fi, err := datFile.Stat()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to stat dat file: %v", err)
|
|
}
|
|
|
|
ecFiles, err := openEcFiles(baseFileName, true, ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("error opening ec files: %w", err)
|
|
}
|
|
defer closeEcFiles(ecFiles)
|
|
|
|
err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
|
|
return assertSame(datFile, fi.Size(), ecFiles, value.Offset, value.Size)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check ec files: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) error {
|
|
|
|
data, err := readDatFile(datFile, offset, size)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read dat file: %v", err)
|
|
}
|
|
|
|
ecFileStat, _ := ecFiles[0].Stat()
|
|
|
|
ecData, err := readEcFile(ecFileStat.Size(), ecFiles, offset, size)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read ec file: %v", err)
|
|
}
|
|
|
|
if bytes.Compare(data, ecData) != 0 {
|
|
return fmt.Errorf("unexpected data read")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) {
|
|
|
|
data := make([]byte, size)
|
|
n, err := datFile.ReadAt(data, offset.ToActualOffset())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to ReadAt dat file: %v", err)
|
|
}
|
|
if n != int(size) {
|
|
return nil, fmt.Errorf("unexpected read size %d, expected %d", n, size)
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
func readEcFile(shardDatSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
|
|
|
|
intervals := LocateData(largeBlockSize, smallBlockSize, shardDatSize, offset.ToActualOffset(), size)
|
|
|
|
for i, interval := range intervals {
|
|
if d, e := readOneInterval(interval, ecFiles); e != nil {
|
|
return nil, e
|
|
} else {
|
|
if i == 0 {
|
|
data = d
|
|
} else {
|
|
data = append(data, d...)
|
|
}
|
|
}
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err error) {
|
|
|
|
ecFileIndex, ecFileOffset := interval.ToShardIdAndOffset(largeBlockSize, smallBlockSize)
|
|
|
|
data = make([]byte, interval.Size)
|
|
err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset)
|
|
if false { // do some ec testing
|
|
ecData, err := readFromOtherEcFiles(ecFiles, int(ecFileIndex), ecFileOffset, interval.Size)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("ec reconstruct error: %v", err)
|
|
}
|
|
if bytes.Compare(data, ecData) != 0 {
|
|
return nil, fmt.Errorf("ec compare error")
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size types.Size) (data []byte, err error) {
|
|
enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create encoder: %v", err)
|
|
}
|
|
|
|
bufs := make([][]byte, TotalShardsCount)
|
|
for i := 0; i < DataShardsCount; {
|
|
n := int(rand.Int31n(TotalShardsCount))
|
|
if n == ecFileIndex || bufs[n] != nil {
|
|
continue
|
|
}
|
|
bufs[n] = make([]byte, size)
|
|
i++
|
|
}
|
|
|
|
for i, buf := range bufs {
|
|
if buf == nil {
|
|
continue
|
|
}
|
|
err = readFromFile(ecFiles[i], buf, ecFileOffset)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
if err = enc.ReconstructData(bufs); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return bufs[ecFileIndex], nil
|
|
}
|
|
|
|
// readFromFile fills data with bytes read from file starting at ecFileOffset
// and returns the error reported by ReadAt, if any. The byte count is dropped.
func readFromFile(file *os.File, data []byte, ecFileOffset int64) (err error) {
	if _, readErr := file.ReadAt(data, ecFileOffset); readErr != nil {
		return readErr
	}
	return nil
}
|
|
|
|
func removeGeneratedFiles(baseFileName string, ctx *ECContext) {
|
|
for i := 0; i < ctx.Total(); i++ {
|
|
fname := baseFileName + ctx.ToExt(i)
|
|
os.Remove(fname)
|
|
}
|
|
os.Remove(baseFileName + ".ecx")
|
|
}
|
|
|
|
func TestLocateData(t *testing.T) {
|
|
intervals := LocateData(largeBlockSize, smallBlockSize, largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
|
|
if len(intervals) != 1 {
|
|
t.Errorf("unexpected interval size %d", len(intervals))
|
|
}
|
|
if !intervals[0].sameAs(Interval{0, 0, 1, false, 1}) {
|
|
t.Errorf("unexpected interval %+v", intervals[0])
|
|
}
|
|
|
|
intervals = LocateData(largeBlockSize, smallBlockSize, largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
|
|
fmt.Printf("%+v\n", intervals)
|
|
}
|
|
|
|
func (this Interval) sameAs(that Interval) bool {
|
|
return this.IsLargeBlock == that.IsLargeBlock &&
|
|
this.InnerBlockOffset == that.InnerBlockOffset &&
|
|
this.BlockIndex == that.BlockIndex &&
|
|
this.Size == that.Size
|
|
}
|
|
|
|
func TestLocateData2(t *testing.T) {
|
|
// Use ecdFileSize-1 to simulate the fallback path in LocateEcShardNeedleInterval
|
|
// when datFileSize is not available (old EC volumes without .vif datFileSize).
|
|
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, 3221225472-1, 21479557912, 4194339)
|
|
assert.Equal(t, intervals, []Interval{
|
|
{BlockIndex: 4, InnerBlockOffset: 527128, Size: 521448, IsLargeBlock: false, LargeBlockRowsCount: 2},
|
|
{BlockIndex: 5, InnerBlockOffset: 0, Size: 1048576, IsLargeBlock: false, LargeBlockRowsCount: 2},
|
|
{BlockIndex: 6, InnerBlockOffset: 0, Size: 1048576, IsLargeBlock: false, LargeBlockRowsCount: 2},
|
|
{BlockIndex: 7, InnerBlockOffset: 0, Size: 1048576, IsLargeBlock: false, LargeBlockRowsCount: 2},
|
|
{BlockIndex: 8, InnerBlockOffset: 0, Size: 527163, IsLargeBlock: false, LargeBlockRowsCount: 2},
|
|
})
|
|
}
|
|
|
|
func TestLocateData3(t *testing.T) {
|
|
// Use ecdFileSize-1 to simulate the fallback path in LocateEcShardNeedleInterval
|
|
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, 3221225472-1, 30782909808, 112568)
|
|
for _, interval := range intervals {
|
|
fmt.Printf("%+v\n", interval)
|
|
}
|
|
assert.Equal(t, intervals, []Interval{
|
|
{BlockIndex: 8876, InnerBlockOffset: 912752, Size: 112568, IsLargeBlock: false, LargeBlockRowsCount: 2},
|
|
})
|
|
}
|
|
|
|
func TestLocateData_ExactMultiple_Issue8947(t *testing.T) {
|
|
// When datFileSize is available, shardDatSize = datFileSize / DataShards.
|
|
// For a 30GB volume with 10 data shards, shardDatSize = 3GB = 3 * ErasureCodingLargeBlockSize.
|
|
// The encoder produces 3 large block rows, 0 small block rows.
|
|
// nLargeBlockRows must be 3, not 2.
|
|
shardDatSize := int64(3) * ErasureCodingLargeBlockSize // 3GB per shard from datFileSize/DataShards
|
|
|
|
// Reading from the 3rd large block row (offsets 20GB-30GB) should work
|
|
offset := int64(2) * ErasureCodingLargeBlockSize * DataShardsCount // 20GB
|
|
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardDatSize, offset, 1024)
|
|
assert.Equal(t, 1, len(intervals))
|
|
assert.True(t, intervals[0].IsLargeBlock, "data in 3rd large row should be in large blocks")
|
|
assert.Equal(t, 3, intervals[0].LargeBlockRowsCount)
|
|
assert.Equal(t, 20, intervals[0].BlockIndex) // block 20 = shard 0 of 3rd row
|
|
}
|
|
|
|
func TestLocateData_Issue8179(t *testing.T) {
|
|
large := int64(10000)
|
|
small := int64(100)
|
|
shardSize := int64(259092) // Resulting in nLargeBlockRows = 25 as seen in panic log
|
|
|
|
// Testing range through the large-to-small transition boundary
|
|
nLargeBlockRows := shardSize / large
|
|
largeAreaSize := nLargeBlockRows * int64(DataShardsCount) * large
|
|
|
|
for offset := largeAreaSize - 500; offset < largeAreaSize+500; offset++ {
|
|
intervals := LocateData(large, small, shardSize, offset, 200)
|
|
for _, interval := range intervals {
|
|
assert.True(t, interval.Size > 0, "Interval size must be positive at offset %d, got %+v", offset, interval)
|
|
}
|
|
}
|
|
}
|