mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
storage_proxy: add hints manager for views
This commit adds a separate hints manager that serves only failed materialized view updates.
This commit is contained in:
@@ -633,7 +633,9 @@ void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep
|
||||
}
|
||||
|
||||
storage_proxy::~storage_proxy() {}
|
||||
storage_proxy::storage_proxy(distributed<database>& db, stdx::optional<std::vector<sstring>> hinted_handoff_enabled) : _db(db) {
|
||||
storage_proxy::storage_proxy(distributed<database>& db, stdx::optional<std::vector<sstring>> hinted_handoff_enabled)
|
||||
: _db(db)
|
||||
, _hints_for_views_manager(_db.local().get_config().data_file_directories()[0] + "/view_pending_updates", {}, _db.local().get_config().max_hint_window_in_ms(), _hints_resource_manager, _db) {
|
||||
namespace sm = seastar::metrics;
|
||||
_metrics.add_group(COORDINATOR_STATS_CATEGORY, {
|
||||
sm::make_histogram("read_latency", sm::description("The general read latency histogram"), [this]{ return _stats.estimated_read.get_histogram(16, 20);}),
|
||||
@@ -737,6 +739,9 @@ storage_proxy::storage_proxy(distributed<database>& db, stdx::optional<std::vect
|
||||
_hints_manager.emplace(cfg.hints_directory(), *hinted_handoff_enabled, cfg.max_hint_window_in_ms(), _hints_resource_manager, _db);
|
||||
|
||||
_hints_manager->register_metrics("hints_manager");
|
||||
_hints_for_views_manager.register_metrics("hints_for_views_manager");
|
||||
_hints_resource_manager.register_manager(*_hints_manager);
|
||||
_hints_resource_manager.register_manager(_hints_for_views_manager);
|
||||
}
|
||||
|
||||
storage_proxy::rh_entry::rh_entry(shared_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {}
|
||||
@@ -1791,8 +1796,9 @@ template<typename Range>
|
||||
size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept
|
||||
{
|
||||
if (hints_enabled(type)) {
|
||||
return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state)] (gms::inet_address target) mutable -> bool {
|
||||
return _hints_manager->store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state);
|
||||
db::hints::manager& hints_manager = hints_manager_for(type);
|
||||
return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool {
|
||||
return hints_manager.store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state);
|
||||
});
|
||||
} else {
|
||||
return 0;
|
||||
@@ -3644,6 +3650,10 @@ bool storage_proxy::hints_enabled(db::write_type type) noexcept {
|
||||
return _hints_enabled_for_user_writes || (type == db::write_type::VIEW && bool(_hints_manager));
|
||||
}
|
||||
|
||||
db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) {
|
||||
return type == db::write_type::VIEW ? _hints_for_views_manager : *_hints_manager;
|
||||
}
|
||||
|
||||
future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
|
||||
slogger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname);
|
||||
|
||||
@@ -4433,17 +4443,11 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s,
|
||||
}
|
||||
|
||||
future<> storage_proxy::start_hints_manager(shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr) {
|
||||
if (_hints_manager) {
|
||||
return _hints_manager->start(shared_from_this(), std::move(gossiper_ptr), std::move(ss_ptr));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return _hints_resource_manager.start(shared_from_this(), gossiper_ptr, ss_ptr);
|
||||
}
|
||||
|
||||
future<> storage_proxy::stop_hints_manager() {
|
||||
if (_hints_manager) {
|
||||
return _hints_manager->stop();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return _hints_resource_manager.stop();
|
||||
}
|
||||
|
||||
future<>
|
||||
|
||||
@@ -151,6 +151,7 @@ private:
|
||||
circular_buffer<response_id_type> _throttled_writes;
|
||||
db::hints::resource_manager _hints_resource_manager;
|
||||
stdx::optional<db::hints::manager> _hints_manager;
|
||||
db::hints::manager _hints_for_views_manager;
|
||||
bool _hints_enabled_for_user_writes = false;
|
||||
stats _stats;
|
||||
static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
|
||||
@@ -181,6 +182,7 @@ private:
|
||||
template<typename Range>
|
||||
bool cannot_hint(const Range& targets, db::write_type type);
|
||||
bool hints_enabled(db::write_type type) noexcept;
|
||||
db::hints::manager& hints_manager_for(db::write_type type);
|
||||
std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token);
|
||||
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token);
|
||||
db::read_repair_decision new_read_repair_decision(const schema& s);
|
||||
|
||||
Reference in New Issue
Block a user