Merge 'sstables: make sstable_manager control the lifetime of the sstables it manages' from Avi Kivity

Currently, sstable_manager is used to create sstables, but it loses track
of them immediately afterwards. This series makes an sstable's life fully
contained within its sstable_manager.

The first practical impact (implemented in this series) is that file removal
stops being a background job; instead it is tracked by the sstable_manager,
so when the sstable_manager is stopped, you know that all of its sstable
activity is complete.

Later, we can make use of this to track the data size on disk, but this is not
implemented here.

Closes #7253

* github.com:scylladb/scylla:
  sstables: remove background_jobs(), await_background_jobs()
  sstables: make sstables_manager take charge of closing sstables
  test: test_env: hold sstables_manager with a unique_ptr
  test: drop test_sstable_manager
  test: sstables::test_env: take ownership of manager
  test: broken_sstable_test: prepare for asynchronously closed sstables_manager
  test: sstable_utils: close test_env after use
  test: sstable_test:  dont leak shared_sstable outside its test_env's lifetime
  test: sstables::test_env: close self in do_with helpers
  test: perf/perf_sstable.hh: prepare for asynchronously closed sstables_manager
  test: view_build_test: prepare for asynchronously closed sstables_manager
  test: sstable_resharding_test: prepare for asynchronously closed sstables_manager
  test: sstable_mutation_test: prepare for asynchronously closed sstables_manager
  test: sstable_directory_test: prepare for asynchronously closed sstables_manager
  test: sstable_datafile_test: prepare for asynchronously closed sstables_manager
  test: sstable_conforms_to_mutation_source_test: remove references to test_sstables_manager
  test: sstable_3_x_test: remove test_sstables_manager references
  test: schema_changes_test: drop use of test_sstables_manager
  mutation_test: adjust for column_family_test_config accepting an sstables_manager
  test: lib: sstable_utils: stop using test_sstables_manager
  test: sstables test_env: introduce manager() accessor
  test: sstables test_env: introduce do_with_async_sharded()
  test: sstables test_env: introduce  do_with_async_returning()
  test: lib: sstable test_env: prepare for life as a sharded<> service
  test: schema_changes_test: properly close sstables::test_env
  test: sstable_mutation_test: avoid constructing temporary sstables::test_env
  test: mutation_reader_test: avoid constructing temporary sstables::test_env
  test: sstable_3_x_test: avoid constructing temporary sstables::test_env
  test: lib: test_services: pass sstables_manager to column_family_test_config
  test: lib: sstables test_env: implement tests_env::manager()
  test: sstable_test: detemplate write_and_validate_sst()
  test: sstable_test_env: detemplate do_with_async()
  test: sstable_datafile_test: drop bad 'return'
  table: clear sstable set when stopping
  table: prevent table::stop() race with table::query()
  database: close sstable_manager:s
  sstables_manager: introduce a stub close()
  sstable_directory_test: fix threading confusion in make_sstable_directory_for*() functions
  test: sstable_datafile_test: reorder table stop in compaction_manager_test
  test: view_build_test: test_view_update_generator_register_semaphore_unit_leak: do not discard future in timer
  test: view_build_test: fix threading in test_view_update_generator_register_semaphore_unit_leak
  view: view_update_generator: drop references to sstables when stopping
This commit is contained in:
Nadav Har'El
2020-09-24 13:54:38 +03:00
26 changed files with 760 additions and 551 deletions

View File

@@ -1814,6 +1814,10 @@ database::stop() {
return _streaming_dirty_memory_manager.shutdown();
}).then([this] {
return _memtable_controller.shutdown();
}).then([this] {
return _user_sstables_manager->close();
}).then([this] {
return _system_sstables_manager->close();
});
}

View File

@@ -37,6 +37,13 @@ future<> view_update_generator::start() {
thread_attributes attr;
attr.sched_group = _db.get_streaming_scheduling_group();
_started = seastar::async(std::move(attr), [this]() mutable {
auto drop_sstable_references = defer([&] {
// Clear sstable references so sstables_manager::stop() doesn't hang.
vug_logger.info("leaving {} unstaged sstables unprocessed",
_sstables_to_move.size(), _sstables_with_tables.size());
_sstables_to_move.clear();
_sstables_with_tables.clear();
});
while (!_as.abort_requested()) {
if (_sstables_with_tables.empty()) {
_pending_sstables.wait().get();

View File

@@ -818,9 +818,6 @@ int main(int ac, char** av) {
return db.invoke_on_all([](auto& db) {
return db.stop();
});
}).then([] {
startlog.info("Shutting down database: waiting for background jobs...");
return sstables::await_background_jobs_on_all_shards();
}).get();
});
api::set_server_config(ctx).get();

View File

@@ -76,6 +76,7 @@
#include "sstables/sstables_manager.hh"
#include "utils/UUID_gen.hh"
#include "database.hh"
#include "sstables_manager.hh"
#include <boost/algorithm/string/predicate.hpp>
#include "tracing/traced_file.hh"
@@ -177,24 +178,6 @@ future<file> sstable::new_sstable_component_file(const io_error_handler& error_h
}
}
utils::phased_barrier& background_jobs() {
static thread_local utils::phased_barrier gate;
return gate;
}
future<> await_background_jobs() {
sstlog.debug("Waiting for background jobs");
return background_jobs().advance_and_await().finally([] {
sstlog.debug("Waiting done");
});
}
future<> await_background_jobs_on_all_shards() {
return smp::invoke_on_all([] {
return await_background_jobs();
});
}
std::unordered_map<sstable::version_types, sstring, enum_hash<sstable::version_types>> sstable::_version_string = {
{ sstable::version_types::ka , "ka" },
{ sstable::version_types::la , "la" },
@@ -2979,36 +2962,32 @@ int sstable::compare_by_max_timestamp(const sstable& other) const {
return (ts1 > ts2 ? 1 : (ts1 == ts2 ? 0 : -1));
}
sstable::~sstable() {
future<> sstable::close_files() {
auto index_closed = make_ready_future<>();
if (_index_file) {
// Registered as background job.
(void)_index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) {
index_closed = _index_file.close().handle_exception([me = shared_from_this()] (auto ep) {
sstlog.warn("sstable close index_file failed: {}", ep);
general_disk_error();
});
}
auto data_closed = make_ready_future<>();
if (_data_file) {
// Registered as background job.
(void)_data_file.close().handle_exception([save = _data_file, op = background_jobs().start()] (auto ep) {
data_closed = _data_file.close().handle_exception([me = shared_from_this()] (auto ep) {
sstlog.warn("sstable close data_file failed: {}", ep);
general_disk_error();
});
}
auto unlinked = make_ready_future<>();
if (_marked_for_deletion != mark_for_deletion::none) {
// We need to delete the on-disk files for this table. Since this is a
// destructor, we can't wait for this to finish, or return any errors,
// but just need to do our best. If a deletion fails for some reason we
// If a deletion fails for some reason we
// log and ignore this failure, because on startup we'll again try to
// clean up unused sstables, and because we'll never reuse the same
// generation number anyway.
sstlog.debug("Deleting sstable that is {}marked for deletion", _marked_for_deletion == mark_for_deletion::implicit ? "implicitly " : "");
try {
// FIXME:
// - Longer term fix is to hand off deletion of sstables to a manager that can
// deal with sstable marked to be deleted after the corresponding object is destructed.
(void)unlink().handle_exception(
[op = background_jobs().start()] (std::exception_ptr eptr) {
unlinked = unlink().handle_exception(
[me = shared_from_this()] (std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (...) {
@@ -3022,6 +3001,8 @@ sstable::~sstable() {
}
_on_closed(*this);
return when_all_succeed(std::move(index_closed), std::move(data_closed), std::move(unlinked)).discard_result();
}
static inline sstring dirname(const sstring& fname) {
@@ -3588,6 +3569,16 @@ sstable::sstable(schema_ptr schema,
, _manager(manager)
{
tracker.add(*this);
manager.add(this);
}
void sstable::unused() {
if (_active) {
_active = false;
_manager.deactivate(this);
} else {
_manager.remove(this);
}
}
future<file_writer> file_writer::make(file f, file_output_stream_options options, sstring filename) noexcept {
@@ -3628,7 +3619,7 @@ namespace seastar {
void
lw_shared_ptr_deleter<sstables::sstable>::dispose(sstables::sstable* s) {
delete s;
s->unused();
}

View File

@@ -52,7 +52,6 @@
#include "utils/disk-error-handler.hh"
#include "sstables/progress_monitor.hh"
#include "db/commitlog/replay_position.hh"
#include "utils/phased_barrier.hh"
#include "component_type.hh"
#include "sstable_version.hh"
#include "db/large_data_handler.hh"
@@ -102,6 +101,7 @@ requires ConsumeRowsContext<DataConsumeRowsContext>
class data_consume_context;
class index_reader;
class sstables_manager;
extern bool use_binary_search_in_promoted_index;
@@ -134,6 +134,7 @@ public:
using version_types = sstable_version_types;
using format_types = sstable_format_types;
using tracker_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
using manager_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
public:
sstable(schema_ptr schema,
sstring dir,
@@ -149,8 +150,6 @@ public:
sstable(const sstable&) = delete;
sstable(sstable&&) = delete;
~sstable();
// disk_read_range describes a byte ranges covering part of an sstable
// row that we need to read from disk. Usually this is the whole byte
// range covering a single sstable row, but in very large rows we might
@@ -463,6 +462,7 @@ public:
return touch_directory(std::move(name));
});
}
future<> close_files();
private:
size_t sstable_buffer_size;
@@ -541,6 +541,7 @@ private:
none = 0,
marked = 1
} _marked_for_deletion = mark_for_deletion::none;
bool _active = true;
gc_clock::time_point _now;
@@ -552,10 +553,13 @@ private:
sstables_stats _stats;
tracker_link_type _tracker_link;
manager_link_type _manager_link;
public:
const bool has_component(component_type f) const;
sstables_manager& manager() { return _manager; }
const sstables_manager& manager() const { return _manager; }
private:
void unused(); // Called when reference count drops to zero
future<file> open_file(component_type, open_flags, file_open_options = {}) noexcept;
template <component_type Type, typename T>
@@ -859,6 +863,7 @@ public:
friend class index_reader;
friend class sstable_writer;
friend class compaction;
friend class sstables_manager;
template <typename DataConsumeRowsContext>
friend data_consume_context<DataConsumeRowsContext>
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range, uint64_t);
@@ -868,18 +873,9 @@ public:
template <typename DataConsumeRowsContext>
friend data_consume_context<DataConsumeRowsContext>
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&);
friend void lw_shared_ptr_deleter<sstables::sstable>::dispose(sstable* s);
};
// Waits for all prior tasks started on current shard related to sstable management to finish.
//
// There may be asynchronous cleanup started from sstable destructor. Since we can't have blocking
// destructors in seastar, that cleanup is not waited for. It can be waited for using this function.
// It is also waited for when seastar exits.
future<> await_background_jobs();
// Invokes await_background_jobs() on all shards
future<> await_background_jobs_on_all_shards();
// When we compact sstables, we have to atomically instantiate the new
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
// and if A contained some data that was tombstoned by B, and if B was
@@ -930,8 +926,6 @@ public:
future<> init_metrics();
utils::phased_barrier& background_jobs();
class file_io_extension {
public:
virtual ~file_io_extension() {}

View File

@@ -35,6 +35,12 @@ sstables_manager::sstables_manager(
: _large_data_handler(large_data_handler), _db_config(dbcfg), _features(feat) {
}
sstables_manager::~sstables_manager() {
assert(_closing);
assert(_active.empty());
assert(_undergoing_close.empty());
}
shared_sstable sstables_manager::make_sstable(schema_ptr schema,
sstring dir,
int64_t generation,
@@ -60,4 +66,39 @@ sstable_writer_config sstables_manager::configure_writer() const {
return cfg;
}
void sstables_manager::add(sstable* sst) {
_active.push_back(*sst);
}
void sstables_manager::deactivate(sstable* sst) {
// At this point, sst has a reference count of zero, since we got here from
// lw_shared_ptr_deleter<sstables::sstable>::dispose().
_active.erase(_active.iterator_to(*sst));
_undergoing_close.push_back(*sst);
// guard against sstable::close_files() calling shared_from_this() and immediately destroying
// the result, which will dispose of the sstable recursively
auto ptr = sst->shared_from_this();
(void)sst->close_files().finally([ptr] {
// destruction of ptr will call maybe_done() and release close()
});
}
void sstables_manager::remove(sstable* sst) {
_undergoing_close.erase(_undergoing_close.iterator_to(*sst));
delete sst;
maybe_done();
}
void sstables_manager::maybe_done() {
if (_closing && _active.empty() && _undergoing_close.empty()) {
_done.set_value();
}
}
future<> sstables_manager::close() {
_closing = true;
maybe_done();
return _done.get_future();
}
} // namespace sstables

View File

@@ -30,9 +30,12 @@
#include "sstables/sstables.hh"
#include "sstables/shareable_components.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/sstables.hh"
#include "sstables/version.hh"
#include "sstables/component_type.hh"
#include <boost/intrusive/list.hpp>
namespace db {
class large_data_handler;
@@ -50,6 +53,10 @@ using shareable_components_ptr = lw_shared_ptr<shareable_components>;
static constexpr size_t default_sstable_buffer_size = 128 * 1024;
class sstables_manager {
using list_type = boost::intrusive::list<sstable,
boost::intrusive::member_hook<sstable, sstable::manager_link_type, &sstable::_manager_link>,
boost::intrusive::constant_time_size<false>>;
private:
db::large_data_handler& _large_data_handler;
const db::config& _db_config;
gms::feature_service& _features;
@@ -61,8 +68,13 @@ class sstables_manager {
// in the system table).
sstable_version_types _format = sstable_version_types::mc;
list_type _active;
list_type _undergoing_close;
bool _closing = false;
promise<> _done;
public:
explicit sstables_manager(db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat);
~sstables_manager();
// Constructs a shared sstable
shared_sstable make_sstable(schema_ptr schema,
@@ -80,10 +92,28 @@ public:
void set_format(sstable_version_types format) { _format = format; }
sstables::sstable::version_types get_highest_supported_format() const { return _format; }
// Wait until all sstables managed by this sstables_manager instance
// (previously created by make_sstable()) have been disposed of:
// - if they were marked for deletion, the files are deleted
// - in any case, the open file handles are closed
// - all memory resources are freed
//
// Note that close() will not complete until all references to all
// sstables have been destroyed.
future<> close();
private:
void add(sstable* sst);
// Transition the sstable to the "inactive" state. It has no
// visible references at this point, and only waits for its
// files to be deleted (if necessary) and closed.
void deactivate(sstable* sst);
void remove(sstable* sst);
void maybe_done();
private:
db::large_data_handler& get_large_data_handler() const {
return _large_data_handler;
}
friend class sstable;
};
} // namespace sstables

View File

@@ -836,7 +836,14 @@ table::stop() {
// Nest, instead of using when_all, so we don't lose any exceptions.
return _streaming_flush_gate.close();
}).then([this] {
return _sstable_deletion_gate.close();
return _sstable_deletion_gate.close().then([this] {
return get_row_cache().invalidate([this] {
_sstables = _compaction_strategy.make_sstable_set(_schema);
_sstables_staging.clear();
}).then([this] {
_cache.refresh_snapshot();
});
});
});
});
});
@@ -1961,13 +1968,16 @@ table::query(schema_ptr s,
query::result_memory_limiter& memory_limiter,
db::timeout_clock::time_point timeout,
query::querier_cache_context cache_ctx) {
_async_gate.enter();
auto leave = defer([&] { _async_gate.leave(); });
utils::latency_counter lc;
_stats.reads.set_latency(lc);
const auto short_read_allwoed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
auto f = opts.request == query::result_request::only_digest
? memory_limiter.new_digest_read(*cmd.max_result_size, short_read_allwoed) : memory_limiter.new_data_read(*cmd.max_result_size, short_read_allwoed);
return f.then([this, lc, s = std::move(s), &cmd, class_config, opts, &partition_ranges,
trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] (query::result_memory_accounter accounter) mutable {
trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx),
leave = std::move(leave)] (query::result_memory_accounter accounter) mutable {
auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, opts, partition_ranges, std::move(accounter));
auto& qs = *qs_ptr;
return do_until(std::bind(&query_state::done, &qs), [this, &qs, class_config, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] {
@@ -1977,11 +1987,12 @@ table::query(schema_ptr s,
}).then([qs_ptr = std::move(qs_ptr), &qs] {
return make_ready_future<lw_shared_ptr<query::result>>(
make_lw_shared<query::result>(qs.builder.build()));
}).finally([lc, this]() mutable {
}).finally([lc, this, leave = std::move(leave)]() mutable {
_stats.reads.mark(lc);
if (lc.is_start()) {
_stats.estimated_read.add(lc.latency());
}
// "leave" is destroyed here
});
});
}

View File

@@ -40,8 +40,8 @@ struct my_consumer {
static void broken_sst(sstring dir, unsigned long generation, schema_ptr s, sstring msg,
sstable_version_types version = la) {
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
try {
sstables::test_env env;
sstable_ptr sstp = env.reusable_sst(s, dir, generation, version).get0();
auto r = sstp->read_rows_flat(s, tests::make_permit());
r.consume(my_consumer{}, db::no_timeout).get();
@@ -49,6 +49,7 @@ static void broken_sst(sstring dir, unsigned long generation, schema_ptr s, sstr
} catch (malformed_sstable_exception& e) {
BOOST_REQUIRE_EQUAL(sstring(e.what()), msg);
}
}).get();
}
static void broken_sst(sstring dir, unsigned long generation, sstring msg) {
@@ -59,18 +60,19 @@ static void broken_sst(sstring dir, unsigned long generation, sstring msg) {
}
SEASTAR_THREAD_TEST_CASE(test_empty_index) {
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
auto s = schema_builder("test_ks", "test_table")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("val", int32_type)
.set_compressor_params(compression_parameters::no_compression())
.build();
sstables::test_env env;
sstable_ptr sstp = env.reusable_sst(s, "test/resource/sstables/empty_index", 36, sstable_version_types::mc).get0();
sstp->load().get();
auto fut = sstables::test(sstp).read_indexes();
BOOST_REQUIRE_EXCEPTION(fut.get(), malformed_sstable_exception, exception_predicate::message_equals(
"missing index entry in sstable test/resource/sstables/empty_index/mc-36-big-Index.db"));
}).get();
}
SEASTAR_THREAD_TEST_CASE(missing_column_in_schema) {

View File

@@ -548,14 +548,15 @@ SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping
}
struct sst_factory {
sstables::test_env env;
sstables::test_env& env;
schema_ptr s;
sstring path;
unsigned gen;
uint32_t level;
sst_factory(schema_ptr s, const sstring& path, unsigned gen, int level)
: s(s)
sst_factory(sstables::test_env& env, schema_ptr s, const sstring& path, unsigned gen, int level)
: env(env)
, s(s)
, path(path)
, gen(gen)
, level(level)
@@ -570,6 +571,7 @@ struct sst_factory {
};
SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) {
sstables::test_env::do_with_async([] (sstables::test_env& env) {
storage_service_for_tests ssft;
simple_schema s;
@@ -614,11 +616,11 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) {
unsigned gen{0};
std::vector<sstables::shared_sstable> sstable_list = {
make_sstable_containing(sst_factory(s.schema(), tmp.path().string(), ++gen, 0), std::move(sstable_level_0_0_mutations)),
make_sstable_containing(sst_factory(s.schema(), tmp.path().string(), ++gen, 1), std::move(sstable_level_1_0_mutations)),
make_sstable_containing(sst_factory(s.schema(), tmp.path().string(), ++gen, 1), std::move(sstable_level_1_1_mutations)),
make_sstable_containing(sst_factory(s.schema(), tmp.path().string(), ++gen, 2), std::move(sstable_level_2_0_mutations)),
make_sstable_containing(sst_factory(s.schema(), tmp.path().string(), ++gen, 2), std::move(sstable_level_2_1_mutations)),
make_sstable_containing(sst_factory(env, s.schema(), tmp.path().string(), ++gen, 0), std::move(sstable_level_0_0_mutations)),
make_sstable_containing(sst_factory(env, s.schema(), tmp.path().string(), ++gen, 1), std::move(sstable_level_1_0_mutations)),
make_sstable_containing(sst_factory(env, s.schema(), tmp.path().string(), ++gen, 1), std::move(sstable_level_1_1_mutations)),
make_sstable_containing(sst_factory(env, s.schema(), tmp.path().string(), ++gen, 2), std::move(sstable_level_2_0_mutations)),
make_sstable_containing(sst_factory(env, s.schema(), tmp.path().string(), ++gen, 2), std::move(sstable_level_2_1_mutations)),
};
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, {});
@@ -672,6 +674,7 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) {
.produces(expexted_mutation_4)
.produces(expexted_mutation_5)
.produces_end_of_stream();
}).get();
}
static mutation make_mutation_with_key(simple_schema& s, dht::decorated_key dk) {

View File

@@ -479,13 +479,13 @@ SEASTAR_THREAD_TEST_CASE(test_large_collection_serialization_exception_safety) {
}
SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
return seastar::async([] {
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
storage_service_for_tests ssft;
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type);
auto cf_stats = make_lw_shared<::cf_stats>();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;
@@ -529,6 +529,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
}
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
return sstables::test_env::do_with([] (sstables::test_env& env) {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("v", bytes_type)
@@ -536,7 +537,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
auto cf_stats = make_lw_shared<::cf_stats>();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.enable_disk_reads = true;
cfg.enable_disk_writes = true;
cfg.enable_cache = true;
@@ -606,16 +607,17 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
flushed.get();
});
}).then([cf_stats] {});
});
}
SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
return seastar::async([] {
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type);
auto cf_stats = make_lw_shared<::cf_stats>();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;

View File

@@ -37,9 +37,9 @@ using namespace sstables;
using namespace std::chrono_literals;
SEASTAR_THREAD_TEST_CASE(test_schema_changes) {
sstables::test_env::do_with_async([] (sstables::test_env& env) {
auto dir = tmpdir();
storage_service_for_tests ssft;
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
int gen = 1;
std::map<std::tuple<sstables::sstable::version_types, schema_ptr>, std::tuple<shared_sstable, int>> cache;
@@ -50,7 +50,6 @@ SEASTAR_THREAD_TEST_CASE(test_schema_changes) {
shared_sstable created_with_base_schema;
shared_sstable created_with_changed_schema;
sstables::test_env env;
if (it == cache.end()) {
auto mt = make_lw_shared<memtable>(base);
for (auto& m : base_mutations) {
@@ -58,7 +57,7 @@ SEASTAR_THREAD_TEST_CASE(test_schema_changes) {
}
created_with_base_schema = env.make_sstable(base, dir.path().string(), gen, version, sstables::sstable::format_types::big);
created_with_base_schema->write_components(mt->make_flat_reader(base, tests::make_permit()), base_mutations.size(), base,
test_sstables_manager.configure_writer(), mt->get_encoding_stats()).get();
env.manager().configure_writer(), mt->get_encoding_stats()).get();
created_with_base_schema->load().get();
created_with_changed_schema = env.make_sstable(changed, dir.path().string(), gen, version, sstables::sstable::format_types::big);
@@ -88,4 +87,5 @@ SEASTAR_THREAD_TEST_CASE(test_schema_changes) {
mr.produces_end_of_stream();
}
});
}).get();
}

File diff suppressed because it is too large Load Diff

View File

@@ -51,13 +51,11 @@ void test_mutation_source(sstables::test_env& env, sstable_writer_config cfg, ss
SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
storage_service_for_tests ssft;
sstables::test_env env;
for (auto version : all_sstable_versions) {
for (auto index_block_size : {1, 128, 64*1024}) {
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = index_block_size;
test_mutation_source(env, cfg, version);
}

View File

@@ -1050,7 +1050,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
});
auto tmp = tmpdir();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string();
cfg.enable_commitlog = false;
cfg.enable_incremental_backups = false;
@@ -1097,15 +1097,15 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
BOOST_REQUIRE(cm->get_stats().completed_tasks == 1);
BOOST_REQUIRE(cm->get_stats().errors == 0);
// remove cf from compaction manager; this will wait for the
// ongoing compaction to finish.
cf->stop().get();
// expect sstables of cf to be compacted.
BOOST_REQUIRE(cf->sstables_count() == 1);
cf->stop().get();
});
}
SEASTAR_TEST_CASE(compact) {
return sstables::test_env::do_with([] (sstables::test_env& env) {
BOOST_REQUIRE(smp::count == 1);
constexpr int generation = 17;
// The "compaction" sstable was created with the following schema:
@@ -1125,7 +1125,7 @@ SEASTAR_TEST_CASE(compact) {
auto cm = make_lw_shared<compaction_manager>();
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto tracker = make_lw_shared<cache_tracker>();
auto cf = make_lw_shared<column_family>(s, column_family_test_config(), column_family::no_commitlog(), *cm, *cl_stats, *tracker);
auto cf = make_lw_shared<column_family>(s, column_family_test_config(env.manager()), column_family::no_commitlog(), *cm, *cl_stats, *tracker);
cf->mark_ready_for_writes();
return test_setup::do_with_tmp_directory([s, generation, cf, cm] (test_env& env, sstring tmpdir_path) {
@@ -1200,6 +1200,7 @@ SEASTAR_TEST_CASE(compact) {
});
});
}).finally([cl_stats, tracker] { });
});
// verify that the compacted sstable look like
}
@@ -1223,7 +1224,7 @@ static future<std::vector<unsigned long>> compact_sstables(test_env& env, sstrin
builder.set_min_compaction_threshold(4);
auto s = builder.build(schema_builder::compact_storage::no);
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
auto generations = make_lw_shared<std::vector<unsigned long>>(std::move(generations_to_compact));
auto sstables = make_lw_shared<std::vector<sstables::shared_sstable>>();
@@ -1754,8 +1755,8 @@ static bool sstable_overlaps(const lw_shared_ptr<column_family>& cf, int64_t gen
}
SEASTAR_TEST_CASE(leveled_01) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1792,11 +1793,12 @@ SEASTAR_TEST_CASE(leveled_01) {
BOOST_REQUIRE(gens.empty());
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(leveled_02) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1843,11 +1845,12 @@ SEASTAR_TEST_CASE(leveled_02) {
BOOST_REQUIRE(gens.empty());
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(leveled_03) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1898,11 +1901,12 @@ SEASTAR_TEST_CASE(leveled_03) {
BOOST_REQUIRE(gen_and_level.empty());
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(leveled_04) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1961,6 +1965,7 @@ SEASTAR_TEST_CASE(leveled_04) {
BOOST_REQUIRE(levels.empty());
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(leveled_05) {
@@ -1985,8 +1990,8 @@ SEASTAR_TEST_CASE(leveled_05) {
SEASTAR_TEST_CASE(leveled_06) {
// Test that we can compact a single L1 compaction into an empty L2.
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto max_sstable_size_in_mb = 1;
auto max_sstable_size_in_bytes = max_sstable_size_in_mb*1024*1024;
@@ -2013,11 +2018,12 @@ SEASTAR_TEST_CASE(leveled_06) {
BOOST_REQUIRE(sst->generation() == 1);
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(leveled_07) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
for (auto i = 0; i < leveled_manifest::MAX_COMPACTING_L0*2; i++) {
add_sstable_for_leveled_test(env, cf, i, 1024*1024, /*level*/0, "a", "a", i /* max timestamp */);
@@ -2036,11 +2042,12 @@ SEASTAR_TEST_CASE(leveled_07) {
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(leveled_invariant_fix) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto sstables_no = cf.schema()->max_compaction_threshold();
auto key_and_token_pair = token_generation_for_current_shard(sstables_no);
@@ -2071,16 +2078,17 @@ SEASTAR_TEST_CASE(leveled_invariant_fix) {
}));
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(leveled_stcs_on_L0) {
test_env env;
return test_env::do_with([] (test_env& env) {
schema_builder builder(make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
builder.set_min_compaction_threshold(4);
auto s = builder.build(schema_builder::compact_storage::no);
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
auto key_and_token_pair = token_generation_for_current_shard(1);
auto sstable_max_size_in_mb = 1;
@@ -2119,11 +2127,12 @@ SEASTAR_TEST_CASE(leveled_stcs_on_L0) {
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(overlapping_starved_sstables_test) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto key_and_token_pair = token_generation_for_current_shard(5);
auto min_key = key_and_token_pair[0].first;
@@ -2151,11 +2160,12 @@ SEASTAR_TEST_CASE(overlapping_starved_sstables_test) {
BOOST_REQUIRE(candidate.sstables.size() == 3);
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(check_overlapping) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto key_and_token_pair = token_generation_for_current_shard(4);
auto min_key = key_and_token_pair[0].first;
@@ -2175,6 +2185,7 @@ SEASTAR_TEST_CASE(check_overlapping) {
BOOST_REQUIRE(overlapping_sstables.front()->generation() == 4);
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(check_read_indexes) {
@@ -2218,7 +2229,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
};
auto compact = [&, s] (std::vector<shared_sstable> all, std::vector<shared_sstable> to_compact) -> std::vector<shared_sstable> {
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
for (auto&& sst : all) {
column_family_test(cf).add_sstable(sst);
}
@@ -2415,7 +2426,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
// e blob
//);
return test_env::do_with_async([] (test_env& env) {
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
auto builder = schema_builder("test", "test_multi_schema")
.with_column("a", int32_type, column_kind::partition_key)
@@ -2479,7 +2490,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
new_tables->emplace_back(sst);
return sst;
};
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
std::vector<shared_sstable> sstables;
sstables.push_back(std::move(sstp));
@@ -2836,7 +2847,7 @@ SEASTAR_TEST_CASE(test_sstable_max_local_deletion_time_2) {
builder.with_column("c1", utf8_type, column_kind::clustering_key);
builder.with_column("r1", utf8_type);
schema_ptr s = builder.build(schema_builder::compact_storage::no);
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
auto mt = make_lw_shared<memtable>(s);
auto now = gc_clock::now();
int32_t last_expiry = 0;
@@ -2889,7 +2900,7 @@ static stats_metadata build_stats(int64_t min_timestamp, int64_t max_timestamp,
}
SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
test_env env;
return test_env::do_with([] (test_env& env) {
auto key_and_token_pair = token_generation_for_current_shard(4);
auto min_key = key_and_token_pair[0].first;
auto max_key = key_and_token_pair[key_and_token_pair.size()-1].first;
@@ -2901,7 +2912,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
auto t4 = gc_clock::from_time_t(30).time_since_epoch().count();
{
column_family_for_tests cf;
column_family_for_tests cf(env.manager());
auto sst1 = add_sstable_for_overlapping_test(env, cf, /*gen*/1, min_key, key_and_token_pair[1].first, build_stats(t0, t1, t1));
auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t0, t1, std::numeric_limits<int32_t>::max()));
auto sst3 = add_sstable_for_overlapping_test(env, cf, /*gen*/3, min_key, max_key, build_stats(t3, t4, std::numeric_limits<int32_t>::max()));
@@ -2911,7 +2922,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
}
{
column_family_for_tests cf;
column_family_for_tests cf(env.manager());
auto sst1 = add_sstable_for_overlapping_test(env, cf, /*gen*/1, min_key, key_and_token_pair[1].first, build_stats(t0, t1, t1));
auto sst2 = add_sstable_for_overlapping_test(env, cf, /*gen*/2, min_key, key_and_token_pair[2].first, build_stats(t2, t3, std::numeric_limits<int32_t>::max()));
@@ -2924,6 +2935,7 @@ SEASTAR_TEST_CASE(get_fully_expired_sstables_test) {
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(compaction_with_fully_expired_table) {
@@ -2953,7 +2965,7 @@ SEASTAR_TEST_CASE(compaction_with_fully_expired_table) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 1).get0();
column_family_for_tests cf;
column_family_for_tests cf(env.manager());
auto ssts = std::vector<shared_sstable>{ sst };
auto expired = get_fully_expired_sstables(*cf, ssts, gc_clock::now());
@@ -2976,12 +2988,12 @@ SEASTAR_TEST_CASE(compaction_with_fully_expired_table) {
}
SEASTAR_TEST_CASE(basic_date_tiered_strategy_test) {
return test_env::do_with([] (test_env& env) {
schema_builder builder(make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
builder.set_min_compaction_threshold(4);
auto s = builder.build(schema_builder::compact_storage::no);
test_env env;
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
std::vector<sstables::shared_sstable> candidates;
int min_threshold = cf->schema()->min_compaction_threshold();
@@ -3010,15 +3022,16 @@ SEASTAR_TEST_CASE(basic_date_tiered_strategy_test) {
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
return test_env::do_with([] (test_env& env) {
schema_builder builder(make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
builder.set_min_compaction_threshold(4);
auto s = builder.build(schema_builder::compact_storage::no);
test_env env;
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
// deterministic timestamp for Fri, 01 Jan 2016 00:00:00 GMT.
auto tp = db_clock::from_time_t(1451606400);
@@ -3063,6 +3076,7 @@ SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
BOOST_REQUIRE(!gens.contains(min_threshold + 2));
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
@@ -3089,6 +3103,7 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
}
SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
return test_env::do_with([] (test_env& env) {
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
auto ts_in_ms = std::chrono::milliseconds(ts);
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);
@@ -3096,7 +3111,6 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
auto s = schema_builder("tests", "time_window_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
test_env env;
{
std::map<sstring, sstring> opts = { { time_window_compaction_strategy_options::TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS" }, };
@@ -3124,6 +3138,7 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
BOOST_REQUIRE(ret.second == expected);
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
@@ -3370,7 +3385,7 @@ static void check_min_max_column_names(const sstable_ptr& sst, std::vector<bytes
}
}
static void test_min_max_clustering_key(schema_ptr s, std::vector<bytes> exploded_pk, std::vector<std::vector<bytes>> exploded_cks,
static void test_min_max_clustering_key(test_env& env, schema_ptr s, std::vector<bytes> exploded_pk, std::vector<std::vector<bytes>> exploded_cks,
std::vector<bytes> min_components, std::vector<bytes> max_components, sstable_version_types version, bool remove = false) {
auto mt = make_lw_shared<memtable>(s);
auto insert_data = [&mt, &s] (std::vector<bytes>& exploded_pk, std::vector<bytes>&& exploded_ck) {
@@ -3404,7 +3419,6 @@ static void test_min_max_clustering_key(schema_ptr s, std::vector<bytes> explode
}
}
}
test_env env;
auto tmp = tmpdir();
auto sst = env.make_sstable(s, tmp.path().string(), 1, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
@@ -3423,7 +3437,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("ck2", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
test_min_max_clustering_key(s, {"key1"}, {{"a", "b"},
test_min_max_clustering_key(env, s, {"key1"}, {{"a", "b"},
{"a", "c"}}, {"a", "b"}, {"a", "c"}, version);
}
{
@@ -3434,7 +3448,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("ck2", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
test_min_max_clustering_key(s, {"key1"}, {{"a", "b"},
test_min_max_clustering_key(env, s, {"key1"}, {{"a", "b"},
{"a", "c"}}, {"a", "b"}, {"a", "c"}, version);
}
{
@@ -3445,7 +3459,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
test_min_max_clustering_key(env, s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
}
{
auto s = schema_builder("ks", "cf")
@@ -3456,7 +3470,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} with compact storage version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
test_min_max_clustering_key(env, s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
}
{
auto s = schema_builder("ks", "cf")
@@ -3466,7 +3480,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"z\"}} max={{\"a\", \"a\"}} version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"a", "a"}, {"a", "z"}}, {"a", "z"}, {"a", "a"}, version);
test_min_max_clustering_key(env, s, {"key1"}, {{"a", "a"}, {"a", "z"}}, {"a", "z"}, {"a", "a"}, version);
}
{
auto s = schema_builder("ks", "cf")
@@ -3476,7 +3490,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"a\"}} max={{\"b\", \"z\"}} version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"b", "z"}, {"a", "a"}}, {"a", "a"}, {"b", "z"}, version);
test_min_max_clustering_key(env, s, {"key1"}, {{"b", "z"}, {"a", "a"}}, {"a", "a"}, {"b", "z"}, version);
}
{
auto s = schema_builder("ks", "cf")
@@ -3484,7 +3498,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
test_min_max_clustering_key(s, {"key1"}, {{"a"},
test_min_max_clustering_key(env, s, {"key1"}, {{"a"},
{"z"}}, {"a"}, {"z"}, version);
}
{
@@ -3493,7 +3507,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
test_min_max_clustering_key(s, {"key1"}, {{"a"},
test_min_max_clustering_key(env, s, {"key1"}, {{"a"},
{"z"}}, {"a"}, {"z"}, version, true);
}
{
@@ -3501,7 +3515,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("r1", int32_type)
.build();
test_min_max_clustering_key(s, {"key1"}, {}, {}, {}, version);
test_min_max_clustering_key(env, s, {"key1"}, {}, {}, {}, version);
}
if (version >= sstable_version_types::mc) {
{
@@ -3513,7 +3527,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\"}} max={{\"a\"}} with compact storage version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"a", "z"}, {"a"}}, {"a"}, {"a"}, version);
test_min_max_clustering_key(env, s, {"key1"}, {{"a", "z"}, {"a"}}, {"a"}, {"a"}, version);
}
}
}
@@ -3529,7 +3543,7 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test_2) {
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
auto tmp = tmpdir();
auto mt = make_lw_shared<memtable>(s);
const column_definition &r1_col = *s->get_column_definition("r1");
@@ -4307,7 +4321,7 @@ SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) {
}
tmpdir dir;
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 100;
auto mut = mutation(table.schema(), table.make_pkey("key"));
for (auto&& mf : fragments) {
@@ -4361,7 +4375,7 @@ SEASTAR_TEST_CASE(test_skipping_using_index) {
std::sort(partitions.begin(), partitions.end(), mutation_decorated_key_less_comparator());
tmpdir dir;
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1; // So that every fragment is indexed
auto sst = make_sstable_easy(env, dir.path(), flat_mutation_reader_from_mutations(partitions), cfg, version);
@@ -4488,8 +4502,8 @@ SEASTAR_TEST_CASE(test_unknown_component) {
}
SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) {
test_env env;
column_family_for_tests cf;
return test_env::do_with([] (test_env& env) {
column_family_for_tests cf(env.manager());
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, cf.schema()->compaction_strategy_options());
std::vector<sstables::shared_sstable> candidates;
@@ -4503,10 +4517,11 @@ SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) {
auto desc = cs.get_sstables_for_compaction(*cf, std::move(candidates));
BOOST_REQUIRE(desc.sstables.size() == size_t(max_threshold));
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
test_env env;
return test_env::do_with([] (test_env& env) {
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type);
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
@@ -4566,10 +4581,11 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(sstable_set_erase) {
test_env env;
return test_env::do_with([] (test_env& env) {
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type);
auto key_and_token_pair = token_generation_for_current_shard(1);
@@ -4624,6 +4640,7 @@ SEASTAR_TEST_CASE(sstable_set_erase) {
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(sstable_tombstone_histogram_test) {
@@ -4729,7 +4746,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
// Asserts that two keys are equal to within a positive delta
BOOST_REQUIRE(std::fabs(sst->estimate_droppable_tombstone_ratio(gc_before) - expired) <= 0.1);
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
auto creator = [&, gen = make_lw_shared<unsigned>(2)] {
auto sst = env.make_sstable(s, tmp.path().string(), (*gen)++, la, big);
return sst;
@@ -5002,7 +5019,7 @@ SEASTAR_TEST_CASE(compaction_correctness_with_partitioned_sstable_set) {
auto compact = [&, s] (std::vector<shared_sstable> all) -> std::vector<shared_sstable> {
// NEEDED for partitioned_sstable_set to actually have an effect
std::for_each(all.begin(), all.end(), [] (auto& sst) { sst->set_sstable_level(1); });
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
return compact_sstables(sstables::compaction_descriptor(std::move(all), cf->get_sstable_set(), default_priority_class(), 0, 0 /*std::numeric_limits<uint64_t>::max()*/),
*cf, sst_gen).get0().new_sstables;
};
@@ -5236,7 +5253,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
auto sst = make_sstable_containing(sst_gen, mutations);
auto run_identifier = sst->run_identifier();
auto cf = make_lw_shared<column_family>(s, column_family_test_config(), column_family::no_commitlog(),
auto cf = make_lw_shared<column_family>(s, column_family_test_config(env.manager()), column_family::no_commitlog(),
db.get_compaction_manager(), cl_stats, db.row_cache_tracker());
cf->mark_ready_for_writes();
cf->start();
@@ -5290,7 +5307,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
auto local_keys = make_local_keys(3, schema);
auto config = test_sstables_manager.configure_writer();
auto config = env.manager().configure_writer();
auto writer = sst->get_writer(*schema, local_keys.size(), config, encoding_stats{});
auto make_static_row = [&, schema, ts] {
@@ -5374,7 +5391,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
testlog.info("Loaded sstable {}", sst->get_filename());
auto cfg = column_family_test_config();
auto cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string();
auto table = make_lw_shared<column_family>(schema, cfg, column_family::no_commitlog(),
db.get_compaction_manager(), cl_stats, db.row_cache_tracker());
@@ -5614,7 +5631,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
mut.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
auto tmp = tmpdir();
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.run_identifier = utils::make_random_uuid();
auto sst = make_sstable_easy(env, tmp.path(), flat_mutation_reader_from_mutations({ std::move(mut) }), cfg, la);
@@ -5640,7 +5657,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
auto cm = make_lw_shared<compaction_manager>();
auto tracker = make_lw_shared<cache_tracker>();
auto cf = make_lw_shared<column_family>(s, column_family_test_config(), column_family::no_commitlog(), *cm, cl_stats, *tracker);
auto cf = make_lw_shared<column_family>(s, column_family_test_config(env.manager()), column_family::no_commitlog(), *cm, cl_stats, *tracker);
cf->mark_ready_for_writes();
cf->start();
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
@@ -5779,7 +5796,7 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
sst2->set_sstable_level(3);
auto candidates = std::vector<sstables::shared_sstable>({ sst, sst2 });
column_family_for_tests cf;
column_family_for_tests cf(env.manager());
{
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, cf.schema()->compaction_strategy_options());
@@ -5840,7 +5857,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
return sst;
};
column_family_for_tests cf(s);
column_family_for_tests cf(env.manager(), s);
cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled);
{
@@ -5954,7 +5971,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
auto cm = make_lw_shared<compaction_manager>();
cm->enable();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string();
cfg.enable_commitlog = false;
cfg.enable_incremental_backups = false;
@@ -5968,7 +5985,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
mutation mut(s, partition_key::from_exploded(*s, {to_bytes("alpha")}));
mut.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
sstable_writer_config sst_cfg = env.manager().configure_writer();
sst_cfg.run_identifier = partial_sstable_run_identifier;
auto partial_sstable_run_sst = make_sstable_easy(env, tmp.path(), flat_mutation_reader_from_mutations({ std::move(mut) }),
sst_cfg, la, 1);
@@ -6021,9 +6038,9 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
shared_sstable& _sst;
sstable_writer _writer;
public:
explicit compacting_sstable_writer_test(const schema_ptr& s, shared_sstable& sst)
explicit compacting_sstable_writer_test(const schema_ptr& s, shared_sstable& sst, sstables_manager& manager)
: _sst(sst),
_writer(sst->get_writer(*s, 1, test_sstables_manager.configure_writer(),
_writer(sst->get_writer(*s, 1, manager.configure_writer(),
encoding_stats{}, service::get_local_compaction_priority())) {}
void consume_new_partition(const dht::decorated_key& dk) { _writer.consume_new_partition(dk); }
@@ -6051,8 +6068,8 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
auto non_purged = sst_gen();
auto purged_only = sst_gen();
auto cr = compacting_sstable_writer_test(s, non_purged);
auto purged_cr = compacting_sstable_writer_test(s, purged_only);
auto cr = compacting_sstable_writer_test(s, non_purged, env.manager());
auto purged_cr = compacting_sstable_writer_test(s, purged_only, env.manager());
auto gc_now = gc_clock::now();
gc_before = gc_now - s->gc_grace_seconds();
@@ -6223,7 +6240,7 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
forward_jump_clocks(std::chrono::seconds(ttl));
auto cm = make_lw_shared<compaction_manager>();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string();
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
@@ -6331,7 +6348,7 @@ SEASTAR_TEST_CASE(twcs_major_compaction_test) {
auto mut4 = make_insert(1ms);
auto cm = make_lw_shared<compaction_manager>();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string();
cfg.enable_disk_writes = true;
cfg.enable_commitlog = false;
@@ -6369,7 +6386,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
.build();
auto tmp = tmpdir();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string();
cfg.enable_commitlog = false;
cfg.enable_disk_writes = true;
@@ -6472,7 +6489,7 @@ SEASTAR_TEST_CASE(test_bug_6472) {
};
auto cm = make_lw_shared<compaction_manager>();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmpdir_path;
cfg.enable_disk_writes = true;
cfg.enable_commitlog = false;
@@ -6515,7 +6532,7 @@ SEASTAR_TEST_CASE(test_bug_6472) {
}
SEASTAR_TEST_CASE(sstable_needs_cleanup_test) {
test_env env;
return test_env::do_with([] (test_env& env) {
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type);
@@ -6557,6 +6574,7 @@ SEASTAR_TEST_CASE(sstable_needs_cleanup_test) {
}
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
@@ -6603,7 +6621,7 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
};
auto cm = make_lw_shared<compaction_manager>();
column_family::config cfg = column_family_test_config();
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmpdir_path;
cfg.enable_disk_writes = true;
cfg.enable_commitlog = false;
@@ -6648,7 +6666,7 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
auto mr = flat_mutation_reader_from_mutations({mut});
auto sst = env.make_sstable(s, tmpdir_path, 0, version, big);
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
sst->write_components(std::move(mr), 0, s, cfg, encoding_stats{}).get();
sst->load().get();

View File

@@ -56,21 +56,21 @@ schema_ptr test_table_schema() {
using namespace sstables;
future<sstables::shared_sstable>
sstables::shared_sstable
make_sstable_for_this_shard(std::function<sstables::shared_sstable()> sst_factory) {
auto s = test_table_schema();
auto key_token_pair = token_generation_for_shard(1, this_shard_id(), 12);
auto key = partition_key::from_exploded(*s, {to_bytes(key_token_pair[0].first)});
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0));
return make_ready_future<sstables::shared_sstable>(make_sstable_containing(sst_factory, {m}));
return make_sstable_containing(sst_factory, {m});
}
/// Create a shared SSTable belonging to all shards for the following schema: "create table cf (p text PRIMARY KEY, c int)"
///
/// Arguments passed to the function are passed to table::make_sstable
template <typename... Args>
future<sstables::shared_sstable>
sstables::shared_sstable
make_sstable_for_all_shards(database& db, table& table, fs::path sstdir, int64_t generation, Args&&... args) {
// Unlike the previous helper, we'll assume we're in a thread here. It's less flexible
// but the users are usually in a thread, and rewrite_toc_without_scylla_component requires
@@ -92,15 +92,24 @@ make_sstable_for_all_shards(database& db, table& table, fs::path sstdir, int64_t
// it came from Cassandra
sstables::test(sst).remove_component(sstables::component_type::Scylla).get();
sstables::test(sst).rewrite_toc_without_scylla_component();
return make_ready_future<sstables::shared_sstable>(sst);
return sst;
}
sstables::shared_sstable sstable_from_existing_file(fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return test_sstables_manager.make_sstable(test_table_schema(), dir.native(), gen, v, f, gc_clock::now(), default_io_error_handler_gen(), default_sstable_buffer_size);
}
class sstable_from_existing_file {
std::function<sstables::sstables_manager* ()> _get_mgr;
public:
explicit sstable_from_existing_file(sstables::test_env& env) : _get_mgr([m = &env.manager()] { return m; }) {}
// This variant this transportable across shards
explicit sstable_from_existing_file(sharded<sstables::test_env>& env) : _get_mgr([s = &env] { return &s->local().manager(); }) {}
// This variant this transportable across shards
explicit sstable_from_existing_file(cql_test_env& env) : _get_mgr([&env] { return &env.db().local().get_user_sstables_manager(); }) {}
sstables::shared_sstable operator()(fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) const {
return _get_mgr()->make_sstable(test_table_schema(), dir.native(), gen, v, f, gc_clock::now(), default_io_error_handler_gen(), default_sstable_buffer_size);
}
};
sstables::shared_sstable new_sstable(fs::path dir, int64_t gen) {
return test_sstables_manager.make_sstable(test_table_schema(), dir.native(), gen,
sstables::shared_sstable new_sstable(sstables::test_env& env, fs::path dir, int64_t gen) {
return env.manager().make_sstable(test_table_schema(), dir.native(), gen,
sstables::sstable_version_types::mc, sstables::sstable_format_types::big,
gc_clock::now(), default_io_error_handler_gen(), default_sstable_buffer_size);
}
@@ -115,6 +124,7 @@ highest_generation_seen(sharded<sstables::sstable_directory>& dir) {
}
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) {
sstables::test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
// Write a manifest file to make sure it's ignored
@@ -128,7 +138,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_simple_empty_directory_sca
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
@@ -138,12 +148,14 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_simple_empty_directory_sca
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
// No generation found on empty directory.
BOOST_REQUIRE_EQUAL(max_generation_seen, 0);
}).get();
}
// Test unrecoverable SSTable: missing a file that is expected in the TOC.
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) {
sstables::test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
// Now there is one sstable to the upload directory, but it is incomplete and one component is missing.
// We should fail validation and leave the directory untouched
@@ -155,7 +167,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables)
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
@@ -163,12 +175,14 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables)
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
}).get();
}
// Test always-benign incomplete SSTable: temporaryTOC found
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_temporary_toc) {
sstables::test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
sharded<sstable_directory> sstdir;
@@ -177,7 +191,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_temporary_toc) {
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
@@ -185,13 +199,15 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_temporary_toc) {
auto expect_ok = distributed_loader::process_sstable_dir(sstdir);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
}).get();
}
// Test the absence of TOC. Behavior is controllable by a flag
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_missing_toc) {
sstables::test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
remove_file(sst->filename(sstables::component_type::TOC)).get();
sharded<sstable_directory> sstdir_fatal;
@@ -200,7 +216,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_missing_toc) {
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop_fatal = defer([&sstdir_fatal] {
sstdir_fatal.stop().get();
@@ -215,7 +231,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_missing_toc) {
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop_ok = defer([&sstdir_ok] {
sstdir_ok.stop().get();
@@ -223,15 +239,17 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_missing_toc) {
auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
}).get();
}
// Test the presence of TemporaryStatistics. If the old Statistics file is around
// this is benign and we'll just delete it and move on. If the old Statistics file
// is not around (but mentioned in the TOC), then this is an error.
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
sstables::test_env::do_with_sharded_async([] (sharded<test_env>& env) {
auto dir = tmpdir();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 1)).get0();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 1));
auto tempstr = sst->filename(dir.path().native(), component_type::TemporaryStatistics);
auto f = open_file_dma(tempstr, open_flags::rw | open_flags::create | open_flags::truncate).get0();
f.close().get();
@@ -243,7 +261,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop_ok= defer([&sstdir_ok] {
sstdir_ok.stop().get();
@@ -264,7 +282,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop_fatal = defer([&sstdir_fatal] {
sstdir_fatal.stop().get();
@@ -272,13 +290,15 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
}).get();
}
// Test that we see the right generation during the scan. Temporary files are skipped
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
sstables::test_env::do_with_sharded_async([] (sharded<test_env>& env) {
auto dir = tmpdir();
make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 3333)).get0();
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, dir.path(), 6666)).get0();
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 3333));
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 6666));
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
sharded<sstable_directory> sstdir;
@@ -287,7 +307,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
@@ -296,6 +316,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
distributed_loader::process_sstable_dir(sstdir).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
BOOST_REQUIRE_EQUAL(max_generation_seen, 3333);
}).get();
}
future<> verify_that_all_sstables_are_local(sharded<sstable_directory>& sstdir, unsigned expected_sstables) {
@@ -318,13 +339,14 @@ future<> verify_that_all_sstables_are_local(sharded<sstable_directory>& sstdir,
// Test that all SSTables are seen as unshared, if the generation numbers match what their
// shard-assignments expect
SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_generations) {
sstables::test_env::do_with_sharded_async([] (sharded<test_env>& env) {
auto dir = tmpdir();
for (shard_id i = 0; i < smp::count; ++i) {
smp::submit_to(i, [dir = dir.path(), i] {
env.invoke_on(i, [dir = dir.path(), i] (sstables::test_env& env) {
// this is why it is annoying for the internal functions in the test infrastructure to
// assume threaded execution
return seastar::async([dir, i] {
make_sstable_for_this_shard(std::bind(new_sstable, dir, i)).get0();
return seastar::async([dir, i, &env] {
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir, i));
});
}).get();
}
@@ -335,7 +357,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_gene
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
@@ -343,18 +365,20 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_gene
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
}).get();
}
// Test that all SSTables are seen as unshared, even if the generation numbers do not match what their
// shard-assignments expect
SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_generations) {
sstables::test_env::do_with_sharded_async([] (sharded<test_env>& env) {
auto dir = tmpdir();
for (shard_id i = 0; i < smp::count; ++i) {
smp::submit_to(i, [dir = dir.path(), i] {
env.invoke_on(i, [dir = dir.path(), i] (sstables::test_env& env) {
// this is why it is annoying for the internal functions in the test infrastructure to
// assume threaded execution
return seastar::async([dir, i] {
make_sstable_for_this_shard(std::bind(new_sstable, dir, i + 1)).get0();
return seastar::async([dir, i, &env] {
make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir, i + 1));
});
}).get();
}
@@ -365,7 +389,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_ge
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
@@ -373,6 +397,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_ge
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
}).get();
}
// Test that the sstable_dir object can keep the table alive against a drop
@@ -390,7 +415,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) {
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
&sstable_from_existing_file).get();
sstable_from_existing_file(e)).get();
// stop cleanly in case we fail early for unexpected reasons
auto stop = defer([&sstdir] {
@@ -440,7 +465,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
unsigned num_sstables = 10 * smp::count;
auto generation = 0;
for (unsigned nr = 0; nr < num_sstables; ++nr) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get();
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big);
}
sharded<sstable_directory> sstdir;
@@ -493,7 +518,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
unsigned num_sstables = 10 * smp::count;
auto generation = 0;
for (unsigned nr = 0; nr < num_sstables; ++nr) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++ * smp::count, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get();
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++ * smp::count, sstables::sstable_version_types::mc, sstables::sstable::format_types::big);
}
sharded<sstable_directory> sstdir;
@@ -546,7 +571,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
unsigned num_sstables = (cf.schema()->max_compaction_threshold() + 1) * smp::count;
auto generation = 0;
for (unsigned nr = 0; nr < num_sstables; ++nr) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big).get();
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big);
}
sharded<sstable_directory> sstdir;

View File

@@ -52,8 +52,7 @@ using namespace sstables;
using namespace std::chrono_literals;
SEASTAR_THREAD_TEST_CASE(nonexistent_key) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
env.reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([] (auto sstp) {
return do_with(make_dkey(uncompressed_schema(), "invalid_key"), [sstp] (auto& key) {
auto s = uncompressed_schema();
@@ -64,6 +63,7 @@ SEASTAR_THREAD_TEST_CASE(nonexistent_key) {
});
});
}).get();
}).get();
}
future<> test_no_clustered(sstables::test_env& env, bytes&& key, std::unordered_map<bytes, data_value> &&map) {
@@ -90,27 +90,27 @@ future<> test_no_clustered(sstables::test_env& env, bytes&& key, std::unordered_
}
SEASTAR_THREAD_TEST_CASE(uncompressed_1) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
test_no_clustered(env, "vinna", {{ "col1", to_sstring("daughter") }, { "col2", 3 }}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(uncompressed_2) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
test_no_clustered(env, "gustaf", {{ "col1", to_sstring("son") }, { "col2", 0 }}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(uncompressed_3) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
test_no_clustered(env, "isak", {{ "col1", to_sstring("son") }, { "col2", 1 }}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(uncompressed_4) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
test_no_clustered(env, "finna", {{ "col1", to_sstring("daughter") }, { "col2", 2 }}).get();
}).get();
}
/*
@@ -161,8 +161,7 @@ inline auto clustered_row(mutation& mutation, const schema& s, std::vector<bytes
}
SEASTAR_THREAD_TEST_CASE(complex_sst1_k1) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
generate_clustered<1>(env, "key1").then([] (auto&& mutation) {
auto s = complex_schema();
@@ -189,11 +188,11 @@ SEASTAR_THREAD_TEST_CASE(complex_sst1_k1) {
return make_ready_future<>();
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(complex_sst1_k2) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
generate_clustered<1>(env, "key2").then([] (auto&& mutation) {
auto s = complex_schema();
@@ -222,11 +221,11 @@ SEASTAR_THREAD_TEST_CASE(complex_sst1_k2) {
return make_ready_future<>();
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(complex_sst2_k1) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
generate_clustered<2>(env, "key1").then([] (auto&& mutation) {
auto s = complex_schema();
@@ -242,11 +241,11 @@ SEASTAR_THREAD_TEST_CASE(complex_sst2_k1) {
match_collection_element<status::dead>(reg_list.cells[0], bytes_opt{}, bytes_opt{});
return make_ready_future<>();
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(complex_sst2_k2) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
generate_clustered<2>(env, "key2").then([] (auto&& mutation) {
auto s = complex_schema();
@@ -273,11 +272,11 @@ SEASTAR_THREAD_TEST_CASE(complex_sst2_k2) {
return make_ready_future<>();
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(complex_sst2_k3) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
generate_clustered<2>(env, "key3").then([] (auto&& mutation) {
auto s = complex_schema();
@@ -293,11 +292,11 @@ SEASTAR_THREAD_TEST_CASE(complex_sst2_k3) {
match_absent(row1.cells(), *s, "reg_fset");
return make_ready_future<>();
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(complex_sst3_k1) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
generate_clustered<3>(env, "key1").then([] (auto&& mutation) {
auto s = complex_schema();
@@ -315,11 +314,11 @@ SEASTAR_THREAD_TEST_CASE(complex_sst3_k1) {
match_absent(row.cells(), *s, "reg_fset");
return make_ready_future<>();
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(complex_sst3_k2) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
generate_clustered<3>(env, "key2").then([] (auto&& mutation) {
auto s = complex_schema();
@@ -335,6 +334,7 @@ SEASTAR_THREAD_TEST_CASE(complex_sst3_k2) {
match_absent(row.cells(), *s, "reg_fset");
return make_ready_future<>();
}).get();
}).get();
}
future<> test_range_reads(sstables::test_env& env, const dht::token& min, const dht::token& max, std::vector<bytes>& expected) {
@@ -371,27 +371,30 @@ future<> test_range_reads(sstables::test_env& env, const dht::token& min, const
}
SEASTAR_THREAD_TEST_CASE(read_range) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
std::vector<bytes> expected = { to_bytes("finna"), to_bytes("isak"), to_bytes("gustaf"), to_bytes("vinna") };
do_with(sstables::test_env(), std::move(expected), [] (auto& env, auto& expected) {
do_with(std::move(expected), [&env] (auto& expected) {
return test_range_reads(env, dht::minimum_token(), dht::maximum_token(), expected);
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(read_partial_range) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
std::vector<bytes> expected = { to_bytes("finna"), to_bytes("isak") };
do_with(sstables::test_env(), std::move(expected), [] (auto& env, auto& expected) {
do_with(std::move(expected), [&env] (auto& expected) {
return test_range_reads(env, uncompressed_schema()->get_partitioner().get_token(key_view(bytes_view(expected.back()))), dht::maximum_token(), expected);
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(read_partial_range_2) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
std::vector<bytes> expected = { to_bytes("gustaf"), to_bytes("vinna") };
do_with(sstables::test_env(), std::move(expected), [] (auto& env, auto& expected) {
do_with(std::move(expected), [&env] (auto& expected) {
return test_range_reads(env, dht::minimum_token(), uncompressed_schema()->get_partitioner().get_token(key_view(bytes_view(expected.front()))), expected);
}).get();
}).get();
}
static
@@ -402,7 +405,7 @@ mutation_source make_sstable_mutation_source(sstables::test_env& env, schema_ptr
SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
storage_service_for_tests ssft;
auto dir = tmpdir();
auto s = make_shared_schema({}, "ks", "cf",
@@ -419,7 +422,6 @@ SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(std::move(m));
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
@@ -439,12 +441,12 @@ SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) {
c_key_end,
bound_kind::excl_end,
tombstone(9, ttl))));
}).get();
});
}
SEASTAR_THREAD_TEST_CASE(compact_storage_sparse_read) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
env.reusable_sst(compact_sparse_schema(), "test/resource/sstables/compact_sparse", 1).then([] (auto sstp) {
return do_with(make_dkey(compact_sparse_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_sparse_schema();
@@ -459,11 +461,11 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_sparse_read) {
});
});
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(compact_storage_simple_dense_read) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
env.reusable_sst(compact_simple_dense_schema(), "test/resource/sstables/compact_simple_dense", 1).then([] (auto sstp) {
return do_with(make_dkey(compact_simple_dense_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_simple_dense_schema();
@@ -480,11 +482,11 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_simple_dense_read) {
});
});
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(compact_storage_dense_read) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
env.reusable_sst(compact_dense_schema(), "test/resource/sstables/compact_dense", 1).then([] (auto sstp) {
return do_with(make_dkey(compact_dense_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_dense_schema();
@@ -501,6 +503,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_dense_read) {
});
});
}).get();
}).get();
}
// We recently had an issue, documented at #188, where range-reading from an
@@ -508,8 +511,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_dense_read) {
//
// Make sure we don't regress on that.
SEASTAR_THREAD_TEST_CASE(broken_ranges_collection) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
sstables::test_env env;
test_env::do_with_async([] (test_env& env) {
env.reusable_sst(peers_schema(), "test/resource/sstables/broken_ranges", 2).then([] (auto sstp) {
auto s = peers_schema();
auto reader = make_lw_shared<flat_mutation_reader>(sstp->as_mutation_source().make_reader(s, tests::make_permit(), query::full_partition_range));
@@ -537,6 +539,7 @@ SEASTAR_THREAD_TEST_CASE(broken_ranges_collection) {
});
});
}).get();
}).get();
}
static schema_ptr tombstone_overlap_schema() {
@@ -561,8 +564,7 @@ static schema_ptr tombstone_overlap_schema() {
}
static future<sstable_ptr> ka_sst(schema_ptr schema, sstring dir, unsigned long generation) {
sstables::test_env env;
static future<sstable_ptr> ka_sst(sstables::test_env& env, schema_ptr schema, sstring dir, unsigned long generation) {
auto sst = env.make_sstable(std::move(schema), dir, generation, sstables::sstable::version_types::ka, big);
auto fut = sst->load();
return std::move(fut).then([sst = std::move(sst)] {
@@ -577,8 +579,8 @@ static future<sstable_ptr> ka_sst(schema_ptr schema, sstring dir, unsigned long
// ["aaa:bbb:!","aaa:!",1459334681228103,"t",1459334681]]}
// ]
SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
ka_sst(tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 1).then([] (auto sstp) {
test_env::do_with_async([] (test_env& env) {
ka_sst(env, tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 1).then([] (auto sstp) {
auto s = tombstone_overlap_schema();
return do_with(sstp->read_rows_flat(s, tests::make_permit()), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
@@ -629,6 +631,7 @@ SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone) {
});
});
}).get();
}).get();
}
// Same schema as above, the sstable looks like:
@@ -641,8 +644,8 @@ SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone) {
// We're not sure how this sort of sstable can be generated with Cassandra 2's
// CQL, but we saw a similar thing is a real use case.
SEASTAR_THREAD_TEST_CASE(range_tombstone_reading) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
ka_sst(tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 4).then([] (auto sstp) {
test_env::do_with_async([] (test_env& env) {
ka_sst(env, tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 4).then([] (auto sstp) {
auto s = tombstone_overlap_schema();
return do_with(sstp->read_rows_flat(s, tests::make_permit()), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
@@ -678,6 +681,7 @@ SEASTAR_THREAD_TEST_CASE(range_tombstone_reading) {
});
});
}).get();
}).get();
}
// In this test case we have *three* levels of of tombstones:
@@ -719,8 +723,8 @@ static schema_ptr tombstone_overlap_schema2() {
return s;
}
SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone2) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
ka_sst(tombstone_overlap_schema2(), "test/resource/sstables/tombstone_overlap", 3).then([] (auto sstp) {
test_env::do_with_async([] (test_env& env) {
ka_sst(env, tombstone_overlap_schema2(), "test/resource/sstables/tombstone_overlap", 3).then([] (auto sstp) {
auto s = tombstone_overlap_schema2();
return do_with(sstp->read_rows_flat(s, tests::make_permit()), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
@@ -776,6 +780,7 @@ SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone2) {
});
});
}).get();
}).get();
}
// Reproducer for #4783
@@ -800,8 +805,9 @@ static schema_ptr buffer_overflow_schema() {
return s;
}
SEASTAR_THREAD_TEST_CASE(buffer_overflow) {
test_env::do_with_async([] (test_env& env) {
auto s = buffer_overflow_schema();
auto sstp = ka_sst(s, "test/resource/sstables/buffer_overflow", 5).get0();
auto sstp = ka_sst(env, s, "test/resource/sstables/buffer_overflow", 5).get0();
auto r = sstp->read_rows_flat(s, tests::make_permit());
auto pk1 = partition_key::from_exploded(*s, { int32_type->decompose(4) });
auto dk1 = dht::decorate_key(*s, pk1);
@@ -830,11 +836,12 @@ SEASTAR_THREAD_TEST_CASE(buffer_overflow) {
.produces_row_with_key(ck2)
.produces_partition_end()
.produces_end_of_stream();
}).get();
}
SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -854,7 +861,6 @@ SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(std::move(m));
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
@@ -866,12 +872,13 @@ SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) {
auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0();
BOOST_REQUIRE(bool(mut));
}
}).get();
});
}
SEASTAR_TEST_CASE(test_has_partition_key) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -891,7 +898,6 @@ SEASTAR_TEST_CASE(test_has_partition_key) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(std::move(m));
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
@@ -910,6 +916,7 @@ SEASTAR_TEST_CASE(test_has_partition_key) {
res = sst->has_partition_key(hk2, dk2).get0();
BOOST_REQUIRE(! bool(res));
}
}).get();
});
}
@@ -919,7 +926,7 @@ static std::unique_ptr<index_reader> get_index_reader(shared_sstable sst) {
SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
storage_service_for_tests ssft;
auto dir = tmpdir();
schema_builder builder("ks", "cf");
@@ -955,23 +962,23 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(std::move(m));
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, cfg, mt->get_encoding_stats()).get();
sst->load().get();
assert_that(get_index_reader(sst)).has_monotonic_positions(*s);
}).get();
});
}
SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -1008,13 +1015,12 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(std::move(m));
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
version,
sstables::sstable::format_types::big);
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, cfg, mt->get_encoding_stats()).get();
sst->load().get();
@@ -1030,12 +1036,13 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
.produces_end_of_stream();
}
}
}).get();
});
}
SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -1068,13 +1075,12 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(std::move(m));
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
version,
sstables::sstable::format_types::big);
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, cfg, mt->get_encoding_stats()).get();
sst->load().get();
@@ -1090,12 +1096,13 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
.produces_end_of_stream();
}
}
}).get();
});
}
SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -1125,13 +1132,12 @@ SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(m);
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
generation,
version,
sstables::sstable::format_types::big);
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, cfg, mt->get_encoding_stats()).get();
sst->load().get();
@@ -1144,12 +1150,13 @@ SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
}
}
}
}).get();
});
}
SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound_dense_schemas) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -1172,13 +1179,12 @@ SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound
auto mt = make_lw_shared<memtable>(s);
mt->apply(m);
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
version,
sstables::sstable::format_types::big);
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, test_sstables_manager.configure_writer(), mt->get_encoding_stats()).get();
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, env.manager().configure_writer(), mt->get_encoding_stats()).get();
sst->load().get();
{
@@ -1188,12 +1194,13 @@ SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound
.produces_end_of_stream();
}
}
}).get();
});
}
SEASTAR_TEST_CASE(test_promoted_index_is_absent_for_schemas_without_clustering_key) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -1211,25 +1218,25 @@ SEASTAR_TEST_CASE(test_promoted_index_is_absent_for_schemas_without_clustering_k
auto mt = make_lw_shared<memtable>(s);
mt->apply(m);
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
version,
sstables::sstable::format_types::big);
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = 1;
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, cfg, mt->get_encoding_stats()).get();
sst->load().get();
assert_that(get_index_reader(sst)).is_empty(*s);
}
}).get();
});
}
SEASTAR_TEST_CASE(test_can_write_and_read_non_compound_range_tombstone_as_compound) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -1252,13 +1259,12 @@ SEASTAR_TEST_CASE(test_can_write_and_read_non_compound_range_tombstone_as_compou
auto mt = make_lw_shared<memtable>(s);
mt->apply(m);
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
version,
sstables::sstable::format_types::big);
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.correctly_serialize_non_compound_range_tombstones = false;
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, cfg, mt->get_encoding_stats()).get();
sst->load().get();
@@ -1270,12 +1276,13 @@ SEASTAR_TEST_CASE(test_can_write_and_read_non_compound_range_tombstone_as_compou
.produces_end_of_stream();
}
}
}).get();
});
}
SEASTAR_TEST_CASE(test_writing_combined_stream_with_tombstones_at_the_same_position) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
auto dir = tmpdir();
@@ -1306,7 +1313,6 @@ SEASTAR_TEST_CASE(test_writing_combined_stream_with_tombstones_at_the_same_posit
auto mt2 = make_lw_shared<memtable>(s);
mt2->apply(m2);
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
@@ -1314,19 +1320,20 @@ SEASTAR_TEST_CASE(test_writing_combined_stream_with_tombstones_at_the_same_posit
sstables::sstable::format_types::big);
sst->write_components(make_combined_reader(s,
mt1->make_flat_reader(s, tests::make_permit()),
mt2->make_flat_reader(s, tests::make_permit())), 1, s, test_sstables_manager.configure_writer(), encoding_stats{}).get();
mt2->make_flat_reader(s, tests::make_permit())), 1, s, env.manager().configure_writer(), encoding_stats{}).get();
sst->load().get();
assert_that(sst->as_mutation_source().make_reader(s, tests::make_permit()))
.produces(m1 + m2)
.produces_end_of_stream();
}
}).get();
});
}
SEASTAR_TEST_CASE(test_no_index_reads_when_rows_fall_into_range_boundaries) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
simple_schema ss(simple_schema::with_static::yes);
@@ -1347,8 +1354,7 @@ SEASTAR_TEST_CASE(test_no_index_reads_when_rows_fall_into_range_boundaries) {
ss.add_row(m2, ss.make_ckey(6), "v");
tmpdir dir;
sstables::test_env env;
auto ms = make_sstable_mutation_source(env, s, dir.path().string(), {m1, m2}, test_sstables_manager.configure_writer(), version);
auto ms = make_sstable_mutation_source(env, s, dir.path().string(), {m1, m2}, env.manager().configure_writer(), version);
auto index_accesses = [] {
auto&& stats = sstables::shared_index_lists::shard_stats();
@@ -1366,12 +1372,13 @@ SEASTAR_TEST_CASE(test_no_index_reads_when_rows_fall_into_range_boundaries) {
BOOST_REQUIRE_EQUAL(index_accesses(), before);
}
}
}).get();
});
}
SEASTAR_TEST_CASE(test_key_count_estimation) {
return seastar::async([] {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
simple_schema ss;
@@ -1388,8 +1395,7 @@ SEASTAR_TEST_CASE(test_key_count_estimation) {
}
tmpdir dir;
sstables::test_env env;
shared_sstable sst = make_sstable(env, s, dir.path().string(), muts, test_sstables_manager.configure_writer(), version);
shared_sstable sst = make_sstable(env, s, dir.path().string(), muts, env.manager().configure_writer(), version);
auto max_est = sst->get_estimated_key_count();
testlog.trace("count = {}", count);
@@ -1433,12 +1439,12 @@ SEASTAR_TEST_CASE(test_key_count_estimation) {
BOOST_REQUIRE_EQUAL(est, 0);
}
}
}).get();
});
}
SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
test_env::do_with_async([] (test_env& env) {
// We create a sequence of partitions such that first we have a partition with a very long key, then
// series of partitions with small keys. This should result in large index page.
@@ -1492,13 +1498,12 @@ SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations)
mt->apply(m);
}
sstables::test_env env;
auto sst = env.make_sstable(s,
dir.path().string(),
1 /* generation */,
sstable_version_types::ka,
sstables::sstable::format_types::big);
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, test_sstables_manager.configure_writer(), mt->get_encoding_stats()).get();
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, env.manager().configure_writer(), mt->get_encoding_stats()).get();
sst->load().get();
auto pr = dht::partition_range::make_singular(small_keys[0]);
@@ -1517,12 +1522,13 @@ SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations)
assert_that(actual).is_equal_to(expected);
BOOST_REQUIRE_EQUAL(large_allocs_after - large_allocs_before, 0);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_reading_serialization_header) {
test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
storage_service_for_tests ssft;
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
auto random_int32_value = [] {
return int32_type->decompose(tests::random::get_int<int32_t>());
@@ -1560,13 +1566,12 @@ SEASTAR_THREAD_TEST_CASE(test_reading_serialization_header) {
auto m1ow = md1_overwrite.build(s);
mt->apply(m1ow);
sstables::test_env env;
{
// SSTable class has way too many responsibilities. In particular, it mixes the reading and
// writting parts. Let's use a separate objects for writing and reading to ensure that nothing
// carries over that wouldn't normally be read from disk.
auto sst = env.make_sstable(s, dir.path().string(), 1, sstable::version_types::mc, sstables::sstable::format_types::big);
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 2, s, test_sstables_manager.configure_writer(), mt->get_encoding_stats()).get();
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 2, s, env.manager().configure_writer(), mt->get_encoding_stats()).get();
}
auto sst = env.make_sstable(s, dir.path().string(), 1, sstable::version_types::mc, sstables::sstable::format_types::big);
@@ -1589,6 +1594,7 @@ SEASTAR_THREAD_TEST_CASE(test_reading_serialization_header) {
// Like Cassandra even if a row marker is not expiring we update the metadata with NO_TTL value
// which is 0.
BOOST_CHECK(stats.min_ttl == gc_clock::duration(0));
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_merging_encoding_stats) {
@@ -1627,9 +1633,9 @@ SEASTAR_THREAD_TEST_CASE(test_merging_encoding_stats) {
// Reproducer for #4206
SEASTAR_THREAD_TEST_CASE(test_counter_header_size) {
test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
storage_service_for_tests ssft;
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
auto s = schema_builder("ks", "counter_test")
.with_column("pk", int32_type, column_kind::partition_key)
@@ -1658,16 +1664,16 @@ SEASTAR_THREAD_TEST_CASE(test_counter_header_size) {
auto mt = make_lw_shared<memtable>(s);
mt->apply(m);
sstables::test_env env;
for (const auto version : all_sstable_versions) {
auto sst = env.make_sstable(s, dir.path().string(), 1, version, sstables::sstable::format_types::big);
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, test_sstables_manager.configure_writer(), mt->get_encoding_stats()).get();
sst->write_components(mt->make_flat_reader(s, tests::make_permit()), 1, s, env.manager().configure_writer(), mt->get_encoding_stats()).get();
sst->load().get();
assert_that(sst->as_mutation_source().make_reader(s, tests::make_permit()))
.produces(m)
.produces_end_of_stream();
}
}).get();
}
SEASTAR_TEST_CASE(test_static_compact_tables_are_read) {
@@ -1699,7 +1705,7 @@ SEASTAR_TEST_CASE(test_static_compact_tables_are_read) {
boost::sort(muts, mutation_decorated_key_less_comparator{});
tmpdir dir;
sstable_writer_config cfg = test_sstables_manager.configure_writer();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.correctly_serialize_static_compact_in_mc = correctly_serialize;
auto ms = make_sstable_mutation_source(env, s, dir.path().string(), muts, cfg, version);

View File

@@ -42,6 +42,7 @@ static schema_ptr get_schema(unsigned shard_count, unsigned sharding_ignore_msb_
void run_sstable_resharding_test() {
test_env env;
auto close_env = defer([&] { env.stop().get(); });
cache_tracker tracker;
for (const auto version : all_sstable_versions) {
storage_service_for_tests ssft;
@@ -49,7 +50,7 @@ void run_sstable_resharding_test() {
auto s = get_schema();
auto cm = make_lw_shared<compaction_manager>();
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto cf = make_lw_shared<column_family>(s, column_family_test_config(), column_family::no_commitlog(), *cm, *cl_stats, tracker);
auto cf = make_lw_shared<column_family>(s, column_family_test_config(env.manager()), column_family::no_commitlog(), *cm, *cl_stats, tracker);
cf->mark_ready_for_writes();
std::unordered_map<shard_id, std::vector<mutation>> muts;
static constexpr auto keys_per_shard = 1000u;

View File

@@ -183,14 +183,10 @@ static future<sstable_ptr> do_write_sst(test_env& env, schema_ptr schema, sstrin
});
}
static future<sstable_ptr> do_write_sst(schema_ptr schema, sstring load_dir, sstring write_dir, unsigned long generation) {
return test_env::do_with([schema = std::move(schema), load_dir = std::move(load_dir), write_dir = std::move(write_dir), generation] (test_env& env) {
return do_write_sst(env, std::move(schema), std::move(load_dir), std::move(write_dir), generation);
});
}
static future<> write_sst_info(schema_ptr schema, sstring load_dir, sstring write_dir, unsigned long generation) {
return do_write_sst(std::move(schema), load_dir, write_dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
return test_env::do_with([schema = std::move(schema), load_dir = std::move(load_dir), write_dir = std::move(write_dir), generation] (test_env& env) {
return do_write_sst(env, std::move(schema), load_dir, write_dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
});
}
using bufptr_t = std::unique_ptr<char [], free_deleter>;
@@ -245,10 +241,9 @@ SEASTAR_TEST_CASE(check_compressed_info_func) {
return check_component_integrity(component_type::CompressionInfo);
}
template <typename Func>
inline auto
write_and_validate_sst(schema_ptr s, sstring dir, Func&& func) {
return test_env::do_with(tmpdir(), [s = std::move(s), dir = std::move(dir), func = std::move(func)] (test_env& env, tmpdir& tmp) {
future<>
write_and_validate_sst(schema_ptr s, sstring dir, noncopyable_function<future<> (shared_sstable sst1, shared_sstable sst2)> func) {
return test_env::do_with(tmpdir(), [s = std::move(s), dir = std::move(dir), func = std::move(func)] (test_env& env, tmpdir& tmp) mutable {
return do_write_sst(env, s, dir, tmp.path().string(), 1).then([&env, &tmp, s = std::move(s), func = std::move(func)] (auto sst1) {
auto sst2 = env.make_sstable(s, tmp.path().string(), 2, la, big);
return func(std::move(sst1), std::move(sst2));

View File

@@ -433,7 +433,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
auto write_to_sstable = [&] (mutation m) {
auto sst = t->make_streaming_staging_sstable();
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer();
auto& pc = service::get_local_streaming_priority();
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
@@ -547,7 +547,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
}
auto sst = t->make_streaming_staging_sstable();
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer();
auto& pc = service::get_local_streaming_priority();
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
@@ -584,7 +584,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak
db_cfg.enable_cache(false);
db_cfg.enable_commitlog(false);
do_with_cql_env([] (cql_test_env& e) -> future<> {
do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table t (p text, c text, v text, primary key (p, c))").get();
e.execute_cql("create materialized view tv as select * from t "
"where p is not null and c is not null and v is not null "
@@ -619,7 +619,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak
}
auto sst = t->make_streaming_staging_sstable();
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer();
auto& pc = service::get_local_streaming_priority();
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
@@ -669,10 +669,16 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak
auto fut_res = when_all_succeed(futures.begin(), futures.end());
auto watchdog_timer_done = make_ready_future<>();
// Watchdog timer which will break out of the deadlock and fail the test.
timer watchdog_timer([&view_update_generator] {
timer watchdog_timer([&view_update_generator, &watchdog_timer_done] {
// Re-start it so stop() on shutdown doesn't crash.
(void)view_update_generator.stop().then([&] { return view_update_generator.start(); });
watchdog_timer_done = watchdog_timer_done.then([&] {
return view_update_generator.stop().then([&] {
return view_update_generator.start();
});
});
});
watchdog_timer.arm(std::chrono::seconds(60));
@@ -682,8 +688,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak
watchdog_timer.cancel();
return make_ready_future<>();
watchdog_timer_done.get();
}, std::move(test_cfg)).get();
}

View File

@@ -80,12 +80,6 @@ cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
cql_test_config::cql_test_config(const cql_test_config&) = default;
cql_test_config::~cql_test_config() = default;
namespace sstables {
future<> await_background_jobs_on_all_shards();
}
static const sstring testing_superuser = "tester";
static future<> tst_init_ms_fd_gossiper(sharded<gms::feature_service>& features, sharded<locator::token_metadata>& tm, sharded<netw::messaging_service>& ms, db::config& cfg, db::seed_provider_type seed_provider,
@@ -381,8 +375,6 @@ public:
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
auto stop_snitch = defer([] { locator::i_endpoint_snitch::stop_snitch().get(); });
auto wait_for_background_jobs = defer([] { sstables::await_background_jobs_on_all_shards().get(); });
sharded<abort_source> abort_sources;
abort_sources.start().get();
auto stop_abort_sources = defer([&] { abort_sources.stop().get(); });

View File

@@ -22,23 +22,30 @@
#pragma once
#include <seastar/core/do_with.hh>
#include <seastar/util/noncopyable_function.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/defer.hh>
#include "sstables/sstables.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/test_services.hh"
#include "test/lib/log.hh"
namespace sstables {
class test_env {
sstables_manager& _mgr;
std::unique_ptr<sstables_manager> _mgr;
public:
explicit test_env() : _mgr(test_sstables_manager) { }
explicit test_env(sstables_manager& mgr) : _mgr(mgr) { }
explicit test_env() : _mgr(std::make_unique<sstables_manager>(nop_lp_handler, test_db_config, test_feature_service)) { }
future<> stop() {
return _mgr->close();
}
shared_sstable make_sstable(schema_ptr schema, sstring dir, unsigned long generation,
sstable::version_types v, sstable::format_types f = sstable::format_types::big,
size_t buffer_size = default_sstable_buffer_size, gc_clock::time_point now = gc_clock::now()) {
return _mgr.make_sstable(std::move(schema), dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
return _mgr->make_sstable(std::move(schema), dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
}
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, unsigned long generation,
@@ -49,6 +56,8 @@ public:
});
}
sstables_manager& manager() { return *_mgr; }
future<> working_sst(schema_ptr schema, sstring dir, unsigned long generation) {
return reusable_sst(std::move(schema), dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
}
@@ -56,25 +65,46 @@ public:
template <typename Func>
static inline auto do_with(Func&& func) {
return seastar::do_with(test_env(), [func = std::move(func)] (test_env& env) mutable {
return func(env);
return func(env).finally([&env] {
return env.stop();
});
});
}
template <typename T, typename Func>
static inline auto do_with(T&& rval, Func&& func) {
return seastar::do_with(test_env(), std::forward<T>(rval), [func = std::move(func)] (test_env& env, T& val) mutable {
return func(env, val);
return func(env, val).finally([&env] {
return env.stop();
});
});
}
template <typename Func>
static inline auto do_with_async(Func&& func) {
static inline future<> do_with_async(noncopyable_function<void (test_env&)> func) {
return seastar::async([func = std::move(func)] {
auto wait_for_background_jobs = defer([] { sstables::await_background_jobs_on_all_shards().get(); });
test_env env;
auto close_env = defer([&] { env.stop().get(); });
func(env);
});
}
static inline future<> do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func) {
return seastar::async([func = std::move(func)] {
sharded<test_env> env;
env.start().get();
auto stop = defer([&] { env.stop().get(); });
func(env);
});
}
template <typename T>
static future<T> do_with_async_returning(noncopyable_function<T (test_env&)> func) {
return seastar::async([func = std::move(func)] {
test_env env;
auto stop = defer([&] { env.stop().get(); });
return func(env);
});
}
};
} // namespace sstables

View File

@@ -44,7 +44,7 @@ using local_shard_only = bool_class<local_shard_only_tag>;
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, std::vector<mutation> muts);
inline future<> write_memtable_to_sstable_for_test(memtable& mt, sstables::shared_sstable sst) {
return write_memtable_to_sstable(mt, sst, test_sstables_manager.configure_writer());
return write_memtable_to_sstable(mt, sst, sst->manager().configure_writer());
}
//
@@ -316,6 +316,7 @@ public:
storage_service_for_tests ssft;
auto tmp = tmpdir();
test_env env;
auto close_env = defer([&] { env.stop().get(); });
fut(env, tmp.path().string()).get();
});
}
@@ -331,6 +332,7 @@ public:
auto dest_path = dest_dir.path() / src.c_str();
std::filesystem::create_directories(dest_path);
test_env env;
auto close_env = defer([&] { env.stop().get(); });
fut(env, src_dir.path().string(), dest_path.string()).get();
});
}

View File

@@ -101,28 +101,28 @@ static const sstring some_column_family("cf");
db::nop_large_data_handler nop_lp_handler;
db::config test_db_config;
gms::feature_service test_feature_service(gms::feature_config_from_db_config(test_db_config));
thread_local sstables::sstables_manager test_sstables_manager(nop_lp_handler, test_db_config, test_feature_service);
column_family::config column_family_test_config() {
column_family::config column_family_test_config(sstables::sstables_manager& sstables_manager) {
column_family::config cfg;
cfg.sstables_manager = &test_sstables_manager;
cfg.sstables_manager = &sstables_manager;
cfg.compaction_concurrency_semaphore = &tests::semaphore();
return cfg;
}
column_family_for_tests::column_family_for_tests()
column_family_for_tests::column_family_for_tests(sstables::sstables_manager& sstables_manager)
: column_family_for_tests(
sstables_manager,
schema_builder(some_keyspace, some_column_family)
.with_column(utf8_type->decompose("p1"), utf8_type, column_kind::partition_key)
.build()
)
{ }
column_family_for_tests::column_family_for_tests(schema_ptr s)
column_family_for_tests::column_family_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s)
: _data(make_lw_shared<data>())
{
_data->s = s;
_data->cfg = column_family_test_config();
_data->cfg = column_family_test_config(sstables_manager);
_data->cfg.enable_disk_writes = false;
_data->cfg.enable_commitlog = false;
_data->cf = make_lw_shared<column_family>(_data->s, _data->cfg, column_family::no_commitlog(), _data->cm, _data->cl_stats, _data->tracker);

View File

@@ -46,9 +46,8 @@ public:
extern db::nop_large_data_handler nop_lp_handler;
extern db::config test_db_config;
extern gms::feature_service test_feature_service;
extern thread_local sstables::sstables_manager test_sstables_manager;
column_family::config column_family_test_config();
column_family::config column_family_test_config(sstables::sstables_manager& sstables_manager);
struct column_family_for_tests {
struct data {
@@ -61,9 +60,9 @@ struct column_family_for_tests {
};
lw_shared_ptr<data> _data;
column_family_for_tests();
explicit column_family_for_tests(sstables::sstables_manager& sstables_manager);
explicit column_family_for_tests(schema_ptr s);
explicit column_family_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s);
schema_ptr schema() { return _data->s; }

View File

@@ -153,7 +153,7 @@ public:
future<double> compaction(int idx) {
return test_setup::create_empty_test_dir(dir()).then([this, idx] {
return seastar::async([this, idx] {
return sstables::test_env::do_with_async_returning<double>([this, idx] (sstables::test_env& env) {
auto sst_gen = [this, gen = make_lw_shared<unsigned>(idx)] () mutable {
return _env.make_sstable(s, dir(), (*gen)++, sstable::version_types::ka, sstable::format_types::big, _cfg.buffer_size);
};
@@ -170,7 +170,7 @@ public:
cache_tracker tracker;
cell_locker_stats cl_stats;
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family_test_config(), column_family::no_commitlog(), *cm, cl_stats, tracker);
auto cf = make_lw_shared<column_family>(s, column_family_test_config(env.manager()), column_family::no_commitlog(), *cm, cl_stats, tracker);
auto start = perf_sstable_test_env::now();