Compare commits
11 Commits
next-5.2
...
scylla-4.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9908f009a4 | ||
|
|
48d8a075b4 | ||
|
|
e3ddd607bc | ||
|
|
511773d466 | ||
|
|
121cd383fa | ||
|
|
90639f48e5 | ||
|
|
8d029a04aa | ||
|
|
67995db899 | ||
|
|
282cd0df7c | ||
|
|
ce58994d30 | ||
|
|
78f5afec30 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.0.rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -614,11 +614,17 @@ public:
|
||||
future<sseg_ptr> terminate() {
|
||||
assert(_closed);
|
||||
if (!std::exchange(_terminated, true)) {
|
||||
clogger.trace("{} is closed but not terminated.", *this);
|
||||
if (_buffer.empty()) {
|
||||
new_buffer(0);
|
||||
// write a terminating zero block iff we are ending (a reused)
|
||||
// block before actual file end.
|
||||
// we should only get here when all actual data is
|
||||
// already flushed (see below, close()).
|
||||
if (size_on_disk() < _segment_manager->max_size) {
|
||||
clogger.trace("{} is closed but not terminated.", *this);
|
||||
if (_buffer.empty()) {
|
||||
new_buffer(0);
|
||||
}
|
||||
return cycle(true, true);
|
||||
}
|
||||
return cycle(true, true);
|
||||
}
|
||||
return make_ready_future<sseg_ptr>(shared_from_this());
|
||||
}
|
||||
|
||||
@@ -689,6 +689,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||
@@ -859,7 +860,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
|
||||
}
|
||||
|
||||
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
||||
if (experimental()) {
|
||||
if (experimental() && f != experimental_features_t::UNUSED) {
|
||||
return true;
|
||||
}
|
||||
const auto& optval = experimental_features();
|
||||
@@ -911,11 +912,13 @@ const db::extensions& db::config::extensions() const {
|
||||
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||
// We decided against using the construct-on-first-use idiom here:
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
|
||||
// Lightweight transactions are no longer experimental. Map them
|
||||
// to UNUSED switch for a while, then remove altogether.
|
||||
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
|
||||
}
|
||||
|
||||
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
|
||||
return {LWT, UDF, CDC};
|
||||
return {UDF, CDC};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
@@ -81,7 +81,7 @@ namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { LWT, UDF, CDC };
|
||||
enum feature { UNUSED, UDF, CDC };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
static std::vector<enum_option<experimental_features_t>> all();
|
||||
};
|
||||
@@ -278,6 +278,7 @@ public:
|
||||
named_value<uint32_t> shutdown_announce_in_ms;
|
||||
named_value<bool> developer_mode;
|
||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||
named_value<int32_t> force_gossip_generation;
|
||||
named_value<bool> experimental;
|
||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||
named_value<size_t> lsa_reclamation_step;
|
||||
|
||||
@@ -187,7 +187,7 @@ schema_ptr batchlog() {
|
||||
{{"cf_id", uuid_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"in_progress_ballot", timeuuid_type},
|
||||
{"promise", timeuuid_type},
|
||||
{"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl
|
||||
{"most_recent_commit_at", timeuuid_type},
|
||||
{"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl
|
||||
@@ -2201,8 +2201,8 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
|
||||
return service::paxos::paxos_state();
|
||||
}
|
||||
auto& row = results->one();
|
||||
auto promised = row.has("in_progress_ballot")
|
||||
? row.get_as<utils::UUID>("in_progress_ballot") : utils::UUID_gen::min_time_UUID(0);
|
||||
auto promised = row.has("promise")
|
||||
? row.get_as<utils::UUID>("promise") : utils::UUID_gen::min_time_UUID(0);
|
||||
|
||||
std::optional<service::paxos::proposal> accepted;
|
||||
if (row.has("proposal")) {
|
||||
@@ -2228,7 +2228,7 @@ static int32_t paxos_ttl_sec(const schema& s) {
|
||||
}
|
||||
|
||||
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
|
||||
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||
return execute_cql_with_timeout(cql,
|
||||
timeout,
|
||||
utils::UUID_gen::micros_timestamp(ballot),
|
||||
@@ -2274,6 +2274,20 @@ future<> save_paxos_decision(const schema& s, const service::paxos::proposal& de
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
|
||||
// This should be called only if a learn stage succeeded on all replicas.
|
||||
// In this case we can remove the paxos row using ballot's timestamp which
|
||||
// guarantees that if there is more recent round it will not be affected.
|
||||
static auto cql = format("DELETE FROM system.{} USING TIMESTAMP ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||
|
||||
return execute_cql_with_timeout(cql,
|
||||
timeout,
|
||||
utils::UUID_gen::micros_timestamp(ballot),
|
||||
to_legacy(*key.get_compound_type(s), key.representation()),
|
||||
s.id()
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
} // namespace system_keyspace
|
||||
|
||||
sstring system_keyspace_name() {
|
||||
|
||||
@@ -647,6 +647,7 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
|
||||
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
|
||||
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
|
||||
future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
|
||||
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
|
||||
|
||||
} // namespace system_keyspace
|
||||
} // namespace db
|
||||
|
||||
24
dht/token.hh
24
dht/token.hh
@@ -58,19 +58,27 @@ public:
|
||||
, _data(normalize(d)) { }
|
||||
|
||||
token(kind k, const bytes& b) : _kind(std::move(k)) {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
if (_kind != kind::key) {
|
||||
_data = 0;
|
||||
} else {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
|
||||
token(kind k, bytes_view b) : _kind(std::move(k)) {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
if (_kind != kind::key) {
|
||||
_data = 0;
|
||||
} else {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
|
||||
bool is_minimum() const {
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=666.development
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.0/latest/scylla.repo
|
||||
ARG VERSION=4.0.*
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -110,10 +110,6 @@ feature_config feature_config_from_db_config(db::config& cfg) {
|
||||
fcfg.enable_cdc = true;
|
||||
}
|
||||
|
||||
if (cfg.check_experimental(db::experimental_features_t::LWT)) {
|
||||
fcfg.enable_lwt = true;
|
||||
}
|
||||
|
||||
return fcfg;
|
||||
}
|
||||
|
||||
@@ -178,9 +174,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
|
||||
if (_config.enable_cdc) {
|
||||
features.insert(gms::features::CDC);
|
||||
}
|
||||
if (_config.enable_lwt) {
|
||||
features.insert(gms::features::LWT);
|
||||
}
|
||||
features.insert(gms::features::LWT);
|
||||
|
||||
for (const sstring& s : _config.disabled_features) {
|
||||
features.erase(s);
|
||||
|
||||
@@ -41,7 +41,6 @@ struct feature_config {
|
||||
bool enable_sstables_mc_format = false;
|
||||
bool enable_user_defined_functions = false;
|
||||
bool enable_cdc = false;
|
||||
bool enable_lwt = false;
|
||||
std::set<sstring> disabled_features;
|
||||
feature_config();
|
||||
};
|
||||
|
||||
@@ -1725,8 +1725,12 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
// message on all cpus and forard them to cpu0 to process.
|
||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
g.init_messaging_service_handler(do_bind);
|
||||
}).then([this, generation_nbr, preload_local_states] {
|
||||
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||
build_seeds_list();
|
||||
if (_cfg.force_gossip_generation() > 0) {
|
||||
generation_nbr = _cfg.force_gossip_generation();
|
||||
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
|
||||
}
|
||||
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
||||
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
||||
local_state.mark_alive();
|
||||
|
||||
17
main.cc
17
main.cc
@@ -662,9 +662,17 @@ int main(int ac, char** av) {
|
||||
|
||||
supervisor::notify("starting tokens manager");
|
||||
token_metadata.start().get();
|
||||
auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
|
||||
token_metadata.stop().get();
|
||||
});
|
||||
// storage_proxy holds a reference on it and is not yet stopped.
|
||||
// what's worse is that the calltrace
|
||||
// storage_proxy::do_query
|
||||
// ::query_partition_key_range
|
||||
// ::query_partition_key_range_concurrent
|
||||
// leaves unwaited futures on the reactor and once it gets there
|
||||
// the token_metadata instance is accessed and ...
|
||||
//
|
||||
//auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
|
||||
// token_metadata.stop().get();
|
||||
//});
|
||||
|
||||
supervisor::notify("starting migration manager notifier");
|
||||
mm_notifier.start().get();
|
||||
@@ -1071,9 +1079,6 @@ int main(int ac, char** av) {
|
||||
static sharded<alternator::executor> alternator_executor;
|
||||
static sharded<alternator::server> alternator_server;
|
||||
|
||||
if (!cfg->check_experimental(db::experimental_features_t::LWT)) {
|
||||
throw std::runtime_error("Alternator enabled, but needs experimental LWT feature which wasn't enabled");
|
||||
}
|
||||
net::inet_address addr;
|
||||
try {
|
||||
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();
|
||||
|
||||
@@ -452,6 +452,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::PAXOS_PREPARE:
|
||||
case messaging_verb::PAXOS_ACCEPT:
|
||||
case messaging_verb::PAXOS_LEARN:
|
||||
case messaging_verb::PAXOS_PRUNE:
|
||||
return 0;
|
||||
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
|
||||
// sent on a different connection to avoid potential deadlocks
|
||||
@@ -1281,6 +1282,19 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point
|
||||
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
|
||||
}
|
||||
|
||||
void messaging_service::register_paxos_prune(std::function<future<rpc::no_wait_type>(
|
||||
const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info>)>&& func) {
|
||||
register_handler(this, messaging_verb::PAXOS_PRUNE, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_paxos_prune() {
|
||||
return unregister_handler(netw::messaging_verb::PAXOS_PRUNE);
|
||||
}
|
||||
future<>
|
||||
messaging_service::send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id,
|
||||
const partition_key& key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
|
||||
return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_PRUNE, netw::msg_addr(peer), schema_id, key, ballot, std::move(trace_info));
|
||||
}
|
||||
|
||||
void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func) {
|
||||
register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func));
|
||||
|
||||
@@ -139,7 +139,8 @@ enum class messaging_verb : int32_t {
|
||||
PAXOS_ACCEPT = 40,
|
||||
PAXOS_LEARN = 41,
|
||||
HINT_MUTATION = 42,
|
||||
LAST = 43,
|
||||
PAXOS_PRUNE = 43,
|
||||
LAST = 44,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -493,6 +494,14 @@ public:
|
||||
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
|
||||
std::optional<tracing::trace_info> trace_info = std::nullopt);
|
||||
|
||||
void register_paxos_prune(std::function<future<rpc::no_wait_type>(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key,
|
||||
utils::UUID ballot, std::optional<tracing::trace_info>)>&& func);
|
||||
|
||||
future<> unregister_paxos_prune();
|
||||
|
||||
future<> send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id, const partition_key& key,
|
||||
utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
|
||||
|
||||
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
|
||||
future<> unregister_hint_mutation();
|
||||
|
||||
@@ -190,4 +190,11 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
|
||||
});
|
||||
}
|
||||
|
||||
future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state) {
|
||||
logger.debug("Delete paxos state for ballot {}", ballot);
|
||||
tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot);
|
||||
return db::system_keyspace::delete_paxos_decision(*schema, key, ballot, timeout);
|
||||
}
|
||||
|
||||
} // end of namespace "service::paxos"
|
||||
|
||||
@@ -124,6 +124,9 @@ public:
|
||||
clock_type::time_point timeout);
|
||||
// Replica RPC endpoint for Paxos "learn".
|
||||
static future<> learn(schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
|
||||
// Replica RPC endpoint for pruning Paxos table
|
||||
static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state);
|
||||
};
|
||||
|
||||
} // end of namespace "service::paxos"
|
||||
|
||||
@@ -171,6 +171,7 @@ public:
|
||||
const schema_ptr& schema() {
|
||||
return _schema;
|
||||
}
|
||||
// called only when all replicas replied
|
||||
virtual void release_mutation() = 0;
|
||||
};
|
||||
|
||||
@@ -300,9 +301,10 @@ public:
|
||||
|
||||
class cas_mutation : public mutation_holder {
|
||||
lw_shared_ptr<paxos::proposal> _proposal;
|
||||
shared_ptr<paxos_response_handler> _handler;
|
||||
public:
|
||||
explicit cas_mutation(paxos::proposal proposal , schema_ptr s)
|
||||
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))) {
|
||||
explicit cas_mutation(paxos::proposal proposal, schema_ptr s, shared_ptr<paxos_response_handler> handler)
|
||||
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))), _handler(std::move(handler)) {
|
||||
_size = _proposal->update.representation().size();
|
||||
_schema = std::move(s);
|
||||
}
|
||||
@@ -327,7 +329,11 @@ public:
|
||||
return true;
|
||||
}
|
||||
virtual void release_mutation() override {
|
||||
_proposal.release();
|
||||
// The handler will be set for "learn", but not for PAXOS repair
|
||||
// since repair may not include all replicas
|
||||
if (_handler) {
|
||||
_handler->prune(_proposal->ballot);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1184,6 +1190,12 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
|
||||
return f;
|
||||
}
|
||||
|
||||
// debug output in mutate_internal needs this
|
||||
std::ostream& operator<<(std::ostream& os, const paxos_response_handler& h) {
|
||||
os << "paxos_response_handler{" << h.id() << "}";
|
||||
return os;
|
||||
}
|
||||
|
||||
// This function implements learning stage of Paxos protocol
|
||||
future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool allow_hints) {
|
||||
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", decision, _cl_for_learn);
|
||||
@@ -1219,12 +1231,41 @@ future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool a
|
||||
}
|
||||
|
||||
// Path for the "base" mutations
|
||||
std::array<std::tuple<paxos::proposal, schema_ptr, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, _key.token())};
|
||||
std::array<std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
|
||||
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
|
||||
|
||||
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
|
||||
}
|
||||
|
||||
void paxos_response_handler::prune(utils::UUID ballot) {
|
||||
if (_has_dead_endpoints) {
|
||||
return;
|
||||
}
|
||||
if ( _proxy->get_stats().cas_now_pruning >= pruning_limit) {
|
||||
_proxy->get_stats().cas_coordinator_dropped_prune++;
|
||||
return;
|
||||
}
|
||||
_proxy->get_stats().cas_now_pruning++;
|
||||
_proxy->get_stats().cas_prune++;
|
||||
// running in the background, but the amount of the bg job is limited by pruning_limit
|
||||
// it is waited by holding shared pointer to storage_proxy which guaranties
|
||||
// that storage_proxy::stop() will wait for this to complete
|
||||
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
|
||||
return futurize_apply([&] {
|
||||
if (fbu::is_me(peer)) {
|
||||
tracing::trace(tr_state, "prune: prune {} locally", ballot);
|
||||
return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state);
|
||||
} else {
|
||||
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
|
||||
netw::messaging_service& ms = netw::get_local_messaging_service();
|
||||
return ms.send_paxos_prune(peer, _timeout, _schema->version(), _key.key(), ballot, tracing::make_trace_info(tr_state));
|
||||
}
|
||||
});
|
||||
}).finally([h = shared_from_this()] {
|
||||
h->_proxy->get_stats().cas_now_pruning++;
|
||||
});
|
||||
}
|
||||
|
||||
static std::vector<gms::inet_address>
|
||||
replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
|
||||
std::vector<gms::inet_address> endpoints;
|
||||
@@ -1571,6 +1612,14 @@ void storage_proxy_stats::stats::register_stats() {
|
||||
sm::make_histogram("cas_write_contention", sm::description("how many contended writes were encountered"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()},
|
||||
[this]{ return cas_write_contention.get_histogram(1, 8);}),
|
||||
|
||||
sm::make_total_operations("cas_prune", cas_prune,
|
||||
sm::description("how many times paxos prune was done after successful cas operation"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
|
||||
sm::make_total_operations("cas_dropped_prune", cas_coordinator_dropped_prune,
|
||||
sm::description("how many times a coordinator did not perfom prune after cas"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
});
|
||||
|
||||
_metrics.add_group(REPLICA_STATS_CATEGORY, {
|
||||
@@ -1606,6 +1655,9 @@ void storage_proxy_stats::stats::register_stats() {
|
||||
sm::description("number of operations that crossed a shard boundary"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
|
||||
sm::make_total_operations("cas_dropped_prune", cas_replica_dropped_prune,
|
||||
sm::description("how many times a coordinator did not perfom prune after cas"),
|
||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1879,11 +1931,11 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
|
||||
}
|
||||
|
||||
storage_proxy::response_id_type
|
||||
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& meta,
|
||||
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& meta,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
|
||||
auto& [commit, s, t] = meta;
|
||||
auto& [commit, s, h, t] = meta;
|
||||
|
||||
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s), cl,
|
||||
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
|
||||
db::write_type::CAS, tr_state, std::move(permit));
|
||||
}
|
||||
|
||||
@@ -1898,7 +1950,7 @@ storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, s
|
||||
auto keyspace_name = s->ks_name();
|
||||
keyspace& ks = _db.local().find_keyspace(keyspace_name);
|
||||
|
||||
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s), std::move(endpoints),
|
||||
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
|
||||
std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
|
||||
}
|
||||
|
||||
@@ -2146,6 +2198,8 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
||||
cl_for_paxos, participants + 1, live_endpoints.size());
|
||||
}
|
||||
|
||||
bool dead = participants != live_endpoints.size();
|
||||
|
||||
// Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key.
|
||||
// If the values received from different replicas match, we skip a separate query stage thus saving
|
||||
// one network round trip. To generate less traffic, only closest replicas send data, others send
|
||||
@@ -2153,7 +2207,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
||||
// list of participants by proximity to this instance.
|
||||
sort_endpoints_by_proximity(live_endpoints);
|
||||
|
||||
return paxos_participants{std::move(live_endpoints), required_participants};
|
||||
return paxos_participants{std::move(live_endpoints), required_participants, dead};
|
||||
}
|
||||
|
||||
|
||||
@@ -4942,6 +4996,42 @@ void storage_proxy::init_messaging_service() {
|
||||
|
||||
return f;
|
||||
});
|
||||
ms.register_paxos_prune([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
|
||||
static thread_local uint16_t pruning = 0;
|
||||
static constexpr uint16_t pruning_limit = 1000; // since PRUNE verb is one way replica side has its own queue limit
|
||||
auto src_addr = netw::messaging_service::get_source(cinfo);
|
||||
auto src_ip = src_addr.addr;
|
||||
tracing::trace_state_ptr tr_state;
|
||||
if (trace_info) {
|
||||
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
|
||||
tracing::begin(tr_state);
|
||||
tracing::trace(tr_state, "paxos_prune: message received from /{} ballot {}", src_ip, ballot);
|
||||
}
|
||||
|
||||
if (pruning >= pruning_limit) {
|
||||
get_stats().cas_replica_dropped_prune++;
|
||||
tracing::trace(tr_state, "paxos_prune: do not prune due to overload", src_ip);
|
||||
return make_ready_future<seastar::rpc::no_wait_type>(netw::messaging_service::no_wait());
|
||||
}
|
||||
|
||||
pruning++;
|
||||
return get_schema_for_read(schema_id, src_addr).then([this, key = std::move(key), ballot,
|
||||
timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
|
||||
dht::token token = dht::get_token(*schema, key);
|
||||
unsigned shard = dht::shard_of(*schema, token);
|
||||
bool local = shard == engine().cpu_id();
|
||||
get_stats().replica_cross_shard_ops += !local;
|
||||
return smp::submit_to(shard, _write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
|
||||
local, key = std::move(key), ballot, timeout, src_ip, d = defer([] { pruning--; })] () {
|
||||
tracing::trace_state_ptr tr_state = gt;
|
||||
return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
|
||||
tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip);
|
||||
return netw::messaging_service::no_wait();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_proxy::uninit_messaging_service() {
|
||||
@@ -4956,7 +5046,8 @@ future<> storage_proxy::uninit_messaging_service() {
|
||||
ms.unregister_truncate(),
|
||||
ms.unregister_paxos_prepare(),
|
||||
ms.unregister_paxos_accept(),
|
||||
ms.unregister_paxos_learn()
|
||||
ms.unregister_paxos_learn(),
|
||||
ms.unregister_paxos_prune()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -242,6 +242,7 @@ public:
|
||||
std::vector<gms::inet_address> endpoints;
|
||||
// How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL).
|
||||
size_t required_participants;
|
||||
bool has_dead_endpoints;
|
||||
};
|
||||
|
||||
const gms::feature_service& features() const { return _features; }
|
||||
@@ -317,7 +318,7 @@ private:
|
||||
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& proposal,
|
||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
@@ -634,6 +635,11 @@ private:
|
||||
db::consistency_level _cl_for_learn;
|
||||
// Live endpoints, as per get_paxos_participants()
|
||||
std::vector<gms::inet_address> _live_endpoints;
|
||||
// True if there are dead endpoints
|
||||
// We don't include endpoints known to be unavailable in pending
|
||||
// endpoints list, but need to be aware of them to avoid pruning
|
||||
// system.paxos data if some endpoint is missing a Paxos write.
|
||||
bool _has_dead_endpoints;
|
||||
// How many endpoints need to respond favourably for the protocol to progress to the next step.
|
||||
size_t _required_participants;
|
||||
// A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms
|
||||
@@ -651,6 +657,9 @@ private:
|
||||
// Unique request id for logging purposes.
|
||||
const uint64_t _id = next_id++;
|
||||
|
||||
// max pruning operations to run in parralel
|
||||
static constexpr uint16_t pruning_limit = 1000;
|
||||
|
||||
public:
|
||||
tracing::trace_state_ptr tr_state;
|
||||
|
||||
@@ -674,6 +683,7 @@ public:
|
||||
storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(_schema->ks_name(), _key.token(), _cl_for_paxos);
|
||||
_live_endpoints = std::move(pp.endpoints);
|
||||
_required_participants = pp.required_participants;
|
||||
_has_dead_endpoints = pp.has_dead_endpoints;
|
||||
tracing::trace(tr_state, "Create paxos_response_handler for token {} with live: {} and required participants: {}",
|
||||
_key.token(), _live_endpoints, _required_participants);
|
||||
}
|
||||
@@ -691,6 +701,7 @@ public:
|
||||
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
|
||||
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
|
||||
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
|
||||
void prune(utils::UUID ballot);
|
||||
uint64_t id() const {
|
||||
return _id;
|
||||
}
|
||||
|
||||
@@ -116,6 +116,11 @@ struct write_stats {
|
||||
uint64_t cas_write_condition_not_met = 0;
|
||||
uint64_t cas_write_timeout_due_to_uncertainty = 0;
|
||||
uint64_t cas_failed_read_round_optimization = 0;
|
||||
uint16_t cas_now_pruning = 0;
|
||||
uint64_t cas_prune = 0;
|
||||
uint64_t cas_coordinator_dropped_prune = 0;
|
||||
uint64_t cas_replica_dropped_prune = 0;
|
||||
|
||||
|
||||
std::chrono::microseconds last_mv_flow_control_delay; // delay added for MV flow control in the last request
|
||||
public:
|
||||
|
||||
@@ -3409,10 +3409,13 @@ void feature_enabled_listener::on_enabled() {
|
||||
|
||||
future<> read_sstables_format(distributed<storage_service>& ss) {
|
||||
return db::system_keyspace::get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME).then([&ss] (std::optional<sstring> format_opt) {
|
||||
sstables::sstable_version_types format = sstables::from_string(format_opt.value_or("ka"));
|
||||
return ss.invoke_on_all([format] (storage_service& s) {
|
||||
s._sstables_format = format;
|
||||
});
|
||||
if (format_opt) {
|
||||
sstables::sstable_version_types format = sstables::from_string(*format_opt);
|
||||
return ss.invoke_on_all([format] (storage_service& s) {
|
||||
s._sstables_format = format;
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -312,7 +312,13 @@ private:
|
||||
*/
|
||||
std::optional<db_clock::time_point> _cdc_streams_ts;
|
||||
|
||||
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka;
|
||||
// _sstables_format is the format used for writing new sstables.
|
||||
// Here we set its default value, but if we discover that all the nodes
|
||||
// in the cluster support a newer format, _sstables_format will be set to
|
||||
// that format. read_sstables_format() also overwrites _sstables_format
|
||||
// if an sstable format was chosen earlier (and this choice was persisted
|
||||
// in the system table).
|
||||
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::la;
|
||||
seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}};
|
||||
feature_enabled_listener _la_feature_listener;
|
||||
feature_enabled_listener _mc_feature_listener;
|
||||
|
||||
@@ -296,7 +296,9 @@ SEASTAR_TEST_CASE(test_commitlog_closed) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
||||
commitlog::config cfg;
|
||||
cfg.commitlog_segment_size_in_mb = 2;
|
||||
|
||||
constexpr auto max_size_mb = 2;
|
||||
cfg.commitlog_segment_size_in_mb = max_size_mb;
|
||||
cfg.commitlog_total_space_in_mb = 1;
|
||||
cfg.commitlog_sync_period_in_ms = 1;
|
||||
return cl_test(cfg, [](commitlog& log) {
|
||||
@@ -306,8 +308,15 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
||||
// add a flush handler that simply says we're done with the range.
|
||||
auto r = log.add_flush_handler([&log, sem, segments](cf_id_type id, replay_position pos) {
|
||||
*segments = log.get_active_segment_names();
|
||||
log.discard_completed_segments(id);
|
||||
sem->signal();
|
||||
// Verify #5899 - file size should not exceed the config max.
|
||||
return parallel_for_each(*segments, [](sstring filename) {
|
||||
return file_size(filename).then([](uint64_t size) {
|
||||
BOOST_REQUIRE_LE(size, max_size_mb * 1024 * 1024);
|
||||
});
|
||||
}).then([&log, sem, id] {
|
||||
log.discard_completed_segments(id);
|
||||
sem->signal();
|
||||
});
|
||||
});
|
||||
|
||||
auto set = make_lw_shared<std::set<segment_id_type>>();
|
||||
|
||||
@@ -930,17 +930,17 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -950,7 +950,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -958,9 +958,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -973,7 +973,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
});
|
||||
return make_ready_future();
|
||||
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -992,7 +992,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -397,9 +397,6 @@ public:
|
||||
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
|
||||
cfg->num_tokens.set(256);
|
||||
cfg->ring_delay_ms.set(500);
|
||||
auto features = cfg->experimental_features();
|
||||
features.emplace_back(db::experimental_features_t::LWT);
|
||||
cfg->experimental_features(features);
|
||||
cfg->shutdown_announce_in_ms.set(0);
|
||||
cfg->broadcast_to_all_shards().get();
|
||||
create_directories((data_dir_path + "/system").c_str());
|
||||
@@ -439,7 +436,6 @@ public:
|
||||
|
||||
gms::feature_config fcfg;
|
||||
fcfg.enable_cdc = true;
|
||||
fcfg.enable_lwt = true;
|
||||
fcfg.enable_sstables_mc_format = true;
|
||||
if (cfg->enable_user_defined_functions()) {
|
||||
fcfg.enable_user_defined_functions = true;
|
||||
|
||||
Reference in New Issue
Block a user