When encrypted_data_source::get() caches a trailing block in _next, the next call takes it directly — bypassing input_stream::read(), which checks _eof. It then calls input_stream::read_exactly() on the already-drained stream. Unlike read(), read_up_to(), and consume(), read_exactly() does not check _eof when the buffer is empty, so it calls _fd.get() on a source that already returned EOS. In production this manifested as stuck encrypted SSTable component downloads during tablet restore: the underlying chunked_download_source hung forever on the post-EOS get(), causing 4 tablets to never complete. The stuck files were always block-aligned sizes (8k, 12k) where _next gets populated and the source is fully consumed in the same call. Fix by checking _input.eof() before calling read_exactly(). When the stream already reached EOF, buf2 is known to be empty, so the call is skipped entirely. A comprehensive test is added that uses a strict_memory_source which fails on post-EOS get(), reproducing the exact code path that caused the production deadlock.
723 lines
24 KiB
C++
723 lines
24 KiB
C++
/*
|
|
* Copyright (C) 2016 ScyllaDB
|
|
*/
|
|
|
|
|
|
|
|
#include <boost/test/unit_test.hpp>
|
|
#include <stdint.h>
|
|
#include <random>
|
|
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/thread.hh>
|
|
#include <seastar/core/fstream.hh>
|
|
|
|
#include <seastar/testing/test_case.hh>
|
|
|
|
#include "ent/encryption/encryption.hh"
|
|
#include "ent/encryption/symmetric_key.hh"
|
|
#include "ent/encryption/encrypted_file_impl.hh"
|
|
#include "test/lib/log.hh"
|
|
#include "test/lib/tmpdir.hh"
|
|
#include "test/lib/random_utils.hh"
|
|
#include "test/lib/exception_utils.hh"
|
|
#include "test/lib/limiting_data_source.hh"
|
|
#include "utils/io-wrappers.hh"
|
|
|
|
#include <seastar/util/memory-data-source.hh>
|
|
|
|
using namespace encryption;
|
|
|
|
static tmpdir dir;
|
|
|
|
// Resolves `name` to a path inside the shared temporary test directory and
// pairs it with an encryption key.
//
// If the caller supplies no key, a new 256-bit "AES/CBC" key is created.
// Returns {path, key}.
static std::tuple<std::string, ::shared_ptr<symmetric_key>> make_filename(const std::string& name, ::shared_ptr<symmetric_key> k = nullptr) {
    auto path = std::string(dir.path() / std::string(name));
    if (k == nullptr) {
        key_info default_info{"AES/CBC", 256};
        k = ::make_shared<symmetric_key>(default_info);
    }
    return std::make_tuple(std::move(path), std::move(k));
}
|
|
|
|
// Opens the test file `name` with `mode` and wraps it in an encrypted file.
//
// Resolves to {encrypted file, key in use}. The key is `k_in` when given,
// otherwise the one make_filename() generates.
static future<std::tuple<file, ::shared_ptr<symmetric_key>>> make_file(const std::string& name, open_flags mode, ::shared_ptr<symmetric_key> k_in = nullptr) {
    // `name` is only read here, before the first suspension point.
    auto [path, key] = make_filename(name, std::move(k_in));
    file raw = co_await open_file_dma(path, mode);
    file encrypted(make_encrypted_file(raw, key));
    co_return std::tuple(std::move(encrypted), key);
}
|
|
|
|
template<typename T = char>
|
|
static void fill_random(temporary_buffer<T>& buf) {
|
|
auto data = tests::random::get_sstring(buf.size());
|
|
std::copy(data.begin(), data.end(), buf.get_write());
|
|
}
|
|
|
|
template<typename T = char>
|
|
static temporary_buffer<T> generate_random(size_t n) {
|
|
temporary_buffer<T> tmp(n);
|
|
fill_random(tmp);
|
|
return tmp;
|
|
}
|
|
|
|
template<typename T = char>
|
|
static temporary_buffer<T> generate_random(size_t n, size_t align) {
|
|
auto tmp = temporary_buffer<T>::aligned(align, align_up(n, align));
|
|
fill_random(tmp);
|
|
return tmp;
|
|
}
|
|
|
|
// Writes random data to an encrypted file, truncates it to exactly `n` bytes,
// then re-opens the file read-only with a copy of the key and checks that the
// first `n` bytes read back match what was written.
//
// Both file handles are closed on every path; the first failure seen (test
// assertion or close error) is rethrown at the end.
static future<> test_random_data_disk(size_t n) {
    auto name = "test_rand_" + std::to_string(n);
    auto t = co_await make_file(name, open_flags::rw|open_flags::create);
    auto f = std::get<0>(t);
    // Read-back handle. Declared outside the try block so that it can be
    // closed even when an assertion fails after it was opened.
    file f2;
    std::exception_ptr ex = nullptr;

    try {
        auto k = std::get<1>(t);
        auto a = f.memory_dma_alignment();
        // The buffer is padded up to the alignment, so it may be larger than n.
        auto buf = generate_random(n, a);
        auto w = co_await f.dma_write(0, buf.get(), buf.size());

        co_await f.flush();
        if (n != buf.size()) {
            // Cut the alignment padding off again.
            co_await f.truncate(n);
        }

        BOOST_REQUIRE_EQUAL(w, buf.size());

        // Rebuild the key from its info and raw bytes rather than sharing the
        // same key object with the writer.
        auto k2 = ::make_shared<symmetric_key>(k->info(), k->key());
        f2 = std::get<0>(co_await make_file(name, open_flags::ro, k2));

        auto tmp = temporary_buffer<uint8_t>::aligned(a, buf.size());
        auto n2 = co_await f2.dma_read(0, tmp.get_write(), tmp.size());

        BOOST_REQUIRE_EQUAL(n2, n);
        BOOST_REQUIRE_EQUAL_COLLECTIONS(tmp.get(), tmp.get() + n2, buf.get(), buf.get() + n2);
    } catch (...) {
        ex = std::current_exception();
    }

    if (f2) {
        try {
            co_await f2.close();
        } catch (...) {
            // Do not let a close error mask an earlier test failure.
            if (!ex) {
                ex = std::current_exception();
            }
        }
    }

    co_await f.close();
    if (ex) {
        std::rethrow_exception(ex);
    }
}
|
|
|
|
static void test_random_data(size_t n) {
|
|
auto buf = generate_random(n, 8);
|
|
|
|
|
|
// first, verify padded.
|
|
{
|
|
key_info info{"AES/CBC/PKCSPadding", 256};
|
|
auto k = ::make_shared<symmetric_key>(info);
|
|
|
|
bytes b(bytes::initialized_later(), k->iv_len());
|
|
k->generate_iv(b.data(), k->iv_len());
|
|
|
|
temporary_buffer<uint8_t> tmp(n + k->block_size());
|
|
k->encrypt(buf.get(), buf.size(), tmp.get_write(), tmp.size(), b.data());
|
|
|
|
auto bytes = k->key();
|
|
auto k2 = ::make_shared<symmetric_key>(info, bytes);
|
|
|
|
temporary_buffer<uint8_t> tmp2(n + k->block_size());
|
|
k2->decrypt(tmp.get(), tmp.size(), tmp2.get_write(), tmp2.size(), b.data());
|
|
|
|
BOOST_REQUIRE_EQUAL_COLLECTIONS(tmp2.get(), tmp2.get() + n, buf.get(), buf.get() + n);
|
|
}
|
|
|
|
// unpadded
|
|
{
|
|
key_info info{"AES/CBC", 256};
|
|
auto k = ::make_shared<symmetric_key>(info);
|
|
|
|
bytes b(bytes::initialized_later(), k->iv_len());
|
|
k->generate_iv(b.data(), k->iv_len());
|
|
|
|
temporary_buffer<uint8_t> tmp(n);
|
|
k->encrypt_unpadded(buf.get(), buf.size(), tmp.get_write(), b.data());
|
|
|
|
auto bytes = k->key();
|
|
auto k2 = ::make_shared<symmetric_key>(info, bytes);
|
|
|
|
temporary_buffer<uint8_t> tmp2(buf.size());
|
|
k2->decrypt_unpadded(tmp.get(), tmp.size(), tmp2.get_write(), b.data());
|
|
|
|
BOOST_REQUIRE_EQUAL_COLLECTIONS(tmp2.get(), tmp2.get() + n, buf.get(), buf.get() + n);
|
|
}
|
|
}
|
|
|
|
|
|
// In-memory encrypt/decrypt round trip of 128 bytes.
BOOST_AUTO_TEST_CASE(test_encrypting_data_128) {
    constexpr size_t payload_size = 128;
    test_random_data(payload_size);
}
|
|
|
|
// In-memory encrypt/decrypt round trip of 4 KiB.
BOOST_AUTO_TEST_CASE(test_encrypting_data_4k) {
    constexpr size_t payload_size = 4 * 1024;
    test_random_data(payload_size);
}
|
|
|
|
|
|
// Encrypted file write/read-back round trip with a 4 KiB file.
SEASTAR_TEST_CASE(test_encrypted_file_data_4k) {
    constexpr size_t file_size = 4 * 1024;
    return test_random_data_disk(file_size);
}
|
|
|
|
// Encrypted file write/read-back round trip with a 16 KiB file.
SEASTAR_TEST_CASE(test_encrypted_file_data_16k) {
    constexpr size_t file_size = 16 * 1024;
    return test_random_data_disk(file_size);
}
|
|
|
|
// Encrypted file round trip with a size three bytes short of 16 KiB.
SEASTAR_TEST_CASE(test_encrypted_file_data_unaligned) {
    constexpr size_t file_size = 16 * 1024 - 3;
    return test_random_data_disk(file_size);
}
|
|
|
|
// Encrypted file round trip with a size 4092 bytes short of 16 KiB.
SEASTAR_TEST_CASE(test_encrypted_file_data_unaligned2) {
    constexpr size_t file_size = 16 * 1024 - 4092;
    return test_random_data_disk(file_size);
}
|
|
|
|
// A one-byte underlying file is not a valid encrypted file: reading it through
// the encrypted layer must fail with std::domain_error carrying the expected
// size message.
SEASTAR_TEST_CASE(test_short) {
    auto name = "test_short";

    // Create the one-byte file directly, bypassing the encryption wrapper.
    file f = co_await open_file_dma(sstring(dir.path() / name), open_flags::rw|open_flags::create);
    co_await f.truncate(1);
    co_await f.close();

    // Re-open the same path through the encrypted layer.
    f = std::get<0>(co_await make_file(name, open_flags::ro));
    std::exception_ptr failure = nullptr;

    try {
        temporary_buffer<char> scratch(f.memory_dma_alignment());

        BOOST_REQUIRE_EXCEPTION(
            co_await f.dma_read(0, scratch.get_write(), scratch.size()),
            std::domain_error,
            exception_predicate::message_contains("file size 1, expected 0 or at least 16")
        );
    } catch (...) {
        failure = std::current_exception();
    }

    // Close before reporting so the handle is released on failure too.
    co_await f.close();
    if (failure) {
        std::rethrow_exception(failure);
    }
}
|
|
|
|
// Truncates an encrypted file to one byte less than the disk write alignment,
// then checks that:
//  - a read at offset 0 returns exactly the file size, all zero bytes;
//  - reads starting at the alignment boundary (past the end of the file)
//    return nothing, for the plain, iovec and bulk read variants.
SEASTAR_TEST_CASE(test_read_across_size_boundary) {
    auto name = "test_read_across_size_boundary";

    auto [writer, key] = co_await make_file(name, open_flags::rw|open_flags::create);
    auto size = writer.disk_write_dma_alignment() - 1;
    co_await writer.truncate(size);
    co_await writer.close();

    auto [f, _] = co_await make_file(name, open_flags::ro, key);
    auto disk_align = f.disk_write_dma_alignment();
    auto mem_align = f.memory_dma_alignment();

    // Read from the start of the file.
    auto head = temporary_buffer<char>::aligned(mem_align, disk_align);
    auto head_len = co_await f.dma_read(0, head.get_write(), head.size());

    // Plain read starting past the end.
    auto tail = temporary_buffer<char>::aligned(mem_align, disk_align);
    auto tail_len = co_await f.dma_read(disk_align, tail.get_write(), tail.size());

    // Same read via iovec.
    auto tail_iov_buf = temporary_buffer<char>::aligned(mem_align, disk_align);
    std::vector<iovec> iov({{tail_iov_buf.get_write(), tail_iov_buf.size()}});
    auto tail_iov_len = co_await f.dma_read(disk_align, std::move(iov));

    // Same read via the bulk API.
    auto tail_bulk = co_await f.dma_read_bulk<char>(disk_align, size_t(disk_align));

    co_await f.close();

    BOOST_REQUIRE_EQUAL(size, head_len);
    head.trim(head_len);
    for (auto c : head) {
        BOOST_REQUIRE_EQUAL(c, 0);
    }

    BOOST_REQUIRE_EQUAL(0, tail_len);
    BOOST_REQUIRE_EQUAL(0, tail_iov_len);
    BOOST_REQUIRE_EQUAL(0, tail_bulk.size());
}
|
|
|
|
// Creates an encrypted file of (disk write alignment + size_off) bytes and
// bulk-reads one alignment's worth of data starting at
// (disk write alignment + read_off). The read is expected to return nothing.
static future<> test_read_across_size_boundary_unaligned_helper(int64_t size_off, int64_t read_off) {
    auto name = "test_read_across_size_boundary_unaligned";

    auto [writer, key] = co_await make_file(name, open_flags::rw|open_flags::create);
    auto file_size = writer.disk_write_dma_alignment() + size_off;
    co_await writer.truncate(file_size);
    co_await writer.close();

    auto [f, k2] = co_await make_file(name, open_flags::ro, key);
    auto read_offset = f.disk_write_dma_alignment() + read_off;
    auto read_length = size_t(f.disk_write_dma_alignment());
    auto result = co_await f.dma_read_bulk<char>(read_offset, read_length);

    co_await f.close();

    BOOST_REQUIRE_EQUAL(0, result.size());
}
|
|
|
|
// File one byte shorter than the alignment; read starts one byte past it.
SEASTAR_TEST_CASE(test_read_across_size_boundary_unaligned) {
    constexpr int64_t size_off = -1;
    constexpr int64_t read_off = 1;
    co_await test_read_across_size_boundary_unaligned_helper(size_off, read_off);
}
|
|
|
|
// File two bytes shorter than the alignment; read starts one byte before it.
SEASTAR_TEST_CASE(test_read_across_size_boundary_unaligned2) {
    constexpr int64_t size_off = -2;
    constexpr int64_t read_off = -1;
    co_await test_read_across_size_boundary_unaligned_helper(size_off, read_off);
}
|
|
|
|
// Extends a freshly created encrypted file by truncating it to 64 memory
// alignment units, then checks that reading it back yields that many zero
// bytes.
SEASTAR_TEST_CASE(test_truncating_empty) {
    auto name = "test_truncating_empty";
    auto [f, key] = co_await make_file(name, open_flags::rw|open_flags::create);
    auto target_size = 64 * f.memory_dma_alignment();

    co_await f.truncate(target_size);

    temporary_buffer<char> contents(target_size);
    auto read_len = co_await f.dma_read(0, contents.get_write(), contents.size());

    co_await f.close();

    BOOST_REQUIRE_EQUAL(target_size, read_len);

    for (auto c : contents) {
        BOOST_REQUIRE_EQUAL(c, 0);
    }
}
|
|
|
|
// Writes 32 alignment units of random data, then for i in [1, 64):
//  1. truncates the file to (written - i) bytes and verifies the size, a full
//     read, and, for every aligned split point, the prefix read and the
//     remainder read;
//  2. truncates the file to (written + 8 alignment units) and verifies that
//     the surviving prefix is intact and everything after it reads as zero.
SEASTAR_TEST_CASE(test_truncating_extend) {
    auto name = "test_truncating_extend";
    auto [f, key] = co_await make_file(name, open_flags::rw|open_flags::create);
    auto a = f.memory_dma_alignment();
    auto initial_size = 32 * a;
    auto buf = generate_random(initial_size, a);
    auto written = co_await f.dma_write(0, buf.get(), buf.size());

    co_await f.flush();
    BOOST_REQUIRE_EQUAL(initial_size, written);

    for (size_t i = 1; i < 64; ++i) {
        // Shrink to an unaligned size.
        auto shrunk = written - i;
        auto grown = written + 8 * a;
        co_await f.truncate(shrunk);
        BOOST_REQUIRE_EQUAL(shrunk, (co_await f.stat()).st_size);

        {
            auto tmp = temporary_buffer<uint8_t>::aligned(a, align_up(shrunk, a));
            auto n = co_await f.dma_read(0, tmp.get_write(), tmp.size());

            BOOST_REQUIRE_EQUAL(shrunk, n);
            BOOST_REQUIRE_EQUAL_COLLECTIONS(tmp.get(), tmp.get() + shrunk, buf.get(), buf.get() + shrunk);

            // Walk every aligned split point from the highest one down.
            for (auto split = align_down(shrunk, a); split > 0; split -= a) {
                // Prefix [0, split).
                n = co_await f.dma_read(0, tmp.get_write(), split);

                BOOST_REQUIRE_EQUAL(split, n);
                BOOST_REQUIRE_EQUAL_COLLECTIONS(tmp.get(), tmp.get() + split, buf.get(), buf.get() + split);

                // Remainder [split, shrunk).
                n = co_await f.dma_read(split, tmp.get_write(), tmp.size());
                BOOST_REQUIRE_EQUAL(shrunk - split, n);
                BOOST_REQUIRE_EQUAL_COLLECTIONS(tmp.get(), tmp.get() + n, buf.get() + split, buf.get() + split + n);
            }
        }

        // Grow past the original size.
        co_await f.truncate(grown);
        BOOST_REQUIRE_EQUAL(grown, (co_await f.stat()).st_size);

        auto tmp = temporary_buffer<uint8_t>::aligned(a, align_up(grown, a));
        auto n = co_await f.dma_read(0, tmp.get_write(), tmp.size());

        BOOST_REQUIRE_EQUAL(grown, n);
        BOOST_REQUIRE_EQUAL_COLLECTIONS(tmp.get(), tmp.get() + shrunk, buf.get(), buf.get() + shrunk);

        // Everything beyond the shrunk size must read back as zero.
        for (auto idx = shrunk; idx < grown; ++idx) {
            BOOST_REQUIRE_EQUAL(tmp[idx], 0);
        }
    }

    co_await f.close();
}
|
|
|
|
// Reproducer for https://github.com/scylladb/scylladb/issues/22236
|
|
SEASTAR_TEST_CASE(test_read_from_padding) {
|
|
key_info kinfo {"AES/CBC/PKCSPadding", 128};
|
|
shared_ptr<symmetric_key> k = make_shared<symmetric_key>(kinfo);
|
|
testlog.info("Created symmetric key: info={} key={} ", k->info(), k->key());
|
|
|
|
size_t block_size;
|
|
size_t buf_size;
|
|
|
|
constexpr auto& filename = "encrypted_file";
|
|
const auto& filepath = dir.path() / filename;
|
|
|
|
testlog.info("Creating encrypted file {}", filepath.string());
|
|
{
|
|
auto [file, _] = co_await make_file(filename, open_flags::create | open_flags::wo, k);
|
|
auto ostream = co_await make_file_output_stream(file);
|
|
|
|
block_size = file.disk_write_dma_alignment();
|
|
buf_size = block_size - 1;
|
|
|
|
auto wbuf = seastar::temporary_buffer<char>::aligned(file.memory_dma_alignment(), buf_size);
|
|
co_await ostream.write(wbuf.get(), wbuf.size());
|
|
testlog.info("Wrote {} bytes to encrypted file {}", wbuf.size(), filepath.string());
|
|
|
|
co_await ostream.close();
|
|
testlog.info("Length of {}: {} bytes", filename, co_await file.size());
|
|
}
|
|
|
|
testlog.info("Testing DMA reads from padding area of file {}", filepath.string());
|
|
{
|
|
auto [file, _] = co_await make_file(filename, open_flags::ro, k);
|
|
|
|
// Triggering the bug requires reading from the padding area:
|
|
// `buf_size < read_pos < file.size()`
|
|
//
|
|
// For `dma_read()`, we have the additional requirement that `read_pos` must be aligned.
|
|
// For `dma_read_bulk()`, it doesn't have to.
|
|
uint64_t read_pos = block_size;
|
|
size_t read_len = block_size;
|
|
auto rbuf = seastar::temporary_buffer<char>::aligned(file.memory_dma_alignment(), read_len);
|
|
std::vector<iovec> iov {{static_cast<void*>(rbuf.get_write()), rbuf.size()}};
|
|
|
|
auto res = co_await file.dma_read_bulk<char>(read_pos, read_len);
|
|
BOOST_CHECK_MESSAGE(res.size() == 0, seastar::format(
|
|
"Bulk DMA read on pos {}, len {}: returned {} bytes instead of zero", read_pos, read_len, res.size()));
|
|
|
|
auto res_len = co_await file.dma_read(read_pos, iov);
|
|
BOOST_CHECK_MESSAGE(res_len == 0, seastar::format(
|
|
"IOV DMA read on pos {}, len {}: returned {} bytes instead of zero", read_pos, read_len, res_len));
|
|
|
|
res_len = co_await file.dma_read<char>(read_pos, rbuf.get_write(), read_len);
|
|
BOOST_CHECK_MESSAGE(res_len == 0, seastar::format(
|
|
"DMA read on pos {}, len {}: returned {} bytes instead of zero", read_pos, read_len, res_len));
|
|
|
|
co_await file.close();
|
|
}
|
|
}
|
|
|
|
namespace seastar {
|
|
std::ostream& operator<<(std::ostream& os, const temporary_buffer<char>& buf) {
|
|
return os << "temporary_buffer[size=" << buf.size() << ", data=" << std::string_view(buf.get(), buf.size()) << "]";
|
|
}
|
|
}
|
|
|
|
// Pushes one random buffer per entry of `sizes` through an encrypted sink that
// collects its output in memory, writes the collected output to a plain file,
// then re-opens that file through the encrypted file layer and checks that
// its size equals the total plaintext size and that reading it back yields
// the original buffers in order.
static future<> test_random_data_sink(std::vector<size_t> sizes) {
    auto name = "test_rand_sink";
    std::vector<temporary_buffer<char>> encrypted_chunks, plain_chunks;

    auto [dst, k] = make_filename(name);
    uint64_t plain_total = 0;
    std::exception_ptr failure = nullptr;

    data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_chunks), k));

    try {
        for (size_t chunk_size : sizes) {
            auto chunk = generate_random<char>(chunk_size);
            co_await sink.put(chunk.clone()); // deep copy. encrypted sink uses "owned" data
            plain_total += chunk.size();
            plain_chunks.emplace_back(std::move(chunk));
        }
    } catch (...) {
        failure = std::current_exception();
    }

    // The sink is closed whether or not feeding it succeeded.
    co_await sink.close();
    if (failure) {
        std::rethrow_exception(failure);
    }

    // Persist the encrypted bytes as-is.
    {
        auto os = co_await make_file_output_stream(co_await open_file_dma(dst, open_flags::wo|open_flags::create));
        for (auto& chunk : encrypted_chunks) {
            co_await os.write(chunk.get(), chunk.size());
        }
        co_await os.flush();
        co_await os.close();
    }

    file f = make_encrypted_file(co_await open_file_dma(dst, open_flags::ro), k);

    try {
        auto file_size = co_await f.size();

        BOOST_REQUIRE_EQUAL(file_size, plain_total);

        auto in = make_file_input_stream(std::move(f));

        for (auto& expected : plain_chunks) {
            auto actual = co_await in.read_exactly(expected.size());
            BOOST_REQUIRE_EQUAL(actual, expected);
        }
        co_await in.close();
    } catch (...) {
        failure = std::current_exception();
    }

    // `f` is still set only if the failure happened before it was moved into
    // the input stream.
    if (f) {
        co_await f.close();
    }
    if (failure) {
        std::rethrow_exception(failure);
    }
}
|
|
|
|
// Sink round trip with a single 13-byte buffer.
SEASTAR_TEST_CASE(test_encrypted_sink_data_small) {
    std::vector<size_t> chunk_sizes{13};
    return test_random_data_sink(std::move(chunk_sizes));
}
|
|
|
|
// Sink round trip with two 4 KiB buffers followed by a 1467-byte one.
SEASTAR_TEST_CASE(test_encrypted_sink_data_smallish) {
    std::vector<size_t> chunk_sizes{4*1024, 4*1024, 1467};
    return test_random_data_sink(std::move(chunk_sizes));
}
|
|
|
|
// Sink round trip with six buffers of mixed sizes.
SEASTAR_TEST_CASE(test_encrypted_sink_data_medium) {
    std::vector<size_t> chunk_sizes{4*1024, 4*1024, 2*1024, 1457, 234, 999};
    return test_random_data_sink(std::move(chunk_sizes));
}
|
|
|
|
// Sink round trip with nine buffers of mixed sizes.
SEASTAR_TEST_CASE(test_encrypted_sink_data_large) {
    std::vector<size_t> chunk_sizes{4096, 4096, 4096, 4096, 8192, 1232, 32, 4096, 134};
    return test_random_data_sink(std::move(chunk_sizes));
}
|
|
|
|
// Encrypts one random buffer per entry of `sizes` through an encrypted sink,
// writes the result to a file, then reads that file back through an encrypted
// data source layered on top of a source that hands out randomly sized chunks.
//
// The read loop alternates get() and skip() and checks that every buffer
// returned by get() matches the plaintext at the expected position.
static future<> test_random_data_source(std::vector<size_t> sizes) {
    testlog.info("test_random_data_source with sizes: {} ({})", sizes, std::accumulate(sizes.begin(), sizes.end(), size_t(0), std::plus{}));

    auto name = "test_rand_source";
    std::vector<temporary_buffer<char>> bufs, srcs;

    auto [dst, k] = make_filename(name);
    std::exception_ptr ex = nullptr;

    data_sink sink(make_encrypted_sink(create_memory_sink(bufs), k));

    try {
        for (size_t s : sizes) {
            auto buf = generate_random<char>(s);
            co_await sink.put(buf.clone()); // deep copy. encrypted sink uses "owned" data
            srcs.emplace_back(std::move(buf));
        }
    } catch (...) {
        ex = std::current_exception();
    }

    // The sink is closed whether or not feeding it succeeded.
    co_await sink.close();
    if (ex) {
        std::rethrow_exception(ex);
    }

    // Persist the encrypted bytes, replacing any file left by an earlier run.
    {
        auto os = co_await make_file_output_stream(co_await open_file_dma(dst, open_flags::truncate|open_flags::wo | open_flags::create));
        for (auto& buf : bufs) {
            co_await os.write(buf.get(), buf.size());
        }
        co_await os.flush();
        co_await os.close();
    }

    auto f = co_await open_file_dma(dst, open_flags::ro);
    testlog.info("file source {}", (co_await f.stat()).st_size);

    auto source = make_file_data_source(std::move(f), file_input_stream_options{});

    // Data source adaptor that splits each buffer obtained from the wrapped
    // source at a random point: get() returns the front part and keeps the
    // rest for the next call.
    class random_chunk_source
        : public data_source_impl
    {
        data_source _source;
        temporary_buffer<char> _buf; // leftover from the previous get()
    public:
        random_chunk_source(data_source s)
            : _source(std::move(s))
        {}
        future<temporary_buffer<char>> get() override {
            if (!_buf.empty()) {
                // Hand out the leftover before touching the wrapped source.
                co_return std::exchange(_buf, temporary_buffer<char>{});
            }
            _buf = co_await _source.get();
            if (_buf.empty()) {
                co_return temporary_buffer<char>{};
            }
            auto n = tests::random::get_int(size_t(1), _buf.size());
            auto res = _buf.share(0, n);
            _buf.trim_front(n);
            co_return res;
        }
        future<temporary_buffer<char>> skip(uint64_t n) override {
            // Consume the leftover first, then forward the rest of the skip.
            if (!_buf.empty()) {
                auto m = std::min(n, _buf.size());
                _buf.trim_front(m);
                n -= m;
            }
            if (n) {
                co_await _source.skip(n);
            }
            co_return temporary_buffer<char>{};
        }
    };

    try {
        auto encrypted_source = data_source(make_encrypted_source(data_source(std::make_unique<random_chunk_source>(std::move(source))), k));

        // Concatenate the plaintext buffers for position-based comparison.
        // The accumulator starts as size_t(0) so that the sum is computed in
        // size_t rather than int.
        temporary_buffer<char> unified_buff(std::accumulate(srcs.begin(), srcs.end(), size_t(0), [](size_t acc, const auto& buf) { return acc + buf.size(); }));
        size_t pos = 0;
        for (const auto& src : srcs) {
            memcpy(unified_buff.get_write() + pos, src.get(), src.size());
            pos += src.size();
        }

        pos = 0;
        while (auto read_buff = co_await encrypted_source.get()) {
            auto rem = unified_buff.size() - pos;
            BOOST_REQUIRE_LE(read_buff.size(), rem);
            size_t size_to_compare = std::min(rem, read_buff.size());
            auto v1 = std::string_view(read_buff.get(), size_to_compare);
            auto v2 = std::string_view(unified_buff.get() + pos, size_to_compare);
            BOOST_REQUIRE_EQUAL(v1, v2);
            // Skip 4097 bytes while more than 4113 remain (measured before
            // this read), otherwise half of what remains.
            auto skip = unified_buff.size() - pos > 4113 ? 4097 : (unified_buff.size() - pos)/2;
            co_await encrypted_source.skip(skip);
            pos += size_to_compare + skip;
        }
        co_await encrypted_source.close();
    } catch (...) {
        ex = std::current_exception();
    }

    if (ex) {
        std::rethrow_exception(ex);
    }
}
|
|
|
|
// Encrypted source read-back with one fixed set of seven buffer sizes.
SEASTAR_TEST_CASE(test_encrypted_data_source_simple) {
    const std::vector<size_t> fixed_sizes{3200, 13086, 12065, 200, 11959, 12159, 12852};
    co_await test_random_data_source(fixed_sizes);
}
|
|
|
|
// Reproduces the production deadlock where encrypted SSTable component downloads
|
|
// got stuck during restore. The encrypted_data_source::get() caches a block in
|
|
// _next, then on the next call bypasses input_stream::read()'s _eof check and
|
|
// calls input_stream::read_exactly() — which does NOT check _eof when _buf is
|
|
// empty. This causes a second get() on the underlying source after EOS.
|
|
//
|
|
// In production the underlying source was chunked_download_source whose get()
|
|
// hung forever. Here we simulate it with a strict source that fails the test.
|
|
//
|
|
// The fix belongs in seastar's input_stream::read_exactly(): check _eof before
|
|
// calling _fd.get(), consistent with read(), read_up_to(), and consume().
|
|
static future<> test_encrypted_source_copy(size_t plaintext_size) {
|
|
testlog.info("test_encrypted_source_copy: plaintext_size={}", plaintext_size);
|
|
|
|
key_info info{"AES/CBC", 256};
|
|
auto k = ::make_shared<symmetric_key>(info);
|
|
|
|
// Step 1: Encrypt the plaintext into memory buffers
|
|
auto plaintext = generate_random<char>(plaintext_size);
|
|
std::vector<temporary_buffer<char>> encrypted_bufs;
|
|
{
|
|
data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_bufs), k));
|
|
co_await sink.put(plaintext.clone());
|
|
co_await sink.close();
|
|
}
|
|
|
|
// Flatten encrypted buffers into a single contiguous buffer
|
|
size_t encrypted_total = 0;
|
|
for (const auto& b : encrypted_bufs) {
|
|
encrypted_total += b.size();
|
|
}
|
|
temporary_buffer<char> encrypted(encrypted_total);
|
|
size_t pos = 0;
|
|
for (const auto& b : encrypted_bufs) {
|
|
std::copy(b.begin(), b.end(), encrypted.get_write() + pos);
|
|
pos += b.size();
|
|
}
|
|
|
|
// Step 2: Create a data source from the encrypted data that fails on
|
|
// post-EOS get() — simulating a source like chunked_download_source
|
|
// that would hang forever in this situation.
|
|
class strict_memory_source final : public limiting_data_source_impl {
|
|
bool _eof = false;
|
|
public:
|
|
strict_memory_source(temporary_buffer<char> data, size_t chunk_size)
|
|
: limiting_data_source_impl(
|
|
data_source(std::make_unique<util::temporary_buffer_data_source>(std::move(data))),
|
|
chunk_size) {}
|
|
|
|
future<temporary_buffer<char>> get() override {
|
|
BOOST_REQUIRE_MESSAGE(!_eof,
|
|
"get() called on source after it already returned EOS — "
|
|
"this is the production deadlock: read_exactly() does not "
|
|
"check _eof before calling _fd.get()");
|
|
auto buf = co_await limiting_data_source_impl::get();
|
|
_eof = buf.empty();
|
|
co_return buf;
|
|
}
|
|
};
|
|
|
|
// Step 3: Wrap in encrypted_data_source and drain via consume() —
|
|
// the exact code path used by seastar::copy() which is what
|
|
// sstables_loader_helpers::download_sstable() calls.
|
|
// Try multiple chunk sizes to hit different alignment scenarios.
|
|
for (size_t chunk_size : {1ul, 7ul, 4096ul, 8192ul, encrypted_total, encrypted_total + 1}) {
|
|
if (chunk_size == 0) continue;
|
|
auto src = data_source(make_encrypted_source(
|
|
data_source(std::make_unique<strict_memory_source>(encrypted.clone(), chunk_size)), k));
|
|
auto in = input_stream<char>(std::move(src));
|
|
|
|
// consume() is what seastar::copy() uses internally. It calls
|
|
// encrypted_data_source::get() via _fd.get() until EOF.
|
|
size_t total_decrypted = 0;
|
|
co_await in.consume([&total_decrypted](temporary_buffer<char> buf) {
|
|
total_decrypted += buf.size();
|
|
return make_ready_future<consumption_result<char>>(continue_consuming{});
|
|
});
|
|
co_await in.close();
|
|
|
|
BOOST_REQUIRE_EQUAL(total_decrypted, plaintext_size);
|
|
}
|
|
}
|
|
|
|
// Strict-source drain of an 8 KiB plaintext.
SEASTAR_TEST_CASE(test_encrypted_source_copy_8k) {
    constexpr size_t plaintext_size = 8192;
    co_await test_encrypted_source_copy(plaintext_size);
}
|
|
|
|
// Strict-source drain of a 4 KiB plaintext.
SEASTAR_TEST_CASE(test_encrypted_source_copy_4k) {
    constexpr size_t plaintext_size = 4096;
    co_await test_encrypted_source_copy(plaintext_size);
}
|
|
|
|
// Strict-source drain of a 100-byte plaintext.
SEASTAR_TEST_CASE(test_encrypted_source_copy_small) {
    constexpr size_t plaintext_size = 100;
    co_await test_encrypted_source_copy(plaintext_size);
}
|
|
|
|
// Strict-source drain of a 12 KiB plaintext.
SEASTAR_TEST_CASE(test_encrypted_source_copy_12k) {
    constexpr size_t plaintext_size = 12288;
    co_await test_encrypted_source_copy(plaintext_size);
}
|
|
|
|
// Strict-source drain of a plaintext one byte larger than 8 KiB.
SEASTAR_TEST_CASE(test_encrypted_source_copy_unaligned) {
    constexpr size_t plaintext_size = 8193;
    co_await test_encrypted_source_copy(plaintext_size);
}
|
|
|
|
// Strict-source drain of a single-byte plaintext.
SEASTAR_TEST_CASE(test_encrypted_source_copy_1byte) {
    constexpr size_t plaintext_size = 1;
    co_await test_encrypted_source_copy(plaintext_size);
}
|
|
|
|
|
|
// Runs test_random_data_source() 1000 times, each time with 1 to 15 buffers
// of 1 to 14700 bytes.
//
// The seed drawn from std::random_device is logged so that a failing run can
// be tied to the buffer sizes it generated. It only governs the sizes chosen
// here; test_random_data_source() draws further random values through
// tests::random, which this seed does not control.
SEASTAR_TEST_CASE(test_encrypted_data_source_fuzzy) {
    const auto seed = std::random_device{}();
    testlog.info("test_encrypted_data_source_fuzzy: seed={}", seed);
    std::mt19937_64 rand_gen(seed);

    // The ranges never change, so the distributions are built once.
    std::uniform_int_distribution<uint16_t> count_dist(1, 15);
    std::uniform_int_distribution<uint16_t> size_dist(1, 147*100);

    for (auto i = 0; i < 1000; ++i) {
        std::vector<size_t> sizes(count_dist(rand_gen));
        for (auto& s : sizes) {
            s = size_dist(rand_gen);
        }
        co_await test_random_data_source(sizes);
    }

    co_return;
}
|