diff --git a/auth/service.cc b/auth/service.cc index 1e87d2eee0..fdd199dafc 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -77,17 +77,23 @@ private: void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {} void on_drop_keyspace(const sstring& ks_name) override { - _authorizer.revoke_all( + // Do it in the background. + (void)_authorizer.revoke_all( auth::make_data_resource(ks_name)).handle_exception_type([](const unsupported_authorization_operation&) { // Nothing. + }).handle_exception([] (std::exception_ptr e) { + log.error("Unexpected exception while revoking all permissions on dropped keyspace: {}", e); }); } void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override { - _authorizer.revoke_all( + // Do it in the background. + (void)_authorizer.revoke_all( auth::make_data_resource( ks_name, cf_name)).handle_exception_type([](const unsupported_authorization_operation&) { // Nothing. + }).handle_exception([] (std::exception_ptr e) { + log.error("Unexpected exception while revoking all permissions on dropped table: {}", e); }); } diff --git a/database.cc b/database.cc index 699fc83196..e4e21e11b2 100644 --- a/database.cc +++ b/database.cc @@ -621,7 +621,8 @@ database::init_commitlog() { _commitlog->discard_completed_segments(id); return; } - _column_families[id]->flush(); + // Initiate a background flush. Waited upon in `stop()`. + (void)_column_families[id]->flush(); }).release(); // we have longer life time than CL. Ignore reg anchor }); } @@ -1377,7 +1378,7 @@ future<> dirty_memory_manager::flush_when_needed() { // Do not wait. The semaphore will protect us against a concurrent flush. But we // want to start a new one as soon as the permits are destroyed and the semaphore is // made ready again, not when we are done with the current one. - this->flush_one(*(candidate_memtable.get_memtable_list()), std::move(permit)); + (void)this->flush_one(*(candidate_memtable.get_memtable_list()), std::move(permit)); return make_ready_future<>(); }); }); @@ -1948,10 +1949,13 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); } virtual void destroy_reader(shard_id shard, future reader_fut) noexcept override { - reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable { + // Move to the background. + (void)reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable { return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { ctx.semaphore->unregister_inactive_read(std::move(*handle)); }); + }).handle_exception([shard] (std::exception_ptr e) { + dblog.warn("Failed to destroy shard reader of streaming multishard reader on shard {}: {}", shard, e); }); } virtual reader_concurrency_semaphore& semaphore() override { diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 6c0bc7b73e..f9d92b36b4 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -118,7 +118,8 @@ future<> db::batchlog_manager::start() { // round-robin scheduling. if (engine().cpu_id() == 0) { _timer.set_callback([this] { - do_batch_log_replay().handle_exception([] (auto ep) { + // Do it in the background. + (void)do_batch_log_replay().handle_exception([] (auto ep) { blogger.error("Exception in batch replay: {}", ep); }).finally([this] { _timer.arm(lowres_clock::now() + std::chrono::milliseconds(replay_interval)); diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index f8bd894cc4..a55dbd5e99 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1447,7 +1447,7 @@ void db::commitlog::segment_manager::discard_unused_segments() { // segments on deletion queue could be non-empty, and we don't want // those accidentally left around for replay. if (!_shutdown) { - with_gate(_gate, [this] { + (void)with_gate(_gate, [this] { return do_pending_deletes(); }); } @@ -1598,7 +1598,7 @@ future<> db::commitlog::segment_manager::clear() { */ void db::commitlog::segment_manager::sync() { for (auto s : _segments) { - s->sync(); // we do not care about waiting... + (void)s->sync(); // we do not care about waiting... } } @@ -1606,7 +1606,7 @@ void db::commitlog::segment_manager::on_timer() { // Gate, because we are starting potentially blocking ops // without waiting for them, so segement_manager could be shut down // while they are running. - seastar::with_gate(_gate, [this] { + (void)seastar::with_gate(_gate, [this] { if (cfg.mode != sync_mode::BATCH) { sync(); } diff --git a/db/hints/manager.cc b/db/hints/manager.cc index bfcb764eb0..c2ea35eafe 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -158,7 +158,8 @@ void manager::forbid_hints_for_eps_with_pending_hints() { bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept { try { - with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable { + // Future is waited on indirectly in `stop()` (via `_store_gate`). + (void)with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable { ++_hints_in_progress; size_t mut_size = fm->representation().size(); shard_stats().size_of_hints_in_progress += mut_size; @@ -534,7 +535,8 @@ void manager::drain_for(gms::inet_address endpoint) { manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint); - with_gate(_draining_eps_gate, [this, endpoint] { + // Future is waited on indirectly in `stop()` (via `_draining_eps_gate`). + (void)with_gate(_draining_eps_gate, [this, endpoint] { return futurize_apply([this, endpoint] () { if (utils::fb_utilities::is_me(endpoint)) { return parallel_for_each(_ep_managers, [] (auto& pair) { @@ -672,7 +674,8 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) { return _resource_manager.get_send_units_for(buf.size_bytes()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable { - with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { + // Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`). + (void)with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { try { try { ctx_ptr->rps_set.emplace(rp); diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 7b4aad3544..e4448ed729 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -55,7 +55,9 @@ private: template future<> with_sem(Func&& func) { return get_units(_sem, 1).then([func = std::forward(func)] (auto units) mutable { - func().finally([units = std::move(units)] {}); + // Future is discarded purposefully, see method description. + // FIXME: error handling. + (void)func().finally([units = std::move(units)] {}); }); } diff --git a/db/view/view.cc b/db/view/view.cc index 698f1cd6d6..6a62c682ed 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1150,7 +1150,8 @@ future<> view_builder::start() { calculate_shard_build_step(std::move(built), std::move(in_progress)).get(); _mm.register_listener(this); _current_step = _base_to_build_step.begin(); - _build_step.trigger(); + // Waited on indirectly in stop(). + (void)_build_step.trigger(); }); return make_ready_future<>(); } @@ -1427,7 +1428,8 @@ static future<> flush_base(lw_shared_ptr base, abort_source& as) } void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) { - with_semaphore(_sem, 1, [ks_name, view_name, this] { + // Do it in the background, serialized. + (void)with_semaphore(_sem, 1, [ks_name, view_name, this] { auto view = view_ptr(_db.find_schema(ks_name, view_name)); auto& step = get_or_create_build_step(view->view_info()->base_id()); return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] { @@ -1442,14 +1444,16 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na if (f.failed()) { vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception()); } - _build_step.trigger(); + // Waited on indirectly in stop(). + (void)_build_step.trigger(); }); }); }).handle_exception_type([] (no_such_column_family&) { }); } void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) { - with_semaphore(_sem, 1, [ks_name, view_name, this] { + // Do it in the background, serialized. + (void)with_semaphore(_sem, 1, [ks_name, view_name, this] { auto view = view_ptr(_db.find_schema(ks_name, view_name)); auto step_it = _base_to_build_step.find(view->view_info()->base_id()); if (step_it == _base_to_build_step.end()) { @@ -1466,7 +1470,8 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) { vlogger.info0("Stopping to build view {}.{}", ks_name, view_name); - with_semaphore(_sem, 1, [ks_name, view_name, this] { + // Do it in the background, serialized. + (void)with_semaphore(_sem, 1, [ks_name, view_name, this] { // The view is absent from the database at this point, so find it by brute force. ([&, this] { for (auto& [_, step] : _base_to_build_step) { diff --git a/distributed_loader.cc b/distributed_loader.cc index 43741f4c6c..3ba6225884 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -459,7 +459,7 @@ void distributed_loader::reshard(distributed& db, sstring ks_name, sst } }).then([&cf, sstables] { // schedule deletion of shared sstables after we're certain that new unshared ones were successfully forwarded to respective shards. - sstables::delete_atomically(std::move(sstables)).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) { + (void)sstables::delete_atomically(std::move(sstables)).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) { try { std::rethrow_exception(eptr); } catch (...) { diff --git a/gms/gossiper.cc b/gms/gossiper.cc index b5fc804264..abc7f1a764 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -339,7 +339,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { _ms_registered = true; ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) { auto from = netw::messaging_service::get_source(cinfo); - smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable { + // In a new fiber. + (void)smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable { auto& gossiper = gms::get_local_gossiper(); return gossiper.handle_syn_msg(from, std::move(syn_msg)); }).handle_exception([] (auto ep) { @@ -349,7 +350,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { }); ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) { auto from = netw::messaging_service::get_source(cinfo); - smp::submit_to(0, [from, msg = std::move(msg)] () mutable { + // In a new fiber. + (void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable { auto& gossiper = gms::get_local_gossiper(); return gossiper.handle_ack_msg(from, std::move(msg)); }).handle_exception([] (auto ep) { @@ -358,7 +360,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { return messaging_service::no_wait(); }); ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) { - smp::submit_to(0, [msg = std::move(msg)] () mutable { + // In a new fiber. + (void)smp::submit_to(0, [msg = std::move(msg)] () mutable { return gms::get_local_gossiper().handle_ack2_msg(std::move(msg)); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep); @@ -371,7 +374,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { }); }); ms().register_gossip_shutdown([] (inet_address from) { - smp::submit_to(0, [from] { + // In a new fiber. + (void)smp::submit_to(0, [from] { return gms::get_local_gossiper().handle_shutdown_msg(from); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep); @@ -525,7 +529,7 @@ void gossiper::remove_endpoint(inet_address endpoint) { // do subscribers first so anything in the subscriber that depends on gossiper state won't get confused // We can not run on_remove callbacks here becasue on_remove in // storage_service might take the gossiper::timer_callback_lock - seastar::async([this, endpoint] { + (void)seastar::async([this, endpoint] { _subscribers.for_each([endpoint] (auto& subscriber) { subscriber->on_remove(endpoint); }); @@ -606,7 +610,8 @@ future gossiper::lock_endpoint(inet_address ep) { // - failure_detector // - on_remove callbacks, e.g, storage_service -> access token_metadata void gossiper::run() { - timer_callback_lock().then([this, g = this->shared_from_this()] { + // Run it in the background. + (void)timer_callback_lock().then([this, g = this->shared_from_this()] { return seastar::async([this, g] { logger.trace("=== Gossip round START"); @@ -653,13 +658,15 @@ void gossiper::run() { } logger.debug("Talk to {} live nodes: {}", nr_live_nodes, live_nodes); for (auto& ep: live_nodes) { - do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) { + // Do it in the background. + (void)do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) { logger.trace("Failed to do_gossip_to_live_member: {}", ep); }); } /* Gossip to some unreachable member with some probability to check if he is back up */ - do_gossip_to_unreachable_member(message).handle_exception([] (auto ep) { + // Do it in the background. + (void)do_gossip_to_unreachable_member(message).handle_exception([] (auto ep) { logger.trace("Faill to do_gossip_to_unreachable_member: {}", ep); }); @@ -682,7 +689,8 @@ void gossiper::run() { logger.trace("gossiped_to_seed={}, _live_endpoints.size={}, _seeds.size={}", _gossiped_to_seed, _live_endpoints.size(), _seeds.size()); if (!_gossiped_to_seed || _live_endpoints.size() < _seeds.size()) { - do_gossip_to_seed(message).handle_exception([] (auto ep) { + // Do it in the background. + (void)do_gossip_to_seed(message).handle_exception([] (auto ep) { logger.trace("Faill to do_gossip_to_seed: {}", ep); }); } @@ -1294,7 +1302,8 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { local_state.mark_dead(); msg_addr id = get_msg_addr(addr); logger.trace("Sending a EchoMessage to {}", id); - ms().send_gossip_echo(id).then([this, addr] { + // Do it in the background. + (void)ms().send_gossip_echo(id).then([this, addr] { logger.trace("Got EchoMessage Reply"); set_last_processed_message_at(); return seastar::async([this, addr] { @@ -1656,7 +1665,8 @@ future<> gossiper::do_shadow_round() { gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); auto id = get_msg_addr(seed); logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); - ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + // Do it in the background. + (void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); }); } diff --git a/init.cc b/init.cc index 22d2bb79a0..52e3a3c022 100644 --- a/init.cc +++ b/init.cc @@ -165,8 +165,11 @@ void init_ms_fd_gossiper(sharded& gossiper throw bad_configuration_error(); } gossiper.local().set_seeds(seeds); - gossiper.invoke_on_all([cluster_name](gms::gossiper& g) { + // Do it in the background. + (void)gossiper.invoke_on_all([cluster_name](gms::gossiper& g) { g.set_cluster_name(cluster_name); + }).handle_exception([] (std::exception_ptr e) { + startlog.error("Unexpected exception while setting cluster name: {}", e); }); } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 049193e537..7421e1ea00 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -623,7 +623,8 @@ shared_ptr messaging_service::ge assert(res.second); it = res.first; uint32_t src_cpu_id = engine().cpu_id(); - _rpc->make_client(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id, + // No reply is received, nothing to wait for. + (void)_rpc->make_client(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id, query::result_memory_limiter::maximum_result_size).handle_exception([ms = shared_from_this(), remote_addr, verb] (std::exception_ptr ep) { mlogger.debug("Failed to send client id to {} for verb {}: {}", remote_addr, std::underlying_type_t(verb), ep); }); @@ -649,7 +650,7 @@ bool messaging_service::remove_rpc_client_one(clients_map& clients, msg_addr id, // This will make sure messaging_service::stop() blocks until // client->stop() is over. // - client->stop().finally([id, client, ms = shared_from_this()] { + (void)client->stop().finally([id, client, ms = shared_from_this()] { mlogger.debug("dropped connection to {}", id.addr); }).discard_result(); found = true; diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 798d3b7571..554813b830 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -299,7 +299,8 @@ flat_mutation_reader read_context::create_reader( } void read_context::destroy_reader(shard_id shard, future reader_fut) noexcept { - with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable { + // Future is waited on indirectly in `stop()` (via `_dismantling_gate`). + (void)with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable { return reader_fut.then_wrapped([this, shard] (future&& reader_fut) { auto& rm = _readers[shard]; @@ -331,10 +332,12 @@ future<> read_context::stop() { auto pr = promise<>(); auto fut = pr.get_future(); auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close(); - gate_fut.then([this] { + // Forwarded to `fut`. + (void)gate_fut.then([this] { for (shard_id shard = 0; shard != smp::count; ++shard) { if (_readers[shard].state == reader_state::saving) { - _db.invoke_on(shard, [schema = global_schema_ptr(_schema), rm = std::move(_readers[shard])] (database& db) mutable { + // Move to the background. + (void)_db.invoke_on(shard, [schema = global_schema_ptr(_schema), rm = std::move(_readers[shard])] (database& db) mutable { // We cannot use semaphore() here, as this can be already destroyed. auto& table = db.find_column_family(schema); table.read_concurrency_semaphore().unregister_inactive_read(std::move(*rm.handle)); diff --git a/repair/repair.cc b/repair/repair.cc index a0bbb64bf9..596640e98b 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -776,7 +776,8 @@ static future<> repair_cf_range(repair_info& ri, completion.enter(); auto leave = defer([&completion] { completion.leave(); }); - when_all(checksums.begin(), checksums.end()).then( + // Do it in the background. + (void)when_all(checksums.begin(), checksums.end()).then( [&ri, &cf, range, &neighbors, &success] (std::vector> checksums) { // If only some of the replicas of this range are alive, @@ -1412,7 +1413,8 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, repair_results.push_back(std::move(f)); } - when_all(repair_results.begin(), repair_results.end()).then([id, fail = std::move(fail)] (std::vector> results) mutable { + // Do it in the background. + (void)when_all(repair_results.begin(), repair_results.end()).then([id, fail = std::move(fail)] (std::vector> results) mutable { if (std::any_of(results.begin(), results.end(), [] (auto&& f) { return f.failed(); })) { rlogger.info("repair {} failed", id); } else { diff --git a/repair/row_level.cc b/repair/row_level.cc index 4013ae9867..4c4f05c1ad 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1995,7 +1995,8 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed repair_init_messaging_service_handler(repair_service& rs, distributed repair_init_messaging_service_handler(repair_service& rs, distributed schema_registry_entry::start_loading(async_schema_loader load auto sf = _schema_promise.get_shared_future(); _state = state::LOADING; slogger.trace("Loading {}", _version); - f.then_wrapped([self = shared_from_this(), this] (future&& f) { + // Move to background. + (void)f.then_wrapped([self = shared_from_this(), this] (future&& f) { _loader = {}; if (_state != state::LOADING) { slogger.trace("Loading of {} aborted", _version); @@ -223,7 +224,8 @@ future<> schema_registry_entry::maybe_sync(std::function()> syncer) { }); auto sf = _synced_promise.get_shared_future(); _sync_state = schema_registry_entry::sync_state::SYNCING; - f.then_wrapped([this, self = shared_from_this()] (auto&& f) { + // Move to background. + (void)f.then_wrapped([this, self = shared_from_this()] (auto&& f) { if (_sync_state != sync_state::SYNCING) { return; } diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 56991aad5c..85a3392348 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -96,7 +96,8 @@ void migration_manager::init_messaging_service() auto& ms = netw::get_local_messaging_service(); ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector m) { auto src = netw::messaging_service::get_source(cinfo); - do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { + // Start a new fiber. + (void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { return service::get_local_migration_manager().merge_schema_from(src, mutations); }).then_wrapped([src] (auto&& f) { if (f.failed()) { diff --git a/service/misc_services.cc b/service/misc_services.cc index 7047dc7e34..0a455d05c8 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -106,7 +106,8 @@ void cache_hitrate_calculator::recalculate_timer() { void cache_hitrate_calculator::run_on(size_t master, lowres_clock::duration d) { if (!_stopped) { - _me.invoke_on(master, [d] (cache_hitrate_calculator& local) { + // Do it in the background. + (void)_me.invoke_on(master, [d] (cache_hitrate_calculator& local) { local._timer.arm(d); }).handle_exception_type([] (seastar::no_sharded_instance_exception&) { /* ignore */ }); } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 84fe40504b..6165359b82 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -405,7 +405,8 @@ public: ++stats().throttled_base_writes; tracing::trace(trace, "Delaying user write due to view update backlog {}/{} by {}us", backlog.current, backlog.max, delay.count()); - sleep_abortable(delay).finally([self = shared_from_this(), on_resume = std::forward(on_resume)] { + // Waited on indirectly. + (void)sleep_abortable(delay).finally([self = shared_from_this(), on_resume = std::forward(on_resume)] { --self->stats().throttled_base_writes; on_resume(self.get()); }).handle_exception_type([] (const seastar::sleep_aborted& ignored) { }); @@ -1636,7 +1637,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo } } - f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) { + // Waited on indirectly. + (void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) { ++stats.writes_errors.get_ep_stat(coordinator); p->got_failure_response(response_id, coordinator, forward_size + 1, std::nullopt); try { @@ -2476,9 +2478,11 @@ protected: data_resolver_ptr data_resolver = ::make_shared(_schema, cl, _targets.size(), timeout); auto exec = shared_from_this(); - make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{}); + // Waited on indirectly. + (void)make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{}); - data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { + // Waited on indirectly. + (void)data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { try { f.get(); auto rr_opt = data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here @@ -2495,7 +2499,8 @@ protected: // wait for write to complete before returning result to prevent multiple concurrent read requests to // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum // another read had returned a newer value (but the newer value had not yet been sent to the other replicas) - _proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then([this, result = std::move(result)] () mutable { + // Waited on indirectly. + (void)_proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then([this, result = std::move(result)] () mutable { _result_promise.set_value(std::move(result)); on_read_resolved(); }).handle_exception([this, exec] (std::exception_ptr eptr) { @@ -2560,11 +2565,13 @@ public: db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout); auto exec = shared_from_this(); - make_requests(digest_resolver, timeout).finally([exec]() { + // Waited on indirectly. + (void)make_requests(digest_resolver, timeout).finally([exec]() { // hold on to executor until all queries are complete }); - digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future>, bool> f) mutable { + // Waited on indirectly. + (void)digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future>, bool> f) mutable { bool background_repair_check = false; try { exec->got_cl(); @@ -2598,7 +2605,8 @@ public: exec->on_read_resolved(); } - digest_resolver->done().then([exec, digest_resolver, timeout, background_repair_check] () mutable { + // Waited on indirectly. + (void)digest_resolver->done().then([exec, digest_resolver, timeout, background_repair_check] () mutable { if (background_repair_check && !digest_resolver->digests_match()) { exec->_proxy->_stats.read_repair_repaired_background++; exec->_result_promise = promise>>(); @@ -2661,7 +2669,8 @@ public: return make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true); } }; - send_request(resolver->has_data()).finally([exec = shared_from_this()]{}); + // Waited on indirectly. + (void)send_request(resolver->has_data()).finally([exec = shared_from_this()]{}); } }); auto& sr = _schema->speculative_retry(); diff --git a/sstables/data_consume_context.hh b/sstables/data_consume_context.hh index a2bac14f9f..7ea99ec5a5 100644 --- a/sstables/data_consume_context.hh +++ b/sstables/data_consume_context.hh @@ -127,7 +127,8 @@ public: ~data_consume_context() { if (_ctx) { auto f = _ctx->close(); - f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)](auto) {}); + // Can't wait on the future in the destructor. + (void)f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)](auto) {}); } } diff --git a/sstables/random_access_reader.hh b/sstables/random_access_reader.hh index 7ab499c550..a26a37b186 100644 --- a/sstables/random_access_reader.hh +++ b/sstables/random_access_reader.hh @@ -46,7 +46,9 @@ public: void seek(uint64_t pos) { if (_in) { - seastar::with_gate(_close_gate, [in = std::move(_in)]() mutable { + // Future is waited on indirectly in `close()` (via `_close_gate`). + // FIXME: error handling + (void)seastar::with_gate(_close_gate, [in = std::move(_in)]() mutable { auto fut = in->close(); return fut.then([in = std::move(in)] {}); }); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 82b05b412e..20522458d5 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2847,13 +2847,15 @@ int sstable::compare_by_max_timestamp(const sstable& other) const { sstable::~sstable() { if (_index_file) { - _index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) { + // Registered as background job. + (void)_index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) { sstlog.warn("sstable close index_file failed: {}", ep); general_disk_error(); }); } if (_data_file) { - _data_file.close().handle_exception([save = _data_file, op = background_jobs().start()] (auto ep) { + // Registered as background job. + (void)_data_file.close().handle_exception([save = _data_file, op = background_jobs().start()] (auto ep) { sstlog.warn("sstable close data_file failed: {}", ep); general_disk_error(); }); @@ -2870,7 +2872,7 @@ sstable::~sstable() { // FIXME: // - Longer term fix is to hand off deletion of sstables to a manager that can // deal with sstable marked to be deleted after the corresponding object is destructed. - unlink().handle_exception( + (void)unlink().handle_exception( [op = background_jobs().start()] (std::exception_ptr eptr) { try { std::rethrow_exception(eptr); diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index b969705a78..3a7ddf751e 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -113,7 +113,7 @@ void stream_manager::remove_stream(UUID plan_id) { _initiated_streams.erase(plan_id); _receiving_streams.erase(plan_id); // FIXME: Do not ignore the future - remove_progress_on_all_shards(plan_id).handle_exception([plan_id] (auto ep) { + (void)remove_progress_on_all_shards(plan_id).handle_exception([plan_id] (auto ep) { sslog.info("stream_manager: Fail to remove progress for plan_id={}: {}", plan_id, ep); }); } diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index d1fe11ed2d..e1e4fa0b02 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -138,7 +138,8 @@ future do_send_mutations(lw_shared_ptr si, frozen_mut return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); - netw::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented, si->reason).then([si, fm_size] { + // Do it in the background. + (void)netw::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented, si->reason).then([si, fm_size] { sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); si->mutations_done.signal(); diff --git a/table.cc b/table.cc index 14833041e9..26440f5248 100644 --- a/table.cc +++ b/table.cc @@ -1196,7 +1196,7 @@ table::on_compaction_completion(const std::vector& new rebuild_statistics(); // This is done in the background, so we can consider this compaction completed. - seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] { + (void)seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] { return with_semaphore(_sstable_deletion_sem, 1, [this, sstables_to_remove = std::move(sstables_to_remove)] { return sstables::delete_atomically(sstables_to_remove).then_wrapped([this, sstables_to_remove] (future<> f) { std::exception_ptr eptr; diff --git a/thrift/handler.cc b/thrift/handler.cc index 786517645a..ee453e68c2 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -129,7 +129,7 @@ with_cob(thrift_fn::function&& cob, thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - futurize>::apply([func = std::forward(func)] { + (void)futurize>::apply([func = std::forward(func)] { return noexcept_movable::wrap(func()); }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (auto&& f) { try { @@ -147,7 +147,7 @@ with_cob(thrift_fn::function&& cob, thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - futurize::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) { + (void)futurize::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) { try { f.get(); cob(); @@ -162,7 +162,7 @@ template void with_exn_cob(thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - futurize::apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) { + (void)futurize::apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) { try { f.get(); } catch (...) { diff --git a/thrift/server.cc b/thrift/server.cc index ebeaef213a..bf2c2c2787 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -230,12 +230,14 @@ thrift_server::do_accepts(int which, bool keepalive) { if (_stop_gate.is_closed()) { return; } - with_gate(_stop_gate, [&, this] { + // Future is waited on indirectly in `stop()` (via `_stop_gate`). + (void)with_gate(_stop_gate, [&, this] { return _listeners[which].accept().then([this, which, keepalive] (accept_result ar) { auto&& [fd, addr] = ar; fd.set_nodelay(true); fd.set_keepalive(keepalive); - with_gate(_stop_gate, [&, this] { + // Future is waited on indirectly in `stop()` (via `_stop_gate`). + (void)with_gate(_stop_gate, [&, this] { return do_with(connection(*this, std::move(fd), addr), [this] (auto& conn) { return conn.process().then_wrapped([this, &conn] (future<> f) { conn.shutdown(); @@ -262,7 +264,8 @@ void thrift_server::maybe_retry_accept(int which, bool keepalive, std::exception }; auto retry_with_backoff = [&] { // FIXME: Consider using exponential backoff - sleep(1ms).then([retry = std::move(retry)] { retry(); }); + // Done in the background. + (void)sleep(1ms).then([retry = std::move(retry)] { retry(); }); }; try { std::rethrow_exception(std::move(ex)); diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index 459597b3a9..df8a066ac9 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -209,7 +209,8 @@ future<> trace_keyspace_helper::start() { } void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { - with_gate(_pending_writes, [this, records = std::move(records)] { + // Future is waited on indirectly in `stop()` (via `_pending_writes`). + (void)with_gate(_pending_writes, [this, records = std::move(records)] { auto num_records = records->size(); return this->flush_one_session_mutations(std::move(records)).finally([this, num_records] { _local_tracing.write_complete(num_records); }); }).handle_exception([this] (auto ep) { diff --git a/transport/server.cc b/transport/server.cc index 3c7eb7e9f0..0b995a1d08 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -243,7 +243,8 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) { auto conn = make_shared(*this, server_addr, std::move(fd), std::move(addr)); ++_connects; ++_connections; - conn->process().then_wrapped([this, conn] (future<> f) { + // Move connection to the background, monitor for lifetime and errors. + (void)conn->process().then_wrapped([this, conn] (future<> f) { --_connections; try { f.get(); diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index 5566fcbc9c..388af052d2 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -580,7 +580,9 @@ private: } // Reload all those which value needs to be reloaded. - with_gate(_timer_reads_gate, [this] { + // Future is waited on indirectly in `stop()` (via `_timer_reads_gate`). + // FIXME: error handling + (void)with_gate(_timer_reads_gate, [this] { auto to_reload = boost::copy_range>(_lru_list | boost::adaptors::filtered([this] (ts_value_lru_entry& lru_entry) { return lru_entry.timestamped_value().loaded() + _refresh < loading_cache_clock_type::now(); diff --git a/utils/loading_shared_values.hh b/utils/loading_shared_values.hh index c3147134b2..8b97192e61 100644 --- a/utils/loading_shared_values.hh +++ b/utils/loading_shared_values.hh @@ -234,7 +234,8 @@ public: _set.insert(*e); // get_shared_future() may throw, so make sure to call it before invoking the loader(key) f = e->loaded().get_shared_future(); - futurize_apply([&] { return loader(key); }).then_wrapped([e](future&& val_fut) mutable { + // Future indirectly forwarded to `e`. + (void)futurize_apply([&] { return loader(key); }).then_wrapped([e](future&& val_fut) mutable { if (val_fut.failed()) { e->loaded().set_exception(val_fut.get_exception()); } else {