Compare commits

...

5 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
8f6296b905 Simplify ungzip implementation per review feedback
- Remove manual gzip header parsing - libdeflate handles all format details
- Rename linearize_chunked_content to build_input_buffer and free chunks as we copy
- Add output chunking to split large decompressed data into 1MB chunks
- Add comment explaining libdeflate's whole-buffer requirement
- Use better initial size heuristic based on compression ratio

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 12:47:02 +00:00
copilot-swe-agent[bot]
4f44a61b3a Add edge case check for length limit in ungzip
- Check if total_decompressed >= length_limit before allocating output buffer
- Prevents allocating a zero-sized buffer when limit is already reached
- Ensures clear error message when limit is exceeded

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 11:50:31 +00:00
copilot-swe-agent[bot]
362491a650 Fix ungzip implementation to properly handle concatenated gzip files
- Removed unused get_gzip_member_size function
- Rely on libdeflate_gzip_decompress to tell us how many input bytes were consumed
- Added check for zero bytes consumed to detect invalid state
- Simplified the logic by removing unnecessary header size tracking

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 11:48:35 +00:00
copilot-swe-agent[bot]
b818331420 Add ungzip function implementation with libdeflate
- Created utils/gzip.hh header with ungzip function declaration
- Created utils/gzip.cc implementation using libdeflate
- Updated utils/CMakeLists.txt to include gzip.cc and link libdeflate
- Created comprehensive test suite in test/boost/gzip_test.cc
- Added gzip_test to test/boost/CMakeLists.txt

The implementation:
- Uses libdeflate for high-performance gzip decompression
- Handles chunked_content input/output (vector of temporary_buffer)
- Supports concatenated gzip files
- Validates gzip headers and detects invalid/truncated/corrupted data
- Enforces size limits to prevent memory exhaustion
- Runs in async context to avoid blocking the reactor

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 11:46:29 +00:00
copilot-swe-agent[bot]
c714159d5c Initial plan 2025-11-19 11:32:38 +00:00
5 changed files with 540 additions and 0 deletions

View File

@@ -61,6 +61,8 @@ add_scylla_test(compound_test
KIND SEASTAR)
add_scylla_test(compress_test
KIND BOOST)
add_scylla_test(gzip_test
KIND SEASTAR)
add_scylla_test(config_test
KIND SEASTAR)
add_scylla_test(continuous_data_consumer_test

317
test/boost/gzip_test.cc Normal file
View File

@@ -0,0 +1,317 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <algorithm>
#include <cstring>
#include <memory>
#include <new>
#include <stdexcept>
#include <vector>

#include <boost/test/unit_test.hpp>
#include <libdeflate.h>

#include <seastar/core/thread.hh>
#include <seastar/testing/test_case.hh>

#include "utils/gzip.hh"
using namespace seastar;
namespace {
// Helper function to compress data with gzip
std::vector<char> gzip_compress(const std::vector<char>& data) {
auto* compressor = libdeflate_alloc_compressor(6);
if (!compressor) {
throw std::bad_alloc();
}
size_t max_compressed_size = libdeflate_gzip_compress_bound(compressor, data.size());
std::vector<char> compressed(max_compressed_size);
size_t actual_size = libdeflate_gzip_compress(
compressor,
data.data(),
data.size(),
compressed.data(),
compressed.size()
);
libdeflate_free_compressor(compressor);
compressed.resize(actual_size);
return compressed;
}
// Convert vector to chunked_content
rjson::chunked_content to_chunked_content(const std::vector<char>& data) {
rjson::chunked_content result;
temporary_buffer<char> buf(data.size());
std::memcpy(buf.get_write(), data.data(), data.size());
result.push_back(std::move(buf));
return result;
}
// Convert chunked_content to vector
std::vector<char> from_chunked_content(const rjson::chunked_content& chunks) {
std::vector<char> result;
for (const auto& chunk : chunks) {
result.insert(result.end(), chunk.begin(), chunk.end());
}
return result;
}
} // anonymous namespace
SEASTAR_TEST_CASE(test_ungzip_simple) {
    return async([] {
        // Round-trip a short string through gzip_compress and utils::ungzip.
        const char* text = "Hello, World!";
        std::vector<char> input(text, text + std::strlen(text));
        auto gz = gzip_compress(input);
        auto output = from_chunked_content(utils::ungzip(to_chunked_content(gz), 1024).get());
        BOOST_REQUIRE_EQUAL(output.size(), input.size());
        BOOST_CHECK(std::equal(output.begin(), output.end(), input.begin()));
    });
}
SEASTAR_TEST_CASE(test_ungzip_empty) {
    return async([] {
        // A gzip stream of an empty payload must decompress to zero bytes.
        std::vector<char> nothing;
        auto gz = gzip_compress(nothing);
        auto output = from_chunked_content(utils::ungzip(to_chunked_content(gz), 1024).get());
        BOOST_CHECK_EQUAL(output.size(), 0);
    });
}
SEASTAR_TEST_CASE(test_ungzip_large_data) {
    return async([] {
        // 30 KB of highly repetitive data — compresses very well, which
        // exercises output-buffer growth in ungzip.
        std::vector<char> input;
        for (char fill : {'A', 'B', 'C'}) {
            input.insert(input.end(), 10000, fill);
        }
        auto gz = gzip_compress(input);
        auto output = from_chunked_content(utils::ungzip(to_chunked_content(gz), 100000).get());
        BOOST_REQUIRE_EQUAL(output.size(), input.size());
        BOOST_CHECK(std::equal(output.begin(), output.end(), input.begin()));
    });
}
SEASTAR_TEST_CASE(test_ungzip_concatenated) {
    return async([] {
        // Two gzip members back to back must decompress to "HelloWorld".
        const char* s1 = "Hello";
        const char* s2 = "World";
        std::vector<char> part1(s1, s1 + std::strlen(s1));
        std::vector<char> part2(s2, s2 + std::strlen(s2));
        auto stream = gzip_compress(part1);
        auto second = gzip_compress(part2);
        stream.insert(stream.end(), second.begin(), second.end());
        std::vector<char> expected(part1);
        expected.insert(expected.end(), part2.begin(), part2.end());
        auto output = from_chunked_content(utils::ungzip(to_chunked_content(stream), 1024).get());
        BOOST_REQUIRE_EQUAL(output.size(), expected.size());
        BOOST_CHECK(std::equal(output.begin(), output.end(), expected.begin()));
    });
}
SEASTAR_TEST_CASE(test_ungzip_multiple_concatenated) {
    return async([] {
        // Four concatenated gzip members in one stream.
        const std::vector<std::vector<char>> pieces = {
            {'A', 'B', 'C'},
            {'D', 'E', 'F'},
            {'G', 'H', 'I'},
            {'J', 'K', 'L'},
        };
        std::vector<char> stream;
        std::vector<char> expected;
        for (const auto& piece : pieces) {
            const auto gz = gzip_compress(piece);
            stream.insert(stream.end(), gz.begin(), gz.end());
            expected.insert(expected.end(), piece.begin(), piece.end());
        }
        auto output = from_chunked_content(utils::ungzip(to_chunked_content(stream), 1024).get());
        BOOST_REQUIRE_EQUAL(output.size(), expected.size());
        BOOST_CHECK(std::equal(output.begin(), output.end(), expected.begin()));
    });
}
SEASTAR_TEST_CASE(test_ungzip_invalid_magic) {
    return async([] {
        // Ten bytes that do not start with the gzip magic (0x1f 0x8b)
        // must be rejected.
        std::vector<char> not_gzip(10, 0);
        not_gzip[2] = 0x08;
        auto chunks = to_chunked_content(not_gzip);
        BOOST_CHECK_THROW(utils::ungzip(std::move(chunks), 1024).get(), std::runtime_error);
    });
}
SEASTAR_TEST_CASE(test_ungzip_truncated) {
    return async([] {
        // Cutting a valid gzip stream in half must be rejected.
        const char* s = "Hello";
        std::vector<char> payload(s, s + std::strlen(s));
        auto gz = gzip_compress(payload);
        gz.resize(gz.size() / 2);
        auto chunks = to_chunked_content(gz);
        BOOST_CHECK_THROW(utils::ungzip(std::move(chunks), 1024).get(), std::runtime_error);
    });
}
SEASTAR_TEST_CASE(test_ungzip_corrupted) {
    return async([] {
        // Flipping bits inside the compressed payload must be rejected
        // (the CRC in the gzip trailer no longer matches).
        const char* s = "Hello World";
        std::vector<char> payload(s, s + std::strlen(s));
        auto gz = gzip_compress(payload);
        if (gz.size() > 20) {
            for (size_t i = 15; i <= 16; ++i) {
                gz[i] = ~gz[i];
            }
        }
        auto chunks = to_chunked_content(gz);
        BOOST_CHECK_THROW(utils::ungzip(std::move(chunks), 1024).get(), std::runtime_error);
    });
}
SEASTAR_TEST_CASE(test_ungzip_junk_appended) {
    return async([] {
        // Trailing non-gzip bytes after a valid member must be rejected.
        const char* s = "Hello";
        std::vector<char> payload(s, s + std::strlen(s));
        auto gz = gzip_compress(payload);
        for (char junk_byte : {'J', 'U', 'N', 'K'}) {
            gz.push_back(junk_byte);
        }
        auto chunks = to_chunked_content(gz);
        BOOST_CHECK_THROW(utils::ungzip(std::move(chunks), 1024).get(), std::runtime_error);
    });
}
SEASTAR_TEST_CASE(test_ungzip_length_limit_exceeded) {
    return async([] {
        // 1000 decompressed bytes against a 500-byte limit must throw.
        std::vector<char> payload(1000, 'A');
        auto chunks = to_chunked_content(gzip_compress(payload));
        BOOST_CHECK_THROW(utils::ungzip(std::move(chunks), 500).get(), std::runtime_error);
    });
}
SEASTAR_TEST_CASE(test_ungzip_length_limit_exact) {
    return async([] {
        // A limit exactly equal to the decompressed size must succeed.
        std::vector<char> payload(1000, 'B');
        auto output = from_chunked_content(
                utils::ungzip(to_chunked_content(gzip_compress(payload)), 1000).get());
        BOOST_CHECK_EQUAL(output.size(), 1000);
    });
}
SEASTAR_TEST_CASE(test_ungzip_very_short_input) {
    return async([] {
        // Input containing only the two gzip magic bytes is far too short
        // to be a valid stream.
        // Note: use char literals here — the integer constant 0x8b (139)
        // does not fit in a signed char, so `{0x1f, 0x8b}` is an ill-formed
        // narrowing conversion in list-initialization.
        std::vector<char> bad_data = {'\x1f', '\x8b'};
        auto chunked = to_chunked_content(bad_data);
        BOOST_CHECK_THROW(
            utils::ungzip(std::move(chunked), 1024).get(),
            std::runtime_error
        );
    });
}
SEASTAR_TEST_CASE(test_ungzip_empty_input) {
    return async([] {
        // ungzip must reject a chunked_content containing no chunks at all.
        rjson::chunked_content no_chunks;
        BOOST_CHECK_THROW(utils::ungzip(std::move(no_chunks), 1024).get(), std::runtime_error);
    });
}
SEASTAR_TEST_CASE(test_ungzip_chunked_input) {
    return async([] {
        // Compressed input split across roughly three chunks must decompress
        // identically to a single-chunk input.
        const char* text = "Hello World!";
        std::vector<char> input(text, text + std::strlen(text));
        auto gz = gzip_compress(input);
        rjson::chunked_content split;
        const size_t piece = gz.size() / 3 + 1;
        size_t pos = 0;
        while (pos < gz.size()) {
            const size_t len = std::min(piece, gz.size() - pos);
            temporary_buffer<char> buf(len);
            std::memcpy(buf.get_write(), gz.data() + pos, len);
            split.push_back(std::move(buf));
            pos += len;
        }
        auto output = from_chunked_content(utils::ungzip(std::move(split), 1024).get());
        BOOST_REQUIRE_EQUAL(output.size(), input.size());
        BOOST_CHECK(std::equal(output.begin(), output.end(), input.begin()));
    });
}

View File

@@ -22,6 +22,7 @@ target_sources(utils
error_injection.cc
exceptions.cc
file_lock.cc
gzip.cc
gz/crc_combine.cc
gz/crc_combine_table.cc
hashers.cc
@@ -79,6 +80,7 @@ target_link_libraries(utils
Boost::regex
crypto
cryptopp::cryptopp
libdeflate::libdeflate
rapidxml::rapidxml
yaml-cpp::yaml-cpp
GnuTLS::gnutls)

187
utils/gzip.cc Normal file
View File

@@ -0,0 +1,187 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/gzip.hh"
#include <libdeflate.h>
#include <seastar/core/coroutine.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/core/thread.hh>
#include <stdexcept>
using namespace seastar;
namespace utils {
namespace {
// Largest temporary_buffer we emit into the decompressed result (1 MB).
constexpr size_t MAX_OUTPUT_CHUNK_SIZE = 1024 * 1024;
// Copy every input chunk into one contiguous buffer, releasing each chunk
// as soon as its bytes have been copied to keep peak memory low.
// libdeflate can only operate on a complete, contiguous compressed stream,
// which is why this linearization step is needed at all.
std::vector<char> build_input_buffer(rjson::chunked_content& chunks) {
    size_t total = 0;
    for (const auto& c : chunks) {
        total += c.size();
    }
    std::vector<char> flat;
    flat.reserve(total);
    for (auto& c : chunks) {
        flat.insert(flat.end(), c.begin(), c.end());
        // Drop the source chunk right away — its bytes are now in `flat`.
        c = temporary_buffer<char>();
    }
    chunks.clear();
    return flat;
}
} // anonymous namespace
future<rjson::chunked_content> ungzip(rjson::chunked_content&& compressed_body, size_t length_limit) {
    // Decompression is CPU-bound and libdeflate's API is synchronous, so run
    // it inside a seastar::thread where we can yield between gzip members.
    return seastar::async([compressed_body = std::move(compressed_body), length_limit] () mutable {
        if (compressed_body.empty()) {
            throw std::runtime_error("Invalid gzip data: empty input");
        }
        // libdeflate needs the whole compressed stream in one contiguous
        // buffer; copy the chunks over, freeing each one as it is copied.
        std::vector<char> compressed_data = build_input_buffer(compressed_body);
        if (compressed_data.empty()) {
            throw std::runtime_error("Invalid gzip data: empty input");
        }
        auto* decompressor = libdeflate_alloc_decompressor();
        if (!decompressor) {
            throw std::bad_alloc();
        }
        // RAII guard so the decompressor is freed on every exit path,
        // including the many throw statements below.
        auto decompressor_deleter = [](libdeflate_decompressor* d) {
            if (d) {
                libdeflate_free_decompressor(d);
            }
        };
        std::unique_ptr<libdeflate_decompressor, decltype(decompressor_deleter)> decompressor_guard(
            decompressor, decompressor_deleter);
        rjson::chunked_content result;
        size_t total_decompressed = 0;
        size_t input_offset = 0;
        // A gzip stream may consist of several concatenated members; process
        // them one at a time. libdeflate_gzip_decompress handles all format
        // details (header, compressed data, trailer) of a single member.
        while (input_offset < compressed_data.size()) {
            const char* current_input = compressed_data.data() + input_offset;
            size_t remaining_input = compressed_data.size() - input_offset;
            // If previous members already filled the limit exactly, any
            // further member necessarily pushes us past it.
            if (total_decompressed >= length_limit) {
                throw std::runtime_error("Decompressed data exceeds length limit");
            }
            // Remaining output budget for this and all following members.
            const size_t max_output_size = length_limit - total_decompressed;
            // Initial output buffer: capped at 1 MB and at the remaining
            // budget; the 10x heuristic avoids over-allocating for inputs
            // with typical compression ratios.
            const size_t initial_chunk_size = std::min({
                size_t(MAX_OUTPUT_CHUNK_SIZE),
                max_output_size,
                remaining_input * 10
            });
            std::vector<char> output_buffer(initial_chunk_size);
            size_t actual_in_bytes = 0;
            size_t actual_out_bytes = 0;
            // Retry with a larger buffer for as long as libdeflate reports
            // insufficient space and we can still grow within the limit.
            // Deliberately no fixed retry count: the previous 10-attempt cap
            // could exhaust before the buffer reached max_output_size and
            // falsely report a within-limit member as exceeding the limit.
            libdeflate_result res;
            for (;;) {
                res = libdeflate_gzip_decompress(
                    decompressor,
                    current_input,
                    remaining_input,
                    output_buffer.data(),
                    output_buffer.size(),
                    &actual_in_bytes,
                    &actual_out_bytes
                );
                if (res != LIBDEFLATE_INSUFFICIENT_SPACE) {
                    break;
                }
                size_t new_size = std::min(output_buffer.size() * 2, max_output_size);
                if (new_size <= output_buffer.size()) {
                    // Already at the budget and still not enough room: the
                    // member's decompressed size truly exceeds the limit.
                    throw std::runtime_error("Decompressed data exceeds length limit");
                }
                // The partial output is discarded on retry, so replace the
                // buffer rather than resize() — avoids copying stale bytes.
                output_buffer = std::vector<char>(new_size);
            }
            if (res != LIBDEFLATE_SUCCESS) {
                if (res == LIBDEFLATE_BAD_DATA) {
                    throw std::runtime_error("Invalid gzip data: corrupt or truncated");
                }
                // LIBDEFLATE_INSUFFICIENT_SPACE cannot reach here (handled in
                // the loop) and LIBDEFLATE_SHORT_OUTPUT is only returned when
                // actual_out_nbytes_ret is null, which we never pass.
                throw std::runtime_error("Gzip decompression failed");
            }
            // actual_in_bytes covers the whole member (header, compressed
            // data and trailer); zero would mean no forward progress.
            if (actual_in_bytes == 0) {
                throw std::runtime_error("Invalid gzip data: no bytes consumed");
            }
            total_decompressed += actual_out_bytes;
            if (total_decompressed > length_limit) {
                throw std::runtime_error("Decompressed data exceeds length limit");
            }
            // Emit this member's output as <= 1 MB temporary_buffer chunks to
            // avoid holding large contiguous allocations in the result.
            size_t offset = 0;
            while (offset < actual_out_bytes) {
                size_t chunk_size = std::min(MAX_OUTPUT_CHUNK_SIZE, actual_out_bytes - offset);
                temporary_buffer<char> chunk(chunk_size);
                std::memcpy(chunk.get_write(), output_buffer.data() + offset, chunk_size);
                result.push_back(std::move(chunk));
                offset += chunk_size;
            }
            // Advance to the next gzip member.
            input_offset += actual_in_bytes;
            // Let the reactor run between members on large inputs.
            seastar::thread::maybe_yield();
        }
        // Every input byte must belong to some gzip member; leftover bytes
        // mean non-gzip junk was appended to the stream.
        if (input_offset != compressed_data.size()) {
            throw std::runtime_error("Invalid gzip data: unconsumed trailing data");
        }
        return result;
    });
}
} // namespace utils

32
utils/gzip.hh Normal file
View File

@@ -0,0 +1,32 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/future.hh>
#include "utils/rjson.hh"
namespace utils {
/**
 * Decompresses gzip-compressed data stored in chunked_content format.
 *
 * @param compressed_body The gzip-compressed data in chunked_content format.
 *                        Consumed by the call: input chunks are released as
 *                        they are read to limit peak memory usage.
 * @param length_limit Maximum allowed size of the uncompressed data (throws if exceeded)
 * @return A future containing the uncompressed data in chunked_content format
 *
 * Features:
 * - Supports concatenated gzip files (multiple gzip files appended together)
 * - Throws exception if input is not valid gzip
 * - Throws exception if input is truncated
 * - Throws exception if non-gzip junk is appended
 * - Throws exception if uncompressed size exceeds length_limit
 *
 * All of the failures above are reported as std::runtime_error delivered
 * through the returned future; allocation failure is reported as
 * std::bad_alloc. Decompression runs in a seastar::thread context so the
 * reactor is not blocked.
 */
seastar::future<rjson::chunked_content> ungzip(rjson::chunked_content&& compressed_body, size_t length_limit);
} // namespace utils