From 17efaaa90229dc28ae41b307be67d7bf40db1d71 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Wed, 22 Jun 2016 03:04:11 +0530 Subject: [PATCH 1/3] XL/erasure-read: Support parallel reads from disks. --- erasure-readfile.go | 155 ++++++++++++++++++++++++++------------------ erasure-utils.go | 16 ++--- xl-v1-object.go | 10 ++- 3 files changed, 108 insertions(+), 73 deletions(-) diff --git a/erasure-readfile.go b/erasure-readfile.go index 7a0bc58a7..71b4eb1a9 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -21,6 +21,7 @@ import ( "encoding/hex" "errors" "io" + "sync" "github.com/klauspost/reedsolomon" ) @@ -30,82 +31,113 @@ import ( // are decoded into a data block. Data block is trimmed for given offset and length, // then written to given writer. This function also supports bit-rot detection by // verifying checksum of individual block's checksum. -func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64) (int64, error) { +func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64, totalLength int64) (int64, error) { + min := func(a int64, b int64) int { + if a < b { + return int(a) + } + return int(b) + } // Total bytes written to writer bytesWritten := int64(0) // Gather previously calculated block checksums. - blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) + // blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) // Pick one erasure info. eInfo := pickValidErasureInfo(eInfos) - // Get block info for given offset, length and block size. - startBlock, bytesToSkip, endBlock := getBlockInfo(offset, length, eInfo.BlockSize) - // Data chunk size on each block. - chunkSize := eInfo.BlockSize / int64(eInfo.DataBlocks) + chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks) + + // Get block info for given offset, length and block size. + startBlock, bytesToSkip := getBlockInfo(offset, eInfo.BlockSize) + + orderedDisks := make([]StorageAPI, len(disks)) + for index := range disks { + blockIndex := eInfo.Distribution[index] + orderedDisks[blockIndex-1] = disks[index] + } + + for block := startBlock; bytesWritten < length; block++ { + curChunkSize := chunkSize + if totalLength-offset+bytesWritten < curChunkSize { + curChunkSize = getEncodedBlockLen(totalLength-offset+bytesWritten, eInfo.DataBlocks) + } - for block := startBlock; block <= endBlock; block++ { // Allocate encoded blocks up to storage disks. enBlocks := make([][]byte, len(disks)) - // Counter to keep success data blocks. - var successDataBlocksCount = 0 - var noReconstruct bool // Set for no reconstruction. + // Figure out the number of disks that are needed for the read. + // If all the data disks are available then dataDiskCount = eInfo.DataBlocks + // Else dataDiskCount = eInfo.DataBlocks + 1 - // Keep how many bytes are read for this block. - // In most cases, last block in the file is shorter than chunkSize - lastReadSize := int64(0) - - // Read from all the disks. - for index, disk := range disks { - blockIndex := eInfo.Distribution[index] - 1 - if !isValidBlock(disks, volume, path, toDiskIndex(blockIndex, eInfo.Distribution), blockCheckSums) { - continue - } + diskCount := 0 + for _, disk := range orderedDisks[:eInfo.DataBlocks] { if disk == nil { continue } - - // Initialize chunk slice and fill the data from each parts. 
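// Aside, separate from the patch itself: a minimal, self-contained sketch of the
// offset arithmetic the rewritten loop above relies on. The two helpers only
// mirror what getBlockInfo and getEncodedBlockLen in erasure-utils.go are
// expected to compute in this series; the sizes in main are invented.
package main

import "fmt"

// encodedBlockLen is ceiling division: the per-disk chunk length needed so that
// dataBlocks chunks together can hold inputLen bytes (the last chunk is 0-padded).
func encodedBlockLen(inputLen int64, dataBlocks int) int64 {
	return (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
}

// blockInfo returns which block a byte offset falls in and how many bytes of
// that block precede the offset.
func blockInfo(offset, blockSize int64) (startBlock, bytesToSkip int64) {
	return offset / blockSize, offset % blockSize
}

func main() {
	var blockSize int64 = 10 * 1024 * 1024 // 10 MiB erasure block
	dataBlocks := 10

	fmt.Println(encodedBlockLen(blockSize, dataBlocks)) // 1048576 bytes read per data disk

	startBlock, bytesToSkip := blockInfo(25*1024*1024, blockSize)
	fmt.Println(startBlock, bytesToSkip) // block 2, skip 5242880 bytes inside it
}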
- enBlocks[blockIndex] = make([]byte, chunkSize) - - // Read the necessary blocks. - n, err := disk.ReadFile(volume, path, block*chunkSize, enBlocks[blockIndex]) - if err != nil { - enBlocks[blockIndex] = nil - } else if n < chunkSize { - // As the data we got is smaller than chunk size, keep only required chunk slice - enBlocks[blockIndex] = append([]byte{}, enBlocks[blockIndex][:n]...) - } - - // Remember bytes read at first time. - if lastReadSize == 0 { - lastReadSize = n - } - - // If bytes read is not equal to bytes read lastly, treat it as corrupted chunk. - if n != lastReadSize { - return bytesWritten, errXLDataCorrupt - } - - // Verify if we have successfully read all the data blocks. - if blockIndex < eInfo.DataBlocks && enBlocks[blockIndex] != nil { - successDataBlocksCount++ - // Set when we have all the data blocks and no - // reconstruction is needed, so that we can avoid - // erasure reconstruction. - noReconstruct = successDataBlocksCount == eInfo.DataBlocks - if noReconstruct { - // Break out we have read all the data blocks. - break - } - } + diskCount++ } - // Verify if reconstruction is needed, proceed with reconstruction. - if !noReconstruct { + if diskCount < eInfo.DataBlocks { + diskCount = eInfo.DataBlocks + 1 + } + + wg := &sync.WaitGroup{} + index := 0 + for _, disk := range orderedDisks { + if disk == nil { + index++ + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + buf := make([]byte, curChunkSize) + n, err := disk.ReadFile(volume, path, block*curChunkSize, buf) + if err != nil { + orderedDisks[index] = nil + return + } + enBlocks[index] = buf[:n] + }(index, disk) + index++ + diskCount-- + if diskCount == 0 { + break + } + } + wg.Wait() + + // Counter to keep success data blocks. + var successDataBlocksCount = 0 + var successParityBlocksCount = 0 + for bufidx, buf := range enBlocks { + if buf == nil { + continue + } + if bufidx < eInfo.DataBlocks { + successDataBlocksCount++ + continue + } + successParityBlocksCount++ + } + + if successDataBlocksCount < eInfo.DataBlocks { + for ; index < len(orderedDisks); index++ { + if (successDataBlocksCount + successParityBlocksCount) == (eInfo.DataBlocks + 1) { + break + } + buf := make([]byte, curChunkSize) + n, err := orderedDisks[index].ReadFile(volume, path, block*curChunkSize, buf) + if err != nil { + orderedDisks[index] = nil + continue + } + successParityBlocksCount++ + enBlocks[index] = buf[:n] + } err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks) if err != nil { return bytesWritten, err @@ -113,7 +145,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } // Get data blocks from encoded blocks. - dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, int(lastReadSize)*eInfo.DataBlocks) + dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, min(eInfo.BlockSize, totalLength-offset+bytesWritten)) if err != nil { return bytesWritten, err } @@ -123,12 +155,11 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s // If this is start block, skip unwanted bytes. if block == startBlock { - buf = append([]byte{}, dataBlocks[bytesToSkip:]...) + buf = buf[bytesToSkip:] } - // If this is end block, retain only required bytes. - if block == endBlock { - buf = append([]byte{}, buf[:length-bytesWritten]...) + if len(buf) > int(length-bytesWritten) { + buf = buf[:length-bytesWritten] } // Copy data blocks. 
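The core change in this first patch is that the per-disk ReadFile calls now fan
out on goroutines and the results are collected before deciding whether
reconstruction is needed. Below is a minimal sketch of that fan-out pattern; the
readChunk function is a stand-in for reads through the real StorageAPI
interface, and failed disks are simply left nil in the result slice, as the
patch does with orderedDisks.

package main

import (
	"fmt"
	"sync"
)

// readChunk stands in for StorageAPI.ReadFile in this sketch: it returns the
// chunk stored on disk i, or an error if that disk is unavailable.
func readChunk(i int) ([]byte, error) {
	if i == 2 {
		return nil, fmt.Errorf("disk %d offline", i)
	}
	return []byte{byte('A' + i)}, nil
}

func main() {
	const disks = 6
	enBlocks := make([][]byte, disks) // one slot per disk, nil on failure

	var wg sync.WaitGroup
	for i := 0; i < disks; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			buf, err := readChunk(i)
			if err != nil {
				return // leave enBlocks[i] nil; the caller reconstructs later
			}
			enBlocks[i] = buf // each goroutine writes only its own index
		}(i)
	}
	wg.Wait()

	for i, b := range enBlocks {
		fmt.Printf("disk %d: %q\n", i, b)
	}
}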
diff --git a/erasure-utils.go b/erasure-utils.go index c4dcf8a5e..9316e739a 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -89,16 +89,16 @@ func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data [] } // getBlockInfo - find start/end block and bytes to skip for given offset, length and block size. -func getBlockInfo(offset, length, blockSize int64) (startBlock, bytesToSkip, endBlock int64) { +func getBlockInfo(offset, blockSize int64) (startBlock, bytesToSkip int64) { // Calculate start block for given offset and how many bytes to skip to get the offset. startBlock = offset / blockSize bytesToSkip = offset % blockSize - - // Calculate end block for given size to read - endBlock = (offset + length) / blockSize - if endBlock > 0 && (offset+length)%blockSize == 0 { - endBlock-- - } - return } + +// calculate the blockSize based on input length and total number of +// data blocks. +func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) { + curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks) + return curEncBlockSize +} diff --git a/xl-v1-object.go b/xl-v1-object.go index aea58fd20..0237173c3 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -91,15 +91,19 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i totalBytesRead := int64(0) // Read from all parts. for ; partIndex <= lastPartIndex; partIndex++ { + if length == totalBytesRead { + break + } // Save the current part name and size. partName := xlMeta.Parts[partIndex].Name partSize := xlMeta.Parts[partIndex].Size - if partSize > (length - totalBytesRead) { - partSize = length - totalBytesRead + readSize := partSize - partOffset + if readSize > (length - totalBytesRead) { + readSize = length - totalBytesRead } // Start reading the part name. - n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, partSize) + n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) if err != nil { return err } From d4bea5fbf80509071ac6cbc351f3e49942038852 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Wed, 22 Jun 2016 21:35:03 +0530 Subject: [PATCH 2/3] XL/erasure-read: Add Comments and enable bitrot detection. --- erasure-readfile.go | 146 ++++++++++++++++++++++++++++---------------- erasure-utils.go | 3 +- xl-v1-object.go | 5 +- 3 files changed, 100 insertions(+), 54 deletions(-) diff --git a/erasure-readfile.go b/erasure-readfile.go index 71b4eb1a9..7f9a0389c 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -17,7 +17,6 @@ package main import ( - "bytes" "encoding/hex" "errors" "io" @@ -28,51 +27,72 @@ import ( // erasureReadFile - read bytes from erasure coded files and writes to given writer. // Erasure coded files are read block by block as per given erasureInfo and data chunks -// are decoded into a data block. Data block is trimmed for given offset and length, -// then written to given writer. This function also supports bit-rot detection by +// are decoded into a data block. Data block is trimmed for given offset and length, +// then written to given writer. This function also supports bit-rot detection by // verifying checksum of individual block's checksum. 
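// Aside, separate from the patch: before reading the reworked function below it
// may help to see how offset, length and totalLength map onto blocks. This
// self-contained sketch mirrors the per-block trimming the function performs
// (skip leading bytes in the first block, a short last block, never write more
// than requested); all sizes here are invented.
package main

import "fmt"

func main() {
	var blockSize, totalLength int64 = 10, 43 // hypothetical erasure block and file size
	var offset, length int64 = 12, 25         // hypothetical read request

	startBlock := offset / blockSize
	bytesToSkip := offset % blockSize
	endBlock := totalLength / blockSize

	bytesWritten := int64(0)
	for block := startBlock; bytesWritten < length; block++ {
		curBlockSize := blockSize
		if block == endBlock && totalLength%blockSize != 0 {
			curBlockSize = totalLength % blockSize // the last block can be short
		}
		out := curBlockSize
		if block == startBlock {
			out -= bytesToSkip // skip unwanted leading bytes
		}
		if out > length-bytesWritten {
			out = length - bytesWritten // never exceed what was requested
		}
		fmt.Printf("block %d: write %d bytes\n", block, out)
		bytesWritten += out
	}
	// Output:
	// block 1: write 8 bytes
	// block 2: write 10 bytes
	// block 3: write 7 bytes
}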
func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64, totalLength int64) (int64, error) { - min := func(a int64, b int64) int { - if a < b { - return int(a) - } - return int(b) - } - // Total bytes written to writer - bytesWritten := int64(0) - - // Gather previously calculated block checksums. - // blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) - // Pick one erasure info. eInfo := pickValidErasureInfo(eInfos) - // Data chunk size on each block. - chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks) - - // Get block info for given offset, length and block size. - startBlock, bytesToSkip := getBlockInfo(offset, eInfo.BlockSize) + // Gather previously calculated block checksums. + blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) + orderedBlockCheckSums := make([]checkSumInfo, len(disks)) + // []orderedDisks will have first eInfo.DataBlocks disks as data disks and rest will be parity. orderedDisks := make([]StorageAPI, len(disks)) for index := range disks { blockIndex := eInfo.Distribution[index] orderedDisks[blockIndex-1] = disks[index] + orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index] } + // bitrotVerify verifies if the file on a particular disk does not have bitrot by verifying the hash of + // the contents of the file. + bitrotVerify := func() func(diskIndex int) bool { + verified := make([]bool, len(orderedDisks)) + // Return closure so that we have reference to []verified and not recalculate the hash on it + // everytime the function is called for the same disk. + return func(diskIndex int) bool { + if verified[diskIndex] { + return true + } + isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex]) + verified[diskIndex] = isValid + return isValid + } + }() + + // Total bytes written to writer + bytesWritten := int64(0) + + // chunkSize is roughly BlockSize/DataBlocks. + // chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes. + // So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by + // DataBlocks. The extra space will have 0-padding. + chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks) + + startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize) + + // For each block, read chunk from each disk. If we are able to read all the data disks then we don't + // need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number + // of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer. for block := startBlock; bytesWritten < length; block++ { + // curChunkSize will be chunkSize except for the last block because the size of the last block + // can be less than BlockSize. curChunkSize := chunkSize - if totalLength-offset+bytesWritten < curChunkSize { - curChunkSize = getEncodedBlockLen(totalLength-offset+bytesWritten, eInfo.DataBlocks) + if block == endBlock && (totalLength%eInfo.BlockSize != 0) { + // If this is the last block and size of the block is < BlockSize. + curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks) } - // Allocate encoded blocks up to storage disks. + // Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk. enBlocks := make([][]byte, len(disks)) // Figure out the number of disks that are needed for the read. 
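// Aside, separate from the patch: bitrotVerify above is a memoizing closure, so
// the whole-file hash comparison runs at most once per disk even though the
// block loop asks for it on every iteration. A stand-alone sketch of the same
// pattern, with expensiveCheck standing in for isValidBlock:
package main

import "fmt"

func main() {
	expensiveCheck := func(diskIndex int) bool {
		fmt.Println("hashing disk", diskIndex) // the costly part
		return diskIndex != 3                  // pretend disk 3 has bit-rot
	}

	verify := func() func(int) bool {
		verified := make([]bool, 8)
		// As in the patch, only successful checks are cached; a disk that fails
		// is dropped by the caller and never asked about again.
		return func(diskIndex int) bool {
			if verified[diskIndex] {
				return true
			}
			ok := expensiveCheck(diskIndex)
			verified[diskIndex] = ok
			return ok
		}
	}()

	fmt.Println(verify(0)) // hashes disk 0, prints true
	fmt.Println(verify(0)) // cached, no second hash
	fmt.Println(verify(3)) // hashes disk 3, prints false
}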
- // If all the data disks are available then dataDiskCount = eInfo.DataBlocks - // Else dataDiskCount = eInfo.DataBlocks + 1 - + // We will need DataBlocks number of disks if all the data disks are up. + // We will need DataBlocks+1 number of disks even if one of the data disks is down. diskCount := 0 + // Count the number of data disks that are up. for _, disk := range orderedDisks[:eInfo.DataBlocks] { if disk == nil { continue @@ -81,11 +101,15 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } if diskCount < eInfo.DataBlocks { + // Not enough data disks up, so we need DataBlocks+1 number of disks for reed-solomon Reconstruct() diskCount = eInfo.DataBlocks + 1 } wg := &sync.WaitGroup{} + + // current disk index from which to read, this will be used later in case one of the parallel reads fails. index := 0 + // Read from the disks in parallel. for _, disk := range orderedDisks { if disk == nil { index++ @@ -94,9 +118,19 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() + ok := bitrotVerify(index) + if !ok { + // So that we don't read from this disk for the next block. + orderedDisks[index] = nil + return + } buf := make([]byte, curChunkSize) - n, err := disk.ReadFile(volume, path, block*curChunkSize, buf) + // Note that for the offset calculation we have to use chunkSize and not + // curChunkSize. If we use curChunkSize for offset calculation then it + // can result in wrong offset for the last block. + n, err := disk.ReadFile(volume, path, block*chunkSize, buf) if err != nil { + // So that we don't read from this disk for the next block. orderedDisks[index] = nil return } @@ -110,7 +144,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } wg.Wait() - // Counter to keep success data blocks. + // Count number of data and parity blocks that were read. var successDataBlocksCount = 0 var successParityBlocksCount = 0 for bufidx, buf := range enBlocks { @@ -125,50 +159,63 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } if successDataBlocksCount < eInfo.DataBlocks { + // If we don't have DataBlocks number of data blocks we will have to read enough + // parity blocks such that we have DataBlocks+1 number for blocks for reedsolomon.Reconstruct() for ; index < len(orderedDisks); index++ { if (successDataBlocksCount + successParityBlocksCount) == (eInfo.DataBlocks + 1) { + // We have DataBlocks+1 blocks, enough for reedsolomon.Reconstruct() break } + ok := bitrotVerify(index) + if !ok { + // Mark nil so that we don't read from this disk for the next block. + orderedDisks[index] = nil + continue + } buf := make([]byte, curChunkSize) - n, err := orderedDisks[index].ReadFile(volume, path, block*curChunkSize, buf) + n, err := orderedDisks[index].ReadFile(volume, path, block*chunkSize, buf) if err != nil { + // Mark nil so that we don't read from this disk for the next block. orderedDisks[index] = nil continue } successParityBlocksCount++ enBlocks[index] = buf[:n] } + // Reconstruct the missing data blocks. err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks) if err != nil { return bytesWritten, err } } - // Get data blocks from encoded blocks. - dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, min(eInfo.BlockSize, totalLength-offset+bytesWritten)) + // enBlocks data can have 0-padding hence we need to figure the exact number + // of bytes we want to read from enBlocks. 
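// Aside, separate from the patch: what the decodeData call above boils down to
// with github.com/klauspost/reedsolomon. As long as any DataBlocks of the
// DataBlocks+ParityBlocks shards were read intact, Reconstruct() rebuilds the
// missing ones in place. The shard counts and payload here are illustrative.
package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const dataShards, parityShards = 4, 2
	enc, err := reedsolomon.New(dataShards, parityShards)
	if err != nil {
		panic(err)
	}

	payload := []byte("the quick brown fox jumps over the lazy dog") // 43 bytes
	shards, err := enc.Split(payload)
	if err != nil {
		panic(err)
	}
	if err = enc.Encode(shards); err != nil {
		panic(err)
	}

	// Simulate two failed reads: one data shard and one parity shard missing.
	shards[1], shards[5] = nil, nil

	if err = enc.Reconstruct(shards); err != nil {
		panic(err)
	}

	// Join trims the 0-padding by writing exactly the original length.
	var out bytes.Buffer
	if err = enc.Join(&out, shards, len(payload)); err != nil {
		panic(err)
	}
	fmt.Println(out.String())
}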
+ blockSize := eInfo.BlockSize + if block == endBlock && totalLength%eInfo.BlockSize != 0 { + // For the last block, the block size can be less than BlockSize. + blockSize = totalLength % eInfo.BlockSize + } + data, err := getDataBlocks(enBlocks, eInfo.DataBlocks, int(blockSize)) if err != nil { return bytesWritten, err } - // Keep required bytes into buf. - buf := dataBlocks - // If this is start block, skip unwanted bytes. if block == startBlock { - buf = buf[bytesToSkip:] + data = data[bytesToSkip:] } - if len(buf) > int(length-bytesWritten) { - buf = buf[:length-bytesWritten] + if len(data) > int(length-bytesWritten) { + // We should not send more data than what was requested. + data = data[:length-bytesWritten] } - // Copy data blocks. - var n int64 - n, err = io.Copy(writer, bytes.NewReader(buf)) - bytesWritten += int64(n) + _, err = writer.Write(data) if err != nil { return bytesWritten, err } + bytesWritten += int64(len(data)) } return bytesWritten, nil @@ -210,23 +257,18 @@ func toDiskIndex(blockIdx int, distribution []int) int { // isValidBlock - calculates the checksum hash for the block and // validates if its correct returns true for valid cases, false otherwise. -func isValidBlock(disks []StorageAPI, volume, path string, diskIndex int, blockCheckSums []checkSumInfo) (ok bool) { +func isValidBlock(disk StorageAPI, volume, path string, blockCheckSum checkSumInfo) (ok bool) { ok = false - // Unknown block index requested, treat it as error. - if diskIndex == -1 { - return ok - } - // Disk is not present, treat entire block to be non existent. - if disks[diskIndex] == nil { - return ok + if disk == nil { + return false } // Read everything for a given block and calculate hash. - hashWriter := newHash(blockCheckSums[diskIndex].Algorithm) - hashBytes, err := hashSum(disks[diskIndex], volume, path, hashWriter) + hashWriter := newHash(blockCheckSum.Algorithm) + hashBytes, err := hashSum(disk, volume, path, hashWriter) if err != nil { return ok } - ok = hex.EncodeToString(hashBytes) == blockCheckSums[diskIndex].Hash + ok = hex.EncodeToString(hashBytes) == blockCheckSum.Hash return ok } diff --git a/erasure-utils.go b/erasure-utils.go index 9316e739a..521bc7b21 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -89,10 +89,11 @@ func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data [] } // getBlockInfo - find start/end block and bytes to skip for given offset, length and block size. -func getBlockInfo(offset, blockSize int64) (startBlock, bytesToSkip int64) { +func getBlockInfo(offset, length, blockSize int64) (startBlock, endBlock, bytesToSkip int64) { // Calculate start block for given offset and how many bytes to skip to get the offset. startBlock = offset / blockSize bytesToSkip = offset % blockSize + endBlock = length / blockSize return } diff --git a/xl-v1-object.go b/xl-v1-object.go index 0237173c3..464c4069f 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -97,7 +97,9 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Save the current part name and size. partName := xlMeta.Parts[partIndex].Name partSize := xlMeta.Parts[partIndex].Size + readSize := partSize - partOffset + // readSize should be adjusted so that we don't write more data than what was requested. 
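// Aside, separate from the patch: the capping below is easier to follow on a toy
// example. GetObject reads the first part from partOffset, caps every read at
// the bytes still owed to the caller, and resets the offset to zero for the
// remaining parts. The part sizes here are invented.
package main

import "fmt"

func main() {
	partSizes := []int64{100, 100, 40} // hypothetical object parts
	partOffset := int64(30)            // startOffset falls inside part 0
	length := int64(150)               // total bytes the caller asked for

	totalBytesRead := int64(0)
	for i := 0; i < len(partSizes) && totalBytesRead < length; i++ {
		readSize := partSizes[i] - partOffset
		if readSize > length-totalBytesRead {
			readSize = length - totalBytesRead
		}
		fmt.Printf("part %d: offset %d, read %d bytes\n", i, partOffset, readSize)
		totalBytesRead += readSize

		// Only the first part is read from a non-zero offset.
		partOffset = 0
	}
	// Output:
	// part 0: offset 30, read 70 bytes
	// part 1: offset 0, read 80 bytes
}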
if readSize > (length - totalBytesRead) { readSize = length - totalBytesRead } @@ -110,7 +112,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i totalBytesRead += n - // Reset part offset to 0 to read rest of the part from the beginning. + // partOffset will be valid only for the first part, hence reset it to 0 for + // the remaining parts. partOffset = 0 } // End of read all parts loop. From 9b82e64a112295794ff1173bff4feb803b4182b2 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 22 Jun 2016 12:55:23 -0700 Subject: [PATCH 3/3] XL/erasure-read: Avoid memory copy, write to writer directly all the dataBlocks. --- erasure-readfile.go | 19 +++++------ erasure-utils.go | 81 +++++++++++++++++++++++++++++++++++---------- server_xl_test.go | 8 +++-- 3 files changed, 77 insertions(+), 31 deletions(-) diff --git a/erasure-readfile.go b/erasure-readfile.go index 7f9a0389c..91f494db8 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -189,6 +189,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } } + var outSize, outOffset int64 // enBlocks data can have 0-padding hence we need to figure the exact number // of bytes we want to read from enBlocks. blockSize := eInfo.BlockSize @@ -196,26 +197,24 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s // For the last block, the block size can be less than BlockSize. blockSize = totalLength % eInfo.BlockSize } - data, err := getDataBlocks(enBlocks, eInfo.DataBlocks, int(blockSize)) - if err != nil { - return bytesWritten, err - } // If this is start block, skip unwanted bytes. if block == startBlock { - data = data[bytesToSkip:] + outOffset = bytesToSkip } - if len(data) > int(length-bytesWritten) { + // Total data to be read. + outSize = blockSize + if length-bytesWritten < blockSize { // We should not send more data than what was requested. - data = data[:length-bytesWritten] + outSize = length - bytesWritten } - - _, err = writer.Write(data) + // Write data blocks. + n, err := writeDataBlocks(writer, enBlocks, eInfo.DataBlocks, outOffset, outSize) if err != nil { return bytesWritten, err } - bytesWritten += int64(len(data)) + bytesWritten += n } return bytesWritten, nil diff --git a/erasure-utils.go b/erasure-utils.go index 521bc7b21..9daf39325 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -17,6 +17,7 @@ package main import ( + "bytes" "crypto/sha512" "hash" "io" @@ -62,30 +63,74 @@ func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, er return writer.Sum(nil), nil } -// getDataBlocks - fetches the data block only part of the input encoded blocks. -func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte, err error) { - if len(enBlocks) < dataBlocks { - return nil, reedsolomon.ErrTooFewShards - } +// getDataBlockLen - get length of data blocks from encoded blocks. +func getDataBlockLen(enBlocks [][]byte, dataBlocks int) int { size := 0 - blocks := enBlocks[:dataBlocks] - for _, block := range blocks { + // Figure out the data block length. + for _, block := range enBlocks[:dataBlocks] { size += len(block) } - if size < curBlockSize { - return nil, reedsolomon.ErrShortData + return size +} + +// Writes all the data blocks from encoded blocks until requested +// outSize length. Provides a way to skip bytes until the offset. +func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset int64, outSize int64) (int64, error) { + // Do we have enough blocks? 
+ if len(enBlocks) < dataBlocks { + return 0, reedsolomon.ErrTooFewShards } - write := curBlockSize - for _, block := range blocks { - if write < len(block) { - data = append(data, block[:write]...) - return data, nil - } - data = append(data, block...) - write -= len(block) + // Do we have enough data? + if int64(getDataBlockLen(enBlocks, dataBlocks)) < outSize { + return 0, reedsolomon.ErrShortData } - return data, nil + + // Counter to decrement total left to write. + write := outSize + + // Counter to increment total written. + totalWritten := int64(0) + + // Write all data blocks to dst. + for _, block := range enBlocks[:dataBlocks] { + // Skip blocks until we have reached our offset. + if outOffset >= int64(len(block)) { + // Decrement offset. + outOffset -= int64(len(block)) + continue + } else { + // Skip until offset. + block = block[outOffset:] + + // Reset the offset for next iteration to read everything + // from subsequent blocks. + outOffset = 0 + } + // We have written all the blocks, write the last remaining block. + if write < int64(len(block)) { + n, err := io.Copy(dst, bytes.NewReader(block[:write])) + if err != nil { + return 0, err + } + totalWritten += n + break + } + // Copy the block. + n, err := io.Copy(dst, bytes.NewReader(block)) + if err != nil { + return 0, err + } + + // Decrement output size. + write -= n + + // Increment written. + totalWritten += n + } + + // Success. + return totalWritten, nil } // getBlockInfo - find start/end block and bytes to skip for given offset, length and block size. diff --git a/server_xl_test.go b/server_xl_test.go index 4814cdc47..6e6be7de3 100644 --- a/server_xl_test.go +++ b/server_xl_test.go @@ -840,15 +840,17 @@ func (s *MyAPIXLSuite) TestPartialContent(c *C) { c.Assert(response.StatusCode, Equals, http.StatusOK) // Prepare request - var table = []struct { + var testCases = []struct { byteRange string expectedString string }{ - {"6-7", "Wo"}, + {"4-7", "o Wo"}, + {"1-", "ello World"}, {"6-", "World"}, + {"-2", "ld"}, {"-7", "o World"}, } - for _, t := range table { + for _, t := range testCases { request, err = newTestRequest("GET", s.testServer.Server.URL+"/partial-content/bar", 0, nil, s.testServer.AccessKey, s.testServer.SecretKey) c.Assert(err, IsNil)
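For reference, the skip-then-copy traversal that writeDataBlocks introduces can
be followed on the same 11-byte object the test above uses. With "Hello World"
split across three data shards and the byte range 4-7 requested, the first shard
is skipped entirely and the second is written whole. The sketch below is a
simplified stand-in: the real function also validates shard counts and streams
each block through io.Copy rather than an in-memory buffer.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Decoded data shards for an 11-byte object, padded to equal length.
	shards := [][]byte{[]byte("Hell"), []byte("o Wo"), []byte("rld\x00")}
	outOffset, outSize := int64(4), int64(4) // byte range 4-7, i.e. "o Wo"

	var dst bytes.Buffer
	write := outSize
	for _, block := range shards {
		if outOffset >= int64(len(block)) {
			outOffset -= int64(len(block)) // skip shards before the offset
			continue
		}
		block = block[outOffset:] // skip into the shard holding the offset
		outOffset = 0
		if write < int64(len(block)) {
			block = block[:write] // trim the final, partial shard
		}
		n, _ := dst.Write(block)
		write -= int64(n)
		if write == 0 {
			break
		}
	}
	fmt.Printf("%q\n", dst.String()) // "o Wo"
}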