Merge 'Propagate tracing to materialized view update path' from Piotr S
To improve the debuggability of materialized views, tracing points are added to the view update generation path.
Example trace:
```
 activity                                                                                                                                                                         | timestamp                  | source    | source_elapsed | client
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
Execute CQL3 query | 2020-04-27 13:13:46.834000 | 127.0.0.1 | 0 | 127.0.0.1
Parsing a statement [shard 0] | 2020-04-27 13:13:46.834346 | 127.0.0.1 | 1 | 127.0.0.1
Processing a statement [shard 0] | 2020-04-27 13:13:46.834426 | 127.0.0.1 | 80 | 127.0.0.1
Creating write handler for token: -3248873570005575792 natural: {127.0.0.1, 127.0.0.3} pending: {} [shard 0] | 2020-04-27 13:13:46.834494 | 127.0.0.1 | 148 | 127.0.0.1
Creating write handler with live: {127.0.0.3, 127.0.0.1} dead: {} [shard 0] | 2020-04-27 13:13:46.834507 | 127.0.0.1 | 161 | 127.0.0.1
Sending a mutation to /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.834519 | 127.0.0.1 | 173 | 127.0.0.1
Executing a mutation locally [shard 0] | 2020-04-27 13:13:46.834532 | 127.0.0.1 | 186 | 127.0.0.1
View updates for ks.t require read-before-write - base table reader is created [shard 0] | 2020-04-27 13:13:46.834570 | 127.0.0.1 | 224 | 127.0.0.1
Reading key {{-3248873570005575792, pk{000400000002}}} from sstable /home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Data.db [shard 0] | 2020-04-27 13:13:46.834608 | 127.0.0.1 | 262 | 127.0.0.1
/home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Index.db: scheduling bulk DMA read of size 8 at offset 0 [shard 0] | 2020-04-27 13:13:46.834635 | 127.0.0.1 | 289 | 127.0.0.1
/home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Index.db: finished bulk DMA read of size 8 at offset 0, successfully read 8 bytes [shard 0] | 2020-04-27 13:13:46.834975 | 127.0.0.1 | 629 | 127.0.0.1
Message received from /127.0.0.1 [shard 0] | 2020-04-27 13:13:46.834988 | 127.0.0.3 | 11 | 127.0.0.1
/home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Data.db: scheduling bulk DMA read of size 41 at offset 0 [shard 0] | 2020-04-27 13:13:46.835015 | 127.0.0.1 | 669 | 127.0.0.1
View updates for ks.t require read-before-write - base table reader is created [shard 0] | 2020-04-27 13:13:46.835020 | 127.0.0.3 | 44 | 127.0.0.1
Generated 1 view update mutations [shard 0] | 2020-04-27 13:13:46.835080 | 127.0.0.3 | 104 | 127.0.0.1
Sending view update for ks.t_v2_idx_index to 127.0.0.2, with pending endpoints = {}; base token = -3248873570005575792; view token = 3728482343045213994 [shard 0] | 2020-04-27 13:13:46.835095 | 127.0.0.3 | 119 | 127.0.0.1
Sending a mutation to /127.0.0.2 [shard 0] | 2020-04-27 13:13:46.835105 | 127.0.0.3 | 129 | 127.0.0.1
View updates for ks.t were generated and propagated [shard 0] | 2020-04-27 13:13:46.835117 | 127.0.0.3 | 141 | 127.0.0.1
/home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Data.db: finished bulk DMA read of size 41 at offset 0, successfully read 41 bytes [shard 0] | 2020-04-27 13:13:46.835160 | 127.0.0.1 | 813 | 127.0.0.1
Sending mutation_done to /127.0.0.1 [shard 0] | 2020-04-27 13:13:46.835164 | 127.0.0.3 | 188 | 127.0.0.1
Mutation handling is done [shard 0] | 2020-04-27 13:13:46.835177 | 127.0.0.3 | 201 | 127.0.0.1
Generated 1 view update mutations [shard 0] | 2020-04-27 13:13:46.835215 | 127.0.0.1 | 869 | 127.0.0.1
Locally applying view update for ks.t_v2_idx_index; base token = -3248873570005575792; view token = 3728482343045213994 [shard 0] | 2020-04-27 13:13:46.835226 | 127.0.0.1 | 880 | 127.0.0.1
Successfully applied local view update for 127.0.0.1 and 0 remote endpoints [shard 0] | 2020-04-27 13:13:46.835253 | 127.0.0.1 | 907 | 127.0.0.1
View updates for ks.t were generated and propagated [shard 0] | 2020-04-27 13:13:46.835256 | 127.0.0.1 | 910 | 127.0.0.1
Got a response from /127.0.0.1 [shard 0] | 2020-04-27 13:13:46.835274 | 127.0.0.1 | 928 | 127.0.0.1
Delay decision due to throttling: do not delay, resuming now [shard 0] | 2020-04-27 13:13:46.835276 | 127.0.0.1 | 930 | 127.0.0.1
Mutation successfully completed [shard 0] | 2020-04-27 13:13:46.835279 | 127.0.0.1 | 933 | 127.0.0.1
Done processing - preparing a result [shard 0] | 2020-04-27 13:13:46.835286 | 127.0.0.1 | 941 | 127.0.0.1
Message received from /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.835331 | 127.0.0.2 | 14 | 127.0.0.1
Sending mutation_done to /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.835399 | 127.0.0.2 | 82 | 127.0.0.1
Mutation handling is done [shard 0] | 2020-04-27 13:13:46.835413 | 127.0.0.2 | 96 | 127.0.0.1
Got a response from /127.0.0.2 [shard 0] | 2020-04-27 13:13:46.835639 | 127.0.0.3 | 662 | 127.0.0.1
Delay decision due to throttling: do not delay, resuming now [shard 0] | 2020-04-27 13:13:46.835640 | 127.0.0.3 | 664 | 127.0.0.1
Successfully applied view update for 127.0.0.2 and 1 remote endpoints [shard 0] | 2020-04-27 13:13:46.835649 | 127.0.0.3 | 673 | 127.0.0.1
Got a response from /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.835841 | 127.0.0.1 | 1495 | 127.0.0.1
Request complete | 2020-04-27 13:13:46.834944 | 127.0.0.1 | 944 | 127.0.0.1
```
Fixes #6175
Tests: unit(dev), manual
* psarna-propagate_tracing_to_more_write_paths:
db,view: add tracing to view update generation path
treewide: propagate trace state to write path
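
The call-chain change in these commits is mechanical but worth seeing end to end: each layer of the write path accepts the trace state as an explicit parameter and forwards it to the next, and tracing is a no-op when no session is attached. A minimal self-contained sketch of that pattern (stand-in types and names, not Scylla's real tracing API):

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for tracing::trace_state / trace_state_ptr.
// The real types live in Scylla's tracing subsystem; this only shows the shape.
struct trace_state {
    std::vector<std::string> events;
};
using trace_state_ptr = std::shared_ptr<trace_state>;

// trace() silently does nothing when no tracing session is attached,
// mirroring how tracing::trace(tr_state, ...) behaves for a null pointer.
inline void trace(const trace_state_ptr& ts, std::string msg) {
    if (ts) {
        ts->events.push_back(std::move(msg));
    }
}

// The core idea of the series: every layer takes the trace state and hands
// it down, so events generated deep inside view-update generation land in
// the same tracing session the coordinator started.
void generate_view_updates(trace_state_ptr ts) {
    trace(ts, "Generated 1 view update mutations");
}

void do_apply(trace_state_ptr ts) {
    trace(ts, "Executing a mutation locally");
    generate_view_updates(std::move(ts));
}

void mutate_locally(trace_state_ptr ts) {
    do_apply(std::move(ts));
}
```

With a live session, both layers append to the same event list; with a null pointer the whole chain stays silent, which is why threading the parameter everywhere is cheap.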
database.cc (14 changed lines):

```diff
@@ -1516,7 +1516,7 @@ future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::
     return apply_in_memory(m, std::move(s), {}, timeout);
 }
 
-future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
+future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
     // I'm doing a nullcheck here since the init code path for db etc
     // is a little in flux and commitlog is created only when db is
     // initied from datadir.
@@ -1535,7 +1535,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_
     if (cf.views().empty()) {
         return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally([op = std::move(op)] { });
     }
-    future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m, timeout);
+    future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m, timeout, std::move(tr_state));
     return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op), sync] (row_locker::lock_holder lock) mutable {
         return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally(
             // Hold the local lock on the base-table partition or row
@@ -1568,7 +1568,7 @@ void database::update_write_metrics_for_timed_out_write() {
     ++_stats->total_writes_timedout;
 }
 
-future<> database::apply(schema_ptr s, const frozen_mutation& m, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout) {
+future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout) {
     if (dblog.is_enabled(logging::log_level::trace)) {
         dblog.trace("apply {}", m.pretty_printer(s));
     }
@@ -1576,15 +1576,15 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, db::commitlog::
         update_write_metrics_for_timed_out_write();
         return make_exception_future<>(timed_out_error{});
     }
-    return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), timeout, sync));
+    return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync));
 }
 
-future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, db::timeout_clock::time_point timeout) {
+future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) {
     if (dblog.is_enabled(logging::log_level::trace)) {
         dblog.trace("apply hint {}", m.pretty_printer(s));
     }
-    return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, timeout] () mutable {
-        return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), timeout, db::commitlog::force_sync::no));
+    return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable {
+        return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no));
     });
 }
 
```
database.hh (16 changed lines):

```diff
@@ -996,8 +996,8 @@ public:
     void remove_view(view_ptr v);
     void clear_views();
     const std::vector<view_ptr>& views() const;
-    future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const;
-    future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const;
+    future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const;
+    future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const;
     future<row_locker::lock_holder>
     stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
             std::vector<sstables::shared_sstable>& excluded_sstables) const;
@@ -1029,12 +1029,13 @@ public:
 
 private:
     future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source,
-            const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
+            tracing::trace_state_ptr tr_state, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
     std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
     future<> generate_and_propagate_view_updates(const schema_ptr& base,
             std::vector<view_ptr>&& views,
             mutation&& m,
-            flat_mutation_reader_opt existings) const;
+            flat_mutation_reader_opt existings,
+            tracing::trace_state_ptr tr_state) const;
 
     mutable row_locker _row_locker;
     future<row_locker::lock_holder> local_base_lock(
@@ -1364,6 +1365,7 @@ private:
             database*,
             schema_ptr,
             const frozen_mutation&,
+            tracing::trace_state_ptr,
             db::timeout_clock::time_point,
             db::commitlog::force_sync> _apply_stage;
 
@@ -1412,7 +1414,7 @@ private:
     void setup_metrics();
 
     friend class db_apply_executor;
-    future<> do_apply(schema_ptr, const frozen_mutation&, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync);
+    future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync);
     future<> apply_with_commitlog(schema_ptr, column_family&, utils::UUID, const frozen_mutation&, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync);
     future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
 
@@ -1514,8 +1516,8 @@ public:
             db::timeout_clock::time_point timeout);
     // Apply the mutation atomically.
     // Throws timed_out_error when timeout is reached.
-    future<> apply(schema_ptr, const frozen_mutation&, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout);
-    future<> apply_hint(schema_ptr, const frozen_mutation&, db::timeout_clock::time_point timeout);
+    future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout);
+    future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
     future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented);
     future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
     keyspace::config make_keyspace_config(const keyspace_metadata& ksm);
```
```diff
@@ -290,7 +290,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
     mutation m(schema, key);
     auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
     m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
-    return _qp.proxy().mutate_locally(m);
+    return _qp.proxy().mutate_locally(m, tracing::trace_state_ptr());
 });
 };
 
@@ -594,7 +594,7 @@ public:
             db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations);
         }
     }
-    return _qp.proxy().mutate_locally(std::move(mutations));
+    return _qp.proxy().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
 }
 
 future<> flush_schemas() {
@@ -224,7 +224,7 @@ future<> save_system_schema(const sstring & ksname) {
             deletion_timestamp), ksm->name()).discard_result();
     }).then([ksm] {
         auto mvec = make_create_keyspace_mutations(ksm, schema_creation_timestamp(), true);
-        return qctx->proxy().mutate_locally(std::move(mvec));
+        return qctx->proxy().mutate_locally(std::move(mvec), tracing::trace_state_ptr());
     });
 }
@@ -926,7 +926,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
     /*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
 #endif
 
-    proxy.local().mutate_locally(std::move(mutations)).get0();
+    proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()).get0();
 
     if (do_flush) {
         proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) {
```
```diff
@@ -1072,6 +1072,7 @@ future<> mutate_MV(
         std::vector<frozen_mutation_and_schema> view_updates,
         db::view::stats& stats,
         cf_stats& cf_stats,
+        tracing::trace_state_ptr tr_state,
         db::timeout_semaphore_units pending_view_updates,
         service::allow_hints allow_hints,
         wait_for_all_updates wait_for_all)
@@ -1083,7 +1084,7 @@ future<> mutate_MV(
     auto& keyspace_name = mut.s->ks_name();
     auto paired_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
     auto pending_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name);
-    auto maybe_account_failure = [&stats, &cf_stats, units = pending_view_updates.split(mut.fm.representation().size())] (
+    auto maybe_account_failure = [tr_state, &stats, &cf_stats, units = pending_view_updates.split(mut.fm.representation().size())] (
             future<>&& f,
             gms::inet_address target,
             bool is_local,
@@ -1094,9 +1095,13 @@ future<> mutate_MV(
             cf_stats.total_view_updates_failed_local += is_local;
             cf_stats.total_view_updates_failed_remote += remotes;
             auto ep = f.get_exception();
+            tracing::trace(tr_state, "Failed to apply {}view update for {} and {} remote endpoints",
+                    seastar::value_of([is_local]{return is_local ? "local " : "";}), target, remotes);
             vlogger.error("Error applying view update to {}: {}", target, ep);
             return make_exception_future<>(std::move(ep));
         } else {
+            tracing::trace(tr_state, "Successfully applied {}view update for {} and {} remote endpoints",
+                    seastar::value_of([is_local]{return is_local ? "local " : "";}), target, remotes);
             return make_ready_future<>();
         }
     };
@@ -1125,7 +1130,9 @@ future<> mutate_MV(
     // writes but mutate_locally() doesn't, so we need to do that here.
     ++stats.writes;
     auto mut_ptr = std::make_unique<frozen_mutation>(std::move(mut.fm));
-    future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped(
+    tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}",
+            mut.s->ks_name(), mut.s->cf_name(), base_token, view_token);
+    future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, std::move(tr_state), db::commitlog::force_sync::no).then_wrapped(
             [&stats,
             maybe_account_failure = std::move(maybe_account_failure),
             mut_ptr = std::move(mut_ptr)] (future<>&& f) {
@@ -1142,11 +1149,14 @@ future<> mutate_MV(
     // to send the update there. Currently, we do this from *each* of
     // the base replicas, but this is probably excessive - see
     // See https://issues.apache.org/jira/browse/CASSANDRA-14262/
+    tracing::trace(tr_state, "Sending view update for {}.{} to {}, with pending endpoints = {}; base token = {}; view token = {}",
+            mut.s->ks_name(), mut.s->cf_name(), *paired_endpoint, pending_endpoints, base_token, view_token);
     future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
             std::move(mut),
             *paired_endpoint,
             std::move(pending_endpoints),
             db::write_type::VIEW,
+            std::move(tr_state),
             stats,
             allow_hints).then_wrapped(
             [paired_endpoint,
@@ -1178,11 +1188,14 @@ future<> mutate_MV(
     cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
     auto target = pending_endpoints.back();
     pending_endpoints.pop_back();
+    tracing::trace(tr_state, "Sending view update for {}.{} to {}, with pending endpoints = {}; base token = {}; view token = {}",
+            mut.s->ks_name(), mut.s->cf_name(), target, pending_endpoints, base_token, view_token);
     future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
             std::move(mut),
             target,
             std::move(pending_endpoints),
             db::write_type::VIEW,
+            std::move(tr_state),
             allow_hints).then_wrapped(
             [target,
             updates_pushed_remote,
@@ -109,6 +109,7 @@ future<> mutate_MV(
         std::vector<frozen_mutation_and_schema> view_updates,
         db::view::stats& stats,
         cf_stats& cf_stats,
+        tracing::trace_state_ptr tr_state,
         db::timeout_semaphore_units pending_view_updates,
         service::allow_hints allow_hints,
         wait_for_all_updates wait_for_all);
```
```diff
@@ -180,7 +180,7 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
     f = f.then([schema, &decision, timeout, tr_state] {
         logger.debug("Committing decision {}", decision);
         tracing::trace(tr_state, "Committing decision {}", decision);
-        return get_local_storage_proxy().mutate_locally(schema, decision.update, db::commitlog::force_sync::yes, timeout);
+        return get_local_storage_proxy().mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
     });
 } else {
     logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
```
```diff
@@ -205,7 +205,7 @@ public:
     auto m = _mutations[utils::fb_utilities::get_broadcast_address()];
     if (m) {
         tracing::trace(tr_state, "Executing a mutation locally");
-        return sp.mutate_locally(_schema, *m, db::commitlog::force_sync::no, timeout);
+        return sp.mutate_locally(_schema, *m, std::move(tr_state), db::commitlog::force_sync::no, timeout);
     }
     return make_ready_future<>();
 }
@@ -256,7 +256,7 @@ public:
 virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
         tracing::trace_state_ptr tr_state) override {
     tracing::trace(tr_state, "Executing a mutation locally");
-    return sp.mutate_locally(_schema, *_mutation, db::commitlog::force_sync::no, timeout);
+    return sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout);
 }
 virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
         storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
@@ -286,7 +286,7 @@ public:
         tracing::trace_state_ptr tr_state) override {
     // A hint will be sent to all relevant endpoints when the endpoint it was originally intended for
     // becomes unavailable - this might include the current node
-    return sp.mutate_hint(_schema, *_mutation, timeout);
+    return sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout);
 }
 virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
         storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
```
```diff
@@ -1783,38 +1783,40 @@ storage_proxy::response_id_type storage_proxy::unique_response_handler::release(
 }
 
 future<>
-storage_proxy::mutate_locally(const mutation& m, clock_type::time_point timeout) {
+storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) {
     auto shard = _db.local().shard_of(m);
     get_stats().replica_cross_shard_ops += shard != this_shard_id();
-    return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [s = global_schema_ptr(m.schema()), m = freeze(m), timeout] (database& db) -> future<> {
-        return db.apply(s, m, db::commitlog::force_sync::no, timeout);
+    return _db.invoke_on(shard, {_write_smp_service_group, timeout},
+            [s = global_schema_ptr(m.schema()), m = freeze(m), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout] (database& db) mutable -> future<> {
+        return db.apply(s, m, gtr.get(), db::commitlog::force_sync::no, timeout);
     });
 }
 
 future<>
-storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, db::commitlog::force_sync sync, clock_type::time_point timeout) {
+storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout) {
     auto shard = _db.local().shard_of(m);
     get_stats().replica_cross_shard_ops += shard != this_shard_id();
-    return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), timeout, sync] (database& db) -> future<> {
-        return db.apply(gs, m, sync, timeout);
+    return _db.invoke_on(shard, {_write_smp_service_group, timeout},
+            [&m, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync] (database& db) mutable -> future<> {
+        return db.apply(gs, m, gtr.get(), sync, timeout);
     });
 }
 
 future<>
-storage_proxy::mutate_locally(std::vector<mutation> mutations, clock_type::time_point timeout) {
-    return do_with(std::move(mutations), [this, timeout] (std::vector<mutation>& pmut){
-        return parallel_for_each(pmut.begin(), pmut.end(), [this, timeout] (const mutation& m) {
-            return mutate_locally(m, timeout);
+storage_proxy::mutate_locally(std::vector<mutation> mutations, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) {
+    return do_with(std::move(mutations), [this, timeout, tr_state = std::move(tr_state)] (std::vector<mutation>& pmut) mutable {
+        return parallel_for_each(pmut.begin(), pmut.end(), [this, tr_state = std::move(tr_state), timeout] (const mutation& m) mutable {
+            return mutate_locally(m, tr_state, timeout);
         });
     });
 }
 
 future<>
-storage_proxy::mutate_hint(const schema_ptr& s, const frozen_mutation& m, clock_type::time_point timeout) {
+storage_proxy::mutate_hint(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) {
     auto shard = _db.local().shard_of(m);
     get_stats().replica_cross_shard_ops += shard != this_shard_id();
-    return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), timeout] (database& db) -> future<> {
-        return db.apply_hint(gs, m, timeout);
+    return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), tr_state = std::move(tr_state), timeout] (database& db) mutable -> future<> {
+        return db.apply_hint(gs, m, std::move(tr_state), timeout);
     });
 }
 
@@ -2017,7 +2019,7 @@ future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutat
 }
 
 future<> storage_proxy::mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl,
-        std::optional<clock_type::time_point> timeout_opt) {
+        tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt) {
     return parallel_for_each(ids, [this, cl, timeout_opt] (unique_response_handler& protected_response) {
         auto response_id = protected_response.id;
         // This function, mutate_begin(), is called after a preemption point
@@ -2290,9 +2292,10 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool c
     utils::latency_counter lc;
     lc.start();
 
-    return mutate_prepare(mutations, cl, type, tr_state, std::move(permit)).then([this, cl, timeout_opt, tracker = std::move(cdc_tracker)] (std::vector<storage_proxy::unique_response_handler> ids) {
+    return mutate_prepare(mutations, cl, type, tr_state, std::move(permit)).then([this, cl, timeout_opt, tracker = std::move(cdc_tracker),
+            tr_state] (std::vector<storage_proxy::unique_response_handler> ids) mutable {
         register_cdc_operation_result_tracker(ids, tracker);
-        return mutate_begin(std::move(ids), cl, timeout_opt);
+        return mutate_begin(std::move(ids), cl, tr_state, timeout_opt);
     }).then_wrapped([this, p = shared_from_this(), lc, tr_state] (future<> f) mutable {
         return p->mutate_end(std::move(f), lc, get_stats(), std::move(tr_state));
     });
@@ -2375,7 +2378,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
         return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit));
     }).then([this, cl] (std::vector<unique_response_handler> ids) {
         _p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
-        return _p.mutate_begin(std::move(ids), cl, _timeout);
+        return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
     });
 }
 future<> sync_write_to_batchlog() {
@@ -2402,7 +2405,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
     return sync_write_to_batchlog().then([this, ids = std::move(ids)] () mutable {
         tracing::trace(_trace_state, "Sending batch mutations");
         _p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
-        return _p.mutate_begin(std::move(ids), _cl, _timeout);
+        return _p.mutate_begin(std::move(ids), _cl, _trace_state, _timeout);
     }).then(std::bind(&context::async_remove_from_batchlog, this));
 });
 }
@@ -2445,6 +2448,7 @@ future<> storage_proxy::send_to_endpoint(
         gms::inet_address target,
         std::vector<gms::inet_address> pending_endpoints,
         db::write_type type,
+        tracing::trace_state_ptr tr_state,
         write_stats& stats,
         allow_hints allow_hints) {
     utils::latency_counter lc;
@@ -2458,7 +2462,7 @@ future<> storage_proxy::send_to_endpoint(
         timeout = clock_type::now() + 5min;
     }
     return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(),
-            [this, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
+            [this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
             std::unique_ptr<mutation_holder>& m,
             db::consistency_level cl,
             db::write_type type, service_permit permit) mutable {
@@ -2481,11 +2485,11 @@ future<> storage_proxy::send_to_endpoint(
             std::move(targets),
             pending_endpoints,
             std::move(dead_endpoints),
-            nullptr,
+            tr_state,
             stats,
             std::move(permit));
-    }).then([this, cl, timeout = std::move(timeout)] (std::vector<unique_response_handler> ids) mutable {
-        return mutate_begin(std::move(ids), cl, std::move(timeout));
+    }).then([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (std::vector<unique_response_handler> ids) mutable {
+        return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
     }).then_wrapped([p = shared_from_this(), lc, &stats] (future<>&& f) {
         return p->mutate_end(std::move(f), lc, stats, nullptr);
     });
@@ -2496,12 +2500,14 @@ future<> storage_proxy::send_to_endpoint(
         gms::inet_address target,
         std::vector<gms::inet_address> pending_endpoints,
         db::write_type type,
+        tracing::trace_state_ptr tr_state,
         allow_hints allow_hints) {
     return send_to_endpoint(
             std::make_unique<shared_mutation>(std::move(fm_a_s)),
             std::move(target),
             std::move(pending_endpoints),
             type,
+            std::move(tr_state),
             get_stats(),
             allow_hints);
 }
@@ -2511,6 +2517,7 @@ future<> storage_proxy::send_to_endpoint(
         gms::inet_address target,
         std::vector<gms::inet_address> pending_endpoints,
         db::write_type type,
+        tracing::trace_state_ptr tr_state,
         write_stats& stats,
         allow_hints allow_hints) {
     return send_to_endpoint(
@@ -2518,6 +2525,7 @@ future<> storage_proxy::send_to_endpoint(
         std::move(target),
         std::move(pending_endpoints),
         type,
+        std::move(tr_state),
         stats,
         allow_hints);
 }
@@ -2529,6 +2537,7 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
         std::move(target),
         { },
         db::write_type::SIMPLE,
+        tracing::trace_state_ptr(),
         get_stats(),
         allow_hints::no);
 }
@@ -2538,6 +2547,7 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
         std::move(target),
         { },
         db::write_type::SIMPLE,
+        tracing::trace_state_ptr(),
         get_stats(),
         allow_hints::no);
 }
@@ -4827,9 +4837,9 @@ void storage_proxy::init_messaging_service() {
     utils::UUID schema_version = in.schema_version();
     return handle_write(src_addr, t, schema_version, std::move(in), std::move(forward), reply_to, shard, response_id,
             trace_info ? *trace_info : std::nullopt,
-            /* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr, schema_ptr s, const frozen_mutation& m,
+            /* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s, const frozen_mutation& m,
                     clock_type::time_point timeout) {
-                return p->mutate_locally(std::move(s), m, db::commitlog::force_sync::no, timeout);
+                return p->mutate_locally(std::move(s), m, std::move(tr_state), db::commitlog::force_sync::no, timeout);
             },
             /* forward_fn */ [] (netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const frozen_mutation& m,
                     gms::inet_address reply_to, unsigned shard, response_id_type response_id,
```
@@ -386,7 +386,7 @@ private:
future<std::vector<unique_response_handler>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler handler);
template<typename Range>
future<std::vector<unique_response_handler>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
- future<> mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl, std::optional<clock_type::time_point> timeout_opt = { });
+ future<> mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<> mutate_end(future<> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
future<> schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
bool need_throttle_writes() const;
@@ -412,6 +412,7 @@ private:
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
db::write_type type,
+ tracing::trace_state_ptr tr_state,
write_stats& stats,
allow_hints allow_hints = allow_hints::yes);
@@ -456,15 +457,15 @@ public:

// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
- future<> mutate_locally(const mutation& m, clock_type::time_point timeout = clock_type::time_point::max());
+ future<> mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
- future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max());
+ future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max());
// Applies mutations on this node.
// Resolves with timed_out_error when timeout is reached.
- future<> mutate_locally(std::vector<mutation> mutation, clock_type::time_point timeout = clock_type::time_point::max());
+ future<> mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());

- future<> mutate_hint(const schema_ptr&, const frozen_mutation& m, clock_type::time_point timeout = clock_type::time_point::max());
+ future<> mutate_hint(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented);

/**
@@ -506,8 +507,10 @@ public:
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also
// send_to_live_endpoints() - another take on the same original function.
- future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
- future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type, allow_hints allow_hints = allow_hints::yes);
+ future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type,
+ tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
+ future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type,
+ tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes);

// Send a mutation to a specific remote target as a hint.
// Unlike regular mutations during write operations, hints are sent on the streaming connection
table.cc
@@ -2069,15 +2069,17 @@ static size_t memory_usage_of(const std::vector<frozen_mutation_and_schema>& ms)
future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
mutation&& m,
- flat_mutation_reader_opt existings) const {
+ flat_mutation_reader_opt existings,
+ tracing::trace_state_ptr tr_state) const {
auto base_token = m.token();
return db::view::generate_view_updates(
base,
std::move(views),
flat_mutation_reader_from_mutations({std::move(m)}),
- std::move(existings)).then([this, base_token = std::move(base_token)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
+ std::move(existings)).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
+ tracing::trace(tr_state, "Generated {} view update mutations", updates.size());
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
- return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats,
+ return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(tr_state),
std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no).handle_exception([] (auto ignored) { });
});
}
@@ -2192,8 +2194,8 @@ future<> table::populate_views(
units_to_consume = update_size - units_to_wait_for,
this] (db::timeout_semaphore_units&& units) mutable {
units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, units_to_consume));
- return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats,
- *_config.cf_stats, std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
+ return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats,
+ tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
});
});
}
@@ -2518,14 +2520,14 @@ future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable>
* Given an update for the base table, calculates the set of potentially affected views,
* generates the relevant updates, and sends them to the paired view replicas.
*/
- future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const {
+ future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const {
//FIXME: Avoid unfreezing here.
auto m = fm.unfreeze(s);
- return push_view_replica_updates(s, std::move(m), timeout);
+ return push_view_replica_updates(s, std::move(m), timeout, std::move(tr_state));
}

future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source,
- const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const {
+ tracing::trace_state_ptr tr_state, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const {
if (!_config.view_update_concurrency_semaphore->current()) {
// We don't have resources to generate view updates for this write. If we reached this point, we failed to
// throttle the client. The memory queue is already full, waiting on the semaphore would cause this node to
@@ -2543,7 +2545,8 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
}
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
if (cr_ranges.empty()) {
- return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }).then([] {
+ tracing::trace(tr_state, "View updates do not require read-before-write");
+ return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }, std::move(tr_state)).then([] {
// In this case we are not doing a read-before-write, just a
// write, so no lock is needed.
return make_ready_future<row_locker::lock_holder>();
@@ -2566,14 +2569,16 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
// We'll return this lock to the caller, which will release it after
// writing the base-table update.
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
- return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, source = std::move(source), &io_priority] (row_locker::lock_holder lock) {
+ return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, source = std::move(source), tr_state = std::move(tr_state), &io_priority] (row_locker::lock_holder lock) mutable {
+ tracing::trace(tr_state, "View updates for {}.{} require read-before-write - base table reader is created", base->ks_name(), base->cf_name());
return do_with(
dht::partition_range::make_singular(m.decorated_key()),
std::move(slice),
std::move(m),
- [base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
- auto reader = source.make_reader(base, no_reader_permit(), pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
- return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
+ [base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority, tr_state = std::move(tr_state)] (auto& pk, auto& slice, auto& m) mutable {
+ auto reader = source.make_reader(base, no_reader_permit(), pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
+ return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader), tr_state).then([base, tr_state = std::move(tr_state), lock = std::move(lock)] () mutable {
+ tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this
// partition/row and destroys the lock object.
@@ -2583,16 +2588,16 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
});
}

- future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const {
+ future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(),
- service::get_local_sstable_query_read_priority(), {});
+ std::move(tr_state), service::get_local_sstable_query_read_priority(), {});
}

future<row_locker::lock_holder>
table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
std::vector<sstables::shared_sstable>& excluded_sstables) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(excluded_sstables),
- service::get_local_streaming_read_priority(), query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
+ tracing::trace_state_ptr(), service::get_local_streaming_read_priority(), query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
}

mutation_source
@@ -64,7 +64,7 @@ SEASTAR_TEST_CASE(test_execute_batch) {
auto version = netw::messaging_service::current_version;
auto bm = bp.get_batch_log_mutation_for({ m }, s->id(), version, db_clock::now() - db_clock::duration(3h));

- return qp.proxy().mutate_locally(bm).then([&bp] () mutable {
+ return qp.proxy().mutate_locally(bm, tracing::trace_state_ptr()).then([&bp] () mutable {
return bp.count_all_batches().then([](auto n) {
BOOST_CHECK_EQUAL(n, 1);
}).then([&bp] () mutable {
@@ -56,7 +56,7 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
- db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
+ db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
}

auto assert_query_result = [&] (size_t expected_size) {
@@ -92,13 +92,13 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
mutation m(s, pkey);
m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now()));
- db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
+ db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
}
for (uint32_t i = 3; i <= 8; ++i) {
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1);
- db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
+ db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
}
@@ -138,7 +138,7 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc
service::get_local_migration_manager().announce_new_column_family(s, true).get();
column_family& cf = e.local_db().find_column_family(s);
for (auto&& m : partitions) {
- e.local_db().apply(cf.schema(), freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
+ e.local_db().apply(cf.schema(), freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
}
cf.flush().get();
cf.get_row_cache().invalidate([] {}).get();
@@ -281,7 +281,7 @@ int main(int argc, char** argv) {
uint64_t i = 0;
while (i < sstables) {
auto m = gen();
- env.local_db().apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
+ env.local_db().apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
if (tab.active_memtable().occupancy().used_space() > sstable_size) {
tab.flush().get();
++i;
@@ -171,7 +171,7 @@ int main(int argc, char** argv) {
auto&& col = *s->get_column_definition(to_bytes("v"));
m.set_clustered_cell(ck, col, atomic_cell::make_live(*col.type, api::new_timestamp(), serialized(value)));
auto t0 = clock::now();
- db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
+ db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
writes_hist.add(std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - t0).count());
++mutations;
}