priority_manager: merge streaming_read and streaming_write classes into one class
Streaming is handled by just once group for CPU scheduling, so
separating it into read and write classes for I/O is artificial, and
inflates the resources we allow for streaming if both reads and writes
happen at the same time.
Merge both classes into one class ("streaming") and adjust callers. The
merged class has 200 shares, so it reduces streaming bandwidth if both
directions are active at the same time (which is rare; I think it only
happens in view building).
This commit is contained in:
@@ -2077,7 +2077,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
auto&& full_slice = schema->full_slice();
|
||||
auto& cf = db.local().find_column_family(schema);
|
||||
return make_flat_multi_range_reader(std::move(schema), cf.streaming_read_concurrency_semaphore().make_permit(), std::move(ms),
|
||||
std::move(range_generator), std::move(full_slice), service::get_local_streaming_read_priority(), {}, mutation_reader::forwarding::no);
|
||||
std::move(range_generator), std::move(full_slice), service::get_local_streaming_priority(), {}, mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, gc_clock::time_point tp) {
|
||||
|
||||
@@ -741,7 +741,7 @@ bool manager::end_point_hints_manager::sender::send_one_file(const sstring& fnam
|
||||
lw_shared_ptr<send_one_file_ctx> ctx_ptr = make_lw_shared<send_one_file_ctx>(_last_schema_ver_to_column_mapping);
|
||||
|
||||
try {
|
||||
commitlog::read_log_file(fname, manager::FILENAME_PREFIX, service::get_local_streaming_read_priority(), [this, secs_since_file_mod, &fname, ctx_ptr] (commitlog::buffer_and_replay_position buf_rp) mutable {
|
||||
commitlog::read_log_file(fname, manager::FILENAME_PREFIX, service::get_local_streaming_priority(), [this, secs_since_file_mod, &fname, ctx_ptr] (commitlog::buffer_and_replay_position buf_rp) mutable {
|
||||
auto&& [buf, rp] = buf_rp;
|
||||
// Check that we can still send the next hint. Don't try to send it if the destination host
|
||||
// is DOWN or if we have already failed to send some of the previous hints.
|
||||
|
||||
@@ -64,7 +64,7 @@ future<> view_update_generator::start() {
|
||||
std::move(ssts),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
service::get_local_streaming_read_priority(),
|
||||
service::get_local_streaming_priority(),
|
||||
nullptr,
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
@@ -170,7 +170,7 @@ distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& di
|
||||
return dir.invoke_on_all([&dir] (sstables::sstable_directory& d) {
|
||||
// Supposed to be called with the node either down or on behalf of maintenance tasks
|
||||
// like nodetool refresh
|
||||
return d.process_sstable_dir(service::get_local_streaming_read_priority()).then([&dir, &d] {
|
||||
return d.process_sstable_dir(service::get_local_streaming_priority()).then([&dir, &d] {
|
||||
return d.move_foreign_sstables(dir);
|
||||
});
|
||||
});
|
||||
@@ -278,7 +278,7 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
|
||||
auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
|
||||
auto& cm = table.get_compaction_manager();
|
||||
auto max_threshold = table.schema()->max_compaction_threshold();
|
||||
auto& iop = service::get_local_streaming_read_priority();
|
||||
auto& iop = service::get_local_streaming_priority();
|
||||
return d.reshard(std::move(info_vec), cm, table, max_threshold, creator, iop).then([&d, &dir] {
|
||||
return d.move_foreign_sstables(dir);
|
||||
});
|
||||
@@ -327,7 +327,7 @@ distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<d
|
||||
return dir.map_reduce0([&dir, &db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode] (sstables::sstable_directory& d) {
|
||||
auto& table = db.local().find_column_family(ks_name, table_name);
|
||||
auto& cm = table.get_compaction_manager();
|
||||
auto& iop = service::get_local_streaming_read_priority();
|
||||
auto& iop = service::get_local_streaming_priority();
|
||||
return d.reshape(cm, table, creator, iop, mode);
|
||||
}, uint64_t(0), std::plus<uint64_t>()).then([start] (uint64_t total_size) {
|
||||
if (total_size > 0) {
|
||||
|
||||
@@ -505,7 +505,7 @@ public:
|
||||
[t = std::move(t), use_view_update_path, adjusted_estimated_partitions] (flat_mutation_reader reader) {
|
||||
sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write();
|
||||
schema_ptr s = reader.schema();
|
||||
auto& pc = service::get_local_streaming_write_priority();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s,
|
||||
t->get_sstables_manager().configure_writer(),
|
||||
encoding_stats{}, pc).then([sst] {
|
||||
|
||||
@@ -29,8 +29,7 @@ priority_manager& get_local_priority_manager() {
|
||||
priority_manager::priority_manager()
|
||||
: _commitlog_priority(engine().register_one_priority_class("commitlog", 1000))
|
||||
, _mt_flush_priority(engine().register_one_priority_class("memtable_flush", 1000))
|
||||
, _stream_read_priority(engine().register_one_priority_class("streaming_read", 200))
|
||||
, _stream_write_priority(engine().register_one_priority_class("streaming_write", 200))
|
||||
, _streaming_priority(engine().register_one_priority_class("streaming", 200))
|
||||
, _sstable_query_read(engine().register_one_priority_class("query", 1000))
|
||||
, _compaction_priority(engine().register_one_priority_class("compaction", 1000))
|
||||
{}
|
||||
|
||||
@@ -28,8 +28,7 @@ namespace service {
|
||||
class priority_manager {
|
||||
::io_priority_class _commitlog_priority;
|
||||
::io_priority_class _mt_flush_priority;
|
||||
::io_priority_class _stream_read_priority;
|
||||
::io_priority_class _stream_write_priority;
|
||||
::io_priority_class _streaming_priority;
|
||||
::io_priority_class _sstable_query_read;
|
||||
::io_priority_class _compaction_priority;
|
||||
|
||||
@@ -45,13 +44,8 @@ public:
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
streaming_read_priority() const {
|
||||
return _stream_read_priority;
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
streaming_write_priority() const {
|
||||
return _stream_write_priority;
|
||||
streaming_priority() const {
|
||||
return _streaming_priority;
|
||||
}
|
||||
|
||||
const ::io_priority_class&
|
||||
@@ -79,13 +73,8 @@ get_local_memtable_flush_priority() {
|
||||
}
|
||||
|
||||
const inline ::io_priority_class&
|
||||
get_local_streaming_read_priority() {
|
||||
return get_local_priority_manager().streaming_read_priority();
|
||||
}
|
||||
|
||||
const inline ::io_priority_class&
|
||||
get_local_streaming_write_priority() {
|
||||
return get_local_priority_manager().streaming_write_priority();
|
||||
get_local_streaming_priority() {
|
||||
return get_local_priority_manager().streaming_priority();
|
||||
}
|
||||
|
||||
const inline ::io_priority_class&
|
||||
|
||||
@@ -227,7 +227,7 @@ void stream_session::init_messaging_service_handler() {
|
||||
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path] (flat_mutation_reader reader) {
|
||||
sstables::shared_sstable sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
|
||||
schema_ptr s = reader.schema();
|
||||
auto& pc = service::get_local_streaming_write_priority();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s,
|
||||
cf->get_sstables_manager().configure_writer(),
|
||||
|
||||
10
table.cc
10
table.cc
@@ -493,7 +493,7 @@ table::make_streaming_reader(schema_ptr s,
|
||||
const dht::partition_range_vector& ranges) const {
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit();
|
||||
auto& slice = s->full_slice();
|
||||
auto& pc = service::get_local_streaming_read_priority();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
|
||||
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
|
||||
@@ -512,7 +512,7 @@ table::make_streaming_reader(schema_ptr s,
|
||||
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
|
||||
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const {
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit();
|
||||
const auto& pc = service::get_local_streaming_read_priority();
|
||||
const auto& pc = service::get_local_streaming_priority();
|
||||
auto trace_state = tracing::trace_state_ptr();
|
||||
const auto fwd = streamed_mutation::forwarding::no;
|
||||
|
||||
@@ -840,7 +840,7 @@ table::seal_active_streaming_memtable_immediate(flush_permit&& permit) {
|
||||
auto fp = permit.release_sstable_write_permit();
|
||||
database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
|
||||
return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable {
|
||||
auto&& priority = service::get_local_streaming_write_priority();
|
||||
auto&& priority = service::get_local_streaming_priority();
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
return write_memtable_to_sstable(*old, newtab, monitor, cfg, priority).then([this, newtab, old] {
|
||||
@@ -884,7 +884,7 @@ future<> table::seal_active_streaming_memtable_big(streaming_memtable_big& smb,
|
||||
|
||||
auto fp = permit.release_sstable_write_permit();
|
||||
auto monitor = std::make_unique<database_sstable_write_monitor>(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
|
||||
auto&& priority = service::get_local_streaming_write_priority();
|
||||
auto&& priority = service::get_local_streaming_priority();
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
cfg.leave_unsealed = true;
|
||||
@@ -2571,7 +2571,7 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou
|
||||
as_mutation_source_excluding(excluded_sstables),
|
||||
tracing::trace_state_ptr(),
|
||||
*_config.streaming_read_concurrency_semaphore,
|
||||
service::get_local_streaming_read_priority(),
|
||||
service::get_local_streaming_priority(),
|
||||
query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
|
||||
}
|
||||
|
||||
|
||||
@@ -432,7 +432,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_write_priority();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
|
||||
Reference in New Issue
Block a user