Merge 'tablets: Fix race between repair and split' from Raphael "Raph" Carvalho

Consider the following:

```
T
0   split prepare starts
1                               repair starts
2   split prepare finishes
3                               repair adds unsplit sstables
4                               repair ends
5   split executes
```

If repair produces sstable after split prepare phase, the replica will not split that sstable later, as prepare phase is considered completed already. That causes split execution to fail as replicas weren't really prepared. This also can be triggered with load-and-stream which shares the same write (consumer) path.

The approach to fix this is the same employed to prevent a race between split and migration. If migration happens during prepare phase, it can happen source misses the split request, but the tablet will still be split on the destination (if needed). Similarly, the repair writer becomes responsible for splitting the data if underlying table is in split mode. That's implemented in replica::table for correctness, so if node crashes, the new sstable missing split is still split before added to the set.

Fixes #19378.
Fixes #19416.

**Please replace this line with justification for the backport/\* labels added to this PR**

Closes scylladb/scylladb#19427

* github.com:scylladb/scylladb:
  tablets: Fix race between repair and split
  compaction: Allow "offline" sstable to be split
This commit is contained in:
Tomasz Grabiec
2024-08-19 14:44:28 +02:00
8 changed files with 190 additions and 12 deletions

View File

@@ -1550,11 +1550,16 @@ protected:
co_return stats;
}
virtual sstables::compaction_descriptor make_descriptor(const sstables::shared_sstable& sst) const {
static sstables::compaction_descriptor
make_descriptor(const sstables::shared_sstable& sst, const sstables::compaction_type_options& opt, owned_ranges_ptr owned_ranges = {}) {
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
return sstables::compaction_descriptor({ sst },
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options, _owned_ranges_ptr);
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, opt, owned_ranges);
}
virtual sstables::compaction_descriptor make_descriptor(const sstables::shared_sstable& sst) const {
return make_descriptor(sst, _options, _owned_ranges_ptr);
}
virtual future<sstables::compaction_result> rewrite_sstable(const sstables::shared_sstable sst) {
@@ -1608,15 +1613,24 @@ public:
, _opt(options.as<sstables::compaction_type_options::split>())
{
}
static bool sstable_needs_split(const sstables::shared_sstable& sst, const sstables::compaction_type_options::split& opt) {
return opt.classifier(sst->get_first_decorated_key().token()) != opt.classifier(sst->get_last_decorated_key().token());
}
static sstables::compaction_descriptor
make_descriptor(const sstables::shared_sstable& sst, const sstables::compaction_type_options::split& split_opt) {
auto opt = sstables::compaction_type_options::make_split(split_opt.classifier);
return rewrite_sstables_compaction_task_executor::make_descriptor(sst, std::move(opt));
}
private:
bool sstable_needs_split(const sstables::shared_sstable& sst) const {
return _opt.classifier(sst->get_first_decorated_key().token()) != _opt.classifier(sst->get_last_decorated_key().token());
return sstable_needs_split(sst, _opt);
}
protected:
sstables::compaction_descriptor make_descriptor(const sstables::shared_sstable& sst) const override {
auto desc = rewrite_sstables_compaction_task_executor::make_descriptor(sst);
desc.options = sstables::compaction_type_options::make_split(_opt.classifier);
return desc;
return make_descriptor(sst, _opt);
}
future<sstables::compaction_result> rewrite_sstable(const sstables::shared_sstable sst) override {
@@ -2053,6 +2067,30 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
return perform_task_on_all_files<split_compaction_task_executor>(info, t, std::move(options), std::move(owned_ranges_ptr), std::move(get_sstables));
}
future<std::vector<sstables::shared_sstable>>
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, table_state& t, sstables::compaction_type_options::split opt) {
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
std::vector<sstables::shared_sstable> ret;
co_await run_custom_job(t, sstables::compaction_type::Split, "Split SSTable",
[&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& monitor) -> future<> {
sstables::compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
desc.creator = [&t] (shard_id _) {
return t.make_sstable();
};
desc.replacer = [&] (sstables::compaction_completion_desc d) {
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));
};
co_await sstables::compact_sstables(std::move(desc), info, t, monitor);
co_await sst->unlink();
}, tasks::task_info{}, throw_if_stopping::yes);
co_return ret;
}
// Submit a table to be scrubbed and wait for its termination.
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub(table_state& t, sstables::compaction_type_options::scrub opts, tasks::task_info info) {
auto scrub_mode = opts.operation_mode;

View File

@@ -345,6 +345,11 @@ public:
// or user aborted splitting using stop API.
future<compaction_stats_opt> perform_split_compaction(compaction::table_state& t, sstables::compaction_type_options::split opt, tasks::task_info info);
// Splits a single SSTable by segregating all its data according to the classifier.
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, table_state& t, sstables::compaction_type_options::split opt);
// Run a custom job for a given table, defined by a function
// it completes when future returned by job is ready or returns immediately
// if manager was asked to stop.

View File

@@ -322,6 +322,7 @@ public:
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups() = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) = 0;
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;
};

View File

@@ -581,6 +581,11 @@ public:
// be split once it returns.
future<> maybe_split_compaction_group_of(locator::tablet_id);
private:
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
// NOTE: it must only be used on new SSTables that weren't added to the set yet.
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable&);
// Called when coordinator executes tablet splitting, i.e. commit the new tablet map with
// each tablet split into two, so this replica will remap all of its compaction groups
// that were previously split.
@@ -618,6 +623,7 @@ private:
return _config.enable_cache && _schema->caching_options().enabled();
}
void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept;
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy, bool trigger_compaction);
future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction);
// Helpers which add sstable on behalf of a compaction group and refreshes compound set.
void add_sstable(compaction_group& cg, sstables::shared_sstable sstable);

View File

@@ -664,6 +664,9 @@ public:
bool all_storage_groups_split() override { return true; }
future<> split_all_storage_groups() override { return make_ready_future(); }
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override {
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
}
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
return get_compaction_group().make_sstable_set();
@@ -764,6 +767,7 @@ public:
bool all_storage_groups_split() override;
future<> split_all_storage_groups() override;
future<> maybe_split_compaction_group_of(size_t idx) override;
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override;
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
@@ -846,7 +850,13 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) {
co_return;
}
if (_main_cg->empty()) {
co_return;
}
auto holder = _main_cg->async_gate().hold();
co_await _main_cg->flush();
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
co_await _main_cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{});
co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{});
}
@@ -923,11 +933,27 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
return sg->split(split_compaction_options());
}
future<std::vector<sstables::shared_sstable>>
tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) {
if (!tablet_map().needs_split()) {
co_return std::vector<sstables::shared_sstable>{sst};
}
auto& cg = compaction_group_for_sstable(sst);
auto holder = cg.async_gate().hold();
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.as_table_state(), split_compaction_options());
}
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
auto holder = async_gate().hold();
co_await _sg_manager->maybe_split_compaction_group_of(tablet_id.value());
}
future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
auto holder = async_gate().hold();
co_return co_await _sg_manager->maybe_split_sstable(sst);
}
std::unique_ptr<storage_group_manager> table::make_storage_group_manager() {
std::unique_ptr<storage_group_manager> ret;
if (uses_tablets()) {
@@ -1110,11 +1136,8 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no
}
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
compaction_group& cg = compaction_group_for_sstable(sst);
// Hold gate to make share compaction group is alive.
auto holder = cg.async_gate().hold();
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy,
bool trigger_compaction) {
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
@@ -1131,6 +1154,16 @@ table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::o
}), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}));
}
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
for (auto sst : co_await maybe_split_new_sstable(new_sst)) {
auto& cg = compaction_group_for_sstable(sst);
// Hold gate to make share compaction group is alive.
auto holder = cg.async_gate().hold();
co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction);
}
}
future<>
table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
bool do_trigger_compaction = offstrategy == sstables::offstrategy::no;

View File

@@ -2467,6 +2467,9 @@ future<bool> topology_coordinator::maybe_start_tablet_split_finalization(group0_
if (plan.finalize_resize.empty()) {
co_return false;
}
if (utils::get_local_injector().enter("tablet_split_finalization_postpone")) {
co_return false;
}
std::vector<canonical_mutation> updates;

View File

@@ -5366,6 +5366,28 @@ SEASTAR_TEST_CASE(splitting_compaction_test) {
BOOST_REQUIRE_EQUAL(classify_fn(sst->get_first_decorated_key().token()), classify_fn(sst->get_last_decorated_key().token()));
BOOST_REQUIRE_EQUAL(window_for(sst->get_stats_metadata().min_timestamp), window_for(sst->get_stats_metadata().max_timestamp));
}
BOOST_REQUIRE(ret.new_sstables.size() == 4); // 2 token groups * 2 windows.
const size_t expected_output_size = 4; // 2 token groups * 2 windows.
BOOST_REQUIRE(ret.new_sstables.size() == expected_output_size);
auto& cm = t->get_compaction_manager();
auto split_opt = compaction_type_options::split{classify_fn};
auto new_ssts = cm.maybe_split_sstable(input, t.as_table_state(), split_opt).get();
BOOST_REQUIRE(new_ssts.size() == expected_output_size);
for (auto& sst : new_ssts) {
// split sstables don't require further split.
auto ssts = cm.maybe_split_sstable(sst, t.as_table_state(), split_opt).get();
BOOST_REQUIRE(ssts.size() == 1);
BOOST_REQUIRE(ssts.front() == sst);
}
// test exception propagation
auto throwing_classifier = [&] (dht::token t) -> mutation_writer::token_group_id {
// skip first and last token, to not trigger exception when checking if sstable needs split.
if (t != input->get_first_decorated_key().token() && t != input->get_last_decorated_key().token()) {
throw std::runtime_error("exception");
}
return classify_fn(t);
};
BOOST_REQUIRE_THROW(cm.maybe_split_sstable(input, t.as_table_state(), compaction_type_options::split{throwing_classifier}).get(),
std::runtime_error);
});
}

View File

@@ -442,6 +442,76 @@ async def test_tablet_repair(manager: ManagerClient):
for r in rows:
assert r.c == repair_cycles - 1
# Reproducer for race between split and repair: https://github.com/scylladb/scylladb/issues/19378
# Verifies repair will not complete with sstables that still require split, causing split
# execution to fail.
@pytest.mark.repair
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_concurrent_tablet_repair_and_split(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'raft_topology=debug',
'--target-tablet-size-in-bytes', '1024',
]
servers = await manager.servers_add(3, cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
})
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 2} AND tablets = {'initial': 32};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
logger.info("Populating table")
keys = range(5000) # Enough keys to trigger repair digest mismatch with a high chance.
stmt = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ONE
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
await asyncio.gather(*[cql.run_async(stmt, [k, -1]) for k in keys])
# split decision is sstable size based, so data must be flushed first
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, "test")
await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", False)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Waiting for split prepare...")
await s0_log.wait_for('Setting split ready sequence number to', from_mark=s0_mark)
s0_mark = await s0_log.mark()
logger.info("Waited for split prepare")
# Balancer is re-enabled later for split execution
await asyncio.create_task(manager.api.disable_tablet_balancing(servers[0].ip_addr))
# Write concurrently with repair to increase the chance of repair having some discrepancy to resolve and send writes.
inserts_future = asyncio.gather(*[cql.run_async(stmt, [k, 1]) for k in keys])
await repair_on_node(manager, servers[0], servers)
await inserts_future
logger.info("Waiting for split execute...")
await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s0_log.wait_for('Detected tablet split for table', from_mark=s0_mark)
await inject_error_one_shot_on(manager, "tablet_split_finalization_postpone", servers)
logger.info("Waited for split execute...")
key_count = len(keys)
stmt = cql.prepare("SELECT * FROM test.test;")
stmt.consistency_level = ConsistencyLevel.ALL
rows = await cql.run_async(stmt)
assert len(rows) == key_count
@pytest.mark.repair
@pytest.mark.asyncio