mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-29 04:37:00 +00:00
Compare commits
85 Commits
copilot/fi
...
scylla-4.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1fcf38abd9 | ||
|
|
3375b8b86c | ||
|
|
586546ab32 | ||
|
|
e1d558cb01 | ||
|
|
b0a8f396b4 | ||
|
|
48e7ee374a | ||
|
|
3e85ecd1bd | ||
|
|
930a4af8b3 | ||
|
|
6a6d36058a | ||
|
|
ce57d0174d | ||
|
|
cd11f210ad | ||
|
|
1e2e203cf0 | ||
|
|
1a98c93a25 | ||
|
|
4f4845c94c | ||
|
|
ef745e1ce7 | ||
|
|
ae32aa970a | ||
|
|
a3eb12c5f1 | ||
|
|
b5cedfc177 | ||
|
|
8d9bc57aca | ||
|
|
1cbda629a2 | ||
|
|
baf0201a6e | ||
|
|
7dcffb963c | ||
|
|
dcfaf4d035 | ||
|
|
f974a54cbd | ||
|
|
30a96cc592 | ||
|
|
faf300382a | ||
|
|
55400598ff | ||
|
|
c177295bce | ||
|
|
d95aa77b62 | ||
|
|
fe54009855 | ||
|
|
bbe82236be | ||
|
|
abd73cab78 | ||
|
|
8fd7cf5cd1 | ||
|
|
dd88b2dd18 | ||
|
|
eee4c00e29 | ||
|
|
85071ceeb1 | ||
|
|
4cf201fc24 | ||
|
|
c6ad5cf556 | ||
|
|
51e3e6c655 | ||
|
|
8ac6579b30 | ||
|
|
3744e66244 | ||
|
|
d3bf349484 | ||
|
|
3e6a8ba5bd | ||
|
|
5f1785b9cf | ||
|
|
e1fd6cf989 | ||
|
|
b7328ff1e4 | ||
|
|
602ed43ac7 | ||
|
|
c42c91c5bb | ||
|
|
cf017b320a | ||
|
|
89e79023ae | ||
|
|
bc67da1a21 | ||
|
|
0c7643f1fe | ||
|
|
c563234f40 | ||
|
|
77b7a48a02 | ||
|
|
b2b1bfb159 | ||
|
|
d72cbe37aa | ||
|
|
9f7b560771 | ||
|
|
06af9c028c | ||
|
|
c74ab3ae80 | ||
|
|
32cd3a070a | ||
|
|
bb1554f09e | ||
|
|
2037d7550e | ||
|
|
c320c3f6da | ||
|
|
0ed70944aa | ||
|
|
89f860d409 | ||
|
|
0819d221f4 | ||
|
|
53f47d4e67 | ||
|
|
21ad12669a | ||
|
|
c812359383 | ||
|
|
1bd79705fb | ||
|
|
7e2ef386cc | ||
|
|
51bad7e72c | ||
|
|
0379d0c031 | ||
|
|
a8ef820f27 | ||
|
|
9908f009a4 | ||
|
|
48d8a075b4 | ||
|
|
e3ddd607bc | ||
|
|
511773d466 | ||
|
|
121cd383fa | ||
|
|
90639f48e5 | ||
|
|
8d029a04aa | ||
|
|
67995db899 | ||
|
|
282cd0df7c | ||
|
|
ce58994d30 | ||
|
|
78f5afec30 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
|||||||
[submodule "seastar"]
|
[submodule "seastar"]
|
||||||
path = seastar
|
path = seastar
|
||||||
url = ../seastar
|
url = ../scylla-seastar
|
||||||
ignore = dirty
|
ignore = dirty
|
||||||
[submodule "swagger-ui"]
|
[submodule "swagger-ui"]
|
||||||
path = swagger-ui
|
path = swagger-ui
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
PRODUCT=scylla
|
PRODUCT=scylla
|
||||||
VERSION=666.development
|
VERSION=4.0.3
|
||||||
|
|
||||||
if test -f version
|
if test -f version
|
||||||
then
|
then
|
||||||
|
|||||||
@@ -66,8 +66,9 @@ static std::string format_time_point(db_clock::time_point tp) {
|
|||||||
time_t time_point_repr = db_clock::to_time_t(tp);
|
time_t time_point_repr = db_clock::to_time_t(tp);
|
||||||
std::string time_point_str;
|
std::string time_point_str;
|
||||||
time_point_str.resize(17);
|
time_point_str.resize(17);
|
||||||
|
::tm time_buf;
|
||||||
// strftime prints the terminating null character as well
|
// strftime prints the terminating null character as well
|
||||||
std::strftime(time_point_str.data(), time_point_str.size(), "%Y%m%dT%H%M%SZ", std::gmtime(&time_point_repr));
|
std::strftime(time_point_str.data(), time_point_str.size(), "%Y%m%dT%H%M%SZ", ::gmtime_r(&time_point_repr, &time_buf));
|
||||||
time_point_str.resize(16);
|
time_point_str.resize(16);
|
||||||
return time_point_str;
|
return time_point_str;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -208,12 +208,11 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
|||||||
throw api_error("ValidationException",
|
throw api_error("ValidationException",
|
||||||
format("Non-string IndexName '{}'", index_name->GetString()));
|
format("Non-string IndexName '{}'", index_name->GetString()));
|
||||||
}
|
}
|
||||||
}
|
// If no tables for global indexes were found, the index may be local
|
||||||
|
if (!proxy.get_db().local().has_schema(keyspace_name, table_name)) {
|
||||||
// If no tables for global indexes were found, the index may be local
|
type = table_or_view_type::lsi;
|
||||||
if (!proxy.get_db().local().has_schema(keyspace_name, table_name)) {
|
table_name = lsi_name(orig_table_name, index_name->GetString());
|
||||||
type = table_or_view_type::lsi;
|
}
|
||||||
table_name = lsi_name(orig_table_name, index_name->GetString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -1019,13 +1018,22 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
|||||||
|
|
||||||
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||||
mutation m(schema, _pk);
|
mutation m(schema, _pk);
|
||||||
auto& row = m.partition().clustered_row(*schema, _ck);
|
// If there's no clustering key, a tombstone should be created directly
|
||||||
|
// on a partition, not on a clustering row - otherwise it will look like
|
||||||
|
// an open-ended range tombstone, which will crash on KA/LA sstable format.
|
||||||
|
// Ref: #6035
|
||||||
|
const bool use_partition_tombstone = schema->clustering_key_size() == 0;
|
||||||
if (!_cells) {
|
if (!_cells) {
|
||||||
// a DeleteItem operation:
|
if (use_partition_tombstone) {
|
||||||
row.apply(tombstone(ts, gc_clock::now()));
|
m.partition().apply(tombstone(ts, gc_clock::now()));
|
||||||
|
} else {
|
||||||
|
// a DeleteItem operation:
|
||||||
|
m.partition().clustered_row(*schema, _ck).apply(tombstone(ts, gc_clock::now()));
|
||||||
|
}
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
// else, a PutItem operation:
|
// else, a PutItem operation:
|
||||||
|
auto& row = m.partition().clustered_row(*schema, _ck);
|
||||||
attribute_collector attrs_collector;
|
attribute_collector attrs_collector;
|
||||||
for (auto& c : *_cells) {
|
for (auto& c : *_cells) {
|
||||||
const column_definition* cdef = schema->get_column_definition(c.column_name);
|
const column_definition* cdef = schema->get_column_definition(c.column_name);
|
||||||
@@ -1048,7 +1056,11 @@ mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
|||||||
// Scylla proper, to implement the operation to replace an entire
|
// Scylla proper, to implement the operation to replace an entire
|
||||||
// collection ("UPDATE .. SET x = ..") - see
|
// collection ("UPDATE .. SET x = ..") - see
|
||||||
// cql3::update_parameters::make_tombstone_just_before().
|
// cql3::update_parameters::make_tombstone_just_before().
|
||||||
row.apply(tombstone(ts-1, gc_clock::now()));
|
if (use_partition_tombstone) {
|
||||||
|
m.partition().apply(tombstone(ts-1, gc_clock::now()));
|
||||||
|
} else {
|
||||||
|
row.apply(tombstone(ts-1, gc_clock::now()));
|
||||||
|
}
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1202,11 +1214,6 @@ std::optional<shard_id> rmw_operation::shard_for_execute(bool needs_read_before_
|
|||||||
// PutItem, DeleteItem). All these return nothing by default, but can
|
// PutItem, DeleteItem). All these return nothing by default, but can
|
||||||
// optionally return Attributes if requested via the ReturnValues option.
|
// optionally return Attributes if requested via the ReturnValues option.
|
||||||
static future<executor::request_return_type> rmw_operation_return(rjson::value&& attributes) {
|
static future<executor::request_return_type> rmw_operation_return(rjson::value&& attributes) {
|
||||||
// As an optimization, in the simple and common case that nothing is to be
|
|
||||||
// returned, quickly return an empty result:
|
|
||||||
if (attributes.IsNull()) {
|
|
||||||
return make_ready_future<executor::request_return_type>(json_string(""));
|
|
||||||
}
|
|
||||||
rjson::value ret = rjson::empty_object();
|
rjson::value ret = rjson::empty_object();
|
||||||
if (!attributes.IsNull()) {
|
if (!attributes.IsNull()) {
|
||||||
rjson::set(ret, "Attributes", std::move(attributes));
|
rjson::set(ret, "Attributes", std::move(attributes));
|
||||||
@@ -2773,6 +2780,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
|||||||
[] (std::vector<std::tuple<std::string, std::optional<rjson::value>>> responses) {
|
[] (std::vector<std::tuple<std::string, std::optional<rjson::value>>> responses) {
|
||||||
rjson::value response = rjson::empty_object();
|
rjson::value response = rjson::empty_object();
|
||||||
rjson::set(response, "Responses", rjson::empty_object());
|
rjson::set(response, "Responses", rjson::empty_object());
|
||||||
|
rjson::set(response, "UnprocessedKeys", rjson::empty_object());
|
||||||
for (auto& t : responses) {
|
for (auto& t : responses) {
|
||||||
if (!response["Responses"].HasMember(std::get<0>(t).c_str())) {
|
if (!response["Responses"].HasMember(std::get<0>(t).c_str())) {
|
||||||
rjson::set_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array());
|
rjson::set_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array());
|
||||||
@@ -2889,6 +2897,7 @@ static future<executor::request_return_type> do_query(schema_ptr schema,
|
|||||||
uint32_t limit,
|
uint32_t limit,
|
||||||
db::consistency_level cl,
|
db::consistency_level cl,
|
||||||
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions,
|
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions,
|
||||||
|
query::partition_slice::option_set custom_opts,
|
||||||
service::client_state& client_state,
|
service::client_state& client_state,
|
||||||
cql3::cql_stats& cql_stats,
|
cql3::cql_stats& cql_stats,
|
||||||
tracing::trace_state_ptr trace_state,
|
tracing::trace_state_ptr trace_state,
|
||||||
@@ -2909,7 +2918,9 @@ static future<executor::request_return_type> do_query(schema_ptr schema,
|
|||||||
auto regular_columns = boost::copy_range<query::column_id_vector>(
|
auto regular_columns = boost::copy_range<query::column_id_vector>(
|
||||||
schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; }));
|
schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; }));
|
||||||
auto selection = cql3::selection::selection::wildcard(schema);
|
auto selection = cql3::selection::selection::wildcard(schema);
|
||||||
auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), selection->get_query_options());
|
query::partition_slice::option_set opts = selection->get_query_options();
|
||||||
|
opts.add(custom_opts);
|
||||||
|
auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), opts);
|
||||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, query::max_partitions);
|
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, query::max_partitions);
|
||||||
|
|
||||||
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
|
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
|
||||||
@@ -2939,11 +2950,38 @@ static future<executor::request_return_type> do_query(schema_ptr schema,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static dht::token token_for_segment(int segment, int total_segments) {
|
||||||
|
assert(total_segments > 1 && segment >= 0 && segment < total_segments);
|
||||||
|
uint64_t delta = std::numeric_limits<uint64_t>::max() / total_segments;
|
||||||
|
return dht::token::from_int64(std::numeric_limits<int64_t>::min() + delta * segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
static dht::partition_range get_range_for_segment(int segment, int total_segments) {
|
||||||
|
if (total_segments == 1) {
|
||||||
|
return dht::partition_range::make_open_ended_both_sides();
|
||||||
|
}
|
||||||
|
if (segment == 0) {
|
||||||
|
dht::token ending_token = token_for_segment(1, total_segments);
|
||||||
|
return dht::partition_range::make_ending_with(
|
||||||
|
dht::partition_range::bound(dht::ring_position::ending_at(ending_token), false));
|
||||||
|
} else if (segment == total_segments - 1) {
|
||||||
|
dht::token starting_token = token_for_segment(segment, total_segments);
|
||||||
|
return dht::partition_range::make_starting_with(
|
||||||
|
dht::partition_range::bound(dht::ring_position::starting_at(starting_token)));
|
||||||
|
} else {
|
||||||
|
dht::token starting_token = token_for_segment(segment, total_segments);
|
||||||
|
dht::token ending_token = token_for_segment(segment + 1, total_segments);
|
||||||
|
return dht::partition_range::make(
|
||||||
|
dht::partition_range::bound(dht::ring_position::starting_at(starting_token)),
|
||||||
|
dht::partition_range::bound(dht::ring_position::ending_at(ending_token), false)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(sarna):
|
// TODO(sarna):
|
||||||
// 1. Paging must have 1MB boundary according to the docs. IIRC we do have a replica-side reply size limit though - verify.
|
// 1. Paging must have 1MB boundary according to the docs. IIRC we do have a replica-side reply size limit though - verify.
|
||||||
// 2. Filtering - by passing appropriately created restrictions to pager as a last parameter
|
// 2. Filtering - by passing appropriately created restrictions to pager as a last parameter
|
||||||
// 3. Proper timeouts instead of gc_clock::now() and db::no_timeout
|
// 3. Proper timeouts instead of gc_clock::now() and db::no_timeout
|
||||||
// 4. Implement parallel scanning via Segments
|
|
||||||
future<executor::request_return_type> executor::scan(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
future<executor::request_return_type> executor::scan(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
||||||
_stats.api_operations.scan++;
|
_stats.api_operations.scan++;
|
||||||
elogger.trace("Scanning {}", request);
|
elogger.trace("Scanning {}", request);
|
||||||
@@ -2954,10 +2992,21 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
|
|||||||
return make_ready_future<request_return_type>(api_error("ValidationException",
|
return make_ready_future<request_return_type>(api_error("ValidationException",
|
||||||
"FilterExpression is not yet implemented in alternator"));
|
"FilterExpression is not yet implemented in alternator"));
|
||||||
}
|
}
|
||||||
if (get_int_attribute(request, "Segment") || get_int_attribute(request, "TotalSegments")) {
|
auto segment = get_int_attribute(request, "Segment");
|
||||||
// FIXME: need to support parallel scan. See issue #5059.
|
auto total_segments = get_int_attribute(request, "TotalSegments");
|
||||||
return make_ready_future<request_return_type>(api_error("ValidationException",
|
if (segment || total_segments) {
|
||||||
"Scan Segment/TotalSegments is not yet implemented in alternator"));
|
if (!segment || !total_segments) {
|
||||||
|
return make_ready_future<request_return_type>(api_error("ValidationException",
|
||||||
|
"Both Segment and TotalSegments attributes need to be present for a parallel scan"));
|
||||||
|
}
|
||||||
|
if (*segment < 0 || *segment >= *total_segments) {
|
||||||
|
return make_ready_future<request_return_type>(api_error("ValidationException",
|
||||||
|
"Segment must be non-negative and less than TotalSegments"));
|
||||||
|
}
|
||||||
|
if (*total_segments < 0 || *total_segments > 1000000) {
|
||||||
|
return make_ready_future<request_return_type>(api_error("ValidationException",
|
||||||
|
"TotalSegments must be non-negative and less or equal to 1000000"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey");
|
rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey");
|
||||||
@@ -2976,7 +3025,12 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
|
|||||||
|
|
||||||
auto attrs_to_get = calculate_attrs_to_get(request);
|
auto attrs_to_get = calculate_attrs_to_get(request);
|
||||||
|
|
||||||
dht::partition_range_vector partition_ranges{dht::partition_range::make_open_ended_both_sides()};
|
dht::partition_range_vector partition_ranges;
|
||||||
|
if (segment) {
|
||||||
|
partition_ranges.push_back(get_range_for_segment(*segment, *total_segments));
|
||||||
|
} else {
|
||||||
|
partition_ranges.push_back(dht::partition_range::make_open_ended_both_sides());
|
||||||
|
}
|
||||||
std::vector<query::clustering_range> ck_bounds{query::clustering_range::make_open_ended_both_sides()};
|
std::vector<query::clustering_range> ck_bounds{query::clustering_range::make_open_ended_both_sides()};
|
||||||
|
|
||||||
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions;
|
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions;
|
||||||
@@ -2986,14 +3040,15 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
|
|||||||
partition_ranges = filtering_restrictions->get_partition_key_ranges(query_options);
|
partition_ranges = filtering_restrictions->get_partition_key_ranges(query_options);
|
||||||
ck_bounds = filtering_restrictions->get_clustering_bounds(query_options);
|
ck_bounds = filtering_restrictions->get_clustering_bounds(query_options);
|
||||||
}
|
}
|
||||||
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, trace_state, std::move(permit));
|
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||||
|
std::move(filtering_restrictions), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit));
|
||||||
}
|
}
|
||||||
|
|
||||||
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) {
|
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) {
|
||||||
if (attrs.Size() != 1) {
|
if (attrs.Size() != 1) {
|
||||||
throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs));
|
throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs));
|
||||||
}
|
}
|
||||||
bytes raw_value = pk_cdef.type->from_string(attrs[0][type_to_string(pk_cdef.type)].GetString());
|
bytes raw_value = get_key_from_typed_value(attrs[0], pk_cdef);
|
||||||
partition_key pk = partition_key::from_singular(*schema, pk_cdef.type->deserialize(raw_value));
|
partition_key pk = partition_key::from_singular(*schema, pk_cdef.type->deserialize(raw_value));
|
||||||
auto decorated_key = dht::decorate_key(*schema, pk);
|
auto decorated_key = dht::decorate_key(*schema, pk);
|
||||||
if (op != comparison_operator_type::EQ) {
|
if (op != comparison_operator_type::EQ) {
|
||||||
@@ -3018,7 +3073,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
|||||||
if (attrs.Size() != expected_attrs_size) {
|
if (attrs.Size() != expected_attrs_size) {
|
||||||
throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs));
|
throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs));
|
||||||
}
|
}
|
||||||
bytes raw_value = ck_cdef.type->from_string(attrs[0][type_to_string(ck_cdef.type)].GetString());
|
bytes raw_value = get_key_from_typed_value(attrs[0], ck_cdef);
|
||||||
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
|
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
|
||||||
switch (op) {
|
switch (op) {
|
||||||
case comparison_operator_type::EQ:
|
case comparison_operator_type::EQ:
|
||||||
@@ -3032,7 +3087,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
|||||||
case comparison_operator_type::GT:
|
case comparison_operator_type::GT:
|
||||||
return query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false));
|
return query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false));
|
||||||
case comparison_operator_type::BETWEEN: {
|
case comparison_operator_type::BETWEEN: {
|
||||||
bytes raw_upper_limit = ck_cdef.type->from_string(attrs[1][type_to_string(ck_cdef.type)].GetString());
|
bytes raw_upper_limit = get_key_from_typed_value(attrs[1], ck_cdef);
|
||||||
clustering_key upper_limit = clustering_key::from_single_value(*schema, raw_upper_limit);
|
clustering_key upper_limit = clustering_key::from_single_value(*schema, raw_upper_limit);
|
||||||
return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(upper_limit));
|
return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(upper_limit));
|
||||||
}
|
}
|
||||||
@@ -3045,9 +3100,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum
|
|||||||
if (!ck_cdef.type->is_compatible_with(*utf8_type)) {
|
if (!ck_cdef.type->is_compatible_with(*utf8_type)) {
|
||||||
throw api_error("ValidationException", format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type)));
|
throw api_error("ValidationException", format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type)));
|
||||||
}
|
}
|
||||||
std::string raw_upper_limit_str = attrs[0][type_to_string(ck_cdef.type)].GetString();
|
return get_clustering_range_for_begins_with(std::move(raw_value), ck, schema, ck_cdef.type);
|
||||||
bytes raw_upper_limit = ck_cdef.type->from_string(raw_upper_limit_str);
|
|
||||||
return get_clustering_range_for_begins_with(std::move(raw_upper_limit), ck, schema, ck_cdef.type);
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
throw api_error("ValidationException", format("Unknown primary key bound passed: {}", int(op)));
|
throw api_error("ValidationException", format("Unknown primary key bound passed: {}", int(op)));
|
||||||
@@ -3429,11 +3482,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
|
|||||||
if (rjson::find(request, "FilterExpression")) {
|
if (rjson::find(request, "FilterExpression")) {
|
||||||
return make_ready_future<request_return_type>(api_error("ValidationException", "FilterExpression is not yet implemented in alternator"));
|
return make_ready_future<request_return_type>(api_error("ValidationException", "FilterExpression is not yet implemented in alternator"));
|
||||||
}
|
}
|
||||||
bool forward = get_bool_attribute(request, "ScanIndexForward", true);
|
const bool forward = get_bool_attribute(request, "ScanIndexForward", true);
|
||||||
if (!forward) {
|
|
||||||
// FIXME: need to support the !forward (i.e., reverse sort order) case. See issue #5153.
|
|
||||||
return make_ready_future<request_return_type>(api_error("ValidationException", "ScanIndexForward=false is not yet implemented in alternator"));
|
|
||||||
}
|
|
||||||
|
|
||||||
rjson::value* key_conditions = rjson::find(request, "KeyConditions");
|
rjson::value* key_conditions = rjson::find(request, "KeyConditions");
|
||||||
rjson::value* key_condition_expression = rjson::find(request, "KeyConditionExpression");
|
rjson::value* key_condition_expression = rjson::find(request, "KeyConditionExpression");
|
||||||
@@ -3476,7 +3525,10 @@ future<executor::request_return_type> executor::query(client_state& client_state
|
|||||||
}
|
}
|
||||||
verify_all_are_used(request, "ExpressionAttributeValues", used_attribute_values, "KeyConditionExpression");
|
verify_all_are_used(request, "ExpressionAttributeValues", used_attribute_values, "KeyConditionExpression");
|
||||||
verify_all_are_used(request, "ExpressionAttributeNames", used_attribute_names, "KeyConditionExpression");
|
verify_all_are_used(request, "ExpressionAttributeNames", used_attribute_names, "KeyConditionExpression");
|
||||||
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, std::move(trace_state), std::move(permit));
|
query::partition_slice::option_set opts;
|
||||||
|
opts.set_if<query::partition_slice::option::reversed>(!forward);
|
||||||
|
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
|
||||||
|
std::move(filtering_restrictions), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit));
|
||||||
}
|
}
|
||||||
|
|
||||||
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {
|
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {
|
||||||
@@ -3567,12 +3619,12 @@ static std::map<sstring, sstring> get_network_topology_options(int rf) {
|
|||||||
// manually create the keyspace to override this predefined behavior.
|
// manually create the keyspace to override this predefined behavior.
|
||||||
future<> executor::create_keyspace(std::string_view keyspace_name) {
|
future<> executor::create_keyspace(std::string_view keyspace_name) {
|
||||||
sstring keyspace_name_str(keyspace_name);
|
sstring keyspace_name_str(keyspace_name);
|
||||||
return gms::get_up_endpoint_count().then([this, keyspace_name_str = std::move(keyspace_name_str)] (int up_endpoint_count) {
|
return gms::get_all_endpoint_count().then([this, keyspace_name_str = std::move(keyspace_name_str)] (int endpoint_count) {
|
||||||
int rf = 3;
|
int rf = 3;
|
||||||
if (up_endpoint_count < rf) {
|
if (endpoint_count < rf) {
|
||||||
rf = 1;
|
rf = 1;
|
||||||
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} live nodes.",
|
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.",
|
||||||
keyspace_name_str, rf, up_endpoint_count);
|
keyspace_name_str, rf, endpoint_count);
|
||||||
}
|
}
|
||||||
auto opts = get_network_topology_options(rf);
|
auto opts = get_network_topology_options(rf);
|
||||||
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
|
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
|
||||||
|
|||||||
@@ -54,26 +54,22 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) {
|
|||||||
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
|
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
|
static ss::token_range token_range_endpoints_to_json(const dht::token_range_endpoints& d) {
|
||||||
std::vector<ss::token_range> res;
|
ss::token_range r;
|
||||||
for (auto d : service::get_local_storage_service().describe_ring(keyspace)) {
|
r.start_token = d._start_token;
|
||||||
ss::token_range r;
|
r.end_token = d._end_token;
|
||||||
r.start_token = d._start_token;
|
r.endpoints = d._endpoints;
|
||||||
r.end_token = d._end_token;
|
r.rpc_endpoints = d._rpc_endpoints;
|
||||||
r.endpoints = d._endpoints;
|
for (auto det : d._endpoint_details) {
|
||||||
r.rpc_endpoints = d._rpc_endpoints;
|
ss::endpoint_detail ed;
|
||||||
for (auto det : d._endpoint_details) {
|
ed.host = det._host;
|
||||||
ss::endpoint_detail ed;
|
ed.datacenter = det._datacenter;
|
||||||
ed.host = det._host;
|
if (det._rack != "") {
|
||||||
ed.datacenter = det._datacenter;
|
ed.rack = det._rack;
|
||||||
if (det._rack != "") {
|
|
||||||
ed.rack = det._rack;
|
|
||||||
}
|
|
||||||
r.endpoint_details.push(ed);
|
|
||||||
}
|
}
|
||||||
res.push_back(r);
|
r.endpoint_details.push(ed);
|
||||||
}
|
}
|
||||||
return res;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
using ks_cf_func = std::function<future<json::json_return_type>(http_context&, std::unique_ptr<request>, sstring, std::vector<sstring>)>;
|
using ks_cf_func = std::function<future<json::json_return_type>(http_context&, std::unique_ptr<request>, sstring, std::vector<sstring>)>;
|
||||||
@@ -175,13 +171,13 @@ void set_storage_service(http_context& ctx, routes& r) {
|
|||||||
return make_ready_future<json::json_return_type>(res);
|
return make_ready_future<json::json_return_type>(res);
|
||||||
});
|
});
|
||||||
|
|
||||||
ss::describe_any_ring.set(r, [&ctx](const_req req) {
|
ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||||
return describe_ring("");
|
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
|
||||||
});
|
});
|
||||||
|
|
||||||
ss::describe_ring.set(r, [&ctx](const_req req) {
|
ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||||
auto keyspace = validate_keyspace(ctx, req.param);
|
auto keyspace = validate_keyspace(ctx, req->param);
|
||||||
return describe_ring(keyspace);
|
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
|
||||||
});
|
});
|
||||||
|
|
||||||
ss::get_host_id_map.set(r, [&ctx](const_req req) {
|
ss::get_host_id_map.set(r, [&ctx](const_req req) {
|
||||||
@@ -1000,6 +996,9 @@ void set_snapshot(http_context& ctx, routes& r) {
|
|||||||
if (column_family.empty()) {
|
if (column_family.empty()) {
|
||||||
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
|
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
|
||||||
} else {
|
} else {
|
||||||
|
if (keynames.empty()) {
|
||||||
|
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||||
|
}
|
||||||
if (keynames.size() > 1) {
|
if (keynames.size() > 1) {
|
||||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,6 +33,7 @@
|
|||||||
|
|
||||||
#include "auth/resource.hh"
|
#include "auth/resource.hh"
|
||||||
#include "seastarx.hh"
|
#include "seastarx.hh"
|
||||||
|
#include "exceptions/exceptions.hh"
|
||||||
|
|
||||||
namespace auth {
|
namespace auth {
|
||||||
|
|
||||||
@@ -52,9 +53,9 @@ struct role_config_update final {
|
|||||||
///
|
///
|
||||||
/// A logical argument error for a role-management operation.
|
/// A logical argument error for a role-management operation.
|
||||||
///
|
///
|
||||||
class roles_argument_exception : public std::invalid_argument {
|
class roles_argument_exception : public exceptions::invalid_request_exception {
|
||||||
public:
|
public:
|
||||||
using std::invalid_argument::invalid_argument;
|
using exceptions::invalid_request_exception::invalid_request_exception;
|
||||||
};
|
};
|
||||||
|
|
||||||
class role_already_exists : public roles_argument_exception {
|
class role_already_exists : public roles_argument_exception {
|
||||||
|
|||||||
@@ -30,10 +30,12 @@ std::atomic<int64_t> clocks_offset;
|
|||||||
|
|
||||||
std::ostream& operator<<(std::ostream& os, db_clock::time_point tp) {
|
std::ostream& operator<<(std::ostream& os, db_clock::time_point tp) {
|
||||||
auto t = db_clock::to_time_t(tp);
|
auto t = db_clock::to_time_t(tp);
|
||||||
return os << std::put_time(std::gmtime(&t), "%Y/%m/%d %T");
|
::tm t_buf;
|
||||||
|
return os << std::put_time(::gmtime_r(&t, &t_buf), "%Y/%m/%d %T");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string format_timestamp(api::timestamp_type ts) {
|
std::string format_timestamp(api::timestamp_type ts) {
|
||||||
auto t = std::time_t(std::chrono::duration_cast<std::chrono::seconds>(api::timestamp_clock::duration(ts)).count());
|
auto t = std::time_t(std::chrono::duration_cast<std::chrono::seconds>(api::timestamp_clock::duration(ts)).count());
|
||||||
return format("{}", std::put_time(std::gmtime(&t), "%Y/%m/%d %T"));
|
::tm t_buf;
|
||||||
|
return format("{}", std::put_time(::gmtime_r(&t, &t_buf), "%Y/%m/%d %T"));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,6 +68,7 @@ batch_statement::batch_statement(int bound_terms, type type_,
|
|||||||
, _has_conditions(boost::algorithm::any_of(_statements, [] (auto&& s) { return s.statement->has_conditions(); }))
|
, _has_conditions(boost::algorithm::any_of(_statements, [] (auto&& s) { return s.statement->has_conditions(); }))
|
||||||
, _stats(stats)
|
, _stats(stats)
|
||||||
{
|
{
|
||||||
|
validate();
|
||||||
if (has_conditions()) {
|
if (has_conditions()) {
|
||||||
// A batch can be created not only by raw::batch_statement::prepare, but also by
|
// A batch can be created not only by raw::batch_statement::prepare, but also by
|
||||||
// cql_server::connection::process_batch, which doesn't call any methods of
|
// cql_server::connection::process_batch, which doesn't call any methods of
|
||||||
@@ -448,7 +449,6 @@ batch_statement::prepare(database& db, cql_stats& stats) {
|
|||||||
prep_attrs->collect_marker_specification(bound_names);
|
prep_attrs->collect_marker_specification(bound_names);
|
||||||
|
|
||||||
cql3::statements::batch_statement batch_statement_(bound_names.size(), _type, std::move(statements), std::move(prep_attrs), stats);
|
cql3::statements::batch_statement batch_statement_(bound_names.size(), _type, std::move(statements), std::move(prep_attrs), stats);
|
||||||
batch_statement_.validate();
|
|
||||||
|
|
||||||
std::vector<uint16_t> partition_key_bind_indices;
|
std::vector<uint16_t> partition_key_bind_indices;
|
||||||
if (!have_multiple_cfs && batch_statement_.get_statements().size() > 0) {
|
if (!have_multiple_cfs && batch_statement_.get_statements().size() > 0) {
|
||||||
|
|||||||
@@ -434,6 +434,12 @@ GCC6_CONCEPT(
|
|||||||
static KeyType
|
static KeyType
|
||||||
generate_base_key_from_index_pk(const partition_key& index_pk, const std::optional<clustering_key>& index_ck, const schema& base_schema, const schema& view_schema) {
|
generate_base_key_from_index_pk(const partition_key& index_pk, const std::optional<clustering_key>& index_ck, const schema& base_schema, const schema& view_schema) {
|
||||||
const auto& base_columns = std::is_same_v<KeyType, partition_key> ? base_schema.partition_key_columns() : base_schema.clustering_key_columns();
|
const auto& base_columns = std::is_same_v<KeyType, partition_key> ? base_schema.partition_key_columns() : base_schema.clustering_key_columns();
|
||||||
|
|
||||||
|
// An empty key in the index paging state translates to an empty base key
|
||||||
|
if (index_pk.is_empty() && !index_ck) {
|
||||||
|
return KeyType::make_empty();
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<bytes_view> exploded_base_key;
|
std::vector<bytes_view> exploded_base_key;
|
||||||
exploded_base_key.reserve(base_columns.size());
|
exploded_base_key.reserve(base_columns.size());
|
||||||
|
|
||||||
@@ -507,8 +513,7 @@ indexed_table_select_statement::do_execute_base_query(
|
|||||||
if (old_paging_state && concurrency == 1) {
|
if (old_paging_state && concurrency == 1) {
|
||||||
auto base_pk = generate_base_key_from_index_pk<partition_key>(old_paging_state->get_partition_key(),
|
auto base_pk = generate_base_key_from_index_pk<partition_key>(old_paging_state->get_partition_key(),
|
||||||
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
|
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
|
||||||
if (_schema->clustering_key_size() > 0) {
|
if (old_paging_state->get_clustering_key() && _schema->clustering_key_size() > 0) {
|
||||||
assert(old_paging_state->get_clustering_key().has_value());
|
|
||||||
auto base_ck = generate_base_key_from_index_pk<clustering_key>(old_paging_state->get_partition_key(),
|
auto base_ck = generate_base_key_from_index_pk<clustering_key>(old_paging_state->get_partition_key(),
|
||||||
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
|
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
|
||||||
command->slice.set_range(*_schema, base_pk,
|
command->slice.set_range(*_schema, base_pk,
|
||||||
|
|||||||
@@ -614,11 +614,17 @@ public:
|
|||||||
future<sseg_ptr> terminate() {
|
future<sseg_ptr> terminate() {
|
||||||
assert(_closed);
|
assert(_closed);
|
||||||
if (!std::exchange(_terminated, true)) {
|
if (!std::exchange(_terminated, true)) {
|
||||||
clogger.trace("{} is closed but not terminated.", *this);
|
// write a terminating zero block iff we are ending (a reused)
|
||||||
if (_buffer.empty()) {
|
// block before actual file end.
|
||||||
new_buffer(0);
|
// we should only get here when all actual data is
|
||||||
|
// already flushed (see below, close()).
|
||||||
|
if (size_on_disk() < _segment_manager->max_size) {
|
||||||
|
clogger.trace("{} is closed but not terminated.", *this);
|
||||||
|
if (_buffer.empty()) {
|
||||||
|
new_buffer(0);
|
||||||
|
}
|
||||||
|
return cycle(true, true);
|
||||||
}
|
}
|
||||||
return cycle(true, true);
|
|
||||||
}
|
}
|
||||||
return make_ready_future<sseg_ptr>(shared_from_this());
|
return make_ready_future<sseg_ptr>(shared_from_this());
|
||||||
}
|
}
|
||||||
@@ -2127,8 +2133,9 @@ db::commitlog::read_log_file(const sstring& filename, const sstring& pfx, seasta
|
|||||||
}).handle_exception([w](auto ep) {
|
}).handle_exception([w](auto ep) {
|
||||||
w->s.set_exception(ep);
|
w->s.set_exception(ep);
|
||||||
});
|
});
|
||||||
|
// #6265 - must keep subscription alive.
|
||||||
return ret.done();
|
auto res = ret.done();
|
||||||
|
return res.finally([ret = std::move(ret)] {});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
11
db/config.cc
11
db/config.cc
@@ -681,7 +681,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
|||||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
|
||||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||||
@@ -689,6 +689,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
|||||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||||
|
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
|
||||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
||||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||||
@@ -859,7 +860,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
||||||
if (experimental()) {
|
if (experimental() && f != experimental_features_t::UNUSED) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
const auto& optval = experimental_features();
|
const auto& optval = experimental_features();
|
||||||
@@ -911,11 +912,13 @@ const db::extensions& db::config::extensions() const {
|
|||||||
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||||
// We decided against using the construct-on-first-use idiom here:
|
// We decided against using the construct-on-first-use idiom here:
|
||||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||||
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
|
// Lightweight transactions are no longer experimental. Map them
|
||||||
|
// to UNUSED switch for a while, then remove altogether.
|
||||||
|
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
|
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
|
||||||
return {LWT, UDF, CDC};
|
return {UDF, CDC};
|
||||||
}
|
}
|
||||||
|
|
||||||
template struct utils::config_file::named_value<seastar::log_level>;
|
template struct utils::config_file::named_value<seastar::log_level>;
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ namespace db {
|
|||||||
|
|
||||||
/// Enumeration of all valid values for the `experimental` config entry.
|
/// Enumeration of all valid values for the `experimental` config entry.
|
||||||
struct experimental_features_t {
|
struct experimental_features_t {
|
||||||
enum feature { LWT, UDF, CDC };
|
enum feature { UNUSED, UDF, CDC };
|
||||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||||
static std::vector<enum_option<experimental_features_t>> all();
|
static std::vector<enum_option<experimental_features_t>> all();
|
||||||
};
|
};
|
||||||
@@ -278,6 +278,7 @@ public:
|
|||||||
named_value<uint32_t> shutdown_announce_in_ms;
|
named_value<uint32_t> shutdown_announce_in_ms;
|
||||||
named_value<bool> developer_mode;
|
named_value<bool> developer_mode;
|
||||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||||
|
named_value<int32_t> force_gossip_generation;
|
||||||
named_value<bool> experimental;
|
named_value<bool> experimental;
|
||||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||||
named_value<size_t> lsa_reclamation_step;
|
named_value<size_t> lsa_reclamation_step;
|
||||||
|
|||||||
@@ -703,6 +703,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
|||||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||||
// (last_modification - manager::hints_timer_period) old.
|
// (last_modification - manager::hints_timer_period) old.
|
||||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||||
|
ctx_ptr->rps_set.erase(rp);
|
||||||
return make_ready_future<>();
|
return make_ready_future<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -725,6 +726,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
|||||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||||
++this->shard_stats().discarded;
|
++this->shard_stats().discarded;
|
||||||
}
|
}
|
||||||
|
ctx_ptr->rps_set.erase(rp);
|
||||||
return make_ready_future<>();
|
return make_ready_future<>();
|
||||||
}).finally([units = std::move(units), ctx_ptr] {});
|
}).finally([units = std::move(units), ctx_ptr] {});
|
||||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||||
|
|||||||
@@ -187,7 +187,7 @@ schema_ptr batchlog() {
|
|||||||
{{"cf_id", uuid_type}},
|
{{"cf_id", uuid_type}},
|
||||||
// regular columns
|
// regular columns
|
||||||
{
|
{
|
||||||
{"in_progress_ballot", timeuuid_type},
|
{"promise", timeuuid_type},
|
||||||
{"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl
|
{"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl
|
||||||
{"most_recent_commit_at", timeuuid_type},
|
{"most_recent_commit_at", timeuuid_type},
|
||||||
{"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl
|
{"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl
|
||||||
@@ -2196,13 +2196,13 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
|
|||||||
// FIXME: we need execute_cql_with_now()
|
// FIXME: we need execute_cql_with_now()
|
||||||
(void)now;
|
(void)now;
|
||||||
auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id());
|
auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id());
|
||||||
return f.then([s] (shared_ptr<cql3::untyped_result_set> results) mutable {
|
return f.then([s, key] (shared_ptr<cql3::untyped_result_set> results) mutable {
|
||||||
if (results->empty()) {
|
if (results->empty()) {
|
||||||
return service::paxos::paxos_state();
|
return service::paxos::paxos_state();
|
||||||
}
|
}
|
||||||
auto& row = results->one();
|
auto& row = results->one();
|
||||||
auto promised = row.has("in_progress_ballot")
|
auto promised = row.has("promise")
|
||||||
? row.get_as<utils::UUID>("in_progress_ballot") : utils::UUID_gen::min_time_UUID(0);
|
? row.get_as<utils::UUID>("promise") : utils::UUID_gen::min_time_UUID(0);
|
||||||
|
|
||||||
std::optional<service::paxos::proposal> accepted;
|
std::optional<service::paxos::proposal> accepted;
|
||||||
if (row.has("proposal")) {
|
if (row.has("proposal")) {
|
||||||
@@ -2211,9 +2211,14 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::optional<service::paxos::proposal> most_recent;
|
std::optional<service::paxos::proposal> most_recent;
|
||||||
if (row.has("most_recent_commit")) {
|
if (row.has("most_recent_commit_at")) {
|
||||||
|
// the value can be missing if it was pruned, suply empty one since
|
||||||
|
// it will not going to be used anyway
|
||||||
|
auto fm = row.has("most_recent_commit") ?
|
||||||
|
ser::deserialize_from_buffer<>(row.get_blob("most_recent_commit"), boost::type<frozen_mutation>(), 0) :
|
||||||
|
freeze(mutation(s, key));
|
||||||
most_recent = service::paxos::proposal(row.get_as<utils::UUID>("most_recent_commit_at"),
|
most_recent = service::paxos::proposal(row.get_as<utils::UUID>("most_recent_commit_at"),
|
||||||
ser::deserialize_from_buffer<>(row.get_blob("most_recent_commit"), boost::type<frozen_mutation>(), 0));
|
std::move(fm));
|
||||||
}
|
}
|
||||||
|
|
||||||
return service::paxos::paxos_state(promised, std::move(accepted), std::move(most_recent));
|
return service::paxos::paxos_state(promised, std::move(accepted), std::move(most_recent));
|
||||||
@@ -2228,7 +2233,7 @@ static int32_t paxos_ttl_sec(const schema& s) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
|
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
|
||||||
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||||
return execute_cql_with_timeout(cql,
|
return execute_cql_with_timeout(cql,
|
||||||
timeout,
|
timeout,
|
||||||
utils::UUID_gen::micros_timestamp(ballot),
|
utils::UUID_gen::micros_timestamp(ballot),
|
||||||
@@ -2240,13 +2245,14 @@ future<> save_paxos_promise(const schema& s, const partition_key& key, const uti
|
|||||||
}
|
}
|
||||||
|
|
||||||
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout) {
|
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout) {
|
||||||
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ?, proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||||
partition_key_view key = proposal.update.key(s);
|
partition_key_view key = proposal.update.key(s);
|
||||||
return execute_cql_with_timeout(cql,
|
return execute_cql_with_timeout(cql,
|
||||||
timeout,
|
timeout,
|
||||||
utils::UUID_gen::micros_timestamp(proposal.ballot),
|
utils::UUID_gen::micros_timestamp(proposal.ballot),
|
||||||
paxos_ttl_sec(s),
|
paxos_ttl_sec(s),
|
||||||
proposal.ballot,
|
proposal.ballot,
|
||||||
|
proposal.ballot,
|
||||||
ser::serialize_to_buffer<bytes>(proposal.update),
|
ser::serialize_to_buffer<bytes>(proposal.update),
|
||||||
to_legacy(*key.get_compound_type(s), key.representation()),
|
to_legacy(*key.get_compound_type(s), key.representation()),
|
||||||
s.id()
|
s.id()
|
||||||
@@ -2274,6 +2280,20 @@ future<> save_paxos_decision(const schema& s, const service::paxos::proposal& de
|
|||||||
).discard_result();
|
).discard_result();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
|
||||||
|
// This should be called only if a learn stage succeeded on all replicas.
|
||||||
|
// In this case we can remove learned paxos value using ballot's timestamp which
|
||||||
|
// guarantees that if there is more recent round it will not be affected.
|
||||||
|
static auto cql = format("DELETE most_recent_commit FROM system.{} USING TIMESTAMP ? WHERE row_key = ? AND cf_id = ?", PAXOS);
|
||||||
|
|
||||||
|
return execute_cql_with_timeout(cql,
|
||||||
|
timeout,
|
||||||
|
utils::UUID_gen::micros_timestamp(ballot),
|
||||||
|
to_legacy(*key.get_compound_type(s), key.representation()),
|
||||||
|
s.id()
|
||||||
|
).discard_result();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace system_keyspace
|
} // namespace system_keyspace
|
||||||
|
|
||||||
sstring system_keyspace_name() {
|
sstring system_keyspace_name() {
|
||||||
|
|||||||
@@ -647,6 +647,7 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
|
|||||||
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
|
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
|
||||||
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
|
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
|
||||||
future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
|
future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
|
||||||
|
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
|
||||||
|
|
||||||
} // namespace system_keyspace
|
} // namespace system_keyspace
|
||||||
} // namespace db
|
} // namespace db
|
||||||
|
|||||||
@@ -1101,6 +1101,8 @@ future<> mutate_MV(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (paired_endpoint) {
|
if (paired_endpoint) {
|
||||||
|
// If paired endpoint is present, remove it from the list of pending endpoints to avoid duplicates
|
||||||
|
pending_endpoints.erase(std::remove(pending_endpoints.begin(), pending_endpoints.end(), *paired_endpoint), pending_endpoints.end());
|
||||||
// When paired endpoint is the local node, we can just apply
|
// When paired endpoint is the local node, we can just apply
|
||||||
// the mutation locally, unless there are pending endpoints, in
|
// the mutation locally, unless there are pending endpoints, in
|
||||||
// which case we want to do an ordinary write so the view mutation
|
// which case we want to do an ordinary write so the view mutation
|
||||||
|
|||||||
@@ -118,7 +118,7 @@ token token::midpoint(const token& t1, const token& t2) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
token token::get_random_token() {
|
token token::get_random_token() {
|
||||||
return {kind::key, dht::get_random_number<int64_t>()};
|
return token(kind::key, dht::get_random_number<uint64_t>());
|
||||||
}
|
}
|
||||||
|
|
||||||
token token::from_sstring(const sstring& t) {
|
token token::from_sstring(const sstring& t) {
|
||||||
|
|||||||
24
dht/token.hh
24
dht/token.hh
@@ -58,19 +58,27 @@ public:
|
|||||||
, _data(normalize(d)) { }
|
, _data(normalize(d)) { }
|
||||||
|
|
||||||
token(kind k, const bytes& b) : _kind(std::move(k)) {
|
token(kind k, const bytes& b) : _kind(std::move(k)) {
|
||||||
if (b.size() != sizeof(_data)) {
|
if (_kind != kind::key) {
|
||||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
_data = 0;
|
||||||
|
} else {
|
||||||
|
if (b.size() != sizeof(_data)) {
|
||||||
|
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||||
|
}
|
||||||
|
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||||
|
_data = net::ntoh(_data);
|
||||||
}
|
}
|
||||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
|
||||||
_data = net::ntoh(_data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
token(kind k, bytes_view b) : _kind(std::move(k)) {
|
token(kind k, bytes_view b) : _kind(std::move(k)) {
|
||||||
if (b.size() != sizeof(_data)) {
|
if (_kind != kind::key) {
|
||||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
_data = 0;
|
||||||
|
} else {
|
||||||
|
if (b.size() != sizeof(_data)) {
|
||||||
|
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||||
|
}
|
||||||
|
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||||
|
_data = net::ntoh(_data);
|
||||||
}
|
}
|
||||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
|
||||||
_data = net::ntoh(_data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool is_minimum() const {
|
bool is_minimum() const {
|
||||||
|
|||||||
1
dist/common/scripts/scylla_fstrim_setup
vendored
1
dist/common/scripts/scylla_fstrim_setup
vendored
@@ -31,5 +31,6 @@ if __name__ == '__main__':
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
if is_systemd():
|
if is_systemd():
|
||||||
systemd_unit('scylla-fstrim.timer').unmask()
|
systemd_unit('scylla-fstrim.timer').unmask()
|
||||||
|
systemd_unit('scylla-fstrim.timer').enable()
|
||||||
if is_redhat_variant():
|
if is_redhat_variant():
|
||||||
systemd_unit('fstrim.timer').disable()
|
systemd_unit('fstrim.timer').disable()
|
||||||
|
|||||||
6
dist/common/scripts/scylla_util.py
vendored
6
dist/common/scripts/scylla_util.py
vendored
@@ -182,7 +182,7 @@ class aws_instance:
|
|||||||
instance_size = self.instance_size()
|
instance_size = self.instance_size()
|
||||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||||
return 'ixgbevf'
|
return 'ixgbevf'
|
||||||
if instance_class in ['c5', 'c5d', 'f1', 'g3', 'h1', 'i3', 'i3en', 'm5', 'm5d', 'p2', 'p3', 'r4', 'x1']:
|
if instance_class in ['a1', 'c5', 'c5d', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||||
return 'ena'
|
return 'ena'
|
||||||
if instance_class == 'm4':
|
if instance_class == 'm4':
|
||||||
if instance_size == '16xlarge':
|
if instance_size == '16xlarge':
|
||||||
@@ -481,8 +481,8 @@ def parse_scylla_dirs_with_default(conf='/etc/scylla/scylla.yaml'):
|
|||||||
y['data_file_directories'] = [os.path.join(y['workdir'], 'data')]
|
y['data_file_directories'] = [os.path.join(y['workdir'], 'data')]
|
||||||
for t in [ "commitlog", "hints", "view_hints", "saved_caches" ]:
|
for t in [ "commitlog", "hints", "view_hints", "saved_caches" ]:
|
||||||
key = "%s_directory" % t
|
key = "%s_directory" % t
|
||||||
if key not in y or not y[k]:
|
if key not in y or not y[key]:
|
||||||
y[k] = os.path.join(y['workdir'], t)
|
y[key] = os.path.join(y['workdir'], t)
|
||||||
return y
|
return y
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
|||||||
ENV container docker
|
ENV container docker
|
||||||
|
|
||||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.0/latest/scylla.repo
|
||||||
ARG VERSION=666.development
|
ARG VERSION=4.0.*
|
||||||
|
|
||||||
ADD scylla_bashrc /scylla_bashrc
|
ADD scylla_bashrc /scylla_bashrc
|
||||||
|
|
||||||
|
|||||||
@@ -21,10 +21,6 @@ DynamoDB API requests.
|
|||||||
For example., "`--alternator-port=8000`" on the command line will run
|
For example., "`--alternator-port=8000`" on the command line will run
|
||||||
Alternator on port 8000 - the traditional port used by DynamoDB.
|
Alternator on port 8000 - the traditional port used by DynamoDB.
|
||||||
|
|
||||||
Alternator uses Scylla's LWT feature, which is currently considered
|
|
||||||
experimental and needs to be seperately enabled as well, e.g. with the
|
|
||||||
"`--experimental=on`" option.
|
|
||||||
|
|
||||||
By default, Scylla listens on this port on all network interfaces.
|
By default, Scylla listens on this port on all network interfaces.
|
||||||
To listen only on a specific interface, pass also an "`alternator-address`"
|
To listen only on a specific interface, pass also an "`alternator-address`"
|
||||||
option.
|
option.
|
||||||
@@ -55,9 +51,8 @@ Alternator's compatibility with DynamoDB, and will be updated as the work
|
|||||||
progresses and compatibility continues to improve.
|
progresses and compatibility continues to improve.
|
||||||
|
|
||||||
### API Server
|
### API Server
|
||||||
* Transport: HTTP mostly supported, but small features like CRC header and
|
* Transport: HTTP and HTTPS are mostly supported, but small features like CRC
|
||||||
compression are still missing. HTTPS supported on top of HTTP, so small
|
header and compression are still missing.
|
||||||
features may still be missing.
|
|
||||||
* Authorization (verifying the originator of the request): implemented
|
* Authorization (verifying the originator of the request): implemented
|
||||||
on top of system\_auth.roles table. The secret key used for authorization
|
on top of system\_auth.roles table. The secret key used for authorization
|
||||||
is the salted\_hash column from the roles table, selected with:
|
is the salted\_hash column from the roles table, selected with:
|
||||||
@@ -65,20 +60,19 @@ progresses and compatibility continues to improve.
|
|||||||
By default, authorization is not enforced at all. It can be turned on
|
By default, authorization is not enforced at all. It can be turned on
|
||||||
by providing an entry in Scylla configuration:
|
by providing an entry in Scylla configuration:
|
||||||
alternator\_enforce\_authorization: true
|
alternator\_enforce\_authorization: true
|
||||||
* DNS server for load balancing: Not yet supported. Client needs to pick
|
* Load balancing: Not a part of Alternator. One should use an external load
|
||||||
one of the live Scylla nodes and send a request to it.
|
balancer or DNS server to balance the requests between the live Scylla
|
||||||
|
nodes. We plan to publish a reference example soon.
|
||||||
### Table Operations
|
### Table Operations
|
||||||
* CreateTable: Supported. Note our implementation is synchronous.
|
* CreateTable and DeleteTable: Supported. Note our implementation is synchronous.
|
||||||
|
* DescribeTable: Partial implementation. Missing creation date and size estimate.
|
||||||
* UpdateTable: Not supported.
|
* UpdateTable: Not supported.
|
||||||
* DescribeTable: Partial implementation. Missing creation date and size esitmate.
|
|
||||||
* DeleteTable: Supported. Note our implementation is synchronous.
|
|
||||||
* ListTables: Supported.
|
* ListTables: Supported.
|
||||||
### Item Operations
|
### Item Operations
|
||||||
* GetItem: Support almost complete except that projection expressions can
|
* GetItem: Support almost complete except that projection expressions can
|
||||||
only ask for top-level attributes.
|
only ask for top-level attributes.
|
||||||
* PutItem: Support almost complete except that condition expressions can
|
* PutItem: Support almost complete except that condition expressions can
|
||||||
only refer to to-level attributes.
|
only refer to to-level attributes.
|
||||||
pre-put content) not yet supported.
|
|
||||||
* UpdateItem: Nested documents are supported but updates to nested attributes
|
* UpdateItem: Nested documents are supported but updates to nested attributes
|
||||||
are not (e.g., `SET a.b[3].c=val`), and neither are nested attributes in
|
are not (e.g., `SET a.b[3].c=val`), and neither are nested attributes in
|
||||||
condition expressions.
|
condition expressions.
|
||||||
@@ -90,15 +84,14 @@ progresses and compatibility continues to improve.
|
|||||||
* BatchWriteItem: Supported. Doesn't limit the number of items (DynamoDB
|
* BatchWriteItem: Supported. Doesn't limit the number of items (DynamoDB
|
||||||
limits to 25) or size of items (400 KB) or total request size (16 MB).
|
limits to 25) or size of items (400 KB) or total request size (16 MB).
|
||||||
### Scans
|
### Scans
|
||||||
* Scan: As usual, projection expressions only support top-level attributes.
|
Scan and Query are mostly supported, with the following limitations:
|
||||||
Filter expressions (to filter some of the items) partially supported:
|
* As above, projection expressions only support top-level attributes.
|
||||||
The ScanFilter syntax is supported but FilterExpression is not yet, and
|
* Filter expressions (to filter some of the items) are only partially
|
||||||
only equality operator is supported so far.
|
supported: The ScanFilter syntax is currently only supports the equality
|
||||||
The "Select" options which allows to count items instead of returning them
|
operator, and the FilterExpression syntax is not yet supported at all.
|
||||||
is not yet supported. Parallel scan is not yet supported.
|
* The "Select" options which allows to count items instead of returning them
|
||||||
* Query: Same issues as Scan above. Additionally, missing support for
|
is not yet supported.
|
||||||
KeyConditionExpression (an alternative syntax replacing the older
|
* Parallel scan is not yet supported.
|
||||||
KeyConditions parameter which we do support).
|
|
||||||
### Secondary Indexes
|
### Secondary Indexes
|
||||||
Global Secondary Indexes (GSI) and Local Secondary Indexes (LSI) are
|
Global Secondary Indexes (GSI) and Local Secondary Indexes (LSI) are
|
||||||
implemented, with the following limitations:
|
implemented, with the following limitations:
|
||||||
@@ -116,24 +109,28 @@ implemented, with the following limitations:
|
|||||||
Writes are done in LOCAL_QURUM and reads in LOCAL_ONE (eventual consistency)
|
Writes are done in LOCAL_QURUM and reads in LOCAL_ONE (eventual consistency)
|
||||||
or LOCAL_QUORUM (strong consistency).
|
or LOCAL_QUORUM (strong consistency).
|
||||||
### Global Tables
|
### Global Tables
|
||||||
* Not yet supported: CreateGlobalTable, UpdateGlobalTable,
|
* Currently, *all* Alternator tables are created as "Global Tables", i.e., can
|
||||||
DescribeGlobalTable, ListGlobalTables, UpdateGlobalTableSettings,
|
be accessed from all of Scylla's DCs.
|
||||||
DescribeGlobalTableSettings. Implementation will use Scylla's multi-DC
|
* We do not yet support the DynamoDB API calls to make some of the tables
|
||||||
features.
|
global and others local to a particular DC: CreateGlobalTable,
|
||||||
|
UpdateGlobalTable, DescribeGlobalTable, ListGlobalTables,
|
||||||
|
UpdateGlobalTableSettings, DescribeGlobalTableSettings, and UpdateTable.
|
||||||
### Backup and Restore
|
### Backup and Restore
|
||||||
* On-demand backup: Not yet supported: CreateBackup, DescribeBackup,
|
* On-demand backup: the DynamoDB APIs are not yet supported: CreateBackup,
|
||||||
DeleteBackup, ListBackups, RestoreTableFromBackup. Implementation will
|
DescribeBackup, DeleteBackup, ListBackups, RestoreTableFromBackup.
|
||||||
use Scylla's snapshots
|
Users can use Scylla's [snapshots](https://docs.scylladb.com/operating-scylla/procedures/backup-restore/)
|
||||||
|
or [Scylla Manager](https://docs.scylladb.com/operating-scylla/manager/2.0/backup/).
|
||||||
* Continuous backup: Not yet supported: UpdateContinuousBackups,
|
* Continuous backup: Not yet supported: UpdateContinuousBackups,
|
||||||
DescribeContinuousBackups, RestoreTableToPoinInTime.
|
DescribeContinuousBackups, RestoreTableToPoinInTime.
|
||||||
### Transations
|
### Transactions
|
||||||
* Not yet supported: TransactWriteItems, TransactGetItems.
|
* Not yet supported: TransactWriteItems, TransactGetItems.
|
||||||
Note that this is a new DynamoDB feature - these are more powerful than
|
Note that this is a new DynamoDB feature - these are more powerful than
|
||||||
the old conditional updates which were "lightweight transactions".
|
the old conditional updates which were "lightweight transactions".
|
||||||
### Streams (CDC)
|
### Streams
|
||||||
* Not yet supported
|
* Scylla has experimental support for [CDC](https://docs.scylladb.com/using-scylla/cdc/)
|
||||||
|
(change data capture), but the "DynamoDB Streams" API is not yet supported.
|
||||||
### Encryption at rest
|
### Encryption at rest
|
||||||
* Supported natively by Scylla, but needs to be enabled by default.
|
* Supported by Scylla Enterprise (not in open-source). Needs to be enabled.
|
||||||
### ARNs and tags
|
### ARNs and tags
|
||||||
* ARN is generated for every alternator table
|
* ARN is generated for every alternator table
|
||||||
* Tagging can be used with the help of the following requests:
|
* Tagging can be used with the help of the following requests:
|
||||||
@@ -166,7 +163,9 @@ implemented, with the following limitations:
|
|||||||
* Not required. Scylla cache is rather advanced and there is no need to place
|
* Not required. Scylla cache is rather advanced and there is no need to place
|
||||||
a cache in front of the database: https://www.scylladb.com/2017/07/31/database-caches-not-good/
|
a cache in front of the database: https://www.scylladb.com/2017/07/31/database-caches-not-good/
|
||||||
### Metrics
|
### Metrics
|
||||||
* Several metrics are available through the Grafana/Promethues stack: https://docs.scylladb.com/operating-scylla/monitoring/ It is different than the expectations of the current DynamoDB implementation. However, our
|
* Several metrics are available through the Grafana/Prometheus stack:
|
||||||
|
https://docs.scylladb.com/operating-scylla/monitoring/
|
||||||
|
Those are different from the current DynamoDB metrics, but Scylla's
|
||||||
monitoring is rather advanced and provide more insights to the internals.
|
monitoring is rather advanced and provide more insights to the internals.
|
||||||
|
|
||||||
## Alternator design and implementation
|
## Alternator design and implementation
|
||||||
@@ -229,8 +228,3 @@ one DynamoDB feature which we cannot support safely: we cannot modify
|
|||||||
a non-top-level attribute (e.g., a.b[3].c) directly without RMW. We plan
|
a non-top-level attribute (e.g., a.b[3].c) directly without RMW. We plan
|
||||||
to fix this in a future version by rethinking the data model we use for
|
to fix this in a future version by rethinking the data model we use for
|
||||||
attributes, or rethinking our implementation of RMW (as explained above).
|
attributes, or rethinking our implementation of RMW (as explained above).
|
||||||
|
|
||||||
For reasons explained above, the data model used by Alternator to store
|
|
||||||
data on disk is still in a state of flux, and may change in future versions.
|
|
||||||
Therefore, in this early stage it is not recommended to store important
|
|
||||||
production data using Alternator.
|
|
||||||
|
|||||||
@@ -10,12 +10,10 @@ This section will guide you through the steps for setting up the cluster:
|
|||||||
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
|
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
|
||||||
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
|
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
|
||||||
add to every "docker run" command: `-p 8000:8000` before the image name
|
add to every "docker run" command: `-p 8000:8000` before the image name
|
||||||
and `--alternator-port=8000 --experimental 1` at the end. The
|
and `--alternator-port=8000` at the end. The "alternator-port" option
|
||||||
"alternator-port" option specifies on which port Scylla will listen for
|
specifies on which port Scylla will listen for the (unencrypted) DynamoDB API.
|
||||||
the (unencrypted) DynamoDB API, and "--experimental 1" is required to
|
|
||||||
enable the experimental LWT feature which Alternator uses.
|
|
||||||
For example,
|
For example,
|
||||||
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000 --experimental 1`
|
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000
|
||||||
|
|
||||||
## Testing Scylla's DynamoDB API support:
|
## Testing Scylla's DynamoDB API support:
|
||||||
### Running AWS Tic Tac Toe demo app to test the cluster:
|
### Running AWS Tic Tac Toe demo app to test the cluster:
|
||||||
|
|||||||
@@ -76,6 +76,9 @@ Scylla with issue #4139 fixed)
|
|||||||
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
|
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
|
||||||
Scylla with issue #4363 fixed)
|
Scylla with issue #4363 fixed)
|
||||||
|
|
||||||
|
bit 5: CorrectUDTsInCollections (if set, indicates that the sstable was generated
|
||||||
|
by Scylla with issue #6130 fixed)
|
||||||
|
|
||||||
## extension_attributes subcomponent
|
## extension_attributes subcomponent
|
||||||
|
|
||||||
extension_attributes = extension_attribute_count extension_attribute*
|
extension_attributes = extension_attribute_count extension_attribute*
|
||||||
|
|||||||
@@ -110,10 +110,6 @@ feature_config feature_config_from_db_config(db::config& cfg) {
|
|||||||
fcfg.enable_cdc = true;
|
fcfg.enable_cdc = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cfg.check_experimental(db::experimental_features_t::LWT)) {
|
|
||||||
fcfg.enable_lwt = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return fcfg;
|
return fcfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -178,9 +174,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
|
|||||||
if (_config.enable_cdc) {
|
if (_config.enable_cdc) {
|
||||||
features.insert(gms::features::CDC);
|
features.insert(gms::features::CDC);
|
||||||
}
|
}
|
||||||
if (_config.enable_lwt) {
|
features.insert(gms::features::LWT);
|
||||||
features.insert(gms::features::LWT);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const sstring& s : _config.disabled_features) {
|
for (const sstring& s : _config.disabled_features) {
|
||||||
features.erase(s);
|
features.erase(s);
|
||||||
|
|||||||
@@ -41,7 +41,6 @@ struct feature_config {
|
|||||||
bool enable_sstables_mc_format = false;
|
bool enable_sstables_mc_format = false;
|
||||||
bool enable_user_defined_functions = false;
|
bool enable_user_defined_functions = false;
|
||||||
bool enable_cdc = false;
|
bool enable_cdc = false;
|
||||||
bool enable_lwt = false;
|
|
||||||
std::set<sstring> disabled_features;
|
std::set<sstring> disabled_features;
|
||||||
feature_config();
|
feature_config();
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1725,8 +1725,12 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
|||||||
// message on all cpus and forard them to cpu0 to process.
|
// message on all cpus and forard them to cpu0 to process.
|
||||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||||
g.init_messaging_service_handler(do_bind);
|
g.init_messaging_service_handler(do_bind);
|
||||||
}).then([this, generation_nbr, preload_local_states] {
|
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||||
build_seeds_list();
|
build_seeds_list();
|
||||||
|
if (_cfg.force_gossip_generation() > 0) {
|
||||||
|
generation_nbr = _cfg.force_gossip_generation();
|
||||||
|
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
|
||||||
|
}
|
||||||
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
||||||
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
||||||
local_state.mark_alive();
|
local_state.mark_alive();
|
||||||
|
|||||||
@@ -591,6 +591,7 @@ public:
|
|||||||
std::map<sstring, sstring> get_simple_states();
|
std::map<sstring, sstring> get_simple_states();
|
||||||
int get_down_endpoint_count();
|
int get_down_endpoint_count();
|
||||||
int get_up_endpoint_count();
|
int get_up_endpoint_count();
|
||||||
|
int get_all_endpoint_count();
|
||||||
sstring get_endpoint_state(sstring address);
|
sstring get_endpoint_state(sstring address);
|
||||||
failure_detector& fd() { return _fd; }
|
failure_detector& fd() { return _fd; }
|
||||||
};
|
};
|
||||||
@@ -637,6 +638,12 @@ inline future<int> get_up_endpoint_count() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline future<int> get_all_endpoint_count() {
|
||||||
|
return smp::submit_to(0, [] {
|
||||||
|
return static_cast<int>(get_local_gossiper().get_endpoint_states().size());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
inline future<> set_phi_convict_threshold(double phi) {
|
inline future<> set_phi_convict_threshold(double phi) {
|
||||||
return smp::submit_to(0, [phi] {
|
return smp::submit_to(0, [phi] {
|
||||||
get_local_gossiper().fd().set_phi_convict_threshold(phi);
|
get_local_gossiper().fd().set_phi_convict_threshold(phi);
|
||||||
|
|||||||
@@ -69,7 +69,8 @@ std::ostream& gms::operator<<(std::ostream& os, const inet_address& x) {
|
|||||||
auto&& bytes = x.bytes();
|
auto&& bytes = x.bytes();
|
||||||
auto i = 0u;
|
auto i = 0u;
|
||||||
auto acc = 0u;
|
auto acc = 0u;
|
||||||
for (auto b : bytes) {
|
// extra paranoid sign extension evasion - #5808
|
||||||
|
for (uint8_t b : bytes) {
|
||||||
acc <<= 8;
|
acc <<= 8;
|
||||||
acc |= b;
|
acc |= b;
|
||||||
if ((++i & 1) == 0) {
|
if ((++i & 1) == 0) {
|
||||||
|
|||||||
@@ -76,6 +76,8 @@ fedora_packages=(
|
|||||||
python3-psutil
|
python3-psutil
|
||||||
python3-cassandra-driver
|
python3-cassandra-driver
|
||||||
python3-colorama
|
python3-colorama
|
||||||
|
python3-boto3
|
||||||
|
python3-pytest
|
||||||
dnf-utils
|
dnf-utils
|
||||||
pigz
|
pigz
|
||||||
net-tools
|
net-tools
|
||||||
|
|||||||
17
main.cc
17
main.cc
@@ -662,9 +662,17 @@ int main(int ac, char** av) {
|
|||||||
|
|
||||||
supervisor::notify("starting tokens manager");
|
supervisor::notify("starting tokens manager");
|
||||||
token_metadata.start().get();
|
token_metadata.start().get();
|
||||||
auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
|
// storage_proxy holds a reference on it and is not yet stopped.
|
||||||
token_metadata.stop().get();
|
// what's worse is that the calltrace
|
||||||
});
|
// storage_proxy::do_query
|
||||||
|
// ::query_partition_key_range
|
||||||
|
// ::query_partition_key_range_concurrent
|
||||||
|
// leaves unwaited futures on the reactor and once it gets there
|
||||||
|
// the token_metadata instance is accessed and ...
|
||||||
|
//
|
||||||
|
//auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
|
||||||
|
// token_metadata.stop().get();
|
||||||
|
//});
|
||||||
|
|
||||||
supervisor::notify("starting migration manager notifier");
|
supervisor::notify("starting migration manager notifier");
|
||||||
mm_notifier.start().get();
|
mm_notifier.start().get();
|
||||||
@@ -1071,9 +1079,6 @@ int main(int ac, char** av) {
|
|||||||
static sharded<alternator::executor> alternator_executor;
|
static sharded<alternator::executor> alternator_executor;
|
||||||
static sharded<alternator::server> alternator_server;
|
static sharded<alternator::server> alternator_server;
|
||||||
|
|
||||||
if (!cfg->check_experimental(db::experimental_features_t::LWT)) {
|
|
||||||
throw std::runtime_error("Alternator enabled, but needs experimental LWT feature which wasn't enabled");
|
|
||||||
}
|
|
||||||
net::inet_address addr;
|
net::inet_address addr;
|
||||||
try {
|
try {
|
||||||
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();
|
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();
|
||||||
|
|||||||
@@ -452,6 +452,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
|||||||
case messaging_verb::PAXOS_PREPARE:
|
case messaging_verb::PAXOS_PREPARE:
|
||||||
case messaging_verb::PAXOS_ACCEPT:
|
case messaging_verb::PAXOS_ACCEPT:
|
||||||
case messaging_verb::PAXOS_LEARN:
|
case messaging_verb::PAXOS_LEARN:
|
||||||
|
case messaging_verb::PAXOS_PRUNE:
|
||||||
return 0;
|
return 0;
|
||||||
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
|
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
|
||||||
// sent on a different connection to avoid potential deadlocks
|
// sent on a different connection to avoid potential deadlocks
|
||||||
@@ -1179,14 +1180,14 @@ future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repai
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||||
void messaging_service::register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version)>&& func) {
|
void messaging_service::register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func) {
|
||||||
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
|
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
|
||||||
}
|
}
|
||||||
future<> messaging_service::unregister_repair_row_level_start() {
|
future<> messaging_service::unregister_repair_row_level_start() {
|
||||||
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
|
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
|
||||||
}
|
}
|
||||||
future<> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version) {
|
future<> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason) {
|
||||||
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version));
|
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||||
@@ -1281,6 +1282,19 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point
|
|||||||
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
|
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void messaging_service::register_paxos_prune(std::function<future<rpc::no_wait_type>(
|
||||||
|
const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info>)>&& func) {
|
||||||
|
register_handler(this, messaging_verb::PAXOS_PRUNE, std::move(func));
|
||||||
|
}
|
||||||
|
future<> messaging_service::unregister_paxos_prune() {
|
||||||
|
return unregister_handler(netw::messaging_verb::PAXOS_PRUNE);
|
||||||
|
}
|
||||||
|
future<>
|
||||||
|
messaging_service::send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id,
|
||||||
|
const partition_key& key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
|
||||||
|
return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_PRUNE, netw::msg_addr(peer), schema_id, key, ballot, std::move(trace_info));
|
||||||
|
}
|
||||||
|
|
||||||
void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func) {
|
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func) {
|
||||||
register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func));
|
register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func));
|
||||||
|
|||||||
@@ -139,7 +139,8 @@ enum class messaging_verb : int32_t {
|
|||||||
PAXOS_ACCEPT = 40,
|
PAXOS_ACCEPT = 40,
|
||||||
PAXOS_LEARN = 41,
|
PAXOS_LEARN = 41,
|
||||||
HINT_MUTATION = 42,
|
HINT_MUTATION = 42,
|
||||||
LAST = 43,
|
PAXOS_PRUNE = 43,
|
||||||
|
LAST = 44,
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace netw
|
} // namespace netw
|
||||||
@@ -341,9 +342,9 @@ public:
|
|||||||
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);
|
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);
|
||||||
|
|
||||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||||
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version)>&& func);
|
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func);
|
||||||
future<> unregister_repair_row_level_start();
|
future<> unregister_repair_row_level_start();
|
||||||
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version);
|
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason);
|
||||||
|
|
||||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||||
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);
|
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);
|
||||||
@@ -493,6 +494,14 @@ public:
|
|||||||
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
|
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
|
||||||
std::optional<tracing::trace_info> trace_info = std::nullopt);
|
std::optional<tracing::trace_info> trace_info = std::nullopt);
|
||||||
|
|
||||||
|
void register_paxos_prune(std::function<future<rpc::no_wait_type>(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key,
|
||||||
|
utils::UUID ballot, std::optional<tracing::trace_info>)>&& func);
|
||||||
|
|
||||||
|
future<> unregister_paxos_prune();
|
||||||
|
|
||||||
|
future<> send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id, const partition_key& key,
|
||||||
|
utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
|
||||||
|
|
||||||
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
|
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
|
||||||
future<> unregister_hint_mutation();
|
future<> unregister_hint_mutation();
|
||||||
|
|||||||
@@ -2605,7 +2605,7 @@ void mutation_cleaner_impl::start_worker() {
|
|||||||
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
|
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
|
||||||
auto&& region = snp.region();
|
auto&& region = snp.region();
|
||||||
return with_allocator(region.allocator(), [&] {
|
return with_allocator(region.allocator(), [&] {
|
||||||
return with_linearized_managed_bytes([&] {
|
{
|
||||||
// Allocating sections require the region to be reclaimable
|
// Allocating sections require the region to be reclaimable
|
||||||
// which means that they cannot be nested.
|
// which means that they cannot be nested.
|
||||||
// It is, however, possible, that if the snapshot is taken
|
// It is, however, possible, that if the snapshot is taken
|
||||||
@@ -2617,13 +2617,15 @@ stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexce
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
return _worker_state->alloc_section(region, [&] {
|
return _worker_state->alloc_section(region, [&] {
|
||||||
|
return with_linearized_managed_bytes([&] {
|
||||||
return snp.merge_partition_versions(_app_stats);
|
return snp.merge_partition_versions(_app_stats);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// Merging failed, give up as there is no guarantee of forward progress.
|
// Merging failed, give up as there is no guarantee of forward progress.
|
||||||
return stop_iteration::yes;
|
return stop_iteration::yes;
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -173,6 +173,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
|
|||||||
return handle_end_of_stream();
|
return handle_end_of_stream();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}).handle_exception([this] (std::exception_ptr ep) {
|
||||||
|
for (auto& q : _queue_reader_handles) {
|
||||||
|
if (q) {
|
||||||
|
q->abort(ep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return make_exception_future<>(std::move(ep));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,7 +12,11 @@
|
|||||||
# At the end of the build we check that the build-id is indeed in the
|
# At the end of the build we check that the build-id is indeed in the
|
||||||
# first page. At install time we check that patchelf doesn't modify
|
# first page. At install time we check that patchelf doesn't modify
|
||||||
# the program headers.
|
# the program headers.
|
||||||
|
|
||||||
|
# gdb has a SO_NAME_MAX_PATH_SIZE of 512, so limit the path size to
|
||||||
|
# that. The 512 includes the null at the end, hence the 511 bellow.
|
||||||
|
|
||||||
ORIGINAL_DYNAMIC_LINKER=$(gcc -### /dev/null -o t 2>&1 | perl -n -e '/-dynamic-linker ([^ ]*) / && print $1')
|
ORIGINAL_DYNAMIC_LINKER=$(gcc -### /dev/null -o t 2>&1 | perl -n -e '/-dynamic-linker ([^ ]*) / && print $1')
|
||||||
DYNAMIC_LINKER=$(printf "%2000s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
DYNAMIC_LINKER=$(printf "%511s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||||
|
|
||||||
echo $DYNAMIC_LINKER
|
echo $DYNAMIC_LINKER
|
||||||
|
|||||||
@@ -672,7 +672,8 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
|||||||
const std::vector<sstring>& cfs_,
|
const std::vector<sstring>& cfs_,
|
||||||
int id_,
|
int id_,
|
||||||
const std::vector<sstring>& data_centers_,
|
const std::vector<sstring>& data_centers_,
|
||||||
const std::vector<sstring>& hosts_)
|
const std::vector<sstring>& hosts_,
|
||||||
|
streaming::stream_reason reason_)
|
||||||
: db(db_)
|
: db(db_)
|
||||||
, partitioner(get_partitioner_for_tables(db_, keyspace_, cfs_))
|
, partitioner(get_partitioner_for_tables(db_, keyspace_, cfs_))
|
||||||
, keyspace(keyspace_)
|
, keyspace(keyspace_)
|
||||||
@@ -682,6 +683,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
|||||||
, shard(engine().cpu_id())
|
, shard(engine().cpu_id())
|
||||||
, data_centers(data_centers_)
|
, data_centers(data_centers_)
|
||||||
, hosts(hosts_)
|
, hosts(hosts_)
|
||||||
|
, reason(reason_)
|
||||||
, _row_level_repair(db.local().features().cluster_supports_row_level_repair()) {
|
, _row_level_repair(db.local().features().cluster_supports_row_level_repair()) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1462,7 +1464,7 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
|
|||||||
data_centers = options.data_centers, hosts = options.hosts] (database& localdb) mutable {
|
data_centers = options.data_centers, hosts = options.hosts] (database& localdb) mutable {
|
||||||
auto ri = make_lw_shared<repair_info>(db,
|
auto ri = make_lw_shared<repair_info>(db,
|
||||||
std::move(keyspace), std::move(ranges), std::move(cfs),
|
std::move(keyspace), std::move(ranges), std::move(cfs),
|
||||||
id, std::move(data_centers), std::move(hosts));
|
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair);
|
||||||
return repair_ranges(ri);
|
return repair_ranges(ri);
|
||||||
});
|
});
|
||||||
repair_results.push_back(std::move(f));
|
repair_results.push_back(std::move(f));
|
||||||
@@ -1524,14 +1526,15 @@ future<> repair_abort_all(seastar::sharded<database>& db) {
|
|||||||
future<> sync_data_using_repair(seastar::sharded<database>& db,
|
future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||||
sstring keyspace,
|
sstring keyspace,
|
||||||
dht::token_range_vector ranges,
|
dht::token_range_vector ranges,
|
||||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors) {
|
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||||
|
streaming::stream_reason reason) {
|
||||||
if (ranges.empty()) {
|
if (ranges.empty()) {
|
||||||
return make_ready_future<>();
|
return make_ready_future<>();
|
||||||
}
|
}
|
||||||
return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
|
return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||||
int id = repair_tracker().next_repair_command();
|
int id = repair_tracker().next_repair_command();
|
||||||
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
|
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
|
||||||
return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
|
return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||||
auto cfs = list_column_families(db.local(), keyspace);
|
auto cfs = list_column_families(db.local(), keyspace);
|
||||||
if (cfs.empty()) {
|
if (cfs.empty()) {
|
||||||
rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace);
|
rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace);
|
||||||
@@ -1540,12 +1543,12 @@ future<> sync_data_using_repair(seastar::sharded<database>& db,
|
|||||||
std::vector<future<>> repair_results;
|
std::vector<future<>> repair_results;
|
||||||
repair_results.reserve(smp::count);
|
repair_results.reserve(smp::count);
|
||||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||||
auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors] (database& localdb) mutable {
|
auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors, reason] (database& localdb) mutable {
|
||||||
auto data_centers = std::vector<sstring>();
|
auto data_centers = std::vector<sstring>();
|
||||||
auto hosts = std::vector<sstring>();
|
auto hosts = std::vector<sstring>();
|
||||||
auto ri = make_lw_shared<repair_info>(service::get_local_storage_service().db(),
|
auto ri = make_lw_shared<repair_info>(service::get_local_storage_service().db(),
|
||||||
std::move(keyspace), std::move(ranges), std::move(cfs),
|
std::move(keyspace), std::move(ranges), std::move(cfs),
|
||||||
id, std::move(data_centers), std::move(hosts));
|
id, std::move(data_centers), std::move(hosts), reason);
|
||||||
ri->neighbors = std::move(neighbors);
|
ri->neighbors = std::move(neighbors);
|
||||||
return repair_ranges(ri);
|
return repair_ranges(ri);
|
||||||
});
|
});
|
||||||
@@ -1584,6 +1587,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
|
|||||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||||
rlogger.info("bootstrap_with_repair: started with keyspaces={}", keyspaces);
|
rlogger.info("bootstrap_with_repair: started with keyspaces={}", keyspaces);
|
||||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||||
|
auto reason = streaming::stream_reason::bootstrap;
|
||||||
for (auto& keyspace_name : keyspaces) {
|
for (auto& keyspace_name : keyspaces) {
|
||||||
if (!db.local().has_keyspace(keyspace_name)) {
|
if (!db.local().has_keyspace(keyspace_name)) {
|
||||||
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
|
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
|
||||||
@@ -1716,7 +1720,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
auto nr_ranges = desired_ranges.size();
|
auto nr_ranges = desired_ranges.size();
|
||||||
sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources)).get();
|
sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason).get();
|
||||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
||||||
}
|
}
|
||||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
|
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
|
||||||
@@ -1730,6 +1734,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db,
|
|||||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||||
bool is_removenode = myip != leaving_node;
|
bool is_removenode = myip != leaving_node;
|
||||||
auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
|
auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
|
||||||
|
streaming::stream_reason reason = is_removenode ? streaming::stream_reason::removenode : streaming::stream_reason::decommission;
|
||||||
rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node);
|
rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node);
|
||||||
for (auto& keyspace_name : keyspaces) {
|
for (auto& keyspace_name : keyspaces) {
|
||||||
if (!db.local().has_keyspace(keyspace_name)) {
|
if (!db.local().has_keyspace(keyspace_name)) {
|
||||||
@@ -1867,7 +1872,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db,
|
|||||||
ranges.swap(ranges_for_removenode);
|
ranges.swap(ranges_for_removenode);
|
||||||
}
|
}
|
||||||
auto nr_ranges_synced = ranges.size();
|
auto nr_ranges_synced = ranges.size();
|
||||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
|
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
||||||
}
|
}
|
||||||
@@ -1883,8 +1888,8 @@ future<> removenode_with_repair(seastar::sharded<database>& db, locator::token_m
|
|||||||
return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node));
|
return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node));
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring op, sstring source_dc) {
|
future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring op, sstring source_dc, streaming::stream_reason reason) {
|
||||||
return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op)] () mutable {
|
return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op), reason] () mutable {
|
||||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||||
rlogger.info("{}: started with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
rlogger.info("{}: started with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||||
@@ -1921,7 +1926,7 @@ future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
auto nr_ranges = ranges.size();
|
auto nr_ranges = ranges.size();
|
||||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
|
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
||||||
}
|
}
|
||||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||||
@@ -1933,11 +1938,13 @@ future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_meta
|
|||||||
if (source_dc.empty()) {
|
if (source_dc.empty()) {
|
||||||
source_dc = get_local_dc();
|
source_dc = get_local_dc();
|
||||||
}
|
}
|
||||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
|
auto reason = streaming::stream_reason::rebuild;
|
||||||
|
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm) {
|
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm) {
|
||||||
auto op = sstring("replace_with_repair");
|
auto op = sstring("replace_with_repair");
|
||||||
auto source_dc = get_local_dc();
|
auto source_dc = get_local_dc();
|
||||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
|
auto reason = streaming::stream_reason::bootstrap;
|
||||||
|
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -181,6 +181,7 @@ public:
|
|||||||
shard_id shard;
|
shard_id shard;
|
||||||
std::vector<sstring> data_centers;
|
std::vector<sstring> data_centers;
|
||||||
std::vector<sstring> hosts;
|
std::vector<sstring> hosts;
|
||||||
|
streaming::stream_reason reason;
|
||||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
|
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
|
||||||
size_t nr_failed_ranges = 0;
|
size_t nr_failed_ranges = 0;
|
||||||
bool aborted = false;
|
bool aborted = false;
|
||||||
@@ -211,7 +212,8 @@ public:
|
|||||||
const std::vector<sstring>& cfs_,
|
const std::vector<sstring>& cfs_,
|
||||||
int id_,
|
int id_,
|
||||||
const std::vector<sstring>& data_centers_,
|
const std::vector<sstring>& data_centers_,
|
||||||
const std::vector<sstring>& hosts_);
|
const std::vector<sstring>& hosts_,
|
||||||
|
streaming::stream_reason reason_);
|
||||||
future<> do_streaming();
|
future<> do_streaming();
|
||||||
void check_failed_ranges();
|
void check_failed_ranges();
|
||||||
future<> request_transfer_ranges(const sstring& cf,
|
future<> request_transfer_ranges(const sstring& cf,
|
||||||
|
|||||||
@@ -443,7 +443,7 @@ class repair_writer {
|
|||||||
uint64_t _estimated_partitions;
|
uint64_t _estimated_partitions;
|
||||||
size_t _nr_peer_nodes;
|
size_t _nr_peer_nodes;
|
||||||
// Needs more than one for repair master
|
// Needs more than one for repair master
|
||||||
std::vector<std::optional<future<uint64_t>>> _writer_done;
|
std::vector<std::optional<future<>>> _writer_done;
|
||||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||||
// Current partition written to disk
|
// Current partition written to disk
|
||||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||||
@@ -451,14 +451,18 @@ class repair_writer {
|
|||||||
// partition_start is written and is closed when a partition_end is
|
// partition_start is written and is closed when a partition_end is
|
||||||
// written.
|
// written.
|
||||||
std::vector<bool> _partition_opened;
|
std::vector<bool> _partition_opened;
|
||||||
|
streaming::stream_reason _reason;
|
||||||
|
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||||
public:
|
public:
|
||||||
repair_writer(
|
repair_writer(
|
||||||
schema_ptr schema,
|
schema_ptr schema,
|
||||||
uint64_t estimated_partitions,
|
uint64_t estimated_partitions,
|
||||||
size_t nr_peer_nodes)
|
size_t nr_peer_nodes,
|
||||||
|
streaming::stream_reason reason)
|
||||||
: _schema(std::move(schema))
|
: _schema(std::move(schema))
|
||||||
, _estimated_partitions(estimated_partitions)
|
, _estimated_partitions(estimated_partitions)
|
||||||
, _nr_peer_nodes(nr_peer_nodes) {
|
, _nr_peer_nodes(nr_peer_nodes)
|
||||||
|
, _reason(reason) {
|
||||||
init_writer();
|
init_writer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -495,9 +499,9 @@ public:
|
|||||||
table& t = db.local().find_column_family(_schema->id());
|
table& t = db.local().find_column_family(_schema->id());
|
||||||
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema,
|
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema,
|
||||||
make_generating_reader(_schema, std::move(get_next_mutation_fragment)),
|
make_generating_reader(_schema, std::move(get_next_mutation_fragment)),
|
||||||
[&db, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
|
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
|
||||||
auto& t = db.local().find_column_family(reader.schema());
|
auto& t = db.local().find_column_family(reader.schema());
|
||||||
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, streaming::stream_reason::repair).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
|
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
|
||||||
//FIXME: for better estimations this should be transmitted from remote
|
//FIXME: for better estimations this should be transmitted from remote
|
||||||
auto metadata = mutation_source_metadata{};
|
auto metadata = mutation_source_metadata{};
|
||||||
auto& cs = t->get_compaction_strategy();
|
auto& cs = t->get_compaction_strategy();
|
||||||
@@ -523,7 +527,15 @@ public:
|
|||||||
return consumer(std::move(reader));
|
return consumer(std::move(reader));
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
t.stream_in_progress());
|
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||||
|
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||||
|
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||||
|
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||||
|
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||||
|
_schema->ks_name(), _schema->cf_name(), ep);
|
||||||
|
_mq[node_idx]->abort(ep);
|
||||||
|
return make_exception_future<>(std::move(ep));
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> write_partition_end(unsigned node_idx) {
|
future<> write_partition_end(unsigned node_idx) {
|
||||||
@@ -550,23 +562,41 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<> write_end_of_stream(unsigned node_idx) {
|
||||||
|
if (_mq[node_idx]) {
|
||||||
|
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||||
|
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||||
|
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||||
|
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||||
|
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
return make_ready_future<>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
future<> do_wait_for_writer_done(unsigned node_idx) {
|
||||||
|
if (_writer_done[node_idx]) {
|
||||||
|
return std::move(*(_writer_done[node_idx]));
|
||||||
|
} else {
|
||||||
|
return make_ready_future<>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
future<> wait_for_writer_done() {
|
future<> wait_for_writer_done() {
|
||||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
|
||||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
}).handle_exception([this] (std::exception_ptr ep) {
|
||||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
_schema->ks_name(), _schema->cf_name(), ep);
|
||||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
return make_exception_future<>(std::move(ep));
|
||||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
|
||||||
rlogger.debug("Managed to write partitions={} to sstable", partitions);
|
|
||||||
return make_ready_future<>();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return make_ready_future<>();
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
named_semaphore& sem() {
|
||||||
|
return _sem;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
class repair_meta {
|
class repair_meta {
|
||||||
@@ -590,6 +620,7 @@ private:
|
|||||||
repair_master _repair_master;
|
repair_master _repair_master;
|
||||||
gms::inet_address _myip;
|
gms::inet_address _myip;
|
||||||
uint32_t _repair_meta_id;
|
uint32_t _repair_meta_id;
|
||||||
|
streaming::stream_reason _reason;
|
||||||
// Repair master's sharding configuration
|
// Repair master's sharding configuration
|
||||||
shard_config _master_node_shard_config;
|
shard_config _master_node_shard_config;
|
||||||
// Partitioner of repair master
|
// Partitioner of repair master
|
||||||
@@ -653,6 +684,7 @@ public:
|
|||||||
uint64_t seed,
|
uint64_t seed,
|
||||||
repair_master master,
|
repair_master master,
|
||||||
uint32_t repair_meta_id,
|
uint32_t repair_meta_id,
|
||||||
|
streaming::stream_reason reason,
|
||||||
shard_config master_node_shard_config,
|
shard_config master_node_shard_config,
|
||||||
size_t nr_peer_nodes = 1)
|
size_t nr_peer_nodes = 1)
|
||||||
: _db(db)
|
: _db(db)
|
||||||
@@ -666,6 +698,7 @@ public:
|
|||||||
, _repair_master(master)
|
, _repair_master(master)
|
||||||
, _myip(utils::fb_utilities::get_broadcast_address())
|
, _myip(utils::fb_utilities::get_broadcast_address())
|
||||||
, _repair_meta_id(repair_meta_id)
|
, _repair_meta_id(repair_meta_id)
|
||||||
|
, _reason(reason)
|
||||||
, _master_node_shard_config(std::move(master_node_shard_config))
|
, _master_node_shard_config(std::move(master_node_shard_config))
|
||||||
, _remote_partitioner(make_remote_partitioner())
|
, _remote_partitioner(make_remote_partitioner())
|
||||||
, _same_sharding_config(is_same_sharding_config())
|
, _same_sharding_config(is_same_sharding_config())
|
||||||
@@ -681,7 +714,7 @@ public:
|
|||||||
_seed,
|
_seed,
|
||||||
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
|
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
|
||||||
)
|
)
|
||||||
, _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes)
|
, _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes, _reason)
|
||||||
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
|
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
|
||||||
[] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
|
[] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
|
||||||
return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
|
return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
|
||||||
@@ -731,7 +764,8 @@ public:
|
|||||||
uint64_t max_row_buf_size,
|
uint64_t max_row_buf_size,
|
||||||
uint64_t seed,
|
uint64_t seed,
|
||||||
shard_config master_node_shard_config,
|
shard_config master_node_shard_config,
|
||||||
table_schema_version schema_version) {
|
table_schema_version schema_version,
|
||||||
|
streaming::stream_reason reason) {
|
||||||
return service::get_schema_for_write(schema_version, {from, src_cpu_id}).then([from,
|
return service::get_schema_for_write(schema_version, {from, src_cpu_id}).then([from,
|
||||||
repair_meta_id,
|
repair_meta_id,
|
||||||
range,
|
range,
|
||||||
@@ -739,7 +773,8 @@ public:
|
|||||||
max_row_buf_size,
|
max_row_buf_size,
|
||||||
seed,
|
seed,
|
||||||
master_node_shard_config,
|
master_node_shard_config,
|
||||||
schema_version] (schema_ptr s) {
|
schema_version,
|
||||||
|
reason] (schema_ptr s) {
|
||||||
auto& db = service::get_local_storage_proxy().get_db();
|
auto& db = service::get_local_storage_proxy().get_db();
|
||||||
auto& cf = db.local().find_column_family(s->id());
|
auto& cf = db.local().find_column_family(s->id());
|
||||||
node_repair_meta_id id{from, repair_meta_id};
|
node_repair_meta_id id{from, repair_meta_id};
|
||||||
@@ -752,6 +787,7 @@ public:
|
|||||||
seed,
|
seed,
|
||||||
repair_meta::repair_master::no,
|
repair_meta::repair_master::no,
|
||||||
repair_meta_id,
|
repair_meta_id,
|
||||||
|
reason,
|
||||||
std::move(master_node_shard_config));
|
std::move(master_node_shard_config));
|
||||||
bool insertion = repair_meta_map().emplace(id, rm).second;
|
bool insertion = repair_meta_map().emplace(id, rm).second;
|
||||||
if (!insertion) {
|
if (!insertion) {
|
||||||
@@ -1166,6 +1202,23 @@ private:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||||
|
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||||
|
_repair_writer.create_writer(_db, node_idx);
|
||||||
|
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||||
|
if (update_buf) {
|
||||||
|
_working_row_buf_combined_hash.add(r.hash());
|
||||||
|
}
|
||||||
|
// The repair_row here is supposed to have
|
||||||
|
// mutation_fragment attached because we have stored it in
|
||||||
|
// to_repair_rows_list above where the repair_row is created.
|
||||||
|
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||||
|
auto dk_with_hash = r.get_dk_with_hash();
|
||||||
|
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||||
// Must run inside a seastar thread
|
// Must run inside a seastar thread
|
||||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||||
@@ -1191,18 +1244,7 @@ private:
|
|||||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||||
}
|
}
|
||||||
_repair_writer.create_writer(_db, node_idx);
|
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||||
for (auto& r : row_diff) {
|
|
||||||
if (update_buf) {
|
|
||||||
_working_row_buf_combined_hash.add(r.hash());
|
|
||||||
}
|
|
||||||
// The repair_row here is supposed to have
|
|
||||||
// mutation_fragment attached because we have stored it in
|
|
||||||
// to_repair_rows_list above where the repair_row is created.
|
|
||||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
|
||||||
auto dk_with_hash = r.get_dk_with_hash();
|
|
||||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
future<>
|
future<>
|
||||||
@@ -1213,15 +1255,7 @@ private:
|
|||||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||||
unsigned node_idx = 0;
|
unsigned node_idx = 0;
|
||||||
_repair_writer.create_writer(_db, node_idx);
|
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
|
||||||
// The repair_row here is supposed to have
|
|
||||||
// mutation_fragment attached because we have stored it in
|
|
||||||
// to_repair_rows_list above where the repair_row is created.
|
|
||||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
|
||||||
auto dk_with_hash = r.get_dk_with_hash();
|
|
||||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -1412,28 +1446,28 @@ public:
|
|||||||
|
|
||||||
// RPC API
|
// RPC API
|
||||||
future<>
|
future<>
|
||||||
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version) {
|
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version, streaming::stream_reason reason) {
|
||||||
if (remote_node == _myip) {
|
if (remote_node == _myip) {
|
||||||
return make_ready_future<>();
|
return make_ready_future<>();
|
||||||
}
|
}
|
||||||
stats().rpc_call_nr++;
|
stats().rpc_call_nr++;
|
||||||
return netw::get_local_messaging_service().send_repair_row_level_start(msg_addr(remote_node),
|
return netw::get_local_messaging_service().send_repair_row_level_start(msg_addr(remote_node),
|
||||||
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), _algo, _max_row_buf_size, _seed,
|
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), _algo, _max_row_buf_size, _seed,
|
||||||
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, _master_node_shard_config.partitioner_name, std::move(schema_version));
|
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, _master_node_shard_config.partitioner_name, std::move(schema_version), reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
// RPC handler
|
// RPC handler
|
||||||
static future<>
|
static future<>
|
||||||
repair_row_level_start_handler(gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
|
repair_row_level_start_handler(gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
|
||||||
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
|
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
|
||||||
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version) {
|
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason) {
|
||||||
if (!_sys_dist_ks->local_is_initialized() || !_view_update_generator->local_is_initialized()) {
|
if (!_sys_dist_ks->local_is_initialized() || !_view_update_generator->local_is_initialized()) {
|
||||||
return make_exception_future<>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
|
return make_exception_future<>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
|
||||||
utils::fb_utilities::get_broadcast_address())));
|
utils::fb_utilities::get_broadcast_address())));
|
||||||
}
|
}
|
||||||
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
|
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
|
||||||
utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
|
utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
|
||||||
return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version));
|
return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
// RPC API
|
// RPC API
|
||||||
@@ -1904,22 +1938,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
|||||||
current_set_diff,
|
current_set_diff,
|
||||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||||
error = true;
|
error = true;
|
||||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||||
return sink.close();
|
|
||||||
}).then([sink] {
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
if (error) {
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
||||||
}
|
|
||||||
return sink.close().then([sink] {
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
}).finally([sink] () mutable {
|
||||||
|
return sink.close().finally([sink] { });
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1945,22 +1974,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
|||||||
current_rows,
|
current_rows,
|
||||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||||
error = true;
|
error = true;
|
||||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
return sink(repair_stream_cmd::error).then([] {
|
||||||
return sink.close();
|
|
||||||
}).then([sink] {
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
if (error) {
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
||||||
}
|
|
||||||
return sink.close().then([sink] {
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
}).finally([sink] () mutable {
|
||||||
|
return sink.close().finally([sink] { });
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1985,22 +2009,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
|||||||
error,
|
error,
|
||||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||||
error = true;
|
error = true;
|
||||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||||
return sink.close();
|
|
||||||
}).then([sink] {
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
if (error) {
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
||||||
}
|
|
||||||
return sink.close().then([sink] {
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
}).finally([sink] () mutable {
|
||||||
|
return sink.close().finally([sink] { });
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2104,15 +2123,16 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
|||||||
});
|
});
|
||||||
ms.register_repair_row_level_start([] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
ms.register_repair_row_level_start([] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
||||||
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
|
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
|
||||||
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version) {
|
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason) {
|
||||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||||
return smp::submit_to(src_cpu_id % smp::count, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
|
return smp::submit_to(src_cpu_id % smp::count, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
|
||||||
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, remote_partitioner_name, schema_version] () mutable {
|
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, remote_partitioner_name, schema_version, reason] () mutable {
|
||||||
|
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
|
||||||
return repair_meta::repair_row_level_start_handler(from, src_cpu_id, repair_meta_id, std::move(ks_name),
|
return repair_meta::repair_row_level_start_handler(from, src_cpu_id, repair_meta_id, std::move(ks_name),
|
||||||
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
|
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
|
||||||
shard_config{remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name)},
|
shard_config{remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name)},
|
||||||
schema_version);
|
schema_version, r);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
ms.register_repair_row_level_stop([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
ms.register_repair_row_level_stop([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||||
@@ -2442,6 +2462,7 @@ public:
|
|||||||
_seed,
|
_seed,
|
||||||
repair_meta::repair_master::yes,
|
repair_meta::repair_master::yes,
|
||||||
repair_meta_id,
|
repair_meta_id,
|
||||||
|
_ri.reason,
|
||||||
std::move(master_node_shard_config),
|
std::move(master_node_shard_config),
|
||||||
_all_live_peer_nodes.size());
|
_all_live_peer_nodes.size());
|
||||||
|
|
||||||
@@ -2456,7 +2477,7 @@ public:
|
|||||||
nodes_to_stop.reserve(_all_nodes.size());
|
nodes_to_stop.reserve(_all_nodes.size());
|
||||||
try {
|
try {
|
||||||
parallel_for_each(_all_nodes, [&, this] (const gms::inet_address& node) {
|
parallel_for_each(_all_nodes, [&, this] (const gms::inet_address& node) {
|
||||||
return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version).then([&] () {
|
return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version, _ri.reason).then([&] () {
|
||||||
nodes_to_stop.push_back(node);
|
nodes_to_stop.push_back(node);
|
||||||
return master.repair_get_estimated_partitions(node).then([this, node] (uint64_t partitions) {
|
return master.repair_get_estimated_partitions(node).then([this, node] (uint64_t partitions) {
|
||||||
rlogger.trace("Get repair_get_estimated_partitions for node={}, estimated_partitions={}", node, partitions);
|
rlogger.trace("Get repair_get_estimated_partitions for node={}, estimated_partitions={}", node, partitions);
|
||||||
|
|||||||
20
row_cache.cc
20
row_cache.cc
@@ -528,8 +528,12 @@ public:
|
|||||||
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
|
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
|
||||||
{
|
{
|
||||||
if (!mfopt) {
|
if (!mfopt) {
|
||||||
this->handle_end_of_stream();
|
return _cache._read_section(_cache._tracker.region(), [&] {
|
||||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
return with_linearized_managed_bytes([&] {
|
||||||
|
this->handle_end_of_stream();
|
||||||
|
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
_cache.on_partition_miss();
|
_cache.on_partition_miss();
|
||||||
const partition_start& ps = mfopt->as_partition_start();
|
const partition_start& ps = mfopt->as_partition_start();
|
||||||
@@ -952,13 +956,15 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
|||||||
// expensive and we need to amortize it somehow.
|
// expensive and we need to amortize it somehow.
|
||||||
do {
|
do {
|
||||||
STAP_PROBE(scylla, row_cache_update_partition_start);
|
STAP_PROBE(scylla, row_cache_update_partition_start);
|
||||||
with_linearized_managed_bytes([&] {
|
{
|
||||||
if (!update) {
|
if (!update) {
|
||||||
_update_section(_tracker.region(), [&] {
|
_update_section(_tracker.region(), [&] {
|
||||||
|
with_linearized_managed_bytes([&] {
|
||||||
memtable_entry& mem_e = *m.partitions.begin();
|
memtable_entry& mem_e = *m.partitions.begin();
|
||||||
size_entry = mem_e.size_in_allocator_without_rows(_tracker.allocator());
|
size_entry = mem_e.size_in_allocator_without_rows(_tracker.allocator());
|
||||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
||||||
update = updater(_update_section, cache_i, mem_e, is_present, real_dirty_acc);
|
update = updater(_update_section, cache_i, mem_e, is_present, real_dirty_acc);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// We use cooperative deferring instead of futures so that
|
// We use cooperative deferring instead of futures so that
|
||||||
@@ -970,14 +976,16 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
|||||||
update = {};
|
update = {};
|
||||||
real_dirty_acc.unpin_memory(size_entry);
|
real_dirty_acc.unpin_memory(size_entry);
|
||||||
_update_section(_tracker.region(), [&] {
|
_update_section(_tracker.region(), [&] {
|
||||||
|
with_linearized_managed_bytes([&] {
|
||||||
auto i = m.partitions.begin();
|
auto i = m.partitions.begin();
|
||||||
memtable_entry& mem_e = *i;
|
memtable_entry& mem_e = *i;
|
||||||
m.partitions.erase(i);
|
m.partitions.erase(i);
|
||||||
mem_e.partition().evict(_tracker.memtable_cleaner());
|
mem_e.partition().evict(_tracker.memtable_cleaner());
|
||||||
current_allocator().destroy(&mem_e);
|
current_allocator().destroy(&mem_e);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
++partition_count;
|
++partition_count;
|
||||||
});
|
}
|
||||||
STAP_PROBE(scylla, row_cache_update_partition_end);
|
STAP_PROBE(scylla, row_cache_update_partition_end);
|
||||||
} while (!m.partitions.empty() && !need_preempt());
|
} while (!m.partitions.empty() && !need_preempt());
|
||||||
with_allocator(standard_allocator(), [&] {
|
with_allocator(standard_allocator(), [&] {
|
||||||
@@ -1124,8 +1132,8 @@ future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&
|
|||||||
seastar::thread::maybe_yield();
|
seastar::thread::maybe_yield();
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
auto done = with_linearized_managed_bytes([&] {
|
auto done = _update_section(_tracker.region(), [&] {
|
||||||
return _update_section(_tracker.region(), [&] {
|
return with_linearized_managed_bytes([&] {
|
||||||
auto cmp = cache_entry::compare(_schema);
|
auto cmp = cache_entry::compare(_schema);
|
||||||
auto it = _partitions.lower_bound(*_prev_snapshot_pos, cmp);
|
auto it = _partitions.lower_bound(*_prev_snapshot_pos, cmp);
|
||||||
auto end = _partitions.lower_bound(dht::ring_position_view::for_range_end(range), cmp);
|
auto end = _partitions.lower_bound(dht::ring_position_view::for_range_end(range), cmp);
|
||||||
|
|||||||
@@ -319,10 +319,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
|
|||||||
+ column_offset(column_kind::regular_column),
|
+ column_offset(column_kind::regular_column),
|
||||||
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
||||||
|
|
||||||
std::sort(_raw._columns.begin(),
|
std::stable_sort(_raw._columns.begin(),
|
||||||
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||||
[] (auto x, auto y) { return x.id < y.id; });
|
[] (auto x, auto y) { return x.id < y.id; });
|
||||||
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||||
_raw._columns.begin() + column_offset(column_kind::static_column),
|
_raw._columns.begin() + column_offset(column_kind::static_column),
|
||||||
[] (auto x, auto y) { return x.id < y.id; });
|
[] (auto x, auto y) { return x.id < y.id; });
|
||||||
|
|
||||||
|
|||||||
@@ -33,9 +33,10 @@ import os
|
|||||||
procs = os.sysconf('SC_NPROCESSORS_ONLN')
|
procs = os.sysconf('SC_NPROCESSORS_ONLN')
|
||||||
mem = os.sysconf('SC_PHYS_PAGES') * os.sysconf('SC_PAGESIZE')
|
mem = os.sysconf('SC_PHYS_PAGES') * os.sysconf('SC_PAGESIZE')
|
||||||
|
|
||||||
|
mem_reserve = 1000000000
|
||||||
job_mem = 4000000000
|
job_mem = 4000000000
|
||||||
|
|
||||||
jobs = min(procs, mem // job_mem)
|
jobs = min(procs, (mem-mem_reserve) // job_mem)
|
||||||
jobs = max(jobs, 1)
|
jobs = max(jobs, 1)
|
||||||
|
|
||||||
print(jobs)
|
print(jobs)
|
||||||
|
|||||||
2
seastar
2
seastar
Submodule seastar updated: 92c488706c...0dc0fec831
@@ -190,4 +190,11 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
|
||||||
|
tracing::trace_state_ptr tr_state) {
|
||||||
|
logger.debug("Delete paxos state for ballot {}", ballot);
|
||||||
|
tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot);
|
||||||
|
return db::system_keyspace::delete_paxos_decision(*schema, key, ballot, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
} // end of namespace "service::paxos"
|
} // end of namespace "service::paxos"
|
||||||
|
|||||||
@@ -124,6 +124,9 @@ public:
|
|||||||
clock_type::time_point timeout);
|
clock_type::time_point timeout);
|
||||||
// Replica RPC endpoint for Paxos "learn".
|
// Replica RPC endpoint for Paxos "learn".
|
||||||
static future<> learn(schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
|
static future<> learn(schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
|
||||||
|
// Replica RPC endpoint for pruning Paxos table
|
||||||
|
static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
|
||||||
|
tracing::trace_state_ptr tr_state);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end of namespace "service::paxos"
|
} // end of namespace "service::paxos"
|
||||||
|
|||||||
@@ -171,6 +171,7 @@ public:
|
|||||||
const schema_ptr& schema() {
|
const schema_ptr& schema() {
|
||||||
return _schema;
|
return _schema;
|
||||||
}
|
}
|
||||||
|
// called only when all replicas replied
|
||||||
virtual void release_mutation() = 0;
|
virtual void release_mutation() = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -300,9 +301,10 @@ public:
|
|||||||
|
|
||||||
class cas_mutation : public mutation_holder {
|
class cas_mutation : public mutation_holder {
|
||||||
lw_shared_ptr<paxos::proposal> _proposal;
|
lw_shared_ptr<paxos::proposal> _proposal;
|
||||||
|
shared_ptr<paxos_response_handler> _handler;
|
||||||
public:
|
public:
|
||||||
explicit cas_mutation(paxos::proposal proposal , schema_ptr s)
|
explicit cas_mutation(paxos::proposal proposal, schema_ptr s, shared_ptr<paxos_response_handler> handler)
|
||||||
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))) {
|
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))), _handler(std::move(handler)) {
|
||||||
_size = _proposal->update.representation().size();
|
_size = _proposal->update.representation().size();
|
||||||
_schema = std::move(s);
|
_schema = std::move(s);
|
||||||
}
|
}
|
||||||
@@ -327,7 +329,11 @@ public:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
virtual void release_mutation() override {
|
virtual void release_mutation() override {
|
||||||
_proposal.release();
|
// The handler will be set for "learn", but not for PAXOS repair
|
||||||
|
// since repair may not include all replicas
|
||||||
|
if (_handler) {
|
||||||
|
_handler->prune(_proposal->ballot);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1184,6 +1190,12 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
|
|||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// debug output in mutate_internal needs this
|
||||||
|
std::ostream& operator<<(std::ostream& os, const paxos_response_handler& h) {
|
||||||
|
os << "paxos_response_handler{" << h.id() << "}";
|
||||||
|
return os;
|
||||||
|
}
|
||||||
|
|
||||||
// This function implements learning stage of Paxos protocol
|
// This function implements learning stage of Paxos protocol
|
||||||
future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool allow_hints) {
|
future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool allow_hints) {
|
||||||
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", decision, _cl_for_learn);
|
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", decision, _cl_for_learn);
|
||||||
@@ -1219,12 +1231,41 @@ future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool a
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Path for the "base" mutations
|
// Path for the "base" mutations
|
||||||
std::array<std::tuple<paxos::proposal, schema_ptr, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, _key.token())};
|
std::array<std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
|
||||||
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
|
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
|
||||||
|
|
||||||
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
|
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void paxos_response_handler::prune(utils::UUID ballot) {
|
||||||
|
if (_has_dead_endpoints) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ( _proxy->get_stats().cas_now_pruning >= pruning_limit) {
|
||||||
|
_proxy->get_stats().cas_coordinator_dropped_prune++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_proxy->get_stats().cas_now_pruning++;
|
||||||
|
_proxy->get_stats().cas_prune++;
|
||||||
|
// running in the background, but the amount of the bg job is limited by pruning_limit
|
||||||
|
// it is waited by holding shared pointer to storage_proxy which guaranties
|
||||||
|
// that storage_proxy::stop() will wait for this to complete
|
||||||
|
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
|
||||||
|
return futurize_apply([&] {
|
||||||
|
if (fbu::is_me(peer)) {
|
||||||
|
tracing::trace(tr_state, "prune: prune {} locally", ballot);
|
||||||
|
return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state);
|
||||||
|
} else {
|
||||||
|
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
|
||||||
|
netw::messaging_service& ms = netw::get_local_messaging_service();
|
||||||
|
return ms.send_paxos_prune(peer, _timeout, _schema->version(), _key.key(), ballot, tracing::make_trace_info(tr_state));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).finally([h = shared_from_this()] {
|
||||||
|
h->_proxy->get_stats().cas_now_pruning--;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
static std::vector<gms::inet_address>
|
static std::vector<gms::inet_address>
|
||||||
replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
|
replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
|
||||||
std::vector<gms::inet_address> endpoints;
|
std::vector<gms::inet_address> endpoints;
|
||||||
@@ -1571,6 +1612,14 @@ void storage_proxy_stats::stats::register_stats() {
|
|||||||
sm::make_histogram("cas_write_contention", sm::description("how many contended writes were encountered"),
|
sm::make_histogram("cas_write_contention", sm::description("how many contended writes were encountered"),
|
||||||
{storage_proxy_stats::current_scheduling_group_label()},
|
{storage_proxy_stats::current_scheduling_group_label()},
|
||||||
[this]{ return cas_write_contention.get_histogram(1, 8);}),
|
[this]{ return cas_write_contention.get_histogram(1, 8);}),
|
||||||
|
|
||||||
|
sm::make_total_operations("cas_prune", cas_prune,
|
||||||
|
sm::description("how many times paxos prune was done after successful cas operation"),
|
||||||
|
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||||
|
|
||||||
|
sm::make_total_operations("cas_dropped_prune", cas_coordinator_dropped_prune,
|
||||||
|
sm::description("how many times a coordinator did not perfom prune after cas"),
|
||||||
|
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||||
});
|
});
|
||||||
|
|
||||||
_metrics.add_group(REPLICA_STATS_CATEGORY, {
|
_metrics.add_group(REPLICA_STATS_CATEGORY, {
|
||||||
@@ -1606,6 +1655,9 @@ void storage_proxy_stats::stats::register_stats() {
|
|||||||
sm::description("number of operations that crossed a shard boundary"),
|
sm::description("number of operations that crossed a shard boundary"),
|
||||||
{storage_proxy_stats::current_scheduling_group_label()}),
|
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||||
|
|
||||||
|
sm::make_total_operations("cas_dropped_prune", cas_replica_dropped_prune,
|
||||||
|
sm::description("how many times a coordinator did not perfom prune after cas"),
|
||||||
|
{storage_proxy_stats::current_scheduling_group_label()}),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1879,11 +1931,11 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
|
|||||||
}
|
}
|
||||||
|
|
||||||
storage_proxy::response_id_type
|
storage_proxy::response_id_type
|
||||||
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& meta,
|
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& meta,
|
||||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
|
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
|
||||||
auto& [commit, s, t] = meta;
|
auto& [commit, s, h, t] = meta;
|
||||||
|
|
||||||
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s), cl,
|
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
|
||||||
db::write_type::CAS, tr_state, std::move(permit));
|
db::write_type::CAS, tr_state, std::move(permit));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1898,7 +1950,7 @@ storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, s
|
|||||||
auto keyspace_name = s->ks_name();
|
auto keyspace_name = s->ks_name();
|
||||||
keyspace& ks = _db.local().find_keyspace(keyspace_name);
|
keyspace& ks = _db.local().find_keyspace(keyspace_name);
|
||||||
|
|
||||||
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s), std::move(endpoints),
|
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
|
||||||
std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
|
std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2146,6 +2198,8 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
|||||||
cl_for_paxos, participants + 1, live_endpoints.size());
|
cl_for_paxos, participants + 1, live_endpoints.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool dead = participants != live_endpoints.size();
|
||||||
|
|
||||||
// Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key.
|
// Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key.
|
||||||
// If the values received from different replicas match, we skip a separate query stage thus saving
|
// If the values received from different replicas match, we skip a separate query stage thus saving
|
||||||
// one network round trip. To generate less traffic, only closest replicas send data, others send
|
// one network round trip. To generate less traffic, only closest replicas send data, others send
|
||||||
@@ -2153,7 +2207,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
|||||||
// list of participants by proximity to this instance.
|
// list of participants by proximity to this instance.
|
||||||
sort_endpoints_by_proximity(live_endpoints);
|
sort_endpoints_by_proximity(live_endpoints);
|
||||||
|
|
||||||
return paxos_participants{std::move(live_endpoints), required_participants};
|
return paxos_participants{std::move(live_endpoints), required_participants, dead};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -3412,7 +3466,9 @@ protected:
|
|||||||
uint32_t original_partition_limit() const {
|
uint32_t original_partition_limit() const {
|
||||||
return _cmd->partition_limit;
|
return _cmd->partition_limit;
|
||||||
}
|
}
|
||||||
|
virtual void adjust_targets_for_reconciliation() {}
|
||||||
void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
|
void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
|
||||||
|
adjust_targets_for_reconciliation();
|
||||||
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
|
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
|
||||||
auto exec = shared_from_this();
|
auto exec = shared_from_this();
|
||||||
|
|
||||||
@@ -3639,6 +3695,9 @@ public:
|
|||||||
virtual void got_cl() override {
|
virtual void got_cl() override {
|
||||||
_speculate_timer.cancel();
|
_speculate_timer.cancel();
|
||||||
}
|
}
|
||||||
|
virtual void adjust_targets_for_reconciliation() override {
|
||||||
|
_targets = used_targets();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
class range_slice_read_executor : public never_speculating_read_executor {
|
class range_slice_read_executor : public never_speculating_read_executor {
|
||||||
@@ -4942,6 +5001,42 @@ void storage_proxy::init_messaging_service() {
|
|||||||
|
|
||||||
return f;
|
return f;
|
||||||
});
|
});
|
||||||
|
ms.register_paxos_prune([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||||
|
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
|
||||||
|
static thread_local uint16_t pruning = 0;
|
||||||
|
static constexpr uint16_t pruning_limit = 1000; // since PRUNE verb is one way replica side has its own queue limit
|
||||||
|
auto src_addr = netw::messaging_service::get_source(cinfo);
|
||||||
|
auto src_ip = src_addr.addr;
|
||||||
|
tracing::trace_state_ptr tr_state;
|
||||||
|
if (trace_info) {
|
||||||
|
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
|
||||||
|
tracing::begin(tr_state);
|
||||||
|
tracing::trace(tr_state, "paxos_prune: message received from /{} ballot {}", src_ip, ballot);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pruning >= pruning_limit) {
|
||||||
|
get_stats().cas_replica_dropped_prune++;
|
||||||
|
tracing::trace(tr_state, "paxos_prune: do not prune due to overload", src_ip);
|
||||||
|
return make_ready_future<seastar::rpc::no_wait_type>(netw::messaging_service::no_wait());
|
||||||
|
}
|
||||||
|
|
||||||
|
pruning++;
|
||||||
|
return get_schema_for_read(schema_id, src_addr).then([this, key = std::move(key), ballot,
|
||||||
|
timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
|
||||||
|
dht::token token = dht::get_token(*schema, key);
|
||||||
|
unsigned shard = dht::shard_of(*schema, token);
|
||||||
|
bool local = shard == engine().cpu_id();
|
||||||
|
get_stats().replica_cross_shard_ops += !local;
|
||||||
|
return smp::submit_to(shard, _write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
|
||||||
|
local, key = std::move(key), ballot, timeout, src_ip, d = defer([] { pruning--; })] () {
|
||||||
|
tracing::trace_state_ptr tr_state = gt;
|
||||||
|
return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
|
||||||
|
tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip);
|
||||||
|
return netw::messaging_service::no_wait();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> storage_proxy::uninit_messaging_service() {
|
future<> storage_proxy::uninit_messaging_service() {
|
||||||
@@ -4956,7 +5051,8 @@ future<> storage_proxy::uninit_messaging_service() {
|
|||||||
ms.unregister_truncate(),
|
ms.unregister_truncate(),
|
||||||
ms.unregister_paxos_prepare(),
|
ms.unregister_paxos_prepare(),
|
||||||
ms.unregister_paxos_accept(),
|
ms.unregister_paxos_accept(),
|
||||||
ms.unregister_paxos_learn()
|
ms.unregister_paxos_learn(),
|
||||||
|
ms.unregister_paxos_prune()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -242,6 +242,7 @@ public:
|
|||||||
std::vector<gms::inet_address> endpoints;
|
std::vector<gms::inet_address> endpoints;
|
||||||
// How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL).
|
// How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL).
|
||||||
size_t required_participants;
|
size_t required_participants;
|
||||||
|
bool has_dead_endpoints;
|
||||||
};
|
};
|
||||||
|
|
||||||
const gms::feature_service& features() const { return _features; }
|
const gms::feature_service& features() const { return _features; }
|
||||||
@@ -317,7 +318,7 @@ private:
|
|||||||
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||||
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||||
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& proposal,
|
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
|
||||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||||
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
|
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
|
||||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||||
@@ -634,6 +635,11 @@ private:
|
|||||||
db::consistency_level _cl_for_learn;
|
db::consistency_level _cl_for_learn;
|
||||||
// Live endpoints, as per get_paxos_participants()
|
// Live endpoints, as per get_paxos_participants()
|
||||||
std::vector<gms::inet_address> _live_endpoints;
|
std::vector<gms::inet_address> _live_endpoints;
|
||||||
|
// True if there are dead endpoints
|
||||||
|
// We don't include endpoints known to be unavailable in pending
|
||||||
|
// endpoints list, but need to be aware of them to avoid pruning
|
||||||
|
// system.paxos data if some endpoint is missing a Paxos write.
|
||||||
|
bool _has_dead_endpoints;
|
||||||
// How many endpoints need to respond favourably for the protocol to progress to the next step.
|
// How many endpoints need to respond favourably for the protocol to progress to the next step.
|
||||||
size_t _required_participants;
|
size_t _required_participants;
|
||||||
// A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms
|
// A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms
|
||||||
@@ -651,6 +657,9 @@ private:
|
|||||||
// Unique request id for logging purposes.
|
// Unique request id for logging purposes.
|
||||||
const uint64_t _id = next_id++;
|
const uint64_t _id = next_id++;
|
||||||
|
|
||||||
|
// max pruning operations to run in parralel
|
||||||
|
static constexpr uint16_t pruning_limit = 1000;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
tracing::trace_state_ptr tr_state;
|
tracing::trace_state_ptr tr_state;
|
||||||
|
|
||||||
@@ -674,6 +683,7 @@ public:
|
|||||||
storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(_schema->ks_name(), _key.token(), _cl_for_paxos);
|
storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(_schema->ks_name(), _key.token(), _cl_for_paxos);
|
||||||
_live_endpoints = std::move(pp.endpoints);
|
_live_endpoints = std::move(pp.endpoints);
|
||||||
_required_participants = pp.required_participants;
|
_required_participants = pp.required_participants;
|
||||||
|
_has_dead_endpoints = pp.has_dead_endpoints;
|
||||||
tracing::trace(tr_state, "Create paxos_response_handler for token {} with live: {} and required participants: {}",
|
tracing::trace(tr_state, "Create paxos_response_handler for token {} with live: {} and required participants: {}",
|
||||||
_key.token(), _live_endpoints, _required_participants);
|
_key.token(), _live_endpoints, _required_participants);
|
||||||
}
|
}
|
||||||
@@ -691,6 +701,7 @@ public:
|
|||||||
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
|
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
|
||||||
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
|
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
|
||||||
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
|
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
|
||||||
|
void prune(utils::UUID ballot);
|
||||||
uint64_t id() const {
|
uint64_t id() const {
|
||||||
return _id;
|
return _id;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -116,6 +116,11 @@ struct write_stats {
|
|||||||
uint64_t cas_write_condition_not_met = 0;
|
uint64_t cas_write_condition_not_met = 0;
|
||||||
uint64_t cas_write_timeout_due_to_uncertainty = 0;
|
uint64_t cas_write_timeout_due_to_uncertainty = 0;
|
||||||
uint64_t cas_failed_read_round_optimization = 0;
|
uint64_t cas_failed_read_round_optimization = 0;
|
||||||
|
uint16_t cas_now_pruning = 0;
|
||||||
|
uint64_t cas_prune = 0;
|
||||||
|
uint64_t cas_coordinator_dropped_prune = 0;
|
||||||
|
uint64_t cas_replica_dropped_prune = 0;
|
||||||
|
|
||||||
|
|
||||||
std::chrono::microseconds last_mv_flow_control_delay; // delay added for MV flow control in the last request
|
std::chrono::microseconds last_mv_flow_control_delay; // delay added for MV flow control in the last request
|
||||||
public:
|
public:
|
||||||
|
|||||||
@@ -1007,12 +1007,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
|||||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||||
const std::vector<token>& sorted_tokens) const {
|
const std::vector<token>& sorted_tokens) const {
|
||||||
|
sstring ks = keyspace;
|
||||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||||
// non-system keyspace.
|
// non-system keyspace.
|
||||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
if (keyspace == "") {
|
||||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||||
|
if (keyspaces.empty()) {
|
||||||
|
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||||
|
}
|
||||||
|
ks = keyspaces[0];
|
||||||
}
|
}
|
||||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
|
||||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3409,10 +3413,13 @@ void feature_enabled_listener::on_enabled() {
|
|||||||
|
|
||||||
future<> read_sstables_format(distributed<storage_service>& ss) {
|
future<> read_sstables_format(distributed<storage_service>& ss) {
|
||||||
return db::system_keyspace::get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME).then([&ss] (std::optional<sstring> format_opt) {
|
return db::system_keyspace::get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME).then([&ss] (std::optional<sstring> format_opt) {
|
||||||
sstables::sstable_version_types format = sstables::from_string(format_opt.value_or("ka"));
|
if (format_opt) {
|
||||||
return ss.invoke_on_all([format] (storage_service& s) {
|
sstables::sstable_version_types format = sstables::from_string(*format_opt);
|
||||||
s._sstables_format = format;
|
return ss.invoke_on_all([format] (storage_service& s) {
|
||||||
});
|
s._sstables_format = format;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return make_ready_future<>();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -312,7 +312,13 @@ private:
|
|||||||
*/
|
*/
|
||||||
std::optional<db_clock::time_point> _cdc_streams_ts;
|
std::optional<db_clock::time_point> _cdc_streams_ts;
|
||||||
|
|
||||||
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka;
|
// _sstables_format is the format used for writing new sstables.
|
||||||
|
// Here we set its default value, but if we discover that all the nodes
|
||||||
|
// in the cluster support a newer format, _sstables_format will be set to
|
||||||
|
// that format. read_sstables_format() also overwrites _sstables_format
|
||||||
|
// if an sstable format was chosen earlier (and this choice was persisted
|
||||||
|
// in the system table).
|
||||||
|
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::la;
|
||||||
seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}};
|
seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}};
|
||||||
feature_enabled_listener _la_feature_listener;
|
feature_enabled_listener _la_feature_listener;
|
||||||
feature_enabled_listener _mc_feature_listener;
|
feature_enabled_listener _mc_feature_listener;
|
||||||
|
|||||||
@@ -72,47 +72,8 @@ private:
|
|||||||
static std::vector<column_info> build(
|
static std::vector<column_info> build(
|
||||||
const schema& s,
|
const schema& s,
|
||||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||||
bool is_static) {
|
const sstable_enabled_features& features,
|
||||||
std::vector<column_info> cols;
|
bool is_static);
|
||||||
if (s.is_dense()) {
|
|
||||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
|
||||||
cols.push_back(column_info{
|
|
||||||
&col.name(),
|
|
||||||
col.type,
|
|
||||||
col.id,
|
|
||||||
col.type->value_length_if_fixed(),
|
|
||||||
col.is_multi_cell(),
|
|
||||||
col.is_counter(),
|
|
||||||
false
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
cols.reserve(src.size());
|
|
||||||
for (auto&& desc : src) {
|
|
||||||
const bytes& type_name = desc.type_name.value;
|
|
||||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
|
||||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
|
||||||
std::optional<column_id> id;
|
|
||||||
bool schema_mismatch = false;
|
|
||||||
if (def) {
|
|
||||||
id = def->id;
|
|
||||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
|
||||||
def->is_counter() != type->is_counter() ||
|
|
||||||
!def->type->is_value_compatible_with(*type);
|
|
||||||
}
|
|
||||||
cols.push_back(column_info{
|
|
||||||
&desc.name.value,
|
|
||||||
type,
|
|
||||||
id,
|
|
||||||
type->value_length_if_fixed(),
|
|
||||||
type->is_multi_cell(),
|
|
||||||
type->is_counter(),
|
|
||||||
schema_mismatch
|
|
||||||
});
|
|
||||||
}
|
|
||||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
|
||||||
}
|
|
||||||
return cols;
|
|
||||||
}
|
|
||||||
|
|
||||||
utils::UUID schema_uuid;
|
utils::UUID schema_uuid;
|
||||||
std::vector<column_info> regular_schema_columns_from_sstable;
|
std::vector<column_info> regular_schema_columns_from_sstable;
|
||||||
@@ -125,10 +86,10 @@ private:
|
|||||||
state(state&&) = default;
|
state(state&&) = default;
|
||||||
state& operator=(state&&) = default;
|
state& operator=(state&&) = default;
|
||||||
|
|
||||||
state(const schema& s, const serialization_header& header)
|
state(const schema& s, const serialization_header& header, const sstable_enabled_features& features)
|
||||||
: schema_uuid(s.version())
|
: schema_uuid(s.version())
|
||||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, false))
|
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, features, false))
|
||||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, true))
|
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, features, true))
|
||||||
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
|
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
|
||||||
{}
|
{}
|
||||||
};
|
};
|
||||||
@@ -136,9 +97,10 @@ private:
|
|||||||
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
|
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
column_translation get_for_schema(const schema& s, const serialization_header& header) {
|
column_translation get_for_schema(
|
||||||
|
const schema& s, const serialization_header& header, const sstable_enabled_features& features) {
|
||||||
if (s.version() != _state->schema_uuid) {
|
if (s.version() != _state->schema_uuid) {
|
||||||
_state = make_lw_shared(state(s, header));
|
_state = make_lw_shared(state(s, header, features));
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ private:
|
|||||||
} _state = state::START;
|
} _state = state::START;
|
||||||
|
|
||||||
temporary_buffer<char> _key;
|
temporary_buffer<char> _key;
|
||||||
uint32_t _promoted_index_end;
|
uint64_t _promoted_index_end;
|
||||||
uint64_t _position;
|
uint64_t _position;
|
||||||
uint64_t _partition_header_length = 0;
|
uint64_t _partition_header_length = 0;
|
||||||
std::optional<deletion_time> _deletion_time;
|
std::optional<deletion_time> _deletion_time;
|
||||||
|
|||||||
@@ -38,6 +38,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "mp_row_consumer.hh"
|
#include "mp_row_consumer.hh"
|
||||||
|
#include "column_translation.hh"
|
||||||
|
#include "concrete_types.hh"
|
||||||
|
|
||||||
namespace sstables {
|
namespace sstables {
|
||||||
|
|
||||||
@@ -79,4 +81,86 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
|
|||||||
return ccb.build(timestamp);
|
return ccb.build(timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// See #6130.
|
||||||
|
static data_type freeze_types_in_collections(data_type t) {
|
||||||
|
return ::visit(*t, make_visitor(
|
||||||
|
[] (const map_type_impl& typ) -> data_type {
|
||||||
|
return map_type_impl::get_instance(
|
||||||
|
freeze_types_in_collections(typ.get_keys_type()->freeze()),
|
||||||
|
freeze_types_in_collections(typ.get_values_type()->freeze()),
|
||||||
|
typ.is_multi_cell());
|
||||||
|
},
|
||||||
|
[] (const set_type_impl& typ) -> data_type {
|
||||||
|
return set_type_impl::get_instance(
|
||||||
|
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||||
|
typ.is_multi_cell());
|
||||||
|
},
|
||||||
|
[] (const list_type_impl& typ) -> data_type {
|
||||||
|
return list_type_impl::get_instance(
|
||||||
|
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||||
|
typ.is_multi_cell());
|
||||||
|
},
|
||||||
|
[&] (const abstract_type& typ) -> data_type {
|
||||||
|
return std::move(t);
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If this function returns false, the caller cannot assume that the SSTable comes from Scylla.
|
||||||
|
* It might, if for some reason a table was created using Scylla that didn't contain any feature bit,
|
||||||
|
* but that should never happen. */
|
||||||
|
static bool is_certainly_scylla_sstable(const sstable_enabled_features& features) {
|
||||||
|
return features.enabled_features;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<column_translation::column_info> column_translation::state::build(
|
||||||
|
const schema& s,
|
||||||
|
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||||
|
const sstable_enabled_features& features,
|
||||||
|
bool is_static) {
|
||||||
|
std::vector<column_info> cols;
|
||||||
|
if (s.is_dense()) {
|
||||||
|
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||||
|
cols.push_back(column_info{
|
||||||
|
&col.name(),
|
||||||
|
col.type,
|
||||||
|
col.id,
|
||||||
|
col.type->value_length_if_fixed(),
|
||||||
|
col.is_multi_cell(),
|
||||||
|
col.is_counter(),
|
||||||
|
false
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
cols.reserve(src.size());
|
||||||
|
for (auto&& desc : src) {
|
||||||
|
const bytes& type_name = desc.type_name.value;
|
||||||
|
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||||
|
if (!features.is_enabled(CorrectUDTsInCollections) && is_certainly_scylla_sstable(features)) {
|
||||||
|
// See #6130.
|
||||||
|
type = freeze_types_in_collections(std::move(type));
|
||||||
|
}
|
||||||
|
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||||
|
std::optional<column_id> id;
|
||||||
|
bool schema_mismatch = false;
|
||||||
|
if (def) {
|
||||||
|
id = def->id;
|
||||||
|
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||||
|
def->is_counter() != type->is_counter() ||
|
||||||
|
!def->type->is_value_compatible_with(*type);
|
||||||
|
}
|
||||||
|
cols.push_back(column_info{
|
||||||
|
&desc.name.value,
|
||||||
|
type,
|
||||||
|
id,
|
||||||
|
type->value_length_if_fixed(),
|
||||||
|
type->is_multi_cell(),
|
||||||
|
type->is_counter(),
|
||||||
|
schema_mismatch
|
||||||
|
});
|
||||||
|
}
|
||||||
|
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||||
|
}
|
||||||
|
return cols;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1348,7 +1348,7 @@ public:
|
|||||||
, _consumer(consumer)
|
, _consumer(consumer)
|
||||||
, _sst(sst)
|
, _sst(sst)
|
||||||
, _header(sst->get_serialization_header())
|
, _header(sst->get_serialization_header())
|
||||||
, _column_translation(sst->get_column_translation(s, _header))
|
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
|
||||||
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
|
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
|
||||||
{
|
{
|
||||||
setup_columns(_regular_row, _column_translation.regular_columns());
|
setup_columns(_regular_row, _column_translation.regular_columns());
|
||||||
|
|||||||
@@ -792,8 +792,9 @@ public:
|
|||||||
const serialization_header& get_serialization_header() const {
|
const serialization_header& get_serialization_header() const {
|
||||||
return get_mutable_serialization_header(*_components);
|
return get_mutable_serialization_header(*_components);
|
||||||
}
|
}
|
||||||
column_translation get_column_translation(const schema& s, const serialization_header& h) {
|
column_translation get_column_translation(
|
||||||
return _column_translation.get_for_schema(s, h);
|
const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
|
||||||
|
return _column_translation.get_for_schema(s, h, f);
|
||||||
}
|
}
|
||||||
const std::vector<unsigned>& get_shards_for_this_sstable() const {
|
const std::vector<unsigned>& get_shards_for_this_sstable() const {
|
||||||
return _shards;
|
return _shards;
|
||||||
|
|||||||
@@ -459,7 +459,8 @@ enum sstable_feature : uint8_t {
|
|||||||
ShadowableTombstones = 2, // See #3885
|
ShadowableTombstones = 2, // See #3885
|
||||||
CorrectStaticCompact = 3, // See #4139
|
CorrectStaticCompact = 3, // See #4139
|
||||||
CorrectEmptyCounters = 4, // See #4363
|
CorrectEmptyCounters = 4, // See #4363
|
||||||
End = 5,
|
CorrectUDTsInCollections = 5, // See #6130
|
||||||
|
End = 6,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Scylla-specific features enabled for a particular sstable.
|
// Scylla-specific features enabled for a particular sstable.
|
||||||
|
|||||||
38
test.py
38
test.py
@@ -203,6 +203,17 @@ class CqlTestSuite(TestSuite):
|
|||||||
def pattern(self):
|
def pattern(self):
|
||||||
return "*_test.cql"
|
return "*_test.cql"
|
||||||
|
|
||||||
|
class RunTestSuite(TestSuite):
|
||||||
|
"""TestSuite for test directory with a 'run' script """
|
||||||
|
|
||||||
|
def add_test(self, shortname, mode, options):
|
||||||
|
test = RunTest(self.next_id, shortname, self, mode, options)
|
||||||
|
self.tests.append(test)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pattern(self):
|
||||||
|
return "run"
|
||||||
|
|
||||||
|
|
||||||
class Test:
|
class Test:
|
||||||
"""Base class for CQL, Unit and Boost tests"""
|
"""Base class for CQL, Unit and Boost tests"""
|
||||||
@@ -332,6 +343,25 @@ class CqlTest(Test):
|
|||||||
if self.is_equal_result is False:
|
if self.is_equal_result is False:
|
||||||
print_unidiff(self.result, self.reject)
|
print_unidiff(self.result, self.reject)
|
||||||
|
|
||||||
|
class RunTest(Test):
|
||||||
|
"""Run tests in a directory started by a run script"""
|
||||||
|
|
||||||
|
def __init__(self, test_no, shortname, suite, mode, options):
|
||||||
|
super().__init__(test_no, shortname, suite, mode, options)
|
||||||
|
self.path = os.path.join(suite.path, shortname)
|
||||||
|
self.xmlout = os.path.join(options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml")
|
||||||
|
self.args = ["--junit-xml={}".format(self.xmlout)]
|
||||||
|
self.env = { 'SCYLLA': os.path.join("build", self.mode, "scylla") }
|
||||||
|
|
||||||
|
def print_summary(self):
|
||||||
|
print("Output of {} {}:".format(self.path, " ".join(self.args)))
|
||||||
|
print(read_log(self.log_filename))
|
||||||
|
|
||||||
|
async def run(self, options):
|
||||||
|
# This test can and should be killed gently, with SIGTERM, not with SIGKILL
|
||||||
|
self.success = await run_test(self, options, gentle_kill=True, env=self.env)
|
||||||
|
logging.info("Test #%d %s", self.id, "succeeded" if self.success else "failed ")
|
||||||
|
return self
|
||||||
|
|
||||||
class TabularConsoleOutput:
|
class TabularConsoleOutput:
|
||||||
"""Print test progress to the console"""
|
"""Print test progress to the console"""
|
||||||
@@ -375,7 +405,7 @@ class TabularConsoleOutput:
|
|||||||
print(msg)
|
print(msg)
|
||||||
|
|
||||||
|
|
||||||
async def run_test(test, options):
|
async def run_test(test, options, gentle_kill=False, env=dict()):
|
||||||
"""Run test program, return True if success else False"""
|
"""Run test program, return True if success else False"""
|
||||||
|
|
||||||
with open(test.log_filename, "wb") as log:
|
with open(test.log_filename, "wb") as log:
|
||||||
@@ -407,6 +437,7 @@ async def run_test(test, options):
|
|||||||
env=dict(os.environ,
|
env=dict(os.environ,
|
||||||
UBSAN_OPTIONS=":".join(filter(None, UBSAN_OPTIONS)),
|
UBSAN_OPTIONS=":".join(filter(None, UBSAN_OPTIONS)),
|
||||||
ASAN_OPTIONS=":".join(filter(None, ASAN_OPTIONS)),
|
ASAN_OPTIONS=":".join(filter(None, ASAN_OPTIONS)),
|
||||||
|
**env,
|
||||||
),
|
),
|
||||||
preexec_fn=os.setsid,
|
preexec_fn=os.setsid,
|
||||||
)
|
)
|
||||||
@@ -423,7 +454,10 @@ async def run_test(test, options):
|
|||||||
return True
|
return True
|
||||||
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
|
||||||
if process is not None:
|
if process is not None:
|
||||||
process.kill()
|
if gentle_kill:
|
||||||
|
process.terminate()
|
||||||
|
else:
|
||||||
|
process.kill()
|
||||||
stdout, _ = await process.communicate()
|
stdout, _ = await process.communicate()
|
||||||
if isinstance(e, asyncio.TimeoutError):
|
if isinstance(e, asyncio.TimeoutError):
|
||||||
report_error("Test timed out")
|
report_error("Test timed out")
|
||||||
|
|||||||
@@ -54,6 +54,8 @@ def pytest_addoption(parser):
|
|||||||
parser.addoption("--https", action="store_true",
|
parser.addoption("--https", action="store_true",
|
||||||
help="communicate via HTTPS protocol on port 8043 instead of HTTP when"
|
help="communicate via HTTPS protocol on port 8043 instead of HTTP when"
|
||||||
" running against a local Scylla installation")
|
" running against a local Scylla installation")
|
||||||
|
parser.addoption("--url", action="store",
|
||||||
|
help="communicate with given URL instead of defaults")
|
||||||
|
|
||||||
# "dynamodb" fixture: set up client object for communicating with the DynamoDB
|
# "dynamodb" fixture: set up client object for communicating with the DynamoDB
|
||||||
# API. Currently this chooses either Amazon's DynamoDB in the default region
|
# API. Currently this chooses either Amazon's DynamoDB in the default region
|
||||||
@@ -70,7 +72,10 @@ def dynamodb(request):
|
|||||||
# requires us to specify dummy region and credential parameters,
|
# requires us to specify dummy region and credential parameters,
|
||||||
# otherwise the user is forced to properly configure ~/.aws even
|
# otherwise the user is forced to properly configure ~/.aws even
|
||||||
# for local runs.
|
# for local runs.
|
||||||
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
|
if request.config.getoption('url') != None:
|
||||||
|
local_url = request.config.getoption('url')
|
||||||
|
else:
|
||||||
|
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
|
||||||
# Disable verifying in order to be able to use self-signed TLS certificates
|
# Disable verifying in order to be able to use self-signed TLS certificates
|
||||||
verify = not request.config.getoption('https')
|
verify = not request.config.getoption('https')
|
||||||
return boto3.resource('dynamodb', endpoint_url=local_url, verify=verify,
|
return boto3.resource('dynamodb', endpoint_url=local_url, verify=verify,
|
||||||
@@ -4,24 +4,30 @@
|
|||||||
set -e
|
set -e
|
||||||
|
|
||||||
script_path=$(dirname $(readlink -e $0))
|
script_path=$(dirname $(readlink -e $0))
|
||||||
|
source_path=$script_path/../..
|
||||||
|
|
||||||
# By default, we take the latest build/*/scylla as the executable:
|
# By default, we take the latest build/*/scylla as the executable:
|
||||||
SCYLLA=${SCYLLA-$(ls -t "$script_path/../build/"*"/scylla" | head -1)}
|
SCYLLA=${SCYLLA-$(ls -t "$source_path/build/"*"/scylla" | head -1)}
|
||||||
SCYLLA=$(readlink -f "$SCYLLA")
|
SCYLLA=$(readlink -f "$SCYLLA")
|
||||||
SCYLLA_IP=${IP-127.0.0.1}
|
|
||||||
CPUSET=${CPUSET-0}
|
|
||||||
CQLSH=${CQLSH-cqlsh}
|
|
||||||
|
|
||||||
# We need to use cqlsh to set up the authentication credentials expected by
|
# Below, we need to use python3 and the Cassandra drive to set up the
|
||||||
# some of the tests that check check authentication. If cqlsh is not installed
|
# authentication credentials expected by some of the tests that check
|
||||||
# there isn't much point of even starting Scylla
|
# authentication. If they are not installed there isn't much point of
|
||||||
if ! type "$CQLSH" >/dev/null 2>&1
|
# even starting Scylla
|
||||||
|
if ! python3 -c 'from cassandra.cluster import Cluster' >/dev/null 2>&1
|
||||||
then
|
then
|
||||||
echo "Error: cannot find '$CQLSH', needed for configuring Alternator authentication." >&2
|
echo "Error: python3 and python3-cassandra-driver must be installed to configure Alternator authentication." >&2
|
||||||
echo "Please install $CQLSH in your path, or set CQLSH to its location." >&2
|
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# Pick a loopback IP address for Scylla to run, in an attempt not to collide
|
||||||
|
# other concurrent runs of Scylla. CCM uses 127.0.0.<nodenum>, so if we use
|
||||||
|
# 127.1.*.* which cannot collide with it. Moreover, we'll take the last two
|
||||||
|
# bytes of the address from the current process - so as to allow multiple
|
||||||
|
# concurrent runs of this code to use a different address.
|
||||||
|
SCYLLA_IP=127.1.$(($$ >> 8 & 255)).$(($$ & 255))
|
||||||
|
echo "Running Scylla on $SCYLLA_IP"
|
||||||
|
|
||||||
tmp_dir=/tmp/alternator-test-$$
|
tmp_dir=/tmp/alternator-test-$$
|
||||||
mkdir $tmp_dir
|
mkdir $tmp_dir
|
||||||
|
|
||||||
@@ -52,6 +58,7 @@ trap 'cleanup' EXIT
|
|||||||
# to work. We only need to do this if the "--https" option was explicitly
|
# to work. We only need to do this if the "--https" option was explicitly
|
||||||
# passed - otherwise the test would not use HTTPS anyway.
|
# passed - otherwise the test would not use HTTPS anyway.
|
||||||
alternator_port_option="--alternator-port=8000"
|
alternator_port_option="--alternator-port=8000"
|
||||||
|
alternator_url="http://$SCYLLA_IP:8000"
|
||||||
for i
|
for i
|
||||||
do
|
do
|
||||||
if [ "$i" = --https ]
|
if [ "$i" = --https ]
|
||||||
@@ -59,53 +66,61 @@ do
|
|||||||
openssl genrsa 2048 > "$tmp_dir/scylla.key"
|
openssl genrsa 2048 > "$tmp_dir/scylla.key"
|
||||||
openssl req -new -x509 -nodes -sha256 -days 365 -subj "/C=IL/ST=None/L=None/O=None/OU=None/CN=example.com" -key "$tmp_dir/scylla.key" -out "$tmp_dir/scylla.crt"
|
openssl req -new -x509 -nodes -sha256 -days 365 -subj "/C=IL/ST=None/L=None/O=None/OU=None/CN=example.com" -key "$tmp_dir/scylla.key" -out "$tmp_dir/scylla.crt"
|
||||||
alternator_port_option="--alternator-https-port=8043"
|
alternator_port_option="--alternator-https-port=8043"
|
||||||
|
alternator_url="https://$SCYLLA_IP:8043"
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
"$SCYLLA" --options-file "$script_path/../conf/scylla.yaml" \
|
"$SCYLLA" --options-file "$source_path/conf/scylla.yaml" \
|
||||||
--alternator-address $SCYLLA_IP \
|
--alternator-address $SCYLLA_IP \
|
||||||
$alternator_port_option \
|
$alternator_port_option \
|
||||||
--alternator-enforce-authorization=1 \
|
--alternator-enforce-authorization=1 \
|
||||||
--experimental=on --developer-mode=1 \
|
--developer-mode=1 \
|
||||||
--ring-delay-ms 0 --collectd 0 \
|
--ring-delay-ms 0 --collectd 0 \
|
||||||
--cpuset "$CPUSET" -m 1G \
|
--smp 2 -m 1G \
|
||||||
--api-address $SCYLLA_IP --rpc-address $SCYLLA_IP \
|
--overprovisioned --unsafe-bypass-fsync 1 \
|
||||||
|
--api-address $SCYLLA_IP \
|
||||||
|
--rpc-address $SCYLLA_IP \
|
||||||
--listen-address $SCYLLA_IP \
|
--listen-address $SCYLLA_IP \
|
||||||
|
--prometheus-address $SCYLLA_IP \
|
||||||
--seed-provider-parameters seeds=$SCYLLA_IP \
|
--seed-provider-parameters seeds=$SCYLLA_IP \
|
||||||
--workdir "$tmp_dir" \
|
--workdir "$tmp_dir" \
|
||||||
--server-encryption-options keyfile="$tmp_dir/scylla.key" \
|
--server-encryption-options keyfile="$tmp_dir/scylla.key" \
|
||||||
--server-encryption-options certificate="$tmp_dir/scylla.crt" \
|
--server-encryption-options certificate="$tmp_dir/scylla.crt" \
|
||||||
--auto-snapshot 0 \
|
--auto-snapshot 0 \
|
||||||
|
--skip-wait-for-gossip-to-settle 0 \
|
||||||
>"$tmp_dir/log" 2>&1 &
|
>"$tmp_dir/log" 2>&1 &
|
||||||
SCYLLA_PROCESS=$!
|
SCYLLA_PROCESS=$!
|
||||||
|
|
||||||
# Set up the the proper authentication credentials needed by the Alternator
|
# Set up the the proper authentication credentials needed by the Alternator
|
||||||
# test. This requires connecting to Scylla with cqlsh - we'll wait up for
|
# test. This requires connecting to Scylla with CQL - we'll wait up for
|
||||||
# one minute for this to work:
|
# one minute for this to work:
|
||||||
|
setup_authentication() {
|
||||||
|
python3 -c 'from cassandra.cluster import Cluster; Cluster(["'$SCYLLA_IP'"]).connect().execute("INSERT INTO system_auth.roles (role, salted_hash) VALUES ('\''alternator'\'', '\''secret_pass'\'')")'
|
||||||
|
}
|
||||||
echo "Scylla is: $SCYLLA."
|
echo "Scylla is: $SCYLLA."
|
||||||
echo -n "Booting Scylla..."
|
echo -n "Booting Scylla..."
|
||||||
ok=
|
ok=
|
||||||
SECONDS=0
|
SECONDS=0
|
||||||
while ((SECONDS < 100))
|
while ((SECONDS < 200))
|
||||||
do
|
do
|
||||||
sleep 2
|
sleep 1
|
||||||
echo -n .
|
echo -n .
|
||||||
if ! kill -0 $SCYLLA_PROCESS 2>/dev/null
|
if ! kill -0 $SCYLLA_PROCESS 2>/dev/null
|
||||||
then
|
then
|
||||||
summary="Error: Scylla failed to boot after $SECONDS seconds."
|
summary="Error: Scylla failed to boot after $SECONDS seconds."
|
||||||
break
|
break
|
||||||
fi
|
fi
|
||||||
err=`"$CQLSH" -e "INSERT INTO system_auth.roles (role, salted_hash) VALUES ('alternator', 'secret_pass')" 2>&1` && ok=yes && break
|
err=`setup_authentication 2>&1` && ok=yes && break
|
||||||
case "$err" in
|
case "$err" in
|
||||||
"Connection error:"*)
|
*NoHostAvailable:*)
|
||||||
# This is what we expect while Scylla is still booting.
|
# This is what we expect while Scylla is still booting.
|
||||||
;;
|
;;
|
||||||
*"command not found")
|
*ImportError:*|*"command not found"*)
|
||||||
summary="Error: need 'cqlsh' in your path, to configure Alternator authentication."
|
summary="Error: need python3 and python3-cassandra-driver to configure Alternator authentication."
|
||||||
echo
|
echo
|
||||||
echo $summary
|
echo $summary
|
||||||
break;;
|
break;;
|
||||||
*)
|
*)
|
||||||
summary="Unknown cqlsh error, can't set authentication credentials: '$err'"
|
summary="Unknown error trying to set authentication credentials: '$err'"
|
||||||
echo
|
echo
|
||||||
echo $summary
|
echo $summary
|
||||||
break;;
|
break;;
|
||||||
@@ -125,7 +140,8 @@ else
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
cd "$script_path"
|
cd "$script_path"
|
||||||
pytest "$@"
|
set +e
|
||||||
|
pytest --url $alternator_url "$@"
|
||||||
code=$?
|
code=$?
|
||||||
case $code in
|
case $code in
|
||||||
0) summary="Alternator tests pass";;
|
0) summary="Alternator tests pass";;
|
||||||
1
test/alternator/suite.yaml
Normal file
1
test/alternator/suite.yaml
Normal file
@@ -0,0 +1 @@
|
|||||||
|
type: Run
|
||||||
@@ -305,3 +305,16 @@ def test_batch_get_item_projection_expression(test_table):
|
|||||||
got_items = reply['Responses'][test_table.name]
|
got_items = reply['Responses'][test_table.name]
|
||||||
expected_items = [{k: item[k] for k in wanted if k in item} for item in items]
|
expected_items = [{k: item[k] for k in wanted if k in item} for item in items]
|
||||||
assert multiset(got_items) == multiset(expected_items)
|
assert multiset(got_items) == multiset(expected_items)
|
||||||
|
|
||||||
|
# Test that we return the required UnprocessedKeys/UnprocessedItems parameters
|
||||||
|
def test_batch_unprocessed(test_table_s):
|
||||||
|
p = random_string()
|
||||||
|
write_reply = test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||||
|
test_table_s.name: [{'PutRequest': {'Item': {'p': p, 'a': 'hi'}}}],
|
||||||
|
})
|
||||||
|
assert 'UnprocessedItems' in write_reply and write_reply['UnprocessedItems'] == dict()
|
||||||
|
|
||||||
|
read_reply = test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||||
|
test_table_s.name: {'Keys': [{'p': p}], 'ProjectionExpression': 'p, a', 'ConsistentRead': True}
|
||||||
|
})
|
||||||
|
assert 'UnprocessedKeys' in read_reply and read_reply['UnprocessedKeys'] == dict()
|
||||||
@@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
|
import json
|
||||||
from botocore.exceptions import BotoCoreError, ClientError
|
from botocore.exceptions import BotoCoreError, ClientError
|
||||||
|
|
||||||
def gen_json(n):
|
def gen_json(n):
|
||||||
@@ -112,3 +113,12 @@ def test_incorrect_json(dynamodb, test_table):
|
|||||||
req = get_signed_request(dynamodb, 'PutItem', incorrect_req)
|
req = get_signed_request(dynamodb, 'PutItem', incorrect_req)
|
||||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||||
assert validate_resp(response.text)
|
assert validate_resp(response.text)
|
||||||
|
|
||||||
|
# Test that the value returned by PutItem is always a JSON object, not an empty string (see #6568)
|
||||||
|
def test_put_item_return_type(dynamodb, test_table):
|
||||||
|
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
|
||||||
|
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||||
|
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||||
|
assert response.text
|
||||||
|
# json::loads throws on invalid input
|
||||||
|
json.loads(response.text)
|
||||||
@@ -100,6 +100,14 @@ def test_query_basic_restrictions(dynamodb, filled_test_table):
|
|||||||
print(got_items)
|
print(got_items)
|
||||||
assert multiset([item for item in items if item['p'] == 'long' and item['c'].startswith('11')]) == multiset(got_items)
|
assert multiset([item for item in items if item['p'] == 'long' and item['c'].startswith('11')]) == multiset(got_items)
|
||||||
|
|
||||||
|
def test_query_nonexistent_table(dynamodb):
|
||||||
|
client = dynamodb.meta.client
|
||||||
|
with pytest.raises(ClientError, match="ResourceNotFoundException"):
|
||||||
|
client.query(TableName="i_do_not_exist", KeyConditions={
|
||||||
|
'p' : {'AttributeValueList': ['long'], 'ComparisonOperator': 'EQ'},
|
||||||
|
'c' : {'AttributeValueList': ['11'], 'ComparisonOperator': 'BEGINS_WITH'}
|
||||||
|
})
|
||||||
|
|
||||||
def test_begins_with(dynamodb, test_table):
|
def test_begins_with(dynamodb, test_table):
|
||||||
paginator = dynamodb.meta.client.get_paginator('query')
|
paginator = dynamodb.meta.client.get_paginator('query')
|
||||||
items = [{'p': 'unorthodox_chars', 'c': sort_key, 'str': 'a'} for sort_key in [u'ÿÿÿ', u'cÿbÿ', u'cÿbÿÿabg'] ]
|
items = [{'p': 'unorthodox_chars', 'c': sort_key, 'str': 'a'} for sort_key in [u'ÿÿÿ', u'cÿbÿ', u'cÿbÿÿabg'] ]
|
||||||
@@ -451,7 +459,6 @@ def test_query_limit_paging(test_table_sn):
|
|||||||
# return items sorted in reverse order. Combining this with Limit can
|
# return items sorted in reverse order. Combining this with Limit can
|
||||||
# be used to return the last items instead of the first items of the
|
# be used to return the last items instead of the first items of the
|
||||||
# partition.
|
# partition.
|
||||||
@pytest.mark.xfail(reason="ScanIndexForward not supported yet")
|
|
||||||
def test_query_reverse(test_table_sn):
|
def test_query_reverse(test_table_sn):
|
||||||
numbers = [Decimal(i) for i in range(20)]
|
numbers = [Decimal(i) for i in range(20)]
|
||||||
# Insert these numbers, in random order, into one partition:
|
# Insert these numbers, in random order, into one partition:
|
||||||
@@ -486,7 +493,6 @@ def test_query_reverse(test_table_sn):
|
|||||||
|
|
||||||
# Test that paging also works properly with reverse order
|
# Test that paging also works properly with reverse order
|
||||||
# (ScanIndexForward=false), i.e., reverse-order queries can be resumed
|
# (ScanIndexForward=false), i.e., reverse-order queries can be resumed
|
||||||
@pytest.mark.xfail(reason="ScanIndexForward not supported yet")
|
|
||||||
def test_query_reverse_paging(test_table_sn):
|
def test_query_reverse_paging(test_table_sn):
|
||||||
numbers = [Decimal(i) for i in range(20)]
|
numbers = [Decimal(i) for i in range(20)]
|
||||||
# Insert these numbers, in random order, into one partition:
|
# Insert these numbers, in random order, into one partition:
|
||||||
@@ -42,6 +42,11 @@ def test_scan_basic(filled_test_table):
|
|||||||
assert len(items) == len(got_items)
|
assert len(items) == len(got_items)
|
||||||
assert multiset(items) == multiset(got_items)
|
assert multiset(items) == multiset(got_items)
|
||||||
|
|
||||||
|
def test_scan_nonexistent_table(dynamodb):
|
||||||
|
client = dynamodb.meta.client
|
||||||
|
with pytest.raises(ClientError, match="ResourceNotFoundException"):
|
||||||
|
client.scan(TableName="i_do_not_exist")
|
||||||
|
|
||||||
def test_scan_with_paginator(dynamodb, filled_test_table):
|
def test_scan_with_paginator(dynamodb, filled_test_table):
|
||||||
test_table, items = filled_test_table
|
test_table, items = filled_test_table
|
||||||
paginator = dynamodb.meta.client.get_paginator('scan')
|
paginator = dynamodb.meta.client.get_paginator('scan')
|
||||||
@@ -239,7 +244,6 @@ def test_scan_select(filled_test_table):
|
|||||||
# a scan into multiple parts, and that these parts are in fact disjoint,
|
# a scan into multiple parts, and that these parts are in fact disjoint,
|
||||||
# and their union is the entire contents of the table. We do not actually
|
# and their union is the entire contents of the table. We do not actually
|
||||||
# try to run these queries in *parallel* in this test.
|
# try to run these queries in *parallel* in this test.
|
||||||
@pytest.mark.xfail(reason="parallel scan not supported yet")
|
|
||||||
def test_scan_parallel(filled_test_table):
|
def test_scan_parallel(filled_test_table):
|
||||||
test_table, items = filled_test_table
|
test_table, items = filled_test_table
|
||||||
for nsegments in [1, 2, 17]:
|
for nsegments in [1, 2, 17]:
|
||||||
@@ -250,3 +254,14 @@ def test_scan_parallel(filled_test_table):
|
|||||||
# The following comparison verifies that each of the expected item
|
# The following comparison verifies that each of the expected item
|
||||||
# in items was returned in one - and just one - of the segments.
|
# in items was returned in one - and just one - of the segments.
|
||||||
assert multiset(items) == multiset(got_items)
|
assert multiset(items) == multiset(got_items)
|
||||||
|
|
||||||
|
# Test correct handling of incorrect parallel scan parameters.
|
||||||
|
# Most of the corner cases (like TotalSegments=0) are validated
|
||||||
|
# by boto3 itself, but some checks can still be performed.
|
||||||
|
def test_scan_parallel_incorrect(filled_test_table):
|
||||||
|
test_table, items = filled_test_table
|
||||||
|
with pytest.raises(ClientError, match='ValidationException.*Segment'):
|
||||||
|
full_scan(test_table, TotalSegments=1000001, Segment=0)
|
||||||
|
for segment in [7, 9]:
|
||||||
|
with pytest.raises(ClientError, match='ValidationException.*Segment'):
|
||||||
|
full_scan(test_table, TotalSegments=5, Segment=segment)
|
||||||
@@ -244,11 +244,12 @@ def test_table_streams_off(dynamodb):
|
|||||||
table.delete();
|
table.delete();
|
||||||
# DynamoDB doesn't allow StreamSpecification to be empty map - if it
|
# DynamoDB doesn't allow StreamSpecification to be empty map - if it
|
||||||
# exists, it must have a StreamEnabled
|
# exists, it must have a StreamEnabled
|
||||||
with pytest.raises(ClientError, match='ValidationException'):
|
# Unfortunately, new versions of boto3 doesn't let us pass this...
|
||||||
table = create_test_table(dynamodb, StreamSpecification={},
|
#with pytest.raises(ClientError, match='ValidationException'):
|
||||||
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
# table = create_test_table(dynamodb, StreamSpecification={},
|
||||||
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
|
# KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
|
||||||
table.delete();
|
# AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
|
||||||
|
# table.delete();
|
||||||
# Unfortunately, boto3 doesn't allow us to pass StreamSpecification=None.
|
# Unfortunately, boto3 doesn't allow us to pass StreamSpecification=None.
|
||||||
# This is what we had in issue #5796.
|
# This is what we had in issue #5796.
|
||||||
|
|
||||||
@@ -47,6 +47,11 @@
|
|||||||
using namespace db;
|
using namespace db;
|
||||||
|
|
||||||
static future<> cl_test(commitlog::config cfg, noncopyable_function<future<> (commitlog&)> f) {
|
static future<> cl_test(commitlog::config cfg, noncopyable_function<future<> (commitlog&)> f) {
|
||||||
|
// enable as needed.
|
||||||
|
// moved from static init because static init fiasco.
|
||||||
|
#if 0
|
||||||
|
logging::logger_registry().set_logger_level("commitlog", logging::log_level::trace);
|
||||||
|
#endif
|
||||||
tmpdir tmp;
|
tmpdir tmp;
|
||||||
cfg.commit_log_location = tmp.path().string();
|
cfg.commit_log_location = tmp.path().string();
|
||||||
return commitlog::create_commitlog(cfg).then([f = std::move(f)](commitlog log) mutable {
|
return commitlog::create_commitlog(cfg).then([f = std::move(f)](commitlog log) mutable {
|
||||||
@@ -67,13 +72,6 @@ static future<> cl_test(noncopyable_function<future<> (commitlog&)> f) {
|
|||||||
return cl_test(cfg, std::move(f));
|
return cl_test(cfg, std::move(f));
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
static int loggo = [] {
|
|
||||||
logging::logger_registry().set_logger_level("commitlog", logging::log_level::trace);
|
|
||||||
return 0;
|
|
||||||
}();
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// just write in-memory...
|
// just write in-memory...
|
||||||
SEASTAR_TEST_CASE(test_create_commitlog){
|
SEASTAR_TEST_CASE(test_create_commitlog){
|
||||||
return cl_test([](commitlog& log) {
|
return cl_test([](commitlog& log) {
|
||||||
@@ -296,7 +294,9 @@ SEASTAR_TEST_CASE(test_commitlog_closed) {
|
|||||||
|
|
||||||
SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
||||||
commitlog::config cfg;
|
commitlog::config cfg;
|
||||||
cfg.commitlog_segment_size_in_mb = 2;
|
|
||||||
|
constexpr auto max_size_mb = 2;
|
||||||
|
cfg.commitlog_segment_size_in_mb = max_size_mb;
|
||||||
cfg.commitlog_total_space_in_mb = 1;
|
cfg.commitlog_total_space_in_mb = 1;
|
||||||
cfg.commitlog_sync_period_in_ms = 1;
|
cfg.commitlog_sync_period_in_ms = 1;
|
||||||
return cl_test(cfg, [](commitlog& log) {
|
return cl_test(cfg, [](commitlog& log) {
|
||||||
@@ -305,9 +305,23 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
|||||||
|
|
||||||
// add a flush handler that simply says we're done with the range.
|
// add a flush handler that simply says we're done with the range.
|
||||||
auto r = log.add_flush_handler([&log, sem, segments](cf_id_type id, replay_position pos) {
|
auto r = log.add_flush_handler([&log, sem, segments](cf_id_type id, replay_position pos) {
|
||||||
*segments = log.get_active_segment_names();
|
auto f = make_ready_future<>();
|
||||||
log.discard_completed_segments(id);
|
// #6195 only get segment list at first callback. We can (not often)
|
||||||
sem->signal();
|
// be called again, but reading segment list at that point might (will)
|
||||||
|
// render same list as in the diff check below.
|
||||||
|
if (segments->empty()) {
|
||||||
|
*segments = log.get_active_segment_names();
|
||||||
|
// Verify #5899 - file size should not exceed the config max.
|
||||||
|
f = parallel_for_each(*segments, [](sstring filename) {
|
||||||
|
return file_size(filename).then([](uint64_t size) {
|
||||||
|
BOOST_REQUIRE_LE(size, max_size_mb * 1024 * 1024);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return f.then([&log, sem, id] {
|
||||||
|
log.discard_completed_segments(id);
|
||||||
|
sem->signal();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
auto set = make_lw_shared<std::set<segment_id_type>>();
|
auto set = make_lw_shared<std::set<segment_id_type>>();
|
||||||
|
|||||||
@@ -930,17 +930,17 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
|||||||
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
||||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||||
return make_ready_future();
|
return make_ready_future();
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
|
SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
|
||||||
config cfg;
|
config cfg;
|
||||||
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
||||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
|
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||||
return make_ready_future();
|
return make_ready_future();
|
||||||
}
|
}
|
||||||
@@ -950,7 +950,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
|||||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||||
return make_ready_future();
|
return make_ready_future();
|
||||||
}
|
}
|
||||||
@@ -958,9 +958,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
|||||||
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||||
config cfg;
|
config cfg;
|
||||||
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
||||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
|
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||||
return make_ready_future();
|
return make_ready_future();
|
||||||
}
|
}
|
||||||
@@ -973,7 +973,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
|||||||
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
||||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||||
});
|
});
|
||||||
return make_ready_future();
|
return make_ready_future();
|
||||||
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
|||||||
config cfg;
|
config cfg;
|
||||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||||
return make_ready_future();
|
return make_ready_future();
|
||||||
}
|
}
|
||||||
@@ -992,7 +992,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
|||||||
config cfg;
|
config cfg;
|
||||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||||
return make_ready_future();
|
return make_ready_future();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -117,6 +117,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
|
||||||
|
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||||
|
auto test_random_streams = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
|
||||||
|
auto muts = gen(partition_nr);
|
||||||
|
schema_ptr s = gen.schema();
|
||||||
|
auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);
|
||||||
|
int mf_produced = 0;
|
||||||
|
auto get_next_mutation_fragment = [&source_reader, &mf_produced] () mutable {
|
||||||
|
if (mf_produced++ > 800) {
|
||||||
|
return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
|
||||||
|
} else {
|
||||||
|
return source_reader(db::no_timeout);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
auto& partitioner = s->get_partitioner();
|
||||||
|
try {
|
||||||
|
distribute_reader_and_consume_on_shards(s,
|
||||||
|
make_generating_reader(s, std::move(get_next_mutation_fragment)),
|
||||||
|
[&partitioner, error] (flat_mutation_reader reader) mutable {
|
||||||
|
if (error) {
|
||||||
|
return make_exception_future<>(std::runtime_error("Failed to write"));
|
||||||
|
}
|
||||||
|
return repeat([&partitioner, reader = std::move(reader), error] () mutable {
|
||||||
|
return reader(db::no_timeout).then([&partitioner, error] (mutation_fragment_opt mf_opt) mutable {
|
||||||
|
if (mf_opt) {
|
||||||
|
if (mf_opt->is_partition_start()) {
|
||||||
|
auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
|
||||||
|
BOOST_REQUIRE_EQUAL(shard, this_shard_id());
|
||||||
|
}
|
||||||
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
|
} else {
|
||||||
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
).get0();
|
||||||
|
} catch (...) {
|
||||||
|
// The distribute_reader_and_consume_on_shards is expected to fail and not block forever
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::no);
|
||||||
|
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::yes);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
class bucket_writer {
|
class bucket_writer {
|
||||||
|
|||||||
@@ -531,6 +531,43 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
|||||||
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||||
}});
|
}});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
{
|
||||||
|
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||||
|
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||||
|
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||||
|
auto paging_state = extract_paging_state(res);
|
||||||
|
|
||||||
|
assert_that(res).is_rows().with_rows({{
|
||||||
|
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||||
|
}});
|
||||||
|
|
||||||
|
// Override the actual paging state with one with empty keys,
|
||||||
|
// which is a valid paging state as well, and should return
|
||||||
|
// no rows.
|
||||||
|
paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(),
|
||||||
|
std::nullopt, paging_state->get_remaining(), paging_state->get_query_uuid(),
|
||||||
|
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
|
||||||
|
paging_state->get_rows_fetched_for_last_partition());
|
||||||
|
|
||||||
|
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||||
|
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||||
|
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||||
|
|
||||||
|
assert_that(res).is_rows().with_size(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// An artificial paging state with an empty key pair is also valid and is expected
|
||||||
|
// not to return rows (since no row matches an empty partition key)
|
||||||
|
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
|
||||||
|
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
|
||||||
|
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||||
|
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||||
|
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||||
|
|
||||||
|
assert_that(res).is_rows().with_size(0);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -200,5 +200,16 @@ BOOST_AUTO_TEST_CASE(inet_address) {
|
|||||||
auto res = ser::deserialize_from_buffer(buf, boost::type<gms::inet_address>{});
|
auto res = ser::deserialize_from_buffer(buf, boost::type<gms::inet_address>{});
|
||||||
BOOST_CHECK_EQUAL(res, ip);
|
BOOST_CHECK_EQUAL(res, ip);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stringify tests
|
||||||
|
{
|
||||||
|
for (sstring s : { "2001:6b0:8:2::232", "2a05:d018:223:f00:97af:f4d9:eac2:6a0f", "fe80::8898:3e04:215b:2cd6" }) {
|
||||||
|
gms::inet_address ip(s);
|
||||||
|
BOOST_CHECK(ip.addr().is_ipv6());
|
||||||
|
auto s2 = boost::lexical_cast<std::string>(ip);
|
||||||
|
gms::inet_address ip2(s);
|
||||||
|
BOOST_CHECK_EQUAL(ip2, ip);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5256,3 +5256,131 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
|||||||
test_sstable_log_too_many_rows_f(random, (random + 1), false);
|
test_sstable_log_too_many_rows_f(random, (random + 1), false);
|
||||||
test_sstable_log_too_many_rows_f((random + 1), random, true);
|
test_sstable_log_too_many_rows_f((random + 1), random, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The following test runs on test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection
|
||||||
|
// It was created using Scylla 3.0.x using the following CQL statements:
|
||||||
|
//
|
||||||
|
// CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||||
|
// CREATE TYPE ks.ut (a int, b int);
|
||||||
|
// CREATE TABLE ks.t ( pk int PRIMARY KEY,
|
||||||
|
// m map<int, frozen<ut>>,
|
||||||
|
// fm frozen<map<int, frozen<ut>>>,
|
||||||
|
// mm map<int, frozen<map<int, frozen<ut>>>>,
|
||||||
|
// fmm frozen<map<int, frozen<map<int, frozen<ut>>>>>,
|
||||||
|
// s set<frozen<ut>>,
|
||||||
|
// fs frozen<set<frozen<ut>>>,
|
||||||
|
// l list<frozen<ut>>,
|
||||||
|
// fl frozen<list<frozen<ut>>>
|
||||||
|
// ) WITH compression = {};
|
||||||
|
// UPDATE ks.t USING TIMESTAMP 1525385507816568 SET
|
||||||
|
// m[0] = {a: 0, b: 0},
|
||||||
|
// fm = {0: {a: 0, b: 0}},
|
||||||
|
// mm[0] = {0: {a: 0, b: 0}},
|
||||||
|
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||||
|
// s = s + {{a: 0, b: 0}},
|
||||||
|
// fs = {{a: 0, b: 0}},
|
||||||
|
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||||
|
// fl = [{a: 0, b: 0}]
|
||||||
|
// WHERE pk = 0;
|
||||||
|
//
|
||||||
|
// It checks whether a SSTable containing UDTs nested in collections, which contains incorrect serialization headers
|
||||||
|
// (doesn't wrap nested UDTs in the FrozenType<...> tag) can be loaded by new versions of Scylla.
|
||||||
|
|
||||||
|
static const sstring LEGACY_UDT_IN_COLLECTION_PATH =
|
||||||
|
"test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection";
|
||||||
|
|
||||||
|
SEASTAR_THREAD_TEST_CASE(test_legacy_udt_in_collection_table) {
|
||||||
|
auto abj = defer([] { await_background_jobs().get(); });
|
||||||
|
|
||||||
|
auto ut = user_type_impl::get_instance("ks", to_bytes("ut"),
|
||||||
|
{to_bytes("a"), to_bytes("b")},
|
||||||
|
{int32_type, int32_type}, false);
|
||||||
|
auto m_type = map_type_impl::get_instance(int32_type, ut, true);
|
||||||
|
auto fm_type = map_type_impl::get_instance(int32_type, ut, false);
|
||||||
|
auto mm_type = map_type_impl::get_instance(int32_type, fm_type, true);
|
||||||
|
auto fmm_type = map_type_impl::get_instance(int32_type, fm_type, false);
|
||||||
|
auto s_type = set_type_impl::get_instance(ut, true);
|
||||||
|
auto fs_type = set_type_impl::get_instance(ut, false);
|
||||||
|
auto l_type = list_type_impl::get_instance(ut, true);
|
||||||
|
auto fl_type = list_type_impl::get_instance(ut, false);
|
||||||
|
|
||||||
|
auto s = schema_builder("ks", "t")
|
||||||
|
.with_column("pk", int32_type, column_kind::partition_key)
|
||||||
|
.with_column("m", m_type)
|
||||||
|
.with_column("fm", fm_type)
|
||||||
|
.with_column("mm", mm_type)
|
||||||
|
.with_column("fmm", fmm_type)
|
||||||
|
.with_column("s", s_type)
|
||||||
|
.with_column("fs", fs_type)
|
||||||
|
.with_column("l", l_type)
|
||||||
|
.with_column("fl", fl_type)
|
||||||
|
.set_compressor_params(compression_parameters::no_compression())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
auto m_cdef = s->get_column_definition(to_bytes("m"));
|
||||||
|
auto fm_cdef = s->get_column_definition(to_bytes("fm"));
|
||||||
|
auto mm_cdef = s->get_column_definition(to_bytes("mm"));
|
||||||
|
auto fmm_cdef = s->get_column_definition(to_bytes("fmm"));
|
||||||
|
auto s_cdef = s->get_column_definition(to_bytes("s"));
|
||||||
|
auto fs_cdef = s->get_column_definition(to_bytes("fs"));
|
||||||
|
auto l_cdef = s->get_column_definition(to_bytes("l"));
|
||||||
|
auto fl_cdef = s->get_column_definition(to_bytes("fl"));
|
||||||
|
BOOST_REQUIRE(m_cdef && fm_cdef && mm_cdef && fmm_cdef && s_cdef && fs_cdef && l_cdef && fl_cdef);
|
||||||
|
|
||||||
|
auto ut_val = make_user_value(ut, {int32_t(0), int32_t(0)});
|
||||||
|
auto fm_val = make_map_value(fm_type, {{int32_t(0), ut_val}});
|
||||||
|
auto fmm_val = make_map_value(fmm_type, {{int32_t(0), fm_val}});
|
||||||
|
auto fs_val = make_set_value(fs_type, {ut_val});
|
||||||
|
auto fl_val = make_list_value(fl_type, {ut_val});
|
||||||
|
|
||||||
|
mutation mut{s, partition_key::from_deeply_exploded(*s, {0})};
|
||||||
|
auto ckey = clustering_key::make_empty();
|
||||||
|
|
||||||
|
// m[0] = {a: 0, b: 0}
|
||||||
|
{
|
||||||
|
collection_mutation_description desc;
|
||||||
|
desc.cells.emplace_back(int32_type->decompose(0),
|
||||||
|
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||||
|
mut.set_clustered_cell(ckey, *m_cdef, desc.serialize(*m_type));
|
||||||
|
}
|
||||||
|
|
||||||
|
// fm = {0: {a: 0, b: 0}}
|
||||||
|
mut.set_clustered_cell(ckey, *fm_cdef, atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val)));
|
||||||
|
|
||||||
|
// mm[0] = {0: {a: 0, b: 0}},
|
||||||
|
{
|
||||||
|
collection_mutation_description desc;
|
||||||
|
desc.cells.emplace_back(int32_type->decompose(0),
|
||||||
|
atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val), atomic_cell::collection_member::yes));
|
||||||
|
mut.set_clustered_cell(ckey, *mm_cdef, desc.serialize(*mm_type));
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||||
|
mut.set_clustered_cell(ckey, *fmm_cdef, atomic_cell::make_live(*fmm_type, write_timestamp, fmm_type->decompose(fmm_val)));
|
||||||
|
|
||||||
|
// s = s + {{a: 0, b: 0}},
|
||||||
|
{
|
||||||
|
collection_mutation_description desc;
|
||||||
|
desc.cells.emplace_back(ut->decompose(ut_val),
|
||||||
|
atomic_cell::make_live(*bytes_type, write_timestamp, bytes{}, atomic_cell::collection_member::yes));
|
||||||
|
mut.set_clustered_cell(ckey, *s_cdef, desc.serialize(*s_type));
|
||||||
|
}
|
||||||
|
|
||||||
|
// fs = {{a: 0, b: 0}},
|
||||||
|
mut.set_clustered_cell(ckey, *fs_cdef, atomic_cell::make_live(*fs_type, write_timestamp, fs_type->decompose(fs_val)));
|
||||||
|
|
||||||
|
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||||
|
{
|
||||||
|
collection_mutation_description desc;
|
||||||
|
desc.cells.emplace_back(timeuuid_type->decompose(utils::UUID("7fb27e80-7b12-11ea-9fad-f4d108a9e4a3")),
|
||||||
|
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||||
|
mut.set_clustered_cell(ckey, *l_cdef, desc.serialize(*l_type));
|
||||||
|
}
|
||||||
|
|
||||||
|
// fl = [{a: 0, b: 0}]
|
||||||
|
mut.set_clustered_cell(ckey, *fl_cdef, atomic_cell::make_live(*fl_type, write_timestamp, fl_type->decompose(fl_val)));
|
||||||
|
|
||||||
|
sstable_assertions sst(s, LEGACY_UDT_IN_COLLECTION_PATH);
|
||||||
|
sst.load();
|
||||||
|
assert_that(sst.read_rows_flat()).produces(mut).produces_end_of_stream();
|
||||||
|
}
|
||||||
|
|||||||
8
test/cql/lwt_batch_validation_test.cql
Normal file
8
test/cql/lwt_batch_validation_test.cql
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
CREATE KEYSPACE k WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||||
|
USE k;
|
||||||
|
CREATE TABLE t1 (userid int PRIMARY KEY);
|
||||||
|
CREATE TABLE t2 (userid int PRIMARY KEY);
|
||||||
|
BEGIN BATCH
|
||||||
|
INSERT INTO t1 (userid) VALUES (1) IF NOT EXISTS
|
||||||
|
INSERT INTO t2 (userid) VALUES (1) IF NOT EXISTS
|
||||||
|
APPLY BATCH;
|
||||||
24
test/cql/lwt_batch_validation_test.result
Normal file
24
test/cql/lwt_batch_validation_test.result
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
CREATE KEYSPACE k WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||||
|
{
|
||||||
|
"status" : "ok"
|
||||||
|
}
|
||||||
|
USE k;
|
||||||
|
{
|
||||||
|
"status" : "ok"
|
||||||
|
}
|
||||||
|
CREATE TABLE t1 (userid int PRIMARY KEY);
|
||||||
|
{
|
||||||
|
"status" : "ok"
|
||||||
|
}
|
||||||
|
CREATE TABLE t2 (userid int PRIMARY KEY);
|
||||||
|
{
|
||||||
|
"status" : "ok"
|
||||||
|
}
|
||||||
|
BEGIN BATCH
|
||||||
|
INSERT INTO t1 (userid) VALUES (1) IF NOT EXISTS
|
||||||
|
INSERT INTO t2 (userid) VALUES (1) IF NOT EXISTS
|
||||||
|
APPLY BATCH;
|
||||||
|
{
|
||||||
|
"message" : "exceptions::invalid_request_exception (Batch with conditions cannot span multiple tables)",
|
||||||
|
"status" : "error"
|
||||||
|
}
|
||||||
@@ -397,9 +397,6 @@ public:
|
|||||||
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
|
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
|
||||||
cfg->num_tokens.set(256);
|
cfg->num_tokens.set(256);
|
||||||
cfg->ring_delay_ms.set(500);
|
cfg->ring_delay_ms.set(500);
|
||||||
auto features = cfg->experimental_features();
|
|
||||||
features.emplace_back(db::experimental_features_t::LWT);
|
|
||||||
cfg->experimental_features(features);
|
|
||||||
cfg->shutdown_announce_in_ms.set(0);
|
cfg->shutdown_announce_in_ms.set(0);
|
||||||
cfg->broadcast_to_all_shards().get();
|
cfg->broadcast_to_all_shards().get();
|
||||||
create_directories((data_dir_path + "/system").c_str());
|
create_directories((data_dir_path + "/system").c_str());
|
||||||
@@ -439,7 +436,6 @@ public:
|
|||||||
|
|
||||||
gms::feature_config fcfg;
|
gms::feature_config fcfg;
|
||||||
fcfg.enable_cdc = true;
|
fcfg.enable_cdc = true;
|
||||||
fcfg.enable_lwt = true;
|
|
||||||
fcfg.enable_sstables_mc_format = true;
|
fcfg.enable_sstables_mc_format = true;
|
||||||
if (cfg->enable_user_defined_functions()) {
|
if (cfg->enable_user_defined_functions()) {
|
||||||
fcfg.enable_user_defined_functions = true;
|
fcfg.enable_user_defined_functions = true;
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
|||||||
|
3519784297
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user