From 63ea666ca00d6cc9be2b1e3109d714d38a1dcbaf Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 29 Sep 2021 14:28:28 +0000 Subject: [PATCH 1/2] database: coroutinize do_apply and apply_with_commitlog Somewhat controversial. This makes the apply-with-commitlog decision path a coroutine, mainly so that the next patch can make error handling more informative (because we will have exceptions that are immediate and/or futurized). As stated, this is somewhat problematic: it adds an allocation to the perf_simple_query::write path (possibly because clang does not fold the coroutine frame allocation?). However, tasks/op remains constant and actual tps (though unstable) remains more or less the same (in my rough measurements). The counter path is unaffected, as the coroutine frame allocation replaces the do_with(...) allocation, so the allocation count there is unchanged. I am hoping that the simpler error handling and more explicit code will compensate for the extra allocation. --- replica/database.cc | 67 +++++++++++++++++++-------------------------- replica/database.hh | 1 - 2 files changed, 28 insertions(+), 40 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index d4d5cc1505..c413a02bbb 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1726,41 +1726,21 @@ future database::apply_counter_update(schema_ptr s, const frozen_mutat })); } -static future<> maybe_handle_reorder(std::exception_ptr exp) { +future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout) { + db::rp_handle h; + if (cf.commitlog() != nullptr && cf.durable_writes()) { + auto fm = freeze(m); + commitlog_entry_writer cew(m.schema(), fm, db::commitlog::force_sync::no); + h = co_await cf.commitlog()->add_entry(m.schema()->id(), cew, timeout); + } try { - std::rethrow_exception(exp); - return make_exception_future(exp); + co_await apply_in_memory(m, cf, std::move(h), timeout); } catch (mutation_reordered_with_truncate_exception&) { // This mutation raced with a truncate, so we can just drop it. 
dblog.debug("replay_position reordering detected"); - return make_ready_future<>(); } } -future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout) { - if (cf.commitlog() != nullptr && cf.durable_writes()) { - return do_with(freeze(m), [this, &m, &cf, timeout] (frozen_mutation& fm) { - commitlog_entry_writer cew(m.schema(), fm, db::commitlog::force_sync::no); - return cf.commitlog()->add_entry(m.schema()->id(), cew, timeout); - }).then([this, &m, &cf, timeout] (db::rp_handle h) { - return apply_in_memory(m, cf, std::move(h), timeout).handle_exception(maybe_handle_reorder); - }); - } - return apply_in_memory(m, cf, {}, timeout); -} - -future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::UUID uuid, const frozen_mutation& m, db::timeout_clock::time_point timeout, - db::commitlog::force_sync sync) { - auto cl = cf.commitlog(); - if (cl != nullptr && cf.durable_writes()) { - commitlog_entry_writer cew(s, m, sync); - return cf.commitlog()->add_entry(uuid, cew, timeout).then([&m, this, s, timeout, cl](db::rp_handle h) { - return this->apply_in_memory(m, s, std::move(h), timeout).handle_exception(maybe_handle_reorder); - }); - } - return apply_in_memory(m, std::move(s), {}, timeout); -} - future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) { // I'm doing a nullcheck here since the init code path for db etc // is a little in flux and commitlog is created only when db is @@ -1768,8 +1748,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); if (!s->is_synced()) { - return make_exception_future<>(std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}", - s->ks_name(), s->cf_name(), s->version()))); + throw 
std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); } sync = sync || db::commitlog::force_sync(s->wait_for_sync_to_commitlog()); @@ -1777,16 +1756,26 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra // Signal to view building code that a write is in progress, // so it knows when new writes start being sent to a new view. auto op = cf.write_in_progress(); - if (cf.views().empty()) { - return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally([op = std::move(op)] { }); + + row_locker::lock_holder lock; + if (!cf.views().empty()) { + lock = co_await cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore()); + } + + // purposefully manually "inlined" apply_with_commitlog call here to reduce # coroutine + // frames. + db::rp_handle h; + auto cl = cf.commitlog(); + if (cl != nullptr && cf.durable_writes()) { + commitlog_entry_writer cew(s, m, sync); + h = co_await cf.commitlog()->add_entry(uuid, cew, timeout); + } + try { + co_await this->apply_in_memory(m, s, std::move(h), timeout); + } catch (mutation_reordered_with_truncate_exception&) { + // This mutation raced with a truncate, so we can just drop it. + dblog.debug("replay_position reordering detected"); } - future f = cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore()); - return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op), sync] (row_locker::lock_holder lock) mutable { - return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally( - // Hold the local lock on the base-table partition or row - // taken before the read, until the update is done. 
- [lock = std::move(lock), op = std::move(op)] { }); - }); } template diff --git a/replica/database.hh b/replica/database.hh index 0b0b35c919..8962716180 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1356,7 +1356,6 @@ private: friend class db_apply_executor; future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync); - future<> apply_with_commitlog(schema_ptr, column_family&, utils::UUID, const frozen_mutation&, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync); future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout); future do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout, From a6202ae079b44da44f402f50001bd2df1cb49673 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 29 Sep 2021 14:47:38 +0000 Subject: [PATCH 2/2] database: Add error message with mutation info on commit log apply failure Fixes #9408 While it is rare, some customer issues have shown that we can run into cases where commit log apply (writing mutations to it) fails badly. In the known cases, this was due to oversized mutations. While these should really have been caught earlier in the call chain, it would probably help both end users and us (trying to figure out how they got so big and how they got so far) if we added info to the errors thrown (and printed), such as the keyspace, the column family, and the mutation's partition key. 
--- replica/database.cc | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index c413a02bbb..e9e50b083d 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1726,12 +1726,25 @@ future database::apply_counter_update(schema_ptr s, const frozen_mutat })); } +static void throw_commitlog_add_error(schema_ptr s, const frozen_mutation& m) { + // it is tempting to do a full pretty print here, but the mutation is likely + // humungous if we got an error, so just tell us where and pk... + std::throw_with_nested(std::runtime_error(format("Could not write mutation {}:{} ({}) to commitlog" + , s->ks_name(), s->cf_name() + , m.key() + ))); +} + future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout) { db::rp_handle h; if (cf.commitlog() != nullptr && cf.durable_writes()) { auto fm = freeze(m); - commitlog_entry_writer cew(m.schema(), fm, db::commitlog::force_sync::no); - h = co_await cf.commitlog()->add_entry(m.schema()->id(), cew, timeout); + try { + commitlog_entry_writer cew(m.schema(), fm, db::commitlog::force_sync::no); + h = co_await cf.commitlog()->add_entry(m.schema()->id(), cew, timeout); + } catch (...) { + throw_commitlog_add_error(cf.schema(), fm); + } } try { co_await apply_in_memory(m, cf, std::move(h), timeout); @@ -1767,8 +1780,12 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra db::rp_handle h; auto cl = cf.commitlog(); if (cl != nullptr && cf.durable_writes()) { - commitlog_entry_writer cew(s, m, sync); - h = co_await cf.commitlog()->add_entry(uuid, cew, timeout); + try { + commitlog_entry_writer cew(s, m, sync); + h = co_await cf.commitlog()->add_entry(uuid, cew, timeout); + } catch (...) { + throw_commitlog_add_error(s, m); + } } try { co_await this->apply_in_memory(m, s, std::move(h), timeout);