From fddd9a88ddcb652dbfc3b6e4694ddfdfaa0537cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 5 Aug 2019 17:07:54 +0300 Subject: [PATCH] treewide: silence discarded future warnings for legit discards This patch silences those future discard warnings where it is clear that discarding the future was actually the intent of the original author, *and* they did the necessary precautions (handling errors). The patch also adds some trivial error handling (logging the error) in some places, which were lacking this, but otherwise look ok. No functional changes. --- auth/service.cc | 10 ++++++++-- database.cc | 10 +++++++--- db/batchlog_manager.cc | 3 ++- db/commitlog/commitlog.cc | 6 +++--- db/hints/manager.cc | 9 ++++++--- db/large_data_handler.hh | 4 +++- db/view/view.cc | 15 ++++++++++----- distributed_loader.cc | 2 +- gms/gossiper.cc | 32 ++++++++++++++++++++----------- init.cc | 5 ++++- message/messaging_service.cc | 5 +++-- multishard_mutation_query.cc | 9 ++++++--- repair/repair.cc | 6 ++++-- repair/row_level.cc | 9 ++++++--- schema_registry.cc | 6 ++++-- service/migration_manager.cc | 3 ++- service/misc_services.cc | 3 ++- service/storage_proxy.cc | 27 +++++++++++++++++--------- sstables/data_consume_context.hh | 3 ++- sstables/random_access_reader.hh | 4 +++- sstables/sstables.cc | 8 +++++--- streaming/stream_manager.cc | 2 +- streaming/stream_transfer_task.cc | 3 ++- table.cc | 2 +- thrift/handler.cc | 6 +++--- thrift/server.cc | 9 ++++++--- tracing/trace_keyspace_helper.cc | 3 ++- transport/server.cc | 3 ++- utils/loading_cache.hh | 4 +++- utils/loading_shared_values.hh | 3 ++- 30 files changed, 142 insertions(+), 72 deletions(-) diff --git a/auth/service.cc b/auth/service.cc index 1e87d2eee0..fdd199dafc 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -77,17 +77,23 @@ private: void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {} void on_drop_keyspace(const sstring& ks_name) override { - _authorizer.revoke_all( + // Do it in the background. + (void)_authorizer.revoke_all( auth::make_data_resource(ks_name)).handle_exception_type([](const unsupported_authorization_operation&) { // Nothing. + }).handle_exception([] (std::exception_ptr e) { + log.error("Unexpected exception while revoking all permissions on dropped keyspace: {}", e); }); } void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override { - _authorizer.revoke_all( + // Do it in the background. + (void)_authorizer.revoke_all( auth::make_data_resource( ks_name, cf_name)).handle_exception_type([](const unsupported_authorization_operation&) { // Nothing. + }).handle_exception([] (std::exception_ptr e) { + log.error("Unexpected exception while revoking all permissions on dropped table: {}", e); }); } diff --git a/database.cc b/database.cc index 699fc83196..e4e21e11b2 100644 --- a/database.cc +++ b/database.cc @@ -621,7 +621,8 @@ database::init_commitlog() { _commitlog->discard_completed_segments(id); return; } - _column_families[id]->flush(); + // Initiate a background flush. Waited upon in `stop()`. + (void)_column_families[id]->flush(); }).release(); // we have longer life time than CL. Ignore reg anchor }); } @@ -1377,7 +1378,7 @@ future<> dirty_memory_manager::flush_when_needed() { // Do not wait. The semaphore will protect us against a concurrent flush. But we // want to start a new one as soon as the permits are destroyed and the semaphore is // made ready again, not when we are done with the current one. - this->flush_one(*(candidate_memtable.get_memtable_list()), std::move(permit)); + (void)this->flush_one(*(candidate_memtable.get_memtable_list()), std::move(permit)); return make_ready_future<>(); }); }); @@ -1948,10 +1949,13 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); } virtual void destroy_reader(shard_id shard, future reader_fut) noexcept override { - reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable { + // Move to the background. + (void)reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable { return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { ctx.semaphore->unregister_inactive_read(std::move(*handle)); }); + }).handle_exception([shard] (std::exception_ptr e) { + dblog.warn("Failed to destroy shard reader of streaming multishard reader on shard {}: {}", shard, e); }); } virtual reader_concurrency_semaphore& semaphore() override { diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 6c0bc7b73e..f9d92b36b4 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -118,7 +118,8 @@ future<> db::batchlog_manager::start() { // round-robin scheduling. if (engine().cpu_id() == 0) { _timer.set_callback([this] { - do_batch_log_replay().handle_exception([] (auto ep) { + // Do it in the background. + (void)do_batch_log_replay().handle_exception([] (auto ep) { blogger.error("Exception in batch replay: {}", ep); }).finally([this] { _timer.arm(lowres_clock::now() + std::chrono::milliseconds(replay_interval)); diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index f8bd894cc4..a55dbd5e99 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1447,7 +1447,7 @@ void db::commitlog::segment_manager::discard_unused_segments() { // segments on deletion queue could be non-empty, and we don't want // those accidentally left around for replay. if (!_shutdown) { - with_gate(_gate, [this] { + (void)with_gate(_gate, [this] { return do_pending_deletes(); }); } @@ -1598,7 +1598,7 @@ future<> db::commitlog::segment_manager::clear() { */ void db::commitlog::segment_manager::sync() { for (auto s : _segments) { - s->sync(); // we do not care about waiting... + (void)s->sync(); // we do not care about waiting... } } @@ -1606,7 +1606,7 @@ void db::commitlog::segment_manager::on_timer() { // Gate, because we are starting potentially blocking ops // without waiting for them, so segement_manager could be shut down // while they are running. - seastar::with_gate(_gate, [this] { + (void)seastar::with_gate(_gate, [this] { if (cfg.mode != sync_mode::BATCH) { sync(); } diff --git a/db/hints/manager.cc b/db/hints/manager.cc index bfcb764eb0..c2ea35eafe 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -158,7 +158,8 @@ void manager::forbid_hints_for_eps_with_pending_hints() { bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept { try { - with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable { + // Future is waited on indirectly in `stop()` (via `_store_gate`). + (void)with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable { ++_hints_in_progress; size_t mut_size = fm->representation().size(); shard_stats().size_of_hints_in_progress += mut_size; @@ -534,7 +535,8 @@ void manager::drain_for(gms::inet_address endpoint) { manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint); - with_gate(_draining_eps_gate, [this, endpoint] { + // Future is waited on indirectly in `stop()` (via `_draining_eps_gate`). + (void)with_gate(_draining_eps_gate, [this, endpoint] { return futurize_apply([this, endpoint] () { if (utils::fb_utilities::is_me(endpoint)) { return parallel_for_each(_ep_managers, [] (auto& pair) { @@ -672,7 +674,8 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) { return _resource_manager.get_send_units_for(buf.size_bytes()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable { - with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { + // Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`). + (void)with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { try { try { ctx_ptr->rps_set.emplace(rp); diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 7b4aad3544..e4448ed729 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -55,7 +55,9 @@ private: template future<> with_sem(Func&& func) { return get_units(_sem, 1).then([func = std::forward(func)] (auto units) mutable { - func().finally([units = std::move(units)] {}); + // Future is discarded purposefully, see method description. + // FIXME: error handling. + (void)func().finally([units = std::move(units)] {}); }); } diff --git a/db/view/view.cc b/db/view/view.cc index 698f1cd6d6..6a62c682ed 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1150,7 +1150,8 @@ future<> view_builder::start() { calculate_shard_build_step(std::move(built), std::move(in_progress)).get(); _mm.register_listener(this); _current_step = _base_to_build_step.begin(); - _build_step.trigger(); + // Waited on indirectly in stop(). + (void)_build_step.trigger(); }); return make_ready_future<>(); } @@ -1427,7 +1428,8 @@ static future<> flush_base(lw_shared_ptr base, abort_source& as) } void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) { - with_semaphore(_sem, 1, [ks_name, view_name, this] { + // Do it in the background, serialized. + (void)with_semaphore(_sem, 1, [ks_name, view_name, this] { auto view = view_ptr(_db.find_schema(ks_name, view_name)); auto& step = get_or_create_build_step(view->view_info()->base_id()); return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] { @@ -1442,14 +1444,16 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na if (f.failed()) { vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception()); } - _build_step.trigger(); + // Waited on indirectly in stop(). + (void)_build_step.trigger(); }); }); }).handle_exception_type([] (no_such_column_family&) { }); } void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) { - with_semaphore(_sem, 1, [ks_name, view_name, this] { + // Do it in the background, serialized. + (void)with_semaphore(_sem, 1, [ks_name, view_name, this] { auto view = view_ptr(_db.find_schema(ks_name, view_name)); auto step_it = _base_to_build_step.find(view->view_info()->base_id()); if (step_it == _base_to_build_step.end()) { @@ -1466,7 +1470,8 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) { vlogger.info0("Stopping to build view {}.{}", ks_name, view_name); - with_semaphore(_sem, 1, [ks_name, view_name, this] { + // Do it in the background, serialized. + (void)with_semaphore(_sem, 1, [ks_name, view_name, this] { // The view is absent from the database at this point, so find it by brute force. ([&, this] { for (auto& [_, step] : _base_to_build_step) { diff --git a/distributed_loader.cc b/distributed_loader.cc index 43741f4c6c..3ba6225884 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -459,7 +459,7 @@ void distributed_loader::reshard(distributed& db, sstring ks_name, sst } }).then([&cf, sstables] { // schedule deletion of shared sstables after we're certain that new unshared ones were successfully forwarded to respective shards. - sstables::delete_atomically(std::move(sstables)).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) { + (void)sstables::delete_atomically(std::move(sstables)).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) { try { std::rethrow_exception(eptr); } catch (...) { diff --git a/gms/gossiper.cc b/gms/gossiper.cc index b5fc804264..abc7f1a764 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -339,7 +339,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { _ms_registered = true; ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) { auto from = netw::messaging_service::get_source(cinfo); - smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable { + // In a new fiber. + (void)smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable { auto& gossiper = gms::get_local_gossiper(); return gossiper.handle_syn_msg(from, std::move(syn_msg)); }).handle_exception([] (auto ep) { @@ -349,7 +350,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { }); ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) { auto from = netw::messaging_service::get_source(cinfo); - smp::submit_to(0, [from, msg = std::move(msg)] () mutable { + // In a new fiber. + (void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable { auto& gossiper = gms::get_local_gossiper(); return gossiper.handle_ack_msg(from, std::move(msg)); }).handle_exception([] (auto ep) { @@ -358,7 +360,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { return messaging_service::no_wait(); }); ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) { - smp::submit_to(0, [msg = std::move(msg)] () mutable { + // In a new fiber. + (void)smp::submit_to(0, [msg = std::move(msg)] () mutable { return gms::get_local_gossiper().handle_ack2_msg(std::move(msg)); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep); @@ -371,7 +374,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { }); }); ms().register_gossip_shutdown([] (inet_address from) { - smp::submit_to(0, [from] { + // In a new fiber. + (void)smp::submit_to(0, [from] { return gms::get_local_gossiper().handle_shutdown_msg(from); }).handle_exception([] (auto ep) { logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep); @@ -525,7 +529,7 @@ void gossiper::remove_endpoint(inet_address endpoint) { // do subscribers first so anything in the subscriber that depends on gossiper state won't get confused // We can not run on_remove callbacks here becasue on_remove in // storage_service might take the gossiper::timer_callback_lock - seastar::async([this, endpoint] { + (void)seastar::async([this, endpoint] { _subscribers.for_each([endpoint] (auto& subscriber) { subscriber->on_remove(endpoint); }); @@ -606,7 +610,8 @@ future gossiper::lock_endpoint(inet_address ep) { // - failure_detector // - on_remove callbacks, e.g, storage_service -> access token_metadata void gossiper::run() { - timer_callback_lock().then([this, g = this->shared_from_this()] { + // Run it in the background. + (void)timer_callback_lock().then([this, g = this->shared_from_this()] { return seastar::async([this, g] { logger.trace("=== Gossip round START"); @@ -653,13 +658,15 @@ void gossiper::run() { } logger.debug("Talk to {} live nodes: {}", nr_live_nodes, live_nodes); for (auto& ep: live_nodes) { - do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) { + // Do it in the background. + (void)do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) { logger.trace("Failed to do_gossip_to_live_member: {}", ep); }); } /* Gossip to some unreachable member with some probability to check if he is back up */ - do_gossip_to_unreachable_member(message).handle_exception([] (auto ep) { + // Do it in the background. + (void)do_gossip_to_unreachable_member(message).handle_exception([] (auto ep) { logger.trace("Faill to do_gossip_to_unreachable_member: {}", ep); }); @@ -682,7 +689,8 @@ void gossiper::run() { logger.trace("gossiped_to_seed={}, _live_endpoints.size={}, _seeds.size={}", _gossiped_to_seed, _live_endpoints.size(), _seeds.size()); if (!_gossiped_to_seed || _live_endpoints.size() < _seeds.size()) { - do_gossip_to_seed(message).handle_exception([] (auto ep) { + // Do it in the background. + (void)do_gossip_to_seed(message).handle_exception([] (auto ep) { logger.trace("Faill to do_gossip_to_seed: {}", ep); }); } @@ -1294,7 +1302,8 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { local_state.mark_dead(); msg_addr id = get_msg_addr(addr); logger.trace("Sending a EchoMessage to {}", id); - ms().send_gossip_echo(id).then([this, addr] { + // Do it in the background. + (void)ms().send_gossip_echo(id).then([this, addr] { logger.trace("Got EchoMessage Reply"); set_last_processed_message_at(); return seastar::async([this, addr] { @@ -1656,7 +1665,8 @@ future<> gossiper::do_shadow_round() { gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); auto id = get_msg_addr(seed); logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); - ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + // Do it in the background. + (void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); }); } diff --git a/init.cc b/init.cc index 22d2bb79a0..52e3a3c022 100644 --- a/init.cc +++ b/init.cc @@ -165,8 +165,11 @@ void init_ms_fd_gossiper(sharded& gossiper throw bad_configuration_error(); } gossiper.local().set_seeds(seeds); - gossiper.invoke_on_all([cluster_name](gms::gossiper& g) { + // Do it in the background. + (void)gossiper.invoke_on_all([cluster_name](gms::gossiper& g) { g.set_cluster_name(cluster_name); + }).handle_exception([] (std::exception_ptr e) { + startlog.error("Unexpected exception while setting cluster name: {}", e); }); } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 049193e537..7421e1ea00 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -623,7 +623,8 @@ shared_ptr messaging_service::ge assert(res.second); it = res.first; uint32_t src_cpu_id = engine().cpu_id(); - _rpc->make_client(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id, + // No reply is received, nothing to wait for. + (void)_rpc->make_client(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id, query::result_memory_limiter::maximum_result_size).handle_exception([ms = shared_from_this(), remote_addr, verb] (std::exception_ptr ep) { mlogger.debug("Failed to send client id to {} for verb {}: {}", remote_addr, std::underlying_type_t(verb), ep); }); @@ -649,7 +650,7 @@ bool messaging_service::remove_rpc_client_one(clients_map& clients, msg_addr id, // This will make sure messaging_service::stop() blocks until // client->stop() is over. // - client->stop().finally([id, client, ms = shared_from_this()] { + (void)client->stop().finally([id, client, ms = shared_from_this()] { mlogger.debug("dropped connection to {}", id.addr); }).discard_result(); found = true; diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 798d3b7571..554813b830 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -299,7 +299,8 @@ flat_mutation_reader read_context::create_reader( } void read_context::destroy_reader(shard_id shard, future reader_fut) noexcept { - with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable { + // Future is waited on indirectly in `stop()` (via `_dismantling_gate`). + (void)with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable { return reader_fut.then_wrapped([this, shard] (future&& reader_fut) { auto& rm = _readers[shard]; @@ -331,10 +332,12 @@ future<> read_context::stop() { auto pr = promise<>(); auto fut = pr.get_future(); auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close(); - gate_fut.then([this] { + // Forwarded to `fut`. + (void)gate_fut.then([this] { for (shard_id shard = 0; shard != smp::count; ++shard) { if (_readers[shard].state == reader_state::saving) { - _db.invoke_on(shard, [schema = global_schema_ptr(_schema), rm = std::move(_readers[shard])] (database& db) mutable { + // Move to the background. + (void)_db.invoke_on(shard, [schema = global_schema_ptr(_schema), rm = std::move(_readers[shard])] (database& db) mutable { // We cannot use semaphore() here, as this can be already destroyed. auto& table = db.find_column_family(schema); table.read_concurrency_semaphore().unregister_inactive_read(std::move(*rm.handle)); diff --git a/repair/repair.cc b/repair/repair.cc index a0bbb64bf9..596640e98b 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -776,7 +776,8 @@ static future<> repair_cf_range(repair_info& ri, completion.enter(); auto leave = defer([&completion] { completion.leave(); }); - when_all(checksums.begin(), checksums.end()).then( + // Do it in the background. + (void)when_all(checksums.begin(), checksums.end()).then( [&ri, &cf, range, &neighbors, &success] (std::vector> checksums) { // If only some of the replicas of this range are alive, @@ -1412,7 +1413,8 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, repair_results.push_back(std::move(f)); } - when_all(repair_results.begin(), repair_results.end()).then([id, fail = std::move(fail)] (std::vector> results) mutable { + // Do it in the background. + (void)when_all(repair_results.begin(), repair_results.end()).then([id, fail = std::move(fail)] (std::vector> results) mutable { if (std::any_of(results.begin(), results.end(), [] (auto&& f) { return f.failed(); })) { rlogger.info("repair {} failed", id); } else { diff --git a/repair/row_level.cc b/repair/row_level.cc index 4013ae9867..4c4f05c1ad 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1995,7 +1995,8 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed repair_init_messaging_service_handler(repair_service& rs, distributed repair_init_messaging_service_handler(repair_service& rs, distributed schema_registry_entry::start_loading(async_schema_loader load auto sf = _schema_promise.get_shared_future(); _state = state::LOADING; slogger.trace("Loading {}", _version); - f.then_wrapped([self = shared_from_this(), this] (future&& f) { + // Move to background. + (void)f.then_wrapped([self = shared_from_this(), this] (future&& f) { _loader = {}; if (_state != state::LOADING) { slogger.trace("Loading of {} aborted", _version); @@ -223,7 +224,8 @@ future<> schema_registry_entry::maybe_sync(std::function()> syncer) { }); auto sf = _synced_promise.get_shared_future(); _sync_state = schema_registry_entry::sync_state::SYNCING; - f.then_wrapped([this, self = shared_from_this()] (auto&& f) { + // Move to background. + (void)f.then_wrapped([this, self = shared_from_this()] (auto&& f) { if (_sync_state != sync_state::SYNCING) { return; } diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 56991aad5c..85a3392348 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -96,7 +96,8 @@ void migration_manager::init_messaging_service() auto& ms = netw::get_local_messaging_service(); ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector m) { auto src = netw::messaging_service::get_source(cinfo); - do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { + // Start a new fiber. + (void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { return service::get_local_migration_manager().merge_schema_from(src, mutations); }).then_wrapped([src] (auto&& f) { if (f.failed()) { diff --git a/service/misc_services.cc b/service/misc_services.cc index 7047dc7e34..0a455d05c8 100644 --- a/service/misc_services.cc +++ b/service/misc_services.cc @@ -106,7 +106,8 @@ void cache_hitrate_calculator::recalculate_timer() { void cache_hitrate_calculator::run_on(size_t master, lowres_clock::duration d) { if (!_stopped) { - _me.invoke_on(master, [d] (cache_hitrate_calculator& local) { + // Do it in the background. + (void)_me.invoke_on(master, [d] (cache_hitrate_calculator& local) { local._timer.arm(d); }).handle_exception_type([] (seastar::no_sharded_instance_exception&) { /* ignore */ }); } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 84fe40504b..6165359b82 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -405,7 +405,8 @@ public: ++stats().throttled_base_writes; tracing::trace(trace, "Delaying user write due to view update backlog {}/{} by {}us", backlog.current, backlog.max, delay.count()); - sleep_abortable(delay).finally([self = shared_from_this(), on_resume = std::forward(on_resume)] { + // Waited on indirectly. + (void)sleep_abortable(delay).finally([self = shared_from_this(), on_resume = std::forward(on_resume)] { --self->stats().throttled_base_writes; on_resume(self.get()); }).handle_exception_type([] (const seastar::sleep_aborted& ignored) { }); @@ -1636,7 +1637,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo } } - f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) { + // Waited on indirectly. + (void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) { ++stats.writes_errors.get_ep_stat(coordinator); p->got_failure_response(response_id, coordinator, forward_size + 1, std::nullopt); try { @@ -2476,9 +2478,11 @@ protected: data_resolver_ptr data_resolver = ::make_shared(_schema, cl, _targets.size(), timeout); auto exec = shared_from_this(); - make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{}); + // Waited on indirectly. + (void)make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{}); - data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { + // Waited on indirectly. + (void)data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { try { f.get(); auto rr_opt = data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here @@ -2495,7 +2499,8 @@ protected: // wait for write to complete before returning result to prevent multiple concurrent read requests to // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum // another read had returned a newer value (but the newer value had not yet been sent to the other replicas) - _proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then([this, result = std::move(result)] () mutable { + // Waited on indirectly. + (void)_proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then([this, result = std::move(result)] () mutable { _result_promise.set_value(std::move(result)); on_read_resolved(); }).handle_exception([this, exec] (std::exception_ptr eptr) { @@ -2560,11 +2565,13 @@ public: db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout); auto exec = shared_from_this(); - make_requests(digest_resolver, timeout).finally([exec]() { + // Waited on indirectly. + (void)make_requests(digest_resolver, timeout).finally([exec]() { // hold on to executor until all queries are complete }); - digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future>, bool> f) mutable { + // Waited on indirectly. + (void)digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future>, bool> f) mutable { bool background_repair_check = false; try { exec->got_cl(); @@ -2598,7 +2605,8 @@ public: exec->on_read_resolved(); } - digest_resolver->done().then([exec, digest_resolver, timeout, background_repair_check] () mutable { + // Waited on indirectly. + (void)digest_resolver->done().then([exec, digest_resolver, timeout, background_repair_check] () mutable { if (background_repair_check && !digest_resolver->digests_match()) { exec->_proxy->_stats.read_repair_repaired_background++; exec->_result_promise = promise>>(); @@ -2661,7 +2669,8 @@ public: return make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true); } }; - send_request(resolver->has_data()).finally([exec = shared_from_this()]{}); + // Waited on indirectly. + (void)send_request(resolver->has_data()).finally([exec = shared_from_this()]{}); } }); auto& sr = _schema->speculative_retry(); diff --git a/sstables/data_consume_context.hh b/sstables/data_consume_context.hh index a2bac14f9f..7ea99ec5a5 100644 --- a/sstables/data_consume_context.hh +++ b/sstables/data_consume_context.hh @@ -127,7 +127,8 @@ public: ~data_consume_context() { if (_ctx) { auto f = _ctx->close(); - f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)](auto) {}); + // Can't wait on the future in the destructor. + (void)f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)](auto) {}); } } diff --git a/sstables/random_access_reader.hh b/sstables/random_access_reader.hh index 7ab499c550..a26a37b186 100644 --- a/sstables/random_access_reader.hh +++ b/sstables/random_access_reader.hh @@ -46,7 +46,9 @@ public: void seek(uint64_t pos) { if (_in) { - seastar::with_gate(_close_gate, [in = std::move(_in)]() mutable { + // Future is waited on indirectly in `close()` (via `_close_gate`). + // FIXME: error handling + (void)seastar::with_gate(_close_gate, [in = std::move(_in)]() mutable { auto fut = in->close(); return fut.then([in = std::move(in)] {}); }); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 82b05b412e..20522458d5 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2847,13 +2847,15 @@ int sstable::compare_by_max_timestamp(const sstable& other) const { sstable::~sstable() { if (_index_file) { - _index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) { + // Registered as background job. + (void)_index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) { sstlog.warn("sstable close index_file failed: {}", ep); general_disk_error(); }); } if (_data_file) { - _data_file.close().handle_exception([save = _data_file, op = background_jobs().start()] (auto ep) { + // Registered as background job. + (void)_data_file.close().handle_exception([save = _data_file, op = background_jobs().start()] (auto ep) { sstlog.warn("sstable close data_file failed: {}", ep); general_disk_error(); }); @@ -2870,7 +2872,7 @@ sstable::~sstable() { // FIXME: // - Longer term fix is to hand off deletion of sstables to a manager that can // deal with sstable marked to be deleted after the corresponding object is destructed. - unlink().handle_exception( + (void)unlink().handle_exception( [op = background_jobs().start()] (std::exception_ptr eptr) { try { std::rethrow_exception(eptr); diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index b969705a78..3a7ddf751e 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -113,7 +113,7 @@ void stream_manager::remove_stream(UUID plan_id) { _initiated_streams.erase(plan_id); _receiving_streams.erase(plan_id); // FIXME: Do not ignore the future - remove_progress_on_all_shards(plan_id).handle_exception([plan_id] (auto ep) { + (void)remove_progress_on_all_shards(plan_id).handle_exception([plan_id] (auto ep) { sslog.info("stream_manager: Fail to remove progress for plan_id={}: {}", plan_id, ep); }); } diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index d1fe11ed2d..e1e4fa0b02 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -138,7 +138,8 @@ future do_send_mutations(lw_shared_ptr si, frozen_mut return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); - netw::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented, si->reason).then([si, fm_size] { + // Do it in the background. + (void)netw::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented, si->reason).then([si, fm_size] { sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); si->mutations_done.signal(); diff --git a/table.cc b/table.cc index 14833041e9..26440f5248 100644 --- a/table.cc +++ b/table.cc @@ -1196,7 +1196,7 @@ table::on_compaction_completion(const std::vector& new rebuild_statistics(); // This is done in the background, so we can consider this compaction completed. - seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] { + (void)seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] { return with_semaphore(_sstable_deletion_sem, 1, [this, sstables_to_remove = std::move(sstables_to_remove)] { return sstables::delete_atomically(sstables_to_remove).then_wrapped([this, sstables_to_remove] (future<> f) { std::exception_ptr eptr; diff --git a/thrift/handler.cc b/thrift/handler.cc index 786517645a..ee453e68c2 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -129,7 +129,7 @@ with_cob(thrift_fn::function&& cob, thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - futurize>::apply([func = std::forward(func)] { + (void)futurize>::apply([func = std::forward(func)] { return noexcept_movable::wrap(func()); }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (auto&& f) { try { @@ -147,7 +147,7 @@ with_cob(thrift_fn::function&& cob, thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - futurize::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) { + (void)futurize::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) { try { f.get(); cob(); @@ -162,7 +162,7 @@ template void with_exn_cob(thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - futurize::apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) { + (void)futurize::apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) { try { f.get(); } catch (...) { diff --git a/thrift/server.cc b/thrift/server.cc index ebeaef213a..bf2c2c2787 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -230,12 +230,14 @@ thrift_server::do_accepts(int which, bool keepalive) { if (_stop_gate.is_closed()) { return; } - with_gate(_stop_gate, [&, this] { + // Future is waited on indirectly in `stop()` (via `_stop_gate`). + (void)with_gate(_stop_gate, [&, this] { return _listeners[which].accept().then([this, which, keepalive] (accept_result ar) { auto&& [fd, addr] = ar; fd.set_nodelay(true); fd.set_keepalive(keepalive); - with_gate(_stop_gate, [&, this] { + // Future is waited on indirectly in `stop()` (via `_stop_gate`). + (void)with_gate(_stop_gate, [&, this] { return do_with(connection(*this, std::move(fd), addr), [this] (auto& conn) { return conn.process().then_wrapped([this, &conn] (future<> f) { conn.shutdown(); @@ -262,7 +264,8 @@ void thrift_server::maybe_retry_accept(int which, bool keepalive, std::exception }; auto retry_with_backoff = [&] { // FIXME: Consider using exponential backoff - sleep(1ms).then([retry = std::move(retry)] { retry(); }); + // Done in the background. + (void)sleep(1ms).then([retry = std::move(retry)] { retry(); }); }; try { std::rethrow_exception(std::move(ex)); diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index 459597b3a9..df8a066ac9 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -209,7 +209,8 @@ future<> trace_keyspace_helper::start() { } void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { - with_gate(_pending_writes, [this, records = std::move(records)] { + // Future is waited on indirectly in `stop()` (via `_pending_writes`). + (void)with_gate(_pending_writes, [this, records = std::move(records)] { auto num_records = records->size(); return this->flush_one_session_mutations(std::move(records)).finally([this, num_records] { _local_tracing.write_complete(num_records); }); }).handle_exception([this] (auto ep) { diff --git a/transport/server.cc b/transport/server.cc index 3c7eb7e9f0..0b995a1d08 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -243,7 +243,8 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) { auto conn = make_shared(*this, server_addr, std::move(fd), std::move(addr)); ++_connects; ++_connections; - conn->process().then_wrapped([this, conn] (future<> f) { + // Move connection to the background, monitor for lifetime and errors. + (void)conn->process().then_wrapped([this, conn] (future<> f) { --_connections; try { f.get(); diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index 5566fcbc9c..388af052d2 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -580,7 +580,9 @@ private: } // Reload all those which value needs to be reloaded. - with_gate(_timer_reads_gate, [this] { + // Future is waited on indirectly in `stop()` (via `_timer_reads_gate`). + // FIXME: error handling + (void)with_gate(_timer_reads_gate, [this] { auto to_reload = boost::copy_range>(_lru_list | boost::adaptors::filtered([this] (ts_value_lru_entry& lru_entry) { return lru_entry.timestamped_value().loaded() + _refresh < loading_cache_clock_type::now(); diff --git a/utils/loading_shared_values.hh b/utils/loading_shared_values.hh index c3147134b2..8b97192e61 100644 --- a/utils/loading_shared_values.hh +++ b/utils/loading_shared_values.hh @@ -234,7 +234,8 @@ public: _set.insert(*e); // get_shared_future() may throw, so make sure to call it before invoking the loader(key) f = e->loaded().get_shared_future(); - futurize_apply([&] { return loader(key); }).then_wrapped([e](future&& val_fut) mutable { + // Future indirectly forwarded to `e`. + (void)futurize_apply([&] { return loader(key); }).then_wrapped([e](future&& val_fut) mutable { if (val_fut.failed()) { e->loaded().set_exception(val_fut.get_exception()); } else {