Compare commits
copilot/co... → scylla-6.1 (115 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | b60f9ef4c2 |  |
|  | 00e96d4b70 |  |
|  | 848054079b |  |
|  | c80cefe422 |  |
|  | b07c74a65c |  |
|  | 2556c7a0dc |  |
|  | 132d77f447 |  |
|  | bb9249f055 |  |
|  | e4a18b0858 |  |
|  | 105293b2ab |  |
|  | ad47c0e2f9 |  |
|  | 0eb66cbee5 |  |
|  | a2458f07d7 |  |
|  | b484effcad |  |
|  | 4c4d1cce14 |  |
|  | c64ae3f839 |  |
|  | f77686cefb |  |
|  | d6a1a55d6c |  |
|  | 9db819763b |  |
|  | 4769e694d1 |  |
|  | 74012c562a |  |
|  | 51215fb7f7 |  |
|  | 93fbe3af12 |  |
|  | b164ea4a68 |  |
|  | 2db808e364 |  |
|  | e6d2d29dd1 |  |
|  | 01661e1eaa |  |
|  | 6232982772 |  |
|  | 6418787ee0 |  |
|  | 06d6cf5608 |  |
|  | 1f8d8fd3db |  |
|  | bc03d13c76 |  |
|  | 4b4dbc1112 |  |
|  | 739be17801 |  |
|  | 7fc15ce200 |  |
|  | 164d58b0d5 |  |
|  | 0839df3dbf |  |
|  | 64befbca61 |  |
|  | 8ee09f4353 |  |
|  | e84d8b1205 |  |
|  | 6692c1702d |  |
|  | 415bdf3160 |  |
|  | 6b2d0f5934 |  |
|  | 0f990a8dc5 |  |
|  | 5f8b199253 |  |
|  | 2288f98d83 |  |
|  | 3ed214a728 |  |
|  | b7e6f22999 |  |
|  | 31f3ff37f4 |  |
|  | 828595786a |  |
|  | fdbb0cdef3 |  |
|  | 912c46e07f |  |
|  | e80c587da3 |  |
|  | 485a508cb3 |  |
|  | 1683b07d2e |  |
|  | 853d2ec76f |  |
|  | 0b1dbb3a64 |  |
|  | e13d5ee834 |  |
|  | 505cad64ad |  |
|  | 4b88ec4722 |  |
|  | 13aa97a00f |  |
|  | c336ee63a3 |  |
|  | 8d90b81766 |  |
|  | bc0097688f |  |
|  | 69c1a0e2ca |  |
|  | c382e19e5e |  |
|  | b786e6a39a |  |
|  | 3286c14d76 |  |
|  | 1773dd5632 |  |
|  | c1292c69cf |  |
|  | f27edaa19c |  |
|  | 706761d8ec |  |
|  | 41e4c39087 |  |
|  | d5bdef9ee5 |  |
|  | a4dcf3956e |  |
|  | 858fa914b1 |  |
|  | ec923171a6 |  |
|  | 0144549cd6 |  |
|  | 0f246bfbc9 |  |
|  | 1a1583a5b6 |  |
|  | f78b88b59b |  |
|  | 73d46ec548 |  |
|  | dcee7839d4 |  |
|  | 75477f5661 |  |
|  | 78d7c953b0 |  |
|  | 753fc87efa |  |
|  | c75dbc1f9c |  |
|  | 96e5ebe28c |  |
|  | c45e92142e |  |
|  | d69f0e529a |  |
|  | 86ff3c2aa3 |  |
|  | efac73109e |  |
|  | 8c975712d3 |  |
|  | 1fdfe11bb0 |  |
|  | 58c06819d7 |  |
|  | 5b604509ce |  |
|  | abbf0b24a6 |  |
|  | 347857e5e5 |  |
|  | cd2ca5ef57 |  |
|  | 5a4065ecd5 |  |
|  | ed4f2ecca4 |  |
|  | 8f80a84e93 |  |
|  | 95abb6d4a7 |  |
|  | 30b0cb4f5d |  |
|  | 97ae704f99 |  |
|  | 738e4c3681 |  |
|  | ee74fe4e0e |  |
|  | b2ea946837 |  |
|  | 92e725c467 |  |
|  | e57d48253f |  |
|  | 47df9f9b05 |  |
|  | 193dc87bd0 |  |
|  | 11d1950957 |  |
|  | 6317325ed5 |  |
|  | 14222ad205 |  |
.gitignore (vendored, 2 changes)
@@ -19,7 +19,7 @@ CMakeLists.txt.user
*.egg-info
__pycache__
.gdbinit
resources
/resources
.pytest_cache
/expressions.tokens
tags
@@ -78,7 +78,7 @@ fi

# Default scylla product/version tags
PRODUCT=scylla
VERSION=6.1.0-dev
VERSION=6.1.2

if test -f version
then
@@ -19,6 +19,7 @@
#include "alternator/executor.hh"
#include "cql3/selection/selection.hh"
#include "cql3/result_set.hh"
#include "types/types.hh"
#include <seastar/core/coroutine.hh>

namespace alternator {

@@ -31,11 +32,12 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
const column_definition* salted_hash_col = schema->get_column_definition(bytes("salted_hash"));
if (!salted_hash_col) {
const column_definition* can_login_col = schema->get_column_definition(bytes("can_login"));
if (!salted_hash_col || !can_login_col) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("Credentials cannot be fetched for: {}", username)));
}
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col});
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options());
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col, can_login_col});
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
auto cl = auth::password_authenticator::consistency_for_user(username);

@@ -51,7 +53,14 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
if (result_set->empty()) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("User not found: {}", username)));
}
const managed_bytes_opt& salted_hash = result_set->rows().front().front(); // We only asked for 1 row and 1 column
const auto& result = result_set->rows().front();
bool can_login = result[1] && value_cast<bool>(boolean_type->deserialize(*result[1]));
if (!can_login) {
// This is a valid role name, but has "login=False" so should not be
// usable for authentication (see #19735).
co_await coroutine::return_exception(api_error::unrecognized_client(format("Role {} has login=false so cannot be used for login", username)));
}
const managed_bytes_opt& salted_hash = result.front();
if (!salted_hash) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("No password found for user: {}", username)));
}
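The hunk above extends Alternator's credential lookup to fetch `can_login` alongside `salted_hash` and to reject roles that are not allowed to log in (#19735). A minimal, self-contained sketch of that validation pattern follows; the `Row` struct and `authentication_error` type are illustrative assumptions, not Scylla's internal types:

```cpp
#include <optional>
#include <stdexcept>
#include <string>

// Illustrative stand-in for the row read from the roles table.
struct Row {
    std::optional<std::string> salted_hash; // missing if no password was ever set
    std::optional<bool> can_login;          // missing is treated as "cannot log in"
};

struct authentication_error : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Reject the role unless it both allows login and has a stored password hash.
std::string validate_credentials_row(const Row& row, const std::string& username) {
    if (!row.can_login.value_or(false)) {
        throw authentication_error("Role " + username + " has login=false so cannot be used for login");
    }
    if (!row.salted_hash) {
        throw authentication_error("No password found for user: " + username);
    }
    return *row.salted_hash;
}

int main() {
    Row row{std::string{"hash"}, true};
    return validate_credentials_row(row, "alice") == "hash" ? 0 : 1;
}
```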
@@ -9,6 +9,7 @@
#include <fmt/ranges.h>
#include <seastar/core/sleep.hh>
#include "alternator/executor.hh"
#include "cdc/log.hh"
#include "db/config.hh"
#include "log.hh"
#include "schema/schema_builder.hh"

@@ -4439,8 +4440,10 @@ future<executor::request_return_type> executor::list_tables(client_state& client

auto tables = _proxy.data_dictionary().get_tables(); // hold on to temporary, table_names isn't a container, it's a view
auto table_names = tables
| boost::adaptors::filtered([] (data_dictionary::table t) {
return t.schema()->ks_name().find(KEYSPACE_NAME_PREFIX) == 0 && !t.schema()->is_view();
| boost::adaptors::filtered([this] (data_dictionary::table t) {
return t.schema()->ks_name().find(KEYSPACE_NAME_PREFIX) == 0 &&
!t.schema()->is_view() &&
!cdc::is_log_for_some_table(_proxy.local_db(), t.schema()->ks_name(), t.schema()->cf_name());
})
| boost::adaptors::transformed([] (data_dictionary::table t) {
return t.schema()->cf_name();
@@ -211,7 +211,10 @@ protected:
sstring local_dc = topology.get_datacenter();
std::unordered_set<gms::inet_address> local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc);
for (auto& ip : local_dc_nodes) {
if (_gossiper.is_alive(ip)) {
// Note that it's not enough for the node to be is_alive() - a
// node joining the cluster is also "alive" but not responsive to
// requests. We need the node to be in normal state. See #19694.
if (_gossiper.is_normal(ip)) {
// Use the gossiped broadcast_rpc_address if available instead
// of the internal IP address "ip". See discussion in #18711.
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));
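This change tightens the endpoint filter from "alive" to "alive and in normal state" (#19694), since a joining node gossips as alive but cannot yet serve requests. A simplified, self-contained sketch of the filtering idea; the `node` struct and its flags are assumptions, not the gossiper's real API:

```cpp
#include <iostream>
#include <string>
#include <vector>

struct node {
    std::string rpc_address;
    bool alive;
    bool normal; // false while joining, leaving, etc.
};

// Keep only nodes that are both alive and in normal state.
std::vector<std::string> usable_endpoints(const std::vector<node>& local_dc_nodes) {
    std::vector<std::string> out;
    for (const auto& n : local_dc_nodes) {
        if (n.alive && n.normal) {
            out.push_back(n.rpc_address);
        }
    }
    return out;
}

int main() {
    std::vector<node> nodes{{"10.0.0.1", true, true}, {"10.0.0.2", true, false}};
    for (const auto& ep : usable_endpoints(nodes)) {
        std::cout << ep << '\n'; // prints only 10.0.0.1
    }
}
```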
@@ -26,6 +26,7 @@
#include "log.hh"
#include "gc_clock.hh"
#include "replica/database.hh"
#include "service/client_state.hh"
#include "service_permit.hh"
#include "timestamp.hh"
#include "service/storage_proxy.hh"

@@ -312,7 +313,7 @@ static size_t random_offset(size_t min, size_t max) {
// this range's primary node is down. For this we need to return not just
// a list of this node's secondary ranges - but also the primary owner of
// each of those ranges.
static std::vector<std::pair<dht::token_range, gms::inet_address>> get_secondary_ranges(
static future<std::vector<std::pair<dht::token_range, gms::inet_address>>> get_secondary_ranges(
const locator::effective_replication_map_ptr& erm,
gms::inet_address ep) {
const auto& tm = *erm->get_token_metadata_ptr();

@@ -323,6 +324,7 @@ static std::vector<std::pair<dht::token_range, gms::inet_address>> get_secondary
}
auto prev_tok = sorted_tokens.back();
for (const auto& tok : sorted_tokens) {
co_await coroutine::maybe_yield();
inet_address_vector_replica_set eps = erm->get_natural_endpoints(tok);
if (eps.size() <= 1 || eps[1] != ep) {
prev_tok = tok;

@@ -350,7 +352,7 @@ static std::vector<std::pair<dht::token_range, gms::inet_address>> get_secondary
}
prev_tok = tok;
}
return ret;
co_return ret;
}
@@ -386,63 +388,63 @@ static std::vector<std::pair<dht::token_range, gms::inet_address>> get_secondary
//
// FIXME: Check if this algorithm is safe with tablet migration.
// https://github.com/scylladb/scylladb/issues/16567
enum primary_or_secondary_t {primary, secondary};
template<primary_or_secondary_t primary_or_secondary>
class token_ranges_owned_by_this_shard {
// ranges_holder_primary holds just the primary ranges themselves
class ranges_holder_primary {
const dht::token_range_vector _token_ranges;
public:
ranges_holder_primary(const locator::vnode_effective_replication_map_ptr& erm, gms::gossiper& g, gms::inet_address ep)
: _token_ranges(erm->get_primary_ranges(ep)) {}
std::size_t size() const { return _token_ranges.size(); }
const dht::token_range& operator[](std::size_t i) const {
return _token_ranges[i];
}
bool should_skip(std::size_t i) const {
return false;
}
};
// ranges_holder<secondary> holds the secondary token ranges plus each
// range's primary owner, needed to implement should_skip().
class ranges_holder_secondary {
std::vector<std::pair<dht::token_range, gms::inet_address>> _token_ranges;
gms::gossiper& _gossiper;
public:
ranges_holder_secondary(const locator::effective_replication_map_ptr& erm, gms::gossiper& g, gms::inet_address ep)
: _token_ranges(get_secondary_ranges(erm, ep))
, _gossiper(g) {}
std::size_t size() const { return _token_ranges.size(); }
const dht::token_range& operator[](std::size_t i) const {
return _token_ranges[i].first;
}
// range i should be skipped if its primary owner is alive.
bool should_skip(std::size_t i) const {
return _gossiper.is_alive(_token_ranges[i].second);
}
};

// ranges_holder_primary holds just the primary ranges themselves
class ranges_holder_primary {
dht::token_range_vector _token_ranges;
public:
explicit ranges_holder_primary(dht::token_range_vector token_ranges) : _token_ranges(std::move(token_ranges)) {}
static future<ranges_holder_primary> make(const locator::vnode_effective_replication_map_ptr& erm, gms::inet_address ep) {
co_return ranges_holder_primary(co_await erm->get_primary_ranges(ep));
}
std::size_t size() const { return _token_ranges.size(); }
const dht::token_range& operator[](std::size_t i) const {
return _token_ranges[i];
}
bool should_skip(std::size_t i) const {
return false;
}
};
// ranges_holder<secondary> holds the secondary token ranges plus each
// range's primary owner, needed to implement should_skip().
class ranges_holder_secondary {
std::vector<std::pair<dht::token_range, gms::inet_address>> _token_ranges;
const gms::gossiper& _gossiper;
public:
explicit ranges_holder_secondary(std::vector<std::pair<dht::token_range, gms::inet_address>> token_ranges, const gms::gossiper& g)
: _token_ranges(std::move(token_ranges))
, _gossiper(g) {}
static future<ranges_holder_secondary> make(const locator::effective_replication_map_ptr& erm, gms::inet_address ep, const gms::gossiper& g) {
co_return ranges_holder_secondary(co_await get_secondary_ranges(erm, ep), g);
}
std::size_t size() const { return _token_ranges.size(); }
const dht::token_range& operator[](std::size_t i) const {
return _token_ranges[i].first;
}
// range i should be skipped if its primary owner is alive.
bool should_skip(std::size_t i) const {
return _gossiper.is_alive(_token_ranges[i].second);
}
};

template<class primary_or_secondary_t>
class token_ranges_owned_by_this_shard {
schema_ptr _s;
locator::effective_replication_map_ptr _erm;
// _token_ranges will contain a list of token ranges owned by this node.
// We'll further need to split each such range to the pieces owned by
// the current shard, using _intersecter.
using ranges_holder = std::conditional_t<
primary_or_secondary == primary_or_secondary_t::primary,
ranges_holder_primary,
ranges_holder_secondary>;
const ranges_holder _token_ranges;
const primary_or_secondary_t _token_ranges;
// NOTICE: _range_idx is used modulo _token_ranges size when accessing
// the data to ensure that it doesn't go out of bounds
size_t _range_idx;
size_t _end_idx;
std::optional<dht::selective_token_range_sharder> _intersecter;
public:
token_ranges_owned_by_this_shard(replica::database& db, gms::gossiper& g, schema_ptr s)
token_ranges_owned_by_this_shard(schema_ptr s, primary_or_secondary_t token_ranges)
: _s(s)
, _erm(s->table().get_effective_replication_map())
, _token_ranges(db.find_keyspace(s->ks_name()).get_vnode_effective_replication_map(),
g, _erm->get_topology().my_address())
, _token_ranges(std::move(token_ranges))
, _range_idx(random_offset(0, _token_ranges.size() - 1))
, _end_idx(_range_idx + _token_ranges.size())
{
@@ -498,6 +500,7 @@ struct scan_ranges_context {
bytes column_name;
std::optional<std::string> member;

service::client_state internal_client_state;
::shared_ptr<cql3::selection::selection> selection;
std::unique_ptr<service::query_state> query_state_ptr;
std::unique_ptr<cql3::query_options> query_options;

@@ -507,6 +510,7 @@ struct scan_ranges_context {
: s(s)
, column_name(column_name)
, member(member)
, internal_client_state(service::client_state::internal_tag())
{
// FIXME: don't read the entire items - read only parts of it.
// We must read the key columns (to be able to delete) and also

@@ -525,10 +529,9 @@ struct scan_ranges_context {
std::vector<query::clustering_range> ck_bounds{query::clustering_range::make_open_ended_both_sides()};
auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), opts);
command = ::make_lw_shared<query::read_command>(s->id(), s->version(), partition_slice, proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
executor::client_state client_state{executor::client_state::internal_tag()};
tracing::trace_state_ptr trace_state;
// NOTICE: empty_service_permit is used because the TTL service has fixed parallelism
query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, empty_service_permit());
query_state_ptr = std::make_unique<service::query_state>(internal_client_state, trace_state, empty_service_permit());
// FIXME: What should we do on multi-DC? Will we run the expiration on the same ranges on all
// DCs or only once for each range? If the latter, we need to change the CLs in the
// scanner and deleter.

@@ -724,7 +727,9 @@ static future<bool> scan_table(
expiration_stats.scan_table++;
// FIXME: need to pace the scan, not do it all at once.
scan_ranges_context scan_ctx{s, proxy, std::move(column_name), std::move(member)};
token_ranges_owned_by_this_shard<primary> my_ranges(db.real_database(), gossiper, s);
auto erm = db.real_database().find_keyspace(s->ks_name()).get_vnode_effective_replication_map();
auto my_address = erm->get_topology().my_address();
token_ranges_owned_by_this_shard my_ranges(s, co_await ranges_holder_primary::make(erm, my_address));
while (std::optional<dht::partition_range> range = my_ranges.next_partition_range()) {
// Note that because of issue #9167 we need to run a separate
// query on each partition range, and can't pass several of

@@ -744,7 +749,7 @@ static future<bool> scan_table(
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
token_ranges_owned_by_this_shard<secondary> my_secondary_ranges(db.real_database(), gossiper, s);
token_ranges_owned_by_this_shard my_secondary_ranges(s, co_await ranges_holder_secondary::make(erm, my_address, gossiper));
while (std::optional<dht::partition_range> range = my_secondary_ranges.next_partition_range()) {
expiration_stats.secondary_ranges_scanned++;
dht::partition_range_vector partition_ranges;
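The token_ranges_owned_by_this_shard hunks above keep the existing scanning scheme: start at a random range and walk all ranges exactly once, indexing modulo the range count so `_range_idx` can run past the end without going out of bounds, and skipping secondary ranges whose primary owner is still alive. A rough standalone sketch of that iteration shape (the predicate and the printed action are placeholders, not the TTL service's real logic):

```cpp
#include <cstddef>
#include <iostream>
#include <random>
#include <vector>

// Visit every element once, starting at a random offset and wrapping around,
// as with _range_idx/_end_idx and modulo indexing. should_skip() stands in
// for "skip this secondary range because its primary owner is alive".
template <typename T, typename Skip>
void for_each_from_random_offset(const std::vector<T>& items, Skip should_skip) {
    if (items.empty()) {
        return;
    }
    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<std::size_t> dist(0, items.size() - 1);
    std::size_t idx = dist(rng);
    const std::size_t end_idx = idx + items.size();
    for (; idx < end_idx; ++idx) {
        const std::size_t i = idx % items.size(); // never out of bounds
        if (should_skip(i)) {
            continue;
        }
        std::cout << "processing range #" << i << '\n';
    }
}

int main() {
    std::vector<int> ranges(5);
    for_each_from_random_offset(ranges, [](std::size_t) { return false; });
}
```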
@@ -1891,6 +1891,14 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"force",
"description":"Enforce the source_dc option, even if it unsafe to use for rebuild",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
@@ -194,6 +194,21 @@
"parameters":[]
}
]
},
{
"path":"/system/highest_supported_sstable_version",
"operations":[
{
"method":"GET",
"summary":"Get highest supported sstable version",
"type":"string",
"nickname":"get_highest_supported_sstable_version",
"produces":[
"application/json"
],
"parameters":[]
}
]
}
]
}
@@ -54,6 +54,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "sstables_loader.hh"
#include "db/view/view_builder.hh"
#include "utils/user_provided_param.hh"

using namespace seastar::httpd;
using namespace std::chrono_literals;

@@ -1096,7 +1097,16 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});

ss::rebuild.set(r, [&ss](std::unique_ptr<http::request> req) {
auto source_dc = req->get_query_param("source_dc");
utils::optional_param source_dc;
if (auto source_dc_str = req->get_query_param("source_dc"); !source_dc_str.empty()) {
source_dc.emplace(std::move(source_dc_str)).set_user_provided();
}
if (auto force_str = req->get_query_param("force"); !force_str.empty() && service::loosen_constraints(validate_bool(force_str))) {
if (!source_dc) {
throw bad_param_exception("The `source_dc` option must be provided for using the `force` option");
}
source_dc.set_force();
}
apilog.info("rebuild: source_dc={}", source_dc);
return ss.local().rebuild(std::move(source_dc)).then([] {
return make_ready_future<json::json_return_type>(json_void());
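The rebuild handler now wraps `source_dc` in `utils::optional_param`, marks it as user-provided, and only accepts `force` when a source DC was actually given. A rough standalone sketch of that parameter-validation shape; the `optional_param` struct and `bad_param` error below are simplified assumptions, not the utility used in the tree:

```cpp
#include <optional>
#include <stdexcept>
#include <string>

struct bad_param : std::invalid_argument {
    using std::invalid_argument::invalid_argument;
};

// Simplified stand-in for utils::optional_param: a value plus two flags.
struct optional_param {
    std::optional<std::string> value;
    bool user_provided = false;
    bool force = false;
};

optional_param parse_rebuild_params(const std::string& source_dc_str, bool force_requested) {
    optional_param source_dc;
    if (!source_dc_str.empty()) {
        source_dc.value = source_dc_str;
        source_dc.user_provided = true;
    }
    if (force_requested) {
        if (!source_dc.value) {
            // force without source_dc makes no sense, reject the request
            throw bad_param("The source_dc option must be provided for using the force option");
        }
        source_dc.force = true;
    }
    return source_dc;
}

int main() {
    auto p = parse_rebuild_params("dc1", true); // ok: forced, user-provided source_dc
    return p.force ? 0 : 1;
}
```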
@@ -10,6 +10,7 @@
#include "api/api-doc/system.json.hh"
#include "api/api-doc/metrics.json.hh"
#include "replica/database.hh"
#include "sstables/sstables_manager.hh"

#include <rapidjson/document.h>
#include <seastar/core/reactor.hh>

@@ -182,6 +183,11 @@ void set_system(http_context& ctx, routes& r) {
apilog.info("Profile dumped to {}", profile_dest);
return make_ready_future<json::json_return_type>(json::json_return_type(json::json_void()));
}) ;

hs::get_highest_supported_sstable_version.set(r, [&ctx] (const_req req) {
auto& table = ctx.db.local().find_column_family("system", "local");
return seastar::to_sstring(table.get_sstables_manager().get_highest_supported_format());
});
}

}
@@ -121,7 +121,7 @@ static future<> announce_mutations_with_guard(
::service::raft_group0_client& group0_client,
std::vector<canonical_mutation> muts,
::service::group0_guard group0_guard,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout) {
auto group0_cmd = group0_client.prepare_command(
::service::write_mutations{

@@ -137,7 +137,7 @@ future<> announce_mutations_with_batching(
::service::raft_group0_client& group0_client,
start_operation_func_t start_operation_func,
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout) {
// account for command's overhead, it's better to use smaller threshold than constantly bounce off the limit
size_t memory_threshold = group0_client.max_command_size() * 0.75;

@@ -188,7 +188,7 @@ future<> announce_mutations(
::service::raft_group0_client& group0_client,
const sstring query_string,
std::vector<data_value_or_unset> values,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout) {
auto group0_guard = co_await group0_client.start_operation(as, timeout);
auto timestamp = group0_guard.write_timestamp();
@@ -80,7 +80,7 @@ future<> create_legacy_metadata_table_if_missing(
// Execute update query via group0 mechanism, mutations will be applied on all nodes.
// Use this function when need to perform read before write on a single guard or if
// you have more than one mutation and potentially exceed single command size limit.
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source*)>;
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source&)>;
future<> announce_mutations_with_batching(
::service::raft_group0_client& group0_client,
// since we can operate also in topology coordinator context where we need stronger

@@ -88,7 +88,7 @@ future<> announce_mutations_with_batching(
// function here
start_operation_func_t start_operation_func,
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout);

// Execute update query via group0 mechanism, mutations will be applied on all nodes.

@@ -97,7 +97,7 @@ future<> announce_mutations(
::service::raft_group0_client& group0_client,
const sstring query_string,
std::vector<data_value_or_unset> values,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout);

// Appends mutations to a collector, they will be applied later on all nodes via group0 mechanism.
@@ -136,7 +136,7 @@ future<> password_authenticator::create_default_if_missing() {
plogger.info("Created default superuser authentication record.");
} else {
co_await announce_mutations(_qp, _group0_client, query,
{salted_pwd, _superuser}, &_as, ::service::raft_timeout{});
{salted_pwd, _superuser}, _as, ::service::raft_timeout{});
plogger.info("Created default superuser authentication record.");
}
}
@@ -681,7 +681,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
co_await announce_mutations_with_batching(g0,
start_operation_func,
std::move(gen),
&as,
as,
std::nullopt);
}
@@ -192,7 +192,7 @@ future<> standard_role_manager::create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
} else {
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, &_as, ::service::raft_timeout{});
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, _as, ::service::raft_timeout{});
}
log.info("Created default superuser role '{}'.", _superuser);
} catch(const exceptions::unavailable_exception& e) {
@@ -467,7 +467,16 @@ future<> shard_cleanup_keyspace_compaction_task_impl::run() {

future<> table_cleanup_keyspace_compaction_task_impl::run() {
co_await wait_for_your_turn(_cv, _current_task, _status.id);
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(_db.get_keyspace_local_ranges(_status.keyspace));
// Note that we do not hold an effective_replication_map_ptr throughout
// the cleanup operation, so the topology might change.
// Since clenaup is an admin operation required for vnodes,
// it is the responsibility of the system operator to not
// perform additional incompatible range movements during cleanup.
auto get_owned_ranges = [&] (std::string_view ks_name) -> future<owned_ranges_ptr> {
const auto& erm = _db.find_keyspace(ks_name).get_vnode_effective_replication_map();
co_return compaction::make_owned_ranges_ptr(co_await _db.get_keyspace_local_ranges(erm));
};
auto owned_ranges_ptr = co_await get_owned_ranges(_status.keyspace);
co_await run_on_table("force_keyspace_cleanup", _db, _status.keyspace, _ti, [&] (replica::table& t) {
// skip the flush, as cleanup_keyspace_compaction_task_impl::run should have done this.
return t.perform_cleanup_compaction(owned_ranges_ptr, tasks::task_info{_status.id, _status.shard}, replica::table::do_flush::no);

@@ -531,8 +540,15 @@ future<> shard_upgrade_sstables_compaction_task_impl::run() {

future<> table_upgrade_sstables_compaction_task_impl::run() {
co_await wait_for_your_turn(_cv, _current_task, _status.id);
auto owned_ranges = _db.maybe_get_keyspace_local_ranges(_status.keyspace);
auto owned_ranges_ptr = owned_ranges ? compaction::make_owned_ranges_ptr(std::move(owned_ranges.value())) : nullptr;
auto get_owned_ranges = [&] (std::string_view keyspace_name) -> future<owned_ranges_ptr> {
const auto& ks = _db.find_keyspace(keyspace_name);
if (ks.get_replication_strategy().is_per_table()) {
co_return nullptr;
}
const auto& erm = ks.get_vnode_effective_replication_map();
co_return compaction::make_owned_ranges_ptr(co_await _db.get_keyspace_local_ranges(erm));
};
auto owned_ranges_ptr = co_await get_owned_ranges(_status.keyspace);
tasks::task_info info{_status.id, _status.shard};
co_await run_on_table("upgrade_sstables", _db, _status.keyspace, _ti, [&] (replica::table& t) -> future<> {
return t.parallel_foreach_table_state([&] (compaction::table_state& ts) -> future<> {
@@ -503,10 +503,12 @@ selection::collect_metadata(const schema& schema, const std::vector<prepared_sel
}

result_set_builder::result_set_builder(const selection& s, gc_clock::time_point now,
std::vector<size_t> group_by_cell_indices)
std::vector<size_t> group_by_cell_indices,
uint64_t limit)
: _result_set(std::make_unique<result_set>(::make_shared<metadata>(*(s.get_result_metadata()))))
, _selectors(s.new_selectors())
, _group_by_cell_indices(std::move(group_by_cell_indices))
, _limit(limit)
, _last_group(_group_by_cell_indices.size())
, _group_began(false)
, _now(now)

@@ -577,8 +579,10 @@ void result_set_builder::flush_selectors() {
// handled by process_current_row
return;
}
_result_set->add_row(_selectors->get_output_row());
_selectors->reset();
if (_result_set->size() < _limit) {
_result_set->add_row(_selectors->get_output_row());
_selectors->reset();
}
}

void result_set_builder::complete_row() {

@@ -790,6 +794,10 @@ int32_t result_set_builder::ttl_of(size_t idx) {
return _ttls[idx];
}

size_t result_set_builder::result_set_size() const {
return _result_set->size();
}

bytes_opt result_set_builder::get_value(data_type t, query::result_atomic_cell_view c) {
return {c.value().linearize()};
}
@@ -172,6 +172,7 @@ private:
std::unique_ptr<result_set> _result_set;
std::unique_ptr<selectors> _selectors;
const std::vector<size_t> _group_by_cell_indices; ///< Indices in \c current of cells holding GROUP BY values.
const uint64_t _limit; ///< Maximum number of rows to return.
std::vector<managed_bytes_opt> _last_group; ///< Previous row's group: all of GROUP BY column values.
bool _group_began; ///< Whether a group began being formed.
public:

@@ -236,7 +237,8 @@ public:
};

result_set_builder(const selection& s, gc_clock::time_point now,
std::vector<size_t> group_by_cell_indices = {});
std::vector<size_t> group_by_cell_indices = {},
uint64_t limit = std::numeric_limits<uint64_t>::max());
void add_empty();
void add(bytes_opt value);
void add(const column_definition& def, const query::result_atomic_cell_view& c);

@@ -246,6 +248,7 @@ public:
std::unique_ptr<result_set> build();
api::timestamp_type timestamp_of(size_t idx);
int32_t ttl_of(size_t idx);
size_t result_set_size() const;

// Implements ResultVisitor concept from query.hh
template<typename Filter = nop_filter>
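The selection changes above thread a row limit into result_set_builder, so aggregation and GROUP BY paging can stop adding rows (and stop fetching further pages) once the limit is reached. A toy builder showing the same guard; the class and method names here are illustrative, not the cql3 ones:

```cpp
#include <cstdint>
#include <limits>
#include <string>
#include <vector>

// Toy row builder: rows past the limit are silently dropped, and the caller
// can stop fetching more pages once reached_limit() turns true.
class limited_result_builder {
    std::vector<std::string> _rows;
    const uint64_t _limit;
public:
    explicit limited_result_builder(uint64_t limit = std::numeric_limits<uint64_t>::max())
        : _limit(limit) {}
    void add_row(std::string row) {
        if (_rows.size() < _limit) {
            _rows.push_back(std::move(row));
        }
    }
    uint64_t size() const { return _rows.size(); }
    bool reached_limit() const { return size() >= _limit; }
};

int main() {
    limited_result_builder b(2);
    for (int i = 0; i < 5 && !b.reached_limit(); ++i) {
        b.add_row("row");
    }
    return b.size() == 2 ? 0 : 1;
}
```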
@@ -7,6 +7,7 @@
*/

#include "auth/service.hh"
#include "exceptions/exceptions.hh"
#include "seastarx.hh"
#include "cql3/statements/create_service_level_statement.hh"
#include "service/qos/service_level_controller.hh"

@@ -38,6 +39,10 @@ create_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
if (_service_level.starts_with('$')) {
throw exceptions::invalid_request_exception("Names starting with '$' are reserved for internal tenants. Use a different name.");
}

service::group0_batch mc{std::move(guard)};
qos::service_level_options slo = _slo.replace_defaults(qos::service_level_options{});
auto& sl = state.get_service_level_controller();
@@ -192,6 +192,13 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa

auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());

bool ks_uses_tablets;
try {
ks_uses_tablets = db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets();
} catch (const data_dictionary::no_such_keyspace& e) {
throw exceptions::invalid_request_exception("Cannot create a table in a non-existent keyspace: " + keyspace());
}

std::optional<std::map<bytes, data_type>> defined_multi_cell_columns;
for (auto&& entry : _definitions) {
::shared_ptr<column_identifier> id = entry.first;

@@ -201,7 +208,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
throw exceptions::invalid_request_exception("Cannot set default_time_to_live on a table with counters");
}

if (db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets() && pt.is_counter()) {
if (ks_uses_tablets && pt.is_counter()) {
throw exceptions::invalid_request_exception(format("Cannot use the 'counter' type for table {}.{}: Counters are not yet supported with tablets", keyspace(), cf_name));
}
@@ -283,33 +283,44 @@ select_statement::make_partition_slice(const query_options& options) const
std::reverse(bounds.begin(), bounds.end());
++_stats.reverse_queries;
}

const uint64_t per_partition_limit = get_inner_loop_limit(get_limit(options, _per_partition_limit),
_selection->is_aggregate());
return query::partition_slice(std::move(bounds),
std::move(static_columns), std::move(regular_columns), _opts, nullptr, get_per_partition_limit(options));
std::move(static_columns), std::move(regular_columns), _opts, nullptr, per_partition_limit);
}

uint64_t select_statement::do_get_limit(const query_options& options,
const std::optional<expr::expression>& limit,
const expr::unset_bind_variable_guard& limit_unset_guard,
uint64_t default_limit) const {
if (!limit.has_value() || limit_unset_guard.is_unset(options) || _selection->is_aggregate()) {
return default_limit;
}

auto val = expr::evaluate(*limit, options);
if (val.is_null()) {
throw exceptions::invalid_request_exception("Invalid null value of limit");
select_statement::get_limit_result select_statement::get_limit(
const query_options& options, const std::optional<expr::expression>& limit) const
{
if (!limit.has_value()) {
return bo::success(query::max_rows);
}
try {
auto val = expr::evaluate(*limit, options);
if (val.is_null()) {
return bo::failure(exceptions::invalid_request_exception("Invalid null value of limit"));
}
auto l = val.view().validate_and_deserialize<int32_t>(*int32_type);
if (l <= 0) {
throw exceptions::invalid_request_exception("LIMIT must be strictly positive");
return bo::failure(exceptions::invalid_request_exception("LIMIT must be strictly positive"));
}
return l;
return bo::success(l);
} catch (const marshal_exception& e) {
throw exceptions::invalid_request_exception("Invalid limit value");
return bo::failure(exceptions::invalid_request_exception("Invalid limit value"));
} catch (const exceptions::invalid_request_exception& e) {
return bo::failure(e);
}
}

uint64_t select_statement::get_inner_loop_limit(const select_statement::get_limit_result& limit, bool is_aggregate)
{
if (!limit.has_value() || is_aggregate) {
return query::max_rows;
}
return limit.value();
}

bool select_statement::needs_post_query_ordering() const {
// We need post-query ordering only for queries with IN on the partition key and an ORDER BY.
return _restrictions->key_is_in_relation() && !_parameters->orderings().empty();
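The rewritten get_limit returns a success-or-error result instead of throwing, and get_inner_loop_limit falls back to `query::max_rows` for errors or aggregate queries so the error can be surfaced later in the right place. A compact sketch of the same shape, using std::variant as an assumed stand-in for the bo::result type used in the tree:

```cpp
#include <cstdint>
#include <limits>
#include <optional>
#include <string>
#include <variant>

constexpr uint64_t max_rows = std::numeric_limits<uint64_t>::max();

// Assumed stand-in for bo::result<uint64_t, invalid_request_exception>.
using limit_result = std::variant<uint64_t, std::string /* error message */>;

limit_result parse_limit(const std::optional<int64_t>& raw_limit) {
    if (!raw_limit) {
        return uint64_t{max_rows};                               // no LIMIT clause
    }
    if (*raw_limit <= 0) {
        return std::string{"LIMIT must be strictly positive"};   // error, reported later
    }
    return static_cast<uint64_t>(*raw_limit);
}

// The "inner loop" limit used to size the read command: errors and aggregate
// queries both fall back to max_rows here and are dealt with further up.
uint64_t inner_loop_limit(const limit_result& r, bool is_aggregate) {
    if (is_aggregate || !std::holds_alternative<uint64_t>(r)) {
        return max_rows;
    }
    return std::get<uint64_t>(r);
}

int main() {
    return inner_loop_limit(parse_limit(std::optional<int64_t>{10}), false) == 10 ? 0 : 1;
}
```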
@@ -358,7 +369,8 @@ select_statement::do_execute(query_processor& qp,

validate_for_read(cl);

uint64_t limit = get_limit(options);
const auto parsed_limit = get_limit(options, _limit);
const uint64_t inner_loop_limit = get_inner_loop_limit(parsed_limit, _selection->is_aggregate());
auto now = gc_clock::now();

_stats.filtered_reads += _restrictions_need_filtering;

@@ -380,7 +392,7 @@ select_statement::do_execute(query_processor& qp,
std::move(slice),
max_result_size,
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
query::row_limit(limit),
query::row_limit(inner_loop_limit),
query::partition_limit(query::max_partitions),
now,
tracing::make_trace_info(state.get_trace_state()),

@@ -393,14 +405,13 @@ select_statement::do_execute(query_processor& qp,

_stats.unpaged_select_queries(_ks_sel) += page_size <= 0;

// An aggregation query will never be paged for the user, but we always page it internally to avoid OOM.
// If we user provided a page_size we'll use that to page internally (because why not), otherwise we use our default
// Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
// An aggregation query may not be paged for the user, but we always page it internally to avoid OOM.
// If the user provided a page_size we'll use that to page internally (because why not), otherwise we use our default
// Also note: all GROUP BY queries are considered aggregation.
const bool aggregate = _selection->is_aggregate() || has_group_by();
const bool nonpaged_filtering = _restrictions_need_filtering && page_size <= 0;
if (aggregate || nonpaged_filtering) {
page_size = internal_paging_size;
page_size = page_size <= 0 ? internal_paging_size : std::min(page_size, internal_paging_size);
}

auto key_ranges = _restrictions->get_partition_key_ranges(options);

@@ -438,7 +449,9 @@ select_statement::do_execute(query_processor& qp,
*command, key_ranges))) {
f = execute_without_checking_exception_message_non_aggregate_unpaged(qp, command, std::move(key_ranges), state, options, now);
} else {
f = execute_without_checking_exception_message_aggregate_or_paged(qp, command, std::move(key_ranges), state, options, now, page_size, aggregate, nonpaged_filtering);
f = execute_without_checking_exception_message_aggregate_or_paged(qp, command,
std::move(key_ranges), state, options, now, page_size, aggregate,
nonpaged_filtering, parsed_limit.has_value() ? parsed_limit.value() : query::max_rows);
}

if (!tablet_info.has_value()) {

@@ -454,7 +467,8 @@ select_statement::do_execute(query_processor& qp,
future<::shared_ptr<cql_transport::messages::result_message>>
select_statement::execute_without_checking_exception_message_aggregate_or_paged(query_processor& qp,
lw_shared_ptr<query::read_command> command, dht::partition_range_vector&& key_ranges, service::query_state& state,
const query_options& options, gc_clock::time_point now, int32_t page_size, bool aggregate, bool nonpaged_filtering) const {
const query_options& options, gc_clock::time_point now, int32_t page_size, bool aggregate, bool nonpaged_filtering,
uint64_t limit) const {
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto timeout_duration = get_timeout(state.get_client_state(), options);
auto timeout = db::timeout_clock::now() + timeout_duration;

@@ -462,8 +476,11 @@ select_statement::execute_without_checking_exception_message_aggregate_or_paged(
state, options, command, std::move(key_ranges), _restrictions_need_filtering ? _restrictions : nullptr);

if (aggregate || nonpaged_filtering) {
auto builder = cql3::selection::result_set_builder(*_selection, now, *_group_by_cell_indices);
coordinator_result<void> result_void = co_await utils::result_do_until([&p] {return p->is_exhausted();},
auto builder = cql3::selection::result_set_builder(*_selection, now, *_group_by_cell_indices, limit);
coordinator_result<void> result_void = co_await utils::result_do_until(
[&p, &builder, limit] {
return p->is_exhausted() || (limit < builder.result_set_size());
},
[&p, &builder, page_size, now, timeout] {
return p->fetch_page_result(builder, page_size, now, timeout);
}
@@ -586,7 +603,7 @@ indexed_table_select_statement::prepare_command_for_base_query(query_processor&
std::move(slice),
qp.proxy().get_max_result_size(slice),
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
query::row_limit(get_limit(options)),
query::row_limit(get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate())),
query::partition_limit(query::max_partitions),
now,
tracing::make_trace_info(state.get_trace_state()),

@@ -1368,7 +1385,8 @@ indexed_table_select_statement::find_index_partition_ranges(query_processor& qp,
using value_type = std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
return read_posting_list(qp, options, get_limit(options), state, now, timeout, false).then(utils::result_wrap(
const uint64_t limit = get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate());
return read_posting_list(qp, options, limit, state, now, timeout, false).then(utils::result_wrap(
[this, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);
dht::partition_range_vector partition_ranges;

@@ -1417,7 +1435,8 @@ indexed_table_select_statement::find_index_clustering_rows(query_processor& qp,
using value_type = std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
return read_posting_list(qp, options, get_limit(options), state, now, timeout, true).then(utils::result_wrap(
const uint64_t limit = get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate());
return read_posting_list(qp, options, limit, state, now, timeout, true).then(utils::result_wrap(
[this, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {

auto rs = cql3::untyped_result_set(rows);

@@ -1683,6 +1702,7 @@ schema_ptr mutation_fragments_select_statement::generate_output_schema(schema_pt

future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>
mutation_fragments_select_statement::do_query(
locator::effective_replication_map_ptr erm_keepalive,
locator::host_id this_node,
service::storage_proxy& sp,
schema_ptr schema,

@@ -1690,7 +1710,7 @@ mutation_fragments_select_statement::do_query(
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
service::storage_proxy_coordinator_query_options optional_params) const {
auto res = co_await replica::mutation_dump::dump_mutations(sp.get_db(), schema, _underlying_schema, partition_ranges, *cmd, optional_params.timeout(sp));
auto res = co_await replica::mutation_dump::dump_mutations(sp.get_db(), std::move(erm_keepalive), schema, _underlying_schema, partition_ranges, *cmd, optional_params.timeout(sp));
service::replicas_per_token_range last_replicas;
if (this_node) {
last_replicas.emplace(dht::token_range::make_open_ended_both_sides(), std::vector<locator::host_id>{this_node});

@@ -1704,7 +1724,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu

auto cl = options.get_consistency();

uint64_t limit = get_limit(options);
const uint64_t limit = get_inner_loop_limit(get_limit(options, _limit), _selection->is_aggregate());
auto now = gc_clock::now();

_stats.filtered_reads += _restrictions_need_filtering;

@@ -1762,7 +1782,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
if (!aggregate && !_restrictions_need_filtering && (page_size <= 0
|| !service::pager::query_pagers::may_need_paging(*_schema, page_size,
*command, key_ranges))) {
return do_query({}, qp.proxy(), _schema, command, std::move(key_ranges), cl,
return do_query(erm_keepalive, {}, qp.proxy(), _schema, command, std::move(key_ranges), cl,
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}})
.then(wrap_result_to_error_message([this, erm_keepalive, now, slice = command->slice] (service::storage_proxy_coordinator_query_result&& qr) mutable {
cql3::selection::result_set_builder builder(*_selection, now);

@@ -1801,8 +1821,8 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
std::move(key_ranges),
_restrictions_need_filtering ? _restrictions : nullptr,
[this, erm_keepalive, this_node] (service::storage_proxy& sp, schema_ptr schema, lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector partition_ranges,
db::consistency_level cl, service::storage_proxy_coordinator_query_options optional_params) {
return do_query(this_node, sp, std::move(schema), std::move(cmd), std::move(partition_ranges), cl, std::move(optional_params));
db::consistency_level cl, service::storage_proxy_coordinator_query_options optional_params) mutable {
return do_query(std::move(erm_keepalive), this_node, sp, std::move(schema), std::move(cmd), std::move(partition_ranges), cl, std::move(optional_params));
});

if (_selection->is_trivial() && !_restrictions_need_filtering && !_per_partition_limit) {
@@ -128,7 +128,7 @@ public:

future<::shared_ptr<cql_transport::messages::result_message>> execute_without_checking_exception_message_aggregate_or_paged(query_processor& qp,
lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, service::query_state& state,
const query_options& options, gc_clock::time_point now, int32_t page_size, bool aggregate, bool nonpaged_filtering) const;
const query_options& options, gc_clock::time_point now, int32_t page_size, bool aggregate, bool nonpaged_filtering, uint64_t limit) const;

struct primary_key {

@@ -152,13 +152,10 @@ public:
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;

protected:
uint64_t do_get_limit(const query_options& options, const std::optional<expr::expression>& limit, const expr::unset_bind_variable_guard& unset_guard, uint64_t default_limit) const;
uint64_t get_limit(const query_options& options) const {
return do_get_limit(options, _limit, _limit_unset_guard, query::max_rows);
}
uint64_t get_per_partition_limit(const query_options& options) const {
return do_get_limit(options, _per_partition_limit, _per_partition_limit_unset_guard, query::partition_max_rows);
}
using get_limit_result = bo::result<uint64_t, exceptions::invalid_request_exception>;
get_limit_result get_limit(const query_options& options, const std::optional<expr::expression>& limit) const;
static uint64_t get_inner_loop_limit(const select_statement::get_limit_result& limit, bool is_aggregate);

bool needs_post_query_ordering() const;
virtual void update_stats_rows_read(int64_t rows_read) const {
_stats.rows_read += rows_read;

@@ -338,6 +335,7 @@ public:
private:
future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>
do_query(
locator::effective_replication_map_ptr erm_keepalive,
locator::host_id this_node,
service::storage_proxy& sp,
schema_ptr schema,
@@ -334,7 +334,13 @@ filter_for_query(consistency_level cl,
if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations
// local node is always first if present (see storage_proxy::get_endpoints_for_reading)
unsigned local_idx = erm.get_topology().is_me(epi[0].first) ? 0 : epi.size() + 1;
live_endpoints = boost::copy_range<inet_address_vector_replica_set>(miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)));
auto weighted = boost::copy_range<inet_address_vector_replica_set>(miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)));
// Workaround for https://github.com/scylladb/scylladb/issues/9285
auto last = std::adjacent_find(weighted.begin(), weighted.end());
if (last == weighted.end()) {
// No duplicates, so use the result based on hit rates
live_endpoints = std::move(weighted);
}
}
}
}
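The filter_for_query change keeps the hit-rate-weighted endpoint ordering only when it contains no adjacent duplicates, as a workaround for scylladb/scylladb#9285; otherwise the original list is left untouched. The core of the check is just std::adjacent_find, as in this tiny self-contained illustration (the endpoint strings are placeholders):

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Return the weighted ordering only if it has no adjacent duplicates;
// otherwise keep the original endpoint list untouched.
std::vector<std::string> pick_endpoints(std::vector<std::string> original,
                                        std::vector<std::string> weighted) {
    if (std::adjacent_find(weighted.begin(), weighted.end()) == weighted.end()) {
        return weighted; // no duplicates, safe to use the hit-rate based order
    }
    return original;
}

int main() {
    auto r = pick_endpoints({"a", "b", "c"}, {"b", "b", "a"});
    return r.front() == "a" ? 0 : 1; // duplicate detected, original kept
}
```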
@@ -167,6 +167,7 @@ future<db::commitlog> hint_endpoint_manager::add_store() noexcept {
return io_check([name = _hints_dir.c_str()] { return recursive_touch_directory(name); }).then([this] () {
commitlog::config cfg;

cfg.sched_group = _shard_manager.local_db().commitlog()->active_config().sched_group;
cfg.commit_log_location = _hints_dir.c_str();
cfg.commitlog_segment_size_in_mb = resource_manager::hint_segment_size_in_mb;
cfg.commitlog_total_space_in_mb = resource_manager::max_hints_per_ep_size_mb;
@@ -76,23 +76,6 @@ future<timespec> hint_sender::get_last_file_modification(const sstring& fname) {
});
}

future<> hint_sender::do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) {
return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> {
// The fact that we send with CL::ALL in both cases below ensures that new hints are not going
// to be generated as a result of hints sending.
const auto& tm = ermp->get_token_metadata();
const auto maybe_addr = tm.get_endpoint_for_host_id_if_known(end_point_key());

if (maybe_addr && boost::range::find(natural_endpoints, *maybe_addr) != natural_endpoints.end()) {
manager_logger.trace("Sending directly to {}", end_point_key());
return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), *maybe_addr);
} else {
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
return _proxy.send_hint_to_all_replicas(std::move(m));
}
});
}

bool hint_sender::can_send() noexcept {
if (stopping() && !draining()) {
return false;

@@ -274,11 +257,30 @@ void hint_sender::start() {
}

future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) {
auto erm = _db.find_column_family(m.s).get_effective_replication_map();
auto ermp = _db.find_column_family(m.s).get_effective_replication_map();
auto token = dht::get_token(*m.s, m.fm.key());
inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints(std::move(token));
inet_address_vector_replica_set natural_endpoints = ermp->get_natural_endpoints(std::move(token));

return do_send_one_mutation(std::move(m), std::move(erm), std::move(natural_endpoints));
return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> {
// The fact that we send with CL::ALL in both cases below ensures that new hints are not going
// to be generated as a result of hints sending.
const auto& tm = ermp->get_token_metadata();
const auto maybe_addr = tm.get_endpoint_for_host_id_if_known(end_point_key());

if (maybe_addr && boost::range::find(natural_endpoints, *maybe_addr) != natural_endpoints.end() && !tm.is_leaving(end_point_key())) {
manager_logger.trace("Sending directly to {}", end_point_key());
return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), *maybe_addr);
} else {
if (manager_logger.is_enabled(log_level::trace)) {
if (tm.is_leaving(end_point_key())) {
manager_logger.trace("The original target endpoint {} is leaving. Mutating from scratch...", end_point_key());
} else {
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
}
}
return _proxy.send_hint_to_all_replicas(std::move(m));
}
});
}

future<> hint_sender::send_one_hint(lw_shared_ptr<send_one_file_ctx> ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) {
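In the rewritten send_one_mutation, the hint is sent directly to its original target only if that node is still a known, non-leaving replica; otherwise the mutation is re-executed against all current replicas with CL=ALL, so a hint destined for a leaving node cannot be lost. The decision reduces to one small predicate, sketched here with simplified types rather than the hints manager's real ones:

```cpp
// Illustrative summary of the routing decision; not the hints manager's API.
struct target_state {
    bool known_address;  // gossip still maps the host id to an IP
    bool is_replica;     // still among the mutation's natural endpoints
    bool is_leaving;     // node is being decommissioned or removed
};

// Send directly to the original target only if it is still a known, non-leaving
// replica; otherwise re-execute the mutation against all replicas with CL=ALL,
// so the write is not stranded on a node that will soon leave the ring.
bool send_directly(const target_state& t) {
    return t.known_address && t.is_replica && !t.is_leaving;
}

int main() {
    return send_directly({true, true, true}) ? 1 : 0; // leaving target, so fall back
}
```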
@@ -233,18 +233,14 @@ private:
/// \return
const column_mapping& get_column_mapping(lw_shared_ptr<send_one_file_ctx> ctx_ptr, const frozen_mutation& fm, const hint_entry_reader& hr);

/// \brief Perform a single mutation send attempt.
/// \brief Send one mutation out.
///
/// If the original destination end point is still a replica for the given mutation - send the mutation directly
/// to it, otherwise execute the mutation "from scratch" with CL=ALL.
///
/// \param m mutation to send
/// \param ermp points to the effective_replication_map used to obtain \c natural_endpoints
/// \param natural_endpoints current replicas for the given mutation
/// \return future that resolves when the operation is complete
future<> do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints);

/// \brief Send one mutation out.
/// The mutation will be sent with CL=ALL semantics to all current replicas also in case if the original destination
/// is leaving the cluster - otherwise the hint might be applied only on the leaving node and streaming might
/// miss it.
///
/// \param m mutation to send
/// \return future that resolves when the mutation sending processing is complete.
@@ -779,40 +779,35 @@ redact_columns_for_missing_features(mutation&& m, schema_features features) {
|
||||
*/
|
||||
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features, noncopyable_function<bool(std::string_view)> accept_keyspace)
|
||||
{
|
||||
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> future<std::vector<mutation>> {
|
||||
using mutations_generator = coroutine::experimental::generator<mutation>;
|
||||
|
||||
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> mutations_generator {
|
||||
auto& db = proxy.local().get_db();
|
||||
auto rs = co_await db::system_keyspace::query_mutations(db, NAME, table);
|
||||
auto s = db.local().find_schema(NAME, table);
|
||||
std::vector<mutation> mutations;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
auto mut = co_await unfreeze_gently(p.mut(), s);
|
||||
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
|
||||
auto partition_key = value_cast<sstring>(utf8_type->deserialize(::partition_key(p.mut().key()).get_component(*s, 0)));
|
||||
if (!accept_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
mutations.emplace_back(std::move(mut));
|
||||
}
|
||||
co_return mutations;
|
||||
};
|
||||
auto reduce = [features] (auto& hash, auto&& mutations) {
|
||||
for (const mutation& m : mutations) {
|
||||
feed_hash_for_schema_digest(hash, m, features);
|
||||
auto mut = co_await unfreeze_gently(p.mut(), s);
|
||||
co_yield redact_columns_for_missing_features(std::move(mut), features);
|
||||
}
|
||||
};
|
||||
auto hash = md5_hasher();
|
||||
auto tables = all_table_names(features);
|
||||
{
|
||||
for (auto& table: tables) {
|
||||
auto mutations = co_await map(table);
|
||||
if (diff_logger.is_enabled(logging::log_level::trace)) {
|
||||
for (const mutation& m : mutations) {
|
||||
auto gen_mutations = map(table);
|
||||
while (auto mut_opt = co_await gen_mutations()) {
|
||||
auto& m = *mut_opt;
|
||||
feed_hash_for_schema_digest(hash, m, features);
|
||||
if (diff_logger.is_enabled(logging::log_level::trace)) {
|
||||
md5_hasher h;
|
||||
feed_hash_for_schema_digest(h, m, features);
|
||||
diff_logger.trace("Digest {} for {}, compacted={}", h.finalize(), m, compact_for_schema_digest(m));
|
||||
}
|
||||
}
|
||||
reduce(hash, mutations);
|
||||
}
|
||||
co_return utils::UUID_gen::get_name_UUID(hash.finalize());
|
||||
}
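The hunk above swaps the vector-accumulating map/reduce pair for a coroutine generator: each per-table mutation is redacted and `co_yield`-ed straight into the digest instead of being collected into a `std::vector<mutation>` first. As a rough illustration only (plain Python, not ScyllaDB's API; the table and mutation names are made up), the streaming-digest pattern looks like this:

```python
import hashlib

def mutations_for(table):
    """Hypothetical stand-in for the per-table mutation source (a generator)."""
    for partition in ("ks1", "ks2", "system"):
        yield f"{table}:{partition}"

def schema_digest(tables):
    # Feed each mutation into the hash as it is produced, instead of first
    # materializing a full list per table and hashing it afterwards.
    h = hashlib.md5()
    for table in tables:
        for m in mutations_for(table):
            h.update(m.encode())
    return h.hexdigest()

print(schema_digest(["keyspaces", "tables", "columns"]))
```

The intent of the change is the same in both cases: peak memory stays bounded by a single mutation rather than by a whole table's worth of them.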
|
||||
|
||||
@@ -1673,7 +1673,22 @@ get_view_natural_endpoint(
|
||||
return {};
|
||||
}
|
||||
auto replica = view_endpoints[base_it - base_endpoints.begin()];
|
||||
return view_topology.get_node(replica).endpoint();
|
||||
|
||||
// https://github.com/scylladb/scylladb/issues/19439
// With tablets, a node being replaced might transition to the "left" state
// but still be kept as a replica. In such a case, the IP of the replaced
// node will be lost and `endpoint()` will return an empty IP here.
// As of this writing, the storage proxy has not been migrated to host IDs yet
// (#6403) and hints are not prepared to handle nodes that are left
// but are still replicas. Therefore, there is no other sensible option
// right now but to give up the attempt to send the update or to write a hint
// to the paired, permanently down replica.
const auto ep = view_topology.get_node(replica).endpoint();
|
||||
if (ep != gms::inet_address{}) {
|
||||
return ep;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
||||
|
||||
2
dist/common/scripts/scylla_raid_setup
vendored
2
dist/common/scripts/scylla_raid_setup
vendored
@@ -325,6 +325,8 @@ WantedBy=local-fs.target
|
||||
os.chown(dpath, uid, gid)
|
||||
|
||||
if is_debian_variant():
|
||||
if not shutil.which('update-initramfs'):
|
||||
pkg_install('initramfs-tools')
|
||||
run('update-initramfs -u', shell=True, check=True)
|
||||
|
||||
if not udev_info.uuid_link:
|
||||
|
||||
27
dist/redhat/scylla.spec
vendored
27
dist/redhat/scylla.spec
vendored
@@ -158,33 +158,6 @@ Obsoletes: scylla-server < 1.1
|
||||
%description conf
|
||||
This package contains the main scylla configuration file.
|
||||
|
||||
# we need to refuse upgrade if current scylla < 1.7.3 && commitlog remains
|
||||
%pretrans conf
|
||||
ver=$(rpm -qi scylla-server | grep Version | awk '{print $3}')
|
||||
if [ -n "$ver" ]; then
|
||||
ver_fmt=$(echo $ver | awk -F. '{printf "%d%02d%02d", $1,$2,$3}')
|
||||
if [ $ver_fmt -lt 10703 ]; then
|
||||
# for <scylla-1.2
|
||||
if [ ! -f /opt/scylladb/lib/scylla/scylla_config_get.py ]; then
|
||||
echo
|
||||
echo "Error: Upgrading from scylla-$ver to scylla-%{version} is not supported."
|
||||
echo "Please upgrade to scylla-1.7.3 or later, before upgrade to %{version}."
|
||||
echo
|
||||
exit 1
|
||||
fi
|
||||
commitlog_directory=$(/opt/scylladb/lib/scylla/scylla_config_get.py -g commitlog_directory)
|
||||
commitlog_files=$(ls $commitlog_directory | wc -l)
|
||||
if [ $commitlog_files -ne 0 ]; then
|
||||
echo
|
||||
echo "Error: Upgrading from scylla-$ver to scylla-%{version} is not supported when commitlog is not clean."
|
||||
echo "Please upgrade to scylla-1.7.3 or later, before upgrade to %{version}."
|
||||
echo "Also make sure $commitlog_directory is empty."
|
||||
echo
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
%files conf
|
||||
%defattr(-,root,root)
|
||||
%attr(0755,root,root) %dir %{_sysconfdir}/scylla
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import os
|
||||
from sphinx.directives.other import Include
|
||||
from sphinx.util import logging
|
||||
from docutils.parsers.rst import directives
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
class IncludeFlagDirective(Include):
|
||||
option_spec = Include.option_spec.copy()
|
||||
option_spec['base_path'] = directives.unchanged
|
||||
@@ -8,11 +12,18 @@ class IncludeFlagDirective(Include):
|
||||
def run(self):
|
||||
env = self.state.document.settings.env
|
||||
base_path = self.options.get('base_path', '_common')
|
||||
file_path = self.arguments[0]
|
||||
|
||||
if env.app.tags.has('enterprise'):
|
||||
self.arguments[0] = base_path + "_enterprise/" + self.arguments[0]
|
||||
enterprise_path = os.path.join(base_path + "_enterprise", file_path)
|
||||
_, enterprise_abs_path = env.relfn2path(enterprise_path)
|
||||
if os.path.exists(enterprise_abs_path):
|
||||
self.arguments[0] = enterprise_path
|
||||
else:
|
||||
LOGGER.info(f"Enterprise content not found: Skipping inclusion of {file_path}")
|
||||
return []
|
||||
else:
|
||||
self.arguments[0] = base_path + "/" + self.arguments[0]
|
||||
self.arguments[0] = os.path.join(base_path, file_path)
|
||||
return super().run()
|
||||
|
||||
def setup(app):
|
||||
|
||||
@@ -123,10 +123,6 @@ the secret key is the `salted_hash`, i.e., the secret key can be found by
|
||||
|
||||
<!--- REMOVE IN FUTURE VERSIONS - Remove the note below in version 6.1 -->
|
||||
|
||||
(Note: If you upgraded from version 5.4 to version 6.0 without
|
||||
[enabling consistent topology updates](../upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.rst),
|
||||
the table name is `system_auth.roles`.)
|
||||
|
||||
By default, authorization is not enforced at all. It can be turned on
|
||||
by providing an entry in Scylla configuration:
|
||||
`alternator_enforce_authorization: true`
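With enforcement turned on, every request must be signed with the role name as the access key id and the matching secret (the `salted_hash` value mentioned above). A hypothetical boto3 sketch follows; the endpoint URL, role name, and secret are placeholders, not values taken from this document:

```python
import boto3

# Placeholders - substitute your node address, role name, and the secret
# retrieved from the salted_hash column as described above.
dynamodb = boto3.resource(
    'dynamodb',
    endpoint_url='http://scylla-node.example.com:8000',
    region_name='us-east-1',
    aws_access_key_id='myrole',
    aws_secret_access_key='secret-from-salted-hash',
)
print(list(dynamodb.tables.all()))
```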
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
If you upgraded from 5.4, you must perform a manual action in order to enable
|
||||
consistent topology changes.
|
||||
See :doc:`the guide for enabling consistent topology changes</upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>` for more details.
|
||||
@@ -60,9 +60,8 @@ In summary, Raft makes schema changes safe, but it requires that a quorum of nod
|
||||
Verifying that the Raft upgrade procedure finished successfully
|
||||
========================================================================
|
||||
|
||||
You may need to perform the following procedure on upgrade if you explicitly
|
||||
disabled the Raft-based schema changes feature in the previous ScyllaDB
|
||||
version. Please consult the upgrade guide.
|
||||
You may need to perform the following procedure as part of
|
||||
the :ref:`manual recovery procedure <recovery-procedure>`.
|
||||
|
||||
The Raft upgrade procedure requires **full cluster availability** to correctly set up the Raft algorithm; after the setup finishes, Raft can proceed with only a majority of nodes, but this initial setup is an exception.
|
||||
An unlucky event, such as a hardware failure, may cause one of your nodes to fail. If this happens before the Raft upgrade procedure finishes, the procedure will get stuck and your intervention will be required.
|
||||
@@ -173,8 +172,6 @@ gossip-based topology.
|
||||
|
||||
The feature is automatically enabled in new clusters.
|
||||
|
||||
.. scylladb_include_flag:: consistent-topology-with-raft-upgrade-info.rst
|
||||
|
||||
Verifying that Raft is Enabled
|
||||
----------------------------------
|
||||
|
||||
|
||||
3
docs/cql/_common/tablets-default.rst
Normal file
3
docs/cql/_common/tablets-default.rst
Normal file
@@ -0,0 +1,3 @@
|
||||
By default, a keyspace is created with tablets enabled. The ``tablets`` option
is used to opt a keyspace out of tablets-based distribution; see :ref:`Enabling Tablets <tablets-enable-tablets>`
for details.
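As an illustration only (the keyspace name, contact point, and replication settings below are placeholders), opting a keyspace out of tablets at creation time might look like this with the Python driver:

.. code-block:: python

   from cassandra.cluster import Cluster  # scylla-driver / cassandra-driver

   session = Cluster(['127.0.0.1']).connect()
   # 'enabled': false opts this keyspace out of tablets-based distribution.
   session.execute("""
       CREATE KEYSPACE demo
       WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}
       AND tablets = {'enabled': false}
   """)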
|
||||
@@ -62,7 +62,7 @@ The following options are available for all compaction strategies.
|
||||
=====
|
||||
|
||||
``tombstone_compaction_interval`` (default: 86400s (1 day))
|
||||
An SSTable that is suitable for single SSTable compaction, according to tombstone_threshold will not be compacted if it is newer than tombstone_compaction_interval.
|
||||
*tombstone_compaction_interval* is a lower bound on when a new tombstone compaction can start. If an SSTable was compacted at a time *X*, the earliest time it will be considered for tombstone compaction again is *X + tombstone_compaction_interval*. This does not guarantee that SSTables will be considered for compaction immediately after *tombstone_compaction_interval* has elapsed since the last compaction.
|
||||
|
||||
=====
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ name kind mandatory default description
|
||||
details below).
|
||||
``durable_writes`` *simple* no true Whether to use the commit log for updates on this keyspace
|
||||
(disable this option at your own risk!).
|
||||
``tablets`` *map* no Enables or disables tablets for the keyspace (see :ref:`tablets<tablets>`)
|
||||
``tablets`` *map* no Enables or disables tablets for the keyspace (see :ref:`tablets <tablets>`)
|
||||
=================== ========== =========== ========= ===================================================================
|
||||
|
||||
The ``replication`` property is mandatory and must at least contains the ``'class'`` sub-option, which defines the
|
||||
@@ -232,9 +232,7 @@ sub-option type description
|
||||
``'initial'`` int The number of tablets to start with
|
||||
===================================== ====== =============================================
|
||||
|
||||
By default, a keyspace is created with tablets enabled. The ``tablets`` option
|
||||
is used to opt out a keyspace from tablets-based distribution; see :ref:`Enabling Tablets <tablets-enable-tablets>`
|
||||
for details.
|
||||
.. scylladb_include_flag:: tablets-default.rst
|
||||
|
||||
A good rule of thumb to calculate initial tablets is to divide the expected total storage used
|
||||
by tables in this keyspace by (``replication_factor`` * 5GB). For example, if you expect a 30TB
|
||||
@@ -759,10 +757,8 @@ available:
|
||||
========================= =============== =============================================================================
|
||||
Option Default Description
|
||||
========================= =============== =============================================================================
|
||||
``sstable_compression`` LZ4Compressor The compression algorithm to use. Default compressors are
|
||||
LZ4Compressor, SnappyCompressor, and DeflateCompressor.
|
||||
A custom compressor can be provided by specifying the full class
|
||||
name as a “string constant”:#constants.
|
||||
``sstable_compression`` LZ4Compressor The compression algorithm to use. Available compressors are
|
||||
LZ4Compressor, SnappyCompressor, DeflateCompressor, and ZstdCompressor.
|
||||
``chunk_length_in_kb`` 4 On disk SSTables are compressed by block (to allow random reads). This
|
||||
defines the size (in KB) of the block. Bigger values may improve the
|
||||
compression rate, but increases the minimum size of data to be read from disk
|
||||
|
||||
@@ -6,9 +6,9 @@ You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 11 | 8 | 9 |
|
||||
+============================+======+======+======+=======+=======+=======+
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| 5.4 | |v| | |v| | |x| | |v| | |v| | |v| |
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
|
||||
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
|
||||
54
docs/getting-started/_common/setup-after-install.rst
Normal file
54
docs/getting-started/_common/setup-after-install.rst
Normal file
@@ -0,0 +1,54 @@
|
||||
Configure and Run ScyllaDB
|
||||
-------------------------------
|
||||
|
||||
#. Configure the following parameters in the ``/etc/scylla/scylla.yaml`` configuration file.
|
||||
|
||||
* ``cluster_name`` - The name of the cluster. All the nodes in the cluster must have the same
|
||||
cluster name configured.
|
||||
* ``seeds`` - The IP address of the first node. Other nodes will use it as the first contact
|
||||
point to discover the cluster topology when joining the cluster.
|
||||
* ``listen_address`` - The IP address that ScyllaDB uses to connect to other nodes in the cluster.
|
||||
* ``rpc_address`` - The IP address of the interface for CQL client connections.
|
||||
|
||||
#. Run the ``scylla_setup`` script to tune the system settings and determine the optimal configuration.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo scylla_setup
|
||||
|
||||
* The script invokes a set of :ref:`scripts <system-configuration-scripts>` to configure several operating system settings; for example, it sets up
RAID0 and the XFS filesystem.
|
||||
* The script runs a short (up to a few minutes) benchmark on your storage and generates the ``/etc/scylla.d/io.conf``
|
||||
configuration file. When the file is ready, you can start ScyllaDB. ScyllaDB will not run without XFS
or the ``io.conf`` file.
|
||||
* You can bypass this check by running ScyllaDB in :doc:`developer mode </getting-started/installation-common/dev-mod>`.
|
||||
We recommend against enabling developer mode in production environments to ensure ScyllaDB's maximum performance.
|
||||
|
||||
#. Run ScyllaDB as a service (if not already running).
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo systemctl start scylla-server
Now you can start using ScyllaDB. Here are some tools you may find useful.
|
||||
|
||||
Run nodetool:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool status
|
||||
|
||||
Run cqlsh:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cqlsh
|
||||
|
||||
Run cassandra-stress:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cassandra-stress write -mode cql3 native
|
||||
|
||||
|
||||
@@ -154,59 +154,7 @@ Install ScyllaDB
|
||||
sudo yum install scylla-5.2.3
Configure and Run ScyllaDB
|
||||
-------------------------------
|
||||
|
||||
#. Configure the following parameters in the ``/etc/scylla/scylla.yaml`` configuration file.
|
||||
|
||||
* ``cluster_name`` - The name of the cluster. All the nodes in the cluster must have the same
|
||||
cluster name configured.
|
||||
* ``seeds`` - The IP address of the first node. Other nodes will use it as the first contact
|
||||
point to discover the cluster topology when joining the cluster.
|
||||
* ``listen_address`` - The IP address that ScyllaDB uses to connect to other nodes in the cluster.
|
||||
* ``rpc_address`` - The IP address of the interface for CQL client connections.
|
||||
|
||||
#. Run the ``scylla_setup`` script to tune the system settings and determine the optimal configuration.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo scylla_setup
|
||||
|
||||
* The script invokes a set of :ref:`scripts <system-configuration-scripts>` to configure several operating system settings; for example, it sets
|
||||
RAID0 and XFS filesystem.
|
||||
* The script runs a short (up to a few minutes) benchmark on your storage and generates the ``/etc/scylla.d/io.conf``
|
||||
configuration file. When the file is ready, you can start ScyllaDB. ScyllaDB will not run without XFS
|
||||
or ``io.conf`` file.
|
||||
* You can bypass this check by running ScyllaDB in :doc:`developer mode </getting-started/installation-common/dev-mod>`.
|
||||
We recommend against enabling developer mode in production environments to ensure ScyllaDB's maximum performance.
|
||||
|
||||
#. Run ScyllaDB as a service (if not already running).
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo systemctl start scylla-server
Now you can start using ScyllaDB. Here are some tools you may find useful.
|
||||
|
||||
Run nodetool:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool status
|
||||
|
||||
Run cqlsh:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cqlsh
|
||||
|
||||
Run cassandra-stress:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cassandra-stress write -mode cql3 native
|
||||
|
||||
.. include:: /getting-started/_common/setup-after-install.rst
|
||||
|
||||
Next Steps
|
||||
------------
|
||||
|
||||
@@ -12,7 +12,7 @@ Prerequisites
|
||||
Ensure that your platform is supported by the ScyllaDB version you want to install.
|
||||
See :doc:`OS Support by Platform and Version </getting-started/os-support/>`.
|
||||
|
||||
Installing ScyllaDB with Web Installer
|
||||
Install ScyllaDB with Web Installer
|
||||
---------------------------------------
|
||||
To install ScyllaDB with Web Installer, run:
|
||||
|
||||
@@ -40,22 +40,24 @@ options to install a different version or ScyllaDB Enterprise:
|
||||
You can run the command with the ``-h`` or ``--help`` flag to print information about the script.
|
||||
|
||||
Examples
|
||||
---------
|
||||
===========
|
||||
|
||||
Installing ScyllaDB Open Source 4.6.1:
|
||||
Installing ScyllaDB Open Source 6.0.1:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 4.6.1
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 6.0.1
|
||||
|
||||
Installing the latest patch release for ScyllaDB Open Source 4.6:
|
||||
Installing the latest patch release for ScyllaDB Open Source 6.0:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 4.6
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 6.0
|
||||
|
||||
Installing ScyllaDB Enterprise 2021.1:
|
||||
Installing ScyllaDB Enterprise 2024.1:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-product scylla-enterprise --scylla-version 2021.1
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-product scylla-enterprise --scylla-version 2024.1
|
||||
|
||||
.. include:: /getting-started/_common/setup-after-install.rst
|
||||
@@ -1,8 +1,17 @@
|
||||
Nodetool rebuild
|
||||
================
|
||||
|
||||
**rebuild** ``[<src-dc-name>]`` - This command rebuilds a node's data by streaming data from other nodes in the cluster (similarly to bootstrap).
|
||||
Rebuild operates on multiple nodes in a ScyllaDB cluster. It streams data from a single source replica when rebuilding a token range. When executing the command, ScyllaDB first figures out which ranges the local node (the one we want to rebuild) is responsible for. Then which node in the cluster contains the same ranges. Finally, ScyllaDB streams the data to the local node.
|
||||
**rebuild** ``[[--force] <source-dc-name>]`` - This command rebuilds a node's data by streaming data from other nodes in the cluster (similarly to bootstrap).
|
||||
|
||||
When executing the command, ScyllaDB first figures out which ranges the local node (the one we want to rebuild) is responsible for.
It then determines which nodes in the cluster contain the same ranges.
If ``source-dc-name`` is provided, ScyllaDB will stream data only from nodes in that datacenter, when it is safe to do so.
Otherwise, an alternative datacenter that lost no nodes will be considered, and if none exists, all datacenters will be considered.
Use the ``--force`` option to force rebuilding from the source datacenter, even if it is unsafe to do so.
|
||||
|
||||
When ``rebuild`` is enabled in :doc:`Repair Based Node Operations (RBNO) </operating-scylla/procedures/cluster-management/repair-based-node-operation>`,
|
||||
data is rebuilt using repair-based-rebuild by reading all source replicas in each token range and repairing any discrepancies between them.
|
||||
Otherwise, data is streamed from a single source replica when rebuilding each token range.
|
||||
|
||||
When :doc:`adding a new data-center into an existing ScyllaDB cluster </operating-scylla/procedures/cluster-management/add-dc-to-existing-dc/>` use the rebuild command.
|
||||
|
||||
@@ -14,6 +23,6 @@ For Example:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
nodetool rebuild <src-dc-name>
|
||||
nodetool rebuild <source-dc-name>
|
||||
|
||||
.. include:: nodetool-index.rst
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
.. note::
|
||||
|
||||
This page only applies to clusters where consistent topology updates are not enabled.
|
||||
This page only applies to clusters where consistent topology updates are not enabled.
|
||||
Consistent topology updates are mandatory, so **this page is kept for troubleshooting purposes only**.
|
||||
|
||||
The page does NOT apply if you:
|
||||
|
||||
* Created a cluster with ScyllaDB 6.0 (consistent topology updates are automatically enabled).
|
||||
* Upgraded from ScyllaDB 5.4 and :doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
* Created a cluster with ScyllaDB 6.0 or later (consistent topology updates are automatically enabled).
|
||||
* `Manually enabled consistent topology updates <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_
|
||||
after upgrading to 6.0 or before upgrading to 6.1 (required).
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
(Note: If you upgraded from version 5.4 without
|
||||
:doc:`enabling consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`,
|
||||
you must additionally alter the ``system_auth`` keyspace.)
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <add-dc-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <add-new-node-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <remove-node-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <replace-node-upgrade-info>`.
|
||||
@@ -1,24 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must consider the following
|
||||
limitations while applying the procedure:
|
||||
|
||||
* You can only bootstrap one node at a time. You need to wait until the status
|
||||
of one new node becomes UN (Up Normal) before adding another new node.
|
||||
* If the node starts bootstrapping but fails in the middle, for example, due to
|
||||
a power loss, you can retry bootstrap by restarting the node. If you don't want to
|
||||
retry, or the node refuses to boot on subsequent attempts, consult the
|
||||
:doc:`Handling Membership Change Failures </operating-scylla/procedures/cluster-management/handling-membership-change-failures>`
|
||||
document.
|
||||
* The ``system_auth`` keyspace has not been upgraded to ``system``.
|
||||
As a result, if ``authenticator`` is set to ``PasswordAuthenticator``, you must
|
||||
increase the replication factor of the ``system_auth`` keyspace. It is
|
||||
recommended to set ``system_auth`` replication factor to the number of nodes
|
||||
in each DC.
|
||||
@@ -1,21 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must consider the following
|
||||
limitations while applying the procedure:
|
||||
|
||||
* It’s essential to ensure the removed node will **never** come back to the cluster,
|
||||
which might adversely affect your data (data resurrection/loss). To prevent the removed
|
||||
node from rejoining the cluster, remove that node from the cluster network or VPC.
|
||||
* You can only remove one node at a time. You need to verify that the node has
|
||||
been removed before removing another one.
|
||||
* If ``nodetool decommission`` starts executing but fails in the middle, for example,
|
||||
due to a power loss, consult the
|
||||
:doc:`Handling Membership Change Failures </operating-scylla/procedures/cluster-management/handling-membership-change-failures>`
|
||||
document.
|
||||
@@ -1,23 +0,0 @@
|
||||
|
||||
----------------------------
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must consider the following
|
||||
limitations while applying the procedure:
|
||||
|
||||
* It’s essential to ensure the replaced (dead) node will never come back to the cluster,
|
||||
which might lead to a split-brain situation. Remove the replaced (dead) node from
|
||||
the cluster network or VPC.
|
||||
* You can only replace one node at a time. You need to wait until the status
|
||||
of the new node becomes UN (Up Normal) before replacing another new node.
|
||||
* If the new node starts and begins the replace operation but then fails in the middle,
|
||||
for example, due to a power loss, you can retry the replace by restarting the node.
|
||||
If you don’t want to retry, or the node refuses to boot on subsequent attempts, consult the
|
||||
:doc:`Handling Membership Change Failures </operating-scylla/procedures/cluster-management/handling-membership-change-failures>`
|
||||
document.
|
||||
@@ -1,8 +1,6 @@
|
||||
Adding a New Data Center Into an Existing ScyllaDB Cluster
|
||||
***********************************************************
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-add-new-dc.rst
|
||||
|
||||
The following procedure specifies how to add a Data Center (DC) to a live ScyllaDB Cluster, in a single data center, :ref:`multi-availability zone <faq-best-scenario-node-multi-availability-zone>`, or multi-datacenter. Adding a DC out-scales the cluster and provides higher availability (HA).
|
||||
|
||||
The procedure includes:
|
||||
@@ -164,8 +162,6 @@ Add New DC
|
||||
* Keyspace created by the user (which needed to replicate to the new DC).
|
||||
* System: ``system_distributed``, ``system_traces``, for example, replicate the data to three nodes in the new DC.
|
||||
|
||||
.. scylladb_include_flag:: system-auth-alter-info.rst
|
||||
|
||||
For example:
|
||||
|
||||
Before
|
||||
@@ -234,7 +230,3 @@ Additional Resources for Java Clients
|
||||
* `DCAwareRoundRobinPolicy.Builder <https://java-driver.docs.scylladb.com/scylla-3.10.2.x/api/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.Builder.html>`_
|
||||
* `DCAwareRoundRobinPolicy <https://java-driver.docs.scylladb.com/scylla-3.10.2.x/api/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.html>`_
|
||||
|
||||
|
||||
.. _add-dc-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-add-new-node-or-dc.rst
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
Adding a New Node Into an Existing ScyllaDB Cluster (Out Scale)
|
||||
=================================================================
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-add-new-node.rst
|
||||
|
||||
When you add a new node, other nodes in the cluster stream data to the new node. This operation is called bootstrapping and may
|
||||
be time-consuming, depending on the data size and network bandwidth. If using a :ref:`multi-availability-zone <faq-best-scenario-node-multi-availability-zone>` deployment, make sure the zones are balanced.
|
||||
|
||||
@@ -100,7 +98,3 @@ Procedure
|
||||
|
||||
#. If you are using ScyllaDB Monitoring, update the `monitoring stack <https://monitoring.docs.scylladb.com/stable/install/monitoring_stack.html#configure-scylla-nodes-from-files>`_ to monitor it. If you are using ScyllaDB Manager, make sure you install the `Manager Agent <https://manager.docs.scylladb.com/stable/install-scylla-manager-agent.html>`_, and Manager can access it.
|
||||
|
||||
|
||||
.. _add-new-node-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-add-new-node-or-dc.rst
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
Remove a Node from a ScyllaDB Cluster (Down Scale)
|
||||
***************************************************
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-remove-node.rst
|
||||
|
||||
You can remove nodes from your cluster to reduce its size.
|
||||
|
||||
-----------------------
|
||||
@@ -83,10 +81,6 @@ the ``nodetool removenode`` operation will fail. To ensure successful operation
|
||||
``nodetool removenode`` (not required when :doc:`Repair Based Node Operations (RBNO) <repair-based-node-operation>` for ``removenode``
|
||||
is enabled).
|
||||
|
||||
.. _remove-node-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-remove-node.rst
|
||||
|
||||
Additional Information
|
||||
----------------------
|
||||
* :doc:`Nodetool Reference </operating-scylla/nodetool>`
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
Replace a Dead Node in a ScyllaDB Cluster
|
||||
******************************************
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-replace-node.rst
|
||||
|
||||
The replace dead node operation causes the other nodes in the cluster to stream data to the node that replaces the dead one. This operation can take some time (depending on the data size and network bandwidth).
|
||||
|
||||
This procedure is for replacing one dead node. You can replace more than one dead node in parallel.
|
||||
@@ -194,7 +192,3 @@ In this case, the node's data will be cleaned after restart. To remedy this, you
|
||||
|
||||
Sometimes the public/private IP of the instance changes after a restart. If so, refer to the Replace Procedure_ above.
.. _replace-node-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-replace-node.rst
|
||||
|
||||
@@ -23,8 +23,6 @@ Alter the following:
|
||||
* Keyspace created by the user.
|
||||
* System: ``system_distributed``, ``system_traces``.
|
||||
|
||||
.. scylladb_include_flag:: system-auth-alter-info.rst
|
||||
|
||||
For example:
|
||||
|
||||
Before
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <authentication-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <runtime-authentication-upgrade-info>`.
|
||||
@@ -1,20 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must take additional steps
|
||||
to enable authentication:
|
||||
|
||||
* Before you start the procedure, set the ``system_auth`` keyspace replication factor
|
||||
to the number of nodes in the datacenter via cqlsh. It allows you to ensure that
|
||||
the user's information is kept highly available for the cluster. If ``system_auth``
|
||||
is not equal to the number of nodes and a node fails, the user whose information
|
||||
is on that node will be denied access.
|
||||
* After you start cqlsh with the default superuser username and password, run
|
||||
a repair on the ``system_auth`` keyspace on all the nodes in the cluster, for example:
|
||||
``nodetool repair -pr system_auth``
|
||||
@@ -1,20 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The procedures described above apply to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must take additional steps
|
||||
to enable or disable authentication without downtime:
|
||||
|
||||
* Before you enable authentication without downtime, set the ``system_auth``
|
||||
keyspace replication factor to the number of nodes in the datacenter via cqlsh.
|
||||
It allows you to ensure that the user's information is kept highly available
|
||||
for the cluster. If ``system_auth`` is not equal to the number of nodes and
|
||||
a node fails, the user whose information is on that node will be denied access.
|
||||
* After you restart the nodes when you enable or disable authentication without
|
||||
downtime, run repair on the ``system_auth`` keyspace, one node at a time on
|
||||
all the nodes in the cluster.
|
||||
@@ -1,8 +1,6 @@
|
||||
Enable Authentication
|
||||
=====================
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-authentication.rst
|
||||
|
||||
Authentication is the process where login accounts and their passwords are verified, and the user is allowed access to the database. Authentication is done internally within ScyllaDB and is not done with a third party. Users and passwords are created with roles using a ``CREATE ROLE`` statement. Refer to :doc:`Grant Authorization CQL Reference </operating-scylla/security/authorization>` for details.
|
||||
|
||||
The procedure described below enables Authentication on the ScyllaDB servers. It is intended to be used when you do **not** have applications running with ScyllaDB/Cassandra drivers.
|
||||
@@ -39,10 +37,6 @@ Procedure
|
||||
|
||||
#. If you want to create users and roles, continue to :doc:`Enable Authorization </operating-scylla/security/enable-authorization>`.
|
||||
|
||||
.. _authentication-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-authentication.rst
|
||||
|
||||
Additional Resources
|
||||
--------------------
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ In the same manner, should someone leave the organization, all you would have to
|
||||
Should someone change positions at the company, just assign the new employee to the new role and revoke roles no longer required for the new position.
|
||||
|
||||
To build an RBAC environment, you need to create the roles and their associated permissions and then assign or grant the roles to the individual users. Roles inherit the permissions of any other roles that they are granted. The hierarchy of roles can be either simple or extremely complex. This gives great flexibility to database administrators, where they can create specific permission conditions without incurring a huge administrative burden.
|
||||
In addition to standard roles, `ScyllaDB Enterprise <https://enterprise.docs.scylladb.com/>`_ users can implement `Workload Prioritization <https://enterprise.docs.scylladb.com/stable/using-scylla/workload-prioritization.html>`, which allows you to attach roles to Service Levels, thus granting resources to roles as the role demands.
|
||||
In addition to standard roles, `ScyllaDB Enterprise <https://enterprise.docs.scylladb.com/>`_ users can implement `Workload Prioritization <https://enterprise.docs.scylladb.com/stable/using-scylla/workload-prioritization.html>`_, which allows you to attach roles to Service Levels, thus granting resources to roles as the role demands.
|
||||
|
||||
.. _rbac-usecase-grant-roles-and-permissions:
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
Enable and Disable Authentication Without Downtime
|
||||
==================================================
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-runtime-authentication.rst
|
||||
|
||||
Authentication is the process where login accounts and their passwords are verified, and the user is allowed access into the database. Authentication is done internally within ScyllaDB and is not done with a third party. Users and passwords are created with :doc:`roles </operating-scylla/security/authorization>` using a ``CREATE ROLE`` statement. This procedure enables Authentication on the ScyllaDB servers using a transit state, allowing clients to work with or without Authentication at the same time. In this state, you can update the clients (application using ScyllaDB/Apache Cassandra drivers) one at the time. Once all the clients are using Authentication, you can enforce Authentication on all ScyllaDB nodes as well. If you would rather perform a faster authentication procedure where all clients (application using ScyllaDB/Apache Cassandra drivers) will stop working until they are updated to work with Authentication, refer to :doc:`Enable Authentication </operating-scylla/security/runtime-authentication>`.
|
||||
|
||||
|
||||
@@ -108,6 +106,3 @@ Procedure
|
||||
|
||||
#. Verify that all the client applications are working correctly with authentication disabled.
|
||||
|
||||
.. _runtime-authentication-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-runtime-authentication.rst
|
||||
@@ -1 +1 @@
|
||||
Perform :doc:`the procedure for enabling consistent topology changes </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Perform `the procedure for enabling consistent topology changes <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_.
|
||||
@@ -1,3 +1,3 @@
|
||||
:ref:`The Raft upgrade procedure <verify-raft-procedure>`
|
||||
or :doc:`the procedure for enabling consistent topology changes</upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`
|
||||
or `the procedure for enabling consistent topology changes <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_
|
||||
got stuck because one of the nodes failed in the middle of the procedure and is irrecoverable.
|
||||
@@ -1,3 +0,0 @@
|
||||
(Note: If you upgraded from version 5.4 without
|
||||
:doc:`enabling consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`,
|
||||
the keyspace name is ``system_auth``.)
|
||||
@@ -4,8 +4,6 @@ Reset Authenticator Password
|
||||
This procedure describes what to do when a user loses their password and cannot reset it with a superuser role.
The procedure requires cluster downtime, and as a result, all auth data is deleted.
|
||||
|
||||
.. scylladb_include_flag:: system-auth-name-info.rst
|
||||
|
||||
Procedure
|
||||
.........
|
||||
|
||||
|
||||
@@ -5,12 +5,12 @@ Upgrade ScyllaDB Open Source
|
||||
.. toctree::
|
||||
:hidden:
|
||||
|
||||
ScyllaDB 5.4 to 6.0 <upgrade-guide-from-5.4-to-6.0/index>
|
||||
ScyllaDB 6.0 to 6.1 <upgrade-guide-from-6.0-to-6.1/index>
|
||||
ScyllaDB 6.x Maintenance Upgrade <upgrade-guide-from-6.x.y-to-6.x.z>
|
||||
|
||||
Procedures for upgrading to a newer version of ScyllaDB Open Source.
|
||||
|
||||
* :doc:`ScyllaDB 5.4 to 6.0 <upgrade-guide-from-5.4-to-6.0/index>`
|
||||
* :doc:`ScyllaDB 6.0 to 6.1 <upgrade-guide-from-6.0-to-6.1/index>`
|
||||
* :doc:`ScyllaDB 6.x Maintenance Upgrade <upgrade-guide-from-6.x.y-to-6.x.z>`
@@ -1,113 +0,0 @@
|
||||
=====================================
|
||||
Enable Consistent Topology Updates
|
||||
=====================================
|
||||
|
||||
This article explains how to enable consistent topology changes
|
||||
when you upgrade from version 5.4 to 6.0.
|
||||
|
||||
Introduction
|
||||
============
|
||||
|
||||
ScyllaDB Open Source 6.0 introduces :ref:`consistent topology changes based on Raft <raft-topology-changes>`.
|
||||
Newly created clusters use consistent topology changes right from the start. However - unlike in the case
|
||||
of schema managed on Raft - consistent topology changes are *not* automatically enabled after the cluster
|
||||
was upgraded from an older version of ScyllaDB. If you have such a cluster, then you need to enable
|
||||
consistent topology changes manually with a dedicated upgrade procedure.
|
||||
|
||||
Before running the procedure, you **must** check that the cluster meets some prerequisites
|
||||
and you **must** ensure that some administrative procedures will not be run
|
||||
while the upgrade procedure is in progress.
|
||||
|
||||
.. _enable-raft-topology-6.0-prerequisites:
|
||||
|
||||
Prerequisites
|
||||
=============
|
||||
|
||||
* Make sure that all nodes in the cluster are upgraded to ScyllaDB Open Source 6.0.
|
||||
* Verify that :ref:`schema on raft is enabled <schema-on-raft-enabled>`.
|
||||
* Make sure that all nodes enabled ``SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`` cluster feature.
|
||||
One way to verify this is to look for the following message in the log:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
features - Feature SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES is enabled
|
||||
|
||||
Alternatively, this can be verified programmatically by checking whether ``value`` column under the key ``enabled_features`` contains the name of the feature in the ``system.scylla_local`` table.
|
||||
For example, this can be done with the following bash script:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
until cqlsh -e "select value from system.scylla_local where key = 'enabled_features'" | grep "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"
|
||||
do
|
||||
echo "Upgrade didn't finish yet on the local node, waiting 10 seconds before checking again..."
|
||||
sleep 10
|
||||
done
|
||||
echo "Upgrade completed on the local node"
|
||||
|
||||
* Make sure that all nodes are alive for the duration of the upgrade.
|
||||
|
||||
.. _enable-raft-topology-6.0-forbidden-operations:
|
||||
|
||||
Administrative operations which must not be running during upgrade
|
||||
==================================================================
|
||||
|
||||
Make sure that administrative operations will not be running while upgrade is in progress.
|
||||
In particular, you must abstain from:
|
||||
|
||||
* :doc:`Cluster management procedures </operating-scylla/procedures/cluster-management/index>` (adding, replacing, removing, decommissioning nodes etc.).
|
||||
* Running :doc:`nodetool repair </operating-scylla/nodetool-commands/repair>`.
|
||||
* Running :doc:`nodetool checkAndRepairCdcStreams </operating-scylla/nodetool-commands/checkandrepaircdcstreams>`.
|
||||
* Any modifications of :doc:`authentication </operating-scylla/security/authentication>` and :doc:`authorization </operating-scylla/security/enable-authorization>` settings.
|
||||
* Any change of authorization via :doc:`CQL API </operating-scylla/security/authorization>`.
|
||||
* Doing schema changes.
|
||||
|
||||
Running the procedure
|
||||
=====================
|
||||
|
||||
.. warning::
|
||||
|
||||
Before proceeding, make sure that all the :ref:`prerequisites <enable-raft-topology-6.0-prerequisites>` are met
|
||||
and no :ref:`forbidden administrative operations <enable-raft-topology-6.0-forbidden-operations>` will run
|
||||
during upgrade. Failing to do so may put the cluster in an inconsistent state.
|
||||
|
||||
Start the upgrade procedure by issuing a POST HTTP request to the ``/storage_service/raft_topology/upgrade`` endpoint
on any of the nodes in the cluster.
|
||||
|
||||
For example, you can do it via ``curl``, like this:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
curl -X POST "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
|
||||
|
||||
Next, wait until all nodes report that upgrade is complete. You can check that a single node finished upgrade in one of two ways:
|
||||
|
||||
* By sending an HTTP ``GET`` request to the ``/storage_service/raft_topology/upgrade`` endpoint. For example, you can do it with ``curl`` like this:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
curl -X GET "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
|
||||
|
||||
It will return a JSON string which will be equal to ``done`` after the upgrade is complete on this node.
|
||||
|
||||
* By querying the ``upgrade_state`` column in the ``system.topology`` table. You can use ``cqlsh`` to get the value of the column like this:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
cqlsh -e "select upgrade_state from system.topology"
|
||||
|
||||
The ``upgrade_state`` column should be set to ``done`` after the upgrade is complete on this node:
|
||||
|
||||
After the upgrade is complete on all nodes, wait at least one minute before issuing any topology changes in order to avoid data loss from writes that were started before the upgrade.
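If you prefer to script the wait, a minimal sketch along the following lines can poll every node until each reports ``done``. It relies only on the ``GET`` endpoint described above; the node addresses and the use of the ``requests`` library are assumptions, not part of this procedure:

.. code-block:: python

   import time
   import requests  # assumed to be available on the admin host

   NODES = ["127.0.0.1", "127.0.0.2", "127.0.0.3"]  # placeholder addresses

   def all_done():
       # The endpoint returns the JSON string "done" once a node has finished.
       return all(
           requests.get(f"http://{node}:10000/storage_service/raft_topology/upgrade").json() == "done"
           for node in NODES
       )

   while not all_done():
       print("Upgrade not finished on every node yet, retrying in 10 seconds...")
       time.sleep(10)
   print("Upgrade completed on all nodes")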
|
||||
|
||||
What if upgrade gets stuck?
|
||||
===========================
|
||||
|
||||
If the process gets stuck at some point, first check the status of your cluster:
|
||||
|
||||
- If there are some nodes that are not alive, try to restart them.
|
||||
- If all nodes are alive, ensure that the network is healthy and every node can reach all other nodes.
|
||||
- If all nodes are alive and the network is healthy, perform a :doc:`rolling restart </operating-scylla/procedures/config-change/rolling-restart/>` of the cluster.
|
||||
|
||||
If none of the above solves the issue, perform :ref:`the Raft recovery procedure <recovery-procedure>`.
|
||||
During recovery, the cluster will switch back to gossip-based topology management mechanism.
|
||||
After exiting recovery, you should upgrade the cluster to consistent topology updates using the procedure described in this document.
|
||||
@@ -1,16 +0,0 @@
|
||||
=====================================
|
||||
ScyllaDB 5.4 to 6.0 Upgrade Guide
|
||||
=====================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-5.4-to-6.0-generic>
|
||||
Enable Consistent Topology Updates <enable-consistent-topology.rst>
|
||||
Metrics Update <metric-update-5.4-to-6.0>
|
||||
|
||||
|
||||
* :doc:`Upgrade ScyllaDB from 5.4.x to 6.0.y <upgrade-guide-from-5.4-to-6.0-generic>`
|
||||
* :doc:`Enable Consistent Topology Updates <enable-consistent-topology>`
|
||||
* :doc:`ScyllaDB Metrics Update - ScyllaDB 5.4 to 6.0 <metric-update-5.4-to-6.0>`
|
||||
@@ -1,64 +0,0 @@
|
||||
.. |SRC_VERSION| replace:: 5.4
|
||||
.. |NEW_VERSION| replace:: 6.0
|
||||
|
||||
ScyllaDB Metric Update - ScyllaDB |SRC_VERSION| to |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION|:
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_column_family_tablet_count
|
||||
- Tablet count
|
||||
* - scylla_cql_replication_strategy_fail_list_violations
|
||||
- Counts the number of replication_strategy_fail_list guardrail violations,
|
||||
i.e., attempts to set a forbidden replication strategy in a keyspace via
|
||||
CREATE/ALTER KEYSPACE.
|
||||
* - scylla_cql_replication_strategy_warn_list_violations
|
||||
- Counts the number of replication_strategy_warn_list guardrail violations,
|
||||
i.e., attempts to set a discouraged replication strategy in a keyspace
|
||||
via CREATE/ALTER KEYSPACE.
|
||||
* - scylla_load_balancer_resizes_emitted
|
||||
- Number of resizes produced by the load balancer
|
||||
* - scylla_load_balancer_resizes_finalized
|
||||
- Number of resizes finalized by the load balancer.
|
||||
* - scylla_reactor_fstream_read_bytes_blocked
|
||||
- Counts the number of bytes read from disk that could not be satisfied
|
||||
from read-ahead buffers, and had to block. Indicates short streams or
|
||||
incorrect read ahead configuration.
|
||||
* - scylla_reactor_fstream_read_bytes
|
||||
- Counts bytes read from disk file streams. A high rate indicates high disk
|
||||
activity. Divide by fstream_reads to determine the average read size.
|
||||
* - scylla_reactor_fstream_reads_ahead_bytes_discarded
|
||||
- Counts the number of buffered bytes that were read ahead of time and were
|
||||
discarded because they were not needed, wasting disk bandwidth. Indicates
|
||||
over-eager read ahead configuration.
|
||||
* - scylla_reactor_fstream_reads_aheads_discarded
|
||||
- Counts the number of times a buffer that was read ahead of time and was
|
||||
discarded because it was not needed, wasting disk bandwidth. Indicates
|
||||
over-eager read ahead configuration.
|
||||
* - scylla_reactor_fstream_reads_blocked
|
||||
- Counts the number of times a disk read could not be satisfied from
|
||||
read-ahead buffers, and had to block. Indicates short streams or
|
||||
incorrect read ahead configuration.
|
||||
* - scylla_reactor_fstream_reads
|
||||
- Counts reads from disk file streams. A high rate indicates high disk
|
||||
activity. Contrast with other fstream_read* counters to locate bottlenecks.
|
||||
* - scylla_tablets_count
|
||||
- Tablet count
@@ -0,0 +1,13 @@
|
||||
=====================================
|
||||
ScyllaDB 6.0 to 6.1 Upgrade Guide
|
||||
=====================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-6.0-to-6.1-generic>
|
||||
Metrics Update <metric-update-6.0-to-6.1>
|
||||
|
||||
* :doc:`Upgrade ScyllaDB from 6.0.x to 6.1.y <upgrade-guide-from-6.0-to-6.1-generic>`
|
||||
* :doc:`ScyllaDB Metrics Update - ScyllaDB 6.0 to 6.1 <metric-update-6.0-to-6.1>`
|
||||
@@ -0,0 +1,57 @@
|
||||
.. |SRC_VERSION| replace:: 6.0
|
||||
.. |NEW_VERSION| replace:: 6.1
|
||||
|
||||
ScyllaDB Metric Update - ScyllaDB |SRC_VERSION| to |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
New Metrics
|
||||
------------
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION|:
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_database_total_view_updates_on_wrong_node
|
||||
- The total number of view updates which are computed on the wrong node.
|
||||
* - scylla_raft_apply_index
|
||||
- The applied index.
|
||||
* - scylla_raft_commit_index
|
||||
- The commit index.
|
||||
* - scylla_raft_log_last_term
|
||||
- The term of the last log entry.
|
||||
* - scylla_raft_log_last_index
|
||||
- The index of the last log entry.
|
||||
* - scylla_raft_snapshot_last_index
|
||||
- The index of the snapshot.
|
||||
* - scylla_raft_snapshot_last_term
|
||||
- The term of the snapshot.
|
||||
* - scylla_raft_state
|
||||
- The current state: 0 - follower, 1 - candidate, 2 - leader
|
||||
* - scylla_storage_proxy_replica_received_hints_bytes_total
|
||||
- The total size of hints and MV hints received by this node.
|
||||
* - scylla_storage_proxy_replica_received_hints_total
|
||||
- The number of hints and MV hints received by this node.
|
||||
* - scylla_storage_proxy_stats::REPLICA_STATS_CATEGORY_view_update_backlog
|
||||
- Tracks the size of ``scylla_database_view_update_backlog`` and is used
|
||||
instead of that one to calculate the max backlog across all shards, which
|
||||
is then used by other nodes to calculate appropriate throttling delays if it grows
|
||||
too large. If it's notably different from ``scylla_database_view_update_backlog``,
|
||||
it means that we're currently processing a write that generated a large number
|
||||
of view updates.
@@ -1,35 +1,35 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 5.4
|
||||
.. |NEW_VERSION| replace:: 6.0
|
||||
.. |SRC_VERSION| replace:: 6.0
|
||||
.. |NEW_VERSION| replace:: 6.1
|
||||
|
||||
.. |DEBIAN_SRC_REPO| replace:: Debian
|
||||
.. _DEBIAN_SRC_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-5.4
|
||||
.. _DEBIAN_SRC_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-6.0
|
||||
|
||||
.. |UBUNTU_SRC_REPO| replace:: Ubuntu
|
||||
.. _UBUNTU_SRC_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-5.4
|
||||
.. _UBUNTU_SRC_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-6.0
|
||||
|
||||
.. |SCYLLA_DEB_SRC_REPO| replace:: ScyllaDB deb repo (|DEBIAN_SRC_REPO|_, |UBUNTU_SRC_REPO|_)
|
||||
|
||||
.. |SCYLLA_RPM_SRC_REPO| replace:: ScyllaDB rpm repo
|
||||
.. _SCYLLA_RPM_SRC_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-5.4
|
||||
.. _SCYLLA_RPM_SRC_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-6.0
|
||||
|
||||
.. |DEBIAN_NEW_REPO| replace:: Debian
|
||||
.. _DEBIAN_NEW_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-6.0
|
||||
.. _DEBIAN_NEW_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-6.1
|
||||
|
||||
.. |UBUNTU_NEW_REPO| replace:: Ubuntu
|
||||
.. _UBUNTU_NEW_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-6.0
|
||||
.. _UBUNTU_NEW_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-6.1
|
||||
|
||||
.. |SCYLLA_DEB_NEW_REPO| replace:: ScyllaDB deb repo (|DEBIAN_NEW_REPO|_, |UBUNTU_NEW_REPO|_)
|
||||
|
||||
.. |SCYLLA_RPM_NEW_REPO| replace:: ScyllaDB rpm repo
|
||||
.. _SCYLLA_RPM_NEW_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-6.0
|
||||
.. _SCYLLA_RPM_NEW_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-6.1
|
||||
|
||||
.. |ROLLBACK| replace:: rollback
|
||||
.. _ROLLBACK: ./#rollback-procedure
|
||||
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 5.4 to 6.0
|
||||
.. _SCYLLA_METRICS: ../metric-update-5.4-to-6.0
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 6.0 to 6.1
|
||||
.. _SCYLLA_METRICS: ../metric-update-6.0-to-6.1
|
||||
|
||||
=============================================================================
|
||||
Upgrade |SCYLLA_NAME| from |SRC_VERSION| to |NEW_VERSION|
|
||||
@@ -47,6 +47,20 @@ It also applies when using ScyllaDB official image on EC2, GCP, or Azure.
|
||||
Before You Upgrade ScyllaDB
|
||||
==============================
|
||||
|
||||
**Ensure Consistent Topology Changes Are Enabled**
|
||||
|
||||
In ScyllaDB 6.1, the Raft-based *consistent topology changes* feature is mandatory.
|
||||
|
||||
* If you enabled the feature after upgrading from 5.4 to 6.0 or created your
|
||||
cluster with version 6.0, no action is required before upgrading to 6.1.
|
||||
* If you did not enable the feature after upgrading from 5.4 to 6.0, you must
|
||||
enable the feature before upgrading to 6.1 by following
|
||||
the `Enable Consistent Topology Updates <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_
|
||||
procedure.
|
||||
|
||||
To verify if the *consistent topology changes* feature is enabled on your cluster,
|
||||
see :ref:`Verifying that Raft is Enabled - Consistent Topology Changes <verifying-consistent-topology-changes-enabled>`.
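As a quick supplementary check (a sketch only; the contact point is a placeholder), you can confirm that the cluster advertises the ``SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`` feature by querying ``system.scylla_local`` with the Python driver. The linked section remains the authoritative verification:

.. code-block:: python

   from cassandra.cluster import Cluster  # scylla-driver / cassandra-driver

   session = Cluster(['127.0.0.1']).connect()
   row = session.execute(
       "SELECT value FROM system.scylla_local WHERE key = 'enabled_features'"
   ).one()
   # The feature name must appear in the enabled_features list on every node.
   enabled = row is not None and 'SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES' in row.value
   print("consistent topology changes supported:", enabled)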
|
||||
|
||||
**Upgrade Your Driver**
|
||||
|
||||
If you're using a :doc:`ScyllaDB driver </using-scylla/drivers/cql-drivers/index>`,
|
||||
@@ -66,11 +80,6 @@ We recommend upgrading the Monitoring Stack to the latest version.
|
||||
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
|
||||
at the `ScyllaDB Community Forum <https://forum.scylladb.com/>`_.
|
||||
|
||||
.. note::
|
||||
|
||||
In ScyllaDB 6.0, Raft-based consistent schema management for new and existing
|
||||
deployments is enabled by default and cannot be disabled.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
@@ -95,13 +104,6 @@ node before validating that the node you upgraded is up and running the new vers
|
||||
or remove nodes.
|
||||
* Not to apply schema changes.
|
||||
|
||||
**After** the upgrade:
|
||||
|
||||
* You may need to verify that Raft has been successfully initiated in your cluster.
|
||||
* You need to enable consistent topology updates.
|
||||
|
||||
See :ref:`After Upgrading Every Node <upgrade-5.4-6.0-after-upgrading-nodes>` for details.

Upgrade Steps
=============

@@ -237,61 +239,6 @@ Validate

Once you are sure the node upgrade was successful, move to the next node in the cluster.

.. _upgrade-5.4-6.0-after-upgrading-nodes:

After Upgrading Every Node
===============================

After you have upgraded every node, perform the following procedures.

#. Validate Raft setup. This step only applies if you manually disabled
   the ``consistent_cluster_management`` option before upgrading to version 5.4.

   In ScyllaDB 6.0, Raft-based consistent schema management for new and existing
   deployments is enabled by default and cannot be disabled.
   You need to verify if Raft was successfully initiated in your cluster
   **before** you proceed to the next step.
   See :ref:`Validate Raft Setup <upgrade-5.4-6.0-validate-raft-setup>` for instructions.

#. Enable the Raft-based consistent topology updates feature. See
   :doc:`Enable Consistent Topology Updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`
   for instructions.

.. _upgrade-5.4-6.0-validate-raft-setup:

Validate Raft Setup
-------------------------

.. note::

   Skip this step if you upgraded from 5.2 to 5.4 with default settings. This
   section only applies if you manually disabled the ``consistent_cluster_management``
   option before upgrading from version 5.2 to 5.4.

Enabling Raft causes the ScyllaDB cluster to start an internal Raft
initialization procedure as soon as every node is upgraded to the new version.
The goal of that procedure is to initialize data structures used by the Raft
algorithm to consistently manage cluster-wide metadata, such as table schemas.

Assuming you performed the rolling upgrade procedure correctly (in particular,
ensuring that the schema is synchronized on every step), and if there are no
problems with cluster connectivity, that internal procedure should take a few
seconds to finish. However, the procedure requires full cluster availability.
If one of the nodes fails before the procedure finishes (for example, due to
a hardware problem), the process may get stuck, which may prevent schema or
topology changes in your cluster.

Therefore, following the rolling upgrade, you must verify that the internal
Raft initialization procedure has finished successfully by checking the logs
of every ScyllaDB node. If the process gets stuck, manual intervention is
required.

Refer to the
:ref:`Verifying that the internal Raft upgrade procedure finished successfully <verify-raft-procedure>`
section for instructions on verifying that the procedure was successful and
proceeding if it gets stuck.


Rollback Procedure
==================

@@ -123,12 +123,7 @@ Download and install the new release

**To upgrade ScyllaDB:**

#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION| and enable scylla/ppa repo:

   .. code-block:: console

      sudo add-apt-repository -y ppa:scylladb/ppa

#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION|
#. Configure Java 1.8:

   .. code-block:: console

@@ -137,12 +137,7 @@ This guide only covers |SRC_VERSION|.x to |NEW_VERSION|.y upgrades.

**To upgrade ScyllaDB:**

#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION| and enable scylla/ppa repo:

   .. code-block:: console

      sudo add-apt-repository -y ppa:scylladb/ppa

#. Update the |SCYLLA_DEB_NEW_REPO| to |NEW_VERSION|
#. Configure Java 1.8:

   .. code-block:: console

@@ -209,8 +209,8 @@ Two time series helper tables were introduced that will help simplify the queryi

``sessions_time_idx`` is for querying regular traces. Another table, the ``node_slow_log_time_idx`` table, is for querying slow query records.

``sessions_time_idx`` and ``node_slow_log_time`` table column descriptions
==========================================================================
``sessions_time_idx`` and ``node_slow_log_time_idx`` table column descriptions
===============================================================================

* ``minute``: the minute, from epoch time, from when the record was taken.
* ``started_at``: a timestamp taken when the tracing session has begun.

29 install.sh
@@ -349,6 +349,7 @@ if ! $without_systemd; then
|
||||
ExecStart=
|
||||
ExecStart=$prefix/kernel_conf/scylla_tune_sched
|
||||
EOS
|
||||
chmod 644 "$retc"/systemd/system/scylla-tune-sched.service.d/execpath.conf
|
||||
fi
|
||||
fi
|
||||
relocate_python3 "$rprefix"/kernel_conf dist/common/kernel_conf/scylla_tune_sched
|
||||
@@ -375,6 +376,7 @@ if ! $nonroot && ! $without_systemd; then
|
||||
EnvironmentFile=
|
||||
EnvironmentFile=$sysconfdir/scylla-node-exporter
|
||||
EOS
|
||||
chmod 644 "$retc"/systemd/system/scylla-node-exporter.service.d/sysconfdir.conf
|
||||
fi
|
||||
elif ! $without_systemd; then
|
||||
install -d -m755 "$rsystemd"/scylla-node-exporter.service.d
|
||||
@@ -387,7 +389,7 @@ ExecStart=$rprefix/node_exporter/node_exporter $SCYLLA_NODE_EXPORTER_ARGS
|
||||
User=
|
||||
Group=
|
||||
EOS
|
||||
|
||||
chmod 644 "$rsystemd"/scylla-node-exporter.service.d/nonroot.conf
|
||||
fi
|
||||
|
||||
# scylla-server
|
||||
@@ -436,14 +438,13 @@ install -m755 -d "$rdata"/hints
|
||||
install -m755 -d "$rdata"/view_hints
|
||||
install -m755 -d "$rdata"/coredump
|
||||
install -m755 -d "$rprefix"/swagger-ui
|
||||
cp -r swagger-ui/dist "$rprefix"/swagger-ui
|
||||
cp -pr swagger-ui/dist "$rprefix"/swagger-ui
|
||||
install -d -m755 -d "$rprefix"/api
|
||||
cp -r api/api-doc "$rprefix"/api
|
||||
cp -pr api/api-doc "$rprefix"/api
|
||||
install -d -m755 -d "$rprefix"/scyllatop
|
||||
cp -r tools/scyllatop/* "$rprefix"/scyllatop
|
||||
cp -pr tools/scyllatop/* "$rprefix"/scyllatop
|
||||
install -d -m755 -d "$rprefix"/scripts
|
||||
cp -r dist/common/scripts/* "$rprefix"/scripts
|
||||
chmod 755 "$rprefix"/scripts/*
|
||||
cp -pr dist/common/scripts/* "$rprefix"/scripts
|
||||
ln -srf "$rprefix/scyllatop/scyllatop.py" "$rprefix/bin/scyllatop"
|
||||
if $supervisor; then
|
||||
install -d -m755 "$rprefix"/supervisor
|
||||
@@ -461,6 +462,7 @@ SBINFILES+=" $(cd seastar/scripts; ls seastar-cpu-map.sh)"
|
||||
cat << EOS > "$rprefix"/scripts/scylla_product.py
|
||||
PRODUCT="$product"
|
||||
EOS
|
||||
chmod 644 "$rprefix"/scripts/scylla_product.py
|
||||
|
||||
if ! $nonroot && ! $without_systemd; then
|
||||
install -d -m755 "$retc"/systemd/system/scylla-server.service.d
|
||||
@@ -472,6 +474,7 @@ EnvironmentFile=
|
||||
EnvironmentFile=$sysconfdir/scylla-server
|
||||
EnvironmentFile=/etc/scylla.d/*.conf
|
||||
EOS
|
||||
chmod 644 "$retc"/systemd/system/scylla-server.service.d/sysconfdir.conf
|
||||
for i in daily restart; do
|
||||
install -d -m755 "$retc"/systemd/system/scylla-housekeeping-$i.service.d
|
||||
cat << EOS > "$retc"/systemd/system/scylla-housekeeping-$i.service.d/sysconfdir.conf
|
||||
@@ -496,6 +499,7 @@ ExecStopPost=
|
||||
User=
|
||||
AmbientCapabilities=
|
||||
EOS
|
||||
chmod 644 "$rsystemd"/scylla-server.service.d/nonroot.conf
|
||||
else
|
||||
cat << EOS > "$rsystemd"/scylla-server.service.d/nonroot.conf
|
||||
[Service]
|
||||
@@ -514,6 +518,7 @@ StandardOutput=file:$rprefix/scylla-server.log
|
||||
StandardError=
|
||||
StandardError=inherit
|
||||
EOS
|
||||
chmod 644 "$rsystemd"/scylla-server.service.d/nonroot.conf
|
||||
fi
|
||||
fi
|
||||
|
||||
@@ -523,6 +528,7 @@ if ! $nonroot; then
|
||||
cat << EOS > "$rprefix"/scripts/scylla_sysconfdir.py
|
||||
SYSCONFDIR="$sysconfdir"
|
||||
EOS
|
||||
chmod 644 "$rprefix"/scripts/scylla_sysconfdir.py
|
||||
fi
|
||||
install -m755 -d "$rusr/bin"
|
||||
install -m755 -d "$rhkdata"
|
||||
@@ -530,7 +536,7 @@ EOS
|
||||
ln -srf "$rprefix/bin/iotune" "$rusr/bin/iotune"
|
||||
ln -srf "$rprefix/bin/scyllatop" "$rusr/bin/scyllatop"
|
||||
ln -srf "$rprefix/bin/nodetool" "$rusr/bin/nodetool"
|
||||
install -d "$rusr"/sbin
|
||||
install -d -m755 "$rusr"/sbin
|
||||
for i in $SBINFILES; do
|
||||
ln -srf "$rprefix/scripts/$i" "$rusr/sbin/$i"
|
||||
done
|
||||
@@ -553,7 +559,8 @@ else
|
||||
cat << EOS > "$rprefix"/scripts/scylla_sysconfdir.py
|
||||
SYSCONFDIR="$sysconfdir"
|
||||
EOS
|
||||
install -d "$rprefix"/sbin
|
||||
chmod 644 "$rprefix"/scripts/scylla_sysconfdir.py
|
||||
install -d -m755 "$rprefix"/sbin
|
||||
for i in $SBINFILES; do
|
||||
ln -srf "$rprefix/scripts/$i" "$rprefix/sbin/$i"
|
||||
done
|
||||
@@ -586,10 +593,12 @@ if $supervisor; then
|
||||
directory=$rprefix
|
||||
command=/bin/bash -c './supervisor/$service.sh'
|
||||
EOS
|
||||
chmod 644 `supervisor_conf $retc $service`
|
||||
if [ "$service" != "scylla-server" ]; then
|
||||
cat << EOS >> `supervisor_conf $retc $service`
|
||||
user=scylla
|
||||
EOS
|
||||
chmod 644 `supervisor_conf $retc $service`
|
||||
fi
|
||||
if $supervisor_log_to_stdout; then
|
||||
cat << EOS >> `supervisor_conf $retc $service`
|
||||
@@ -598,6 +607,7 @@ stdout_logfile_maxbytes=0
|
||||
stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
EOS
|
||||
chmod 644 `supervisor_conf $retc $service`
|
||||
fi
|
||||
done
|
||||
fi
|
||||
@@ -611,7 +621,9 @@ if $nonroot; then
|
||||
fi
|
||||
# nonroot install is also 'offline install'
|
||||
touch $rprefix/SCYLLA-OFFLINE-FILE
|
||||
chmod 644 $rprefix/SCYLLA-OFFLINE-FILE
|
||||
touch $rprefix/SCYLLA-NONROOT-FILE
|
||||
chmod 644 $rprefix/SCYLLA-NONROOT-FILE
|
||||
if ! $without_systemd_check && check_usermode_support; then
|
||||
systemctl --user daemon-reload
|
||||
fi
|
||||
@@ -622,6 +634,7 @@ elif ! $packaging; then
|
||||
fi
|
||||
# run install.sh without --packaging is 'offline install'
|
||||
touch $rprefix/SCYLLA-OFFLINE-FILE
|
||||
chmod 644 $rprefix/SCYLLA-OFFLINE-FILE
|
||||
nousr=
|
||||
nogrp=
|
||||
getent passwd scylla || nousr=1
|
||||
|
||||
@@ -227,7 +227,7 @@ insert_token_range_to_sorted_container_while_unwrapping(
|
||||
}
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
future<dht::token_range_vector>
|
||||
vnode_effective_replication_map::do_get_ranges(noncopyable_function<stop_iteration(bool&, const inet_address&)> consider_range_for_endpoint) const {
|
||||
dht::token_range_vector ret;
|
||||
const auto& tm = *_tmptr;
|
||||
@@ -245,11 +245,12 @@ vnode_effective_replication_map::do_get_ranges(noncopyable_function<stop_iterati
|
||||
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
|
||||
}
|
||||
prev_tok = tok;
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
return ret;
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
future<dht::token_range_vector>
|
||||
vnode_effective_replication_map::get_ranges(inet_address ep) const {
|
||||
// The callback function below is called for each endpoint
|
||||
// in each token natural endpoints.
|
||||
@@ -299,7 +300,7 @@ abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metad
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
future<dht::token_range_vector>
|
||||
vnode_effective_replication_map::get_primary_ranges(inet_address ep) const {
|
||||
// The callback function below is called for each endpoint
|
||||
// in each token natural endpoints.
|
||||
@@ -312,7 +313,7 @@ vnode_effective_replication_map::get_primary_ranges(inet_address ep) const {
|
||||
});
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
future<dht::token_range_vector>
|
||||
vnode_effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const {
|
||||
const topology& topo = _tmptr->get_topology();
|
||||
sstring local_dc = topo.get_datacenter(ep);
|
||||
|
||||
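The hunks above change these range-collection functions from synchronous calls into Seastar coroutines that yield between iterations. A minimal editorial sketch of the pattern, under the assumption that `seastar::coroutine::maybe_yield()` is available (the function and variable names here are made up for illustration):

```cpp
// Illustrative sketch: a loop over a large container that awaits
// coroutine::maybe_yield() on every iteration, so the reactor can preempt the
// task when its quota runs out instead of stalling until the loop finishes.
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <vector>

seastar::future<std::vector<long>> collect_sorted(std::vector<long> tokens) {
    std::vector<long> ret;
    ret.reserve(tokens.size());
    for (auto t : tokens) {
        ret.push_back(t);                            // per-token work
        co_await seastar::coroutine::maybe_yield();  // preemption point
    }
    co_return ret;
}
```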
@@ -262,7 +262,7 @@ public:
|
||||
// This function is not efficient, and not meant for the fast path.
|
||||
//
|
||||
// Note: must be called after token_metadata has been initialized.
|
||||
virtual dht::token_range_vector get_ranges(inet_address ep) const = 0;
|
||||
virtual future<dht::token_range_vector> get_ranges(inet_address ep) const = 0;
|
||||
|
||||
shard_id shard_for_reads(const schema& s, dht::token t) const {
|
||||
return get_sharder(s).shard_for_reads(t);
|
||||
@@ -334,7 +334,7 @@ public: // effective_replication_map
|
||||
bool has_pending_ranges(locator::host_id endpoint) const override;
|
||||
std::unique_ptr<token_range_splitter> make_splitter() const override;
|
||||
const dht::sharder& get_sharder(const schema& s) const override;
|
||||
dht::token_range_vector get_ranges(inet_address ep) const override;
|
||||
future<dht::token_range_vector> get_ranges(inet_address ep) const override;
|
||||
public:
|
||||
explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map,
|
||||
ring_mapping pending_endpoints, ring_mapping read_endpoints, std::unordered_set<locator::host_id> dirty_endpoints, size_t replication_factor) noexcept
|
||||
@@ -366,14 +366,14 @@ public:
|
||||
// StorageService.getPrimaryRangesForEndpoint().
|
||||
//
|
||||
// Note: must be called after token_metadata has been initialized.
|
||||
dht::token_range_vector get_primary_ranges(inet_address ep) const;
|
||||
future<dht::token_range_vector> get_primary_ranges(inet_address ep) const;
|
||||
|
||||
// get_primary_ranges_within_dc() is similar to get_primary_ranges()
|
||||
// except it assigns a primary node for each range within each dc,
|
||||
// instead of one node globally.
|
||||
//
|
||||
// Note: must be called after token_metadata has been initialized.
|
||||
dht::token_range_vector get_primary_ranges_within_dc(inet_address ep) const;
|
||||
future<dht::token_range_vector> get_primary_ranges_within_dc(inet_address ep) const;
|
||||
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
get_range_addresses() const;
|
||||
@@ -388,7 +388,7 @@ public:
|
||||
std::unordered_set<locator::host_id> get_all_pending_nodes() const;
|
||||
|
||||
private:
|
||||
dht::token_range_vector do_get_ranges(noncopyable_function<stop_iteration(bool& add_range, const inet_address& natural_endpoint)> consider_range_for_endpoint) const;
|
||||
future<dht::token_range_vector> do_get_ranges(noncopyable_function<stop_iteration(bool& add_range, const inet_address& natural_endpoint)> consider_range_for_endpoint) const;
|
||||
inet_address_vector_replica_set do_get_natural_endpoints(const token& tok, bool is_vnode) const;
|
||||
host_id_vector_replica_set do_get_replicas(const token& tok, bool is_vnode) const;
|
||||
stop_iteration for_each_natural_endpoint_until(const token& vnode_tok, const noncopyable_function<stop_iteration(const inet_address&)>& func) const;
|
||||
|
||||
@@ -430,6 +430,9 @@ future<tablet_replica_set> network_topology_strategy::add_tablets_in_dc(schema_p
|
||||
auto& candidate = existing.empty() ?
|
||||
new_racks.emplace_back(rack) : existing_racks.emplace_back(rack);
|
||||
for (const auto& node : nodes) {
|
||||
if (!node->is_normal()) {
|
||||
continue;
|
||||
}
|
||||
const auto& host_id = node->host_id();
|
||||
if (!existing.contains(host_id)) {
|
||||
candidate.nodes.emplace_back(host_id, load.get_load(host_id));
|
||||
|
||||
@@ -630,8 +630,7 @@ public:
|
||||
return result;
|
||||
}
|
||||
|
||||
// FIXME: return a future object.
|
||||
virtual dht::token_range_vector get_ranges(inet_address ep) const override {
|
||||
virtual future<dht::token_range_vector> get_ranges(inet_address ep) const override {
|
||||
dht::token_range_vector ret;
|
||||
|
||||
auto& tablet_map = get_tablet_map();
|
||||
@@ -642,9 +641,10 @@ public:
|
||||
if (should_add_range) {
|
||||
ret.push_back(tablet_map.get_token_range(tablet_id));
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
return ret;
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override {
|
||||
|
||||
@@ -356,6 +356,15 @@ public:
|
||||
bool is_candidate() const {
|
||||
return std::holds_alternative<candidate>(_state);
|
||||
}
|
||||
std::string_view current_state() const {
|
||||
static constexpr std::string_view leader_state = "Leader";
|
||||
static constexpr std::string_view follower_state = "Follower";
|
||||
static constexpr std::string_view candidate_state = "Candidate";
|
||||
if (is_leader()) {
|
||||
return leader_state;
|
||||
}
|
||||
return is_follower() ? follower_state : candidate_state;
|
||||
}
|
||||
bool is_prevote_candidate() const {
|
||||
return is_candidate() && std::get<candidate>(_state).is_prevote;
|
||||
}
|
||||
|
||||
@@ -323,7 +323,7 @@ struct no_other_voting_member : public error {
|
||||
};
|
||||
|
||||
struct request_aborted : public error {
|
||||
request_aborted() : error("Request is aborted by a caller") {}
|
||||
request_aborted(const std::string& error_msg) : error(error_msg) {}
|
||||
};
|
||||
|
||||
inline bool is_uncertainty(const std::exception& e) {
|
||||
|
||||
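The new constructor above lets `request_aborted` carry a caller-supplied message, and the hunks that follow use it to report which entry, term, or server a wait was for. A simplified editorial sketch of the idea (not the actual raft::error hierarchy):

```cpp
// Sketch only: an abort exception that keeps its old default message but can
// also be constructed from an fmt-formatted description of what was aborted.
#include <fmt/format.h>
#include <stdexcept>
#include <string>

struct request_aborted_sketch : std::runtime_error {
    request_aborted_sketch() : std::runtime_error("Request is aborted by a caller") {}
    explicit request_aborted_sketch(const std::string& msg) : std::runtime_error(msg) {}
};

// Example use, mirroring the pattern in the hunks below (idx/term are placeholders):
// throw request_aborted_sketch(fmt::format("Aborted while waiting for entry idx={}, term={}", idx, term));
```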
@@ -428,7 +428,7 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
|
||||
try {
|
||||
co_await (as ? _tick_promise->get_shared_future(*as) : _tick_promise->get_shared_future());
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Aborted while waiting for next tick on server: {}, latest applied entry: {}", _id, _applied_idx));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -446,7 +446,7 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
|
||||
try {
|
||||
co_await (as ? _leader_promise->get_shared_future(*as) : _leader_promise->get_shared_future());
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Aborted while waiting for leader on server: {}, latest applied entry: {}", _id, _applied_idx));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -458,7 +458,8 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
|
||||
try {
|
||||
return as ? _state_change_promise->get_shared_future(*as) : _state_change_promise->get_shared_future();
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format(
|
||||
"Aborted while waiting for state change on server: {}, latest applied entry: {}, current state: {}", _id, _applied_idx, _fsm->current_state()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -496,9 +497,19 @@ future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
|
||||
as->check();
|
||||
}
|
||||
} catch (abort_requested_exception&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(
|
||||
format("Aborted in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on server: {}, latest applied entry: {}",
|
||||
awaited_idx,
|
||||
_snapshot_desc_idx,
|
||||
_id,
|
||||
_applied_idx));
|
||||
} catch (seastar::broken_condition_variable&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Condition variable is broken in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on "
|
||||
"server: {}, latest applied entry: {}",
|
||||
awaited_idx,
|
||||
_snapshot_desc_idx,
|
||||
_id,
|
||||
_applied_idx));
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
@@ -575,7 +586,7 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
|
||||
check_not_aborted();
|
||||
|
||||
if (as && as->abort_requested()) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Abort requested before waiting for entry with idx: {}, term: {}", eid.idx, eid.term));
|
||||
}
|
||||
|
||||
auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
|
||||
@@ -623,8 +634,9 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
|
||||
}
|
||||
assert(inserted);
|
||||
if (as) {
|
||||
it->second.abort = as->subscribe([it = it, &container] () noexcept {
|
||||
it->second.done.set_exception(request_aborted());
|
||||
it->second.abort = as->subscribe([it = it, &container] noexcept {
|
||||
it->second.done.set_exception(
|
||||
request_aborted(format("Abort requested while waiting for entry with idx: {}, term: {}", it->first, it->second.term)));
|
||||
container.erase(it);
|
||||
});
|
||||
assert(it->second.abort);
|
||||
@@ -642,7 +654,11 @@ future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_so
|
||||
try {
|
||||
memory_permit = co_await _fsm->wait_for_memory_permit(as, log::memory_usage_of(cmd, _config.max_command_size));
|
||||
} catch (semaphore_aborted&) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(
|
||||
format("Semaphore aborted while waiting for memory availability for adding entry on leader in term: {}, on server: {}, current term: {}",
|
||||
t,
|
||||
_id,
|
||||
_fsm->get_current_term()));
|
||||
}
|
||||
if (t == _fsm->get_current_term()) {
|
||||
break;
|
||||
@@ -689,7 +705,9 @@ future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, Async
|
||||
|
||||
while (true) {
|
||||
if (as && as->abort_requested()) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Request aborted while performing action on leader, current leader: {}, previous leader: {}",
|
||||
leader ? leader.to_sstring() : "unknown",
|
||||
prev_leader ? prev_leader.to_sstring() : "unknown"));
|
||||
}
|
||||
check_not_aborted();
|
||||
if (leader == server_id{}) {
|
||||
@@ -1429,7 +1447,7 @@ term_t server_impl::get_current_term() const {
|
||||
|
||||
future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
|
||||
if (as && as->abort_requested()) {
|
||||
throw request_aborted();
|
||||
throw request_aborted(format("Aborted before waiting for applying entry: {}, last applied entry: {}", idx, _applied_idx));
|
||||
}
|
||||
|
||||
check_not_aborted();
|
||||
@@ -1439,8 +1457,9 @@ future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
|
||||
// This will be signalled when read_idx is applied
|
||||
auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}});
|
||||
if (as) {
|
||||
it->second.abort = as->subscribe([this, it] () noexcept {
|
||||
it->second.promise.set_exception(request_aborted());
|
||||
it->second.abort = as->subscribe([this, it] noexcept {
|
||||
it->second.promise.set_exception(
|
||||
request_aborted(format("Aborted while waiting to apply entry: {}, last applied entry: {}", it->first, _applied_idx)));
|
||||
_awaited_indexes.erase(it);
|
||||
});
|
||||
assert(it->second.abort);
|
||||
@@ -1467,13 +1486,15 @@ future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, sea
|
||||
logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}",
|
||||
_id, rid->first, rid->second);
|
||||
if (as && as->abort_requested()) {
|
||||
return make_exception_future<read_barrier_reply>(request_aborted());
|
||||
return make_exception_future<read_barrier_reply>(
|
||||
request_aborted(format("Abort requested before waiting for read barrier from {}, read id is {} for commit idx {}", from, rid->first, rid->second)));
|
||||
}
|
||||
_reads.push_back({rid->first, rid->second, {}, {}});
|
||||
auto read = std::prev(_reads.end());
|
||||
if (as) {
|
||||
read->abort = as->subscribe([this, read] () noexcept {
|
||||
read->promise.set_exception(request_aborted());
|
||||
read->abort = as->subscribe([this, read, from] noexcept {
|
||||
read->promise.set_exception(
|
||||
request_aborted(format("Abort requested while waiting for read barrier from {}, read id is {} for commit idx {}", from, read->id, read->idx)));
|
||||
_reads.erase(read);
|
||||
});
|
||||
assert(read->abort);
|
||||
@@ -1676,12 +1697,14 @@ future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_
|
||||
|
||||
auto f = _non_joint_conf_commit_promise.emplace().promise.get_future();
|
||||
if (as) {
|
||||
_non_joint_conf_commit_promise->abort = as->subscribe([this] () noexcept {
|
||||
_non_joint_conf_commit_promise->abort = as->subscribe([this, idx = e.idx, term = e.term] noexcept {
|
||||
// If we're inside this callback, the subscription wasn't destroyed yet.
|
||||
// The subscription is destroyed when the field is reset, so if we're here, the field must be engaged.
|
||||
assert(_non_joint_conf_commit_promise);
|
||||
// Whoever resolves the promise must reset the field. Thus, if we're here, the promise is not resolved.
|
||||
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_exception(request_aborted{});
|
||||
std::exchange(_non_joint_conf_commit_promise, std::nullopt)
|
||||
->promise.set_exception(request_aborted(
|
||||
format("Aborted while setting configuration (at index: {}, term: {}, current config: {})", idx, term, _fsm->get_configuration())));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
178 repair/repair.cc
@@ -47,6 +47,7 @@
|
||||
#include <atomic>
|
||||
|
||||
#include "idl/partition_checksum.dist.hh"
|
||||
#include "utils/user_provided_param.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -1244,15 +1245,15 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
|
||||
// but instead of each range being assigned just one primary owner
|
||||
// across the entire cluster, here each range is assigned a primary
|
||||
// owner in each of the DCs.
|
||||
ranges = erm.get_primary_ranges_within_dc(my_address);
|
||||
ranges = co_await erm.get_primary_ranges_within_dc(my_address);
|
||||
} else if (options.data_centers.size() > 0 || options.hosts.size() > 0) {
|
||||
throw std::invalid_argument("You need to run primary range repair on all nodes in the cluster.");
|
||||
} else {
|
||||
ranges = erm.get_primary_ranges(my_address);
|
||||
ranges = co_await erm.get_primary_ranges(my_address);
|
||||
}
|
||||
} else {
|
||||
// get keyspace local ranges
|
||||
ranges = erm.get_ranges(my_address);
|
||||
ranges = co_await erm.get_ranges(my_address);
|
||||
}
|
||||
|
||||
if (!options.data_centers.empty() && !options.hosts.empty()) {
|
||||
@@ -1750,7 +1751,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
streaming::stream_reason reason = is_removenode ? streaming::stream_reason::removenode : streaming::stream_reason::decommission;
|
||||
size_t nr_ranges_total = 0;
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
dht::token_range_vector ranges = erm->get_ranges(leaving_node);
|
||||
dht::token_range_vector ranges = erm->get_ranges(leaving_node).get();
|
||||
auto nr_tables = get_nr_tables(db, keyspace_name);
|
||||
nr_ranges_total += ranges.size() * nr_tables;
|
||||
}
|
||||
@@ -1777,7 +1778,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
}
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
// First get all ranges the leaving node is responsible for
|
||||
dht::token_range_vector ranges = erm->get_ranges(leaving_node);
|
||||
dht::token_range_vector ranges = erm->get_ranges(leaving_node).get();
|
||||
auto nr_tables = get_nr_tables(db, keyspace_name);
|
||||
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size() * nr_tables);
|
||||
size_t nr_ranges_total = ranges.size() * nr_tables;
|
||||
@@ -1950,12 +1951,11 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
future<> repair_service::do_rebuild_replace_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, sstring op, utils::optional_param source_dc, streaming::stream_reason reason, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node) {
|
||||
assert(this_shard_id() == 0);
|
||||
return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable {
|
||||
return seastar::async([this, ks_erms = std::move(ks_erms), tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes), replaced_node] () mutable {
|
||||
auto& db = get_db().local();
|
||||
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
|
||||
auto myip = tmptr->get_topology().my_address();
|
||||
const auto& topology = tmptr->get_topology();
|
||||
auto myid = tmptr->get_my_id();
|
||||
size_t nr_ranges_total = 0;
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
@@ -1979,8 +1979,45 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
rs.get_metrics().replace_finished_ranges = 0;
|
||||
rs.get_metrics().replace_total_ranges = nr_ranges_total;
|
||||
}).get();
|
||||
} else {
|
||||
on_internal_error(rlogger, format("do_rebuild_replace_with_repair: unsupported reason={}", reason));
|
||||
}
|
||||
rlogger.info("{}: started with keyspaces={}, source_dc={}, nr_ranges_total={}, ignore_nodes={}", op, ks_erms | boost::adaptors::map_keys, source_dc, nr_ranges_total, ignore_nodes);
|
||||
std::unordered_set<locator::host_id> all_live_nodes;
|
||||
std::unordered_map<sstring, std::unordered_set<locator::host_id>> live_nodes_per_dc;
|
||||
std::unordered_map<sstring, size_t> lost_nodes_per_dc;
|
||||
topology.for_each_node([&] (const locator::node* node) {
|
||||
const auto& host_id = node->host_id();
|
||||
const auto& dc = node->dc_rack().dc;
|
||||
if (node->is_this_node()) {
|
||||
// Count the rebuilt node as lost.
|
||||
// For replace, we count the replaced_node below.
|
||||
if (reason == streaming::stream_reason::rebuild) {
|
||||
lost_nodes_per_dc[dc]++;
|
||||
}
|
||||
} else if (host_id == replaced_node || ignore_nodes.contains(host_id)) {
|
||||
lost_nodes_per_dc[dc]++;
|
||||
} else {
|
||||
all_live_nodes.insert(host_id);
|
||||
live_nodes_per_dc[dc].insert(host_id);
|
||||
}
|
||||
});
|
||||
// Sanity check
|
||||
auto mydc = topology.get_datacenter();
|
||||
if (!lost_nodes_per_dc[mydc]) {
|
||||
rlogger.warn("Expected at least 1 lost nodes in my dc={}: lost_nodes_per_dc={} live_nodes_per_dc={}", mydc, lost_nodes_per_dc, live_nodes_per_dc);
|
||||
}
|
||||
rlogger.debug("live_nodes_per_dc={}", live_nodes_per_dc);
|
||||
rlogger.debug("lost_nodes_per_dc={}", lost_nodes_per_dc);
|
||||
if (source_dc) {
|
||||
if (!topology.get_datacenters().contains(*source_dc)) {
|
||||
throw std::runtime_error(format("{}: Could not find source_dc={} in datacenters={}", op, *source_dc, topology.get_datacenters()));
|
||||
}
|
||||
if (topology.get_datacenters().size() == 1) {
|
||||
rlogger.info("{}: source_dc={} ignored since the cluster has a single datacenter", op, *source_dc);
|
||||
source_dc.reset();
|
||||
}
|
||||
}
|
||||
rlogger.info("{}: started with keyspaces={}, source_dc={}, nr_ranges_total={}, ignore_nodes={} replaced_node={}", op, ks_erms | boost::adaptors::map_keys, source_dc, nr_ranges_total, ignore_nodes, replaced_node);
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
size_t nr_ranges_skipped = 0;
|
||||
if (!db.has_keyspace(keyspace_name)) {
|
||||
@@ -1989,28 +2026,108 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
}
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myid, *tmptr).get();
|
||||
auto& topology = erm->get_token_metadata().get_topology();
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
auto nr_tables = get_nr_tables(db, keyspace_name);
|
||||
rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}, ignore_nodes={}", op, keyspace_name, source_dc, ranges.size() * nr_tables, ignore_nodes);
|
||||
sstring source_dc_for_keyspace;
|
||||
// Allow repairing in the source_dc only if there are enough replicas remaining
|
||||
if (source_dc) {
|
||||
switch (strat.get_type()) {
|
||||
case locator::replication_strategy_type::network_topology: {
|
||||
const auto& nt_strat = dynamic_cast<const locator::network_topology_strategy&>(strat);
|
||||
size_t rf = nt_strat.get_replication_factor(*source_dc);
|
||||
auto lost = lost_nodes_per_dc[*source_dc];
|
||||
source_dc_for_keyspace = *source_dc;
|
||||
|
||||
auto find_alternative_datacenter = [&] {
|
||||
std::vector<sstring> dcs;
|
||||
dcs.reserve(live_nodes_per_dc.size());
|
||||
std::ranges::copy_if(topology.get_datacenters(), std::back_inserter(dcs), [&] (const auto& dc) {
|
||||
return dc != *source_dc && !lost_nodes_per_dc[dc];
|
||||
});
|
||||
if (!dcs.empty()) {
|
||||
std::uniform_int_distribution<int> dist(0, dcs.size() - 1);
|
||||
return dcs[dist(_random_engine)];
|
||||
}
|
||||
return sstring();
|
||||
};
|
||||
|
||||
// See if it is safe to rebuild/replace from the source_dc.
|
||||
// We identify two cases:
|
||||
// 1. lost > 1: the datacenter has lost additional nodes other than the one being rebuilt/replaced.
|
||||
// In this case we may have lost data that may be present in other DCs but no longer in the source_dc,
|
||||
// due to insufficient consistency_level on write (e.g. CL=1), or too small replication factor.
|
||||
// 2. lost == 1 && rf <= 1: if we lost even a single node in the source_dc, tokens it owned with RF=1 will be lost,
|
||||
// so we need to rebuild/replace from another dc.
|
||||
//
|
||||
// Note that if lost==1 and rf > 1, we would still use the source_dc.
|
||||
// This could miss data written successfully only to a single node with CL=ONE,
|
||||
// requiring cluster-wide repair or repair from an alternative dc.
|
||||
if (lost > 1 || (lost == 1 && rf <= 1)) {
|
||||
auto msg = format("{}: it is unsafe to use source_dc={} to rebuild/replace keyspace={} since it lost {} nodes, rf={}", op, source_dc, keyspace_name, lost, rf);
|
||||
if (source_dc.force()) {
|
||||
rlogger.warn("{}: using source_dc anyway according to the force option", msg);
|
||||
} else if (source_dc.user_provided()) {
|
||||
auto alt_dc = find_alternative_datacenter();
|
||||
if (!alt_dc.empty()) {
|
||||
throw std::runtime_error(format("{}: It is advised to select another datacenter (e.g. {}) that has lost no nodes, or omit the source_dc option to allow using all DCs in the cluster. Or, use the --force option to enforce using source_dc={}", msg, alt_dc, source_dc));
|
||||
} else {
|
||||
throw std::runtime_error(format("{}: found no alternative datacenter: omit the source_dc option to allow using all DCs in the cluster, or use the --force option to enforce source_dc={}", msg, source_dc));
|
||||
}
|
||||
} else {
|
||||
auto alt_dc = find_alternative_datacenter();
|
||||
if (!alt_dc.empty()) {
|
||||
// Use alt_dc instead of source_dc_for_keyspace
|
||||
source_dc_for_keyspace = alt_dc;
|
||||
rlogger.warn("{}: will use alternative dc={} instead", msg, alt_dc);
|
||||
} else {
|
||||
rlogger.warn("{}: found no alternative datacenter, falling back to sync data using all replicas", msg);
|
||||
source_dc_for_keyspace = "";
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case locator::replication_strategy_type::everywhere_topology:
|
||||
// If source_dc_live_nodes is not empty, we can use any remaining nodes
|
||||
if (live_nodes_per_dc.contains(*source_dc)) {
|
||||
source_dc_for_keyspace = *source_dc;
|
||||
} else {
|
||||
source_dc_for_keyspace = "";
|
||||
}
|
||||
break;
|
||||
case locator::replication_strategy_type::simple:
|
||||
// With simple strategy, we have no assurance that source_dc will contain
|
||||
// another replica for all token ranges.
|
||||
source_dc_for_keyspace = "";
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!source_dc_for_keyspace.empty() && !live_nodes_per_dc.contains(source_dc_for_keyspace)) {
|
||||
on_internal_error(rlogger, format("do_rebuild_replace_with_repair: cannot find source_dc_for_keyspace={} in live_nodes_per_dc={}", source_dc_for_keyspace, live_nodes_per_dc));
|
||||
}
|
||||
const auto& sync_nodes = source_dc_for_keyspace.empty() ? all_live_nodes : live_nodes_per_dc.at(source_dc_for_keyspace);
|
||||
rlogger.info("{}: started with keyspace={}, nr_ranges={}, sync_nodes={}, ignore_nodes={} replaced_node={}", op, keyspace_name, ranges.size() * nr_tables, sync_nodes, ignore_nodes, replaced_node);
|
||||
for (auto it = ranges.begin(); it != ranges.end();) {
|
||||
auto& r = *it;
|
||||
seastar::thread::maybe_yield();
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_ips(end_token, *tmptr).get() |
|
||||
boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) {
|
||||
if (node == myip) {
|
||||
auto natural_eps = strat.calculate_natural_endpoints(end_token, *tmptr).get();
|
||||
auto neighbors = boost::copy_range<std::unordered_map<locator::host_id, gms::inet_address>>(natural_eps |
|
||||
boost::adaptors::filtered([&] (const auto& node) {
|
||||
if (topology.is_me(node)) {
|
||||
return false;
|
||||
}
|
||||
if (ignore_nodes.contains(node)) {
|
||||
return false;
|
||||
}
|
||||
return source_dc.empty() ? true : topology.get_datacenter(node) == source_dc;
|
||||
return sync_nodes.contains(node);
|
||||
}) | boost::adaptors::transformed([&topology] (const auto& node) {
|
||||
const auto& n = topology.get_node(node);
|
||||
return std::make_tuple(n.host_id(), n.endpoint());
|
||||
})
|
||||
);
|
||||
rlogger.debug("{}: keyspace={}, range={}, neighbors={}", op, keyspace_name, r, neighbors);
|
||||
rlogger.debug("{}: keyspace={}, range={}, natural_enpoints={}, neighbors={}", op, keyspace_name, r, natural_eps, neighbors);
|
||||
if (!neighbors.empty()) {
|
||||
range_sources[r] = repair_neighbors(std::move(neighbors));
|
||||
range_sources[r] = repair_neighbors(neighbors);
|
||||
++it;
|
||||
} else {
|
||||
// Skip the range with zero neighbors
|
||||
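The hunk above decides whether the requested source DC is safe to rebuild or replace from. As a worked summary, the rule spelled out in its comments reduces to a small predicate; this is an editorial sketch over assumed per-DC counts, not code from the change:

```cpp
// Editorial sketch of the source-DC safety rule described in the hunk above.
#include <cstddef>

bool source_dc_unsafe(std::size_t lost_in_dc, std::size_t rf_in_dc) {
    // Unsafe if the DC lost nodes other than the one being rebuilt/replaced,
    // or lost even a single node while its replication factor is <= 1.
    return lost_in_dc > 1 || (lost_in_dc == 1 && rf_in_dc <= 1);
}
```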
@@ -2029,21 +2146,22 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, nullptr).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc_for_keyspace, nr_ranges);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, ks_erms | boost::adaptors::map_keys, source_dc);
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) {
|
||||
future<> repair_service::rebuild_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, utils::optional_param source_dc) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto op = sstring("rebuild_with_repair");
|
||||
if (source_dc.empty()) {
|
||||
auto& topology = tmptr->get_topology();
|
||||
source_dc = topology.get_datacenter();
|
||||
const auto& topology = tmptr->get_topology();
|
||||
if (!source_dc) {
|
||||
source_dc = utils::optional_param(topology.get_datacenter());
|
||||
}
|
||||
auto reason = streaming::stream_reason::rebuild;
|
||||
co_await do_rebuild_replace_with_repair(std::move(tmptr), std::move(op), std::move(source_dc), reason, {});
|
||||
rlogger.info("{}: this-node={} source_dc={}", op, *topology.this_node(), source_dc);
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(tmptr), std::move(op), std::move(source_dc), reason);
|
||||
co_await get_db().invoke_on_all([](replica::database& db) {
|
||||
for (auto& t : db.get_non_system_column_families()) {
|
||||
t->trigger_offstrategy_compaction();
|
||||
@@ -2051,7 +2169,7 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr,
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto cloned_tm = co_await tmptr->clone_async();
|
||||
auto op = sstring("replace_with_repair");
|
||||
@@ -2063,7 +2181,9 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
|
||||
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
|
||||
cloned_tmptr->update_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing);
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), myloc.dc, reason, std::move(ignore_nodes));
|
||||
auto source_dc = utils::optional_param(myloc.dc);
|
||||
rlogger.info("{}: this-node={} ignore_nodes={} source_dc={}", op, *topology.this_node(), ignore_nodes, source_dc);
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes), replaced_node);
|
||||
}
|
||||
|
||||
static std::unordered_set<gms::inet_address> get_nodes_in_dcs(std::vector<sstring> data_centers, locator::effective_replication_map_ptr erm) {
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <exception>
|
||||
#include <absl/container/btree_set.h>
|
||||
#include <fmt/core.h>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
@@ -145,6 +146,9 @@ public:
|
||||
explicit repair_neighbors(std::vector<gms::inet_address> a)
|
||||
: all(std::move(a)) {
|
||||
}
|
||||
explicit repair_neighbors(const std::unordered_map<locator::host_id, gms::inet_address>& a)
|
||||
: all(boost::copy_range<std::vector<gms::inet_address>>(a | boost::adaptors::map_values)) {
|
||||
}
|
||||
repair_neighbors(std::vector<gms::inet_address> a, std::vector<gms::inet_address> m)
|
||||
: all(std::move(a))
|
||||
, mandatory(std::move(m)) {
|
||||
|
||||
@@ -2296,6 +2296,10 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
|
||||
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes));
|
||||
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
|
||||
try {
|
||||
bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized");
|
||||
if (!_bm.local_is_initialized() || bm_throw) {
|
||||
throw std::runtime_error("Backlog manager isn't initialized");
|
||||
}
|
||||
co_await coroutine::all(
|
||||
[this, &from, &req, &sync_point, &deadline] () -> future<> {
|
||||
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
@@ -3339,9 +3343,7 @@ repair_service::insert_repair_meta(
|
||||
reason,
|
||||
compaction_time] (schema_ptr s) {
|
||||
auto& db = get_db();
|
||||
auto& cf = db.local().find_column_family(s->id());
|
||||
return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout, {}).then([s = std::move(s),
|
||||
&cf,
|
||||
return db.local().obtain_reader_permit(db.local().find_column_family(s->id()), "repair-meta", db::no_timeout, {}).then([s = std::move(s),
|
||||
this,
|
||||
from,
|
||||
repair_meta_id,
|
||||
@@ -3354,7 +3356,7 @@ repair_service::insert_repair_meta(
|
||||
compaction_time] (reader_permit permit) mutable {
|
||||
node_repair_meta_id id{from, repair_meta_id};
|
||||
auto rm = seastar::make_shared<repair_meta>(*this,
|
||||
cf,
|
||||
get_db().local().find_column_family(s->id()),
|
||||
s,
|
||||
std::move(permit),
|
||||
range,
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <vector>
|
||||
#include "gms/inet_address.hh"
|
||||
#include "repair/repair.hh"
|
||||
@@ -17,6 +19,7 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include "service/raft/raft_address_map.hh"
|
||||
#include "utils/user_provided_param.hh"
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
@@ -112,6 +115,8 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
|
||||
future<> _load_history_done = make_ready_future<>();
|
||||
|
||||
mutable std::default_random_engine _random_engine{std::random_device{}()};
|
||||
|
||||
future<> init_ms_handlers();
|
||||
future<> uninit_ms_handlers();
|
||||
|
||||
@@ -149,11 +154,12 @@ public:
|
||||
future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(locator::token_metadata_ptr tmptr);
|
||||
future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc);
|
||||
future<> replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
future<> rebuild_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, utils::optional_param source_dc);
|
||||
future<> replace_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node);
|
||||
private:
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
|
||||
future<> do_rebuild_replace_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, sstring op, utils::optional_param source_dc, streaming::stream_reason reason, std::unordered_set<locator::host_id> ignore_nodes = {}, locator::host_id replaced_node = {});
|
||||
|
||||
// Must be called on shard 0
|
||||
future<> sync_data_using_repair(sstring keyspace,
|
||||
|
||||
@@ -910,33 +910,10 @@ future<> database::add_column_family(keyspace& ks, schema_ptr schema, column_fam
|
||||
throw std::invalid_argument("Column family " + schema->cf_name() + " exists");
|
||||
}
|
||||
cf->start();
|
||||
std::exception_ptr ex = nullptr;
|
||||
try {
|
||||
ks.add_or_update_column_family(schema);
|
||||
schema->registry_entry()->set_table(cf->weak_from_this());
|
||||
co_await _tables_metadata.add_table(schema);
|
||||
if (schema->is_view()) {
|
||||
find_column_family(schema->view_info()->base_id()).add_or_update_view(view_ptr(schema));
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
if (ex) {
|
||||
// Wrap in noexcept lambda to shutdown on failure.
|
||||
auto revert_changes = [&] () noexcept -> future<> {
|
||||
if (schema->is_view()) {
|
||||
try {
|
||||
find_column_family(schema->view_info()->base_id()).remove_view(view_ptr(schema));
|
||||
} catch (no_such_column_family&) {
|
||||
// Accept that a table is dropped, continue reverting changes.
|
||||
}
|
||||
}
|
||||
co_await _tables_metadata.remove_table(schema);
|
||||
ks.metadata()->remove_column_family(schema);
|
||||
co_await cf->stop();
|
||||
};
|
||||
co_await revert_changes();
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
auto f = co_await coroutine::as_future(_tables_metadata.add_table(*this, ks, *cf, schema));
|
||||
if (f.failed()) {
|
||||
co_await cf->stop();
|
||||
co_await coroutine::return_exception_ptr(f.get_exception());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -967,18 +944,8 @@ bool database::update_column_family(schema_ptr new_schema) {
|
||||
}
|
||||
|
||||
future<> database::remove(table& cf) noexcept {
|
||||
auto s = cf.schema();
|
||||
auto& ks = find_keyspace(s->ks_name());
|
||||
cf.deregister_metrics();
|
||||
co_await _tables_metadata.remove_table(s);
|
||||
ks.metadata()->remove_column_family(s);
|
||||
if (s->is_view()) {
|
||||
try {
|
||||
find_column_family(s->view_info()->base_id()).remove_view(view_ptr(s));
|
||||
} catch (no_such_column_family&) {
|
||||
// Drop view mutations received after base table drop.
|
||||
}
|
||||
}
|
||||
return _tables_metadata.remove_table(*this, cf);
|
||||
}
|
||||
|
||||
future<> database::detach_column_family(table& cf) {
|
||||
@@ -1766,8 +1733,8 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
auto slice = query::partition_slice(std::move(cr_ranges), std::move(static_columns),
|
||||
std::move(regular_columns), { }, { }, query::max_rows);
|
||||
|
||||
return do_with(std::move(slice), std::move(m), std::vector<locked_cell>(),
|
||||
[this, &cf, timeout, trace_state = std::move(trace_state), op = cf.write_in_progress()] (const query::partition_slice& slice, mutation& m, std::vector<locked_cell>& locks) mutable {
|
||||
return do_with(std::move(slice), std::move(m), cf.write_in_progress(), std::vector<locked_cell>(),
|
||||
[this, &cf, timeout, trace_state = std::move(trace_state)] (const query::partition_slice& slice, mutation& m, const utils::phased_barrier::operation& op, std::vector<locked_cell>& locks) mutable {
|
||||
tracing::trace(trace_state, "Acquiring counter locks");
|
||||
return cf.lock_counter_cells(m, timeout).then([&, m_schema = cf.schema(), trace_state = std::move(trace_state), timeout, this] (std::vector<locked_cell> lcs) mutable {
|
||||
locks = std::move(lcs);
|
||||
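The do_with change above hands `cf.write_in_progress()` to `do_with()` so the phased-barrier operation stays alive for the whole counter-update chain, instead of dying with the outer lambda's captures. A minimal editorial sketch of that lifetime idiom (`op_guard` is a made-up stand-in type):

```cpp
// Values owned by do_with() live until the returned future resolves; captures
// of the lambda passed to do_with() only live until the lambda has returned.
#include <seastar/core/do_with.hh>
#include <seastar/core/sleep.hh>
#include <chrono>

struct op_guard {};  // stands in for utils::phased_barrier::operation

seastar::future<> run_with_guard(op_guard op) {
    return seastar::do_with(std::move(op), [] (op_guard& op) {
        // `op` remains valid here and across every continuation of this chain.
        return seastar::sleep(std::chrono::milliseconds(10));
    });
}
```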
@@ -2614,19 +2581,9 @@ const sstring& database::get_snitch_name() const {
|
||||
return _cfg.endpoint_snitch();
|
||||
}
|
||||
|
||||
dht::token_range_vector database::get_keyspace_local_ranges(sstring ks) {
|
||||
auto my_address = get_token_metadata().get_topology().my_address();
|
||||
return find_keyspace(ks).get_vnode_effective_replication_map()->get_ranges(my_address);
|
||||
}
|
||||
|
||||
std::optional<dht::token_range_vector> database::maybe_get_keyspace_local_ranges(sstring ks) {
|
||||
const auto& keyspace = find_keyspace(ks);
|
||||
if (keyspace.get_replication_strategy().is_per_table()) {
|
||||
// return nullopt if each tables have their own effective_replication_map
|
||||
return std::nullopt;
|
||||
}
|
||||
auto my_address = get_token_metadata().get_topology().my_address();
|
||||
return keyspace.get_vnode_effective_replication_map()->get_ranges(my_address);
|
||||
future<dht::token_range_vector> database::get_keyspace_local_ranges(locator::vnode_effective_replication_map_ptr erm) {
|
||||
auto my_address = erm->get_topology().my_address();
|
||||
co_return co_await erm->get_ranges(my_address);
|
||||
}
|
||||
|
||||
/*!
|
||||
@@ -2833,29 +2790,71 @@ future<> database::drain() {
|
||||
b.cancel();
|
||||
}
|
||||
|
||||
void database::tables_metadata::add_table_helper(database& db, keyspace& ks, table& cf, schema_ptr s) {
|
||||
// A table needs to be added atomically.
|
||||
auto id = s->id();
|
||||
ks.add_or_update_column_family(s);
|
||||
auto remove_cf1 = defer([&] () noexcept { ks.metadata()->remove_column_family(s); });
|
||||
// A table will be removed via weak pointer and destructors.
|
||||
s->registry_entry()->set_table(cf.weak_from_this());
|
||||
|
||||
_column_families.emplace(id, s->table().shared_from_this());
|
||||
auto remove_cf2 = defer([&] () noexcept {
|
||||
_column_families.erase(s->id());
|
||||
});
|
||||
_ks_cf_to_uuid.emplace(std::make_pair(s->ks_name(), s->cf_name()), id);
|
||||
auto remove_cf3 = defer([&] () noexcept {
|
||||
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
|
||||
});
|
||||
|
||||
if (s->is_view()) {
|
||||
db.find_column_family(s->view_info()->base_id()).add_or_update_view(view_ptr(s));
|
||||
}
|
||||
auto remove_view = defer([&] () noexcept {
|
||||
if (s->is_view()) {
|
||||
try {
|
||||
db.find_column_family(s->view_info()->base_id()).remove_view(view_ptr(s));
|
||||
} catch (no_such_column_family&) {
|
||||
// Drop view mutations received after base table drop.
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
remove_cf1.cancel();
|
||||
remove_cf2.cancel();
|
||||
remove_cf3.cancel();
|
||||
remove_view.cancel();
|
||||
}
|
||||
|
||||
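add_table_helper above registers a defer-based undo action after each mutation and cancels them all once every step has succeeded, which is what makes the multi-container registration behave atomically. A stripped-down editorial sketch of the idiom (the maps here are placeholders):

```cpp
// Each step registers an undo action; if a later step throws, the pending undo
// actions run in reverse order, otherwise they are all cancelled at the end.
#include <seastar/util/defer.hh>
#include <map>
#include <string>

void add_entry_atomically(std::map<int, std::string>& by_id,
                          std::map<std::string, int>& by_name) {
    by_id.emplace(1, "t1");
    auto undo_by_id = seastar::defer([&] () noexcept { by_id.erase(1); });

    by_name.emplace("t1", 1);  // if this throws, undo_by_id rolls back the first insert
    auto undo_by_name = seastar::defer([&] () noexcept { by_name.erase("t1"); });

    // Every step succeeded -- keep the changes.
    undo_by_id.cancel();
    undo_by_name.cancel();
}
```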
void database::tables_metadata::remove_table_helper(database& db, keyspace& ks, table& cf, schema_ptr s) {
|
||||
// A table needs to be removed atomically.
|
||||
_column_families.erase(s->id());
|
||||
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
|
||||
ks.metadata()->remove_column_family(s);
|
||||
if (s->is_view()) {
|
||||
try {
|
||||
db.find_column_family(s->view_info()->base_id()).remove_view(view_ptr(s));
|
||||
} catch (no_such_column_family&) {
|
||||
// Drop view mutations received after base table drop.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t database::tables_metadata::size() const noexcept {
|
||||
return _column_families.size();
|
||||
}
|
||||
|
||||
future<> database::tables_metadata::add_table(schema_ptr schema) {
|
||||
future<> database::tables_metadata::add_table(database& db, keyspace& ks, table& cf, schema_ptr s) {
|
||||
auto holder = co_await _cf_lock.hold_write_lock();
|
||||
auto id = schema->id();
|
||||
auto kscf = std::make_pair(schema->ks_name(), schema->cf_name());
|
||||
try {
|
||||
_column_families.emplace(id, schema->table().shared_from_this());
|
||||
_ks_cf_to_uuid.emplace(kscf, id);
|
||||
} catch (...) {
|
||||
_ks_cf_to_uuid.erase(std::move(kscf));
|
||||
_column_families.erase(id);
|
||||
throw;
|
||||
}
|
||||
add_table_helper(db, ks, cf, s);
|
||||
}
|
||||
|
||||
future<> database::tables_metadata::remove_table(schema_ptr schema) noexcept {
|
||||
future<> database::tables_metadata::remove_table(database& db, table& cf) noexcept {
|
||||
try {
|
||||
auto holder = co_await _cf_lock.hold_write_lock();
|
||||
_column_families.erase(schema->id());
|
||||
_ks_cf_to_uuid.erase(std::make_pair(schema->ks_name(), schema->cf_name()));
|
||||
auto s = cf.schema();
|
||||
auto& ks = db.find_keyspace(s->ks_name());
|
||||
remove_table_helper(db, ks, cf, s);
|
||||
} catch (...) {
|
||||
on_fatal_internal_error(dblog, format("tables_metadata::remove_cf: {}", std::current_exception()));
|
||||
}
|
||||
|
||||
@@ -1393,11 +1393,14 @@ public:
    rwlock _cf_lock;
    std::unordered_map<table_id, lw_shared_ptr<column_family>> _column_families;
    ks_cf_to_uuid_t _ks_cf_to_uuid;
private:
    void add_table_helper(database& db, keyspace& ks, table& cf, schema_ptr s);
    void remove_table_helper(database& db, keyspace& ks, table& cf, schema_ptr s);
public:
    size_t size() const noexcept;

    future<> add_table(schema_ptr schema);
    future<> remove_table(schema_ptr schema) noexcept;
    future<> add_table(database& db, keyspace& ks, table& cf, schema_ptr s);
    future<> remove_table(database& db, table& cf) noexcept;
    table& get_table(table_id id) const;
    table_id get_table_id(const std::pair<std::string_view, std::string_view>& kscf) const;
    lw_shared_ptr<table> get_table_if_exists(table_id id) const;

@@ -1781,8 +1784,7 @@ public:

    // Returns the list of ranges held by this endpoint
    // The returned list is sorted, and its elements are non overlapping and non wrap-around.
    dht::token_range_vector get_keyspace_local_ranges(sstring ks);
    std::optional<dht::token_range_vector> maybe_get_keyspace_local_ranges(sstring ks);
    future<dht::token_range_vector> get_keyspace_local_ranges(locator::vnode_effective_replication_map_ptr erm);

    void set_format(sstables::sstable_version_types format) noexcept;
    void set_format_by_config();

@@ -220,7 +220,8 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, shard
        // - segregate resharded tables into compaction groups
        // - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction
        //   so that cleanup can be considered per compaction group
        auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(ks));
        const auto& erm = db.local().find_keyspace(ks).get_vnode_effective_replication_map();
        auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(erm).get());
        reshard(directory, db, ks, cf, make_sstable, owned_ranges_ptr).get();
        reshape(directory, db, sstables::reshape_mode::strict, ks, cf, make_sstable,
                [] (const sstables::shared_sstable&) { return true; }).get();

@@ -405,6 +405,20 @@ future<mutation_reader> make_partition_mutation_dump_reader(
        tracing::trace_state_ptr ts,
        db::timeout_clock::time_point timeout) {
    const auto& tbl = db.local().find_column_family(underlying_schema);

    // We can get a request for a token we don't own.
    // Just return empty reader in this case, otherwise we will hit
    // std::terminate because the replica side does not handle requests for
    // un-owned tokens.
    {
        auto erm = tbl.get_effective_replication_map();
        auto& topo = erm->get_topology();
        const auto endpoints = erm->get_endpoints_for_reading(dk.token());
        if (std::ranges::find(endpoints, topo.this_node()->endpoint()) == endpoints.end()) {
            co_return make_empty_flat_reader_v2(output_schema, std::move(permit));
        }
    }

    const auto shard = tbl.shard_for_reads(dk.token());
    if (shard == this_shard_id()) {
        co_return make_mutation_reader<mutation_dump_reader>(std::move(output_schema), std::move(underlying_schema), std::move(permit),

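As a rough standalone illustration of the ownership guard introduced in that hunk (plain containers instead of the effective replication map; all names hypothetical): check whether this node is among the replicas for the token before reading, and answer with an empty result rather than forwarding a request the replica path cannot handle.

    #include <algorithm>
    #include <optional>
    #include <string>
    #include <vector>

    using node_id = std::string;

    // Returns std::nullopt (the "empty reader" case) when this node does not own the token.
    std::optional<std::string> read_partition(const std::vector<node_id>& replicas_for_token,
                                              const node_id& this_node) {
        if (std::find(replicas_for_token.begin(), replicas_for_token.end(), this_node) ==
            replicas_for_token.end()) {
            return std::nullopt;    // not a replica: return nothing instead of aborting downstream
        }
        return "rows for the requested partition";    // replica: proceed with the real read
    }
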
@@ -561,6 +575,7 @@ schema_ptr generate_output_schema_from_underlying_schema(schema_ptr underlying_s

future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
        sharded<database>& db,
        locator::effective_replication_map_ptr erm_keepalive,
        schema_ptr output_schema,
        schema_ptr underlying_schema,
        const dht::partition_range_vector& prs,

@@ -11,12 +11,21 @@
#include "db/timeout_clock.hh"
#include "query-result.hh"

namespace locator {

class effective_replication_map;

using effective_replication_map_ptr = seastar::shared_ptr<const effective_replication_map>;

}

namespace replica::mutation_dump {

schema_ptr generate_output_schema_from_underlying_schema(schema_ptr underlying_schema);

future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
        sharded<database>& db,
        locator::effective_replication_map_ptr erm_keepalive,
        schema_ptr output_schema, // must have been generated from `underlying_schema`, with `generate_output_schema_from_underlying_schema()`
        schema_ptr underlying_schema,
        const dht::partition_range_vector& pr,

@@ -485,6 +485,14 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke
}

void compaction_group::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
    // If group was closed / is being closed, it's ok to ignore request to adjust backlog tracker,
    // since that might result in an exception due to the group being deregistered from compaction
    // manager already. And the group is being removed anyway, so that won't have any practical
    // impact.
    if (_async_gate.is_closed()) {
        return;
    }

    auto& tracker = get_backlog_tracker();
    tracker.replace_sstables(old_sstables, new_sstables);
}

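A simplified stand-in for that gate check (std::atomic<bool> in place of seastar::gate; all names illustrative): once the group is closing, the adjustment is silently skipped, since its tracker may already be deregistered and the group is being torn down anyway.

    #include <atomic>

    struct backlog_tracker_stub {
        long backlog = 0;
        void replace(long removed, long added) { backlog += added - removed; }
    };

    class compaction_group_stub {
        std::atomic<bool> _closed{false};
        backlog_tracker_stub _tracker;

    public:
        void close() { _closed.store(true, std::memory_order_release); }

        void adjust_charges(long removed, long added) {
            if (_closed.load(std::memory_order_acquire)) {
                return;    // group is shutting down; skipping has no practical effect
            }
            _tracker.replace(removed, added);
        }
    };
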
@@ -3628,7 +3636,13 @@ future<> storage_group::stop() noexcept {
    auto closed_gate_fut = _async_gate.close();

    // Synchronizes with in-flight writes if any, and also takes care of flushing if needed.
    co_await coroutine::parallel_for_each(compaction_groups(), [] (const compaction_group_ptr& cg_ptr) {

    // The reason we have to stop main cg first, is because an ongoing split always run in main cg
    // and output will be written to left and right groups. If either left or right are stopped before
    // main, split completion will add sstable to a closed group, and that might in turn trigger an
    // exception while running under row_cache::external_updater::execute, resulting in node crash.
    co_await _main_cg->stop();
    co_await coroutine::parallel_for_each(_split_ready_groups, [] (const compaction_group_ptr& cg_ptr) {
        return cg_ptr->stop();
    });
    co_await std::move(closed_gate_fut);

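A plain-threads analogue of the ordering constraint described in that comment (std::async instead of Seastar coroutines; types and names hypothetical): the main group, which may still be emitting split output into the other groups, is stopped first, and only then are the split-ready groups stopped in parallel.

    #include <future>
    #include <vector>

    struct group_stub {
        void stop() { /* flush and wait for in-flight work */ }
    };

    void stop_storage_group(group_stub& main_group, std::vector<group_stub>& split_ready) {
        main_group.stop();    // must finish before the groups it writes into go away

        std::vector<std::future<void>> stops;
        stops.reserve(split_ready.size());
        for (auto& g : split_ready) {
            stops.push_back(std::async(std::launch::async, [&g] { g.stop(); }));
        }
        for (auto& f : stops) {
            f.get();          // propagate any failure from stop()
        }
    }
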
@@ -378,14 +378,6 @@ public:
        });
    }

    tablet_sstable_set(const tablet_sstable_set& o)
        : _schema(o._schema)
        , _tablet_map(o._tablet_map.tablet_count())
        , _sstable_sets(o._sstable_sets)
        , _size(o._size)
        , _bytes_on_disk(o._bytes_on_disk)
    {}

    static lw_shared_ptr<sstables::sstable_set> make(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) {
        return make_lw_shared<sstables::sstable_set>(std::make_unique<tablet_sstable_set>(std::move(s), sgm, tmap));
    }

@@ -906,7 +906,7 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
        },
        guard, std::move(description));

    return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &_as);
    return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), _as);
}

future<> migration_manager::announce_without_raft(std::vector<mutation> schema, group0_guard guard) {
@@ -993,7 +993,7 @@ future<> migration_manager::announce<topology_change>(std::vector<mutation> sche

future<group0_guard> migration_manager::start_group0_operation() {
    assert(this_shard_id() == 0);
    return _group0_client.start_operation(&_as, raft_timeout{});
    return _group0_client.start_operation(_as, raft_timeout{});
}

/**

@@ -7,6 +7,7 @@
 * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
 */
#pragma once
#include "seastar/core/semaphore.hh"
#include "service/paxos/proposal.hh"
#include "log.hh"
#include "utils/digest_algorithm.hh"
@@ -31,6 +32,7 @@ private:

class key_lock_map {
    using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
    using semaphore_units = semaphore_units<semaphore_default_exception_factory, clock_type>;
    using map = std::unordered_map<dht::token, semaphore>;

    semaphore& get_semaphore_for_key(const dht::token& key);

@@ -46,22 +48,15 @@ private:
        key_lock_map& _map;
        dht::token _key;
        clock_type::time_point _timeout;
        bool _locked = false;
        key_lock_map::semaphore_units _units;
    public:
        future<> lock() {
            auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1);
            _locked = true;
            return f;
        future<> lock () {
            return get_units(_map.get_semaphore_for_key(_key), 1, _timeout).then([this] (auto&& u) { _units = std::move(u); });
        }
        guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {};
        guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) {
            o._locked = false;
        }
        guard(guard&& o) = default;
        ~guard() {
            if (_locked) {
                _map.get_semaphore_for_key(_key).signal(1);
                _map.release_semaphore_for_key(_key);
            }
            _map.release_semaphore_for_key(_key);
        }
    };

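A self-contained sketch of why that guard switches from a manual _locked flag to RAII units (std::counting_semaphore stands in for the Seastar semaphore; this is not the actual Scylla or Seastar API): the units object releases exactly once in its destructor, so the move constructor can be defaulted and no bookkeeping flag or manual signal is needed.

    #include <semaphore>
    #include <utility>

    class semaphore_units {
        std::counting_semaphore<>* _sem = nullptr;   // null means "owns nothing"
    public:
        semaphore_units() = default;
        explicit semaphore_units(std::counting_semaphore<>& sem) : _sem(&sem) {}
        semaphore_units(semaphore_units&& o) noexcept : _sem(std::exchange(o._sem, nullptr)) {}
        semaphore_units& operator=(semaphore_units&& o) noexcept {
            if (this != &o) {
                reset();
                _sem = std::exchange(o._sem, nullptr);
            }
            return *this;
        }
        ~semaphore_units() { reset(); }
        void reset() {
            if (_sem) {
                _sem->release();   // released exactly once, even after a move
                _sem = nullptr;
            }
        }
    };

    // Acquire one unit and return an owning guard; releasing happens in ~semaphore_units().
    semaphore_units get_units(std::counting_semaphore<>& sem) {
        sem.acquire();
        return semaphore_units(sem);
    }

    int main() {
        std::counting_semaphore<> sem{1};
        {
            auto units = get_units(sem);     // holds the unit
            auto moved = std::move(units);   // ownership transfers; no manual _locked flag
        }                                    // the unit is released here, exactly once
        return 0;
    }
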
@@ -175,6 +175,10 @@ future<> service_level_controller::update_service_levels_from_distributed_data()
    // firstly delete all that there is to be deleted and only then adding new
    // service levels.
    while (current_it != _service_levels_db.end() && new_state_it != service_levels.end()) {
        if (current_it->first.starts_with('$')) {
            sl_logger.warn("Service level names starting with '$' are reserved for internal tenants. Rename service level \"{}\" to drop '$' prefix.", current_it->first.c_str());
        }

        if (current_it->first == new_state_it->first) {
            //the service level exists on both the cureent and new state.
            if (current_it->second.slo != new_state_it->second) {

@@ -519,7 +523,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
        val_binders_str += ", ?";
    }

    auto guard = co_await group0_client.start_operation(&as);
    auto guard = co_await group0_client.start_operation(as);

    std::vector<mutation> migration_muts;
    for (const auto& row: *rows) {
@@ -554,7 +558,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
        .mutations{migration_muts.begin(), migration_muts.end()},
    };
    auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
    co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as);
    co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
}

future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {

@@ -342,7 +342,8 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::

        co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
    } catch (const abort_requested_exception&) {
        throw raft::request_aborted();
        throw raft::request_aborted(format(
            "Abort requested while transferring snapshot from ID/IP: {}/{}, snapshot descriptor id: {}, snapshot index: {}", from_id, from_ip, snp.id, snp.idx));
    }
}

Some files were not shown because too many files have changed in this diff.