Compare commits
122 Commits
copilot/fi
...
scylla-4.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c43a0dc29 | ||
|
|
8399aac6bc | ||
|
|
b1a70d0ad4 | ||
|
|
2251a1c577 | ||
|
|
f8c7c485d2 | ||
|
|
d60bed1953 | ||
|
|
259203a394 | ||
|
|
5f284633d4 | ||
|
|
66cc4be8f6 | ||
|
|
9ca6aa5535 | ||
|
|
6e63db8c72 | ||
|
|
803da18727 | ||
|
|
165d89860e | ||
|
|
4a5116a0ae | ||
|
|
6d9ff622df | ||
|
|
65bc33c921 | ||
|
|
5e90f06ca2 | ||
|
|
2036de3245 | ||
|
|
0924e4d92f | ||
|
|
b8313775c5 | ||
|
|
ec0002a67f | ||
|
|
ebdf5f9e55 | ||
|
|
32c0e4f110 | ||
|
|
5f48444a98 | ||
|
|
8930ea5407 | ||
|
|
311cd6403c | ||
|
|
b71821435a | ||
|
|
cd29e2643c | ||
|
|
59aa1834a7 | ||
|
|
436b305286 | ||
|
|
1d85051e8d | ||
|
|
3f52d8733b | ||
|
|
eece444547 | ||
|
|
2ab51c4055 | ||
|
|
4a1a1feb55 | ||
|
|
76995933e0 | ||
|
|
f840263fdd | ||
|
|
b4887ce4a5 | ||
|
|
849e12bf2e | ||
|
|
f124f97f99 | ||
|
|
4ee0b489cf | ||
|
|
382dcb9d34 | ||
|
|
07b7df9171 | ||
|
|
7fa3a988e3 | ||
|
|
7b23574224 | ||
|
|
ac207c892b | ||
|
|
a023b3bb7a | ||
|
|
0b9db42d9c | ||
|
|
df8d4482c5 | ||
|
|
442d7bf9ff | ||
|
|
bc6422d16d | ||
|
|
76f4bc4c6f | ||
|
|
dc4efb0a1e | ||
|
|
f699d23f0b | ||
|
|
d5e5a6fe48 | ||
|
|
5a43c6ec81 | ||
|
|
2aae8bb206 | ||
|
|
c206399379 | ||
|
|
787b324916 | ||
|
|
dfe90a69f5 | ||
|
|
d03d6f41c2 | ||
|
|
0e86f1bf66 | ||
|
|
392a007b3a | ||
|
|
254b898cd8 | ||
|
|
6fb84ed7e0 | ||
|
|
9002592ee0 | ||
|
|
5d6a7272e7 | ||
|
|
96625fa54b | ||
|
|
4f5f404619 | ||
|
|
cd4502ee64 | ||
|
|
3e6c6d5f58 | ||
|
|
564b4c32b0 | ||
|
|
dfafc4e1a9 | ||
|
|
db286c5ca4 | ||
|
|
519fcd4729 | ||
|
|
9bcbcbbcf2 | ||
|
|
c622e5bfab | ||
|
|
905643bbc2 | ||
|
|
d396a298d6 | ||
|
|
1d9bbbc957 | ||
|
|
4f1878803e | ||
|
|
c5e2fad1c8 | ||
|
|
abd0fa52c0 | ||
|
|
dfa464c35b | ||
|
|
be29b35c4b | ||
|
|
97b7024c0c | ||
|
|
194ff1d226 | ||
|
|
b8f7fb35e1 | ||
|
|
f7d53ff607 | ||
|
|
eb190643f8 | ||
|
|
3f8345f1b8 | ||
|
|
891a3fa243 | ||
|
|
db31542805 | ||
|
|
b443b2574a | ||
|
|
2ee321d88e | ||
|
|
4563f4b992 | ||
|
|
81dc8eeec7 | ||
|
|
2d72f7d8e5 | ||
|
|
c6ee86b512 | ||
|
|
67348cd6e8 | ||
|
|
44cc4843f1 | ||
|
|
f1f5586bf6 | ||
|
|
3a447cd755 | ||
|
|
176aa91be5 | ||
|
|
4a3eff17ff | ||
|
|
2e00f6d0a1 | ||
|
|
bf509c3b16 | ||
|
|
84ef30752f | ||
|
|
f1b71ec216 | ||
|
|
93ed536fba | ||
|
|
ab3da4510c | ||
|
|
bb8fcbff68 | ||
|
|
af43d0c62d | ||
|
|
8c8c266f67 | ||
|
|
6d1301d93c | ||
|
|
be545d6d5d | ||
|
|
a1c15f0690 | ||
|
|
4d68c53389 | ||
|
|
7d1f352be2 | ||
|
|
0fe5335447 | ||
|
|
8a026b8b14 | ||
|
|
0760107b9f |
5
.gitmodules
vendored
5
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
@@ -12,3 +12,6 @@
|
||||
[submodule "zstd"]
|
||||
path = zstd
|
||||
url = ../zstd
|
||||
[submodule "abseil"]
|
||||
path = abseil
|
||||
url = ../abseil-cpp
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.1.8
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
1
abseil
Submodule
1
abseil
Submodule
Submodule abseil added at 2069dc796a
@@ -129,7 +129,7 @@ future<std::string> get_key_from_roles(cql3::query_processor& qp, std::string us
|
||||
auth::meta::roles_table::qualified_name(), auth::meta::roles_table::role_col_name);
|
||||
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
auto timeout = auth::internal_distributed_timeout_config();
|
||||
auto& timeout = auth::internal_distributed_timeout_config();
|
||||
return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
auto res = f.get0();
|
||||
auto salted_hash = std::optional<sstring>();
|
||||
|
||||
@@ -141,6 +141,11 @@ struct nonempty : public size_check {
|
||||
|
||||
// Check that array has the expected number of elements
|
||||
static void verify_operand_count(const rjson::value* array, const size_check& expected, const rjson::value& op) {
|
||||
if (!array && expected(0)) {
|
||||
// If expected() allows an empty AttributeValueList, it is also fine
|
||||
// that it is missing.
|
||||
return;
|
||||
}
|
||||
if (!array || !array->IsArray()) {
|
||||
throw api_error("ValidationException", "With ComparisonOperator, AttributeValueList must be given and an array");
|
||||
}
|
||||
@@ -365,31 +370,35 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
|
||||
|
||||
struct cmp_lt {
|
||||
template <typename T> bool operator()(const T& lhs, const T& rhs) const { return lhs < rhs; }
|
||||
// We cannot use the normal comparison operators like "<" on the bytes
|
||||
// type, because they treat individual bytes as signed but we need to
|
||||
// compare them as *unsigned*. So we need a specialization for bytes.
|
||||
bool operator()(const bytes& lhs, const bytes& rhs) const { return compare_unsigned(lhs, rhs) < 0; }
|
||||
static constexpr const char* diagnostic = "LT operator";
|
||||
};
|
||||
|
||||
struct cmp_le {
|
||||
// bytes only has <, so we cannot use <=.
|
||||
template <typename T> bool operator()(const T& lhs, const T& rhs) const { return lhs < rhs || lhs == rhs; }
|
||||
template <typename T> bool operator()(const T& lhs, const T& rhs) const { return lhs <= rhs; }
|
||||
bool operator()(const bytes& lhs, const bytes& rhs) const { return compare_unsigned(lhs, rhs) <= 0; }
|
||||
static constexpr const char* diagnostic = "LE operator";
|
||||
};
|
||||
|
||||
struct cmp_ge {
|
||||
// bytes only has <, so we cannot use >=.
|
||||
template <typename T> bool operator()(const T& lhs, const T& rhs) const { return rhs < lhs || lhs == rhs; }
|
||||
template <typename T> bool operator()(const T& lhs, const T& rhs) const { return lhs >= rhs; }
|
||||
bool operator()(const bytes& lhs, const bytes& rhs) const { return compare_unsigned(lhs, rhs) >= 0; }
|
||||
static constexpr const char* diagnostic = "GE operator";
|
||||
};
|
||||
|
||||
struct cmp_gt {
|
||||
// bytes only has <, so we cannot use >.
|
||||
template <typename T> bool operator()(const T& lhs, const T& rhs) const { return rhs < lhs; }
|
||||
template <typename T> bool operator()(const T& lhs, const T& rhs) const { return lhs > rhs; }
|
||||
bool operator()(const bytes& lhs, const bytes& rhs) const { return compare_unsigned(lhs, rhs) > 0; }
|
||||
static constexpr const char* diagnostic = "GT operator";
|
||||
};
|
||||
|
||||
// True if v is between lb and ub, inclusive. Throws if lb > ub.
|
||||
template <typename T>
|
||||
bool check_BETWEEN(const T& v, const T& lb, const T& ub) {
|
||||
if (ub < lb) {
|
||||
if (cmp_lt()(ub, lb)) {
|
||||
throw api_error("ValidationException",
|
||||
format("BETWEEN operator requires lower_bound <= upper_bound, but {} > {}", lb, ub));
|
||||
}
|
||||
|
||||
@@ -573,29 +573,66 @@ static bool validate_legal_tag_chars(std::string_view tag) {
|
||||
return std::all_of(tag.begin(), tag.end(), &is_legal_tag_char);
|
||||
}
|
||||
|
||||
static const std::unordered_set<std::string_view> allowed_write_isolation_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
|
||||
static void validate_tags(const std::map<sstring, sstring>& tags) {
|
||||
static const std::unordered_set<std::string_view> allowed_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
auto it = tags.find(rmw_operation::WRITE_ISOLATION_TAG_KEY);
|
||||
if (it != tags.end()) {
|
||||
std::string_view value = it->second;
|
||||
elogger.warn("Allowed values count {} {}", value, allowed_values.count(value));
|
||||
if (allowed_values.count(value) == 0) {
|
||||
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw api_error("ValidationException",
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_values));
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_write_isolation_values));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static rmw_operation::write_isolation parse_write_isolation(std::string_view value) {
|
||||
if (!value.empty()) {
|
||||
switch (value[0]) {
|
||||
case 'f':
|
||||
return rmw_operation::write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return rmw_operation::write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return rmw_operation::write_isolation::UNSAFE_RMW;
|
||||
}
|
||||
}
|
||||
// Shouldn't happen as validate_tags() / set_default_write_isolation()
|
||||
// verify allow only a closed set of values.
|
||||
return rmw_operation::default_write_isolation;
|
||||
|
||||
}
|
||||
// This default_write_isolation is always overwritten in main.cc, which calls
|
||||
// set_default_write_isolation().
|
||||
rmw_operation::write_isolation rmw_operation::default_write_isolation =
|
||||
rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
void rmw_operation::set_default_write_isolation(std::string_view value) {
|
||||
if (value.empty()) {
|
||||
throw std::runtime_error("When Alternator is enabled, write "
|
||||
"isolation policy must be selected, using the "
|
||||
"'--alternator-write-isolation' option. "
|
||||
"See docs/alternator/alternator.md for instructions.");
|
||||
}
|
||||
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw std::runtime_error(format("Invalid --alternator-write-isolation "
|
||||
"setting '{}'. Allowed values: {}.",
|
||||
value, allowed_write_isolation_values));
|
||||
}
|
||||
default_write_isolation = parse_write_isolation(value);
|
||||
}
|
||||
|
||||
// FIXME: Updating tags currently relies on updating schema, which may be subject
|
||||
// to races during concurrent updates of the same table. Once Scylla schema updates
|
||||
// are fixed, this issue will automatically get fixed as well.
|
||||
enum class update_tags_action { add_tags, delete_tags };
|
||||
static future<> update_tags(const rjson::value& tags, schema_ptr schema, std::map<sstring, sstring>&& tags_map, update_tags_action action) {
|
||||
static future<> update_tags(service::migration_manager& mm, const rjson::value& tags, schema_ptr schema, std::map<sstring, sstring>&& tags_map, update_tags_action action) {
|
||||
if (action == update_tags_action::add_tags) {
|
||||
for (auto it = tags.Begin(); it != tags.End(); ++it) {
|
||||
const rjson::value& key = (*it)["Key"];
|
||||
@@ -622,24 +659,12 @@ static future<> update_tags(const rjson::value& tags, schema_ptr schema, std::ma
|
||||
}
|
||||
validate_tags(tags_map);
|
||||
|
||||
std::stringstream serialized_tags;
|
||||
serialized_tags << '{';
|
||||
for (auto& tag_entry : tags_map) {
|
||||
serialized_tags << format("'{}':'{}',", tag_entry.first, tag_entry.second);
|
||||
}
|
||||
std::string serialized_tags_str = serialized_tags.str();
|
||||
if (!tags_map.empty()) {
|
||||
serialized_tags_str[serialized_tags_str.size() - 1] = '}'; // trims the last ',' delimiter
|
||||
} else {
|
||||
serialized_tags_str.push_back('}');
|
||||
}
|
||||
|
||||
sstring req = format("ALTER TABLE \"{}\".\"{}\" WITH {} = {}",
|
||||
schema->ks_name(), schema->cf_name(), tags_extension::NAME, serialized_tags_str);
|
||||
return db::execute_cql(std::move(req)).discard_result();
|
||||
schema_builder builder(schema);
|
||||
builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared<tags_extension>(std::move(tags_map))}});
|
||||
return mm.announce_column_family_update(builder.build(), false, std::vector<view_ptr>(), false);
|
||||
}
|
||||
|
||||
static future<> add_tags(service::storage_proxy& proxy, schema_ptr schema, rjson::value& request_info) {
|
||||
static future<> add_tags(service::migration_manager& mm, service::storage_proxy& proxy, schema_ptr schema, rjson::value& request_info) {
|
||||
const rjson::value* tags = rjson::find(request_info, "Tags");
|
||||
if (!tags || !tags->IsArray()) {
|
||||
return make_exception_future<>(api_error("ValidationException", format("Cannot parse tags")));
|
||||
@@ -649,7 +674,7 @@ static future<> add_tags(service::storage_proxy& proxy, schema_ptr schema, rjson
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
|
||||
return update_tags(rjson::copy(*tags), schema, std::move(tags_map), update_tags_action::add_tags);
|
||||
return update_tags(mm, rjson::copy(*tags), schema, std::move(tags_map), update_tags_action::add_tags);
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
@@ -661,7 +686,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
|
||||
return api_error("AccessDeniedException", "Incorrect resource identifier");
|
||||
}
|
||||
schema_ptr schema = get_table_from_arn(_proxy, std::string_view(arn->GetString(), arn->GetStringLength()));
|
||||
add_tags(_proxy, schema, request).get();
|
||||
add_tags(_mm, _proxy, schema, request).get();
|
||||
return json_string("");
|
||||
});
|
||||
}
|
||||
@@ -682,7 +707,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
|
||||
schema_ptr schema = get_table_from_arn(_proxy, std::string_view(arn->GetString(), arn->GetStringLength()));
|
||||
|
||||
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
|
||||
update_tags(*tags, schema, std::move(tags_map), update_tags_action::delete_tags).get();
|
||||
update_tags(_mm, *tags, schema, std::move(tags_map), update_tags_action::delete_tags).get();
|
||||
return json_string("");
|
||||
});
|
||||
}
|
||||
@@ -710,6 +735,17 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
|
||||
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
|
||||
}
|
||||
|
||||
static future<> wait_for_schema_agreement(db::timeout_clock::time_point deadline) {
|
||||
return do_until([deadline] {
|
||||
if (db::timeout_clock::now() > deadline) {
|
||||
throw std::runtime_error("Unable to reach schema agreement");
|
||||
}
|
||||
return service::get_local_migration_manager().have_schema_agreement();
|
||||
}, [] {
|
||||
return seastar::sleep(500ms);
|
||||
});
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
@@ -903,9 +939,11 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
}).then([this, table_info = std::move(table_info), schema] () mutable {
|
||||
future<> f = make_ready_future<>();
|
||||
if (rjson::find(table_info, "Tags")) {
|
||||
f = add_tags(_proxy, schema, table_info);
|
||||
f = add_tags(_mm, _proxy, schema, table_info);
|
||||
}
|
||||
return f.then([table_info = std::move(table_info), schema] () mutable {
|
||||
return f.then([] {
|
||||
return wait_for_schema_agreement(db::timeout_clock::now() + 10s);
|
||||
}).then([table_info = std::move(table_info), schema] () mutable {
|
||||
rjson::value status = rjson::empty_object();
|
||||
supplement_table_info(table_info, *schema);
|
||||
rjson::set(status, "TableDescription", std::move(table_info));
|
||||
@@ -933,15 +971,24 @@ class attribute_collector {
|
||||
void add(bytes&& name, atomic_cell&& cell) {
|
||||
collected.emplace(std::move(name), std::move(cell));
|
||||
}
|
||||
void add(const bytes& name, atomic_cell&& cell) {
|
||||
collected.emplace(name, std::move(cell));
|
||||
}
|
||||
public:
|
||||
attribute_collector() : collected(attrs_type()->get_keys_type()->as_less_comparator()) { }
|
||||
void put(bytes&& name, bytes&& val, api::timestamp_type ts) {
|
||||
add(std::move(name), atomic_cell::make_live(*bytes_type, ts, std::move(val), atomic_cell::collection_member::yes));
|
||||
void put(bytes&& name, const bytes& val, api::timestamp_type ts) {
|
||||
add(std::move(name), atomic_cell::make_live(*bytes_type, ts, val, atomic_cell::collection_member::yes));
|
||||
|
||||
}
|
||||
void put(const bytes& name, const bytes& val, api::timestamp_type ts) {
|
||||
add(name, atomic_cell::make_live(*bytes_type, ts, val, atomic_cell::collection_member::yes));
|
||||
}
|
||||
void del(bytes&& name, api::timestamp_type ts) {
|
||||
add(std::move(name), atomic_cell::make_dead(ts, gc_clock::now()));
|
||||
}
|
||||
void del(const bytes& name, api::timestamp_type ts) {
|
||||
add(name, atomic_cell::make_dead(ts, gc_clock::now()));
|
||||
}
|
||||
collection_mutation_description to_mut() {
|
||||
collection_mutation_description ret;
|
||||
for (auto&& e : collected) {
|
||||
@@ -1021,7 +1068,7 @@ public:
|
||||
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item);
|
||||
// put_or_delete_item doesn't keep a reference to schema (so it can be
|
||||
// moved between shards for LWT) so it needs to be given again to build():
|
||||
mutation build(schema_ptr schema, api::timestamp_type ts);
|
||||
mutation build(schema_ptr schema, api::timestamp_type ts) const;
|
||||
const partition_key& pk() const { return _pk; }
|
||||
const clustering_key& ck() const { return _ck; }
|
||||
};
|
||||
@@ -1050,7 +1097,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
}
|
||||
}
|
||||
|
||||
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) const {
|
||||
mutation m(schema, _pk);
|
||||
// If there's no clustering key, a tombstone should be created directly
|
||||
// on a partition, not on a clustering row - otherwise it will look like
|
||||
@@ -1072,7 +1119,7 @@ mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||
for (auto& c : *_cells) {
|
||||
const column_definition* cdef = schema->get_column_definition(c.column_name);
|
||||
if (!cdef) {
|
||||
attrs_collector.put(std::move(c.column_name), std::move(c.value), ts);
|
||||
attrs_collector.put(c.column_name, c.value, ts);
|
||||
} else {
|
||||
row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, std::move(c.value)));
|
||||
}
|
||||
@@ -1195,22 +1242,9 @@ rmw_operation::write_isolation rmw_operation::get_write_isolation_for_schema(sch
|
||||
const auto& tags = get_tags_of_table(schema);
|
||||
auto it = tags.find(WRITE_ISOLATION_TAG_KEY);
|
||||
if (it == tags.end() || it->second.empty()) {
|
||||
// By default, fall back to always enforcing LWT
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
}
|
||||
switch (it->second[0]) {
|
||||
case 'f':
|
||||
return write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return write_isolation::UNSAFE_RMW;
|
||||
default:
|
||||
// In case of an incorrect tag, fall back to the safest option: LWT_ALWAYS
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
return default_write_isolation;
|
||||
}
|
||||
return parse_write_isolation(it->second);
|
||||
}
|
||||
|
||||
// shard_for_execute() checks whether execute() must be called on a specific
|
||||
@@ -1241,11 +1275,6 @@ std::optional<shard_id> rmw_operation::shard_for_execute(bool needs_read_before_
|
||||
// PutItem, DeleteItem). All these return nothing by default, but can
|
||||
// optionally return Attributes if requested via the ReturnValues option.
|
||||
static future<executor::request_return_type> rmw_operation_return(rjson::value&& attributes) {
|
||||
// As an optimization, in the simple and common case that nothing is to be
|
||||
// returned, quickly return an empty result:
|
||||
if (attributes.IsNull()) {
|
||||
return make_ready_future<executor::request_return_type>(json_string(""));
|
||||
}
|
||||
rjson::value ret = rjson::empty_object();
|
||||
if (!attributes.IsNull()) {
|
||||
rjson::set(ret, "Attributes", std::move(attributes));
|
||||
@@ -1261,7 +1290,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
stats& stats) {
|
||||
if (needs_read_before_write) {
|
||||
if (_write_isolation == write_isolation::FORBID_RMW) {
|
||||
throw api_error("ValidationException", "Read-modify-write operations not supported");
|
||||
throw api_error("ValidationException", "Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
|
||||
}
|
||||
stats.reads_before_write++;
|
||||
if (_write_isolation == write_isolation::UNSAFE_RMW) {
|
||||
@@ -1370,7 +1399,7 @@ public:
|
||||
check_needs_read_before_write(_condition_expression) ||
|
||||
_returnvalues == returnvalues::ALL_OLD;
|
||||
}
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) override {
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const override {
|
||||
std::unordered_set<std::string> used_attribute_values;
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
if (!verify_expected(_request, previous_item) ||
|
||||
@@ -1382,6 +1411,7 @@ public:
|
||||
// efficient than throwing an exception.
|
||||
return {};
|
||||
}
|
||||
_return_attributes = {};
|
||||
if (_returnvalues == returnvalues::ALL_OLD && previous_item) {
|
||||
// previous_item is supposed to have been created with
|
||||
// describe_item(), so has the "Item" attribute:
|
||||
@@ -1448,7 +1478,7 @@ public:
|
||||
check_needs_read_before_write(_condition_expression) ||
|
||||
_returnvalues == returnvalues::ALL_OLD;
|
||||
}
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) override {
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const override {
|
||||
std::unordered_set<std::string> used_attribute_values;
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
if (!verify_expected(_request, previous_item) ||
|
||||
@@ -1460,6 +1490,7 @@ public:
|
||||
// efficient than throwing an exception.
|
||||
return {};
|
||||
}
|
||||
_return_attributes = {};
|
||||
if (_returnvalues == returnvalues::ALL_OLD && previous_item) {
|
||||
rjson::value* item = rjson::find(*previous_item, "Item");
|
||||
if (item) {
|
||||
@@ -1543,7 +1574,7 @@ public:
|
||||
virtual ~put_or_delete_item_cas_request() = default;
|
||||
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts) override {
|
||||
std::optional<mutation> ret;
|
||||
for (put_or_delete_item& mutation_builder : _mutation_builders) {
|
||||
for (const put_or_delete_item& mutation_builder : _mutation_builders) {
|
||||
// We assume all these builders have the same partition.
|
||||
if (ret) {
|
||||
ret->apply(mutation_builder.build(schema, ts));
|
||||
@@ -2367,7 +2398,7 @@ public:
|
||||
|
||||
update_item_operation(service::storage_proxy& proxy, rjson::value&& request);
|
||||
virtual ~update_item_operation() = default;
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) override;
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const override;
|
||||
bool needs_read_before_write() const;
|
||||
};
|
||||
|
||||
@@ -2431,7 +2462,7 @@ update_item_operation::needs_read_before_write() const {
|
||||
}
|
||||
|
||||
std::optional<mutation>
|
||||
update_item_operation::apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) {
|
||||
update_item_operation::apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const {
|
||||
std::unordered_set<std::string> used_attribute_values;
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
if (!verify_expected(_request, previous_item) ||
|
||||
@@ -2811,6 +2842,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
[] (std::vector<std::tuple<std::string, std::optional<rjson::value>>> responses) {
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::set(response, "Responses", rjson::empty_object());
|
||||
rjson::set(response, "UnprocessedKeys", rjson::empty_object());
|
||||
for (auto& t : responses) {
|
||||
if (!response["Responses"].HasMember(std::get<0>(t).c_str())) {
|
||||
rjson::set_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array());
|
||||
@@ -3080,7 +3112,7 @@ static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_d
|
||||
if (attrs.Size() != 1) {
|
||||
throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs));
|
||||
}
|
||||
bytes raw_value = pk_cdef.type->from_string(attrs[0][type_to_string(pk_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], pk_cdef);
|
||||
partition_key pk = partition_key::from_singular(*schema, pk_cdef.type->deserialize(raw_value));
|
||||
auto decorated_key = dht::decorate_key(*schema, pk);
|
||||
if (op != comparison_operator_type::EQ) {
|
||||
@@ -3105,7 +3137,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (attrs.Size() != expected_attrs_size) {
|
||||
throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs));
|
||||
}
|
||||
bytes raw_value = ck_cdef.type->from_string(attrs[0][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], ck_cdef);
|
||||
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
|
||||
switch (op) {
|
||||
case comparison_operator_type::EQ:
|
||||
@@ -3119,7 +3151,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
case comparison_operator_type::GT:
|
||||
return query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false));
|
||||
case comparison_operator_type::BETWEEN: {
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(attrs[1][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_upper_limit = get_key_from_typed_value(attrs[1], ck_cdef);
|
||||
clustering_key upper_limit = clustering_key::from_single_value(*schema, raw_upper_limit);
|
||||
return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(upper_limit));
|
||||
}
|
||||
@@ -3132,9 +3164,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (!ck_cdef.type->is_compatible_with(*utf8_type)) {
|
||||
throw api_error("ValidationException", format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type)));
|
||||
}
|
||||
std::string raw_upper_limit_str = attrs[0][type_to_string(ck_cdef.type)].GetString();
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(raw_upper_limit_str);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_upper_limit), ck, schema, ck_cdef.type);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_value), ck, schema, ck_cdef.type);
|
||||
}
|
||||
default:
|
||||
throw api_error("ValidationException", format("Unknown primary key bound passed: {}", int(op)));
|
||||
|
||||
@@ -63,6 +63,10 @@ public:
|
||||
|
||||
static write_isolation get_write_isolation_for_schema(schema_ptr schema);
|
||||
|
||||
static write_isolation default_write_isolation;
|
||||
public:
|
||||
static void set_default_write_isolation(std::string_view mode);
|
||||
|
||||
protected:
|
||||
// The full request JSON
|
||||
rjson::value _request;
|
||||
@@ -83,7 +87,11 @@ protected:
|
||||
// When _returnvalues != NONE, apply() should store here, in JSON form,
|
||||
// the values which are to be returned in the "Attributes" field.
|
||||
// The default null JSON means do not return an Attributes field at all.
|
||||
rjson::value _return_attributes;
|
||||
// This field is marked "mutable" so that the const apply() can modify
|
||||
// it (see explanation below), but note that because apply() may be
|
||||
// called more than once, if apply() will sometimes set this field it
|
||||
// must set it (even if just to the default empty value) every time.
|
||||
mutable rjson::value _return_attributes;
|
||||
public:
|
||||
// The constructor of a rmw_operation subclass should parse the request
|
||||
// and try to discover as many input errors as it can before really
|
||||
@@ -96,7 +104,12 @@ public:
|
||||
// conditional expression, apply() should return an empty optional.
|
||||
// apply() may throw if it encounters input errors not discovered during
|
||||
// the constructor.
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) = 0;
|
||||
// apply() may be called more than once in case of contention, so it must
|
||||
// not change the state saved in the object (issue #7218 was caused by
|
||||
// violating this). We mark apply() "const" to let the compiler validate
|
||||
// this for us. The output-only field _return_attributes is marked
|
||||
// "mutable" above so that apply() can still write to it.
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const = 0;
|
||||
// Convert the above apply() into the signature needed by cas_request:
|
||||
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts) override;
|
||||
virtual ~rmw_operation() = default;
|
||||
|
||||
@@ -54,26 +54,22 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) {
|
||||
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
|
||||
}
|
||||
|
||||
static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
|
||||
std::vector<ss::token_range> res;
|
||||
for (auto d : service::get_local_storage_service().describe_ring(keyspace)) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
r.endpoint_details.push(ed);
|
||||
static ss::token_range token_range_endpoints_to_json(const dht::token_range_endpoints& d) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
res.push_back(r);
|
||||
r.endpoint_details.push(ed);
|
||||
}
|
||||
return res;
|
||||
return r;
|
||||
}
|
||||
|
||||
using ks_cf_func = std::function<future<json::json_return_type>(http_context&, std::unique_ptr<request>, sstring, std::vector<sstring>)>;
|
||||
@@ -192,13 +188,13 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
|
||||
ss::describe_any_ring.set(r, [&ctx](const_req req) {
|
||||
return describe_ring("");
|
||||
ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::describe_ring.set(r, [&ctx](const_req req) {
|
||||
auto keyspace = validate_keyspace(ctx, req.param);
|
||||
return describe_ring(keyspace);
|
||||
ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
auto keyspace = validate_keyspace(ctx, req->param);
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::get_host_id_map.set(r, [&ctx](const_req req) {
|
||||
@@ -273,8 +269,8 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
for (auto cf : column_families) {
|
||||
column_families_vec.push_back(&db.find_column_family(keyspace, cf));
|
||||
}
|
||||
return parallel_for_each(column_families_vec, [&cm] (column_family* cf) {
|
||||
return cm.perform_cleanup(cf);
|
||||
return parallel_for_each(column_families_vec, [&cm, &db] (column_family* cf) {
|
||||
return cm.perform_cleanup(db, cf);
|
||||
});
|
||||
}).then([]{
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
|
||||
@@ -40,7 +40,8 @@ static dht::token to_token(int64_t value) {
|
||||
}
|
||||
|
||||
static dht::token to_token(bytes_view key) {
|
||||
if (key.empty()) {
|
||||
// Key should be 16 B long, of which first 8 B are used for token calculation
|
||||
if (key.size() != 2*sizeof(int64_t)) {
|
||||
return dht::minimum_token();
|
||||
}
|
||||
return to_token(stream_id::token_from_bytes(key));
|
||||
|
||||
@@ -130,7 +130,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
|
||||
*/
|
||||
future<db_clock::time_point> get_local_streams_timestamp();
|
||||
|
||||
/* Generate a new set of CDC streams and insert it into the distributed cdc_topology_description table.
|
||||
/* Generate a new set of CDC streams and insert it into the distributed cdc_generations table.
|
||||
* Returns the timestamp of this new generation.
|
||||
*
|
||||
* Should be called when starting the node for the first time (i.e., joining the ring).
|
||||
@@ -159,9 +159,9 @@ db_clock::time_point make_new_cdc_generation(
|
||||
std::optional<db_clock::time_point> get_streams_timestamp_for(const gms::inet_address& endpoint, const gms::gossiper&);
|
||||
|
||||
/* Inform CDC users about a generation of streams (identified by the given timestamp)
|
||||
* by inserting it into the cdc_description table.
|
||||
* by inserting it into the cdc_streams table.
|
||||
*
|
||||
* Assumes that the cdc_topology_description table contains this generation.
|
||||
* Assumes that the cdc_generations table contains this generation.
|
||||
*
|
||||
* Returning from this function does not mean that the table update was successful: the function
|
||||
* might run an asynchronous task in the background.
|
||||
|
||||
@@ -1146,7 +1146,7 @@ public:
|
||||
if (r.row().deleted_at()) {
|
||||
touched_parts.set<stats::part_type::ROW_DELETE>();
|
||||
cdc_op = operation::row_delete;
|
||||
if (pirow) {
|
||||
if (pirow && pikey) {
|
||||
for (const column_definition& column: _schema->regular_columns()) {
|
||||
assert(pirow->has(column.name_as_text()));
|
||||
auto& cdef = *_log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
|
||||
|
||||
@@ -140,6 +140,9 @@ public:
|
||||
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
// Returns whether or not interposer consumer is used by a given strategy.
|
||||
bool use_interposer_consumer() const;
|
||||
};
|
||||
|
||||
// Creates a compaction_strategy object from one of the strategies available.
|
||||
|
||||
59
configure.py
59
configure.py
@@ -381,6 +381,7 @@ scylla_tests = set([
|
||||
'test/boost/view_schema_ckey_test',
|
||||
'test/boost/vint_serialization_test',
|
||||
'test/boost/virtual_reader_test',
|
||||
'test/boost/stall_free_test',
|
||||
'test/manual/ec2_snitch_test',
|
||||
'test/manual/gce_snitch_test',
|
||||
'test/manual/gossip',
|
||||
@@ -1265,9 +1266,9 @@ def query_seastar_flags(pc_file, link_static_cxx=False):
|
||||
return cflags, libs
|
||||
|
||||
for mode in build_modes:
|
||||
seastar_cflags, seastar_libs = query_seastar_flags(pc[mode], link_static_cxx=args.staticcxx)
|
||||
modes[mode]['seastar_cflags'] = seastar_cflags
|
||||
modes[mode]['seastar_libs'] = seastar_libs
|
||||
seastar_pc_cflags, seastar_pc_libs = query_seastar_flags(pc[mode], link_static_cxx=args.staticcxx)
|
||||
modes[mode]['seastar_cflags'] = seastar_pc_cflags
|
||||
modes[mode]['seastar_libs'] = seastar_pc_libs
|
||||
|
||||
# We need to use experimental features of the zstd library (to use our own allocators for the (de)compression context),
|
||||
# which are available only when the library is linked statically.
|
||||
@@ -1288,6 +1289,46 @@ def configure_zstd(build_dir, mode):
|
||||
os.makedirs(zstd_build_dir, exist_ok=True)
|
||||
subprocess.check_call(zstd_cmd, shell=False, cwd=zstd_build_dir)
|
||||
|
||||
def configure_abseil(build_dir, mode):
|
||||
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
|
||||
|
||||
abseil_cflags = seastar_cflags + ' ' + modes[mode]['cxx_ld_flags']
|
||||
cmake_mode = MODE_TO_CMAKE_BUILD_TYPE[mode]
|
||||
abseil_cmake_args = [
|
||||
'-DCMAKE_BUILD_TYPE={}'.format(cmake_mode),
|
||||
'-DCMAKE_INSTALL_PREFIX={}'.format(build_dir + '/inst'), # just to avoid a warning from absl
|
||||
'-DCMAKE_C_COMPILER={}'.format(args.cc),
|
||||
'-DCMAKE_CXX_COMPILER={}'.format(args.cxx),
|
||||
'-DCMAKE_CXX_FLAGS_{}={}'.format(cmake_mode.upper(), abseil_cflags),
|
||||
]
|
||||
|
||||
abseil_cmd = ['cmake', '-G', 'Ninja', os.path.relpath('abseil', abseil_build_dir)] + abseil_cmake_args
|
||||
|
||||
os.makedirs(abseil_build_dir, exist_ok=True)
|
||||
subprocess.check_call(abseil_cmd, shell=False, cwd=abseil_build_dir)
|
||||
|
||||
abseil_libs = ['absl/' + lib for lib in [
|
||||
'container/libabsl_hashtablez_sampler.a',
|
||||
'container/libabsl_raw_hash_set.a',
|
||||
'synchronization/libabsl_synchronization.a',
|
||||
'synchronization/libabsl_graphcycles_internal.a',
|
||||
'debugging/libabsl_stacktrace.a',
|
||||
'debugging/libabsl_symbolize.a',
|
||||
'debugging/libabsl_debugging_internal.a',
|
||||
'debugging/libabsl_demangle_internal.a',
|
||||
'time/libabsl_time.a',
|
||||
'time/libabsl_time_zone.a',
|
||||
'numeric/libabsl_int128.a',
|
||||
'hash/libabsl_city.a',
|
||||
'hash/libabsl_hash.a',
|
||||
'base/libabsl_malloc_internal.a',
|
||||
'base/libabsl_spinlock_wait.a',
|
||||
'base/libabsl_base.a',
|
||||
'base/libabsl_dynamic_annotations.a',
|
||||
'base/libabsl_raw_logging_internal.a',
|
||||
'base/libabsl_exponential_biased.a',
|
||||
'base/libabsl_throw_delegate.a']]
|
||||
|
||||
args.user_cflags += " " + pkg_config('jsoncpp', '--cflags')
|
||||
args.user_cflags += ' -march=' + args.target
|
||||
libs = ' '.join([maybe_static(args.staticyamlcpp, '-lyaml-cpp'), '-latomic', '-llz4', '-lz', '-lsnappy', pkg_config('jsoncpp', '--libs'),
|
||||
@@ -1318,6 +1359,7 @@ if any(filter(thrift_version.startswith, thrift_boost_versions)):
|
||||
for pkg in pkgs:
|
||||
args.user_cflags += ' ' + pkg_config(pkg, '--cflags')
|
||||
libs += ' ' + pkg_config(pkg, '--libs')
|
||||
args.user_cflags += '-I abseil'
|
||||
user_cflags = args.user_cflags + ' -fvisibility=hidden'
|
||||
user_ldflags = args.user_ldflags + ' -fvisibility=hidden'
|
||||
if args.staticcxx:
|
||||
@@ -1348,6 +1390,9 @@ else:
|
||||
for mode in build_modes:
|
||||
configure_zstd(outdir, mode)
|
||||
|
||||
for mode in build_modes:
|
||||
configure_abseil(outdir, mode)
|
||||
|
||||
# configure.py may run automatically from an already-existing build.ninja.
|
||||
# If the user interrupts configure.py in the middle, we need build.ninja
|
||||
# to remain in a valid state. So we write our output to a temporary
|
||||
@@ -1485,6 +1530,8 @@ with open(buildfile_tmp, 'w') as f:
|
||||
objs.extend(['$builddir/' + mode + '/' + artifact for artifact in [
|
||||
'libdeflate/libdeflate.a',
|
||||
'zstd/lib/libzstd.a',
|
||||
] + [
|
||||
'abseil/' + x for x in abseil_libs
|
||||
]])
|
||||
objs.append('$builddir/' + mode + '/gen/utils/gz/crc_combine_table.o')
|
||||
if binary in tests:
|
||||
@@ -1638,6 +1685,12 @@ with open(buildfile_tmp, 'w') as f:
|
||||
f.write(' subdir = build/{mode}/zstd\n'.format(**locals()))
|
||||
f.write(' target = libzstd.a\n'.format(**locals()))
|
||||
|
||||
for lib in abseil_libs:
|
||||
f.write('build build/{mode}/abseil/{lib}: ninja\n'.format(**locals()))
|
||||
f.write(' pool = submodule_pool\n')
|
||||
f.write(' subdir = build/{mode}/abseil\n'.format(**locals()))
|
||||
f.write(' target = {lib}\n'.format(**locals()))
|
||||
|
||||
mode = 'dev' if 'dev' in modes else modes[0]
|
||||
f.write('build checkheaders: phony || {}\n'.format(' '.join(['$builddir/{}/{}.o'.format(mode, hh) for hh in headers])))
|
||||
|
||||
|
||||
@@ -267,10 +267,13 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/// The same as `impl_max_function_for' but without knowledge of `Type'.
|
||||
/// The same as `impl_max_function_for' but without compile-time dependency on `Type'.
|
||||
class impl_max_dynamic_function final : public aggregate_function::aggregate {
|
||||
data_type _io_type;
|
||||
opt_bytes _max;
|
||||
public:
|
||||
impl_max_dynamic_function(data_type io_type) : _io_type(std::move(io_type)) {}
|
||||
|
||||
virtual void reset() override {
|
||||
_max = {};
|
||||
}
|
||||
@@ -278,12 +281,11 @@ public:
|
||||
return _max.value_or(bytes{});
|
||||
}
|
||||
virtual void add_input(cql_serialization_format sf, const std::vector<opt_bytes>& values) override {
|
||||
if (!values[0]) {
|
||||
if (values.empty() || !values[0]) {
|
||||
return;
|
||||
}
|
||||
const auto val = *values[0];
|
||||
if (!_max || *_max < val) {
|
||||
_max = val;
|
||||
if (!_max || _io_type->less(*_max, *values[0])) {
|
||||
_max = values[0];
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -298,10 +300,13 @@ public:
|
||||
};
|
||||
|
||||
class max_dynamic_function final : public native_aggregate_function {
|
||||
data_type _io_type;
|
||||
public:
|
||||
max_dynamic_function(data_type io_type) : native_aggregate_function("max", io_type, { io_type }) {}
|
||||
max_dynamic_function(data_type io_type)
|
||||
: native_aggregate_function("max", io_type, { io_type })
|
||||
, _io_type(std::move(io_type)) {}
|
||||
virtual std::unique_ptr<aggregate> new_aggregate() override {
|
||||
return std::make_unique<impl_max_dynamic_function>();
|
||||
return std::make_unique<impl_max_dynamic_function>(_io_type);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -358,10 +363,13 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/// The same as `impl_min_function_for' but without knowledge of `Type'.
|
||||
/// The same as `impl_min_function_for' but without compile-time dependency on `Type'.
|
||||
class impl_min_dynamic_function final : public aggregate_function::aggregate {
|
||||
data_type _io_type;
|
||||
opt_bytes _min;
|
||||
public:
|
||||
impl_min_dynamic_function(data_type io_type) : _io_type(std::move(io_type)) {}
|
||||
|
||||
virtual void reset() override {
|
||||
_min = {};
|
||||
}
|
||||
@@ -369,12 +377,11 @@ public:
|
||||
return _min.value_or(bytes{});
|
||||
}
|
||||
virtual void add_input(cql_serialization_format sf, const std::vector<opt_bytes>& values) override {
|
||||
if (!values[0]) {
|
||||
if (values.empty() || !values[0]) {
|
||||
return;
|
||||
}
|
||||
const auto val = *values[0];
|
||||
if (!_min || val < *_min) {
|
||||
_min = val;
|
||||
if (!_min || _io_type->less(*values[0], *_min)) {
|
||||
_min = values[0];
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -389,10 +396,13 @@ public:
|
||||
};
|
||||
|
||||
class min_dynamic_function final : public native_aggregate_function {
|
||||
data_type _io_type;
|
||||
public:
|
||||
min_dynamic_function(data_type io_type) : native_aggregate_function("min", io_type, { io_type }) {}
|
||||
min_dynamic_function(data_type io_type)
|
||||
: native_aggregate_function("min", io_type, { io_type })
|
||||
, _io_type(std::move(io_type)) {}
|
||||
virtual std::unique_ptr<aggregate> new_aggregate() override {
|
||||
return std::make_unique<impl_min_dynamic_function>();
|
||||
return std::make_unique<impl_min_dynamic_function>(_io_type);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -88,16 +88,13 @@ static data_value castas_fctn_simple(data_value from) {
|
||||
template<typename ToType>
|
||||
static data_value castas_fctn_from_decimal_to_float(data_value from) {
|
||||
auto val_from = value_cast<big_decimal>(from);
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
boost::multiprecision::cpp_rational r = val_from.unscaled_value();
|
||||
r /= boost::multiprecision::pow(ten, val_from.scale());
|
||||
return static_cast<ToType>(r);
|
||||
return static_cast<ToType>(val_from.as_rational());
|
||||
}
|
||||
|
||||
static utils::multiprecision_int from_decimal_to_cppint(const data_value& from) {
|
||||
const auto& val_from = value_cast<big_decimal>(from);
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
return boost::multiprecision::cpp_int(val_from.unscaled_value() / boost::multiprecision::pow(ten, val_from.scale()));
|
||||
auto r = val_from.as_rational();
|
||||
return utils::multiprecision_int(numerator(r)/denominator(r));
|
||||
}
|
||||
|
||||
template<typename ToType>
|
||||
|
||||
@@ -357,7 +357,12 @@ lists::setter_by_uuid::execute(mutation& m, const clustering_key_prefix& prefix,
|
||||
|
||||
collection_mutation_description mut;
|
||||
mut.cells.reserve(1);
|
||||
mut.cells.emplace_back(to_bytes(*index), params.make_cell(*ltype->value_comparator(), *value, atomic_cell::collection_member::yes));
|
||||
|
||||
if (!value) {
|
||||
mut.cells.emplace_back(to_bytes(*index), params.make_dead_cell());
|
||||
} else {
|
||||
mut.cells.emplace_back(to_bytes(*index), params.make_cell(*ltype->value_comparator(), *value, atomic_cell::collection_member::yes));
|
||||
}
|
||||
|
||||
m.set_cell(prefix, column, mut.serialize(*ltype));
|
||||
}
|
||||
|
||||
@@ -417,7 +417,7 @@ std::vector<const column_definition*> statement_restrictions::get_column_defs_fo
|
||||
_clustering_columns_restrictions->num_prefix_columns_that_need_not_be_filtered();
|
||||
for (auto&& cdef : _clustering_columns_restrictions->get_column_defs()) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
if (single_ck_restrs) {
|
||||
auto it = single_ck_restrs->restrictions().find(cdef);
|
||||
if (it != single_ck_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
@@ -688,6 +688,11 @@ static query::range<bytes_view> to_range(const term_slice& slice, const query_op
|
||||
extract_bound(statements::bound::END));
|
||||
}
|
||||
|
||||
static bool contains_without_wraparound(
|
||||
const query::range<bytes_view>& range, bytes_view value, const serialized_tri_compare& cmp) {
|
||||
return !range.is_wrap_around(cmp) && range.contains(value, cmp);
|
||||
}
|
||||
|
||||
bool single_column_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
const partition_key& key,
|
||||
const clustering_key_prefix& ckey,
|
||||
@@ -702,13 +707,13 @@ bool single_column_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
return false;
|
||||
}
|
||||
return cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return to_range(_slice, options, _column_def.name_as_text()).contains(
|
||||
return contains_without_wraparound(to_range(_slice, options, _column_def.name_as_text()),
|
||||
cell_value_bv, _column_def.type->as_tri_comparator());
|
||||
});
|
||||
}
|
||||
|
||||
bool single_column_restriction::slice::is_satisfied_by(bytes_view data, const query_options& options) const {
|
||||
return to_range(_slice, options, _column_def.name_as_text()).contains(
|
||||
return contains_without_wraparound(to_range(_slice, options, _column_def.name_as_text()),
|
||||
data, _column_def.type->underlying_type()->as_tri_comparator());
|
||||
}
|
||||
|
||||
|
||||
@@ -207,6 +207,9 @@ void alter_table_statement::add_column(const schema& schema, const table& cf, sc
|
||||
"because a collection with the same name and a different type has already been used in the past", column_name));
|
||||
}
|
||||
}
|
||||
if (type->is_counter() && !schema.is_counter()) {
|
||||
throw exceptions::configuration_exception(format("Cannot add a counter column ({}) in a non counter column family", column_name));
|
||||
}
|
||||
|
||||
cfm.with_column(column_name.name(), type, is_static ? column_kind::static_column : column_kind::regular_column);
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ batch_statement::batch_statement(int bound_terms, type type_,
|
||||
, _has_conditions(boost::algorithm::any_of(_statements, [] (auto&& s) { return s.statement->has_conditions(); }))
|
||||
, _stats(stats)
|
||||
{
|
||||
validate();
|
||||
if (has_conditions()) {
|
||||
// A batch can be created not only by raw::batch_statement::prepare, but also by
|
||||
// cql_server::connection::process_batch, which doesn't call any methods of
|
||||
@@ -448,7 +449,6 @@ batch_statement::prepare(database& db, cql_stats& stats) {
|
||||
prep_attrs->collect_marker_specification(bound_names);
|
||||
|
||||
cql3::statements::batch_statement batch_statement_(bound_names.size(), _type, std::move(statements), std::move(prep_attrs), stats);
|
||||
batch_statement_.validate();
|
||||
|
||||
std::vector<uint16_t> partition_key_bind_indices;
|
||||
if (!have_multiple_cfs && batch_statement_.get_statements().size() > 0) {
|
||||
|
||||
31
database.cc
31
database.cc
@@ -113,11 +113,11 @@ make_flush_controller(const db::config& cfg, seastar::scheduling_group sg, const
|
||||
|
||||
inline
|
||||
std::unique_ptr<compaction_manager>
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg) {
|
||||
if (cfg.compaction_static_shares() > 0) {
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares());
|
||||
}
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
@@ -161,7 +161,7 @@ void keyspace::remove_user_type(const user_type ut) {
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm)
|
||||
: _stats(make_lw_shared<db_stats>())
|
||||
, _cl_stats(std::make_unique<cell_locker_stats>())
|
||||
, _cfg(cfg)
|
||||
@@ -198,7 +198,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _mutation_query_stage()
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg))
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
|
||||
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,
|
||||
@@ -1324,7 +1324,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), m.decorated_key(), slice, trace_state)
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), m.decorated_key(), slice, trace_state, timeout)
|
||||
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
|
||||
// ...now, that we got existing state of all affected counter
|
||||
// cells we can look for our shard in each of them, increment
|
||||
@@ -1823,7 +1823,11 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
|
||||
// TODO: indexes.
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
assert(low_mark <= rp || rp == db::replay_position());
|
||||
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
|
||||
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
|
||||
// the low_mark assertion does not hold, because we maybe/probably never got around to
|
||||
// creating the sstables that would create them.
|
||||
assert(!should_flush || low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
return truncate_views(cf, truncated_at, should_flush).then([&cf, truncated_at, rp] {
|
||||
// save_truncation_record() may actually fail after we cached the truncation time
|
||||
@@ -2001,9 +2005,10 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
reader_concurrency_semaphore* semaphore;
|
||||
};
|
||||
distributed<database>& _db;
|
||||
utils::UUID _table_id;
|
||||
std::vector<reader_context> _contexts;
|
||||
public:
|
||||
explicit streaming_reader_lifecycle_policy(distributed<database>& db) : _db(db), _contexts(smp::count) {
|
||||
streaming_reader_lifecycle_policy(distributed<database>& db, utils::UUID table_id) : _db(db), _table_id(table_id), _contexts(smp::count) {
|
||||
}
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
@@ -2032,7 +2037,12 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
});
|
||||
}
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
return *_contexts[this_shard_id()].semaphore;
|
||||
const auto shard = this_shard_id();
|
||||
if (!_contexts[shard].semaphore) {
|
||||
auto& cf = _db.local().find_column_family(_table_id);
|
||||
_contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();
|
||||
}
|
||||
return *_contexts[shard].semaphore;
|
||||
}
|
||||
};
|
||||
auto ms = mutation_source([&db] (schema_ptr s,
|
||||
@@ -2043,7 +2053,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), std::move(s), pr, ps, pc,
|
||||
auto table_id = s->id();
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db, table_id), std::move(s), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
});
|
||||
auto&& full_slice = schema->full_slice();
|
||||
|
||||
@@ -903,7 +903,7 @@ public:
|
||||
lw_shared_ptr<const sstable_list> get_sstables_including_compacted_undeleted() const;
|
||||
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const;
|
||||
std::vector<sstables::shared_sstable> select_sstables(const dht::partition_range& range) const;
|
||||
std::vector<sstables::shared_sstable> candidates_for_compaction() const;
|
||||
std::vector<sstables::shared_sstable> non_staging_sstables() const;
|
||||
std::vector<sstables::shared_sstable> sstables_need_rewrite() const;
|
||||
size_t sstables_count() const;
|
||||
std::vector<uint64_t> sstable_count_per_level() const;
|
||||
@@ -1427,7 +1427,7 @@ public:
|
||||
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
|
||||
|
||||
future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm);
|
||||
database(database&&) = delete;
|
||||
~database();
|
||||
|
||||
|
||||
@@ -521,7 +521,7 @@ public:
|
||||
_segment_manager->totals.total_size_on_disk -= size_on_disk();
|
||||
_segment_manager->totals.total_size -= (size_on_disk() + _buffer.size_bytes());
|
||||
_segment_manager->add_file_to_delete(_file_name, _desc);
|
||||
} else {
|
||||
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
|
||||
clogger.warn("Segment {} is dirty and is left on disk.", *this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,6 +137,7 @@ public:
|
||||
|
||||
bool reuse_segments = true;
|
||||
bool use_o_dsync = false;
|
||||
bool warn_about_segments_left_on_disk_after_shutdown = true;
|
||||
|
||||
const db::extensions * extensions = nullptr;
|
||||
};
|
||||
|
||||
@@ -299,7 +299,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
mutation m(cf.schema(), fm.decorated_key(*cf.schema()));
|
||||
converting_mutation_partition_applier v(cm, *cf.schema(), m.partition());
|
||||
fm.partition().accept(cm, v);
|
||||
return do_with(std::move(m), [&db, &cf] (mutation m) {
|
||||
return do_with(std::move(m), [&db, &cf] (const mutation& m) {
|
||||
return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
} else {
|
||||
|
||||
@@ -681,7 +681,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
@@ -736,6 +736,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port")
|
||||
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator")
|
||||
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, redis_port(this, "redis_port", value_status::Used, 0, "Port on which the REDIS transport listens for clients.")
|
||||
, redis_ssl_port(this, "redis_ssl_port", value_status::Used, 0, "Port on which the REDIS TLS native transport listens for clients.")
|
||||
|
||||
@@ -314,6 +314,8 @@ public:
|
||||
named_value<uint16_t> alternator_https_port;
|
||||
named_value<sstring> alternator_address;
|
||||
named_value<bool> alternator_enforce_authorization;
|
||||
named_value<sstring> alternator_write_isolation;
|
||||
|
||||
named_value<bool> abort_on_ebadf;
|
||||
|
||||
named_value<uint16_t> redis_port;
|
||||
|
||||
@@ -224,7 +224,9 @@ future<> manager::end_point_hints_manager::stop(drain should_drain) noexcept {
|
||||
with_lock(file_update_mutex(), [this] {
|
||||
if (_hints_store_anchor) {
|
||||
hints_store_ptr tmp = std::exchange(_hints_store_anchor, nullptr);
|
||||
return tmp->shutdown().finally([tmp] {});
|
||||
return tmp->shutdown().finally([tmp] {
|
||||
return tmp->release();
|
||||
}).finally([tmp] {});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([&eptr] (auto e) { eptr = std::move(e); }).get();
|
||||
@@ -326,6 +328,10 @@ future<db::commitlog> manager::end_point_hints_manager::add_store() noexcept {
|
||||
// HH doesn't utilize the flow that benefits from reusing segments.
|
||||
// Therefore let's simply disable it to avoid any possible confusion.
|
||||
cfg.reuse_segments = false;
|
||||
// HH leaves segments on disk after commitlog shutdown, and later reads
|
||||
// them when commitlog is re-created. This is expected to happen regularly
|
||||
// during standard HH workload, so no need to print a warning about it.
|
||||
cfg.warn_about_segments_left_on_disk_after_shutdown = false;
|
||||
|
||||
return commitlog::create_commitlog(std::move(cfg)).then([this] (commitlog l) {
|
||||
// add_store() is triggered every time hint files are forcefully flushed to I/O (every hints_flush_period).
|
||||
@@ -352,7 +358,9 @@ future<> manager::end_point_hints_manager::flush_current_hints() noexcept {
|
||||
return futurize_invoke([this] {
|
||||
return with_lock(file_update_mutex(), [this]() -> future<> {
|
||||
return get_or_load().then([] (hints_store_ptr cptr) {
|
||||
return cptr->shutdown();
|
||||
return cptr->shutdown().finally([cptr] {
|
||||
return cptr->release();
|
||||
}).finally([cptr] {});
|
||||
}).then([this] {
|
||||
// Un-hold the commitlog object. Since we are under the exclusive _file_update_mutex lock there are no
|
||||
// other hints_store_ptr copies and this would destroy the commitlog shared value.
|
||||
@@ -703,6 +711,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||
// (last_modification - manager::hints_timer_period) old.
|
||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -725,6 +734,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||
++this->shard_stats().discarded;
|
||||
}
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}).finally([units = std::move(units), ctx_ptr] {});
|
||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||
|
||||
@@ -822,6 +822,14 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_s
|
||||
});
|
||||
}
|
||||
|
||||
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
|
||||
return merge_lock().then([&proxy, &feat] {
|
||||
return update_schema_version_and_announce(proxy, feat.cluster_schema_features());
|
||||
}).finally([] {
|
||||
return merge_unlock();
|
||||
});
|
||||
}
|
||||
|
||||
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
{
|
||||
return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] () mutable {
|
||||
|
||||
@@ -170,6 +170,13 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_s
|
||||
|
||||
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush);
|
||||
|
||||
// Recalculates the local schema version and publishes it in gossip.
|
||||
//
|
||||
// It is safe to call concurrently with recalculate_schema_version() and merge_schema() in which case it
|
||||
// is guaranteed that the schema version we end up with after all calls will reflect the most recent state
|
||||
// of feature_service and schema tables.
|
||||
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat);
|
||||
|
||||
future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after);
|
||||
|
||||
std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
|
||||
|
||||
@@ -72,7 +72,7 @@ schema_ptr view_build_status() {
|
||||
}
|
||||
|
||||
/* An internal table used by nodes to exchange CDC generation data. */
|
||||
schema_ptr cdc_topology_description() {
|
||||
schema_ptr cdc_generations() {
|
||||
thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION);
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION, {id})
|
||||
@@ -108,7 +108,7 @@ schema_ptr cdc_desc() {
|
||||
static std::vector<schema_ptr> all_tables() {
|
||||
return {
|
||||
view_build_status(),
|
||||
cdc_topology_description(),
|
||||
cdc_generations(),
|
||||
cdc_desc(),
|
||||
};
|
||||
}
|
||||
@@ -204,7 +204,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
|
||||
false).discard_result();
|
||||
}
|
||||
|
||||
/* We want to make sure that writes/reads to/from cdc_topology_description and cdc_description
|
||||
/* We want to make sure that writes/reads to/from cdc_generations and cdc_streams
|
||||
* are consistent: a read following an acknowledged write to the same partition should contact
|
||||
* at least one of the replicas that the write contacted.
|
||||
* Normally we would achieve that by always using CL = QUORUM,
|
||||
|
||||
@@ -48,10 +48,10 @@ public:
|
||||
static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
|
||||
|
||||
/* Nodes use this table to communicate new CDC stream generations to other nodes. */
|
||||
static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_topology_description";
|
||||
static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_generations";
|
||||
|
||||
/* This table is used by CDC clients to learn about avaliable CDC streams. */
|
||||
static constexpr auto CDC_DESC = "cdc_description";
|
||||
static constexpr auto CDC_DESC = "cdc_streams";
|
||||
|
||||
/* Information required to modify/query some system_distributed tables, passed from the caller. */
|
||||
struct context {
|
||||
|
||||
@@ -36,19 +36,25 @@ future<> view_update_generator::start() {
|
||||
_pending_sstables.wait().get();
|
||||
}
|
||||
|
||||
// To ensure we don't race with updates, move the entire content
|
||||
// into a local variable.
|
||||
auto sstables_with_tables = std::exchange(_sstables_with_tables, {});
|
||||
|
||||
// If we got here, we will process all tables we know about so far eventually so there
|
||||
// is no starvation
|
||||
for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) {
|
||||
for (auto table_it = sstables_with_tables.begin(); table_it != sstables_with_tables.end(); table_it = sstables_with_tables.erase(table_it)) {
|
||||
auto& [t, sstables] = *table_it;
|
||||
schema_ptr s = t->schema();
|
||||
|
||||
// Copy what we have so far so we don't miss new updates
|
||||
auto sstables = std::exchange(_sstables_with_tables[t], {});
|
||||
vug_logger.trace("Processing {}.{}: {} sstables", s->ks_name(), s->cf_name(), sstables.size());
|
||||
|
||||
const auto num_sstables = sstables.size();
|
||||
|
||||
try {
|
||||
// temporary: need an sstable set for the flat mutation reader, but the
|
||||
// compaction_descriptor takes a vector. Soon this will become a compaction
|
||||
// so the transformation to the SSTable set will not be needed.
|
||||
auto ssts = make_lw_shared(t->get_compaction_strategy().make_sstable_set(s));
|
||||
// Exploit the fact that sstables in the staging directory
|
||||
// are usually non-overlapping and use a partitioned set for
|
||||
// the read.
|
||||
auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, make_lw_shared<sstable_list>(sstable_list{}), false));
|
||||
for (auto& sst : sstables) {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
@@ -81,7 +87,7 @@ future<> view_update_generator::start() {
|
||||
// Move from staging will be retried upon restart.
|
||||
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
|
||||
}
|
||||
_registration_sem.signal();
|
||||
_registration_sem.signal(num_sstables);
|
||||
}
|
||||
// For each table, move the processed staging sstables into the table's base dir.
|
||||
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {
|
||||
|
||||
@@ -32,7 +32,10 @@
|
||||
namespace db::view {
|
||||
|
||||
class view_update_generator {
|
||||
public:
|
||||
static constexpr size_t registration_queue_size = 5;
|
||||
|
||||
private:
|
||||
database& _db;
|
||||
seastar::abort_source _as;
|
||||
future<> _started = make_ready_future<>();
|
||||
@@ -51,6 +54,8 @@ public:
|
||||
future<> start();
|
||||
future<> stop();
|
||||
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<table> table);
|
||||
|
||||
ssize_t available_register_units() const { return _registration_sem.available_units(); }
|
||||
private:
|
||||
bool should_throttle() const;
|
||||
};
|
||||
|
||||
@@ -43,16 +43,29 @@
|
||||
#include "log.hh"
|
||||
#include "db/config.hh"
|
||||
#include "database.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
|
||||
static logging::logger blogger("boot_strapper");
|
||||
|
||||
namespace dht {
|
||||
|
||||
future<> boot_strapper::bootstrap() {
|
||||
future<> boot_strapper::bootstrap(streaming::stream_reason reason) {
|
||||
blogger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens());
|
||||
|
||||
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _abort_source, _tokens, _address, "Bootstrap", streaming::stream_reason::bootstrap);
|
||||
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(gms::get_local_gossiper().get_unreachable_members()));
|
||||
sstring description;
|
||||
if (reason == streaming::stream_reason::bootstrap) {
|
||||
description = "Bootstrap";
|
||||
} else if (reason == streaming::stream_reason::replace) {
|
||||
description = "Replace";
|
||||
} else {
|
||||
return make_exception_future<>(std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap"));
|
||||
}
|
||||
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _abort_source, _tokens, _address, description, reason);
|
||||
auto nodes_to_filter = gms::get_local_gossiper().get_unreachable_members();
|
||||
if (reason == streaming::stream_reason::replace && _db.local().get_replace_address()) {
|
||||
nodes_to_filter.insert(_db.local().get_replace_address().value());
|
||||
}
|
||||
blogger.debug("nodes_to_filter={}", nodes_to_filter);
|
||||
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(nodes_to_filter));
|
||||
auto keyspaces = make_lw_shared<std::vector<sstring>>(_db.local().get_non_system_keyspaces());
|
||||
return do_for_each(*keyspaces, [this, keyspaces, streamer] (sstring& keyspace_name) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <unordered_set>
|
||||
#include "database_fwd.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
|
||||
@@ -66,7 +67,7 @@ public:
|
||||
, _token_metadata(tmd) {
|
||||
}
|
||||
|
||||
future<> bootstrap();
|
||||
future<> bootstrap(streaming::stream_reason reason);
|
||||
|
||||
/**
|
||||
* if initialtoken was specified, use that (split on comma).
|
||||
|
||||
@@ -91,7 +91,16 @@ range_streamer::get_range_fetch_map(const std::unordered_map<dht::token_range, s
|
||||
}
|
||||
|
||||
if (!found_source) {
|
||||
throw std::runtime_error(format("unable to find sufficient sources for streaming range {} in keyspace {}", range_, keyspace));
|
||||
auto& ks = _db.local().find_keyspace(keyspace);
|
||||
auto rf = ks.get_replication_strategy().get_replication_factor();
|
||||
// When a replacing node replaces a dead node with keyspace of RF
|
||||
// 1, it is expected that replacing node could not find a peer node
|
||||
// that contains data to stream from.
|
||||
if (_reason == streaming::stream_reason::replace && rf == 1) {
|
||||
logger.warn("Unable to find sufficient sources to stream range {} for keyspace {} with RF = 1 for replace operation", range_, keyspace);
|
||||
} else {
|
||||
throw std::runtime_error(format("unable to find sufficient sources for streaming range {} in keyspace {}", range_, keyspace));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -146,7 +146,7 @@ private:
|
||||
* here, we always exclude ourselves.
|
||||
* @return
|
||||
*/
|
||||
static std::unordered_map<inet_address, dht::token_range_vector>
|
||||
std::unordered_map<inet_address, dht::token_range_vector>
|
||||
get_range_fetch_map(const std::unordered_map<dht::token_range, std::vector<inet_address>>& ranges_with_sources,
|
||||
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
|
||||
const sstring& keyspace);
|
||||
|
||||
15
dist/common/scripts/scylla-housekeeping
vendored
15
dist/common/scripts/scylla-housekeeping
vendored
@@ -61,7 +61,15 @@ def sh_command(*args):
|
||||
return out
|
||||
|
||||
def get_url(path):
|
||||
return urllib.request.urlopen(path).read().decode('utf-8')
|
||||
# If server returns any error, like 403, or 500 urllib.request throws exception, which is not serializable.
|
||||
# When multiprocessing routines fail to serialize it, it throws ambiguous serialization exception
|
||||
# from get_json_from_url.
|
||||
# In order to see legit error we catch it from the inside of process, covert to string and
|
||||
# pass it as part of return value
|
||||
try:
|
||||
return 0, urllib.request.urlopen(path).read().decode('utf-8')
|
||||
except Exception as exc:
|
||||
return 1, str(exc)
|
||||
|
||||
def get_json_from_url(path):
|
||||
pool = mp.Pool(processes=1)
|
||||
@@ -71,13 +79,16 @@ def get_json_from_url(path):
|
||||
# to enforce a wallclock timeout.
|
||||
result = pool.apply_async(get_url, args=(path,))
|
||||
try:
|
||||
retval = result.get(timeout=5)
|
||||
status, retval = result.get(timeout=5)
|
||||
except mp.TimeoutError as err:
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
raise
|
||||
if status == 1:
|
||||
raise RuntimeError(f'Failed to get "{path}" due to the following error: {retval}')
|
||||
return json.loads(retval)
|
||||
|
||||
|
||||
def get_api(path):
|
||||
return get_json_from_url("http://" + api_address + path)
|
||||
|
||||
|
||||
13
dist/common/scripts/scylla_coredump_setup
vendored
13
dist/common/scripts/scylla_coredump_setup
vendored
@@ -65,8 +65,8 @@ Before=scylla-server.service
|
||||
After=local-fs.target
|
||||
|
||||
[Mount]
|
||||
What=/var/lib/systemd/coredump
|
||||
Where=/var/lib/scylla/coredump
|
||||
What=/var/lib/scylla/coredump
|
||||
Where=/var/lib/systemd/coredump
|
||||
Type=none
|
||||
Options=bind
|
||||
|
||||
@@ -78,6 +78,7 @@ WantedBy=multi-user.target
|
||||
makedirs('/var/lib/scylla/coredump')
|
||||
systemd_unit.reload()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').enable()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').start()
|
||||
if os.path.exists('/usr/lib/sysctl.d/50-coredump.conf'):
|
||||
run('sysctl -p /usr/lib/sysctl.d/50-coredump.conf')
|
||||
else:
|
||||
@@ -99,6 +100,14 @@ WantedBy=multi-user.target
|
||||
try:
|
||||
run('coredumpctl --no-pager --no-legend info {}'.format(pid))
|
||||
print('\nsystemd-coredump is working finely.')
|
||||
|
||||
# get last coredump generated by bash and remove it, ignore inaccessaible ones
|
||||
corefile = out(cmd=r'coredumpctl -1 --no-legend dump 2>&1 | grep "bash" | '
|
||||
r'grep -v "inaccessible" | grep "Storage:\|Coredump:"',
|
||||
shell=True, exception=False)
|
||||
if corefile:
|
||||
corefile = corefile.split()[-1]
|
||||
run('rm -f {}'.format(corefile))
|
||||
except subprocess.CalledProcessError as e:
|
||||
print('Does not able to detect coredump, failed to configure systemd-coredump.')
|
||||
sys.exit(1)
|
||||
|
||||
3
dist/common/scripts/scylla_setup
vendored
3
dist/common/scripts/scylla_setup
vendored
@@ -374,6 +374,9 @@ if __name__ == '__main__':
|
||||
if not stat.S_ISBLK(os.stat(dsk).st_mode):
|
||||
print('{} is not block device'.format(dsk))
|
||||
continue
|
||||
if dsk in selected:
|
||||
print(f'{dsk} is already added')
|
||||
continue
|
||||
selected.append(dsk)
|
||||
devices.remove(dsk)
|
||||
disks = ','.join(selected)
|
||||
|
||||
20
dist/common/scripts/scylla_util.py
vendored
20
dist/common/scripts/scylla_util.py
vendored
@@ -182,7 +182,7 @@ class aws_instance:
|
||||
instance_size = self.instance_size()
|
||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||
return 'ixgbevf'
|
||||
if instance_class in ['c5', 'c5d', 'f1', 'g3', 'h1', 'i3', 'i3en', 'm5', 'm5d', 'p2', 'p3', 'r4', 'x1']:
|
||||
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||
return 'ena'
|
||||
if instance_class == 'm4':
|
||||
if instance_size == '16xlarge':
|
||||
@@ -329,7 +329,7 @@ class scylla_cpuinfo:
|
||||
|
||||
# When a CLI tool is not installed, use relocatable CLI tool provided by Scylla
|
||||
scylla_env = os.environ.copy()
|
||||
scylla_env['PATH'] = '{}:{}'.format(scylla_env['PATH'], scyllabindir())
|
||||
scylla_env['PATH'] = '{}:{}'.format(scyllabindir(), scylla_env['PATH'])
|
||||
|
||||
def run(cmd, shell=False, silent=False, exception=True):
|
||||
stdout = subprocess.DEVNULL if silent else None
|
||||
@@ -441,6 +441,19 @@ def dist_ver():
|
||||
return platform.dist()[1]
|
||||
|
||||
|
||||
SYSTEM_PARTITION_UUIDS = [
|
||||
'21686148-6449-6e6f-744e-656564454649', # BIOS boot partition
|
||||
'c12a7328-f81f-11d2-ba4b-00a0c93ec93b', # EFI system partition
|
||||
'024dee41-33e7-11d3-9d69-0008c781f39f' # MBR partition scheme
|
||||
]
|
||||
|
||||
def get_partition_uuid(dev):
|
||||
return out(f'lsblk -n -oPARTTYPE {dev}')
|
||||
|
||||
def is_system_partition(dev):
|
||||
uuid = get_partition_uuid(dev)
|
||||
return (uuid in SYSTEM_PARTITION_UUIDS)
|
||||
|
||||
def is_unused_disk(dev):
|
||||
# dev is not in /sys/class/block/, like /dev/nvme[0-9]+
|
||||
if not os.path.isdir('/sys/class/block/{dev}'.format(dev=dev.replace('/dev/', ''))):
|
||||
@@ -448,7 +461,8 @@ def is_unused_disk(dev):
|
||||
try:
|
||||
fd = os.open(dev, os.O_EXCL)
|
||||
os.close(fd)
|
||||
return True
|
||||
# dev is not reserved for system
|
||||
return not is_system_partition(dev)
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
1
dist/debian/control.mustache
vendored
1
dist/debian/control.mustache
vendored
@@ -5,6 +5,7 @@ Section: database
|
||||
Priority: optional
|
||||
X-Python3-Version: >= 3.4
|
||||
Standards-Version: 3.9.5
|
||||
Rules-Requires-Root: no
|
||||
|
||||
Package: {{product}}-conf
|
||||
Architecture: any
|
||||
|
||||
1
dist/debian/python3/control.mustache
vendored
1
dist/debian/python3/control.mustache
vendored
@@ -5,6 +5,7 @@ Section: python
|
||||
Priority: optional
|
||||
X-Python3-Version: >= 3.4
|
||||
Standards-Version: 3.9.5
|
||||
Rules-Requires-Root: no
|
||||
|
||||
Package: {{product}}-python3
|
||||
Architecture: amd64
|
||||
|
||||
1
dist/debian/rules.mustache
vendored
1
dist/debian/rules.mustache
vendored
@@ -37,6 +37,7 @@ override_dh_strip:
|
||||
# The binaries (ethtool...patchelf) don't pass dh_strip after going through patchelf. Since they are
|
||||
# already stripped, nothing is lost if we exclude them, so that's what we do.
|
||||
dh_strip -Xlibprotobuf.so.15 -Xld.so -Xethtool -Xgawk -Xgzip -Xhwloc-calc -Xhwloc-distrib -Xifconfig -Xlscpu -Xnetstat -Xpatchelf --dbg-package={{product}}-server-dbg
|
||||
find $(CURDIR)/debian/{{product}}-server-dbg/usr/lib/debug/.build-id/ -name "*.debug" -exec objcopy --decompress-debug-sections {} \;
|
||||
|
||||
override_dh_makeshlibs:
|
||||
|
||||
|
||||
2
dist/docker/redhat/commandlineparser.py
vendored
2
dist/docker/redhat/commandlineparser.py
vendored
@@ -18,6 +18,8 @@ def parse():
|
||||
parser.add_argument('--api-address', default=None, dest='apiAddress')
|
||||
parser.add_argument('--alternator-address', default=None, dest='alternatorAddress', help="Alternator API address to listen to. Defaults to listen address.")
|
||||
parser.add_argument('--alternator-port', default=None, dest='alternatorPort', help="Alternator API port to listen to. Disabled by default.")
|
||||
parser.add_argument('--alternator-https-port', default=None, dest='alternatorHttpsPort', help="Alternator API TLS port to listen to. Disabled by default.")
|
||||
parser.add_argument('--alternator-write-isolation', default=None, dest='alternatorWriteIsolation', help="Alternator default write isolation policy.")
|
||||
parser.add_argument('--disable-version-check', default=False, action='store_true', dest='disable_housekeeping', help="Disable version check")
|
||||
parser.add_argument('--authenticator', default=None, dest='authenticator', help="Set authenticator class")
|
||||
parser.add_argument('--authorizer', default=None, dest='authorizer', help="Set authorizer class")
|
||||
|
||||
9
dist/docker/redhat/scyllasetup.py
vendored
9
dist/docker/redhat/scyllasetup.py
vendored
@@ -16,6 +16,8 @@ class ScyllaSetup:
|
||||
self._broadcastRpcAddress = arguments.broadcastRpcAddress
|
||||
self._apiAddress = arguments.apiAddress
|
||||
self._alternatorPort = arguments.alternatorPort
|
||||
self._alternatorHttpsPort = arguments.alternatorHttpsPort
|
||||
self._alternatorWriteIsolation = arguments.alternatorWriteIsolation
|
||||
self._smp = arguments.smp
|
||||
self._memory = arguments.memory
|
||||
self._overprovisioned = arguments.overprovisioned
|
||||
@@ -116,6 +118,13 @@ class ScyllaSetup:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-port %s" % self._alternatorPort]
|
||||
|
||||
if self._alternatorHttpsPort is not None:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-https-port %s" % self._alternatorHttpsPort]
|
||||
|
||||
if self._alternatorWriteIsolation is not None:
|
||||
args += ["--alternator-write-isolation %s" % self._alternatorWriteIsolation]
|
||||
|
||||
if self._authenticator is not None:
|
||||
args += ["--authenticator %s" % self._authenticator]
|
||||
|
||||
|
||||
@@ -25,6 +25,14 @@ By default, Scylla listens on this port on all network interfaces.
|
||||
To listen only on a specific interface, pass also an "`alternator-address`"
|
||||
option.
|
||||
|
||||
As we explain below in the "Write isolation policies", Alternator has
|
||||
four different choices for the implementation of writes, each with
|
||||
different advantages. You should consider which of the options makes
|
||||
more sense for your intended use case, and use the "`--alternator-write-isolation`"
|
||||
option to choose one. There is currently no default for this option: Trying
|
||||
to run Scylla with Alternator enabled without passing this option will
|
||||
result in an error asking you to set it.
|
||||
|
||||
DynamoDB clients usually specify a single "endpoint" address, e.g.,
|
||||
`dynamodb.us-east-1.amazonaws.com`, and a DNS server hosted on that address
|
||||
distributes the connections to many different backend nodes. Alternator
|
||||
@@ -108,12 +116,15 @@ implemented, with the following limitations:
|
||||
Writes are done in LOCAL_QURUM and reads in LOCAL_ONE (eventual consistency)
|
||||
or LOCAL_QUORUM (strong consistency).
|
||||
### Global Tables
|
||||
* Currently, *all* Alternator tables are created as "Global Tables", i.e., can
|
||||
be accessed from all of Scylla's DCs.
|
||||
* We do not yet support the DynamoDB API calls to make some of the tables
|
||||
global and others local to a particular DC: CreateGlobalTable,
|
||||
UpdateGlobalTable, DescribeGlobalTable, ListGlobalTables,
|
||||
UpdateGlobalTableSettings, DescribeGlobalTableSettings, and UpdateTable.
|
||||
* Currently, *all* Alternator tables are created as "global" tables and can
|
||||
be accessed from all the DCs existing at the time of the table's creation.
|
||||
If a DC is added after a table is created, the table won't be visible from
|
||||
the new DC and changing that requires a CQL "ALTER TABLE" statement to
|
||||
modify the table's replication strategy.
|
||||
* We do not yet support the DynamoDB API calls that control which table is
|
||||
visible from what DC: CreateGlobalTable, UpdateGlobalTable,
|
||||
DescribeGlobalTable, ListGlobalTables, UpdateGlobalTableSettings,
|
||||
DescribeGlobalTableSettings, and UpdateTable.
|
||||
### Backup and Restore
|
||||
* On-demand backup: the DynamoDB APIs are not yet supported: CreateBackup,
|
||||
DescribeBackup, DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||
@@ -153,23 +164,28 @@ implemented, with the following limitations:
|
||||
|
||||
### Write isolation policies
|
||||
DynamoDB API update requests may involve a read before the write - e.g., a
|
||||
_conditional_ update, or an update based on the old value of an attribute.
|
||||
_conditional_ update or an update based on the old value of an attribute.
|
||||
The read and the write should be treated as a single transaction - protected
|
||||
(_isolated_) from other parallel writes to the same item.
|
||||
|
||||
By default, Alternator does this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation. However, LWT significantly slows
|
||||
writes down, so Alternator supports three additional _write isolation
|
||||
policies_, which can be chosen on a per-table basis and may make sense for
|
||||
certain workloads as explained below.
|
||||
Alternator could do this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation, but this significantly slows
|
||||
down writes, and not necessary for workloads which don't use read-modify-write
|
||||
(RMW) updates.
|
||||
|
||||
The write isolation policy of a table is configured by tagging the table (at
|
||||
CreateTable time, or any time later with TagResource) with the key
|
||||
So Alternator supports four _write isolation policies_, which can be chosen
|
||||
on a per-table basis and may make sense for certain workloads as explained
|
||||
below.
|
||||
|
||||
A default write isolation policy **must** be chosen using the
|
||||
`--alternator-write-isolation` configuration option. Additionally, the write
|
||||
isolation policy for a specific table can be overriden by tagging the table
|
||||
(at CreateTable time, or any time later with TagResource) with the key
|
||||
`system:write_isolation`, and one of the following values:
|
||||
|
||||
* `a`, `always`, or `always_use_lwt` - This is the default choice.
|
||||
It performs every write operation - even those that do not need a read
|
||||
before the write - as a lightweight transaction.
|
||||
* `a`, `always`, or `always_use_lwt` - This mode performs every write
|
||||
operation - even those that do not need a read before the write - as a
|
||||
lightweight transaction.
|
||||
|
||||
This is the slowest choice, but also the only choice guaranteed to work
|
||||
correctly for every workload.
|
||||
|
||||
@@ -10,10 +10,16 @@ This section will guide you through the steps for setting up the cluster:
|
||||
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
|
||||
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
|
||||
add to every "docker run" command: `-p 8000:8000` before the image name
|
||||
and `--alternator-port=8000` at the end. The "alternator-port" option
|
||||
specifies on which port Scylla will listen for the (unencrypted) DynamoDB API.
|
||||
and `--alternator-port=8000 --alternator-write-isolation=always` at the end.
|
||||
The "alternator-port" option specifies on which port Scylla will listen for
|
||||
the (unencrypted) DynamoDB API, and the "alternator-write-isolation" chooses
|
||||
whether or not Alternator will use LWT for every write.
|
||||
For example,
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000 --alternator-write-isolation=always
|
||||
The `--alternator-https-port=...` option can also be used to enable
|
||||
Alternator on an encrypted (HTTPS) port. Note that in this case, the files
|
||||
`/etc/scylla/scylla.crt` and `/etc/scylla/scylla.key` must be inserted into
|
||||
the image, containing the SSL certificate and key to use.
|
||||
|
||||
## Testing Scylla's DynamoDB API support:
|
||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||
|
||||
12
docs/cdc.md
12
docs/cdc.md
@@ -92,7 +92,7 @@ Shard-colocation is an optimization.
|
||||
|
||||
Having different generations operating at different points in time is necessary to maintain colocation in presence of topology changes. When a new node joins the cluster it modifies the token ring by refining existing vnodes into smaller vnodes. But before it does it, it will introduce a new CDC generation whose token ranges refine those new (smaller) vnodes (which means they also refine the old vnodes; that way writes will be colocated on both old and new replicas).
|
||||
|
||||
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::generate_topology_description` function. It then inserts the generation description into an internal distributed table `cdc_topology_description` in the `system_distributed` keyspace. The table is defined as follows (from db/system_distributed_keyspace.cc):
|
||||
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::generate_topology_description` function. It then inserts the generation description into an internal distributed table `cdc_generations` in the `system_distributed` keyspace. The table is defined as follows (from db/system_distributed_keyspace.cc):
|
||||
```
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION, {id})
|
||||
/* The timestamp of this CDC generation. */
|
||||
@@ -131,11 +131,11 @@ Next, the node starts gossiping the timestamp of the new generation together wit
|
||||
}).get();
|
||||
```
|
||||
|
||||
When other nodes learn about the generation, they'll extract it from the `cdc_topology_description` table and save it using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
||||
When other nodes learn about the generation, they'll extract it from the `cdc_generations` table and save it using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
||||
Notice that nodes learn about the generation together with the new node's tokens. When they learn about its tokens they'll immediately start sending writes to the new node (in the case of bootstrapping, it will become a pending replica). But the old generation will still be operating for a minute or two. Thus colocation will be lost for a while. This problem will be fixed when the two-phase-commit approach is implemented.
|
||||
|
||||
We're not able to prevent a node learning about a new generation too late due to a network partition: if gossip doesn't reach the node in time, some writes might be sent to the wrong (old) generation.
|
||||
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_topology_description`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (just the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
||||
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_generations`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (just the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
||||
Thus we give up availability for safety. This likely won't happen if the administrator ensures that the cluster is not partitioned before bootstrapping a new node. This problem will also be mitigated with a future patch.
|
||||
|
||||
Due to the need of maintaining colocation we don't allow the client to send writes with arbitrary timestamps.
|
||||
@@ -144,7 +144,7 @@ Reason: we cannot allow writes before `T`, because they belong to the old genera
|
||||
|
||||
### Streams description table
|
||||
|
||||
The `cdc_description` table in the `system_distributed` keyspace allows CDC clients to learn about available sets of streams and the time intervals they are operating at. It's definition is as follows (db/system_distributed_keyspace.cc):
|
||||
The `cdc_streams` table in the `system_distributed` keyspace allows CDC clients to learn about available sets of streams and the time intervals they are operating at. It's definition is as follows (db/system_distributed_keyspace.cc):
|
||||
```
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_DESC, {id})
|
||||
/* The timestamp of this CDC generation. */
|
||||
@@ -161,9 +161,9 @@ where
|
||||
thread_local data_type cdc_stream_tuple_type = tuple_type_impl::get_instance({long_type, long_type});
|
||||
thread_local data_type cdc_streams_set_type = set_type_impl::get_instance(cdc_stream_tuple_type, false);
|
||||
```
|
||||
This table simply contains each generation's timestamp (as partition key) and the set of stream IDs used by this generation. It is meant to be user-facing, in contrast to `cdc_topology_description` which is used internally.
|
||||
This table simply contains each generation's timestamp (as partition key) and the set of stream IDs used by this generation. It is meant to be user-facing, in contrast to `cdc_generations` which is used internally.
|
||||
|
||||
When nodes learn about a CDC generation through gossip, they race to update the description table by inserting a proper row (see `cdc::update_streams_description`). This operation is idempotent so it doesn't matter if multiple nodes do it at the same time.
|
||||
|
||||
#### TODO: expired generations
|
||||
The `expired` column in `cdc_description` and `cdc_topology_description` means that this generation was superseded by some new generation and will soon be removed (its table entry will be gone). This functionality is yet to be implemented.
|
||||
The `expired` column in `cdc_streams` and `cdc_generations` means that this generation was superseded by some new generation and will soon be removed (its table entry will be gone). This functionality is yet to be implemented.
|
||||
|
||||
@@ -163,6 +163,20 @@ $ docker run --name some-scylla -d scylladb/scylla --alternator-port 8000
|
||||
|
||||
**Since: 3.2**
|
||||
|
||||
### `--alternator-https-port PORT`
|
||||
|
||||
The `--alternator-https-port` option is similar to `--alternator-port`, just enables an encrypted (HTTPS) port. Either the `--alternator-https-port` or `--alternator-http-port`, or both, can be used to enable Alternator.
|
||||
|
||||
Note that the `--alternator-https-port` option also requires that files `/etc/scylla/scylla.crt` and `/etc/scylla/scylla.key` be inserted into the image. These files contain an SSL certificate and key, respectively.
|
||||
|
||||
**Since: 4.2**
|
||||
|
||||
### `--alternator-write-isolation policy`
|
||||
|
||||
The `--alternator-write-isolation` command line option chooses between four allowed write isolation policies described in docs/alternator/alternator.md. This option must be specified if Alternator is enabled - it does not have a default.
|
||||
|
||||
**Since: 4.1**
|
||||
|
||||
### `--broadcast-address ADDR`
|
||||
|
||||
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.
|
||||
|
||||
@@ -428,6 +428,7 @@ future<> gossiper::handle_shutdown_msg(inet_address from) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return seastar::async([this, from] {
|
||||
auto permit = this->lock_endpoint(from).get0();
|
||||
this->mark_as_shutdown(from);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -175,6 +175,7 @@ public:
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::HIBERNATE,
|
||||
versioned_value::STATUS_BOOTSTRAPPING,
|
||||
versioned_value::STATUS_UNKNOWN,
|
||||
};
|
||||
static constexpr std::chrono::milliseconds INTERVAL{1000};
|
||||
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
|
||||
|
||||
@@ -49,6 +49,7 @@ enum class stream_reason : uint8_t {
|
||||
removenode,
|
||||
rebuild,
|
||||
repair,
|
||||
replace,
|
||||
};
|
||||
|
||||
enum class stream_mutation_fragments_cmd : uint8_t {
|
||||
|
||||
203
licenses/abseil-license.txt
Normal file
203
licenses/abseil-license.txt
Normal file
@@ -0,0 +1,203 @@
|
||||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
https://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
@@ -168,15 +168,33 @@ insert_token_range_to_sorted_container_while_unwrapping(
|
||||
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::get_ranges(inet_address ep) const {
|
||||
return get_ranges(ep, _token_metadata);
|
||||
return do_get_ranges(ep, _token_metadata, false);
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::get_ranges_in_thread(inet_address ep) const {
|
||||
return do_get_ranges(ep, _token_metadata, true);
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::get_ranges(inet_address ep, token_metadata& tm) const {
|
||||
return do_get_ranges(ep, tm, false);
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::get_ranges_in_thread(inet_address ep, token_metadata& tm) const {
|
||||
return do_get_ranges(ep, tm, true);
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const {
|
||||
dht::token_range_vector ret;
|
||||
auto prev_tok = tm.sorted_tokens().back();
|
||||
for (auto tok : tm.sorted_tokens()) {
|
||||
for (inet_address a : calculate_natural_endpoints(tok, tm)) {
|
||||
if (can_yield) {
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
if (a == ep) {
|
||||
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
|
||||
break;
|
||||
|
||||
@@ -113,10 +113,15 @@ public:
|
||||
// It the analogue of Origin's getAddressRanges().get(endpoint).
|
||||
// This function is not efficient, and not meant for the fast path.
|
||||
dht::token_range_vector get_ranges(inet_address ep) const;
|
||||
dht::token_range_vector get_ranges_in_thread(inet_address ep) const;
|
||||
|
||||
// Use the token_metadata provided by the caller instead of _token_metadata
|
||||
dht::token_range_vector get_ranges(inet_address ep, token_metadata& tm) const;
|
||||
dht::token_range_vector get_ranges_in_thread(inet_address ep, token_metadata& tm) const;
|
||||
private:
|
||||
dht::token_range_vector do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const;
|
||||
|
||||
public:
|
||||
// get_primary_ranges() returns the list of "primary ranges" for the given
|
||||
// endpoint. "Primary ranges" are the ranges that the node is responsible
|
||||
// for storing replica primarily, which means this is the first node
|
||||
|
||||
10
lua.cc
10
lua.cc
@@ -264,14 +264,12 @@ static auto visit_lua_raw_value(lua_State* l, int index, Func&& f) {
|
||||
|
||||
template <typename Func>
|
||||
static auto visit_decimal(const big_decimal &v, Func&& f) {
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
const auto& dividend = v.unscaled_value();
|
||||
auto divisor = boost::multiprecision::pow(ten, v.scale());
|
||||
boost::multiprecision::cpp_rational r = v.as_rational();
|
||||
const boost::multiprecision::cpp_int& dividend = numerator(r);
|
||||
const boost::multiprecision::cpp_int& divisor = denominator(r);
|
||||
if (dividend % divisor == 0) {
|
||||
return f(utils::multiprecision_int(boost::multiprecision::cpp_int(dividend/divisor)));
|
||||
return f(utils::multiprecision_int(dividend/divisor));
|
||||
}
|
||||
boost::multiprecision::cpp_rational r = dividend;
|
||||
r /= divisor;
|
||||
return f(r.convert_to<double>());
|
||||
}
|
||||
|
||||
|
||||
23
main.cc
23
main.cc
@@ -78,6 +78,7 @@
|
||||
#include "cdc/log.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
#include "alternator/tags_extension.hh"
|
||||
#include "alternator/rmw_operation.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@@ -736,7 +737,7 @@ int main(int ac, char** av) {
|
||||
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
|
||||
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
start_large_data_handler(db).get();
|
||||
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
@@ -948,12 +949,16 @@ int main(int ac, char** av) {
|
||||
mm.init_messaging_service();
|
||||
}).get();
|
||||
supervisor::notify("initializing storage proxy RPC verbs");
|
||||
proxy.invoke_on_all([] (service::storage_proxy& p) {
|
||||
p.init_messaging_service();
|
||||
}).get();
|
||||
proxy.invoke_on_all(&service::storage_proxy::init_messaging_service).get();
|
||||
auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] {
|
||||
proxy.invoke_on_all(&service::storage_proxy::uninit_messaging_service).get();
|
||||
});
|
||||
|
||||
supervisor::notify("starting streaming service");
|
||||
streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator).get();
|
||||
auto stop_streaming_service = defer_verbose_shutdown("streaming service", [] {
|
||||
streaming::stream_session::uninit_streaming_service().get();
|
||||
});
|
||||
api::set_server_stream_manager(ctx).get();
|
||||
|
||||
supervisor::notify("starting hinted handoff manager");
|
||||
@@ -986,6 +991,9 @@ int main(int ac, char** av) {
|
||||
rs.stop().get();
|
||||
});
|
||||
repair_init_messaging_service_handler(rs, sys_dist_ks, view_update_generator).get();
|
||||
auto stop_repair_messages = defer_verbose_shutdown("repair message handlers", [] {
|
||||
repair_uninit_messaging_service_handler().get();
|
||||
});
|
||||
supervisor::notify("starting storage service", true);
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.init_messaging_service_part().get();
|
||||
@@ -1081,6 +1089,7 @@ int main(int ac, char** av) {
|
||||
}
|
||||
|
||||
if (cfg->alternator_port() || cfg->alternator_https_port()) {
|
||||
alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation());
|
||||
static sharded<alternator::executor> alternator_executor;
|
||||
static sharded<alternator::server> alternator_server;
|
||||
|
||||
@@ -1186,6 +1195,12 @@ int main(int ac, char** av) {
|
||||
}
|
||||
});
|
||||
|
||||
auto stop_compaction_manager = defer_verbose_shutdown("compaction manager", [&db] {
|
||||
db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
});
|
||||
|
||||
auto stop_redis_service = defer_verbose_shutdown("redis service", [&cfg] {
|
||||
if (cfg->redis_port() || cfg->redis_ssl_port()) {
|
||||
redis.stop().get();
|
||||
|
||||
@@ -731,6 +731,10 @@ void messaging_service::register_stream_mutation_fragments(std::function<future<
|
||||
register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func));
|
||||
}
|
||||
|
||||
future<> messaging_service::unregister_stream_mutation_fragments() {
|
||||
return unregister_handler(messaging_verb::STREAM_MUTATION_FRAGMENTS);
|
||||
}
|
||||
|
||||
template<class SinkType, class SourceType>
|
||||
future<rpc::sink<SinkType>, rpc::source<SourceType>>
|
||||
do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr<messaging_service::rpc_protocol_client_wrapper> rpc_client, std::unique_ptr<messaging_service::rpc_protocol_wrapper>& rpc) {
|
||||
@@ -762,6 +766,9 @@ rpc::sink<repair_row_on_wire_with_cmd> messaging_service::make_sink_for_repair_g
|
||||
void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_row_on_wire_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_hash_with_cmd> source)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_row_diff_with_rpc_stream() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM
|
||||
future<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>
|
||||
@@ -781,6 +788,9 @@ rpc::sink<repair_stream_cmd> messaging_service::make_sink_for_repair_put_row_dif
|
||||
void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_stream_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_put_row_diff_with_rpc_stream() {
|
||||
return unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM
|
||||
future<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>
|
||||
@@ -800,6 +810,9 @@ rpc::sink<repair_hash_with_cmd> messaging_service::make_sink_for_repair_get_full
|
||||
void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_stream() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM);
|
||||
}
|
||||
|
||||
// Send a message for verb
|
||||
template <typename MsgIn, typename... MsgOut>
|
||||
@@ -883,6 +896,9 @@ future<streaming::prepare_message> messaging_service::send_prepare_message(msg_a
|
||||
return send_message<streaming::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, id,
|
||||
std::move(msg), plan_id, std::move(description), reason);
|
||||
}
|
||||
future<> messaging_service::unregister_prepare_message() {
|
||||
return unregister_handler(messaging_verb::PREPARE_MESSAGE);
|
||||
}
|
||||
|
||||
// PREPARE_DONE_MESSAGE
|
||||
void messaging_service::register_prepare_done_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func) {
|
||||
@@ -892,6 +908,9 @@ future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id,
|
||||
return send_message<void>(this, messaging_verb::PREPARE_DONE_MESSAGE, id,
|
||||
plan_id, dst_cpu_id);
|
||||
}
|
||||
future<> messaging_service::unregister_prepare_done_message() {
|
||||
return unregister_handler(messaging_verb::PREPARE_DONE_MESSAGE);
|
||||
}
|
||||
|
||||
// STREAM_MUTATION
|
||||
void messaging_service::register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool> fragmented, rpc::optional<streaming::stream_reason> reason)>&& func) {
|
||||
@@ -916,6 +935,9 @@ future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id,
|
||||
return send_message<void>(this, messaging_verb::STREAM_MUTATION_DONE, id,
|
||||
plan_id, std::move(ranges), cf_id, dst_cpu_id);
|
||||
}
|
||||
future<> messaging_service::unregister_stream_mutation_done() {
|
||||
return unregister_handler(messaging_verb::STREAM_MUTATION_DONE);
|
||||
}
|
||||
|
||||
// COMPLETE_MESSAGE
|
||||
void messaging_service::register_complete_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id, rpc::optional<bool> failed)>&& func) {
|
||||
@@ -925,6 +947,9 @@ future<> messaging_service::send_complete_message(msg_addr id, UUID plan_id, uns
|
||||
return send_message<void>(this, messaging_verb::COMPLETE_MESSAGE, id,
|
||||
plan_id, dst_cpu_id, failed);
|
||||
}
|
||||
future<> messaging_service::unregister_complete_message() {
|
||||
return unregister_handler(messaging_verb::COMPLETE_MESSAGE);
|
||||
}
|
||||
|
||||
void messaging_service::register_gossip_echo(std::function<future<> ()>&& func) {
|
||||
register_handler(this, messaging_verb::GOSSIP_ECHO, std::move(func));
|
||||
@@ -1139,14 +1164,14 @@ future<partition_checksum> messaging_service::send_repair_checksum_range(
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void messaging_service::register_repair_get_full_row_hashes(std::function<future<std::unordered_set<repair_hash>> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
|
||||
void messaging_service::register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_full_row_hashes() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES);
|
||||
}
|
||||
future<std::unordered_set<repair_hash>> messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id) {
|
||||
return send_message<future<std::unordered_set<repair_hash>>>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id);
|
||||
future<repair_hash_set> messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id) {
|
||||
return send_message<future<repair_hash_set>>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
@@ -1171,13 +1196,13 @@ future<get_sync_boundary_response> messaging_service::send_repair_get_sync_bound
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void messaging_service::register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows)>&& func) {
|
||||
void messaging_service::register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_row_diff() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF);
|
||||
}
|
||||
future<repair_rows_on_wire> messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
|
||||
future<repair_rows_on_wire> messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows) {
|
||||
return send_message<future<repair_rows_on_wire>>(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(id), repair_meta_id, std::move(set_diff), needs_all_rows);
|
||||
}
|
||||
|
||||
|
||||
@@ -276,10 +276,12 @@ public:
|
||||
streaming::prepare_message msg, UUID plan_id, sstring description, rpc::optional<streaming::stream_reason> reason)>&& func);
|
||||
future<streaming::prepare_message> send_prepare_message(msg_addr id, streaming::prepare_message msg, UUID plan_id,
|
||||
sstring description, streaming::stream_reason);
|
||||
future<> unregister_prepare_message();
|
||||
|
||||
// Wrapper for PREPARE_DONE_MESSAGE verb
|
||||
void register_prepare_done_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func);
|
||||
future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id);
|
||||
future<> unregister_prepare_done_message();
|
||||
|
||||
// Wrapper for STREAM_MUTATION verb
|
||||
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool>, rpc::optional<streaming::stream_reason>)>&& func);
|
||||
@@ -288,6 +290,7 @@ public:
|
||||
// Wrapper for STREAM_MUTATION_FRAGMENTS
|
||||
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use.
|
||||
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
|
||||
future<> unregister_stream_mutation_fragments();
|
||||
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source);
|
||||
future<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
|
||||
|
||||
@@ -295,22 +298,27 @@ public:
|
||||
future<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);
|
||||
rpc::sink<repair_row_on_wire_with_cmd> make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source<repair_hash_with_cmd>& source);
|
||||
void register_repair_get_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_row_on_wire_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_hash_with_cmd> source)>&& func);
|
||||
future<> unregister_repair_get_row_diff_with_rpc_stream();
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM
|
||||
future<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>> make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);
|
||||
rpc::sink<repair_stream_cmd> make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source<repair_row_on_wire_with_cmd>& source);
|
||||
void register_repair_put_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_stream_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source)>&& func);
|
||||
future<> unregister_repair_put_row_diff_with_rpc_stream();
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM
|
||||
future<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>> make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);
|
||||
rpc::sink<repair_hash_with_cmd> make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source<repair_stream_cmd>& source);
|
||||
void register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source)>&& func);
|
||||
future<> unregister_repair_get_full_row_hashes_with_rpc_stream();
|
||||
|
||||
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
|
||||
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id);
|
||||
future<> unregister_stream_mutation_done();
|
||||
|
||||
void register_complete_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id, rpc::optional<bool> failed)>&& func);
|
||||
future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id, bool failed = false);
|
||||
future<> unregister_complete_message();
|
||||
|
||||
// Wrapper for REPAIR_CHECKSUM_RANGE verb
|
||||
void register_repair_checksum_range(std::function<future<partition_checksum> (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version)>&& func);
|
||||
@@ -318,9 +326,9 @@ public:
|
||||
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version);
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void register_repair_get_full_row_hashes(std::function<future<std::unordered_set<repair_hash>> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
|
||||
void register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
|
||||
future<> unregister_repair_get_full_row_hashes();
|
||||
future<std::unordered_set<repair_hash>> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id);
|
||||
future<repair_hash_set> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
void register_repair_get_combined_row_hash(std::function<future<get_combined_row_hash_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary)>&& func);
|
||||
@@ -333,9 +341,9 @@ public:
|
||||
future<get_sync_boundary_response> send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary);
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows)>&& func);
|
||||
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows)>&& func);
|
||||
future<> unregister_repair_get_row_diff();
|
||||
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows);
|
||||
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows);
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF
|
||||
void register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff)>&& func);
|
||||
|
||||
@@ -195,6 +195,7 @@ class read_context : public reader_lifecycle_policy {
|
||||
|
||||
// One for each shard. Index is shard id.
|
||||
std::vector<reader_meta> _readers;
|
||||
std::vector<reader_concurrency_semaphore*> _semaphores;
|
||||
|
||||
gate _dismantling_gate;
|
||||
|
||||
@@ -211,7 +212,8 @@ public:
|
||||
, _schema(std::move(s))
|
||||
, _cmd(cmd)
|
||||
, _ranges(ranges)
|
||||
, _trace_state(std::move(trace_state)) {
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _semaphores(smp::count, nullptr) {
|
||||
_readers.resize(smp::count);
|
||||
}
|
||||
|
||||
@@ -236,7 +238,12 @@ public:
|
||||
virtual void destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override;
|
||||
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
return _readers[this_shard_id()].rparts->semaphore;
|
||||
const auto shard = this_shard_id();
|
||||
if (!_semaphores[shard]) {
|
||||
auto& table = _db.local().find_column_family(_schema);
|
||||
_semaphores[shard] = &table.read_concurrency_semaphore();
|
||||
}
|
||||
return *_semaphores[shard];
|
||||
}
|
||||
|
||||
future<> lookup_readers();
|
||||
|
||||
@@ -2505,7 +2505,8 @@ mutation_partition::fully_discontinuous(const schema& s, const position_range& r
|
||||
future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& source,
|
||||
const dht::decorated_key& dk,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr)
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout)
|
||||
{
|
||||
struct range_and_reader {
|
||||
dht::partition_range range;
|
||||
@@ -2530,7 +2531,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
|
||||
auto cwqrb = counter_write_query_result_builder(*s);
|
||||
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>>(
|
||||
*s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb));
|
||||
auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout);
|
||||
auto f = r_a_r->reader.consume(std::move(cfq), timeout);
|
||||
return f.finally([r_a_r = std::move(r_a_r)] { });
|
||||
}
|
||||
|
||||
@@ -2605,7 +2606,7 @@ void mutation_cleaner_impl::start_worker() {
|
||||
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
|
||||
auto&& region = snp.region();
|
||||
return with_allocator(region.allocator(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
{
|
||||
// Allocating sections require the region to be reclaimable
|
||||
// which means that they cannot be nested.
|
||||
// It is, however, possible, that if the snapshot is taken
|
||||
@@ -2617,13 +2618,15 @@ stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexce
|
||||
}
|
||||
try {
|
||||
return _worker_state->alloc_section(region, [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
return snp.merge_partition_versions(_app_stats);
|
||||
});
|
||||
});
|
||||
} catch (...) {
|
||||
// Merging failed, give up as there is no guarantee of forward progress.
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -113,9 +113,6 @@ class reconcilable_result_builder {
|
||||
const schema& _schema;
|
||||
const query::partition_slice& _slice;
|
||||
|
||||
utils::chunked_vector<partition> _result;
|
||||
uint32_t _live_rows{};
|
||||
|
||||
bool _return_static_content_on_partition_with_no_rows{};
|
||||
bool _static_row_is_alive{};
|
||||
uint32_t _total_live_rows = 0;
|
||||
@@ -123,6 +120,10 @@ class reconcilable_result_builder {
|
||||
stop_iteration _stop;
|
||||
bool _short_read_allowed;
|
||||
std::optional<streamed_mutation_freezer> _mutation_consumer;
|
||||
|
||||
uint32_t _live_rows{};
|
||||
// make this the last member so it is destroyed first. #7240
|
||||
utils::chunked_vector<partition> _result;
|
||||
public:
|
||||
reconcilable_result_builder(const schema& s, const query::partition_slice& slice,
|
||||
query::result_memory_accounter&& accounter)
|
||||
@@ -206,5 +207,6 @@ public:
|
||||
future<mutation_opt> counter_write_query(schema_ptr, const mutation_source&,
|
||||
const dht::decorated_key& dk,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr);
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
|
||||
@@ -982,6 +982,435 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
|
||||
namespace {
|
||||
|
||||
struct fill_buffer_result {
|
||||
foreign_ptr<std::unique_ptr<const circular_buffer<mutation_fragment>>> buffer;
|
||||
bool end_of_stream = false;
|
||||
|
||||
fill_buffer_result() = default;
|
||||
fill_buffer_result(circular_buffer<mutation_fragment> buffer, bool end_of_stream)
|
||||
: buffer(make_foreign(std::make_unique<const circular_buffer<mutation_fragment>>(std::move(buffer))))
|
||||
, end_of_stream(end_of_stream) {
|
||||
}
|
||||
};
|
||||
|
||||
class inactive_evictable_reader : public reader_concurrency_semaphore::inactive_read {
|
||||
flat_mutation_reader_opt _reader;
|
||||
public:
|
||||
inactive_evictable_reader(flat_mutation_reader reader)
|
||||
: _reader(std::move(reader)) {
|
||||
}
|
||||
flat_mutation_reader reader() && {
|
||||
return std::move(*_reader);
|
||||
}
|
||||
virtual void evict() override {
|
||||
_reader = {};
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// Encapsulates all data and logic that is local to the remote shard the
|
||||
// reader lives on.
|
||||
class evictable_reader : public flat_mutation_reader::impl {
|
||||
public:
|
||||
using auto_pause = bool_class<class auto_pause_tag>;
|
||||
|
||||
private:
|
||||
auto_pause _auto_pause;
|
||||
mutation_source _ms;
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
const dht::partition_range* _pr;
|
||||
const query::partition_slice& _ps;
|
||||
const io_priority_class& _pc;
|
||||
tracing::global_trace_state_ptr _trace_state;
|
||||
const mutation_reader::forwarding _fwd_mr;
|
||||
reader_concurrency_semaphore::inactive_read_handle _irh;
|
||||
bool _reader_created = false;
|
||||
bool _drop_partition_start = false;
|
||||
bool _drop_static_row = false;
|
||||
position_in_partition::tri_compare _tri_cmp;
|
||||
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
|
||||
// These are used when the reader has to be recreated (after having been
|
||||
// evicted while paused) and the range and/or slice it is recreated with
|
||||
// differs from the original ones.
|
||||
std::optional<dht::partition_range> _range_override;
|
||||
std::optional<query::partition_slice> _slice_override;
|
||||
bool _pending_next_partition = false;
|
||||
|
||||
flat_mutation_reader_opt _reader;
|
||||
|
||||
private:
|
||||
void do_pause(flat_mutation_reader reader);
|
||||
void maybe_pause(flat_mutation_reader reader);
|
||||
flat_mutation_reader_opt try_resume();
|
||||
void update_next_position(flat_mutation_reader& reader);
|
||||
void adjust_partition_slice();
|
||||
flat_mutation_reader recreate_reader();
|
||||
flat_mutation_reader resume_or_create_reader();
|
||||
bool should_drop_fragment(const mutation_fragment& mf);
|
||||
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
future<> fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
|
||||
public:
|
||||
evictable_reader(
|
||||
auto_pause ap,
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
~evictable_reader();
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
|
||||
virtual void next_partition() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
|
||||
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override {
|
||||
throw_with_backtrace<std::bad_function_call>();
|
||||
}
|
||||
reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
|
||||
return std::move(_irh);
|
||||
}
|
||||
void pause() {
|
||||
if (_reader) {
|
||||
do_pause(std::move(*_reader));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void evictable_reader::do_pause(flat_mutation_reader reader) {
|
||||
_irh = _semaphore.register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
|
||||
}
|
||||
|
||||
void evictable_reader::maybe_pause(flat_mutation_reader reader) {
|
||||
if (_auto_pause) {
|
||||
do_pause(std::move(reader));
|
||||
} else {
|
||||
_reader = std::move(reader);
|
||||
}
|
||||
}
|
||||
|
||||
flat_mutation_reader_opt evictable_reader::try_resume() {
|
||||
auto ir_ptr = _semaphore.unregister_inactive_read(std::move(_irh));
|
||||
if (!ir_ptr) {
|
||||
return {};
|
||||
}
|
||||
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
|
||||
return std::move(ir).reader();
|
||||
}
|
||||
|
||||
void evictable_reader::update_next_position(flat_mutation_reader& reader) {
|
||||
if (is_buffer_empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto rbegin = std::reverse_iterator(buffer().end());
|
||||
auto rend = std::reverse_iterator(buffer().begin());
|
||||
if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment::is_partition_start)); pk_it != rend) {
|
||||
_last_pkey = pk_it->as_partition_start().key();
|
||||
}
|
||||
|
||||
const auto last_pos = buffer().back().position();
|
||||
switch (last_pos.region()) {
|
||||
case partition_region::partition_start:
|
||||
_next_position_in_partition = position_in_partition::for_static_row();
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
if (reader.is_buffer_empty()) {
|
||||
_next_position_in_partition = position_in_partition::after_key(last_pos);
|
||||
} else {
|
||||
const auto& next_frag = reader.peek_buffer();
|
||||
if (next_frag.is_end_of_partition()) {
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
} else {
|
||||
_next_position_in_partition = position_in_partition(next_frag.position());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void evictable_reader::adjust_partition_slice() {
|
||||
if (!_slice_override) {
|
||||
_slice_override = _ps;
|
||||
}
|
||||
|
||||
auto ranges = _slice_override->default_row_ranges();
|
||||
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
|
||||
|
||||
_slice_override->clear_ranges();
|
||||
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
|
||||
}
|
||||
|
||||
flat_mutation_reader evictable_reader::recreate_reader() {
|
||||
const dht::partition_range* range = _pr;
|
||||
const query::partition_slice* slice = &_ps;
|
||||
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
switch (_next_position_in_partition.region()) {
|
||||
case partition_region::partition_start:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_drop_partition_start = true;
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
_drop_partition_start = true;
|
||||
_drop_static_row = true;
|
||||
adjust_partition_slice();
|
||||
slice = &*_slice_override;
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
}
|
||||
|
||||
// The original range contained a single partition and we've read it
|
||||
// all. We'd have to create a reader with an empty range that would
|
||||
// immediately be at EOS. This is not possible so just create an empty
|
||||
// reader instead.
|
||||
// This should be extremely rare (who'd create a multishard reader to
|
||||
// read a single partition) but still, let's make sure we handle it
|
||||
// correctly.
|
||||
if (_pr->is_singular() && !partition_range_is_inclusive) {
|
||||
return make_empty_flat_reader(_schema);
|
||||
}
|
||||
|
||||
_range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
|
||||
range = &*_range_override;
|
||||
}
|
||||
|
||||
return _ms.make_reader(
|
||||
_schema,
|
||||
no_reader_permit(),
|
||||
*range,
|
||||
*slice,
|
||||
_pc,
|
||||
_trace_state,
|
||||
streamed_mutation::forwarding::no,
|
||||
_fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader evictable_reader::resume_or_create_reader() {
|
||||
if (!_reader_created) {
|
||||
auto reader = _ms.make_reader(_schema, no_reader_permit(), *_pr, _ps, _pc, _trace_state, streamed_mutation::forwarding::no, _fwd_mr);
|
||||
_reader_created = true;
|
||||
return reader;
|
||||
}
|
||||
if (_reader) {
|
||||
return std::move(*_reader);
|
||||
}
|
||||
if (auto reader_opt = try_resume()) {
|
||||
return std::move(*reader_opt);
|
||||
}
|
||||
return recreate_reader();
|
||||
}
|
||||
|
||||
bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
if (_drop_partition_start && mf.is_partition_start()) {
|
||||
_drop_partition_start = false;
|
||||
return true;
|
||||
}
|
||||
if (_drop_static_row && mf.is_static_row()) {
|
||||
_drop_static_row = false;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
if (!_drop_partition_start && !_drop_static_row) {
|
||||
return reader.fill_buffer(timeout);
|
||||
}
|
||||
return repeat([this, &reader, timeout] {
|
||||
return reader.fill_buffer(timeout).then([this, &reader] {
|
||||
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
return do_fill_buffer(reader, timeout).then([this, &reader, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
reader.move_buffer_content_to(*this);
|
||||
auto stop = [this, &reader] {
|
||||
// The only problematic fragment kind is the range tombstone.
|
||||
// All other fragment kinds are safe to end the buffer on, and
|
||||
// are guaranteed to represent progress vs. the last buffer fill.
|
||||
if (!buffer().back().is_range_tombstone()) {
|
||||
return true;
|
||||
}
|
||||
if (reader.is_buffer_empty()) {
|
||||
return reader.is_end_of_stream();
|
||||
}
|
||||
const auto& next_pos = reader.peek_buffer().position();
|
||||
// To ensure safe progress we have to ensure the following:
|
||||
//
|
||||
// _next_position_in_partition < buffer.back().position() < next_pos
|
||||
//
|
||||
// * The first condition is to ensure we made progress since the
|
||||
// last buffer fill. Otherwise we might get into an endless loop if
|
||||
// the reader is recreated after each `fill_buffer()` call.
|
||||
// * The second condition is to ensure we have seen all fragments
|
||||
// with the same position. Otherwise we might jump over those
|
||||
// remaining fragments with the same position as the last
|
||||
// fragment's in the buffer when the reader is recreated.
|
||||
return _tri_cmp(_next_position_in_partition, buffer().back().position()) < 0 && _tri_cmp(buffer().back().position(), next_pos) < 0;
|
||||
};
|
||||
// Read additional fragments until it is safe to stop, if needed.
|
||||
// We have to ensure we stop at a fragment such that if the reader is
|
||||
// evicted and recreated later, we won't be skipping any fragments.
|
||||
// Practically, range tombstones are the only ones that are
|
||||
// problematic to end the buffer on. This is due to the fact range
|
||||
// tombstones can have the same position that multiple following range
|
||||
// tombstones, or a single following clustering row in the stream has.
|
||||
// When a range tombstone is the last in the buffer, we have to continue
|
||||
// to read until we are sure we've read all fragments sharing the same
|
||||
// position, so that we can safely continue reading from after said
|
||||
// position.
|
||||
return do_until(stop, [this, &reader, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return do_fill_buffer(reader, timeout);
|
||||
}
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([this, &reader] {
|
||||
update_next_position(reader);
|
||||
});
|
||||
}
|
||||
|
||||
evictable_reader::evictable_reader(
|
||||
auto_pause ap,
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: impl(std::move(schema))
|
||||
, _auto_pause(ap)
|
||||
, _ms(std::move(ms))
|
||||
, _semaphore(semaphore)
|
||||
, _pr(&pr)
|
||||
, _ps(ps)
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _tri_cmp(*_schema) {
|
||||
}
|
||||
|
||||
evictable_reader::~evictable_reader() {
|
||||
try_resume();
|
||||
}
|
||||
|
||||
future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
|
||||
if (pending_next_partition) {
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
}
|
||||
if (is_end_of_stream()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_with(resume_or_create_reader(),
|
||||
[this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable {
|
||||
if (pending_next_partition) {
|
||||
reader.next_partition();
|
||||
}
|
||||
|
||||
return fill_buffer(reader, timeout).then([this, &reader] {
|
||||
_end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty();
|
||||
maybe_pause(std::move(reader));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void evictable_reader::next_partition() {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_pending_next_partition = true;
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
}
|
||||
}
|
||||
|
||||
future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
_pr = ≺
|
||||
_last_pkey.reset();
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
|
||||
if (_reader) {
|
||||
return _reader->fast_forward_to(pr, timeout);
|
||||
}
|
||||
if (!_reader_created || !_irh) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (auto reader_opt = try_resume()) {
|
||||
auto f = reader_opt->fast_forward_to(pr, timeout);
|
||||
return f.then([this, reader = std::move(*reader_opt)] () mutable {
|
||||
maybe_pause(std::move(reader));
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
evictable_reader_handle::evictable_reader_handle(evictable_reader& r) : _r(&r)
|
||||
{ }
|
||||
|
||||
void evictable_reader_handle::evictable_reader_handle::pause() {
|
||||
_r->pause();
|
||||
}
|
||||
|
||||
flat_mutation_reader make_auto_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_flat_mutation_reader<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms), std::move(schema), semaphore, pr, ps,
|
||||
pc, std::move(trace_state), fwd_mr);
|
||||
}
|
||||
|
||||
std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto reader = std::make_unique<evictable_reader>(evictable_reader::auto_pause::no, std::move(ms), std::move(schema), semaphore, pr, ps,
|
||||
pc, std::move(trace_state), fwd_mr);
|
||||
auto handle = evictable_reader_handle(*reader.get());
|
||||
return std::pair(flat_mutation_reader(std::move(reader)), handle);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// A special-purpose shard reader.
|
||||
//
|
||||
// Shard reader manages a reader located on a remote shard. It transparently
|
||||
@@ -992,66 +1421,6 @@ namespace {
|
||||
// wrapped into a flat_mutation_reader, as it needs to be managed by a shared
|
||||
// pointer.
|
||||
class shard_reader : public enable_lw_shared_from_this<shard_reader>, public flat_mutation_reader::impl {
|
||||
struct fill_buffer_result {
|
||||
foreign_ptr<std::unique_ptr<const circular_buffer<mutation_fragment>>> buffer;
|
||||
bool end_of_stream = false;
|
||||
|
||||
fill_buffer_result() = default;
|
||||
fill_buffer_result(circular_buffer<mutation_fragment> buffer, bool end_of_stream)
|
||||
: buffer(make_foreign(std::make_unique<const circular_buffer<mutation_fragment>>(std::move(buffer))))
|
||||
, end_of_stream(end_of_stream) {
|
||||
}
|
||||
};
|
||||
|
||||
// Encapsulates all data and logic that is local to the remote shard the
|
||||
// reader lives on.
|
||||
class remote_reader {
|
||||
schema_ptr _schema;
|
||||
reader_lifecycle_policy& _lifecycle_policy;
|
||||
const dht::partition_range* _pr;
|
||||
const query::partition_slice& _ps;
|
||||
const io_priority_class& _pc;
|
||||
tracing::global_trace_state_ptr _trace_state;
|
||||
const mutation_reader::forwarding _fwd_mr;
|
||||
reader_concurrency_semaphore::inactive_read_handle _irh;
|
||||
bool _reader_created = false;
|
||||
bool _drop_partition_start = false;
|
||||
bool _drop_static_row = false;
|
||||
position_in_partition::tri_compare _tri_cmp;
|
||||
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
|
||||
// These are used when the reader has to be recreated (after having been
|
||||
// evicted while paused) and the range and/or slice it is recreated with
|
||||
// differs from the original ones.
|
||||
std::optional<dht::partition_range> _range_override;
|
||||
std::optional<query::partition_slice> _slice_override;
|
||||
|
||||
private:
|
||||
void update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer);
|
||||
void adjust_partition_slice();
|
||||
flat_mutation_reader recreate_reader();
|
||||
flat_mutation_reader resume_or_create_reader();
|
||||
bool should_drop_fragment(const mutation_fragment& mf);
|
||||
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
future<> fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer, db::timeout_clock::time_point timeout);
|
||||
|
||||
public:
|
||||
remote_reader(
|
||||
schema_ptr schema,
|
||||
reader_lifecycle_policy& lifecycle_policy,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
future<fill_buffer_result> fill_buffer(const dht::partition_range& pr, bool pending_next_partition, db::timeout_clock::time_point timeout);
|
||||
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout);
|
||||
reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
|
||||
return std::move(_irh);
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
|
||||
const unsigned _shard;
|
||||
@@ -1063,7 +1432,7 @@ private:
|
||||
bool _pending_next_partition = false;
|
||||
bool _stopped = false;
|
||||
std::optional<future<>> _read_ahead;
|
||||
foreign_ptr<std::unique_ptr<remote_reader>> _reader;
|
||||
foreign_ptr<std::unique_ptr<evictable_reader>> _reader;
|
||||
|
||||
private:
|
||||
future<> do_fill_buffer(db::timeout_clock::time_point timeout);
|
||||
@@ -1125,275 +1494,50 @@ void shard_reader::stop() noexcept {
|
||||
|
||||
_lifecycle_policy->destroy_reader(_shard, f.then([this] {
|
||||
return smp::submit_to(_shard, [this] {
|
||||
return make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(*_reader).inactive_read_handle()));
|
||||
}).then([this] (foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>> irh) {
|
||||
return reader_lifecycle_policy::stopped_reader{std::move(irh), detach_buffer(), _pending_next_partition};
|
||||
auto ret = std::tuple(
|
||||
make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(*_reader).inactive_read_handle())),
|
||||
make_foreign(std::make_unique<circular_buffer<mutation_fragment>>(_reader->detach_buffer())));
|
||||
_reader.reset();
|
||||
return ret;
|
||||
}).then([this] (std::tuple<foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>>,
|
||||
foreign_ptr<std::unique_ptr<circular_buffer<mutation_fragment>>>> remains) {
|
||||
auto&& [irh, remote_buffer] = remains;
|
||||
auto buffer = detach_buffer();
|
||||
for (const auto& mf : *remote_buffer) {
|
||||
buffer.emplace_back(*_schema, mf); // we are copying from the remote shard.
|
||||
}
|
||||
return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), _pending_next_partition};
|
||||
});
|
||||
}).finally([zis = shared_from_this()] {}));
|
||||
}
|
||||
|
||||
void shard_reader::remote_reader::update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) {
|
||||
if (buffer.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto rbegin = std::reverse_iterator(buffer.end());
|
||||
auto rend = std::reverse_iterator(buffer.begin());
|
||||
if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment::is_partition_start)); pk_it != rend) {
|
||||
_last_pkey = pk_it->as_partition_start().key();
|
||||
}
|
||||
|
||||
const auto last_pos = buffer.back().position();
|
||||
switch (last_pos.region()) {
|
||||
case partition_region::partition_start:
|
||||
_next_position_in_partition = position_in_partition::for_static_row();
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
if (reader.is_buffer_empty()) {
|
||||
_next_position_in_partition = position_in_partition::after_key(last_pos);
|
||||
} else {
|
||||
const auto& next_frag = reader.peek_buffer();
|
||||
if (next_frag.is_end_of_partition()) {
|
||||
buffer.emplace_back(reader.pop_mutation_fragment());
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
} else {
|
||||
_next_position_in_partition = position_in_partition(next_frag.position());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void shard_reader::remote_reader::adjust_partition_slice() {
|
||||
if (!_slice_override) {
|
||||
_slice_override = _ps;
|
||||
}
|
||||
|
||||
auto ranges = _slice_override->default_row_ranges();
|
||||
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
|
||||
|
||||
_slice_override->clear_ranges();
|
||||
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
|
||||
}
|
||||
|
||||
flat_mutation_reader shard_reader::remote_reader::recreate_reader() {
|
||||
const dht::partition_range* range = _pr;
|
||||
const query::partition_slice* slice = &_ps;
|
||||
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
switch (_next_position_in_partition.region()) {
|
||||
case partition_region::partition_start:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_drop_partition_start = true;
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
_drop_partition_start = true;
|
||||
_drop_static_row = true;
|
||||
adjust_partition_slice();
|
||||
slice = &*_slice_override;
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
}
|
||||
|
||||
// The original range contained a single partition and we've read it
|
||||
// all. We'd have to create a reader with an empty range that would
|
||||
// immediately be at EOS. This is not possible so just create an empty
|
||||
// reader instead.
|
||||
// This should be extremely rare (who'd create a multishard reader to
|
||||
// read a single partition) but still, let's make sure we handle it
|
||||
// correctly.
|
||||
if (_pr->is_singular() && !partition_range_is_inclusive) {
|
||||
return make_empty_flat_reader(_schema);
|
||||
}
|
||||
|
||||
_range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
|
||||
range = &*_range_override;
|
||||
}
|
||||
|
||||
return _lifecycle_policy.create_reader(
|
||||
_schema,
|
||||
*range,
|
||||
*slice,
|
||||
_pc,
|
||||
_trace_state,
|
||||
_fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader shard_reader::remote_reader::resume_or_create_reader() {
|
||||
if (!_reader_created) {
|
||||
auto reader = _lifecycle_policy.create_reader(_schema, *_pr, _ps, _pc, _trace_state, _fwd_mr);
|
||||
_reader_created = true;
|
||||
return reader;
|
||||
}
|
||||
if (auto reader_opt = _lifecycle_policy.try_resume(std::move(_irh))) {
|
||||
return std::move(*reader_opt);
|
||||
}
|
||||
return recreate_reader();
|
||||
}
|
||||
|
||||
bool shard_reader::remote_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
if (_drop_partition_start && mf.is_partition_start()) {
|
||||
_drop_partition_start = false;
|
||||
return true;
|
||||
}
|
||||
if (_drop_static_row && mf.is_static_row()) {
|
||||
_drop_static_row = false;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
if (!_drop_partition_start && !_drop_static_row) {
|
||||
return reader.fill_buffer(timeout);
|
||||
}
|
||||
return repeat([this, &reader, timeout] {
|
||||
return reader.fill_buffer(timeout).then([this, &reader] {
|
||||
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
buffer = reader.detach_buffer();
|
||||
auto stop = [this, &reader, &buffer] {
|
||||
// The only problematic fragment kind is the range tombstone.
|
||||
// All other fragment kinds are safe to end the buffer on, and
|
||||
// are guaranteed to represent progress vs. the last buffer fill.
|
||||
if (!buffer.back().is_range_tombstone()) {
|
||||
return true;
|
||||
}
|
||||
if (reader.is_buffer_empty()) {
|
||||
return reader.is_end_of_stream();
|
||||
}
|
||||
const auto& next_pos = reader.peek_buffer().position();
|
||||
// To ensure safe progress we have to ensure the following:
|
||||
//
|
||||
// _next_position_in_partition < buffer.back().position() < next_pos
|
||||
//
|
||||
// * The first condition is to ensure we made progress since the
|
||||
// last buffer fill. Otherwise we might get into an endless loop if
|
||||
// the reader is recreated after each `fill_buffer()` call.
|
||||
// * The second condition is to ensure we have seen all fragments
|
||||
// with the same position. Otherwise we might jump over those
|
||||
// remaining fragments with the same position as the last
|
||||
// fragment's in the buffer when the reader is recreated.
|
||||
return _tri_cmp(_next_position_in_partition, buffer.back().position()) < 0 && _tri_cmp(buffer.back().position(), next_pos) < 0;
|
||||
};
|
||||
// Read additional fragments until it is safe to stop, if needed.
|
||||
// We have to ensure we stop at a fragment such that if the reader is
|
||||
// evicted and recreated later, we won't be skipping any fragments.
|
||||
// Practically, range tombstones are the only ones that are
|
||||
// problematic to end the buffer on. This is due to the fact range
|
||||
// tombstones can have the same position that multiple following range
|
||||
// tombstones, or a single following clustering row in the stream has.
|
||||
// When a range tombstone is the last in the buffer, we have to continue
|
||||
// to read until we are sure we've read all fragments sharing the same
|
||||
// position, so that we can safely continue reading from after said
|
||||
// position.
|
||||
return do_until(stop, [this, &reader, &buffer, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return do_fill_buffer(reader, timeout);
|
||||
}
|
||||
buffer.emplace_back(reader.pop_mutation_fragment());
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([this, &reader, &buffer] {
|
||||
update_next_position(reader, buffer);
|
||||
});
|
||||
}
|
||||
|
||||
shard_reader::remote_reader::remote_reader(
|
||||
schema_ptr schema,
|
||||
reader_lifecycle_policy& lifecycle_policy,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _schema(std::move(schema))
|
||||
, _lifecycle_policy(lifecycle_policy)
|
||||
, _pr(&pr)
|
||||
, _ps(ps)
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _tri_cmp(*_schema) {
|
||||
}
|
||||
|
||||
future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffer(const dht::partition_range& pr, bool pending_next_partition,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
// We could have missed a `fast_forward_to()` if the reader wasn't created yet.
|
||||
_pr = ≺
|
||||
if (pending_next_partition) {
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
}
|
||||
return do_with(resume_or_create_reader(), circular_buffer<mutation_fragment>{},
|
||||
[this, pending_next_partition, timeout] (flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) mutable {
|
||||
if (pending_next_partition) {
|
||||
reader.next_partition();
|
||||
}
|
||||
|
||||
return fill_buffer(reader, buffer, timeout).then([this, &reader, &buffer] {
|
||||
const auto eos = reader.is_end_of_stream() && reader.is_buffer_empty();
|
||||
_irh = _lifecycle_policy.pause(std::move(reader));
|
||||
return fill_buffer_result(std::move(buffer), eos);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
_pr = ≺
|
||||
_last_pkey.reset();
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
|
||||
if (!_reader_created || !_irh) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (auto reader_opt = _lifecycle_policy.try_resume(std::move(_irh))) {
|
||||
auto f = reader_opt->fast_forward_to(pr, timeout);
|
||||
return f.then([this, reader = std::move(*reader_opt)] () mutable {
|
||||
_irh = _lifecycle_policy.pause(std::move(reader));
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
|
||||
future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
auto fill_buf_fut = make_ready_future<fill_buffer_result>();
|
||||
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
|
||||
|
||||
struct reader_and_buffer_fill_result {
|
||||
foreign_ptr<std::unique_ptr<remote_reader>> reader;
|
||||
foreign_ptr<std::unique_ptr<evictable_reader>> reader;
|
||||
fill_buffer_result result;
|
||||
};
|
||||
|
||||
if (!_reader) {
|
||||
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema), pending_next_partition, timeout] {
|
||||
auto rreader = make_foreign(std::make_unique<remote_reader>(gs.get(), *_lifecycle_policy, *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
auto f = rreader->fill_buffer(*_pr, pending_next_partition, timeout);
|
||||
return f.then([rreader = std::move(rreader)] (fill_buffer_result res) mutable {
|
||||
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema), timeout] {
|
||||
auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] (
|
||||
schema_ptr s,
|
||||
reader_permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return lifecycle_policy->create_reader(std::move(s), pr, ps, pc, std::move(ts), fwd_mr);
|
||||
});
|
||||
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
|
||||
gs.get(), _lifecycle_policy->semaphore(), *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
auto f = rreader->fill_buffer(timeout);
|
||||
return f.then([rreader = std::move(rreader)] () mutable {
|
||||
auto res = fill_buffer_result(rreader->detach_buffer(), rreader->is_end_of_stream());
|
||||
return make_ready_future<reader_and_buffer_fill_result>(reader_and_buffer_fill_result{std::move(rreader), std::move(res)});
|
||||
});
|
||||
}).then([this, timeout] (reader_and_buffer_fill_result res) {
|
||||
@@ -1402,7 +1546,12 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
});
|
||||
} else {
|
||||
fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable {
|
||||
return _reader->fill_buffer(*_pr, pending_next_partition, timeout);
|
||||
if (pending_next_partition) {
|
||||
_reader->next_partition();
|
||||
}
|
||||
return _reader->fill_buffer(timeout).then([this] {
|
||||
return fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1651,27 +1800,9 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim
|
||||
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class inactive_shard_read : public reader_concurrency_semaphore::inactive_read {
|
||||
flat_mutation_reader_opt _reader;
|
||||
public:
|
||||
inactive_shard_read(flat_mutation_reader reader)
|
||||
: _reader(std::move(reader)) {
|
||||
}
|
||||
flat_mutation_reader reader() && {
|
||||
return std::move(*_reader);
|
||||
}
|
||||
virtual void evict() override {
|
||||
_reader = {};
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle
|
||||
reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader) {
|
||||
return sem.register_inactive_read(std::make_unique<inactive_shard_read>(std::move(reader)));
|
||||
return sem.register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
|
||||
}
|
||||
|
||||
flat_mutation_reader_opt
|
||||
@@ -1680,7 +1811,7 @@ reader_lifecycle_policy::try_resume(reader_concurrency_semaphore& sem, reader_co
|
||||
if (!ir_ptr) {
|
||||
return {};
|
||||
}
|
||||
auto& ir = static_cast<inactive_shard_read&>(*ir_ptr);
|
||||
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
|
||||
return std::move(ir).reader();
|
||||
}
|
||||
|
||||
|
||||
@@ -372,6 +372,64 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader,
|
||||
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);
|
||||
|
||||
/// Make an auto-paused evictable reader.
|
||||
///
|
||||
/// The reader is paused after each use, that is after each call to any of its
|
||||
/// members that cause actual reading to be done (`fill_buffer()` and
|
||||
/// `fast_forward_to()`). When paused, the reader is made evictable, that it is
|
||||
/// it is registered with reader concurrency semaphore as an inactive read.
|
||||
/// The reader is resumed automatically on the next use. If it was evicted, it
|
||||
/// will be recreated at the position it left off reading. This is all
|
||||
/// transparent to its user.
|
||||
/// Parameters passed by reference have to be kept alive while the reader is
|
||||
/// alive.
|
||||
flat_mutation_reader make_auto_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
|
||||
class evictable_reader;
|
||||
|
||||
class evictable_reader_handle {
|
||||
friend std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(mutation_source, schema_ptr, reader_concurrency_semaphore&,
|
||||
const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, mutation_reader::forwarding);
|
||||
|
||||
private:
|
||||
evictable_reader* _r;
|
||||
|
||||
private:
|
||||
explicit evictable_reader_handle(evictable_reader& r);
|
||||
|
||||
public:
|
||||
void pause();
|
||||
};
|
||||
|
||||
/// Make a manually-paused evictable reader.
|
||||
///
|
||||
/// The reader can be paused via the evictable reader handle when desired. The
|
||||
/// intended usage is subsequent reads done in bursts, after which the reader is
|
||||
/// not used for some time. When paused, the reader is made evictable, that is,
|
||||
/// it is registered with reader concurrency semaphore as an inactive read.
|
||||
/// The reader is resumed automatically on the next use. If it was evicted, it
|
||||
/// will be recreated at the position it left off reading. This is all
|
||||
/// transparent to its user.
|
||||
/// Parameters passed by reference have to be kept alive while the reader is
|
||||
/// alive.
|
||||
std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
|
||||
/// Reader lifecycle policy for the mulitshard combining reader.
|
||||
///
|
||||
/// This policy is expected to make sure any additional resource the readers
|
||||
|
||||
@@ -63,7 +63,7 @@ shared_ptr<abstract_command> exists::prepare(service::storage_proxy& proxy, requ
|
||||
}
|
||||
|
||||
future<redis_message> exists::execute(service::storage_proxy& proxy, redis::redis_options& options, service_permit permit) {
|
||||
return seastar::do_for_each(_keys, [&proxy, &options, &permit, this] (bytes key) {
|
||||
return seastar::do_for_each(_keys, [&proxy, &options, permit, this] (bytes& key) {
|
||||
return redis::read_strings(proxy, options, key, permit).then([this] (lw_shared_ptr<strings_result> result) {
|
||||
if (result->has_result()) {
|
||||
_count++;
|
||||
|
||||
@@ -12,7 +12,11 @@
|
||||
# At the end of the build we check that the build-id is indeed in the
|
||||
# first page. At install time we check that patchelf doesn't modify
|
||||
# the program headers.
|
||||
|
||||
# gdb has a SO_NAME_MAX_PATH_SIZE of 512, so limit the path size to
|
||||
# that. The 512 includes the null at the end, hence the 511 bellow.
|
||||
|
||||
ORIGINAL_DYNAMIC_LINKER=$(gcc -### /dev/null -o t 2>&1 | perl -n -e '/-dynamic-linker ([^ ]*) / && print $1')
|
||||
DYNAMIC_LINKER=$(printf "%2000s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
DYNAMIC_LINKER=$(printf "%511s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
|
||||
echo $DYNAMIC_LINKER
|
||||
|
||||
@@ -1945,7 +1945,7 @@ future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_meta
|
||||
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, std::unordered_set<dht::token> replacing_tokens) {
|
||||
auto op = sstring("replace_with_repair");
|
||||
auto source_dc = get_local_dc();
|
||||
auto reason = streaming::stream_reason::bootstrap;
|
||||
auto reason = streaming::stream_reason::replace;
|
||||
tm.update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
|
||||
#include <unordered_map>
|
||||
#include <exception>
|
||||
#include <absl/container/btree_set.h>
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
@@ -334,6 +335,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using repair_hash_set = absl::btree_set<repair_hash>;
|
||||
|
||||
// Return value of the REPAIR_GET_SYNC_BOUNDARY RPC verb
|
||||
struct get_sync_boundary_response {
|
||||
std::optional<repair_sync_boundary> boundary;
|
||||
|
||||
@@ -47,6 +47,7 @@
|
||||
#include "gms/gossiper.hh"
|
||||
#include "repair/row_level.hh"
|
||||
#include "mutation_source_metadata.hh"
|
||||
#include "utils/stall_free.hh"
|
||||
|
||||
extern logging::logger rlogger;
|
||||
|
||||
@@ -372,6 +373,7 @@ private:
|
||||
std::optional<utils::phased_barrier::operation> _local_read_op;
|
||||
// Local reader or multishard reader to read the range
|
||||
flat_mutation_reader _reader;
|
||||
std::optional<evictable_reader_handle> _reader_handle;
|
||||
// Current partition read from disk
|
||||
lw_shared_ptr<const decorated_key_with_hash> _current_dk;
|
||||
|
||||
@@ -390,32 +392,49 @@ public:
|
||||
, _sharder(remote_sharder, range, remote_shard)
|
||||
, _seed(seed)
|
||||
, _local_read_op(local_reader ? std::optional(cf.read_in_progress()) : std::nullopt)
|
||||
, _reader(make_reader(db, cf, local_reader)) {
|
||||
}
|
||||
|
||||
private:
|
||||
flat_mutation_reader
|
||||
make_reader(seastar::sharded<database>& db,
|
||||
column_family& cf,
|
||||
is_local_reader local_reader) {
|
||||
, _reader(nullptr) {
|
||||
if (local_reader) {
|
||||
return cf.make_streaming_reader(_schema, _range);
|
||||
auto ms = mutation_source([&cf] (
|
||||
schema_ptr s,
|
||||
reader_permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return cf.make_streaming_reader(std::move(s), pr, ps, fwd_mr);
|
||||
});
|
||||
std::tie(_reader, _reader_handle) = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
_schema,
|
||||
cf.streaming_read_concurrency_semaphore(),
|
||||
_range,
|
||||
_schema->full_slice(),
|
||||
service::get_local_streaming_read_priority(),
|
||||
{},
|
||||
mutation_reader::forwarding::no);
|
||||
} else {
|
||||
_reader = make_multishard_streaming_reader(db, _schema, [this] {
|
||||
auto shard_range = _sharder.next();
|
||||
if (shard_range) {
|
||||
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
|
||||
}
|
||||
return std::optional<dht::partition_range>();
|
||||
});
|
||||
}
|
||||
return make_multishard_streaming_reader(db, _schema, [this] {
|
||||
auto shard_range = _sharder.next();
|
||||
if (shard_range) {
|
||||
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
|
||||
}
|
||||
return std::optional<dht::partition_range>();
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
future<mutation_fragment_opt>
|
||||
read_mutation_fragment() {
|
||||
return _reader(db::no_timeout);
|
||||
}
|
||||
|
||||
void on_end_of_stream() {
|
||||
_reader = make_empty_flat_reader(_schema);
|
||||
_reader_handle.reset();
|
||||
}
|
||||
|
||||
lw_shared_ptr<const decorated_key_with_hash>& get_current_dk() {
|
||||
return _current_dk;
|
||||
}
|
||||
@@ -434,6 +453,11 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void pause() {
|
||||
if (_reader_handle) {
|
||||
_reader_handle->pause();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class repair_writer {
|
||||
@@ -450,6 +474,7 @@ class repair_writer {
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
streaming::stream_reason _reason;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -508,7 +533,7 @@ public:
|
||||
sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write();
|
||||
schema_ptr s = reader.schema();
|
||||
auto& pc = service::get_local_streaming_write_priority();
|
||||
return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s,
|
||||
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
|
||||
t->get_sstables_manager().configure_writer(),
|
||||
encoding_stats{}, pc).then([sst] {
|
||||
return sst->open_data();
|
||||
@@ -561,11 +586,18 @@ public:
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
_mq[node_idx]->abort(ep);
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -588,6 +620,10 @@ public:
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
@@ -635,7 +671,7 @@ private:
|
||||
// Tracks current sync boundary
|
||||
std::optional<repair_sync_boundary> _current_sync_boundary;
|
||||
// Contains the hashes of rows in the _working_row_buffor for all peer nodes
|
||||
std::vector<std::unordered_set<repair_hash>> _peer_row_hash_sets;
|
||||
std::vector<repair_hash_set> _peer_row_hash_sets;
|
||||
// Gate used to make sure pending operation of meta data is done
|
||||
seastar::gate _gate;
|
||||
sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes;
|
||||
@@ -723,11 +759,12 @@ public:
|
||||
public:
|
||||
future<> stop() {
|
||||
auto gate_future = _gate.close();
|
||||
auto writer_future = _repair_writer.wait_for_writer_done();
|
||||
auto f1 = _sink_source_for_get_full_row_hashes.close();
|
||||
auto f2 = _sink_source_for_get_row_diff.close();
|
||||
auto f3 = _sink_source_for_put_row_diff.close();
|
||||
return when_all_succeed(std::move(gate_future), std::move(writer_future), std::move(f1), std::move(f2), std::move(f3));
|
||||
return when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).finally([this] {
|
||||
return _repair_writer.wait_for_writer_done();
|
||||
});
|
||||
}
|
||||
|
||||
static std::unordered_map<node_repair_meta_id, lw_shared_ptr<repair_meta>>& repair_meta_map() {
|
||||
@@ -855,9 +892,9 @@ public:
|
||||
}
|
||||
|
||||
// Must run inside a seastar thread
|
||||
static std::unordered_set<repair_hash>
|
||||
get_set_diff(const std::unordered_set<repair_hash>& x, const std::unordered_set<repair_hash>& y) {
|
||||
std::unordered_set<repair_hash> set_diff;
|
||||
static repair_hash_set
|
||||
get_set_diff(const repair_hash_set& x, const repair_hash_set& y) {
|
||||
repair_hash_set set_diff;
|
||||
// Note std::set_difference needs x and y are sorted.
|
||||
std::copy_if(x.begin(), x.end(), std::inserter(set_diff, set_diff.end()),
|
||||
[&y] (auto& item) { thread::maybe_yield(); return y.find(item) == y.end(); });
|
||||
@@ -875,14 +912,14 @@ public:
|
||||
|
||||
}
|
||||
|
||||
std::unordered_set<repair_hash>& peer_row_hash_sets(unsigned node_idx) {
|
||||
repair_hash_set& peer_row_hash_sets(unsigned node_idx) {
|
||||
return _peer_row_hash_sets[node_idx];
|
||||
}
|
||||
|
||||
// Get a list of row hashes in _working_row_buf
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
working_row_hashes() {
|
||||
return do_with(std::unordered_set<repair_hash>(), [this] (std::unordered_set<repair_hash>& hashes) {
|
||||
return do_with(repair_hash_set(), [this] (repair_hash_set& hashes) {
|
||||
return do_for_each(_working_row_buf, [&hashes] (repair_row& r) {
|
||||
hashes.emplace(r.hash());
|
||||
}).then([&hashes] {
|
||||
@@ -1007,11 +1044,7 @@ private:
|
||||
return repair_hash(h.finalize_uint64());
|
||||
}
|
||||
|
||||
stop_iteration handle_mutation_fragment(mutation_fragment_opt mfopt, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
if (!mfopt) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
mutation_fragment& mf = *mfopt;
|
||||
stop_iteration handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
if (mf.is_partition_start()) {
|
||||
auto& start = mf.as_partition_start();
|
||||
_repair_reader.set_current_dk(start.key());
|
||||
@@ -1046,32 +1079,49 @@ private:
|
||||
}
|
||||
_gate.check();
|
||||
return _repair_reader.read_mutation_fragment().then([this, &cur_size, &new_rows_size, &cur_rows] (mutation_fragment_opt mfopt) mutable {
|
||||
return handle_mutation_fragment(std::move(mfopt), cur_size, new_rows_size, cur_rows);
|
||||
if (!mfopt) {
|
||||
_repair_reader.on_end_of_stream();
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
return handle_mutation_fragment(*mfopt, cur_size, new_rows_size, cur_rows);
|
||||
});
|
||||
}).then([&cur_rows, &new_rows_size] () mutable {
|
||||
}).then_wrapped([this, &cur_rows, &new_rows_size] (future<> fut) mutable {
|
||||
if (fut.failed()) {
|
||||
_repair_reader.on_end_of_stream();
|
||||
return make_exception_future<std::list<repair_row>, size_t>(fut.get_exception());
|
||||
}
|
||||
_repair_reader.pause();
|
||||
return make_ready_future<std::list<repair_row>, size_t>(std::move(cur_rows), new_rows_size);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> clear_row_buf() {
|
||||
return utils::clear_gently(_row_buf);
|
||||
}
|
||||
|
||||
future<> clear_working_row_buf() {
|
||||
return utils::clear_gently(_working_row_buf).then([this] {
|
||||
_working_row_buf_combined_hash.clear();
|
||||
});
|
||||
}
|
||||
|
||||
// Read rows from disk until _max_row_buf_size of rows are filled into _row_buf.
|
||||
// Calculate the combined checksum of the rows
|
||||
// Calculate the total size of the rows in _row_buf
|
||||
future<get_sync_boundary_response>
|
||||
get_sync_boundary(std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
auto f = make_ready_future<>();
|
||||
if (skipped_sync_boundary) {
|
||||
_current_sync_boundary = skipped_sync_boundary;
|
||||
_row_buf.clear();
|
||||
_working_row_buf.clear();
|
||||
_working_row_buf_combined_hash.clear();
|
||||
} else {
|
||||
_working_row_buf.clear();
|
||||
_working_row_buf_combined_hash.clear();
|
||||
f = clear_row_buf();
|
||||
}
|
||||
// Here is the place we update _last_sync_boundary
|
||||
rlogger.trace("SET _last_sync_boundary from {} to {}", _last_sync_boundary, _current_sync_boundary);
|
||||
_last_sync_boundary = _current_sync_boundary;
|
||||
return row_buf_size().then([this, sb = std::move(skipped_sync_boundary)] (size_t cur_size) {
|
||||
return f.then([this, sb = std::move(skipped_sync_boundary)] () mutable {
|
||||
return clear_working_row_buf().then([this, sb = sb] () mutable {
|
||||
return row_buf_size().then([this, sb = std::move(sb)] (size_t cur_size) {
|
||||
return read_rows_from_disk(cur_size).then([this, sb = std::move(sb)] (std::list<repair_row> new_rows, size_t new_rows_size) mutable {
|
||||
size_t new_rows_nr = new_rows.size();
|
||||
_row_buf.splice(_row_buf.end(), new_rows);
|
||||
@@ -1088,6 +1138,8 @@ private:
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> move_row_buf_to_working_row_buf() {
|
||||
@@ -1163,9 +1215,9 @@ private:
|
||||
}
|
||||
|
||||
future<std::list<repair_row>>
|
||||
copy_rows_from_working_row_buf_within_set_diff(std::unordered_set<repair_hash> set_diff) {
|
||||
copy_rows_from_working_row_buf_within_set_diff(repair_hash_set set_diff) {
|
||||
return do_with(std::list<repair_row>(), std::move(set_diff),
|
||||
[this] (std::list<repair_row>& rows, std::unordered_set<repair_hash>& set_diff) {
|
||||
[this] (std::list<repair_row>& rows, repair_hash_set& set_diff) {
|
||||
return do_for_each(_working_row_buf, [this, &set_diff, &rows] (const repair_row& r) {
|
||||
if (set_diff.count(r.hash()) > 0) {
|
||||
rows.push_back(r);
|
||||
@@ -1180,7 +1232,7 @@ private:
|
||||
// Give a set of row hashes, return the corresponding rows
|
||||
// If needs_all_rows is set, return all the rows in _working_row_buf, ignore the set_diff
|
||||
future<std::list<repair_row>>
|
||||
get_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) {
|
||||
get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) {
|
||||
if (needs_all_rows) {
|
||||
if (!_repair_master || _nr_peer_nodes == 1) {
|
||||
return make_ready_future<std::list<repair_row>>(std::move(_working_row_buf));
|
||||
@@ -1191,6 +1243,32 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>&& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return do_with(std::move(row_diff), [this, node_idx, update_buf] (std::list<repair_row>& row_diff) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return repeat([this, node_idx, update_buf, &row_diff] () mutable {
|
||||
if (row_diff.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
repair_row& r = row_diff.front();
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
|
||||
row_diff.pop_front();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
// Must run inside a seastar thread
|
||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||
@@ -1204,30 +1282,17 @@ private:
|
||||
stats().rx_row_nr += row_diff.size();
|
||||
stats().rx_row_nr_peer[from] += row_diff.size();
|
||||
if (update_buf) {
|
||||
std::list<repair_row> tmp;
|
||||
tmp.swap(_working_row_buf);
|
||||
// Both row_diff and _working_row_buf and are ordered, merging
|
||||
// two sored list to make sure the combination of row_diff
|
||||
// and _working_row_buf are ordered.
|
||||
std::merge(tmp.begin(), tmp.end(), row_diff.begin(), row_diff.end(), std::back_inserter(_working_row_buf),
|
||||
[this] (const repair_row& x, const repair_row& y) { thread::maybe_yield(); return _cmp(x.boundary(), y.boundary()) < 0; });
|
||||
utils::merge_to_gently(_working_row_buf, row_diff,
|
||||
[this] (const repair_row& x, const repair_row& y) { return _cmp(x.boundary(), y.boundary()) < 0; });
|
||||
}
|
||||
if (update_hash_set) {
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<repair_hash_set>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
for (auto& r : row_diff) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
||||
}
|
||||
do_apply_rows(std::move(row_diff), node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1235,19 +1300,9 @@ private:
|
||||
if (rows.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
});
|
||||
return to_repair_rows_list(std::move(rows)).then([this] (std::list<repair_row> row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
return do_apply_rows(std::move(row_diff), node_idx, update_working_row_buf::no);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1326,13 +1381,13 @@ private:
|
||||
public:
|
||||
// RPC API
|
||||
// Return the hashes of the rows in _working_row_buf
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
get_full_row_hashes(gms::inet_address remote_node) {
|
||||
if (remote_node == _myip) {
|
||||
return get_full_row_hashes_handler();
|
||||
}
|
||||
return netw::get_local_messaging_service().send_repair_get_full_row_hashes(msg_addr(remote_node),
|
||||
_repair_meta_id).then([this, remote_node] (std::unordered_set<repair_hash> hashes) {
|
||||
_repair_meta_id).then([this, remote_node] (repair_hash_set hashes) {
|
||||
rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size());
|
||||
_metrics.rx_hashes_nr += hashes.size();
|
||||
stats().rx_hashes_nr += hashes.size();
|
||||
@@ -1343,7 +1398,7 @@ public:
|
||||
|
||||
private:
|
||||
future<> get_full_row_hashes_source_op(
|
||||
lw_shared_ptr<std::unordered_set<repair_hash>> current_hashes,
|
||||
lw_shared_ptr<repair_hash_set> current_hashes,
|
||||
gms::inet_address remote_node,
|
||||
unsigned node_idx,
|
||||
rpc::source<repair_hash_with_cmd>& source) {
|
||||
@@ -1381,12 +1436,12 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (remote_node == _myip) {
|
||||
return get_full_row_hashes_handler();
|
||||
}
|
||||
auto current_hashes = make_lw_shared<std::unordered_set<repair_hash>>();
|
||||
auto current_hashes = make_lw_shared<repair_hash_set>();
|
||||
return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx).then(
|
||||
[this, current_hashes, remote_node, node_idx]
|
||||
(rpc::sink<repair_stream_cmd>& sink, rpc::source<repair_hash_with_cmd>& source) mutable {
|
||||
@@ -1401,7 +1456,7 @@ public:
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
get_full_row_hashes_handler() {
|
||||
return with_gate(_gate, [this] {
|
||||
return working_row_hashes();
|
||||
@@ -1541,7 +1596,7 @@ public:
|
||||
// RPC API
|
||||
// Return rows in the _working_row_buf with hash within the given sef_diff
|
||||
// Must run inside a seastar thread
|
||||
void get_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, unsigned node_idx) {
|
||||
void get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (needs_all_rows || !set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return;
|
||||
@@ -1610,11 +1665,11 @@ private:
|
||||
}
|
||||
|
||||
future<> get_row_diff_sink_op(
|
||||
std::unordered_set<repair_hash> set_diff,
|
||||
repair_hash_set set_diff,
|
||||
needs_all_rows_t needs_all_rows,
|
||||
rpc::sink<repair_hash_with_cmd>& sink,
|
||||
gms::inet_address remote_node) {
|
||||
return do_with(std::move(set_diff), [needs_all_rows, remote_node, &sink] (std::unordered_set<repair_hash>& set_diff) mutable {
|
||||
return do_with(std::move(set_diff), [needs_all_rows, remote_node, &sink] (repair_hash_set& set_diff) mutable {
|
||||
if (inject_rpc_stream_error) {
|
||||
return make_exception_future<>(std::runtime_error("get_row_diff: Inject sender error in sink loop"));
|
||||
}
|
||||
@@ -1641,7 +1696,7 @@ private:
|
||||
public:
|
||||
// Must run inside a seastar thread
|
||||
void get_row_diff_with_rpc_stream(
|
||||
std::unordered_set<repair_hash> set_diff,
|
||||
repair_hash_set set_diff,
|
||||
needs_all_rows_t needs_all_rows,
|
||||
update_peer_row_hash_sets update_hash_set,
|
||||
gms::inet_address remote_node,
|
||||
@@ -1667,7 +1722,7 @@ public:
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
future<repair_rows_on_wire> get_row_diff_handler(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows) {
|
||||
future<repair_rows_on_wire> get_row_diff_handler(repair_hash_set set_diff, needs_all_rows_t needs_all_rows) {
|
||||
return with_gate(_gate, [this, set_diff = std::move(set_diff), needs_all_rows] () mutable {
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return to_repair_rows_on_wire(std::move(row_diff));
|
||||
@@ -1677,15 +1732,16 @@ public:
|
||||
|
||||
// RPC API
|
||||
// Send rows in the _working_row_buf with hash within the given sef_diff
|
||||
future<> put_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node) {
|
||||
future<> put_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node) {
|
||||
if (!set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto sz = set_diff.size();
|
||||
size_t sz = set_diff.size();
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, sz] (std::list<repair_row> row_diff) {
|
||||
if (row_diff.size() != sz) {
|
||||
throw std::runtime_error("row_diff.size() != set_diff.size()");
|
||||
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
|
||||
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
|
||||
}
|
||||
return do_with(std::move(row_diff), [this, remote_node] (std::list<repair_row>& row_diff) {
|
||||
return get_repair_rows_size(row_diff).then([this, remote_node, &row_diff] (size_t row_bytes) mutable {
|
||||
@@ -1752,17 +1808,18 @@ private:
|
||||
|
||||
public:
|
||||
future<> put_row_diff_with_rpc_stream(
|
||||
std::unordered_set<repair_hash> set_diff,
|
||||
repair_hash_set set_diff,
|
||||
needs_all_rows_t needs_all_rows,
|
||||
gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (!set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto sz = set_diff.size();
|
||||
size_t sz = set_diff.size();
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, node_idx, sz] (std::list<repair_row> row_diff) {
|
||||
if (row_diff.size() != sz) {
|
||||
throw std::runtime_error("row_diff.size() != set_diff.size()");
|
||||
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
|
||||
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
|
||||
}
|
||||
return do_with(std::move(row_diff), [this, remote_node, node_idx] (std::list<repair_row>& row_diff) {
|
||||
return get_repair_rows_size(row_diff).then([this, remote_node, node_idx, &row_diff] (size_t row_bytes) mutable {
|
||||
@@ -1801,7 +1858,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
rpc::sink<repair_row_on_wire_with_cmd> sink,
|
||||
rpc::source<repair_hash_with_cmd> source,
|
||||
bool &error,
|
||||
std::unordered_set<repair_hash>& current_set_diff,
|
||||
repair_hash_set& current_set_diff,
|
||||
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
|
||||
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
|
||||
rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd));
|
||||
@@ -1814,7 +1871,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
}
|
||||
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
|
||||
_metrics.rx_hashes_nr += current_set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(current_set_diff)));
|
||||
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(current_set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
if (fp.get_owner_shard() == this_shard_id()) {
|
||||
@@ -1892,12 +1949,12 @@ static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process
|
||||
if (status == repair_stream_cmd::get_full_row_hashes) {
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
return rm->get_full_row_hashes_handler().then([] (std::unordered_set<repair_hash> hashes) {
|
||||
return rm->get_full_row_hashes_handler().then([] (repair_hash_set hashes) {
|
||||
_metrics.tx_hashes_nr += hashes.size();
|
||||
return hashes;
|
||||
});
|
||||
}).then([sink] (std::unordered_set<repair_hash> hashes) mutable {
|
||||
return do_with(std::move(hashes), [sink] (std::unordered_set<repair_hash>& hashes) mutable {
|
||||
}).then([sink] (repair_hash_set hashes) mutable {
|
||||
return do_with(std::move(hashes), [sink] (repair_hash_set& hashes) mutable {
|
||||
return do_for_each(hashes, [sink] (const repair_hash& hash) mutable {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
|
||||
}).then([sink] () mutable {
|
||||
@@ -1920,7 +1977,7 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
uint32_t repair_meta_id,
|
||||
rpc::sink<repair_row_on_wire_with_cmd> sink,
|
||||
rpc::source<repair_hash_with_cmd> source) {
|
||||
return do_with(false, std::unordered_set<repair_hash>(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, std::unordered_set<repair_hash>& current_set_diff) mutable {
|
||||
return do_with(false, repair_hash_set(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, repair_hash_set& current_set_diff) mutable {
|
||||
return repeat([from, src_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] () mutable {
|
||||
return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
|
||||
if (hash_cmd_opt) {
|
||||
@@ -1936,22 +1993,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1977,22 +2029,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2017,22 +2064,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2078,7 +2120,7 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
return rm->get_full_row_hashes_handler().then([] (std::unordered_set<repair_hash> hashes) {
|
||||
return rm->get_full_row_hashes_handler().then([] (repair_hash_set hashes) {
|
||||
_metrics.tx_hashes_nr += hashes.size();
|
||||
return hashes;
|
||||
});
|
||||
@@ -2106,11 +2148,11 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_row_diff([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
|
||||
repair_hash_set set_diff, bool needs_all_rows) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
_metrics.rx_hashes_nr += set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(set_diff)));
|
||||
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
if (fp.get_owner_shard() == this_shard_id()) {
|
||||
@@ -2178,6 +2220,25 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_uninit_messaging_service_handler() {
|
||||
return netw::get_messaging_service().invoke_on_all([] (auto& ms) {
|
||||
return when_all_succeed(
|
||||
ms.unregister_repair_get_row_diff_with_rpc_stream(),
|
||||
ms.unregister_repair_put_row_diff_with_rpc_stream(),
|
||||
ms.unregister_repair_get_full_row_hashes_with_rpc_stream(),
|
||||
ms.unregister_repair_get_full_row_hashes(),
|
||||
ms.unregister_repair_get_combined_row_hash(),
|
||||
ms.unregister_repair_get_sync_boundary(),
|
||||
ms.unregister_repair_get_row_diff(),
|
||||
ms.unregister_repair_put_row_diff(),
|
||||
ms.unregister_repair_row_level_start(),
|
||||
ms.unregister_repair_row_level_stop(),
|
||||
ms.unregister_repair_get_estimated_partitions(),
|
||||
ms.unregister_repair_set_estimated_partitions(),
|
||||
ms.unregister_repair_get_diff_algorithms()).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
class row_level_repair {
|
||||
repair_info& _ri;
|
||||
sstring _cf_name;
|
||||
@@ -2407,7 +2468,7 @@ private:
|
||||
// sequentially because the rows from repair follower 1 to
|
||||
// repair master might reduce the amount of missing data
|
||||
// between repair master and repair follower 2.
|
||||
std::unordered_set<repair_hash> set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes().get0());
|
||||
repair_hash_set set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes().get0());
|
||||
// Request missing sets from peer node
|
||||
rlogger.debug("Before get_row_diff to node {}, local={}, peer={}, set_diff={}",
|
||||
node, master.working_row_hashes().get0().size(), master.peer_row_hash_sets(node_idx).size(), set_diff.size());
|
||||
@@ -2430,9 +2491,9 @@ private:
|
||||
// So we can figure out which rows peer node are missing and send the missing rows to them
|
||||
check_in_shutdown();
|
||||
_ri.check_in_abort();
|
||||
std::unordered_set<repair_hash> local_row_hash_sets = master.working_row_hashes().get0();
|
||||
repair_hash_set local_row_hash_sets = master.working_row_hashes().get0();
|
||||
auto sz = _all_live_peer_nodes.size();
|
||||
std::vector<std::unordered_set<repair_hash>> set_diffs(sz);
|
||||
std::vector<repair_hash_set> set_diffs(sz);
|
||||
for (size_t idx : boost::irange(size_t(0), sz)) {
|
||||
set_diffs[idx] = repair_meta::get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx));
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ private:
|
||||
};
|
||||
|
||||
future<> repair_init_messaging_service_handler(repair_service& rs, distributed<db::system_distributed_keyspace>& sys_dist_ks, distributed<db::view::view_update_generator>& view_update_generator);
|
||||
future<> repair_uninit_messaging_service_handler();
|
||||
|
||||
class repair_info;
|
||||
|
||||
|
||||
20
row_cache.cc
20
row_cache.cc
@@ -528,8 +528,12 @@ public:
|
||||
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
|
||||
{
|
||||
if (!mfopt) {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
return _cache._read_section(_cache._tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
});
|
||||
});
|
||||
}
|
||||
_cache.on_partition_miss();
|
||||
const partition_start& ps = mfopt->as_partition_start();
|
||||
@@ -952,13 +956,15 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
// expensive and we need to amortize it somehow.
|
||||
do {
|
||||
STAP_PROBE(scylla, row_cache_update_partition_start);
|
||||
with_linearized_managed_bytes([&] {
|
||||
{
|
||||
if (!update) {
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
memtable_entry& mem_e = *m.partitions.begin();
|
||||
size_entry = mem_e.size_in_allocator_without_rows(_tracker.allocator());
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
||||
update = updater(_update_section, cache_i, mem_e, is_present, real_dirty_acc);
|
||||
});
|
||||
});
|
||||
}
|
||||
// We use cooperative deferring instead of futures so that
|
||||
@@ -970,14 +976,16 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
update = {};
|
||||
real_dirty_acc.unpin_memory(size_entry);
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto i = m.partitions.begin();
|
||||
memtable_entry& mem_e = *i;
|
||||
m.partitions.erase(i);
|
||||
mem_e.partition().evict(_tracker.memtable_cleaner());
|
||||
current_allocator().destroy(&mem_e);
|
||||
});
|
||||
});
|
||||
++partition_count;
|
||||
});
|
||||
}
|
||||
STAP_PROBE(scylla, row_cache_update_partition_end);
|
||||
} while (!m.partitions.empty() && !need_preempt());
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
@@ -1124,8 +1132,8 @@ future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&
|
||||
seastar::thread::maybe_yield();
|
||||
|
||||
while (true) {
|
||||
auto done = with_linearized_managed_bytes([&] {
|
||||
return _update_section(_tracker.region(), [&] {
|
||||
auto done = _update_section(_tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
auto it = _partitions.lower_bound(*_prev_snapshot_pos, cmp);
|
||||
auto end = _partitions.lower_bound(dht::ring_position_view::for_range_end(range), cmp);
|
||||
|
||||
@@ -79,7 +79,8 @@ executables = ['build/{}/scylla'.format(args.mode),
|
||||
'/usr/sbin/ethtool',
|
||||
'/usr/bin/netstat',
|
||||
'/usr/bin/hwloc-distrib',
|
||||
'/usr/bin/hwloc-calc']
|
||||
'/usr/bin/hwloc-calc',
|
||||
'/usr/bin/lsblk']
|
||||
|
||||
output = args.dest
|
||||
|
||||
|
||||
@@ -597,7 +597,7 @@ def current_shard():
|
||||
|
||||
|
||||
def find_db(shard=None):
|
||||
if not shard:
|
||||
if shard is None:
|
||||
shard = current_shard()
|
||||
return gdb.parse_and_eval('::debug::db')['_instances']['_M_impl']['_M_start'][shard]['service']['_p']
|
||||
|
||||
|
||||
@@ -63,6 +63,17 @@ MemoryHigh=1200M
|
||||
MemoryMax=1400M
|
||||
MemoryLimit=1400M
|
||||
EOS
|
||||
|
||||
# On CentOS7, systemd does not support percentage-based parameter.
|
||||
# To apply memory parameter on CentOS7, we need to override the parameter
|
||||
# in bytes, instead of percentage.
|
||||
elif [ "$RHEL" -a "$VERSION_ID" = "7" ]; then
|
||||
MEMORY_LIMIT=$((MEMTOTAL_BYTES / 100 * 5))
|
||||
mkdir -p /etc/systemd/system/scylla-helper.slice.d/
|
||||
cat << EOS > /etc/systemd/system/scylla-helper.slice.d/memory.conf
|
||||
[Slice]
|
||||
MemoryLimit=$MEMORY_LIMIT
|
||||
EOS
|
||||
fi
|
||||
|
||||
systemctl --system daemon-reload >/dev/null || true
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: e708d1df3a...88b6f0172c
@@ -25,6 +25,7 @@
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include <boost/range/algorithm/for_each.hpp>
|
||||
#include "utils/small_vector.hh"
|
||||
#include <absl/container/btree_set.h>
|
||||
|
||||
namespace ser {
|
||||
|
||||
@@ -81,6 +82,17 @@ static inline void serialize_array(Output& out, const Container& v) {
|
||||
template<typename Container>
|
||||
struct container_traits;
|
||||
|
||||
template<typename T>
|
||||
struct container_traits<absl::btree_set<T>> {
|
||||
struct back_emplacer {
|
||||
absl::btree_set<T>& c;
|
||||
back_emplacer(absl::btree_set<T>& c_) : c(c_) {}
|
||||
void operator()(T&& v) {
|
||||
c.emplace(std::move(v));
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
struct container_traits<std::unordered_set<T>> {
|
||||
struct back_emplacer {
|
||||
@@ -253,6 +265,27 @@ struct serializer<std::list<T>> {
|
||||
}
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
struct serializer<absl::btree_set<T>> {
|
||||
template<typename Input>
|
||||
static absl::btree_set<T> read(Input& in) {
|
||||
auto sz = deserialize(in, boost::type<uint32_t>());
|
||||
absl::btree_set<T> v;
|
||||
deserialize_array_helper<false, T>::doit(in, v, sz);
|
||||
return v;
|
||||
}
|
||||
template<typename Output>
|
||||
static void write(Output& out, const absl::btree_set<T>& v) {
|
||||
safe_serialize_as_uint32(out, v.size());
|
||||
serialize_array_helper<false, T>::doit(out, v);
|
||||
}
|
||||
template<typename Input>
|
||||
static void skip(Input& in) {
|
||||
auto sz = deserialize(in, boost::type<uint32_t>());
|
||||
skip_array<T>(in, sz);
|
||||
}
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
struct serializer<std::unordered_set<T>> {
|
||||
template<typename Input>
|
||||
|
||||
@@ -92,7 +92,7 @@ void migration_manager::init_messaging_service()
|
||||
//FIXME: future discarded.
|
||||
(void)with_gate(_background_tasks, [this] {
|
||||
mlogger.debug("features changed, recalculating schema version");
|
||||
return update_schema_version_and_announce(get_storage_proxy(), _feat.cluster_schema_features());
|
||||
return db::schema_tables::recalculate_schema_version(get_storage_proxy(), _feat);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
@@ -5063,18 +5063,22 @@ void storage_proxy::init_messaging_service() {
|
||||
future<> storage_proxy::uninit_messaging_service() {
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
return when_all_succeed(
|
||||
ms.unregister_counter_mutation(),
|
||||
ms.unregister_mutation(),
|
||||
ms.unregister_hint_mutation(),
|
||||
ms.unregister_mutation_done(),
|
||||
ms.unregister_mutation_failed(),
|
||||
ms.unregister_read_data(),
|
||||
ms.unregister_read_mutation_data(),
|
||||
ms.unregister_read_digest(),
|
||||
ms.unregister_truncate(),
|
||||
ms.unregister_get_schema_version(),
|
||||
ms.unregister_paxos_prepare(),
|
||||
ms.unregister_paxos_accept(),
|
||||
ms.unregister_paxos_learn(),
|
||||
ms.unregister_paxos_prune()
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>>
|
||||
@@ -5167,8 +5171,7 @@ future<> storage_proxy::drain_on_shutdown() {
|
||||
|
||||
future<>
|
||||
storage_proxy::stop() {
|
||||
// FIXME: hints manager should be stopped here but it seems like this function is never called
|
||||
return uninit_messaging_service();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -299,7 +299,6 @@ private:
|
||||
cdc::cdc_service* _cdc = nullptr;
|
||||
cdc_stats _cdc_stats;
|
||||
private:
|
||||
future<> uninit_messaging_service();
|
||||
future<coordinator_query_result> query_singular(lw_shared_ptr<query::read_command> cmd,
|
||||
dht::partition_range_vector&& partition_ranges,
|
||||
db::consistency_level cl,
|
||||
@@ -453,6 +452,7 @@ public:
|
||||
return next;
|
||||
}
|
||||
void init_messaging_service();
|
||||
future<> uninit_messaging_service();
|
||||
|
||||
// Applies mutation on this node.
|
||||
// Resolves with timed_out_error when timeout is reached.
|
||||
|
||||
@@ -420,6 +420,9 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
|
||||
app_states.emplace(gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts));
|
||||
app_states.emplace(gms::application_state::STATUS, versioned_value::normal(my_tokens));
|
||||
}
|
||||
if (replacing_a_node_with_same_ip || replacing_a_node_with_diff_ip) {
|
||||
app_states.emplace(gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens));
|
||||
}
|
||||
slogger.info("Starting up server gossip");
|
||||
|
||||
auto generation_number = db::system_keyspace::increment_and_get_generation().get0();
|
||||
@@ -973,7 +976,11 @@ void storage_service::bootstrap() {
|
||||
} else {
|
||||
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
|
||||
// Does the actual streaming of newly replicated token ranges.
|
||||
bs.bootstrap().get();
|
||||
if (db().local().is_replacing()) {
|
||||
bs.bootstrap(streaming::stream_reason::replace).get();
|
||||
} else {
|
||||
bs.bootstrap(streaming::stream_reason::bootstrap).get();
|
||||
}
|
||||
}
|
||||
_db.invoke_on_all([this] (database& db) {
|
||||
for (auto& cf : db.get_non_system_column_families()) {
|
||||
@@ -1040,12 +1047,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
@@ -2602,11 +2613,8 @@ future<> storage_service::drain() {
|
||||
ss.do_stop_ms().get();
|
||||
|
||||
// Interrupt on going compaction and shutdown to prevent further compaction
|
||||
// No new compactions will be started from this call site on, but we don't need
|
||||
// to wait for them to stop. Drain leaves the node alive, and a future shutdown
|
||||
// will wait on the compaction_manager stop future.
|
||||
ss.db().invoke_on_all([] (auto& db) {
|
||||
db.get_compaction_manager().do_stop();
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
|
||||
ss.set_mode(mode::DRAINING, "flushing column families", false);
|
||||
|
||||
@@ -548,6 +548,7 @@ private:
|
||||
}
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0;
|
||||
virtual bool use_interposer_consumer() const = 0;
|
||||
|
||||
compaction_info finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) {
|
||||
_info->ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(ended_at.time_since_epoch()).count();
|
||||
@@ -629,8 +630,10 @@ public:
|
||||
return garbage_collected_sstable_writer(_gc_sstable_writer_data);
|
||||
}
|
||||
|
||||
bool contains_multi_fragment_runs() const {
|
||||
return _contains_multi_fragment_runs;
|
||||
bool enable_garbage_collected_sstable_writer() const {
|
||||
// FIXME: Disable GC writer if interposer consumer is enabled until they both can work simultaneously.
|
||||
// More details can be found at https://github.com/scylladb/scylla/issues/6472
|
||||
return _contains_multi_fragment_runs && !use_interposer_consumer();
|
||||
}
|
||||
|
||||
template <typename GCConsumer = noop_compacted_fragments_consumer>
|
||||
@@ -740,6 +743,10 @@ public:
|
||||
return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer));
|
||||
}
|
||||
|
||||
bool use_interposer_consumer() const override {
|
||||
return _cf.get_compaction_strategy().use_interposer_consumer();
|
||||
}
|
||||
|
||||
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Compacting {}", formatted_msg);
|
||||
}
|
||||
@@ -820,7 +827,7 @@ private:
|
||||
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
|
||||
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
|
||||
// meaning incremental compaction is disabled for this compaction.
|
||||
if (!_contains_multi_fragment_runs) {
|
||||
if (!enable_garbage_collected_sstable_writer()) {
|
||||
return;
|
||||
}
|
||||
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
|
||||
@@ -1180,11 +1187,8 @@ private:
|
||||
// return estimated partitions per sstable for a given shard
|
||||
uint64_t partitions_per_sstable(shard_id s) const {
|
||||
uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size)));
|
||||
// As we adjust this estimate downwards from the compaction strategy, it can get to 0 so
|
||||
// make sure we're returning at least 1.
|
||||
return std::max(uint64_t(1),
|
||||
std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)),
|
||||
_cf.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions)));
|
||||
return std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)),
|
||||
_cf.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions));
|
||||
}
|
||||
public:
|
||||
resharding_compaction(column_family& cf, sstables::compaction_descriptor descriptor)
|
||||
@@ -1238,6 +1242,10 @@ public:
|
||||
};
|
||||
}
|
||||
|
||||
bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Resharding {}", formatted_msg);
|
||||
}
|
||||
@@ -1330,7 +1338,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf)
|
||||
cf.schema()->ks_name(), cf.schema()->cf_name()));
|
||||
}
|
||||
auto c = make_compaction(cf, std::move(descriptor));
|
||||
if (c->contains_multi_fragment_runs()) {
|
||||
if (c->enable_garbage_collected_sstable_writer()) {
|
||||
auto gc_writer = c->make_garbage_collected_sstable_writer();
|
||||
return compaction::run(std::move(c), std::move(gc_writer));
|
||||
}
|
||||
|
||||
@@ -92,6 +92,9 @@ public:
|
||||
void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true);
|
||||
void revert_charges(sstables::shared_sstable sst);
|
||||
private:
|
||||
// Returns true if this SSTable can be added or removed from the tracker.
|
||||
bool sstable_belongs_to_tracker(const sstables::shared_sstable& sst);
|
||||
|
||||
void disable() {
|
||||
_disabled = true;
|
||||
_ongoing_writes = {};
|
||||
|
||||
@@ -218,7 +218,7 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const c
|
||||
auto& cs = cf.get_compaction_strategy();
|
||||
|
||||
// Filter out sstables that are being compacted.
|
||||
for (auto& sst : cf.candidates_for_compaction()) {
|
||||
for (auto& sst : cf.non_staging_sstables()) {
|
||||
if (_compacting_sstables.count(sst)) {
|
||||
continue;
|
||||
}
|
||||
@@ -357,7 +357,7 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
|
||||
});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory)
|
||||
: _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
|
||||
auto b = backlog() / available_memory;
|
||||
// This means we are using an unimplemented strategy
|
||||
@@ -372,26 +372,17 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares)
|
||||
: _compaction_controller(sg, iop, shares)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
, _available_memory(available_memory)
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager()
|
||||
: _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(1)
|
||||
: compaction_manager(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
{}
|
||||
|
||||
compaction_manager::~compaction_manager() {
|
||||
@@ -455,17 +446,11 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop() {
|
||||
do_stop();
|
||||
return std::move(*_stop_future);
|
||||
}
|
||||
|
||||
void compaction_manager::do_stop() {
|
||||
if (_stopped) {
|
||||
return;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
_stopped = true;
|
||||
cmlog.info("Asked to stop");
|
||||
_stopped = true;
|
||||
// Reset the metrics registry
|
||||
_metrics.clear();
|
||||
// Stop all ongoing compaction.
|
||||
@@ -475,10 +460,7 @@ void compaction_manager::do_stop() {
|
||||
// Wait for each task handler to stop. Copy list because task remove itself
|
||||
// from the list when done.
|
||||
auto tasks = _tasks;
|
||||
|
||||
// fine to ignore here, since it is used to set up the shared promise in
|
||||
// the finally block. Waiters will wait on the shared_future through stop().
|
||||
_stop_future.emplace(do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return parallel_for_each(tasks, [this] (auto& task) {
|
||||
return this->task_stop(task);
|
||||
});
|
||||
@@ -490,7 +472,7 @@ void compaction_manager::do_stop() {
|
||||
_compaction_submission_timer.cancel();
|
||||
cmlog.info("Stopped");
|
||||
return _compaction_controller.shutdown();
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
|
||||
@@ -523,7 +505,8 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
|
||||
} catch (storage_io_error& e) {
|
||||
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
|
||||
retry = false;
|
||||
do_stop();
|
||||
// FIXME discarded future.
|
||||
(void)stop();
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}: {}", std::current_exception(), decision_msg);
|
||||
retry = true;
|
||||
@@ -680,8 +663,8 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
static bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
const dht::token_range_vector& owned_ranges,
|
||||
bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
const dht::token_range_vector& sorted_owned_ranges,
|
||||
schema_ptr s) {
|
||||
auto first = sst->get_first_partition_key();
|
||||
auto last = sst->get_last_partition_key();
|
||||
@@ -689,29 +672,40 @@ static bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
auto last_token = dht::get_token(*s, last);
|
||||
dht::token_range sst_token_range = dht::token_range::make(first_token, last_token);
|
||||
|
||||
auto r = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), first_token,
|
||||
[] (const range<dht::token>& a, const dht::token& b) {
|
||||
// check that range a is before token b.
|
||||
return a.after(b, dht::token_comparator());
|
||||
});
|
||||
|
||||
// return true iff sst partition range isn't fully contained in any of the owned ranges.
|
||||
for (auto& r : owned_ranges) {
|
||||
if (r.contains(sst_token_range, dht::token_comparator())) {
|
||||
if (r != sorted_owned_ranges.end()) {
|
||||
if (r->contains(sst_token_range, dht::token_comparator())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
future<> compaction_manager::perform_cleanup(column_family* cf) {
|
||||
future<> compaction_manager::perform_cleanup(database& db, column_family* cf) {
|
||||
if (check_for_cleanup(cf)) {
|
||||
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
|
||||
cf->schema()->ks_name(), cf->schema()->cf_name()));
|
||||
}
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), [this] (const table& table) {
|
||||
auto schema = table.schema();
|
||||
auto owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
|
||||
return seastar::async([this, cf, &db] {
|
||||
auto schema = cf->schema();
|
||||
auto& rs = db.find_keyspace(schema->ks_name()).get_replication_strategy();
|
||||
auto sorted_owned_ranges = rs.get_ranges_in_thread(utils::fb_utilities::get_broadcast_address());
|
||||
auto sstables = std::vector<sstables::shared_sstable>{};
|
||||
const auto candidates = table.candidates_for_compaction();
|
||||
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&owned_ranges, schema] (const sstables::shared_sstable& sst) {
|
||||
return owned_ranges.empty() || needs_cleanup(sst, owned_ranges, schema);
|
||||
const auto candidates = get_candidates(*cf);
|
||||
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) {
|
||||
seastar::thread::maybe_yield();
|
||||
return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema);
|
||||
});
|
||||
return sstables;
|
||||
}).then([this, cf] (std::vector<sstables::shared_sstable> sstables) {
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(),
|
||||
[sstables = std::move(sstables)] (const table&) { return sstables; });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -726,7 +720,7 @@ future<> compaction_manager::perform_sstable_upgrade(column_family* cf, bool exc
|
||||
return cf->run_with_compaction_disabled([this, cf, &tables, exclude_current_version] {
|
||||
auto last_version = cf->get_sstables_manager().get_highest_supported_format();
|
||||
|
||||
for (auto& sst : cf->candidates_for_compaction()) {
|
||||
for (auto& sst : get_candidates(*cf)) {
|
||||
// if we are a "normal" upgrade, we only care about
|
||||
// tables with older versions, but potentially
|
||||
// we are to actually rewrite everything. (-a)
|
||||
@@ -742,8 +736,8 @@ future<> compaction_manager::perform_sstable_upgrade(column_family* cf, bool exc
|
||||
// Note that we potentially could be doing multiple
|
||||
// upgrades here in parallel, but that is really the users
|
||||
// problem.
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_upgrade(), [&](auto&) {
|
||||
return tables;
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_upgrade(), [&](auto&) mutable {
|
||||
return std::exchange(tables, {});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -751,8 +745,8 @@ future<> compaction_manager::perform_sstable_upgrade(column_family* cf, bool exc
|
||||
|
||||
// Submit a column family to be scrubbed and wait for its termination.
|
||||
future<> compaction_manager::perform_sstable_scrub(column_family* cf, bool skip_corrupted) {
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_scrub(skip_corrupted), [] (const table& cf) {
|
||||
return cf.candidates_for_compaction();
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_scrub(skip_corrupted), [this] (const table& cf) {
|
||||
return get_candidates(cf);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -829,7 +823,7 @@ double compaction_backlog_tracker::backlog() const {
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled) {
|
||||
if (_disabled || !sstable_belongs_to_tracker(sst)) {
|
||||
return;
|
||||
}
|
||||
_ongoing_writes.erase(sst);
|
||||
@@ -842,7 +836,7 @@ void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled) {
|
||||
if (_disabled || !sstable_belongs_to_tracker(sst)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -855,6 +849,10 @@ void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool compaction_backlog_tracker::sstable_belongs_to_tracker(const sstables::shared_sstable& sst) {
|
||||
return !sst->requires_view_building();
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp) {
|
||||
if (_disabled) {
|
||||
return;
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include <seastar/core/scheduling.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "log.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include <vector>
|
||||
@@ -70,9 +69,6 @@ private:
|
||||
|
||||
// Used to assert that compaction_manager was explicitly stopped, if started.
|
||||
bool _stopped = true;
|
||||
// We use a shared promise to indicate whether or not we are stopped because it is legal
|
||||
// for stop() to be called twice. For instance it is called on DRAIN and shutdown.
|
||||
std::optional<future<>> _stop_future;
|
||||
|
||||
stats _stats;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
@@ -153,10 +149,9 @@ private:
|
||||
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
|
||||
|
||||
future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func);
|
||||
optimized_optional<abort_source::subscription> _early_abort_subscription;
|
||||
public:
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares);
|
||||
compaction_manager();
|
||||
~compaction_manager();
|
||||
|
||||
@@ -165,13 +160,9 @@ public:
|
||||
// Start compaction manager.
|
||||
void start();
|
||||
|
||||
// Stop all fibers. Ongoing compactions will be waited. Should only be called
|
||||
// once, from main teardown path.
|
||||
// Stop all fibers. Ongoing compactions will be waited.
|
||||
future<> stop();
|
||||
|
||||
// Stop all fibers, without waiting. Safe to be called multiple times.
|
||||
void do_stop();
|
||||
|
||||
bool stopped() const { return _stopped; }
|
||||
|
||||
// Submit a column family to be compacted.
|
||||
@@ -184,7 +175,7 @@ public:
|
||||
// Cleanup is about discarding keys that are no longer relevant for a
|
||||
// given sstable, e.g. after node loses part of its token range because
|
||||
// of a newly added node.
|
||||
future<> perform_cleanup(column_family* cf);
|
||||
future<> perform_cleanup(database& db, column_family* cf);
|
||||
|
||||
// Submit a column family to be upgraded and wait for its termination.
|
||||
future<> perform_sstable_upgrade(column_family* cf, bool exclude_current_version);
|
||||
@@ -252,3 +243,5 @@ public:
|
||||
friend class compaction_weight_registration;
|
||||
};
|
||||
|
||||
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s);
|
||||
|
||||
|
||||
@@ -440,8 +440,8 @@ std::unique_ptr<sstable_set_impl> leveled_compaction_strategy::make_sstable_set(
|
||||
return std::make_unique<partitioned_sstable_set>(std::move(schema));
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) {
|
||||
return std::make_unique<partitioned_sstable_set>(std::move(schema), use_level_metadata);
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata) {
|
||||
return sstables::sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
|
||||
}
|
||||
|
||||
compaction_descriptor compaction_strategy_impl::get_major_compaction_job(column_family& cf, std::vector<sstables::shared_sstable> candidates) {
|
||||
@@ -1080,6 +1080,10 @@ reader_consumer compaction_strategy::make_interposer_consumer(const mutation_sou
|
||||
return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer));
|
||||
}
|
||||
|
||||
bool compaction_strategy::use_interposer_consumer() const {
|
||||
return _compaction_strategy_impl->use_interposer_consumer();
|
||||
}
|
||||
|
||||
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
|
||||
::shared_ptr<compaction_strategy_impl> impl;
|
||||
|
||||
|
||||
@@ -99,5 +99,9 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
virtual bool use_interposer_consumer() const {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ private:
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
uint32_t _promoted_index_end;
|
||||
uint64_t _promoted_index_end;
|
||||
uint64_t _position;
|
||||
uint64_t _partition_header_length = 0;
|
||||
std::optional<deletion_time> _deletion_time;
|
||||
@@ -401,9 +401,16 @@ private:
|
||||
auto indexes = std::move(entries_reader->_consumer.indexes);
|
||||
return entries_reader->_context.close().then([indexes = std::move(indexes), ex = std::move(ex)] () mutable {
|
||||
if (ex) {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
return do_with(std::move(indexes), [ex = std::move(ex)] (index_list& indexes) mutable {
|
||||
return parallel_for_each(indexes, [] (index_entry& ie) mutable {
|
||||
return ie.close_pi_stream();
|
||||
}).then_wrapped([ex = std::move(ex)] (future<>&& fut) mutable {
|
||||
fut.ignore_ready_future();
|
||||
return make_exception_future<index_list>(std::move(ex));
|
||||
});
|
||||
});
|
||||
}
|
||||
return std::move(indexes);
|
||||
return make_ready_future<index_list>(std::move(indexes));
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
@@ -745,6 +745,11 @@ public:
|
||||
, _run_identifier(cfg.run_identifier)
|
||||
, _write_regular_as_static(cfg.correctly_serialize_static_compact_in_mc && s.is_static_compact_table())
|
||||
{
|
||||
// This can be 0 in some cases, which is albeit benign, can wreak havoc
|
||||
// in lower-level writer code, so clamp it to [1, +inf) here, which is
|
||||
// exactly what callers used to do anyway.
|
||||
estimated_partitions = std::max(uint64_t(1), estimated_partitions);
|
||||
|
||||
_sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance());
|
||||
_sst.write_toc(_pc);
|
||||
_sst.create_data().get();
|
||||
|
||||
@@ -101,7 +101,7 @@ public:
|
||||
incremental_selector make_incremental_selector() const;
|
||||
};
|
||||
|
||||
std::unique_ptr<sstable_set_impl> make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true);
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata = true);
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run);
|
||||
|
||||
|
||||
@@ -2009,6 +2009,11 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer&
|
||||
, _tombstone_written(false)
|
||||
, _range_tombstones(s)
|
||||
{
|
||||
// This can be 0 in some cases, which is albeit benign, can wreak havoc
|
||||
// in lower-level writer code, so clamp it to [1, +inf) here, which is
|
||||
// exactly what callers used to do anyway.
|
||||
estimated_partitions = std::max(uint64_t(1), estimated_partitions);
|
||||
|
||||
_sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance(), utils::filter_format::k_l_format);
|
||||
_sst._pi_write.desired_block_size = cfg.promoted_index_block_size;
|
||||
_sst._correctly_serialize_non_compound_range_tombstones = cfg.correctly_serialize_non_compound_range_tombstones;
|
||||
|
||||
@@ -346,6 +346,10 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) override;
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) override;
|
||||
|
||||
virtual bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ enum class stream_reason : uint8_t {
|
||||
removenode,
|
||||
rebuild,
|
||||
repair,
|
||||
replace,
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -230,7 +230,7 @@ void stream_session::init_messaging_service_handler() {
|
||||
schema_ptr s = reader.schema();
|
||||
auto& pc = service::get_local_streaming_write_priority();
|
||||
|
||||
return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s,
|
||||
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
|
||||
cf->get_sstables_manager().configure_writer(),
|
||||
encoding_stats{}, pc).then([sst] {
|
||||
return sst->open_data();
|
||||
@@ -319,6 +319,15 @@ void stream_session::init_messaging_service_handler() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> stream_session::uninit_messaging_service_handler() {
|
||||
return when_all_succeed(
|
||||
ms().unregister_prepare_message(),
|
||||
ms().unregister_prepare_done_message(),
|
||||
ms().unregister_stream_mutation_fragments(),
|
||||
ms().unregister_stream_mutation_done(),
|
||||
ms().unregister_complete_message()).discard_result();
|
||||
}
|
||||
|
||||
distributed<database>* stream_session::_db;
|
||||
distributed<db::system_distributed_keyspace>* stream_session::_sys_dist_ks;
|
||||
distributed<db::view::view_update_generator>* stream_session::_view_update_generator;
|
||||
@@ -342,9 +351,13 @@ future<> stream_session::init_streaming_service(distributed<database>& db, distr
|
||||
// });
|
||||
return get_stream_manager().start().then([] {
|
||||
gms::get_local_gossiper().register_(get_local_stream_manager().shared_from_this());
|
||||
return _db->invoke_on_all([] (auto& db) {
|
||||
init_messaging_service_handler();
|
||||
});
|
||||
return smp::invoke_on_all([] { init_messaging_service_handler(); });
|
||||
});
|
||||
}
|
||||
|
||||
future<> stream_session::uninit_streaming_service() {
|
||||
return smp::invoke_on_all([] {
|
||||
return uninit_messaging_service_handler();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -142,6 +142,7 @@ private:
|
||||
using token = dht::token;
|
||||
using ring_position = dht::ring_position;
|
||||
static void init_messaging_service_handler();
|
||||
static future<> uninit_messaging_service_handler();
|
||||
static distributed<database>* _db;
|
||||
static distributed<db::system_distributed_keyspace>* _sys_dist_ks;
|
||||
static distributed<db::view::view_update_generator>* _view_update_generator;
|
||||
@@ -152,6 +153,7 @@ public:
|
||||
static database& get_local_db() { return _db->local(); }
|
||||
static distributed<database>& get_db() { return *_db; };
|
||||
static future<> init_streaming_service(distributed<database>& db, distributed<db::system_distributed_keyspace>& sys_dist_ks, distributed<db::view::view_update_generator>& view_update_generator);
|
||||
static future<> uninit_streaming_service();
|
||||
public:
|
||||
/**
|
||||
* Streaming endpoint.
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include "streaming/stream_mutation_fragments_cmd.hh"
|
||||
#include "mutation_reader.hh"
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "frozen_mutation.hh"
|
||||
#include "mutation.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
@@ -203,15 +204,27 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
|
||||
}();
|
||||
|
||||
auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> {
|
||||
return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd>& sink) {
|
||||
return repeat([&sink, si, got_error_from_peer] () mutable {
|
||||
return si->reader(db::no_timeout).then([&sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
|
||||
if (mf && !(*got_error_from_peer)) {
|
||||
mutation_fragment_stream_validator validator(*(si->reader.schema()));
|
||||
return do_with(std::move(sink), std::move(validator), [si, got_error_from_peer] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd>& sink, mutation_fragment_stream_validator& validator) {
|
||||
return repeat([&sink, &validator, si, got_error_from_peer] () mutable {
|
||||
return si->reader(db::no_timeout).then([&sink, &validator, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
|
||||
if (*got_error_from_peer) {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("Got status error code from peer"));
|
||||
}
|
||||
if (mf) {
|
||||
if (!validator(mf->mutation_fragment_kind())) {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error(format("Stream reader mutation_fragment validator failed, previous={}, current={}",
|
||||
validator.previous_mutation_fragment_kind(), mf->mutation_fragment_kind())));
|
||||
}
|
||||
frozen_mutation_fragment fmf = freeze(*s, *mf);
|
||||
auto size = fmf.representation().size();
|
||||
streaming::get_local_stream_manager().update_progress(si->plan_id, si->id.addr, streaming::progress_info::direction::OUT, size);
|
||||
return sink(fmf, stream_mutation_fragments_cmd::mutation_fragment_data).then([] { return stop_iteration::no; });
|
||||
} else {
|
||||
if (!validator.on_end_of_stream()) {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error(format("Stream reader mutation_fragment validator failed on end_of_stream, previous={}, current=end_of_stream",
|
||||
validator.previous_mutation_fragment_kind())));
|
||||
}
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
|
||||
4
table.cc
4
table.cc
@@ -1393,7 +1393,7 @@ future<std::unordered_set<sstring>> table::get_sstables_by_partition_key(const s
|
||||
[this] (std::unordered_set<sstring>& filenames, lw_shared_ptr<sstables::sstable_set::incremental_selector>& sel, partition_key& pk) {
|
||||
return do_with(dht::decorated_key(dht::decorate_key(*_schema, pk)),
|
||||
[this, &filenames, &sel, &pk](dht::decorated_key& dk) mutable {
|
||||
auto sst = sel->select(dk).sstables;
|
||||
const auto& sst = sel->select(dk).sstables;
|
||||
auto hk = sstables::sstable::make_hashed_key(*_schema, dk.key());
|
||||
|
||||
return do_for_each(sst, [this, &filenames, &dk, hk = std::move(hk)] (std::vector<sstables::shared_sstable>::const_iterator::reference s) mutable {
|
||||
@@ -1422,7 +1422,7 @@ std::vector<sstables::shared_sstable> table::select_sstables(const dht::partitio
|
||||
return _sstables->select(range);
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> table::candidates_for_compaction() const {
|
||||
std::vector<sstables::shared_sstable> table::non_staging_sstables() const {
|
||||
return boost::copy_range<std::vector<sstables::shared_sstable>>(*get_sstables()
|
||||
| boost::adaptors::filtered([this] (auto& sst) {
|
||||
return !_sstables_need_rewrite.count(sst->generation()) && !_sstables_staging.count(sst->generation());
|
||||
|
||||
@@ -28,8 +28,8 @@ fi
|
||||
SCYLLA_IP=127.1.$(($$ >> 8 & 255)).$(($$ & 255))
|
||||
echo "Running Scylla on $SCYLLA_IP"
|
||||
|
||||
tmp_dir=/tmp/alternator-test-$$
|
||||
mkdir $tmp_dir
|
||||
tmp_dir="$(readlink -e ${TMPDIR-/tmp})"/alternator-test-$$
|
||||
mkdir "$tmp_dir"
|
||||
|
||||
# We run the cleanup() function on exit for any reason - successful finish
|
||||
# of the script, an error (since we have "set -e"), or a signal.
|
||||
@@ -73,6 +73,7 @@ done
|
||||
--alternator-address $SCYLLA_IP \
|
||||
$alternator_port_option \
|
||||
--alternator-enforce-authorization=1 \
|
||||
--alternator-write-isolation=always_use_lwt \
|
||||
--developer-mode=1 \
|
||||
--ring-delay-ms 0 --collectd 0 \
|
||||
--smp 2 -m 1G \
|
||||
|
||||
@@ -305,3 +305,16 @@ def test_batch_get_item_projection_expression(test_table):
|
||||
got_items = reply['Responses'][test_table.name]
|
||||
expected_items = [{k: item[k] for k in wanted if k in item} for item in items]
|
||||
assert multiset(got_items) == multiset(expected_items)
|
||||
|
||||
# Test that we return the required UnprocessedKeys/UnprocessedItems parameters
|
||||
def test_batch_unprocessed(test_table_s):
|
||||
p = random_string()
|
||||
write_reply = test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': p, 'a': 'hi'}}}],
|
||||
})
|
||||
assert 'UnprocessedItems' in write_reply and write_reply['UnprocessedItems'] == dict()
|
||||
|
||||
read_reply = test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': p}], 'ProjectionExpression': 'p, a', 'ConsistentRead': True}
|
||||
})
|
||||
assert 'UnprocessedKeys' in read_reply and read_reply['UnprocessedKeys'] == dict()
|
||||
|
||||
@@ -1351,3 +1351,37 @@ def test_condition_expression_with_forbidden_rmw(scylla_only, dynamodb, test_tab
|
||||
assert test_table_s.get_item(Key={'p': s}, ConsistentRead=True)['Item'] == {'p': s, 'regular': 'write'}
|
||||
test_table_s.update_item(Key={'p': s}, AttributeUpdates={'write': {'Value': 'regular', 'Action': 'PUT'}})
|
||||
assert test_table_s.get_item(Key={'p': s}, ConsistentRead=True)['Item'] == {'p': s, 'regular': 'write', 'write': 'regular'}
|
||||
|
||||
# Reproducer for issue #6573: binary strings should be ordered as unsigned
|
||||
# bytes, i.e., byte 128 comes after 127, not before as with signed bytes.
|
||||
# Test the five ordering operators: <, <=, >, >=, between
|
||||
def test_condition_expression_unsigned_bytes(test_table_s):
|
||||
p = random_string()
|
||||
test_table_s.put_item(Item={'p': p, 'b': bytearray([127])})
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='b < :oldval',
|
||||
ExpressionAttributeValues={':newval': 1, ':oldval': bytearray([128])})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 1
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='b <= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': bytearray([128])})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 2
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='b between :oldval1 and :oldval2',
|
||||
ExpressionAttributeValues={':newval': 3, ':oldval1': bytearray([126]), ':oldval2': bytearray([128])})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 3
|
||||
|
||||
test_table_s.put_item(Item={'p': p, 'b': bytearray([128])})
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='b > :oldval',
|
||||
ExpressionAttributeValues={':newval': 4, ':oldval': bytearray([127])})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 4
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='b >= :oldval',
|
||||
ExpressionAttributeValues={':newval': 5, ':oldval': bytearray([127])})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 5
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user