Merge 'Separate hinted handoff manager for materialized views' from Piotr

"
This series introduces a separate hinted handoff manager for materialized views.

Steps:
 * decouple resource limits from hinted handoff, so multiple instances can share space
   and throughput limits in order to avoid internal fragmentation for every instance's
   reservations
 * add a subdirectory to data/, responsible for storing materialized view hints
 * decouple registering global metrics from hinted handoff constructor, now that there
   can be more than one instance - otherwise 'registering metrics twice' errors are going to occur
 * add a hints_for_views_manager to storage proxy and route failed view updates to use it
   instead of the original hints_manager
 * restore previous semantics for enabling/disabling hinted handoff - regular hinted handoff
   can be disabled or enabled just for specific datacenters without influencing materialized
   views flow
"

* 'separate_hh_for_mv_4' of https://github.com/psarna/scylla:
  storage_proxy: restore optional hinted handoff
  storage_proxy: add hints manager for views
  hints: decouple hints manager metrics from constructor
  db, config: add view_pending_updates directory
  hints: move space_watchdog to resource manager
  hints: move send limiter to resource manager
  hints: move constants to resource_manager
This commit is contained in:
Duarte Nunes
2018-06-04 12:03:59 +01:00
8 changed files with 457 additions and 233 deletions

View File

@@ -518,6 +518,7 @@ scylla_core = (['database.cc',
'db/commitlog/commitlog_replayer.cc',
'db/commitlog/commitlog_entry.cc',
'db/hints/manager.cc',
'db/hints/resource_manager.cc',
'db/config.cc',
'db/extensions.cc',
'db/heat_load_balance.cc',

View File

@@ -42,26 +42,26 @@ const std::string manager::FILENAME_PREFIX("HintsLog" + commitlog::descriptor::S
const std::chrono::seconds manager::hint_file_write_timeout = std::chrono::seconds(2);
const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10);
const std::chrono::seconds manager::space_watchdog::_watchdog_period = std::chrono::seconds(1);
// TODO: remove this when we switch to C++17
constexpr size_t manager::_max_hints_send_queue_length;
size_t db::hints::manager::max_shard_disk_space_size;
size_t db::hints::resource_manager::max_shard_disk_space_size;
manager::manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64_t max_hint_window_ms, distributed<database>& db)
manager::manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64_t max_hint_window_ms, resource_manager& res_manager, distributed<database>& db)
: _hints_dir(boost::filesystem::path(hints_directory) / format("{:d}", engine().cpu_id()).c_str())
, _hinted_dcs(hinted_dcs.begin(), hinted_dcs.end())
, _local_snitch_ptr(locator::i_endpoint_snitch::get_local_snitch_ptr())
, _max_hint_window_us(max_hint_window_ms * 1000)
, _local_db(db.local())
, _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, _max_hints_send_queue_length))
, _min_send_hint_budget(_max_send_in_flight_memory / _max_hints_send_queue_length)
, _send_limiter(_max_send_in_flight_memory)
, _space_watchdog(*this)
{
, _resource_manager(res_manager)
{}
manager::~manager() {
assert(_ep_managers.empty());
}
void manager::register_metrics(const sstring& group_name) {
namespace sm = seastar::metrics;
_metrics.add_group("hints_manager", {
_metrics.add_group(group_name, {
sm::make_gauge("size_of_hints_in_progress", _stats.size_of_hints_in_progress,
sm::description("Size of hinted mutations that are scheduled to be written.")),
@@ -79,10 +79,6 @@ manager::manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64
});
}
manager::~manager() {
assert(_ep_managers.empty());
}
future<> manager::start(shared_ptr<service::storage_proxy> proxy_ptr, shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr) {
_proxy_anchor = std::move(proxy_ptr);
_gossiper_anchor = std::move(gossiper_ptr);
@@ -95,8 +91,6 @@ future<> manager::start(shared_ptr<service::storage_proxy> proxy_ptr, shared_ptr
return get_ep_manager(ep).populate_segments_to_replay();
}).then([this] {
_strorage_service_anchor->register_subscriber(this);
// we are ready to store new hints...
_space_watchdog.start();
});
}
@@ -110,24 +104,42 @@ future<> manager::stop() {
_stopping = true;
return _draining_eps_gate.close().finally([this] {
return when_all(
parallel_for_each(_ep_managers, [] (auto& pair) {
return parallel_for_each(_ep_managers, [] (auto& pair) {
return pair.second.stop();
}),
_space_watchdog.stop()
).finally([this] {
}).finally([this] {
_ep_managers.clear();
manager_logger.info("Stopped");
}).discard_result();
});
}
/// Re-enables hint generation on every end point manager owned by this manager.
void manager::allow_hints() {
    for (auto& entry : _ep_managers) {
        entry.second.allow_hints();
    }
}
/// Disables hint generation on every end point manager owned by this manager.
void manager::forbid_hints() {
    for (auto& entry : _ep_managers) {
        entry.second.forbid_hints();
    }
}
void manager::forbid_hints_for_eps_with_pending_hints() {
manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints);
boost::for_each(_ep_managers, [this] (auto& pair) {
end_point_hints_manager& ep_man = pair.second;
if (has_ep_with_pending_hints(ep_man.end_point_key())) {
ep_man.forbid_hints();
} else {
ep_man.allow_hints();
}
});
}
bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr<const frozen_mutation> fm, tracing::trace_state_ptr tr_state) noexcept {
try {
with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable {
++_hints_in_progress;
size_t mut_size = fm->representation().size();
shard_stats().size_of_hints_in_progress += mut_size;
shard_resource_manager().inc_size_of_hints_in_progress(mut_size);
return with_shared(file_update_mutex(), [this, fm, s, tr_state] () mutable -> future<> {
return get_or_load().then([this, fm = std::move(fm), s = std::move(s), tr_state] (hints_store_ptr log_ptr) mutable {
@@ -148,6 +160,7 @@ bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr<co
}).finally([this, mut_size, fm, s] {
--_hints_in_progress;
shard_stats().size_of_hints_in_progress -= mut_size;
shard_resource_manager().dec_size_of_hints_in_progress(mut_size);
});;
});
} catch (...) {
@@ -280,8 +293,8 @@ future<db::commitlog> manager::end_point_hints_manager::add_store() noexcept {
commitlog::config cfg;
cfg.commit_log_location = _hints_dir.c_str();
cfg.commitlog_segment_size_in_mb = _hint_segment_size_in_mb;
cfg.commitlog_total_space_in_mb = _max_hints_per_ep_size_mb;
cfg.commitlog_segment_size_in_mb = resource_manager::hint_segment_size_in_mb;
cfg.commitlog_total_space_in_mb = resource_manager::max_hints_per_ep_size_mb;
cfg.fname_prefix = manager::FILENAME_PREFIX;
cfg.extensions = &_shard_manager.local_db().get_config().extensions();
@@ -420,122 +433,10 @@ const column_mapping& manager::end_point_hints_manager::sender::get_column_mappi
return cm_it->second;
}
manager::space_watchdog::space_watchdog(manager& shard_manager)
: _shard_manager(shard_manager)
, _timer([this] { on_timer(); })
{}
void manager::space_watchdog::start() {
_timer.arm(timer_clock_type::now());
}
future<> manager::space_watchdog::stop() noexcept {
try {
return _gate.close().finally([this] { _timer.cancel(); });
} catch (...) {
return make_exception_future<>(std::current_exception());
}
}
future<> manager::space_watchdog::scan_one_ep_dir(boost::filesystem::path path, ep_key_type ep_key) {
return lister::scan_dir(path, { directory_entry_type::regular }, [this, ep_key] (lister::path dir, directory_entry de) {
// Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory
if (_files_count == 1) {
_eps_with_pending_hints.emplace(ep_key);
}
++_files_count;
return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) {
_total_size += fsize;
});
});
}
void manager::space_watchdog::on_timer() {
with_gate(_gate, [this] {
return futurize_apply([this] {
_eps_with_pending_hints.clear();
_eps_with_pending_hints.reserve(_shard_manager._ep_managers.size());
_total_size = 0;
// The hints directories are organized as follows:
// <hints root>
// |- <shard1 ID>
// | |- <EP1 address>
// | |- <hints file1>
// | |- <hints file2>
// | |- ...
// | |- <EP2 address>
// | |- ...
// | |-...
// |- <shard2 ID>
// | |- ...
// ...
// |- <shardN ID>
// | |- ...
//
// This is a top level shard hints directory, let's enumerate per-end-point sub-directories...
return lister::scan_dir(_shard_manager._hints_dir, {directory_entry_type::directory}, [this] (lister::path dir, directory_entry de) {
_files_count = 0;
// Let's scan per-end-point directories and enumerate hints files...
//
// Let's check if there is a corresponding end point manager (may not exist if the corresponding DC is
// not hintable).
// If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply
// continue to enumeration - there is no one to change them.
auto it = _shard_manager.find_ep_manager(de.name);
if (it != _shard_manager.ep_managers_end()) {
return with_lock(it->second.file_update_mutex(), [this, dir = std::move(dir), ep_name = std::move(de.name)]() mutable {
return scan_one_ep_dir(dir / ep_name.c_str(), ep_key_type(ep_name));
});
} else {
return scan_one_ep_dir(dir / de.name.c_str(), ep_key_type(de.name));
}
}).then([this] {
// Adjust the quota to take into account the space we guarantee to every end point manager
size_t adjusted_quota = 0;
size_t delta = _shard_manager._ep_managers.size() * _hint_segment_size_in_mb * 1024 * 1024;
if (max_shard_disk_space_size > delta) {
adjusted_quota = max_shard_disk_space_size - delta;
}
bool can_hint = _total_size < adjusted_quota;
manager_logger.trace("space_watchdog: total_size ({}) {} max_shard_disk_space_size ({})", _total_size, can_hint ? "<" : ">=", adjusted_quota);
if (!can_hint) {
manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints);
std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [this] (auto& pair) {
end_point_hints_manager& ep_man = pair.second;
auto it = _eps_with_pending_hints.find(ep_man.end_point_key());
if (it != _eps_with_pending_hints.end()) {
ep_man.forbid_hints();
} else {
ep_man.allow_hints();
}
});
} else {
std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [] (auto& pair) {
pair.second.allow_hints();
});
}
});
}).handle_exception([this] (auto eptr) {
manager_logger.trace("space_watchdog: unexpected exception - stop all hints generators");
// Stop all hint generators if space_watchdog callback failed
std::for_each(_shard_manager._ep_managers.begin(), _shard_manager._ep_managers.end(), [this] (auto& pair) {
pair.second.forbid_hints();
});
}).finally([this] {
_timer.arm(_watchdog_period);
});
});
}
bool manager::too_many_in_flight_hints_for(ep_key_type ep) const noexcept {
// There is no need to check the DC here because if there is an in-flight hint for this end point then this means that
// its DC has already been checked and found to be ok.
return _stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us;
return _resource_manager.too_many_hints_in_progress() && !utils::fb_utilities::is_me(ep) && hints_in_progress_for(ep) > 0 && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us;
}
bool manager::can_hint_for(ep_key_type ep) const noexcept {
@@ -552,8 +453,8 @@ bool manager::can_hint_for(ep_key_type ep) const noexcept {
// hints is more than the maximum allowed value.
//
// In the worst case there's going to be (_max_size_of_hints_in_progress + N - 1) in-flight hints, where N is the total number Nodes in the cluster.
if (_stats.size_of_hints_in_progress > _max_size_of_hints_in_progress && hints_in_progress_for(ep) > 0) {
manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _stats.size_of_hints_in_progress, ep, hints_in_progress_for(ep));
if (_resource_manager.too_many_hints_in_progress() && hints_in_progress_for(ep) > 0) {
manager_logger.trace("size_of_hints_in_progress {} hints_in_progress_for({}) {}", _resource_manager.size_of_hints_in_progress(), ep, hints_in_progress_for(ep));
return false;
}
@@ -623,6 +524,7 @@ manager::end_point_hints_manager::sender::sender(end_point_hints_manager& parent
, _ep_key(parent.end_point_key())
, _ep_manager(parent)
, _shard_manager(_ep_manager._shard_manager)
, _resource_manager(_shard_manager._resource_manager)
, _proxy(local_storage_proxy)
, _db(local_db)
, _gossiper(local_gossiper)
@@ -634,6 +536,7 @@ manager::end_point_hints_manager::sender::sender(const sender& other, end_point_
, _ep_key(parent.end_point_key())
, _ep_manager(parent)
, _shard_manager(_ep_manager._shard_manager)
, _resource_manager(_shard_manager._resource_manager)
, _proxy(other._proxy)
, _db(other._db)
, _gossiper(other._gossiper)
@@ -720,14 +623,7 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(mutation m)
}
future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<send_one_file_ctx> ctx_ptr, temporary_buffer<char> buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) {
// Let's approximate the memory size the mutation is going to consume by the size of its serialized form
size_t hint_memory_budget = std::max(_shard_manager._min_send_hint_budget, buf.size());
// Allow a very big mutation to be sent out by consuming the whole shard budget
hint_memory_budget = std::min(hint_memory_budget, _shard_manager._max_send_in_flight_memory);
manager_logger.trace("memory budget: need {} have {}", hint_memory_budget, _shard_manager._send_limiter.available_units());
return get_units(_shard_manager._send_limiter, hint_memory_budget).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable {
return _resource_manager.get_send_units_for(buf.size()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable {
with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable {
try {
try {

View File

@@ -32,9 +32,11 @@
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/shared_mutex.hh>
#include "gms/gossiper.hh"
#include "locator/snitch_base.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "db/commitlog/commitlog.hh"
#include "utils/loading_shared_values.hh"
#include "db/hints/resource_manager.hh"
namespace service {
class storage_service;
@@ -61,6 +63,7 @@ private:
class drain_tag {};
using drain = seastar::bool_class<drain_tag>;
public:
class end_point_hints_manager {
public:
using key_type = gms::inet_address;
@@ -107,6 +110,7 @@ private:
key_type _ep_key;
end_point_hints_manager& _ep_manager;
manager& _shard_manager;
resource_manager& _resource_manager;
service::storage_proxy& _proxy;
database& _db;
gms::gossiper& _gossiper;
@@ -377,66 +381,22 @@ private:
struct stats& shard_stats() {
return _shard_manager._stats;
}
resource_manager& shard_resource_manager() {
return _shard_manager._resource_manager;
}
};
private:
using ep_key_type = typename end_point_hints_manager::key_type;
using ep_managers_map_type = std::unordered_map<ep_key_type, end_point_hints_manager>;
class space_watchdog {
private:
static const std::chrono::seconds _watchdog_period;
private:
std::unordered_set<ep_key_type> _eps_with_pending_hints;
size_t _total_size = 0;
manager& _shard_manager;
seastar::gate _gate;
seastar::timer<timer_clock_type> _timer;
int _files_count = 0;
public:
space_watchdog(manager& shard_manager);
future<> stop() noexcept;
void start();
private:
/// \brief Check that hints don't occupy too much disk space.
///
/// Verifies that the whole \ref manager::_hints_dir occupies less than \ref manager::max_shard_disk_space_size.
///
/// If it does, stop all end point managers that have more than one hints file - we don't want some DOWN Node to
/// prevent hints to other Nodes from being generated (e.g. due to some temporary overload and timeout).
///
/// This is a simplistic implementation of a manager for a limited shared resource with a minimum guarantied share for all
/// participants.
///
/// This implementation guaranties at least a single hint share for all end point managers.
void on_timer();
/// \brief Scan files in a single end point directory.
///
/// Add sizes of files in the directory to _total_size. If number of files is greater than 1 add this end point ID
/// to _eps_with_pending_hints so that we may block it if _total_size value becomes greater than the maximum allowed
/// value.
///
/// \param path directory to scan
/// \param ep_name end point ID (as a string)
/// \return future that resolves when scanning is complete
future<> scan_one_ep_dir(boost::filesystem::path path, ep_key_type ep_name);
};
public:
static const std::string FILENAME_PREFIX;
static const std::chrono::seconds hints_flush_period;
static const std::chrono::seconds hint_file_write_timeout;
static size_t max_shard_disk_space_size;
private:
static constexpr uint64_t _max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB
static constexpr size_t _hint_segment_size_in_mb = 32;
static constexpr size_t _max_hints_per_ep_size_mb = 128; // 4 files 32MB each
static constexpr size_t _max_hints_send_queue_length = 128;
const boost::filesystem::path _hints_dir;
node_to_hint_store_factory_type _store_factory;
@@ -450,20 +410,17 @@ private:
bool _stopping = false;
seastar::gate _draining_eps_gate; // gate used to control the progress of ep_managers stopping not in the context of manager::stop() call
// Limit the maximum size of in-flight (being sent) hints by 10% of the shard memory.
// Also don't allow more than 128 in-flight hints to limit the collateral memory consumption as well.
const size_t _max_send_in_flight_memory;
const size_t _min_send_hint_budget;
seastar::semaphore _send_limiter;
resource_manager& _resource_manager;
space_watchdog _space_watchdog;
ep_managers_map_type _ep_managers;
stats _stats;
seastar::metrics::metric_groups _metrics;
std::unordered_set<ep_key_type> _eps_with_pending_hints;
public:
manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64_t max_hint_window_ms, distributed<database>& db);
manager(sstring hints_directory, std::vector<sstring> hinted_dcs, int64_t max_hint_window_ms, resource_manager&res_manager, distributed<database>& db);
virtual ~manager();
void register_metrics(const sstring& group_name);
future<> start(shared_ptr<service::storage_proxy> proxy_ptr, shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr);
future<> stop();
bool store_hint(gms::inet_address ep, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm, tracing::trace_state_ptr tr_state) noexcept;
@@ -510,6 +467,32 @@ public:
return it->second.hints_in_progress();
}
/// \brief Record that the given end point has pending hints.
///
/// Inserts \p key into _eps_with_pending_hints; inserting a key that is already present has no effect.
///
/// \param key end point to record
void add_ep_with_pending_hints(ep_key_type key) {
_eps_with_pending_hints.insert(key);
}
/// \brief Forget all end points previously recorded as having pending hints.
///
/// After clearing, reserves room for one entry per currently known end point manager.
void clear_eps_with_pending_hints() {
_eps_with_pending_hints.clear();
_eps_with_pending_hints.reserve(_ep_managers.size());
}
/// \brief Check whether the given end point was recorded as having pending hints.
///
/// \param key end point to look up
/// \return true if \p key is present in _eps_with_pending_hints (the count() result is converted to bool)
bool has_ep_with_pending_hints(ep_key_type key) const {
return _eps_with_pending_hints.count(key);
}
/// \return the number of end point managers currently held by this manager
size_t ep_managers_size() const {
return _ep_managers.size();
}
/// \return the directory this manager stores its hints in (_hints_dir)
const boost::filesystem::path& hints_dir() const {
return _hints_dir;
}
void allow_hints();
void forbid_hints();
void forbid_hints_for_eps_with_pending_hints();
static future<> rebalance() {
// TODO
return make_ready_future<>();
@@ -528,10 +511,6 @@ private:
return _store_factory;
}
const boost::filesystem::path& hints_dir() const {
return _hints_dir;
}
service::storage_proxy& local_storage_proxy() const noexcept {
return *_proxy_anchor;
}
@@ -558,7 +537,7 @@ private:
/// \param endpoint node that left the cluster
void drain_for(gms::inet_address endpoint);
private:
public:
ep_managers_map_type::iterator find_ep_manager(ep_key_type ep_key) noexcept {
return _ep_managers.find(ep_key);
}

View File

@@ -0,0 +1,179 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "resource_manager.hh"
#include "manager.hh"
#include "log.hh"
#include <boost/range/algorithm/for_each.hpp>
#include <boost/range/adaptor/map.hpp>
#include "lister.hh"
#include "disk-error-handler.hh"
#include "seastarx.hh"
namespace db {
namespace hints {
static logging::logger resource_manager_logger("hints_resource_manager");
/// \brief Acquire send-limiter units for a hint whose serialized form is \p buf_size bytes long.
///
/// The serialized size is used as an approximation of the memory the mutation is going to consume.
/// The request is raised to at least _min_send_hint_budget and capped at _max_send_in_flight_memory,
/// so that a very big mutation can still be sent out by consuming the whole budget.
///
/// \param buf_size size of the serialized hint
/// \return future resolving to the acquired semaphore units
future<semaphore_units<semaphore_default_exception_factory>> resource_manager::get_send_units_for(size_t buf_size) {
    const size_t floored_budget = std::max(_min_send_hint_budget, buf_size);
    const size_t units_needed = std::min(floored_budget, _max_send_in_flight_memory);

    resource_manager_logger.trace("memory budget: need {} have {}", units_needed, _send_limiter.available_units());
    return get_units(_send_limiter, units_needed);
}
const std::chrono::seconds space_watchdog::_watchdog_period = std::chrono::seconds(1);
// Binds the watchdog to the caller-owned set of managers (kept by reference, not copied)
// and sets up the timer so that every expiration invokes on_timer().
// The timer is not armed here - see start().
space_watchdog::space_watchdog(shard_managers_set& managers)
: _shard_managers(managers)
, _timer([this] { on_timer(); })
{}
// Arms the timer with the current time as the expiration point, which schedules the first on_timer() pass.
// Subsequent passes are scheduled by on_timer() itself.
void space_watchdog::start() {
_timer.arm(timer_clock_type::now());
}
// Closes the gate that on_timer() runs under and, once the close completes
// (successfully or not), cancels the timer.
// An exception thrown synchronously is converted into a failed future, in keeping with the noexcept signature.
future<> space_watchdog::stop() noexcept {
try {
return _gate.close().finally([this] { _timer.cancel(); });
} catch (...) {
return make_exception_future<>(std::current_exception());
}
}
/// Walks the regular files of a single end point directory.
///
/// Each file's size is added to _total_size. When a second file is encountered
/// (i.e. _files_count is already 1) the end point is reported to \p shard_manager
/// as one that has pending hints.
future<> space_watchdog::scan_one_ep_dir(boost::filesystem::path path, manager& shard_manager, ep_key_type ep_key) {
    auto account_for_file = [this, ep_key, &shard_manager] (lister::path dir, directory_entry de) {
        // Seeing the second hints file means this end point has pending hints.
        if (_files_count == 1) {
            shard_manager.add_ep_with_pending_hints(ep_key);
        }
        ++_files_count;

        return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) {
            _total_size += fsize;
        });
    };

    return lister::scan_dir(path, { directory_entry_type::regular }, std::move(account_for_file));
}
/// \brief Count the end point managers of all registered shard managers.
///
/// \return sum of manager::ep_managers_size() over every manager in _shard_managers
size_t space_watchdog::end_point_managers_count() const {
    // The accumulator is seeded with a size_t: boost::accumulate takes its working and result
    // type from the initial value, so an int seed would narrow the size_t sum on every step.
    return boost::accumulate(_shard_managers, size_t(0), [] (size_t sum, manager& shard_manager) {
        return sum + shard_manager.ep_managers_size();
    });
}
// Timer callback: one pass of the disk space check over all registered managers.
//
// The pass runs under _gate. It resets _total_size, then for each registered manager (one after
// another) clears the manager's set of end points with pending hints and scans its hints directory,
// adding up file sizes into _total_size. The grand total is then compared against an adjusted quota:
//  * below the quota - every manager is allowed to generate hints;
//  * otherwise - each manager forbids hints for its end points with pending hints.
// If anything in the pass fails, hints are forbidden for all managers.
// In every case the timer is re-armed with _watchdog_period at the end of the pass.
//
// The future returned by with_gate() is not stored or waited on by this function.
void space_watchdog::on_timer() {
with_gate(_gate, [this] {
return futurize_apply([this] {
// A single running total shared by all managers scanned in this pass.
_total_size = 0;
return do_for_each(_shard_managers, [this] (manager& shard_manager) {
shard_manager.clear_eps_with_pending_hints();
// The hints directories are organized as follows:
// <hints root>
// |- <shard1 ID>
// | |- <EP1 address>
// | |- <hints file1>
// | |- <hints file2>
// | |- ...
// | |- <EP2 address>
// | |- ...
// | |-...
// |- <shard2 ID>
// | |- ...
// ...
// |- <shardN ID>
// | |- ...
//
return lister::scan_dir(shard_manager.hints_dir(), {directory_entry_type::directory}, [this, &shard_manager] (lister::path dir, directory_entry de) {
// Start counting files anew for every end point directory.
_files_count = 0;
// Let's scan per-end-point directories and enumerate hints files...
//
// Let's check if there is a corresponding end point manager (may not exist if the corresponding DC is
// not hintable).
// If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply
// continue to enumeration - there is no one to change them.
auto it = shard_manager.find_ep_manager(de.name);
if (it != shard_manager.ep_managers_end()) {
return with_lock(it->second.file_update_mutex(), [this, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)]() mutable {
return scan_one_ep_dir(dir / ep_name.c_str(), shard_manager, ep_key_type(ep_name));
});
} else {
return scan_one_ep_dir(dir / de.name.c_str(), shard_manager, ep_key_type(de.name));
}
});
}).then([this] {
// Adjust the quota to take into account the space we guarantee to every end point manager
// (one hint segment per end point manager, summed over all registered managers).
size_t adjusted_quota = 0;
size_t delta = end_point_managers_count() * resource_manager::hint_segment_size_in_mb * 1024 * 1024;
// Guard against unsigned underflow: the quota stays 0 if the guaranteed space alone exceeds the limit.
if (resource_manager::max_shard_disk_space_size > delta) {
adjusted_quota = resource_manager::max_shard_disk_space_size - delta;
}
bool can_hint = _total_size < adjusted_quota;
resource_manager_logger.trace("space_watchdog: total_size ({}) {} max_shard_disk_space_size ({})", _total_size, can_hint ? "<" : ">=", adjusted_quota);
if (!can_hint) {
for (manager& shard_manager : _shard_managers) {
shard_manager.forbid_hints_for_eps_with_pending_hints();
}
} else {
for (manager& shard_manager : _shard_managers) {
shard_manager.allow_hints();
}
}
});
}).handle_exception([this] (auto eptr) {
resource_manager_logger.trace("space_watchdog: unexpected exception - stop all hints generators");
// Stop all hint generators if space_watchdog callback failed
for (manager& shard_manager : _shard_managers) {
shard_manager.forbid_hints();
}
}).finally([this] {
// Schedule the next pass regardless of how this one ended.
_timer.arm(_watchdog_period);
});
});
}
/// Starts every registered manager (in parallel), handing each one the same
/// proxy, gossiper and storage service pointers, and starts the space watchdog
/// once all of them have started.
future<> resource_manager::start(shared_ptr<service::storage_proxy> proxy_ptr, shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr) {
    auto start_one = [proxy_ptr, gossiper_ptr, ss_ptr] (manager& m) {
        return m.start(proxy_ptr, gossiper_ptr, ss_ptr);
    };

    return parallel_for_each(_shard_managers, std::move(start_one)).then([this] {
        _space_watchdog.start();
    });
}
/// Stops every registered manager (in parallel) and then stops the space watchdog.
/// The watchdog is stopped whether or not stopping the managers succeeded.
future<> resource_manager::stop() noexcept {
    auto stop_one = [] (manager& m) {
        return m.stop();
    };

    return parallel_for_each(_shard_managers, std::move(stop_one)).finally([this] {
        return _space_watchdog.stop();
    });
}
// Adds the given manager to the set that start(), stop() and the space watchdog operate on.
// Only a reference is stored (the set holds std::reference_wrapper<manager> compared by address),
// so registering the same object twice keeps a single entry.
// NOTE(review): presumably the manager must outlive this resource_manager's use of it - confirm with the owner.
void resource_manager::register_manager(manager& m) {
_shard_managers.insert(m);
}
}
}

View File

@@ -0,0 +1,153 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <cstdint>
#include <seastar/core/semaphore.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/future.hh>
#include "seastarx.hh"
#include <unordered_set>
#include <boost/filesystem.hpp>
#include <gms/gossiper.hh>
namespace service {
class storage_proxy;
class storage_service;
}
namespace db {
namespace hints {
using timer_clock_type = seastar::lowres_clock;
class manager;
/// \brief Periodically checks the disk space taken by the hints of a set of managers.
///
/// The set of managers is owned by the caller and is referenced, not copied.
class space_watchdog {
private:
using ep_key_type = gms::inet_address;
// Interval between two consecutive on_timer() passes.
static const std::chrono::seconds _watchdog_period;
// Hashes a manager by its address.
struct manager_hash {
size_t operator()(const manager& manager) const {
return reinterpret_cast<uintptr_t>(&manager);
}
};
// Two references are equal iff they refer to the same manager object.
struct manager_comp {
bool operator()(const std::reference_wrapper<manager>& m1, const std::reference_wrapper<manager>& m2) const {
return std::addressof(m1.get()) == std::addressof(m2.get());
}
};
public:
/// Set of manager references with identity (address) based hashing and equality.
using shard_managers_set = std::unordered_set<std::reference_wrapper<manager>, manager_hash, manager_comp>;
private:
// Sum of the sizes of all hints files seen so far in the current on_timer() pass.
size_t _total_size = 0;
shard_managers_set& _shard_managers;
// on_timer() runs under this gate; stop() closes it.
seastar::gate _gate;
seastar::timer<timer_clock_type> _timer;
// Number of files seen so far in the end point directory that is currently being scanned.
int _files_count = 0;
public:
space_watchdog(shard_managers_set& managers);
/// Schedules the first on_timer() pass.
void start();
/// Closes the gate and cancels the timer.
future<> stop() noexcept;
/// \return total number of end point managers over all managers in the set
size_t end_point_managers_count() const;
private:
/// \brief Check that hints don't occupy too much disk space.
///
/// Verifies that all \ref manager::_hints_dir dirs for all managers occupy less than \ref resource_manager::max_shard_disk_space_size.
///
/// If they do, stop all end point managers that have more than one hints file - we don't want some DOWN Node to
/// prevent hints to other Nodes from being generated (e.g. due to some temporary overload and timeout).
///
/// This is a simplistic implementation of a manager for a limited shared resource with a minimum guaranteed share for all
/// participants.
///
/// This implementation guarantees at least a single hint share for all end point managers.
void on_timer();
/// \brief Scan files in a single end point directory.
///
/// Add sizes of files in the directory to _total_size. If number of files is greater than 1 register this end point ID
/// in \p shard_manager as one with pending hints so that we may block it if _total_size value becomes greater than
/// the maximum allowed value.
///
/// \param path directory to scan
/// \param shard_manager manager that owns the directory and is told about end points with pending hints
/// \param ep_key end point ID
/// \return future that resolves when scanning is complete
future<> scan_one_ep_dir(boost::filesystem::path path, manager& shard_manager, ep_key_type ep_key);
};
/// \brief Holds the send-memory limiter, the in-progress hints size counter and the disk space watchdog
/// that are shared by all managers registered with it.
class resource_manager {
// Upper bound on the units handed out by _send_limiter:
// one tenth of memory::stats().total_memory(), but not less than max_hints_send_queue_length.
const size_t _max_send_in_flight_memory;
// Minimum number of units a single hint takes: the whole budget divided by max_hints_send_queue_length.
const size_t _min_send_hint_budget;
seastar::semaphore _send_limiter;
uint64_t _size_of_hints_in_progress = 0;
// Must be declared before _space_watchdog: the watchdog keeps a reference to this set.
space_watchdog::shard_managers_set _shard_managers;
space_watchdog _space_watchdog;
public:
static constexpr uint64_t max_size_of_hints_in_progress = 10 * 1024 * 1024; // 10MB
static constexpr size_t hint_segment_size_in_mb = 32;
static constexpr size_t max_hints_per_ep_size_mb = 128; // 4 files 32MB each
static constexpr size_t max_hints_send_queue_length = 128;
// Disk space limit the space watchdog compares the total size of hints files against.
// NOTE(review): non-const static - presumably assigned by the owner before the watchdog is started; confirm.
static size_t max_shard_disk_space_size;
public:
resource_manager()
: _max_send_in_flight_memory(std::max(memory::stats().total_memory() / 10, max_hints_send_queue_length))
, _min_send_hint_budget(_max_send_in_flight_memory / max_hints_send_queue_length)
, _send_limiter(_max_send_in_flight_memory)
, _space_watchdog(_shard_managers)
{}
/// \brief Acquire send-limiter units for a hint of the given serialized size.
///
/// \param buf_size size of the serialized hint
/// \return future resolving to the acquired units
future<semaphore_units<semaphore_default_exception_factory>> get_send_units_for(size_t buf_size);
/// \return true if the in-progress hints size is strictly above max_size_of_hints_in_progress
bool too_many_hints_in_progress() const {
return _size_of_hints_in_progress > max_size_of_hints_in_progress;
}
/// \return the current in-progress hints size counter
uint64_t size_of_hints_in_progress() const {
return _size_of_hints_in_progress;
}
/// Adds \p delta to the in-progress hints size counter.
// NOTE(review): delta is signed while the counter is unsigned - callers are presumably expected to pass non-negative values.
inline void inc_size_of_hints_in_progress(int64_t delta) {
_size_of_hints_in_progress += delta;
}
/// Subtracts \p delta from the in-progress hints size counter.
// NOTE(review): no underflow check - presumably every call is paired with an earlier inc of the same delta; confirm.
inline void dec_size_of_hints_in_progress(int64_t delta) {
_size_of_hints_in_progress -= delta;
}
/// Starts all registered managers and then the space watchdog.
future<> start(shared_ptr<service::storage_proxy> proxy_ptr, shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr);
/// Stops all registered managers and then the space watchdog.
future<> stop() noexcept;
/// Adds a manager to the set handled by start(), stop() and the space watchdog.
void register_manager(manager& m);
};
}
}

20
main.cc
View File

@@ -518,12 +518,22 @@ int main(int ac, char** av) {
directories.insert(db.local().get_config().commitlog_directory());
supervisor::notify("creating hints directories");
boost::filesystem::path hints_base_dir(db.local().get_config().hints_directory());
dirs.touch_and_lock(db.local().get_config().hints_directory()).get();
directories.insert(db.local().get_config().hints_directory());
if (hinted_handoff_enabled) {
boost::filesystem::path hints_base_dir(db.local().get_config().hints_directory());
dirs.touch_and_lock(db.local().get_config().hints_directory()).get();
directories.insert(db.local().get_config().hints_directory());
for (unsigned i = 0; i < smp::count; ++i) {
sstring shard_dir((hints_base_dir / seastar::to_sstring(i).c_str()).native());
dirs.touch_and_lock(shard_dir).get();
directories.insert(std::move(shard_dir));
}
}
boost::filesystem::path view_pending_updates_base_dir = boost::filesystem::path(db.local().get_config().data_file_directories()[0]) / "view_pending_updates";
sstring view_pending_updates_base_dir_str = view_pending_updates_base_dir.native();
dirs.touch_and_lock(view_pending_updates_base_dir_str).get();
directories.insert(view_pending_updates_base_dir_str);
for (unsigned i = 0; i < smp::count; ++i) {
sstring shard_dir((hints_base_dir / seastar::to_sstring(i).c_str()).native());
sstring shard_dir((view_pending_updates_base_dir / seastar::to_sstring(i).c_str()).native());
dirs.touch_and_lock(shard_dir).get();
directories.insert(std::move(shard_dir));
}

View File

@@ -633,7 +633,9 @@ void storage_proxy_stats::split_stats::register_metrics_for(gms::inet_address ep
}
// Out-of-line destructor with no custom teardown logic.
storage_proxy::~storage_proxy() = default;
// Constructs the storage proxy: registers coordinator metrics and sets up the
// hints managers (regular hinted handoff and materialized-view hints).
// NOTE(review): this listing carries two constructor headers and several
// near-duplicate statements, which looks like the previous and the revised form
// of the same code side by side - confirm which lines are current before editing.
storage_proxy::storage_proxy(distributed<database>& db, stdx::optional<std::vector<sstring>> hinted_handoff_enabled) : _db(db) {
storage_proxy::storage_proxy(distributed<database>& db, stdx::optional<std::vector<sstring>> hinted_handoff_enabled)
: _db(db)
// The view-hints manager is constructed unconditionally; it stores its hints in
// "view_pending_updates" under the first configured data directory and is handed
// the shared _hints_resource_manager.
, _hints_for_views_manager(_db.local().get_config().data_file_directories()[0] + "/view_pending_updates", {}, _db.local().get_config().max_hint_window_in_ms(), _hints_resource_manager, _db) {
namespace sm = seastar::metrics;
_metrics.add_group(COORDINATOR_STATS_CATEGORY, {
sm::make_histogram("read_latency", sm::description("The general read latency histogram"), [this]{ return _stats.estimated_read.get_histogram(16, 20);}),
@@ -724,17 +726,20 @@ storage_proxy::storage_proxy(distributed<database>& db, stdx::optional<std::vect
});
_stats.register_metrics_local();
_hints_enabled_for_user_writes = bool(hinted_handoff_enabled);
if (!hinted_handoff_enabled) {
hinted_handoff_enabled.emplace();
}
supervisor::notify("creating hints manager");
slogger.trace("hinted DCs: {}", *hinted_handoff_enabled);
const db::config& cfg = _db.local().get_config();
// Give each hints manager 10% of the available disk space. Give each shard an equal share of the available space.
db::hints::manager::max_shard_disk_space_size = boost::filesystem::space(cfg.hints_directory().c_str()).capacity / (10 * smp::count);
_hints_manager.emplace(cfg.hints_directory(), *hinted_handoff_enabled, cfg.max_hint_window_in_ms(), _db);
// NOTE(review): the capacity is read from the hints directory regardless of
// whether hinted handoff is enabled, and the resulting limit is also the one
// shared by the view-hints manager, whose files live under the data directory.
// Confirm the hints directory is guaranteed to exist here; the throwing overload
// of boost::filesystem::space() reports failures via an exception.
db::hints::resource_manager::max_shard_disk_space_size = boost::filesystem::space(cfg.hints_directory().c_str()).capacity / (10 * smp::count);
// The regular hints manager is created only when hinted handoff is enabled; it is
// registered with the same resource manager as the view-hints manager.
if (hinted_handoff_enabled) {
supervisor::notify("creating hints manager");
slogger.trace("hinted DCs: {}", *hinted_handoff_enabled);
_hints_manager.emplace(cfg.hints_directory(), *hinted_handoff_enabled, cfg.max_hint_window_in_ms(), _hints_resource_manager, _db);
_hints_manager->register_metrics("hints_manager");
_hints_resource_manager.register_manager(*_hints_manager);
}
// Each manager registers its metrics under a distinct name argument.
_hints_for_views_manager.register_metrics("hints_for_views_manager");
_hints_resource_manager.register_manager(_hints_for_views_manager);
}
storage_proxy::rh_entry::rh_entry(shared_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {}
@@ -1789,8 +1794,9 @@ template<typename Range>
size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept
{
if (hints_enabled(type)) {
return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state)] (gms::inet_address target) mutable -> bool {
return _hints_manager->store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state);
db::hints::manager& hints_manager = hints_manager_for(type);
return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool {
return hints_manager.store_hint(target, mh->schema(), mh->get_mutation_for(target), tr_state);
});
} else {
return 0;
@@ -3639,7 +3645,11 @@ get_restricted_ranges(locator::token_metadata& tm, const schema& s, dht::partiti
}
// Reports whether a hint may be stored for a write of the given type.
// NOTE(review): two return statements are present and only the first one is
// reachable as written. They look like the previous and the revised form of the
// same line; the second one (true for any VIEW write, or for any write while
// _hints_manager is engaged) appears to be the intended rule - confirm.
bool storage_proxy::hints_enabled(db::write_type type) noexcept {
return _hints_enabled_for_user_writes || (type == db::write_type::VIEW && bool(_hints_manager));
return bool(_hints_manager) || type == db::write_type::VIEW;
}
// Selects the hints manager responsible for a write of the given type:
// VIEW writes go to the dedicated view-hints manager, every other type to the
// regular (optional) hints manager.
// The optional is dereferenced without a check, so for non-VIEW types the caller
// must already know that _hints_manager is engaged.
db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) {
    if (type == db::write_type::VIEW) {
        return _hints_for_views_manager;
    }
    return *_hints_manager;
}
future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
@@ -4431,17 +4441,11 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s,
}
// Starts hint processing, handing this proxy plus the gossiper and storage
// service handles to the component being started.
// NOTE(review): the body holds two alternative implementations - starting
// _hints_manager directly when it is engaged, and delegating to
// _hints_resource_manager. As written, the final return is unreachable; these
// look like the previous and the revised form of the function - confirm which
// one is current.
future<> storage_proxy::start_hints_manager(shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr) {
if (_hints_manager) {
return _hints_manager->start(shared_from_this(), std::move(gossiper_ptr), std::move(ss_ptr));
}
return make_ready_future<>();
return _hints_resource_manager.start(shared_from_this(), gossiper_ptr, ss_ptr);
}
// Stops hint processing; counterpart of start_hints_manager().
// NOTE(review): as in start_hints_manager(), two alternative implementations are
// present (stopping _hints_manager directly vs. delegating to
// _hints_resource_manager) and the final return is unreachable as written -
// confirm which one is current.
future<> storage_proxy::stop_hints_manager() {
if (_hints_manager) {
return _hints_manager->stop();
}
return make_ready_future<>();
return _hints_resource_manager.stop();
}
future<>

View File

@@ -149,8 +149,9 @@ private:
// not remove request from the buffer), but this is fine since request ids are unique, so we
// just skip an entry if request no longer exists.
circular_buffer<response_id_type> _throttled_writes;
// Resource manager handed to the hints managers below. Keep it declared before
// them: non-static members are initialized in declaration order, and the
// storage_proxy constructor passes it to _hints_for_views_manager in the
// member-initializer list.
db::hints::resource_manager _hints_resource_manager;
// Hints manager for regular writes; an empty optional means none was created.
stdx::optional<db::hints::manager> _hints_manager;
bool _hints_enabled_for_user_writes = false;
// Hints manager dedicated to materialized view updates; always present.
db::hints::manager _hints_for_views_manager;
stats _stats;
static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
// for read repair chance calculation
@@ -180,6 +181,7 @@ private:
template<typename Range>
bool cannot_hint(const Range& targets, db::write_type type);
// Tells whether hints may be stored for a write of the given type.
bool hints_enabled(db::write_type type) noexcept;
// Returns the hints manager that handles writes of the given type.
db::hints::manager& hints_manager_for(db::write_type type);
std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token);
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token);
db::read_repair_decision new_read_repair_decision(const schema& s);