Rewrite shared sstables soon after startup

Several shards may share the same sstable - e.g., when restarting Scylla
with a different number of shards, or when importing sstables from an
external source. Sharing an sstable is fine, but it can result in excessive
disk space use because the shared sstable cannot be deleted until all
the shards using it have finished compacting it. Normally, we have no idea
when the shards will decide to compact these sstables - e.g., with
size-tiered compaction a large sstable will take a long time until we
decide to compact it. So what this patch does is to initiate compaction
of the shared sstables - on each shard using them - so that as soon as
possible after the restart, the original sstable is split into separate
sstables per shard, and the original sstable can be deleted. If several
sstables are shared, we serialize this compaction process so that each
shard only rewrites one sstable at a time. Regular compactions may happen
in parallel, but they will not be able to choose any of the shared
sstables because those are already marked as being compacted.
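The patch detects sharing by mapping the sstable's first and last partition
keys to shards and comparing them with the current shard, relying on the
partitioner mapping tokens to shards monotonically. A minimal stand-alone
sketch of those two predicates, with plain shard ids standing in for the
real key_shard(pk) = shard_of(get_token(s, pk)) computation:

```cpp
#include <cassert>

// True if the sstable's key range (with endpoint shards s1, s2) may
// contain data owned by shard `me`.
static bool belongs_to_current_shard(unsigned s1, unsigned s2, unsigned me) {
    return (s1 <= me) && (me <= s2);
}

// True if the sstable also contains data owned by some shard other than
// `me` - i.e., the sstable is shared, and `me` alone cannot delete it.
static bool belongs_to_other_shard(unsigned s1, unsigned s2, unsigned me) {
    return (s1 != me) || (me != s2);
}
```

An sstable for which both predicates hold on a given shard is a shared
sstable: that shard must load it, but should also submit it for rewriting
so the original file can eventually be deleted.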

Commit 3f2286d0 increased the need for this patch, because since that
commit, if we don't delete the shared sstable, we also cannot delete
additional sstables which the different shards compacted with it. For one
scylla user, this resulted in so much excess disk space use that it
literally filled the whole disk.

After this patch, commit 3f2286d0 - and the discussion in issue #1318 on
how to improve it - is no longer necessary, because we will never compact a shared
sstable together with any other sstable - as explained above, the shared
sstables are marked as "being compacted" so the regular compactions will
avoid them.
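Concretely, the compaction manager keeps a set of sstables currently being
compacted, and a regular compaction strategy skips any candidate found in
that set. A minimal sketch of the exclusion, with illustrative names
(`compacting`, `eligible`) that are not taken from the patch:

```cpp
#include <cassert>
#include <set>
#include <vector>

// Stand-in for a real sstable object; identity is all that matters here.
struct sstable { int generation; };

struct compaction_manager {
    // sstables currently marked as "being compacted".
    std::set<const sstable*> compacting;

    // Candidates a regular compaction may choose: everything not already
    // marked. A shared sstable submitted for rewrite stays in `compacting`
    // until its per-shard split finishes, so it is never picked here.
    std::vector<const sstable*> eligible(const std::vector<const sstable*>& all) const {
        std::vector<const sstable*> out;
        for (const sstable* s : all) {
            if (!compacting.count(s)) {
                out.push_back(s);
            }
        }
        return out;
    }
};
```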

Fixes #1314.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1465406235-15378-1-git-send-email-nyh@scylladb.com>
Reviewed-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Nadav Har'El
2016-06-08 20:17:15 +03:00
committed by Avi Kivity
parent 1b8e170254
commit 721f7d1d4f
4 changed files with 108 additions and 37 deletions


@@ -484,12 +484,61 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
return (s1 <= me) && (me <= s2);
}
static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
auto key_shard = [&s] (const partition_key& pk) {
auto token = dht::global_partitioner().get_token(s, pk);
return dht::shard_of(token);
};
auto s1 = key_shard(first);
auto s2 = key_shard(last);
auto me = engine().cpu_id();
return (s1 != me) || (me != s2);
}
static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
assert(r.start());
assert(r.end());
return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
}
static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
assert(r.start());
assert(r.end());
return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
}
future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, r)) {
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
sst->mark_for_deletion();
return make_ready_future<>();
}
bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
if (in_other_shard) {
// If we're here, this sstable is shared by this and other
// shard(s). Shared sstables cannot be deleted until all
// shards compacted them, so to reduce disk space usage we
// want to start splitting them now.
dblog.info("Splitting {} for shard", sst->get_filename());
_compaction_manager.submit_sstable_rewrite(this, sst);
}
if (reset_level) {
// When loading a migrated sstable, set level to 0 because
// it may overlap with existing tables in levels > 0.
// This step is optional, because even if we didn't do this
// scylla would detect the overlap, and bring back some of
// the sstables to level 0.
sst->set_sstable_level(0);
}
add_sstable(sst);
});
});
}
future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
using namespace sstables;
@@ -514,24 +563,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
}
}
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
auto fut = sst->get_sstable_key_range(*_schema);
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, std::move(r))) {
dblog.debug("sstable {} not relevant for this shard, ignoring",
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
sstables::sstable::component_type::Data));
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
return make_ready_future<>();
}
auto fut = sst->load();
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
add_sstable(std::move(*sst));
return make_ready_future<>();
});
}).then_wrapped([fname, comps] (future<> f) {
return load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
try {
f.get();
} catch (malformed_sstable_exception& e) {
@@ -1027,25 +1061,9 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
future<>
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
return parallel_for_each(new_tables, [this] (auto comps) {
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
return sst->load().then([this, sst] {
// This sets in-memory level of sstable to 0.
// When loading a migrated sstable, it's important to set it to level 0 because
// leveled compaction relies on a level > 0 having no overlapping sstables.
// If Scylla reboots before migrated sstable gets compacted, leveled strategy
// is smart enough to detect a sstable that overlaps and set its in-memory
// level to 0.
return sst->set_sstable_level(0);
}).then([this, sst] {
auto first = sst->get_first_partition_key(*_schema);
auto last = sst->get_last_partition_key(*_schema);
if (belongs_to_current_shard(*_schema, first, last)) {
this->add_sstable(sst);
} else {
sst->mark_for_deletion();
}
return make_ready_future<>();
});
return this->load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), _config.datadir,
comps.generation, comps.version, comps.format), true);
}).then([this] {
// Drop entire cache for this column family because it may be populated
// with stale data.


@@ -371,6 +371,7 @@ private:
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
void add_sstable(sstables::sstable&& sstable);
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);


@@ -247,6 +247,51 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
return task;
}
// submit_sstable_rewrite() starts a compaction task, much like submit(),
// but rather than asking a compaction policy what to compact, this function
// compacts just a single sstable, and writes one new sstable. This operation
// is useful to split an sstable containing data belonging to multiple shards
// into a separate sstable on each shard.
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
// The semaphore ensures that the sstable rewrite operations submitted by
// submit_sstable_rewrite are run in sequence, and not all of them in
// parallel. Note that unlike general compaction which currently allows
// different cfs to compact in parallel, here we don't have a semaphore
// per cf, so we only get one rewrite at a time on each shard.
static thread_local semaphore sem(1);
// We cannot, and don't need to, compact an sstable which is already
// being compacted anyway.
if (_stopped || _compacting_sstables.count(sst)) {
return;
}
// Conversely, we don't want another compaction job to compact the
// sstable we are planning to work on:
_compacting_sstables.insert(sst);
auto task = make_lw_shared<compaction_manager::task>();
_tasks.push_back(task);
_stats.active_tasks++;
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
return cf->compact_sstables(sstables::compaction_descriptor(
std::vector<sstables::shared_sstable>{sst},
sst->get_sstable_level(),
std::numeric_limits<uint64_t>::max()), false);
}).then_wrapped([this, sst, task] (future<> f) {
_compacting_sstables.erase(sst);
_stats.active_tasks--;
_tasks.remove(task);
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stop_exception& e) {
cmlog.info("compaction info: {}", e.what());
_stats.errors++;
} catch (...) {
cmlog.error("compaction failed: {}", std::current_exception());
_stats.errors++;
}
});
}
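The thread-local semaphore with one unit is what makes the rewrites run one
at a time per shard. A stand-alone illustration of the same policy, using a
mutex in place of Seastar's with_semaphore on a semaphore(1) (all names
here are illustrative, not from the patch):

```cpp
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

static std::mutex rewrite_sem;       // plays the role of semaphore(1)
static std::atomic<int> running{0};  // rewrites currently in flight
static std::atomic<int> peak{0};     // maximum observed concurrency

// Each submitted rewrite waits its turn on the single-unit semaphore,
// so concurrent submissions queue up instead of running in parallel.
static void submit_sstable_rewrite() {
    std::lock_guard<std::mutex> guard(rewrite_sem);
    int now = ++running;
    int prev = peak.load();
    while (now > prev && !peak.compare_exchange_weak(prev, now)) {}
    // ... compacting one shared sstable into per-shard output goes here ...
    --running;
}

// Submit n rewrites from n threads and report the peak concurrency,
// which the serialization keeps at exactly 1.
static int run_rewrites(int n) {
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
        workers.emplace_back(submit_sstable_rewrite);
    }
    for (auto& t : workers) {
        t.join();
    }
    return peak.load();
}
```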
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
task->stopping = true;
return task->compaction_gate.close().then([task] {


@@ -109,6 +109,13 @@ public:
// Submit a column family to be cleaned up and wait for its termination.
future<> perform_cleanup(column_family* cf);
// Submit a specific sstable to be rewritten, while dropping data which
// does not belong to this shard. Meant to be used on startup when an
// sstable is shared by multiple shards, and we want to split it to a
// separate sstable for each shard.
void submit_sstable_rewrite(column_family* cf,
sstables::shared_sstable s);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);