Merge "Reshape upload files and reshard+reshape at boot" from Glauber

"

This patchset adds a reshape operation to each compaction strategy;
that is a strategy-specific way of detecting if SSTables are in-strategy
or off-strategy, and in case they are offstrategy moving them to in-strategy.

Often times the number of SSTables in a particular slice of the sstable set
matters for that decision (number of SSTables in the same time window for TWCS,
number of SSTables per tier for STCS, number of L0 SSTables for LCS). We want
to be more lenient for operations that keep the node offline, like reshape at
boot, but more forgiving for operations like upload, which run in maintenance
mode. To accomodate for that the threshold for considering a slice of the SSTable
set offstrategy is passed as a parameter

Once this patchset is applied, the upload directory will reshape the SSTables
before moving them to the main directory (if needed). One side effect of it
is that it is no longer necessary to take locks for the refresh operation nor
disable writes in the table.

With the infrastructure that we have built in the upload directory, we can
apply the same set of steps to populate_column_family. Using the sstable_directory
to scan the files we can reshard and reshape (usually if we resharded a reshape
will be necessary) with the node still offline. This has the benefit of never
adding shared SSTables to the table.

Applying this patchset will unlock a host of cleanups:
- we can get rid of all testing for shared sstables, sstable_need_rewrite, etc.
- we can remove the resharding backlog tracker.

and many others. Most cleanups are deferred for a later patchset, though.
"

* 'reshard-reshape-v4' of github.com:glommer/scylla:
  distributed_loader: reshard before the node is made online
  distributed_loader: rework uploading of SSTables
  sstable_directory: add helper to reshape existing unshared sstables
  compaction_strategy: add method to reshape SSTables
  compaction: add a new compaction type, Reshape
  compaction: add a size and throught pretty printer.
  compaction: add default implementation for some pure functions
  tests: fix fragile database tests
  distributed_loader.cc: add a helper function to extract the highest SSTable version found
  distributed_loader.cc : extract highest_generation_seen code
  compaction_manager: rename run_resharding_job
  distributed_loader: assume populate_column_families is run in shard 0
  api: do not allow user to meddle with auto compaction too early
  upload: use custom error handler for upload directory
  sstable_directory: fix debug message
This commit is contained in:
Avi Kivity
2020-06-18 17:04:53 +03:00
29 changed files with 723 additions and 924 deletions

View File

@@ -87,20 +87,13 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
};
}
future<> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
future<json::json_return_type> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
if (tables.empty()) {
tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}
return ctx.db.invoke_on_all([keyspace, tables, enabled] (database& db) {
return parallel_for_each(tables, [&db, keyspace, enabled](const sstring& table) mutable {
column_family& cf = db.find_column_family(keyspace, table);
if (enabled) {
cf.enable_auto_compaction();
} else {
cf.disable_auto_compaction();
}
return make_ready_future<>();
});
return service::get_local_storage_service().set_tables_autocompaction(keyspace, tables, enabled).then([]{
return make_ready_future<json::json_return_type>(json_void());
});
}
@@ -742,17 +735,15 @@ void set_storage_service(http_context& ctx, routes& r) {
ss::enable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));
return set_tables_autocompaction(ctx, keyspace, tables, true).then([]{
return make_ready_future<json::json_return_type>(json_void());
});
return set_tables_autocompaction(ctx, keyspace, tables, true);
});
ss::disable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));
return set_tables_autocompaction(ctx, keyspace, tables, false).then([]{
return make_ready_future<json::json_return_type>(json_void());
});
return set_tables_autocompaction(ctx, keyspace, tables, false);
});
ss::deliver_hints.set(r, [](std::unique_ptr<request> req) {

View File

@@ -23,6 +23,7 @@
#include <seastar/core/future.hh>
#include <seastar/util/noncopyable_function.hh>
#include <seastar/core/file.hh>
#include "schema_fwd.hh"
#include "sstables/shared_sstable.hh"
@@ -62,8 +63,6 @@ public:
compaction_descriptor get_major_compaction_job(column_family& cf, std::vector<shared_sstable> candidates);
std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates);
// Some strategies may look at the compacted and resulting sstables to
// get some useful information for subsequent compactions.
void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added);
@@ -135,6 +134,20 @@ public:
// Returns whether or not interposer consumer is used by a given strategy.
bool use_interposer_consumer() const;
// Informs the caller (usually the compaction manager) about what would it take for this set of
// SSTables closer to becoming in-strategy. If this returns an empty compaction descriptor, this
// means that the sstable set is already in-strategy.
//
// The caller can specify one of two modes: strict or relaxed. In relaxed mode the tolerance for
// what is considered offstrategy is higher. It can be used, for instance, for when the system
// is restarting and previous compactions were likely in-flight. In strict mode, we are less
// tolerant to invariant breakages.
//
// The caller should also pass a maximum number of SSTables which is the maximum amount of
// SSTables that can be added into a single job.
compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode);
};
// Creates a compaction_strategy object from one of the strategies available.

View File

@@ -32,4 +32,5 @@ enum class compaction_strategy_type {
time_window,
};
enum class reshape_mode { strict, relaxed };
}

View File

@@ -489,11 +489,7 @@ private:
// This semaphore ensures that an operation like snapshot won't have its selected
// sstables deleted by compaction in parallel, a race condition which could
// easily result in failure.
// Locking order: must be acquired either independently or after _sstables_lock
seastar::named_semaphore _sstable_deletion_sem = {1, named_semaphore_exception_factory{"sstable deletion"}};
// There are situations in which we need to stop writing sstables. Flushers will take
// the read lock, and the ones that wish to stop that process will take the write lock.
rwlock _sstables_lock;
mutable row_cache _cache; // Cache covers only sstables.
std::optional<int64_t> _sstable_generation = {};
@@ -820,16 +816,6 @@ public:
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
// Important warning: disabling writes will only have an effect in the current shard.
// The other shards will keep writing tables at will. Therefore, you very likely need
// to call this separately in all shards first, to guarantee that none of them are writing
// new data before you can safely assume that the whole node is disabled.
future<int64_t> disable_sstable_write();
// SSTable writes are now allowed again, and generation is updated to new_generation if != -1
// returns the amount of microseconds elapsed since we disabled writes.
std::chrono::steady_clock::duration enable_sstable_write(int64_t new_generation);
// Make sure the generation numbers are sequential, starting from "start".
// Generations before "start" are left untouched.
//

View File

@@ -724,6 +724,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.")
, max_memory_for_unlimited_query(this, "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, size_t(1) << 20,
"Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries.")
, initial_sstable_loading_concurrency(this, "initial_sstable_loading_concurrency", value_status::Used, 4u,
"Maximum amount of sstables to load in parallel during initialization. A higher number can lead to more memory consumption. You should not need to touch this")
, enable_3_1_0_compatibility_mode(this, "enable_3_1_0_compatibility_mode", value_status::Used, false,
"Set to true if the cluster was initially installed from 3.1.0. If it was upgraded from an earlier version,"
" or installed from a later version, leave this set to false. This adjusts the communication protocol to"

View File

@@ -304,6 +304,7 @@ public:
named_value<uint32_t> max_partition_key_restrictions_per_query;
named_value<uint32_t> max_clustering_key_restrictions_per_query;
named_value<uint64_t> max_memory_for_unlimited_query;
named_value<unsigned> initial_sstable_loading_concurrency;
named_value<bool> enable_3_1_0_compatibility_mode;
named_value<bool> enable_user_defined_functions;
named_value<unsigned> user_defined_function_time_limit_ms;

View File

@@ -79,6 +79,10 @@ static io_error_handler error_handler_for_upload_dir() {
};
}
io_error_handler error_handler_gen_for_upload_dir(disk_error_signal_type& dummy) {
return error_handler_for_upload_dir();
}
// TODO: possibly move it to seastar
template <typename Service, typename PtrType, typename Func>
static future<> invoke_shards_with_ptr(std::unordered_set<shard_id> shards, distributed<Service>& s, PtrType ptr, Func&& func) {
@@ -109,87 +113,6 @@ public:
}
};
// checks whether or not a given column family is worth resharding by checking if any of its
// sstables has more than one owner shard.
static future<bool> worth_resharding(distributed<database>& db, global_column_family_ptr cf) {
auto has_shared_sstables = [cf] (database& db) {
return cf->has_shared_sstables();
};
return db.map_reduce0(has_shared_sstables, bool(false), std::logical_or<bool>());
}
static future<std::vector<sstables::shared_sstable>>
load_sstables_with_open_info(std::vector<sstables::foreign_sstable_open_info> ssts_info, global_column_family_ptr cf, sstring dir,
noncopyable_function<bool (const sstables::foreign_sstable_open_info&)> pred) {
return do_with(std::vector<sstables::shared_sstable>(), [ssts_info = std::move(ssts_info), cf, dir, pred = std::move(pred)] (auto& ssts) mutable {
return parallel_for_each(std::move(ssts_info), [&ssts, cf, dir, pred = std::move(pred)] (auto& info) mutable {
if (!pred(info)) {
return make_ready_future<>();
}
auto sst = cf->make_sstable(dir, info.generation, info.version, info.format);
return sst->load(std::move(info)).then([&ssts, sst] {
ssts.push_back(std::move(sst));
return make_ready_future<>();
});
}).then([&ssts] () mutable {
return std::move(ssts);
});
});
}
// make a set of sstables available at another shard.
static future<> forward_sstables_to(shard_id shard, sstring directory, std::vector<sstables::shared_sstable> sstables, global_column_family_ptr cf,
std::function<future<> (std::vector<sstables::shared_sstable>)> func) {
return seastar::async([sstables = std::move(sstables), directory, shard, cf, func = std::move(func)] () mutable {
auto infos = boost::copy_range<std::vector<sstables::foreign_sstable_open_info>>(sstables
| boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); }));
smp::submit_to(shard, [cf, func, infos = std::move(infos), directory] () mutable {
return load_sstables_with_open_info(std::move(infos), cf, directory, [] (auto& p) {
return true;
}).then([func] (std::vector<sstables::shared_sstable> sstables) {
return func(std::move(sstables));
});
}).get();
});
}
// Return all sstables that need resharding in the system. Only one instance of a shared sstable is returned.
static future<std::vector<sstables::shared_sstable>> get_all_shared_sstables(distributed<database>& db, sstring sstdir, global_column_family_ptr cf) {
class all_shared_sstables {
sstring _dir;
global_column_family_ptr _cf;
std::unordered_map<int64_t, sstables::shared_sstable> _result;
public:
all_shared_sstables(const sstring& sstdir, global_column_family_ptr cf) : _dir(sstdir), _cf(cf) {}
future<> operator()(std::vector<sstables::foreign_sstable_open_info> ssts_info) {
return load_sstables_with_open_info(std::move(ssts_info), _cf, _dir, [this] (auto& info) {
// skip loading of shared sstable that is already stored in _result.
return !_result.count(info.generation);
}).then([this] (std::vector<sstables::shared_sstable> sstables) {
for (auto& sst : sstables) {
auto gen = sst->generation();
_result.emplace(gen, std::move(sst));
}
return make_ready_future<>();
});
}
std::vector<sstables::shared_sstable> get() && {
return boost::copy_range<std::vector<sstables::shared_sstable>>(std::move(_result) | boost::adaptors::map_values);
}
};
return db.map_reduce(all_shared_sstables(sstdir, cf), [cf, sstdir] (database& db) mutable {
return seastar::async([cf, sstdir] {
return boost::copy_range<std::vector<sstables::foreign_sstable_open_info>>(cf->sstables_need_rewrite()
| boost::adaptors::filtered([sstdir] (auto&& sst) { return sst->get_dir() == sstdir; })
| boost::adaptors::transformed([] (auto&& sst) { return sst->get_open_info().get0(); }));
});
});
}
template <typename... Args>
static inline
future<> verification_error(fs::path path, const char* fstr, Args&&... args) {
@@ -343,9 +266,8 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
}
return do_with(std::move(reshard_jobs), [&dir, &db, ks_name, table_name, creator = std::move(creator), total_size] (std::vector<reshard_shard_descriptor>& reshard_jobs) {
auto total_size_mb = total_size / 1000000.0;
auto start = std::chrono::steady_clock::now();
dblog.info("{}", fmt::format("Resharding {:.2f} MB", total_size_mb));
dblog.info("{}", fmt::format("Resharding {} ", sstables::pretty_printed_data_size(total_size)));
return dir.invoke_on_all([&dir, &db, &reshard_jobs, ks_name, table_name, creator] (sstables::sstable_directory& d) mutable {
auto& table = db.local().find_column_family(ks_name, table_name);
@@ -356,11 +278,9 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
return d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop).then([&d, &dir] {
return d.move_foreign_sstables(dir);
});
}).then([start, total_size_mb] {
// add a microsecond to prevent division by zero
auto now = std::chrono::steady_clock::now() + 1us;
auto seconds = std::chrono::duration_cast<std::chrono::duration<float>>(now - start).count();
dblog.info("{}", fmt::format("Resharded {:.2f} MB in {:.2f} seconds, {:.2f} MB/s", total_size_mb, seconds, (total_size_mb / seconds)));
}).then([start, total_size] {
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("{}", fmt::format("Resharded {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration)));
return make_ready_future<>();
});
});
@@ -380,12 +300,99 @@ distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<d
});
}
future<int64_t>
highest_generation_seen(sharded<sstables::sstable_directory>& directory) {
return directory.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_generation_seen), int64_t(0), [] (int64_t a, int64_t b) {
return std::max(a, b);
});
}
future<sstables::sstable::version_types>
highest_version_seen(sharded<sstables::sstable_directory>& dir, sstables::sstable_version_types system_version) {
using version = sstables::sstable_version_types;
return dir.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_version_seen), system_version, [] (version a, version b) {
return sstables::is_later(a, b) ? a : b;
});
}
future<>
distributed_loader::process_upload_dir(distributed<database>& db, sstring ks, sstring cf) {
distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstables::reshape_mode mode,
sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) {
auto start = std::chrono::steady_clock::now();
return dir.map_reduce0([&dir, &db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode] (sstables::sstable_directory& d) {
auto& table = db.local().find_column_family(ks_name, table_name);
auto& cm = table.get_compaction_manager();
auto& iop = service::get_local_streaming_read_priority();
return d.reshape(cm, table, creator, iop, mode);
}, uint64_t(0), std::plus<uint64_t>()).then([start] (uint64_t total_size) {
if (total_size > 0) {
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("{}", fmt::format("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration)));
}
return make_ready_future<>();
});
}
// Loads SSTables into the main directory (or staging) and returns how many were loaded
future<size_t>
distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded<database>& db,
sharded<db::view::view_update_generator>& view_update_generator, fs::path datadir, sstring ks, sstring cf) {
auto& table = db.local().find_column_family(ks, cf);
return do_with(dht::ring_position::max(), dht::ring_position::min(), [&table, &dir, &view_update_generator, datadir = std::move(datadir)] (dht::ring_position& min, dht::ring_position& max) {
return dir.do_for_each_sstable([&table, datadir = std::move(datadir), &min, &max] (sstables::shared_sstable sst) {
min = std::min(dht::ring_position(sst->get_first_decorated_key()), min, dht::ring_position_less_comparator(*table.schema()));
max = std::max(dht::ring_position(sst->get_last_decorated_key()) , max, dht::ring_position_less_comparator(*table.schema()));
auto gen = table.calculate_generation_for_new_table();
dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), datadir.native(), gen);
return sst->move_to_new_dir(datadir.native(), gen, true).then([&table, sst] {
table._sstables_opened_but_not_loaded.push_back(std::move(sst));
return make_ready_future<>();
});
}).then([&table, &min, &max] {
// nothing loaded
if (min.is_max() && max.is_min()) {
return make_ready_future<>();
}
return table.get_row_cache().invalidate([&table] () noexcept {
for (auto& sst : table._sstables_opened_but_not_loaded) {
try {
table.load_sstable(sst, true);
} catch (...) {
dblog.error("Failed to load {}: {}. Aborting.", sst->toc_filename(), std::current_exception());
abort();
}
}
}, dht::partition_range::make({min, true}, {max, true}));
}).then([&view_update_generator, &table] {
return parallel_for_each(table._sstables_opened_but_not_loaded, [&view_update_generator, &table] (sstables::shared_sstable& sst) {
if (sst->requires_view_building()) {
return view_update_generator.local().register_staging_sstable(sst, table.shared_from_this());
}
return make_ready_future<>();
});
}).then_wrapped([&table] (future<> f) {
auto opened = std::exchange(table._sstables_opened_but_not_loaded, {});
if (!f.failed()) {
return make_ready_future<size_t>(opened.size());
} else {
return make_exception_future<size_t>(f.get_exception());
}
});
});
}
future<>
distributed_loader::process_upload_dir(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
distributed<db::view::view_update_generator>& view_update_generator, sstring ks, sstring cf) {
seastar::thread_attributes attr;
attr.sched_group = db.local().get_streaming_scheduling_group();
return seastar::async(std::move(attr), [&db, ks = std::move(ks), cf = std::move(cf)] {
return seastar::async(std::move(attr), [&db, &view_update_generator, &sys_dist_ks, ks = std::move(ks), cf = std::move(cf)] {
global_column_family_ptr global_table(db, ks, cf);
sharded<sstables::sstable_directory> directory;
@@ -396,7 +403,7 @@ distributed_loader::process_upload_dir(distributed<database>& db, sstring ks, ss
sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()),
sstables::sstable_directory::allow_loading_materialized_view::no,
[&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return global_table->make_sstable(dir.native(), gen, v, f);
return global_table->make_sstable(dir.native(), gen, v, f, &error_handler_gen_for_upload_dir);
}).get();
@@ -406,13 +413,9 @@ distributed_loader::process_upload_dir(distributed<database>& db, sstring ks, ss
lock_table(directory, db, ks, cf).get();
process_sstable_dir(directory).get();
auto highest_generation_seen = directory.map_reduce0(
std::mem_fn(&sstables::sstable_directory::highest_generation_seen),
int64_t(0),
[] (int64_t a, int64_t b) { return std::max(a, b); }
).get0();
auto shard_generation_base = highest_generation_seen / smp::count + 1;
auto generation = highest_generation_seen(directory).get0();
auto shard_generation_base = generation / smp::count + 1;
// We still want to do our best to keep the generation numbers shard-friendly.
// Each destination shard will manage its own generation counter.
@@ -427,80 +430,30 @@ distributed_loader::process_upload_dir(distributed<database>& db, sstring ks, ss
return global_table->make_sstable(upload.native(), gen,
global_table->get_sstables_manager().get_highest_supported_format(),
sstables::sstable::format_types::big);
sstables::sstable::format_types::big, &error_handler_gen_for_upload_dir);
}).get();
});
}
// This function will iterate through upload directory in column family,
// and will do the following for each sstable found:
// 1) Mutate sstable level to 0.
// 2) Check if view updates need to be generated from this sstable. If so, leave it intact for now.
// 3) Otherwise, create hard links to its components in column family dir.
// 4) Remove all of its components in upload directory.
// At the end, it's expected that upload dir contains only staging sstables
// which need to wait until view updates are generated from them.
//
// Return a vector containing descriptor of sstables to be loaded.
future<std::vector<sstables::entry_descriptor>>
distributed_loader::flush_upload_dir(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks, sstring ks_name, sstring cf_name) {
return seastar::async([&db, &sys_dist_ks, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] {
std::unordered_map<int64_t, sstables::entry_descriptor> descriptors;
std::vector<sstables::entry_descriptor> flushed;
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [global_table, upload, &shard_gen] (shard_id shard) {
auto gen = shard_gen[shard].fetch_add(smp::count, std::memory_order_relaxed);
return global_table->make_sstable(upload.native(), gen,
global_table->get_sstables_manager().get_highest_supported_format(),
sstables::sstable::format_types::big,
&error_handler_gen_for_upload_dir);
}).get();
auto& cf = db.local().find_column_family(ks_name, cf_name);
auto upload_dir = fs::path(cf._config.datadir) / "upload";
verify_owner_and_mode(upload_dir).get();
lister::scan_dir(upload_dir, { directory_entry_type::regular }, [&descriptors] (fs::path parent_dir, directory_entry de) {
auto comps = sstables::entry_descriptor::make_descriptor(parent_dir.native(), de.name);
if (comps.component != component_type::TOC) {
return make_ready_future<>();
}
descriptors.emplace(comps.generation, std::move(comps));
return make_ready_future<>();
}, &sstables::manifest_json_filter).get();
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0();
flushed.reserve(descriptors.size());
for (auto& [generation, comps] : descriptors) {
auto descriptors = db.invoke_on(column_family::calculate_shard_from_sstable_generation(generation), [&sys_dist_ks, ks_name, cf_name, comps] (database& db) {
return seastar::async([&db, &sys_dist_ks, ks_name = std::move(ks_name), cf_name = std::move(cf_name), comps = std::move(comps)] () mutable {
auto& cf = db.find_column_family(ks_name, cf_name);
auto sst = cf.make_sstable(cf._config.datadir + "/upload", comps.generation, comps.version, comps.format,
[] (disk_error_signal_type&) { return error_handler_for_upload_dir(); });
auto gen = cf.calculate_generation_for_new_table();
sst->read_toc().get();
schema_ptr s = cf.schema();
if (s->is_counter() && !sst->has_scylla_component()) {
sstring error = "Direct loading non-Scylla SSTables containing counters is not supported.";
if (db.get_config().enable_dangerous_direct_import_of_cassandra_counters()) {
dblog.info("{} But trying to continue on user's request.", error);
} else {
dblog.error("{} Use sstableloader instead.", error);
throw std::runtime_error(fmt::format("{} Use sstableloader instead.", error));
}
}
if (s->is_view()) {
throw std::runtime_error("Loading Materialized View SSTables is not supported. Re-create the view instead.");
}
sst->mutate_sstable_level(0).get();
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), cf, streaming::stream_reason::repair).get0();
sstring datadir = cf._config.datadir;
if (use_view_update_path) {
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
datadir += "/staging";
}
sst->create_links(datadir, gen).get();
sstables::remove_by_toc_name(sst->toc_filename(), error_handler_for_upload_dir()).get();
comps.generation = gen;
comps.sstdir = std::move(datadir);
return std::move(comps);
});
}).get0();
flushed.push_back(std::move(descriptors));
auto datadir = upload.parent_path();
if (use_view_update_path) {
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
datadir /= "staging";
}
return std::vector<sstables::entry_descriptor>(std::move(flushed));
size_t loaded = directory.map_reduce0([&db, ks, cf, datadir, &view_update_generator] (sstables::sstable_directory& dir) {
return make_sstables_available(dir, db, view_update_generator, datadir, ks, cf);
}, size_t(0), std::plus<size_t>()).get0();
dblog.info("Loaded {} SSTables into {}", loaded, datadir.native());
});
}
@@ -552,205 +505,6 @@ future<> distributed_loader::open_sstable(distributed<database>& db, sstables::e
});
}
// invokes each descriptor at its target shard, which involves forwarding sstables too.
static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, sstring directory, std::vector<sstables::resharding_descriptor> jobs,
std::function<future<> (std::vector<sstables::shared_sstable>, uint32_t level, uint64_t max_sstable_bytes)> func) {
return parallel_for_each(std::move(jobs), [cf, func, &directory] (sstables::resharding_descriptor& job) mutable {
return forward_sstables_to(job.reshard_at, directory, std::move(job.sstables), cf,
[cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
// compaction manager ensures that only one reshard operation will run per shard.
auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
return func(std::move(sstables), level, max_sstable_bytes);
};
return cf->get_compaction_manager().run_resharding_job(&*cf, std::move(job));
});
});
}
static std::vector<sstables::shared_sstable> sstables_for_shard(const std::vector<sstables::shared_sstable>& sstables, shard_id shard) {
auto belongs_to_shard = [] (const sstables::shared_sstable& sst, unsigned shard) {
auto& shards = sst->get_shards_for_this_sstable();
return boost::range::find(shards, shard) != shards.end();
};
return boost::copy_range<std::vector<sstables::shared_sstable>>(sstables
| boost::adaptors::filtered([&] (auto& sst) { return belongs_to_shard(sst, shard); }));
}
void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sstring cf_name) {
assert(this_shard_id() == 0); // NOTE: should always run on shard 0!
// ensures that only one column family is resharded at a time (that's okay because
// actual resharding is parallelized), and that's needed to prevent the same column
// family from being resharded in parallel (that could happen, for example, if
// refresh (triggers resharding) is issued by user while resharding is going on).
static semaphore sem(1);
// FIXME: discarded future.
(void)with_semaphore(sem, 1, [&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable {
return seastar::async([&db, ks_name = std::move(ks_name), cf_name = std::move(cf_name)] () mutable {
global_column_family_ptr cf(db, ks_name, cf_name);
if (!cf->get_compaction_manager().enabled()) {
return;
}
// fast path to detect that this column family doesn't need reshard.
if (!worth_resharding(db, cf).get0()) {
dblog.debug("Nothing to reshard for {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name());
return;
}
parallel_for_each(cf->_config.all_datadirs, [&db, cf] (const sstring& directory) {
auto candidates = get_all_shared_sstables(db, directory, cf).get0();
dblog.debug("{} candidates for resharding for {}.{}", candidates.size(), cf->schema()->ks_name(), cf->schema()->cf_name());
auto jobs = cf->get_compaction_strategy().get_resharding_jobs(*cf, std::move(candidates));
dblog.debug("{} resharding jobs for {}.{}", jobs.size(), cf->schema()->ks_name(), cf->schema()->cf_name());
return invoke_all_resharding_jobs(cf, directory, std::move(jobs), [directory, &cf] (auto sstables, auto level, auto max_sstable_bytes) {
// FIXME: run it in maintenance priority.
// Resharding, currently, cannot provide compaction with a snapshot of the sstable set
// which spans all shards that input sstables belong to, so expiration is disabled.
std::optional<sstables::sstable_set> sstable_set = std::nullopt;
sstables::compaction_descriptor descriptor(sstables, std::move(sstable_set), service::get_local_compaction_priority(),
level, max_sstable_bytes);
descriptor.options = sstables::compaction_options::make_reshard();
descriptor.creator = [&cf, directory] (shard_id shard) mutable {
// we need generation calculated by instance of cf at requested shard,
// or resource usage wouldn't be fairly distributed among shards.
auto gen = smp::submit_to(shard, [&cf] () {
return cf->calculate_generation_for_new_table();
}).get0();
return cf->make_sstable(directory, gen,
cf->get_sstables_manager().get_highest_supported_format(),
sstables::sstable::format_types::big);
};
auto f = sstables::compact_sstables(std::move(descriptor), *cf);
return f.then([&cf, sstables = std::move(sstables), directory] (sstables::compaction_info info) mutable {
auto new_sstables = std::move(info.new_sstables);
// an input sstable may belong to shard 1 and 2 and only have data which
// token belongs to shard 1. That means resharding will only create a
// sstable for shard 1, but both shards opened the sstable. So our code
// below should ask both shards to remove the resharded table, or it
// wouldn't be deleted by our deletion manager, and resharding would be
// triggered again in the subsequent boot.
return parallel_for_each(boost::irange(0u, smp::count), [&cf, directory, sstables, new_sstables] (auto shard) {
auto old_sstables_for_shard = sstables_for_shard(sstables, shard);
// nothing to do if no input sstable belongs to this shard.
if (old_sstables_for_shard.empty()) {
return make_ready_future<>();
}
auto new_sstables_for_shard = sstables_for_shard(new_sstables, shard);
// sanity checks
for (auto& sst : new_sstables_for_shard) {
auto& shards = sst->get_shards_for_this_sstable();
if (shards.size() != 1) {
throw std::runtime_error(format("resharded sstable {} doesn't belong to only one shard", sst->get_filename()));
}
if (shards.front() != shard) {
throw std::runtime_error(format("resharded sstable {} should belong to shard {:d}", sst->get_filename(), shard));
}
}
std::unordered_set<uint64_t> ancestors;
boost::range::transform(old_sstables_for_shard, std::inserter(ancestors, ancestors.end()),
std::mem_fn(&sstables::sstable::generation));
if (new_sstables_for_shard.empty()) {
// handles case where sstable needing rewrite doesn't produce any sstable
// for a shard it belongs to when resharded (the reason is explained above).
return smp::submit_to(shard, [cf, ancestors = std::move(ancestors)] () mutable {
return cf->remove_ancestors_needed_rewrite(ancestors);
});
} else {
return forward_sstables_to(shard, directory, new_sstables_for_shard, cf, [cf, ancestors = std::move(ancestors)] (std::vector<sstables::shared_sstable> sstables) mutable {
return cf->replace_ancestors_needed_rewrite(std::move(ancestors), std::move(sstables));
});
}
}).then([&cf, sstables] {
// schedule deletion of shared sstables after we're certain that new unshared ones were successfully forwarded to respective shards.
(void)sstables::delete_atomically(sstables).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (...) {
dblog.warn("Exception in resharding when deleting sstable file: {}", eptr);
}
}).then([cf, sstables = std::move(sstables)] {
// Refresh cache's snapshot of shards involved in resharding to prevent the cache from
// holding reference to deleted files which results in disk space not being released.
std::unordered_set<shard_id> owner_shards;
for (auto& sst : sstables) {
const auto& shards = sst->get_shards_for_this_sstable();
owner_shards.insert(shards.begin(), shards.end());
if (owner_shards.size() == smp::count) {
break;
}
}
return parallel_for_each(std::move(owner_shards), [cf] (shard_id shard) {
return smp::submit_to(shard, [cf] () mutable {
cf->_cache.refresh_snapshot();
});
});
});
});
});
});
}).get();
});
});
}
future<> distributed_loader::load_new_sstables(distributed<database>& db, distributed<db::view::view_update_generator>& view_update_generator,
sstring ks, sstring cf, std::vector<sstables::entry_descriptor> new_tables) {
return parallel_for_each(new_tables, [&] (auto comps) {
auto cf_sstable_open = [comps] (column_family& cf, sstables::foreign_sstable_open_info info) {
auto f = cf.open_sstable(std::move(info), comps.sstdir, comps.generation, comps.version, comps.format);
return f.then([&cf] (sstables::shared_sstable sst) mutable {
if (sst) {
cf._sstables_opened_but_not_loaded.push_back(sst);
}
return make_ready_future<>();
});
};
return distributed_loader::open_sstable(db, comps, cf_sstable_open, service::get_local_compaction_priority())
.handle_exception([comps, ks, cf] (std::exception_ptr ep) {
auto name = sstables::sstable::filename(comps.sstdir, ks, cf, comps.version, comps.generation, comps.format, sstables::component_type::TOC);
dblog.error("Failed to open {}: {}", name, ep);
return make_exception_future<>(ep);
});
}).then([&db, &view_update_generator, ks, cf] {
return db.invoke_on_all([&view_update_generator, ks = std::move(ks), cfname = std::move(cf)] (database& db) {
auto& cf = db.find_column_family(ks, cfname);
return cf.get_row_cache().invalidate([&view_update_generator, &cf] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
// atomically load all opened sstables into column family.
for (auto& sst : cf._sstables_opened_but_not_loaded) {
try {
cf.load_sstable(sst, true);
} catch(...) {
dblog.error("Failed to load {}: {}. Aborting.", sst->toc_filename(), std::current_exception());
abort();
}
if (sst->requires_view_building()) {
// FIXME: discarded future.
(void)view_update_generator.local().register_staging_sstable(sst, cf.shared_from_this());
}
}
cf._sstables_opened_but_not_loaded.clear();
cf.trigger_compaction();
});
});
}).handle_exception([&db, ks, cf] (std::exception_ptr ep) {
return db.invoke_on_all([ks = std::move(ks), cfname = std::move(cf)] (database& db) {
auto& cf = db.find_column_family(ks, cfname);
cf._sstables_opened_but_not_loaded.clear();
}).then([ep] {
return make_exception_future<>(ep);
});
});
}
future<sstables::entry_descriptor> distributed_loader::probe_file(distributed<database>& db, sstring sstdir, sstring fname) {
using namespace sstables;
@@ -872,105 +626,71 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
});
}
future<> distributed_loader::do_populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
// We can catch most errors when we try to load an sstable. But if the TOC
// file is the one missing, we won't try to load the sstable at all. This
// case is still an invalid case, but it is way easier for us to treat it
// by waiting for all files to be loaded, and then checking if we saw a
// file during scan_dir, without its corresponding TOC.
enum class component_status {
has_some_file,
has_toc_file,
has_temporary_toc_file,
};
struct sstable_descriptor {
component_status status;
sstables::sstable::version_types version;
sstables::sstable::format_types format;
};
auto verifier = make_lw_shared<std::unordered_map<unsigned long, sstable_descriptor>>();
return do_with(std::vector<future<>>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector<future<>>& futures) {
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) {
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
// push future returned by probe_file into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) {
if (entry.component == component_type::TemporaryStatistics) {
return remove_file(sstables::sstable::filename(sstdir.native(), entry.ks, entry.cf, entry.version, entry.generation,
entry.format, component_type::TemporaryStatistics));
}
if (verifier->count(entry.generation)) {
if (verifier->at(entry.generation).status == component_status::has_toc_file) {
fs::path file_path(sstdir / de.name);
if (entry.component == component_type::TOC) {
throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed", file_path.native());
} else if (entry.component == component_type::TemporaryTOC) {
throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed", file_path.native());
}
} else if (entry.component == component_type::TOC) {
verifier->at(entry.generation).status = component_status::has_toc_file;
} else if (entry.component == component_type::TemporaryTOC) {
verifier->at(entry.generation).status = component_status::has_temporary_toc_file;
}
} else {
if (entry.component == component_type::TOC) {
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_toc_file, entry.version, entry.format});
} else if (entry.component == component_type::TemporaryTOC) {
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_temporary_toc_file, entry.version, entry.format});
} else {
verifier->emplace(entry.generation, sstable_descriptor{component_status::has_some_file, entry.version, entry.format});
}
}
return make_ready_future<>();
});
futures.push_back(std::move(f));
return make_ready_future<>();
}, &sstables::manifest_json_filter).then([&futures] {
return execute_futures(futures);
}).then([verifier, sstdir, ks = std::move(ks), cf = std::move(cf)] {
return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), verifier] (auto v) {
if (v.second.status == component_status::has_temporary_toc_file) {
unsigned long gen = v.first;
sstables::sstable::version_types version = v.second.version;
sstables::sstable::format_types format = v.second.format;
if (this_shard_id() != 0) {
dblog.debug("At directory: {}, partial SSTable with generation {} not relevant for this shard, ignoring", sstdir, v.first);
return make_ready_future<>();
}
// shard 0 is the responsible for removing a partial sstable.
return sstables::sstable::remove_sstable_with_temp_toc(ks, cf, sstdir, gen, version, format);
} else if (v.second.status != component_status::has_toc_file) {
throw sstables::malformed_sstable_exception(format("At directory: {}: no TOC found for SSTable with generation {:d}!. Refusing to boot", sstdir, v.first));
}
return make_ready_future<>();
});
});
});
}
future<> distributed_loader::populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] {
assert(this_shard_id() == 0);
// First pass, cleanup temporary sstable directories and sstables pending delete.
if (this_shard_id() == 0) {
cleanup_column_family_temp_sst_dirs(sstdir).get();
auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename();
auto exists = file_exists(pending_delete_dir).get0();
if (exists) {
handle_sstables_pending_delete(pending_delete_dir).get();
}
cleanup_column_family_temp_sst_dirs(sstdir).get();
auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename();
auto exists = file_exists(pending_delete_dir).get0();
if (exists) {
handle_sstables_pending_delete(pending_delete_dir).get();
}
// Second pass, cleanup sstables with temporary TOCs and load the rest.
do_populate_column_family(db, std::move(sstdir), std::move(ks), std::move(cf)).get();
global_column_family_ptr global_table(db, ks, cf);
sharded<sstables::sstable_directory> directory;
directory.start(fs::path(sstdir), db.local().get_config().initial_sstable_loading_concurrency(),
sstables::sstable_directory::need_mutate_level::yes,
sstables::sstable_directory::lack_of_toc_fatal::yes,
sstables::sstable_directory::enable_dangerous_direct_import_of_cassandra_counters(db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters()),
sstables::sstable_directory::allow_loading_materialized_view::yes,
[&global_table] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return global_table->make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&directory] {
directory.stop().get();
});
lock_table(directory, db, ks, cf).get();
process_sstable_dir(directory).get();
// If we are resharding system tables before we can read them, we will not
// know which is the highest format we support: this information is itself stored
// in the system tables. In that case we'll rely on what we find on disk: we'll
// at least not downgrade any files. If we already know that we support a higher
// format than the one we see then we use that.
auto sys_format = global_table->get_sstables_manager().get_highest_supported_format();
auto sst_version = highest_version_seen(directory, sys_format).get0();
auto generation = highest_generation_seen(directory).get0();
db.invoke_on_all([&global_table, generation] (database& db) {
global_table->update_sstables_known_generation(generation);
global_table->disable_auto_compaction();
return make_ready_future<>();
}).get();
reshard(directory, db, ks, cf, [&global_table, sstdir, sst_version] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [&global_table] () {
return global_table->calculate_generation_for_new_table();
}).get0();
return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big);
}).get();
// The node is offline at this point so we are very lenient with what we consider
// offstrategy.
reshape(directory, db, sstables::reshape_mode::relaxed, ks, cf, [global_table, sstdir, sst_version] (shard_id shard) {
auto gen = global_table->calculate_generation_for_new_table();
return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big);
}).get();
directory.invoke_on_all([global_table] (sstables::sstable_directory& dir) {
return dir.do_for_each_sstable([&global_table] (sstables::shared_sstable sst) {
return global_table->add_sstable_and_update_cache(sst);
});
}).get();
});
}

View File

@@ -28,6 +28,7 @@
#include <seastar/core/file.hh>
#include <vector>
#include <functional>
#include <filesystem>
#include "seastarx.hh"
#include "sstables/compaction_descriptor.hh"
@@ -58,19 +59,22 @@ class migration_manager;
class distributed_loader {
public:
static future<> reshape(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstables::reshape_mode mode,
sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator);
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator);
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir);
static future<> lock_table(sharded<sstables::sstable_directory>& dir, sharded<database>& db, sstring ks_name, sstring cf_name);
static void reshard(distributed<database>& db, sstring ks_name, sstring cf_name);
static future<> open_sstable(distributed<database>& db, sstables::entry_descriptor comps,
std::function<future<> (column_family&, sstables::foreign_sstable_open_info)> func,
const io_priority_class& pc = default_priority_class());
static future<> verify_owner_and_mode(std::filesystem::path path);
static future<> load_new_sstables(distributed<database>& db, distributed<db::view::view_update_generator>& view_update_generator,
sstring ks, sstring cf, std::vector<sstables::entry_descriptor> new_tables);
static future<std::vector<sstables::entry_descriptor>> flush_upload_dir(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks, sstring ks_name, sstring cf_name);
static future<> process_upload_dir(distributed<database>& db, sstring ks_name, sstring cf_name);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
sharded<database>& db, sharded<db::view::view_update_generator>& view_update_generator,
std::filesystem::path datadir, sstring ks, sstring cf);
static future<> process_upload_dir(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks,
distributed<db::view::view_update_generator>& view_update_generator, sstring ks_name, sstring cf_name);
static future<sstables::entry_descriptor> probe_file(distributed<database>& db, sstring sstdir, sstring fname);
static future<> populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<database>& db, sstring datadir, sstring ks_name);

20
main.cc
View File

@@ -762,6 +762,11 @@ int main(int ac, char** av) {
dirs.init(*cfg, bool(hinted_handoff_enabled)).get();
// We need the compaction manager ready early so we can reshard.
db.invoke_on_all([&proxy, &stop_signal] (database& db) {
db.get_compaction_manager().enable();
}).get();
// Initialization of a keyspace is done by shard 0 only. For system
// keyspace, the procedure will go through the hardcoded column
// families, and in each of them, it will load the sstables for all
@@ -926,8 +931,11 @@ int main(int ac, char** av) {
}
}
db.invoke_on_all([&proxy] (database& db) {
db.get_compaction_manager().enable();
db.invoke_on_all([] (database& db) {
for (auto& x : db.get_column_families()) {
table& t = *(x.second);
t.enable_auto_compaction();
}
}).get();
// If the same sstable is shared by several shards, it cannot be
@@ -937,10 +945,10 @@ int main(int ac, char** av) {
// we will have races between the compaction and loading processes
// We also want to trigger regular compaction on boot.
for (auto& x : db.local().get_column_families()) {
column_family& cf = *(x.second);
distributed_loader::reshard(db, cf.schema()->ks_name(), cf.schema()->cf_name());
}
// FIXME: temporary as this code is being replaced. I am keeping the scheduling
// group that was effectively used in the bulk of it (compaction). Soon it will become
// streaming
db.invoke_on_all([&proxy] (database& db) {
for (auto& x : db.get_column_families()) {
column_family& cf = *(x.second);

View File

@@ -2789,18 +2789,6 @@ void storage_service::add_expire_time_if_found(inet_address endpoint, int64_t ex
// All the global operations are going to happen here, and just the reloading happens
// in there.
future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
class max_element {
int64_t _result = 0;
public:
future<> operator()(int64_t value) {
_result = std::max(value, _result);
return make_ready_future<>();
}
int64_t get() && {
return _result;
}
};
if (_loading_new_sstables) {
throw std::runtime_error("Already loading SSTables. Try again later");
} else {
@@ -2809,100 +2797,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
slogger.info("Loading new SSTables for {}.{}...", ks_name, cf_name);
return distributed_loader::process_upload_dir(_db, ks_name, cf_name).then([this, ks_name, cf_name] {
// First, we need to stop SSTable creation for that CF in all shards. This is a really horrible
// thing to do, because under normal circumnstances this can make dirty memory go up to the point
// of explosion.
//
// Remember, however, that we are assuming this is going to be ran on an empty CF. In that scenario,
// stopping the SSTables should have no effect, while guaranteeing we will see no data corruption
// * in case * this is ran on a live CF.
//
// The statement above is valid at least from the Scylla side of things: it is still totally possible
// that someones just copies the table over existing ones. There isn't much we can do about it.
return _db.map_reduce(max_element(), [ks_name, cf_name] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);
return cf.disable_sstable_write();
}).then([this, cf_name, ks_name] (int64_t max_seen_sstable) {
// Then, we will reshuffle the tables to make sure that the generation numbers don't go too high.
// We will do all of it the same CPU, to make sure that we won't have two parallel shufflers stepping
// onto each other.
class all_generations {
std::set<int64_t> _result;
public:
future<> operator()(std::set<int64_t> value) {
_result.insert(value.begin(), value.end());
return make_ready_future<>();
}
std::set<int64_t> get() && {
return _result;
}
};
// We provide to reshuffle_sstables() the generation of all existing sstables, such that it will
// easily know which sstables are new.
return _db.map_reduce(all_generations(), [ks_name, cf_name] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);
std::set<int64_t> generations;
for (auto& p : *(cf.get_sstables())) {
generations.insert(p->generation());
}
return make_ready_future<std::set<int64_t>>(std::move(generations));
}).then([this, max_seen_sstable, ks_name, cf_name] (std::set<int64_t> all_generations) {
auto shard = std::hash<sstring>()(cf_name) % smp::count;
return _db.invoke_on(shard, [ks_name, cf_name, max_seen_sstable, all_generations = std::move(all_generations)] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);
return cf.reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1);
});
});
}).then_wrapped([this, ks_name, cf_name] (future<std::vector<sstables::entry_descriptor>> f) {
std::vector<sstables::entry_descriptor> new_tables;
std::exception_ptr eptr;
int64_t new_gen = -1;
try {
new_tables = f.get0();
} catch(std::exception& e) {
slogger.error("Loading of new tables failed to {}.{} due to {}", ks_name, cf_name, e.what());
eptr = std::current_exception();
} catch(...) {
slogger.error("Loading of new tables failed to {}.{} due to unexpected reason", ks_name, cf_name);
eptr = std::current_exception();
}
if (new_tables.size() > 0) {
new_gen = new_tables.back().generation;
}
slogger.debug("Now accepting writes for sstables with generation larger or equal than {}", new_gen);
return _db.invoke_on_all([ks_name, cf_name, new_gen] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);
auto disabled = std::chrono::duration_cast<std::chrono::microseconds>(cf.enable_sstable_write(new_gen)).count();
slogger.info("CF {}.{} at shard {} had SSTables writes disabled for {} usec", ks_name, cf_name, this_shard_id(), disabled);
return make_ready_future<>();
}).then([new_tables = std::move(new_tables), eptr = std::move(eptr)] {
if (eptr) {
return make_exception_future<std::vector<sstables::entry_descriptor>>(eptr);
}
return make_ready_future<std::vector<sstables::entry_descriptor>>(std::move(new_tables));
});
}).then([this, ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables) {
auto f = distributed_loader::flush_upload_dir(_db, _sys_dist_ks, ks_name, cf_name);
return f.then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables_from_upload) mutable {
if (new_tables.empty() && new_tables_from_upload.empty()) {
slogger.info("No new SSTables were found for {}.{}", ks_name, cf_name);
}
// merge new sstables found in both column family and upload directories, if any.
new_tables.insert(new_tables.end(), new_tables_from_upload.begin(), new_tables_from_upload.end());
return make_ready_future<std::vector<sstables::entry_descriptor>>(std::move(new_tables));
});
}).then([this, ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables) {
return distributed_loader::load_new_sstables(_db, _view_update_generator, ks_name, cf_name, std::move(new_tables)).then([ks_name, cf_name] {
slogger.info("Done loading new SSTables for {}.{} for all shards", ks_name, cf_name);
});
});
}).finally([this] {
return distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name).finally([this, ks_name, cf_name] {
slogger.info("Done loading new SSTables for {}.{}", ks_name, cf_name);
_loading_new_sstables = false;
});
}
@@ -2921,6 +2817,25 @@ void storage_service::shutdown_client_servers() {
}
}
future<>
storage_service::set_tables_autocompaction(const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
if (_initialized) {
return make_exception_future<>(std::runtime_error("Too early: storage service not initialized yet"));
}
return _db.invoke_on_all([keyspace, tables, enabled] (database& db) {
return parallel_for_each(tables, [&db, keyspace, enabled](const sstring& table) mutable {
column_family& cf = db.find_column_family(keyspace, table);
if (enabled) {
cf.enable_auto_compaction();
} else {
cf.disable_auto_compaction();
}
return make_ready_future<>();
});
});
}
std::unordered_multimap<inet_address, dht::token_range>
storage_service::get_new_source_ranges(const sstring& keyspace_name, const dht::token_range_vector& ranges) {
auto my_address = get_broadcast_address();

View File

@@ -853,6 +853,8 @@ public:
*/
future<> load_new_sstables(sstring ks_name, sstring cf_name);
future<> set_tables_autocompaction(const sstring &keyspace, std::vector<sstring> tables, bool enabled);
template <typename Func>
auto run_with_api_lock(sstring operation, Func&& func) {
return get_storage_service().invoke_on(0, [operation = std::move(operation),

View File

@@ -74,6 +74,25 @@ namespace sstables {
logging::logger clogger("compaction");
std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) {
static constexpr const char* suffixes[] = { " bytes", "kB", "MB", "GB", "TB", "PB" };
unsigned exp = 0;
while ((data._size >= 1000) && (exp < sizeof(suffixes))) {
exp++;
data._size /= 1000;
}
os << data._size << suffixes[exp];
return os;
}
std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) {
uint64_t throughput = tp._duration.count() > 0 ? tp._size / tp._duration.count() : 0;
os << pretty_printed_data_size(throughput) << "/s";
return os;
}
static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf, sstable_set::incremental_selector& selector,
const std::unordered_set<shared_sstable>& compacting_set, const dht::decorated_key& dk) {
auto timestamp = api::max_timestamp;
@@ -371,34 +390,6 @@ public:
}
};
// Resharding doesn't really belong into any strategy, because it is not worried about laying out
// SSTables according to any strategy-specific criteria. So we will just make it proportional to
// the amount of data we still have to reshard.
//
// Although at first it may seem like we could improve this by tracking the ongoing reshard as well
// and reducing the backlog as we compact, that is not really true. Resharding is not really
// expected to get rid of data and it is usually just splitting data among shards. Whichever backlog
// we get rid of by tracking the compaction will come back as a big spike as we add this SSTable
// back to their rightful shard owners.
//
// So because the data is supposed to be constant, we will just add the total amount of data as the
// backlog.
class resharding_backlog_tracker final : public compaction_backlog_tracker::impl {
uint64_t _total_bytes = 0;
public:
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
return _total_bytes;
}
virtual void add_sstable(sstables::shared_sstable sst) override {
_total_bytes += sst->data_size();
}
virtual void remove_sstable(sstables::shared_sstable sst) override {
_total_bytes -= sst->data_size();
}
};
class compaction {
protected:
column_family& _cf;
@@ -583,15 +574,19 @@ private:
return consumer(make_sstable_reader());
}
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0;
virtual bool use_interposer_consumer() const = 0;
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) {
return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer));
}
virtual bool use_interposer_consumer() const {
return _cf.get_compaction_strategy().use_interposer_consumer();
}
compaction_info finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) {
_info->ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(ended_at.time_since_epoch()).count();
auto ratio = double(_info->end_size) / double(_info->start_size);
auto duration = std::chrono::duration<float>(ended_at - started_at);
// Don't report NaN or negative number.
auto throughput = duration.count() > 0 ? (double(_info->end_size) / (1024*1024)) / duration.count() : double{};
sstring new_sstables_msg;
on_end_of_compaction();
@@ -605,10 +600,10 @@ private:
// - add support to merge summary (message: Partition merge counts were {%s}.).
// - there is no easy way, currently, to know the exact number of total partitions.
// By the time being, using estimated key count.
sstring formatted_msg = sprint("%ld sstables to [%s]. %ld bytes to %ld (~%d%% of original) in %dms = %.2fMB/s. " \
"~%ld total partitions merged to %ld.",
_info->sstables, new_sstables_msg, _info->start_size, _info->end_size, int(ratio * 100),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), throughput,
sstring formatted_msg = fmt::format("{} sstables to [{}]. {} to {} (~{} of original) in {}ms = {}. " \
"~{} total partitions merged to {}.",
_info->sstables, new_sstables_msg, pretty_printed_data_size(_info->start_size), pretty_printed_data_size(_info->end_size), int(ratio * 100),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
_info->total_partitions, _info->total_keys_written);
report_finish(formatted_msg, ended_at);
@@ -621,7 +616,7 @@ private:
virtual void report_start(const sstring& formatted_msg) const = 0;
virtual void report_finish(const sstring& formatted_msg, std::chrono::time_point<db_clock> ended_at) const = 0;
virtual void backlog_tracker_adjust_charges() = 0;
virtual void backlog_tracker_adjust_charges() { };
std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() {
if (!tombstone_expiration_enabled()) {
@@ -640,9 +635,9 @@ private:
};
}
virtual void on_new_partition() = 0;
virtual void on_new_partition() {}
virtual void on_end_of_compaction() = 0;
virtual void on_end_of_compaction() {};
// create a writer based on decorated key.
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) = 0;
@@ -743,6 +738,53 @@ void garbage_collected_sstable_writer::data::finish_sstable_writer() {
}
}
class reshape_compaction : public compaction {
public:
reshape_compaction(column_family& cf, compaction_descriptor descriptor)
: compaction(cf, std::move(descriptor)) {
_info->run_identifier = _run_identifier;
_info->type = compaction_type::Reshape;
}
flat_mutation_reader make_sstable_reader() const override {
return ::make_local_shard_sstable_reader(_schema,
_permit,
_compacting,
query::full_partition_range,
_schema->full_slice(),
_io_priority,
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
default_read_monitor_generator());
}
void report_start(const sstring& formatted_msg) const override {
clogger.info("Reshaping {}", formatted_msg);
}
void report_finish(const sstring& formatted_msg, std::chrono::time_point<db_clock> ended_at) const override {
clogger.info("Reshaped {}", formatted_msg);
}
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
auto sst = _sstable_creator(this_shard_id());
setup_new_sstable(sst);
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer();
cfg.max_sstable_size = _max_sstable_size;
cfg.monitor = &default_write_monitor();
cfg.run_identifier = _run_identifier;
return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats(), _io_priority), sst};
}
virtual void stop_sstable_writer(compaction_writer* writer) override {
if (writer) {
finish_new_sstable(writer);
}
}
};
class regular_compaction : public compaction {
// sstable being currently written.
mutable compaction_read_monitor_generator _monitor_generator;
@@ -768,14 +810,6 @@ public:
_monitor_generator);
}
reader_consumer make_interposer_consumer(reader_consumer end_consumer) override {
return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer));
}
bool use_interposer_consumer() const override {
return _cf.get_compaction_strategy().use_interposer_consumer();
}
void report_start(const sstring& formatted_msg) const override {
clogger.info("Compacting {}", formatted_msg);
}
@@ -1185,7 +1219,6 @@ flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_co
class resharding_compaction final : public compaction {
shard_id _shard; // shard of current sstable writer
compaction_backlog_tracker _resharding_backlog_tracker;
// Partition count estimation for a shard S:
//
@@ -1215,14 +1248,10 @@ private:
public:
resharding_compaction(column_family& cf, sstables::compaction_descriptor descriptor)
: compaction(cf, std::move(descriptor))
, _resharding_backlog_tracker(std::make_unique<resharding_backlog_tracker>())
, _estimation_per_shard(smp::count)
, _run_identifiers(smp::count)
{
cf.get_compaction_manager().register_backlog_tracker(_resharding_backlog_tracker);
for (auto& sst : _sstables) {
_resharding_backlog_tracker.add_sstable(sst);
const auto& shards = sst->get_shards_for_this_sstable();
auto size = sst->bytes_on_disk();
auto estimated_partitions = sst->get_estimated_key_count();
@@ -1237,11 +1266,7 @@ public:
_info->type = compaction_type::Reshard;
}
~resharding_compaction() {
for (auto& s : _sstables) {
_resharding_backlog_tracker.remove_sstable(s);
}
}
~resharding_compaction() { }
// Use reader that makes sure no non-local mutation will not be filtered out.
flat_mutation_reader make_sstable_reader() const override {
@@ -1284,6 +1309,7 @@ public:
sstable_writer_config cfg = _cf.get_sstables_manager().configure_writer();
cfg.max_sstable_size = _max_sstable_size;
cfg.monitor = &default_write_monitor();
// sstables generated for a given shard will share the same run identifier.
cfg.run_identifier = _run_identifiers.at(shard);
return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(shard), cfg, get_encoding_stats(), _io_priority, shard), sst};
@@ -1320,7 +1346,14 @@ future<compaction_info> compaction::run(std::unique_ptr<compaction> c, GCConsume
compaction_type compaction_options::type() const {
// Maps options_variant indexes to the corresponding compaction_type member.
static const compaction_type index_to_type[] = {compaction_type::Compaction, compaction_type::Cleanup, compaction_type::Upgrade, compaction_type::Scrub, compaction_type::Reshard};
static const compaction_type index_to_type[] = {
compaction_type::Compaction,
compaction_type::Cleanup,
compaction_type::Upgrade,
compaction_type::Scrub,
compaction_type::Reshard,
compaction_type::Reshape,
};
return index_to_type[_options.index()];
}
@@ -1329,6 +1362,9 @@ static std::unique_ptr<compaction> make_compaction(column_family& cf, sstables::
column_family& cf;
sstables::compaction_descriptor&& descriptor;
std::unique_ptr<compaction> operator()(compaction_options::reshape) {
return std::make_unique<reshape_compaction>(cf, std::move(descriptor));
}
std::unique_ptr<compaction> operator()(compaction_options::reshard) {
return std::make_unique<resharding_compaction>(cf, std::move(descriptor));
}

View File

@@ -35,12 +35,19 @@ class flat_mutation_reader;
namespace sstables {
class pretty_printed_data_size {
uint64_t _size;
public:
pretty_printed_data_size(uint64_t size) : _size(size) {}
friend std::ostream& operator<<(std::ostream&, pretty_printed_data_size);
};
struct resharding_descriptor {
std::vector<sstables::shared_sstable> sstables;
uint64_t max_sstable_bytes;
shard_id reshard_at;
uint32_t level;
class pretty_printed_throughput {
uint64_t _size;
std::chrono::duration<float> _duration;
public:
pretty_printed_throughput(uint64_t size, std::chrono::duration<float> dur) : _size(size), _duration(std::move(dur)) {}
friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput);
};
static inline sstring compaction_name(compaction_type type) {
@@ -59,6 +66,8 @@ namespace sstables {
return "RESHARD";
case compaction_type::Upgrade:
return "UPGRADE";
case compaction_type::Reshape:
return "RESHAPE";
default:
throw std::runtime_error("Invalid Compaction Type");
}

View File

@@ -43,6 +43,7 @@ enum class compaction_type {
Index_build = 4,
Reshard = 5,
Upgrade = 6,
Reshape = 7,
};
struct compaction_completion_desc {
@@ -72,9 +73,10 @@ public:
};
struct reshard {
};
struct reshape {
};
private:
using options_variant = std::variant<regular, cleanup, upgrade, scrub, reshard>;
using options_variant = std::variant<regular, cleanup, upgrade, scrub, reshard, reshape>;
private:
options_variant _options;
@@ -84,6 +86,10 @@ private:
}
public:
static compaction_options make_reshape() {
return compaction_options(reshape{});
}
static compaction_options make_reshard() {
return compaction_options(reshard{});
}

View File

@@ -309,7 +309,7 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
return task->compaction_done.get_future().then([task] {});
}
future<> compaction_manager::run_resharding_job(column_family* cf, std::function<future<>()> job) {
future<> compaction_manager::run_custom_job(column_family* cf, sstring name, noncopyable_function<future<>()> job) {
if (_state != state::enabled) {
return make_ready_future<>();
}
@@ -318,9 +318,9 @@ future<> compaction_manager::run_resharding_job(column_family* cf, std::function
task->compacting_cf = cf;
_tasks.push_back(task);
task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] {
task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, job = std::move(job)] () mutable {
// take read lock for cf, so major compaction and resharding can't proceed in parallel.
return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] {
return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] () mutable {
_stats.active_tasks++;
if (!can_proceed(task)) {
return make_ready_future<>();
@@ -329,20 +329,17 @@ future<> compaction_manager::run_resharding_job(column_family* cf, std::function
// NOTE:
// no need to register shared sstables because they're excluded from non-resharding
// compaction and some of them may not even belong to current shard.
return with_scheduling_group(_scheduling_group, [job = std::move(job)] {
return job();
});
return job();
});
}).then_wrapped([this, task] (future<> f) {
}).then_wrapped([this, task, name] (future<> f) {
_stats.active_tasks--;
_tasks.remove(task);
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
cmlog.info("resharding was abruptly stopped, reason: {}", e.what());
cmlog.info("{} was abruptly stopped, reason: {}", name, e.what());
} catch (...) {
cmlog.error("resharding failed: {}", std::current_exception());
cmlog.error("{} failed: {}", name, std::current_exception());
}
});
return task->compaction_done.get_future().then([task] {});

View File

@@ -109,7 +109,7 @@ private:
// Prevents column family from running major and minor compaction at same time.
std::unordered_map<column_family*, rwlock> _compaction_locks;
semaphore _resharding_sem{1};
semaphore _custom_job_sem{1};
std::function<void()> compaction_submission_callback();
// all registered column families are submitted for compaction at a constant interval.
@@ -216,15 +216,13 @@ public:
// Submit a column family for major compaction.
future<> submit_major_compaction(column_family* cf);
// Run a resharding job for a given column family.
// Run a custom job for a given column family, defined by a function
// it completes when future returned by job is ready or returns immediately
// if manager was asked to stop.
//
// parameter job is a function that will carry the reshard operation on a set
// of sstables that belong to different shards for this column family using
// sstables::reshard_sstables(), and in the end, it will forward unshared
// sstables created by the process to their owner shards.
future<> run_resharding_job(column_family* cf, std::function<future<>()> job);
// parameter job is a function that will carry the operation
future<> run_custom_job(column_family* cf, sstring name, noncopyable_function<future<>()> job);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.

View File

@@ -460,19 +460,6 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s
return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold;
}
std::vector<resharding_descriptor>
compaction_strategy_impl::get_resharding_jobs(column_family& cf, std::vector<sstables::shared_sstable> candidates) {
std::vector<resharding_descriptor> jobs;
shard_id reshard_at_current = 0;
clogger.debug("Trying to get resharding jobs for {}.{}...", cf.schema()->ks_name(), cf.schema()->cf_name());
for (auto& candidate : candidates) {
auto level = candidate->get_sstable_level();
jobs.push_back(resharding_descriptor{{std::move(candidate)}, std::numeric_limits<uint64_t>::max(), reshard_at_current++ % smp::count, level});
}
return jobs;
}
uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) {
return partition_estimate;
}
@@ -481,6 +468,11 @@ reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutatio
return end_consumer;
}
compaction_descriptor
compaction_strategy_impl::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
return compaction_descriptor();
}
} // namespace sstables
size_tiered_backlog_tracker::inflight_component
@@ -980,10 +972,6 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(column_famil
return _compaction_strategy_impl->get_major_compaction_job(cf, std::move(candidates));
}
std::vector<resharding_descriptor> compaction_strategy::get_resharding_jobs(column_family& cf, std::vector<sstables::shared_sstable> candidates) {
return _compaction_strategy_impl->get_resharding_jobs(cf, std::move(candidates));
}
void compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
_compaction_strategy_impl->notify_completion(removed, added);
}
@@ -1012,6 +1000,11 @@ compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() {
return _compaction_strategy_impl->get_backlog_tracker();
}
sstables::compaction_descriptor
compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, iop, mode);
}
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) {
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate);
}

View File

@@ -73,7 +73,6 @@ public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector<sstables::shared_sstable> candidates);
virtual std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<sstables::shared_sstable> candidates);
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) { }
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
@@ -103,5 +102,7 @@ public:
virtual bool use_interposer_consumer() const {
return false;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode);
};
}

View File

@@ -76,39 +76,6 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(colu
sst->get_sstable_level(), _max_sstable_size_in_mb*1024*1024);
}
std::vector<resharding_descriptor> leveled_compaction_strategy::get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates) {
leveled_manifest manifest = leveled_manifest::create(cf, candidates, _max_sstable_size_in_mb, _stcs_options);
std::vector<resharding_descriptor> descriptors;
shard_id target_shard = 0;
auto get_shard = [&target_shard] { return target_shard++ % smp::count; };
// Basically, we'll iterate through all levels, and for each, we'll sort the
// sstables by first key because there's a need to reshard together adjacent
// sstables.
// The shard at which the job will run is chosen in a round-robin fashion.
for (auto level = 0U; level <= manifest.get_level_count(); level++) {
uint64_t max_sstable_size = !level ? std::numeric_limits<uint64_t>::max() : (_max_sstable_size_in_mb*1024*1024);
auto& sstables = manifest.get_level(level);
boost::sort(sstables, [] (auto& i, auto& j) {
return i->compare_by_first_key(*j) < 0;
});
resharding_descriptor current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level};
for (auto it = sstables.begin(); it != sstables.end(); it++) {
current_descriptor.sstables.push_back(*it);
auto next = std::next(it);
if (current_descriptor.sstables.size() == smp::count || next == sstables.end()) {
descriptors.push_back(std::move(current_descriptor));
current_descriptor = resharding_descriptor{{}, max_sstable_size, get_shard(), level};
}
}
}
return descriptors;
}
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
if (removed.empty() || added.empty()) {
return;
@@ -171,4 +138,69 @@ int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family
return manifest.get_estimated_tasks();
}
compaction_descriptor
leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
std::array<std::vector<shared_sstable>, leveled_manifest::MAX_LEVELS> level_info;
auto is_disjoint = [this, schema] (const std::vector<shared_sstable>& sstables, unsigned tolerance) {
unsigned disjoint_sstables = 0;
auto prev_last = dht::ring_position::min();
for (auto& sst : sstables) {
if (dht::ring_position(sst->get_first_decorated_key()).less_compare(*schema, prev_last)) {
disjoint_sstables++;
}
prev_last = dht::ring_position(sst->get_last_decorated_key());
}
return disjoint_sstables > tolerance;
};
for (auto& sst : input) {
auto sst_level = sst->get_sstable_level();
if (sst_level > leveled_manifest::MAX_LEVELS) {
leveled_manifest::logger.warn("Found SSTable with level {}, higher than the maximum {}. This is unexpected, but will fix", sst_level, leveled_manifest::MAX_LEVELS);
// This is really unexpected, so we'll just compact it all to fix it
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, leveled_manifest::MAX_LEVELS - 1, _max_sstable_size_in_mb * 1024 * 1024);
desc.options = compaction_options::make_reshape();
return desc;
}
level_info[sst_level].push_back(sst);
}
for (auto& level : level_info) {
std::sort(level.begin(), level.end(), [this, schema] (shared_sstable a, shared_sstable b) {
return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key()));
});
}
unsigned max_filled_level = 0;
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
unsigned tolerance = mode == reshape_mode::strict ? 0 : leveled_manifest::leveled_fan_out * 2;
if (level_info[0].size() > offstrategy_threshold) {
level_info[0].resize(std::min(level_info[0].size(), max_sstables));
compaction_descriptor desc(std::move(level_info[0]), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
}
for (unsigned level = 1; level < leveled_manifest::MAX_LEVELS; ++level) {
if (level_info[level].empty()) {
continue;
}
max_filled_level = std::max(max_filled_level, level);
if (!is_disjoint(level_info[level], tolerance)) {
// Unfortunately no good limit to limit input size to max_sstables for LCS major
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, max_filled_level, _max_sstable_size_in_mb * 1024 * 1024);
desc.options = compaction_options::make_reshape();
return desc;
}
}
return compaction_descriptor();
}
}

View File

@@ -41,8 +41,6 @@ public:
virtual compaction_descriptor get_major_compaction_job(column_family& cf, std::vector<sstables::shared_sstable> candidates) override;
virtual std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates) override;
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) override;
// for each level > 0, get newest sstable and use its last key as last
@@ -63,6 +61,8 @@ public:
virtual compaction_backlog_tracker& get_backlog_tracker() override {
return _backlog_tracker;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override;
};
}

View File

@@ -208,4 +208,25 @@ size_tiered_compaction_strategy::most_interesting_bucket(const std::vector<sstab
return most_interesting;
}
compaction_descriptor
size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode)
{
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
if (mode == reshape_mode::relaxed) {
offstrategy_threshold = max_sstables;
}
for (auto& bucket : get_buckets(input)) {
if (bucket.size() >= offstrategy_threshold) {
bucket.resize(std::min(max_sstables, bucket.size()));
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
}
}
return compaction_descriptor();
}
}

View File

@@ -168,6 +168,9 @@ public:
virtual compaction_backlog_tracker& get_backlog_tracker() override {
return _backlog_tracker;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override;
};
}

View File

@@ -26,6 +26,7 @@
#include "log.hh"
#include "sstable_directory.hh"
#include "lister.hh"
#include "database.hh"
static logging::logger dirlog("sstable_directory");
@@ -187,7 +188,7 @@ sstable_directory::process_sstable_dir(const ::io_priority_class& iop) {
return std::max<int64_t>(a, b);
});
dirlog.debug("{} After {} scanned, seen generation {}. {} descriptors found, {} different files found ",
dirlog.debug("After {} scanned, seen generation {}. {} descriptors found, {} different files found ",
_sstable_dir, _max_generation_seen, state.descriptors.size(), state.generations_found.size());
// _descriptors is everything with a TOC. So after we remove this, what's left is
@@ -232,7 +233,7 @@ sstable_directory::move_foreign_sstables(sharded<sstable_directory>& source_dire
}
// Should be empty, since an SSTable that belongs to this shard is not remote.
assert(shard_id != this_shard_id());
dirlog.debug("{} Moving {} unshared SSTables to shard {} ", info_vec.size(), shard_id);
dirlog.debug("Moving {} unshared SSTables to shard {} ", info_vec.size(), shard_id);
return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec));
});
}
@@ -278,6 +279,83 @@ sstable_directory::collect_output_sstables_from_resharding(std::vector<sstables:
});
}
future<>
sstable_directory::remove_input_sstables_from_reshaping(std::vector<sstables::shared_sstable> sstlist) {
// When removing input sstables from reshaping: Those SSTables used to be in the unshared local
// list. So not only do we have to remove them, we also have to update the list. Because we're
// dealing with a vector it's easier to just reconstruct the list.
dirlog.debug("Removing {} reshaped SSTables", sstlist.size());
return do_with(std::move(sstlist), std::unordered_set<sstables::shared_sstable>(),
[this] (std::vector<sstables::shared_sstable>& sstlist, std::unordered_set<sstables::shared_sstable>& exclude) {
for (auto& sst : sstlist) {
exclude.insert(sst);
}
auto old = std::exchange(_unshared_local_sstables, {});
for (auto& sst : old) {
if (!exclude.count(sst)) {
_unshared_local_sstables.push_back(sst);
}
}
// Do this last for exception safety. If there is an exception on unlink we
// want to at least leave the SSTable unshared list in a sane state.
return parallel_for_each(std::move(sstlist), [] (sstables::shared_sstable sst) {
return sst->unlink();
}).then([] {
fmt::print("Finished removing all SSTables\n");
});
});
}
future<>
sstable_directory::collect_output_sstables_from_reshaping(std::vector<sstables::shared_sstable> reshaped_sstables) {
dirlog.debug("Collecting {} reshaped SSTables", reshaped_sstables.size());
return parallel_for_each(std::move(reshaped_sstables), [this] (sstables::shared_sstable sst) {
_unshared_local_sstables.push_back(std::move(sst));
return make_ready_future<>();
});
}
future<uint64_t> sstable_directory::reshape(compaction_manager& cm, table& table, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop, sstables::reshape_mode mode)
{
return do_with(uint64_t(0), [this, &cm, &table, creator = std::move(creator), iop, mode] (uint64_t & reshaped_size) mutable {
return repeat([this, &cm, &table, creator = std::move(creator), iop, &reshaped_size, mode] () mutable {
auto desc = table.get_compaction_strategy().get_reshaping_job(_unshared_local_sstables, table.schema(), iop, mode);
if (desc.sstables.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
if (!reshaped_size) {
dirlog.info("Found SSTables that need reshape. Starting reshape process");
}
std::vector<sstables::shared_sstable> sstlist;
for (auto& sst : desc.sstables) {
reshaped_size += sst->data_size();
sstlist.push_back(sst);
}
desc.creator = creator;
return cm.run_custom_job(&table, "reshape", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] () mutable {
return sstables::compact_sstables(std::move(desc), table).then([this, sstlist = std::move(sstlist)] (sstables::compaction_info result) mutable {
return remove_input_sstables_from_reshaping(std::move(sstlist)).then([this, new_sstables = std::move(result.new_sstables)] () mutable {
return collect_output_sstables_from_reshaping(std::move(new_sstables));
});
});
}).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
}).then([&reshaped_size] {
return make_ready_future<uint64_t>(reshaped_size);
});
});
}
future<>
sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& cm, table& table,
unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator, const ::io_priority_class& iop)
@@ -309,7 +387,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return cm.run_resharding_job(&table, [this, iop, &cm, &table, creator, &sstlist] () {
return cm.run_custom_job(&table, "resharding", [this, iop, &cm, &table, creator, &sstlist] () {
sstables::compaction_descriptor desc(sstlist, {}, iop);
desc.options = sstables::compaction_options::make_reshard();
desc.creator = std::move(creator);

View File

@@ -101,7 +101,7 @@ private:
// SSTables that are unshared and belong to this shard. They are already stored as an
// SSTable object.
utils::chunked_vector<sstables::shared_sstable> _unshared_local_sstables;
std::vector<sstables::shared_sstable> _unshared_local_sstables;
// SSTables that are unshared and belong to foreign shards. Because they are more conveniently
// stored as a foreign_sstable_open_info object, they are in a different attribute separate from the
@@ -122,6 +122,9 @@ private:
future<> remove_input_sstables_from_resharding(const std::vector<sstables::shared_sstable>& sstlist);
future<> collect_output_sstables_from_resharding(std::vector<sstables::shared_sstable> resharded_sstables);
future<> remove_input_sstables_from_reshaping(std::vector<sstables::shared_sstable> sstlist);
future<> collect_output_sstables_from_reshaping(std::vector<sstables::shared_sstable> reshaped_sstables);
template <typename Container, typename Func>
future<> parallel_for_each_restricted(Container&& C, Func&& func);
future<> load_foreign_sstables(sstable_info_vector info_vec);
@@ -174,6 +177,12 @@ public:
unsigned max_sstables_per_job, sstables::compaction_sstable_creator_fn creator,
const ::io_priority_class& iop);
// reshapes a collection of SSTables, and returns the total amount of bytes reshaped.
future<uint64_t> reshape(compaction_manager& cm, table& table,
sstables::compaction_sstable_creator_fn creator,
const ::io_priority_class& iop,
sstables::reshape_mode mode);
// Store a phased operation. Usually used to keep an object alive while the directory is being
// processed. One example is preventing table drops concurrent to the processing of this
// directory.

View File

@@ -80,4 +80,49 @@ reader_consumer time_window_compaction_strategy::make_interposer_consumer(const
};
}
compaction_descriptor
time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
std::vector<shared_sstable> single_window;
std::vector<shared_sstable> multi_window;
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
if (mode == reshape_mode::relaxed) {
offstrategy_threshold = max_sstables;
}
for (auto& sst : input) {
auto min = sst->get_stats_metadata().min_timestamp;
auto max = sst->get_stats_metadata().max_timestamp;
if (get_window_for(_options, min) != get_window_for(_options, max)) {
multi_window.push_back(sst);
} else {
single_window.push_back(sst);
}
}
if (!multi_window.empty()) {
// Everything that spans multiple windows will need reshaping
multi_window.resize(std::min(multi_window.size(), max_sstables));
compaction_descriptor desc(std::move(multi_window), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
}
// For things that don't span multiple windows, we compact windows that are individually too big
auto all_buckets = get_buckets(single_window, _options);
for (auto& pair : all_buckets.first) {
auto ssts = std::move(pair.second);
if (ssts.size() > offstrategy_threshold) {
ssts.resize(std::min(multi_window.size(), max_sstables));
compaction_descriptor desc(std::move(ssts), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
}
}
return compaction_descriptor();
}
}

View File

@@ -351,6 +351,8 @@ public:
virtual bool use_interposer_consumer() const override {
return true;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override;
};
}

237
table.cc
View File

@@ -825,49 +825,47 @@ table::seal_active_streaming_memtable_immediate(flush_permit&& permit) {
auto guard = _streaming_flush_phaser.start();
return with_gate(_streaming_flush_gate, [this, old, permit = std::move(permit)] () mutable {
return with_lock(_sstables_lock.for_read(), [this, old, permit = std::move(permit)] () mutable {
auto newtab = make_sstable();
auto newtab = make_sstable();
tlogger.debug("Flushing to {}", newtab->get_filename());
tlogger.debug("Flushing to {}", newtab->get_filename());
// This is somewhat similar to the main memtable flush, but with important differences.
//
// The first difference, is that we don't keep aggregate collectd statistics about this one.
// If we ever need to, we'll keep them separate statistics, but we don't want to polute the
// main stats about memtables with streaming memtables.
//
// Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the
// memtable list, since this memtable was not available for reading up until this point.
auto fp = permit.release_sstable_write_permit();
database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable {
auto&& priority = service::get_local_streaming_write_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
cfg.backup = incremental_backups_enabled();
return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] {
return newtab->open_data();
}).then([this, old, newtab] () {
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, newtab, old] {
auto adder = [this, newtab] {
add_sstable(newtab, {this_shard_id()});
try_trigger_compaction();
tlogger.debug("Flushing to {} done", newtab->get_filename());
};
if (cache_enabled()) {
return _cache.update_invalidating(adder, *old);
} else {
return _cache.invalidate(adder).then([old] { return old->clear_gently(); });
}
});
}).handle_exception([old, permit = std::move(permit), newtab] (auto ep) {
newtab->mark_for_deletion();
tlogger.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
// This is somewhat similar to the main memtable flush, but with important differences.
//
// The first difference, is that we don't keep aggregate collectd statistics about this one.
// If we ever need to, we'll keep them separate statistics, but we don't want to polute the
// main stats about memtables with streaming memtables.
//
// Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the
// memtable list, since this memtable was not available for reading up until this point.
auto fp = permit.release_sstable_write_permit();
database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable {
auto&& priority = service::get_local_streaming_write_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
cfg.backup = incremental_backups_enabled();
return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] {
return newtab->open_data();
}).then([this, old, newtab] () {
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, newtab, old] {
auto adder = [this, newtab] {
add_sstable(newtab, {this_shard_id()});
try_trigger_compaction();
tlogger.debug("Flushing to {} done", newtab->get_filename());
};
if (cache_enabled()) {
return _cache.update_invalidating(adder, *old);
} else {
return _cache.invalidate(adder).then([old] { return old->clear_gently(); });
}
});
}).handle_exception([old, permit = std::move(permit), newtab] (auto ep) {
newtab->mark_for_deletion();
tlogger.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
});
// We will also not have any retry logic. If we fail here, we'll fail the streaming and let
// the upper layers know. They can then apply any logic they want here.
});
// We will also not have any retry logic. If we fail here, we'll fail the streaming and let
// the upper layers know. They can then apply any logic they want here.
}).finally([guard = std::move(guard)] { });
});
}
@@ -882,27 +880,25 @@ future<> table::seal_active_streaming_memtable_big(streaming_memtable_big& smb,
smb.memtables->erase(old);
return with_gate(_streaming_flush_gate, [this, old, &smb, permit = std::move(permit)] () mutable {
return with_gate(smb.flush_in_progress, [this, old, &smb, permit = std::move(permit)] () mutable {
return with_lock(_sstables_lock.for_read(), [this, old, &smb, permit = std::move(permit)] () mutable {
auto newtab = make_sstable();
auto newtab = make_sstable();
auto fp = permit.release_sstable_write_permit();
auto monitor = std::make_unique<database_sstable_write_monitor>(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
auto&& priority = service::get_local_streaming_write_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
cfg.backup = incremental_backups_enabled();
cfg.leave_unsealed = true;
auto fut = write_memtable_to_sstable(*old, newtab, *monitor, cfg, priority);
return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable {
if (!f.failed()) {
smb.sstables.push_back(monitored_sstable{std::move(monitor), newtab});
return make_ready_future<>();
} else {
newtab->mark_for_deletion();
auto ep = f.get_exception();
tlogger.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
}
});
auto fp = permit.release_sstable_write_permit();
auto monitor = std::make_unique<database_sstable_write_monitor>(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
auto&& priority = service::get_local_streaming_write_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
cfg.backup = incremental_backups_enabled();
cfg.leave_unsealed = true;
auto fut = write_memtable_to_sstable(*old, newtab, *monitor, cfg, priority);
return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable {
if (!f.failed()) {
smb.sstables.push_back(monitored_sstable{std::move(monitor), newtab});
return make_ready_future<>();
} else {
newtab->mark_for_deletion();
auto ep = f.get_exception();
tlogger.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
}
});
});
});
@@ -937,9 +933,7 @@ table::seal_active_memtable(flush_permit&& permit) {
return do_with(std::move(permit), [this, old] (auto& permit) {
return repeat([this, old, &permit] () mutable {
auto sstable_write_permit = permit.release_sstable_write_permit();
return with_lock(_sstables_lock.for_read(), [this, old, sstable_write_permit = std::move(sstable_write_permit)] () mutable {
return this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit));
}).then([this, &permit] (auto should_stop) mutable {
return this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit)).then([this, &permit] (auto should_stop) mutable {
if (should_stop) {
return make_ready_future<stop_iteration>(should_stop);
}
@@ -1285,22 +1279,20 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) {
return make_ready_future<>();
}
return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor)] () mutable {
descriptor.creator = [this] (shard_id dummy) {
auto sst = make_sstable();
return sst;
};
descriptor.replacer = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) {
_compaction_strategy.notify_completion(desc.old_sstables, desc.new_sstables);
_compaction_manager.propagate_replacement(this, desc.old_sstables, desc.new_sstables);
this->on_compaction_completion(desc);
if (release_exhausted) {
release_exhausted(desc.old_sstables);
}
};
descriptor.creator = [this] (shard_id dummy) {
auto sst = make_sstable();
return sst;
};
descriptor.replacer = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) {
_compaction_strategy.notify_completion(desc.old_sstables, desc.new_sstables);
_compaction_manager.propagate_replacement(this, desc.old_sstables, desc.new_sstables);
this->on_compaction_completion(desc);
if (release_exhausted) {
release_exhausted(desc.old_sstables);
}
};
return sstables::compact_sstables(std::move(descriptor), *this);
}).then([this] (auto info) {
return sstables::compact_sstables(std::move(descriptor), *this).then([this] (auto info) {
if (info.type != sstables::compaction_type::Compaction) {
return make_ready_future<>();
}
@@ -1897,79 +1889,50 @@ future<> table::clear() {
future<db::replay_position> table::discard_sstables(db_clock::time_point truncated_at) {
assert(_compaction_disabled > 0);
return with_lock(_sstables_lock.for_read(), [this, truncated_at] {
struct pruner {
column_family& cf;
db::replay_position rp;
std::vector<sstables::shared_sstable> remove;
struct pruner {
column_family& cf;
db::replay_position rp;
std::vector<sstables::shared_sstable> remove;
pruner(column_family& cf)
: cf(cf) {}
pruner(column_family& cf)
: cf(cf) {}
void prune(db_clock::time_point truncated_at) {
auto gc_trunc = to_gc_clock(truncated_at);
void prune(db_clock::time_point truncated_at) {
auto gc_trunc = to_gc_clock(truncated_at);
auto pruned = make_lw_shared(cf._compaction_strategy.make_sstable_set(cf._schema));
auto pruned = make_lw_shared(cf._compaction_strategy.make_sstable_set(cf._schema));
for (auto& p : *cf._sstables->all()) {
if (p->max_data_age() <= gc_trunc) {
// Only one shard that own the sstable will submit it for deletion to avoid race condition in delete procedure.
if (*boost::min_element(p->get_shards_for_this_sstable()) == this_shard_id()) {
rp = std::max(p->get_stats_metadata().position, rp);
remove.emplace_back(p);
}
continue;
for (auto& p : *cf._sstables->all()) {
if (p->max_data_age() <= gc_trunc) {
// Only one shard that own the sstable will submit it for deletion to avoid race condition in delete procedure.
if (*boost::min_element(p->get_shards_for_this_sstable()) == this_shard_id()) {
rp = std::max(p->get_stats_metadata().position, rp);
remove.emplace_back(p);
}
pruned->insert(p);
continue;
}
cf._sstables = std::move(pruned);
pruned->insert(p);
}
};
auto p = make_lw_shared<pruner>(*this);
return _cache.invalidate([p, truncated_at] {
p->prune(truncated_at);
tlogger.debug("cleaning out row cache");
}).then([this, p]() mutable {
rebuild_statistics();
return parallel_for_each(p->remove, [this](sstables::shared_sstable s) {
remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), s);
return sstables::delete_atomically({s});
}).then([p] {
return make_ready_future<db::replay_position>(p->rp);
});
cf._sstables = std::move(pruned);
}
};
auto p = make_lw_shared<pruner>(*this);
return _cache.invalidate([p, truncated_at] {
p->prune(truncated_at);
tlogger.debug("cleaning out row cache");
}).then([this, p]() mutable {
rebuild_statistics();
return parallel_for_each(p->remove, [this](sstables::shared_sstable s) {
remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), s);
return sstables::delete_atomically({s});
}).then([p] {
return make_ready_future<db::replay_position>(p->rp);
});
});
}
future<int64_t>
table::disable_sstable_write() {
_sstable_writes_disabled_at = std::chrono::steady_clock::now();
return _sstables_lock.write_lock().then([this] {
// _sstable_deletion_sem must be acquired after _sstables_lock.write_lock
return _sstable_deletion_sem.wait().then([this] {
if (_sstables->all()->empty()) {
return make_ready_future<int64_t>(0);
}
int64_t max = 0;
for (auto&& s : *_sstables->all()) {
max = std::max(max, s->generation());
}
return make_ready_future<int64_t>(max);
});
});
}
std::chrono::steady_clock::duration table::enable_sstable_write(int64_t new_generation) {
if (new_generation != -1) {
update_sstables_known_generation(new_generation);
}
_sstable_deletion_sem.signal();
_sstables_lock.write_unlock();
return std::chrono::steady_clock::now() - _sstable_writes_disabled_at;
}
void table::set_schema(schema_ptr s) {
assert(s->is_counter() == _schema->is_counter());
tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}",

View File

@@ -169,7 +169,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
// Create incomplete sstables in test data directory
sstring ks = "system";
sstring cf = "local-7ad54392bcdd35a684174e047860b377";
sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f";
sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();
auto require_exist = [] (const sstring& name, bool should_exist) {
@@ -223,7 +223,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
// Create incomplete sstables in test data directory
sstring ks = "system";
sstring cf = "local-7ad54392bcdd35a684174e047860b377";
sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f";
sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();
sstring pending_delete_dir = sst_dir + "/" + sst::pending_delete_dir_basename();

View File

@@ -133,43 +133,6 @@ SEASTAR_TEST_CASE(sstable_resharding_test) {
});
}
SEASTAR_THREAD_TEST_CASE(sstable_resharding_strategy_tests) {
test_env env;
for (const auto version : all_sstable_versions) {
auto s = make_lw_shared(schema({}, "ks", "cf", {{"p1", utf8_type}}, {}, {}, {}, utf8_type));
auto get_sstable = [&] (int64_t gen, sstring first_key, sstring last_key) mutable {
auto sst = env.make_sstable(s, "", gen, version, sstables::sstable::format_types::big);
stats_metadata stats = {};
stats.sstable_level = 1;
sstables::test(sst).set_values(std::move(first_key), std::move(last_key), std::move(stats));
return sst;
};
column_family_for_tests cf;
auto tokens = token_generation_for_current_shard(2);
auto stcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options());
auto lcs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
auto sst1 = get_sstable(1, tokens[0].first, tokens[1].first);
auto sst2 = get_sstable(2, tokens[1].first, tokens[1].first);
{
// TODO: sstable_test runs with smp::count == 1, thus we will not be able to stress it more
// until we move this test case to sstable_resharding_test.
auto descriptors = stcs.get_resharding_jobs(*cf, { sst1, sst2 });
BOOST_REQUIRE(descriptors.size() == 2);
}
{
auto ssts = std::vector<sstables::shared_sstable>{ sst1, sst2 };
auto descriptors = lcs.get_resharding_jobs(*cf, ssts);
auto expected_jobs = (ssts.size()+smp::count-1)/smp::count;
BOOST_REQUIRE(descriptors.size() == expected_jobs);
}
}
}
SEASTAR_TEST_CASE(sstable_is_shared_correctness) {
return test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {