Compare commits
10 Commits
copilot/fi
...
scylla-4.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af43d0c62d | ||
|
|
8c8c266f67 | ||
|
|
6d1301d93c | ||
|
|
be545d6d5d | ||
|
|
a1c15f0690 | ||
|
|
4d68c53389 | ||
|
|
7d1f352be2 | ||
|
|
0fe5335447 | ||
|
|
8a026b8b14 | ||
|
|
0760107b9f |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.1.rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1261,7 +1261,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
stats& stats) {
|
||||
if (needs_read_before_write) {
|
||||
if (_write_isolation == write_isolation::FORBID_RMW) {
|
||||
throw api_error("ValidationException", "Read-modify-write operations not supported");
|
||||
throw api_error("ValidationException", "Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
|
||||
}
|
||||
stats.reads_before_write++;
|
||||
if (_write_isolation == write_isolation::UNSAFE_RMW) {
|
||||
|
||||
10
database.cc
10
database.cc
@@ -113,11 +113,11 @@ make_flush_controller(const db::config& cfg, seastar::scheduling_group sg, const
|
||||
|
||||
inline
|
||||
std::unique_ptr<compaction_manager>
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg) {
|
||||
if (cfg.compaction_static_shares() > 0) {
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares());
|
||||
}
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
@@ -161,7 +161,7 @@ void keyspace::remove_user_type(const user_type ut) {
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm)
|
||||
: _stats(make_lw_shared<db_stats>())
|
||||
, _cl_stats(std::make_unique<cell_locker_stats>())
|
||||
, _cfg(cfg)
|
||||
@@ -198,7 +198,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _mutation_query_stage()
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg))
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
|
||||
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,
|
||||
|
||||
@@ -1427,7 +1427,7 @@ public:
|
||||
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
|
||||
|
||||
future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm);
|
||||
database(database&&) = delete;
|
||||
~database();
|
||||
|
||||
|
||||
@@ -703,6 +703,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||
// (last_modification - manager::hints_timer_period) old.
|
||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -725,6 +726,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||
++this->shard_stats().discarded;
|
||||
}
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}).finally([units = std::move(units), ctx_ptr] {});
|
||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||
|
||||
8
main.cc
8
main.cc
@@ -736,7 +736,7 @@ int main(int ac, char** av) {
|
||||
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
|
||||
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
start_large_data_handler(db).get();
|
||||
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
@@ -1186,6 +1186,12 @@ int main(int ac, char** av) {
|
||||
}
|
||||
});
|
||||
|
||||
auto stop_compaction_manager = defer_verbose_shutdown("compaction manager", [&db] {
|
||||
db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
});
|
||||
|
||||
auto stop_redis_service = defer_verbose_shutdown("redis service", [&cfg] {
|
||||
if (cfg->redis_port() || cfg->redis_ssl_port()) {
|
||||
redis.stop().get();
|
||||
|
||||
@@ -450,6 +450,7 @@ class repair_writer {
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
streaming::stream_reason _reason;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -561,11 +562,13 @@ public:
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -588,6 +591,10 @@ public:
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
@@ -1191,6 +1198,23 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
// Must run inside a seastar thread
|
||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||
@@ -1216,18 +1240,7 @@ private:
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
for (auto& r : row_diff) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1238,15 +1251,7 @@ private:
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1936,22 +1941,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1977,22 +1977,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2017,22 +2012,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1040,12 +1040,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
@@ -2602,11 +2606,8 @@ future<> storage_service::drain() {
|
||||
ss.do_stop_ms().get();
|
||||
|
||||
// Interrupt on going compaction and shutdown to prevent further compaction
|
||||
// No new compactions will be started from this call site on, but we don't need
|
||||
// to wait for them to stop. Drain leaves the node alive, and a future shutdown
|
||||
// will wait on the compaction_manager stop future.
|
||||
ss.db().invoke_on_all([] (auto& db) {
|
||||
db.get_compaction_manager().do_stop();
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
|
||||
ss.set_mode(mode::DRAINING, "flushing column families", false);
|
||||
|
||||
@@ -357,7 +357,7 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
|
||||
});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory)
|
||||
: _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
|
||||
auto b = backlog() / available_memory;
|
||||
// This means we are using an unimplemented strategy
|
||||
@@ -372,26 +372,17 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares)
|
||||
: _compaction_controller(sg, iop, shares)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
, _available_memory(available_memory)
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager()
|
||||
: _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(1)
|
||||
: compaction_manager(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
{}
|
||||
|
||||
compaction_manager::~compaction_manager() {
|
||||
@@ -455,17 +446,11 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop() {
|
||||
do_stop();
|
||||
return std::move(*_stop_future);
|
||||
}
|
||||
|
||||
void compaction_manager::do_stop() {
|
||||
if (_stopped) {
|
||||
return;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
_stopped = true;
|
||||
cmlog.info("Asked to stop");
|
||||
_stopped = true;
|
||||
// Reset the metrics registry
|
||||
_metrics.clear();
|
||||
// Stop all ongoing compaction.
|
||||
@@ -475,10 +460,7 @@ void compaction_manager::do_stop() {
|
||||
// Wait for each task handler to stop. Copy list because task remove itself
|
||||
// from the list when done.
|
||||
auto tasks = _tasks;
|
||||
|
||||
// fine to ignore here, since it is used to set up the shared promise in
|
||||
// the finally block. Waiters will wait on the shared_future through stop().
|
||||
_stop_future.emplace(do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return parallel_for_each(tasks, [this] (auto& task) {
|
||||
return this->task_stop(task);
|
||||
});
|
||||
@@ -490,7 +472,7 @@ void compaction_manager::do_stop() {
|
||||
_compaction_submission_timer.cancel();
|
||||
cmlog.info("Stopped");
|
||||
return _compaction_controller.shutdown();
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
|
||||
@@ -523,7 +505,8 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
|
||||
} catch (storage_io_error& e) {
|
||||
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
|
||||
retry = false;
|
||||
do_stop();
|
||||
// FIXME discarded future.
|
||||
(void)stop();
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}: {}", std::current_exception(), decision_msg);
|
||||
retry = true;
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include <seastar/core/scheduling.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "log.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include <vector>
|
||||
@@ -70,9 +69,6 @@ private:
|
||||
|
||||
// Used to assert that compaction_manager was explicitly stopped, if started.
|
||||
bool _stopped = true;
|
||||
// We use a shared promise to indicate whether or not we are stopped because it is legal
|
||||
// for stop() to be called twice. For instance it is called on DRAIN and shutdown.
|
||||
std::optional<future<>> _stop_future;
|
||||
|
||||
stats _stats;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
@@ -153,10 +149,9 @@ private:
|
||||
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
|
||||
|
||||
future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func);
|
||||
optimized_optional<abort_source::subscription> _early_abort_subscription;
|
||||
public:
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares);
|
||||
compaction_manager();
|
||||
~compaction_manager();
|
||||
|
||||
@@ -165,13 +160,9 @@ public:
|
||||
// Start compaction manager.
|
||||
void start();
|
||||
|
||||
// Stop all fibers. Ongoing compactions will be waited. Should only be called
|
||||
// once, from main teardown path.
|
||||
// Stop all fibers. Ongoing compactions will be waited.
|
||||
future<> stop();
|
||||
|
||||
// Stop all fibers, without waiting. Safe to be called multiple times.
|
||||
void do_stop();
|
||||
|
||||
bool stopped() const { return _stopped; }
|
||||
|
||||
// Submit a column family to be compacted.
|
||||
|
||||
@@ -85,7 +85,7 @@ private:
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
uint32_t _promoted_index_end;
|
||||
uint64_t _promoted_index_end;
|
||||
uint64_t _position;
|
||||
uint64_t _partition_header_length = 0;
|
||||
std::optional<deletion_time> _deletion_time;
|
||||
|
||||
@@ -84,7 +84,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, std::ref(mm_notif), std::ref(token_metadata), true).get();
|
||||
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
|
||||
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
db.invoke_on_all([] (database& db) {
|
||||
db.get_compaction_manager().start();
|
||||
}).get();
|
||||
|
||||
@@ -460,7 +460,7 @@ public:
|
||||
|
||||
database_config dbcfg;
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
auto stop_db = defer([&db] {
|
||||
db.stop().get();
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user