Merge 'Propagate tracing to materialized view update path' from Piotr S

In order to improve materialized views' debuggability, tracing points are added to view update generation path.

Example trace:
```
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
                                                                                                                                                               Execute CQL3 query | 2020-04-27 13:13:46.834000 | 127.0.0.1 |              0 | 127.0.0.1
                                                                                                                                                    Parsing a statement [shard 0] | 2020-04-27 13:13:46.834346 | 127.0.0.1 |              1 | 127.0.0.1
                                                                                                                                                 Processing a statement [shard 0] | 2020-04-27 13:13:46.834426 | 127.0.0.1 |             80 | 127.0.0.1
                                                                     Creating write handler for token: -3248873570005575792 natural: {127.0.0.1, 127.0.0.3} pending: {} [shard 0] | 2020-04-27 13:13:46.834494 | 127.0.0.1 |            148 | 127.0.0.1
                                                                                                      Creating write handler with live: {127.0.0.3, 127.0.0.1} dead: {} [shard 0] | 2020-04-27 13:13:46.834507 | 127.0.0.1 |            161 | 127.0.0.1
                                                                                                                                       Sending a mutation to /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.834519 | 127.0.0.1 |            173 | 127.0.0.1
                                                                                                                                           Executing a mutation locally [shard 0] | 2020-04-27 13:13:46.834532 | 127.0.0.1 |            186 | 127.0.0.1
                                                                                         View updates for ks.t require read-before-write - base table reader is created [shard 0] | 2020-04-27 13:13:46.834570 | 127.0.0.1 |            224 | 127.0.0.1
        Reading key {{-3248873570005575792, pk{000400000002}}} from sstable /home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Data.db [shard 0] | 2020-04-27 13:13:46.834608 | 127.0.0.1 |            262 | 127.0.0.1
                           /home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Index.db: scheduling bulk DMA read of size 8 at offset 0 [shard 0] | 2020-04-27 13:13:46.834635 | 127.0.0.1 |            289 | 127.0.0.1
  /home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Index.db: finished bulk DMA read of size 8 at offset 0, successfully read 8 bytes [shard 0] | 2020-04-27 13:13:46.834975 | 127.0.0.1 |            629 | 127.0.0.1
                                                                                                                                       Message received from /127.0.0.1 [shard 0] | 2020-04-27 13:13:46.834988 | 127.0.0.3 |             11 | 127.0.0.1
                           /home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Data.db: scheduling bulk DMA read of size 41 at offset 0 [shard 0] | 2020-04-27 13:13:46.835015 | 127.0.0.1 |            669 | 127.0.0.1
                                                                                         View updates for ks.t require read-before-write - base table reader is created [shard 0] | 2020-04-27 13:13:46.835020 | 127.0.0.3 |             44 | 127.0.0.1
                                                                                                                                      Generated 1 view update mutations [shard 0] | 2020-04-27 13:13:46.835080 | 127.0.0.3 |            104 | 127.0.0.1
               Sending view update for ks.t_v2_idx_index to 127.0.0.2, with pending endpoints = {}; base token = -3248873570005575792; view token = 3728482343045213994 [shard 0] | 2020-04-27 13:13:46.835095 | 127.0.0.3 |            119 | 127.0.0.1
                                                                                                                                       Sending a mutation to /127.0.0.2 [shard 0] | 2020-04-27 13:13:46.835105 | 127.0.0.3 |            129 | 127.0.0.1
                                                                                                                    View updates for ks.t were generated and propagated [shard 0] | 2020-04-27 13:13:46.835117 | 127.0.0.3 |            141 | 127.0.0.1
 /home/sarna/.ccm/scylla-1/node1/data/ks/t-162ef290887811eaa4bf000000000000/mc-1-big-Data.db: finished bulk DMA read of size 41 at offset 0, successfully read 41 bytes [shard 0] | 2020-04-27 13:13:46.835160 | 127.0.0.1 |            813 | 127.0.0.1
                                                                                                                                    Sending mutation_done to /127.0.0.1 [shard 0] | 2020-04-27 13:13:46.835164 | 127.0.0.3 |            188 | 127.0.0.1
                                                                                                                                              Mutation handling is done [shard 0] | 2020-04-27 13:13:46.835177 | 127.0.0.3 |            201 | 127.0.0.1
                                                                                                                                      Generated 1 view update mutations [shard 0] | 2020-04-27 13:13:46.835215 | 127.0.0.1 |            869 | 127.0.0.1
                                                Locally applying view update for ks.t_v2_idx_index; base token = -3248873570005575792; view token = 3728482343045213994 [shard 0] | 2020-04-27 13:13:46.835226 | 127.0.0.1 |            880 | 127.0.0.1
                                                                                            Successfully applied local view update for 127.0.0.1 and 0 remote endpoints [shard 0] | 2020-04-27 13:13:46.835253 | 127.0.0.1 |            907 | 127.0.0.1
                                                                                                                    View updates for ks.t were generated and propagated [shard 0] | 2020-04-27 13:13:46.835256 | 127.0.0.1 |            910 | 127.0.0.1
                                                                                                                                         Got a response from /127.0.0.1 [shard 0] | 2020-04-27 13:13:46.835274 | 127.0.0.1 |            928 | 127.0.0.1
                                                                                                           Delay decision due to throttling: do not delay, resuming now [shard 0] | 2020-04-27 13:13:46.835276 | 127.0.0.1 |            930 | 127.0.0.1
                                                                                                                                        Mutation successfully completed [shard 0] | 2020-04-27 13:13:46.835279 | 127.0.0.1 |            933 | 127.0.0.1
                                                                                                                                   Done processing - preparing a result [shard 0] | 2020-04-27 13:13:46.835286 | 127.0.0.1 |            941 | 127.0.0.1
                                                                                                                                       Message received from /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.835331 | 127.0.0.2 |             14 | 127.0.0.1
                                                                                                                                    Sending mutation_done to /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.835399 | 127.0.0.2 |             82 | 127.0.0.1
                                                                                                                                              Mutation handling is done [shard 0] | 2020-04-27 13:13:46.835413 | 127.0.0.2 |             96 | 127.0.0.1
                                                                                                                                         Got a response from /127.0.0.2 [shard 0] | 2020-04-27 13:13:46.835639 | 127.0.0.3 |            662 | 127.0.0.1
                                                                                                           Delay decision due to throttling: do not delay, resuming now [shard 0] | 2020-04-27 13:13:46.835640 | 127.0.0.3 |            664 | 127.0.0.1
                                                                                                  Successfully applied view update for 127.0.0.2 and 1 remote endpoints [shard 0] | 2020-04-27 13:13:46.835649 | 127.0.0.3 |            673 | 127.0.0.1
                                                                                                                                         Got a response from /127.0.0.3 [shard 0] | 2020-04-27 13:13:46.835841 | 127.0.0.1 |           1495 | 127.0.0.1
                                                                                                                                                                 Request complete | 2020-04-27 13:13:46.834944 | 127.0.0.1 |            944 | 127.0.0.1
```

Fixes #6175
Tests: unit(dev), manual

* psarna-propagate_tracing_to_more_write_paths:
  db,view: add tracing to view update generation path
  treewide: propagate trace state to write path
This commit is contained in:
Avi Kivity
2020-05-25 15:49:03 +03:00
15 changed files with 112 additions and 78 deletions

View File

@@ -1516,7 +1516,7 @@ future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::
return apply_in_memory(m, std::move(s), {}, timeout);
}
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
// I'm doing a nullcheck here since the init code path for db etc
// is a little in flux and commitlog is created only when db is
// initied from datadir.
@@ -1535,7 +1535,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_
if (cf.views().empty()) {
return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally([op = std::move(op)] { });
}
future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m, timeout);
future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m, timeout, std::move(tr_state));
return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op), sync] (row_locker::lock_holder lock) mutable {
return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally(
// Hold the local lock on the base-table partition or row
@@ -1568,7 +1568,7 @@ void database::update_write_metrics_for_timed_out_write() {
++_stats->total_writes_timedout;
}
future<> database::apply(schema_ptr s, const frozen_mutation& m, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout) {
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout) {
if (dblog.is_enabled(logging::log_level::trace)) {
dblog.trace("apply {}", m.pretty_printer(s));
}
@@ -1576,15 +1576,15 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, db::commitlog::
update_write_metrics_for_timed_out_write();
return make_exception_future<>(timed_out_error{});
}
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), timeout, sync));
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync));
}
future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, db::timeout_clock::time_point timeout) {
future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) {
if (dblog.is_enabled(logging::log_level::trace)) {
dblog.trace("apply hint {}", m.pretty_printer(s));
}
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, timeout] () mutable {
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), timeout, db::commitlog::force_sync::no));
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable {
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no));
});
}

View File

@@ -996,8 +996,8 @@ public:
void remove_view(view_ptr v);
void clear_views();
const std::vector<view_ptr>& views() const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const;
future<row_locker::lock_holder>
stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
std::vector<sstables::shared_sstable>& excluded_sstables) const;
@@ -1029,12 +1029,13 @@ public:
private:
future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source,
const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
tracing::trace_state_ptr tr_state, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
future<> generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
mutation&& m,
flat_mutation_reader_opt existings) const;
flat_mutation_reader_opt existings,
tracing::trace_state_ptr tr_state) const;
mutable row_locker _row_locker;
future<row_locker::lock_holder> local_base_lock(
@@ -1364,6 +1365,7 @@ private:
database*,
schema_ptr,
const frozen_mutation&,
tracing::trace_state_ptr,
db::timeout_clock::time_point,
db::commitlog::force_sync> _apply_stage;
@@ -1412,7 +1414,7 @@ private:
void setup_metrics();
friend class db_apply_executor;
future<> do_apply(schema_ptr, const frozen_mutation&, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync);
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync);
future<> apply_with_commitlog(schema_ptr, column_family&, utils::UUID, const frozen_mutation&, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync);
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
@@ -1514,8 +1516,8 @@ public:
db::timeout_clock::time_point timeout);
// Apply the mutation atomically.
// Throws timed_out_error when timeout is reached.
future<> apply(schema_ptr, const frozen_mutation&, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout);
future<> apply_hint(schema_ptr, const frozen_mutation&, db::timeout_clock::time_point timeout);
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout);
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented);
future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);

View File

@@ -290,7 +290,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
mutation m(schema, key);
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
return _qp.proxy().mutate_locally(m);
return _qp.proxy().mutate_locally(m, tracing::trace_state_ptr());
});
};

View File

@@ -594,7 +594,7 @@ public:
db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations);
}
}
return _qp.proxy().mutate_locally(std::move(mutations));
return _qp.proxy().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
}
future<> flush_schemas() {

View File

@@ -224,7 +224,7 @@ future<> save_system_schema(const sstring & ksname) {
deletion_timestamp), ksm->name()).discard_result();
}).then([ksm] {
auto mvec = make_create_keyspace_mutations(ksm, schema_creation_timestamp(), true);
return qctx->proxy().mutate_locally(std::move(mvec));
return qctx->proxy().mutate_locally(std::move(mvec), tracing::trace_state_ptr());
});
}
@@ -926,7 +926,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
/*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
#endif
proxy.local().mutate_locally(std::move(mutations)).get0();
proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()).get0();
if (do_flush) {
proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) {

View File

@@ -1072,6 +1072,7 @@ future<> mutate_MV(
std::vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all)
@@ -1083,7 +1084,7 @@ future<> mutate_MV(
auto& keyspace_name = mut.s->ks_name();
auto paired_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
auto pending_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name);
auto maybe_account_failure = [&stats, &cf_stats, units = pending_view_updates.split(mut.fm.representation().size())] (
auto maybe_account_failure = [tr_state, &stats, &cf_stats, units = pending_view_updates.split(mut.fm.representation().size())] (
future<>&& f,
gms::inet_address target,
bool is_local,
@@ -1094,9 +1095,13 @@ future<> mutate_MV(
cf_stats.total_view_updates_failed_local += is_local;
cf_stats.total_view_updates_failed_remote += remotes;
auto ep = f.get_exception();
tracing::trace(tr_state, "Failed to apply {}view update for {} and {} remote endpoints",
seastar::value_of([is_local]{return is_local ? "local " : "";}), target, remotes);
vlogger.error("Error applying view update to {}: {}", target, ep);
return make_exception_future<>(std::move(ep));
} else {
tracing::trace(tr_state, "Successfully applied {}view update for {} and {} remote endpoints",
seastar::value_of([is_local]{return is_local ? "local " : "";}), target, remotes);
return make_ready_future<>();
}
};
@@ -1125,7 +1130,9 @@ future<> mutate_MV(
// writes but mutate_locally() doesn't, so we need to do that here.
++stats.writes;
auto mut_ptr = std::make_unique<frozen_mutation>(std::move(mut.fm));
future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped(
tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}",
mut.s->ks_name(), mut.s->cf_name(), base_token, view_token);
future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, std::move(tr_state), db::commitlog::force_sync::no).then_wrapped(
[&stats,
maybe_account_failure = std::move(maybe_account_failure),
mut_ptr = std::move(mut_ptr)] (future<>&& f) {
@@ -1142,11 +1149,14 @@ future<> mutate_MV(
// to send the update there. Currently, we do this from *each* of
// the base replicas, but this is probably excessive - see
// See https://issues.apache.org/jira/browse/CASSANDRA-14262/
tracing::trace(tr_state, "Sending view update for {}.{} to {}, with pending endpoints = {}; base token = {}; view token = {}",
mut.s->ks_name(), mut.s->cf_name(), *paired_endpoint, pending_endpoints, base_token, view_token);
future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
std::move(mut),
*paired_endpoint,
std::move(pending_endpoints),
db::write_type::VIEW,
std::move(tr_state),
stats,
allow_hints).then_wrapped(
[paired_endpoint,
@@ -1178,11 +1188,14 @@ future<> mutate_MV(
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
auto target = pending_endpoints.back();
pending_endpoints.pop_back();
tracing::trace(tr_state, "Sending view update for {}.{} to {}, with pending endpoints = {}; base token = {}; view token = {}",
mut.s->ks_name(), mut.s->cf_name(), target, pending_endpoints, base_token, view_token);
future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
std::move(mut),
target,
std::move(pending_endpoints),
db::write_type::VIEW,
std::move(tr_state),
allow_hints).then_wrapped(
[target,
updates_pushed_remote,

View File

@@ -109,6 +109,7 @@ future<> mutate_MV(
std::vector<frozen_mutation_and_schema> view_updates,
db::view::stats& stats,
cf_stats& cf_stats,
tracing::trace_state_ptr tr_state,
db::timeout_semaphore_units pending_view_updates,
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all);

View File

@@ -180,7 +180,7 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
f = f.then([schema, &decision, timeout, tr_state] {
logger.debug("Committing decision {}", decision);
tracing::trace(tr_state, "Committing decision {}", decision);
return get_local_storage_proxy().mutate_locally(schema, decision.update, db::commitlog::force_sync::yes, timeout);
return get_local_storage_proxy().mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
});
} else {
logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);

View File

@@ -205,7 +205,7 @@ public:
auto m = _mutations[utils::fb_utilities::get_broadcast_address()];
if (m) {
tracing::trace(tr_state, "Executing a mutation locally");
return sp.mutate_locally(_schema, *m, db::commitlog::force_sync::no, timeout);
return sp.mutate_locally(_schema, *m, std::move(tr_state), db::commitlog::force_sync::no, timeout);
}
return make_ready_future<>();
}
@@ -256,7 +256,7 @@ public:
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Executing a mutation locally");
return sp.mutate_locally(_schema, *_mutation, db::commitlog::force_sync::no, timeout);
return sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
@@ -286,7 +286,7 @@ public:
tracing::trace_state_ptr tr_state) override {
// A hint will be sent to all relevant endpoints when the endpoint it was originally intended for
// becomes unavailable - this might include the current node
return sp.mutate_hint(_schema, *_mutation, timeout);
return sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
@@ -1783,38 +1783,40 @@ storage_proxy::response_id_type storage_proxy::unique_response_handler::release(
}
future<>
storage_proxy::mutate_locally(const mutation& m, clock_type::time_point timeout) {
storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) {
auto shard = _db.local().shard_of(m);
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [s = global_schema_ptr(m.schema()), m = freeze(m), timeout] (database& db) -> future<> {
return db.apply(s, m, db::commitlog::force_sync::no, timeout);
return _db.invoke_on(shard, {_write_smp_service_group, timeout},
[s = global_schema_ptr(m.schema()), m = freeze(m), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout] (database& db) mutable -> future<> {
return db.apply(s, m, gtr.get(), db::commitlog::force_sync::no, timeout);
});
}
future<>
storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, db::commitlog::force_sync sync, clock_type::time_point timeout) {
storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout) {
auto shard = _db.local().shard_of(m);
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), timeout, sync] (database& db) -> future<> {
return db.apply(gs, m, sync, timeout);
return _db.invoke_on(shard, {_write_smp_service_group, timeout},
[&m, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync] (database& db) mutable -> future<> {
return db.apply(gs, m, gtr.get(), sync, timeout);
});
}
future<>
storage_proxy::mutate_locally(std::vector<mutation> mutations, clock_type::time_point timeout) {
return do_with(std::move(mutations), [this, timeout] (std::vector<mutation>& pmut){
return parallel_for_each(pmut.begin(), pmut.end(), [this, timeout] (const mutation& m) {
return mutate_locally(m, timeout);
storage_proxy::mutate_locally(std::vector<mutation> mutations, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) {
return do_with(std::move(mutations), [this, timeout, tr_state = std::move(tr_state)] (std::vector<mutation>& pmut) mutable {
return parallel_for_each(pmut.begin(), pmut.end(), [this, tr_state = std::move(tr_state), timeout] (const mutation& m) mutable {
return mutate_locally(m, tr_state, timeout);
});
});
}
future<>
storage_proxy::mutate_hint(const schema_ptr& s, const frozen_mutation& m, clock_type::time_point timeout) {
storage_proxy::mutate_hint(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout) {
auto shard = _db.local().shard_of(m);
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), timeout] (database& db) -> future<> {
return db.apply_hint(gs, m, timeout);
return _db.invoke_on(shard, {_write_smp_service_group, timeout}, [&m, gs = global_schema_ptr(s), tr_state = std::move(tr_state), timeout] (database& db) mutable -> future<> {
return db.apply_hint(gs, m, std::move(tr_state), timeout);
});
}
@@ -2017,7 +2019,7 @@ future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutat
}
future<> storage_proxy::mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl,
std::optional<clock_type::time_point> timeout_opt) {
tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt) {
return parallel_for_each(ids, [this, cl, timeout_opt] (unique_response_handler& protected_response) {
auto response_id = protected_response.id;
// This function, mutate_begin(), is called after a preemption point
@@ -2290,9 +2292,10 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool c
utils::latency_counter lc;
lc.start();
return mutate_prepare(mutations, cl, type, tr_state, std::move(permit)).then([this, cl, timeout_opt, tracker = std::move(cdc_tracker)] (std::vector<storage_proxy::unique_response_handler> ids) {
return mutate_prepare(mutations, cl, type, tr_state, std::move(permit)).then([this, cl, timeout_opt, tracker = std::move(cdc_tracker),
tr_state] (std::vector<storage_proxy::unique_response_handler> ids) mutable {
register_cdc_operation_result_tracker(ids, tracker);
return mutate_begin(std::move(ids), cl, timeout_opt);
return mutate_begin(std::move(ids), cl, tr_state, timeout_opt);
}).then_wrapped([this, p = shared_from_this(), lc, tr_state] (future<> f) mutable {
return p->mutate_end(std::move(f), lc, get_stats(), std::move(tr_state));
});
@@ -2375,7 +2378,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit));
}).then([this, cl] (std::vector<unique_response_handler> ids) {
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
return _p.mutate_begin(std::move(ids), cl, _timeout);
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
});
}
future<> sync_write_to_batchlog() {
@@ -2402,7 +2405,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
return sync_write_to_batchlog().then([this, ids = std::move(ids)] () mutable {
tracing::trace(_trace_state, "Sending batch mutations");
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
return _p.mutate_begin(std::move(ids), _cl, _timeout);
return _p.mutate_begin(std::move(ids), _cl, _trace_state, _timeout);
}).then(std::bind(&context::async_remove_from_batchlog, this));
});
}
@@ -2445,6 +2448,7 @@ future<> storage_proxy::send_to_endpoint(
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
allow_hints allow_hints) {
utils::latency_counter lc;
@@ -2458,7 +2462,7 @@ future<> storage_proxy::send_to_endpoint(
timeout = clock_type::now() + 5min;
}
return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(),
[this, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
[this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
std::unique_ptr<mutation_holder>& m,
db::consistency_level cl,
db::write_type type, service_permit permit) mutable {
@@ -2481,11 +2485,11 @@ future<> storage_proxy::send_to_endpoint(
std::move(targets),
pending_endpoints,
std::move(dead_endpoints),
nullptr,
tr_state,
stats,
std::move(permit));
}).then([this, cl, timeout = std::move(timeout)] (std::vector<unique_response_handler> ids) mutable {
return mutate_begin(std::move(ids), cl, std::move(timeout));
}).then([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (std::vector<unique_response_handler> ids) mutable {
return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
}).then_wrapped([p = shared_from_this(), lc, &stats] (future<>&& f) {
return p->mutate_end(std::move(f), lc, stats, nullptr);
});
@@ -2496,12 +2500,14 @@ future<> storage_proxy::send_to_endpoint(
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
allow_hints allow_hints) {
return send_to_endpoint(
std::make_unique<shared_mutation>(std::move(fm_a_s)),
std::move(target),
std::move(pending_endpoints),
type,
std::move(tr_state),
get_stats(),
allow_hints);
}
@@ -2511,6 +2517,7 @@ future<> storage_proxy::send_to_endpoint(
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
allow_hints allow_hints) {
return send_to_endpoint(
@@ -2518,6 +2525,7 @@ future<> storage_proxy::send_to_endpoint(
std::move(target),
std::move(pending_endpoints),
type,
std::move(tr_state),
stats,
allow_hints);
}
@@ -2529,6 +2537,7 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
std::move(target),
{ },
db::write_type::SIMPLE,
tracing::trace_state_ptr(),
get_stats(),
allow_hints::no);
}
@@ -2538,6 +2547,7 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
std::move(target),
{ },
db::write_type::SIMPLE,
tracing::trace_state_ptr(),
get_stats(),
allow_hints::no);
}
@@ -4827,9 +4837,9 @@ void storage_proxy::init_messaging_service() {
utils::UUID schema_version = in.schema_version();
return handle_write(src_addr, t, schema_version, std::move(in), std::move(forward), reply_to, shard, response_id,
trace_info ? *trace_info : std::nullopt,
/* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr, schema_ptr s, const frozen_mutation& m,
/* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s, const frozen_mutation& m,
clock_type::time_point timeout) {
return p->mutate_locally(std::move(s), m, db::commitlog::force_sync::no, timeout);
return p->mutate_locally(std::move(s), m, std::move(tr_state), db::commitlog::force_sync::no, timeout);
},
/* forward_fn */ [] (netw::messaging_service::msg_addr addr, clock_type::time_point timeout, const frozen_mutation& m,
gms::inet_address reply_to, unsigned shard, response_id_type response_id,

View File

@@ -386,7 +386,7 @@ private:
future<std::vector<unique_response_handler>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler handler);
template<typename Range>
future<std::vector<unique_response_handler>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
future<> mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl, std::optional<clock_type::time_point> timeout_opt = { });
future<> mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<> mutate_end(future<> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
future<> schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
bool need_throttle_writes() const;
@@ -412,6 +412,7 @@ private:
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
allow_hints allow_hints = allow_hints::yes);
@@ -456,15 +457,15 @@ public:
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const mutation& m, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max());
// Applies mutations on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(std::vector<mutation> mutation, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_hint(const schema_ptr&, const frozen_mutation& m, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_hint(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented);
/**
@@ -506,8 +507,10 @@ public:
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also
// send_to_live_endpoints() - another take on the same original function.
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type, allow_hints allow_hints = allow_hints::yes);
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type,
tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type,
tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes);
// Send a mutation to a specific remote target as a hint.
// Unlike regular mutations during write operations, hints are sent on the streaming connection

View File

@@ -2069,15 +2069,17 @@ static size_t memory_usage_of(const std::vector<frozen_mutation_and_schema>& ms)
future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
mutation&& m,
flat_mutation_reader_opt existings) const {
flat_mutation_reader_opt existings,
tracing::trace_state_ptr tr_state) const {
auto base_token = m.token();
return db::view::generate_view_updates(
base,
std::move(views),
flat_mutation_reader_from_mutations({std::move(m)}),
std::move(existings)).then([this, base_token = std::move(base_token)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
std::move(existings)).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
tracing::trace(tr_state, "Generated {} view update mutations", updates.size());
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats,
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(tr_state),
std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no).handle_exception([] (auto ignored) { });
});
}
@@ -2192,8 +2194,8 @@ future<> table::populate_views(
units_to_consume = update_size - units_to_wait_for,
this] (db::timeout_semaphore_units&& units) mutable {
units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, units_to_consume));
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats,
*_config.cf_stats, std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats,
tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
});
});
}
@@ -2518,14 +2520,14 @@ future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable>
* Given an update for the base table, calculates the set of potentially affected views,
* generates the relevant updates, and sends them to the paired view replicas.
*/
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const {
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const {
//FIXME: Avoid unfreezing here.
auto m = fm.unfreeze(s);
return push_view_replica_updates(s, std::move(m), timeout);
return push_view_replica_updates(s, std::move(m), timeout, std::move(tr_state));
}
future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source,
const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const {
tracing::trace_state_ptr tr_state, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const {
if (!_config.view_update_concurrency_semaphore->current()) {
// We don't have resources to generate view updates for this write. If we reached this point, we failed to
// throttle the client. The memory queue is already full, waiting on the semaphore would cause this node to
@@ -2543,7 +2545,8 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
}
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
if (cr_ranges.empty()) {
return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }).then([] {
tracing::trace(tr_state, "View updates do not require read-before-write");
return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }, std::move(tr_state)).then([] {
// In this case we are not doing a read-before-write, just a
// write, so no lock is needed.
return make_ready_future<row_locker::lock_holder>();
@@ -2566,14 +2569,16 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
// We'll return this lock to the caller, which will release it after
// writing the base-table update.
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, source = std::move(source), &io_priority] (row_locker::lock_holder lock) {
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, source = std::move(source), tr_state = std::move(tr_state), &io_priority] (row_locker::lock_holder lock) mutable {
tracing::trace(tr_state, "View updates for {}.{} require read-before-write - base table reader is created", base->ks_name(), base->cf_name());
return do_with(
dht::partition_range::make_singular(m.decorated_key()),
std::move(slice),
std::move(m),
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
auto reader = source.make_reader(base, no_reader_permit(), pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority, tr_state = std::move(tr_state)] (auto& pk, auto& slice, auto& m) mutable {
auto reader = source.make_reader(base, no_reader_permit(), pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader), tr_state).then([base, tr_state = std::move(tr_state), lock = std::move(lock)] () mutable {
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this
// partition/row and destroys the lock object.
@@ -2583,16 +2588,16 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
});
}
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const {
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(),
service::get_local_sstable_query_read_priority(), {});
std::move(tr_state), service::get_local_sstable_query_read_priority(), {});
}
future<row_locker::lock_holder>
table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
std::vector<sstables::shared_sstable>& excluded_sstables) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(excluded_sstables),
service::get_local_streaming_read_priority(), query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
tracing::trace_state_ptr(), service::get_local_streaming_read_priority(), query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
}
mutation_source

View File

@@ -64,7 +64,7 @@ SEASTAR_TEST_CASE(test_execute_batch) {
auto version = netw::messaging_service::current_version;
auto bm = bp.get_batch_log_mutation_for({ m }, s->id(), version, db_clock::now() - db_clock::duration(3h));
return qp.proxy().mutate_locally(bm).then([&bp] () mutable {
return qp.proxy().mutate_locally(bm, tracing::trace_state_ptr()).then([&bp] () mutable {
return bp.count_all_batches().then([](auto n) {
BOOST_CHECK_EQUAL(n, 1);
}).then([&bp] () mutable {

View File

@@ -56,7 +56,7 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
}
auto assert_query_result = [&] (size_t expected_size) {
@@ -92,13 +92,13 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
mutation m(s, pkey);
m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now()));
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
}
for (uint32_t i = 3; i <= 8; ++i) {
auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
mutation m(s, pkey);
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1);
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
pranges.emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
}
@@ -138,7 +138,7 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc
service::get_local_migration_manager().announce_new_column_family(s, true).get();
column_family& cf = e.local_db().find_column_family(s);
for (auto&& m : partitions) {
e.local_db().apply(cf.schema(), freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
e.local_db().apply(cf.schema(), freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
}
cf.flush().get();
cf.get_row_cache().invalidate([] {}).get();

View File

@@ -281,7 +281,7 @@ int main(int argc, char** argv) {
uint64_t i = 0;
while (i < sstables) {
auto m = gen();
env.local_db().apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
env.local_db().apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
if (tab.active_memtable().occupancy().used_space() > sstable_size) {
tab.flush().get();
++i;

View File

@@ -171,7 +171,7 @@ int main(int argc, char** argv) {
auto&& col = *s->get_column_definition(to_bytes("v"));
m.set_clustered_cell(ck, col, atomic_cell::make_live(*col.type, api::new_timestamp(), serialized(value)));
auto t0 = clock::now();
db.apply(s, freeze(m), db::commitlog::force_sync::no, db::no_timeout).get();
db.apply(s, freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
writes_hist.add(std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - t0).count());
++mutations;
}