mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-28 18:50:53 +00:00
Merge 'db/hints: Move the code for writing hints to a separate function' from Dawid Mędrek
In scylladb/scylladb@7301a96, in the function `hint_endpoint_manager::store_hint()`, we transformed the lambda passed to `seastar::with_gate()` to a coroutine lambda to improve the readability. However, there was a subtle problem related to lifetimes of the captures that needed to be addressed: * Since we started `co_await`ing in the lambda, the captures were at risk of being destructed too soon. The usual solution is to wrap a coroutine lambda within a `seastar::coroutine::lambda` object and rely on the extended lifetime enforced by the semantics of the language. See `docs/dev/lambda-coroutine-fiasco.md` for more context. * However, since we don't immediately `co_await` the future returned by `with_gate()`, we cannot rely on the extended lifetime provided by the wrapper. The document linked in the previous bullet point suggests keeping the passed coroutine lambda as a variable and pass it as a reference to `with_gate()`. However, that's not feasible either because we discard the returned future and the function returns almost instantly -- destructing every local object, which would encompass the lambda too. The solution used in the commit was to move captures of the lambda into the lambda's body. That helped because Seastar's backend is responsible for keeping all of the local variables alive until the lambda finishes its execution. However, we didn't move all of the captures into the lambda -- the missing one was the `this` pointer that was implicitly used in the lambda. Address sanitiser hasn't reported any bugs related to the pointer yet, but the bug is most likely there. In this commit, we transform the lambda's body into a new member function and only call it from the lambda. This way, we don't need to care about the lifetimes of the captures because Seastar ensures that the function's arguments stay alive until the coroutine finishes. Choosing this solution instead of assigning `this` to a pointer variable inside the lambda's body and using it to refer to the object's members has actual benefit: it's not possible to accidentally forget to refer to a member of the object via the pointer; it also makes the code less awkward. Fixes scylladb/scylladb#20306 Closes scylladb/scylladb#20258 * github.com:scylladb/scylladb: db/hints: Fix indentation in `do_store_hint()` db/hints: Move code for writing hints to separate function
This commit is contained in:
@@ -45,54 +45,51 @@ constexpr std::chrono::seconds HINT_FILE_WRITE_TIMEOUT = std::chrono::seconds(2)
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
future<> hint_endpoint_manager::do_store_hint(schema_ptr s, lw_shared_ptr<const frozen_mutation> fm, tracing::trace_state_ptr tr_state) {
|
||||
++_hints_in_progress;
|
||||
size_t mut_size = fm->representation().size();
|
||||
shard_stats().size_of_hints_in_progress += mut_size;
|
||||
|
||||
if (utils::get_local_injector().enter("slow_down_writing_hints")) {
|
||||
co_await seastar::sleep(std::chrono::seconds(10));
|
||||
}
|
||||
|
||||
try {
|
||||
const auto shared_lock = co_await get_shared_lock(file_update_mutex());
|
||||
|
||||
hints_store_ptr log_ptr = co_await get_or_load();
|
||||
commitlog_entry_writer cew(s, *fm, commitlog::force_sync::no);
|
||||
|
||||
rp_handle rh = co_await log_ptr->add_entry(s->id(), cew, db::timeout_clock::now() + HINT_FILE_WRITE_TIMEOUT);
|
||||
|
||||
const replay_position rp = rh.release();
|
||||
if (_last_written_rp < rp) {
|
||||
_last_written_rp = rp;
|
||||
manager_logger.debug("[{}] Updated last written replay position to {}", end_point_key(), rp);
|
||||
}
|
||||
|
||||
++shard_stats().written;
|
||||
|
||||
manager_logger.trace("Hint to {} was stored", end_point_key());
|
||||
tracing::trace(tr_state, "Hint to {} was stored", end_point_key());
|
||||
} catch (...) {
|
||||
++shard_stats().errors;
|
||||
const auto eptr = std::current_exception();
|
||||
|
||||
manager_logger.debug("store_hint(): got the exception when storing a hint to {}: {}", end_point_key(), eptr);
|
||||
tracing::trace(tr_state, "Failed to store a hint to {}: {}", end_point_key(), eptr);
|
||||
}
|
||||
|
||||
--_hints_in_progress;
|
||||
shard_stats().size_of_hints_in_progress -= mut_size;
|
||||
}
|
||||
|
||||
bool hint_endpoint_manager::store_hint(schema_ptr s, lw_shared_ptr<const frozen_mutation> fm, tracing::trace_state_ptr tr_state) noexcept {
|
||||
try {
|
||||
// Future is waited on indirectly in `stop()` (via `_store_gate`).
|
||||
(void) with_gate(_store_gate,
|
||||
[this, s_ = std::move(s), fm_ = std::move(fm), tr_state_ = tr_state] () mutable -> future<> {
|
||||
// We discard the future to this call to `with_gate()`, so wrapping this lambda
|
||||
// within `coroutine::lambda` will NOT work. That's why we need to copy these
|
||||
// variables by hand.
|
||||
auto s = std::move(s_);
|
||||
auto fm = std::move(fm_);
|
||||
auto tr_state = std::move(tr_state_);
|
||||
|
||||
++_hints_in_progress;
|
||||
size_t mut_size = fm->representation().size();
|
||||
shard_stats().size_of_hints_in_progress += mut_size;
|
||||
|
||||
if (utils::get_local_injector().enter("slow_down_writing_hints")) {
|
||||
co_await seastar::sleep(std::chrono::seconds(10));
|
||||
}
|
||||
|
||||
try {
|
||||
const auto shared_lock = co_await get_shared_lock(file_update_mutex());
|
||||
|
||||
hints_store_ptr log_ptr = co_await get_or_load();
|
||||
commitlog_entry_writer cew(s, *fm, commitlog::force_sync::no);
|
||||
|
||||
rp_handle rh = co_await log_ptr->add_entry(s->id(), cew, db::timeout_clock::now() + HINT_FILE_WRITE_TIMEOUT);
|
||||
|
||||
const replay_position rp = rh.release();
|
||||
if (_last_written_rp < rp) {
|
||||
_last_written_rp = rp;
|
||||
manager_logger.debug("[{}] Updated last written replay position to {}", end_point_key(), rp);
|
||||
}
|
||||
|
||||
++shard_stats().written;
|
||||
|
||||
manager_logger.trace("Hint to {} was stored", end_point_key());
|
||||
tracing::trace(tr_state, "Hint to {} was stored", end_point_key());
|
||||
} catch (...) {
|
||||
++shard_stats().errors;
|
||||
const auto eptr = std::current_exception();
|
||||
|
||||
manager_logger.debug("store_hint(): got the exception when storing a hint to {}: {}", end_point_key(), eptr);
|
||||
tracing::trace(tr_state, "Failed to store a hint to {}: {}", end_point_key(), eptr);
|
||||
}
|
||||
|
||||
--_hints_in_progress;
|
||||
shard_stats().size_of_hints_in_progress -= mut_size;
|
||||
[this, s = std::move(s), fm = std::move(fm), tr_state = tr_state] () mutable -> future<> {
|
||||
return do_store_hint(std::move(s), std::move(fm), tr_state);
|
||||
});
|
||||
} catch (...) {
|
||||
manager_logger.trace("Failed to store a hint to {}: {}", end_point_key(), std::current_exception());
|
||||
|
||||
@@ -175,6 +175,8 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
future<> do_store_hint(schema_ptr s, lw_shared_ptr<const frozen_mutation> fm, tracing::trace_state_ptr tr_state);
|
||||
|
||||
seastar::shared_mutex& file_update_mutex() noexcept {
|
||||
return _file_update_mutex;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user