Compare commits
5 Commits
copilot/fi
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f6296b905 | ||
|
|
4f44a61b3a | ||
|
|
362491a650 | ||
|
|
b818331420 | ||
|
|
c714159d5c |
@@ -61,6 +61,8 @@ add_scylla_test(compound_test
|
|||||||
KIND SEASTAR)
|
KIND SEASTAR)
|
||||||
add_scylla_test(compress_test
|
add_scylla_test(compress_test
|
||||||
KIND BOOST)
|
KIND BOOST)
|
||||||
|
add_scylla_test(gzip_test
|
||||||
|
KIND SEASTAR)
|
||||||
add_scylla_test(config_test
|
add_scylla_test(config_test
|
||||||
KIND SEASTAR)
|
KIND SEASTAR)
|
||||||
add_scylla_test(continuous_data_consumer_test
|
add_scylla_test(continuous_data_consumer_test
|
||||||
|
|||||||
317
test/boost/gzip_test.cc
Normal file
317
test/boost/gzip_test.cc
Normal file
@@ -0,0 +1,317 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2025-present ScyllaDB
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <boost/test/unit_test.hpp>
|
||||||
|
#include <seastar/core/thread.hh>
|
||||||
|
#include <seastar/testing/test_case.hh>
|
||||||
|
#include "utils/gzip.hh"
|
||||||
|
#include <libdeflate.h>
|
||||||
|
#include <cstring>
|
||||||
|
|
||||||
|
using namespace seastar;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
// Helper function to compress data with gzip
|
||||||
|
std::vector<char> gzip_compress(const std::vector<char>& data) {
|
||||||
|
auto* compressor = libdeflate_alloc_compressor(6);
|
||||||
|
if (!compressor) {
|
||||||
|
throw std::bad_alloc();
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t max_compressed_size = libdeflate_gzip_compress_bound(compressor, data.size());
|
||||||
|
std::vector<char> compressed(max_compressed_size);
|
||||||
|
|
||||||
|
size_t actual_size = libdeflate_gzip_compress(
|
||||||
|
compressor,
|
||||||
|
data.data(),
|
||||||
|
data.size(),
|
||||||
|
compressed.data(),
|
||||||
|
compressed.size()
|
||||||
|
);
|
||||||
|
|
||||||
|
libdeflate_free_compressor(compressor);
|
||||||
|
|
||||||
|
compressed.resize(actual_size);
|
||||||
|
return compressed;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert vector to chunked_content
|
||||||
|
rjson::chunked_content to_chunked_content(const std::vector<char>& data) {
|
||||||
|
rjson::chunked_content result;
|
||||||
|
temporary_buffer<char> buf(data.size());
|
||||||
|
std::memcpy(buf.get_write(), data.data(), data.size());
|
||||||
|
result.push_back(std::move(buf));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert chunked_content to vector
|
||||||
|
std::vector<char> from_chunked_content(const rjson::chunked_content& chunks) {
|
||||||
|
std::vector<char> result;
|
||||||
|
for (const auto& chunk : chunks) {
|
||||||
|
result.insert(result.end(), chunk.begin(), chunk.end());
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // anonymous namespace
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_simple) {
|
||||||
|
return async([] {
|
||||||
|
// Test simple gzip compression/decompression
|
||||||
|
std::vector<char> original_data = {'H', 'e', 'l', 'l', 'o', ',', ' ', 'W', 'o', 'r', 'l', 'd', '!'};
|
||||||
|
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
auto chunked_compressed = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
auto decompressed_chunks = utils::ungzip(std::move(chunked_compressed), 1024).get();
|
||||||
|
auto decompressed = from_chunked_content(decompressed_chunks);
|
||||||
|
|
||||||
|
BOOST_REQUIRE_EQUAL(decompressed.size(), original_data.size());
|
||||||
|
BOOST_CHECK(std::equal(decompressed.begin(), decompressed.end(), original_data.begin()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_empty) {
|
||||||
|
return async([] {
|
||||||
|
// Test empty input
|
||||||
|
std::vector<char> original_data;
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
auto chunked_compressed = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
auto decompressed_chunks = utils::ungzip(std::move(chunked_compressed), 1024).get();
|
||||||
|
auto decompressed = from_chunked_content(decompressed_chunks);
|
||||||
|
|
||||||
|
BOOST_CHECK_EQUAL(decompressed.size(), 0);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_large_data) {
|
||||||
|
return async([] {
|
||||||
|
// Test with larger data that compresses well
|
||||||
|
std::vector<char> original_data(10000, 'A');
|
||||||
|
original_data.insert(original_data.end(), 10000, 'B');
|
||||||
|
original_data.insert(original_data.end(), 10000, 'C');
|
||||||
|
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
auto chunked_compressed = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
auto decompressed_chunks = utils::ungzip(std::move(chunked_compressed), 100000).get();
|
||||||
|
auto decompressed = from_chunked_content(decompressed_chunks);
|
||||||
|
|
||||||
|
BOOST_REQUIRE_EQUAL(decompressed.size(), original_data.size());
|
||||||
|
BOOST_CHECK(std::equal(decompressed.begin(), decompressed.end(), original_data.begin()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_concatenated) {
|
||||||
|
return async([] {
|
||||||
|
// Test multiple concatenated gzip files
|
||||||
|
std::vector<char> data1 = {'H', 'e', 'l', 'l', 'o'};
|
||||||
|
std::vector<char> data2 = {'W', 'o', 'r', 'l', 'd'};
|
||||||
|
|
||||||
|
auto compressed1 = gzip_compress(data1);
|
||||||
|
auto compressed2 = gzip_compress(data2);
|
||||||
|
|
||||||
|
// Concatenate the compressed data
|
||||||
|
std::vector<char> concatenated;
|
||||||
|
concatenated.insert(concatenated.end(), compressed1.begin(), compressed1.end());
|
||||||
|
concatenated.insert(concatenated.end(), compressed2.begin(), compressed2.end());
|
||||||
|
|
||||||
|
auto chunked_compressed = to_chunked_content(concatenated);
|
||||||
|
|
||||||
|
auto decompressed_chunks = utils::ungzip(std::move(chunked_compressed), 1024).get();
|
||||||
|
auto decompressed = from_chunked_content(decompressed_chunks);
|
||||||
|
|
||||||
|
// Should decompress to "HelloWorld"
|
||||||
|
std::vector<char> expected;
|
||||||
|
expected.insert(expected.end(), data1.begin(), data1.end());
|
||||||
|
expected.insert(expected.end(), data2.begin(), data2.end());
|
||||||
|
|
||||||
|
BOOST_REQUIRE_EQUAL(decompressed.size(), expected.size());
|
||||||
|
BOOST_CHECK(std::equal(decompressed.begin(), decompressed.end(), expected.begin()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_multiple_concatenated) {
|
||||||
|
return async([] {
|
||||||
|
// Test multiple concatenated gzip files (more than 2)
|
||||||
|
std::vector<std::vector<char>> parts = {
|
||||||
|
{'A', 'B', 'C'},
|
||||||
|
{'D', 'E', 'F'},
|
||||||
|
{'G', 'H', 'I'},
|
||||||
|
{'J', 'K', 'L'}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<char> concatenated;
|
||||||
|
std::vector<char> expected;
|
||||||
|
|
||||||
|
for (const auto& part : parts) {
|
||||||
|
auto compressed = gzip_compress(part);
|
||||||
|
concatenated.insert(concatenated.end(), compressed.begin(), compressed.end());
|
||||||
|
expected.insert(expected.end(), part.begin(), part.end());
|
||||||
|
}
|
||||||
|
|
||||||
|
auto chunked_compressed = to_chunked_content(concatenated);
|
||||||
|
|
||||||
|
auto decompressed_chunks = utils::ungzip(std::move(chunked_compressed), 1024).get();
|
||||||
|
auto decompressed = from_chunked_content(decompressed_chunks);
|
||||||
|
|
||||||
|
BOOST_REQUIRE_EQUAL(decompressed.size(), expected.size());
|
||||||
|
BOOST_CHECK(std::equal(decompressed.begin(), decompressed.end(), expected.begin()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_invalid_magic) {
|
||||||
|
return async([] {
|
||||||
|
// Test invalid gzip magic bytes
|
||||||
|
std::vector<char> bad_data = {0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
|
||||||
|
auto chunked = to_chunked_content(bad_data);
|
||||||
|
|
||||||
|
BOOST_CHECK_THROW(
|
||||||
|
utils::ungzip(std::move(chunked), 1024).get(),
|
||||||
|
std::runtime_error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_truncated) {
|
||||||
|
return async([] {
|
||||||
|
// Test truncated gzip data
|
||||||
|
std::vector<char> original_data = {'H', 'e', 'l', 'l', 'o'};
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
|
||||||
|
// Truncate the compressed data
|
||||||
|
compressed.resize(compressed.size() / 2);
|
||||||
|
auto chunked = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
BOOST_CHECK_THROW(
|
||||||
|
utils::ungzip(std::move(chunked), 1024).get(),
|
||||||
|
std::runtime_error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_corrupted) {
|
||||||
|
return async([] {
|
||||||
|
// Test corrupted gzip data
|
||||||
|
std::vector<char> original_data = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
|
||||||
|
// Corrupt some bytes in the middle
|
||||||
|
if (compressed.size() > 20) {
|
||||||
|
compressed[15] ^= 0xFF;
|
||||||
|
compressed[16] ^= 0xFF;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto chunked = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
BOOST_CHECK_THROW(
|
||||||
|
utils::ungzip(std::move(chunked), 1024).get(),
|
||||||
|
std::runtime_error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_junk_appended) {
|
||||||
|
return async([] {
|
||||||
|
// Test gzip data with junk appended
|
||||||
|
std::vector<char> original_data = {'H', 'e', 'l', 'l', 'o'};
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
|
||||||
|
// Append junk
|
||||||
|
std::vector<char> junk = {'J', 'U', 'N', 'K'};
|
||||||
|
compressed.insert(compressed.end(), junk.begin(), junk.end());
|
||||||
|
|
||||||
|
auto chunked = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
BOOST_CHECK_THROW(
|
||||||
|
utils::ungzip(std::move(chunked), 1024).get(),
|
||||||
|
std::runtime_error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_length_limit_exceeded) {
|
||||||
|
return async([] {
|
||||||
|
// Test length limit enforcement
|
||||||
|
std::vector<char> original_data(1000, 'A');
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
auto chunked = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
// Set limit lower than actual size
|
||||||
|
BOOST_CHECK_THROW(
|
||||||
|
utils::ungzip(std::move(chunked), 500).get(),
|
||||||
|
std::runtime_error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_length_limit_exact) {
|
||||||
|
return async([] {
|
||||||
|
// Test that exact limit works
|
||||||
|
std::vector<char> original_data(1000, 'B');
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
auto chunked = to_chunked_content(compressed);
|
||||||
|
|
||||||
|
// Set limit to exact size
|
||||||
|
auto decompressed_chunks = utils::ungzip(std::move(chunked), 1000).get();
|
||||||
|
auto decompressed = from_chunked_content(decompressed_chunks);
|
||||||
|
|
||||||
|
BOOST_CHECK_EQUAL(decompressed.size(), 1000);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_very_short_input) {
|
||||||
|
return async([] {
|
||||||
|
// Test with input too short to be valid gzip
|
||||||
|
std::vector<char> bad_data = {0x1f, 0x8b};
|
||||||
|
auto chunked = to_chunked_content(bad_data);
|
||||||
|
|
||||||
|
BOOST_CHECK_THROW(
|
||||||
|
utils::ungzip(std::move(chunked), 1024).get(),
|
||||||
|
std::runtime_error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_empty_input) {
|
||||||
|
return async([] {
|
||||||
|
// Test with completely empty input
|
||||||
|
rjson::chunked_content empty;
|
||||||
|
|
||||||
|
BOOST_CHECK_THROW(
|
||||||
|
utils::ungzip(std::move(empty), 1024).get(),
|
||||||
|
std::runtime_error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_ungzip_chunked_input) {
|
||||||
|
return async([] {
|
||||||
|
// Test with input split across multiple chunks
|
||||||
|
std::vector<char> original_data = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd', '!'};
|
||||||
|
auto compressed = gzip_compress(original_data);
|
||||||
|
|
||||||
|
// Split compressed data into multiple chunks
|
||||||
|
rjson::chunked_content chunked;
|
||||||
|
size_t chunk_size = compressed.size() / 3 + 1;
|
||||||
|
for (size_t i = 0; i < compressed.size(); i += chunk_size) {
|
||||||
|
size_t this_chunk_size = std::min(chunk_size, compressed.size() - i);
|
||||||
|
temporary_buffer<char> buf(this_chunk_size);
|
||||||
|
std::memcpy(buf.get_write(), compressed.data() + i, this_chunk_size);
|
||||||
|
chunked.push_back(std::move(buf));
|
||||||
|
}
|
||||||
|
|
||||||
|
auto decompressed_chunks = utils::ungzip(std::move(chunked), 1024).get();
|
||||||
|
auto decompressed = from_chunked_content(decompressed_chunks);
|
||||||
|
|
||||||
|
BOOST_REQUIRE_EQUAL(decompressed.size(), original_data.size());
|
||||||
|
BOOST_CHECK(std::equal(decompressed.begin(), decompressed.end(), original_data.begin()));
|
||||||
|
});
|
||||||
|
}
|
||||||
@@ -22,6 +22,7 @@ target_sources(utils
|
|||||||
error_injection.cc
|
error_injection.cc
|
||||||
exceptions.cc
|
exceptions.cc
|
||||||
file_lock.cc
|
file_lock.cc
|
||||||
|
gzip.cc
|
||||||
gz/crc_combine.cc
|
gz/crc_combine.cc
|
||||||
gz/crc_combine_table.cc
|
gz/crc_combine_table.cc
|
||||||
hashers.cc
|
hashers.cc
|
||||||
@@ -79,6 +80,7 @@ target_link_libraries(utils
|
|||||||
Boost::regex
|
Boost::regex
|
||||||
crypto
|
crypto
|
||||||
cryptopp::cryptopp
|
cryptopp::cryptopp
|
||||||
|
libdeflate::libdeflate
|
||||||
rapidxml::rapidxml
|
rapidxml::rapidxml
|
||||||
yaml-cpp::yaml-cpp
|
yaml-cpp::yaml-cpp
|
||||||
GnuTLS::gnutls)
|
GnuTLS::gnutls)
|
||||||
|
|||||||
187
utils/gzip.cc
Normal file
187
utils/gzip.cc
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2025-present ScyllaDB
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "utils/gzip.hh"
|
||||||
|
#include <libdeflate.h>
|
||||||
|
#include <seastar/core/coroutine.hh>
|
||||||
|
#include <seastar/core/temporary_buffer.hh>
|
||||||
|
#include <seastar/core/thread.hh>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
using namespace seastar;
|
||||||
|
|
||||||
|
namespace utils {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
// Maximum size for a single output chunk (1 MB)
|
||||||
|
constexpr size_t MAX_OUTPUT_CHUNK_SIZE = 1024 * 1024;
|
||||||
|
|
||||||
|
// Build a contiguous buffer from chunks for decompression
|
||||||
|
// This is necessary because libdeflate requires complete input
|
||||||
|
// We build the buffer incrementally and free input chunks as we go
|
||||||
|
std::vector<char> build_input_buffer(rjson::chunked_content& chunks) {
|
||||||
|
size_t total_size = 0;
|
||||||
|
for (const auto& chunk : chunks) {
|
||||||
|
total_size += chunk.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<char> result;
|
||||||
|
result.reserve(total_size);
|
||||||
|
|
||||||
|
for (auto& chunk : chunks) {
|
||||||
|
result.insert(result.end(), chunk.begin(), chunk.end());
|
||||||
|
// Free the chunk immediately after copying to save memory
|
||||||
|
chunk = temporary_buffer<char>();
|
||||||
|
}
|
||||||
|
chunks.clear();
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // anonymous namespace
|
||||||
|
|
||||||
|
future<rjson::chunked_content> ungzip(rjson::chunked_content&& compressed_body, size_t length_limit) {
|
||||||
|
// Use thread context for potentially blocking operations
|
||||||
|
return seastar::async([compressed_body = std::move(compressed_body), length_limit] () mutable {
|
||||||
|
if (compressed_body.empty()) {
|
||||||
|
throw std::runtime_error("Invalid gzip data: empty input");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build input buffer from chunks, freeing them as we go
|
||||||
|
// Unfortunately, libdeflate requires the complete compressed input at once
|
||||||
|
std::vector<char> compressed_data = build_input_buffer(compressed_body);
|
||||||
|
|
||||||
|
if (compressed_data.empty()) {
|
||||||
|
throw std::runtime_error("Invalid gzip data: empty input");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create decompressor
|
||||||
|
auto* decompressor = libdeflate_alloc_decompressor();
|
||||||
|
if (!decompressor) {
|
||||||
|
throw std::bad_alloc();
|
||||||
|
}
|
||||||
|
|
||||||
|
// RAII wrapper for decompressor
|
||||||
|
auto decompressor_deleter = [](libdeflate_decompressor* d) {
|
||||||
|
if (d) {
|
||||||
|
libdeflate_free_decompressor(d);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
std::unique_ptr<libdeflate_decompressor, decltype(decompressor_deleter)> decompressor_guard(
|
||||||
|
decompressor, decompressor_deleter);
|
||||||
|
|
||||||
|
rjson::chunked_content result;
|
||||||
|
size_t total_decompressed = 0;
|
||||||
|
size_t input_offset = 0;
|
||||||
|
|
||||||
|
// Process potentially multiple concatenated gzip members
|
||||||
|
// libdeflate_gzip_decompress handles all gzip format details (headers, trailers, etc.)
|
||||||
|
while (input_offset < compressed_data.size()) {
|
||||||
|
const char* current_input = compressed_data.data() + input_offset;
|
||||||
|
size_t remaining_input = compressed_data.size() - input_offset;
|
||||||
|
|
||||||
|
// Check if we've reached the limit before starting decompression
|
||||||
|
if (total_decompressed >= length_limit) {
|
||||||
|
throw std::runtime_error("Decompressed data exceeds length limit");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate output buffer - start with a reasonable size and grow if needed
|
||||||
|
// Limit chunk size to avoid allocating too much at once
|
||||||
|
const size_t initial_chunk_size = std::min({
|
||||||
|
size_t(MAX_OUTPUT_CHUNK_SIZE),
|
||||||
|
length_limit - total_decompressed,
|
||||||
|
remaining_input * 10 // Heuristic: decompressed size often < 10x compressed
|
||||||
|
});
|
||||||
|
std::vector<char> output_buffer(initial_chunk_size);
|
||||||
|
|
||||||
|
size_t actual_in_bytes = 0;
|
||||||
|
size_t actual_out_bytes = 0;
|
||||||
|
|
||||||
|
// Try decompression with progressively larger output buffers if needed
|
||||||
|
libdeflate_result res;
|
||||||
|
size_t max_output_size = length_limit - total_decompressed;
|
||||||
|
|
||||||
|
for (size_t attempt = 0; attempt < 10; ++attempt) {
|
||||||
|
res = libdeflate_gzip_decompress(
|
||||||
|
decompressor,
|
||||||
|
current_input,
|
||||||
|
remaining_input,
|
||||||
|
output_buffer.data(),
|
||||||
|
output_buffer.size(),
|
||||||
|
&actual_in_bytes,
|
||||||
|
&actual_out_bytes
|
||||||
|
);
|
||||||
|
|
||||||
|
if (res == LIBDEFLATE_SUCCESS) {
|
||||||
|
break;
|
||||||
|
} else if (res == LIBDEFLATE_INSUFFICIENT_SPACE) {
|
||||||
|
// Need a larger output buffer
|
||||||
|
size_t new_size = std::min(output_buffer.size() * 2, max_output_size);
|
||||||
|
if (new_size <= output_buffer.size()) {
|
||||||
|
throw std::runtime_error("Decompressed data exceeds length limit");
|
||||||
|
}
|
||||||
|
output_buffer.resize(new_size);
|
||||||
|
} else {
|
||||||
|
// Other error (bad data, short input, etc.)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (res != LIBDEFLATE_SUCCESS) {
|
||||||
|
if (res == LIBDEFLATE_BAD_DATA) {
|
||||||
|
throw std::runtime_error("Invalid gzip data: corrupt or truncated");
|
||||||
|
} else if (res == LIBDEFLATE_SHORT_OUTPUT) {
|
||||||
|
throw std::runtime_error("Decompressed data exceeds length limit");
|
||||||
|
} else if (res == LIBDEFLATE_INSUFFICIENT_SPACE) {
|
||||||
|
throw std::runtime_error("Decompressed data exceeds length limit");
|
||||||
|
} else {
|
||||||
|
throw std::runtime_error("Gzip decompression failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// libdeflate_gzip_decompress returns how many bytes were consumed
|
||||||
|
// This includes the entire gzip member (header, compressed data, and trailer)
|
||||||
|
if (actual_in_bytes == 0) {
|
||||||
|
throw std::runtime_error("Invalid gzip data: no bytes consumed");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check total size limit
|
||||||
|
total_decompressed += actual_out_bytes;
|
||||||
|
if (total_decompressed > length_limit) {
|
||||||
|
throw std::runtime_error("Decompressed data exceeds length limit");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move decompressed data into temporary_buffer chunks
|
||||||
|
// Split into reasonably-sized chunks to avoid holding too much contiguous memory
|
||||||
|
size_t offset = 0;
|
||||||
|
while (offset < actual_out_bytes) {
|
||||||
|
size_t chunk_size = std::min(MAX_OUTPUT_CHUNK_SIZE, actual_out_bytes - offset);
|
||||||
|
temporary_buffer<char> chunk(chunk_size);
|
||||||
|
std::memcpy(chunk.get_write(), output_buffer.data() + offset, chunk_size);
|
||||||
|
result.push_back(std::move(chunk));
|
||||||
|
offset += chunk_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move to the next gzip member
|
||||||
|
input_offset += actual_in_bytes;
|
||||||
|
|
||||||
|
// Yield to the reactor periodically
|
||||||
|
seastar::thread::maybe_yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we consumed all input
|
||||||
|
if (input_offset != compressed_data.size()) {
|
||||||
|
throw std::runtime_error("Invalid gzip data: unconsumed trailing data");
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace utils
|
||||||
32
utils/gzip.hh
Normal file
32
utils/gzip.hh
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2025-present ScyllaDB
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <seastar/core/future.hh>
|
||||||
|
#include "utils/rjson.hh"
|
||||||
|
|
||||||
|
namespace utils {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decompresses gzip-compressed data stored in chunked_content format.
|
||||||
|
*
|
||||||
|
* @param compressed_body The gzip-compressed data in chunked_content format
|
||||||
|
* @param length_limit Maximum allowed size of the uncompressed data (throws if exceeded)
|
||||||
|
* @return A future containing the uncompressed data in chunked_content format
|
||||||
|
*
|
||||||
|
* Features:
|
||||||
|
* - Supports concatenated gzip files (multiple gzip files appended together)
|
||||||
|
* - Throws exception if input is not valid gzip
|
||||||
|
* - Throws exception if input is truncated
|
||||||
|
* - Throws exception if non-gzip junk is appended
|
||||||
|
* - Throws exception if uncompressed size exceeds length_limit
|
||||||
|
*/
|
||||||
|
seastar::future<rjson::chunked_content> ungzip(rjson::chunked_content&& compressed_body, size_t length_limit);
|
||||||
|
|
||||||
|
} // namespace utils
|
||||||
Reference in New Issue
Block a user