Compare commits
74 Commits
copilot/fi
...
scylla-6.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8a9fd4e49 | ||
|
|
363cf881d4 | ||
|
|
e018b38a54 | ||
|
|
d1a31460a0 | ||
|
|
9175cc528b | ||
|
|
18be4f454e | ||
|
|
f35a083abe | ||
|
|
57affc7fad | ||
|
|
927e526e2d | ||
|
|
b224665575 | ||
|
|
9afb1afefa | ||
|
|
72153cac96 | ||
|
|
f988980260 | ||
|
|
1d11adf766 | ||
|
|
dae1d18145 | ||
|
|
e9588a8a53 | ||
|
|
c73d0ffbaa | ||
|
|
c7b5571766 | ||
|
|
92325073a9 | ||
|
|
f5c0969c06 | ||
|
|
90ced080a8 | ||
|
|
7674d80c31 | ||
|
|
06ceef34a7 | ||
|
|
ec83367b45 | ||
|
|
dfe2e20442 | ||
|
|
ad2191e84f | ||
|
|
855abd7368 | ||
|
|
086dc6d53c | ||
|
|
09b0b3f7d6 | ||
|
|
3bbb7a24b1 | ||
|
|
b43454c658 | ||
|
|
93700ff5d1 | ||
|
|
5e2b4a0e80 | ||
|
|
bb5dc0771c | ||
|
|
9ed8519362 | ||
|
|
077d7c06a0 | ||
|
|
5a1575678b | ||
|
|
2401f7f9ca | ||
|
|
906d085289 | ||
|
|
34dd3a6daa | ||
|
|
3afa8ee2ca | ||
|
|
3347152ff9 | ||
|
|
ff7bd937e2 | ||
|
|
50ea1dbe32 | ||
|
|
45125c4d7d | ||
|
|
9207f7823d | ||
|
|
711864687f | ||
|
|
faf11e5bc3 | ||
|
|
f9215b4d7e | ||
|
|
469ac9976a | ||
|
|
d341f1ef1e | ||
|
|
07dfcd1f64 | ||
|
|
f8d63b5572 | ||
|
|
ca83da91d1 | ||
|
|
f55081fb1a | ||
|
|
aa8cdec5bd | ||
|
|
75a2484dba | ||
|
|
37387135b4 | ||
|
|
ac24ab5141 | ||
|
|
729dc03e0c | ||
|
|
9d64ced982 | ||
|
|
ea6349a6f5 | ||
|
|
ed9122a84e | ||
|
|
c7d6b4a194 | ||
|
|
a35e138b22 | ||
|
|
3db67faa8a | ||
|
|
6a12174e2d | ||
|
|
ca0096ccb8 | ||
|
|
a71d4bc49c | ||
|
|
749399e4b8 | ||
|
|
bdd97b2950 | ||
|
|
1a056f0cab | ||
|
|
cf78a2caca | ||
|
|
cbc53f0e81 |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=6.2.0-dev
|
||||
VERSION=6.2.0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2195,7 +2195,6 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
mutation_builders.reserve(request_items.MemberCount());
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
batch_size++;
|
||||
schema_ptr schema = get_table_from_batch_request(_proxy, it);
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
|
||||
@@ -2216,6 +2215,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else if (r_name == "DeleteRequest") {
|
||||
const rjson::value& key = (r->value)["Key"];
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
@@ -2226,6 +2226,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else {
|
||||
co_return api_error::validation(fmt::format("Unknown BatchWriteItem request type: {}", r_name));
|
||||
}
|
||||
@@ -3483,7 +3484,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
};
|
||||
std::vector<table_requests> requests;
|
||||
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs(get_table_from_batch_request(_proxy, it));
|
||||
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
|
||||
@@ -3497,6 +3498,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rs.add(key);
|
||||
check_key(key, rs.schema);
|
||||
}
|
||||
batch_size += rs.requests.size();
|
||||
requests.emplace_back(std::move(rs));
|
||||
}
|
||||
|
||||
@@ -3504,7 +3506,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
co_await verify_permission(client_state, tr.schema, auth::permission::SELECT);
|
||||
}
|
||||
|
||||
_stats.api_operations.batch_get_item_batch_total += requests.size();
|
||||
_stats.api_operations.batch_get_item_batch_total += batch_size;
|
||||
// If we got here, all "requests" are valid, so let's start the
|
||||
// requests for the different partitions all in parallel.
|
||||
std::vector<future<std::vector<rjson::value>>> response_futures;
|
||||
|
||||
@@ -29,8 +29,6 @@ stats::stats() : api_operations{} {
|
||||
seastar::metrics::description("Latency summary of an operation via Alternator API"), [this]{return to_metrics_summary(api_operations.name.summary());})(op(CamelCaseName)).set_skip_when_empty(),
|
||||
OPERATION(batch_get_item, "BatchGetItem")
|
||||
OPERATION(batch_write_item, "BatchWriteItem")
|
||||
OPERATION(batch_get_item_batch_total, "BatchGetItemSize")
|
||||
OPERATION(batch_write_item_batch_total, "BatchWriteItemSize")
|
||||
OPERATION(create_backup, "CreateBackup")
|
||||
OPERATION(create_global_table, "CreateGlobalTable")
|
||||
OPERATION(create_table, "CreateTable")
|
||||
@@ -98,6 +96,10 @@ stats::stats() : api_operations{} {
|
||||
seastar::metrics::description("number of rows read and matched during filtering operations")),
|
||||
seastar::metrics::make_total_operations("filtered_rows_dropped_total", [this] { return cql_stats.filtered_rows_read_total - cql_stats.filtered_rows_matched_total; },
|
||||
seastar::metrics::description("number of rows read and dropped during filtering operations")),
|
||||
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchWriteItem")},
|
||||
api_operations.batch_write_item_batch_total).set_skip_when_empty(),
|
||||
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchGetItem")},
|
||||
api_operations.batch_get_item_batch_total).set_skip_when_empty(),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -898,7 +898,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
auto host_id = validate_host_id(req->get_query_param("host_id"));
|
||||
std::vector<sstring> ignore_nodes_strs = utils::split_comma_separated_list(req->get_query_param("ignore_nodes"));
|
||||
apilog.info("remove_node: host_id={} ignore_nodes={}", host_id, ignore_nodes_strs);
|
||||
auto ignore_nodes = std::list<locator::host_id_or_endpoint>();
|
||||
locator::host_id_or_endpoint_list ignore_nodes;
|
||||
ignore_nodes.reserve(ignore_nodes_strs.size());
|
||||
for (const sstring& n : ignore_nodes_strs) {
|
||||
try {
|
||||
auto hoep = locator::host_id_or_endpoint(n);
|
||||
|
||||
@@ -76,7 +76,7 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
|
||||
continue;
|
||||
} catch (std::out_of_range&) {
|
||||
// just fallthrough
|
||||
} catch (std::regex_error&) {
|
||||
} catch (boost::regex_error&) {
|
||||
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,7 +296,8 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
// When trimming, let's keep sstables with overlapping time window, so as to reduce write amplification.
|
||||
// For example, if there are N sstables spanning window W, where N <= 32, then we can produce all data for W
|
||||
// in a single compaction round, removing the need to later compact W to reduce its number of files.
|
||||
boost::partial_sort(multi_window, multi_window.begin() + max_sstables, [](const shared_sstable &a, const shared_sstable &b) {
|
||||
auto sort_size = std::min(max_sstables, multi_window.size());
|
||||
boost::partial_sort(multi_window, multi_window.begin() + sort_size, [](const shared_sstable &a, const shared_sstable &b) {
|
||||
return a->get_stats_metadata().max_timestamp < b->get_stats_metadata().max_timestamp;
|
||||
});
|
||||
maybe_trim_job(multi_window, job_size, disjoint);
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <boost/range/algorithm.hpp>
|
||||
#include <fmt/format.h>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <stdexcept>
|
||||
#include "alter_keyspace_statement.hh"
|
||||
#include "prepared_statement.hh"
|
||||
@@ -43,18 +44,16 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo
|
||||
return state.has_keyspace_access(_name, auth::permission::ALTER);
|
||||
}
|
||||
|
||||
static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) {
|
||||
auto to_number = [] (const std::string_view rf) {
|
||||
int result;
|
||||
// We assume the passed string view represents a valid decimal number,
|
||||
// so we don't need the error code.
|
||||
(void) std::from_chars(rf.begin(), rf.end(), result);
|
||||
return result;
|
||||
};
|
||||
|
||||
// We want to ensure that each DC's RF is going to change by at most 1
|
||||
// because in that case the old and new quorums must overlap.
|
||||
return std::abs(to_number(curr_rf) - to_number(new_rf)) <= 1;
|
||||
static unsigned get_abs_rf_diff(const std::string& curr_rf, const std::string& new_rf) {
|
||||
try {
|
||||
return std::abs(std::stoi(curr_rf) - std::stoi(new_rf));
|
||||
} catch (std::invalid_argument const& ex) {
|
||||
on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments, "
|
||||
"but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
|
||||
} catch (std::out_of_range const& ex) {
|
||||
on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments to fit into `int` type, "
|
||||
"but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
|
||||
}
|
||||
}
|
||||
|
||||
void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const {
|
||||
@@ -84,11 +83,24 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
|
||||
auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());
|
||||
|
||||
if (ks.get_replication_strategy().uses_tablets()) {
|
||||
const std::map<sstring, sstring>& current_rfs = ks.metadata()->strategy_options();
|
||||
for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) {
|
||||
auto it = current_rfs.find(new_dc);
|
||||
if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) {
|
||||
throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time.");
|
||||
const std::map<sstring, sstring>& current_rf_per_dc = ks.metadata()->strategy_options();
|
||||
auto new_rf_per_dc = _attrs->get_replication_options();
|
||||
new_rf_per_dc.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
unsigned total_abs_rfs_diff = 0;
|
||||
for (const auto& [new_dc, new_rf] : new_rf_per_dc) {
|
||||
sstring old_rf = "0";
|
||||
if (auto new_dc_in_current_mapping = current_rf_per_dc.find(new_dc);
|
||||
new_dc_in_current_mapping != current_rf_per_dc.end()) {
|
||||
old_rf = new_dc_in_current_mapping->second;
|
||||
} else if (!qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters().contains(new_dc)) {
|
||||
// This means that the DC listed in ALTER doesn't exist. This error will be reported later,
|
||||
// during validation in abstract_replication_strategy::validate_replication_strategy.
|
||||
// We can't report this error now, because it'd change the order of errors reported:
|
||||
// first we need to report non-existing DCs, then if RFs aren't changed by too much.
|
||||
continue;
|
||||
}
|
||||
if (total_abs_rfs_diff += get_abs_rf_diff(old_rf, new_rf); total_abs_rfs_diff >= 2) {
|
||||
throw exceptions::invalid_request_exception("Only one DC's RF can be changed at a time and not by more than 1");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -118,6 +130,63 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
|
||||
return ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty();
|
||||
}
|
||||
|
||||
namespace {
|
||||
// These functions are used to flatten all the options in the keyspace definition into a single-level map<string, string>.
|
||||
// (Currently options are stored in a nested structure that looks more like a map<string, map<string, string>>).
|
||||
// Flattening is simply joining the keys of maps from both levels with a colon ':' character,
|
||||
// or in other words: prefixing the keys in the output map with the option type, e.g. 'replication', 'storage', etc.,
|
||||
// so that the output map contains entries like: "replication:dc1" -> "3".
|
||||
// This is done to avoid key conflicts and to be able to de-flatten the map back into the original structure.
|
||||
|
||||
void add_prefixed_key(const sstring& prefix, const std::map<sstring, sstring>& in, std::map<sstring, sstring>& out) {
|
||||
for (const auto& [in_key, in_value]: in) {
|
||||
out[prefix + ":" + in_key] = in_value;
|
||||
}
|
||||
};
|
||||
|
||||
std::map<sstring, sstring> get_current_options_flattened(const shared_ptr<cql3::statements::ks_prop_defs>& ks,
|
||||
bool include_tablet_options,
|
||||
const gms::feature_service& feat) {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
add_prefixed_key(ks->KW_REPLICATION, ks->get_replication_options(), all_options);
|
||||
add_prefixed_key(ks->KW_STORAGE, ks->get_storage_options().to_map(), all_options);
|
||||
// if no tablet options are specified in the ALTER KS statement,
|
||||
// we want to preserve the old ones and hence cannot overwrite them with defaults
|
||||
if (include_tablet_options) {
|
||||
auto initial_tablets = ks->get_initial_tablets(std::nullopt);
|
||||
add_prefixed_key(ks->KW_TABLETS,
|
||||
{{"enabled", initial_tablets ? "true" : "false"},
|
||||
{"initial", std::to_string(initial_tablets.value_or(0))}},
|
||||
all_options);
|
||||
}
|
||||
add_prefixed_key(ks->KW_DURABLE_WRITES,
|
||||
{{sstring(ks->KW_DURABLE_WRITES), to_sstring(ks->get_boolean(ks->KW_DURABLE_WRITES, true))}},
|
||||
all_options);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> get_old_options_flattened(const data_dictionary::keyspace& ks, bool include_tablet_options) {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
using namespace cql3::statements;
|
||||
add_prefixed_key(ks_prop_defs::KW_REPLICATION, ks.get_replication_strategy().get_config_options(), all_options);
|
||||
add_prefixed_key(ks_prop_defs::KW_STORAGE, ks.metadata()->get_storage_options().to_map(), all_options);
|
||||
if (include_tablet_options) {
|
||||
add_prefixed_key(ks_prop_defs::KW_TABLETS,
|
||||
{{"enabled", ks.metadata()->initial_tablets() ? "true" : "false"},
|
||||
{"initial", std::to_string(ks.metadata()->initial_tablets().value_or(0))}},
|
||||
all_options);
|
||||
}
|
||||
add_prefixed_key(ks_prop_defs::KW_DURABLE_WRITES,
|
||||
{{sstring(ks_prop_defs::KW_DURABLE_WRITES), to_sstring(ks.metadata()->durable_writes())}},
|
||||
all_options);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
} // <anonymous> namespace
|
||||
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
|
||||
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
|
||||
using namespace cql_transport;
|
||||
@@ -130,11 +199,18 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat);
|
||||
std::vector<mutation> muts;
|
||||
std::vector<sstring> warnings;
|
||||
auto ks_options = _attrs->get_all_options_flattened(feat);
|
||||
bool include_tablet_options = _attrs->get_map(_attrs->KW_TABLETS).has_value();
|
||||
auto old_ks_options = get_old_options_flattened(ks, include_tablet_options);
|
||||
auto ks_options = get_current_options_flattened(_attrs, include_tablet_options, feat);
|
||||
ks_options.merge(old_ks_options);
|
||||
|
||||
auto ts = mc.write_timestamp();
|
||||
auto global_request_id = mc.new_group0_state_id();
|
||||
|
||||
// we only want to run the tablets path if there are actually any tablets changes, not only schema changes
|
||||
// TODO: the current `if (changes_tablets(qp))` is insufficient: someone may set the same RFs as before,
|
||||
// and we'll unnecessarily trigger the processing path for ALTER tablets KS,
|
||||
// when in reality nothing or only schema is being changed
|
||||
if (changes_tablets(qp)) {
|
||||
if (!qp.topology_global_queue_empty()) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
|
||||
|
||||
@@ -139,28 +139,22 @@ data_dictionary::storage_options ks_prop_defs::get_storage_options() const {
|
||||
return opts;
|
||||
}
|
||||
|
||||
ks_prop_defs::init_tablets_options ks_prop_defs::get_initial_tablets(const sstring& strategy_class, bool enabled_by_default) const {
|
||||
// FIXME -- this should be ignored somehow else
|
||||
init_tablets_options ret{ .enabled = false, .specified_count = std::nullopt };
|
||||
if (locator::abstract_replication_strategy::to_qualified_class_name(strategy_class) != "org.apache.cassandra.locator.NetworkTopologyStrategy") {
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::optional<unsigned> ks_prop_defs::get_initial_tablets(std::optional<unsigned> default_value) const {
|
||||
auto tablets_options = get_map(KW_TABLETS);
|
||||
if (!tablets_options) {
|
||||
return enabled_by_default ? init_tablets_options{ .enabled = true } : ret;
|
||||
return default_value;
|
||||
}
|
||||
|
||||
unsigned initial_count = 0;
|
||||
auto it = tablets_options->find("enabled");
|
||||
if (it != tablets_options->end()) {
|
||||
auto enabled = it->second;
|
||||
tablets_options->erase(it);
|
||||
|
||||
if (enabled == "true") {
|
||||
ret = init_tablets_options{ .enabled = true, .specified_count = 0 }; // even if 'initial' is not set, it'll start with auto-detection
|
||||
// nothing
|
||||
} else if (enabled == "false") {
|
||||
SCYLLA_ASSERT(!ret.enabled);
|
||||
return ret;
|
||||
return std::nullopt;
|
||||
} else {
|
||||
throw exceptions::configuration_exception(sstring("Tablets enabled value must be true or false; found: ") + enabled);
|
||||
}
|
||||
@@ -169,7 +163,7 @@ ks_prop_defs::init_tablets_options ks_prop_defs::get_initial_tablets(const sstri
|
||||
it = tablets_options->find("initial");
|
||||
if (it != tablets_options->end()) {
|
||||
try {
|
||||
ret = init_tablets_options{ .enabled = true, .specified_count = std::stol(it->second)};
|
||||
initial_count = std::stol(it->second);
|
||||
} catch (...) {
|
||||
throw exceptions::configuration_exception(sstring("Initial tablets value should be numeric; found ") + it->second);
|
||||
}
|
||||
@@ -180,7 +174,7 @@ ks_prop_defs::init_tablets_options ks_prop_defs::get_initial_tablets(const sstri
|
||||
throw exceptions::configuration_exception(sstring("Unrecognized tablets option ") + tablets_options->begin()->first);
|
||||
}
|
||||
|
||||
return ret;
|
||||
return initial_count;
|
||||
}
|
||||
|
||||
std::optional<sstring> ks_prop_defs::get_replication_strategy_class() const {
|
||||
@@ -191,32 +185,13 @@ bool ks_prop_defs::get_durable_writes() const {
|
||||
return get_boolean(KW_DURABLE_WRITES, true);
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
auto ingest_flattened_options = [&all_options](const std::map<sstring, sstring>& options, const sstring& prefix) {
|
||||
for (auto& option: options) {
|
||||
all_options[prefix + ":" + option.first] = option.second;
|
||||
}
|
||||
};
|
||||
ingest_flattened_options(get_replication_options(), KW_REPLICATION);
|
||||
ingest_flattened_options(get_storage_options().to_map(), KW_STORAGE);
|
||||
ingest_flattened_options(get_map(KW_TABLETS).value_or(std::map<sstring, sstring>{}), KW_TABLETS);
|
||||
ingest_flattened_options({{sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}}, KW_DURABLE_WRITES);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) {
|
||||
auto sc = get_replication_strategy_class().value();
|
||||
auto initial_tablets = get_initial_tablets(sc, feat.tablets);
|
||||
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0
|
||||
if (initial_tablets.enabled && !initial_tablets.specified_count) {
|
||||
initial_tablets.specified_count = 0;
|
||||
}
|
||||
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0 for N.T.S. only
|
||||
auto initial_tablets = get_initial_tablets(feat.tablets && locator::abstract_replication_strategy::to_qualified_class_name(sc) == "org.apache.cassandra.locator.NetworkTopologyStrategy" ? std::optional<unsigned>(0) : std::nullopt);
|
||||
auto options = prepare_options(sc, tm, get_replication_options());
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
|
||||
std::move(options), initial_tablets.specified_count, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
std::move(options), initial_tablets, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata& tm, const gms::feature_service& feat) {
|
||||
@@ -229,13 +204,9 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
|
||||
sc = old->strategy_name();
|
||||
options = old_options;
|
||||
}
|
||||
auto initial_tablets = get_initial_tablets(*sc, old->initial_tablets().has_value());
|
||||
// if tablets options have not been specified, inherit them if it's tablets-enabled KS
|
||||
if (initial_tablets.enabled && !initial_tablets.specified_count) {
|
||||
initial_tablets.specified_count = old->initial_tablets();
|
||||
}
|
||||
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(old->name(), *sc, options, initial_tablets.specified_count, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
auto initial_tablets = get_initial_tablets(old->initial_tablets());
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(old->name(), *sc, options, initial_tablets, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -49,21 +49,15 @@ public:
|
||||
private:
|
||||
std::optional<sstring> _strategy_class;
|
||||
public:
|
||||
struct init_tablets_options {
|
||||
bool enabled;
|
||||
std::optional<unsigned> specified_count;
|
||||
};
|
||||
|
||||
ks_prop_defs() = default;
|
||||
explicit ks_prop_defs(std::map<sstring, sstring> options);
|
||||
|
||||
void validate();
|
||||
std::map<sstring, sstring> get_replication_options() const;
|
||||
std::optional<sstring> get_replication_strategy_class() const;
|
||||
init_tablets_options get_initial_tablets(const sstring& strategy_class, bool enabled_by_default) const;
|
||||
std::optional<unsigned> get_initial_tablets(std::optional<unsigned> default_value) const;
|
||||
data_dictionary::storage_options get_storage_options() const;
|
||||
bool get_durable_writes() const;
|
||||
std::map<sstring, sstring> get_all_options_flattened(const gms::feature_service& feat) const;
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
|
||||
};
|
||||
|
||||
@@ -46,14 +46,14 @@ public:
|
||||
protected:
|
||||
std::optional<sstring> get_simple(const sstring& name) const;
|
||||
|
||||
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
|
||||
|
||||
void remove_from_map_if_exists(const sstring& name, const sstring& key) const;
|
||||
public:
|
||||
bool has_property(const sstring& name) const;
|
||||
|
||||
std::optional<value_type> get(const sstring& name) const;
|
||||
|
||||
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
|
||||
|
||||
sstring get_string(sstring key, sstring default_value) const;
|
||||
|
||||
// Return a property value, typed as a Boolean
|
||||
|
||||
@@ -1132,7 +1132,12 @@ public:
|
||||
write(out, uint64_t(0));
|
||||
}
|
||||
|
||||
buf.remove_suffix(buf.size_bytes() - size);
|
||||
auto to_remove = buf.size_bytes() - size;
|
||||
// #20862 - we decrement usage counter based on buf.size() below.
|
||||
// Since we are shrinking buffer here, we need to also decrement
|
||||
// counter already
|
||||
buf.remove_suffix(to_remove);
|
||||
_segment_manager->totals.buffer_list_bytes -= to_remove;
|
||||
|
||||
// Build sector checksums.
|
||||
auto id = net::hton(_desc.id);
|
||||
@@ -3826,6 +3831,10 @@ uint64_t db::commitlog::get_total_size() const {
|
||||
;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_buffer_size() const {
|
||||
return _segment_manager->totals.buffer_list_bytes;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_completed_tasks() const {
|
||||
return _segment_manager->totals.allocation_count;
|
||||
}
|
||||
|
||||
@@ -306,6 +306,7 @@ public:
|
||||
future<> delete_segments(std::vector<sstring>) const;
|
||||
|
||||
uint64_t get_total_size() const;
|
||||
uint64_t get_buffer_size() const;
|
||||
uint64_t get_completed_tasks() const;
|
||||
uint64_t get_flush_count() const;
|
||||
uint64_t get_pending_tasks() const;
|
||||
|
||||
11
db/config.cc
11
db/config.cc
@@ -1526,18 +1526,19 @@ future<> update_relabel_config_from_file(const std::string& name) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
std::vector<sstring> split_comma_separated_list(sstring comma_separated_list) {
|
||||
std::vector<sstring> split_comma_separated_list(const std::string_view comma_separated_list) {
|
||||
std::vector<sstring> strs, trimmed_strs;
|
||||
boost::split(strs, std::move(comma_separated_list), boost::is_any_of(","));
|
||||
for (sstring n : strs) {
|
||||
boost::split(strs, comma_separated_list, boost::is_any_of(","));
|
||||
trimmed_strs.reserve(strs.size());
|
||||
for (sstring& n : strs) {
|
||||
std::replace(n.begin(), n.end(), '\"', ' ');
|
||||
std::replace(n.begin(), n.end(), '\'', ' ');
|
||||
boost::trim_all(n);
|
||||
if (!n.empty()) {
|
||||
trimmed_strs.push_back(n);
|
||||
trimmed_strs.push_back(std::move(n));
|
||||
}
|
||||
}
|
||||
return trimmed_strs;
|
||||
}
|
||||
|
||||
}
|
||||
} // namespace utils
|
||||
|
||||
@@ -545,6 +545,6 @@ future<gms::inet_address> resolve(const config_file::named_value<sstring>&, gms:
|
||||
*/
|
||||
future<> update_relabel_config_from_file(const std::string& name);
|
||||
|
||||
std::vector<sstring> split_comma_separated_list(sstring comma_separated_list);
|
||||
std::vector<sstring> split_comma_separated_list(std::string_view comma_separated_list);
|
||||
|
||||
}
|
||||
} // namespace utils
|
||||
|
||||
@@ -35,8 +35,6 @@
|
||||
#include <span>
|
||||
#include <unordered_map>
|
||||
|
||||
class fragmented_temporary_buffer;
|
||||
|
||||
namespace utils {
|
||||
class directories;
|
||||
} // namespace utils
|
||||
|
||||
4
docs/_templates/db_config.tmpl
vendored
4
docs/_templates/db_config.tmpl
vendored
@@ -2,7 +2,7 @@
|
||||
|
||||
{% for group in data %}
|
||||
{% if group.value_status_count[value_status] > 0 %}
|
||||
.. _confgroup_{{ group.name }}:
|
||||
.. _confgroup_{{ group.name|lower|replace(" ", "_") }}:
|
||||
|
||||
{{ group.name }}
|
||||
{{ '-' * (group.name|length) }}
|
||||
@@ -13,7 +13,7 @@
|
||||
|
||||
{% for item in group.properties %}
|
||||
{% if item.value_status == value_status %}
|
||||
.. _confprop_{{ item.name }}:
|
||||
.. _confprop_{{ item.name|lower|replace(" ", "_") }}:
|
||||
|
||||
.. confval:: {{ item.name }}
|
||||
{% endif %}
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
### a dictionary of redirections
|
||||
#old path: new path
|
||||
|
||||
|
||||
|
||||
# Move up the Features section
|
||||
# THESE REDIRECTIONS SHOULD BE UNCOMMENTED WHEN 6.2 IS RELEASED
|
||||
# Before 6.2 documentation is available, these redirections result in 404
|
||||
|
||||
#/stable/troubleshooting/nodetool-memory-read-timeout.html: /stable/troubleshooting/index.html
|
||||
|
||||
# Move up the Features section
|
||||
|
||||
#/stable/using-scylla/features.html: /stable/features/index.html
|
||||
#/stable/using-scylla/lwt.html: /stable/features/lwt.html
|
||||
#/stable/using-scylla/secondary-indexes.html: /stable/features/secondary-indexes.html
|
||||
|
||||
@@ -50,6 +50,13 @@ Which yields, for `/proc/sys/fs/aio-max-nr`:
|
||||
$ docker run --name some-scylla --hostname some-scylla -d scylladb/scylla
|
||||
```
|
||||
|
||||
If you're on macOS and plan to start a multi-node cluster (3 nodes or more), start ScyllaDB with
|
||||
`--reactor-backend=epoll` to override the default `linux-aio` reactor backend:
|
||||
|
||||
```console
|
||||
$ docker run --name some-scylla --hostname some-scylla -d scylladb/scylla --reactor-backend=epoll
|
||||
```
|
||||
|
||||
### Run `nodetool` utility
|
||||
|
||||
```console
|
||||
@@ -77,6 +84,11 @@ cqlsh>
|
||||
```console
|
||||
$ docker run --name some-scylla2 --hostname some-scylla2 -d scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
```
|
||||
If you're on macOS, make sure to add the `--reactor-backend=epoll` option when adding new nodes:
|
||||
|
||||
```console
|
||||
$ docker run --name some-scylla2 --hostname some-scylla2 -d scylladb/scylla --reactor-backend=epoll --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
```
|
||||
|
||||
#### Make a cluster with Docker Compose
|
||||
|
||||
@@ -344,90 +356,6 @@ The `--authenticator` command lines option allows to provide the authenticator c
|
||||
|
||||
The `--authorizer` command-line option specifies the authorizer class ScyllaDB will use. By default, ScyllaDB uses the `AllowAllAuthorizer`, which allows any action to any user. The second option is to use the `CassandraAuthorizer` parameter, which stores permissions in the `system.permissions` table.
|
||||
|
||||
**Since: 2.3**
|
||||
|
||||
### JMX parameters
|
||||
|
||||
JMX ScyllaDB service is initialized from the `/scylla-jmx-service.sh` on
|
||||
container startup. By default the script uses `/etc/sysconfig/scylla-jmx`
|
||||
to read the default configuration. It then can be overridden by setting
|
||||
environmental parameters.
|
||||
|
||||
An example:
|
||||
|
||||
docker run -d -e "SCYLLA_JMX_ADDR=-ja 0.0.0.0" -e SCYLLA_JMX_REMOTE=-r --publish 7199:7199 scylladb/scylla
|
||||
|
||||
#### SCYLLA_JMX_PORT
|
||||
|
||||
Scylla JMX listening port.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_JMX_PORT="-jp 7199"
|
||||
|
||||
#### SCYLLA_API_PORT
|
||||
|
||||
Scylla API port for JMX to connect to.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_API_PORT="-p 10000"
|
||||
|
||||
#### SCYLLA_API_ADDR
|
||||
|
||||
Scylla API address for JMX to connect to.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_API_ADDR="-a localhost"
|
||||
|
||||
#### SCYLLA_JMX_ADDR
|
||||
|
||||
JMX address to bind on.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_JMX_ADDR="-ja localhost"
|
||||
|
||||
For example, it is possible to make JMX available to the outer world
|
||||
by changing its bind address to `0.0.0.0`:
|
||||
|
||||
docker run -d -e "SCYLLA_JMX_ADDR=-ja 0.0.0.0" -e SCYLLA_JMX_REMOTE=-r --publish 7199:7199 scylladb/scylla
|
||||
|
||||
`cassandra-stress` requires direct access to the JMX.
|
||||
|
||||
#### SCYLLA_JMX_FILE
|
||||
|
||||
A JMX service configuration file path.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_FILE="-cf /etc/scylla.d/scylla-user.cfg"
|
||||
|
||||
#### SCYLLA_JMX_LOCAL
|
||||
|
||||
The location of the JMX executable.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_LOCAL="-l /opt/scylladb/jmx"
|
||||
|
||||
#### SCYLLA_JMX_REMOTE
|
||||
|
||||
Allow JMX to run remotely.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_REMOTE="-r"
|
||||
|
||||
#### SCYLLA_JMX_DEBUG
|
||||
|
||||
Enable debugger.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_DEBUG="-d"
|
||||
|
||||
### Related Links
|
||||
|
||||
* [Best practices for running ScyllaDB on docker](http://docs.scylladb.com/procedures/best_practices_scylla_on_docker/)
|
||||
|
||||
@@ -194,7 +194,7 @@ Alternatively, you can explicitly install **all** the ScyllaDB packages for the
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get install scylla-enterprise{,-server,-jmx,-tools,-tools-core,-kernel-conf,-node-exporter,-conf,-python3}=2021.1.0-0.20210511.9e8e7d58b-1
|
||||
sudo apt-get install scylla-enterprise{,-server,-tools,-tools-core,-kernel-conf,-node-exporter,-conf,-python3}=2021.1.0-0.20210511.9e8e7d58b-1
|
||||
sudo apt-get install scylla-enterprise-machine-image=2021.1.0-0.20210511.9e8e7d58b-1 # only execute on AMI instance
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
Features
|
||||
========================
|
||||
|
||||
This document highlights ScyllaDB's key data modeling features.
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
:hidden:
|
||||
|
||||
Lightweight Transactions </features/lwt/>
|
||||
Global Secondary Indexes </features/secondary-indexes/>
|
||||
@@ -12,6 +15,23 @@ Features
|
||||
Change Data Capture </features/cdc/index>
|
||||
Workload Attributes </features/workload-attributes>
|
||||
|
||||
`ScyllaDB Enterprise <https://enterprise.docs.scylladb.com/stable/overview.html#enterprise-only-features>`_
|
||||
provides additional features, including Encryption at Rest,
|
||||
workload prioritization, auditing, and more.
|
||||
.. panel-box::
|
||||
:title: ScyllaDB Features
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* Secondary Indexes and Materialized Views provide efficient search mechanisms
|
||||
on non-partition keys by creating an index.
|
||||
|
||||
* :doc:`Global Secondary Indexes </features/secondary-indexes/>`
|
||||
* :doc:`Local Secondary Indexes </features/local-secondary-indexes/>`
|
||||
* :doc:`Materialized Views </features/materialized-views/>`
|
||||
|
||||
* :doc:`Lightweight Transactions </features/lwt/>` provide conditional updates
|
||||
through linearizability.
|
||||
* :doc:`Counters </features/counters/>` are columns that only allow their values
|
||||
to be incremented, decremented, read, or deleted.
|
||||
* :doc:`Change Data Capture </features/cdc/index>` allows you to query the current
|
||||
state and the history of all changes made to tables in the database.
|
||||
* :doc:`Workload Attributes </features/workload-attributes>` assigned to your workloads
|
||||
specify how ScyllaDB will handle requests depending on the workload.
|
||||
|
||||
@@ -6,9 +6,9 @@ You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 11 | 8 | 9 |
|
||||
+============================+======+======+======+=======+=======+=======+
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.2 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
|
||||
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
|
||||
@@ -78,7 +78,7 @@ Launching Instances from ScyllaDB AMI
|
||||
* The ``scylla.yaml`` file: ``/etc/scylla/scylla.yaml``
|
||||
* Data: ``/var/lib/scylla/``
|
||||
|
||||
To check that the ScyllaDB server and the JMX component are running, run:
|
||||
To check that the ScyllaDB server is running, run:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ Launching ScyllaDB on Azure
|
||||
|
||||
ssh -i ~/.ssh/ssh-key.pem scyllaadm@public-ip
|
||||
|
||||
To check that the ScyllaDB server and the JMX component are running, run:
|
||||
To check that the ScyllaDB server is running, run:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ Launching ScyllaDB on GCP
|
||||
|
||||
gcloud compute ssh scylla-node1
|
||||
|
||||
To check that the ScyllaDB server and the JMX component are running, run:
|
||||
To check that the ScyllaDB server is running, run:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
|
||||
@@ -1,8 +1,3 @@
|
||||
.. |SCYLLADB_VERSION| replace:: 5.2
|
||||
|
||||
.. update the version folder URL below (variables won't work):
|
||||
https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/
|
||||
|
||||
====================================================
|
||||
Install ScyllaDB Without root Privileges
|
||||
====================================================
|
||||
@@ -24,14 +19,17 @@ Note that if you're on CentOS 7, only root offline installation is supported.
|
||||
Download and Install
|
||||
-----------------------
|
||||
|
||||
#. Download the latest tar.gz file for ScyllaDB |SCYLLADB_VERSION| (x86 or ARM) from https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/.
|
||||
#. Download the latest tar.gz file for your ScyllaDB version (x86 or ARM) from ``https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-<version>/``.
|
||||
|
||||
Example for version 6.1: https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-6.1/
|
||||
|
||||
#. Uncompress the downloaded package.
|
||||
|
||||
The following example shows the package for ScyllaDB 5.2.4 (x86):
|
||||
The following example shows the package for ScyllaDB 6.1.1 (x86):
|
||||
|
||||
.. code:: console
|
||||
|
||||
tar xvfz scylla-unified-5.2.4-0.20230623.cebbf6c5df2b.x86_64.tar.gz
|
||||
tar xvfz scylla-unified-6.1.1-0.20240814.8d90b817660a.x86_64.tar.gz
|
||||
|
||||
#. Install OpenJDK 8 or 11.
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ This will send ScyllaDB only logs to :code:`/var/log/scylla/scylla.log`
|
||||
|
||||
Logging on Docker
|
||||
-----------------
|
||||
Starting from ScyllaDB 1.3, `ScyllaDB Docker <https://hub.docker.com/r/scylladb/scylla/>`_, you should use :code:`docker logs` command to access ScyllaDB server and JMX proxy logs
|
||||
Starting from ScyllaDB 1.3, when running `ScyllaDB Docker <https://hub.docker.com/r/scylladb/scylla/>`_, you should use the :code:`docker logs` command to access the ScyllaDB server logs.
|
||||
|
||||
|
||||
.. include:: /rst_include/advance-index.rst
|
||||
|
||||
@@ -26,13 +26,6 @@ By default, ScyllaDB runs as user ``scylla`` in group ``scylla``. The following
|
||||
|
||||
4. Edit ``/etc/systemd/system/multi-user.target.wants/node-exporter.service``
|
||||
|
||||
.. code-block:: sh
|
||||
|
||||
User=test
|
||||
Group=test
|
||||
|
||||
5. Edit /usr/lib/systemd/system/scylla-jmx.service
|
||||
|
||||
.. code-block:: sh
|
||||
|
||||
User=test
|
||||
@@ -51,5 +44,4 @@ At this point, all services should be started as test:test user:
|
||||
.. code-block:: sh
|
||||
|
||||
test 8760 1 11 14:42 ? 00:00:01 /usr/bin/scylla --log-to-syslog 1 --log-to-std ...
|
||||
test 8765 1 12 14:42 ? 00:00:01 /opt/scylladb/jmx/symlinks/scylla-jmx -Xmx256m ...
|
||||
test 13638 1 0 14:30 ? 00:00:00 /usr/bin/node_exporter --collector.interrupts
|
||||
|
||||
@@ -11,7 +11,7 @@ For example:
|
||||
ScyllaDB uses available memory to cache your data. ScyllaDB knows how to dynamically manage memory for optimal performance, for example, if many clients connect to ScyllaDB, it will evict some data from the cache to make room for these connections, when the connection count drops again, this memory is returned to the cache.
|
||||
|
||||
To limit the memory usage you can start scylla with ``--memory`` parameter.
|
||||
Alternatively, you can specify the amount of memory ScyllaDB should leave to the OS with ``--reserve-memory`` parameter. Keep in mind that the amount of memory left to the operating system needs to suffice external scylla modules, such as ``scylla-jmx``, which runs on top of JVM.
|
||||
Alternatively, you can specify the amount of memory ScyllaDB should leave to the OS with the ``--reserve-memory`` parameter. Keep in mind that the amount of memory left to the operating system needs to be sufficient for external ScyllaDB modules.
|
||||
|
||||
On Ubuntu, edit the ``/etc/default/scylla-server``.
|
||||
|
||||
|
||||
@@ -14,8 +14,6 @@ Port Description Protocol
|
||||
------ -------------------------------------------- --------
|
||||
7001 SSL inter-node communication (RPC) TCP
|
||||
------ -------------------------------------------- --------
|
||||
7199 JMX management TCP
|
||||
------ -------------------------------------------- --------
|
||||
10000 ScyllaDB REST API TCP
|
||||
------ -------------------------------------------- --------
|
||||
9180 Prometheus API TCP
|
||||
|
||||
@@ -146,9 +146,7 @@ The ScyllaDB ports are detailed in the table below. For ScyllaDB Manager ports,
|
||||
|
||||
.. include:: /operating-scylla/_common/networking-ports.rst
|
||||
|
||||
All ports above need to be open to external clients (CQL), external admin systems (JMX), and other nodes (RPC). REST API port can be kept closed for incoming external connections.
|
||||
|
||||
The JMX service, :code:`scylla-jmx`, runs on port 7199. It is required in order to manage ScyllaDB using :code:`nodetool` and other Apache Cassandra-compatible utilities. The :code:`scylla-jmx` process must be able to connect to port 10000 on localhost. The JMX service listens for incoming JMX connections on all network interfaces on the system.
|
||||
All ports above need to be open to external clients (CQL) and other nodes (RPC). REST API port can be kept closed for incoming external connections.
|
||||
|
||||
Advanced networking
|
||||
-------------------
|
||||
@@ -223,10 +221,6 @@ Monitoring Stack
|
||||
|
||||
|mon_root|
|
||||
|
||||
JMX
|
||||
---
|
||||
ScyllaDB JMX is compatible with Apache Cassandra, exposing the relevant subset of MBeans.
|
||||
|
||||
.. REST
|
||||
|
||||
.. include:: /operating-scylla/rest.rst
|
||||
|
||||
@@ -31,7 +31,7 @@ Parameter Descriptio
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-kc <ktlist>, --kc.list <ktlist> The list of Keyspaces to take snapshot
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-p <port> / --port <port> Remote jmx agent port number
|
||||
-p <port> / --port <port> The port of the REST API of the ScyllaDB node.
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
|
||||
@@ -67,20 +67,8 @@ The ``nodetool`` utility provides a simple command-line interface to the followi
|
||||
|
||||
Nodetool generic options
|
||||
========================
|
||||
All options are supported:
|
||||
|
||||
|
||||
|
||||
* ``-p <port>`` or ``--port <port>`` - Remote JMX agent port number.
|
||||
|
||||
* ``-pp`` or ``--print-port`` - Operate in 4.0 mode with hosts disambiguated by port number.
|
||||
|
||||
* ``-pw <password>`` or ``--password <password>`` - Remote JMX agent password.
|
||||
|
||||
* ``-pwf <passwordFilePath>`` or ``--password-file <passwordFilePath>`` - Path to the JMX password file.
|
||||
|
||||
* ``-u <username>`` or ``--username <username>`` - Remote JMX agent username.
|
||||
|
||||
* ``-p <port>`` or ``--port <port>`` - The port of the REST API of the ScyllaDB node.
|
||||
* ``--`` - Separates command-line options from the list of arguments (useful when an argument might be mistaken for a command-line option).
|
||||
|
||||
Supported Nodetool operations
|
||||
|
||||
@@ -41,7 +41,7 @@ With the recent addition of the `ScyllaDB Advisor <http://monitoring.docs.scylla
|
||||
Install ScyllaDB Manager
|
||||
------------------------
|
||||
|
||||
Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>` together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
|
||||
Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>`_ together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
|
||||
ScyllaDB Manager provides automated backups and repairs of your database.
|
||||
ScyllaDB Manager can manage multiple ScyllaDB clusters and run cluster-wide tasks in a controlled and predictable way.
|
||||
For example, with ScyllaDB Manager you can control the intensity of a repair, increasing it to speed up the process, or lower the intensity to ensure it minimizes impact on ongoing operations.
|
||||
|
||||
@@ -22,6 +22,13 @@ To start a single ScyllaDB node instance in a Docker container, run:
|
||||
|
||||
docker run --name some-scylla -d scylladb/scylla
|
||||
|
||||
If you're on macOS and plan to start a multi-node cluster (3 nodes or more), start ScyllaDB with
|
||||
``--reactor-backend=epoll`` to override the default ``linux-aio`` reactor backend:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
docker run --name some-scylla -d scylladb/scylla --reactor-backend=epoll
|
||||
|
||||
The ``docker run`` command starts a new Docker instance in the background named some-scylla that runs the ScyllaDB server:
|
||||
|
||||
.. code-block:: console
|
||||
@@ -95,6 +102,12 @@ With a single ``some-scylla`` instance running, joining new nodes to form a clu
|
||||
|
||||
docker run --name some-scylla2 -d scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
|
||||
If you're on macOS, make sure to add the ``--reactor-backend=epoll`` option when adding new nodes:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
docker run --name some-scylla2 -d scylladb/scylla --reactor-backend=epoll --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
|
||||
To query when the node is up and running (and view the status of the entire cluster) use the ``nodetool status`` command:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
@@ -6,8 +6,8 @@ ScyllaDB exposes a REST API to retrieve administrative information from a node a
|
||||
administrative operations. For example, it allows you to check or update configuration,
|
||||
retrieve cluster-level information, and more.
|
||||
|
||||
The :doc:`nodetool </operating-scylla/nodetool>` CLI tool interacts with a *scylla-jmx* process using JMX.
|
||||
The process, in turn, uses the REST API to interact with the ScyllaDB process.
|
||||
The :doc:`nodetool </operating-scylla/nodetool>` CLI tool uses the REST API
|
||||
to interact with the ScyllaDB process.
|
||||
|
||||
You can interact with the REST API directly using :code:`curl`, ScyllaDB's CLI for REST API, or the Swagger UI.
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ Encryption: Data in Transit Client to Node
|
||||
|
||||
Follow the procedures below to enable client-to-node encryption.
|
||||
Once enabled, all communication between the client and the node is transmitted over TLS/SSL.
|
||||
The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 certified.
|
||||
The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 enabled.
|
||||
|
||||
Workflow
|
||||
^^^^^^^^
|
||||
|
||||
@@ -10,7 +10,6 @@ Cluster and Node
|
||||
Failed Decommission Problem </troubleshooting/failed-decommission/>
|
||||
Cluster Timeouts </troubleshooting/timeouts>
|
||||
Node Joined With No Data </troubleshooting/node-joined-without-any-data>
|
||||
SocketTimeoutException </troubleshooting/nodetool-memory-read-timeout/>
|
||||
NullPointerException </troubleshooting/nodetool-nullpointerexception/>
|
||||
Failed Schema Sync </troubleshooting/failed-schema-sync/>
|
||||
|
||||
@@ -28,7 +27,6 @@ Cluster and Node
|
||||
* :doc:`Failed Decommission Problem </troubleshooting/failed-decommission/>`
|
||||
* :doc:`Cluster Timeouts </troubleshooting/timeouts>`
|
||||
* :doc:`Node Joined With No Data </troubleshooting/node-joined-without-any-data>`
|
||||
* :doc:`Nodetool fails with SocketTimeoutException 'Read timed out' </troubleshooting/nodetool-memory-read-timeout>`
|
||||
* :doc:`Nodetool Throws NullPointerException </troubleshooting/nodetool-nullpointerexception>`
|
||||
* :doc:`Failed Schema Sync </troubleshooting/failed-schema-sync>`
|
||||
|
||||
|
||||
@@ -1,112 +0,0 @@
|
||||
Nodetool fails with SocketTimeoutException 'Read timed out'
|
||||
===========================================================
|
||||
|
||||
This troubleshooting article describes what to do when Nodetool fails with a 'Read timed out' error.
|
||||
|
||||
Problem
|
||||
^^^^^^^
|
||||
|
||||
When running any Nodetool command, users may see the following error:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
Failed to connect to '127.0.0.1:7199' - SocketTimeoutException: 'Read timed out'
|
||||
|
||||
Analysis
|
||||
^^^^^^^^
|
||||
Nodetool is a Java based application which requires memory. ScyllaDB by default consumes 93% of the node’s RAM (for MemTables + Cache) and leaves 7% for other applications, such as nodetool.
|
||||
|
||||
In cases where there is not enough memory (e.g. small instances with ~64GB RAM or lower), Nodetool may not be able to run due to insufficient memory. In this case, an out of memory (OOM) error may appear and scylla-jmx will not run.
|
||||
|
||||
|
||||
Example
|
||||
-------
|
||||
|
||||
The error you will see is similar to:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000,
|
||||
671088640, 0) failed; error='Cannot allocate memory' (err no=12)
|
||||
|
||||
|
||||
In order to check if the issue is scylla-jmx, use the following command (systemd-based Linux distribution) to check the status of the service:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo systemctl status scylla-jmx
|
||||
|
||||
If the service is running you will see something similar to:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo service scylla-jmx status
|
||||
● scylla-jmx.service - ScyllaDB JMX
|
||||
Loaded: loaded (/lib/systemd/system/scylla-jmx.service; disabled; vendor preset: enabled)
|
||||
Active: active (running) since Wed 2018-07-18 20:59:08 UTC; 3s ago
|
||||
Main PID: 256050 (scylla-jmx)
|
||||
Tasks: 27
|
||||
Memory: 119.5M
|
||||
CPU: 1.959s
|
||||
CGroup: /system.slice/scylla-jmx.service
|
||||
└─256050 /usr/lib/scylla/jmx/symlinks/scylla-jmx -Xmx384m -XX:+UseSerialGC -Dcom.sun.management.jmxremote.auth
|
||||
|
||||
If it isn't, you will see an error similar to:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo systemctl status scylla-jmx
|
||||
● scylla-jmx.service - ScyllaDB JMX
|
||||
Loaded: loaded (/usr/lib/systemd/system/scylla-jmx.service; disabled; vendor preset: disabled)
|
||||
Active: failed (Result: exit-code) since Thu 2018-05-10 10:34:15 EDT; 3min 47s ago
|
||||
Process: 1417 ExecStart=/usr/lib/scylla/jmx/scylla-jmx $SCYLLA_JMX_PORT $SCYLLA_API_PORT $SCYLLA_API_ADDR $SCYLLA_JMX_ADDR
|
||||
$SCYLLA_JMX_FILE $SCYLLA_JMX_LOCAL $SCYLLA_JMX_REMOTE $SCYLLA_JMX_DEBUG (code=exited, status=127)
|
||||
Main PID: 1417 (code=exited, status=127)
|
||||
|
||||
or
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo service scylla-jmx status
|
||||
● scylla-jmx.service
|
||||
Loaded: not-found (Reason: No such file or directory)
|
||||
Active: failed (Result: exit-code) since Wed 2018-07-18 20:38:58 UTC; 12min ago
|
||||
Main PID: 141256 (code=exited, status=143)
|
||||
|
||||
You will need to restart the service or change the RAM allocation as per the Solution_ below.
|
||||
|
||||
Solution
|
||||
^^^^^^^^
|
||||
|
||||
There are two ways to fix this problem, one is faster but may not permanently fix the issue and the other solution is more robust.
|
||||
|
||||
**The immediate solution**
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
service scylla-jmx restart
|
||||
|
||||
.. note:: This is not a permanent fix as the problem might manifest again at a later time.
|
||||
|
||||
**The more robust solution**
|
||||
|
||||
1. Take the size of your node’s RAM, calculate 7% of that size, increase it by another 40%, and use this new size as your RAM requirement.
|
||||
|
||||
For example: on a GCP n1-highmem-8 instance (52GB RAM)
|
||||
|
||||
* 7% would be ~3.6GB.
|
||||
* Increasing it by ~40% means you need to increase your RAM ~5GB.
|
||||
2. Open one of the following files (as per your OS platform):
|
||||
|
||||
* Ubuntu: ``/etc/default/scylla-server``.
|
||||
* Red Hat/ CentOS: ``/etc/sysconfig/scylla-server``
|
||||
3. In the file you are editing, add to the ``SCYLLA_ARGS`` statement ``--reserve-memory 5G`` (the amount you calculated above). Save and exit.
|
||||
4. Restart ScyllaDB server
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo systemctl restart scylla-server
|
||||
|
||||
|
||||
.. note:: If the initial calculation and reserved memory are not enough and the problem persists and/or reappears, repeat the procedure from step 2 and increase the RAM in 1GB increments.
|
||||
|
||||
@@ -21,8 +21,8 @@ The following metrics are new in ScyllaDB |NEW_VERSION|:
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* -
|
||||
-
|
||||
* - scylla_alternator_batch_item_count
|
||||
- The total number of items processed across all batches
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -143,6 +143,7 @@ public:
|
||||
// whereas without it, it will fail the insert - i.e. for things like raft etc _all_ nodes should
|
||||
// have it or none, otherwise we can get partial failures on writes.
|
||||
gms::feature fragmented_commitlog_entries { *this, "FRAGMENTED_COMMITLOG_ENTRIES"sv };
|
||||
gms::feature maintenance_tenant { *this, "MAINTENANCE_TENANT"sv };
|
||||
|
||||
// A feature just for use in tests. It must not be advertised unless
|
||||
// the "features_enable_test_feature" injection is enabled.
|
||||
|
||||
@@ -39,7 +39,11 @@ abstract_replication_strategy::abstract_replication_strategy(
|
||||
replication_strategy_params params,
|
||||
replication_strategy_type my_type)
|
||||
: _config_options(params.options)
|
||||
, _my_type(my_type) {}
|
||||
, _my_type(my_type) {
|
||||
if (params.initial_tablets.has_value()) {
|
||||
_uses_tablets = true;
|
||||
}
|
||||
}
|
||||
|
||||
abstract_replication_strategy::ptr_type abstract_replication_strategy::create_replication_strategy(const sstring& strategy_name, replication_strategy_params params) {
|
||||
try {
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "locator/everywhere_replication_strategy.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
@@ -33,6 +34,12 @@ size_t everywhere_replication_strategy::get_replication_factor(const token_metad
|
||||
return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners();
|
||||
}
|
||||
|
||||
// Validates this strategy's configuration.
// Throws exceptions::configuration_exception when _uses_tablets is set;
// otherwise does nothing. The gms::feature_service argument is unused.
void everywhere_replication_strategy::validate_options(const gms::feature_service&) const {
|
||||
if (_uses_tablets) {
|
||||
throw exceptions::configuration_exception("EverywhereStrategy doesn't support tablet replication");
|
||||
}
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, replication_strategy_params>;
|
||||
static registry registrator("org.apache.cassandra.locator.EverywhereStrategy");
|
||||
static registry registrator_short_name("EverywhereStrategy");
|
||||
|
||||
@@ -20,7 +20,7 @@ public:
|
||||
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const override { /* noop */ }
|
||||
virtual void validate_options(const gms::feature_service&) const override;
|
||||
|
||||
std::optional<std::unordered_set<sstring>> recognized_options(const topology&) const override {
|
||||
// We explicitly allow all options
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include <algorithm>
|
||||
#include "local_strategy.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
|
||||
namespace locator {
|
||||
@@ -23,6 +24,9 @@ future<host_id_set> local_strategy::calculate_natural_endpoints(const token& t,
|
||||
}
|
||||
|
||||
// Validates this strategy's configuration.
// Throws exceptions::configuration_exception when _uses_tablets is set;
// otherwise does nothing. The gms::feature_service argument is unused.
void local_strategy::validate_options(const gms::feature_service&) const {
|
||||
if (_uses_tablets) {
|
||||
throw exceptions::configuration_exception("LocalStrategy doesn't support tablet replication");
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::unordered_set<sstring>> local_strategy::recognized_options(const topology&) const {
|
||||
|
||||
@@ -70,6 +70,9 @@ void simple_strategy::validate_options(const gms::feature_service&) const {
|
||||
throw exceptions::configuration_exception("SimpleStrategy requires a replication_factor strategy option.");
|
||||
}
|
||||
parse_replication_factor(it->second);
|
||||
if (_uses_tablets) {
|
||||
throw exceptions::configuration_exception("SimpleStrategy doesn't support tablet replication");
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::unordered_set<sstring>>simple_strategy::recognized_options(const topology&) const {
|
||||
|
||||
@@ -851,9 +851,8 @@ void tablet_aware_replication_strategy::validate_tablet_options(const abstract_r
|
||||
void tablet_aware_replication_strategy::process_tablet_options(abstract_replication_strategy& ars,
|
||||
replication_strategy_config_options& opts,
|
||||
replication_strategy_params params) {
|
||||
if (params.initial_tablets.has_value()) {
|
||||
_initial_tablets = *params.initial_tablets;
|
||||
ars._uses_tablets = true;
|
||||
if (ars._uses_tablets) {
|
||||
_initial_tablets = params.initial_tablets.value_or(0);
|
||||
mark_as_per_table(ars);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,6 +77,12 @@ struct host_id_or_endpoint {
|
||||
gms::inet_address resolve_endpoint(const token_metadata&) const;
|
||||
};
|
||||
|
||||
using host_id_or_endpoint_list = std::vector<host_id_or_endpoint>;
|
||||
|
||||
// Returns true when no element of host_ids, once wrapped in a
// locator::host_id_or_endpoint, reports has_endpoint(). An empty range
// yields true. Each element is used to construct a host_id_or_endpoint.
// NOTE(review): constructing host_id_or_endpoint presumably may throw on
// malformed input; confirm against its constructor.
[[nodiscard]] inline bool check_host_ids_contain_only_uuid(const auto& host_ids) {
|
||||
return std::ranges::none_of(host_ids, [](const auto& node_str) { return locator::host_id_or_endpoint{node_str}.has_endpoint(); });
|
||||
}
|
||||
|
||||
class token_metadata_impl;
|
||||
struct topology_change_info;
|
||||
|
||||
|
||||
11
main.cc
11
main.cc
@@ -1389,7 +1389,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
scfg.statement_tenants = {
|
||||
{dbcfg.statement_scheduling_group, "$user"},
|
||||
{default_scheduling_group(), "$system"},
|
||||
{dbcfg.streaming_scheduling_group, "$maintenance"}
|
||||
{dbcfg.streaming_scheduling_group, "$maintenance", false}
|
||||
};
|
||||
scfg.streaming = dbcfg.streaming_scheduling_group;
|
||||
scfg.gossip = dbcfg.gossip_scheduling_group;
|
||||
@@ -1404,7 +1404,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}
|
||||
|
||||
// Delay listening messaging_service until gossip message handlers are registered
|
||||
messaging.start(mscfg, scfg, creds).get();
|
||||
messaging.start(mscfg, scfg, creds, std::ref(feature_service)).get();
|
||||
auto stop_ms = defer_verbose_shutdown("messaging service", [&messaging] {
|
||||
messaging.invoke_on_all(&netw::messaging_service::stop).get();
|
||||
});
|
||||
@@ -1944,6 +1944,13 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
ss.local().uninit_address_map().get();
|
||||
});
|
||||
|
||||
// Need to make sure storage service stopped using group0 before running group0_service.abort()
|
||||
// Normally it is done in storage_service::do_drain(), but in case startup fails we need to do it
|
||||
// here as well
|
||||
auto stop_group0_usage_in_storage_service = defer_verbose_shutdown("group 0 usage in local storage", [&ss] {
|
||||
ss.local().wait_for_group0_stop().get();
|
||||
});
|
||||
|
||||
// Setup group0 early in case the node is bootstrapped already and the group exists.
|
||||
// Need to do it before allowing incoming messaging service connections since
|
||||
// storage proxy's and migration manager's verbs may access group0.
|
||||
|
||||
@@ -119,6 +119,7 @@
|
||||
#include "idl/mapreduce_request.dist.impl.hh"
|
||||
#include "idl/storage_service.dist.impl.hh"
|
||||
#include "idl/join_node.dist.impl.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
|
||||
namespace netw {
|
||||
|
||||
@@ -232,9 +233,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
|
||||
return _rpc->unregister_handler(verb);
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port)
|
||||
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service)
|
||||
: messaging_service(config{std::move(id), ip, ip, port},
|
||||
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
|
||||
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr, feature_service)
|
||||
{}
|
||||
|
||||
static
|
||||
@@ -419,13 +420,14 @@ void messaging_service::do_start_listen() {
|
||||
}
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials)
|
||||
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials, gms::feature_service& feature_service)
|
||||
: _cfg(std::move(cfg))
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
|
||||
, _clients(PER_SHARD_CONNECTION_COUNT + scfg.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT)
|
||||
, _scheduling_config(scfg)
|
||||
, _scheduling_info_for_connection_index(initial_scheduling_info())
|
||||
, _feature_service(feature_service)
|
||||
{
|
||||
_rpc->set_logger(&rpc_logger);
|
||||
|
||||
@@ -434,7 +436,8 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
|
||||
// which in turn relies on _connection_index_for_tenant to be initialized.
|
||||
_connection_index_for_tenant.reserve(_scheduling_config.statement_tenants.size());
|
||||
for (unsigned i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
|
||||
_connection_index_for_tenant.push_back({_scheduling_config.statement_tenants[i].sched_group, i});
|
||||
auto& tenant_cfg = _scheduling_config.statement_tenants[i];
|
||||
_connection_index_for_tenant.push_back({tenant_cfg.sched_group, i, tenant_cfg.enabled});
|
||||
}
|
||||
|
||||
register_handler(this, messaging_verb::CLIENT_ID, [this] (rpc::client_info& ci, gms::inet_address broadcast_address, uint32_t src_cpu_id, rpc::optional<uint64_t> max_result_size, rpc::optional<utils::UUID> host_id) {
|
||||
@@ -457,6 +460,7 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
|
||||
});
|
||||
|
||||
init_local_preferred_ip_cache(_cfg.preferred_ips);
|
||||
init_feature_listeners();
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
@@ -679,16 +683,22 @@ messaging_service::get_rpc_client_idx(messaging_verb verb) const {
|
||||
return idx;
|
||||
}
|
||||
|
||||
// A statement or statement-ack verb
|
||||
const auto curr_sched_group = current_scheduling_group();
|
||||
for (unsigned i = 0; i < _connection_index_for_tenant.size(); ++i) {
|
||||
if (_connection_index_for_tenant[i].sched_group == curr_sched_group) {
|
||||
// i == 0: the default tenant maps to the default client indexes belonging to the interval
|
||||
// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
|
||||
idx += i * PER_TENANT_CONNECTION_COUNT;
|
||||
break;
|
||||
if (_connection_index_for_tenant[i].enabled) {
|
||||
// i == 0: the default tenant maps to the default client indexes belonging to the interval
|
||||
// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
|
||||
idx += i * PER_TENANT_CONNECTION_COUNT;
|
||||
break;
|
||||
} else {
|
||||
// If the tenant is disabled, immediately return the current index to
|
||||
// use $system tenant.
|
||||
return idx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return idx;
|
||||
}
|
||||
|
||||
@@ -793,6 +803,22 @@ void messaging_service::cache_preferred_ip(gms::inet_address ep, gms::inet_addre
|
||||
remove_rpc_client(msg_addr(ep));
|
||||
}
|
||||
|
||||
void messaging_service::init_feature_listeners() {
|
||||
_maintenance_tenant_enabled_listener = _feature_service.maintenance_tenant.when_enabled([this] {
|
||||
enable_scheduling_tenant("$maintenance");
|
||||
});
|
||||
}
|
||||
|
||||
void messaging_service::enable_scheduling_tenant(std::string_view name) {
|
||||
for (size_t i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
|
||||
if (_scheduling_config.statement_tenants[i].name == name) {
|
||||
_scheduling_config.statement_tenants[i].enabled = true;
|
||||
_connection_index_for_tenant[i].enabled = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gms::inet_address messaging_service::get_public_endpoint_for(const gms::inet_address& ip) const {
|
||||
auto i = _preferred_to_endpoint.find(ip);
|
||||
return i != _preferred_to_endpoint.end() ? i->second : ip;
|
||||
|
||||
@@ -45,6 +45,7 @@ namespace gms {
|
||||
class gossip_digest_ack2;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
class feature_service;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
@@ -299,6 +300,7 @@ public:
|
||||
struct tenant {
|
||||
scheduling_group sched_group;
|
||||
sstring name;
|
||||
bool enabled = true;
|
||||
};
|
||||
// Must have at least one element. No two tenants should have the same
|
||||
// scheduling group. [0] is the default tenant, that all unknown
|
||||
@@ -319,6 +321,7 @@ private:
|
||||
struct tenant_connection_index {
|
||||
scheduling_group sched_group;
|
||||
unsigned cliend_idx;
|
||||
bool enabled;
|
||||
};
|
||||
private:
|
||||
config _cfg;
|
||||
@@ -337,6 +340,7 @@ private:
|
||||
scheduling_config _scheduling_config;
|
||||
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
|
||||
std::vector<tenant_connection_index> _connection_index_for_tenant;
|
||||
gms::feature_service& _feature_service;
|
||||
|
||||
struct connection_ref;
|
||||
std::unordered_multimap<locator::host_id, connection_ref> _host_connections;
|
||||
@@ -351,8 +355,8 @@ private:
|
||||
public:
|
||||
using clock_type = lowres_clock;
|
||||
|
||||
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port);
|
||||
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>);
|
||||
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service);
|
||||
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>, gms::feature_service& feature_service);
|
||||
~messaging_service();
|
||||
|
||||
future<> start();
|
||||
@@ -544,6 +548,12 @@ public:
|
||||
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
|
||||
unsigned get_rpc_client_idx(messaging_verb verb) const;
|
||||
static constexpr std::array<std::string_view, 3> _connection_types_prefix = {"statement:", "statement-ack:", "forward:"}; // "forward" is the old name for "mapreduce"
|
||||
|
||||
void init_feature_listeners();
|
||||
private:
|
||||
std::any _maintenance_tenant_enabled_listener;
|
||||
|
||||
void enable_scheduling_tenant(std::string_view name);
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
|
||||
@@ -186,6 +186,8 @@ std::set<gms::inet_address> task_manager_module::get_nodes() const noexcept {
|
||||
_ss._topology_state_machine._topology.transition_nodes
|
||||
) | boost::adaptors::transformed([&ss = _ss] (auto& node) {
|
||||
return ss.host2ip(locator::host_id{node.first.uuid()});
|
||||
}) | boost::adaptors::filtered([&ss = _ss] (auto& ip) {
|
||||
return ss._gossiper.is_alive(ip);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1367,7 +1367,7 @@ reader_concurrency_semaphore::can_admit_read(const reader_permit::impl& permit)
|
||||
}
|
||||
|
||||
if (!has_available_units(permit.base_resources())) {
|
||||
auto reason = _resources.memory >= permit.base_resources().memory ? reason::memory_resources : reason::count_resources;
|
||||
auto reason = _resources.memory >= permit.base_resources().memory ? reason::count_resources : reason::memory_resources;
|
||||
if (_inactive_reads.empty()) {
|
||||
return {can_admit::no, reason};
|
||||
} else {
|
||||
|
||||
@@ -349,11 +349,6 @@ repair_reader::repair_reader(
|
||||
future<mutation_fragment_opt>
|
||||
repair_reader::read_mutation_fragment() {
|
||||
++_reads_issued;
|
||||
// Use a very long timeout for the reader to break out any eventual
|
||||
// deadlock within the reader. Thirty minutes should be more than
|
||||
// enough to read a single mutation fragment.
|
||||
auto timeout = db::timeout_clock::now() + std::chrono::minutes(30);
|
||||
_reader.set_timeout(timeout); // reset to db::no_timeout in pause()
|
||||
return _reader().then_wrapped([this] (future<mutation_fragment_opt> f) {
|
||||
try {
|
||||
auto mfopt = f.get();
|
||||
@@ -397,7 +392,6 @@ void repair_reader::check_current_dk() {
|
||||
}
|
||||
|
||||
void repair_reader::pause() {
|
||||
_reader.set_timeout(db::no_timeout);
|
||||
if (_reader_handle) {
|
||||
_reader_handle->pause();
|
||||
}
|
||||
|
||||
@@ -246,6 +246,9 @@ public:
|
||||
bool no_compacted_sstable_undeleted() const;
|
||||
|
||||
future<> stop(sstring reason = "table removal") noexcept;
|
||||
|
||||
// Clear sstable sets
|
||||
void clear_sstables();
|
||||
};
|
||||
|
||||
using storage_group_ptr = lw_shared_ptr<storage_group>;
|
||||
@@ -305,6 +308,7 @@ public:
|
||||
const storage_group_map& storage_groups() const;
|
||||
|
||||
future<> stop_storage_groups() noexcept;
|
||||
void clear_storage_groups();
|
||||
void remove_storage_group(size_t id);
|
||||
storage_group& storage_group_for_id(const schema_ptr&, size_t i) const;
|
||||
storage_group* maybe_storage_group_for_id(const schema_ptr&, size_t i) const;
|
||||
|
||||
@@ -2273,6 +2273,13 @@ future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, std:
|
||||
return flush_table_on_all_shards(sharded_db, sharded_db.local().find_uuid(ks_name, table_name));
|
||||
}
|
||||
|
||||
// Forces a new active segment on the given commitlogs, one after the other.
//
// `cl1` is dereferenced unconditionally and must therefore be non-null.
// `cl2` is optional: when it is null, only `cl1` is rolled over.
static future<> force_new_commitlog_segments(std::unique_ptr<db::commitlog>& cl1, std::unique_ptr<db::commitlog>& cl2) {
    co_await cl1->force_new_active_segment();
    if (!cl2) {
        co_return;
    }
    co_await cl2->force_new_active_segment();
}
|
||||
|
||||
future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names) {
|
||||
/**
|
||||
* #14870
|
||||
@@ -2283,7 +2290,7 @@ future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std
|
||||
* as sstable-ish a universe as we can, as soon as we can.
|
||||
*/
|
||||
return sharded_db.invoke_on_all([] (replica::database& db) {
|
||||
return db._commitlog->force_new_active_segment();
|
||||
return force_new_commitlog_segments(db._commitlog, db._schema_commitlog);
|
||||
}).then([&, ks_name, table_names = std::move(table_names)] {
|
||||
return parallel_for_each(table_names, [&, ks_name] (const auto& table_name) {
|
||||
return flush_table_on_all_shards(sharded_db, ks_name, table_name);
|
||||
@@ -2294,7 +2301,7 @@ future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std
|
||||
future<> database::flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
|
||||
// see above
|
||||
return sharded_db.invoke_on_all([] (replica::database& db) {
|
||||
return db._commitlog->force_new_active_segment();
|
||||
return force_new_commitlog_segments(db._commitlog, db._schema_commitlog);
|
||||
}).then([&, ks_name] {
|
||||
auto& ks = sharded_db.local().find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
|
||||
|
||||
@@ -627,7 +627,9 @@ future<> storage_group_manager::for_each_storage_group_gently(std::function<futu
|
||||
|
||||
void storage_group_manager::for_each_storage_group(std::function<void(size_t, storage_group&)> f) const {
|
||||
for (auto& [id, sg]: _storage_groups) {
|
||||
f(id, *sg);
|
||||
if (auto holder = try_hold_gate(sg->async_gate())) {
|
||||
f(id, *sg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -639,6 +641,12 @@ future<> storage_group_manager::stop_storage_groups() noexcept {
|
||||
return parallel_for_each(_storage_groups | boost::adaptors::map_values, [] (auto sg) { return sg->stop("table removal"); });
|
||||
}
|
||||
|
||||
void storage_group_manager::clear_storage_groups() {
|
||||
for (auto& [id, sg]: _storage_groups) {
|
||||
sg->clear_sstables();
|
||||
}
|
||||
}
|
||||
|
||||
void storage_group_manager::remove_storage_group(size_t id) {
|
||||
if (auto it = _storage_groups.find(id); it != _storage_groups.end()) {
|
||||
_storage_groups.erase(it);
|
||||
@@ -1576,9 +1584,7 @@ table::stop() {
|
||||
co_await _sstable_deletion_gate.close();
|
||||
co_await std::move(gate_closed_fut);
|
||||
co_await get_row_cache().invalidate(row_cache::external_updater([this] {
|
||||
for_each_compaction_group([] (compaction_group& cg) {
|
||||
cg.clear_sstables();
|
||||
});
|
||||
_sg_manager->clear_storage_groups();
|
||||
_sstables = make_compound_sstable_set();
|
||||
}));
|
||||
_cache.refresh_snapshot();
|
||||
@@ -2288,6 +2294,12 @@ void compaction_group::clear_sstables() {
|
||||
_maintenance_sstables = _t.make_maintenance_sstable_set();
|
||||
}
|
||||
|
||||
void storage_group::clear_sstables() {
|
||||
for (auto cg : compaction_groups()) {
|
||||
cg->clear_sstables();
|
||||
}
|
||||
}
|
||||
|
||||
table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_options> sopts, compaction_manager& compaction_manager,
|
||||
sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker,
|
||||
locator::effective_replication_map_ptr erm)
|
||||
@@ -3754,6 +3766,7 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato
|
||||
co_await clear_inactive_reads_for_tablet(db, sg);
|
||||
// compaction_group::stop takes care of flushing.
|
||||
co_await stop_compaction_groups(sg);
|
||||
co_await utils::get_local_injector().inject("delay_tablet_compaction_groups_cleanup", std::chrono::seconds(5));
|
||||
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
|
||||
_sg_manager->remove_storage_group(tid.value());
|
||||
}
|
||||
|
||||
@@ -102,16 +102,27 @@ static const auto raft_manual_recovery_doc = "https://docs.scylladb.com/master/a
|
||||
|
||||
class group0_rpc: public service::raft_rpc {
|
||||
direct_failure_detector::failure_detector& _direct_fd;
|
||||
gms::gossiper& _gossiper;
|
||||
public:
|
||||
explicit group0_rpc(direct_failure_detector::failure_detector& direct_fd,
|
||||
raft_state_machine& sm, netw::messaging_service& ms,
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id)
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id, gms::gossiper& gossiper)
|
||||
: raft_rpc(sm, ms, address_map, std::move(raft_fd), gid, srv_id)
|
||||
, _direct_fd(direct_fd)
|
||||
, _direct_fd(direct_fd), _gossiper(gossiper)
|
||||
{}
|
||||
|
||||
virtual void on_configuration_change(raft::server_address_set add, raft::server_address_set del) override {
|
||||
for (const auto& addr: add) {
|
||||
auto ip_for_id = _address_map.find(addr.id);
|
||||
if (!ip_for_id) {
|
||||
// Make sure that the addresses of new nodes in the configuration are in the address map
|
||||
auto ips = _gossiper.get_nodes_with_host_id(locator::host_id(addr.id.uuid()));
|
||||
for (auto ip : ips) {
|
||||
if (_gossiper.is_normal(ip)) {
|
||||
_address_map.add_or_update_entry(addr.id, ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Entries explicitly managed via `rpc::on_configuration_change()` should NOT be
|
||||
// expirable.
|
||||
_address_map.set_nonexpiring(addr.id);
|
||||
@@ -204,7 +215,7 @@ const raft::server_id& raft_group0::load_my_id() {
|
||||
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
|
||||
service::migration_manager& mm, bool topology_change_enabled) {
|
||||
auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), _feat, topology_change_enabled);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id, _gossiper);
|
||||
// Keep a reference to a specific RPC class.
|
||||
auto& rpc_ref = *rpc;
|
||||
auto storage = std::make_unique<raft_sys_table_storage>(qp, gid, my_id);
|
||||
@@ -382,9 +393,11 @@ future<> raft_group0::abort() {
|
||||
co_await smp::invoke_on_all([this]() {
|
||||
return uninit_rpc_verbs(_ms.local());
|
||||
});
|
||||
co_await _shutdown_gate.close();
|
||||
|
||||
_leadership_monitor_as.request_abort();
|
||||
|
||||
co_await _shutdown_gate.close();
|
||||
|
||||
co_await std::move(_leadership_monitor);
|
||||
|
||||
co_await stop_group0();
|
||||
@@ -429,6 +442,7 @@ future<> raft_group0::leadership_monitor_fiber() {
|
||||
}
|
||||
});
|
||||
|
||||
auto holder = hold_group0_gate();
|
||||
while (true) {
|
||||
while (!group0_server().is_leader()) {
|
||||
co_await group0_server().wait_for_state_change(&_leadership_monitor_as);
|
||||
|
||||
@@ -291,6 +291,11 @@ public:
|
||||
return _raft_gr.group0_with_timeouts();
|
||||
}
|
||||
|
||||
// Hold shutdown gate to be waited during shutdown
|
||||
gate::holder hold_group0_gate() {
|
||||
return _shutdown_gate.hold();
|
||||
}
|
||||
|
||||
// Returns true after the group 0 server has been started.
|
||||
bool joined_group0() const;
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
|
||||
}
|
||||
return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...)
|
||||
.handle_exception_type([loc= std::move(loc), id] (const seastar::rpc::closed_error& e) {;
|
||||
const auto msg = fmt::format("Failed to execute {} on leader {}: {}", loc.function_name(), id, e);
|
||||
const auto msg = fmt::format("Failed to execute {}, destination {}: {}", loc.function_name(), id, e);
|
||||
rlogger.trace("{}", msg);
|
||||
return make_exception_future<Ret>(raft::transport_error(msg));
|
||||
});
|
||||
|
||||
@@ -1726,11 +1726,17 @@ public:
|
||||
// handler is being removed from the b::list, so if any live iterator points at it,
|
||||
// move it to the next object (this requires that the list is traversed in the forward
|
||||
// direction).
|
||||
bool drop_end = false;
|
||||
for (auto& itp : _live_iterators) {
|
||||
if (&**itp == handler) {
|
||||
++*itp;
|
||||
drop_end |= (*itp == end());
|
||||
}
|
||||
}
|
||||
if (drop_end) {
|
||||
const auto [first, last] = std::ranges::remove_if(_live_iterators, [this] (iterator* pit) { return *pit == end(); });
|
||||
_live_iterators.erase(first, last);
|
||||
}
|
||||
}
|
||||
class iterator_guard {
|
||||
cancellable_write_handlers_list& _handlers;
|
||||
@@ -6780,7 +6786,7 @@ void storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstra
|
||||
it->timeout_cb();
|
||||
}
|
||||
++it;
|
||||
if (need_preempt()) {
|
||||
if (need_preempt() && it != _cancellable_write_handlers_list->end()) {
|
||||
cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it};
|
||||
seastar::thread::yield();
|
||||
}
|
||||
|
||||
@@ -126,6 +126,26 @@ session_manager& get_topology_session_manager() {
|
||||
return topology_session_manager;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// Converts a list of node strings into a list of host_id_or_endpoint values.
//
// Each string is handed to the host_id_or_endpoint constructor. If any
// element fails to parse, a std::runtime_error is thrown whose message names
// the whole input list, the offending element and the underlying error.
[[nodiscard]] locator::host_id_or_endpoint_list string_list_to_endpoint_list(const std::vector<sstring>& src_node_strings) {
    locator::host_id_or_endpoint_list parsed_nodes;
    parsed_nodes.reserve(src_node_strings.size());
    for (const sstring& node_str : src_node_strings) {
        try {
            parsed_nodes.emplace_back(node_str);
        } catch (...) {
            // Re-throw as runtime_error so the caller sees which entry was bad.
            auto cause = std::current_exception();
            throw std::runtime_error(::format("Failed to parse node list: {}: invalid node={}: {}", src_node_strings, node_str, cause));
        }
    }
    return parsed_nodes;
}
|
||||
|
||||
// Parses a comma-separated list of nodes (host IDs or endpoints).
// The string is split on commas and every piece is converted via
// string_list_to_endpoint_list(), which throws std::runtime_error on an
// element that cannot be parsed.
[[nodiscard]] locator::host_id_or_endpoint_list parse_node_list(const std::string_view comma_separated_list) {
    const auto node_strings = utils::split_comma_separated_list(comma_separated_list);
    return string_list_to_endpoint_list(node_strings);
}
|
||||
} // namespace
|
||||
|
||||
static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
|
||||
|
||||
storage_service::storage_service(abort_source& abort_source,
|
||||
@@ -572,11 +592,11 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
on_fatal_internal_error(rtlogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id));
|
||||
}
|
||||
SCYLLA_ASSERT(existing_ip);
|
||||
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
|
||||
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
|
||||
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
|
||||
if (rs.ring.has_value()) {
|
||||
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
|
||||
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
|
||||
update_topology(host_id, ip, rs);
|
||||
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
|
||||
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
|
||||
} else {
|
||||
// After adding replacing endpoint above the node will no longer be reported for reads and writes,
|
||||
@@ -1032,7 +1052,7 @@ public:
|
||||
|
||||
// }}} raft_ip_address_updater
|
||||
|
||||
future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
|
||||
future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
|
||||
while (!_group0_as.abort_requested()) {
|
||||
bool err = false;
|
||||
try {
|
||||
@@ -1134,7 +1154,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
std::optional<abort_source> as;
|
||||
|
||||
try {
|
||||
@@ -1175,7 +1195,7 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<d
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(const std::list<locator::host_id_or_endpoint>& hoeps) {
|
||||
std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const {
|
||||
std::unordered_set<raft::server_id> ids;
|
||||
for (const auto& hoep : hoeps) {
|
||||
std::optional<raft::server_id> id;
|
||||
@@ -1196,16 +1216,8 @@ std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(
|
||||
}
|
||||
|
||||
std::unordered_set<raft::server_id> storage_service::ignored_nodes_from_join_params(const join_node_request_params& params) {
|
||||
std::unordered_set<raft::server_id> ignored_nodes;
|
||||
|
||||
if (!params.ignore_nodes.empty()) {
|
||||
std::list<locator::host_id_or_endpoint> ignore_nodes_params;
|
||||
for (const auto& n : params.ignore_nodes) {
|
||||
ignore_nodes_params.emplace_back(n);
|
||||
}
|
||||
|
||||
ignored_nodes = find_raft_nodes_from_hoeps(ignore_nodes_params);
|
||||
}
|
||||
const locator::host_id_or_endpoint_list ignore_nodes_params = string_list_to_endpoint_list(params.ignore_nodes);
|
||||
std::unordered_set<raft::server_id> ignored_nodes{find_raft_nodes_from_hoeps(ignore_nodes_params)};
|
||||
|
||||
if (params.replaced_id) {
|
||||
// insert node that should be replaced to ignore list so that other topology operations
|
||||
@@ -1805,6 +1817,11 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
|
||||
if (raft_replace_info) {
|
||||
join_params.replaced_id = raft_replace_info->raft_id;
|
||||
join_params.ignore_nodes = utils::split_comma_separated_list(_db.local().get_config().ignore_dead_nodes_for_replace());
|
||||
if (!locator::check_host_ids_contain_only_uuid(join_params.ignore_nodes)) {
|
||||
slogger.warn("Warning: Using IP addresses for '--ignore-dead-nodes-for-replace' is deprecated and will"
|
||||
" be disabled in a future release. Please use host IDs instead. Provided values: {}",
|
||||
_db.local().get_config().ignore_dead_nodes_for_replace());
|
||||
}
|
||||
}
|
||||
|
||||
// if the node is bootstrapped the function will do nothing since we already created group0 in main.cc
|
||||
@@ -1867,9 +1884,9 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
|
||||
co_await raft_initialize_discovery_leader(join_params);
|
||||
|
||||
// start topology coordinator fiber
|
||||
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks);
|
||||
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, _group0->hold_group0_gate(), sys_dist_ks);
|
||||
// start cleanup fiber
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, proxy);
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, _group0->hold_group0_gate(), proxy);
|
||||
|
||||
// Need to start system_distributed_keyspace before bootstrap because bootstrapping
|
||||
// process may access those tables.
|
||||
@@ -2150,7 +2167,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
||||
// Start the topology coordinator monitor fiber. If we are the leader, this will start
|
||||
// the topology coordinator which is responsible for driving the upgrade process.
|
||||
try {
|
||||
_raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), sys_dist_ks);
|
||||
_raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate(), sys_dist_ks);
|
||||
} catch (...) {
|
||||
// The calls above can theoretically fail due to coroutine frame allocation failure.
|
||||
// Abort in this case as the node should be in a pretty bad shape anyway.
|
||||
@@ -2176,7 +2193,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
||||
}
|
||||
|
||||
try {
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), proxy);
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy);
|
||||
start_tablet_split_monitor();
|
||||
} catch (...) {
|
||||
rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception());
|
||||
@@ -2184,19 +2201,6 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
||||
}
|
||||
}
|
||||
|
||||
std::list<locator::host_id_or_endpoint> storage_service::parse_node_list(sstring comma_separated_list) {
|
||||
std::vector<sstring> ignore_nodes_strs = utils::split_comma_separated_list(std::move(comma_separated_list));
|
||||
std::list<locator::host_id_or_endpoint> ignore_nodes;
|
||||
for (const sstring& n : ignore_nodes_strs) {
|
||||
try {
|
||||
ignore_nodes.push_back(locator::host_id_or_endpoint(n));
|
||||
} catch (...) {
|
||||
throw std::runtime_error(::format("Failed to parse node list: {}: invalid node={}: {}", ignore_nodes_strs, n, std::current_exception()));
|
||||
}
|
||||
}
|
||||
return ignore_nodes;
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info) {
|
||||
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &replacement_info] {
|
||||
@@ -3231,9 +3235,11 @@ future<> storage_service::stop() {
|
||||
}
|
||||
|
||||
future<> storage_service::wait_for_group0_stop() {
|
||||
_group0_as.request_abort();
|
||||
_topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
|
||||
co_await when_all(std::move(_raft_state_monitor), std::move(_sstable_cleanup_fiber), std::move(_upgrade_to_topology_coordinator_fiber));
|
||||
if (!_group0_as.abort_requested()) {
|
||||
_group0_as.request_abort();
|
||||
_topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
|
||||
co_await when_all(std::move(_raft_state_monitor), std::move(_sstable_cleanup_fiber), std::move(_upgrade_to_topology_coordinator_fiber));
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
|
||||
@@ -3373,12 +3379,14 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
|
||||
.address = replace_address,
|
||||
};
|
||||
|
||||
bool node_ip_specified = false;
|
||||
for (auto& hoep : parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace())) {
|
||||
locator::host_id host_id;
|
||||
gms::loaded_endpoint_state st;
|
||||
// Resolve both host_id and endpoint
|
||||
if (hoep.has_endpoint()) {
|
||||
st.endpoint = hoep.endpoint();
|
||||
node_ip_specified = true;
|
||||
} else {
|
||||
host_id = hoep.id();
|
||||
auto res = _gossiper.get_nodes_with_host_id(host_id);
|
||||
@@ -3404,6 +3412,12 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
|
||||
ri.ignore_nodes.emplace(host_id, std::move(st));
|
||||
}
|
||||
|
||||
if (node_ip_specified) {
|
||||
slogger.warn("Warning: Using IP addresses for '--ignore-dead-nodes-for-replace' is deprecated and will"
|
||||
" be disabled in the next release. Please use host IDs instead. Provided values: {}",
|
||||
_db.local().get_config().ignore_dead_nodes_for_replace());
|
||||
}
|
||||
|
||||
slogger.info("Host {}/{} is replacing {}/{} ignore_nodes={}", get_token_metadata().get_my_id(), get_broadcast_address(), replace_host_id, replace_address,
|
||||
fmt::join(ri.ignore_nodes | boost::adaptors::transformed ([] (const auto& x) {
|
||||
return fmt::format("{}/{}", x.first, x.second.endpoint);
|
||||
@@ -3647,6 +3661,7 @@ static size_t count_normal_token_owners(const topology& topology) {
|
||||
|
||||
future<> storage_service::raft_decommission() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
@@ -3993,7 +4008,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::raft_removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
|
||||
future<> storage_service::raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
|
||||
auto id = raft::server_id{host_id.uuid()};
|
||||
utils::UUID request_id;
|
||||
|
||||
@@ -4062,6 +4077,11 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
|
||||
|
||||
request_id = guard.new_group0_state_id();
|
||||
|
||||
if (auto itr = _topology_state_machine._topology.requests.find(id);
|
||||
itr != _topology_state_machine._topology.requests.end() && itr->second == topology_request::remove) {
|
||||
throw std::runtime_error("Removenode failed. Concurrent request for removal already in progress");
|
||||
}
|
||||
try {
|
||||
// Make non voter during request submission for better HA
|
||||
co_await _group0->make_nonvoters(ignored_ids, _group0_as, raft_timeout{});
|
||||
@@ -4094,7 +4114,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
|
||||
future<> storage_service::removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
|
||||
return run_with_api_lock_conditionally(sstring("removenode"), !raft_topology_change_enabled(), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
|
||||
ss.check_ability_to_perform_topology_operation("removenode");
|
||||
@@ -4643,6 +4663,7 @@ future<> storage_service::do_drain() {
|
||||
|
||||
future<> storage_service::do_cluster_cleanup() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
@@ -4713,6 +4734,7 @@ future<> storage_service::wait_for_topology_not_busy() {
|
||||
|
||||
future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
@@ -5467,12 +5489,15 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
|
||||
try {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto group0_holder = _group0->hold_group0_gate();
|
||||
// do barrier to make sure we always see the latest topology
|
||||
co_await raft_server.read_barrier(&_group0_as);
|
||||
if (raft_server.get_current_term() != term) {
|
||||
// Return an error since the command is from outdated leader
|
||||
co_return result;
|
||||
}
|
||||
auto id = raft_server.id();
|
||||
group0_holder.release();
|
||||
|
||||
{
|
||||
auto& state = _raft_topology_cmd_handler_state;
|
||||
@@ -5584,7 +5609,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
break;
|
||||
case raft_topology_cmd::command::stream_ranges: {
|
||||
co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> {
|
||||
const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
|
||||
const auto& rs = _topology_state_machine._topology.find(id)->second;
|
||||
auto tstate = _topology_state_machine._topology.tstate;
|
||||
if (!rs.ring || rs.ring->tokens.empty()) {
|
||||
rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state);
|
||||
@@ -5633,11 +5658,11 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
utils::get_local_injector().inject("stop_after_streaming",
|
||||
[] { std::raise(SIGSTOP); });
|
||||
} else {
|
||||
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
|
||||
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
|
||||
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
|
||||
parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &raft_server, replaced_id] () -> future<> {
|
||||
if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) {
|
||||
on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id()));
|
||||
parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &id, replaced_id] () -> future<> {
|
||||
if (!_topology_state_machine._topology.req_param.contains(id)) {
|
||||
on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id));
|
||||
}
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
|
||||
auto ignored_nodes = boost::copy_range<std::unordered_set<locator::host_id>>(_topology_state_machine._topology.ignored_nodes | boost::adaptors::transformed([] (const auto& id) {
|
||||
@@ -5721,7 +5746,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
break;
|
||||
case node_state::rebuilding: {
|
||||
auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc;
|
||||
auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[id]).source_dc;
|
||||
rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
|
||||
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
|
||||
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
|
||||
@@ -5768,7 +5793,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
case node_state::none:
|
||||
case node_state::removing:
|
||||
on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
|
||||
raft_server.id(), rs.state));
|
||||
id, rs.state));
|
||||
break;
|
||||
}
|
||||
}));
|
||||
@@ -6484,6 +6509,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
});
|
||||
|
||||
auto& g0_server = _group0->group0_server();
|
||||
auto g0_holder = _group0->hold_group0_gate();
|
||||
if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) {
|
||||
// There is a peculiar case that can happen if the leader is killed
|
||||
// and then replaced very quickly:
|
||||
@@ -6673,6 +6699,10 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
|
||||
co_return join_node_response_result{};
|
||||
}
|
||||
|
||||
if (utils::get_local_injector().enter("join_node_response_drop_expiring")) {
|
||||
_group0->modifiable_address_map().force_drop_expiring_entries();
|
||||
}
|
||||
|
||||
try {
|
||||
co_return co_await std::visit(overloaded_functor {
|
||||
[&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {
|
||||
@@ -7320,8 +7350,8 @@ bool storage_service::is_repair_based_node_ops_enabled(streaming::stream_reason
|
||||
{"removenode", streaming::stream_reason::removenode},
|
||||
{"rebuild", streaming::stream_reason::rebuild},
|
||||
};
|
||||
auto enabled_list_str = _db.local().get_config().allowed_repair_based_node_ops();
|
||||
std::vector<sstring> enabled_list = utils::split_comma_separated_list(std::move(enabled_list_str));
|
||||
const sstring& enabled_list_str = _db.local().get_config().allowed_repair_based_node_ops();
|
||||
std::vector<sstring> enabled_list = utils::split_comma_separated_list(enabled_list_str);
|
||||
std::unordered_set<streaming::stream_reason> enabled_set;
|
||||
for (const sstring& op : enabled_list) {
|
||||
try {
|
||||
|
||||
@@ -373,8 +373,6 @@ private:
|
||||
|
||||
public:
|
||||
|
||||
static std::list<locator::host_id_or_endpoint> parse_node_list(sstring comma_separated_list);
|
||||
|
||||
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
|
||||
|
||||
@@ -699,7 +697,7 @@ public:
|
||||
*
|
||||
* @param hostIdString token for the node
|
||||
*/
|
||||
future<> removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes);
|
||||
future<> removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req);
|
||||
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
|
||||
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
@@ -835,7 +833,7 @@ private:
|
||||
future<> _raft_state_monitor = make_ready_future<>();
|
||||
// This fiber monitors raft state and starts/stops the topology change
|
||||
// coordinator fiber
|
||||
future<> raft_state_monitor_fiber(raft::server&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||
future<> raft_state_monitor_fiber(raft::server&, gate::holder, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||
|
||||
public:
|
||||
bool topology_global_queue_empty() const {
|
||||
@@ -865,13 +863,13 @@ private:
|
||||
// as well as the system.peers table.
|
||||
shared_ptr<raft_ip_address_updater> _raft_ip_address_updater;
|
||||
|
||||
std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const std::list<locator::host_id_or_endpoint>& hoeps);
|
||||
std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const;
|
||||
|
||||
future<raft_topology_cmd_result> raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd);
|
||||
|
||||
future<> raft_initialize_discovery_leader(const join_node_request_params& params);
|
||||
future<> raft_decommission();
|
||||
future<> raft_removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params);
|
||||
future<> raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params);
|
||||
future<> raft_rebuild(utils::optional_param source_dc);
|
||||
future<> raft_check_and_repair_cdc_streams();
|
||||
future<> update_topology_with_local_metadata(raft::server&);
|
||||
@@ -976,7 +974,7 @@ private:
|
||||
semaphore _join_node_response_handler_mutex{1};
|
||||
|
||||
future<> _sstable_cleanup_fiber = make_ready_future<>();
|
||||
future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
|
||||
future<> sstable_cleanup_fiber(raft::server& raft, gate::holder, sharded<service::storage_proxy>& proxy) noexcept;
|
||||
|
||||
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
|
||||
abort_source _group0_as;
|
||||
|
||||
@@ -109,6 +109,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
std::chrono::milliseconds _ring_delay;
|
||||
|
||||
gate::holder _group0_holder;
|
||||
|
||||
using drop_guard_and_retake = bool_class<class retake_guard_tag>;
|
||||
|
||||
// Engaged if an ongoing topology change should be rolled back. The string inside
|
||||
@@ -783,12 +785,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
size_t unimportant_init_tablet_count = 2; // must be a power of 2
|
||||
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
|
||||
|
||||
for (const auto& table : ks.metadata()->tables()) {
|
||||
auto tables_with_mvs = ks.metadata()->tables();
|
||||
auto views = ks.metadata()->views();
|
||||
tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
|
||||
for (const auto& table_or_mv : tables_with_mvs) {
|
||||
try {
|
||||
locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table->id());
|
||||
locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table_or_mv->id());
|
||||
locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params);
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table, tmptr, old_tablets);
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, old_tablets);
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {},"
|
||||
@@ -797,11 +802,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break; // ... and only create mutations deleting the global req
|
||||
}
|
||||
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table->id());
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id());
|
||||
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
|
||||
auto last_token = new_tablet_map.get_last_token(tablet_id);
|
||||
updates.emplace_back(co_await make_canonical_mutation_gently(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table->id())
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id())
|
||||
.set_new_replicas(last_token, tablet_info.replicas)
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::tablet_transition_kind::rebuild)
|
||||
@@ -826,7 +831,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
if (error.empty()) {
|
||||
const sstring strategy_name = "NetworkTopologyStrategy";
|
||||
auto ks_md = keyspace_metadata::new_keyspace(ks_name, strategy_name, repl_opts,
|
||||
new_ks_props.get_initial_tablets(strategy_name, true).specified_count,
|
||||
new_ks_props.get_initial_tablets(std::nullopt),
|
||||
new_ks_props.get_durable_writes(), new_ks_props.get_storage_options());
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
@@ -1577,7 +1582,30 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtlogger.info("entered `{}` transition state", *tstate);
|
||||
switch (*tstate) {
|
||||
case topology::transition_state::join_group0: {
|
||||
auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard)));
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
if (node.rs->state == node_state::replacing) {
|
||||
// Make sure all nodes are no longer trying to write to a node being replaced. This is important
|
||||
// if the new node has the same IP, so that old writes will not go to the new node by mistake after this point.
|
||||
// It is important to do so before the call to finish_accepting_node() below since after this call the new node becomes
|
||||
// a full member of the cluster and it starts loading an initial snapshot. Since snapshot loading is not atomic, any queries
|
||||
// that are done in parallel may see a partial state.
|
||||
try {
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::join_group0, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
std::current_exception());
|
||||
_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool accepted;
|
||||
std::tie(node, accepted) = co_await finish_accepting_node(std::move(node));
|
||||
|
||||
// If responding to the joining node failed, move the node to the left state and
|
||||
// stop the topology transition.
|
||||
@@ -1649,22 +1677,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
case node_state::replacing: {
|
||||
SCYLLA_ASSERT(!node.rs->ring);
|
||||
// Make sure all nodes are no longer trying to write to a node being replaced. This is important if the new node have the same IP, so that old write will not
|
||||
// go to the new node by mistake
|
||||
try {
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::join_group0, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
std::current_exception());
|
||||
_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
|
||||
break;
|
||||
}
|
||||
|
||||
auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
|
||||
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
|
||||
SCYLLA_ASSERT(it != _topo_sm._topology.normal_nodes.end());
|
||||
@@ -2494,6 +2506,7 @@ public:
|
||||
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
|
||||
, _tablet_allocator(tablet_allocator)
|
||||
, _ring_delay(ring_delay)
|
||||
, _group0_holder(_group0.hold_group0_gate())
|
||||
{}
|
||||
|
||||
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
|
||||
|
||||
@@ -23,8 +23,8 @@
|
||||
|
||||
#include <variant>
|
||||
|
||||
template<typename T>
|
||||
static inline T consume_be(temporary_buffer<char>& p) {
|
||||
template<typename T, ContiguousSharedBuffer Buffer>
|
||||
static inline T consume_be(Buffer& p) {
|
||||
T i = read_be<T>(p.get());
|
||||
p.trim_front(sizeof(T));
|
||||
return i;
|
||||
@@ -60,7 +60,9 @@ enum class read_status { ready, waiting };
|
||||
// }
|
||||
// return pc._u32;
|
||||
//
|
||||
class primitive_consumer {
|
||||
template<ContiguousSharedBuffer Buffer>
|
||||
class primitive_consumer_impl {
|
||||
using FragmentedBuffer = basic_fragmented_buffer<Buffer>;
|
||||
private:
|
||||
// state machine progress:
|
||||
enum class prestate {
|
||||
@@ -103,20 +105,26 @@ private:
|
||||
|
||||
// state for READING_BYTES prestate
|
||||
size_t _read_bytes_len = 0;
|
||||
utils::small_vector<temporary_buffer<char>, 1> _read_bytes;
|
||||
temporary_buffer<char> _read_bytes_buf; // for contiguous reading.
|
||||
utils::small_vector<Buffer, 1> _read_bytes;
|
||||
temporary_buffer<char>* _read_bytes_where_contiguous; // which buffer to set, _key, _val, _cell_path or _pk?
|
||||
fragmented_temporary_buffer* _read_bytes_where;
|
||||
FragmentedBuffer* _read_bytes_where;
|
||||
|
||||
// Alloc-free
|
||||
inline read_status read_partial_int(temporary_buffer<char>& data, prestate next_state) noexcept {
|
||||
inline read_status read_partial_int(Buffer& data, prestate next_state) noexcept {
|
||||
std::copy(data.begin(), data.end(), _read_int.bytes);
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
_prestate = next_state;
|
||||
return read_status::waiting;
|
||||
}
|
||||
// Alloc-free.
// Begins a partial integer read when there is no input to copy yet:
// records that zero bytes have been gathered so far and switches the state
// machine to `next_state`. Always reports read_status::waiting.
inline read_status read_partial_int(prestate next_state) noexcept {
    _prestate = next_state;
    _pos = 0;
    return read_status::waiting;
}
|
||||
template <typename VintType, prestate ReadingVint, prestate ReadingVintWithLen>
|
||||
inline read_status read_vint(temporary_buffer<char>& data, typename VintType::value_type& dest) {
|
||||
inline read_status read_vint(Buffer& data, typename VintType::value_type& dest) {
|
||||
if (data.empty()) {
|
||||
_prestate = ReadingVint;
|
||||
return read_status::waiting;
|
||||
@@ -128,9 +136,8 @@ private:
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -140,23 +147,23 @@ private:
|
||||
}
|
||||
}
|
||||
template <typename VintType>
|
||||
inline read_status read_vint_with_len(temporary_buffer<char>& data, typename VintType::value_type& dest) {
|
||||
inline read_status read_vint_with_len(Buffer& data, typename VintType::value_type& dest) {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
dest = VintType::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
return read_status::waiting;
|
||||
};
|
||||
public:
|
||||
primitive_consumer(reader_permit permit) : _permit(std::move(permit)) {}
|
||||
primitive_consumer_impl(reader_permit permit) : _permit(std::move(permit)) {}
|
||||
|
||||
inline read_status read_8(temporary_buffer<char>& data) {
|
||||
inline read_status read_8(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint8_t)) {
|
||||
_u8 = consume_be<uint8_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -170,7 +177,7 @@ public:
|
||||
// (this is the common case), do this immediately. Otherwise, remember
|
||||
// what we have in the buffer, and remember to continue later by using
|
||||
// a "prestate":
|
||||
inline read_status read_16(temporary_buffer<char>& data) {
|
||||
inline read_status read_16(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint16_t)) {
|
||||
_u16 = consume_be<uint16_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -179,7 +186,7 @@ public:
|
||||
}
|
||||
}
|
||||
// Alloc-free
|
||||
inline read_status read_32(temporary_buffer<char>& data) noexcept {
|
||||
inline read_status read_32(Buffer& data) noexcept {
|
||||
if (data.size() >= sizeof(uint32_t)) {
|
||||
_u32 = consume_be<uint32_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -187,7 +194,10 @@ public:
|
||||
return read_partial_int(data, prestate::READING_U32);
|
||||
}
|
||||
}
|
||||
inline read_status read_64(temporary_buffer<char>& data) {
|
||||
// Alloc-free.
// Input-less variant of read_32(): arms the READING_U32 prestate without
// consuming any bytes. The result of read_partial_int(prestate) is returned
// unchanged.
inline read_status read_32() noexcept {
    constexpr auto pending_state = prestate::READING_U32;
    return read_partial_int(pending_state);
}
|
||||
inline read_status read_64(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint64_t)) {
|
||||
_u64 = consume_be<uint64_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -195,16 +205,24 @@ public:
|
||||
return read_partial_int(data, prestate::READING_U64);
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes_contiguous(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
// Produces a temporary_buffer<char> holding the `len` bytes of `data` that
// start at `offset`.
//
// When Buffer is not temporary_buffer<char>, a new buffer is obtained from
// make_new_tracked_temporary_buffer(len, _permit) and the bytes are copied
// into it. When Buffer is temporary_buffer<char>, the call is forwarded to
// data.share(offset, len).
//
// No bounds checking is done here; the range is used as given.
temporary_buffer<char> share(Buffer& data, uint32_t offset, uint32_t len) {
    if constexpr (!std::is_same_v<Buffer, temporary_buffer<char>>) {
        auto copied = make_new_tracked_temporary_buffer(len, _permit);
        const auto first = data.begin() + offset;
        std::copy_n(first, len, copied.get_write());
        return copied;
    } else {
        return data.share(offset, len);
    }
}
|
||||
inline read_status read_bytes_contiguous(Buffer& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
if (data.size() >= len) {
|
||||
where = data.share(0, len);
|
||||
where = share(data, 0, len);
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
// copy what we have so far, read the rest later
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
_pos = data.size();
|
||||
@@ -213,12 +231,12 @@ public:
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, fragmented_temporary_buffer& where) {
|
||||
inline read_status read_bytes(Buffer& data, uint32_t len, FragmentedBuffer& where) {
|
||||
if (data.size() >= len) {
|
||||
auto fragments = std::move(where).release();
|
||||
fragments.clear();
|
||||
fragments.push_back(data.share(0, len));
|
||||
where = fragmented_temporary_buffer(std::move(fragments), len);
|
||||
where = FragmentedBuffer(std::move(fragments), len);
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
@@ -233,7 +251,7 @@ public:
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
inline read_status read_short_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
inline read_status read_short_length_bytes(Buffer& data, temporary_buffer<char>& where) {
|
||||
if (data.size() >= sizeof(uint16_t)) {
|
||||
_u16 = consume_be<uint16_t>(data);
|
||||
} else {
|
||||
@@ -242,19 +260,19 @@ public:
|
||||
}
|
||||
return read_bytes_contiguous(data, uint32_t{_u16}, where);
|
||||
}
|
||||
inline read_status read_unsigned_vint(temporary_buffer<char>& data) {
|
||||
inline read_status read_unsigned_vint(Buffer& data) {
|
||||
return read_vint<
|
||||
unsigned_vint,
|
||||
prestate::READING_UNSIGNED_VINT,
|
||||
prestate::READING_UNSIGNED_VINT_WITH_LEN>(data, _u64);
|
||||
}
|
||||
inline read_status read_signed_vint(temporary_buffer<char>& data) {
|
||||
inline read_status read_signed_vint(Buffer& data) {
|
||||
return read_vint<
|
||||
signed_vint,
|
||||
prestate::READING_SIGNED_VINT,
|
||||
prestate::READING_SIGNED_VINT_WITH_LEN>(data, _i64);
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes_contiguous(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
inline read_status read_unsigned_vint_length_bytes_contiguous(Buffer& data, temporary_buffer<char>& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
@@ -267,9 +285,8 @@ public:
|
||||
data.trim_front(len);
|
||||
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -279,7 +296,7 @@ public:
|
||||
}
|
||||
}
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, fragmented_temporary_buffer& where) {
|
||||
inline read_status read_unsigned_vint_length_bytes(Buffer& data, FragmentedBuffer& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
|
||||
_read_bytes_where = &where;
|
||||
@@ -292,9 +309,8 @@ public:
|
||||
data.trim_front(len);
|
||||
return read_bytes(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -307,7 +323,7 @@ public:
|
||||
private:
|
||||
// Reads bytes belonging to an integer of size len. Returns true
|
||||
// if a full integer is now available.
|
||||
bool process_int(temporary_buffer<char>& data, unsigned len) {
|
||||
bool process_int(Buffer& data, unsigned len) {
|
||||
SCYLLA_ASSERT(_pos < len);
|
||||
auto n = std::min((size_t)(len - _pos), data.size());
|
||||
std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos);
|
||||
@@ -316,9 +332,18 @@ private:
|
||||
return _pos == len;
|
||||
}
|
||||
public:
|
||||
// Continues gathering the bytes of a 32-bit integer from `data`.
//
// Returns read_status::waiting while process_int() reports that the integer
// is still incomplete. Once all sizeof(uint32_t) bytes are in, converts the
// accumulated value with net::ntoh, stores it in _u32, resets the prestate
// to NONE and returns read_status::ready.
read_status consume_u32(Buffer& data) {
    const bool complete = process_int(data, sizeof(uint32_t));
    if (!complete) {
        return read_status::waiting;
    }
    _u32 = net::ntoh(_read_int.uint32);
    _prestate = prestate::NONE;
    return read_status::ready;
}
|
||||
|
||||
// Feeds data into the state machine.
|
||||
// After the call, when data is not empty then active() can be assumed to be false.
|
||||
read_status consume(temporary_buffer<char>& data) {
|
||||
read_status consume(Buffer& data) {
|
||||
if (__builtin_expect(_prestate == prestate::NONE, true)) {
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -360,12 +385,12 @@ public:
|
||||
return read_vint_with_len<signed_vint>(data, _i64);
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
if (read_bytes_contiguous(data, _u64, *_read_bytes_where_contiguous) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
@@ -375,12 +400,12 @@ public:
|
||||
}
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
@@ -390,11 +415,11 @@ public:
|
||||
}
|
||||
case prestate::READING_BYTES_CONTIGUOUS: {
|
||||
auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy(data.begin(), data.begin() + n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy(data.begin(), data.begin() + n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
*_read_bytes_where_contiguous = std::move(_read_bytes.front());
|
||||
*_read_bytes_where_contiguous = std::move(_read_bytes_buf);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -406,8 +431,8 @@ public:
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
std::vector<temporary_buffer<char>> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
|
||||
*_read_bytes_where = fragmented_temporary_buffer(std::move(fragments), _read_bytes_len);
|
||||
std::vector<Buffer> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
|
||||
*_read_bytes_where = FragmentedBuffer(std::move(fragments), _read_bytes_len);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -435,12 +460,7 @@ public:
|
||||
}
|
||||
break;
|
||||
case prestate::READING_U32:
|
||||
if (process_int(data, sizeof(uint32_t))) {
|
||||
_u32 = net::ntoh(_read_int.uint32);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
break;
|
||||
return consume_u32(data);
|
||||
case prestate::READING_U64:
|
||||
if (process_int(data, sizeof(uint64_t))) {
|
||||
_u64 = net::ntoh(_read_int.uint64);
|
||||
@@ -461,6 +481,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using primitive_consumer = primitive_consumer_impl<temporary_buffer<char>>;
|
||||
|
||||
template <typename StateProcessor>
|
||||
class continuous_data_consumer : protected primitive_consumer {
|
||||
using proceed = data_consumer::proceed;
|
||||
|
||||
@@ -1110,7 +1110,7 @@ public:
|
||||
_consumer.consume_row_end();
|
||||
return;
|
||||
}
|
||||
if (_state != state::ROW_START || primitive_consumer::active()) {
|
||||
if (_state != state::ROW_START || data_consumer::primitive_consumer::active()) {
|
||||
throw malformed_sstable_exception("end of input, but not end of row");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,6 +145,24 @@ private:
|
||||
//
|
||||
using block_set_type = std::set<promoted_index_block, block_comparator>;
|
||||
block_set_type _blocks;
|
||||
private:
|
||||
using Buffer = cached_file::page_view;
|
||||
|
||||
struct u32_parser {
|
||||
data_consumer::primitive_consumer_impl<Buffer>& parser;
|
||||
|
||||
void reset() {
|
||||
parser.read_32();
|
||||
}
|
||||
|
||||
data_consumer::read_status consume(Buffer& buf) {
|
||||
return parser.consume_u32(buf);
|
||||
}
|
||||
|
||||
uint32_t value() const {
|
||||
return parser._u32;
|
||||
}
|
||||
};
|
||||
public:
|
||||
const schema& _s;
|
||||
uint64_t _promoted_index_start;
|
||||
@@ -152,26 +170,50 @@ public:
|
||||
metrics& _metrics;
|
||||
const pi_index_type _blocks_count;
|
||||
cached_file& _cached_file;
|
||||
data_consumer::primitive_consumer _primitive_parser;
|
||||
clustering_parser _clustering_parser;
|
||||
promoted_index_block_parser _block_parser;
|
||||
data_consumer::primitive_consumer_impl<Buffer> _primitive_parser;
|
||||
u32_parser _u32_parser;
|
||||
clustering_parser<Buffer> _clustering_parser;
|
||||
promoted_index_block_parser<Buffer> _block_parser;
|
||||
reader_permit _permit;
|
||||
cached_file::stream _stream;
|
||||
logalloc::allocating_section _as;
|
||||
private:
|
||||
// Feeds the stream into the consumer until the consumer is satisfied.
|
||||
// Does not give unconsumed data back to the stream.
|
||||
template <typename Consumer>
|
||||
future<> consume_stream(cached_file::stream& s, Consumer& c) {
|
||||
return repeat([&] {
|
||||
return s.next_page_view().then([&] (cached_file::page_view&& page) {
|
||||
future<> read(cached_file::offset_type pos, tracing::trace_state_ptr trace_state, Consumer& c) {
|
||||
struct retry_exception : std::exception {};
|
||||
_stream = _cached_file.read(pos, _permit, trace_state);
|
||||
c.reset();
|
||||
return repeat([this, pos, trace_state, &c] {
|
||||
return _stream.next_page_view().then([this, &c] (cached_file::page_view&& page) {
|
||||
if (!page) {
|
||||
on_internal_error(sstlog, "End of stream while parsing");
|
||||
}
|
||||
bool retry = false;
|
||||
return _as(_cached_file.region(), [&] {
|
||||
auto buf = page.get_buf();
|
||||
return stop_iteration(c.consume(buf) == data_consumer::read_status::ready);
|
||||
if (retry) {
|
||||
throw retry_exception();
|
||||
}
|
||||
retry = true;
|
||||
|
||||
auto status = c.consume(page);
|
||||
|
||||
utils::get_local_injector().inject("cached_promoted_index_parsing_invalidate_buf_across_page", [&page] {
|
||||
page.release_and_scramble();
|
||||
});
|
||||
|
||||
utils::get_local_injector().inject("cached_promoted_index_bad_alloc_parsing_across_page", [this] {
|
||||
// Prevent reserve explosion in testing.
|
||||
_as.set_lsa_reserve(1);
|
||||
_as.set_std_reserve(1);
|
||||
throw std::bad_alloc();
|
||||
});
|
||||
|
||||
return stop_iteration(status == data_consumer::read_status::ready);
|
||||
});
|
||||
}).handle_exception_type([this, pos, trace_state, &c] (const retry_exception& e) {
|
||||
_stream = _cached_file.read(pos, _permit, trace_state);
|
||||
c.reset();
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -183,48 +225,17 @@ private:
|
||||
return _promoted_index_size - (_blocks_count - idx) * sizeof(pi_offset_type);
|
||||
}
|
||||
|
||||
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(_promoted_index_start + get_offset_entry_pos(idx), _permit, trace_state);
|
||||
return _stream.next_page_view().then([this] (cached_file::page_view page) {
|
||||
temporary_buffer<char> buf = page.get_buf();
|
||||
static_assert(noexcept(std::declval<data_consumer::primitive_consumer>().read_32(buf)));
|
||||
if (__builtin_expect(_primitive_parser.read_32(buf) == data_consumer::read_status::ready, true)) {
|
||||
return make_ready_future<pi_offset_type>(_primitive_parser._u32);
|
||||
}
|
||||
return consume_stream(_stream, _primitive_parser).then([this] {
|
||||
return _primitive_parser._u32;
|
||||
});
|
||||
});
|
||||
}
|
||||
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state);
|
||||
|
||||
// Postconditions:
|
||||
// - block.start is engaged and valid.
|
||||
future<> read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(_promoted_index_start + block.offset, _permit, trace_state);
|
||||
_clustering_parser.reset();
|
||||
return consume_stream(_stream, _clustering_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(_clustering_parser.get_and_reset());
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
});
|
||||
}
|
||||
future<> read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state);
|
||||
|
||||
// Postconditions:
|
||||
// - block.end is engaged, all fields in the block are valid
|
||||
future<> read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(_promoted_index_start + block.offset, _permit, trace_state);
|
||||
_block_parser.reset();
|
||||
return consume_stream(_stream, _block_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(std::move(_block_parser.start()));
|
||||
block.end.emplace(std::move(_block_parser.end()));
|
||||
block.end_open_marker = _block_parser.end_open_marker();
|
||||
block.data_file_offset = _block_parser.offset();
|
||||
block.width = _block_parser.width();
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
});
|
||||
}
|
||||
future<> read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state);
|
||||
|
||||
public:
|
||||
/// \brief Returns a pointer to promoted_index_block entry which has at least offset and index fields valid.
|
||||
future<promoted_index_block*> get_block_only_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
auto i = _blocks.lower_bound(idx);
|
||||
@@ -242,6 +253,7 @@ private:
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
void erase_range(block_set_type::iterator begin, block_set_type::iterator end) {
|
||||
while (begin != end) {
|
||||
--_metrics.block_count;
|
||||
@@ -267,6 +279,7 @@ public:
|
||||
, _blocks_count(blocks_count)
|
||||
, _cached_file(f)
|
||||
, _primitive_parser(permit)
|
||||
, _u32_parser(_primitive_parser)
|
||||
, _clustering_parser(s, permit, cvfl, true)
|
||||
, _block_parser(s, permit, std::move(cvfl))
|
||||
, _permit(std::move(permit))
|
||||
@@ -333,6 +346,10 @@ public:
|
||||
erase_range(_blocks.begin(), _blocks.lower_bound(block->index));
|
||||
}
|
||||
|
||||
void clear() {
|
||||
erase_range(_blocks.begin(), _blocks.end());
|
||||
}
|
||||
|
||||
cached_file& file() { return _cached_file; }
|
||||
};
|
||||
} // namespace sstables::mc
|
||||
@@ -350,6 +367,40 @@ struct fmt::formatter<sstables::mc::cached_promoted_index::promoted_index_block>
|
||||
};
|
||||
|
||||
namespace sstables::mc {
|
||||
|
||||
inline
|
||||
future<cached_promoted_index::pi_offset_type>
|
||||
cached_promoted_index::read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
return read(_promoted_index_start + get_offset_entry_pos(idx), trace_state, _u32_parser).then([idx, this] {
|
||||
sstlog.trace("cached_promoted_index {}: read_block_offset: idx: {}, offset: {}", fmt::ptr(this), idx, _u32_parser.value());
|
||||
return _u32_parser.value();
|
||||
});
|
||||
}
|
||||
|
||||
inline
|
||||
future<> cached_promoted_index::read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
return read(_promoted_index_start + block.offset, trace_state, _clustering_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(_clustering_parser.get_and_reset());
|
||||
sstlog.trace("cached_promoted_index {}: read_block_start: {}", fmt::ptr(this), block);
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
});
|
||||
}
|
||||
|
||||
inline
|
||||
future<> cached_promoted_index::read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
|
||||
return read(_promoted_index_start + block.offset, trace_state, _block_parser).then([this, &block] {
|
||||
auto mem_before = block.memory_usage();
|
||||
block.start.emplace(std::move(_block_parser.start()));
|
||||
block.end.emplace(std::move(_block_parser.end()));
|
||||
block.end_open_marker = _block_parser.end_open_marker();
|
||||
block.data_file_offset = _block_parser.offset();
|
||||
block.width = _block_parser.width();
|
||||
_metrics.used_bytes += block.memory_usage() - mem_before;
|
||||
sstlog.trace("cached_promoted_index {}: read_block: {}", fmt::ptr(this), block);
|
||||
});
|
||||
}
|
||||
|
||||
/// Cursor implementation which does binary search over index entries.
|
||||
///
|
||||
/// Memory consumption: O(log(N))
|
||||
@@ -460,6 +511,8 @@ public:
|
||||
, _trace_state(std::move(trace_state))
|
||||
{ }
|
||||
|
||||
cached_promoted_index& promoted_index() { return _promoted_index; }
|
||||
|
||||
future<std::optional<skip_info>> advance_to(position_in_partition_view pos) override {
|
||||
position_in_partition::less_compare less(_s);
|
||||
|
||||
|
||||
@@ -26,20 +26,22 @@ namespace mc {
|
||||
// while (cp.consume(next_buf()) == read_status::waiting) {}
|
||||
// position_in_partition pos = cp.get();
|
||||
//
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class clustering_parser {
|
||||
using FragmentedBuffer = basic_fragmented_buffer<Buffer>;
|
||||
const schema& _s;
|
||||
column_values_fixed_lengths _clustering_values_fixed_lengths;
|
||||
bool _parsing_start_key;
|
||||
boost::iterator_range<column_values_fixed_lengths::const_iterator> ck_range;
|
||||
|
||||
std::vector<fragmented_temporary_buffer> clustering_key_values;
|
||||
std::vector<FragmentedBuffer> clustering_key_values;
|
||||
bound_kind_m kind{};
|
||||
|
||||
fragmented_temporary_buffer column_value;
|
||||
FragmentedBuffer column_value;
|
||||
uint64_t ck_blocks_header = 0;
|
||||
uint32_t ck_blocks_header_offset = 0;
|
||||
std::optional<position_in_partition> _pos;
|
||||
data_consumer::primitive_consumer _primitive;
|
||||
data_consumer::primitive_consumer_impl<Buffer> _primitive;
|
||||
|
||||
enum class state {
|
||||
CLUSTERING_START,
|
||||
@@ -79,7 +81,7 @@ class clustering_parser {
|
||||
|
||||
position_in_partition make_position() {
|
||||
auto key = clustering_key_prefix::from_range(clustering_key_values | boost::adaptors::transformed(
|
||||
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
|
||||
[] (const FragmentedBuffer & b) { return typename FragmentedBuffer::view(b); }));
|
||||
|
||||
if (kind == bound_kind_m::clustering) {
|
||||
return position_in_partition::for_key(std::move(key));
|
||||
@@ -108,7 +110,7 @@ public:
|
||||
|
||||
// Feeds the data into the state machine.
|
||||
// Returns read_status::ready when !active() after the call.
|
||||
read_status consume(temporary_buffer<char>& data) {
|
||||
read_status consume(Buffer& data) {
|
||||
if (_primitive.consume(data) == read_status::waiting) {
|
||||
return read_status::waiting;
|
||||
}
|
||||
@@ -202,12 +204,15 @@ public:
|
||||
}
|
||||
|
||||
void reset() {
|
||||
_parsing_start_key = true;
|
||||
_state = state::CLUSTERING_START;
|
||||
_primitive.reset();
|
||||
}
|
||||
};
|
||||
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class promoted_index_block_parser {
|
||||
clustering_parser _clustering;
|
||||
clustering_parser<Buffer> _clustering;
|
||||
|
||||
std::optional<position_in_partition> _start_pos;
|
||||
std::optional<position_in_partition> _end_pos;
|
||||
@@ -228,7 +233,7 @@ class promoted_index_block_parser {
|
||||
DONE,
|
||||
} _state = state::START;
|
||||
|
||||
data_consumer::primitive_consumer _primitive;
|
||||
data_consumer::primitive_consumer_impl<Buffer> _primitive;
|
||||
public:
|
||||
using read_status = data_consumer::read_status;
|
||||
|
||||
@@ -246,7 +251,7 @@ public:
|
||||
// Feeds the data into the state machine.
|
||||
// Returns read_status::ready when whole block was parsed.
|
||||
// If returns read_status::waiting then data.empty() after the call.
|
||||
read_status consume(temporary_buffer<char>& data) {
|
||||
read_status consume(Buffer& data) {
|
||||
static constexpr size_t width_base = 65536;
|
||||
if (_primitive.consume(data) == read_status::waiting) {
|
||||
return read_status::waiting;
|
||||
@@ -318,7 +323,7 @@ public:
|
||||
|
||||
void reset() {
|
||||
_end_open_marker.reset();
|
||||
_clustering.set_parsing_start_key(true);
|
||||
_clustering.reset();
|
||||
_state = state::START;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -71,7 +71,7 @@ private:
|
||||
};
|
||||
|
||||
struct m_parser_context {
|
||||
mc::promoted_index_block_parser block_parser;
|
||||
mc::promoted_index_block_parser<temporary_buffer<char>> block_parser;
|
||||
|
||||
m_parser_context(const schema& s, reader_permit permit, column_values_fixed_lengths cvfl)
|
||||
: block_parser(s, std::move(permit), std::move(cvfl))
|
||||
|
||||
@@ -104,13 +104,16 @@ def check_increases_metric(metrics, metric_names):
|
||||
assert saved_metrics[n] < get_metric(metrics, n, None, the_metrics), f'metric {n} did not increase'
|
||||
|
||||
@contextmanager
|
||||
def check_increases_operation(metrics, operation_names):
|
||||
def check_increases_operation(metrics, operation_names, metric_name = 'scylla_alternator_operation', expected_value=None):
|
||||
the_metrics = get_metrics(metrics)
|
||||
saved_metrics = { x: get_metric(metrics, 'scylla_alternator_operation', {'op': x}, the_metrics) for x in operation_names }
|
||||
saved_metrics = { x: get_metric(metrics, metric_name, {'op': x}, the_metrics) for x in operation_names }
|
||||
yield
|
||||
the_metrics = get_metrics(metrics)
|
||||
for op in operation_names:
|
||||
assert saved_metrics[op] < get_metric(metrics, 'scylla_alternator_operation', {'op': op}, the_metrics)
|
||||
if expected_value:
|
||||
assert expected_value == get_metric(metrics, metric_name, {'op': op}, the_metrics) - saved_metrics[op]
|
||||
else:
|
||||
assert saved_metrics[op] < get_metric(metrics, metric_name, {'op': op}, the_metrics)
|
||||
|
||||
###### Test for metrics that count DynamoDB API operations:
|
||||
|
||||
@@ -125,6 +128,16 @@ def test_batch_get_item(test_table_s, metrics):
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
|
||||
|
||||
def test_batch_write_item_count(test_table_s, metrics):
|
||||
with check_increases_operation(metrics, ['BatchWriteItem'], metric_name='scylla_alternator_batch_item_count', expected_value=2):
|
||||
test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}, {'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
|
||||
|
||||
def test_batch_get_item_count(test_table_s, metrics):
|
||||
with check_increases_operation(metrics, ['BatchGetItem'], metric_name='scylla_alternator_batch_item_count', expected_value=2):
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}, {'p': random_string()}], 'ConsistentRead': True}})
|
||||
|
||||
# Test counters for CreateTable, DescribeTable, UpdateTable and DeleteTable
|
||||
def test_table_operations(dynamodb, metrics):
|
||||
with check_increases_operation(metrics, ['CreateTable', 'DescribeTable', 'UpdateTable', 'DeleteTable']):
|
||||
|
||||
@@ -453,3 +453,52 @@ SEASTAR_THREAD_TEST_CASE(test_invalidation) {
|
||||
BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_page_view_as_contiguous_shared_buffer) {
|
||||
auto page_size = cached_file::page_size;
|
||||
test_file tf = make_test_file(page_size);
|
||||
|
||||
cached_file_stats metrics;
|
||||
logalloc::region region;
|
||||
cached_file cf(tf.f, metrics, cf_lru, region, page_size);
|
||||
|
||||
auto s = cf.read(1, std::nullopt);
|
||||
cached_file::page_view p = s.next_page_view().get();
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, page_size - 1), sstring(p.begin(), p.end()));
|
||||
BOOST_REQUIRE_EQUAL(p.size(), page_size - 1);
|
||||
BOOST_REQUIRE(!p.empty());
|
||||
|
||||
p.trim(10);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, 10), sstring(p.begin(), p.end()));
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, 10), sstring(p.get_write(), p.end()));
|
||||
|
||||
p.trim_front(1);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(2, 9), sstring(p.begin(), p.end()));
|
||||
|
||||
// Check movability
|
||||
{
|
||||
auto p_cpy = p.share();
|
||||
auto p1 = std::move(p_cpy);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(2, 9), sstring(p1.begin(), p1.end()));
|
||||
BOOST_REQUIRE(p_cpy.empty());
|
||||
BOOST_REQUIRE(p_cpy.size() == 0);
|
||||
BOOST_REQUIRE(!p_cpy);
|
||||
}
|
||||
|
||||
auto p2 = p.share(2, 3);
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(4, 3), sstring(p2.begin(), p2.end()));
|
||||
p2.trim_front(1); // should not affect p
|
||||
|
||||
p.trim_front(9);
|
||||
BOOST_REQUIRE_EQUAL(p.size(), 0);
|
||||
BOOST_REQUIRE(p.begin() == p.end());
|
||||
|
||||
p = {};
|
||||
BOOST_REQUIRE_EQUAL(p.size(), 0);
|
||||
BOOST_REQUIRE(p.begin() == p.end());
|
||||
BOOST_REQUIRE(!p);
|
||||
BOOST_REQUIRE_EQUAL(sstring(p.begin(), p.end()), sstring());
|
||||
|
||||
// p should not affect p2
|
||||
BOOST_REQUIRE_EQUAL(tf.contents.substr(5, 2), sstring(p2.begin(), p2.end()));
|
||||
}
|
||||
|
||||
@@ -2024,3 +2024,35 @@ SEASTAR_TEST_CASE(test_oversized_several_medium) {
|
||||
SEASTAR_TEST_CASE(test_oversized_several_large) {
|
||||
co_await test_oversized(8, 32);
|
||||
}
|
||||
|
||||
// tests #20862 - buffer usage counter not being updated correctly
|
||||
SEASTAR_TEST_CASE(test_commitlog_buffer_size_counter) {
|
||||
commitlog::config cfg;
|
||||
tmpdir tmp;
|
||||
cfg.commit_log_location = tmp.path().string();
|
||||
auto log = co_await commitlog::create_commitlog(cfg);
|
||||
|
||||
rp_set rps;
|
||||
// uncomment for verbosity
|
||||
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
|
||||
|
||||
auto uuid = make_table_id();
|
||||
auto size = 1024;
|
||||
|
||||
auto size_before_alloc = log.get_buffer_size();
|
||||
|
||||
rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
|
||||
dst.fill('1', size);
|
||||
});
|
||||
h.release();
|
||||
|
||||
auto size_after_alloc = log.get_buffer_size();
|
||||
co_await log.sync_all_segments();
|
||||
auto size_after_sync = log.get_buffer_size();
|
||||
|
||||
BOOST_CHECK_LE(size_before_alloc, size_after_alloc);
|
||||
BOOST_CHECK_LE(size_after_sync, size_before_alloc);
|
||||
|
||||
co_await log.shutdown();
|
||||
co_await log.clear();
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/sstable_test_env.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "test/lib/make_random_string.hh"
|
||||
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
|
||||
@@ -46,3 +47,96 @@ SEASTAR_TEST_CASE(test_abort_during_index_read) {
|
||||
consumer_ctx.close().get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_promoted_index_parsing_page_crossing_and_retries) {
|
||||
return test_env::do_with_async([](test_env& env) {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
testlog.info("Skipped because error injection is not enabled");
|
||||
#else
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto pk = ss.make_pkey();
|
||||
auto mut = mutation(s, pk);
|
||||
|
||||
// enough to have same index block whose clustering key is split across pages
|
||||
std::vector<clustering_key> keys;
|
||||
const auto n_keys = 100;
|
||||
auto key_size = cached_file::page_size / 3; // guarantees that index blocks are not congruent with page size.
|
||||
keys.reserve(n_keys);
|
||||
for (int i = 0; i < n_keys; ++i) {
|
||||
keys.push_back(ss.make_ckey(make_random_string(key_size)));
|
||||
ss.add_row(mut, keys[i], "v");
|
||||
}
|
||||
|
||||
clustering_key::less_compare less(*s);
|
||||
std::sort(keys.begin(), keys.end(), less);
|
||||
|
||||
env.manager().set_promoted_index_block_size(1); // force entry for each row
|
||||
auto mut_reader = make_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), std::move(mut));
|
||||
auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer());
|
||||
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
tracing::trace_state_ptr trace = nullptr;
|
||||
|
||||
auto index = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
|
||||
auto close_index = deferred_close(*index);
|
||||
|
||||
index->advance_to(dht::ring_position_view(pk)).get();
|
||||
index->read_partition_data().get();
|
||||
|
||||
auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
|
||||
BOOST_REQUIRE(cur);
|
||||
|
||||
std::optional<cached_file::offset_type> prev_offset;
|
||||
int crossed_page = 0;
|
||||
|
||||
utils::get_local_injector().enable("cached_promoted_index_parsing_invalidate_buf_across_page", false);
|
||||
|
||||
for (int i = 0; i < n_keys - 1; ++i) {
|
||||
auto block_offset = cur->promoted_index().get_block_only_offset(i, trace).get()->offset;
|
||||
auto next_block_offset = cur->promoted_index().get_block_only_offset(i + 1, trace).get()->offset;
|
||||
|
||||
auto start_page = block_offset / cached_file::page_size;
|
||||
auto end_page = (next_block_offset - 1) / cached_file::page_size;
|
||||
if (start_page != end_page) {
|
||||
auto pos = position_in_partition::for_key(keys[i]);
|
||||
position_in_partition::equal_compare eq(*s);
|
||||
|
||||
testlog.info("Crossed page at block {}, offset [{}, {})", i, block_offset, next_block_offset);
|
||||
crossed_page++;
|
||||
|
||||
auto* block = cur->promoted_index().get_block(i, trace).get();
|
||||
|
||||
testlog.debug("key : {}", pos);
|
||||
testlog.debug("start : {}", *block->start);
|
||||
testlog.debug("end : {}", *block->end);
|
||||
|
||||
BOOST_REQUIRE(eq(*block->start, pos));
|
||||
BOOST_REQUIRE(eq(*block->end, pos));
|
||||
if (prev_offset) {
|
||||
BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
|
||||
}
|
||||
|
||||
cur->promoted_index().clear();
|
||||
|
||||
utils::get_local_injector().enable("cached_promoted_index_bad_alloc_parsing_across_page", true);
|
||||
block = cur->promoted_index().get_block(i, trace).get();
|
||||
|
||||
testlog.debug("start : {}", *block->start);
|
||||
testlog.debug("end : {}", *block->end);
|
||||
BOOST_REQUIRE(eq(*block->start, pos));
|
||||
BOOST_REQUIRE(eq(*block->end, pos));
|
||||
if (prev_offset) {
|
||||
BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
|
||||
}
|
||||
|
||||
prev_offset = block->data_file_offset;
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_GE(crossed_page, 6); // If not, increase n_keys
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
@@ -299,22 +299,27 @@ def test_lwt_support_with_tablets(cql, test_keyspace, skip_without_tablets):
|
||||
# We want to ensure that we can only change the RF of any DC by at most 1 at a time
|
||||
# if we use tablets. That provides us with the guarantee that the old and the new QUORUM
|
||||
# overlap by at least one node.
|
||||
def test_alter_tablet_keyspace(cql, this_dc):
|
||||
def test_alter_tablet_keyspace_rf(cql, this_dc):
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }} "
|
||||
f"AND TABLETS = {{ 'enabled': true, 'initial': 128 }}") as keyspace:
|
||||
def change_opt_rf(rf_opt, new_rf):
|
||||
cql.execute(f"ALTER KEYSPACE {keyspace} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}")
|
||||
cql.execute(f"ALTER KEYSPACE {keyspace} "
|
||||
f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}")
|
||||
|
||||
def change_dc_rf(new_rf):
|
||||
change_opt_rf(this_dc, new_rf)
|
||||
def change_default_rf(new_rf):
|
||||
change_opt_rf("replication_factor", new_rf)
|
||||
|
||||
change_dc_rf(2)
|
||||
change_dc_rf(3)
|
||||
change_dc_rf(2) # increase RF by 1 should be OK
|
||||
change_dc_rf(3) # increase RF by 1 again should be OK
|
||||
change_dc_rf(3) # setting the same RF shouldn't cause problems
|
||||
change_dc_rf(4) # increase RF by 1 again should be OK
|
||||
change_dc_rf(3) # decrease RF by 1 should be OK
|
||||
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(5)
|
||||
change_dc_rf(5) # increase RF by 2 should fail
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(1)
|
||||
change_dc_rf(1) # decrease RF by 2 should fail
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(10)
|
||||
change_dc_rf(10) # increase RF by 2+ should fail
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(0) # decrease RF by 2+ should fail
|
||||
|
||||
@@ -709,7 +709,7 @@ private:
|
||||
port = tmp.local_address().port();
|
||||
}
|
||||
// Don't start listening so tests can be run in parallel if cfg_in.ms_listen is not set to true explicitly.
|
||||
_ms.start(host_id, listen, std::move(port)).get();
|
||||
_ms.start(host_id, listen, std::move(port), std::ref(_feature_service)).get();
|
||||
stop_ms = defer(stop_type(stop_ms_func));
|
||||
|
||||
if (cfg_in.ms_listen) {
|
||||
|
||||
@@ -12,7 +12,9 @@
|
||||
#include <seastar/core/app-template.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/application_state.hh"
|
||||
@@ -56,6 +58,7 @@ int main(int ac, char ** av) {
|
||||
|
||||
sharded<abort_source> abort_sources;
|
||||
sharded<locator::shared_token_metadata> token_metadata;
|
||||
sharded<gms::feature_service> feature_service;
|
||||
sharded<netw::messaging_service> messaging;
|
||||
|
||||
abort_sources.start().get();
|
||||
@@ -68,7 +71,10 @@ int main(int ac, char ** av) {
|
||||
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||
auto stop_token_mgr = defer([&] { token_metadata.stop().get(); });
|
||||
|
||||
messaging.start(locator::host_id{}, listen, 7000).get();
|
||||
auto cfg = gms::feature_config_from_db_config(db::config(), {});
|
||||
feature_service.start(cfg).get();
|
||||
|
||||
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
|
||||
auto stop_messaging = deferred_stop(messaging);
|
||||
|
||||
gms::gossip_config gcfg;
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "gms/feature_service.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossip_digest_syn.hh"
|
||||
#include "gms/gossip_digest_ack.hh"
|
||||
@@ -192,8 +193,11 @@ int main(int ac, char ** av) {
|
||||
sharded<locator::shared_token_metadata> token_metadata;
|
||||
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
|
||||
auto stop_tm = deferred_stop(token_metadata);
|
||||
seastar::sharded<gms::feature_service> feature_service;
|
||||
auto cfg = gms::feature_config_from_db_config(db::config(), {});
|
||||
feature_service.start(cfg).get();
|
||||
seastar::sharded<netw::messaging_service> messaging;
|
||||
messaging.start(locator::host_id{}, listen, 7000).get();
|
||||
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
|
||||
auto stop_messaging = deferred_stop(messaging);
|
||||
seastar::sharded<tester> testers;
|
||||
testers.start(std::ref(messaging)).get();
|
||||
|
||||
@@ -24,31 +24,31 @@ def test_rebuild_with_dc(nodetool):
|
||||
|
||||
|
||||
def test_removenode(nodetool):
|
||||
nodetool("removenode", "675ed9f4-6564-6dbd-can8-43fddce952gy", expected_requests=[
|
||||
nodetool("removenode", "ac9e2ad5-c6d7-4769-a64b-6e73173ccd86", expected_requests=[
|
||||
expected_request("POST", "/storage_service/remove_node",
|
||||
params={"host_id": "675ed9f4-6564-6dbd-can8-43fddce952gy"})])
|
||||
params={"host_id": "ac9e2ad5-c6d7-4769-a64b-6e73173ccd86"})])
|
||||
|
||||
|
||||
def test_removenode_ignore_nodes_one_node(nodetool):
|
||||
nodetool("removenode",
|
||||
"675ed9f4-6564-6dbd-can8-43fddce952gy",
|
||||
"ac9e2ad5-c6d7-4769-a64b-6e73173ccd86",
|
||||
"--ignore-dead-nodes",
|
||||
"88eed9f4-6564-6dbd-can8-43fddce952gy",
|
||||
"c0f4683f-61aa-43d4-98b5-99e2c5d27952",
|
||||
expected_requests=[
|
||||
expected_request("POST", "/storage_service/remove_node", params={
|
||||
"host_id": "675ed9f4-6564-6dbd-can8-43fddce952gy",
|
||||
"ignore_nodes": "88eed9f4-6564-6dbd-can8-43fddce952gy"})])
|
||||
"host_id": "ac9e2ad5-c6d7-4769-a64b-6e73173ccd86",
|
||||
"ignore_nodes": "c0f4683f-61aa-43d4-98b5-99e2c5d27952"})])
|
||||
|
||||
|
||||
def test_removenode_ignore_nodes_two_nodes(nodetool):
|
||||
nodetool("removenode",
|
||||
"675ed9f4-6564-6dbd-can8-43fddce952gy",
|
||||
"ac9e2ad5-c6d7-4769-a64b-6e73173ccd86",
|
||||
"--ignore-dead-nodes",
|
||||
"88eed9f4-6564-6dbd-can8-43fddce952gy,99eed9f4-6564-6dbd-can8-43fddce952gy",
|
||||
"c0f4683f-61aa-43d4-98b5-99e2c5d27952,7f066eb5-b76c-4587-922f-d71e2d7c3b51",
|
||||
expected_requests=[
|
||||
expected_request("POST", "/storage_service/remove_node", params={
|
||||
"host_id": "675ed9f4-6564-6dbd-can8-43fddce952gy",
|
||||
"ignore_nodes": "88eed9f4-6564-6dbd-can8-43fddce952gy,99eed9f4-6564-6dbd-can8-43fddce952gy"})])
|
||||
"host_id": "ac9e2ad5-c6d7-4769-a64b-6e73173ccd86",
|
||||
"ignore_nodes": "c0f4683f-61aa-43d4-98b5-99e2c5d27952,7f066eb5-b76c-4587-922f-d71e2d7c3b51"})])
|
||||
|
||||
|
||||
def test_removenode_status(nodetool):
|
||||
@@ -67,7 +67,7 @@ def test_removenode_force(nodetool):
|
||||
def test_removenode_status_with_ignore_dead_nodes(nodetool, scylla_only):
|
||||
check_nodetool_fails_with(
|
||||
nodetool,
|
||||
("removenode", "status", "--ignore-dead-nodes", "675ed9f4-6564-6dbd-can8-43fddce952gy"),
|
||||
("removenode", "status", "--ignore-dead-nodes", "ac9e2ad5-c6d7-4769-a64b-6e73173ccd86"),
|
||||
{"expected_requests": []},
|
||||
["error processing arguments: cannot use --ignore-dead-nodes with status or force"])
|
||||
|
||||
@@ -75,6 +75,6 @@ def test_removenode_status_with_ignore_dead_nodes(nodetool, scylla_only):
|
||||
def test_removenode_force_with_ignore_dead_nodes(nodetool, scylla_only):
|
||||
check_nodetool_fails_with(
|
||||
nodetool,
|
||||
("removenode", "force", "--ignore-dead-nodes", "675ed9f4-6564-6dbd-can8-43fddce952gy"),
|
||||
("removenode", "force", "--ignore-dead-nodes", "ac9e2ad5-c6d7-4769-a64b-6e73173ccd86"),
|
||||
{"expected_requests": []},
|
||||
["error processing arguments: cannot use --ignore-dead-nodes with status or force"])
|
||||
|
||||
@@ -526,6 +526,10 @@ class ManagerClient():
|
||||
rows = await self.cql.run_async(f"select id from system_schema.tables where keyspace_name = '{keyspace}' and table_name = '{table}'")
|
||||
return rows[0].id
|
||||
|
||||
async def get_view_id(self, keyspace: str, view: str):
|
||||
rows = await self.cql.run_async(f"select id from system_schema.views where keyspace_name = '{keyspace}' and view_name = '{view}'")
|
||||
return rows[0].id
|
||||
|
||||
async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.):
|
||||
"""Wait till a server sees a minimum given count of other servers"""
|
||||
if count < 1:
|
||||
|
||||
@@ -13,7 +13,7 @@ class TabletReplicas(NamedTuple):
|
||||
last_token: int
|
||||
replicas: list[tuple[HostID, int]]
|
||||
|
||||
async def get_all_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str) -> list[TabletReplicas]:
|
||||
async def get_all_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, is_view: bool = False) -> list[TabletReplicas]:
|
||||
"""
|
||||
Retrieves the tablet distribution for a given table.
|
||||
This call is guaranteed to see all prior changes applied to group0 tables.
|
||||
@@ -27,7 +27,10 @@ async def get_all_tablet_replicas(manager: ManagerClient, server: ServerInfo, ke
|
||||
# reflects the finalized tablet movement.
|
||||
await read_barrier(manager.api, server.ip_addr)
|
||||
|
||||
table_id = await manager.get_table_id(keyspace_name, table_name)
|
||||
if is_view:
|
||||
table_id = await manager.get_view_id(keyspace_name, table_name)
|
||||
else:
|
||||
table_id = await manager.get_table_id(keyspace_name, table_name)
|
||||
rows = await manager.get_cql().run_async(f"SELECT last_token, replicas FROM system.tablets where "
|
||||
f"table_id = {table_id}", host=host)
|
||||
return [TabletReplicas(
|
||||
|
||||
@@ -135,3 +135,19 @@ async def test_rebuild_node(manager: ManagerClient, random_tables: RandomTables)
|
||||
servers = await manager.running_servers()
|
||||
await manager.rebuild_node(servers[0].server_id)
|
||||
await check_token_ring_and_group0_consistency(manager)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_removenode(manager: ManagerClient):
|
||||
servers = await manager.running_servers()
|
||||
assert len(servers) >= 3
|
||||
|
||||
await manager.server_stop_gracefully(servers[2].server_id)
|
||||
|
||||
try:
|
||||
await asyncio.gather(*[manager.remove_node(servers[0].server_id, servers[2].server_id),
|
||||
manager.remove_node(servers[1].server_id, servers[2].server_id)])
|
||||
except Exception as e:
|
||||
logger.info(f"exception raised due to concurrent remove node requests: {e}")
|
||||
else:
|
||||
raise Exception("concurrent removenode request should result in a failure, but unexpectedly succeeded")
|
||||
|
||||
|
||||
@@ -27,16 +27,20 @@ async def test_long_join(manager: ManagerClient) -> None:
|
||||
await asyncio.gather(task)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_long_join_drop_wntries_on_bootstrapping(manager: ManagerClient) -> None:
|
||||
async def test_long_join_drop_entries_on_bootstrapping(manager: ManagerClient) -> None:
|
||||
"""The test checks that join works even if expiring entries are dropped
|
||||
on the joining node between placement of the join request and its processing"""
|
||||
s1 = await manager.server_add()
|
||||
servers = await manager.servers_add(2)
|
||||
inj = 'topology_coordinator_pause_before_processing_backlog'
|
||||
await manager.api.enable_injection(s1.ip_addr, inj, one_shot=True)
|
||||
s2 = await manager.server_add(start=False, config={
|
||||
[await manager.api.enable_injection(s.ip_addr, inj, one_shot=True) for s in servers]
|
||||
s = await manager.server_add(start=False, config={
|
||||
'error_injections_at_startup': ['pre_server_start_drop_expiring']
|
||||
})
|
||||
task = asyncio.create_task(manager.server_start(s2.server_id))
|
||||
await manager.server_sees_other_server(s1.ip_addr, s2.ip_addr, interval=300)
|
||||
await manager.api.message_injection(s1.ip_addr, inj)
|
||||
task = asyncio.create_task(manager.server_start(s.server_id))
|
||||
log = await manager.server_open_log(s.server_id)
|
||||
await log.wait_for("init - starting gossiper")
|
||||
servers.append(s)
|
||||
await manager.servers_see_each_other(servers, interval=300)
|
||||
await manager.api.enable_injection(s.ip_addr, 'join_node_response_drop_expiring', one_shot=True)
|
||||
[await manager.api.message_injection(s.ip_addr, inj) for s in servers[:-1]]
|
||||
await asyncio.gather(task)
|
||||
|
||||
@@ -79,6 +79,10 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
logging.info(f"Restarting {others}")
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
# Prevent scylladb/scylladb#20791
|
||||
logging.info(f"Wait until {srv1} sees {others} as alive")
|
||||
await manager.server_sees_others(srv1.server_id, len(others))
|
||||
|
||||
logging.info(f"{others} restarted, waiting until driver reconnects to them")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
|
||||
|
||||
@@ -106,9 +106,15 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
|
||||
parameters=[k, v],
|
||||
host=host2)
|
||||
finish_time = time.time()
|
||||
await replace_future
|
||||
s = await replace_future
|
||||
logger.info(f"done, writes count {next_id}, took {finish_time - start_time} seconds")
|
||||
|
||||
# make sure that after we start snapshot transfer we no longer have stale writes
|
||||
log = await manager.server_open_log(s.server_id)
|
||||
m = await log.wait_for("group0_raft_sm - transfer snapshot from ")
|
||||
errs = await log.grep("storage_proxy - Failed to apply mutation from", from_mark=m)
|
||||
assert len(errs) == 0
|
||||
|
||||
result_set = await manager.get_cql().run_async(SimpleStatement("select * from ks.test_table",
|
||||
consistency_level=ConsistencyLevel.QUORUM),
|
||||
host=host2, all_pages=True)
|
||||
|
||||
@@ -3,13 +3,14 @@
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
from cassandra.protocol import ConfigurationException
|
||||
from cassandra.protocol import ConfigurationException, InvalidRequest
|
||||
from cassandra.query import SimpleStatement, ConsistencyLevel
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import HTTPError, read_barrier
|
||||
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import wait_for_cql_and_get_hosts
|
||||
from contextlib import nullcontext as does_not_raise
|
||||
import time
|
||||
import pytest
|
||||
import logging
|
||||
@@ -127,12 +128,14 @@ async def test_tablet_rf_change(manager: ManagerClient, direction):
|
||||
|
||||
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from}}}")
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
||||
await cql.run_async("CREATE MATERIALIZED VIEW test.test_mv AS SELECT pk FROM test.test WHERE pk IS NOT NULL PRIMARY KEY (pk)")
|
||||
|
||||
logger.info("Populating table")
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in range(128)])
|
||||
|
||||
async def check_allocated_replica(expected: int):
|
||||
replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
|
||||
replicas = replicas + await get_all_tablet_replicas(manager, servers[0], 'test', 'test_mv', is_view=True)
|
||||
for r in replicas:
|
||||
logger.info(f"{r.replicas}")
|
||||
assert len(r.replicas) == expected
|
||||
@@ -189,6 +192,54 @@ async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClien
|
||||
await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS(test.test) WHERE pk={k}", host=host[0])
|
||||
|
||||
|
||||
# ALTER tablets KS cannot change RF of any DC by more than 1 at a time.
|
||||
# In a multi-dc environment, we can create replicas in a DC that didn't have replicas before,
|
||||
# but the above requirement should still be honoured, because we'd be changing RF from 0 to N in the new DC.
|
||||
# Reproduces https://github.com/scylladb/scylladb/issues/20039#issuecomment-2271365060
|
||||
# See also cql-pytest/test_tablets.py::test_alter_tablet_keyspace_rf for basic scenarios tested
|
||||
@pytest.mark.asyncio
|
||||
async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "enable_tablets": "true"}
|
||||
|
||||
logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
|
||||
# we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC
|
||||
await manager.servers_add(2, config=config, property_file={'dc': f'dc1', 'rack': 'myrack'})
|
||||
await manager.servers_add(2, config=config, property_file={'dc': f'dc2', 'rack': 'myrack'})
|
||||
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("create keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}")
|
||||
# need to create a table so that the ALTER changes not only the schema, but also tablet replicas
|
||||
await cql.run_async("create table ks.t (pk int primary key)")
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
# changing RF of dc2 from 0 to 2 should fail
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 2}")
|
||||
|
||||
# changing RF of dc2 from 0 to 1 should succeed
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 1}")
|
||||
# ensure that RFs of both DCs are equal to 1 now, i.e. that omitting dc1 in above command didn't change it
|
||||
res = await cql.run_async("SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'")
|
||||
assert res[0].replication['dc1'] == '1'
|
||||
assert res[0].replication['dc2'] == '1'
|
||||
|
||||
# incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}")
|
||||
# as above, but decrementing
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 0}")
|
||||
# as above, but decrement 1 RF and increment the other
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 0}")
|
||||
# as above, but RFs are swapped
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 2}")
|
||||
|
||||
# check that we can remove all replicas from dc2 by changing RF from 1 to 0
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 0}")
|
||||
# check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0}")
|
||||
|
||||
|
||||
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
|
||||
# Check that an existing cached read will be cleaned up when the tablet it reads
|
||||
# from is migrated away.
|
||||
@@ -324,3 +375,44 @@ async def test_read_of_pending_replica_during_migration(manager: ManagerClient,
|
||||
|
||||
rows = await cql.run_async("SELECT pk from test.test")
|
||||
assert len(list(rows)) == 1
|
||||
|
||||
|
||||
# This test checks that --enable-tablets option and the TABLETS parameters of the CQL CREATE KEYSPACE
|
||||
# statement are mutually consistent according to the "least surprising behavior" principle. See comments inside
|
||||
# the test code for more details.
|
||||
@pytest.mark.parametrize("with_tablets", [True, False])
|
||||
@pytest.mark.parametrize("replication_strategy", ["NetworkTopologyStrategy", "SimpleStrategy", "EverywhereStrategy", "LocalStrategy"])
|
||||
@pytest.mark.asyncio
|
||||
async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, with_tablets, replication_strategy):
|
||||
cfg = {'enable_tablets': with_tablets}
|
||||
server = await manager.server_add(config=cfg)
|
||||
cql = manager.get_cql()
|
||||
|
||||
# Tablets are only possible when enabled and the replication strategy is NetworkTopology one
|
||||
tablets_possible = (replication_strategy == 'NetworkTopologyStrategy') and with_tablets
|
||||
|
||||
# First, check if a keyspace can be created with the default CQL statement that
|
||||
# doesn't contain tablets parameters. When possible, tablets should be activated
|
||||
await cql.run_async(f"CREATE KEYSPACE test_d WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}};")
|
||||
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = 'test_d'").one()
|
||||
if tablets_possible:
|
||||
assert res.initial_tablets == 0
|
||||
else:
|
||||
assert res is None
|
||||
|
||||
# Next, check that explicit CQL request for enabling tablets can only be satisfied when
|
||||
# tablets are possible. Tablets must be activated in this case
|
||||
if tablets_possible:
|
||||
expectation = does_not_raise()
|
||||
else:
|
||||
expectation = pytest.raises(ConfigurationException)
|
||||
with expectation:
|
||||
await cql.run_async(f"CREATE KEYSPACE test_y WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': true}};")
|
||||
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = 'test_y'").one()
|
||||
assert res.initial_tablets == 0
|
||||
|
||||
# Finally, check that explicitly disabling tablets in CQL results in vnode-based keyspace
|
||||
# regardless of whether tablets are enabled in config
|
||||
await cql.run_async(f"CREATE KEYSPACE test_n WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': false}};")
|
||||
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = 'test_n'").one()
|
||||
assert res is None
|
||||
|
||||
@@ -1222,3 +1222,53 @@ async def test_tablet_storage_freeing(manager: ManagerClient):
|
||||
logger.info("Verify that the table's disk usage on first node shrunk by about half.")
|
||||
size_after = await manager.server_get_sstables_disk_usage(servers[0].server_id, "test", "test")
|
||||
assert size_before * 0.33 < size_after < size_before * 0.66
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_schema_change_during_cleanup(manager: ManagerClient):
|
||||
logger.info("Start first node")
|
||||
servers = [await manager.server_add()]
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
logger.info("Populating table")
|
||||
|
||||
keys = range(256)
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
async def check():
|
||||
logger.info("Checking table")
|
||||
rows = await cql.run_async("SELECT * FROM test.test;")
|
||||
assert rows == expected_rows
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
assert r.c == r.pk
|
||||
|
||||
s1_log = await manager.server_open_log(servers[0].server_id)
|
||||
s1_mark = await s1_log.mark()
|
||||
|
||||
logger.info("Start second node.")
|
||||
servers.append(await manager.server_add())
|
||||
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
||||
|
||||
await inject_error_on(manager, "delay_tablet_compaction_groups_cleanup", servers)
|
||||
|
||||
logger.info("Read system.tablets.")
|
||||
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
|
||||
assert len(tablet_replicas) == 1
|
||||
|
||||
logger.info("Migrating one tablet to another node.")
|
||||
t = tablet_replicas[0]
|
||||
migration_task = asyncio.create_task(
|
||||
manager.api.move_tablet(servers[0].ip_addr, "test", "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))
|
||||
|
||||
logger.info("Waiting for log")
|
||||
await s1_log.wait_for('Initiating tablet cleanup of', from_mark=s1_mark, timeout=120)
|
||||
time.sleep(1)
|
||||
await cql.run_async("ALTER TABLE test.test WITH gc_grace_seconds = 0;")
|
||||
await migration_task
|
||||
|
||||
@@ -42,6 +42,7 @@
|
||||
|
||||
#include "api/scrub_status.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "log.hh"
|
||||
#include "release.hh"
|
||||
@@ -53,6 +54,7 @@
|
||||
#include "utils/pretty_printers.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -1379,6 +1381,12 @@ void removenode_operation(scylla_rest_client& client, const bpo::variables_map&
|
||||
std::unordered_map<sstring, sstring> params{{"host_id", op}};
|
||||
if (vm.contains("ignore-dead-nodes")) {
|
||||
params["ignore_nodes"] = vm["ignore-dead-nodes"].as<sstring>();
|
||||
const auto str_ids = utils::split_comma_separated_list(params["ignore_nodes"]);
|
||||
if (!locator::check_host_ids_contain_only_uuid(str_ids)) {
|
||||
fmt::print(std::cout, "\nWarning: Using IP addresses for '--ignore-dead-nodes' is deprecated and"
|
||||
" will be disabled in a future release. Please use host IDs instead. Run \"nodetool status\" to list all"
|
||||
" node IDs.\n\n");
|
||||
}
|
||||
}
|
||||
client.post("/storage_service/remove_node", std::move(params));
|
||||
}
|
||||
@@ -3802,7 +3810,7 @@ by any means!
|
||||
For more information, see: {}"
|
||||
)", doc_link("operating-scylla/nodetool-commands/removenode.html")),
|
||||
{
|
||||
typed_option<sstring>("ignore-dead-nodes", "Comma-separated list of dead nodes to ignore during removenode"),
|
||||
typed_option<sstring>("ignore-dead-nodes", "Comma-separated list of dead node host IDs to ignore during removenode"),
|
||||
},
|
||||
{
|
||||
typed_option<sstring>("remove-operation", "status|force|$HOST_ID - show status of current node removal, force completion of pending removal, or remove provided ID", 1),
|
||||
|
||||
@@ -79,6 +79,10 @@ private:
|
||||
}
|
||||
return std::unique_ptr<cached_page, cached_page_del>(this);
|
||||
}
|
||||
|
||||
bool only_ref() const {
|
||||
return _use_count <= 1;
|
||||
}
|
||||
public:
|
||||
explicit cached_page(cached_file* parent, page_idx_type idx, temporary_buffer<char> buf)
|
||||
: parent(parent)
|
||||
@@ -115,11 +119,10 @@ private:
|
||||
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
|
||||
}
|
||||
|
||||
// Returns a buffer which reflects contents of this page.
|
||||
// The buffer will not prevent eviction.
|
||||
// Returns a pointer to the contents of the page.
|
||||
// The buffer is invalidated when the page is evicted or when the owning LSA region invalidates references.
|
||||
temporary_buffer<char> get_buf_weak() {
|
||||
return temporary_buffer<char>(_lsa_buf.get(), _lsa_buf.size(), deleter());
|
||||
char* begin() {
|
||||
return _lsa_buf.get();
|
||||
}
|
||||
|
||||
size_t size_in_allocator() {
|
||||
@@ -208,10 +211,11 @@ public:
|
||||
class page_view {
|
||||
cached_page::ptr_type _page;
|
||||
size_t _offset;
|
||||
size_t _size;
|
||||
size_t _size = 0;
|
||||
std::optional<reader_permit::resource_units> _units;
|
||||
public:
|
||||
page_view() = default;
|
||||
|
||||
page_view(size_t offset, size_t size, cached_page::ptr_type page, std::optional<reader_permit::resource_units> units)
|
||||
: _page(std::move(page))
|
||||
, _offset(offset)
|
||||
@@ -219,15 +223,64 @@ public:
|
||||
, _units(std::move(units))
|
||||
{}
|
||||
|
||||
// The returned buffer is valid only until the LSA region associated with cached_file invalidates references.
|
||||
temporary_buffer<char> get_buf() {
|
||||
auto buf = _page->get_buf_weak();
|
||||
buf.trim(_size);
|
||||
buf.trim_front(_offset);
|
||||
return buf;
|
||||
page_view(page_view&& o) noexcept
|
||||
: _page(std::move(o._page))
|
||||
, _offset(std::exchange(o._offset, 0))
|
||||
, _size(std::exchange(o._size, 0))
|
||||
, _units(std::move(o._units))
|
||||
{}
|
||||
|
||||
page_view& operator=(page_view&& o) noexcept {
|
||||
_page = std::move(o._page);
|
||||
_offset = std::exchange(o._offset, 0);
|
||||
_size = std::exchange(o._size, 0);
|
||||
_units = std::move(o._units);
|
||||
return *this;
|
||||
}
|
||||
|
||||
operator bool() const { return bool(_page); }
|
||||
// Fills the page with garbage, releases the pointer and evicts the page so that it's no longer in cache.
|
||||
// For testing use-after-free on the buffer space.
|
||||
// After the call, the object is in the same state as after being moved-from.
|
||||
void release_and_scramble() noexcept {
|
||||
if (_page->only_ref()) {
|
||||
std::memset(_page->_lsa_buf.get(), 0xfe, _page->_lsa_buf.size());
|
||||
cached_page& cp = *_page;
|
||||
_page = nullptr;
|
||||
cp.parent->_lru.remove(cp);
|
||||
cp.on_evicted();
|
||||
} else {
|
||||
_page = nullptr;
|
||||
}
|
||||
_size = 0;
|
||||
_offset = 0;
|
||||
_units = std::nullopt;
|
||||
}
|
||||
|
||||
operator bool() const { return bool(_page) && _size; }
|
||||
public: // ContiguousSharedBuffer concept
|
||||
const char* begin() const { return _page ? _page->begin() + _offset : nullptr; }
|
||||
const char* get() const { return begin(); }
|
||||
const char* end() const { return begin() + _size; }
|
||||
size_t size() const { return _size; }
|
||||
bool empty() const { return !_size; }
|
||||
char* get_write() { return const_cast<char*>(begin()); }
|
||||
|
||||
void trim(size_t pos) {
|
||||
_size = pos;
|
||||
}
|
||||
|
||||
void trim_front(size_t n) {
|
||||
_offset += n;
|
||||
_size -= n;
|
||||
}
|
||||
|
||||
page_view share() {
|
||||
return share(0, _size);
|
||||
}
|
||||
|
||||
page_view share(size_t pos, size_t len) {
|
||||
return page_view(_offset + pos, len, _page->share(), {});
|
||||
}
|
||||
};
|
||||
|
||||
// Generator of subsequent pages of data reflecting the contents of the file.
|
||||
@@ -306,7 +359,7 @@ public:
|
||||
? _cached_file->_last_page_size
|
||||
: page_size;
|
||||
units = get_page_units(page_size);
|
||||
page_view buf(_offset_in_page, size, std::move(page), std::move(units));
|
||||
page_view buf(_offset_in_page, size - _offset_in_page, std::move(page), std::move(units));
|
||||
_offset_in_page = 0;
|
||||
++_page_idx;
|
||||
return buf;
|
||||
|
||||
40
utils/contiguous_shared_buffer.hh
Normal file
40
utils/contiguous_shared_buffer.hh
Normal file
@@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <concepts>
|
||||
#include <memory>
|
||||
|
||||
// A contiguous buffer of char objects which can be trimmed and
|
||||
// supports zero-copy sharing of its underlying memory.
|
||||
template<typename T>
|
||||
concept ContiguousSharedBuffer = std::movable<T>
|
||||
&& std::default_initializable<T>
|
||||
&& requires(T& obj, size_t pos, size_t len) {
|
||||
|
||||
// Creates a new buffer that shares the memory of the original buffer.
|
||||
// The lifetime of the new buffer is independent of the original buffer.
|
||||
{ obj.share() } -> std::same_as<T>;
|
||||
|
||||
// Like share() but the new buffer represents a sub-range of the original buffer.
|
||||
{ obj.share(pos, len) } -> std::same_as<T>;
|
||||
|
||||
// Trims the suffix of a buffer so that 'len' is the index of the first removed byte.
|
||||
{ obj.trim(len) } -> std::same_as<void>;
|
||||
|
||||
// Trims the prefix of the buffer so that `pos` is the index of the first byte after the trim.
|
||||
{ obj.trim_front(pos) } -> std::same_as<void>;
|
||||
|
||||
{ obj.begin() } -> std::same_as<const char*>;
|
||||
{ obj.get() } -> std::same_as<const char*>;
|
||||
{ obj.get_write() } -> std::same_as<char*>;
|
||||
{ obj.end() } -> std::same_as<const char*>;
|
||||
{ obj.size() } -> std::same_as<size_t>;
|
||||
{ obj.empty() } -> std::same_as<bool>;
|
||||
};
|
||||
@@ -17,11 +17,13 @@
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "bytes_ostream.hh"
|
||||
#include "contiguous_shared_buffer.hh"
|
||||
#include "fragment_range.hh"
|
||||
|
||||
/// Fragmented buffer consisting of multiple temporary_buffer<char>
|
||||
class fragmented_temporary_buffer {
|
||||
using vector_type = std::vector<seastar::temporary_buffer<char>>;
|
||||
/// Fragmented buffer consisting of multiple Buffer objects.
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer {
|
||||
using vector_type = std::vector<Buffer>;
|
||||
vector_type _fragments;
|
||||
size_t _size_bytes = 0;
|
||||
public:
|
||||
@@ -30,15 +32,15 @@ public:
|
||||
class view;
|
||||
class istream;
|
||||
class reader;
|
||||
using ostream = seastar::memory_output_stream<vector_type::iterator>;
|
||||
using ostream = seastar::memory_output_stream<typename vector_type::iterator>;
|
||||
|
||||
fragmented_temporary_buffer() = default;
|
||||
basic_fragmented_buffer() = default;
|
||||
|
||||
fragmented_temporary_buffer(std::vector<seastar::temporary_buffer<char>> fragments, size_t size_bytes) noexcept
|
||||
basic_fragmented_buffer(std::vector<Buffer> fragments, size_t size_bytes) noexcept
|
||||
: _fragments(std::move(fragments)), _size_bytes(size_bytes)
|
||||
{ }
|
||||
|
||||
fragmented_temporary_buffer(const char* str, size_t size)
|
||||
basic_fragmented_buffer(const char* str, size_t size)
|
||||
{
|
||||
*this = allocate_to_fit(size);
|
||||
size_t pos = 0;
|
||||
@@ -54,10 +56,10 @@ public:
|
||||
|
||||
ostream get_ostream() noexcept {
|
||||
if (_fragments.size() != 1) {
|
||||
return ostream::fragmented(_fragments.begin(), _size_bytes);
|
||||
return typename ostream::fragmented(_fragments.begin(), _size_bytes);
|
||||
}
|
||||
auto& current = *_fragments.begin();
|
||||
return ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
|
||||
return typename ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
|
||||
}
|
||||
|
||||
using const_fragment_iterator = typename vector_type::const_iterator;
|
||||
@@ -100,23 +102,23 @@ public:
|
||||
_fragments.erase(it.base(), _fragments.end());
|
||||
}
|
||||
|
||||
// Creates a fragmented temporary buffer of a specified size, supplied as a parameter.
|
||||
// Creates a fragmented buffer of a specified size, supplied as a parameter.
|
||||
// Max chunk size is limited to 128kb (the same limit as `bytes_stream` has).
|
||||
static fragmented_temporary_buffer allocate_to_fit(size_t data_size) {
|
||||
static basic_fragmented_buffer allocate_to_fit(size_t data_size) {
|
||||
constexpr size_t max_fragment_size = default_fragment_size; // 128KB
|
||||
|
||||
const size_t full_fragment_count = data_size / max_fragment_size; // number of max-sized fragments
|
||||
const size_t last_fragment_size = data_size % max_fragment_size;
|
||||
|
||||
std::vector<seastar::temporary_buffer<char>> fragments;
|
||||
std::vector<Buffer> fragments;
|
||||
fragments.reserve(full_fragment_count + !!last_fragment_size);
|
||||
for (size_t i = 0; i < full_fragment_count; ++i) {
|
||||
fragments.emplace_back(seastar::temporary_buffer<char>(max_fragment_size));
|
||||
fragments.emplace_back(Buffer(max_fragment_size));
|
||||
}
|
||||
if (last_fragment_size) {
|
||||
fragments.emplace_back(seastar::temporary_buffer<char>(last_fragment_size));
|
||||
fragments.emplace_back(Buffer(last_fragment_size));
|
||||
}
|
||||
return fragmented_temporary_buffer(std::move(fragments), data_size);
|
||||
return basic_fragmented_buffer(std::move(fragments), data_size);
|
||||
}
|
||||
|
||||
vector_type release() && noexcept {
|
||||
@@ -124,7 +126,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class fragmented_temporary_buffer::view {
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer<Buffer>::view {
|
||||
vector_type::const_iterator _current;
|
||||
const char* _current_position = nullptr;
|
||||
size_t _current_size = 0;
|
||||
@@ -252,7 +255,7 @@ public:
|
||||
_current_size = std::min(_current_size, _total_size);
|
||||
}
|
||||
|
||||
bool operator==(const fragmented_temporary_buffer::view& other) const noexcept {
|
||||
bool operator==(const basic_fragmented_buffer::view& other) const noexcept {
|
||||
auto this_it = begin();
|
||||
auto other_it = other.begin();
|
||||
|
||||
@@ -285,10 +288,14 @@ public:
|
||||
return this_it == end() && other_it == other.end();
|
||||
}
|
||||
};
|
||||
|
||||
using fragmented_temporary_buffer = basic_fragmented_buffer<temporary_buffer<char>>;
|
||||
|
||||
static_assert(FragmentRange<fragmented_temporary_buffer::view>);
|
||||
static_assert(FragmentedView<fragmented_temporary_buffer::view>);
|
||||
|
||||
inline fragmented_temporary_buffer::operator view() const noexcept
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
inline basic_fragmented_buffer<Buffer>::operator view() const noexcept
|
||||
{
|
||||
if (!_size_bytes) {
|
||||
return view();
|
||||
@@ -305,7 +312,8 @@ concept ExceptionThrower = requires(T obj, size_t n) {
|
||||
|
||||
}
|
||||
|
||||
class fragmented_temporary_buffer::istream {
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer<Buffer>::istream {
|
||||
vector_type::const_iterator _current;
|
||||
const char* _current_position;
|
||||
const char* _current_end;
|
||||
@@ -465,29 +473,32 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline fragmented_temporary_buffer::istream fragmented_temporary_buffer::get_istream() const noexcept // allow empty (ut for that)
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
inline basic_fragmented_buffer<Buffer>::istream basic_fragmented_buffer<Buffer>::get_istream() const noexcept // allow empty (ut for that)
|
||||
{
|
||||
return istream(_fragments, _size_bytes);
|
||||
}
|
||||
|
||||
class fragmented_temporary_buffer::reader {
|
||||
std::vector<temporary_buffer<char>> _fragments;
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
class basic_fragmented_buffer<Buffer>::reader {
|
||||
using FragBuffer = basic_fragmented_buffer<Buffer>;
|
||||
FragBuffer::vector_type _fragments;
|
||||
size_t _left = 0;
|
||||
public:
|
||||
future<fragmented_temporary_buffer> read_exactly(input_stream<char>& in, size_t length) {
|
||||
_fragments = std::vector<temporary_buffer<char>>();
|
||||
future<FragBuffer> read_exactly(input_stream<char>& in, size_t length) {
|
||||
_fragments = FragBuffer::vector_type();
|
||||
_left = length;
|
||||
return repeat_until_value([this, length, &in] {
|
||||
if (!_left) {
|
||||
return make_ready_future<std::optional<fragmented_temporary_buffer>>(fragmented_temporary_buffer(std::move(_fragments), length));
|
||||
return make_ready_future<std::optional<FragBuffer>>(FragBuffer(std::move(_fragments), length));
|
||||
}
|
||||
return in.read_up_to(_left).then([this] (temporary_buffer<char> buf) {
|
||||
if (buf.empty()) {
|
||||
return std::make_optional(fragmented_temporary_buffer());
|
||||
return std::make_optional(FragBuffer());
|
||||
}
|
||||
_left -= buf.size();
|
||||
_fragments.emplace_back(std::move(buf));
|
||||
return std::optional<fragmented_temporary_buffer>();
|
||||
_fragments.emplace_back(Buffer(std::move(buf)));
|
||||
return std::optional<FragBuffer>();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -495,7 +506,8 @@ public:
|
||||
|
||||
// The operator below is used only for logging
|
||||
|
||||
inline std::ostream& operator<<(std::ostream& out, const fragmented_temporary_buffer::view& v) {
|
||||
template <ContiguousSharedBuffer Buffer>
|
||||
inline std::ostream& operator<<(std::ostream& out, const typename basic_fragmented_buffer<Buffer>::view& v) {
|
||||
for (bytes_view frag : fragment_range(v)) {
|
||||
out << to_hex(frag);
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/base64.hh"
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
@@ -51,12 +50,12 @@ public:
|
||||
|
||||
// rapidjson configuration macros
|
||||
#define RAPIDJSON_HAS_STDSTRING 1
|
||||
// Default rjson policy is to use SCYLLA_ASSERT() - which is dangerous for two reasons:
|
||||
// 1. SCYLLA_ASSERT() can be turned off with -DNDEBUG
|
||||
// 2. SCYLLA_ASSERT() crashes a program
|
||||
// Default rjson policy is to use assert() - which is dangerous for two reasons:
|
||||
// 1. assert() can be turned off with -DNDEBUG
|
||||
// 2. assert() crashes a program
|
||||
// Fortunately, the default policy can be overridden, and so rapidjson errors will
|
||||
// throw an rjson::error exception instead.
|
||||
#define RAPIDJSON_ASSERT(x) do { if (!(x)) throw rjson::error(fmt::format("JSON SCYLLA_ASSERT failed on condition '{}', at: {}", #x, current_backtrace_tasklocal())); } while (0)
|
||||
#define RAPIDJSON_ASSERT(x) do { if (!(x)) throw rjson::error(fmt::format("JSON assert failed on condition '{}', at: {}", #x, current_backtrace_tasklocal())); } while (0)
|
||||
// This macro is used for functions which are called for every json char making it
|
||||
// quite costly if not inlined, by default rapidjson only enables it if NDEBUG
|
||||
// is defined which isn't the case for us.
|
||||
|
||||
Reference in New Issue
Block a user