From e48ec6fed3e519e660426b25cfe8a31d33fff505 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 9 May 2023 11:52:47 +0200 Subject: [PATCH] db, storage_proxy: Drop mutation/frozen_mutation ::shard_of() dht::shard_of() does not use the correct sharder for tablet-based tables. Code which is supposed to work with all kinds of tables should use erm::get_sharder(). --- db/commitlog/commitlog_replayer.cc | 5 +++-- mutation/frozen_mutation.hh | 4 ++-- mutation/mutation.hh | 4 ---- replica/database.hh | 3 +++ service/storage_proxy.cc | 12 ++++++++---- test/boost/commitlog_test.cc | 5 +++-- test/boost/database_test.cc | 11 +++++++---- test/boost/secondary_index_test.cc | 2 +- 8 files changed, 27 insertions(+), 19 deletions(-) diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index 122ff458b3..9862e694e8 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -242,8 +242,9 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r return make_ready_future<>(); } - const auto& schema = *_db.local().find_column_family(uuid).schema(); - auto shard = fm.shard_of(schema); + auto& table = _db.local().find_column_family(uuid); + const auto& schema = *table.schema(); + auto shard = table.get_effective_replication_map()->shard_of(schema, fm.token(schema)); return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp] (replica::database& db) mutable -> future<> { auto& fm = cer.mutation(); // TODO: might need better verification that the deserialized mutation diff --git a/mutation/frozen_mutation.hh b/mutation/frozen_mutation.hh index a2dc1fc1fc..5fba24dc61 100644 --- a/mutation/frozen_mutation.hh +++ b/mutation/frozen_mutation.hh @@ -215,8 +215,8 @@ public: template auto consume_gently(schema_ptr s, frozen_mutation_consumer_adaptor& adaptor) const -> future>; - unsigned shard_of(const schema& s) const { - return dht::shard_of(s, dht::get_token(s, key())); + dht::token 
token(const schema& s) const { + return dht::get_token(s, key()); } struct printer { diff --git a/mutation/mutation.hh b/mutation/mutation.hh index 309cb256d2..bda975b559 100644 --- a/mutation/mutation.hh +++ b/mutation/mutation.hh @@ -178,10 +178,6 @@ public: // Range tombstones will be trimmed to the boundaries of the clustering ranges. mutation sliced(const query::clustering_row_ranges&) const; - unsigned shard_of() const { - return dht::shard_of(*schema(), token()); - } - // Returns a mutation which contains the same writes but in a minimal form. // Drops data covered by tombstones. // Does not drop expired tombstones. diff --git a/replica/database.hh b/replica/database.hh index aa72278fea..11ce36d2aa 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -772,6 +772,9 @@ public: future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_partition_slow(schema_ptr, reader_permit permit, const partition_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; + shard_id shard_of(const mutation& m) const { + return shard_of(m.token()); + } shard_id shard_of(dht::token t) const { return _erm ? _erm->shard_of(*_schema, t) : dht::shard_of(*_schema, t); // for tests. 
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 42b805e14a..9085376ec8 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2840,7 +2840,8 @@ storage_proxy::response_id_type storage_proxy::unique_response_handler::release( future<> storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info) { - auto shard = m.shard_of(); + auto erm = _db.local().find_column_family(m.schema()).get_effective_replication_map(); + auto shard = erm->get_sharder(*m.schema()).shard_of(m.token()); get_stats().replica_cross_shard_ops += shard != this_shard_id(); return _db.invoke_on(shard, {smp_grp, timeout}, [s = global_schema_ptr(m.schema()), @@ -2856,7 +2857,8 @@ storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_sta future<> storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info) { - auto shard = m.shard_of(*s); + auto erm = _db.local().find_column_family(s).get_effective_replication_map(); + auto shard = erm->get_sharder(*s).shard_of(m.token(*s)); get_stats().replica_cross_shard_ops += shard != this_shard_id(); return _db.invoke_on(shard, {smp_grp, timeout}, [&m, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, rate_limit_info] (replica::database& db) mutable -> future<> { @@ -2877,7 +2879,8 @@ storage_proxy::mutate_locally(std::vector<mutation> mutation, tracing::trace_sta } future<> storage_proxy::mutate_hint(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) { - auto shard = m.shard_of(*s); + auto erm =
_db.local().find_column_family(s).get_effective_replication_map(); + auto shard = erm->get_sharder(*s).shard_of(m.token(*s)); get_stats().replica_cross_shard_ops += shard != this_shard_id(); return _db.invoke_on(shard, {_hints_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), tr_state = std::move(tr_state), timeout] (replica::database& db) mutable -> future<> { return db.apply_hint(gs, m, std::move(tr_state), timeout); @@ -2927,7 +2930,8 @@ storage_proxy::mutate_counters_on_leader(std::vector future<> storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, frozen_mutation fm, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr trace_state, service_permit permit) { - auto shard = fm.shard_of(*s); + auto erm = _db.local().find_column_family(s).get_effective_replication_map(); + auto shard = erm->get_sharder(*s).shard_of(fm.token(*s)); bool local = shard == this_shard_id(); get_stats().replica_cross_shard_ops += !local; return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&proxy = container(), gs = global_schema_ptr(s), fm = std::move(fm), cl, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state)), permit = std::move(permit), local] (replica::database& db) { diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 47f6b160e0..6100b6ac89 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -613,9 +613,10 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ auto& table = db.find_column_family("ks", "t"); auto& cl = *table.commitlog(); auto s = table.schema(); + auto& sharder = table.get_effective_replication_map()->get_sharder(*table.schema()); auto memtables = table.active_memtables(); - auto add_entry = [&cl, s] (const partition_key& key) mutable { + auto add_entry = [&cl, s, &sharder] (const partition_key& key) mutable { auto md = tests::data_model::mutation_description(key.explode()); md.add_clustered_cell({}, "v", 
to_bytes("val")); auto m = md.build(s); @@ -623,7 +624,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ auto fm = freeze(m); commitlog_entry_writer cew(s, fm, db::commitlog::force_sync::yes); cl.add_entry(m.column_family_id(), cew, db::no_timeout).get(); - return m.shard_of(); + return sharder.shard_of(m.token()); }; const auto shard = add_entry(partition_key::make_empty()); diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 6fabb50c20..dacc9510ad 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -63,7 +63,8 @@ public: static future<> apply_mutation(sharded& sharded_db, table_id uuid, const mutation& m, bool do_flush = false, db::commitlog::force_sync fs = db::commitlog::force_sync::no, db::timeout_clock::time_point timeout = db::no_timeout) { - auto shard = m.shard_of(); + auto& t = sharded_db.local().find_column_family(uuid); + auto shard = t.shard_of(m); return sharded_db.invoke_on(shard, [uuid, fm = freeze(m), do_flush, fs, timeout] (replica::database& db) { auto& t = db.find_column_family(uuid); return db.apply(t.schema(), fm, tracing::trace_state_ptr(), fs, timeout).then([do_flush, &t] { @@ -94,6 +95,7 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) { sstring ks_name = "ks"; sstring cf_name = "cf"; auto s = db.find_schema(ks_name, cf_name); + auto&& table = db.find_column_family(s); auto uuid = s->id(); std::vector keys_per_shard; @@ -104,7 +106,7 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) { auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key{}", i))); mutation m(s, pkey); m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {}); - auto shard = m.shard_of(); + auto shard = table.shard_of(m); keys_per_shard[shard]++; pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey)))); apply_mutation(e.db(), uuid, m).get(); @@ -187,6 +189,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) { 
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get(); auto& db = e.local_db(); auto s = db.find_schema("ks", "cf"); + auto&& table = db.find_column_family(s); auto uuid = s->id(); std::vector keys_per_shard; std::vector pranges_per_shard; @@ -197,7 +200,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) { mutation m(s, pkey); m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now())); apply_mutation(e.db(), uuid, m).get(); - auto shard = m.shard_of(); + auto shard = table.shard_of(m); pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey)))); } for (uint32_t i = 3 * smp::count; i <= 8 * smp::count; ++i) { @@ -205,7 +208,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) { mutation m(s, pkey); m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1); apply_mutation(e.db(), uuid, m).get(); - auto shard = m.shard_of(); + auto shard = table.shard_of(m); keys_per_shard[shard]++; pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey)))); } diff --git a/test/boost/secondary_index_test.cc b/test/boost/secondary_index_test.cc index 81b0e90dd7..43f2c9f6bb 100644 --- a/test/boost/secondary_index_test.cc +++ b/test/boost/secondary_index_test.cc @@ -1909,7 +1909,7 @@ SEASTAR_TEST_CASE(test_deleting_ghost_rows) { mutation m(schema, partition_key::from_singular(*schema, pk)); auto& row = m.partition().clustered_row(*schema, clustering_key::from_exploded(*schema, {int32_type->decompose(8), int32_type->decompose(7)})); row.apply(row_marker{api::new_timestamp()}); - unsigned shard = m.shard_of(); + unsigned shard = t.shard_of(m); if (shard == this_shard_id()) { t.apply(m); }