diff --git a/raft/server.cc b/raft/server.cc index a9f9dcff14..4271d1ee61 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -81,14 +81,14 @@ public: // server interface - future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) override; - future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) override; + future<> add_entry(command command, wait_type type, seastar::abort_source* as) override; + future<> set_configuration(config_member_set c_new, seastar::abort_source* as) override; raft::configuration get_configuration() const override; future<> start() override; future<> abort(sstring reason) override; bool is_alive() const override; term_t get_current_term() const override; - future<> read_barrier(seastar::abort_source* as = nullptr) override; + future<> read_barrier(seastar::abort_source* as) override; void wait_until_candidate() override; future<> wait_election_done() override; future<> wait_log_idx_term(std::pair idx_log) override; @@ -100,7 +100,7 @@ public: raft::server_id id() const override; void set_applier_queue_max_size(size_t queue_max_size) override; future<> stepdown(logical_clock::duration timeout) override; - future<> modify_config(std::vector add, std::vector del, seastar::abort_source* as = nullptr) override; + future<> modify_config(std::vector add, std::vector del, seastar::abort_source* as) override; future add_entry_on_leader(command command, seastar::abort_source* as); void register_metrics() override; size_t max_command_size() const override; @@ -276,7 +276,7 @@ private: // A helper to wait for a leader to get elected future<> wait_for_leader(seastar::abort_source* as); - future<> wait_for_state_change(seastar::abort_source* as = nullptr) override; + future<> wait_for_state_change(seastar::abort_source* as) override; // Get "safe to read" index from a leader future get_read_idx(server_id leader, seastar::abort_source* as); diff --git a/raft/server.hh b/raft/server.hh index 2489545c57..ebc731e105 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -77,6 +77,7 @@ public: // committed locally means simply that the commit index is beyond this entry's index. // // The caller may pass a pointer to an abort_source to make the operation abortable. + // It it passes nullptr, the operation is unabortable. // // Successful `add_entry` with `wait_type::committed` does not guarantee that `state_machine::apply` will be called // locally for this entry. Between the commit and the application we may receive a snapshot containing this entry, @@ -98,7 +99,7 @@ public: // Thrown if abort() was called on the server instance. // raft::not_a_leader // Thrown if the node is not a leader and forwarding is not enabled through enable_forwarding config option. - virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) = 0; + virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as) = 0; // Set a new cluster configuration. If the configuration is // identical to the previous one does nothing. @@ -124,6 +125,7 @@ public: // returned even in case of a successful config change. // // The caller may pass a pointer to an abort_source to make the operation abortable. + // It it passes nullptr, the operation is unabortable. // // Exceptions: // raft::conf_change_in_progress @@ -135,7 +137,7 @@ public: // forwarding the corresponding add_entry to the leader. // raft::request_aborted // Thrown if abort is requested before the operation finishes. - virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as = nullptr) = 0; + virtual future<> set_configuration(config_member_set c_new, seastar::abort_source* as) = 0; // A simplified wrapper around set_configuration() which adds // and deletes servers. Unlike set_configuration(), @@ -158,6 +160,7 @@ public: // // The caller may pass a pointer to an abort_source to make the operation abortable. // If abort is requested before the operation finishes, the future will contain `raft::request_aborted` exception. + // It the caller passes nullptr, the operation is unabortable. // // Exceptions: // raft::commit_status_unknown @@ -173,7 +176,7 @@ public: // raft::conf_change_in_progress // Thrown if the previous set_configuration/modify_config is not completed. virtual future<> modify_config(std::vector add, - std::vector del, seastar::abort_source* as = nullptr) = 0; + std::vector del, seastar::abort_source* as) = 0; // Return the currently known configuration virtual raft::configuration get_configuration() const = 0; @@ -203,13 +206,14 @@ public: // future has resolved successfully. // // The caller may pass a pointer to an abort_source to make the operation abortable. + // It it passes nullptr, the operation is unabortable. // // Exceptions: // raft::request_aborted // Thrown if abort is requested before the operation finishes. // raft::stopped_error // Thrown if abort() was called on the server instance. - virtual future<> read_barrier(seastar::abort_source* as = nullptr) = 0; + virtual future<> read_barrier(seastar::abort_source* as) = 0; // Initiate leader stepdown process. // @@ -245,7 +249,10 @@ public: // State changes can be coalesced, so it is not guaranteed that the caller will // get notification about each one of them. The state can even be the same after // the call as before, but term should be different. - virtual future<> wait_for_state_change(seastar::abort_source* as = nullptr) = 0; + // + // The caller may pass a pointer to an abort_source to make the function abortable. + // It it passes nullptr, the function is unabortable. + virtual future<> wait_for_state_change(seastar::abort_source* as) = 0; // Ad hoc functions for testing virtual void wait_until_candidate() = 0; diff --git a/service/broadcast_tables/experimental/lang.cc b/service/broadcast_tables/experimental/lang.cc index d1a1bd352c..5f957626a6 100644 --- a/service/broadcast_tables/experimental/lang.cc +++ b/service/broadcast_tables/experimental/lang.cc @@ -30,7 +30,7 @@ bool is_broadcast_table_statement(const sstring& keyspace, const sstring& column future execute(service::raft_group0_client& group0_client, const query& query) { auto group0_cmd = group0_client.prepare_command(broadcast_table_query{query}, "broadcast_tables query"); auto guard = group0_client.create_result_guard(group0_cmd.new_state_id); - co_await group0_client.add_entry_unguarded(std::move(group0_cmd)); + co_await group0_client.add_entry_unguarded(std::move(group0_cmd), nullptr); co_return guard.get(); } diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index a2d2ed8c1b..de02d39c50 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -160,7 +160,7 @@ void raft_group0::init_rpc_verbs(raft_group0& shard0_this) { ser::group0_rpc_verbs::register_group0_modify_config(&shard0_this._ms.local(), [&shard0_this] (const rpc::client_info&, rpc::opt_time_point, raft::group_id gid, std::vector add, std::vector del) { return smp::submit_to(0, [&shard0_this, gid, add = std::move(add), del = std::move(del)]() mutable { - return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del)); + return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del), nullptr); }); }); diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh index 51db4339a4..c1659e9478 100644 --- a/service/raft/raft_group0_client.hh +++ b/service/raft/raft_group0_client.hh @@ -104,9 +104,9 @@ public: // Call after `system_keyspace` is initialized. future<> init(); - future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as = nullptr); + future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as); - future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as = nullptr); + future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as); // Ensures that all previously finished operations on group 0 are visible on this node; // in particular, performs a Raft read barrier on group 0. @@ -128,7 +128,7 @@ public: // FIXME?: this is kind of annoying for the user. // we could forward the call to shard 0, have group0_guard keep a foreign_ptr to the internal data structures on shard 0, // and add_entry would again forward to shard 0. - future start_operation(seastar::abort_source* as = nullptr); + future start_operation(seastar::abort_source* as); template requires std::same_as || std::same_as diff --git a/service/storage_service.cc b/service/storage_service.cc index 076c458ce3..f1c1d89030 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1070,7 +1070,7 @@ class topology_coordinator { slogger.trace("raft topology: do update {} reason {}", updates, reason); topology_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); - co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard)); + co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: race while changing state: {}. Retrying", reason); throw; @@ -1297,7 +1297,7 @@ class topology_coordinator { slogger.trace("raft topology: do update {} reason {}", m, reason); write_mutations change{{std::move(m)}}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), reason); - return _group0.client().add_entry_unguarded(std::move(g0_cmd)); + return _group0.client().add_entry_unguarded(std::move(g0_cmd), &_as); }); guard = co_await start_operation(); @@ -2502,7 +2502,7 @@ class topology_coordinator { assert(!_topo_sm._topology.transition_nodes.empty()); if (!_raft.get_configuration().contains(id)) { - co_await _raft.modify_config({raft::config_member({id, {}}, {})}, {}); + co_await _raft.modify_config({raft::config_member({id, {}}, {})}, {}, &_as); } release_node(std::move(node)); @@ -6909,7 +6909,7 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard)); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { slogger.debug("move_tablet(): concurrent modification, retrying"); @@ -6953,7 +6953,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) { topology_change change{std::move(updates)}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard)); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); break; } catch (group0_concurrent_modification&) { slogger.debug("set_tablet_balancing_enabled(): concurrent modification"); diff --git a/test/raft/raft_server_test.cc b/test/raft/raft_server_test.cc index 1413971eef..06d1eecbf4 100644 --- a/test/raft/raft_server_test.cc +++ b/test/raft/raft_server_test.cc @@ -26,9 +26,9 @@ SEASTAR_THREAD_TEST_CASE(test_check_abort_on_client_api) { return sstring(e.what()) == sstring("Raft instance is stopped, reason: \"test crash\""); }; BOOST_CHECK_EXCEPTION(cluster.add_entries(1, 0).get0(), raft::stopped_error, check_error); - BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}).get0(), raft::stopped_error, check_error); - BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier().get0(), raft::stopped_error, check_error); - BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}).get0(), raft::stopped_error, check_error); + BOOST_CHECK_EXCEPTION(cluster.get_server(0).modify_config({}, {to_raft_id(0)}, nullptr).get0(), raft::stopped_error, check_error); + BOOST_CHECK_EXCEPTION(cluster.get_server(0).read_barrier(nullptr).get0(), raft::stopped_error, check_error); + BOOST_CHECK_EXCEPTION(cluster.get_server(0).set_configuration({}, nullptr).get0(), raft::stopped_error, check_error); } SEASTAR_THREAD_TEST_CASE(test_release_memory_if_add_entry_throws) { diff --git a/test/raft/replication.hh b/test/raft/replication.hh index 4706909756..d8de9109f2 100644 --- a/test/raft/replication.hh +++ b/test/raft/replication.hh @@ -951,7 +951,7 @@ future<> raft_cluster::add_entry(size_t val, std::optional server while (true) { try { auto& at = _servers[server ? *server : _leader].server; - co_await at->add_entry(create_command(val), raft::wait_type::committed); + co_await at->add_entry(create_command(val), raft::wait_type::committed, nullptr); break; } catch (raft::commit_status_unknown& e) { // FIXME: in some cases when we get `commit_status_unknown` the entry may have been applied. @@ -1201,7 +1201,7 @@ future<> raft_cluster::change_configuration(set_config sc) { } tlogger.debug("Changing configuration on leader {}", _leader); - co_await _servers[_leader].server->set_configuration(std::move(set)); + co_await _servers[_leader].server->set_configuration(std::move(set), nullptr); if (!new_config.contains(_leader)) { co_await free_election(); @@ -1211,7 +1211,7 @@ future<> raft_cluster::change_configuration(set_config sc) { // Add a dummy entry to confirm new configuration was committed try { co_await _servers[_leader].server->add_entry(create_command(dummy_command), - raft::wait_type::committed); + raft::wait_type::committed, nullptr); } catch (raft::not_a_leader& e) { // leader stepped down, implying config fully changed } catch (raft::commit_status_unknown& e) {} @@ -1338,7 +1338,7 @@ future<> raft_cluster::tick(::tick t) { template future<> raft_cluster::read(read_value r) { - co_await _servers[r.node_idx].server->read_barrier(); + co_await _servers[r.node_idx].server->read_barrier(nullptr); auto val = _servers[r.node_idx].sm->hasher->finalize_uint64(); auto expected = hasher_int::hash_range(r.expected_index).finalize_uint64(); BOOST_CHECK_MESSAGE(val == expected,