As requested in #22104, moved the files and fixed other includes and the build system.

Moved files:
- combine.hh
- collection_mutation.hh
- collection_mutation.cc
- converting_mutation_partition_applier.hh
- converting_mutation_partition_applier.cc
- counters.hh
- counters.cc
- timestamp.hh

Fixes: #22104

This is a cleanup, no need to backport.

Closes scylladb/scylladb#25085

/*
 * Modified by ScyllaDB
 * Copyright (C) 2023-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "db/hints/internal/hint_sender.hh"
|
|
|
|
// Seastar features.
|
|
#include <chrono>
|
|
#include <exception>
|
|
#include <seastar/core/abort_source.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/file.hh>
|
|
#include <seastar/core/file-types.hh>
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/sleep.hh>
|
|
#include <seastar/core/format.hh>
|
|
#include <seastar/core/seastar.hh>
|
|
|
|
// Scylla includes.
|
|
#include "db/hints/internal/common.hh"
|
|
#include "db/hints/internal/hint_logger.hh"
|
|
#include "db/hints/internal/hint_endpoint_manager.hh"
|
|
#include "db/hints/manager.hh"
|
|
#include "db/hints/resource_manager.hh"
|
|
#include "gms/gossiper.hh"
|
|
#include "gms/inet_address.hh"
|
|
#include "replica/database.hh"
|
|
#include "schema/schema_fwd.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "utils/div_ceil.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include "mutation/converting_mutation_partition_applier.hh"
|
|
#include "gc_clock.hh"
|
|
|
|
// STD.
|
|
#include <ranges>
|
|
#include <span>
|
|
#include <stdexcept>
|
|
#include <string_view>
|
|
|
|
namespace db::hints {
namespace internal {

class no_column_mapping : public std::out_of_range {
public:
    no_column_mapping(const table_schema_version& id)
        : std::out_of_range(format("column mapping for CF schema_version {} is missing", id))
    {}
};

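// Flush the in-memory hints for this endpoint to disk once the flush period
// has elapsed. Flush errors are logged at debug level and swallowed; the next
// iteration of the sender loop will simply try again.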
future<> hint_sender::flush_maybe() noexcept {
    auto current_time = clock::now();
    if (current_time >= _next_flush_tp) {
        return _ep_manager.flush_current_hints().then([this, current_time] {
            _next_flush_tp = current_time + manager::hints_flush_period;
        }).handle_exception([this] (auto eptr) {
            manager_logger.debug("hint_sender[{}]:flush_maybe: Failed with {}", _ep_key, eptr);
            return make_ready_future<>();
        });
    }
    return make_ready_future<>();
}

future<timespec> hint_sender::get_last_file_modification(const sstring& fname) {
    return open_file_dma(fname, open_flags::ro).then([] (file f) {
        return do_with(std::move(f), [] (file& f) {
            return f.stat();
        });
    }).then([] (struct stat st) {
        return make_ready_future<timespec>(st.st_mtim);
    });
}

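// Hints may be sent if the target endpoint is alive, or if it has permanently
// left the ring - in the latter case the hints are redirected to the current
// replicas of each key instead (see send_one_mutation()).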
bool hint_sender::can_send() noexcept {
    if (stopping() && !draining()) {
        return false;
    }

    const auto tmptr = _shard_manager._proxy.get_token_metadata_ptr();

    if (_gossiper.is_alive(_ep_key)) {
        _state.remove(state::ep_state_left_the_ring);
        return true;
    } else {
        if (!_state.contains(state::ep_state_left_the_ring)) {
            _state.set_if<state::ep_state_left_the_ring>(!tmptr->is_normal_token_owner(_ep_key));
        }
        // If the node is not part of the ring, we will send hints to all new replicas.
        return _state.contains(state::ep_state_left_the_ring);
    }
}

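// Deserialize one hint into a frozen mutation. If the schema version stored
// with the hint differs from the table's current schema, re-apply the
// mutation onto the current schema using the persisted column mapping.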
frozen_mutation_and_schema hint_sender::get_mutation(lw_shared_ptr<send_one_file_ctx> ctx_ptr, fragmented_temporary_buffer& buf) {
    hint_entry_reader hr(buf);
    auto& fm = hr.mutation();
    auto& cm = get_column_mapping(std::move(ctx_ptr), fm, hr);
    auto schema = _db.find_schema(fm.column_family_id());

    if (schema->version() != fm.schema_version()) {
        mutation m(schema, fm.decorated_key(*schema));
        converting_mutation_partition_applier v(cm, *schema, m.partition());
        fm.partition().accept(cm, v);
        return {freeze(m), std::move(schema)};
    }
    return {std::move(hr).mutation(), std::move(schema)};
}

const column_mapping& hint_sender::get_column_mapping(lw_shared_ptr<send_one_file_ctx> ctx_ptr, const frozen_mutation& fm, const hint_entry_reader& hr) {
    auto cm_it = ctx_ptr->schema_ver_to_column_mapping.find(fm.schema_version());
    if (cm_it == ctx_ptr->schema_ver_to_column_mapping.end()) {
        if (!hr.get_column_mapping()) {
            throw no_column_mapping(fm.schema_version());
        }

        manager_logger.trace("hint_sender[{}]:get_column_mapping: new schema version {}", _ep_key, fm.schema_version());
        cm_it = ctx_ptr->schema_ver_to_column_mapping.emplace(fm.schema_version(), *hr.get_column_mapping()).first;
    }

    return cm_it->second;
}

hint_sender::hint_sender(hint_endpoint_manager& parent, service::storage_proxy& local_storage_proxy, replica::database& local_db, const gms::gossiper& local_gossiper) noexcept
    : _stopped(make_ready_future<>())
    , _ep_key(parent.end_point_key())
    , _ep_manager(parent)
    , _shard_manager(_ep_manager._shard_manager)
    , _resource_manager(_shard_manager._resource_manager)
    , _proxy(local_storage_proxy)
    , _db(local_db)
    , _hints_cpu_sched_group(_db.get_streaming_scheduling_group())
    , _gossiper(local_gossiper)
    , _file_update_mutex(_ep_manager.file_update_mutex())
{}

hint_sender::hint_sender(const hint_sender& other, hint_endpoint_manager& parent) noexcept
    : _stopped(make_ready_future<>())
    , _ep_key(parent.end_point_key())
    , _ep_manager(parent)
    , _shard_manager(_ep_manager._shard_manager)
    , _resource_manager(_shard_manager._resource_manager)
    , _proxy(other._proxy)
    , _db(other._db)
    , _hints_cpu_sched_group(other._hints_cpu_sched_group)
    , _gossiper(other._gossiper)
    , _file_update_mutex(_ep_manager.file_update_mutex())
{}

hint_sender::~hint_sender() {
    dismiss_replay_waiters();
}

future<> hint_sender::stop(drain should_drain) noexcept {
    return seastar::async([this, should_drain] {
        set_stopping();
        _stop_as.request_abort();
        _stopped.get();

        if (should_drain == drain::yes) {
            // "Draining" is performed by the following sequence of calls:
            // set_draining() -> send_hints_maybe() -> flush_current_hints() -> send_hints_maybe()
            //
            // Before hint_sender::stop() is called, the storing path for this endpoint is blocked,
            // so no new hints are generated while this method is running.
            //
            // send_hints_maybe() in "draining" mode sends all hints from the segments in
            // _segments_to_replay.
            //
            // Therefore, after the first call to send_hints_maybe(), _segments_to_replay becomes
            // empty, and the following flush_current_hints() stores all in-memory hints to disk
            // and re-populates _segments_to_replay.
            //
            // The next call to send_hints_maybe() sends the last hints to the current endpoint.
            // Once it is done, there are no more pending hints and the corresponding hints
            // directory may be removed.
            manager_logger.info("hint_sender[{}]:stop: Draining starts", end_point_key());
            set_draining();
            send_hints_maybe();
            _ep_manager.flush_current_hints().handle_exception([this] (auto e) {
                manager_logger.error("hint_sender[{}]:stop: Failed to flush pending hints: {}. Ignoring", _ep_key, e);
            }).get();
            send_hints_maybe();
            manager_logger.info("hint_sender[{}]:stop: Draining finished", end_point_key());
        }

        manager_logger.debug("hint_sender[{}]:stop: Finished", end_point_key());
    });
}

void hint_sender::cancel_draining() {
    manager_logger.info("hint_sender[{}]:cancel_draining: Marking as canceled", _ep_key);
    if (_state.contains(state::draining)) {
        _state.remove(state::draining);
    }
    _state.set(state::canceled_draining);
}

void hint_sender::add_segment(sstring seg_name) {
    _segments_to_replay.emplace_back(std::move(seg_name));
}

void hint_sender::add_foreign_segment(sstring seg_name) {
    _foreign_segments_to_replay.emplace_back(std::move(seg_name));
}

hint_sender::clock::duration hint_sender::next_sleep_duration() const {
    clock::time_point current_time = clock::now();
    clock::time_point next_flush_tp = std::max(_next_flush_tp, current_time);
    clock::time_point next_retry_tp = std::max(_next_send_retry_tp, current_time);

    clock::duration d = std::min(next_flush_tp, next_retry_tp) - current_time;

    // Don't sleep for less than 10 ticks of the "clock" if we are planning to sleep at all - the sleep() function is not perfect.
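    // For example, d.count() == 27 rounds up to clock::duration(30):
    // div_ceil(27, 10) == 3, and 10 * 3 == 30.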
    return clock::duration(10 * div_ceil(d.count(), 10));
}

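// Launch the background sender fiber in the hints scheduling group. The fiber
// alternates between flushing pending hints and replaying segments until
// stop() requests an abort; _stopped resolves when the fiber exits.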
void hint_sender::start() {
    seastar::thread_attributes attr;

    attr.sched_group = _hints_cpu_sched_group;
    _stopped = seastar::async(std::move(attr), [this] {
        manager_logger.debug("hint_sender[{}]:start: Starting", end_point_key());

        while (!stopping()) {
            try {
                flush_maybe().get();
                send_hints_maybe();

                // If we got here, it means that either there are no more hints to send or we
                // failed to send the hints we have. In both cases it makes sense to wait a
                // little before continuing.
                sleep_abortable(next_sleep_duration(), _stop_as).get();
            } catch (seastar::sleep_aborted&) {
                break;
            } catch (...) {
                // log and keep on spinning
                manager_logger.debug("hint_sender[{}]:start: Exception in the loop: {}", _ep_key, std::current_exception());
            }
        }

        manager_logger.debug("hint_sender[{}]:start: Exited the loop", _ep_key);
    });
}

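// Send a single hinted mutation. If the original target is still a natural
// replica for the key (and not leaving the ring), send the hint to it
// directly; otherwise the ring has changed, and the mutation is re-sent to
// all current replicas of the key.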
future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) {
    auto ermp = _db.find_column_family(m.s).get_effective_replication_map();
    auto token = dht::get_token(*m.s, m.fm.key());
    host_id_vector_replica_set natural_endpoints = ermp->get_natural_replicas(token);
    host_id_vector_topology_change pending_endpoints = ermp->get_pending_replicas(token);

    return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints, &pending_endpoints] () mutable -> future<> {
        // The fact that we send with CL::ALL in both cases below ensures that new hints are not going
        // to be generated as a result of hints sending.
        const auto& tm = ermp->get_token_metadata();
        const auto dst = end_point_key();

        if (std::ranges::contains(natural_endpoints, dst) && !tm.is_leaving(dst)) {
            manager_logger.trace("hint_sender[{}]:send_one_mutation: Sending directly", dst);
            // dst is not duplicated in pending_endpoints because it's in natural_endpoints
            return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), dst, std::move(pending_endpoints));
        } else {
            if (manager_logger.is_enabled(log_level::trace)) {
                if (tm.is_leaving(end_point_key())) {
                    manager_logger.trace("hint_sender[{}]:send_one_mutation: Original target is leaving. Mutating from scratch", dst);
                } else {
                    manager_logger.trace("hint_sender[{}]:send_one_mutation: Endpoint set has changed and original target is no longer a replica. Mutating from scratch", dst);
                }
            }
            return _proxy.send_hint_to_all_replicas(std::move(m));
        }
    });
}

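// Send one hint read from a segment file: acquire send units from the
// resource manager, drop the hint if it may already be past the table's
// gc_grace_seconds, and record the outcome (success or failure) in the
// shared per-file context under the given replay position.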
future<> hint_sender::send_one_hint(lw_shared_ptr<send_one_file_ctx> ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) {
    return _resource_manager.get_send_units_for(buf.size_bytes()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable {
        ctx_ptr->mark_hint_as_in_progress(rp);

        // Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`).
        auto h = ctx_ptr->file_send_gate.hold();
        (void)std::invoke([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable {
            try {
                auto m = this->get_mutation(ctx_ptr, buf);
                gc_clock::duration gc_grace_sec = m.s->gc_grace_seconds();

                // The hint is too old - drop it.
                //
                // Files are aggregated for at most manager::hints_flush_period, therefore the oldest
                // hint in a file is at most manager::hints_flush_period older than the file's last
                // modification time.
                if (const auto now = gc_clock::now().time_since_epoch(); now - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
                    manager_logger.trace("hint_sender[{}]:send_one_hint: Hint is too old, skipping it, "
                        "secs since file last modification {}, gc_grace_sec {}, hints_flush_period {}",
                        _ep_key, now - secs_since_file_mod, gc_grace_sec, manager::hints_flush_period);
                    return make_ready_future<>();
                }

                const auto mutation_size = m.fm.representation().size();
                return this->send_one_mutation(std::move(m)).then([this, ctx_ptr, mutation_size] {
                    ++this->shard_stats().sent_total;
                    this->shard_stats().sent_hints_bytes_total += mutation_size;
                }).handle_exception([this, ctx_ptr] (auto eptr) {
                    manager_logger.trace("hint_sender[{}]:send_one_hint: Failed to send: {}", end_point_key(), eptr);
                    ++this->shard_stats().send_errors;
                    return make_exception_future<>(std::move(eptr));
                });

            // Ignore these errors and move on - probably this hint is too old and the KS/CF has been deleted...
            } catch (replica::no_such_column_family& e) {
                manager_logger.debug("hint_sender[{}]:send_one_hint: no_such_column_family: {}", _ep_key, e.what());
                ++this->shard_stats().discarded;
            } catch (replica::no_such_keyspace& e) {
                manager_logger.debug("hint_sender[{}]:send_one_hint: no_such_keyspace: {}", _ep_key, e.what());
                ++this->shard_stats().discarded;
            } catch (no_column_mapping& e) {
                manager_logger.debug("hint_sender[{}]:send_one_hint: no_column_mapping: {} at {}: {}", _ep_key, fname, rp, e.what());
                ++this->shard_stats().discarded;
            } catch (...) {
                auto eptr = std::current_exception();
                manager_logger.debug("hint_sender[{}]:send_one_hint: Unexpected error in file {} at {}: {}", _ep_key, fname, rp, eptr);
                ++this->shard_stats().send_errors;
                return make_exception_future<>(std::move(eptr));
            }
            return make_ready_future<>();
        }).then_wrapped([this, units = std::move(units), rp, ctx_ptr, h = std::move(h)] (future<>&& f) {
            // Information about the error was already printed somewhere higher.
            // We just need to account in the ctx that sending of this hint has failed.
            if (!f.failed()) {
                ctx_ptr->on_hint_send_success(rp);
                auto new_bound = ctx_ptr->get_replayed_bound();
                // Segments from other shards are replayed first and are considered to be "before" replay position 0.
                // Update the sent upper bound only if it is a local segment.
                if (new_bound.shard_id() == this_shard_id() && _sent_upper_bound_rp < new_bound) {
                    _sent_upper_bound_rp = new_bound;
                    notify_replay_waiters();
                }
            } else {
                ctx_ptr->on_hint_send_failure(rp);
            }
            f.ignore_ready_future();
        });
    }).handle_exception([this, ctx_ptr, rp] (auto eptr) {
        manager_logger.trace("hint_sender[{}]:send_one_hint: Exception occurred: {}", _ep_key, eptr);
        ctx_ptr->on_hint_send_failure(rp);
    });
}

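// Resolve the promises of all waiters whose target replay position has been
// passed by _sent_upper_bound_rp. While foreign segments are still pending,
// no local replay-position bound is meaningful yet, so nobody is notified.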
void hint_sender::notify_replay_waiters() noexcept {
    if (!_foreign_segments_to_replay.empty()) {
        manager_logger.trace("hint_sender[{}]:notify_replay_waiters: Not notifying because there are still {} foreign segments to replay",
            end_point_key(), _foreign_segments_to_replay.size());
        return;
    }

    manager_logger.trace("hint_sender[{}]:notify_replay_waiters: Replay position upper bound was updated to {}", end_point_key(), _sent_upper_bound_rp);
    while (!_replay_waiters.empty() && _replay_waiters.begin()->first < _sent_upper_bound_rp) {
        manager_logger.trace("hint_sender[{}]:notify_replay_waiters: Notifying one ({} < {})",
            end_point_key(), _replay_waiters.begin()->first, _sent_upper_bound_rp);

        auto ptr = _replay_waiters.begin()->second;
        (**ptr).set_value();
        (*ptr) = std::nullopt; // Prevent it from being resolved by abort source subscription
        _replay_waiters.erase(_replay_waiters.begin());
    }
}

void hint_sender::dismiss_replay_waiters() noexcept {
    for (auto& p : _replay_waiters) {
        manager_logger.debug("hint_sender[{}]:dismiss_replay_waiters: Dismissing one", end_point_key());
        auto ptr = p.second;
        (**ptr).set_exception(std::runtime_error(format("Hints manager for {} is stopping", end_point_key())));
        (*ptr) = std::nullopt; // Prevent it from being resolved by abort source subscription
    }
    _replay_waiters.clear();
}

future<> hint_sender::wait_until_hints_are_replayed_up_to(abort_source& as, db::replay_position up_to_rp) {
    manager_logger.debug("hint_sender[{}]:wait_until_hints_are_replayed_up_to: Entering with target {}", end_point_key(), up_to_rp);
    if (_foreign_segments_to_replay.empty() && up_to_rp < _sent_upper_bound_rp) {
        manager_logger.debug("hint_sender[{}]:wait_until_hints_are_replayed_up_to: Hints were already replayed above the point ({} < {})",
            end_point_key(), up_to_rp, _sent_upper_bound_rp);
        return make_ready_future<>();
    }

    if (as.abort_requested()) {
        manager_logger.debug("hint_sender[{}]:wait_until_hints_are_replayed_up_to: Already aborted - stopping", end_point_key());
        return make_exception_future<>(abort_requested_exception());
    }

    auto ptr = make_lw_shared<std::optional<promise<>>>(promise<>());
    auto it = _replay_waiters.emplace(up_to_rp, ptr);
    auto sub = as.subscribe([this, ptr, it] () noexcept {
        if (!ptr->has_value()) {
            // The promise was already resolved by `notify_replay_waiters` and removed from the map.
            return;
        }
        manager_logger.debug("hint_sender[{}]:wait_until_hints_are_replayed_up_to: Abort requested - stopping", end_point_key());
        _replay_waiters.erase(it);
        (**ptr).set_exception(abort_requested_exception());
    });

    // When the future resolves, the endpoint manager is not guaranteed to exist anymore,
    // therefore we cannot capture `this`.
    auto ep = end_point_key();
    return (**ptr).get_future().finally([sub = std::move(sub), ep] {
        manager_logger.debug("hint_sender[{}]:wait_until_hints_are_replayed_up_to: Returning after the future was satisfied", ep);
    });
}

void hint_sender::send_one_file_ctx::mark_hint_as_in_progress(db::replay_position rp) {
    in_progress_rps.insert(rp);
}

void hint_sender::send_one_file_ctx::on_hint_send_success(db::replay_position rp) noexcept {
    in_progress_rps.erase(rp);
    if (!last_succeeded_rp || *last_succeeded_rp < rp) {
        last_succeeded_rp = rp;
    }
}

void hint_sender::send_one_file_ctx::on_hint_send_failure(db::replay_position rp) noexcept {
    in_progress_rps.erase(rp);
    segment_replay_failed = true;
    if (!first_failed_rp || rp < *first_failed_rp) {
        first_failed_rp = rp;
    }
}

db::replay_position hint_sender::send_one_file_ctx::get_replayed_bound() const noexcept {
    // We are sure that all hints were sent _below_ the position which is the minimum of the following:
    // - Position of the first hint that failed to be sent in this replay (first_failed_rp),
    // - Position of the last hint which was successfully sent (last_succeeded_rp, inclusive bound),
    // - Position of the lowest hint which is being currently sent (in_progress_rps.begin()).
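    //
    // For example (illustrative values): with no failed hints,
    // last_succeeded_rp->pos == 10, and the lowest in-progress position == 7,
    // the candidate bound is 11 (the inclusive bound bumped by one), but the
    // hint still in flight at position 7 lowers the final bound to 7 -
    // nothing can be guaranteed at or above a hint that is still being sent.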

    db::replay_position rp;
    if (first_failed_rp) {
        rp = *first_failed_rp;
    } else if (last_succeeded_rp) {
        // It is always true that `first_failed_rp` <= `last_succeeded_rp`, so no need to compare
        rp = *last_succeeded_rp;
        // We replayed _up to_ `last_succeeded_rp`, so the bound is not strict; we can increase `pos` by one
        rp.pos++;
    }

    if (!in_progress_rps.empty() && *in_progress_rps.begin() < rp) {
        rp = *in_progress_rps.begin();
    }

    return rp;
}

void hint_sender::rewind_sent_replay_position_to(db::replay_position rp) {
    _sent_upper_bound_rp = rp;
    notify_replay_waiters();
}

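// Replay one segment file. Returns true if the segment is done with and was
// deleted (sent in full, or dropped as corrupted), and false if the replay
// should be retried later or draining was canceled.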
// Runs in a seastar::async context
bool hint_sender::send_one_file(const sstring& fname) {
    timespec last_mod = get_last_file_modification(fname).get();
    gc_clock::duration secs_since_file_mod = std::chrono::seconds(last_mod.tv_sec);
    lw_shared_ptr<send_one_file_ctx> ctx_ptr = make_lw_shared<send_one_file_ctx>(_last_schema_ver_to_column_mapping);

    struct canceled_draining_exception {};

    try {
        commitlog::read_log_file(fname, manager::FILENAME_PREFIX, [this, secs_since_file_mod, &fname, ctx_ptr] (commitlog::buffer_and_replay_position buf_rp) -> future<> {
            auto& buf = buf_rp.buffer;
            auto& rp = buf_rp.position;

            while (true) {
                // Check that we can still send the next hint. Don't try to send it if the destination host
                // is DOWN or if we have already failed to send some of the previous hints.
                if (!draining() && ctx_ptr->segment_replay_failed) {
                    co_return;
                }

                if (canceled_draining()) {
                    manager_logger.debug("hint_sender[{}]:send_one_file: Exiting reading from commitlog because of canceled draining", _ep_key);
                    // We need to throw an exception here to cancel reading the segment.
                    throw canceled_draining_exception{};
                }

                // Break early if stop() was called or the destination node went down.
                if (!can_send()) {
                    ctx_ptr->segment_replay_failed = true;
                    co_return;
                }

                co_await flush_maybe();

                if (utils::get_local_injector().enter("hinted_handoff_pause_hint_replay")) {
                    // We cannot send the hint because hint replay is paused.
                    // Sleep 100ms and do the whole loop again.
                    //
                    // Jumping to the beginning of the loop makes sure that
                    // - We regularly check if we should stop - so that we won't
                    //   get stuck in shutdown.
                    // - flush_maybe() is called regularly - so that new segments
                    //   are created and we help enforce the "at most 10s worth of
                    //   hints in a segment".
                    co_await sleep(std::chrono::milliseconds(100));
                    continue;
                } else {
                    co_await send_one_hint(ctx_ptr, std::move(buf), rp, secs_since_file_mod, fname);
                    break;
                }
            }
        }, _last_not_complete_rp.pos, &_db.extensions()).get();
    } catch (db::commitlog::segment_error& ex) {
        manager_logger.error("hint_sender[{}]:send_one_file: Segment error in {}: {}. Last not complete position={}",
            _ep_key, fname, ex.what(), _last_not_complete_rp);
        ctx_ptr->segment_replay_failed = false;
        ++this->shard_stats().corrupted_files;
    } catch (const canceled_draining_exception&) {
        manager_logger.debug("hint_sender[{}]:send_one_file: Loop in send_one_file finishes due to canceled draining", _ep_key);
    } catch (...) {
        manager_logger.debug("hint_sender[{}]:send_one_file: Sending of {} failed: {}. Last not complete position={}",
            _ep_key, fname, std::current_exception(), _last_not_complete_rp);
        ctx_ptr->segment_replay_failed = true;
    }

    // Wait until all background hint sends have completed.
    ctx_ptr->file_send_gate.close().get();

    // If draining was canceled, we cannot say anything about the segment's state,
    // so return false immediately to keep the segment.
    if (canceled_draining()) {
        return false;
    }

    // If we are draining, ignore failures and drop the segment even if we failed to send it.
    if (draining() && ctx_ptr->segment_replay_failed) {
        manager_logger.debug("hint_sender[{}]:send_one_file: We are draining, so we are going to delete the segment anyway", _ep_key);
        ctx_ptr->segment_replay_failed = false;
    }

    // Update the next iteration's replay position if needed.
    if (ctx_ptr->segment_replay_failed) {
        // If some hints failed to be sent, first_failed_rp will tell the position of the first such hint.
        // If there was an error thrown by the read_log_file function itself, we will retry sending from
        // the last hint that was successfully sent (last_succeeded_rp).
        _last_not_complete_rp = ctx_ptr->first_failed_rp.value_or(ctx_ptr->last_succeeded_rp.value_or(_last_not_complete_rp));
        manager_logger.debug("hint_sender[{}]:send_one_file: Error while sending hints from {}, last RP is {}", _ep_key, fname, _last_not_complete_rp);
        return false;
    }

    // If we got here, we are done with the current segment and we can remove it.
    with_shared(_file_update_mutex, [&fname, this] {
        auto p = _ep_manager.get_or_load().get();
        return p->delete_segments({ fname });
    }).get();

    // Clear the replay position - we are going to send the next segment...
    _last_not_complete_rp = replay_position();
    _last_schema_ver_to_column_mapping.clear();
    manager_logger.debug("hint_sender[{}]:send_one_file: Segment {} has been sent in full and deleted", _ep_key, fname);
    return true;
}

const sstring* hint_sender::name_of_current_segment() const {
    // Foreign segments are replayed first
    if (!_foreign_segments_to_replay.empty()) {
        return &_foreign_segments_to_replay.front();
    }
    if (!_segments_to_replay.empty()) {
        return &_segments_to_replay.front();
    }
    return nullptr;
}

void hint_sender::pop_current_segment() {
    if (!_foreign_segments_to_replay.empty()) {
        _foreign_segments_to_replay.pop_front();
    } else if (!_segments_to_replay.empty()) {
        _segments_to_replay.pop_front();
    }
}

// Runs in the seastar::async context
void hint_sender::send_hints_maybe() noexcept {
    using namespace std::literals::chrono_literals;
    manager_logger.trace("hint_sender[{}]:send_hints_maybe: Going to send hints. We have {} segments to replay",
        end_point_key(), _segments_to_replay.size() + _foreign_segments_to_replay.size());

    int replayed_segments_count = 0;

    try {
        while (true) {
            if (canceled_draining()) {
                manager_logger.debug("hint_sender[{}]:send_hints_maybe: Exiting loop in send_hints_maybe because of canceled draining", _ep_key);
                break;
            }
            const sstring* seg_name = name_of_current_segment();
            if (!seg_name || !replay_allowed() || !can_send()) {
                break;
            }
            if (!send_one_file(*seg_name)) {
                break;
            }
            pop_current_segment();
            ++replayed_segments_count;

            notify_replay_waiters();
        }

    // Ignore exceptions, we will retry sending this file from where we left off the next time.
    // Exceptions are not expected here during regular operation, so just log them.
    } catch (...) {
        manager_logger.debug("hint_sender[{}]:send_hints_maybe: Exception occurred while sending: {}", _ep_key, std::current_exception());
    }

    if (have_segments()) {
        // TODO: come up with something more sophisticated here
        _next_send_retry_tp = clock::now() + 1s;
    } else {
        // If there are no segments to send, we want to retry when we may have some (after flushing).
        _next_send_retry_tp = _next_flush_tp;
    }

    manager_logger.debug("hint_sender[{}]:send_hints_maybe: We handled {} segments", _ep_key, replayed_segments_count);
}

hint_stats& hint_sender::shard_stats() {
    return _shard_manager._stats;
}

bool hint_sender::replay_allowed() const noexcept {
    return _ep_manager.replay_allowed();
}

} // namespace internal
} // namespace db::hints