Compare commits
52 Commits
next
...
scylla-4.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e6c6d5f58 | ||
|
|
564b4c32b0 | ||
|
|
dfafc4e1a9 | ||
|
|
db286c5ca4 | ||
|
|
519fcd4729 | ||
|
|
9bcbcbbcf2 | ||
|
|
c622e5bfab | ||
|
|
905643bbc2 | ||
|
|
d396a298d6 | ||
|
|
1d9bbbc957 | ||
|
|
4f1878803e | ||
|
|
c5e2fad1c8 | ||
|
|
abd0fa52c0 | ||
|
|
dfa464c35b | ||
|
|
be29b35c4b | ||
|
|
97b7024c0c | ||
|
|
194ff1d226 | ||
|
|
b8f7fb35e1 | ||
|
|
f7d53ff607 | ||
|
|
eb190643f8 | ||
|
|
3f8345f1b8 | ||
|
|
891a3fa243 | ||
|
|
db31542805 | ||
|
|
b443b2574a | ||
|
|
2ee321d88e | ||
|
|
4563f4b992 | ||
|
|
81dc8eeec7 | ||
|
|
2d72f7d8e5 | ||
|
|
c6ee86b512 | ||
|
|
67348cd6e8 | ||
|
|
44cc4843f1 | ||
|
|
f1f5586bf6 | ||
|
|
3a447cd755 | ||
|
|
176aa91be5 | ||
|
|
4a3eff17ff | ||
|
|
2e00f6d0a1 | ||
|
|
bf509c3b16 | ||
|
|
84ef30752f | ||
|
|
f1b71ec216 | ||
|
|
93ed536fba | ||
|
|
ab3da4510c | ||
|
|
bb8fcbff68 | ||
|
|
af43d0c62d | ||
|
|
8c8c266f67 | ||
|
|
6d1301d93c | ||
|
|
be545d6d5d | ||
|
|
a1c15f0690 | ||
|
|
4d68c53389 | ||
|
|
7d1f352be2 | ||
|
|
0fe5335447 | ||
|
|
8a026b8b14 | ||
|
|
0760107b9f |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.1.2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -573,29 +573,66 @@ static bool validate_legal_tag_chars(std::string_view tag) {
|
||||
return std::all_of(tag.begin(), tag.end(), &is_legal_tag_char);
|
||||
}
|
||||
|
||||
static const std::unordered_set<std::string_view> allowed_write_isolation_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
|
||||
static void validate_tags(const std::map<sstring, sstring>& tags) {
|
||||
static const std::unordered_set<std::string_view> allowed_values = {
|
||||
"f", "forbid", "forbid_rmw",
|
||||
"a", "always", "always_use_lwt",
|
||||
"o", "only_rmw_uses_lwt",
|
||||
"u", "unsafe", "unsafe_rmw",
|
||||
};
|
||||
auto it = tags.find(rmw_operation::WRITE_ISOLATION_TAG_KEY);
|
||||
if (it != tags.end()) {
|
||||
std::string_view value = it->second;
|
||||
elogger.warn("Allowed values count {} {}", value, allowed_values.count(value));
|
||||
if (allowed_values.count(value) == 0) {
|
||||
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw api_error("ValidationException",
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_values));
|
||||
format("Incorrect write isolation tag {}. Allowed values: {}", value, allowed_write_isolation_values));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static rmw_operation::write_isolation parse_write_isolation(std::string_view value) {
|
||||
if (!value.empty()) {
|
||||
switch (value[0]) {
|
||||
case 'f':
|
||||
return rmw_operation::write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return rmw_operation::write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return rmw_operation::write_isolation::UNSAFE_RMW;
|
||||
}
|
||||
}
|
||||
// Shouldn't happen as validate_tags() / set_default_write_isolation()
|
||||
// verify allow only a closed set of values.
|
||||
return rmw_operation::default_write_isolation;
|
||||
|
||||
}
|
||||
// This default_write_isolation is always overwritten in main.cc, which calls
|
||||
// set_default_write_isolation().
|
||||
rmw_operation::write_isolation rmw_operation::default_write_isolation =
|
||||
rmw_operation::write_isolation::LWT_ALWAYS;
|
||||
void rmw_operation::set_default_write_isolation(std::string_view value) {
|
||||
if (value.empty()) {
|
||||
throw std::runtime_error("When Alternator is enabled, write "
|
||||
"isolation policy must be selected, using the "
|
||||
"'--alternator-write-isolation' option. "
|
||||
"See docs/alternator/alternator.md for instructions.");
|
||||
}
|
||||
if (allowed_write_isolation_values.count(value) == 0) {
|
||||
throw std::runtime_error(format("Invalid --alternator-write-isolation "
|
||||
"setting '{}'. Allowed values: {}.",
|
||||
value, allowed_write_isolation_values));
|
||||
}
|
||||
default_write_isolation = parse_write_isolation(value);
|
||||
}
|
||||
|
||||
// FIXME: Updating tags currently relies on updating schema, which may be subject
|
||||
// to races during concurrent updates of the same table. Once Scylla schema updates
|
||||
// are fixed, this issue will automatically get fixed as well.
|
||||
enum class update_tags_action { add_tags, delete_tags };
|
||||
static future<> update_tags(const rjson::value& tags, schema_ptr schema, std::map<sstring, sstring>&& tags_map, update_tags_action action) {
|
||||
static future<> update_tags(service::migration_manager& mm, const rjson::value& tags, schema_ptr schema, std::map<sstring, sstring>&& tags_map, update_tags_action action) {
|
||||
if (action == update_tags_action::add_tags) {
|
||||
for (auto it = tags.Begin(); it != tags.End(); ++it) {
|
||||
const rjson::value& key = (*it)["Key"];
|
||||
@@ -622,24 +659,12 @@ static future<> update_tags(const rjson::value& tags, schema_ptr schema, std::ma
|
||||
}
|
||||
validate_tags(tags_map);
|
||||
|
||||
std::stringstream serialized_tags;
|
||||
serialized_tags << '{';
|
||||
for (auto& tag_entry : tags_map) {
|
||||
serialized_tags << format("'{}':'{}',", tag_entry.first, tag_entry.second);
|
||||
}
|
||||
std::string serialized_tags_str = serialized_tags.str();
|
||||
if (!tags_map.empty()) {
|
||||
serialized_tags_str[serialized_tags_str.size() - 1] = '}'; // trims the last ',' delimiter
|
||||
} else {
|
||||
serialized_tags_str.push_back('}');
|
||||
}
|
||||
|
||||
sstring req = format("ALTER TABLE \"{}\".\"{}\" WITH {} = {}",
|
||||
schema->ks_name(), schema->cf_name(), tags_extension::NAME, serialized_tags_str);
|
||||
return db::execute_cql(std::move(req)).discard_result();
|
||||
schema_builder builder(schema);
|
||||
builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared<tags_extension>(std::move(tags_map))}});
|
||||
return mm.announce_column_family_update(builder.build(), false, std::vector<view_ptr>(), false);
|
||||
}
|
||||
|
||||
static future<> add_tags(service::storage_proxy& proxy, schema_ptr schema, rjson::value& request_info) {
|
||||
static future<> add_tags(service::migration_manager& mm, service::storage_proxy& proxy, schema_ptr schema, rjson::value& request_info) {
|
||||
const rjson::value* tags = rjson::find(request_info, "Tags");
|
||||
if (!tags || !tags->IsArray()) {
|
||||
return make_exception_future<>(api_error("ValidationException", format("Cannot parse tags")));
|
||||
@@ -649,7 +674,7 @@ static future<> add_tags(service::storage_proxy& proxy, schema_ptr schema, rjson
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
|
||||
return update_tags(rjson::copy(*tags), schema, std::move(tags_map), update_tags_action::add_tags);
|
||||
return update_tags(mm, rjson::copy(*tags), schema, std::move(tags_map), update_tags_action::add_tags);
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
@@ -661,7 +686,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
|
||||
return api_error("AccessDeniedException", "Incorrect resource identifier");
|
||||
}
|
||||
schema_ptr schema = get_table_from_arn(_proxy, std::string_view(arn->GetString(), arn->GetStringLength()));
|
||||
add_tags(_proxy, schema, request).get();
|
||||
add_tags(_mm, _proxy, schema, request).get();
|
||||
return json_string("");
|
||||
});
|
||||
}
|
||||
@@ -682,7 +707,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
|
||||
schema_ptr schema = get_table_from_arn(_proxy, std::string_view(arn->GetString(), arn->GetStringLength()));
|
||||
|
||||
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
|
||||
update_tags(*tags, schema, std::move(tags_map), update_tags_action::delete_tags).get();
|
||||
update_tags(_mm, *tags, schema, std::move(tags_map), update_tags_action::delete_tags).get();
|
||||
return json_string("");
|
||||
});
|
||||
}
|
||||
@@ -710,6 +735,17 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
|
||||
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
|
||||
}
|
||||
|
||||
static future<> wait_for_schema_agreement(db::timeout_clock::time_point deadline) {
|
||||
return do_until([deadline] {
|
||||
if (db::timeout_clock::now() > deadline) {
|
||||
throw std::runtime_error("Unable to reach schema agreement");
|
||||
}
|
||||
return service::get_local_migration_manager().have_schema_agreement();
|
||||
}, [] {
|
||||
return seastar::sleep(500ms);
|
||||
});
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
@@ -903,9 +939,11 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
}).then([this, table_info = std::move(table_info), schema] () mutable {
|
||||
future<> f = make_ready_future<>();
|
||||
if (rjson::find(table_info, "Tags")) {
|
||||
f = add_tags(_proxy, schema, table_info);
|
||||
f = add_tags(_mm, _proxy, schema, table_info);
|
||||
}
|
||||
return f.then([table_info = std::move(table_info), schema] () mutable {
|
||||
return f.then([] {
|
||||
return wait_for_schema_agreement(db::timeout_clock::now() + 10s);
|
||||
}).then([table_info = std::move(table_info), schema] () mutable {
|
||||
rjson::value status = rjson::empty_object();
|
||||
supplement_table_info(table_info, *schema);
|
||||
rjson::set(status, "TableDescription", std::move(table_info));
|
||||
@@ -1195,22 +1233,9 @@ rmw_operation::write_isolation rmw_operation::get_write_isolation_for_schema(sch
|
||||
const auto& tags = get_tags_of_table(schema);
|
||||
auto it = tags.find(WRITE_ISOLATION_TAG_KEY);
|
||||
if (it == tags.end() || it->second.empty()) {
|
||||
// By default, fall back to always enforcing LWT
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
}
|
||||
switch (it->second[0]) {
|
||||
case 'f':
|
||||
return write_isolation::FORBID_RMW;
|
||||
case 'a':
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
case 'o':
|
||||
return write_isolation::LWT_RMW_ONLY;
|
||||
case 'u':
|
||||
return write_isolation::UNSAFE_RMW;
|
||||
default:
|
||||
// In case of an incorrect tag, fall back to the safest option: LWT_ALWAYS
|
||||
return write_isolation::LWT_ALWAYS;
|
||||
return default_write_isolation;
|
||||
}
|
||||
return parse_write_isolation(it->second);
|
||||
}
|
||||
|
||||
// shard_for_execute() checks whether execute() must be called on a specific
|
||||
@@ -1241,11 +1266,6 @@ std::optional<shard_id> rmw_operation::shard_for_execute(bool needs_read_before_
|
||||
// PutItem, DeleteItem). All these return nothing by default, but can
|
||||
// optionally return Attributes if requested via the ReturnValues option.
|
||||
static future<executor::request_return_type> rmw_operation_return(rjson::value&& attributes) {
|
||||
// As an optimization, in the simple and common case that nothing is to be
|
||||
// returned, quickly return an empty result:
|
||||
if (attributes.IsNull()) {
|
||||
return make_ready_future<executor::request_return_type>(json_string(""));
|
||||
}
|
||||
rjson::value ret = rjson::empty_object();
|
||||
if (!attributes.IsNull()) {
|
||||
rjson::set(ret, "Attributes", std::move(attributes));
|
||||
@@ -1261,7 +1281,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
stats& stats) {
|
||||
if (needs_read_before_write) {
|
||||
if (_write_isolation == write_isolation::FORBID_RMW) {
|
||||
throw api_error("ValidationException", "Read-modify-write operations not supported");
|
||||
throw api_error("ValidationException", "Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
|
||||
}
|
||||
stats.reads_before_write++;
|
||||
if (_write_isolation == write_isolation::UNSAFE_RMW) {
|
||||
@@ -2811,6 +2831,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
[] (std::vector<std::tuple<std::string, std::optional<rjson::value>>> responses) {
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::set(response, "Responses", rjson::empty_object());
|
||||
rjson::set(response, "UnprocessedKeys", rjson::empty_object());
|
||||
for (auto& t : responses) {
|
||||
if (!response["Responses"].HasMember(std::get<0>(t).c_str())) {
|
||||
rjson::set_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array());
|
||||
@@ -3080,7 +3101,7 @@ static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_d
|
||||
if (attrs.Size() != 1) {
|
||||
throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs));
|
||||
}
|
||||
bytes raw_value = pk_cdef.type->from_string(attrs[0][type_to_string(pk_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], pk_cdef);
|
||||
partition_key pk = partition_key::from_singular(*schema, pk_cdef.type->deserialize(raw_value));
|
||||
auto decorated_key = dht::decorate_key(*schema, pk);
|
||||
if (op != comparison_operator_type::EQ) {
|
||||
@@ -3105,7 +3126,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (attrs.Size() != expected_attrs_size) {
|
||||
throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs));
|
||||
}
|
||||
bytes raw_value = ck_cdef.type->from_string(attrs[0][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_value = get_key_from_typed_value(attrs[0], ck_cdef);
|
||||
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
|
||||
switch (op) {
|
||||
case comparison_operator_type::EQ:
|
||||
@@ -3119,7 +3140,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
case comparison_operator_type::GT:
|
||||
return query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false));
|
||||
case comparison_operator_type::BETWEEN: {
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(attrs[1][type_to_string(ck_cdef.type)].GetString());
|
||||
bytes raw_upper_limit = get_key_from_typed_value(attrs[1], ck_cdef);
|
||||
clustering_key upper_limit = clustering_key::from_single_value(*schema, raw_upper_limit);
|
||||
return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(upper_limit));
|
||||
}
|
||||
@@ -3132,9 +3153,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
||||
if (!ck_cdef.type->is_compatible_with(*utf8_type)) {
|
||||
throw api_error("ValidationException", format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type)));
|
||||
}
|
||||
std::string raw_upper_limit_str = attrs[0][type_to_string(ck_cdef.type)].GetString();
|
||||
bytes raw_upper_limit = ck_cdef.type->from_string(raw_upper_limit_str);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_upper_limit), ck, schema, ck_cdef.type);
|
||||
return get_clustering_range_for_begins_with(std::move(raw_value), ck, schema, ck_cdef.type);
|
||||
}
|
||||
default:
|
||||
throw api_error("ValidationException", format("Unknown primary key bound passed: {}", int(op)));
|
||||
|
||||
@@ -63,6 +63,10 @@ public:
|
||||
|
||||
static write_isolation get_write_isolation_for_schema(schema_ptr schema);
|
||||
|
||||
static write_isolation default_write_isolation;
|
||||
public:
|
||||
static void set_default_write_isolation(std::string_view mode);
|
||||
|
||||
protected:
|
||||
// The full request JSON
|
||||
rjson::value _request;
|
||||
|
||||
@@ -54,26 +54,22 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) {
|
||||
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
|
||||
}
|
||||
|
||||
static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
|
||||
std::vector<ss::token_range> res;
|
||||
for (auto d : service::get_local_storage_service().describe_ring(keyspace)) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
r.endpoint_details.push(ed);
|
||||
static ss::token_range token_range_endpoints_to_json(const dht::token_range_endpoints& d) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
res.push_back(r);
|
||||
r.endpoint_details.push(ed);
|
||||
}
|
||||
return res;
|
||||
return r;
|
||||
}
|
||||
|
||||
using ks_cf_func = std::function<future<json::json_return_type>(http_context&, std::unique_ptr<request>, sstring, std::vector<sstring>)>;
|
||||
@@ -192,13 +188,13 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
|
||||
ss::describe_any_ring.set(r, [&ctx](const_req req) {
|
||||
return describe_ring("");
|
||||
ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::describe_ring.set(r, [&ctx](const_req req) {
|
||||
auto keyspace = validate_keyspace(ctx, req.param);
|
||||
return describe_ring(keyspace);
|
||||
ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
auto keyspace = validate_keyspace(ctx, req->param);
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::get_host_id_map.set(r, [&ctx](const_req req) {
|
||||
|
||||
@@ -40,7 +40,8 @@ static dht::token to_token(int64_t value) {
|
||||
}
|
||||
|
||||
static dht::token to_token(bytes_view key) {
|
||||
if (key.empty()) {
|
||||
// Key should be 16 B long, of which first 8 B are used for token calculation
|
||||
if (key.size() != 2*sizeof(int64_t)) {
|
||||
return dht::minimum_token();
|
||||
}
|
||||
return to_token(stream_id::token_from_bytes(key));
|
||||
|
||||
@@ -130,7 +130,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
|
||||
*/
|
||||
future<db_clock::time_point> get_local_streams_timestamp();
|
||||
|
||||
/* Generate a new set of CDC streams and insert it into the distributed cdc_topology_description table.
|
||||
/* Generate a new set of CDC streams and insert it into the distributed cdc_generations table.
|
||||
* Returns the timestamp of this new generation.
|
||||
*
|
||||
* Should be called when starting the node for the first time (i.e., joining the ring).
|
||||
@@ -159,9 +159,9 @@ db_clock::time_point make_new_cdc_generation(
|
||||
std::optional<db_clock::time_point> get_streams_timestamp_for(const gms::inet_address& endpoint, const gms::gossiper&);
|
||||
|
||||
/* Inform CDC users about a generation of streams (identified by the given timestamp)
|
||||
* by inserting it into the cdc_description table.
|
||||
* by inserting it into the cdc_streams table.
|
||||
*
|
||||
* Assumes that the cdc_topology_description table contains this generation.
|
||||
* Assumes that the cdc_generations table contains this generation.
|
||||
*
|
||||
* Returning from this function does not mean that the table update was successful: the function
|
||||
* might run an asynchronous task in the background.
|
||||
|
||||
@@ -140,6 +140,9 @@ public:
|
||||
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
// Returns whether or not interposer consumer is used by a given strategy.
|
||||
bool use_interposer_consumer() const;
|
||||
};
|
||||
|
||||
// Creates a compaction_strategy object from one of the strategies available.
|
||||
|
||||
@@ -88,16 +88,13 @@ static data_value castas_fctn_simple(data_value from) {
|
||||
template<typename ToType>
|
||||
static data_value castas_fctn_from_decimal_to_float(data_value from) {
|
||||
auto val_from = value_cast<big_decimal>(from);
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
boost::multiprecision::cpp_rational r = val_from.unscaled_value();
|
||||
r /= boost::multiprecision::pow(ten, val_from.scale());
|
||||
return static_cast<ToType>(r);
|
||||
return static_cast<ToType>(val_from.as_rational());
|
||||
}
|
||||
|
||||
static utils::multiprecision_int from_decimal_to_cppint(const data_value& from) {
|
||||
const auto& val_from = value_cast<big_decimal>(from);
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
return boost::multiprecision::cpp_int(val_from.unscaled_value() / boost::multiprecision::pow(ten, val_from.scale()));
|
||||
auto r = val_from.as_rational();
|
||||
return utils::multiprecision_int(numerator(r)/denominator(r));
|
||||
}
|
||||
|
||||
template<typename ToType>
|
||||
|
||||
@@ -688,6 +688,11 @@ static query::range<bytes_view> to_range(const term_slice& slice, const query_op
|
||||
extract_bound(statements::bound::END));
|
||||
}
|
||||
|
||||
static bool contains_without_wraparound(
|
||||
const query::range<bytes_view>& range, bytes_view value, const serialized_tri_compare& cmp) {
|
||||
return !range.is_wrap_around(cmp) && range.contains(value, cmp);
|
||||
}
|
||||
|
||||
bool single_column_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
const partition_key& key,
|
||||
const clustering_key_prefix& ckey,
|
||||
@@ -702,13 +707,13 @@ bool single_column_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
return false;
|
||||
}
|
||||
return cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return to_range(_slice, options, _column_def.name_as_text()).contains(
|
||||
return contains_without_wraparound(to_range(_slice, options, _column_def.name_as_text()),
|
||||
cell_value_bv, _column_def.type->as_tri_comparator());
|
||||
});
|
||||
}
|
||||
|
||||
bool single_column_restriction::slice::is_satisfied_by(bytes_view data, const query_options& options) const {
|
||||
return to_range(_slice, options, _column_def.name_as_text()).contains(
|
||||
return contains_without_wraparound(to_range(_slice, options, _column_def.name_as_text()),
|
||||
data, _column_def.type->underlying_type()->as_tri_comparator());
|
||||
}
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ batch_statement::batch_statement(int bound_terms, type type_,
|
||||
, _has_conditions(boost::algorithm::any_of(_statements, [] (auto&& s) { return s.statement->has_conditions(); }))
|
||||
, _stats(stats)
|
||||
{
|
||||
validate();
|
||||
if (has_conditions()) {
|
||||
// A batch can be created not only by raw::batch_statement::prepare, but also by
|
||||
// cql_server::connection::process_batch, which doesn't call any methods of
|
||||
@@ -448,7 +449,6 @@ batch_statement::prepare(database& db, cql_stats& stats) {
|
||||
prep_attrs->collect_marker_specification(bound_names);
|
||||
|
||||
cql3::statements::batch_statement batch_statement_(bound_names.size(), _type, std::move(statements), std::move(prep_attrs), stats);
|
||||
batch_statement_.validate();
|
||||
|
||||
std::vector<uint16_t> partition_key_bind_indices;
|
||||
if (!have_multiple_cfs && batch_statement_.get_statements().size() > 0) {
|
||||
|
||||
12
database.cc
12
database.cc
@@ -113,11 +113,11 @@ make_flush_controller(const db::config& cfg, seastar::scheduling_group sg, const
|
||||
|
||||
inline
|
||||
std::unique_ptr<compaction_manager>
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg, abort_source& as) {
|
||||
make_compaction_manager(const db::config& cfg, database_config& dbcfg) {
|
||||
if (cfg.compaction_static_shares() > 0) {
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares(), as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, cfg.compaction_static_shares());
|
||||
}
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory, as);
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), dbcfg.available_memory);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
@@ -161,7 +161,7 @@ void keyspace::remove_user_type(const user_type ut) {
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as)
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm)
|
||||
: _stats(make_lw_shared<db_stats>())
|
||||
, _cl_stats(std::make_unique<cell_locker_stats>())
|
||||
, _cfg(cfg)
|
||||
@@ -198,7 +198,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _mutation_query_stage()
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
|
||||
, _compaction_manager(make_compaction_manager(_cfg, dbcfg))
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _querier_cache(_read_concurrency_sem, dbcfg.available_memory * 0.04)
|
||||
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,
|
||||
@@ -1324,7 +1324,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), m.decorated_key(), slice, trace_state)
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), m.decorated_key(), slice, trace_state, timeout)
|
||||
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
|
||||
// ...now, that we got existing state of all affected counter
|
||||
// cells we can look for our shard in each of them, increment
|
||||
|
||||
@@ -1427,7 +1427,7 @@ public:
|
||||
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }
|
||||
|
||||
future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm, abort_source& as);
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::token_metadata& tm);
|
||||
database(database&&) = delete;
|
||||
~database();
|
||||
|
||||
|
||||
@@ -681,7 +681,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
@@ -736,6 +736,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port")
|
||||
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address")
|
||||
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator")
|
||||
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, redis_port(this, "redis_port", value_status::Used, 0, "Port on which the REDIS transport listens for clients.")
|
||||
, redis_ssl_port(this, "redis_ssl_port", value_status::Used, 0, "Port on which the REDIS TLS native transport listens for clients.")
|
||||
|
||||
@@ -314,6 +314,8 @@ public:
|
||||
named_value<uint16_t> alternator_https_port;
|
||||
named_value<sstring> alternator_address;
|
||||
named_value<bool> alternator_enforce_authorization;
|
||||
named_value<sstring> alternator_write_isolation;
|
||||
|
||||
named_value<bool> abort_on_ebadf;
|
||||
|
||||
named_value<uint16_t> redis_port;
|
||||
|
||||
@@ -703,6 +703,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||
// (last_modification - manager::hints_timer_period) old.
|
||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -725,6 +726,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||
++this->shard_stats().discarded;
|
||||
}
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}).finally([units = std::move(units), ctx_ptr] {});
|
||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||
|
||||
@@ -72,7 +72,7 @@ schema_ptr view_build_status() {
|
||||
}
|
||||
|
||||
/* An internal table used by nodes to exchange CDC generation data. */
|
||||
schema_ptr cdc_topology_description() {
|
||||
schema_ptr cdc_generations() {
|
||||
thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION);
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION, {id})
|
||||
@@ -108,7 +108,7 @@ schema_ptr cdc_desc() {
|
||||
static std::vector<schema_ptr> all_tables() {
|
||||
return {
|
||||
view_build_status(),
|
||||
cdc_topology_description(),
|
||||
cdc_generations(),
|
||||
cdc_desc(),
|
||||
};
|
||||
}
|
||||
@@ -204,7 +204,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
|
||||
false).discard_result();
|
||||
}
|
||||
|
||||
/* We want to make sure that writes/reads to/from cdc_topology_description and cdc_description
|
||||
/* We want to make sure that writes/reads to/from cdc_generations and cdc_streams
|
||||
* are consistent: a read following an acknowledged write to the same partition should contact
|
||||
* at least one of the replicas that the write contacted.
|
||||
* Normally we would achieve that by always using CL = QUORUM,
|
||||
|
||||
@@ -48,10 +48,10 @@ public:
|
||||
static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
|
||||
|
||||
/* Nodes use this table to communicate new CDC stream generations to other nodes. */
|
||||
static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_topology_description";
|
||||
static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_generations";
|
||||
|
||||
/* This table is used by CDC clients to learn about avaliable CDC streams. */
|
||||
static constexpr auto CDC_DESC = "cdc_description";
|
||||
static constexpr auto CDC_DESC = "cdc_streams";
|
||||
|
||||
/* Information required to modify/query some system_distributed tables, passed from the caller. */
|
||||
struct context {
|
||||
|
||||
@@ -38,11 +38,16 @@ future<> view_update_generator::start() {
|
||||
|
||||
// If we got here, we will process all tables we know about so far eventually so there
|
||||
// is no starvation
|
||||
for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) {
|
||||
for (auto table_it = _sstables_with_tables.begin(); table_it != _sstables_with_tables.end(); table_it = _sstables_with_tables.erase(table_it)) {
|
||||
auto& [t, t_sstables] = *table_it;
|
||||
schema_ptr s = t->schema();
|
||||
|
||||
vug_logger.trace("Processing {}.{}: {} sstables", s->ks_name(), s->cf_name(), t_sstables.size());
|
||||
|
||||
// Copy what we have so far so we don't miss new updates
|
||||
auto sstables = std::exchange(_sstables_with_tables[t], {});
|
||||
auto sstables = std::exchange(t_sstables, {});
|
||||
|
||||
const auto num_sstables = sstables.size();
|
||||
|
||||
try {
|
||||
// temporary: need an sstable set for the flat mutation reader, but the
|
||||
@@ -81,7 +86,7 @@ future<> view_update_generator::start() {
|
||||
// Move from staging will be retried upon restart.
|
||||
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
|
||||
}
|
||||
_registration_sem.signal();
|
||||
_registration_sem.signal(num_sstables);
|
||||
}
|
||||
// For each table, move the processed staging sstables into the table's base dir.
|
||||
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {
|
||||
|
||||
@@ -32,7 +32,10 @@
|
||||
namespace db::view {
|
||||
|
||||
class view_update_generator {
|
||||
public:
|
||||
static constexpr size_t registration_queue_size = 5;
|
||||
|
||||
private:
|
||||
database& _db;
|
||||
seastar::abort_source _as;
|
||||
future<> _started = make_ready_future<>();
|
||||
@@ -51,6 +54,8 @@ public:
|
||||
future<> start();
|
||||
future<> stop();
|
||||
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<table> table);
|
||||
|
||||
ssize_t available_register_units() const { return _registration_sem.available_units(); }
|
||||
private:
|
||||
bool should_throttle() const;
|
||||
};
|
||||
|
||||
@@ -43,16 +43,29 @@
|
||||
#include "log.hh"
|
||||
#include "db/config.hh"
|
||||
#include "database.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
|
||||
static logging::logger blogger("boot_strapper");
|
||||
|
||||
namespace dht {
|
||||
|
||||
future<> boot_strapper::bootstrap() {
|
||||
future<> boot_strapper::bootstrap(streaming::stream_reason reason) {
|
||||
blogger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens());
|
||||
|
||||
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _abort_source, _tokens, _address, "Bootstrap", streaming::stream_reason::bootstrap);
|
||||
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(gms::get_local_gossiper().get_unreachable_members()));
|
||||
sstring description;
|
||||
if (reason == streaming::stream_reason::bootstrap) {
|
||||
description = "Bootstrap";
|
||||
} else if (reason == streaming::stream_reason::replace) {
|
||||
description = "Replace";
|
||||
} else {
|
||||
return make_exception_future<>(std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap"));
|
||||
}
|
||||
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _abort_source, _tokens, _address, description, reason);
|
||||
auto nodes_to_filter = gms::get_local_gossiper().get_unreachable_members();
|
||||
if (reason == streaming::stream_reason::replace && _db.local().get_replace_address()) {
|
||||
nodes_to_filter.insert(_db.local().get_replace_address().value());
|
||||
}
|
||||
blogger.debug("nodes_to_filter={}", nodes_to_filter);
|
||||
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(nodes_to_filter));
|
||||
auto keyspaces = make_lw_shared<std::vector<sstring>>(_db.local().get_non_system_keyspaces());
|
||||
return do_for_each(*keyspaces, [this, keyspaces, streamer] (sstring& keyspace_name) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <unordered_set>
|
||||
#include "database_fwd.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
|
||||
@@ -66,7 +67,7 @@ public:
|
||||
, _token_metadata(tmd) {
|
||||
}
|
||||
|
||||
future<> bootstrap();
|
||||
future<> bootstrap(streaming::stream_reason reason);
|
||||
|
||||
/**
|
||||
* if initialtoken was specified, use that (split on comma).
|
||||
|
||||
@@ -91,7 +91,16 @@ range_streamer::get_range_fetch_map(const std::unordered_map<dht::token_range, s
|
||||
}
|
||||
|
||||
if (!found_source) {
|
||||
throw std::runtime_error(format("unable to find sufficient sources for streaming range {} in keyspace {}", range_, keyspace));
|
||||
auto& ks = _db.local().find_keyspace(keyspace);
|
||||
auto rf = ks.get_replication_strategy().get_replication_factor();
|
||||
// When a replacing node replaces a dead node with keyspace of RF
|
||||
// 1, it is expected that replacing node could not find a peer node
|
||||
// that contains data to stream from.
|
||||
if (_reason == streaming::stream_reason::replace && rf == 1) {
|
||||
logger.warn("Unable to find sufficient sources to stream range {} for keyspace {} with RF = 1 for replace operation", range_, keyspace);
|
||||
} else {
|
||||
throw std::runtime_error(format("unable to find sufficient sources for streaming range {} in keyspace {}", range_, keyspace));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -146,7 +146,7 @@ private:
|
||||
* here, we always exclude ourselves.
|
||||
* @return
|
||||
*/
|
||||
static std::unordered_map<inet_address, dht::token_range_vector>
|
||||
std::unordered_map<inet_address, dht::token_range_vector>
|
||||
get_range_fetch_map(const std::unordered_map<dht::token_range, std::vector<inet_address>>& ranges_with_sources,
|
||||
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
|
||||
const sstring& keyspace);
|
||||
|
||||
15
dist/common/scripts/scylla-housekeeping
vendored
15
dist/common/scripts/scylla-housekeeping
vendored
@@ -61,7 +61,15 @@ def sh_command(*args):
|
||||
return out
|
||||
|
||||
def get_url(path):
|
||||
return urllib.request.urlopen(path).read().decode('utf-8')
|
||||
# If server returns any error, like 403, or 500 urllib.request throws exception, which is not serializable.
|
||||
# When multiprocessing routines fail to serialize it, it throws ambiguous serialization exception
|
||||
# from get_json_from_url.
|
||||
# In order to see legit error we catch it from the inside of process, covert to string and
|
||||
# pass it as part of return value
|
||||
try:
|
||||
return 0, urllib.request.urlopen(path).read().decode('utf-8')
|
||||
except Exception as exc:
|
||||
return 1, str(exc)
|
||||
|
||||
def get_json_from_url(path):
|
||||
pool = mp.Pool(processes=1)
|
||||
@@ -71,13 +79,16 @@ def get_json_from_url(path):
|
||||
# to enforce a wallclock timeout.
|
||||
result = pool.apply_async(get_url, args=(path,))
|
||||
try:
|
||||
retval = result.get(timeout=5)
|
||||
status, retval = result.get(timeout=5)
|
||||
except mp.TimeoutError as err:
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
raise
|
||||
if status == 1:
|
||||
raise RuntimeError(f'Failed to get "{path}" due to the following error: {retval}')
|
||||
return json.loads(retval)
|
||||
|
||||
|
||||
def get_api(path):
|
||||
return get_json_from_url("http://" + api_address + path)
|
||||
|
||||
|
||||
13
dist/common/scripts/scylla_coredump_setup
vendored
13
dist/common/scripts/scylla_coredump_setup
vendored
@@ -65,8 +65,8 @@ Before=scylla-server.service
|
||||
After=local-fs.target
|
||||
|
||||
[Mount]
|
||||
What=/var/lib/systemd/coredump
|
||||
Where=/var/lib/scylla/coredump
|
||||
What=/var/lib/scylla/coredump
|
||||
Where=/var/lib/systemd/coredump
|
||||
Type=none
|
||||
Options=bind
|
||||
|
||||
@@ -78,6 +78,7 @@ WantedBy=multi-user.target
|
||||
makedirs('/var/lib/scylla/coredump')
|
||||
systemd_unit.reload()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').enable()
|
||||
systemd_unit('var-lib-systemd-coredump.mount').start()
|
||||
if os.path.exists('/usr/lib/sysctl.d/50-coredump.conf'):
|
||||
run('sysctl -p /usr/lib/sysctl.d/50-coredump.conf')
|
||||
else:
|
||||
@@ -99,6 +100,14 @@ WantedBy=multi-user.target
|
||||
try:
|
||||
run('coredumpctl --no-pager --no-legend info {}'.format(pid))
|
||||
print('\nsystemd-coredump is working finely.')
|
||||
|
||||
# get last coredump generated by bash and remove it, ignore inaccessaible ones
|
||||
corefile = out(cmd=r'coredumpctl -1 --no-legend dump 2>&1 | grep "bash" | '
|
||||
r'grep -v "inaccessible" | grep "Storage:\|Coredump:"',
|
||||
shell=True, exception=False)
|
||||
if corefile:
|
||||
corefile = corefile.split()[-1]
|
||||
run('rm -f {}'.format(corefile))
|
||||
except subprocess.CalledProcessError as e:
|
||||
print('Does not able to detect coredump, failed to configure systemd-coredump.')
|
||||
sys.exit(1)
|
||||
|
||||
3
dist/common/scripts/scylla_setup
vendored
3
dist/common/scripts/scylla_setup
vendored
@@ -374,6 +374,9 @@ if __name__ == '__main__':
|
||||
if not stat.S_ISBLK(os.stat(dsk).st_mode):
|
||||
print('{} is not block device'.format(dsk))
|
||||
continue
|
||||
if dsk in selected:
|
||||
print(f'{dsk} is already added')
|
||||
continue
|
||||
selected.append(dsk)
|
||||
devices.remove(dsk)
|
||||
disks = ','.join(selected)
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -182,7 +182,7 @@ class aws_instance:
|
||||
instance_size = self.instance_size()
|
||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||
return 'ixgbevf'
|
||||
if instance_class in ['c5', 'c5d', 'f1', 'g3', 'h1', 'i3', 'i3en', 'm5', 'm5d', 'p2', 'p3', 'r4', 'x1']:
|
||||
if instance_class in ['a1', 'c5', 'c5d', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||
return 'ena'
|
||||
if instance_class == 'm4':
|
||||
if instance_size == '16xlarge':
|
||||
|
||||
2
dist/docker/redhat/commandlineparser.py
vendored
2
dist/docker/redhat/commandlineparser.py
vendored
@@ -18,6 +18,8 @@ def parse():
|
||||
parser.add_argument('--api-address', default=None, dest='apiAddress')
|
||||
parser.add_argument('--alternator-address', default=None, dest='alternatorAddress', help="Alternator API address to listen to. Defaults to listen address.")
|
||||
parser.add_argument('--alternator-port', default=None, dest='alternatorPort', help="Alternator API port to listen to. Disabled by default.")
|
||||
parser.add_argument('--alternator-https-port', default=None, dest='alternatorHttpsPort', help="Alternator API TLS port to listen to. Disabled by default.")
|
||||
parser.add_argument('--alternator-write-isolation', default=None, dest='alternatorWriteIsolation', help="Alternator default write isolation policy.")
|
||||
parser.add_argument('--disable-version-check', default=False, action='store_true', dest='disable_housekeeping', help="Disable version check")
|
||||
parser.add_argument('--authenticator', default=None, dest='authenticator', help="Set authenticator class")
|
||||
parser.add_argument('--authorizer', default=None, dest='authorizer', help="Set authorizer class")
|
||||
|
||||
9
dist/docker/redhat/scyllasetup.py
vendored
9
dist/docker/redhat/scyllasetup.py
vendored
@@ -16,6 +16,8 @@ class ScyllaSetup:
|
||||
self._broadcastRpcAddress = arguments.broadcastRpcAddress
|
||||
self._apiAddress = arguments.apiAddress
|
||||
self._alternatorPort = arguments.alternatorPort
|
||||
self._alternatorHttpsPort = arguments.alternatorHttpsPort
|
||||
self._alternatorWriteIsolation = arguments.alternatorWriteIsolation
|
||||
self._smp = arguments.smp
|
||||
self._memory = arguments.memory
|
||||
self._overprovisioned = arguments.overprovisioned
|
||||
@@ -116,6 +118,13 @@ class ScyllaSetup:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-port %s" % self._alternatorPort]
|
||||
|
||||
if self._alternatorHttpsPort is not None:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-https-port %s" % self._alternatorHttpsPort]
|
||||
|
||||
if self._alternatorWriteIsolation is not None:
|
||||
args += ["--alternator-write-isolation %s" % self._alternatorWriteIsolation]
|
||||
|
||||
if self._authenticator is not None:
|
||||
args += ["--authenticator %s" % self._authenticator]
|
||||
|
||||
|
||||
@@ -25,6 +25,14 @@ By default, Scylla listens on this port on all network interfaces.
|
||||
To listen only on a specific interface, pass also an "`alternator-address`"
|
||||
option.
|
||||
|
||||
As we explain below in the "Write isolation policies", Alternator has
|
||||
four different choices for the implementation of writes, each with
|
||||
different advantages. You should consider which of the options makes
|
||||
more sense for your intended use case, and use the "`--alternator-write-isolation`"
|
||||
option to choose one. There is currently no default for this option: Trying
|
||||
to run Scylla with Alternator enabled without passing this option will
|
||||
result in an error asking you to set it.
|
||||
|
||||
DynamoDB clients usually specify a single "endpoint" address, e.g.,
|
||||
`dynamodb.us-east-1.amazonaws.com`, and a DNS server hosted on that address
|
||||
distributes the connections to many different backend nodes. Alternator
|
||||
@@ -108,12 +116,15 @@ implemented, with the following limitations:
|
||||
Writes are done in LOCAL_QURUM and reads in LOCAL_ONE (eventual consistency)
|
||||
or LOCAL_QUORUM (strong consistency).
|
||||
### Global Tables
|
||||
* Currently, *all* Alternator tables are created as "Global Tables", i.e., can
|
||||
be accessed from all of Scylla's DCs.
|
||||
* We do not yet support the DynamoDB API calls to make some of the tables
|
||||
global and others local to a particular DC: CreateGlobalTable,
|
||||
UpdateGlobalTable, DescribeGlobalTable, ListGlobalTables,
|
||||
UpdateGlobalTableSettings, DescribeGlobalTableSettings, and UpdateTable.
|
||||
* Currently, *all* Alternator tables are created as "global" tables and can
|
||||
be accessed from all the DCs existing at the time of the table's creation.
|
||||
If a DC is added after a table is created, the table won't be visible from
|
||||
the new DC and changing that requires a CQL "ALTER TABLE" statement to
|
||||
modify the table's replication strategy.
|
||||
* We do not yet support the DynamoDB API calls that control which table is
|
||||
visible from what DC: CreateGlobalTable, UpdateGlobalTable,
|
||||
DescribeGlobalTable, ListGlobalTables, UpdateGlobalTableSettings,
|
||||
DescribeGlobalTableSettings, and UpdateTable.
|
||||
### Backup and Restore
|
||||
* On-demand backup: the DynamoDB APIs are not yet supported: CreateBackup,
|
||||
DescribeBackup, DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||
@@ -153,23 +164,28 @@ implemented, with the following limitations:
|
||||
|
||||
### Write isolation policies
|
||||
DynamoDB API update requests may involve a read before the write - e.g., a
|
||||
_conditional_ update, or an update based on the old value of an attribute.
|
||||
_conditional_ update or an update based on the old value of an attribute.
|
||||
The read and the write should be treated as a single transaction - protected
|
||||
(_isolated_) from other parallel writes to the same item.
|
||||
|
||||
By default, Alternator does this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation. However, LWT significantly slows
|
||||
writes down, so Alternator supports three additional _write isolation
|
||||
policies_, which can be chosen on a per-table basis and may make sense for
|
||||
certain workloads as explained below.
|
||||
Alternator could do this isolation by using Scylla's LWT (lightweight
|
||||
transactions) for every write operation, but this significantly slows
|
||||
down writes, and not necessary for workloads which don't use read-modify-write
|
||||
(RMW) updates.
|
||||
|
||||
The write isolation policy of a table is configured by tagging the table (at
|
||||
CreateTable time, or any time later with TagResource) with the key
|
||||
So Alternator supports four _write isolation policies_, which can be chosen
|
||||
on a per-table basis and may make sense for certain workloads as explained
|
||||
below.
|
||||
|
||||
A default write isolation policy **must** be chosen using the
|
||||
`--alternator-write-isolation` configuration option. Additionally, the write
|
||||
isolation policy for a specific table can be overriden by tagging the table
|
||||
(at CreateTable time, or any time later with TagResource) with the key
|
||||
`system:write_isolation`, and one of the following values:
|
||||
|
||||
* `a`, `always`, or `always_use_lwt` - This is the default choice.
|
||||
It performs every write operation - even those that do not need a read
|
||||
before the write - as a lightweight transaction.
|
||||
* `a`, `always`, or `always_use_lwt` - This mode performs every write
|
||||
operation - even those that do not need a read before the write - as a
|
||||
lightweight transaction.
|
||||
|
||||
This is the slowest choice, but also the only choice guaranteed to work
|
||||
correctly for every workload.
|
||||
|
||||
@@ -10,10 +10,16 @@ This section will guide you through the steps for setting up the cluster:
|
||||
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
|
||||
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
|
||||
add to every "docker run" command: `-p 8000:8000` before the image name
|
||||
and `--alternator-port=8000` at the end. The "alternator-port" option
|
||||
specifies on which port Scylla will listen for the (unencrypted) DynamoDB API.
|
||||
and `--alternator-port=8000 --alternator-write-isolation=always` at the end.
|
||||
The "alternator-port" option specifies on which port Scylla will listen for
|
||||
the (unencrypted) DynamoDB API, and the "alternator-write-isolation" chooses
|
||||
whether or not Alternator will use LWT for every write.
|
||||
For example,
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000
|
||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000 --alternator-write-isolation=always
|
||||
The `--alternator-https-port=...` option can also be used to enable
|
||||
Alternator on an encrypted (HTTPS) port. Note that in this case, the files
|
||||
`/etc/scylla/scylla.crt` and `/etc/scylla/scylla.key` must be inserted into
|
||||
the image, containing the SSL certificate and key to use.
|
||||
|
||||
## Testing Scylla's DynamoDB API support:
|
||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||
|
||||
12
docs/cdc.md
12
docs/cdc.md
@@ -92,7 +92,7 @@ Shard-colocation is an optimization.
|
||||
|
||||
Having different generations operating at different points in time is necessary to maintain colocation in presence of topology changes. When a new node joins the cluster it modifies the token ring by refining existing vnodes into smaller vnodes. But before it does it, it will introduce a new CDC generation whose token ranges refine those new (smaller) vnodes (which means they also refine the old vnodes; that way writes will be colocated on both old and new replicas).
|
||||
|
||||
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::generate_topology_description` function. It then inserts the generation description into an internal distributed table `cdc_topology_description` in the `system_distributed` keyspace. The table is defined as follows (from db/system_distributed_keyspace.cc):
|
||||
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::generate_topology_description` function. It then inserts the generation description into an internal distributed table `cdc_generations` in the `system_distributed` keyspace. The table is defined as follows (from db/system_distributed_keyspace.cc):
|
||||
```
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_TOPOLOGY_DESCRIPTION, {id})
|
||||
/* The timestamp of this CDC generation. */
|
||||
@@ -131,11 +131,11 @@ Next, the node starts gossiping the timestamp of the new generation together wit
|
||||
}).get();
|
||||
```
|
||||
|
||||
When other nodes learn about the generation, they'll extract it from the `cdc_topology_description` table and save it using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
||||
When other nodes learn about the generation, they'll extract it from the `cdc_generations` table and save it using `cdc::metadata::insert(db_clock::time_point, topology_description&&)`.
|
||||
Notice that nodes learn about the generation together with the new node's tokens. When they learn about its tokens they'll immediately start sending writes to the new node (in the case of bootstrapping, it will become a pending replica). But the old generation will still be operating for a minute or two. Thus colocation will be lost for a while. This problem will be fixed when the two-phase-commit approach is implemented.
|
||||
|
||||
We're not able to prevent a node learning about a new generation too late due to a network partition: if gossip doesn't reach the node in time, some writes might be sent to the wrong (old) generation.
|
||||
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_topology_description`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (just the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
||||
However, it could happen that a node learns about the generation from gossip in time, but then won't be able to extract it from `cdc_generations`. In that case we can still maintain consistency: the node will remember that there is a new generation even though it doesn't yet know what it is (just the timestamp) using the `cdc::metadata::prepare(db_clock::time_point)` method, and then _reject_ writes for CDC-enabled tables that are supposed to use this new generation. The node will keep trying to read the generation's data in background until it succeeds or sees that it's not necessary anymore (e.g. because the generation was already superseded by a new generation).
|
||||
Thus we give up availability for safety. This likely won't happen if the administrator ensures that the cluster is not partitioned before bootstrapping a new node. This problem will also be mitigated with a future patch.
|
||||
|
||||
Due to the need of maintaining colocation we don't allow the client to send writes with arbitrary timestamps.
|
||||
@@ -144,7 +144,7 @@ Reason: we cannot allow writes before `T`, because they belong to the old genera
|
||||
|
||||
### Streams description table
|
||||
|
||||
The `cdc_description` table in the `system_distributed` keyspace allows CDC clients to learn about available sets of streams and the time intervals they are operating at. It's definition is as follows (db/system_distributed_keyspace.cc):
|
||||
The `cdc_streams` table in the `system_distributed` keyspace allows CDC clients to learn about available sets of streams and the time intervals they are operating at. It's definition is as follows (db/system_distributed_keyspace.cc):
|
||||
```
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::CDC_DESC, {id})
|
||||
/* The timestamp of this CDC generation. */
|
||||
@@ -161,9 +161,9 @@ where
|
||||
thread_local data_type cdc_stream_tuple_type = tuple_type_impl::get_instance({long_type, long_type});
|
||||
thread_local data_type cdc_streams_set_type = set_type_impl::get_instance(cdc_stream_tuple_type, false);
|
||||
```
|
||||
This table simply contains each generation's timestamp (as partition key) and the set of stream IDs used by this generation. It is meant to be user-facing, in contrast to `cdc_topology_description` which is used internally.
|
||||
This table simply contains each generation's timestamp (as partition key) and the set of stream IDs used by this generation. It is meant to be user-facing, in contrast to `cdc_generations` which is used internally.
|
||||
|
||||
When nodes learn about a CDC generation through gossip, they race to update the description table by inserting a proper row (see `cdc::update_streams_description`). This operation is idempotent so it doesn't matter if multiple nodes do it at the same time.
|
||||
|
||||
#### TODO: expired generations
|
||||
The `expired` column in `cdc_description` and `cdc_topology_description` means that this generation was superseded by some new generation and will soon be removed (its table entry will be gone). This functionality is yet to be implemented.
|
||||
The `expired` column in `cdc_streams` and `cdc_generations` means that this generation was superseded by some new generation and will soon be removed (its table entry will be gone). This functionality is yet to be implemented.
|
||||
|
||||
@@ -163,6 +163,20 @@ $ docker run --name some-scylla -d scylladb/scylla --alternator-port 8000
|
||||
|
||||
**Since: 3.2**
|
||||
|
||||
### `--alternator-https-port PORT`
|
||||
|
||||
The `--alternator-https-port` option is similar to `--alternator-port`, just enables an encrypted (HTTPS) port. Either the `--alternator-https-port` or `--alternator-http-port`, or both, can be used to enable Alternator.
|
||||
|
||||
Note that the `--alternator-https-port` option also requires that files `/etc/scylla/scylla.crt` and `/etc/scylla/scylla.key` be inserted into the image. These files contain an SSL certificate and key, respectively.
|
||||
|
||||
**Since: 4.2**
|
||||
|
||||
### `--alternator-write-isolation policy`
|
||||
|
||||
The `--alternator-write-isolation` command line option chooses between four allowed write isolation policies described in docs/alternator/alternator.md. This option must be specified if Alternator is enabled - it does not have a default.
|
||||
|
||||
**Since: 4.1**
|
||||
|
||||
### `--broadcast-address ADDR`
|
||||
|
||||
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.
|
||||
|
||||
@@ -175,6 +175,7 @@ public:
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::HIBERNATE,
|
||||
versioned_value::STATUS_BOOTSTRAPPING,
|
||||
versioned_value::STATUS_UNKNOWN,
|
||||
};
|
||||
static constexpr std::chrono::milliseconds INTERVAL{1000};
|
||||
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
|
||||
|
||||
@@ -49,6 +49,7 @@ enum class stream_reason : uint8_t {
|
||||
removenode,
|
||||
rebuild,
|
||||
repair,
|
||||
replace,
|
||||
};
|
||||
|
||||
enum class stream_mutation_fragments_cmd : uint8_t {
|
||||
|
||||
10
lua.cc
10
lua.cc
@@ -264,14 +264,12 @@ static auto visit_lua_raw_value(lua_State* l, int index, Func&& f) {
|
||||
|
||||
template <typename Func>
|
||||
static auto visit_decimal(const big_decimal &v, Func&& f) {
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
const auto& dividend = v.unscaled_value();
|
||||
auto divisor = boost::multiprecision::pow(ten, v.scale());
|
||||
boost::multiprecision::cpp_rational r = v.as_rational();
|
||||
const boost::multiprecision::cpp_int& dividend = numerator(r);
|
||||
const boost::multiprecision::cpp_int& divisor = denominator(r);
|
||||
if (dividend % divisor == 0) {
|
||||
return f(utils::multiprecision_int(boost::multiprecision::cpp_int(dividend/divisor)));
|
||||
return f(utils::multiprecision_int(dividend/divisor));
|
||||
}
|
||||
boost::multiprecision::cpp_rational r = dividend;
|
||||
r /= divisor;
|
||||
return f(r.convert_to<double>());
|
||||
}
|
||||
|
||||
|
||||
10
main.cc
10
main.cc
@@ -78,6 +78,7 @@
|
||||
#include "cdc/log.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
#include "alternator/tags_extension.hh"
|
||||
#include "alternator/rmw_operation.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@@ -736,7 +737,7 @@ int main(int ac, char** av) {
|
||||
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
|
||||
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
start_large_data_handler(db).get();
|
||||
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
@@ -1081,6 +1082,7 @@ int main(int ac, char** av) {
|
||||
}
|
||||
|
||||
if (cfg->alternator_port() || cfg->alternator_https_port()) {
|
||||
alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation());
|
||||
static sharded<alternator::executor> alternator_executor;
|
||||
static sharded<alternator::server> alternator_server;
|
||||
|
||||
@@ -1186,6 +1188,12 @@ int main(int ac, char** av) {
|
||||
}
|
||||
});
|
||||
|
||||
auto stop_compaction_manager = defer_verbose_shutdown("compaction manager", [&db] {
|
||||
db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
});
|
||||
|
||||
auto stop_redis_service = defer_verbose_shutdown("redis service", [&cfg] {
|
||||
if (cfg->redis_port() || cfg->redis_ssl_port()) {
|
||||
redis.stop().get();
|
||||
|
||||
@@ -2505,7 +2505,8 @@ mutation_partition::fully_discontinuous(const schema& s, const position_range& r
|
||||
future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& source,
|
||||
const dht::decorated_key& dk,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr)
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout)
|
||||
{
|
||||
struct range_and_reader {
|
||||
dht::partition_range range;
|
||||
@@ -2530,7 +2531,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
|
||||
auto cwqrb = counter_write_query_result_builder(*s);
|
||||
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>>(
|
||||
*s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb));
|
||||
auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout);
|
||||
auto f = r_a_r->reader.consume(std::move(cfq), timeout);
|
||||
return f.finally([r_a_r = std::move(r_a_r)] { });
|
||||
}
|
||||
|
||||
@@ -2605,7 +2606,7 @@ void mutation_cleaner_impl::start_worker() {
|
||||
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
|
||||
auto&& region = snp.region();
|
||||
return with_allocator(region.allocator(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
{
|
||||
// Allocating sections require the region to be reclaimable
|
||||
// which means that they cannot be nested.
|
||||
// It is, however, possible, that if the snapshot is taken
|
||||
@@ -2617,13 +2618,15 @@ stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexce
|
||||
}
|
||||
try {
|
||||
return _worker_state->alloc_section(region, [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
return snp.merge_partition_versions(_app_stats);
|
||||
});
|
||||
});
|
||||
} catch (...) {
|
||||
// Merging failed, give up as there is no guarantee of forward progress.
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -206,5 +206,6 @@ public:
|
||||
future<mutation_opt> counter_write_query(schema_ptr, const mutation_source&,
|
||||
const dht::decorated_key& dk,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_ptr);
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout);
|
||||
|
||||
|
||||
@@ -12,7 +12,11 @@
|
||||
# At the end of the build we check that the build-id is indeed in the
|
||||
# first page. At install time we check that patchelf doesn't modify
|
||||
# the program headers.
|
||||
|
||||
# gdb has a SO_NAME_MAX_PATH_SIZE of 512, so limit the path size to
|
||||
# that. The 512 includes the null at the end, hence the 511 bellow.
|
||||
|
||||
ORIGINAL_DYNAMIC_LINKER=$(gcc -### /dev/null -o t 2>&1 | perl -n -e '/-dynamic-linker ([^ ]*) / && print $1')
|
||||
DYNAMIC_LINKER=$(printf "%2000s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
DYNAMIC_LINKER=$(printf "%511s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
|
||||
echo $DYNAMIC_LINKER
|
||||
|
||||
@@ -1945,7 +1945,7 @@ future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_meta
|
||||
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, std::unordered_set<dht::token> replacing_tokens) {
|
||||
auto op = sstring("replace_with_repair");
|
||||
auto source_dc = get_local_dc();
|
||||
auto reason = streaming::stream_reason::bootstrap;
|
||||
auto reason = streaming::stream_reason::replace;
|
||||
tm.update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
|
||||
}
|
||||
|
||||
@@ -450,6 +450,7 @@ class repair_writer {
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
streaming::stream_reason _reason;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -561,11 +562,18 @@ public:
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
_mq[node_idx]->abort(ep);
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -588,6 +596,10 @@ public:
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
@@ -1191,6 +1203,23 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
// Must run inside a seastar thread
|
||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||
@@ -1216,18 +1245,7 @@ private:
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
for (auto& r : row_diff) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1238,15 +1256,7 @@ private:
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1936,22 +1946,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1977,22 +1982,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2017,22 +2017,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
20
row_cache.cc
20
row_cache.cc
@@ -528,8 +528,12 @@ public:
|
||||
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
|
||||
{
|
||||
if (!mfopt) {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
return _cache._read_section(_cache._tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
});
|
||||
});
|
||||
}
|
||||
_cache.on_partition_miss();
|
||||
const partition_start& ps = mfopt->as_partition_start();
|
||||
@@ -952,13 +956,15 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
// expensive and we need to amortize it somehow.
|
||||
do {
|
||||
STAP_PROBE(scylla, row_cache_update_partition_start);
|
||||
with_linearized_managed_bytes([&] {
|
||||
{
|
||||
if (!update) {
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
memtable_entry& mem_e = *m.partitions.begin();
|
||||
size_entry = mem_e.size_in_allocator_without_rows(_tracker.allocator());
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
||||
update = updater(_update_section, cache_i, mem_e, is_present, real_dirty_acc);
|
||||
});
|
||||
});
|
||||
}
|
||||
// We use cooperative deferring instead of futures so that
|
||||
@@ -970,14 +976,16 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
update = {};
|
||||
real_dirty_acc.unpin_memory(size_entry);
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto i = m.partitions.begin();
|
||||
memtable_entry& mem_e = *i;
|
||||
m.partitions.erase(i);
|
||||
mem_e.partition().evict(_tracker.memtable_cleaner());
|
||||
current_allocator().destroy(&mem_e);
|
||||
});
|
||||
});
|
||||
++partition_count;
|
||||
});
|
||||
}
|
||||
STAP_PROBE(scylla, row_cache_update_partition_end);
|
||||
} while (!m.partitions.empty() && !need_preempt());
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
@@ -1124,8 +1132,8 @@ future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&
|
||||
seastar::thread::maybe_yield();
|
||||
|
||||
while (true) {
|
||||
auto done = with_linearized_managed_bytes([&] {
|
||||
return _update_section(_tracker.region(), [&] {
|
||||
auto done = _update_section(_tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
auto it = _partitions.lower_bound(*_prev_snapshot_pos, cmp);
|
||||
auto end = _partitions.lower_bound(dht::ring_position_view::for_range_end(range), cmp);
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: e708d1df3a...c9c1dc5fa7
@@ -973,7 +973,11 @@ void storage_service::bootstrap() {
|
||||
} else {
|
||||
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
|
||||
// Does the actual streaming of newly replicated token ranges.
|
||||
bs.bootstrap().get();
|
||||
if (db().local().is_replacing()) {
|
||||
bs.bootstrap(streaming::stream_reason::replace).get();
|
||||
} else {
|
||||
bs.bootstrap(streaming::stream_reason::bootstrap).get();
|
||||
}
|
||||
}
|
||||
_db.invoke_on_all([this] (database& db) {
|
||||
for (auto& cf : db.get_non_system_column_families()) {
|
||||
@@ -1040,12 +1044,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
@@ -2602,11 +2610,8 @@ future<> storage_service::drain() {
|
||||
ss.do_stop_ms().get();
|
||||
|
||||
// Interrupt on going compaction and shutdown to prevent further compaction
|
||||
// No new compactions will be started from this call site on, but we don't need
|
||||
// to wait for them to stop. Drain leaves the node alive, and a future shutdown
|
||||
// will wait on the compaction_manager stop future.
|
||||
ss.db().invoke_on_all([] (auto& db) {
|
||||
db.get_compaction_manager().do_stop();
|
||||
return db.get_compaction_manager().stop();
|
||||
}).get();
|
||||
|
||||
ss.set_mode(mode::DRAINING, "flushing column families", false);
|
||||
|
||||
@@ -548,6 +548,7 @@ private:
|
||||
}
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0;
|
||||
virtual bool use_interposer_consumer() const = 0;
|
||||
|
||||
compaction_info finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) {
|
||||
_info->ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(ended_at.time_since_epoch()).count();
|
||||
@@ -629,8 +630,10 @@ public:
|
||||
return garbage_collected_sstable_writer(_gc_sstable_writer_data);
|
||||
}
|
||||
|
||||
bool contains_multi_fragment_runs() const {
|
||||
return _contains_multi_fragment_runs;
|
||||
bool enable_garbage_collected_sstable_writer() const {
|
||||
// FIXME: Disable GC writer if interposer consumer is enabled until they both can work simultaneously.
|
||||
// More details can be found at https://github.com/scylladb/scylla/issues/6472
|
||||
return _contains_multi_fragment_runs && !use_interposer_consumer();
|
||||
}
|
||||
|
||||
template <typename GCConsumer = noop_compacted_fragments_consumer>
|
||||
@@ -740,6 +743,10 @@ public:
|
||||
return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer));
|
||||
}
|
||||
|
||||
bool use_interposer_consumer() const override {
|
||||
return _cf.get_compaction_strategy().use_interposer_consumer();
|
||||
}
|
||||
|
||||
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Compacting {}", formatted_msg);
|
||||
}
|
||||
@@ -820,7 +827,7 @@ private:
|
||||
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
|
||||
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
|
||||
// meaning incremental compaction is disabled for this compaction.
|
||||
if (!_contains_multi_fragment_runs) {
|
||||
if (!enable_garbage_collected_sstable_writer()) {
|
||||
return;
|
||||
}
|
||||
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
|
||||
@@ -1238,6 +1245,10 @@ public:
|
||||
};
|
||||
}
|
||||
|
||||
bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Resharding {}", formatted_msg);
|
||||
}
|
||||
@@ -1330,7 +1341,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf)
|
||||
cf.schema()->ks_name(), cf.schema()->cf_name()));
|
||||
}
|
||||
auto c = make_compaction(cf, std::move(descriptor));
|
||||
if (c->contains_multi_fragment_runs()) {
|
||||
if (c->enable_garbage_collected_sstable_writer()) {
|
||||
auto gc_writer = c->make_garbage_collected_sstable_writer();
|
||||
return compaction::run(std::move(c), std::move(gc_writer));
|
||||
}
|
||||
|
||||
@@ -357,7 +357,7 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
|
||||
});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory)
|
||||
: _compaction_controller(sg, iop, 250ms, [this, available_memory] () -> float {
|
||||
auto b = backlog() / available_memory;
|
||||
// This means we are using an unimplemented strategy
|
||||
@@ -372,26 +372,17 @@ compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as)
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares)
|
||||
: _compaction_controller(sg, iop, shares)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(available_memory)
|
||||
, _early_abort_subscription(as.subscribe([this] {
|
||||
do_stop();
|
||||
}))
|
||||
, _available_memory(available_memory)
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager()
|
||||
: _compaction_controller(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
, _available_memory(1)
|
||||
: compaction_manager(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
{}
|
||||
|
||||
compaction_manager::~compaction_manager() {
|
||||
@@ -455,17 +446,11 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop() {
|
||||
do_stop();
|
||||
return std::move(*_stop_future);
|
||||
}
|
||||
|
||||
void compaction_manager::do_stop() {
|
||||
if (_stopped) {
|
||||
return;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
_stopped = true;
|
||||
cmlog.info("Asked to stop");
|
||||
_stopped = true;
|
||||
// Reset the metrics registry
|
||||
_metrics.clear();
|
||||
// Stop all ongoing compaction.
|
||||
@@ -475,10 +460,7 @@ void compaction_manager::do_stop() {
|
||||
// Wait for each task handler to stop. Copy list because task remove itself
|
||||
// from the list when done.
|
||||
auto tasks = _tasks;
|
||||
|
||||
// fine to ignore here, since it is used to set up the shared promise in
|
||||
// the finally block. Waiters will wait on the shared_future through stop().
|
||||
_stop_future.emplace(do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return do_with(std::move(tasks), [this] (std::list<lw_shared_ptr<task>>& tasks) {
|
||||
return parallel_for_each(tasks, [this] (auto& task) {
|
||||
return this->task_stop(task);
|
||||
});
|
||||
@@ -490,7 +472,7 @@ void compaction_manager::do_stop() {
|
||||
_compaction_submission_timer.cancel();
|
||||
cmlog.info("Stopped");
|
||||
return _compaction_controller.shutdown();
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
|
||||
@@ -523,7 +505,8 @@ inline bool compaction_manager::maybe_stop_on_error(future<> f, stop_iteration w
|
||||
} catch (storage_io_error& e) {
|
||||
cmlog.error("compaction failed due to storage io error: {}: stopping", e.what());
|
||||
retry = false;
|
||||
do_stop();
|
||||
// FIXME discarded future.
|
||||
(void)stop();
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}: {}", std::current_exception(), decision_msg);
|
||||
retry = true;
|
||||
@@ -742,8 +725,8 @@ future<> compaction_manager::perform_sstable_upgrade(column_family* cf, bool exc
|
||||
// Note that we potentially could be doing multiple
|
||||
// upgrades here in parallel, but that is really the users
|
||||
// problem.
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_upgrade(), [&](auto&) {
|
||||
return tables;
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_upgrade(), [&](auto&) mutable {
|
||||
return std::exchange(tables, {});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include <seastar/core/scheduling.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "log.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include <vector>
|
||||
@@ -70,9 +69,6 @@ private:
|
||||
|
||||
// Used to assert that compaction_manager was explicitly stopped, if started.
|
||||
bool _stopped = true;
|
||||
// We use a shared promise to indicate whether or not we are stopped because it is legal
|
||||
// for stop() to be called twice. For instance it is called on DRAIN and shutdown.
|
||||
std::optional<future<>> _stop_future;
|
||||
|
||||
stats _stats;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
@@ -153,10 +149,9 @@ private:
|
||||
using get_candidates_func = std::function<std::vector<sstables::shared_sstable>(const column_family&)>;
|
||||
|
||||
future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func);
|
||||
optimized_optional<abort_source::subscription> _early_abort_subscription;
|
||||
public:
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares, abort_source& as);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, size_t available_memory, uint64_t shares);
|
||||
compaction_manager();
|
||||
~compaction_manager();
|
||||
|
||||
@@ -165,13 +160,9 @@ public:
|
||||
// Start compaction manager.
|
||||
void start();
|
||||
|
||||
// Stop all fibers. Ongoing compactions will be waited. Should only be called
|
||||
// once, from main teardown path.
|
||||
// Stop all fibers. Ongoing compactions will be waited.
|
||||
future<> stop();
|
||||
|
||||
// Stop all fibers, without waiting. Safe to be called multiple times.
|
||||
void do_stop();
|
||||
|
||||
bool stopped() const { return _stopped; }
|
||||
|
||||
// Submit a column family to be compacted.
|
||||
|
||||
@@ -1080,6 +1080,10 @@ reader_consumer compaction_strategy::make_interposer_consumer(const mutation_sou
|
||||
return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer));
|
||||
}
|
||||
|
||||
bool compaction_strategy::use_interposer_consumer() const {
|
||||
return _compaction_strategy_impl->use_interposer_consumer();
|
||||
}
|
||||
|
||||
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
|
||||
::shared_ptr<compaction_strategy_impl> impl;
|
||||
|
||||
|
||||
@@ -99,5 +99,9 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
virtual bool use_interposer_consumer() const {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ private:
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
uint32_t _promoted_index_end;
|
||||
uint64_t _promoted_index_end;
|
||||
uint64_t _position;
|
||||
uint64_t _partition_header_length = 0;
|
||||
std::optional<deletion_time> _deletion_time;
|
||||
|
||||
@@ -346,6 +346,10 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) override;
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) override;
|
||||
|
||||
virtual bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ enum class stream_reason : uint8_t {
|
||||
removenode,
|
||||
rebuild,
|
||||
repair,
|
||||
replace,
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include "streaming/stream_mutation_fragments_cmd.hh"
|
||||
#include "mutation_reader.hh"
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "frozen_mutation.hh"
|
||||
#include "mutation.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
@@ -203,15 +204,27 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
|
||||
}();
|
||||
|
||||
auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> {
|
||||
return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd>& sink) {
|
||||
return repeat([&sink, si, got_error_from_peer] () mutable {
|
||||
return si->reader(db::no_timeout).then([&sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
|
||||
if (mf && !(*got_error_from_peer)) {
|
||||
mutation_fragment_stream_validator validator(*(si->reader.schema()));
|
||||
return do_with(std::move(sink), std::move(validator), [si, got_error_from_peer] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd>& sink, mutation_fragment_stream_validator& validator) {
|
||||
return repeat([&sink, &validator, si, got_error_from_peer] () mutable {
|
||||
return si->reader(db::no_timeout).then([&sink, &validator, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
|
||||
if (*got_error_from_peer) {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("Got status error code from peer"));
|
||||
}
|
||||
if (mf) {
|
||||
if (!validator(mf->mutation_fragment_kind())) {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error(format("Stream reader mutation_fragment validator failed, previous={}, current={}",
|
||||
validator.previous_mutation_fragment_kind(), mf->mutation_fragment_kind())));
|
||||
}
|
||||
frozen_mutation_fragment fmf = freeze(*s, *mf);
|
||||
auto size = fmf.representation().size();
|
||||
streaming::get_local_stream_manager().update_progress(si->plan_id, si->id.addr, streaming::progress_info::direction::OUT, size);
|
||||
return sink(fmf, stream_mutation_fragments_cmd::mutation_fragment_data).then([] { return stop_iteration::no; });
|
||||
} else {
|
||||
if (!validator.on_end_of_stream()) {
|
||||
return make_exception_future<stop_iteration>(std::runtime_error(format("Stream reader mutation_fragment validator failed on end_of_stream, previous={}, current=end_of_stream",
|
||||
validator.previous_mutation_fragment_kind())));
|
||||
}
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -73,6 +73,7 @@ done
|
||||
--alternator-address $SCYLLA_IP \
|
||||
$alternator_port_option \
|
||||
--alternator-enforce-authorization=1 \
|
||||
--alternator-write-isolation=always_use_lwt \
|
||||
--developer-mode=1 \
|
||||
--ring-delay-ms 0 --collectd 0 \
|
||||
--smp 2 -m 1G \
|
||||
|
||||
@@ -305,3 +305,16 @@ def test_batch_get_item_projection_expression(test_table):
|
||||
got_items = reply['Responses'][test_table.name]
|
||||
expected_items = [{k: item[k] for k in wanted if k in item} for item in items]
|
||||
assert multiset(got_items) == multiset(expected_items)
|
||||
|
||||
# Test that we return the required UnprocessedKeys/UnprocessedItems parameters
|
||||
def test_batch_unprocessed(test_table_s):
|
||||
p = random_string()
|
||||
write_reply = test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': p, 'a': 'hi'}}}],
|
||||
})
|
||||
assert 'UnprocessedItems' in write_reply and write_reply['UnprocessedItems'] == dict()
|
||||
|
||||
read_reply = test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': p}], 'ProjectionExpression': 'p, a', 'ConsistentRead': True}
|
||||
})
|
||||
assert 'UnprocessedKeys' in read_reply and read_reply['UnprocessedKeys'] == dict()
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
import json
|
||||
from botocore.exceptions import BotoCoreError, ClientError
|
||||
|
||||
def gen_json(n):
|
||||
@@ -122,3 +123,12 @@ def test_incorrect_json(dynamodb, test_table):
|
||||
req = get_signed_request(dynamodb, 'PutItem', incorrect_req)
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert validate_resp(response.text)
|
||||
|
||||
# Test that the value returned by PutItem is always a JSON object, not an empty string (see #6568)
|
||||
def test_put_item_return_type(dynamodb, test_table):
|
||||
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
|
||||
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert response.text
|
||||
# json::loads throws on invalid input
|
||||
json.loads(response.text)
|
||||
|
||||
@@ -132,6 +132,13 @@ BOOST_AUTO_TEST_CASE(test_big_decimal_div) {
|
||||
test_div("-0.25", 10, "-0.02");
|
||||
test_div("-0.26", 10, "-0.03");
|
||||
test_div("-10E10", 3, "-3E10");
|
||||
|
||||
// Document a small oddity, 1e1 has -1 decimal places, so dividing
|
||||
// it by 2 produces 0. This is not the behavior in cassandra, but
|
||||
// scylla doesn't expose arithmetic operations, so this doesn't
|
||||
// seem to be visible from CQL.
|
||||
test_div("10", 2, "5");
|
||||
test_div("1e1", 2, "0e1");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_big_decimal_assignadd) {
|
||||
|
||||
@@ -142,6 +142,19 @@ SEASTAR_TEST_CASE(test_decimal_to_bigint) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_decimal_to_float) {
|
||||
return do_with_cql_env_thread([&](auto& e) {
|
||||
e.execute_cql("CREATE TABLE test (key text primary key, value decimal)").get();
|
||||
e.execute_cql("INSERT INTO test (key, value) VALUES ('k1', 10)").get();
|
||||
e.execute_cql("INSERT INTO test (key, value) VALUES ('k2', 1e1)").get();
|
||||
auto v = e.execute_cql("SELECT key, CAST(value as float) from test").get0();
|
||||
assert_that(v).is_rows().with_rows_ignore_order({
|
||||
{{serialized("k1")}, {serialized(float(10))}},
|
||||
{{serialized("k2")}, {serialized(float(10))}},
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_varint_to_bigint) {
|
||||
return do_with_cql_env_thread([&](auto& e) {
|
||||
e.execute_cql("CREATE TABLE test (key text primary key, value varint)").get();
|
||||
|
||||
@@ -283,8 +283,8 @@ SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_description) {
|
||||
assert_unauthorized(format("DROP TABLE {}", full_name));
|
||||
};
|
||||
|
||||
test_table("cdc_description");
|
||||
test_table("cdc_topology_description");
|
||||
test_table("cdc_streams");
|
||||
test_table("cdc_generations");
|
||||
}, mk_cdc_test_config()).get();
|
||||
}
|
||||
|
||||
|
||||
@@ -4545,3 +4545,21 @@ SEASTAR_TEST_CASE(ck_slice_with_null_is_forbidden) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_impossible_where) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
cquery_nofail(e, "CREATE TABLE t(p int PRIMARY KEY, r int)");
|
||||
cquery_nofail(e, "INSERT INTO t(p,r) VALUES (0, 0)");
|
||||
cquery_nofail(e, "INSERT INTO t(p,r) VALUES (1, 10)");
|
||||
cquery_nofail(e, "INSERT INTO t(p,r) VALUES (2, 20)");
|
||||
require_rows(e, "SELECT * FROM t WHERE r>10 AND r<10 ALLOW FILTERING", {});
|
||||
require_rows(e, "SELECT * FROM t WHERE r>=10 AND r<=0 ALLOW FILTERING", {});
|
||||
|
||||
cquery_nofail(e, "CREATE TABLE t2(p int, c int, PRIMARY KEY(p, c)) WITH CLUSTERING ORDER BY (c DESC)");
|
||||
cquery_nofail(e, "INSERT INTO t2(p,c) VALUES (0, 0)");
|
||||
cquery_nofail(e, "INSERT INTO t2(p,c) VALUES (1, 10)");
|
||||
cquery_nofail(e, "INSERT INTO t2(p,c) VALUES (2, 20)");
|
||||
require_rows(e, "SELECT * FROM t2 WHERE c>10 AND c<10 ALLOW FILTERING", {});
|
||||
require_rows(e, "SELECT * FROM t2 WHERE c>=10 AND c<=0 ALLOW FILTERING", {});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, std::ref(mm_notif), std::ref(token_metadata), true).get();
|
||||
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
|
||||
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
db.invoke_on_all([] (database& db) {
|
||||
db.get_compaction_manager().start();
|
||||
}).get();
|
||||
|
||||
@@ -670,11 +670,11 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("8182496e-4baf-3a07-91e6-caa140388846"),
|
||||
utils::UUID("a65ea746-4d8a-3e5c-8fbf-5f70c14dbcbc"),
|
||||
utils::UUID("a65ea746-4d8a-3e5c-8fbf-5f70c14dbcbc"),
|
||||
utils::UUID("4c138336-4677-3520-8556-4aab007cfedb"),
|
||||
utils::UUID("4c138336-4677-3520-8556-4aab007cfedb"),
|
||||
utils::UUID("2fb5d448-c537-39d1-9384-5166bcdcaa9a"),
|
||||
utils::UUID("7786dd34-2256-38f8-881e-79b062397069"),
|
||||
utils::UUID("7786dd34-2256-38f8-881e-79b062397069"),
|
||||
utils::UUID("5ca0cc9b-3651-3651-96ab-2324cdc07300"),
|
||||
utils::UUID("5ca0cc9b-3651-3651-96ab-2324cdc07300"),
|
||||
utils::UUID("62e1e586-6eec-3ff5-882a-89386664694b"),
|
||||
utils::UUID("daf6ded5-c294-3b07-b6a0-1b318a3c2e17"),
|
||||
utils::UUID("370c7d8e-0a4a-394d-b627-318805c64584"),
|
||||
@@ -685,11 +685,11 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("a33bc2a7-33b7-335d-8644-ecfdd23d1ca6"),
|
||||
utils::UUID("8ec3169e-33f9-356e-9a20-172ddf4261dc"),
|
||||
utils::UUID("8ec3169e-33f9-356e-9a20-172ddf4261dc"),
|
||||
utils::UUID("6d3a2294-0e82-33b8-943a-459cc9f3bf76"),
|
||||
utils::UUID("6d3a2294-0e82-33b8-943a-459cc9f3bf76"),
|
||||
utils::UUID("72d2ee27-a675-397d-85e1-1c49d3dcba13"),
|
||||
utils::UUID("e5a2ec93-1f1a-33b2-ad2e-9795f4b6b229"),
|
||||
utils::UUID("e5a2ec93-1f1a-33b2-ad2e-9795f4b6b229"),
|
||||
utils::UUID("6f1f5e2a-834a-37f8-ae05-ef4a1f406996"),
|
||||
utils::UUID("6f1f5e2a-834a-37f8-ae05-ef4a1f406996"),
|
||||
utils::UUID("e4c2bd0d-5f02-3d6f-9a43-de38b152b1fd"),
|
||||
utils::UUID("3b2c4957-4434-3078-ae42-fedcd81ac8cd"),
|
||||
utils::UUID("90518efe-88e6-39bd-a0a6-d32efc80777a"),
|
||||
@@ -700,11 +700,11 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_functions) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("e8879c0e-a731-3ac5-9b43-d2ed33b331f2"),
|
||||
utils::UUID("4a20c241-583c-334e-9fe9-b906280f724f"),
|
||||
utils::UUID("4a20c241-583c-334e-9fe9-b906280f724f"),
|
||||
utils::UUID("9711e6c4-dfcd-3c09-bf8b-f02811f73730"),
|
||||
utils::UUID("9711e6c4-dfcd-3c09-bf8b-f02811f73730"),
|
||||
utils::UUID("f169e77d-8ee1-3994-9379-065bcb9d1646"),
|
||||
utils::UUID("7185d744-0038-37ff-9770-04764feedbb7"),
|
||||
utils::UUID("7185d744-0038-37ff-9770-04764feedbb7"),
|
||||
utils::UUID("6d285eda-8963-3687-9ba6-a00764324b67"),
|
||||
utils::UUID("6d285eda-8963-3687-9ba6-a00764324b67"),
|
||||
utils::UUID("e96eb4ca-4f90-3b47-bfed-81e4a441734c"),
|
||||
utils::UUID("14f6c60f-8ba3-3141-8958-dd74366ee1ca"),
|
||||
utils::UUID("987a3386-83d1-3436-b3fc-1d2a3cfdd659"),
|
||||
@@ -724,11 +724,11 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_cdc_options) {
|
||||
auto ext = std::make_shared<db::extensions>();
|
||||
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("07d3ffb8-b7f5-367d-b128-d34b2033b788"),
|
||||
utils::UUID("9500fd95-abeb-32ea-b7af-568021eee217"),
|
||||
utils::UUID("9500fd95-abeb-32ea-b7af-568021eee217"),
|
||||
utils::UUID("9bd2ee49-f6db-37c7-a81f-1c2524dec3bf"),
|
||||
utils::UUID("9bd2ee49-f6db-37c7-a81f-1c2524dec3bf"),
|
||||
utils::UUID("fd939d2a-41fc-33e8-aa65-7d7b1678b307"),
|
||||
utils::UUID("87f0a70e-9dcd-34ae-8b72-bb23addab551"),
|
||||
utils::UUID("87f0a70e-9dcd-34ae-8b72-bb23addab551"),
|
||||
utils::UUID("4c8bf5c8-4823-3f35-9e34-275978f130c9"),
|
||||
utils::UUID("4c8bf5c8-4823-3f35-9e34-275978f130c9"),
|
||||
utils::UUID("549d0735-3087-3cf5-b4b6-23518a803246"),
|
||||
utils::UUID("612eaafb-27a4-3c01-b292-5d4424585ff7"),
|
||||
utils::UUID("01ea7d67-6f30-3215-aaf0-b7e2266daec5"),
|
||||
|
||||
@@ -5771,3 +5771,89 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
|
||||
cm.stop().wait();
|
||||
});
|
||||
}
|
||||
|
||||
//
|
||||
// Test that https://github.com/scylladb/scylla/issues/6472 is gone
|
||||
//
|
||||
SEASTAR_TEST_CASE(test_bug_6472) {
|
||||
return test_setup::do_with_tmp_directory([] (test_env& env, sstring tmpdir_path) {
|
||||
auto builder = schema_builder("tests", "test_bug_6472")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map<sstring, sstring> opts = {
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS" },
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1" },
|
||||
};
|
||||
builder.set_compaction_strategy_options(opts);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
auto s = builder.build();
|
||||
|
||||
auto sst_gen = [&env, s, tmpdir_path, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
return env.make_sstable(s, tmpdir_path, (*gen)++, la, big);
|
||||
};
|
||||
|
||||
auto next_timestamp = [] (auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto make_expiring_cell = [&] (std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step), gc_clock::duration(step + 5s));
|
||||
return m;
|
||||
};
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.datadir = tmpdir_path;
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
reader_concurrency_semaphore sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{});
|
||||
cfg.read_concurrency_semaphore = &sem;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
|
||||
// Make 100 expiring cells which belong to different time windows
|
||||
std::vector<mutation> muts;
|
||||
muts.reserve(101);
|
||||
for (auto i = 1; i < 101; i++) {
|
||||
muts.push_back(make_expiring_cell(std::chrono::hours(i)));
|
||||
}
|
||||
muts.push_back(make_expiring_cell(std::chrono::hours(110)));
|
||||
|
||||
//
|
||||
// Reproduce issue 6472 by making an input set which causes both interposer and GC writer to be enabled
|
||||
//
|
||||
std::vector<shared_sstable> sstables_spanning_many_windows = {
|
||||
make_sstable_containing(sst_gen, muts),
|
||||
make_sstable_containing(sst_gen, muts),
|
||||
};
|
||||
utils::UUID run_id = utils::make_random_uuid();
|
||||
for (auto& sst : sstables_spanning_many_windows) {
|
||||
sstables::test(sst).set_run_identifier(run_id);
|
||||
}
|
||||
|
||||
// Make sure everything we wanted expired is expired by now.
|
||||
forward_jump_clocks(std::chrono::hours(101));
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(sstables_spanning_many_windows),
|
||||
*cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -421,23 +421,49 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
auto& view_update_generator = e.local_view_update_generator();
|
||||
auto s = test_table_schema();
|
||||
|
||||
std::vector<shared_sstable> ssts;
|
||||
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
|
||||
auto write_to_sstable = [&] (mutation m) {
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_write_priority();
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
return sst;
|
||||
};
|
||||
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key1)});
|
||||
mutation m(s, key);
|
||||
auto col = s->get_column_definition("v");
|
||||
for (int i = 1024; i < 1280; ++i) {
|
||||
auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))}));
|
||||
row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(sstring(fmt::format("v{}", i)))));
|
||||
// Scatter the data in a bunch of different sstables, so we
|
||||
// can test the registration semaphore of the view update
|
||||
// generator
|
||||
if (!(i % 10)) {
|
||||
ssts.push_back(write_to_sstable(std::exchange(m, mutation(s, key))));
|
||||
}
|
||||
}
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
ssts.push_back(write_to_sstable(std::move(m)));
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_write_priority();
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
view_update_generator.register_staging_sstable(sst, t).get();
|
||||
parallel_for_each(ssts.begin(), ssts.begin() + 10, [&] (shared_sstable& sst) {
|
||||
return view_update_generator.register_staging_sstable(sst, t);
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
parallel_for_each(ssts.begin() + 10, ssts.end(), [&] (shared_sstable& sst) {
|
||||
return view_update_generator.register_staging_sstable(sst, t);
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
eventually([&, key1, key2] {
|
||||
auto msg = e.execute_cql(fmt::format("SELECT * FROM t WHERE p = '{}'", key1)).get0();
|
||||
@@ -464,5 +490,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
});
|
||||
}
|
||||
|
||||
7
test/cql/cdc_too_short_stream_id_test.cql
Normal file
7
test/cql/cdc_too_short_stream_id_test.cql
Normal file
@@ -0,0 +1,7 @@
|
||||
create table tb (pk int primary key) with cdc = {'enabled': true};
|
||||
insert into tb (pk) VALUES (0);
|
||||
|
||||
-- Key of length != 128 b should return empty result set (issue #6570)
|
||||
select * from tb_scylla_cdc_log where "cdc$stream_id" = 0x00;
|
||||
|
||||
select * from tb_scylla_cdc_log where "cdc$stream_id" = 0x;
|
||||
19
test/cql/cdc_too_short_stream_id_test.result
Normal file
19
test/cql/cdc_too_short_stream_id_test.result
Normal file
@@ -0,0 +1,19 @@
|
||||
create table tb (pk int primary key) with cdc = {'enabled': true};
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
insert into tb (pk) VALUES (0);
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
|
||||
-- Key of length != 128 b should return empty result set (issue #6570)
|
||||
select * from tb_scylla_cdc_log where "cdc$stream_id" = 0x00;
|
||||
{
|
||||
"rows" : null
|
||||
}
|
||||
|
||||
select * from tb_scylla_cdc_log where "cdc$stream_id" = 0x;
|
||||
{
|
||||
"rows" : null
|
||||
}
|
||||
8
test/cql/lwt_batch_validation_test.cql
Normal file
8
test/cql/lwt_batch_validation_test.cql
Normal file
@@ -0,0 +1,8 @@
|
||||
CREATE KEYSPACE k WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||
USE k;
|
||||
CREATE TABLE t1 (userid int PRIMARY KEY);
|
||||
CREATE TABLE t2 (userid int PRIMARY KEY);
|
||||
BEGIN BATCH
|
||||
INSERT INTO t1 (userid) VALUES (1) IF NOT EXISTS
|
||||
INSERT INTO t2 (userid) VALUES (1) IF NOT EXISTS
|
||||
APPLY BATCH;
|
||||
24
test/cql/lwt_batch_validation_test.result
Normal file
24
test/cql/lwt_batch_validation_test.result
Normal file
@@ -0,0 +1,24 @@
|
||||
CREATE KEYSPACE k WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
USE k;
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
CREATE TABLE t1 (userid int PRIMARY KEY);
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
CREATE TABLE t2 (userid int PRIMARY KEY);
|
||||
{
|
||||
"status" : "ok"
|
||||
}
|
||||
BEGIN BATCH
|
||||
INSERT INTO t1 (userid) VALUES (1) IF NOT EXISTS
|
||||
INSERT INTO t2 (userid) VALUES (1) IF NOT EXISTS
|
||||
APPLY BATCH;
|
||||
{
|
||||
"message" : "exceptions::invalid_request_exception (Batch with conditions cannot span multiple tables)",
|
||||
"status" : "error"
|
||||
}
|
||||
@@ -460,7 +460,7 @@ public:
|
||||
|
||||
database_config dbcfg;
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata)).get();
|
||||
auto stop_db = defer([&db] {
|
||||
db.stop().get();
|
||||
});
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3079304936
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2013820447
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
708745264
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,9 @@
|
||||
Scylla.db
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Summary.db
|
||||
Index.db
|
||||
Digest.crc32
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user