database, streaming, messaging: drop streaming memtables

Before Scylla 3.0, we used to send streaming mutations using
individual RPC requests and flush them together using dedicated
streaming memtables. This mechanism is no longer in use and all
versions that use it have long reached end-of-life.

Remove this code.
This commit is contained in:
Avi Kivity
2020-06-10 18:35:10 +03:00
committed by Tomasz Grabiec
parent b17d20b5f4
commit e5be3352cf
10 changed files with 7 additions and 397 deletions

View File

@@ -249,7 +249,7 @@
"MIGRATION_REQUEST",
"PREPARE_MESSAGE",
"PREPARE_DONE_MESSAGE",
"STREAM_MUTATION",
"UNUSED__STREAM_MUTATION",
"STREAM_MUTATION_DONE",
"COMPLETE_MESSAGE",
"REPAIR_CHECKSUM_RANGE",

View File

@@ -1612,20 +1612,6 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t
});
}
future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) {
if (!s->is_synced()) {
throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}",
s->ks_name(), s->cf_name(), s->version()));
}
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, fragmented, plan_id] () mutable {
return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, fragmented, s = std::move(s)] {
auto uuid = m.column_family_id();
auto& cf = find_column_family(uuid);
cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented);
}, db::no_timeout);
});
}
keyspace::config
database::make_keyspace_config(const keyspace_metadata& ksm) {
keyspace::config cfg;

View File

@@ -416,24 +416,6 @@ private:
lw_shared_ptr<memtable_list> _memtables;
// In older incarnations, we simply committed the mutations to memtables.
// However, doing that makes it harder for us to provide QoS within the
// disk subsystem. Keeping them in separate memtables allow us to properly
// classify those streams into its own I/O class
//
// We could write those directly to disk, but we still want the mutations
// coming through the wire to go to a memtable staging area. This has two
// major advantages:
//
// first, it will allow us to properly order the partitions. They are
// hopefully sent in order but we can't really guarantee that without
// sacrificing sender-side parallelism.
//
// second, we will be able to coalesce writes from multiple plan_id's and
// even multiple senders, as well as automatically tapping into the dirty
// memory throttling mechanism, guaranteeing we will not overload the
// server.
lw_shared_ptr<memtable_list> _streaming_memtables;
utils::phased_barrier _streaming_flush_phaser;
// If mutations are fragmented during streaming the sstables cannot be made
@@ -448,21 +430,8 @@ private:
sstables::shared_sstable sstable;
};
struct streaming_memtable_big {
lw_shared_ptr<memtable_list> memtables;
std::vector<monitored_sstable> sstables;
seastar::gate flush_in_progress;
};
std::unordered_map<utils::UUID, lw_shared_ptr<streaming_memtable_big>> _streaming_memtables_big;
future<std::vector<monitored_sstable>> flush_streaming_big_mutations(utils::UUID plan_id);
void apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m);
future<> seal_active_streaming_memtable_big(streaming_memtable_big& smb, flush_permit&&);
lw_shared_ptr<memtable_list> make_memory_only_memtable_list();
lw_shared_ptr<memtable_list> make_memtable_list();
lw_shared_ptr<memtable_list> make_streaming_memtable_list();
lw_shared_ptr<memtable_list> make_streaming_memtable_big_list(streaming_memtable_big& smb);
sstables::compaction_strategy _compaction_strategy;
// generation -> sstable. Ordered by key so we can easily get the most recent.
@@ -590,7 +559,6 @@ private:
int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f);
void load_sstable(sstables::shared_sstable& sstable, bool reset_level = false);
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
// Caller must keep m alive.
future<> update_cache(lw_shared_ptr<memtable> m, sstables::shared_sstable sst);
@@ -812,7 +780,6 @@ public:
future<> stop();
future<> flush();
future<> flush_streaming_mutations(utils::UUID plan_id, dht::partition_range_vector ranges = dht::partition_range_vector{});
future<> fail_streaming_mutations(utils::UUID plan_id);
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
@@ -1050,28 +1017,6 @@ private:
// synchronously flush data to disk.
future<> seal_active_memtable(flush_permit&&);
// I am assuming here that the repair process will potentially send ranges containing
// few mutations, definitely not enough to fill a memtable. It wants to know whether or
// not each of those ranges individually succeeded or failed, so we need a future for
// each.
//
// One of the ways to fix that, is changing the repair itself to send more mutations at
// a single batch. But relying on that is a bad idea for two reasons:
//
// First, the goals of the SSTable writer and the repair sender are at odds. The SSTable
// writer wants to write as few SSTables as possible, while the repair sender wants to
// break down the range in pieces as small as it can and checksum them individually, so
// it doesn't have to send a lot of mutations for no reason.
//
// Second, even if the repair process wants to process larger ranges at once, some ranges
// themselves may be small. So while most ranges would be large, we would still have
// potentially some fairly small SSTables lying around.
//
// The best course of action in this case is to coalesce the incoming streams write-side.
// repair can now choose whatever strategy - small or big ranges - it wants, rest assured
// that the incoming memtables will be coalesced together.
future<> seal_active_streaming_memtable_immediate(flush_permit&&);
void check_valid_rp(const db::replay_position&) const;
public:
// Iterate over all partitions. Protocol is the same as std::all_of(),
@@ -1527,7 +1472,6 @@ public:
// Throws timed_out_error when timeout is reached.
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout);
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented);
future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);
const sstring& get_snitch_name() const;

View File

@@ -476,7 +476,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
case messaging_verb::STREAM_MUTATION:
case messaging_verb::UNUSED__STREAM_MUTATION:
case messaging_verb::STREAM_MUTATION_DONE:
case messaging_verb::COMPLETE_MESSAGE:
case messaging_verb::REPLICATION_FINISHED:
@@ -953,15 +953,6 @@ future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id,
plan_id, dst_cpu_id);
}
// STREAM_MUTATION
void messaging_service::register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool> fragmented, rpc::optional<streaming::stream_reason> reason)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION, std::move(func));
}
future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented, streaming::stream_reason reason) {
return send_message<void>(this, messaging_verb::STREAM_MUTATION, id,
plan_id, std::move(fm), dst_cpu_id, fragmented, reason);
}
// STREAM_MUTATION_DONE
void messaging_service::register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo,
UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) {

View File

@@ -112,7 +112,7 @@ enum class messaging_verb : int32_t {
// Used by streaming
PREPARE_MESSAGE = 15,
PREPARE_DONE_MESSAGE = 16,
STREAM_MUTATION = 17,
UNUSED__STREAM_MUTATION = 17,
STREAM_MUTATION_DONE = 18,
COMPLETE_MESSAGE = 19,
// end of streaming verbs
@@ -302,10 +302,6 @@ public:
void register_prepare_done_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func);
future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id);
// Wrapper for STREAM_MUTATION verb
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool>, rpc::optional<streaming::stream_reason>)>&& func);
future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented, streaming::stream_reason reason);
// Wrapper for STREAM_MUTATION_FRAGMENTS
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use.
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);

View File

@@ -1866,18 +1866,6 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze
});
}
future<>
storage_proxy::mutate_streaming_mutation(const schema_ptr& s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) {
auto shard = _db.local().shard_of(m);
get_stats().replica_cross_shard_ops += shard != this_shard_id();
// In theory streaming writes should have their own smp_service_group, but this is only used during upgrades from old versions; new
// versions use rpc streaming.
return _db.invoke_on(shard, _write_smp_service_group, [&m, plan_id, fragmented, gs = global_schema_ptr(s)] (database& db) mutable -> future<> {
return db.apply_streaming_mutation(gs, plan_id, m, fragmented);
});
}
storage_proxy::response_id_type
storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr<mutation_holder> mh,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {

View File

@@ -481,7 +481,6 @@ public:
future<> mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_hint(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented);
/**
* Use this method to have these Mutations applied

View File

@@ -125,43 +125,6 @@ void stream_session::init_messaging_service_handler() {
return make_ready_future<>();
});
});
ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool> fragmented_opt, rpc::optional<stream_reason> reason_opt) {
auto from = netw::messaging_service::get_source(cinfo);
auto src_cpu_id = from.cpu_id;
auto fragmented = fragmented_opt && *fragmented_opt;
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
sslog.trace("Got stream_mutation from {} reason {}", from, int(reason));
return smp::submit_to(src_cpu_id % smp::count, [fm = std::move(fm), plan_id, from, fragmented] () mutable {
return do_with(std::move(fm), [plan_id, from, fragmented] (const auto& fm) {
auto fm_size = fm.representation().size();
get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, fm_size);
return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm, fragmented] (schema_ptr s) {
auto cf_id = fm.column_family_id();
sslog.debug("[Stream #{}] GOT STREAM_MUTATION from {}: cf_id={}", plan_id, from.addr, cf_id);
auto& db = service::get_local_storage_proxy().get_db().local();
if (!db.column_family_exists(cf_id)) {
sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped",
plan_id, from.addr, cf_id);
return make_ready_future<>();
}
return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), plan_id, fm, fragmented).then_wrapped([plan_id, cf_id, from] (auto&& f) {
try {
f.get();
return make_ready_future<>();
} catch (no_such_column_family&) {
sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped",
plan_id, from.addr, cf_id);
return make_ready_future<>();
} catch (...) {
throw;
}
return make_ready_future<>();
});
});
});
});
});
ms().register_stream_mutation_fragments([] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
auto from = netw::messaging_service::get_source(cinfo);
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
@@ -598,14 +561,7 @@ void stream_session::add_transfer_ranges(sstring keyspace, dht::token_range_vect
future<> stream_session::receiving_failed(UUID cf_id)
{
return get_db().invoke_on_all([cf_id, plan_id = plan_id()] (database& db) {
try {
auto& cf = db.find_column_family(cf_id);
return cf.fail_streaming_mutations(plan_id);
} catch (no_such_column_family&) {
return make_ready_future<>();
}
});
return make_ready_future<>();
}
void stream_session::close_session(stream_session_state final_state) {

View File

@@ -130,47 +130,6 @@ struct send_info {
}
};
future<stop_iteration> do_send_mutations(lw_shared_ptr<send_info> si, frozen_mutation fm, bool fragmented) {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
// Do it in the background.
(void)netw::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented, si->reason).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
// There might be larger number of STREAM_MUTATION inflight.
// Log one error per column_family per range
if (!si->error_logged) {
si->error_logged = true;
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
return stop_iteration::no;
});
}
future<> send_mutations(lw_shared_ptr<send_info> si) {
size_t fragment_size = default_frozen_fragment_size;
// Mutations cannot be sent fragmented if the receiving side doesn't support that.
if (!si->db.features().cluster_supports_large_partitions()) {
fragment_size = std::numeric_limits<size_t>::max();
}
return fragment_and_freeze(std::move(si->reader), [si] (auto fm, bool fragmented) {
if (!si->db.column_family_exists(si->cf_id)) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
si->mutations_nr++;
return do_send_mutations(si, std::move(fm), fragmented);
}, fragment_size).then([si] {
return si->mutations_done.wait(si->mutations_nr);
});
}
future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
return si->reader.peek(db::no_timeout).then([si] (mutation_fragment* mfp) {
if (!mfp) {
@@ -270,7 +229,7 @@ future<> stream_transfer_task::execute() {
if (si->db.features().cluster_supports_stream_with_rpc_stream()) {
return send_mutation_fragments(std::move(si));
} else {
return send_mutations(std::move(si));
throw std::runtime_error("cluster does not support STREAM_WITH_RPC_STREAM feature");
}
});
}).then([this, plan_id, cf_id, id] {

213
table.cc
View File

@@ -811,100 +811,6 @@ public:
}
};
future<>
table::seal_active_streaming_memtable_immediate(flush_permit&& permit) {
return with_scheduling_group(_config.streaming_scheduling_group, [this, permit = std::move(permit)] () mutable {
auto old = _streaming_memtables->back();
if (old->empty()) {
return make_ready_future<>();
}
_streaming_memtables->add_memtable();
_streaming_memtables->erase(old);
tlogger.debug("Sealing streaming memtable of {}.{}, partitions: {}, occupancy: {}", _schema->ks_name(), _schema->cf_name(), old->partition_count(), old->occupancy());
auto guard = _streaming_flush_phaser.start();
return with_gate(_streaming_flush_gate, [this, old, permit = std::move(permit)] () mutable {
auto newtab = make_sstable();
tlogger.debug("Flushing to {}", newtab->get_filename());
// This is somewhat similar to the main memtable flush, but with important differences.
//
// The first difference, is that we don't keep aggregate collectd statistics about this one.
// If we ever need to, we'll keep them as separate statistics, but we don't want to pollute the
// main stats about memtables with streaming memtables.
//
// Lastly, we don't have any commitlog RP to update, and we don't need to manipulate the
// memtable list, since this memtable was not available for reading up until this point.
auto fp = permit.release_sstable_write_permit();
database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable {
auto&& priority = service::get_local_streaming_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
cfg.backup = incremental_backups_enabled();
return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] {
return newtab->open_data();
}).then([this, old, newtab] () {
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, newtab, old] {
auto adder = [this, newtab] {
add_sstable(newtab, {this_shard_id()});
try_trigger_compaction();
tlogger.debug("Flushing to {} done", newtab->get_filename());
};
if (cache_enabled()) {
return _cache.update_invalidating(adder, *old);
} else {
return _cache.invalidate(adder).then([old] { return old->clear_gently(); });
}
});
}).handle_exception([old, permit = std::move(permit), newtab] (auto ep) {
newtab->mark_for_deletion();
tlogger.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
});
});
// We will also not have any retry logic. If we fail here, we'll fail the streaming and let
// the upper layers know. They can then apply any logic they want here.
}).finally([guard = std::move(guard)] { });
});
}
future<> table::seal_active_streaming_memtable_big(streaming_memtable_big& smb, flush_permit&& permit) {
return with_scheduling_group(_config.streaming_scheduling_group, [this, &smb, permit = std::move(permit)] () mutable {
auto old = smb.memtables->back();
if (old->empty()) {
return make_ready_future<>();
}
smb.memtables->add_memtable();
smb.memtables->erase(old);
return with_gate(_streaming_flush_gate, [this, old, &smb, permit = std::move(permit)] () mutable {
return with_gate(smb.flush_in_progress, [this, old, &smb, permit = std::move(permit)] () mutable {
auto newtab = make_sstable();
auto fp = permit.release_sstable_write_permit();
auto monitor = std::make_unique<database_sstable_write_monitor>(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
auto&& priority = service::get_local_streaming_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
cfg.backup = incremental_backups_enabled();
cfg.leave_unsealed = true;
auto fut = write_memtable_to_sstable(*old, newtab, *monitor, cfg, priority);
return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable {
if (!f.failed()) {
smb.sstables.push_back(monitored_sstable{std::move(monitor), newtab});
return make_ready_future<>();
} else {
newtab->mark_for_deletion();
auto ep = f.get_exception();
tlogger.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
}
});
});
});
});
}
future<>
table::seal_active_memtable(flush_permit&& permit) {
auto old = _memtables->back();
@@ -1024,7 +930,7 @@ table::stop() {
}
return _async_gate.close().then([this] {
return when_all(await_pending_writes(), await_pending_reads(), await_pending_streams()).discard_result().finally([this] {
return when_all(_memtables->request_flush(), _streaming_memtables->request_flush()).discard_result().finally([this] {
return _memtables->request_flush().finally([this] {
return _compaction_manager.remove(this).then([this] {
// Nest, instead of using when_all, so we don't lose any exceptions.
return _streaming_flush_gate.close();
@@ -1473,24 +1379,6 @@ table::make_memtable_list() {
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager, _stats, _config.memory_compaction_scheduling_group);
}
lw_shared_ptr<memtable_list>
table::make_streaming_memtable_list() {
auto seal = [this] (flush_permit&& permit) {
return seal_active_streaming_memtable_immediate(std::move(permit));
};
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager, _stats, _config.streaming_scheduling_group);
}
lw_shared_ptr<memtable_list>
table::make_streaming_memtable_big_list(streaming_memtable_big& smb) {
auto seal = [this, &smb] (flush_permit&& permit) {
return seal_active_streaming_memtable_big(smb, std::move(permit));
};
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager, _stats, _config.streaming_scheduling_group);
}
table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager,
cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker)
: _schema(std::move(schema))
@@ -1500,7 +1388,6 @@ table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_man
column_family_label(_schema->cf_name())
)
, _memtables(_config.enable_disk_writes ? make_memtable_list() : make_memory_only_memtable_list())
, _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
, _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema)))
, _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes)
@@ -1563,14 +1450,6 @@ logalloc::occupancy_stats table::occupancy() const {
for (auto m : *_memtables) {
res += m->region().occupancy();
}
for (auto m : *_streaming_memtables) {
res += m->region().occupancy();
}
for (auto smb : _streaming_memtables_big) {
for (auto m : *smb.second->memtables) {
res += m->region().occupancy();
}
}
return res;
}
@@ -1812,63 +1691,9 @@ future<> table::flush() {
// be indiscriminately touching the cache during repair. We will just have to
// invalidate the entries that are relevant to things we already have in the cache.
future<> table::flush_streaming_mutations(utils::UUID plan_id, dht::partition_range_vector ranges) {
// This will effectively take the gate twice for this call. The proper way to fix that would
// be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we
// need this code to go away as soon as we can (see FIXME above). So the double gate is a better
// temporary counter measure.
tlogger.debug("Flushing streaming memtable, plan={}", plan_id);
return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] () mutable {
return flush_streaming_big_mutations(plan_id).then([this, ranges = std::move(ranges)] (auto sstables) mutable {
return _streaming_memtables->seal_active_memtable_delayed().then([this] {
return _streaming_flush_phaser.advance_and_await();
}).then([this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable {
if (sstables.empty()) {
return make_ready_future<>();
}
return _cache.invalidate([this, sstables = std::move(sstables)] () mutable noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
for (auto&& sst : sstables) {
// seal_active_streaming_memtable_big() ensures sst is unshared.
this->add_sstable(sst.sstable, {this_shard_id()});
}
this->try_trigger_compaction();
}, std::move(ranges));
});
});
});
}
future<std::vector<table::monitored_sstable>> table::flush_streaming_big_mutations(utils::UUID plan_id) {
auto it = _streaming_memtables_big.find(plan_id);
if (it == _streaming_memtables_big.end()) {
return make_ready_future<std::vector<monitored_sstable>>(std::vector<monitored_sstable>());
}
auto entry = it->second;
_streaming_memtables_big.erase(it);
return entry->memtables->request_flush().then([entry] {
return entry->flush_in_progress.close();
}).then([this, entry] {
return parallel_for_each(entry->sstables, [this] (auto& sst) {
return sst.sstable->seal_sstable(this->incremental_backups_enabled()).then([&sst] {
return sst.sstable->open_data();
});
}).then([this, entry] {
return std::move(entry->sstables);
});
});
}
future<> table::fail_streaming_mutations(utils::UUID plan_id) {
auto it = _streaming_memtables_big.find(plan_id);
if (it == _streaming_memtables_big.end()) {
return make_ready_future<>();
}
auto entry = it->second;
_streaming_memtables_big.erase(it);
return entry->flush_in_progress.close().then([this, entry] {
for (auto&& sst : entry->sstables) {
sst.sstable->mark_for_deletion();
}
return _streaming_flush_phaser.advance_and_await();
});
}
@@ -1878,9 +1703,6 @@ future<> table::clear() {
}
_memtables->clear();
_memtables->add_memtable();
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
_streaming_memtables_big.clear();
return _cache.invalidate([] { /* There is no underlying mutation source */ });
}
@@ -1942,16 +1764,6 @@ void table::set_schema(schema_ptr s) {
m->set_schema(s);
}
for (auto& m : *_streaming_memtables) {
m->set_schema(s);
}
for (auto smb : _streaming_memtables_big) {
for (auto m : *smb.second->memtables) {
m->set_schema(s);
}
}
_cache.set_schema(s);
if (_counter_cell_locks) {
_counter_cell_locks->set_schema(s);
@@ -2199,27 +2011,6 @@ void table::drop_hit_rate(gms::inet_address addr) {
_cluster_cache_hit_rates.erase(addr);
}
void table::apply_streaming_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) {
if (tlogger.is_enabled(logging::log_level::trace)) {
tlogger.trace("streaming apply {}", m.pretty_printer(m_schema));
}
if (fragmented) {
apply_streaming_big_mutation(std::move(m_schema), plan_id, m);
return;
}
_streaming_memtables->active_memtable().apply(m, m_schema);
}
void table::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) {
auto it = _streaming_memtables_big.find(plan_id);
if (it == _streaming_memtables_big.end()) {
it = _streaming_memtables_big.emplace(plan_id, make_lw_shared<streaming_memtable_big>()).first;
it->second->memtables = _config.enable_disk_writes ? make_streaming_memtable_big_list(*it->second) : make_memory_only_memtable_list();
}
auto entry = it->second;
entry->memtables->active_memtable().apply(m, m_schema);
}
void
table::check_valid_rp(const db::replay_position& rp) const {
if (rp != db::replay_position() && rp < _lowest_allowed_rp) {