diff --git a/replica/database.cc b/replica/database.cc index d4d5cc1505..e9e50b083d 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1726,39 +1726,32 @@ future database::apply_counter_update(schema_ptr s, const frozen_mutat })); } -static future<> maybe_handle_reorder(std::exception_ptr exp) { - try { - std::rethrow_exception(exp); - return make_exception_future(exp); - } catch (mutation_reordered_with_truncate_exception&) { - // This mutation raced with a truncate, so we can just drop it. - dblog.debug("replay_position reordering detected"); - return make_ready_future<>(); - } +static void throw_commitlog_add_error(schema_ptr s, const frozen_mutation& m) { + // it is tempting to do a full pretty print here, but the mutation is likely + // humungous if we got an error, so just tell us where and pk... + std::throw_with_nested(std::runtime_error(format("Could not write mutation {}:{} ({}) to commitlog" + , s->ks_name(), s->cf_name() + , m.key() + ))); } future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout) { + db::rp_handle h; if (cf.commitlog() != nullptr && cf.durable_writes()) { - return do_with(freeze(m), [this, &m, &cf, timeout] (frozen_mutation& fm) { + auto fm = freeze(m); + try { commitlog_entry_writer cew(m.schema(), fm, db::commitlog::force_sync::no); - return cf.commitlog()->add_entry(m.schema()->id(), cew, timeout); - }).then([this, &m, &cf, timeout] (db::rp_handle h) { - return apply_in_memory(m, cf, std::move(h), timeout).handle_exception(maybe_handle_reorder); - }); + h = co_await cf.commitlog()->add_entry(m.schema()->id(), cew, timeout); + } catch (...) { + throw_commitlog_add_error(cf.schema(), fm); + } } - return apply_in_memory(m, cf, {}, timeout); -} - -future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::UUID uuid, const frozen_mutation& m, db::timeout_clock::time_point timeout, - db::commitlog::force_sync sync) { - auto cl = cf.commitlog(); - if (cl != nullptr && cf.durable_writes()) { - commitlog_entry_writer cew(s, m, sync); - return cf.commitlog()->add_entry(uuid, cew, timeout).then([&m, this, s, timeout, cl](db::rp_handle h) { - return this->apply_in_memory(m, s, std::move(h), timeout).handle_exception(maybe_handle_reorder); - }); + try { + co_await apply_in_memory(m, cf, std::move(h), timeout); + } catch (mutation_reordered_with_truncate_exception&) { + // This mutation raced with a truncate, so we can just drop it. + dblog.debug("replay_position reordering detected"); } - return apply_in_memory(m, std::move(s), {}, timeout); } future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) { @@ -1768,8 +1761,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); if (!s->is_synced()) { - return make_exception_future<>(std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}", - s->ks_name(), s->cf_name(), s->version()))); + throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); } sync = sync || db::commitlog::force_sync(s->wait_for_sync_to_commitlog()); @@ -1777,16 +1769,30 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra // Signal to view building code that a write is in progress, // so it knows when new writes start being sent to a new view. auto op = cf.write_in_progress(); - if (cf.views().empty()) { - return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally([op = std::move(op)] { }); + + row_locker::lock_holder lock; + if (!cf.views().empty()) { + lock = co_await cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore()); + } + + // purposefully manually "inlined" apply_with_commitlog call here to reduce # coroutine + // frames. + db::rp_handle h; + auto cl = cf.commitlog(); + if (cl != nullptr && cf.durable_writes()) { + try { + commitlog_entry_writer cew(s, m, sync); + h = co_await cf.commitlog()->add_entry(uuid, cew, timeout); + } catch (...) { + throw_commitlog_add_error(s, m); + } + } + try { + co_await this->apply_in_memory(m, s, std::move(h), timeout); + } catch (mutation_reordered_with_truncate_exception&) { + // This mutation raced with a truncate, so we can just drop it. + dblog.debug("replay_position reordering detected"); } - future f = cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore()); - return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op), sync] (row_locker::lock_holder lock) mutable { - return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally( - // Hold the local lock on the base-table partition or row - // taken before the read, until the update is done. - [lock = std::move(lock), op = std::move(op)] { }); - }); } template diff --git a/replica/database.hh b/replica/database.hh index abc386d886..dcc1c0b712 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1364,7 +1364,6 @@ private: friend class db_apply_executor; future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync); - future<> apply_with_commitlog(schema_ptr, column_family&, utils::UUID, const frozen_mutation&, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync); future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout); future do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout,