diff --git a/repair/row_level.cc b/repair/row_level.cc index 706631db7a..5d4abc4901 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2168,25 +2168,55 @@ public: public: future<> mark_sstable_as_repaired() { auto& sstables = _repair_writer->get_sstable_list_to_mark_as_repaired(); - if (_incremental_repair_meta.sst_set || !sstables.empty()) { - co_await seastar::async([&] { - auto do_mark_sstable_as_repaired = [&] (const sstables::shared_sstable& sst, const sstring& type) { - auto filename = sst->toc_filename(); - auto name = sst->component_basename(component_type::Data); - int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1; - sst->update_repaired_at(repaired_at); - rlogger.info("Marking filename={} name={} repaired_at={} being_repaired={} type={} for incremental repair", - filename, name, repaired_at, sst->being_repaired, type); - }; - _incremental_repair_meta.sst_set->for_each_sstable([&] (const sstables::shared_sstable& sst) { - seastar::thread::maybe_yield(); - do_mark_sstable_as_repaired(sst, "existing"); - }); - for (auto& sst : sstables) { - seastar::thread::maybe_yield(); - do_mark_sstable_as_repaired(sst, "repair_produced"); + if (!_incremental_repair_meta.sst_set && !sstables.empty()) { + co_return; + } + + auto& table = _db.local().find_column_family(_schema->id()); + auto& cm = table.get_compaction_manager(); + int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1; + + auto modifier = [repaired_at] (sstables::sstable& new_sst) { + new_sst.update_repaired_at(repaired_at); + }; + + std::unordered_map> sstables_by_group; + auto add_sstable = [&] (const sstables::shared_sstable& sst) { + if (sst->should_update_repaired_at(repaired_at)) { + auto& view = table.compaction_group_view_for_sstable(sst); + sstables_by_group[&view].push_back(sst); + } + }; + + if (_incremental_repair_meta.sst_set) { + _incremental_repair_meta.sst_set->for_each_sstable(add_sstable); + } + + for (auto& sst : sstables) { + add_sstable(sst); + } + + for (auto& [view, ssts] : sstables_by_group) { + for (auto& sst : ssts) { + rlogger.info("Marking sstable={} repaired_at={} being_repaired={} for incremental repair", + sst->toc_filename(), repaired_at, sst->being_repaired); + } + auto rewritten_sstables = co_await cm.perform_component_rewrite(*view, tasks::task_info{}, std::move(ssts), + sstables::component_type::Statistics, modifier); + + // remove the old sstables from incremental repair meta and add the new ones + for (auto& ss : rewritten_sstables) { + bool erased = _incremental_repair_meta.sst_set->erase(ss.first); + if (erased) { + _incremental_repair_meta.sst_set->insert(ss.second); } - }); + + auto it = sstables.find(ss.first); + if (it != sstables.end()) { + sstables.erase(it); + sstables.insert(ss.second); + } + } } } }; diff --git a/replica/table.cc b/replica/table.cc index e519f5de52..73c083a7b7 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2102,24 +2102,36 @@ std::vector compaction_group::all_sstables() const { future<> compaction_group::update_repaired_at_for_merge() { - auto sstables = all_sstables(); auto sstables_repaired_at = get_sstables_repaired_at(); - co_await seastar::async([&] { - for (auto& sst : sstables) { - thread::maybe_yield(); - auto& stats = sst->get_stats_metadata(); - if (stats.repaired_at > sstables_repaired_at) { - auto neww = 0; - auto old = sst->update_repaired_at(neww); - tlogger.info("Finished repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}", - sst->get_filename(), old, neww, sstables_repaired_at, group_id(), token_range()); - } else { - auto old = stats.repaired_at; - tlogger.debug("Skipped repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}", - sst->get_filename(), old, old, sstables_repaired_at, group_id(), token_range()); - } + constexpr int64_t new_repaired_at = 0; + + auto modifier = [] (sstables::sstable& new_sst) { + new_sst.update_repaired_at(new_repaired_at); + }; + + std::unordered_map> sstables_by_view; + for (auto& sst : all_sstables()) { + auto& stats = sst->get_stats_metadata(); + if (stats.repaired_at > sstables_repaired_at) { + auto& view = view_for_sstable(sst); + sstables_by_view[&view].push_back(sst); + } else { + tlogger.debug("Skipped repaired_at update for tablet merge sstable={} repaired_at={} sstables_repaired_at={} group_id={} range={}", + sst->get_filename(), stats.repaired_at, sstables_repaired_at, group_id(), token_range()); } - }); + } + + auto& cm = get_compaction_manager(); + for (auto& [view, ssts] : sstables_by_view) { + for (auto& sst : ssts) { + tlogger.info("Updating repaired_at for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}", + sst->get_filename(), sst->get_stats_metadata().repaired_at, new_repaired_at, sstables_repaired_at, group_id(), token_range()); + } + co_await cm.perform_component_rewrite(*view, tasks::task_info{}, std::move(ssts), + sstables::component_type::Statistics, modifier); + } + tlogger.info("Completed updating repaired_at={} for tablet merge in compaction group_id={} range={}", + new_repaired_at, group_id(), token_range()); } future> table::get_compaction_group_views_for_repair(dht::token_range range) { diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 6b728d2aac..04d1ff2480 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -266,9 +266,18 @@ sstable_directory::process_descriptor(sstables::entry_descriptor desc, auto sst = co_await load_sstable(desc, storage_opts, flags.sstable_open_config); validate(sst, flags); - if (flags.need_mutate_level) { + if (flags.need_mutate_level && sst->should_mutate_sstable_level(0)) { dirlog.trace("Mutating {} to level 0\n", sst->get_filename()); - co_await sst->mutate_sstable_level(0); + auto modifier = [] (sstable& new_sst) { + new_sst.mutate_sstable_level(0); + }; + auto sst_creator = [&](shared_sstable) { + return _manager.make_sstable(_schema, storage_opts, sstables::sstable_generation_generator{}(), _state, desc.version, desc.format, db_clock::now(), _error_handler_gen); + }; + auto new_sst = co_await sst->link_with_rewritten_component(std::move(sst_creator), component_type::Statistics, std::move(modifier), false); + co_await sst->unlink(); + sst = std::move(new_sst); + desc.generation = sst->generation(); } if (flags.sort_sstables_according_to_owner) { diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 3ac7436149..917c213cf9 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1373,8 +1373,7 @@ int64_t sstable::update_repaired_at(int64_t repaired_at) { } auto stats = std::make_unique(old_stats); stats->repaired_at = repaired_at; - _components->statistics.contents[metadata_type::Stats] = std::move(stats); - rewrite_statistics(); + _components->statistics.contents[metadata_type::Stats] = std::move(stats); return old_repaired_at; } @@ -1383,18 +1382,9 @@ future<> sstable::copy_components(const sstable& src) { _recognized_components = src._recognized_components; } -void sstable::rewrite_statistics() { - sstlog.debug("Rewriting statistics component of sstable {}", get_filename()); - - auto lock = get_units(_mutate_sem, 1).get(); - file_output_stream_options options; - options.buffer_size = sstable_buffer_size; - auto w = make_component_file_writer(component_type::TemporaryStatistics, std::move(options), - open_flags::wo | open_flags::create | open_flags::truncate).get(); - write(_version, w, _components->statistics); - w.close(); - // rename() guarantees atomicity when renaming a file into place. - sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryStatistics)), fmt::to_string(filename(component_type::Statistics))).get(); +bool sstable::should_update_repaired_at(int64_t repaired_at) const { + const stats_metadata& stats = get_stats_metadata(); + return stats.repaired_at != repaired_at; } // Creates a new SSTable generation by hard-linking existing components and rewriting a specific one. @@ -3215,39 +3205,38 @@ void sstable::set_sstable_level(uint32_t new_level) { s.sstable_level = new_level; } -future<> sstable::mutate_sstable_level(uint32_t new_level) { +void sstable::mutate_sstable_level(uint32_t new_level) { if (!has_component(component_type::Statistics)) { - return make_ready_future<>(); + return; } auto entry = _components->statistics.contents.find(metadata_type::Stats); if (entry == _components->statistics.contents.end()) { - return make_ready_future<>(); + return; } auto& p = entry->second; if (!p) { - return make_exception_future<>(std::runtime_error("Statistics is malformed")); + throw std::runtime_error("Statistics is malformed"); } stats_metadata& s = *static_cast(p.get()); if (s.sstable_level == new_level) { - return make_ready_future<>(); + return; } s.sstable_level = new_level; - // Technically we don't have to write the whole file again. But the assumption that - // we will always write sequentially is a powerful one, and this does not merit an - // exception. - return seastar::async([this] { - // This is not part of the standard memtable flush path, but there is no reason - // to come up with a class just for that. It is used by the snapshot/restore mechanism - // which comprises mostly hard link creation and this operation at the end + this operation, - // and also (eventually) by some compaction strategy. In any of the cases, it won't be high - // priority enough so we will use the default priority - rewrite_statistics(); - }); } +bool sstable::should_mutate_sstable_level(uint32_t new_level) const { + try { + const auto& stats = get_stats_metadata(); + return stats.sstable_level != new_level; + } catch (...) { + return false; + } +} + + int sstable::compare_by_max_timestamp(const sstable& other) const { auto ts1 = get_stats_metadata().max_timestamp; auto ts2 = other.get_stats_metadata().max_timestamp; diff --git a/sstables/sstables.hh b/sstables/sstables.hh index c4000dfd1e..98fde8167d 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -722,9 +722,6 @@ private: future<> read_statistics(); void write_statistics(); - // Rewrite statistics component by creating a temporary Statistics and - // renaming it into place of existing one. - void rewrite_statistics(); // Validate metadata that's used to optimize reads when user specifies // a clustering key range. If this specific metadata is incorrect, then // it should be cleared. Otherwise, it could lead to bad decisions. @@ -1019,7 +1016,8 @@ public: return _components->compression; } - future<> mutate_sstable_level(uint32_t); + void mutate_sstable_level(uint32_t); + bool should_mutate_sstable_level(uint32_t) const; const summary& get_summary() const { return _components->summary; @@ -1134,10 +1132,9 @@ public: service::session_id being_repaired; public: void mark_as_being_repaired(const service::session_id& id); - // This function must run inside a seastar thread since it calls - // rewrite_statistics which must run inside a seastar thread. int64_t update_repaired_at(int64_t repaired_at); future<> copy_components(const sstable& src); + bool should_update_repaired_at(int64_t repaired_at) const; // Creates a new sstable by linking all sstable components except for the specified component, // which is created by calling the provided sstable_creator function and then written to the disc. diff --git a/test/boost/sstable_test.cc b/test/boost/sstable_test.cc index af216692a4..efc750bc24 100644 --- a/test/boost/sstable_test.cc +++ b/test/boost/sstable_test.cc @@ -13,14 +13,18 @@ #include #include #include +#include #include +#include "sstables/checksum_utils.hh" #include "sstables/generation_type.hh" #include "sstables/sstables.hh" #include "sstables/key.hh" #include "sstables/open_info.hh" #include "sstables/version.hh" +#include "test/lib/random_schema.hh" #include "test/lib/sstable_utils.hh" +#include "test/lib/random_utils.hh" #include "test/lib/reader_concurrency_semaphore.hh" #include "test/lib/scylla_test_case.hh" #include "test/lib/test_utils.hh" @@ -398,22 +402,43 @@ SEASTAR_TEST_CASE(wrong_range) { }); } +future mutate_sstable_level(test_env& env, sstable_ptr sstp, const std::string& dir_path, uint32_t new_level) { + auto modifier = [new_level] (sstables::sstable& sst) { + sst.mutate_sstable_level(new_level); + }; + auto creator = [&env, &dir_path] (shared_sstable sstp) { + return env.make_sstable(sstp->get_schema(), dir_path, sstp->get_version()); + }; + + auto new_sst = co_await sstp->link_with_rewritten_component(std::move(creator), component_type::Statistics, modifier, false); + co_await sstp->unlink(); + sstp = new_sst; + + sstp = co_await env.reusable_sst(uncompressed_schema(), dir_path, sstp->generation()); + co_return sstp; +} + SEASTAR_TEST_CASE(statistics_rewrite) { return test_env::do_with_async([] (test_env& env) { - auto uncompressed_dir_copy = env.tempdir().path(); - for (const auto& entry : std::filesystem::directory_iterator(uncompressed_dir().c_str())) { - std::filesystem::copy(entry.path(), uncompressed_dir_copy / entry.path().filename()); - } + auto random_spec = tests::make_random_schema_specification( + "ks", + std::uniform_int_distribution(1, 4), + std::uniform_int_distribution(2, 4), + std::uniform_int_distribution(2, 8), + std::uniform_int_distribution(2, 8)); + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + auto schema = random_schema.schema(); - auto sstp = env.reusable_sst(uncompressed_schema(), uncompressed_dir_copy.native()).get(); - auto file_path = sstable::filename(uncompressed_dir_copy.native(), "ks", "cf", la, generation_from_value(1), big, component_type::Data); - auto exists = file_exists(file_path).get(); - BOOST_REQUIRE(exists); + const auto muts = tests::generate_random_mutations(random_schema, 2).get(); + auto sstp = make_sstable_containing(env.make_sstable(schema, sstable::version_types::me), muts); - // mutate_sstable_level results in statistics rewrite - sstp->mutate_sstable_level(10).get(); + auto toc_path = fmt::to_string(sstp->toc_filename()); + auto dir_path = std::filesystem::path(toc_path).parent_path().string(); - sstp = env.reusable_sst(uncompressed_schema(), uncompressed_dir_copy.native()).get(); + BOOST_REQUIRE(sstp->get_sstable_level() != 10); + + sstp = mutate_sstable_level(env, sstp, dir_path, 10).get(); + sstp = env.reusable_sst(schema, dir_path, sstp->generation()).get(); BOOST_REQUIRE(sstp->get_sstable_level() == 10); }); } diff --git a/test/cluster/test_incremental_repair.py b/test/cluster/test_incremental_repair.py index 9ae8e33311..d3b424aa05 100644 --- a/test/cluster/test_incremental_repair.py +++ b/test/cluster/test_incremental_repair.py @@ -541,7 +541,8 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager s1_mark = await logs[0].mark() await trigger_tablet_merge(manager, servers, logs) # The merge process will set the unrepaired sstable with repaired_at=3 to repaired_at=0 during merge - await logs[0].wait_for('Finished repaired_at update for tablet merge .* old=3 new=0 sstables_repaired_at=2', from_mark=s1_mark) + await logs[0].wait_for('Updating repaired_at for tablet merge .* old=3 new=0 sstables_repaired_at=2', from_mark=s1_mark) + await logs[0].wait_for('Completed updating repaired_at=.* for tablet merge', from_mark=s1_mark) for server in servers: await manager.server_stop_gracefully(server.server_id)