db, storage_proxy: Drop mutation/frozen_mutation ::shard_of()

dht::shard_of() does not use the correct sharder for tablet-based tables.
Code that is supposed to work with all kinds of tables should use erm::get_sharder() instead.
This commit is contained in:
Tomasz Grabiec
2023-05-09 11:52:47 +02:00
parent d4497a058e
commit e48ec6fed3
8 changed files with 27 additions and 19 deletions

View File

@@ -242,8 +242,9 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
return make_ready_future<>();
}
const auto& schema = *_db.local().find_column_family(uuid).schema();
auto shard = fm.shard_of(schema);
auto& table = _db.local().find_column_family(uuid);
const auto& schema = *table.schema();
auto shard = table.get_effective_replication_map()->shard_of(schema, fm.token(schema));
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp] (replica::database& db) mutable -> future<> {
auto& fm = cer.mutation();
// TODO: might need better verification that the deserialized mutation

View File

@@ -215,8 +215,8 @@ public:
template<FlattenedConsumerV2 Consumer>
auto consume_gently(schema_ptr s, frozen_mutation_consumer_adaptor<Consumer>& adaptor) const -> future<frozen_mutation_consume_result<decltype(adaptor.consumer().consume_end_of_stream())>>;
unsigned shard_of(const schema& s) const {
return dht::shard_of(s, dht::get_token(s, key()));
// Returns the DHT token of this frozen mutation's partition key,
// computed under the given schema via dht::get_token().
// NOTE(review): replaces the old shard_of() here — callers now combine
// this token with the table's effective replication map to pick a shard.
dht::token token(const schema& s) const {
return dht::get_token(s, key());
}
struct printer {

View File

@@ -178,10 +178,6 @@ public:
// Range tombstones will be trimmed to the boundaries of the clustering ranges.
mutation sliced(const query::clustering_row_ranges&) const;
// Owning shard of this mutation, computed with the static dht::shard_of().
// NOTE(review): per the commit message this is being removed because
// dht::shard_of() does not use the correct sharder for tablet-based tables;
// use the table's effective replication map instead.
unsigned shard_of() const {
return dht::shard_of(*schema(), token());
}
// Returns a mutation which contains the same writes but in a minimal form.
// Drops data covered by tombstones.
// Does not drop expired tombstones.

View File

@@ -772,6 +772,9 @@ public:
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
future<const_mutation_partition_ptr> find_partition_slow(schema_ptr, reader_permit permit, const partition_key& key) const;
future<const_row_ptr> find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
// Convenience overload: owning shard for a whole mutation, delegating to
// the token-based shard_of() overload using the mutation's token.
shard_id shard_of(const mutation& m) const {
return shard_of(m.token());
}
shard_id shard_of(dht::token t) const {
return _erm ? _erm->shard_of(*_schema, t)
: dht::shard_of(*_schema, t); // for tests.

View File

@@ -2840,7 +2840,8 @@ storage_proxy::response_id_type storage_proxy::unique_response_handler::release(
future<>
storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info) {
auto shard = m.shard_of();
auto erm = _db.local().find_column_family(m.schema()).get_effective_replication_map();
auto shard = erm->get_sharder(*m.schema()).shard_of(m.token());
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, {smp_grp, timeout},
[s = global_schema_ptr(m.schema()),
@@ -2856,7 +2857,8 @@ storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_sta
future<>
storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout,
smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info) {
auto shard = m.shard_of(*s);
auto erm = _db.local().find_column_family(s).get_effective_replication_map();
auto shard = erm->get_sharder(*s).shard_of(m.token(*s));
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, {smp_grp, timeout},
[&m, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, rate_limit_info] (replica::database& db) mutable -> future<> {
@@ -2877,7 +2879,8 @@ storage_proxy::mutate_locally(std::vector<mutation> mutation, tracing::trace_sta
}
future<>
storage_proxy::mutate_hint(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) {
auto shard = m.shard_of(*s);
auto erm = _db.local().find_column_family(s).get_effective_replication_map();
auto shard = erm->get_sharder(*s).shard_of(m.token(*s));
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, {_hints_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), tr_state = std::move(tr_state), timeout] (replica::database& db) mutable -> future<> {
return db.apply_hint(gs, m, std::move(tr_state), timeout);
@@ -2927,7 +2930,8 @@ storage_proxy::mutate_counters_on_leader(std::vector<frozen_mutation_and_schema>
future<>
storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, frozen_mutation fm, db::consistency_level cl, clock_type::time_point timeout,
tracing::trace_state_ptr trace_state, service_permit permit) {
auto shard = fm.shard_of(*s);
auto erm = _db.local().find_column_family(s).get_effective_replication_map();
auto shard = erm->get_sharder(*s).shard_of(fm.token(*s));
bool local = shard == this_shard_id();
get_stats().replica_cross_shard_ops += !local;
return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&proxy = container(), gs = global_schema_ptr(s), fm = std::move(fm), cl, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state)), permit = std::move(permit), local] (replica::database& db) {

View File

@@ -613,9 +613,10 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){
auto& table = db.find_column_family("ks", "t");
auto& cl = *table.commitlog();
auto s = table.schema();
auto& sharder = table.get_effective_replication_map()->get_sharder(*table.schema());
auto memtables = table.active_memtables();
auto add_entry = [&cl, s] (const partition_key& key) mutable {
auto add_entry = [&cl, s, &sharder] (const partition_key& key) mutable {
auto md = tests::data_model::mutation_description(key.explode());
md.add_clustered_cell({}, "v", to_bytes("val"));
auto m = md.build(s);
@@ -623,7 +624,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){
auto fm = freeze(m);
commitlog_entry_writer cew(s, fm, db::commitlog::force_sync::yes);
cl.add_entry(m.column_family_id(), cew, db::no_timeout).get();
return m.shard_of();
return sharder.shard_of(m.token());
};
const auto shard = add_entry(partition_key::make_empty());

View File

@@ -63,7 +63,8 @@ public:
static future<> apply_mutation(sharded<replica::database>& sharded_db, table_id uuid, const mutation& m, bool do_flush = false,
db::commitlog::force_sync fs = db::commitlog::force_sync::no, db::timeout_clock::time_point timeout = db::no_timeout) {
auto shard = m.shard_of();
auto& t = sharded_db.local().find_column_family(uuid);
auto shard = t.shard_of(m);
return sharded_db.invoke_on(shard, [uuid, fm = freeze(m), do_flush, fs, timeout] (replica::database& db) {
auto& t = db.find_column_family(uuid);
return db.apply(t.schema(), fm, tracing::trace_state_ptr(), fs, timeout).then([do_flush, &t] {
@@ -94,6 +95,7 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
sstring ks_name = "ks";
sstring cf_name = "cf";
auto s = db.find_schema(ks_name, cf_name);
auto&& table = db.find_column_family(s);
auto uuid = s->id();
std::vector<size_t> keys_per_shard;
@@ -104,7 +106,7 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key{}", i)));
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
auto shard = m.shard_of();
auto shard = table.shard_of(m);
keys_per_shard[shard]++;
pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
apply_mutation(e.db(), uuid, m).get();
@@ -187,6 +189,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
auto& db = e.local_db();
auto s = db.find_schema("ks", "cf");
auto&& table = db.find_column_family(s);
auto uuid = s->id();
std::vector<size_t> keys_per_shard;
std::vector<dht::partition_range_vector> pranges_per_shard;
@@ -197,7 +200,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
mutation m(s, pkey);
m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now()));
apply_mutation(e.db(), uuid, m).get();
auto shard = m.shard_of();
auto shard = table.shard_of(m);
pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
}
for (uint32_t i = 3 * smp::count; i <= 8 * smp::count; ++i) {
@@ -205,7 +208,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1);
apply_mutation(e.db(), uuid, m).get();
auto shard = m.shard_of();
auto shard = table.shard_of(m);
keys_per_shard[shard]++;
pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
}

View File

@@ -1909,7 +1909,7 @@ SEASTAR_TEST_CASE(test_deleting_ghost_rows) {
mutation m(schema, partition_key::from_singular(*schema, pk));
auto& row = m.partition().clustered_row(*schema, clustering_key::from_exploded(*schema, {int32_type->decompose(8), int32_type->decompose(7)}));
row.apply(row_marker{api::new_timestamp()});
unsigned shard = m.shard_of();
unsigned shard = t.shard_of(m);
if (shard == this_shard_id()) {
t.apply(m);
}