From b38b9fea799b9123ebe3f5c8a08d97901a3851ac Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Tue, 24 May 2016 20:14:32 +0530 Subject: [PATCH] XL/erasure: fix for skipping 0 padding. (#1737) Fixes #1736 --- erasure-readfile.go | 66 ++++++++++++++++++++++++++------------------- erasure-utils.go | 13 +++------ xl-v1-object.go | 4 ++- 3 files changed, 44 insertions(+), 39 deletions(-) diff --git a/erasure-readfile.go b/erasure-readfile.go index 9c35058a7..2c50fc677 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -23,7 +23,7 @@ import ( ) // ReadFile - decoded erasure coded file. -func (e erasure) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, error) { +func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int64) (io.ReadCloser, error) { // Input validation. if !isValidVolname(volume) { return nil, errInvalidArgument @@ -32,13 +32,13 @@ func (e erasure) ReadFile(volume, path string, startOffset int64) (io.ReadCloser return nil, errInvalidArgument } - var wg = &sync.WaitGroup{} + var rwg = &sync.WaitGroup{} readers := make([]io.ReadCloser, len(e.storageDisks)) for index, disk := range e.storageDisks { - wg.Add(1) + rwg.Add(1) go func(index int, disk StorageAPI) { - defer wg.Done() + defer rwg.Done() // If disk.ReadFile returns error and we don't have read // quorum it will be taken care as ReedSolomon.Reconstruct() // will fail later. @@ -49,18 +49,28 @@ func (e erasure) ReadFile(volume, path string, startOffset int64) (io.ReadCloser }(index, disk) } - wg.Wait() + // Wait for all readers. + rwg.Wait() // Initialize pipe. pipeReader, pipeWriter := io.Pipe() go func() { + var totalLeft = totalSize // Read until EOF. - for { + for totalLeft > 0 { + // Figure out the right blockSize as it was encoded + // before. + var curBlockSize int64 + if erasureBlockSize < totalLeft { + curBlockSize = erasureBlockSize + } else { + curBlockSize = totalLeft + } // Calculate the current encoded block size. - curEncBlockSize := getEncodedBlockLen(erasureBlockSize, e.DataBlocks) + curEncBlockSize := getEncodedBlockLen(curBlockSize, e.DataBlocks) enBlocks := make([][]byte, len(e.storageDisks)) - // Loop through all readers and read. + // Read all the readers. for index, reader := range readers { // Initialize shard slice and fill the data from each parts. enBlocks[index] = make([]byte, curEncBlockSize) @@ -68,26 +78,11 @@ func (e erasure) ReadFile(volume, path string, startOffset int64) (io.ReadCloser continue } // Read the necessary blocks. - n, rErr := io.ReadFull(reader, enBlocks[index]) - if rErr == io.EOF { - // Close the pipe. - pipeWriter.Close() - - // Cleanly close all the underlying data readers. - for _, reader := range readers { - if reader == nil { - continue - } - reader.Close() - } - return - } + _, rErr := io.ReadFull(reader, enBlocks[index]) if rErr != nil && rErr != io.ErrUnexpectedEOF { readers[index].Close() readers[index] = nil - continue } - enBlocks[index] = enBlocks[index][:n] } // Check blocks if they are all zero in length. @@ -131,17 +126,18 @@ func (e erasure) ReadFile(volume, path string, startOffset int64) (io.ReadCloser } // Get all the data blocks. - dataBlocks := getDataBlocks(enBlocks, e.DataBlocks) + dataBlocks := getDataBlocks(enBlocks, e.DataBlocks, int(curBlockSize)) - // Verify if the offset is right for the block, if not move to - // the next block. + // Verify if the offset is right for the block, if not move to the next block. if startOffset > 0 { startOffset = startOffset - int64(len(dataBlocks)) // Start offset is greater than or equal to zero, skip the dataBlocks. if startOffset >= 0 { + totalLeft = totalLeft - erasureBlockSize continue } - // Now get back the remaining offset if startOffset is negative. + // Now get back the remaining offset if startOffset is + // negative. startOffset = startOffset + int64(len(dataBlocks)) } @@ -154,6 +150,20 @@ func (e erasure) ReadFile(volume, path string, startOffset int64) (io.ReadCloser // Reset offset to '0' to read rest of the blocks. startOffset = int64(0) + + // Save what's left after reading erasureBlockSize. + totalLeft = totalLeft - erasureBlockSize + } + + // Cleanly end the pipe after a successful decoding. + pipeWriter.Close() + + // Cleanly close all the underlying data readers. + for _, reader := range readers { + if reader == nil { + continue + } + reader.Close() } }() diff --git a/erasure-utils.go b/erasure-utils.go index c291dda4a..ff505b143 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -17,19 +17,12 @@ package main // getDataBlocks - fetches the data block only part of the input encoded blocks. -func getDataBlocks(enBlocks [][]byte, dataBlocks int) []byte { +func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) []byte { var data []byte for _, block := range enBlocks[:dataBlocks] { - var newBlock []byte - // FIXME: Find a better way to skip the padding zeros. - for _, b := range block { - if b == 0 { - continue - } - newBlock = append(newBlock, b) - } - data = append(data, newBlock...) + data = append(data, block...) } + data = data[:curBlockSize] return data } diff --git a/xl-v1-object.go b/xl-v1-object.go index b74a1bba4..1291286f2 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -37,6 +37,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read return nil, toObjectErr(err, bucket, object) } + totalObjectSize := xlMeta.Stat.Size // Total object size. + // Hold a read lock once more which can be released after the following go-routine ends. // We hold RLock once more because the current function would return before the go routine below // executes and hence releasing the read lock (because of defer'ed nsMutex.RUnlock() call). @@ -45,7 +47,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read defer nsMutex.RUnlock(bucket, object) for ; partIndex < len(xlMeta.Parts); partIndex++ { part := xlMeta.Parts[partIndex] - r, err := xl.erasureDisk.ReadFile(bucket, pathJoin(object, part.Name), offset) + r, err := xl.erasureDisk.ReadFile(bucket, pathJoin(object, part.Name), offset, totalObjectSize) if err != nil { fileWriter.CloseWithError(err) return