From c188aef08852650e71fd4da0ca8382ad3efa9c5b Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 10 Jun 2020 17:39:54 -0400 Subject: [PATCH 01/15] sstable_directory: fix debug message I just noticed while working on the reshape patches that there is an extra format bracket in two of the debug message. As they are debug I've seen them less often than the others and that slipped. Signed-off-by: Glauber Costa --- sstables/sstable_directory.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 2716c91d39..fe6ca152c5 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -187,7 +187,7 @@ sstable_directory::process_sstable_dir(const ::io_priority_class& iop) { return std::max(a, b); }); - dirlog.debug("{} After {} scanned, seen generation {}. {} descriptors found, {} different files found ", + dirlog.debug("After {} scanned, seen generation {}. {} descriptors found, {} different files found ", _sstable_dir, _max_generation_seen, state.descriptors.size(), state.generations_found.size()); // _descriptors is everything with a TOC. So after we remove this, what's left is @@ -232,7 +232,7 @@ sstable_directory::move_foreign_sstables(sharded& source_dire } // Should be empty, since an SSTable that belongs to this shard is not remote. assert(shard_id != this_shard_id()); - dirlog.debug("{} Moving {} unshared SSTables to shard {} ", info_vec.size(), shard_id); + dirlog.debug("Moving {} unshared SSTables to shard {} ", info_vec.size(), shard_id); return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec)); }); } From 1c70a7c54ef2c95a7b235378658212e3ea167138 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 10 Jun 2020 23:14:43 -0400 Subject: [PATCH 02/15] upload: use custom error handler for upload directory SSTables created for the upload directory should be using its custom error handler. There is one user of the custom error handler in tree, which is the current upload directory function. As we will use a free function instead of a lambda in our implementation we also use the opportunity to fix it for consistency. Signed-off-by: Glauber Costa --- distributed_loader.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/distributed_loader.cc b/distributed_loader.cc index d6ea2fc78e..76d9dea667 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -79,6 +79,10 @@ static io_error_handler error_handler_for_upload_dir() { }; } +io_error_handler error_handler_gen_for_upload_dir(disk_error_signal_type& dummy) { + return error_handler_for_upload_dir(); +} + // TODO: possibly move it to seastar template static future<> invoke_shards_with_ptr(std::unordered_set shards, distributed& s, PtrType ptr, Func&& func) { @@ -396,7 +400,7 @@ distributed_loader::process_upload_dir(distributed& db, sstring ks, ss sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()), sstables::sstable_directory::allow_loading_materialized_view::no, [&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { - return global_table->make_sstable(dir.native(), gen, v, f); + return global_table->make_sstable(dir.native(), gen, v, f, &error_handler_gen_for_upload_dir); }).get(); @@ -427,7 +431,7 @@ distributed_loader::process_upload_dir(distributed& db, sstring ks, ss return global_table->make_sstable(upload.native(), gen, global_table->get_sstables_manager().get_highest_supported_format(), - sstables::sstable::format_types::big); + sstables::sstable::format_types::big, &error_handler_gen_for_upload_dir); }).get(); }); } @@ -465,8 +469,7 @@ distributed_loader::flush_upload_dir(distributed& db, distributedread_toc().get(); From bb076783469a7e558ea064843f18788e85bac39a Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 15 Jun 2020 15:05:40 -0400 Subject: [PATCH 03/15] api: do not allow user to meddle with auto compaction too early We are about to use the auto compaction property during the populate/reshard process. If the user toggles it, the database can be left in a bad state. There should be no reason why a user would want to set that up this early. So we'll disallow it. To do that property, it is better if the check of whether or not the storage service is ready to accomodate this request is local to the storage service itself. We then move the logic of set_tables_autocompaction from api to the storage service. The API layer now merely translates the table names and pass it along. Signed-off-by: Glauber Costa --- api/storage_service.cc | 25 ++++++++----------------- service/storage_service.cc | 19 +++++++++++++++++++ service/storage_service.hh | 2 ++ 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 209de1842c..454d563302 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -87,20 +87,13 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) { }; } -future<> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector tables, bool enabled) { +future set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector tables, bool enabled) { if (tables.empty()) { tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); } - return ctx.db.invoke_on_all([keyspace, tables, enabled] (database& db) { - return parallel_for_each(tables, [&db, keyspace, enabled](const sstring& table) mutable { - column_family& cf = db.find_column_family(keyspace, table); - if (enabled) { - cf.enable_auto_compaction(); - } else { - cf.disable_auto_compaction(); - } - return make_ready_future<>(); - }); + + return service::get_local_storage_service().set_tables_autocompaction(keyspace, tables, enabled).then([]{ + return make_ready_future(json_void()); }); } @@ -742,17 +735,15 @@ void set_storage_service(http_context& ctx, routes& r) { ss::enable_auto_compaction.set(r, [&ctx](std::unique_ptr req) { auto keyspace = validate_keyspace(ctx, req->param); auto tables = split_cf(req->get_query_param("cf")); - return set_tables_autocompaction(ctx, keyspace, tables, true).then([]{ - return make_ready_future(json_void()); - }); + + return set_tables_autocompaction(ctx, keyspace, tables, true); }); ss::disable_auto_compaction.set(r, [&ctx](std::unique_ptr req) { auto keyspace = validate_keyspace(ctx, req->param); auto tables = split_cf(req->get_query_param("cf")); - return set_tables_autocompaction(ctx, keyspace, tables, false).then([]{ - return make_ready_future(json_void()); - }); + + return set_tables_autocompaction(ctx, keyspace, tables, false); }); ss::deliver_hints.set(r, [](std::unique_ptr req) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 9166a0fd31..b200762194 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2921,6 +2921,25 @@ void storage_service::shutdown_client_servers() { } } +future<> +storage_service::set_tables_autocompaction(const sstring &keyspace, std::vector tables, bool enabled) { + if (_initialized) { + return make_exception_future<>(std::runtime_error("Too early: storage service not initialized yet")); + } + + return _db.invoke_on_all([keyspace, tables, enabled] (database& db) { + return parallel_for_each(tables, [&db, keyspace, enabled](const sstring& table) mutable { + column_family& cf = db.find_column_family(keyspace, table); + if (enabled) { + cf.enable_auto_compaction(); + } else { + cf.disable_auto_compaction(); + } + return make_ready_future<>(); + }); + }); +} + std::unordered_multimap storage_service::get_new_source_ranges(const sstring& keyspace_name, const dht::token_range_vector& ranges) { auto my_address = get_broadcast_address(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 60231f9c99..4adf9ef9e1 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -853,6 +853,8 @@ public: */ future<> load_new_sstables(sstring ks_name, sstring cf_name); + future<> set_tables_autocompaction(const sstring &keyspace, std::vector tables, bool enabled); + template auto run_with_api_lock(sstring operation, Func&& func) { return get_storage_service().invoke_on(0, [operation = std::move(operation), From 45f3bc679ea95b1fbba4f6fdda185aa6a34202d2 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 20 Apr 2020 14:45:25 -0400 Subject: [PATCH 04/15] distributed_loader: assume populate_column_families is run in shard 0 This is already the case, since main.cc calls it from shard 0 and relies on it to spread the information to the other shards. We will turn this branch - which is always taken - into an assert for the sake of future-proofing and soon add even more code that relies on this being executed in shard 0. Signed-off-by: Glauber Costa --- distributed_loader.cc | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/distributed_loader.cc b/distributed_loader.cc index 76d9dea667..2ba00ff12f 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -963,14 +963,13 @@ future<> distributed_loader::do_populate_column_family(distributed& db future<> distributed_loader::populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] { + assert(this_shard_id() == 0); // First pass, cleanup temporary sstable directories and sstables pending delete. - if (this_shard_id() == 0) { - cleanup_column_family_temp_sst_dirs(sstdir).get(); - auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename(); - auto exists = file_exists(pending_delete_dir).get0(); - if (exists) { - handle_sstables_pending_delete(pending_delete_dir).get(); - } + cleanup_column_family_temp_sst_dirs(sstdir).get(); + auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename(); + auto exists = file_exists(pending_delete_dir).get0(); + if (exists) { + handle_sstables_pending_delete(pending_delete_dir).get(); } // Second pass, cleanup sstables with temporary TOCs and load the rest. do_populate_column_family(db, std::move(sstdir), std::move(ks), std::move(cf)).get(); From 9902af894aa9ac48e61ff889b2f1440039aa4441 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 10 Jun 2020 17:36:46 -0400 Subject: [PATCH 05/15] compaction_manager: rename run_resharding_job It will be used to run any custom job where the caller provides a function. One such example is indeed resharding, but reshaping SSTables can also fall here. The semaphore is also renamed, and we'll allow only one custom job at a time (across all possible types). We also remove the assumption of the scheduling group. The caller has to have already placed the code in the correct CPU scheduling group. The I/O priority class comes from the descriptor. To make sure that we don't regress, we wrap the entire reshard-at-boot code in the compaction class. Currently the setup would be done in the main group, and the actual resharding in the compaction group. Note that this is temporary, as this code is about to change. Signed-off-by: Glauber Costa --- distributed_loader.cc | 2 +- main.cc | 7 +++++++ sstables/compaction_manager.cc | 17 +++++++---------- sstables/compaction_manager.hh | 12 +++++------- sstables/sstable_directory.cc | 2 +- 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/distributed_loader.cc b/distributed_loader.cc index 2ba00ff12f..cbf72c682a 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -565,7 +565,7 @@ static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, sstring auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable { return func(std::move(sstables), level, max_sstable_bytes); }; - return cf->get_compaction_manager().run_resharding_job(&*cf, std::move(job)); + return cf->get_compaction_manager().run_custom_job(&*cf, "resharding", std::move(job)); }); }); } diff --git a/main.cc b/main.cc index 1a9a56b075..6872aeec67 100644 --- a/main.cc +++ b/main.cc @@ -937,10 +937,17 @@ int main(int ac, char** av) { // we will have races between the compaction and loading processes // We also want to trigger regular compaction on boot. + // FIXME: temporary as this code is being replaced. I am keeping the scheduling + // group that was effectively used in the bulk of it (compaction). Soon it will become + // streaming + + with_scheduling_group(dbcfg.compaction_scheduling_group, [&db] { for (auto& x : db.local().get_column_families()) { column_family& cf = *(x.second); distributed_loader::reshard(db, cf.schema()->ks_name(), cf.schema()->cf_name()); } + }).get(); + db.invoke_on_all([&proxy] (database& db) { for (auto& x : db.get_column_families()) { column_family& cf = *(x.second); diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 563ca8cdf8..3055725f6f 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -309,7 +309,7 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) { return task->compaction_done.get_future().then([task] {}); } -future<> compaction_manager::run_resharding_job(column_family* cf, std::function()> job) { +future<> compaction_manager::run_custom_job(column_family* cf, sstring name, noncopyable_function()> job) { if (_state != state::enabled) { return make_ready_future<>(); } @@ -318,9 +318,9 @@ future<> compaction_manager::run_resharding_job(column_family* cf, std::function task->compacting_cf = cf; _tasks.push_back(task); - task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] { + task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, job = std::move(job)] () mutable { // take read lock for cf, so major compaction and resharding can't proceed in parallel. - return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] { + return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] () mutable { _stats.active_tasks++; if (!can_proceed(task)) { return make_ready_future<>(); @@ -329,20 +329,17 @@ future<> compaction_manager::run_resharding_job(column_family* cf, std::function // NOTE: // no need to register shared sstables because they're excluded from non-resharding // compaction and some of them may not even belong to current shard. - - return with_scheduling_group(_scheduling_group, [job = std::move(job)] { - return job(); - }); + return job(); }); - }).then_wrapped([this, task] (future<> f) { + }).then_wrapped([this, task, name] (future<> f) { _stats.active_tasks--; _tasks.remove(task); try { f.get(); } catch (sstables::compaction_stop_exception& e) { - cmlog.info("resharding was abruptly stopped, reason: {}", e.what()); + cmlog.info("{} was abruptly stopped, reason: {}", name, e.what()); } catch (...) { - cmlog.error("resharding failed: {}", std::current_exception()); + cmlog.error("{} failed: {}", name, std::current_exception()); } }); return task->compaction_done.get_future().then([task] {}); diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index 7ea5d0dc58..bbd3b0e728 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -109,7 +109,7 @@ private: // Prevents column family from running major and minor compaction at same time. std::unordered_map _compaction_locks; - semaphore _resharding_sem{1}; + semaphore _custom_job_sem{1}; std::function compaction_submission_callback(); // all registered column families are submitted for compaction at a constant interval. @@ -216,15 +216,13 @@ public: // Submit a column family for major compaction. future<> submit_major_compaction(column_family* cf); - // Run a resharding job for a given column family. + + // Run a custom job for a given column family, defined by a function // it completes when future returned by job is ready or returns immediately // if manager was asked to stop. // - // parameter job is a function that will carry the reshard operation on a set - // of sstables that belong to different shards for this column family using - // sstables::reshard_sstables(), and in the end, it will forward unshared - // sstables created by the process to their owner shards. - future<> run_resharding_job(column_family* cf, std::function()> job); + // parameter job is a function that will carry the operation + future<> run_custom_job(column_family* cf, sstring name, noncopyable_function()> job); // Remove a column family from the compaction manager. // Cancel requests on cf and wait for a possible ongoing compaction on cf. diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index fe6ca152c5..ccded9fdd9 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -309,7 +309,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. But only one will run in parallel at a time return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector& sstlist) mutable { - return cm.run_resharding_job(&table, [this, iop, &cm, &table, creator, &sstlist] () { + return cm.run_custom_job(&table, "resharding", [this, iop, &cm, &table, creator, &sstlist] () { sstables::compaction_descriptor desc(sstlist, {}, iop); desc.options = sstables::compaction_options::make_reshard(); desc.creator = std::move(creator); From baa82b3a262789ce3e9f3e5dce205021aa91a30e Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 11 Jun 2020 12:17:29 -0400 Subject: [PATCH 06/15] distributed_loader.cc : extract highest_generation_seen code We'll use it in one more other location so extract it to common code. Signed-off-by: Glauber Costa --- distributed_loader.cc | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/distributed_loader.cc b/distributed_loader.cc index cbf72c682a..23ea99ca52 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -384,6 +384,13 @@ distributed_loader::reshard(sharded& dir, sharded +highest_generation_seen(sharded& directory) { + return directory.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_generation_seen), int64_t(0), [] (int64_t a, int64_t b) { + return std::max(a, b); + }); +} + future<> distributed_loader::process_upload_dir(distributed& db, sstring ks, sstring cf) { seastar::thread_attributes attr; @@ -410,13 +417,9 @@ distributed_loader::process_upload_dir(distributed& db, sstring ks, ss lock_table(directory, db, ks, cf).get(); process_sstable_dir(directory).get(); - auto highest_generation_seen = directory.map_reduce0( - std::mem_fn(&sstables::sstable_directory::highest_generation_seen), - int64_t(0), - [] (int64_t a, int64_t b) { return std::max(a, b); } - ).get0(); - auto shard_generation_base = highest_generation_seen / smp::count + 1; + auto generation = highest_generation_seen(directory).get0(); + auto shard_generation_base = generation / smp::count + 1; // We still want to do our best to keep the generation numbers shard-friendly. // Each destination shard will manage its own generation counter. From 072d0d3073357a7d04c8117465a1e61f1d5dd48b Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 17 Jun 2020 10:30:47 -0400 Subject: [PATCH 07/15] distributed_loader.cc: add a helper function to extract the highest SSTable version found Using a map reduce in a shared sstable directory, finds the highest version seen across all shards. Signed-off-by: Glauber Costa --- distributed_loader.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/distributed_loader.cc b/distributed_loader.cc index 23ea99ca52..3ed00159e1 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -391,6 +391,14 @@ highest_generation_seen(sharded& directory) { }); } +future +highest_version_seen(sharded& dir, sstables::sstable_version_types system_version) { + using version = sstables::sstable_version_types; + return dir.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_version_seen), system_version, [] (version a, version b) { + return sstables::is_later(a, b) ? a : b; + }); +} + future<> distributed_loader::process_upload_dir(distributed& db, sstring ks, sstring cf) { seastar::thread_attributes attr; From 96abf80c5e3bfbf8dfd27b00f38e572ed159e9c5 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 11 Jun 2020 14:35:43 -0400 Subject: [PATCH 08/15] tests: fix fragile database tests This test wants to make sure that an SSTable with generation number 4, which is incomplete, gets deleted. While that works today, the way the test verifies that is fragile because new SSTables can and will be created, especially in the local directory that sees a lot of activity on startup. It works if generations don't go that far, but with SMP, even a single SSTable in the right shard can end up having generation 4. In practice this isn't an issue today because the code calls cf.update_sstables_known_generation() as soon as it sees a file, before deciding whether or not the file has to be deleted. However this behavior is not guaranteed and is changing. The best way to fix this would be to check if the file is the same, including its inode. But given that this is just a unit test (which is almost always if not always single node), I am just moving to use the peers table instead. Again, we could have created a user table, but it's just not worth the hassle. Signed-off-by: Glauber Costa --- test/boost/database_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 5cfc3c5312..caf1c749a7 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -169,7 +169,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) { // Create incomplete sstables in test data directory sstring ks = "system"; - sstring cf = "local-7ad54392bcdd35a684174e047860b377"; + sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f"; sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string(); auto require_exist = [] (const sstring& name, bool should_exist) { @@ -223,7 +223,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) { // Create incomplete sstables in test data directory sstring ks = "system"; - sstring cf = "local-7ad54392bcdd35a684174e047860b377"; + sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f"; sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string(); sstring pending_delete_dir = sst_dir + "/" + sst::pending_delete_dir_basename(); From ef85a2cec5a4d2387b8675085a50d9ddcbfec3ed Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 17 Jun 2020 10:36:07 -0400 Subject: [PATCH 09/15] compaction: add default implementation for some pure functions There are some functions that are today pure that have an obvious implementation (for example on_new_partition, do nothing). We'll add default implementations to the compaction class, which reduces the boilerplate needed to add a new compaction type. Signed-off-by: Glauber Costa --- sstables/compaction.cc | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/sstables/compaction.cc b/sstables/compaction.cc index aac24a3959..04771c17e0 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -583,8 +583,13 @@ private: return consumer(make_sstable_reader()); } - virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0; - virtual bool use_interposer_consumer() const = 0; + virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) { + return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer)); + } + + virtual bool use_interposer_consumer() const { + return _cf.get_compaction_strategy().use_interposer_consumer(); + } compaction_info finish(std::chrono::time_point started_at, std::chrono::time_point ended_at) { _info->ended_at = std::chrono::duration_cast(ended_at.time_since_epoch()).count(); @@ -621,7 +626,7 @@ private: virtual void report_start(const sstring& formatted_msg) const = 0; virtual void report_finish(const sstring& formatted_msg, std::chrono::time_point ended_at) const = 0; - virtual void backlog_tracker_adjust_charges() = 0; + virtual void backlog_tracker_adjust_charges() { }; std::function max_purgeable_func() { if (!tombstone_expiration_enabled()) { @@ -640,9 +645,9 @@ private: }; } - virtual void on_new_partition() = 0; + virtual void on_new_partition() {} - virtual void on_end_of_compaction() = 0; + virtual void on_end_of_compaction() {}; // create a writer based on decorated key. virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) = 0; @@ -768,14 +773,6 @@ public: _monitor_generator); } - reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { - return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer)); - } - - bool use_interposer_consumer() const override { - return _cf.get_compaction_strategy().use_interposer_consumer(); - } - void report_start(const sstring& formatted_msg) const override { clogger.info("Compacting {}", formatted_msg); } From c4841fa73510d91f39eb9cb67526ae8682525af1 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 17 Jun 2020 14:50:33 -0400 Subject: [PATCH 10/15] compaction: add a size and throught pretty printer. This is so we don't always use MB. Sometimes it is best to report GB, TB, and their equivalent throughput metrics. Signed-off-by: Glauber Costa --- distributed_loader.cc | 11 ++++------- sstables/compaction.cc | 28 +++++++++++++++++++++++----- sstables/compaction.hh | 14 ++++++++++++++ 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/distributed_loader.cc b/distributed_loader.cc index 3ed00159e1..acc8b2a9db 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -347,9 +347,8 @@ future<> run_resharding_jobs(sharded& dir, std::vec } return do_with(std::move(reshard_jobs), [&dir, &db, ks_name, table_name, creator = std::move(creator), total_size] (std::vector& reshard_jobs) { - auto total_size_mb = total_size / 1000000.0; auto start = std::chrono::steady_clock::now(); - dblog.info("{}", fmt::format("Resharding {:.2f} MB", total_size_mb)); + dblog.info("{}", fmt::format("Resharding {} ", sstables::pretty_printed_data_size(total_size))); return dir.invoke_on_all([&dir, &db, &reshard_jobs, ks_name, table_name, creator] (sstables::sstable_directory& d) mutable { auto& table = db.local().find_column_family(ks_name, table_name); @@ -360,11 +359,9 @@ future<> run_resharding_jobs(sharded& dir, std::vec return d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop).then([&d, &dir] { return d.move_foreign_sstables(dir); }); - }).then([start, total_size_mb] { - // add a microsecond to prevent division by zero - auto now = std::chrono::steady_clock::now() + 1us; - auto seconds = std::chrono::duration_cast>(now - start).count(); - dblog.info("{}", fmt::format("Resharded {:.2f} MB in {:.2f} seconds, {:.2f} MB/s", total_size_mb, seconds, (total_size_mb / seconds))); + }).then([start, total_size] { + auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); + dblog.info("{}", fmt::format("Resharded {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration))); return make_ready_future<>(); }); }); diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 04771c17e0..5d2ba7cf92 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -74,6 +74,25 @@ namespace sstables { logging::logger clogger("compaction"); +std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) { + static constexpr const char* suffixes[] = { " bytes", "kB", "MB", "GB", "TB", "PB" }; + + unsigned exp = 0; + while ((data._size >= 1000) && (exp < sizeof(suffixes))) { + exp++; + data._size /= 1000; + } + + os << data._size << suffixes[exp]; + return os; +} + +std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) { + uint64_t throughput = tp._duration.count() > 0 ? tp._size / tp._duration.count() : 0; + os << pretty_printed_data_size(throughput) << "/s"; + return os; +} + static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf, sstable_set::incremental_selector& selector, const std::unordered_set& compacting_set, const dht::decorated_key& dk) { auto timestamp = api::max_timestamp; @@ -596,7 +615,6 @@ private: auto ratio = double(_info->end_size) / double(_info->start_size); auto duration = std::chrono::duration(ended_at - started_at); // Don't report NaN or negative number. - auto throughput = duration.count() > 0 ? (double(_info->end_size) / (1024*1024)) / duration.count() : double{}; sstring new_sstables_msg; on_end_of_compaction(); @@ -610,10 +628,10 @@ private: // - add support to merge summary (message: Partition merge counts were {%s}.). // - there is no easy way, currently, to know the exact number of total partitions. // By the time being, using estimated key count. - sstring formatted_msg = sprint("%ld sstables to [%s]. %ld bytes to %ld (~%d%% of original) in %dms = %.2fMB/s. " \ - "~%ld total partitions merged to %ld.", - _info->sstables, new_sstables_msg, _info->start_size, _info->end_size, int(ratio * 100), - std::chrono::duration_cast(duration).count(), throughput, + sstring formatted_msg = fmt::format("{} sstables to [{}]. {} to {} (~{} of original) in {}ms = {}. " \ + "~{} total partitions merged to {}.", + _info->sstables, new_sstables_msg, pretty_printed_data_size(_info->start_size), pretty_printed_data_size(_info->end_size), int(ratio * 100), + std::chrono::duration_cast(duration).count(), pretty_printed_throughput(_info->end_size, duration), _info->total_partitions, _info->total_keys_written); report_finish(formatted_msg, ended_at); diff --git a/sstables/compaction.hh b/sstables/compaction.hh index 187a360aa4..ef9bfba29a 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -35,6 +35,20 @@ class flat_mutation_reader; namespace sstables { + class pretty_printed_data_size { + uint64_t _size; + public: + pretty_printed_data_size(uint64_t size) : _size(size) {} + friend std::ostream& operator<<(std::ostream&, pretty_printed_data_size); + }; + + class pretty_printed_throughput { + uint64_t _size; + std::chrono::duration _duration; + public: + pretty_printed_throughput(uint64_t size, std::chrono::duration dur) : _size(size), _duration(std::move(dur)) {} + friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput); + }; struct resharding_descriptor { std::vector sstables; From 0467bd0a942559669b675e2507261b30b3abe60c Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 16 Jun 2020 07:44:16 -0400 Subject: [PATCH 11/15] compaction: add a new compaction type, Reshape From the point of view of selecting SSTables and its expected output, Reshaping really is just a normal compaction. However, there are some key differences that we would like to uphold: - Reshaping is done separately from the main SSTable set. It can be done with the node offline, or it can be done in a separate priority class. Either way, we don't want those SSTables to count towards backlog. For reads, because the SSTables are not yet registered in the backlog tracker (if offline or coming from upload), if we were to deduct compaction charges from it we would go negative. For writes, we don't want to deal with backlog management here because we will add the SSTable at once when reshaping is finished. - We don't need to do early replacements. - We would like to clearly mark the Reshaping compactions as such in the logs For the reasons above, it is nicer to add a new Reshape compaction type, a subclass of compaction, that upholds such properties. Signed-off-by: Glauber Costa --- sstables/compaction.cc | 59 ++++++++++++++++++++++++++++++- sstables/compaction.hh | 2 ++ sstables/compaction_descriptor.hh | 10 ++++-- 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 5d2ba7cf92..94487c8173 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -766,6 +766,53 @@ void garbage_collected_sstable_writer::data::finish_sstable_writer() { } } +class reshape_compaction : public compaction { +public: + reshape_compaction(column_family& cf, compaction_descriptor descriptor) + : compaction(cf, std::move(descriptor)) { + _info->run_identifier = _run_identifier; + _info->type = compaction_type::Reshape; + } + + flat_mutation_reader make_sstable_reader() const override { + return ::make_local_shard_sstable_reader(_schema, + _permit, + _compacting, + query::full_partition_range, + _schema->full_slice(), + _io_priority, + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + default_read_monitor_generator()); + } + + void report_start(const sstring& formatted_msg) const override { + clogger.info("Reshaping {}", formatted_msg); + } + + void report_finish(const sstring& formatted_msg, std::chrono::time_point ended_at) const override { + clogger.info("Reshaped {}", formatted_msg); + } + + virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override { + auto sst = _sstable_creator(this_shard_id()); + setup_new_sstable(sst); + + sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer(); + cfg.max_sstable_size = _max_sstable_size; + cfg.monitor = &default_write_monitor(); + cfg.run_identifier = _run_identifier; + return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats(), _io_priority), sst}; + } + + virtual void stop_sstable_writer(compaction_writer* writer) override { + if (writer) { + finish_new_sstable(writer); + } + } +}; + class regular_compaction : public compaction { // sstable being currently written. mutable compaction_read_monitor_generator _monitor_generator; @@ -1335,7 +1382,14 @@ future compaction::run(std::unique_ptr c, GCConsume compaction_type compaction_options::type() const { // Maps options_variant indexes to the corresponding compaction_type member. - static const compaction_type index_to_type[] = {compaction_type::Compaction, compaction_type::Cleanup, compaction_type::Upgrade, compaction_type::Scrub, compaction_type::Reshard}; + static const compaction_type index_to_type[] = { + compaction_type::Compaction, + compaction_type::Cleanup, + compaction_type::Upgrade, + compaction_type::Scrub, + compaction_type::Reshard, + compaction_type::Reshape, + }; return index_to_type[_options.index()]; } @@ -1344,6 +1398,9 @@ static std::unique_ptr make_compaction(column_family& cf, sstables:: column_family& cf; sstables::compaction_descriptor&& descriptor; + std::unique_ptr operator()(compaction_options::reshape) { + return std::make_unique(cf, std::move(descriptor)); + } std::unique_ptr operator()(compaction_options::reshard) { return std::make_unique(cf, std::move(descriptor)); } diff --git a/sstables/compaction.hh b/sstables/compaction.hh index ef9bfba29a..bbc3531363 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -73,6 +73,8 @@ namespace sstables { return "RESHARD"; case compaction_type::Upgrade: return "UPGRADE"; + case compaction_type::Reshape: + return "RESHAPE"; default: throw std::runtime_error("Invalid Compaction Type"); } diff --git a/sstables/compaction_descriptor.hh b/sstables/compaction_descriptor.hh index 5e0fa7cf72..159fd49be8 100644 --- a/sstables/compaction_descriptor.hh +++ b/sstables/compaction_descriptor.hh @@ -43,6 +43,7 @@ enum class compaction_type { Index_build = 4, Reshard = 5, Upgrade = 6, + Reshape = 7, }; struct compaction_completion_desc { @@ -72,9 +73,10 @@ public: }; struct reshard { }; - + struct reshape { + }; private: - using options_variant = std::variant; + using options_variant = std::variant; private: options_variant _options; @@ -84,6 +86,10 @@ private: } public: + static compaction_options make_reshape() { + return compaction_options(reshape{}); + } + static compaction_options make_reshard() { return compaction_options(reshard{}); } From 3c254dd49d01c78d799e538dcc1db0bfcd50b6c0 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 28 May 2020 11:27:31 -0400 Subject: [PATCH 12/15] compaction_strategy: add method to reshape SSTables Some SSTable sets are considered to be off-strategy: they are in a shape that is at best not optimal and at worst adversarial to the current compaction strategy. This patch introduces the compaction strategy-specific method get_reshaping_job(). Given an SSTable set, it returns one compaction that can be done to bring the table closer to being in-strategy. The caller can then call this repeatedly until the table is fully in-strategy. As an example of how this is supposed to work, consider TWCS: some SSTables will belong to a single window -> in which case they are already in-strategy and don't need to be compacted, and others span multiple windows in which case they are considered off-strategy and have to be compacted. Signed-off-by: Glauber Costa --- compaction_strategy.hh | 15 +++++ compaction_strategy_type.hh | 1 + sstables/compaction_strategy.cc | 10 ++++ sstables/compaction_strategy_impl.hh | 2 + sstables/leveled_compaction_strategy.cc | 65 +++++++++++++++++++++ sstables/leveled_compaction_strategy.hh | 2 + sstables/size_tiered_compaction_strategy.cc | 21 +++++++ sstables/size_tiered_compaction_strategy.hh | 3 + sstables/time_window_compaction_strategy.cc | 45 ++++++++++++++ sstables/time_window_compaction_strategy.hh | 2 + 10 files changed, 166 insertions(+) diff --git a/compaction_strategy.hh b/compaction_strategy.hh index 0cb49910e1..232c5523f4 100644 --- a/compaction_strategy.hh +++ b/compaction_strategy.hh @@ -23,6 +23,7 @@ #include #include +#include #include "schema_fwd.hh" #include "sstables/shared_sstable.hh" @@ -135,6 +136,20 @@ public: // Returns whether or not interposer consumer is used by a given strategy. bool use_interposer_consumer() const; + + // Informs the caller (usually the compaction manager) about what would it take for this set of + // SSTables closer to becoming in-strategy. If this returns an empty compaction descriptor, this + // means that the sstable set is already in-strategy. + // + // The caller can specify one of two modes: strict or relaxed. In relaxed mode the tolerance for + // what is considered offstrategy is higher. It can be used, for instance, for when the system + // is restarting and previous compactions were likely in-flight. In strict mode, we are less + // tolerant to invariant breakages. + // + // The caller should also pass a maximum number of SSTables which is the maximum amount of + // SSTables that can be added into a single job. + compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode); + }; // Creates a compaction_strategy object from one of the strategies available. diff --git a/compaction_strategy_type.hh b/compaction_strategy_type.hh index ec5728dbc5..65322ff723 100644 --- a/compaction_strategy_type.hh +++ b/compaction_strategy_type.hh @@ -32,4 +32,5 @@ enum class compaction_strategy_type { time_window, }; +enum class reshape_mode { strict, relaxed }; } diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index bef0a1cc15..15652b95dc 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -481,6 +481,11 @@ reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutatio return end_consumer; } +compaction_descriptor +compaction_strategy_impl::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + return compaction_descriptor(); +} + } // namespace sstables size_tiered_backlog_tracker::inflight_component @@ -1011,6 +1016,11 @@ compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() { return _compaction_strategy_impl->get_backlog_tracker(); } +sstables::compaction_descriptor +compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, iop, mode); +} + uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) { return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate); } diff --git a/sstables/compaction_strategy_impl.hh b/sstables/compaction_strategy_impl.hh index db6362ea79..4b93b825fc 100644 --- a/sstables/compaction_strategy_impl.hh +++ b/sstables/compaction_strategy_impl.hh @@ -103,5 +103,7 @@ public: virtual bool use_interposer_consumer() const { return false; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode); }; } diff --git a/sstables/leveled_compaction_strategy.cc b/sstables/leveled_compaction_strategy.cc index 95769184bb..a3ae5a3562 100644 --- a/sstables/leveled_compaction_strategy.cc +++ b/sstables/leveled_compaction_strategy.cc @@ -171,4 +171,69 @@ int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family return manifest.get_estimated_tasks(); } +compaction_descriptor +leveled_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + std::array, leveled_manifest::MAX_LEVELS> level_info; + + auto is_disjoint = [this, schema] (const std::vector& sstables, unsigned tolerance) { + unsigned disjoint_sstables = 0; + auto prev_last = dht::ring_position::min(); + for (auto& sst : sstables) { + if (dht::ring_position(sst->get_first_decorated_key()).less_compare(*schema, prev_last)) { + disjoint_sstables++; + } + prev_last = dht::ring_position(sst->get_last_decorated_key()); + } + return disjoint_sstables > tolerance; + }; + + for (auto& sst : input) { + auto sst_level = sst->get_sstable_level(); + if (sst_level > leveled_manifest::MAX_LEVELS) { + leveled_manifest::logger.warn("Found SSTable with level {}, higher than the maximum {}. This is unexpected, but will fix", sst_level, leveled_manifest::MAX_LEVELS); + + // This is really unexpected, so we'll just compact it all to fix it + compaction_descriptor desc(std::move(input), std::optional(), iop, leveled_manifest::MAX_LEVELS - 1, _max_sstable_size_in_mb * 1024 * 1024); + desc.options = compaction_options::make_reshape(); + return desc; + } + level_info[sst_level].push_back(sst); + } + + for (auto& level : level_info) { + std::sort(level.begin(), level.end(), [this, schema] (shared_sstable a, shared_sstable b) { + return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key())); + }); + } + + unsigned max_filled_level = 0; + + size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4); + size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold)); + unsigned tolerance = mode == reshape_mode::strict ? 0 : leveled_manifest::leveled_fan_out * 2; + + if (level_info[0].size() > offstrategy_threshold) { + level_info[0].resize(std::min(level_info[0].size(), max_sstables)); + compaction_descriptor desc(std::move(level_info[0]), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + return desc; + } + + for (unsigned level = 1; level < leveled_manifest::MAX_LEVELS; ++level) { + if (level_info[level].empty()) { + continue; + } + max_filled_level = std::max(max_filled_level, level); + + if (!is_disjoint(level_info[level], tolerance)) { + // Unfortunately no good limit to limit input size to max_sstables for LCS major + compaction_descriptor desc(std::move(input), std::optional(), iop, max_filled_level, _max_sstable_size_in_mb * 1024 * 1024); + desc.options = compaction_options::make_reshape(); + return desc; + } + } + + return compaction_descriptor(); +} + } diff --git a/sstables/leveled_compaction_strategy.hh b/sstables/leveled_compaction_strategy.hh index 53dcd47df5..273a6a6615 100644 --- a/sstables/leveled_compaction_strategy.hh +++ b/sstables/leveled_compaction_strategy.hh @@ -63,6 +63,8 @@ public: virtual compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override; }; } diff --git a/sstables/size_tiered_compaction_strategy.cc b/sstables/size_tiered_compaction_strategy.cc index 62e82b58b3..190b9d2f3f 100644 --- a/sstables/size_tiered_compaction_strategy.cc +++ b/sstables/size_tiered_compaction_strategy.cc @@ -208,4 +208,25 @@ size_tiered_compaction_strategy::most_interesting_bucket(const std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) +{ + size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4); + size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold)); + + if (mode == reshape_mode::relaxed) { + offstrategy_threshold = max_sstables; + } + + for (auto& bucket : get_buckets(input)) { + if (bucket.size() >= offstrategy_threshold) { + bucket.resize(std::min(max_sstables, bucket.size())); + compaction_descriptor desc(std::move(bucket), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + } + } + + return compaction_descriptor(); +} + } diff --git a/sstables/size_tiered_compaction_strategy.hh b/sstables/size_tiered_compaction_strategy.hh index e6f23ae83b..5ca079e75f 100644 --- a/sstables/size_tiered_compaction_strategy.hh +++ b/sstables/size_tiered_compaction_strategy.hh @@ -168,6 +168,9 @@ public: virtual compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override; + }; } diff --git a/sstables/time_window_compaction_strategy.cc b/sstables/time_window_compaction_strategy.cc index b224d38e1f..d7b8b4ee56 100644 --- a/sstables/time_window_compaction_strategy.cc +++ b/sstables/time_window_compaction_strategy.cc @@ -80,4 +80,49 @@ reader_consumer time_window_compaction_strategy::make_interposer_consumer(const }; } +compaction_descriptor +time_window_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) { + std::vector single_window; + std::vector multi_window; + + size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4); + size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold)); + + if (mode == reshape_mode::relaxed) { + offstrategy_threshold = max_sstables; + } + + for (auto& sst : input) { + auto min = sst->get_stats_metadata().min_timestamp; + auto max = sst->get_stats_metadata().max_timestamp; + if (get_window_for(_options, min) != get_window_for(_options, max)) { + multi_window.push_back(sst); + } else { + single_window.push_back(sst); + } + } + + if (!multi_window.empty()) { + // Everything that spans multiple windows will need reshaping + multi_window.resize(std::min(multi_window.size(), max_sstables)); + compaction_descriptor desc(std::move(multi_window), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + return desc; + } + + // For things that don't span multiple windows, we compact windows that are individually too big + auto all_buckets = get_buckets(single_window, _options); + for (auto& pair : all_buckets.first) { + auto ssts = std::move(pair.second); + if (ssts.size() > offstrategy_threshold) { + ssts.resize(std::min(multi_window.size(), max_sstables)); + compaction_descriptor desc(std::move(ssts), std::optional(), iop); + desc.options = compaction_options::make_reshape(); + return desc; + } + } + + return compaction_descriptor(); +} + } diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh index d07e44dee2..31d2852212 100644 --- a/sstables/time_window_compaction_strategy.hh +++ b/sstables/time_window_compaction_strategy.hh @@ -351,6 +351,8 @@ public: virtual bool use_interposer_consumer() const override { return true; } + + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override; }; } From 4d6aacb26515ec9bc8be83d9b4f702d153afc3a7 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 10 Jun 2020 17:42:24 -0400 Subject: [PATCH 13/15] sstable_directory: add helper to reshape existing unshared sstables Before moving SSTables to the main directory, we may need to reshape them into in-strategy. This patch provides helper code that reshapes the SSTables that are known to be unshared local in the sstable directory, and updates the sstable directory with the result. Rehaping can be made more or less aggressive by passing a reshape mode (relaxed or strict), which will influence the amount of SSTables reshape can tolerate to consider a particular slice of the SSTable set offstrategy. Because the compaction expects an std::vector everywhere, we changed our chunked vector for the unshared sstables to a std::vector so we can more easily pass it around without conversions. Signed-off-by: Glauber Costa --- distributed_loader.cc | 19 +++++++++ distributed_loader.hh | 2 + sstables/sstable_directory.cc | 75 +++++++++++++++++++++++++++++++++++ sstables/sstable_directory.hh | 11 ++++- 4 files changed, 106 insertions(+), 1 deletion(-) diff --git a/distributed_loader.cc b/distributed_loader.cc index acc8b2a9db..d17d1aec8c 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -396,6 +396,25 @@ highest_version_seen(sharded& dir, sstables::sstabl }); } +future<> +distributed_loader::reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, + sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) { + + auto start = std::chrono::steady_clock::now(); + return dir.map_reduce0([&dir, &db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode] (sstables::sstable_directory& d) { + auto& table = db.local().find_column_family(ks_name, table_name); + auto& cm = table.get_compaction_manager(); + auto& iop = service::get_local_streaming_read_priority(); + return d.reshape(cm, table, creator, iop, mode); + }, uint64_t(0), std::plus()).then([start] (uint64_t total_size) { + if (total_size > 0) { + auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); + dblog.info("{}", fmt::format("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration))); + } + return make_ready_future<>(); + }); +} + future<> distributed_loader::process_upload_dir(distributed& db, sstring ks, sstring cf) { seastar::thread_attributes attr; diff --git a/distributed_loader.hh b/distributed_loader.hh index c292995364..9a98c34053 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -58,6 +58,8 @@ class migration_manager; class distributed_loader { public: + static future<> reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, + sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator); static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator); static future<> process_sstable_dir(sharded& dir); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index ccded9fdd9..f2ab34f9e9 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -26,6 +26,7 @@ #include "log.hh" #include "sstable_directory.hh" #include "lister.hh" +#include "database.hh" static logging::logger dirlog("sstable_directory"); @@ -278,6 +279,80 @@ sstable_directory::collect_output_sstables_from_resharding(std::vector +sstable_directory::remove_input_sstables_from_reshaping(std::vector sstlist) { + // When removing input sstables from reshaping: Those SSTables used to be in the unshared local + // list. So not only do we have to remove them, we also have to update the list. Because we're + // dealing with a vector it's easier to just reconstruct the list. + dirlog.debug("Removing {} reshaped SSTables", sstlist.size()); + return do_with(std::move(sstlist), std::unordered_set(), + [this] (std::vector& sstlist, std::unordered_set& exclude) { + return parallel_for_each(sstlist, [this, &exclude] (sstables::shared_sstable sst) { + exclude.insert(sst); + return make_ready_future<>(); + }).then([this, &exclude] { + return parallel_for_each(std::exchange(_unshared_local_sstables, {}), [this, &exclude] (sstables::shared_sstable sst) { + if (!exclude.count(sst)) { + _unshared_local_sstables.push_back(sst); + } + return make_ready_future<>(); + }); + }).then([this, &exclude] {; + // Do this last for exception safety. If there is an exception on unlink we + // want to at least leave the SSTable unshared list in a sane state. + return parallel_for_each(exclude, [] (sstables::shared_sstable sst) { + return sst->unlink(); + }); + }); + }); +} + + +future<> +sstable_directory::collect_output_sstables_from_reshaping(std::vector reshaped_sstables) { + dirlog.debug("Collecting {} reshaped SSTables", reshaped_sstables.size()); + return parallel_for_each(std::move(reshaped_sstables), [this] (sstables::shared_sstable sst) { + _unshared_local_sstables.push_back(std::move(sst)); + return make_ready_future<>(); + }); +} + +future sstable_directory::reshape(compaction_manager& cm, table& table, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop, sstables::reshape_mode mode) +{ + return do_with(uint64_t(0), [this, &cm, &table, creator = std::move(creator), iop, mode] (uint64_t & reshaped_size) mutable { + return repeat([this, &cm, &table, creator = std::move(creator), iop, &reshaped_size, mode] () mutable { + auto desc = table.get_compaction_strategy().get_reshaping_job(_unshared_local_sstables, table.schema(), iop, mode); + if (desc.sstables.empty()) { + return make_ready_future(stop_iteration::yes); + } + + if (!reshaped_size) { + dirlog.info("Found SSTables that need reshape. Starting reshape process"); + } + + std::vector sstlist; + for (auto& sst : desc.sstables) { + reshaped_size += sst->data_size(); + sstlist.push_back(sst); + } + + desc.creator = creator; + + return cm.run_custom_job(&table, "reshape", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] () mutable { + return sstables::compact_sstables(std::move(desc), table).then([this, sstlist = std::move(sstlist)] (sstables::compaction_info result) mutable { + return remove_input_sstables_from_reshaping(std::move(sstlist)).then([this, new_sstables = std::move(result.new_sstables)] () mutable { + return collect_output_sstables_from_reshaping(std::move(new_sstables)); + }); + }); + }).then([] { + return make_ready_future(stop_iteration::no); + }); + }).then([&reshaped_size] { + return make_ready_future(reshaped_size); + }); + }); +} + future<> sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& cm, table& table, unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop) diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 829d155364..a2f33d7493 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -101,7 +101,7 @@ private: // SSTables that are unshared and belong to this shard. They are already stored as an // SSTable object. - utils::chunked_vector _unshared_local_sstables; + std::vector _unshared_local_sstables; // SSTables that are unshared and belong to foreign shards. Because they are more conveniently // stored as a foreign_sstable_open_info object, they are in a different attribute separate from the @@ -122,6 +122,9 @@ private: future<> remove_input_sstables_from_resharding(const std::vector& sstlist); future<> collect_output_sstables_from_resharding(std::vector resharded_sstables); + future<> remove_input_sstables_from_reshaping(std::vector sstlist); + future<> collect_output_sstables_from_reshaping(std::vector reshaped_sstables); + template future<> parallel_for_each_restricted(Container&& C, Func&& func); future<> load_foreign_sstables(sstable_info_vector info_vec); @@ -174,6 +177,12 @@ public: unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop); + // reshapes a collection of SSTables, and returns the total amount of bytes reshaped. + future reshape(compaction_manager& cm, table& table, + sstables::compaction_sstable_creator_fn creator, + const ::io_priority_class& iop, + sstables::reshape_mode mode); + // Store a phased operation. Usually used to keep an object alive while the directory is being // processed. One example is preventing table drops concurrent to the processing of this // directory. From b34c0c2ff6918d0265393c461b1032c8f9b934ed Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 28 May 2020 19:04:13 -0400 Subject: [PATCH 14/15] distributed_loader: rework uploading of SSTables Uploading of SSTables is problematic: for historical reasons it takes a lock that may have to wait for ongoing compactions to finish, then it disables writes in the table, and then it goes loading SSTables as if it knew nothing about them. With the sstable_directory infrastructure we can do much better: * we can reshard and reshape the SSTables in place, keeping the number of SSTables in check. Because this is an background process we can be fairly aggressive and set the reshape mode to strict. * we can then move the SSTables directly into the main directory. Because we know they are few in number we can call the more elegant add_sstable_and_invalidate_cache instead of the open coding currently done by load_new_sstables * we know they are not shared (if they were, we resharded them), simplifying the load process even further. The major changes after this patch is applied is that all compactions (resharding and reshape) needed to make the SSTables in-strategy are done in the streaming class, which reduces the impact of this operation on the node. When the SSTables are loaded, subsequent reads will not suffer as we will not be adding shared SSTables in potential high numbers, nor will we reshard in the compaction class. There is also no more need for a lock in the upload process so in the fast path where users are uploading a set of SSTables from a backup this should essentially be instantaneous. The lock, as well as the code to disable and enable table writes is removed. A future improvement is to bypass the staging directory too, in which case the reshaping compaction would already generate the view updates. Signed-off-by: Glauber Costa --- database.hh | 14 -- distributed_loader.cc | 192 +++++++++++---------------- distributed_loader.hh | 11 +- service/storage_service.cc | 108 +--------------- sstables/sstable_directory.cc | 33 ++--- table.cc | 237 ++++++++++++++-------------------- 6 files changed, 200 insertions(+), 395 deletions(-) diff --git a/database.hh b/database.hh index db560a8ae6..eabe780525 100644 --- a/database.hh +++ b/database.hh @@ -489,11 +489,7 @@ private: // This semaphore ensures that an operation like snapshot won't have its selected // sstables deleted by compaction in parallel, a race condition which could // easily result in failure. - // Locking order: must be acquired either independently or after _sstables_lock seastar::named_semaphore _sstable_deletion_sem = {1, named_semaphore_exception_factory{"sstable deletion"}}; - // There are situations in which we need to stop writing sstables. Flushers will take - // the read lock, and the ones that wish to stop that process will take the write lock. - rwlock _sstables_lock; mutable row_cache _cache; // Cache covers only sstables. std::optional _sstable_generation = {}; @@ -820,16 +816,6 @@ public: future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); - // Important warning: disabling writes will only have an effect in the current shard. - // The other shards will keep writing tables at will. Therefore, you very likely need - // to call this separately in all shards first, to guarantee that none of them are writing - // new data before you can safely assume that the whole node is disabled. - future disable_sstable_write(); - - // SSTable writes are now allowed again, and generation is updated to new_generation if != -1 - // returns the amount of microseconds elapsed since we disabled writes. - std::chrono::steady_clock::duration enable_sstable_write(int64_t new_generation); - // Make sure the generation numbers are sequential, starting from "start". // Generations before "start" are left untouched. // diff --git a/distributed_loader.cc b/distributed_loader.cc index d17d1aec8c..9b94bdc5ff 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -415,12 +415,65 @@ distributed_loader::reshape(sharded& dir, sharded +distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, + sharded& view_update_generator, fs::path datadir, sstring ks, sstring cf) { + + auto& table = db.local().find_column_family(ks, cf); + + return do_with(dht::ring_position::max(), dht::ring_position::min(), [&table, &dir, &view_update_generator, datadir = std::move(datadir)] (dht::ring_position& min, dht::ring_position& max) { + return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &min, &max] (sstables::shared_sstable sst) { + min = std::min(dht::ring_position(sst->get_first_decorated_key()), min, dht::ring_position_less_comparator(*table.schema())); + max = std::max(dht::ring_position(sst->get_last_decorated_key()) , max, dht::ring_position_less_comparator(*table.schema())); + + auto gen = table.calculate_generation_for_new_table(); + dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen); + return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, sst] { + table._sstables_opened_but_not_loaded.push_back(std::move(sst)); + return make_ready_future<>(); + }); + }).then([&table, &min, &max] { + // nothing loaded + if (min.is_max() && max.is_min()) { + return make_ready_future<>(); + } + + return table.get_row_cache().invalidate([&table] () noexcept { + for (auto& sst : table._sstables_opened_but_not_loaded) { + try { + table.load_sstable(sst, true); + } catch (...) { + dblog.error("Failed to load {}: {}. Aborting.", sst->toc_filename(), std::current_exception()); + abort(); + } + } + }, dht::partition_range::make({min, true}, {max, true})); + }).then([&view_update_generator, &table] { + return parallel_for_each(table._sstables_opened_but_not_loaded, [&view_update_generator, &table] (sstables::shared_sstable& sst) { + if (sst->requires_view_building()) { + return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this()); + } + return make_ready_future<>(); + }); + }).then_wrapped([&table] (future<> f) { + auto opened = std::exchange(table._sstables_opened_but_not_loaded, {}); + if (!f.failed()) { + return make_ready_future(opened.size()); + } else { + return make_exception_future(f.get_exception()); + } + }); + }); +} + future<> -distributed_loader::process_upload_dir(distributed& db, sstring ks, sstring cf) { +distributed_loader::process_upload_dir(distributed& db, distributed& sys_dist_ks, + distributed& view_update_generator, sstring ks, sstring cf) { seastar::thread_attributes attr; attr.sched_group = db.local().get_streaming_scheduling_group(); - return seastar::async(std::move(attr), [&db, ks = std::move(ks), cf = std::move(cf)] { + return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] { global_column_family_ptr global_table(db, ks, cf); sharded directory; @@ -460,77 +513,28 @@ distributed_loader::process_upload_dir(distributed& db, sstring ks, ss global_table->get_sstables_manager().get_highest_supported_format(), sstables::sstable::format_types::big, &error_handler_gen_for_upload_dir); }).get(); - }); -} -// This function will iterate through upload directory in column family, -// and will do the following for each sstable found: -// 1) Mutate sstable level to 0. -// 2) Check if view updates need to be generated from this sstable. If so, leave it intact for now. -// 3) Otherwise, create hard links to its components in column family dir. -// 4) Remove all of its components in upload directory. -// At the end, it's expected that upload dir contains only staging sstables -// which need to wait until view updates are generated from them. -// -// Return a vector containing descriptor of sstables to be loaded. -future> -distributed_loader::flush_upload_dir(distributed& db, distributed& sys_dist_ks, sstring ks_name, sstring cf_name) { - return seastar::async([&db, &sys_dist_ks, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] { - std::unordered_map descriptors; - std::vector flushed; + reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [global_table, upload, &shard_gen] (shard_id shard) { + auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed); + return global_table->make_sstable(upload.native(), gen, + global_table->get_sstables_manager().get_highest_supported_format(), + sstables::sstable::format_types::big, + &error_handler_gen_for_upload_dir); + }).get(); - auto& cf = db.local().find_column_family(ks_name, cf_name); - auto upload_dir = fs::path(cf._config.datadir) / "upload"; - verify_owner_and_mode(upload_dir).get(); - lister::scan_dir(upload_dir, { directory_entry_type::regular }, [&descriptors] (fs::path parent_dir, directory_entry de) { - auto comps = sstables::entry_descriptor::make_descriptor(parent_dir.native(), de.name); - if (comps.component != component_type::TOC) { - return make_ready_future<>(); - } - descriptors.emplace(comps.generation, std::move(comps)); - return make_ready_future<>(); - }, &sstables::manifest_json_filter).get(); + const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0(); - flushed.reserve(descriptors.size()); - for (auto& [generation, comps] : descriptors) { - auto descriptors = db.invoke_on(column_family::calculate_shard_from_sstable_generation(generation), [&sys_dist_ks, ks_name, cf_name, comps] (database& db) { - return seastar::async([&db, &sys_dist_ks, ks_name = std::move(ks_name), cf_name = std::move(cf_name), comps = std::move(comps)] () mutable { - auto& cf = db.find_column_family(ks_name, cf_name); - auto sst = cf.make_sstable(cf._config.datadir + "/upload", comps.generation, comps.version, comps.format, &error_handler_gen_for_upload_dir); - auto gen = cf.calculate_generation_for_new_table(); - - sst->read_toc().get(); - schema_ptr s = cf.schema(); - if (s->is_counter() && !sst->has_scylla_component()) { - sstring error = "Direct loading non-Scylla SSTables containing counters is not supported."; - if (db.get_config().enable_dangerous_direct_import_of_cassandra_counters()) { - dblog.info("{} But trying to continue on user's request.", error); - } else { - dblog.error("{} Use sstableloader instead.", error); - throw std::runtime_error(fmt::format("{} Use sstableloader instead.", error)); - } - } - if (s->is_view()) { - throw std::runtime_error("Loading Materialized View SSTables is not supported. Re-create the view instead."); - } - sst->mutate_sstable_level(0).get(); - const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), cf, streaming::stream_reason::repair).get0(); - sstring datadir = cf._config.datadir; - if (use_view_update_path) { - // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. - datadir += "/staging"; - } - sst->create_links(datadir, gen).get(); - sstables::remove_by_toc_name(sst->toc_filename(), error_handler_for_upload_dir()).get(); - comps.generation = gen; - comps.sstdir = std::move(datadir); - return std::move(comps); - }); - }).get0(); - - flushed.push_back(std::move(descriptors)); + auto datadir = upload.parent_path(); + if (use_view_update_path) { + // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. + datadir /= "staging"; } - return std::vector(std::move(flushed)); + + size_t loaded = directory.map_reduce0([&db, ks, cf, datadir, &view_update_generator] (sstables::sstable_directory& dir) { + return make_sstables_available(dir, db, view_update_generator, datadir, ks, cf); + }, size_t(0), std::plus()).get0(); + + dblog.info("Loaded {} SSTables into {}", loaded, datadir.native()); }); } @@ -731,56 +735,6 @@ void distributed_loader::reshard(distributed& db, sstring ks_name, sst }); } -future<> distributed_loader::load_new_sstables(distributed& db, distributed& view_update_generator, - sstring ks, sstring cf, std::vector new_tables) { - return parallel_for_each(new_tables, [&] (auto comps) { - auto cf_sstable_open = [comps] (column_family& cf, sstables::foreign_sstable_open_info info) { - auto f = cf.open_sstable(std::move(info), comps.sstdir, comps.generation, comps.version, comps.format); - return f.then([&cf] (sstables::shared_sstable sst) mutable { - if (sst) { - cf._sstables_opened_but_not_loaded.push_back(sst); - } - return make_ready_future<>(); - }); - }; - return distributed_loader::open_sstable(db, comps, cf_sstable_open, service::get_local_compaction_priority()) - .handle_exception([comps, ks, cf] (std::exception_ptr ep) { - auto name = sstables::sstable::filename(comps.sstdir, ks, cf, comps.version, comps.generation, comps.format, sstables::component_type::TOC); - dblog.error("Failed to open {}: {}", name, ep); - return make_exception_future<>(ep); - }); - }).then([&db, &view_update_generator, ks, cf] { - return db.invoke_on_all([&view_update_generator, ks = std::move(ks), cfname = std::move(cf)] (database& db) { - auto& cf = db.find_column_family(ks, cfname); - return cf.get_row_cache().invalidate([&view_update_generator, &cf] () noexcept { - // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. - // atomically load all opened sstables into column family. - for (auto& sst : cf._sstables_opened_but_not_loaded) { - try { - cf.load_sstable(sst, true); - } catch(...) { - dblog.error("Failed to load {}: {}. Aborting.", sst->toc_filename(), std::current_exception()); - abort(); - } - if (sst->requires_view_building()) { - // FIXME: discarded future. - (void)view_update_generator.local().register_staging_sstable(sst, cf.shared_from_this()); - } - } - cf._sstables_opened_but_not_loaded.clear(); - cf.trigger_compaction(); - }); - }); - }).handle_exception([&db, ks, cf] (std::exception_ptr ep) { - return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) { - auto& cf = db.find_column_family(ks, cfname); - cf._sstables_opened_but_not_loaded.clear(); - }).then([ep] { - return make_exception_future<>(ep); - }); - }); -} - future distributed_loader::probe_file(distributed& db, sstring sstdir, sstring fname) { using namespace sstables; diff --git a/distributed_loader.hh b/distributed_loader.hh index 9a98c34053..76f8f28626 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -28,6 +28,7 @@ #include #include #include +#include #include "seastarx.hh" #include "sstables/compaction_descriptor.hh" @@ -69,10 +70,12 @@ public: std::function (column_family&, sstables::foreign_sstable_open_info)> func, const io_priority_class& pc = default_priority_class()); static future<> verify_owner_and_mode(std::filesystem::path path); - static future<> load_new_sstables(distributed& db, distributed& view_update_generator, - sstring ks, sstring cf, std::vector new_tables); - static future> flush_upload_dir(distributed& db, distributed& sys_dist_ks, sstring ks_name, sstring cf_name); - static future<> process_upload_dir(distributed& db, sstring ks_name, sstring cf_name); + + static future make_sstables_available(sstables::sstable_directory& dir, + sharded& db, sharded& view_update_generator, + std::filesystem::path datadir, sstring ks, sstring cf); + static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, + distributed& view_update_generator, sstring ks_name, sstring cf_name); static future probe_file(distributed& db, sstring sstdir, sstring fname); static future<> populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); diff --git a/service/storage_service.cc b/service/storage_service.cc index b200762194..8138238202 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2789,18 +2789,6 @@ void storage_service::add_expire_time_if_found(inet_address endpoint, int64_t ex // All the global operations are going to happen here, and just the reloading happens // in there. future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { - class max_element { - int64_t _result = 0; - public: - future<> operator()(int64_t value) { - _result = std::max(value, _result); - return make_ready_future<>(); - } - int64_t get() && { - return _result; - } - }; - if (_loading_new_sstables) { throw std::runtime_error("Already loading SSTables. Try again later"); } else { @@ -2809,100 +2797,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { slogger.info("Loading new SSTables for {}.{}...", ks_name, cf_name); - return distributed_loader::process_upload_dir(_db, ks_name, cf_name).then([this, ks_name, cf_name] { - // First, we need to stop SSTable creation for that CF in all shards. This is a really horrible - // thing to do, because under normal circumnstances this can make dirty memory go up to the point - // of explosion. - // - // Remember, however, that we are assuming this is going to be ran on an empty CF. In that scenario, - // stopping the SSTables should have no effect, while guaranteeing we will see no data corruption - // * in case * this is ran on a live CF. - // - // The statement above is valid at least from the Scylla side of things: it is still totally possible - // that someones just copies the table over existing ones. There isn't much we can do about it. - return _db.map_reduce(max_element(), [ks_name, cf_name] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.disable_sstable_write(); - }).then([this, cf_name, ks_name] (int64_t max_seen_sstable) { - // Then, we will reshuffle the tables to make sure that the generation numbers don't go too high. - // We will do all of it the same CPU, to make sure that we won't have two parallel shufflers stepping - // onto each other. - - class all_generations { - std::set _result; - public: - future<> operator()(std::set value) { - _result.insert(value.begin(), value.end()); - return make_ready_future<>(); - } - std::set get() && { - return _result; - } - }; - - // We provide to reshuffle_sstables() the generation of all existing sstables, such that it will - // easily know which sstables are new. - return _db.map_reduce(all_generations(), [ks_name, cf_name] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - std::set generations; - for (auto& p : *(cf.get_sstables())) { - generations.insert(p->generation()); - } - return make_ready_future>(std::move(generations)); - }).then([this, max_seen_sstable, ks_name, cf_name] (std::set all_generations) { - auto shard = std::hash()(cf_name) % smp::count; - return _db.invoke_on(shard, [ks_name, cf_name, max_seen_sstable, all_generations = std::move(all_generations)] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - return cf.reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1); - }); - }); - }).then_wrapped([this, ks_name, cf_name] (future> f) { - std::vector new_tables; - std::exception_ptr eptr; - int64_t new_gen = -1; - - try { - new_tables = f.get0(); - } catch(std::exception& e) { - slogger.error("Loading of new tables failed to {}.{} due to {}", ks_name, cf_name, e.what()); - eptr = std::current_exception(); - } catch(...) { - slogger.error("Loading of new tables failed to {}.{} due to unexpected reason", ks_name, cf_name); - eptr = std::current_exception(); - } - - if (new_tables.size() > 0) { - new_gen = new_tables.back().generation; - } - - slogger.debug("Now accepting writes for sstables with generation larger or equal than {}", new_gen); - return _db.invoke_on_all([ks_name, cf_name, new_gen] (database& db) { - auto& cf = db.find_column_family(ks_name, cf_name); - auto disabled = std::chrono::duration_cast(cf.enable_sstable_write(new_gen)).count(); - slogger.info("CF {}.{} at shard {} had SSTables writes disabled for {} usec", ks_name, cf_name, this_shard_id(), disabled); - return make_ready_future<>(); - }).then([new_tables = std::move(new_tables), eptr = std::move(eptr)] { - if (eptr) { - return make_exception_future>(eptr); - } - return make_ready_future>(std::move(new_tables)); - }); - }).then([this, ks_name, cf_name] (std::vector new_tables) { - auto f = distributed_loader::flush_upload_dir(_db, _sys_dist_ks, ks_name, cf_name); - return f.then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector new_tables_from_upload) mutable { - if (new_tables.empty() && new_tables_from_upload.empty()) { - slogger.info("No new SSTables were found for {}.{}", ks_name, cf_name); - } - // merge new sstables found in both column family and upload directories, if any. - new_tables.insert(new_tables.end(), new_tables_from_upload.begin(), new_tables_from_upload.end()); - return make_ready_future>(std::move(new_tables)); - }); - }).then([this, ks_name, cf_name] (std::vector new_tables) { - return distributed_loader::load_new_sstables(_db, _view_update_generator, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] { - slogger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name); - }); - }); - }).finally([this] { + return distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name).finally([this, ks_name, cf_name] { + slogger.info("Done loading new SSTables for {}.{}", ks_name, cf_name); _loading_new_sstables = false; }); } diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index f2ab34f9e9..f7598e17bc 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -287,22 +287,25 @@ sstable_directory::remove_input_sstables_from_reshaping(std::vector(), [this] (std::vector& sstlist, std::unordered_set& exclude) { - return parallel_for_each(sstlist, [this, &exclude] (sstables::shared_sstable sst) { + + for (auto& sst : sstlist) { exclude.insert(sst); - return make_ready_future<>(); - }).then([this, &exclude] { - return parallel_for_each(std::exchange(_unshared_local_sstables, {}), [this, &exclude] (sstables::shared_sstable sst) { - if (!exclude.count(sst)) { - _unshared_local_sstables.push_back(sst); - } - return make_ready_future<>(); - }); - }).then([this, &exclude] {; - // Do this last for exception safety. If there is an exception on unlink we - // want to at least leave the SSTable unshared list in a sane state. - return parallel_for_each(exclude, [] (sstables::shared_sstable sst) { - return sst->unlink(); - }); + } + + auto old = std::exchange(_unshared_local_sstables, {}); + + for (auto& sst : old) { + if (!exclude.count(sst)) { + _unshared_local_sstables.push_back(sst); + } + } + + // Do this last for exception safety. If there is an exception on unlink we + // want to at least leave the SSTable unshared list in a sane state. + return parallel_for_each(std::move(sstlist), [] (sstables::shared_sstable sst) { + return sst->unlink(); + }).then([] { + fmt::print("Finished removing all SSTables\n"); }); }); } diff --git a/table.cc b/table.cc index 933ec4c8c4..87222e72fa 100644 --- a/table.cc +++ b/table.cc @@ -825,49 +825,47 @@ table::seal_active_streaming_memtable_immediate(flush_permit&& permit) { auto guard = _streaming_flush_phaser.start(); return with_gate(_streaming_flush_gate, [this, old, permit = std::move(permit)] () mutable { - return with_lock(_sstables_lock.for_read(), [this, old, permit = std::move(permit)] () mutable { - auto newtab = make_sstable(); + auto newtab = make_sstable(); - tlogger.debug("Flushing to {}", newtab->get_filename()); + tlogger.debug("Flushing to {}", newtab->get_filename()); - // This is somewhat similar to the main memtable flush, but with important differences. - // - // The first difference, is that we don't keep aggregate collectd statistics about this one. - // If we ever need to, we'll keep them separate statistics, but we don't want to polute the - // main stats about memtables with streaming memtables. - // - // Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the - // memtable list, since this memtable was not available for reading up until this point. - auto fp = permit.release_sstable_write_permit(); - database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); - return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable { - auto&& priority = service::get_local_streaming_write_priority(); - sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); - cfg.backup = incremental_backups_enabled(); - return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] { - return newtab->open_data(); - }).then([this, old, newtab] () { - return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, newtab, old] { - auto adder = [this, newtab] { - add_sstable(newtab, {this_shard_id()}); - try_trigger_compaction(); - tlogger.debug("Flushing to {} done", newtab->get_filename()); - }; - if (cache_enabled()) { - return _cache.update_invalidating(adder, *old); - } else { - return _cache.invalidate(adder).then([old] { return old->clear_gently(); }); - } - }); - }).handle_exception([old, permit = std::move(permit), newtab] (auto ep) { - newtab->mark_for_deletion(); - tlogger.error("failed to write streamed sstable: {}", ep); - return make_exception_future<>(ep); + // This is somewhat similar to the main memtable flush, but with important differences. + // + // The first difference, is that we don't keep aggregate collectd statistics about this one. + // If we ever need to, we'll keep them separate statistics, but we don't want to polute the + // main stats about memtables with streaming memtables. + // + // Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the + // memtable list, since this memtable was not available for reading up until this point. + auto fp = permit.release_sstable_write_permit(); + database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); + return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable { + auto&& priority = service::get_local_streaming_write_priority(); + sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); + cfg.backup = incremental_backups_enabled(); + return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] { + return newtab->open_data(); + }).then([this, old, newtab] () { + return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, newtab, old] { + auto adder = [this, newtab] { + add_sstable(newtab, {this_shard_id()}); + try_trigger_compaction(); + tlogger.debug("Flushing to {} done", newtab->get_filename()); + }; + if (cache_enabled()) { + return _cache.update_invalidating(adder, *old); + } else { + return _cache.invalidate(adder).then([old] { return old->clear_gently(); }); + } }); + }).handle_exception([old, permit = std::move(permit), newtab] (auto ep) { + newtab->mark_for_deletion(); + tlogger.error("failed to write streamed sstable: {}", ep); + return make_exception_future<>(ep); }); - // We will also not have any retry logic. If we fail here, we'll fail the streaming and let - // the upper layers know. They can then apply any logic they want here. }); + // We will also not have any retry logic. If we fail here, we'll fail the streaming and let + // the upper layers know. They can then apply any logic they want here. }).finally([guard = std::move(guard)] { }); }); } @@ -882,27 +880,25 @@ future<> table::seal_active_streaming_memtable_big(streaming_memtable_big& smb, smb.memtables->erase(old); return with_gate(_streaming_flush_gate, [this, old, &smb, permit = std::move(permit)] () mutable { return with_gate(smb.flush_in_progress, [this, old, &smb, permit = std::move(permit)] () mutable { - return with_lock(_sstables_lock.for_read(), [this, old, &smb, permit = std::move(permit)] () mutable { - auto newtab = make_sstable(); + auto newtab = make_sstable(); - auto fp = permit.release_sstable_write_permit(); - auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); - auto&& priority = service::get_local_streaming_write_priority(); - sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); - cfg.backup = incremental_backups_enabled(); - cfg.leave_unsealed = true; - auto fut = write_memtable_to_sstable(*old, newtab, *monitor, cfg, priority); - return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable { - if (!f.failed()) { - smb.sstables.push_back(monitored_sstable{std::move(monitor), newtab}); - return make_ready_future<>(); - } else { - newtab->mark_for_deletion(); - auto ep = f.get_exception(); - tlogger.error("failed to write streamed sstable: {}", ep); - return make_exception_future<>(ep); - } - }); + auto fp = permit.release_sstable_write_permit(); + auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); + auto&& priority = service::get_local_streaming_write_priority(); + sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); + cfg.backup = incremental_backups_enabled(); + cfg.leave_unsealed = true; + auto fut = write_memtable_to_sstable(*old, newtab, *monitor, cfg, priority); + return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable { + if (!f.failed()) { + smb.sstables.push_back(monitored_sstable{std::move(monitor), newtab}); + return make_ready_future<>(); + } else { + newtab->mark_for_deletion(); + auto ep = f.get_exception(); + tlogger.error("failed to write streamed sstable: {}", ep); + return make_exception_future<>(ep); + } }); }); }); @@ -937,9 +933,7 @@ table::seal_active_memtable(flush_permit&& permit) { return do_with(std::move(permit), [this, old] (auto& permit) { return repeat([this, old, &permit] () mutable { auto sstable_write_permit = permit.release_sstable_write_permit(); - return with_lock(_sstables_lock.for_read(), [this, old, sstable_write_permit = std::move(sstable_write_permit)] () mutable { - return this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit)); - }).then([this, &permit] (auto should_stop) mutable { + return this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit)).then([this, &permit] (auto should_stop) mutable { if (should_stop) { return make_ready_future(should_stop); } @@ -1285,22 +1279,20 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) { return make_ready_future<>(); } - return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor)] () mutable { - descriptor.creator = [this] (shard_id dummy) { - auto sst = make_sstable(); - return sst; - }; - descriptor.replacer = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) { - _compaction_strategy.notify_completion(desc.old_sstables, desc.new_sstables); - _compaction_manager.propagate_replacement(this, desc.old_sstables, desc.new_sstables); - this->on_compaction_completion(desc); - if (release_exhausted) { - release_exhausted(desc.old_sstables); - } - }; + descriptor.creator = [this] (shard_id dummy) { + auto sst = make_sstable(); + return sst; + }; + descriptor.replacer = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) { + _compaction_strategy.notify_completion(desc.old_sstables, desc.new_sstables); + _compaction_manager.propagate_replacement(this, desc.old_sstables, desc.new_sstables); + this->on_compaction_completion(desc); + if (release_exhausted) { + release_exhausted(desc.old_sstables); + } + }; - return sstables::compact_sstables(std::move(descriptor), *this); - }).then([this] (auto info) { + return sstables::compact_sstables(std::move(descriptor), *this).then([this] (auto info) { if (info.type != sstables::compaction_type::Compaction) { return make_ready_future<>(); } @@ -1897,79 +1889,50 @@ future<> table::clear() { future table::discard_sstables(db_clock::time_point truncated_at) { assert(_compaction_disabled > 0); - return with_lock(_sstables_lock.for_read(), [this, truncated_at] { - struct pruner { - column_family& cf; - db::replay_position rp; - std::vector remove; + struct pruner { + column_family& cf; + db::replay_position rp; + std::vector remove; - pruner(column_family& cf) - : cf(cf) {} + pruner(column_family& cf) + : cf(cf) {} - void prune(db_clock::time_point truncated_at) { - auto gc_trunc = to_gc_clock(truncated_at); + void prune(db_clock::time_point truncated_at) { + auto gc_trunc = to_gc_clock(truncated_at); - auto pruned = make_lw_shared(cf._compaction_strategy.make_sstable_set(cf._schema)); + auto pruned = make_lw_shared(cf._compaction_strategy.make_sstable_set(cf._schema)); - for (auto& p : *cf._sstables->all()) { - if (p->max_data_age() <= gc_trunc) { - // Only one shard that own the sstable will submit it for deletion to avoid race condition in delete procedure. - if (*boost::min_element(p->get_shards_for_this_sstable()) == this_shard_id()) { - rp = std::max(p->get_stats_metadata().position, rp); - remove.emplace_back(p); - } - continue; + for (auto& p : *cf._sstables->all()) { + if (p->max_data_age() <= gc_trunc) { + // Only one shard that own the sstable will submit it for deletion to avoid race condition in delete procedure. + if (*boost::min_element(p->get_shards_for_this_sstable()) == this_shard_id()) { + rp = std::max(p->get_stats_metadata().position, rp); + remove.emplace_back(p); } - pruned->insert(p); + continue; } - - cf._sstables = std::move(pruned); + pruned->insert(p); } - }; - auto p = make_lw_shared(*this); - return _cache.invalidate([p, truncated_at] { - p->prune(truncated_at); - tlogger.debug("cleaning out row cache"); - }).then([this, p]() mutable { - rebuild_statistics(); - return parallel_for_each(p->remove, [this](sstables::shared_sstable s) { - remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), s); - return sstables::delete_atomically({s}); - }).then([p] { - return make_ready_future(p->rp); - }); + cf._sstables = std::move(pruned); + } + }; + auto p = make_lw_shared(*this); + return _cache.invalidate([p, truncated_at] { + p->prune(truncated_at); + tlogger.debug("cleaning out row cache"); + }).then([this, p]() mutable { + rebuild_statistics(); + + return parallel_for_each(p->remove, [this](sstables::shared_sstable s) { + remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), s); + return sstables::delete_atomically({s}); + }).then([p] { + return make_ready_future(p->rp); }); }); } -future -table::disable_sstable_write() { - _sstable_writes_disabled_at = std::chrono::steady_clock::now(); - return _sstables_lock.write_lock().then([this] { - // _sstable_deletion_sem must be acquired after _sstables_lock.write_lock - return _sstable_deletion_sem.wait().then([this] { - if (_sstables->all()->empty()) { - return make_ready_future(0); - } - int64_t max = 0; - for (auto&& s : *_sstables->all()) { - max = std::max(max, s->generation()); - } - return make_ready_future(max); - }); - }); -} - -std::chrono::steady_clock::duration table::enable_sstable_write(int64_t new_generation) { - if (new_generation != -1) { - update_sstables_known_generation(new_generation); - } - _sstable_deletion_sem.signal(); - _sstables_lock.write_unlock(); - return std::chrono::steady_clock::now() - _sstable_writes_disabled_at; -} - void table::set_schema(schema_ptr s) { assert(s->is_counter() == _schema->is_counter()); tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}", From e40aa042a7405d9c0d3db97e639f6280f95eb98e Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 18 May 2020 11:59:19 -0400 Subject: [PATCH 15/15] distributed_loader: reshard before the node is made online This patch moves the resharding process to use the new directory_with_sstables_handler infrastructure. There is no longer a clear reshard step, and that just becomes a natural part of populate_column_family. In main.cc, a couple of changes are necessary to make that happen. The first one obviously is to stop calling reshard. We also need to make sure that: - The compaction manager is started much earlier, so we can register resharding jobs with it. - auto compactions are disabled in the populate method, so resharding doesn't have to fight for bandwidth with auto compactions. Now that we are resharding through the sstable_directory, the old resharding code can be deleted. There is also no need to deal with the resharding backlog either, because the SSTables are not yet added to the sstable set at this point. Signed-off-by: Glauber Costa --- compaction_strategy.hh | 2 - db/config.cc | 2 + db/config.hh | 1 + distributed_loader.cc | 373 ++++-------------------- distributed_loader.hh | 1 - main.cc | 19 +- sstables/compaction.cc | 40 +-- sstables/compaction.hh | 7 - sstables/compaction_strategy.cc | 17 -- sstables/compaction_strategy_impl.hh | 1 - sstables/leveled_compaction_strategy.cc | 33 --- sstables/leveled_compaction_strategy.hh | 2 - test/boost/sstable_resharding_test.cc | 37 --- 13 files changed, 70 insertions(+), 465 deletions(-) diff --git a/compaction_strategy.hh b/compaction_strategy.hh index 232c5523f4..77833b2fac 100644 --- a/compaction_strategy.hh +++ b/compaction_strategy.hh @@ -63,8 +63,6 @@ public: compaction_descriptor get_major_compaction_job(column_family& cf, std::vector candidates); - std::vector get_resharding_jobs(column_family& cf, std::vector candidates); - // Some strategies may look at the compacted and resulting sstables to // get some useful information for subsequent compactions. void notify_completion(const std::vector& removed, const std::vector& added); diff --git a/db/config.cc b/db/config.cc index 5f1096e957..4914000355 100644 --- a/db/config.cc +++ b/db/config.cc @@ -724,6 +724,8 @@ db::config::config(std::shared_ptr exts) "especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.") , max_memory_for_unlimited_query(this, "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, size_t(1) << 20, "Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries.") + , initial_sstable_loading_concurrency(this, "initial_sstable_loading_concurrency", value_status::Used, 4u, + "Maximum amount of sstables to load in parallel during initialization. A higher number can lead to more memory consumption. You should not need to touch this") , enable_3_1_0_compatibility_mode(this, "enable_3_1_0_compatibility_mode", value_status::Used, false, "Set to true if the cluster was initially installed from 3.1.0. If it was upgraded from an earlier version," " or installed from a later version, leave this set to false. This adjusts the communication protocol to" diff --git a/db/config.hh b/db/config.hh index c726e34b10..e8867a0340 100644 --- a/db/config.hh +++ b/db/config.hh @@ -304,6 +304,7 @@ public: named_value max_partition_key_restrictions_per_query; named_value max_clustering_key_restrictions_per_query; named_value max_memory_for_unlimited_query; + named_value initial_sstable_loading_concurrency; named_value enable_3_1_0_compatibility_mode; named_value enable_user_defined_functions; named_value user_defined_function_time_limit_ms; diff --git a/distributed_loader.cc b/distributed_loader.cc index 9b94bdc5ff..73870b6dd1 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -113,87 +113,6 @@ public: } }; -// checks whether or not a given column family is worth resharding by checking if any of its -// sstables has more than one owner shard. -static future worth_resharding(distributed& db, global_column_family_ptr cf) { - auto has_shared_sstables = [cf] (database& db) { - return cf->has_shared_sstables(); - }; - return db.map_reduce0(has_shared_sstables, bool(false), std::logical_or()); -} - -static future> -load_sstables_with_open_info(std::vector ssts_info, global_column_family_ptr cf, sstring dir, - noncopyable_function pred) { - return do_with(std::vector(), [ssts_info = std::move(ssts_info), cf, dir, pred = std::move(pred)] (auto& ssts) mutable { - return parallel_for_each(std::move(ssts_info), [&ssts, cf, dir, pred = std::move(pred)] (auto& info) mutable { - if (!pred(info)) { - return make_ready_future<>(); - } - auto sst = cf->make_sstable(dir, info.generation, info.version, info.format); - return sst->load(std::move(info)).then([&ssts, sst] { - ssts.push_back(std::move(sst)); - return make_ready_future<>(); - }); - }).then([&ssts] () mutable { - return std::move(ssts); - }); - }); -} - -// make a set of sstables available at another shard. -static future<> forward_sstables_to(shard_id shard, sstring directory, std::vector sstables, global_column_family_ptr cf, - std::function (std::vector)> func) { - return seastar::async([sstables = std::move(sstables), directory, shard, cf, func = std::move(func)] () mutable { - auto infos = boost::copy_range>(sstables - | boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); })); - - smp::submit_to(shard, [cf, func, infos = std::move(infos), directory] () mutable { - return load_sstables_with_open_info(std::move(infos), cf, directory, [] (auto& p) { - return true; - }).then([func] (std::vector sstables) { - return func(std::move(sstables)); - }); - }).get(); - }); -} - -// Return all sstables that need resharding in the system. Only one instance of a shared sstable is returned. -static future> get_all_shared_sstables(distributed& db, sstring sstdir, global_column_family_ptr cf) { - class all_shared_sstables { - sstring _dir; - global_column_family_ptr _cf; - std::unordered_map _result; - public: - all_shared_sstables(const sstring& sstdir, global_column_family_ptr cf) : _dir(sstdir), _cf(cf) {} - - future<> operator()(std::vector ssts_info) { - return load_sstables_with_open_info(std::move(ssts_info), _cf, _dir, [this] (auto& info) { - // skip loading of shared sstable that is already stored in _result. - return !_result.count(info.generation); - }).then([this] (std::vector sstables) { - for (auto& sst : sstables) { - auto gen = sst->generation(); - _result.emplace(gen, std::move(sst)); - } - return make_ready_future<>(); - }); - } - - std::vector get() && { - return boost::copy_range>(std::move(_result) | boost::adaptors::map_values); - } - }; - - return db.map_reduce(all_shared_sstables(sstdir, cf), [cf, sstdir] (database& db) mutable { - return seastar::async([cf, sstdir] { - return boost::copy_range>(cf->sstables_need_rewrite() - | boost::adaptors::filtered([sstdir] (auto&& sst) { return sst->get_dir() == sstdir; }) - | boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); })); - }); - }); -} - template static inline future<> verification_error(fs::path path, const char* fstr, Args&&... args) { @@ -586,155 +505,6 @@ future<> distributed_loader::open_sstable(distributed& db, sstables::e }); } -// invokes each descriptor at its target shard, which involves forwarding sstables too. -static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, sstring directory, std::vector jobs, - std::function (std::vector, uint32_t level, uint64_t max_sstable_bytes)> func) { - return parallel_for_each(std::move(jobs), [cf, func, &directory] (sstables::resharding_descriptor& job) mutable { - return forward_sstables_to(job.reshard_at, directory, std::move(job.sstables), cf, - [cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) { - // compaction manager ensures that only one reshard operation will run per shard. - auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable { - return func(std::move(sstables), level, max_sstable_bytes); - }; - return cf->get_compaction_manager().run_custom_job(&*cf, "resharding", std::move(job)); - }); - }); -} - -static std::vector sstables_for_shard(const std::vector& sstables, shard_id shard) { - auto belongs_to_shard = [] (const sstables::shared_sstable& sst, unsigned shard) { - auto& shards = sst->get_shards_for_this_sstable(); - return boost::range::find(shards, shard) != shards.end(); - }; - - return boost::copy_range>(sstables - | boost::adaptors::filtered([&] (auto& sst) { return belongs_to_shard(sst, shard); })); -} - -void distributed_loader::reshard(distributed& db, sstring ks_name, sstring cf_name) { - assert(this_shard_id() == 0); // NOTE: should always run on shard 0! - - // ensures that only one column family is resharded at a time (that's okay because - // actual resharding is parallelized), and that's needed to prevent the same column - // family from being resharded in parallel (that could happen, for example, if - // refresh (triggers resharding) is issued by user while resharding is going on). - static semaphore sem(1); - - // FIXME: discarded future. - (void)with_semaphore(sem, 1, [&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable { - return seastar::async([&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable { - global_column_family_ptr cf(db, ks_name, cf_name); - - if (!cf->get_compaction_manager().enabled()) { - return; - } - // fast path to detect that this column family doesn't need reshard. - if (!worth_resharding(db, cf).get0()) { - dblog.debug("Nothing to reshard for {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name()); - return; - } - - parallel_for_each(cf->_config.all_datadirs, [&db, cf] (const sstring& directory) { - auto candidates = get_all_shared_sstables(db, directory, cf).get0(); - dblog.debug("{} candidates for resharding for {}.{}", candidates.size(), cf->schema()->ks_name(), cf->schema()->cf_name()); - auto jobs = cf->get_compaction_strategy().get_resharding_jobs(*cf, std::move(candidates)); - dblog.debug("{} resharding jobs for {}.{}", jobs.size(), cf->schema()->ks_name(), cf->schema()->cf_name()); - - return invoke_all_resharding_jobs(cf, directory, std::move(jobs), [directory, &cf] (auto sstables, auto level, auto max_sstable_bytes) { - // FIXME: run it in maintenance priority. - // Resharding, currently, cannot provide compaction with a snapshot of the sstable set - // which spans all shards that input sstables belong to, so expiration is disabled. - std::optional sstable_set = std::nullopt; - sstables::compaction_descriptor descriptor(sstables, std::move(sstable_set), service::get_local_compaction_priority(), - level, max_sstable_bytes); - descriptor.options = sstables::compaction_options::make_reshard(); - descriptor.creator = [&cf, directory] (shard_id shard) mutable { - // we need generation calculated by instance of cf at requested shard, - // or resource usage wouldn't be fairly distributed among shards. - auto gen = smp::submit_to(shard, [&cf] () { - return cf->calculate_generation_for_new_table(); - }).get0(); - - return cf->make_sstable(directory, gen, - cf->get_sstables_manager().get_highest_supported_format(), - sstables::sstable::format_types::big); - }; - auto f = sstables::compact_sstables(std::move(descriptor), *cf); - - return f.then([&cf, sstables = std::move(sstables), directory] (sstables::compaction_info info) mutable { - auto new_sstables = std::move(info.new_sstables); - // an input sstable may belong to shard 1 and 2 and only have data which - // token belongs to shard 1. That means resharding will only create a - // sstable for shard 1, but both shards opened the sstable. So our code - // below should ask both shards to remove the resharded table, or it - // wouldn't be deleted by our deletion manager, and resharding would be - // triggered again in the subsequent boot. - return parallel_for_each(boost::irange(0u, smp::count), [&cf, directory, sstables, new_sstables] (auto shard) { - auto old_sstables_for_shard = sstables_for_shard(sstables, shard); - // nothing to do if no input sstable belongs to this shard. - if (old_sstables_for_shard.empty()) { - return make_ready_future<>(); - } - auto new_sstables_for_shard = sstables_for_shard(new_sstables, shard); - // sanity checks - for (auto& sst : new_sstables_for_shard) { - auto& shards = sst->get_shards_for_this_sstable(); - if (shards.size() != 1) { - throw std::runtime_error(format("resharded sstable {} doesn't belong to only one shard", sst->get_filename())); - } - if (shards.front() != shard) { - throw std::runtime_error(format("resharded sstable {} should belong to shard {:d}", sst->get_filename(), shard)); - } - } - - std::unordered_set ancestors; - boost::range::transform(old_sstables_for_shard, std::inserter(ancestors, ancestors.end()), - std::mem_fn(&sstables::sstable::generation)); - - if (new_sstables_for_shard.empty()) { - // handles case where sstable needing rewrite doesn't produce any sstable - // for a shard it belongs to when resharded (the reason is explained above). - return smp::submit_to(shard, [cf, ancestors = std::move(ancestors)] () mutable { - return cf->remove_ancestors_needed_rewrite(ancestors); - }); - } else { - return forward_sstables_to(shard, directory, new_sstables_for_shard, cf, [cf, ancestors = std::move(ancestors)] (std::vector sstables) mutable { - return cf->replace_ancestors_needed_rewrite(std::move(ancestors), std::move(sstables)); - }); - } - }).then([&cf, sstables] { - // schedule deletion of shared sstables after we're certain that new unshared ones were successfully forwarded to respective shards. - (void)sstables::delete_atomically(sstables).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) { - try { - std::rethrow_exception(eptr); - } catch (...) { - dblog.warn("Exception in resharding when deleting sstable file: {}", eptr); - } - }).then([cf, sstables = std::move(sstables)] { - // Refresh cache's snapshot of shards involved in resharding to prevent the cache from - // holding reference to deleted files which results in disk space not being released. - std::unordered_set owner_shards; - for (auto& sst : sstables) { - const auto& shards = sst->get_shards_for_this_sstable(); - owner_shards.insert(shards.begin(), shards.end()); - if (owner_shards.size() == smp::count) { - break; - } - } - return parallel_for_each(std::move(owner_shards), [cf] (shard_id shard) { - return smp::submit_to(shard, [cf] () mutable { - cf->_cache.refresh_snapshot(); - }); - }); - }); - }); - }); - }); - }).get(); - }); - }); -} - future distributed_loader::probe_file(distributed& db, sstring sstdir, sstring fname) { using namespace sstables; @@ -856,92 +626,6 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele }); } -future<> distributed_loader::do_populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { - // We can catch most errors when we try to load an sstable. But if the TOC - // file is the one missing, we won't try to load the sstable at all. This - // case is still an invalid case, but it is way easier for us to treat it - // by waiting for all files to be loaded, and then checking if we saw a - // file during scan_dir, without its corresponding TOC. - enum class component_status { - has_some_file, - has_toc_file, - has_temporary_toc_file, - }; - - struct sstable_descriptor { - component_status status; - sstables::sstable::version_types version; - sstables::sstable::format_types format; - }; - - auto verifier = make_lw_shared>(); - - return do_with(std::vector>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector>& futures) { - return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) { - // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") - - // push future returned by probe_file into an array of futures, - // so that the supplied callback will not block scan_dir() from - // reading the next entry in the directory. - auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) { - if (entry.component == component_type::TemporaryStatistics) { - return remove_file(sstables::sstable::filename(sstdir.native(), entry.ks, entry.cf, entry.version, entry.generation, - entry.format, component_type::TemporaryStatistics)); - } - - if (verifier->count(entry.generation)) { - if (verifier->at(entry.generation).status == component_status::has_toc_file) { - fs::path file_path(sstdir / de.name); - if (entry.component == component_type::TOC) { - throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed", file_path.native()); - } else if (entry.component == component_type::TemporaryTOC) { - throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed", file_path.native()); - } - } else if (entry.component == component_type::TOC) { - verifier->at(entry.generation).status = component_status::has_toc_file; - } else if (entry.component == component_type::TemporaryTOC) { - verifier->at(entry.generation).status = component_status::has_temporary_toc_file; - } - } else { - if (entry.component == component_type::TOC) { - verifier->emplace(entry.generation, sstable_descriptor{component_status::has_toc_file, entry.version, entry.format}); - } else if (entry.component == component_type::TemporaryTOC) { - verifier->emplace(entry.generation, sstable_descriptor{component_status::has_temporary_toc_file, entry.version, entry.format}); - } else { - verifier->emplace(entry.generation, sstable_descriptor{component_status::has_some_file, entry.version, entry.format}); - } - } - return make_ready_future<>(); - }); - - futures.push_back(std::move(f)); - - return make_ready_future<>(); - }, &sstables::manifest_json_filter).then([&futures] { - return execute_futures(futures); - }).then([verifier, sstdir, ks = std::move(ks), cf = std::move(cf)] { - return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), verifier] (auto v) { - if (v.second.status == component_status::has_temporary_toc_file) { - unsigned long gen = v.first; - sstables::sstable::version_types version = v.second.version; - sstables::sstable::format_types format = v.second.format; - - if (this_shard_id() != 0) { - dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first); - return make_ready_future<>(); - } - // shard 0 is the responsible for removing a partial sstable. - return sstables::sstable::remove_sstable_with_temp_toc(ks, cf, sstdir, gen, version, format); - } else if (v.second.status != component_status::has_toc_file) { - throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable with generation {:d}!. Refusing to boot", sstdir, v.first)); - } - return make_ready_future<>(); - }); - }); - }); - -} - future<> distributed_loader::populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] { assert(this_shard_id() == 0); @@ -952,8 +636,61 @@ future<> distributed_loader::populate_column_family(distributed& db, s if (exists) { handle_sstables_pending_delete(pending_delete_dir).get(); } - // Second pass, cleanup sstables with temporary TOCs and load the rest. - do_populate_column_family(db, std::move(sstdir), std::move(ks), std::move(cf)).get(); + + global_column_family_ptr global_table(db, ks, cf); + + sharded directory; + directory.start(fs::path(sstdir), db.local().get_config().initial_sstable_loading_concurrency(), + sstables::sstable_directory::need_mutate_level::yes, + sstables::sstable_directory::lack_of_toc_fatal::yes, + sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()), + sstables::sstable_directory::allow_loading_materialized_view::yes, + [&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) { + return global_table->make_sstable(dir.native(), gen, v, f); + }).get(); + + auto stop = defer([&directory] { + directory.stop().get(); + }); + + lock_table(directory, db, ks, cf).get(); + process_sstable_dir(directory).get(); + + // If we are resharding system tables before we can read them, we will not + // know which is the highest format we support: this information is itself stored + // in the system tables. In that case we'll rely on what we find on disk: we'll + // at least not downgrade any files. If we already know that we support a higher + // format than the one we see then we use that. + auto sys_format = global_table->get_sstables_manager().get_highest_supported_format(); + auto sst_version = highest_version_seen(directory, sys_format).get0(); + auto generation = highest_generation_seen(directory).get0(); + + db.invoke_on_all([&global_table, generation] (database& db) { + global_table->update_sstables_known_generation(generation); + global_table->disable_auto_compaction(); + return make_ready_future<>(); + }).get(); + + reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable { + auto gen = smp::submit_to(shard, [&global_table] () { + return global_table->calculate_generation_for_new_table(); + }).get0(); + + return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big); + }).get(); + + // The node is offline at this point so we are very lenient with what we consider + // offstrategy. + reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) { + auto gen = global_table->calculate_generation_for_new_table(); + return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big); + }).get(); + + directory.invoke_on_all([global_table] (sstables::sstable_directory& dir) { + return dir.do_for_each_sstable([&global_table] (sstables::shared_sstable sst) { + return global_table->add_sstable_and_update_cache(sst); + }); + }).get(); }); } diff --git a/distributed_loader.hh b/distributed_loader.hh index 76f8f28626..bf177736bb 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -65,7 +65,6 @@ public: static future<> process_sstable_dir(sharded& dir); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); - static void reshard(distributed& db, sstring ks_name, sstring cf_name); static future<> open_sstable(distributed& db, sstables::entry_descriptor comps, std::function (column_family&, sstables::foreign_sstable_open_info)> func, const io_priority_class& pc = default_priority_class()); diff --git a/main.cc b/main.cc index 6872aeec67..e06449a57e 100644 --- a/main.cc +++ b/main.cc @@ -762,6 +762,11 @@ int main(int ac, char** av) { dirs.init(*cfg, bool(hinted_handoff_enabled)).get(); + // We need the compaction manager ready early so we can reshard. + db.invoke_on_all([&proxy, &stop_signal] (database& db) { + db.get_compaction_manager().enable(); + }).get(); + // Initialization of a keyspace is done by shard 0 only. For system // keyspace, the procedure will go through the hardcoded column // families, and in each of them, it will load the sstables for all @@ -926,8 +931,11 @@ int main(int ac, char** av) { } } - db.invoke_on_all([&proxy] (database& db) { - db.get_compaction_manager().enable(); + db.invoke_on_all([] (database& db) { + for (auto& x : db.get_column_families()) { + table& t = *(x.second); + t.enable_auto_compaction(); + } }).get(); // If the same sstable is shared by several shards, it cannot be @@ -941,13 +949,6 @@ int main(int ac, char** av) { // group that was effectively used in the bulk of it (compaction). Soon it will become // streaming - with_scheduling_group(dbcfg.compaction_scheduling_group, [&db] { - for (auto& x : db.local().get_column_families()) { - column_family& cf = *(x.second); - distributed_loader::reshard(db, cf.schema()->ks_name(), cf.schema()->cf_name()); - } - }).get(); - db.invoke_on_all([&proxy] (database& db) { for (auto& x : db.get_column_families()) { column_family& cf = *(x.second); diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 94487c8173..05f6e1150f 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -390,34 +390,6 @@ public: } }; -// Resharding doesn't really belong into any strategy, because it is not worried about laying out -// SSTables according to any strategy-specific criteria. So we will just make it proportional to -// the amount of data we still have to reshard. -// -// Although at first it may seem like we could improve this by tracking the ongoing reshard as well -// and reducing the backlog as we compact, that is not really true. Resharding is not really -// expected to get rid of data and it is usually just splitting data among shards. Whichever backlog -// we get rid of by tracking the compaction will come back as a big spike as we add this SSTable -// back to their rightful shard owners. -// -// So because the data is supposed to be constant, we will just add the total amount of data as the -// backlog. -class resharding_backlog_tracker final : public compaction_backlog_tracker::impl { - uint64_t _total_bytes = 0; -public: - virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { - return _total_bytes; - } - - virtual void add_sstable(sstables::shared_sstable sst) override { - _total_bytes += sst->data_size(); - } - - virtual void remove_sstable(sstables::shared_sstable sst) override { - _total_bytes -= sst->data_size(); - } -}; - class compaction { protected: column_family& _cf; @@ -1247,7 +1219,6 @@ flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_co class resharding_compaction final : public compaction { shard_id _shard; // shard of current sstable writer - compaction_backlog_tracker _resharding_backlog_tracker; // Partition count estimation for a shard S: // @@ -1277,14 +1248,10 @@ private: public: resharding_compaction(column_family& cf, sstables::compaction_descriptor descriptor) : compaction(cf, std::move(descriptor)) - , _resharding_backlog_tracker(std::make_unique()) , _estimation_per_shard(smp::count) , _run_identifiers(smp::count) { - cf.get_compaction_manager().register_backlog_tracker(_resharding_backlog_tracker); for (auto& sst : _sstables) { - _resharding_backlog_tracker.add_sstable(sst); - const auto& shards = sst->get_shards_for_this_sstable(); auto size = sst->bytes_on_disk(); auto estimated_partitions = sst->get_estimated_key_count(); @@ -1299,11 +1266,7 @@ public: _info->type = compaction_type::Reshard; } - ~resharding_compaction() { - for (auto& s : _sstables) { - _resharding_backlog_tracker.remove_sstable(s); - } - } + ~resharding_compaction() { } // Use reader that makes sure no non-local mutation will not be filtered out. flat_mutation_reader make_sstable_reader() const override { @@ -1346,6 +1309,7 @@ public: sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer(); cfg.max_sstable_size = _max_sstable_size; + cfg.monitor = &default_write_monitor(); // sstables generated for a given shard will share the same run identifier. cfg.run_identifier = _run_identifiers.at(shard); return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(shard), cfg, get_encoding_stats(), _io_priority, shard), sst}; diff --git a/sstables/compaction.hh b/sstables/compaction.hh index bbc3531363..70516228e2 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -50,13 +50,6 @@ namespace sstables { friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput); }; - struct resharding_descriptor { - std::vector sstables; - uint64_t max_sstable_bytes; - shard_id reshard_at; - uint32_t level; - }; - static inline sstring compaction_name(compaction_type type) { switch (type) { case compaction_type::Compaction: diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 15652b95dc..dd606e97e6 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -460,19 +460,6 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold; } -std::vector -compaction_strategy_impl::get_resharding_jobs(column_family& cf, std::vector candidates) { - std::vector jobs; - shard_id reshard_at_current = 0; - - clogger.debug("Trying to get resharding jobs for {}.{}...", cf.schema()->ks_name(), cf.schema()->cf_name()); - for (auto& candidate : candidates) { - auto level = candidate->get_sstable_level(); - jobs.push_back(resharding_descriptor{{std::move(candidate)}, std::numeric_limits::max(), reshard_at_current++ % smp::count, level}); - } - return jobs; -} - uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) { return partition_estimate; } @@ -984,10 +971,6 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(column_famil return _compaction_strategy_impl->get_major_compaction_job(cf, std::move(candidates)); } -std::vector compaction_strategy::get_resharding_jobs(column_family& cf, std::vector candidates) { - return _compaction_strategy_impl->get_resharding_jobs(cf, std::move(candidates)); -} - void compaction_strategy::notify_completion(const std::vector& removed, const std::vector& added) { _compaction_strategy_impl->notify_completion(removed, added); } diff --git a/sstables/compaction_strategy_impl.hh b/sstables/compaction_strategy_impl.hh index 4b93b825fc..f5c119e3c8 100644 --- a/sstables/compaction_strategy_impl.hh +++ b/sstables/compaction_strategy_impl.hh @@ -73,7 +73,6 @@ public: virtual ~compaction_strategy_impl() {} virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector candidates) = 0; virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector candidates); - virtual std::vector get_resharding_jobs(column_family& cf, std::vector candidates); virtual void notify_completion(const std::vector& removed, const std::vector& added) { } virtual compaction_strategy_type type() const = 0; virtual bool parallel_compaction() const { diff --git a/sstables/leveled_compaction_strategy.cc b/sstables/leveled_compaction_strategy.cc index a3ae5a3562..b562dc28ab 100644 --- a/sstables/leveled_compaction_strategy.cc +++ b/sstables/leveled_compaction_strategy.cc @@ -76,39 +76,6 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(colu sst->get_sstable_level(), _max_sstable_size_in_mb*1024*1024); } -std::vector leveled_compaction_strategy::get_resharding_jobs(column_family& cf, std::vector candidates) { - leveled_manifest manifest = leveled_manifest::create(cf, candidates, _max_sstable_size_in_mb, _stcs_options); - - std::vector descriptors; - shard_id target_shard = 0; - auto get_shard = [&target_shard] { return target_shard++ % smp::count; }; - - // Basically, we'll iterate through all levels, and for each, we'll sort the - // sstables by first key because there's a need to reshard together adjacent - // sstables. - // The shard at which the job will run is chosen in a round-robin fashion. - for (auto level = 0U; level <= manifest.get_level_count(); level++) { - uint64_t max_sstable_size = !level ? std::numeric_limits::max() : (_max_sstable_size_in_mb*1024*1024); - auto& sstables = manifest.get_level(level); - boost::sort(sstables, [] (auto& i, auto& j) { - return i->compare_by_first_key(*j) < 0; - }); - - resharding_descriptor current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level}; - - for (auto it = sstables.begin(); it != sstables.end(); it++) { - current_descriptor.sstables.push_back(*it); - - auto next = std::next(it); - if (current_descriptor.sstables.size() == smp::count || next == sstables.end()) { - descriptors.push_back(std::move(current_descriptor)); - current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level}; - } - } - } - return descriptors; -} - void leveled_compaction_strategy::notify_completion(const std::vector& removed, const std::vector& added) { if (removed.empty() || added.empty()) { return; diff --git a/sstables/leveled_compaction_strategy.hh b/sstables/leveled_compaction_strategy.hh index 273a6a6615..82656667f9 100644 --- a/sstables/leveled_compaction_strategy.hh +++ b/sstables/leveled_compaction_strategy.hh @@ -41,8 +41,6 @@ public: virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector candidates) override; - virtual std::vector get_resharding_jobs(column_family& cf, std::vector candidates) override; - virtual void notify_completion(const std::vector& removed, const std::vector& added) override; // for each level > 0, get newest sstable and use its last key as last diff --git a/test/boost/sstable_resharding_test.cc b/test/boost/sstable_resharding_test.cc index 6bf1626c90..6809093335 100644 --- a/test/boost/sstable_resharding_test.cc +++ b/test/boost/sstable_resharding_test.cc @@ -133,43 +133,6 @@ SEASTAR_TEST_CASE(sstable_resharding_test) { }); } -SEASTAR_THREAD_TEST_CASE(sstable_resharding_strategy_tests) { - test_env env; - - for (const auto version : all_sstable_versions) { - auto s = make_lw_shared(schema({}, "ks", "cf", {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); - auto get_sstable = [&] (int64_t gen, sstring first_key, sstring last_key) mutable { - auto sst = env.make_sstable(s, "", gen, version, sstables::sstable::format_types::big); - stats_metadata stats = {}; - stats.sstable_level = 1; - sstables::test(sst).set_values(std::move(first_key), std::move(last_key), std::move(stats)); - return sst; - }; - - column_family_for_tests cf; - - auto tokens = token_generation_for_current_shard(2); - auto stcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options()); - auto lcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - - auto sst1 = get_sstable(1, tokens[0].first, tokens[1].first); - auto sst2 = get_sstable(2, tokens[1].first, tokens[1].first); - - { - // TODO: sstable_test runs with smp::count == 1, thus we will not be able to stress it more - // until we move this test case to sstable_resharding_test. - auto descriptors = stcs.get_resharding_jobs(*cf, { sst1, sst2 }); - BOOST_REQUIRE(descriptors.size() == 2); - } - { - auto ssts = std::vector{ sst1, sst2 }; - auto descriptors = lcs.get_resharding_jobs(*cf, ssts); - auto expected_jobs = (ssts.size()+smp::count-1)/smp::count; - BOOST_REQUIRE(descriptors.size() == expected_jobs); - } - } -} - SEASTAR_TEST_CASE(sstable_is_shared_correctness) { return test_env::do_with_async([] (test_env& env) { for (const auto version : all_sstable_versions) {