Compare commits

...

11 Commits

Author SHA1 Message Date
Yaron Kaikov
9908f009a4 release: prepare for 4.0.rc1 2020-04-06 10:22:45 +03:00
Pavel Emelyanov
48d8a075b4 main: Do not destroy token_metadata
The storage_proxy instances hold references to token_metadata ones and
leave unwaited futures running in its query_partition_key_range_concurrent
method.

The latter is called from do_query, so it's not easy to find
out who is leaking. Keep the token_metadata instances unfreed for a while.

Fixes: #6093
Test: manual start-stop

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20200402183538.9674-1-xemul@scylladb.com>
(cherry picked from commit 86296ba557)
2020-04-05 13:47:57 +03:00
Konstantin Osipov
e3ddd607bc lwt: remove Paxos from experimental list
Always enable lightweight transactions. Remove the check for the command
line switch from the feature service, assuming LWT is always enabled.

Remove the check for LWT from Alternator.

Note that in order for the cluster to work with LWT, all nodes need
to support it.

Rename LWT to UNUSED in db/config.hh to keep accepting the "lwt" keyword in
the --experimental-features command-line option while doing nothing with it.

Changes in v2:
* remove enable_lwt feature flag, it's always there

Closes #6102

test: unit (dev, debug)
Message-Id: <20200401071149.41921-1-kostja@scylladb.com>
(cherry picked from commit 9948f548a5)
2020-04-05 08:56:42 +03:00
Piotr Jastrzebski
511773d466 token: relax the condition of the sanity check
When we switched the token representation to int64_t,
we added a sanity check that the byte representation
is always 8 bytes long.

It turns out that for token_kind::before_all_keys and
token_kind::after_all_keys the bytes can sometimes be empty,
because for those tokens they are simply ignored. The check
introduced with that change is too strict and sometimes
throws an exception for before/after-all-keys tokens
created with empty bytes.

This patch relaxes the condition of the check and always
uses 0 as the value of _data for the special before/after-all-keys
tokens.

Fixes #6131

Tests: unit(dev, sct)

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit a15b32c9d9)
2020-04-04 20:19:10 +03:00
Gleb Natapov
121cd383fa lwt: remove entries from system.paxos table after successful learn stage
The learning stage of the PAXOS protocol leaves behind an entry in the
system.paxos table with the last learned value (which can be large). In
case not all participants learned it successfully, the next round on the
same key may complete the learning using this info. But if all nodes
learned the value, the entry no longer serves any useful purpose.

The patch adds another round, "prune", which is executed in the background
(limited to 1000 simultaneous instances) and removes the entry in
case all nodes replied successfully to the "learn" round. It uses the
ballot's timestamp to do the deletion, so as not to interfere with the
next round. Since deletion happens very close to the previous writes, it
will likely happen in the memtable and never reach an sstable, which
reduces memtable flush and compaction overhead.

Fixes #5779

Message-Id: <20200330154853.GA31074@scylladb.com>
(cherry picked from commit 8a408ac5a8)
2020-04-02 15:36:52 +02:00
Gleb Natapov
90639f48e5 lwt: rename "in_progress_ballot" cell to "promise" in system.paxos table
The value that is stored in the "in_progress_ballot" cell is the value of
the promised ballot, so name the cell accordingly to avoid confusion,
especially as we have a notion of an "in progress" proposal in the code,
which is not the same thing as in_progress_ballot here.

We can still do this without caring about backwards compatibility, since LWT
is still marked as experimental.

Fixes #6087.

Message-Id: <20200326095758.GA10219@scylladb.com>
(cherry picked from commit b3db6f5b04)
2020-04-02 15:36:49 +02:00
Calle Wilund
8d029a04aa db::commitlog: Don't write trailing zero block unless needed
Fixes #5899

When terminating (closing) a segment, we write a trailing block
of zeros so the reader can treat an empty region after the last used chunk
as an end marker. This is needed because we use recycled, pre-allocated
segments with potentially non-zero data extending past the point
where we are ending the segment (i.e. we are not fully filling
the segment, due to a huge mutation or similar).

However, if we reach the end of the segment while writing the final block
(typically many small mutations), the file will end naturally
after the data written, and any trailing zero block would in fact
just extend the file further. While this will only happen once per
recycled segment (independent of how many times it is recycled),
it still both slightly breaks the disk usage contract and
potentially causes some disk stalls due to metadata changes
(though of course very infrequently).

We should only write the trailing zero block if we are below the max_size
file size when terminating.

Adds a small size check to the commitlog test to verify size bounds
(which breaks without the patch).

v2:
- Fix test to take into account that files might be deleted
  behind our backs.
v3:
- Fix test better, by doing verification _before_ segments are
  queued for delete.

Message-Id: <20200226121601.15347-2-calle@scylladb.com>
Message-Id: <20200324100235.23982-1-calle@scylladb.com>
(cherry picked from commit 9fee712d62)
2020-03-31 14:22:20 +03:00
Asias He
67995db899 gossip: Add an option to force gossip generation
Consider 3 nodes in the cluster, n1, n2, n3, with gossip generation
numbers g1, g2, g3.

n1, n2, n3 are running a Scylla version with commit
0a52ecb6df (gossip: Fix max generation
drift measure).

One year later, the user wants to upgrade n1, n2, n3 to a new version.

When n3 does a rolling restart with the new version, n3 will use a
generation number g3'. Because g3' - g2 > MAX_GENERATION_DIFFERENCE and
g3' - g1 > MAX_GENERATION_DIFFERENCE, n1 and n2 will reject n3's
gossip update and mark n3 as down.

Such unnecessary marking of nodes as down can cause availability issues.
For example:

DC1: n1, n2
DC2: n3, n4

When n3 and n4 restart, n1 and n2 will mark n3 and n4 as down, which
causes the whole DC2 to be unavailable.

To fix this, we can start the new node with a gossip generation within
MAX_GENERATION_DIFFERENCE of the other nodes'.

Once all the nodes run a version with commit
0a52ecb6df, the option is no longer
needed.

Fixes #5164

(cherry picked from commit 743b529c2b)
2020-03-30 12:36:20 +02:00
Yaron Kaikov
282cd0df7c dist/docker: Update SCYLLA_REPO_URL and VERSION defaults
Update the SCYLLA_REPO_URL and VERSION defaults to point to the latest
unstable 4.0 version. This will be used if someone runs "docker build"
locally. For the releases, the release pipelines will pass the stable
version repository URL and a specific release version.
2020-03-26 09:54:44 +02:00
Nadav Har'El
ce58994d30 sstable: default to LA format instead of KA format
Over the years, Scylla updated the sstable format from the KA format to
the LA format, and most recently to the MC format. On a mixed cluster -
as occurs during a rolling upgrade - we want all the nodes, even new ones,
to write sstables in the format preferred by the old version. The thinking
is that if the upgrade fails, and we want to downgrade all nodes back to
the older version, we don't want to lose data because we already have
too-new sstables.

So the current code starts by selecting the oldest format we ever had, KA,
and only switches this choice to LA and MC after we verify that all the
nodes in the cluster support these newer formats.

But before an agreement is reached on the new format, sstables may already
be created in the antique KA format. This is usually harmless - we can
read this format just fine. However, the KA format has a problem that it is
unable to represent table names or keyspaces with the "-" character in them,
because this character is used to separate the keyspace and table names in
the file name. For CQL, a "-" is not allowed anyway in keyspace or table
names; But for Alternator, this character is allowed - and if a KA table
happens to be created by accident (before the LA or MC formats are chosen),
it cannot be read again during boot, and Scylla cannot reboot.

The solution that this patch takes is to change Scylla's default sstable
format to LA (and, as before, if the entire cluster agrees, the newer MC
format will be used). From now on, new KA sstables will never be written.
But we still fully support *reading* the KA format - this is important in
case some very old sstables never underwent compaction.

The old code had, confusingly, two places where the default KA format
was chosen. This patch fixes this so that the new default (LA) is specified
in only one place.

Fixes #6071.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200324232607.4215-2-nyh@scylladb.com>
(cherry picked from commit 91aba40114)
2020-03-25 13:27:51 +01:00
Yaron Kaikov
78f5afec30 release: prepare for 4.0.rc0 2020-03-24 23:33:23 +02:00
24 changed files with 261 additions and 72 deletions


@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
-VERSION=666.development
+VERSION=4.0.rc1
if test -f version
then


@@ -614,11 +614,17 @@ public:
future<sseg_ptr> terminate() {
assert(_closed);
if (!std::exchange(_terminated, true)) {
-clogger.trace("{} is closed but not terminated.", *this);
-if (_buffer.empty()) {
-new_buffer(0);
+// write a terminating zero block iff we are ending (a reused)
+// block before actual file end.
+// we should only get here when all actual data is
+// already flushed (see below, close()).
+if (size_on_disk() < _segment_manager->max_size) {
+clogger.trace("{} is closed but not terminated.", *this);
+if (_buffer.empty()) {
+new_buffer(0);
+}
+return cycle(true, true);
}
-return cycle(true, true);
}
return make_ready_future<sseg_ptr>(shared_from_this());
}


@@ -689,6 +689,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
@@ -859,7 +860,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
}
bool db::config::check_experimental(experimental_features_t::feature f) const {
-if (experimental()) {
+if (experimental() && f != experimental_features_t::UNUSED) {
return true;
}
const auto& optval = experimental_features();
@@ -911,11 +912,13 @@ const db::extensions& db::config::extensions() const {
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
// We decided against using the construct-on-first-use idiom here:
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
-return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
+// Lightweight transactions are no longer experimental. Map them
+// to UNUSED switch for a while, then remove altogether.
+return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
}
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
-return {LWT, UDF, CDC};
+return {UDF, CDC};
}
template struct utils::config_file::named_value<seastar::log_level>;


@@ -81,7 +81,7 @@ namespace db {
/// Enumeration of all valid values for the `experimental` config entry.
struct experimental_features_t {
-enum feature { LWT, UDF, CDC };
+enum feature { UNUSED, UDF, CDC };
static std::unordered_map<sstring, feature> map(); // See enum_option.
static std::vector<enum_option<experimental_features_t>> all();
};
@@ -278,6 +278,7 @@ public:
named_value<uint32_t> shutdown_announce_in_ms;
named_value<bool> developer_mode;
named_value<int32_t> skip_wait_for_gossip_to_settle;
named_value<int32_t> force_gossip_generation;
named_value<bool> experimental;
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
named_value<size_t> lsa_reclamation_step;


@@ -187,7 +187,7 @@ schema_ptr batchlog() {
{{"cf_id", uuid_type}},
// regular columns
{
-{"in_progress_ballot", timeuuid_type},
+{"promise", timeuuid_type},
{"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl
{"most_recent_commit_at", timeuuid_type},
{"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl
@@ -2201,8 +2201,8 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
return service::paxos::paxos_state();
}
auto& row = results->one();
-auto promised = row.has("in_progress_ballot")
-? row.get_as<utils::UUID>("in_progress_ballot") : utils::UUID_gen::min_time_UUID(0);
+auto promised = row.has("promise")
+? row.get_as<utils::UUID>("promise") : utils::UUID_gen::min_time_UUID(0);
std::optional<service::paxos::proposal> accepted;
if (row.has("proposal")) {
@@ -2228,7 +2228,7 @@ static int32_t paxos_ttl_sec(const schema& s) {
}
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
-static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
+static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
@@ -2274,6 +2274,20 @@ future<> save_paxos_decision(const schema& s, const service::paxos::proposal& de
).discard_result();
}
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
// This should be called only if a learn stage succeeded on all replicas.
// In this case we can remove the paxos row using ballot's timestamp which
// guarantees that if there is more recent round it will not be affected.
static auto cql = format("DELETE FROM system.{} USING TIMESTAMP ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
to_legacy(*key.get_compound_type(s), key.representation()),
s.id()
).discard_result();
}
} // namespace system_keyspace
sstring system_keyspace_name() {


@@ -647,6 +647,7 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
} // namespace system_keyspace
} // namespace db


@@ -58,19 +58,27 @@ public:
, _data(normalize(d)) { }
token(kind k, const bytes& b) : _kind(std::move(k)) {
-if (b.size() != sizeof(_data)) {
-throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
+if (_kind != kind::key) {
+_data = 0;
+} else {
+if (b.size() != sizeof(_data)) {
+throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
+}
+std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
+_data = net::ntoh(_data);
+}
}
-std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
-_data = net::ntoh(_data);
}
token(kind k, bytes_view b) : _kind(std::move(k)) {
-if (b.size() != sizeof(_data)) {
-throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
+if (_kind != kind::key) {
+_data = 0;
+} else {
+if (b.size() != sizeof(_data)) {
+throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
+}
+std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
+_data = net::ntoh(_data);
+}
}
-std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
-_data = net::ntoh(_data);
}
bool is_minimum() const {


@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
-ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
-ARG VERSION=666.development
+ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.0/latest/scylla.repo
+ARG VERSION=4.0.*
ADD scylla_bashrc /scylla_bashrc


@@ -110,10 +110,6 @@ feature_config feature_config_from_db_config(db::config& cfg) {
fcfg.enable_cdc = true;
}
-if (cfg.check_experimental(db::experimental_features_t::LWT)) {
-fcfg.enable_lwt = true;
-}
return fcfg;
}
@@ -178,9 +174,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
if (_config.enable_cdc) {
features.insert(gms::features::CDC);
}
-if (_config.enable_lwt) {
-features.insert(gms::features::LWT);
-}
+features.insert(gms::features::LWT);
for (const sstring& s : _config.disabled_features) {
features.erase(s);


@@ -41,7 +41,6 @@ struct feature_config {
bool enable_sstables_mc_format = false;
bool enable_user_defined_functions = false;
bool enable_cdc = false;
-bool enable_lwt = false;
std::set<sstring> disabled_features;
feature_config();
};


@@ -1725,8 +1725,12 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
// message on all cpus and forard them to cpu0 to process.
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
g.init_messaging_service_handler(do_bind);
-}).then([this, generation_nbr, preload_local_states] {
+}).then([this, generation_nbr, preload_local_states] () mutable {
build_seeds_list();
if (_cfg.force_gossip_generation() > 0) {
generation_nbr = _cfg.force_gossip_generation();
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
local_state.mark_alive();

main.cc

@@ -662,9 +662,17 @@ int main(int ac, char** av) {
supervisor::notify("starting tokens manager");
token_metadata.start().get();
-auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
-token_metadata.stop().get();
-});
+// storage_proxy holds a reference on it and is not yet stopped.
+// what's worse is that the calltrace
+// storage_proxy::do_query
+// ::query_partition_key_range
+// ::query_partition_key_range_concurrent
+// leaves unwaited futures on the reactor and once it gets there
+// the token_metadata instance is accessed and ...
+//
+//auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
+// token_metadata.stop().get();
+//});
supervisor::notify("starting migration manager notifier");
mm_notifier.start().get();
@@ -1071,9 +1079,6 @@ int main(int ac, char** av) {
static sharded<alternator::executor> alternator_executor;
static sharded<alternator::server> alternator_server;
-if (!cfg->check_experimental(db::experimental_features_t::LWT)) {
-throw std::runtime_error("Alternator enabled, but needs experimental LWT feature which wasn't enabled");
-}
net::inet_address addr;
try {
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();


@@ -452,6 +452,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::PAXOS_PREPARE:
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
return 0;
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
// sent on a different connection to avoid potential deadlocks
@@ -1281,6 +1282,19 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
}
void messaging_service::register_paxos_prune(std::function<future<rpc::no_wait_type>(
const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info>)>&& func) {
register_handler(this, messaging_verb::PAXOS_PRUNE, std::move(func));
}
future<> messaging_service::unregister_paxos_prune() {
return unregister_handler(netw::messaging_verb::PAXOS_PRUNE);
}
future<>
messaging_service::send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id,
const partition_key& key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_PRUNE, netw::msg_addr(peer), schema_id, key, ballot, std::move(trace_info));
}
void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func) {
register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func));


@@ -139,7 +139,8 @@ enum class messaging_verb : int32_t {
PAXOS_ACCEPT = 40,
PAXOS_LEARN = 41,
HINT_MUTATION = 42,
-LAST = 43,
+PAXOS_PRUNE = 43,
+LAST = 44,
};
} // namespace netw
@@ -493,6 +494,14 @@ public:
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
std::optional<tracing::trace_info> trace_info = std::nullopt);
void register_paxos_prune(std::function<future<rpc::no_wait_type>(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key,
utils::UUID ballot, std::optional<tracing::trace_info>)>&& func);
future<> unregister_paxos_prune();
future<> send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id, const partition_key& key,
utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
future<> unregister_hint_mutation();


@@ -190,4 +190,11 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
});
}
future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
logger.debug("Delete paxos state for ballot {}", ballot);
tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot);
return db::system_keyspace::delete_paxos_decision(*schema, key, ballot, timeout);
}
} // end of namespace "service::paxos"


@@ -124,6 +124,9 @@ public:
clock_type::time_point timeout);
// Replica RPC endpoint for Paxos "learn".
static future<> learn(schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
// Replica RPC endpoint for pruning Paxos table
static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state);
};
} // end of namespace "service::paxos"


@@ -171,6 +171,7 @@ public:
const schema_ptr& schema() {
return _schema;
}
// called only when all replicas replied
virtual void release_mutation() = 0;
};
@@ -300,9 +301,10 @@ public:
class cas_mutation : public mutation_holder {
lw_shared_ptr<paxos::proposal> _proposal;
+shared_ptr<paxos_response_handler> _handler;
public:
-explicit cas_mutation(paxos::proposal proposal , schema_ptr s)
-: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))) {
+explicit cas_mutation(paxos::proposal proposal, schema_ptr s, shared_ptr<paxos_response_handler> handler)
+: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))), _handler(std::move(handler)) {
_size = _proposal->update.representation().size();
_schema = std::move(s);
}
@@ -327,7 +329,11 @@ public:
return true;
}
virtual void release_mutation() override {
-_proposal.release();
+// The handler will be set for "learn", but not for PAXOS repair
+// since repair may not include all replicas
+if (_handler) {
+_handler->prune(_proposal->ballot);
+}
}
};
@@ -1184,6 +1190,12 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
return f;
}
// debug output in mutate_internal needs this
std::ostream& operator<<(std::ostream& os, const paxos_response_handler& h) {
os << "paxos_response_handler{" << h.id() << "}";
return os;
}
// This function implements learning stage of Paxos protocol
future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool allow_hints) {
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", decision, _cl_for_learn);
@@ -1219,12 +1231,41 @@ future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool a
}
// Path for the "base" mutations
-std::array<std::tuple<paxos::proposal, schema_ptr, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, _key.token())};
+std::array<std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
}
void paxos_response_handler::prune(utils::UUID ballot) {
if (_has_dead_endpoints) {
return;
}
if ( _proxy->get_stats().cas_now_pruning >= pruning_limit) {
_proxy->get_stats().cas_coordinator_dropped_prune++;
return;
}
_proxy->get_stats().cas_now_pruning++;
_proxy->get_stats().cas_prune++;
// running in the background, but the number of background jobs is limited by pruning_limit;
// it is waited for by holding a shared pointer to storage_proxy, which guarantees
// that storage_proxy::stop() will wait for this to complete
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
return futurize_apply([&] {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "prune: prune {} locally", ballot);
return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state);
} else {
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
netw::messaging_service& ms = netw::get_local_messaging_service();
return ms.send_paxos_prune(peer, _timeout, _schema->version(), _key.key(), ballot, tracing::make_trace_info(tr_state));
}
});
}).finally([h = shared_from_this()] {
h->_proxy->get_stats().cas_now_pruning--;
});
}
static std::vector<gms::inet_address>
replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
std::vector<gms::inet_address> endpoints;
@@ -1571,6 +1612,14 @@ void storage_proxy_stats::stats::register_stats() {
sm::make_histogram("cas_write_contention", sm::description("how many contended writes were encountered"),
{storage_proxy_stats::current_scheduling_group_label()},
[this]{ return cas_write_contention.get_histogram(1, 8);}),
sm::make_total_operations("cas_prune", cas_prune,
sm::description("how many times paxos prune was done after successful cas operation"),
{storage_proxy_stats::current_scheduling_group_label()}),
sm::make_total_operations("cas_dropped_prune", cas_coordinator_dropped_prune,
sm::description("how many times a coordinator did not perform prune after cas"),
{storage_proxy_stats::current_scheduling_group_label()}),
});
_metrics.add_group(REPLICA_STATS_CATEGORY, {
@@ -1606,6 +1655,9 @@ void storage_proxy_stats::stats::register_stats() {
sm::description("number of operations that crossed a shard boundary"),
{storage_proxy_stats::current_scheduling_group_label()}),
sm::make_total_operations("cas_dropped_prune", cas_replica_dropped_prune,
sm::description("how many times a replica did not perform prune after cas"),
{storage_proxy_stats::current_scheduling_group_label()}),
});
}
@@ -1879,11 +1931,11 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
}
storage_proxy::response_id_type
-storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& meta,
+storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
-auto& [commit, s, t] = meta;
+auto& [commit, s, h, t] = meta;
-return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s), cl,
+return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
db::write_type::CAS, tr_state, std::move(permit));
}
@@ -1898,7 +1950,7 @@ storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, s
auto keyspace_name = s->ks_name();
keyspace& ks = _db.local().find_keyspace(keyspace_name);
-return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s), std::move(endpoints),
+return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
}
@@ -2146,6 +2198,8 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
cl_for_paxos, participants + 1, live_endpoints.size());
}
bool dead = participants != live_endpoints.size();
// Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key.
// If the values received from different replicas match, we skip a separate query stage thus saving
// one network round trip. To generate less traffic, only closest replicas send data, others send
@@ -2153,7 +2207,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
// list of participants by proximity to this instance.
sort_endpoints_by_proximity(live_endpoints);
-return paxos_participants{std::move(live_endpoints), required_participants};
+return paxos_participants{std::move(live_endpoints), required_participants, dead};
}
@@ -4942,6 +4996,42 @@ void storage_proxy::init_messaging_service() {
return f;
});
ms.register_paxos_prune([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
static thread_local uint16_t pruning = 0;
static constexpr uint16_t pruning_limit = 1000; // since PRUNE verb is one way replica side has its own queue limit
auto src_addr = netw::messaging_service::get_source(cinfo);
auto src_ip = src_addr.addr;
tracing::trace_state_ptr tr_state;
if (trace_info) {
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
tracing::begin(tr_state);
tracing::trace(tr_state, "paxos_prune: message received from /{} ballot {}", src_ip, ballot);
}
if (pruning >= pruning_limit) {
get_stats().cas_replica_dropped_prune++;
tracing::trace(tr_state, "paxos_prune: do not prune due to overload, request from /{}", src_ip);
return make_ready_future<seastar::rpc::no_wait_type>(netw::messaging_service::no_wait());
}
pruning++;
return get_schema_for_read(schema_id, src_addr).then([this, key = std::move(key), ballot,
timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
dht::token token = dht::get_token(*schema, key);
unsigned shard = dht::shard_of(*schema, token);
bool local = shard == engine().cpu_id();
get_stats().replica_cross_shard_ops += !local;
return smp::submit_to(shard, _write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
local, key = std::move(key), ballot, timeout, src_ip, d = defer([] { pruning--; })] () {
tracing::trace_state_ptr tr_state = gt;
return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip);
return netw::messaging_service::no_wait();
});
});
});
});
}
future<> storage_proxy::uninit_messaging_service() {
@@ -4956,7 +5046,8 @@ future<> storage_proxy::uninit_messaging_service() {
ms.unregister_truncate(),
ms.unregister_paxos_prepare(),
ms.unregister_paxos_accept(),
ms.unregister_paxos_learn()
ms.unregister_paxos_learn(),
ms.unregister_paxos_prune()
);
}


@@ -242,6 +242,7 @@ public:
std::vector<gms::inet_address> endpoints;
// How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL).
size_t required_participants;
bool has_dead_endpoints;
};
const gms::feature_service& features() const { return _features; }
@@ -317,7 +318,7 @@ private:
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& proposal,
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
@@ -634,6 +635,11 @@ private:
db::consistency_level _cl_for_learn;
// Live endpoints, as per get_paxos_participants()
std::vector<gms::inet_address> _live_endpoints;
// True if some endpoints are known to be dead.
// We don't include endpoints known to be unavailable in the pending
// endpoints list, but we need to be aware of them so we don't prune
// system.paxos data while some endpoint is still missing a Paxos write.
bool _has_dead_endpoints;
// How many endpoints need to respond favourably for the protocol to progress to the next step.
size_t _required_participants;
// A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms
@@ -651,6 +657,9 @@ private:
// Unique request id for logging purposes.
const uint64_t _id = next_id++;
// Max number of pruning operations to run in parallel.
static constexpr uint16_t pruning_limit = 1000;
public:
tracing::trace_state_ptr tr_state;
@@ -674,6 +683,7 @@ public:
storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(_schema->ks_name(), _key.token(), _cl_for_paxos);
_live_endpoints = std::move(pp.endpoints);
_required_participants = pp.required_participants;
_has_dead_endpoints = pp.has_dead_endpoints;
tracing::trace(tr_state, "Create paxos_response_handler for token {} with live: {} and required participants: {}",
_key.token(), _live_endpoints, _required_participants);
}
@@ -691,6 +701,7 @@ public:
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
void prune(utils::UUID ballot);
uint64_t id() const {
return _id;
}


@@ -116,6 +116,11 @@ struct write_stats {
uint64_t cas_write_condition_not_met = 0;
uint64_t cas_write_timeout_due_to_uncertainty = 0;
uint64_t cas_failed_read_round_optimization = 0;
uint16_t cas_now_pruning = 0;
uint64_t cas_prune = 0;
uint64_t cas_coordinator_dropped_prune = 0;
uint64_t cas_replica_dropped_prune = 0;
std::chrono::microseconds last_mv_flow_control_delay; // delay added for MV flow control in the last request
public:


@@ -3409,10 +3409,13 @@ void feature_enabled_listener::on_enabled() {
future<> read_sstables_format(distributed<storage_service>& ss) {
return db::system_keyspace::get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME).then([&ss] (std::optional<sstring> format_opt) {
sstables::sstable_version_types format = sstables::from_string(format_opt.value_or("ka"));
return ss.invoke_on_all([format] (storage_service& s) {
s._sstables_format = format;
});
if (format_opt) {
sstables::sstable_version_types format = sstables::from_string(*format_opt);
return ss.invoke_on_all([format] (storage_service& s) {
s._sstables_format = format;
});
}
return make_ready_future<>();
});
}


@@ -312,7 +312,13 @@ private:
*/
std::optional<db_clock::time_point> _cdc_streams_ts;
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka;
// _sstables_format is the format used for writing new sstables.
// Here we set its default value, but if we discover that all the nodes
// in the cluster support a newer format, _sstables_format will be set to
// that format. read_sstables_format() also overwrites _sstables_format
// if an sstable format was chosen earlier (and this choice was persisted
// in the system table).
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::la;
seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}};
feature_enabled_listener _la_feature_listener;
feature_enabled_listener _mc_feature_listener;


@@ -296,7 +296,9 @@ SEASTAR_TEST_CASE(test_commitlog_closed) {
SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 2;
constexpr auto max_size_mb = 2;
cfg.commitlog_segment_size_in_mb = max_size_mb;
cfg.commitlog_total_space_in_mb = 1;
cfg.commitlog_sync_period_in_ms = 1;
return cl_test(cfg, [](commitlog& log) {
@@ -306,8 +308,15 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
// add a flush handler that simply says we're done with the range.
auto r = log.add_flush_handler([&log, sem, segments](cf_id_type id, replay_position pos) {
*segments = log.get_active_segment_names();
log.discard_completed_segments(id);
sem->signal();
// Verify #5899 - file size should not exceed the config max.
return parallel_for_each(*segments, [](sstring filename) {
return file_size(filename).then([](uint64_t size) {
BOOST_REQUIRE_LE(size, max_size_mb * 1024 * 1024);
});
}).then([&log, sem, id] {
log.discard_completed_segments(id);
sem->signal();
});
});
auto set = make_lw_shared<std::set<segment_id_type>>();


@@ -930,17 +930,17 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -950,7 +950,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -958,9 +958,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -973,7 +973,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
});
return make_ready_future();
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
config cfg;
cfg.read_from_yaml("experimental: true", throw_on_error);
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -992,7 +992,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
config cfg;
cfg.read_from_yaml("experimental: false", throw_on_error);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}


@@ -397,9 +397,6 @@ public:
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
cfg->num_tokens.set(256);
cfg->ring_delay_ms.set(500);
auto features = cfg->experimental_features();
features.emplace_back(db::experimental_features_t::LWT);
cfg->experimental_features(features);
cfg->shutdown_announce_in_ms.set(0);
cfg->broadcast_to_all_shards().get();
create_directories((data_dir_path + "/system").c_str());
@@ -439,7 +436,6 @@ public:
gms::feature_config fcfg;
fcfg.enable_cdc = true;
fcfg.enable_lwt = true;
fcfg.enable_sstables_mc_format = true;
if (cfg->enable_user_defined_functions()) {
fcfg.enable_user_defined_functions = true;