Compare commits

...

10 Commits

Author SHA1 Message Date
Hagit Segev
af43d0c62d release: prepare for 4.1.rc1 2020-05-26 18:57:30 +03:00
Amnon Heiman
8c8c266f67 storage_service: get_range_to_address_map prevent use after free
The implementation of get_range_to_address_map has a default behaviour:
when given an empty keyspace, it uses the first non-system keyspace
("first" here just means some keyspace, in no particular order).

The current implementation has two issues. First, it uses a reference to
a string that is held on the stack of another function. In other words,
there is a use after free, and it is not clear why we never hit it.

Second, it calls get_non_system_keyspaces twice. Though this is not
a bug, it's redundant (get_non_system_keyspaces uses a loop, so calling
that function does have a cost).

This patch solves both issues. It changes the implementation to hold a
string instead of a reference to a string.

It also stores the result of get_non_system_keyspaces and reuses it,
which is more efficient and keeps the returned values on the local
stack.

Fixes #6465

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 69a46d4179)
2020-05-25 12:48:11 +03:00
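
The lifetime problem described in the commit above can be sketched in standard C++. This is a minimal illustration, not the Scylla code: `get_non_system_keyspaces` here is a hypothetical free function returning a vector by value, and `resolve_keyspace` is a made-up name for the selection logic. std::string stands in for sstring.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in: builds and returns a fresh vector on every call.
std::vector<std::string> get_non_system_keyspaces() {
    return {"ks1", "ks2"};
}

// Dangerous shape (shown only as a comment, do not use):
//   const std::string& ks =
//       keyspace.empty() ? get_non_system_keyspaces()[0] : keyspace;
// Both operands are lvalues, so no copy is made. The reference points into
// the temporary vector, which is destroyed at the end of the statement.

// Safe shape: own a copy of the chosen name and call the lookup only once.
std::string resolve_keyspace(const std::string& keyspace) {
    std::string ks = keyspace;
    if (ks.empty()) {
        auto keyspaces = get_non_system_keyspaces(); // kept alive locally
        if (keyspaces.empty()) {
            throw std::runtime_error("no keyspace provided and no non-system keyspace exists");
        }
        ks = keyspaces[0]; // copy the element before the vector goes away
    }
    return ks;
}
```

Holding a copy costs one string allocation per call, which is small compared with the loop inside the keyspace lookup.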
Nadav Har'El
6d1301d93c alternator: better error messages when 'forbid_rmw' mode is on
When the 'forbid_rmw' write isolation policy is selected, read-modify-write
operations are intentionally forbidden. The error message in this case used
to say:

	"Read-modify-write operations not supported"

This can lead users to believe that the operation isn't supported by this
version of Alternator, instead of realizing that this is in fact a
configurable choice.

So in this patch we just change the error message to say:

	"Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information."

Fixes #6421.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200518125538.8347-1-nyh@scylladb.com>
(cherry picked from commit 5ef9854e86)
2020-05-25 08:49:48 +03:00
Tomasz Grabiec
be545d6d5d sstables: index_reader: Fix overflow when calculating promoted index end
When the index file is larger than 4GB, the offset calculation overflows
uint32_t and _promoted_index_end ends up too small.

As a result, the promoted_index_size calculation underflows and the
rest of the page is interpreted as a promoted index.

The partitions which are in the remainder of the index page will not
be found by single-partition queries.

Data is not lost.

Introduced in 6c5f8e0eda.

Fixes #6040
Message-Id: <20200521174822.8350-1-tgrabiec@scylladb.com>

(cherry picked from commit a6c87a7b9e)
2020-05-24 09:45:42 +03:00
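
The arithmetic behind the overflow can be reproduced with plain integers. The sketch below is an assumption about the shape of the calculation, not the index_reader code itself: the helper names are hypothetical, and the end offset is taken to be position plus size.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: end offset stored in a 32-bit field, as before the fix.
inline uint64_t promoted_index_end_narrow(uint64_t position, uint64_t size) {
    uint32_t end = static_cast<uint32_t>(position + size); // wraps modulo 2^32
    return end;
}

// Hypothetical helper: end offset stored in a 64-bit field, as after the fix.
inline uint64_t promoted_index_end_wide(uint64_t position, uint64_t size) {
    return position + size;
}

// Size derived from the stored end offset. Unsigned subtraction underflows
// to a huge value when the end offset was truncated below the position.
inline uint64_t promoted_index_size(uint64_t end, uint64_t position) {
    return end - position;
}
```

With a position of 5 GiB and a 100-byte promoted index, the narrow variant stores 1 GiB + 100 as the end offset, so the derived size becomes a huge unsigned value instead of 100. The wide variant returns 100.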
Rafael Ávila de Espíndola
a1c15f0690 repair: Make sure sinks are always closed
In a recent failure on the next branch I got the following backtrace:

    function=function@entry=0x270360 "seastar::rpc::sink_impl<Serializer, Out>::~sink_impl() [with Serializer = netw::serializer; Out = {repair_row_on_wire_with_cmd}]") at assert.c:101
    at ./seastar/include/seastar/core/shared_ptr.hh:463
    at repair/row_level.cc:2059

This patch changes a few functions to use finally to make sure the sink
is always closed.

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200515202803.60020-1-espindola@scylladb.com>
(cherry picked from commit 311fbe2f0a)

Ref #6414
2020-05-20 09:00:10 +03:00
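
The "always close" idea from the commit above can be illustrated without Seastar. The sketch below is an analogy in synchronous standard C++, not the `future::finally` API: `toy_sink`, `run_finally` and `stream_rows` are hypothetical names introduced only for this example.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical sink that must be closed before it is destroyed.
struct toy_sink {
    bool closed = false;
    void close() { closed = true; }
};

// Run body, then run cleanup exactly once, whether or not body threw.
template <typename Body, typename Cleanup>
void run_finally(Body&& body, Cleanup&& cleanup) {
    try {
        body();
    } catch (...) {
        cleanup();
        throw; // the original error still propagates
    }
    cleanup();
}

// Simulates a stream handler. Returns whether the sink ended up closed.
inline bool stream_rows(toy_sink& sink, bool fail) {
    try {
        run_finally(
            [&] { if (fail) throw std::runtime_error("send failed"); },
            [&] { sink.close(); });
    } catch (const std::runtime_error&) {
        // The failure is reported elsewhere; the sink is already closed.
    }
    return sink.closed;
}
```

Putting the close call in one place removes the need to remember it on every success and error branch, which is the property the patch is after.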
Asias He
4d68c53389 repair: Fix race between write_end_of_stream and apply_rows
Consider: n1, n2, n1 is the repair master, n2 is the repair follower.

=== Case 1 ===
1) n1 sends missing rows {r1, r2} to n2
2) n2 runs apply_rows_on_follower to apply rows, e.g., {r1, r2}, r1
   is written to sstable, r2 is not written yet, r1 belongs to
   partition 1, r2 belongs to partition 2. It yields after row r1 is
   written.
   data: partition_start, r1
3) n1 sends repair_row_level_stop to n2 because an error has happened on n1
4) n2 calls wait_for_writer_done() which in turn calls write_end_of_stream()
   data: partition_start, r1, partition_end
5) Step 2 resumes to apply the rows.
   data: partition_start, r1, partition_end, partition_end, partition_start, r2

=== Case 2 ===
1) n1 sends missing rows {r1, r2} to n2
2) n2 runs apply_rows_on_follower to apply rows, e.g., {r1, r2}, r1
   is written to sstable, r2 is not written yet, r1 belongs to partition
   1, r2 belongs to partition 2. It yields after partition_start for r2
   is written but before _partition_opened is set to true.
   data: partition_start, r1, partition_end, partition_start
3) n1 sends repair_row_level_stop to n2 because an error has happened on n1
4) n2 calls wait_for_writer_done() which in turn calls write_end_of_stream().
   Since _partition_opened[node_idx] is false, partition_end is skipped,
   end_of_stream is written.
   data: partition_start, r1, partition_end, partition_start, end_of_stream

This causes unbalanced partition_start and partition_end in the stream
written to sstables.

To fix, serialize the write_end_of_stream and apply_rows with a semaphore.

Fixes: #6394
Fixes: #6296
Fixes: #6414
(cherry picked from commit b2c4d9fdbc)
2020-05-20 08:07:53 +03:00
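
The two corrupted streams listed in the commit above can be checked mechanically. The following is a small sketch of such a check, assuming a simplified fragment model: the `frag` enum and `is_balanced` are hypothetical and only mirror the names used in the commit message.

```cpp
#include <cassert>
#include <vector>

// Simplified fragment kinds, named after the commit message.
enum class frag { partition_start, row, partition_end, end_of_stream };

// Returns true when partitions open and close in strict alternation, rows
// appear only inside a partition, and the stream does not end (or reach
// end_of_stream) while a partition is still open.
inline bool is_balanced(const std::vector<frag>& stream) {
    bool open = false;
    for (frag f : stream) {
        switch (f) {
        case frag::partition_start:
            if (open) return false;  // nested start
            open = true;
            break;
        case frag::row:
            if (!open) return false; // row outside any partition
            break;
        case frag::partition_end:
            if (!open) return false; // end without a matching start
            open = false;
            break;
        case frag::end_of_stream:
            if (open) return false;  // stream sealed inside a partition
            break;
        }
    }
    return !open;
}
```

Case 1 fails on the second partition_end, and Case 2 fails because end_of_stream arrives while the partition for r2 is still open. Serializing write_end_of_stream and apply_rows means neither interleaving can be produced.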
Piotr Dulikowski
7d1f352be2 hinted handoff: don't keep positions of old hints in rps_set
When sending hints from one file, rps_set field in send_one_file_ctx
keeps track of commitlog positions of hints that are being currently
sent, or have failed to be sent. At the end of the operation, if sending
of some hints failed, we will choose position of the earliest hint that
failed to be sent, and will retry sending that file later, starting from
that position. This position is stored in _last_not_complete_rp.

Usually, this set has a bounded size, because we impose a limit of at
most 128 hints being sent concurrently. Because we do not attempt to
send any more hints after a failure is detected, rps_set should not have
more than 128 elements at a time.

Due to a bug, commitlog positions of old hints (older than
gc_grace_seconds of the destination table) were inserted into rps_set
but not removed after checking their age. This could cause rps_set to
grow very large when replaying a file with old hints.

Moreover, if the file mixed expired and non-expired hints (which could
happen if it had hints to two tables with different gc_grace_seconds),
and sending of some non-expired hints failed, then positions of expired
hints could influence the calculation of _last_not_complete_rp, and more
hints than necessary would be resent on the next retry.

This simple patch removes the commitlog position of a hint from rps_set
when the hint is detected to be too old.

Fixes #6422

(cherry picked from commit 85d5c3d5ee)
2020-05-20 08:05:51 +03:00
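
The effect of stale entries on the retry position can be shown with a small model. This is a sketch under simplifying assumptions, not the hints manager code: positions are plain integers, sends are sequential, and `hint`, `outcome` and `replay` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <set>
#include <vector>

using replay_position = uint64_t; // simplified; an assumption for this sketch

enum class outcome { sent, failed, expired };

struct hint {
    replay_position rp;
    outcome result;
};

// Replays one file and returns the position to retry from, which is the
// earliest position left in the set, or nullopt if nothing needs a retry.
inline std::optional<replay_position> replay(const std::vector<hint>& hints,
                                             bool erase_expired) {
    std::set<replay_position> rps_set;
    for (const auto& h : hints) {
        rps_set.insert(h.rp); // recorded before the send attempt
        bool done = h.result == outcome::sent ||
                    (h.result == outcome::expired && erase_expired);
        if (done) {
            rps_set.erase(h.rp);
        }
    }
    if (rps_set.empty()) {
        return std::nullopt;
    }
    return *rps_set.begin();
}
```

For a file with an expired hint at position 10, a sent hint at 20 and a failed hint at 30, erasing expired positions gives a retry position of 30. Leaving them in the set gives 10, so hints 10 and 20 would be replayed again for no benefit.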
Piotr Dulikowski
0fe5335447 hinted handoff: remove discarded hint positions from rps_set
Related commit: 85d5c3d

When attempting to send a hint, an exception might occur that results in
that hint being discarded (e.g. keyspace or table of the hint was
removed).

When such an exception is thrown, position of the hint will already be
stored in rps_set. We are only allowed to retain positions of hints that
failed to be sent and needed to be retried later. Dropping a hint is not
an error, therefore its position should be removed from rps_set - but
current logic does not do that.

Because of that bug, hint files with many discardable hints might cause
rps_set to grow large when the file is replayed. Furthermore, leaving
positions of such hints in rps_set might cause more hints than necessary
to be re-sent if some non-discarded hints fail to be sent.

This commit fixes the problem by removing positions of discarded hints
from rps_set.

Fixes #6433

(cherry picked from commit 0c5ac0da98)
2020-05-20 08:03:20 +03:00
Avi Kivity
8a026b8b14 Revert "compaction_manager: allow early aborts through abort sources."
This reverts commit e8213fb5c3. The reverted commit results
in an assertion failure in remove_index_file_test.

Fixes #6413.

(cherry picked from commit 5b971397aa)
2020-05-13 18:26:34 +03:00
Yaron Kaikov
0760107b9f release: prepare for 4.1.rc0 2020-05-11 11:32:01 +03:00
13 changed files with 79 additions and 106 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=666.development
VERSION=4.1.rc1
if test -f version
then

View File

@@ -1261,7 +1261,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
stats& stats) {
if (needs_read_before_write) {
if (_write_isolation == write_isolation::FORBID_RMW) {
throw api_error("ValidationException", "Read-modify-write operations not supported");
throw api_error("ValidationException", "Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
}
stats.reads_before_write++;
if (_write_isolation == write_isolation::UNSAFE_RMW) {

View File

@@ -113,11 +113,11 @@ make_flush_controller(const db::config& cfg, seastar::scheduling_group sg, const
inline
std::unique_ptr<compaction_manager>
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
make_compaction_manager(const db::config& cfg, database_config& dbcfg) {
if (cfg.compaction_static_shares() > 0) {
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares());
}
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory);
}
lw_shared_ptr<keyspace_metadata>
@@ -161,7 +161,7 @@ void keyspace::remove_user_type(const user_type ut) {
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm)
: _stats(make_lw_shared<db_stats>())
, _cl_stats(std::make_unique<cell_locker_stats>())
, _cfg(cfg)
@@ -198,7 +198,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _mutation_query_stage()
, _apply_stage("db_apply", &database::do_apply)
, _version(empty_version)
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
, _compaction_manager(make_compaction_manager(_cfg, dbcfg))
, _enable_incremental_backups(cfg.incremental_backups())
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,

View File

@@ -1427,7 +1427,7 @@ public:
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm);
database(database&&) = delete;
~database();

View File

@@ -703,6 +703,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
// (last_modification - manager::hints_timer_period) old.
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
ctx_ptr->rps_set.erase(rp);
return make_ready_future<>();
}
@@ -725,6 +726,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
++this->shard_stats().discarded;
}
ctx_ptr->rps_set.erase(rp);
return make_ready_future<>();
}).finally([units = std::move(units), ctx_ptr] {});
}).handle_exception([this, ctx_ptr] (auto eptr) {

View File

@@ -736,7 +736,7 @@ int main(int ac, char** av) {
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
dbcfg.available_memory = memory::stats().total_memory();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata)).get();
start_large_data_handler(db).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
@@ -1186,6 +1186,12 @@ int main(int ac, char** av) {
}
});
auto stop_compaction_manager = defer_verbose_shutdown("compaction manager", [&db] {
db.invoke_on_all([](auto& db) {
return db.get_compaction_manager().stop();
}).get();
});
auto stop_redis_service = defer_verbose_shutdown("redis service", [&cfg] {
if (cfg->redis_port() || cfg->redis_ssl_port()) {
redis.stop().get();

View File

@@ -450,6 +450,7 @@ class repair_writer {
// written.
std::vector<bool> _partition_opened;
streaming::stream_reason _reason;
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
public:
repair_writer(
schema_ptr schema,
@@ -561,11 +562,13 @@ public:
future<> write_end_of_stream(unsigned node_idx) {
if (_mq[node_idx]) {
return with_semaphore(_sem, 1, [this, node_idx] {
// Partition_end is never sent on wire, so we have to write one ourselves.
return write_partition_end(node_idx).then([this, node_idx] () mutable {
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
});
});
} else {
return make_ready_future<>();
}
@@ -588,6 +591,10 @@ public:
return make_exception_future<>(std::move(ep));
});
}
named_semaphore& sem() {
return _sem;
}
};
class repair_meta {
@@ -1191,6 +1198,23 @@ private:
}
}
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
_repair_writer.create_writer(_db, node_idx);
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
});
});
}
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
// Must run inside a seastar thread
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
@@ -1216,18 +1240,7 @@ private:
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
}
_repair_writer.create_writer(_db, node_idx);
for (auto& r : row_diff) {
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
}
do_apply_rows(row_diff, node_idx, update_buf).get();
}
future<>
@@ -1238,15 +1251,7 @@ private:
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
unsigned node_idx = 0;
_repair_writer.create_writer(_db, node_idx);
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
});
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
});
});
}
@@ -1936,22 +1941,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
current_set_diff,
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
error = true;
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
return sink.close();
}).then([sink] {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return sink.close().then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
}
@@ -1977,22 +1977,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
current_rows,
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
error = true;
return sink(repair_stream_cmd::error).then([sink] () mutable {
return sink.close();
}).then([sink] {
return sink(repair_stream_cmd::error).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return sink.close().then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
}
@@ -2017,22 +2012,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
error,
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
error = true;
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
return sink.close();
}).then([sink] {
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return sink.close().then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
}

View File

@@ -1040,12 +1040,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
std::unordered_map<dht::token_range, std::vector<inet_address>>
storage_service::get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const {
sstring ks = keyspace;
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
throw std::runtime_error("No keyspace provided and no non system kespace exist");
if (keyspace == "") {
auto keyspaces = _db.local().get_non_system_keyspaces();
if (keyspaces.empty()) {
throw std::runtime_error("No keyspace provided and no non system kespace exist");
}
ks = keyspaces[0];
}
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
}
@@ -2602,11 +2606,8 @@ future<> storage_service::drain() {
ss.do_stop_ms().get();
// Interrupt on going compaction and shutdown to prevent further compaction
// No new compactions will be started from this call site on, but we don't need
// to wait for them to stop. Drain leaves the node alive, and a future shutdown
// will wait on the compaction_manager stop future.
ss.db().invoke_on_all([] (auto& db) {
db.get_compaction_manager().do_stop();
return db.get_compaction_manager().stop();
}).get();
ss.set_mode(mode::DRAINING, "flushing column families", false);

View File

@@ -357,7 +357,7 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
});
}
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory)
: _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
auto b = backlog() / available_memory;
// This means we are using an unimplemented strategy
@@ -372,26 +372,17 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
, _backlog_manager(_compaction_controller)
, _scheduling_group(_compaction_controller.sg())
, _available_memory(available_memory)
, _early_abort_subscription(as.subscribe([this] {
do_stop();
}))
{}
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares)
: _compaction_controller(sg, iop, shares)
, _backlog_manager(_compaction_controller)
, _scheduling_group(_compaction_controller.sg())
, _available_memory(available_memory)
, _early_abort_subscription(as.subscribe([this] {
do_stop();
}))
, _available_memory(available_memory)
{}
compaction_manager::compaction_manager()
: _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
, _backlog_manager(_compaction_controller)
, _scheduling_group(_compaction_controller.sg())
, _available_memory(1)
: compaction_manager(seastar::default_scheduling_group(), default_priority_class(), 1)
{}
compaction_manager::~compaction_manager() {
@@ -455,17 +446,11 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
}
future<> compaction_manager::stop() {
do_stop();
return std::move(*_stop_future);
}
void compaction_manager::do_stop() {
if (_stopped) {
return;
return make_ready_future<>();
}
_stopped = true;
cmlog.info("Asked to stop");
_stopped = true;
// Reset the metrics registry
_metrics.clear();
// Stop all ongoing compaction.
@@ -475,10 +460,7 @@ void compaction_manager::do_stop() {
// Wait for each task handler to stop. Copy list because task remove itself
// from the list when done.
auto tasks = _tasks;
// fine to ignore here, since it is used to set up the shared promise in
// the finally block. Waiters will wait on the shared_future through stop().
_stop_future.emplace(do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
return do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
return parallel_for_each(tasks, [this] (auto& task) {
return this->task_stop(task);
});
@@ -490,7 +472,7 @@ void compaction_manager::do_stop() {
_compaction_submission_timer.cancel();
cmlog.info("Stopped");
return _compaction_controller.shutdown();
}));
});
}
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
@@ -523,7 +505,8 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
} catch (storage_io_error& e) {
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
retry = false;
do_stop();
// FIXME discarded future.
(void)stop();
} catch (...) {
cmlog.error("compaction failed: {}: {}", std::current_exception(), decision_msg);
retry = true;

View File

@@ -29,7 +29,6 @@
#include <seastar/core/rwlock.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/abort_source.hh>
#include "log.hh"
#include "utils/exponential_backoff_retry.hh"
#include <vector>
@@ -70,9 +69,6 @@ private:
// Used to assert that compaction_manager was explicitly stopped, if started.
bool _stopped = true;
// We use a shared promise to indicate whether or not we are stopped because it is legal
// for stop() to be called twice. For instance it is called on DRAIN and shutdown.
std::optional<future<>> _stop_future;
stats _stats;
seastar::metrics::metric_groups _metrics;
@@ -153,10 +149,9 @@ private:
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func);
optimized_optional<abort_source::subscription> _early_abort_subscription;
public:
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory);
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares);
compaction_manager();
~compaction_manager();
@@ -165,13 +160,9 @@ public:
// Start compaction manager.
void start();
// Stop all fibers. Ongoing compactions will be waited. Should only be called
// once, from main teardown path.
// Stop all fibers. Ongoing compactions will be waited.
future<> stop();
// Stop all fibers, without waiting. Safe to be called multiple times.
void do_stop();
bool stopped() const { return _stopped; }
// Submit a column family to be compacted.

View File

@@ -85,7 +85,7 @@ private:
} _state = state::START;
temporary_buffer<char> _key;
uint32_t _promoted_index_end;
uint64_t _promoted_index_end;
uint64_t _position;
uint64_t _partition_header_length = 0;
std::optional<deletion_time> _deletion_time;

View File

@@ -84,7 +84,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, std::ref(mm_notif), std::ref(token_metadata), true).get();
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
db.invoke_on_all([] (database& db) {
db.get_compaction_manager().start();
}).get();

View File

@@ -460,7 +460,7 @@ public:
database_config dbcfg;
dbcfg.available_memory = memory::stats().total_memory();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
auto stop_db = defer([&db] {
db.stop().get();
});