Rewrite shared sstables soon after startup
Several shards may share the same sstable - e.g., when re-starting scylla with a different number of shards, or when importing sstables from an external source. Sharing an sstable is fine, but it can result in excessive disk space use because the shared sstable cannot be deleted until all the shards using it have finished compacting it. Normally, we have no idea when the shards will decide to compact these sstables - e.g., with size- tiered-compaction a large sstable will take a long time until we decide to compact it. So what this patch does is to initiate compaction of the shared sstables - on each shard using it - so that a soon as possible after the restart, we will have the original sstable is split into separate sstables per shard, and the original sstable can be deleted. If several sstables are shared, we serialize this compaction process so that each shard only rewrites one sstable at a time. Regular compactions may happen in parallel, but they will not not be able to choose any of the shared sstables because those are already marked as being compacted. Commit3f2286d0increased the need for this patch, because since that commit, if we don't delete the shared sstable, we also cannot delete additional sstables which the different shards compacted with it. For one scylla user, this resulted in so much excessive disk space use, that it literally filled the whole disk. After this patch commit3f2286d0, or the discussion in issue #1318 on how to improve it, is no longer necessary, because we will never compact a shared sstable together with any other sstable - as explained above, the shared sstables are marked as "being compacted" so the regular compactions will avoid them. Fixes #1314. Signed-off-by: Nadav Har'El <nyh@scylladb.com> Message-Id: <1465406235-15378-1-git-send-email-nyh@scylladb.com> Reviewed-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
92
database.cc
92
database.cc
@@ -484,12 +484,61 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
|
||||
return (s1 <= me) && (me <= s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
|
||||
auto key_shard = [&s] (const partition_key& pk) {
|
||||
auto token = dht::global_partitioner().get_token(s, pk);
|
||||
return dht::shard_of(token);
|
||||
};
|
||||
auto s1 = key_shard(first);
|
||||
auto s2 = key_shard(last);
|
||||
auto me = engine().cpu_id();
|
||||
return (s1 != me) || (me != s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
|
||||
return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, r)) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
|
||||
sst->mark_for_deletion();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
|
||||
return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
|
||||
if (in_other_shard) {
|
||||
// If we're here, this sstable is shared by this and other
|
||||
// shard(s). Shared sstables cannot be deleted until all
|
||||
// shards compacted them, so to reduce disk space usage we
|
||||
// want to start splitting them now.
|
||||
dblog.info("Splitting {} for shard", sst->get_filename());
|
||||
_compaction_manager.submit_sstable_rewrite(this, sst);
|
||||
}
|
||||
if (reset_level) {
|
||||
// When loading a migrated sstable, set level to 0 because
|
||||
// it may overlap with existing tables in levels > 0.
|
||||
// This step is optional, because even if we didn't do this
|
||||
// scylla would detect the overlap, and bring back some of
|
||||
// the sstables to level 0.
|
||||
sst->set_sstable_level(0);
|
||||
}
|
||||
add_sstable(sst);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
|
||||
|
||||
using namespace sstables;
|
||||
@@ -514,24 +563,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
|
||||
}
|
||||
}
|
||||
|
||||
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
auto fut = sst->get_sstable_key_range(*_schema);
|
||||
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, std::move(r))) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring",
|
||||
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
|
||||
sstables::sstable::component_type::Data));
|
||||
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto fut = sst->load();
|
||||
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
|
||||
add_sstable(std::move(*sst));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then_wrapped([fname, comps] (future<> f) {
|
||||
return load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
|
||||
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (malformed_sstable_exception& e) {
|
||||
@@ -1027,25 +1061,9 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
|
||||
future<>
|
||||
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
|
||||
return parallel_for_each(new_tables, [this] (auto comps) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
|
||||
return sst->load().then([this, sst] {
|
||||
// This sets in-memory level of sstable to 0.
|
||||
// When loading a migrated sstable, it's important to set it to level 0 because
|
||||
// leveled compaction relies on a level > 0 having no overlapping sstables.
|
||||
// If Scylla reboots before migrated sstable gets compacted, leveled strategy
|
||||
// is smart enough to detect a sstable that overlaps and set its in-memory
|
||||
// level to 0.
|
||||
return sst->set_sstable_level(0);
|
||||
}).then([this, sst] {
|
||||
auto first = sst->get_first_partition_key(*_schema);
|
||||
auto last = sst->get_last_partition_key(*_schema);
|
||||
if (belongs_to_current_shard(*_schema, first, last)) {
|
||||
this->add_sstable(sst);
|
||||
} else {
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return this->load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), _config.datadir,
|
||||
comps.generation, comps.version, comps.format), true);
|
||||
}).then([this] {
|
||||
// Drop entire cache for this column family because it may be populated
|
||||
// with stale data.
|
||||
|
||||
@@ -371,6 +371,7 @@ private:
|
||||
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
|
||||
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
lw_shared_ptr<memtable> new_streaming_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
|
||||
@@ -247,6 +247,51 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
|
||||
return task;
|
||||
}
|
||||
|
||||
// submit_sstable_rewrite() starts a compaction task, much like submit(),
|
||||
// But rather than asking a compaction policy what to compact, this function
|
||||
// compacts just a single sstable, and writes one new sstable. This operation
|
||||
// is useful to split an sstable containing data belonging to multiple shards
|
||||
// into a separate sstable on each shard.
|
||||
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
|
||||
// The semaphore ensures that the sstable rewrite operations submitted by
|
||||
// submit_sstable_rewrite are run in sequence, and not all of them in
|
||||
// parallel. Note that unlike general compaction which currently allows
|
||||
// different cfs to compact in parallel, here we don't have a semaphore
|
||||
// per cf, so we only get one rewrite at a time on each shard.
|
||||
static thread_local semaphore sem(1);
|
||||
// We cannot, and don't need to, compact an sstable which is already
|
||||
// being compacted anyway.
|
||||
if (_stopped || _compacting_sstables.count(sst)) {
|
||||
return;
|
||||
}
|
||||
// Conversely, we don't want another compaction job to compact the
|
||||
// sstable we are planning to work on:
|
||||
_compacting_sstables.insert(sst);
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
_tasks.push_back(task);
|
||||
_stats.active_tasks++;
|
||||
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
|
||||
return cf->compact_sstables(sstables::compaction_descriptor(
|
||||
std::vector<sstables::shared_sstable>{sst},
|
||||
sst->get_sstable_level(),
|
||||
std::numeric_limits<uint64_t>::max()), false);
|
||||
}).then_wrapped([this, sst, task] (future<> f) {
|
||||
_compacting_sstables.erase(sst);
|
||||
_stats.active_tasks--;
|
||||
_tasks.remove(task);
|
||||
try {
|
||||
f.get();
|
||||
_stats.completed_tasks++;
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
cmlog.info("compaction info: {}", e.what());
|
||||
_stats.errors++;
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}", std::current_exception());
|
||||
_stats.errors++;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
|
||||
task->stopping = true;
|
||||
return task->compaction_gate.close().then([task] {
|
||||
|
||||
@@ -109,6 +109,13 @@ public:
|
||||
// Submit a column family to be cleaned up and wait for its termination.
|
||||
future<> perform_cleanup(column_family* cf);
|
||||
|
||||
// Submit a specific sstable to be rewritten, while dropping data which
|
||||
// does not belong to this shard. Meant to be used on startup when an
|
||||
// sstable is shared by multiple shards, and we want to split it to a
|
||||
// separate sstable for each shard.
|
||||
void submit_sstable_rewrite(column_family* cf,
|
||||
sstables::shared_sstable s);
|
||||
|
||||
// Remove a column family from the compaction manager.
|
||||
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
|
||||
future<> remove(column_family* cf);
|
||||
|
||||
Reference in New Issue
Block a user