mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-08 16:03:20 +00:00
Merge 'database: Add error message with mutation info on commit log apply failure' from Calle Wilund
Fixes #9408 While it is rare, some customer issues have shown that we can run into cases where commit log apply (writing mutations to it) fails badly. In the known cases, due to oversized mutations. While these should have been caught earlier in the call chain really, it would probably help both end users and us (trying to figure out how they got so big and how they got so far) iff we added info to the errors thrown (and printed), such as ks, cf, and mutation content. Somewhat controversial, this makes the apply with CL decision path coroutinized, mainly to be able to do the error handling for the more informative wrapper exception easier/less ugly. Could perhaps do with futurize_invoke + then_wrapper also. But future is coroutines... This is as stated somewhat problematic, it adds an allocation to perf_simple_query::write path (because of crap clang cr frame folding?). However, tasks/op remain constant and actual tps (though unstable) remain more or less the same (on my crappy measurements). Counter path is unaffected, as coroutine frame alloc replaces with(...) dtest for the wrapped exception on separate pr. Closes #9412 * github.com:scylladb/scylla: database: Add error message with mutation info on commit log apply failure database: coroutinize do_apply and apply_with_commitlog
This commit is contained in:
@@ -1726,39 +1726,32 @@ future<mutation> database::apply_counter_update(schema_ptr s, const frozen_mutat
|
||||
}));
|
||||
}
|
||||
|
||||
static future<> maybe_handle_reorder(std::exception_ptr exp) {
|
||||
try {
|
||||
std::rethrow_exception(exp);
|
||||
return make_exception_future(exp);
|
||||
} catch (mutation_reordered_with_truncate_exception&) {
|
||||
// This mutation raced with a truncate, so we can just drop it.
|
||||
dblog.debug("replay_position reordering detected");
|
||||
return make_ready_future<>();
|
||||
}
|
||||
static void throw_commitlog_add_error(schema_ptr s, const frozen_mutation& m) {
|
||||
// it is tempting to do a full pretty print here, but the mutation is likely
|
||||
// humungous if we got an error, so just tell us where and pk...
|
||||
std::throw_with_nested(std::runtime_error(format("Could not write mutation {}:{} ({}) to commitlog"
|
||||
, s->ks_name(), s->cf_name()
|
||||
, m.key()
|
||||
)));
|
||||
}
|
||||
|
||||
future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout) {
|
||||
db::rp_handle h;
|
||||
if (cf.commitlog() != nullptr && cf.durable_writes()) {
|
||||
return do_with(freeze(m), [this, &m, &cf, timeout] (frozen_mutation& fm) {
|
||||
auto fm = freeze(m);
|
||||
try {
|
||||
commitlog_entry_writer cew(m.schema(), fm, db::commitlog::force_sync::no);
|
||||
return cf.commitlog()->add_entry(m.schema()->id(), cew, timeout);
|
||||
}).then([this, &m, &cf, timeout] (db::rp_handle h) {
|
||||
return apply_in_memory(m, cf, std::move(h), timeout).handle_exception(maybe_handle_reorder);
|
||||
});
|
||||
h = co_await cf.commitlog()->add_entry(m.schema()->id(), cew, timeout);
|
||||
} catch (...) {
|
||||
throw_commitlog_add_error(cf.schema(), fm);
|
||||
}
|
||||
}
|
||||
return apply_in_memory(m, cf, {}, timeout);
|
||||
}
|
||||
|
||||
future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::UUID uuid, const frozen_mutation& m, db::timeout_clock::time_point timeout,
|
||||
db::commitlog::force_sync sync) {
|
||||
auto cl = cf.commitlog();
|
||||
if (cl != nullptr && cf.durable_writes()) {
|
||||
commitlog_entry_writer cew(s, m, sync);
|
||||
return cf.commitlog()->add_entry(uuid, cew, timeout).then([&m, this, s, timeout, cl](db::rp_handle h) {
|
||||
return this->apply_in_memory(m, s, std::move(h), timeout).handle_exception(maybe_handle_reorder);
|
||||
});
|
||||
try {
|
||||
co_await apply_in_memory(m, cf, std::move(h), timeout);
|
||||
} catch (mutation_reordered_with_truncate_exception&) {
|
||||
// This mutation raced with a truncate, so we can just drop it.
|
||||
dblog.debug("replay_position reordering detected");
|
||||
}
|
||||
return apply_in_memory(m, std::move(s), {}, timeout);
|
||||
}
|
||||
|
||||
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
|
||||
@@ -1768,8 +1761,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra
|
||||
auto uuid = m.column_family_id();
|
||||
auto& cf = find_column_family(uuid);
|
||||
if (!s->is_synced()) {
|
||||
return make_exception_future<>(std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}",
|
||||
s->ks_name(), s->cf_name(), s->version())));
|
||||
throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
|
||||
sync = sync || db::commitlog::force_sync(s->wait_for_sync_to_commitlog());
|
||||
@@ -1777,16 +1769,30 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra
|
||||
// Signal to view building code that a write is in progress,
|
||||
// so it knows when new writes start being sent to a new view.
|
||||
auto op = cf.write_in_progress();
|
||||
if (cf.views().empty()) {
|
||||
return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally([op = std::move(op)] { });
|
||||
|
||||
row_locker::lock_holder lock;
|
||||
if (!cf.views().empty()) {
|
||||
lock = co_await cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore());
|
||||
}
|
||||
|
||||
// purposefully manually "inlined" apply_with_commitlog call here to reduce # coroutine
|
||||
// frames.
|
||||
db::rp_handle h;
|
||||
auto cl = cf.commitlog();
|
||||
if (cl != nullptr && cf.durable_writes()) {
|
||||
try {
|
||||
commitlog_entry_writer cew(s, m, sync);
|
||||
h = co_await cf.commitlog()->add_entry(uuid, cew, timeout);
|
||||
} catch (...) {
|
||||
throw_commitlog_add_error(s, m);
|
||||
}
|
||||
}
|
||||
try {
|
||||
co_await this->apply_in_memory(m, s, std::move(h), timeout);
|
||||
} catch (mutation_reordered_with_truncate_exception&) {
|
||||
// This mutation raced with a truncate, so we can just drop it.
|
||||
dblog.debug("replay_position reordering detected");
|
||||
}
|
||||
future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore());
|
||||
return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op), sync] (row_locker::lock_holder lock) mutable {
|
||||
return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout, sync).finally(
|
||||
// Hold the local lock on the base-table partition or row
|
||||
// taken before the read, until the update is done.
|
||||
[lock = std::move(lock), op = std::move(op)] { });
|
||||
});
|
||||
}
|
||||
|
||||
template<typename Future>
|
||||
|
||||
@@ -1364,7 +1364,6 @@ private:
|
||||
|
||||
friend class db_apply_executor;
|
||||
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync);
|
||||
future<> apply_with_commitlog(schema_ptr, column_family&, utils::UUID, const frozen_mutation&, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync);
|
||||
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
|
||||
|
||||
future<mutation> do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout,
|
||||
|
||||
Reference in New Issue
Block a user