Compare commits
36 Commits
next
...
scylla-3.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
413fcab833 | ||
|
|
9f3c3036bf | ||
|
|
ff2e108a6d | ||
|
|
ade788ffe8 | ||
|
|
1f8bb754d9 | ||
|
|
7b2eb09225 | ||
|
|
d2293f9fd5 | ||
|
|
25b31f6c23 | ||
|
|
742a1ce7d6 | ||
|
|
4ca9d23b83 | ||
|
|
9e97f3a9b3 | ||
|
|
183418f228 | ||
|
|
756574d094 | ||
|
|
a348418918 | ||
|
|
06c0bd0681 | ||
|
|
223c300435 | ||
|
|
ac8bef6781 | ||
|
|
68691907af | ||
|
|
f59d2fcbf1 | ||
|
|
bdc542143e | ||
|
|
061a02237c | ||
|
|
35b6505517 | ||
|
|
866c04dd64 | ||
|
|
dc588e6e7b | ||
|
|
f842154453 | ||
|
|
b38193f71d | ||
|
|
f47ba6dc06 | ||
|
|
0d0c1d4318 | ||
|
|
9225b17b99 | ||
|
|
00b3f28199 | ||
|
|
1bbe619689 | ||
|
|
c36f71c783 | ||
|
|
f5471d268b | ||
|
|
fd5c65d9dc | ||
|
|
3aa406bf00 | ||
|
|
c0253d9221 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=3.3.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -299,14 +299,14 @@ static void describe_key_schema(rjson::value& parent, const schema& schema, std:
|
||||
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::describe_table(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.describe_table++;
|
||||
rjson::value request = rjson::parse(content);
|
||||
elogger.trace("Describing table {}", request);
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
rjson::value table_description = rjson::empty_object();
|
||||
rjson::set(table_description, "TableName", rjson::from_string(schema->cf_name()));
|
||||
@@ -378,13 +378,13 @@ future<json::json_return_type> executor::describe_table(client_state& client_sta
|
||||
return make_ready_future<json::json_return_type>(make_jsonable(std::move(response)));
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::delete_table(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.delete_table++;
|
||||
rjson::value request = rjson::parse(content);
|
||||
elogger.trace("Deleting table {}", request);
|
||||
|
||||
std::string table_name = get_table_name(request);
|
||||
tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, table_name);
|
||||
tracing::add_table_name(trace_state, KEYSPACE_NAME, table_name);
|
||||
|
||||
if (!_proxy.get_db().local().has_schema(KEYSPACE_NAME, table_name)) {
|
||||
throw api_error("ResourceNotFoundException",
|
||||
@@ -481,14 +481,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
}
|
||||
|
||||
|
||||
future<json::json_return_type> executor::create_table(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.create_table++;
|
||||
rjson::value table_info = rjson::parse(content);
|
||||
elogger.trace("Creating table {}", table_info);
|
||||
std::string table_name = get_table_name(table_info);
|
||||
const rjson::value& attribute_definitions = table_info["AttributeDefinitions"];
|
||||
|
||||
tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, table_name);
|
||||
tracing::add_table_name(trace_state, KEYSPACE_NAME, table_name);
|
||||
|
||||
schema_builder builder(KEYSPACE_NAME, table_name);
|
||||
auto [hash_key, range_key] = parse_key_schema(table_info);
|
||||
@@ -749,14 +749,14 @@ static future<std::unique_ptr<rjson::value>> maybe_get_previous_item(
|
||||
bool need_read_before_write,
|
||||
alternator::stats& stats);
|
||||
|
||||
future<json::json_return_type> executor::put_item(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::put_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.put_item++;
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
rjson::value update_info = rjson::parse(content);
|
||||
elogger.trace("Updating value {}", update_info);
|
||||
|
||||
schema_ptr schema = get_table(_proxy, update_info);
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
if (rjson::find(update_info, "ConditionExpression")) {
|
||||
throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator");
|
||||
@@ -766,6 +766,7 @@ future<json::json_return_type> executor::put_item(client_state& client_state, st
|
||||
// FIXME: Need to support also the ALL_OLD option. See issue #5053.
|
||||
throw api_error("ValidationException", format("Unsupported ReturnValues={} for PutItem operation", return_values));
|
||||
}
|
||||
|
||||
const bool has_expected = update_info.HasMember("Expected");
|
||||
|
||||
const rjson::value& item = update_info["Item"];
|
||||
@@ -774,11 +775,11 @@ future<json::json_return_type> executor::put_item(client_state& client_state, st
|
||||
|
||||
return maybe_get_previous_item(_proxy, client_state, schema, item, has_expected, _stats).then(
|
||||
[this, schema, has_expected, update_info = rjson::copy(update_info), m = std::move(m),
|
||||
&client_state, start_time] (std::unique_ptr<rjson::value> previous_item) mutable {
|
||||
&client_state, start_time, trace_state] (std::unique_ptr<rjson::value> previous_item) mutable {
|
||||
if (has_expected) {
|
||||
verify_expected(update_info, previous_item);
|
||||
}
|
||||
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () {
|
||||
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () {
|
||||
_stats.api_operations.put_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.put_item_latency._count + 1);
|
||||
// Without special options on what to return, PutItem returns nothing.
|
||||
return make_ready_future<json::json_return_type>(json_string(""));
|
||||
@@ -806,13 +807,13 @@ static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr sc
|
||||
return m;
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::delete_item(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.delete_item++;
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
rjson::value update_info = rjson::parse(content);
|
||||
|
||||
schema_ptr schema = get_table(_proxy, update_info);
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
if (rjson::find(update_info, "ConditionExpression")) {
|
||||
throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator");
|
||||
@@ -831,11 +832,11 @@ future<json::json_return_type> executor::delete_item(client_state& client_state,
|
||||
|
||||
return maybe_get_previous_item(_proxy, client_state, schema, key, has_expected, _stats).then(
|
||||
[this, schema, has_expected, update_info = rjson::copy(update_info), m = std::move(m),
|
||||
&client_state, start_time] (std::unique_ptr<rjson::value> previous_item) mutable {
|
||||
&client_state, start_time, trace_state] (std::unique_ptr<rjson::value> previous_item) mutable {
|
||||
if (has_expected) {
|
||||
verify_expected(update_info, previous_item);
|
||||
}
|
||||
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () {
|
||||
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () {
|
||||
_stats.api_operations.delete_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.delete_item_latency._count + 1);
|
||||
// Without special options on what to return, DeleteItem returns nothing.
|
||||
return make_ready_future<json::json_return_type>(json_string(""));
|
||||
@@ -868,7 +869,7 @@ struct primary_key_equal {
|
||||
}
|
||||
};
|
||||
|
||||
future<json::json_return_type> executor::batch_write_item(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.batch_write_item++;
|
||||
rjson::value batch_info = rjson::parse(content);
|
||||
rjson::value& request_items = batch_info["RequestItems"];
|
||||
@@ -878,7 +879,7 @@ future<json::json_return_type> executor::batch_write_item(client_state& client_s
|
||||
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
schema_ptr schema = get_table_from_batch_request(_proxy, it);
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(1, primary_key_hash{schema}, primary_key_equal{schema});
|
||||
for (auto& request : it->value.GetArray()) {
|
||||
if (!request.IsObject() || request.MemberCount() != 1) {
|
||||
@@ -911,7 +912,7 @@ future<json::json_return_type> executor::batch_write_item(client_state& client_s
|
||||
}
|
||||
}
|
||||
|
||||
return _proxy.mutate(std::move(mutations), db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([] () {
|
||||
return _proxy.mutate(std::move(mutations), db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([] () {
|
||||
// Without special options on what to return, BatchWriteItem returns nothing,
|
||||
// unless there are UnprocessedItems - it's possible to just stop processing a batch
|
||||
// due to throttling. TODO(sarna): Consider UnprocessedItems when returning.
|
||||
@@ -1410,13 +1411,13 @@ static future<std::unique_ptr<rjson::value>> maybe_get_previous_item(
|
||||
}
|
||||
|
||||
|
||||
future<json::json_return_type> executor::update_item(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::update_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.update_item++;
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
rjson::value update_info = rjson::parse(content);
|
||||
elogger.trace("update_item {}", update_info);
|
||||
schema_ptr schema = get_table(_proxy, update_info);
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
if (rjson::find(update_info, "ConditionExpression")) {
|
||||
throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator");
|
||||
@@ -1472,7 +1473,7 @@ future<json::json_return_type> executor::update_item(client_state& client_state,
|
||||
return maybe_get_previous_item(_proxy, client_state, schema, pk, ck, has_update_expression, expression, has_expected, _stats).then(
|
||||
[this, schema, expression = std::move(expression), has_update_expression, ck = std::move(ck), has_expected,
|
||||
update_info = rjson::copy(update_info), m = std::move(m), attrs_collector = std::move(attrs_collector),
|
||||
attribute_updates = rjson::copy(attribute_updates), ts, &client_state, start_time] (std::unique_ptr<rjson::value> previous_item) mutable {
|
||||
attribute_updates = rjson::copy(attribute_updates), ts, &client_state, start_time, trace_state] (std::unique_ptr<rjson::value> previous_item) mutable {
|
||||
if (has_expected) {
|
||||
verify_expected(update_info, previous_item);
|
||||
}
|
||||
@@ -1603,7 +1604,7 @@ future<json::json_return_type> executor::update_item(client_state& client_state,
|
||||
row.apply(row_marker(ts));
|
||||
|
||||
elogger.trace("Applying mutation {}", m);
|
||||
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () {
|
||||
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () {
|
||||
// Without special options on what to return, UpdateItem returns nothing.
|
||||
_stats.api_operations.update_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.update_item_latency._count + 1);
|
||||
return make_ready_future<json::json_return_type>(json_string(""));
|
||||
@@ -1630,7 +1631,7 @@ static db::consistency_level get_read_consistency(const rjson::value& request) {
|
||||
return consistent_read ? db::consistency_level::LOCAL_QUORUM : db::consistency_level::LOCAL_ONE;
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::get_item(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.get_item++;
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
rjson::value table_info = rjson::parse(content);
|
||||
@@ -1638,7 +1639,7 @@ future<json::json_return_type> executor::get_item(client_state& client_state, st
|
||||
|
||||
schema_ptr schema = get_table(_proxy, table_info);
|
||||
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
rjson::value& query_key = table_info["Key"];
|
||||
db::consistency_level cl = get_read_consistency(table_info);
|
||||
@@ -1673,7 +1674,7 @@ future<json::json_return_type> executor::get_item(client_state& client_state, st
|
||||
});
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::batch_get_item(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
// FIXME: In this implementation, an unbounded batch size can cause
|
||||
// unbounded response JSON object to be buffered in memory, unbounded
|
||||
// parallelism of the requests, and unbounded amount of non-preemptable
|
||||
@@ -1701,7 +1702,7 @@ future<json::json_return_type> executor::batch_get_item(client_state& client_sta
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs;
|
||||
rs.schema = get_table_from_batch_request(_proxy, it);
|
||||
tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, rs.schema->cf_name());
|
||||
tracing::add_table_name(trace_state, KEYSPACE_NAME, rs.schema->cf_name());
|
||||
rs.cl = get_read_consistency(it->value);
|
||||
rs.attrs_to_get = calculate_attrs_to_get(it->value);
|
||||
auto& keys = (it->value)["Keys"];
|
||||
@@ -1867,10 +1868,11 @@ static future<json::json_return_type> do_query(schema_ptr schema,
|
||||
db::consistency_level cl,
|
||||
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions,
|
||||
service::client_state& client_state,
|
||||
cql3::cql_stats& cql_stats) {
|
||||
cql3::cql_stats& cql_stats,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
::shared_ptr<service::pager::paging_state> paging_state = nullptr;
|
||||
|
||||
tracing::trace(client_state.get_trace_state(), "Performing a database query");
|
||||
tracing::trace(trace_state, "Performing a database query");
|
||||
|
||||
if (exclusive_start_key) {
|
||||
partition_key pk = pk_from_json(*exclusive_start_key, schema);
|
||||
@@ -1887,7 +1889,7 @@ static future<json::json_return_type> do_query(schema_ptr schema,
|
||||
auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, query::max_partitions);
|
||||
|
||||
auto query_state_ptr = std::make_unique<service::query_state>(client_state, empty_service_permit());
|
||||
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, empty_service_permit());
|
||||
|
||||
command->slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
auto query_options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, std::vector<cql3::raw_value>{});
|
||||
@@ -1919,7 +1921,7 @@ static future<json::json_return_type> do_query(schema_ptr schema,
|
||||
// 2. Filtering - by passing appropriately created restrictions to pager as a last parameter
|
||||
// 3. Proper timeouts instead of gc_clock::now() and db::no_timeout
|
||||
// 4. Implement parallel scanning via Segments
|
||||
future<json::json_return_type> executor::scan(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::scan(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.scan++;
|
||||
rjson::value request_info = rjson::parse(content);
|
||||
elogger.trace("Scanning {}", request_info);
|
||||
@@ -1956,7 +1958,7 @@ future<json::json_return_type> executor::scan(client_state& client_state, std::s
|
||||
partition_ranges = filtering_restrictions->get_partition_key_ranges(query_options);
|
||||
ck_bounds = filtering_restrictions->get_clustering_bounds(query_options);
|
||||
}
|
||||
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats);
|
||||
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, trace_state);
|
||||
}
|
||||
|
||||
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) {
|
||||
@@ -2079,14 +2081,14 @@ calculate_bounds(schema_ptr schema, const rjson::value& conditions) {
|
||||
return {std::move(partition_ranges), std::move(ck_bounds)};
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::query(client_state& client_state, std::string content) {
|
||||
future<json::json_return_type> executor::query(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
|
||||
_stats.api_operations.query++;
|
||||
rjson::value request_info = rjson::parse(content);
|
||||
elogger.trace("Querying {}", request_info);
|
||||
|
||||
schema_ptr schema = get_table_or_view(_proxy, request_info);
|
||||
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
|
||||
rjson::value* exclusive_start_key = rjson::find(request_info, "ExclusiveStartKey");
|
||||
db::consistency_level cl = get_read_consistency(request_info);
|
||||
@@ -2129,7 +2131,7 @@ future<json::json_return_type> executor::query(client_state& client_state, std::
|
||||
throw api_error("ValidationException", format("QueryFilter can only contain non-primary key attributes: Primary key attribute: {}", ck_defs.front()->name_as_text()));
|
||||
}
|
||||
}
|
||||
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats);
|
||||
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, std::move(trace_state));
|
||||
}
|
||||
|
||||
static void validate_limit(int limit) {
|
||||
@@ -2238,18 +2240,20 @@ future<> executor::maybe_create_keyspace() {
|
||||
});
|
||||
}
|
||||
|
||||
static void create_tracing_session(executor::client_state& client_state) {
|
||||
static tracing::trace_state_ptr create_tracing_session() {
|
||||
tracing::trace_state_props_set props;
|
||||
props.set<tracing::trace_state_props::full_tracing>();
|
||||
client_state.create_tracing_session(tracing::trace_type::QUERY, props);
|
||||
return tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, props);
|
||||
}
|
||||
|
||||
void executor::maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query) {
|
||||
tracing::trace_state_ptr executor::maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query) {
|
||||
tracing::trace_state_ptr trace_state;
|
||||
if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
|
||||
create_tracing_session(client_state);
|
||||
tracing::add_query(client_state.get_trace_state(), query);
|
||||
tracing::begin(client_state.get_trace_state(), format("Alternator {}", op), client_state.get_client_address());
|
||||
trace_state = create_tracing_session();
|
||||
tracing::add_query(trace_state, query);
|
||||
tracing::begin(trace_state, format("Alternator {}", op), client_state.get_client_address());
|
||||
}
|
||||
return trace_state;
|
||||
}
|
||||
|
||||
future<> executor::start() {
|
||||
|
||||
@@ -46,26 +46,26 @@ public:
|
||||
|
||||
executor(service::storage_proxy& proxy, service::migration_manager& mm) : _proxy(proxy), _mm(mm) {}
|
||||
|
||||
future<json::json_return_type> create_table(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> describe_table(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> delete_table(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> put_item(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> get_item(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> delete_item(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> update_item(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> create_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> put_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> update_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> list_tables(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> scan(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> scan(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> describe_endpoints(client_state& client_state, std::string content, std::string host_header);
|
||||
future<json::json_return_type> batch_write_item(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> batch_get_item(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> query(client_state& client_state, std::string content);
|
||||
future<json::json_return_type> batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
future<json::json_return_type> query(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
|
||||
|
||||
future<> start();
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
|
||||
future<> maybe_create_keyspace();
|
||||
|
||||
static void maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query);
|
||||
static tracing::trace_state_ptr maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -231,9 +231,9 @@ future<json::json_return_type> server::handle_api_request(std::unique_ptr<reques
|
||||
// We use unique_ptr because client_state cannot be moved or copied
|
||||
return do_with(std::make_unique<executor::client_state>(executor::client_state::internal_tag()), [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] (std::unique_ptr<executor::client_state>& client_state) mutable {
|
||||
client_state->set_raw_keyspace(executor::KEYSPACE_NAME);
|
||||
executor::maybe_trace_query(*client_state, op, req->content);
|
||||
tracing::trace(client_state->get_trace_state(), op);
|
||||
return callback_it->second(_executor.local(), *client_state, std::move(req));
|
||||
tracing::trace_state_ptr trace_state = executor::maybe_trace_query(*client_state, op, req->content);
|
||||
tracing::trace(trace_state, op);
|
||||
return callback_it->second(_executor.local(), *client_state, trace_state, std::move(req)).finally([trace_state] {});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -253,21 +253,21 @@ void server::set_routes(routes& r) {
|
||||
server::server(seastar::sharded<executor>& e)
|
||||
: _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false)
|
||||
, _callbacks{
|
||||
{"CreateTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) {
|
||||
return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req)] { return e.create_table(client_state, req->content); }); }
|
||||
{"CreateTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) {
|
||||
return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req), trace_state = std::move(trace_state)] () mutable { return e.create_table(client_state, std::move(trace_state), req->content); }); }
|
||||
},
|
||||
{"DescribeTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_table(client_state, req->content); }},
|
||||
{"DeleteTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_table(client_state, req->content); }},
|
||||
{"PutItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.put_item(client_state, req->content); }},
|
||||
{"UpdateItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.update_item(client_state, req->content); }},
|
||||
{"GetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.get_item(client_state, req->content); }},
|
||||
{"DeleteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_item(client_state, req->content); }},
|
||||
{"ListTables", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.list_tables(client_state, req->content); }},
|
||||
{"Scan", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.scan(client_state, req->content); }},
|
||||
{"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }},
|
||||
{"BatchWriteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_write_item(client_state, req->content); }},
|
||||
{"BatchGetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_get_item(client_state, req->content); }},
|
||||
{"Query", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.query(client_state, req->content); }},
|
||||
{"DescribeTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.describe_table(client_state, std::move(trace_state), req->content); }},
|
||||
{"DeleteTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.delete_table(client_state, std::move(trace_state), req->content); }},
|
||||
{"PutItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.put_item(client_state, std::move(trace_state), req->content); }},
|
||||
{"UpdateItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.update_item(client_state, std::move(trace_state), req->content); }},
|
||||
{"GetItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.get_item(client_state, std::move(trace_state), req->content); }},
|
||||
{"DeleteItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.delete_item(client_state, std::move(trace_state), req->content); }},
|
||||
{"ListTables", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.list_tables(client_state, req->content); }},
|
||||
{"Scan", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.scan(client_state, std::move(trace_state), req->content); }},
|
||||
{"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }},
|
||||
{"BatchWriteItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.batch_write_item(client_state, std::move(trace_state), req->content); }},
|
||||
{"BatchGetItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.batch_get_item(client_state, std::move(trace_state), req->content); }},
|
||||
{"Query", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.query(client_state, std::move(trace_state), req->content); }},
|
||||
} {
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@
|
||||
namespace alternator {
|
||||
|
||||
class server {
|
||||
using alternator_callback = std::function<future<json::json_return_type>(executor&, executor::client_state&, std::unique_ptr<request>)>;
|
||||
using alternator_callback = std::function<future<json::json_return_type>(executor&, executor::client_state&, tracing::trace_state_ptr, std::unique_ptr<request>)>;
|
||||
using alternator_callbacks_map = std::unordered_map<std::string_view, alternator_callback>;
|
||||
|
||||
seastar::httpd::http_server_control _control;
|
||||
|
||||
@@ -193,9 +193,9 @@ future<> service::start(::service::migration_manager& mm) {
|
||||
future<> service::stop() {
|
||||
// Only one of the shards has the listener registered, but let's try to
|
||||
// unregister on each one just to make sure.
|
||||
_mnotifier.unregister_listener(_migration_listener.get());
|
||||
|
||||
return _permissions_cache->stop().then([this] {
|
||||
return _mnotifier.unregister_listener(_migration_listener.get()).then([this] {
|
||||
return _permissions_cache->stop();
|
||||
}).then([this] {
|
||||
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop());
|
||||
});
|
||||
}
|
||||
|
||||
37
cdc/cdc.cc
37
cdc/cdc.cc
@@ -73,6 +73,7 @@ static future<> populate_desc(db_context ctx, const schema& s);
|
||||
class cdc::cdc_service::impl : service::migration_listener::empty_listener {
|
||||
friend cdc_service;
|
||||
db_context _ctxt;
|
||||
bool _stopped = false;
|
||||
public:
|
||||
impl(db_context ctxt)
|
||||
: _ctxt(std::move(ctxt))
|
||||
@@ -80,7 +81,13 @@ public:
|
||||
_ctxt._migration_notifier.register_listener(this);
|
||||
}
|
||||
~impl() {
|
||||
_ctxt._migration_notifier.unregister_listener(this);
|
||||
assert(_stopped);
|
||||
}
|
||||
|
||||
future<> stop() {
|
||||
return _ctxt._migration_notifier.unregister_listener(this).then([this] {
|
||||
_stopped = true;
|
||||
});
|
||||
}
|
||||
|
||||
void on_before_create_column_family(const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
|
||||
@@ -198,6 +205,10 @@ cdc::cdc_service::cdc_service(db_context ctxt)
|
||||
_impl->_ctxt._proxy.set_cdc_service(this);
|
||||
}
|
||||
|
||||
future<> cdc::cdc_service::stop() {
|
||||
return _impl->stop();
|
||||
}
|
||||
|
||||
cdc::cdc_service::~cdc_service() = default;
|
||||
|
||||
cdc::options::options(const std::map<sstring, sstring>& map) {
|
||||
@@ -260,7 +271,6 @@ sstring desc_name(const sstring& table_name) {
|
||||
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
|
||||
schema_builder b(s.ks_name(), log_name(s.cf_name()));
|
||||
b.set_default_time_to_live(gc_clock::duration{s.cdc_options().ttl()});
|
||||
b.set_comment(sprint("CDC log for %s.%s", s.ks_name(), s.cf_name()));
|
||||
b.with_column("stream_id", uuid_type, column_kind::partition_key);
|
||||
b.with_column("time", timeuuid_type, column_kind::clustering_key);
|
||||
@@ -405,6 +415,7 @@ private:
|
||||
bytes _decomposed_time;
|
||||
::shared_ptr<const transformer::streams_type> _streams;
|
||||
const column_definition& _op_col;
|
||||
ttl_opt _cdc_ttl_opt;
|
||||
|
||||
clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) const {
|
||||
const auto log_ck = clustering_key::from_exploded(
|
||||
@@ -416,7 +427,8 @@ private:
|
||||
auto cdef = m.schema()->get_column_definition(to_bytes("_" + column.name()));
|
||||
auto value = atomic_cell::make_live(*column.type,
|
||||
_time.timestamp(),
|
||||
bytes_view(pk_value[pos]));
|
||||
bytes_view(pk_value[pos]),
|
||||
_cdc_ttl_opt);
|
||||
m.set_cell(log_ck, *cdef, std::move(value));
|
||||
++pos;
|
||||
}
|
||||
@@ -424,7 +436,7 @@ private:
|
||||
}
|
||||
|
||||
void set_operation(const clustering_key& ck, operation op, mutation& m) const {
|
||||
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _time.timestamp(), _op_col.type->decompose(operation_native_type(op))));
|
||||
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _time.timestamp(), _op_col.type->decompose(operation_native_type(op)), _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
partition_key stream_id(const net::inet_address& ip, unsigned int shard_id) const {
|
||||
@@ -443,7 +455,11 @@ public:
|
||||
, _decomposed_time(timeuuid_type->decompose(_time))
|
||||
, _streams(std::move(streams))
|
||||
, _op_col(*_log_schema->get_column_definition(to_bytes("operation")))
|
||||
{}
|
||||
{
|
||||
if (_schema->cdc_options().ttl()) {
|
||||
_cdc_ttl_opt = std::chrono::seconds(_schema->cdc_options().ttl());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
|
||||
// more details like tombstones/ttl? Probably not but keep in mind.
|
||||
@@ -475,7 +491,8 @@ public:
|
||||
auto cdef = _log_schema->get_column_definition(to_bytes("_" + column.name()));
|
||||
auto value = atomic_cell::make_live(*column.type,
|
||||
_time.timestamp(),
|
||||
bytes_view(exploded[pos]));
|
||||
bytes_view(exploded[pos]),
|
||||
_cdc_ttl_opt);
|
||||
res.set_cell(log_ck, *cdef, std::move(value));
|
||||
++pos;
|
||||
}
|
||||
@@ -531,11 +548,11 @@ public:
|
||||
for (const auto& column : _schema->clustering_key_columns()) {
|
||||
assert (pos < ck_value.size());
|
||||
auto cdef = _log_schema->get_column_definition(to_bytes("_" + column.name()));
|
||||
res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos])));
|
||||
res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
|
||||
if (pirow) {
|
||||
assert(pirow->has(column.name_as_text()));
|
||||
res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos])));
|
||||
res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos]), _cdc_ttl_opt));
|
||||
}
|
||||
|
||||
++pos;
|
||||
@@ -564,7 +581,7 @@ public:
|
||||
}
|
||||
|
||||
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(op)));
|
||||
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values)));
|
||||
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values), _cdc_ttl_opt));
|
||||
|
||||
if (pirow && pirow->has(cdef.name_as_text())) {
|
||||
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(column_op::set)));
|
||||
@@ -573,7 +590,7 @@ public:
|
||||
|
||||
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
|
||||
assert(pikey->explode() != log_ck.explode());
|
||||
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values)));
|
||||
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values), _cdc_ttl_opt));
|
||||
}
|
||||
} else {
|
||||
cdc_log.warn("Non-atomic cell ignored {}.{}:{}", _schema->ks_name(), _schema->cf_name(), cdef.name_as_text());
|
||||
|
||||
@@ -81,6 +81,7 @@ class cdc_service {
|
||||
class impl;
|
||||
std::unique_ptr<impl> _impl;
|
||||
public:
|
||||
future<> stop();
|
||||
cdc_service(service::storage_proxy&);
|
||||
cdc_service(db_context);
|
||||
~cdc_service();
|
||||
|
||||
@@ -266,7 +266,7 @@ bool column_condition::applies_to(const data_value* cell_value, const query_opti
|
||||
return value.has_value() && is_satisfied_by(operator_type::EQ, *cell_value->type(), *column.type, *cell_value, *value);
|
||||
});
|
||||
} else {
|
||||
return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return value.has_value() == false; });
|
||||
return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return !value.has_value() || value->empty(); });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -450,8 +450,9 @@ query_processor::~query_processor() {
|
||||
}
|
||||
|
||||
future<> query_processor::stop() {
|
||||
_mnotifier.unregister_listener(_migration_subscriber.get());
|
||||
return _authorized_prepared_cache.stop().finally([this] { return _prepared_cache.stop(); });
|
||||
return _mnotifier.unregister_listener(_migration_subscriber.get()).then([this] {
|
||||
return _authorized_prepared_cache.stop().finally([this] { return _prepared_cache.stop(); });
|
||||
});
|
||||
}
|
||||
|
||||
future<::shared_ptr<result_message>>
|
||||
|
||||
@@ -807,7 +807,12 @@ public:
|
||||
// (Note: wait_for_pending(pos) waits for operation _at_ pos (and before),
|
||||
replay_position rp(me->_desc.id, position_type(fp));
|
||||
return me->_pending_ops.wait_for_pending(rp, timeout).then([me, fp] {
|
||||
assert(me->_flush_pos > fp);
|
||||
assert(me->_segment_manager->cfg.mode != sync_mode::BATCH || me->_flush_pos > fp);
|
||||
if (me->_flush_pos <= fp) {
|
||||
// previous op we were waiting for was not sync one, so it did not flush
|
||||
// force flush here
|
||||
return me->do_flush(fp);
|
||||
}
|
||||
return make_ready_future<sseg_ptr>(me);
|
||||
});
|
||||
}
|
||||
@@ -1323,7 +1328,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
v.emplace_back(iovec{ buf.get_write(), s});
|
||||
m += s;
|
||||
}
|
||||
return f.dma_write(max_size - rem, std::move(v)).then([&rem](size_t s) {
|
||||
return f.dma_write(max_size - rem, std::move(v), service::get_local_commitlog_priority()).then([&rem](size_t s) {
|
||||
rem -= s;
|
||||
return stop_iteration::no;
|
||||
});
|
||||
|
||||
@@ -405,11 +405,8 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
|
||||
return _proxy.send_hint_to_endpoint(std::move(m), end_point_key());
|
||||
} else {
|
||||
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
auto timeout = db::timeout_clock::now() + 1h;
|
||||
//FIXME: Add required frozen_mutation overloads
|
||||
return _proxy.mutate({m.fm.unfreeze(m.s)}, consistency_level::ALL, timeout, nullptr, empty_service_permit());
|
||||
return _proxy.mutate_hint_from_scratch(std::move(m));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1792,7 +1792,7 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) {
|
||||
}
|
||||
|
||||
static bool maybe_write_in_user_memory(schema_ptr s, database& db) {
|
||||
return (s.get() == batchlog().get())
|
||||
return (s.get() == batchlog().get()) || (s.get() == paxos().get())
|
||||
|| s == v3::scylla_views_builds_in_progress();
|
||||
}
|
||||
|
||||
|
||||
@@ -311,7 +311,7 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
|
||||
if (!cdef.is_computed()) {
|
||||
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
|
||||
// once "computed_columns feature" is supported by every node
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_base)) {
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
|
||||
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
|
||||
}
|
||||
computed_value = token_column_computation().compute_value(*_base, base_key, update);
|
||||
@@ -894,7 +894,11 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
if (_update && !_update->is_end_of_partition()) {
|
||||
if (_update->is_clustering_row()) {
|
||||
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
|
||||
generate_update(std::move(*_update).as_clustering_row(), { });
|
||||
auto existing_tombstone = _existing_tombstone_tracker.current_tombstone();
|
||||
auto existing = existing_tombstone
|
||||
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
|
||||
}
|
||||
return advance_updates();
|
||||
}
|
||||
@@ -1187,8 +1191,9 @@ future<> view_builder::stop() {
|
||||
vlogger.info("Stopping view builder");
|
||||
_as.request_abort();
|
||||
return _started.finally([this] {
|
||||
_mnotifier.unregister_listener(this);
|
||||
return _sem.wait().then([this] {
|
||||
return _mnotifier.unregister_listener(this).then([this] {
|
||||
return _sem.wait();
|
||||
}).then([this] {
|
||||
_sem.broken();
|
||||
return _build_step.join();
|
||||
}).handle_exception_type([] (const broken_semaphore&) {
|
||||
|
||||
2
dist/debian/build_deb.sh
vendored
2
dist/debian/build_deb.sh
vendored
@@ -125,7 +125,7 @@ if [ -z "$TARGET" ]; then
|
||||
fi
|
||||
RELOC_PKG_FULLPATH=$(readlink -f $RELOC_PKG)
|
||||
RELOC_PKG_BASENAME=$(basename $RELOC_PKG)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE | sed 's/\.rc/~rc/')
|
||||
SCYLLA_RELEASE=$(cat SCYLLA-RELEASE-FILE)
|
||||
|
||||
ln -fv $RELOC_PKG_FULLPATH ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz
|
||||
|
||||
6
dist/debian/debian/scylla-server.postrm
vendored
6
dist/debian/debian/scylla-server.postrm
vendored
@@ -6,8 +6,12 @@ case "$1" in
|
||||
purge|remove)
|
||||
rm -rf /etc/systemd/system/scylla-housekeeping-daily.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-housekeeping-restart.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-helper.slice.d/
|
||||
# We need to keep dependencies.conf and sysconfdir.conf on 'remove',
|
||||
# otherwise it will be missing after rollback.
|
||||
if [ "$1" = "purge" ]; then
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -5,7 +5,7 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-3.3/latest/scylla.repo
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -1935,7 +1935,8 @@ future<> gossiper::do_stop_gossiping() {
|
||||
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
|
||||
logger.info("Announcing shutdown");
|
||||
add_local_application_state(application_state::STATUS, _value_factory.shutdown(true)).get();
|
||||
for (inet_address addr : _live_endpoints) {
|
||||
auto live_endpoints = _live_endpoints;
|
||||
for (inet_address addr : live_endpoints) {
|
||||
msg_addr id = get_msg_addr(addr);
|
||||
logger.trace("Sending a GossipShutdown to {}", id);
|
||||
ms().send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
|
||||
|
||||
17
lua.cc
17
lua.cc
@@ -888,10 +888,13 @@ db_clock::time_point timestamp_return_visitor::operator()(const lua_table&) {
|
||||
}
|
||||
|
||||
static data_value convert_from_lua(lua_slice_state &l, const data_type& type) {
|
||||
if (lua_isnil(l, -1)) {
|
||||
return data_value::make_null(type);
|
||||
}
|
||||
return ::visit(*type, from_lua_visitor{l});
|
||||
}
|
||||
|
||||
static bytes convert_return(lua_slice_state &l, const data_type& return_type) {
|
||||
static bytes_opt convert_return(lua_slice_state &l, const data_type& return_type) {
|
||||
int num_return_vals = lua_gettop(l);
|
||||
if (num_return_vals != 1) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
@@ -901,7 +904,11 @@ static bytes convert_return(lua_slice_state &l, const data_type& return_type) {
|
||||
// FIXME: It should be possible to avoid creating the data_value,
|
||||
// or even better, change the function::execute interface to
|
||||
// return a data_value instead of bytes_opt.
|
||||
return convert_from_lua(l, return_type).serialize();
|
||||
data_value ret = convert_from_lua(l, return_type);
|
||||
if (ret.is_null()) {
|
||||
return {};
|
||||
}
|
||||
return ret.serialize();
|
||||
}
|
||||
|
||||
static void push_sstring(lua_slice_state& l, const sstring& v) {
|
||||
@@ -1065,7 +1072,7 @@ lua::runtime_config lua::make_runtime_config(const db::config& config) {
|
||||
}
|
||||
|
||||
// run the script for at most max_instructions
|
||||
future<bytes> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_value>& values, data_type return_type, const lua::runtime_config& cfg) {
|
||||
future<bytes_opt> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_value>& values, data_type return_type, const lua::runtime_config& cfg) {
|
||||
lua_slice_state l = load_script(cfg, bitcode);
|
||||
unsigned nargs = values.size();
|
||||
if (!lua_checkstack(l, nargs)) {
|
||||
@@ -1088,7 +1095,7 @@ future<bytes> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_
|
||||
auto start = ::now();
|
||||
switch (lua_resume(l, nullptr, nargs)) {
|
||||
case LUA_OK:
|
||||
return make_ready_future<bytes_opt>(convert_return(l, return_type));
|
||||
return make_ready_future<std::optional<bytes_opt>>(convert_return(l, return_type));
|
||||
case LUA_YIELD: {
|
||||
nargs = 0;
|
||||
elapsed += ::now() - start;
|
||||
@@ -1096,7 +1103,7 @@ future<bytes> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_
|
||||
millisecond ms = elapsed;
|
||||
throw exceptions::invalid_request_exception(format("lua execution timeout: {}ms elapsed", ms.count()));
|
||||
}
|
||||
return make_ready_future<bytes_opt>(bytes_opt());
|
||||
return make_ready_future<std::optional<bytes_opt>>(std::nullopt);
|
||||
}
|
||||
default:
|
||||
throw exceptions::invalid_request_exception(std::string("lua execution failed: ") +
|
||||
|
||||
4
lua.hh
4
lua.hh
@@ -41,6 +41,6 @@ struct runtime_config {
|
||||
runtime_config make_runtime_config(const db::config& config);
|
||||
|
||||
sstring compile(const runtime_config& cfg, const std::vector<sstring>& arg_names, sstring script);
|
||||
seastar::future<bytes> run_script(bitcode_view bitcode, const std::vector<data_value>& values,
|
||||
data_type return_type, const runtime_config& cfg);
|
||||
seastar::future<bytes_opt> run_script(bitcode_view bitcode, const std::vector<data_value>& values,
|
||||
data_type return_type, const runtime_config& cfg);
|
||||
}
|
||||
|
||||
6
main.cc
6
main.cc
@@ -946,6 +946,12 @@ int main(int ac, char** av) {
|
||||
ss.init_messaging_service_part().get();
|
||||
api::set_server_messaging_service(ctx).get();
|
||||
api::set_server_storage_service(ctx).get();
|
||||
|
||||
gossiper.local().register_(ss.shared_from_this());
|
||||
auto stop_listening = defer_verbose_shutdown("storage service notifications", [&gossiper, &ss] {
|
||||
gossiper.local().unregister_(ss.shared_from_this());
|
||||
});
|
||||
|
||||
ss.init_server_without_the_messaging_service_part().get();
|
||||
supervisor::notify("starting batchlog manager");
|
||||
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
|
||||
|
||||
@@ -39,6 +39,9 @@
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "types/map.hh"
|
||||
#include "compaction_garbage_collector.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
|
||||
logging::logger mplog("mutation_partition");
|
||||
|
||||
template<bool reversed>
|
||||
struct reversal_traits;
|
||||
@@ -1238,7 +1241,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
|
||||
void
|
||||
row::append_cell(column_id id, atomic_cell_or_collection value) {
|
||||
if (_type == storage_type::vector && id < max_vector_size) {
|
||||
assert(_storage.vector.v.size() <= id);
|
||||
if (_storage.vector.v.size() > id) {
|
||||
on_internal_error(mplog, format("Attempted to append cell#{} to row already having {} cells", id, _storage.vector.v.size()));
|
||||
}
|
||||
_storage.vector.v.resize(id);
|
||||
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
|
||||
_storage.vector.present.set(id);
|
||||
|
||||
@@ -58,7 +58,8 @@ EOS
|
||||
# For systems with not a lot of memory, override default reservations for the slices
|
||||
# seastar has a minimum reservation of 1.5GB that kicks in, and 21GB * 0.07 = 1.5GB.
|
||||
# So for anything smaller than that we will not use percentages in the helper slice
|
||||
MEMTOTAL_BYTES=$(cat /proc/meminfo | grep MemTotal | awk '{print $2 * 1024}')
|
||||
MEMTOTAL=$(cat /proc/meminfo |grep -e "^MemTotal:"|sed -s 's/^MemTotal:\s*\([0-9]*\) kB$/\1/')
|
||||
MEMTOTAL_BYTES=$(($MEMTOTAL * 1024))
|
||||
if [ $MEMTOTAL_BYTES -lt 23008753371 ]; then
|
||||
mkdir -p /etc/systemd/system/scylla-helper.slice.d/
|
||||
cat << EOS > /etc/systemd/system/scylla-helper.slice.d/memory.conf
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 3f3e117de3...f54084c08f
@@ -75,25 +75,22 @@ public:
|
||||
class client_state_for_another_shard {
|
||||
private:
|
||||
const client_state* _cs;
|
||||
tracing::global_trace_state_ptr _trace_state;
|
||||
seastar::sharded<auth::service>* _auth_service;
|
||||
client_state_for_another_shard(const client_state* cs, tracing::global_trace_state_ptr gt,
|
||||
seastar::sharded<auth::service>* auth_service) : _cs(cs), _trace_state(gt), _auth_service(auth_service) {}
|
||||
client_state_for_another_shard(const client_state* cs, seastar::sharded<auth::service>* auth_service) : _cs(cs), _auth_service(auth_service) {}
|
||||
friend client_state;
|
||||
public:
|
||||
client_state get() const {
|
||||
return client_state(_cs, _trace_state, _auth_service);
|
||||
return client_state(_cs, _auth_service);
|
||||
}
|
||||
};
|
||||
private:
|
||||
client_state(const client_state* cs, tracing::global_trace_state_ptr gt, seastar::sharded<auth::service>* auth_service)
|
||||
: _keyspace(cs->_keyspace), _trace_state_ptr(gt), _user(cs->_user), _auth_state(cs->_auth_state),
|
||||
client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service)
|
||||
: _keyspace(cs->_keyspace), _user(cs->_user), _auth_state(cs->_auth_state),
|
||||
_is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address),
|
||||
_auth_service(auth_service ? &auth_service->local() : nullptr) {}
|
||||
friend client_state_for_another_shard;
|
||||
private:
|
||||
sstring _keyspace;
|
||||
tracing::trace_state_ptr _trace_state_ptr;
|
||||
#if 0
|
||||
private static final Logger logger = LoggerFactory.getLogger(ClientState.class);
|
||||
public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION;
|
||||
@@ -138,18 +135,6 @@ public:
|
||||
struct internal_tag {};
|
||||
struct external_tag {};
|
||||
|
||||
void create_tracing_session(tracing::trace_type type, tracing::trace_state_props_set props) {
|
||||
_trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(type, props);
|
||||
}
|
||||
|
||||
tracing::trace_state_ptr& get_trace_state() {
|
||||
return _trace_state_ptr;
|
||||
}
|
||||
|
||||
const tracing::trace_state_ptr& get_trace_state() const {
|
||||
return _trace_state_ptr;
|
||||
}
|
||||
|
||||
auth_state get_auth_state() const noexcept {
|
||||
return _auth_state;
|
||||
}
|
||||
@@ -344,7 +329,7 @@ public:
|
||||
}
|
||||
|
||||
client_state_for_another_shard move_to_other_shard() {
|
||||
return client_state_for_another_shard(this, _trace_state_ptr, _auth_service ? &_auth_service->container() : nullptr);
|
||||
return client_state_for_another_shard(this, _auth_service ? &_auth_service->container() : nullptr);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
@@ -42,8 +42,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
|
||||
class keyspace_metadata;
|
||||
class view_ptr;
|
||||
@@ -129,14 +131,24 @@ public:
|
||||
|
||||
class migration_notifier {
|
||||
private:
|
||||
std::vector<migration_listener*> _listeners;
|
||||
class listener_vector {
|
||||
std::vector<migration_listener*> _vec;
|
||||
rwlock _vec_lock;
|
||||
|
||||
public:
|
||||
void add(migration_listener* listener);
|
||||
future<> remove(migration_listener* listener);
|
||||
// This must be called on a thread.
|
||||
void for_each(noncopyable_function<void(migration_listener*)> func);
|
||||
};
|
||||
listener_vector _listeners;
|
||||
|
||||
public:
|
||||
/// Register a migration listener on current shard.
|
||||
void register_listener(migration_listener* listener);
|
||||
|
||||
/// Unregister a migration listener on current shard.
|
||||
void unregister_listener(migration_listener* listener);
|
||||
future<> unregister_listener(migration_listener* listener);
|
||||
|
||||
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
future<> create_column_family(const schema_ptr& cfm);
|
||||
|
||||
@@ -70,6 +70,11 @@ migration_manager::migration_manager(migration_notifier& notifier) : _notifier(n
|
||||
future<> migration_manager::stop()
|
||||
{
|
||||
mlogger.info("stopping migration service");
|
||||
_as.request_abort();
|
||||
if (!_cluster_upgraded) {
|
||||
_wait_cluster_upgraded.broken();
|
||||
}
|
||||
|
||||
return uninit_messaging_service().then([this] {
|
||||
return parallel_for_each(_schema_pulls.begin(), _schema_pulls.end(), [] (auto&& e) {
|
||||
serialized_action& sp = e.second;
|
||||
@@ -97,6 +102,10 @@ void migration_manager::init_messaging_service()
|
||||
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_cdc().when_enabled(update_schema));
|
||||
}
|
||||
_feature_listeners.push_back(ss.cluster_supports_schema_tables_v3().when_enabled([this] {
|
||||
_cluster_upgraded = true;
|
||||
_wait_cluster_upgraded.broadcast();
|
||||
}));
|
||||
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
|
||||
@@ -161,14 +170,37 @@ future<> migration_manager::uninit_messaging_service()
|
||||
);
|
||||
}
|
||||
|
||||
void migration_notifier::register_listener(migration_listener* listener)
|
||||
{
|
||||
_listeners.emplace_back(listener);
|
||||
void migration_notifier::listener_vector::add(migration_listener* listener) {
|
||||
_vec.push_back(listener);
|
||||
}
|
||||
|
||||
void migration_notifier::unregister_listener(migration_listener* listener)
|
||||
future<> migration_notifier::listener_vector::remove(migration_listener* listener) {
|
||||
return with_lock(_vec_lock.for_write(), [this, listener] {
|
||||
_vec.erase(std::remove(_vec.begin(), _vec.end(), listener), _vec.end());
|
||||
});
|
||||
}
|
||||
|
||||
void migration_notifier::listener_vector::for_each(noncopyable_function<void(migration_listener*)> func) {
|
||||
_vec_lock.for_read().lock().get();
|
||||
auto unlock = defer([this] {
|
||||
_vec_lock.for_read().unlock();
|
||||
});
|
||||
// We grab a lock in remove(), but not in add(), so we
|
||||
// iterate using indexes to guard agaist the vector being
|
||||
// reallocated.
|
||||
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
|
||||
func(_vec[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void migration_notifier::register_listener(migration_listener* listener)
|
||||
{
|
||||
_listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end());
|
||||
_listeners.add(listener);
|
||||
}
|
||||
|
||||
future<> migration_notifier::unregister_listener(migration_listener* listener)
|
||||
{
|
||||
return _listeners.remove(listener);
|
||||
}
|
||||
|
||||
future<> migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state)
|
||||
@@ -220,7 +252,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
|
||||
{
|
||||
auto& proxy = get_local_storage_proxy();
|
||||
auto& db = proxy.get_db().local();
|
||||
auto& ss = get_storage_service().local();
|
||||
|
||||
if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) {
|
||||
mlogger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
|
||||
return make_ready_future<>();
|
||||
@@ -228,21 +260,23 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
|
||||
|
||||
// Disable pulls during rolling upgrade from 1.7 to 2.0 to avoid
|
||||
// schema version inconsistency. See https://github.com/scylladb/scylla/issues/2802.
|
||||
if (!ss.cluster_supports_schema_tables_v3()) {
|
||||
if (!_cluster_upgraded) {
|
||||
mlogger.debug("Delaying pull with {} until cluster upgrade is complete", endpoint);
|
||||
return ss.cluster_supports_schema_tables_v3().when_enabled().then([this, their_version, endpoint] {
|
||||
return _wait_cluster_upgraded.wait().then([this, their_version, endpoint] {
|
||||
return maybe_schedule_schema_pull(their_version, endpoint);
|
||||
});
|
||||
}).finally([me = shared_from_this()] {});
|
||||
}
|
||||
|
||||
if (db.get_version() == database::empty_version || runtime::get_uptime() < migration_delay) {
|
||||
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
|
||||
mlogger.debug("Submitting migration task for {}", endpoint);
|
||||
return submit_migration_task(endpoint);
|
||||
} else {
|
||||
}
|
||||
|
||||
return with_gate(_background_tasks, [this, &db, endpoint] {
|
||||
// Include a delay to make sure we have a chance to apply any changes being
|
||||
// pushed out simultaneously. See CASSANDRA-5025
|
||||
return sleep(migration_delay).then([this, &proxy, endpoint] {
|
||||
return sleep_abortable(migration_delay, _as).then([this, &db, endpoint] {
|
||||
// grab the latest version of the schema since it may have changed again since the initial scheduling
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
auto* ep_state = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
@@ -256,7 +290,6 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
|
||||
return make_ready_future<>();
|
||||
}
|
||||
utils::UUID current_version{value->value};
|
||||
auto& db = proxy.get_db().local();
|
||||
if (db.get_version() == current_version) {
|
||||
mlogger.debug("not submitting migration task for {} because our versions match", endpoint);
|
||||
return make_ready_future<>();
|
||||
@@ -264,7 +297,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
|
||||
mlogger.debug("submitting migration task for {}", endpoint);
|
||||
return submit_migration_task(endpoint);
|
||||
});
|
||||
}
|
||||
}).finally([me = shared_from_this()] {});
|
||||
}
|
||||
|
||||
future<> migration_manager::submit_migration_task(const gms::inet_address& endpoint)
|
||||
@@ -357,56 +390,56 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
|
||||
|
||||
future<> migration_notifier::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
return seastar::async([this, ksm] {
|
||||
auto&& name = ksm->name();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& name = ksm->name();
|
||||
_listeners.for_each([&name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_create_keyspace(name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Create keyspace notification failed {}: {}", name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_notifier::create_column_family(const schema_ptr& cfm) {
|
||||
return seastar::async([this, cfm] {
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& ks_name = cfm->ks_name();
|
||||
const auto& cf_name = cfm->cf_name();
|
||||
_listeners.for_each([&ks_name, &cf_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_create_column_family(ks_name, cf_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Create column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_notifier::create_user_type(const user_type& type) {
|
||||
return seastar::async([this, type] {
|
||||
auto&& ks_name = type->_keyspace;
|
||||
auto&& type_name = type->get_name_as_string();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& ks_name = type->_keyspace;
|
||||
const auto& type_name = type->get_name_as_string();
|
||||
_listeners.for_each([&ks_name, &type_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_create_user_type(ks_name, type_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Create user type notification failed {}.{}: {}", ks_name, type_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_notifier::create_view(const view_ptr& view) {
|
||||
return seastar::async([this, view] {
|
||||
auto&& ks_name = view->ks_name();
|
||||
auto&& view_name = view->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& ks_name = view->ks_name();
|
||||
const auto& view_name = view->cf_name();
|
||||
_listeners.for_each([&ks_name, &view_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_create_view(ks_name, view_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Create view notification failed {}.{}: {}", ks_name, view_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -426,56 +459,56 @@ public void notifyCreateAggregate(UDAggregate udf)
|
||||
|
||||
future<> migration_notifier::update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
return seastar::async([this, ksm] {
|
||||
auto&& name = ksm->name();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& name = ksm->name();
|
||||
_listeners.for_each([&name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_update_keyspace(name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Update keyspace notification failed {}: {}", name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_notifier::update_column_family(const schema_ptr& cfm, bool columns_changed) {
|
||||
return seastar::async([this, cfm, columns_changed] {
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& ks_name = cfm->ks_name();
|
||||
const auto& cf_name = cfm->cf_name();
|
||||
_listeners.for_each([&ks_name, &cf_name, columns_changed] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_update_column_family(ks_name, cf_name, columns_changed);
|
||||
} catch (...) {
|
||||
mlogger.warn("Update column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_notifier::update_user_type(const user_type& type) {
|
||||
return seastar::async([this, type] {
|
||||
auto&& ks_name = type->_keyspace;
|
||||
auto&& type_name = type->get_name_as_string();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& ks_name = type->_keyspace;
|
||||
const auto& type_name = type->get_name_as_string();
|
||||
_listeners.for_each([&ks_name, &type_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_update_user_type(ks_name, type_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Update user type notification failed {}.{}: {}", ks_name, type_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_notifier::update_view(const view_ptr& view, bool columns_changed) {
|
||||
return seastar::async([this, view, columns_changed] {
|
||||
auto&& ks_name = view->ks_name();
|
||||
auto&& view_name = view->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& ks_name = view->ks_name();
|
||||
const auto& view_name = view->cf_name();
|
||||
_listeners.for_each([&ks_name, &view_name, columns_changed] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_update_view(ks_name, view_name, columns_changed);
|
||||
} catch (...) {
|
||||
mlogger.warn("Update view notification failed {}.{}: {}", ks_name, view_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -495,27 +528,27 @@ public void notifyUpdateAggregate(UDAggregate udf)
|
||||
|
||||
future<> migration_notifier::drop_keyspace(const sstring& ks_name) {
|
||||
return seastar::async([this, ks_name] {
|
||||
for (auto&& listener : _listeners) {
|
||||
_listeners.for_each([&ks_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_drop_keyspace(ks_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Drop keyspace notification failed {}: {}", ks_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_notifier::drop_column_family(const schema_ptr& cfm) {
|
||||
return seastar::async([this, cfm] {
|
||||
auto&& cf_name = cfm->cf_name();
|
||||
auto&& ks_name = cfm->ks_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
const auto& cf_name = cfm->cf_name();
|
||||
const auto& ks_name = cfm->ks_name();
|
||||
_listeners.for_each([&ks_name, &cf_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_drop_column_family(ks_name, cf_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Drop column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -523,13 +556,13 @@ future<> migration_notifier::drop_user_type(const user_type& type) {
|
||||
return seastar::async([this, type] {
|
||||
auto&& ks_name = type->_keyspace;
|
||||
auto&& type_name = type->get_name_as_string();
|
||||
for (auto&& listener : _listeners) {
|
||||
_listeners.for_each([&ks_name, &type_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_drop_user_type(ks_name, type_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Drop user type notification failed {}.{}: {}", ks_name, type_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -537,38 +570,38 @@ future<> migration_notifier::drop_view(const view_ptr& view) {
|
||||
return seastar::async([this, view] {
|
||||
auto&& ks_name = view->ks_name();
|
||||
auto&& view_name = view->cf_name();
|
||||
for (auto&& listener : _listeners) {
|
||||
_listeners.for_each([&ks_name, &view_name] (migration_listener* listener) {
|
||||
try {
|
||||
listener->on_drop_view(ks_name, view_name);
|
||||
} catch (...) {
|
||||
mlogger.warn("Drop view notification failed {}.{}: {}", ks_name, view_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void migration_notifier::before_create_column_family(const schema& schema,
|
||||
std::vector<mutation>& mutations, api::timestamp_type timestamp) {
|
||||
for (auto&& listener : _listeners) {
|
||||
_listeners.for_each([&mutations, &schema, timestamp] (migration_listener* listener) {
|
||||
// allow exceptions. so a listener can effectively kill a create-table
|
||||
listener->on_before_create_column_family(schema, mutations, timestamp);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_notifier::before_update_column_family(const schema& new_schema,
|
||||
const schema& old_schema, std::vector<mutation>& mutations, api::timestamp_type ts) {
|
||||
for (auto&& listener : _listeners) {
|
||||
_listeners.for_each([&mutations, &new_schema, &old_schema, ts] (migration_listener* listener) {
|
||||
// allow exceptions. so a listener can effectively kill an update-column
|
||||
listener->on_before_update_column_family(new_schema, old_schema, mutations, ts);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void migration_notifier::before_drop_column_family(const schema& schema,
|
||||
std::vector<mutation>& mutations, api::timestamp_type ts) {
|
||||
for (auto&& listener : _listeners) {
|
||||
_listeners.for_each([&mutations, &schema, ts] (migration_listener* listener) {
|
||||
// allow exceptions. so a listener can effectively kill a drop-column
|
||||
listener->on_before_drop_column_family(schema, mutations, ts);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -62,6 +62,9 @@ private:
|
||||
std::vector<gms::feature::listener_registration> _feature_listeners;
|
||||
seastar::gate _background_tasks;
|
||||
static const std::chrono::milliseconds migration_delay;
|
||||
seastar::abort_source _as;
|
||||
bool _cluster_upgraded = false;
|
||||
seastar::condition_variable _wait_cluster_upgraded;
|
||||
public:
|
||||
explicit migration_manager(migration_notifier&);
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ private:
|
||||
public:
|
||||
query_state(client_state& client_state, service_permit permit)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(_client_state.get_trace_state())
|
||||
, _trace_state_ptr(tracing::trace_state_ptr())
|
||||
, _permit(std::move(permit))
|
||||
{ }
|
||||
|
||||
|
||||
@@ -2319,6 +2319,14 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
|
||||
allow_hints::no);
|
||||
}
|
||||
|
||||
future<> storage_proxy::mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s) {
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
const auto timeout = db::timeout_clock::now() + 1h;
|
||||
std::array<mutation, 1> ms{fm_a_s.fm.unfreeze(fm_a_s.s)};
|
||||
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, empty_service_permit(), timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
|
||||
* is not available.
|
||||
|
||||
@@ -471,6 +471,8 @@ public:
|
||||
*/
|
||||
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
|
||||
future<> mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s);
|
||||
|
||||
// Send a mutation to one specific remote target.
|
||||
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
|
||||
// hinted handoff support, and just one target. See also
|
||||
|
||||
@@ -555,7 +555,6 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
|
||||
}
|
||||
slogger.info("Starting up server gossip");
|
||||
|
||||
_gossiper.register_(this->shared_from_this());
|
||||
auto generation_number = db::system_keyspace::increment_and_get_generation().get0();
|
||||
_gossiper.start_gossiping(generation_number, app_states, gms::bind_messaging_port(bool(do_bind))).get();
|
||||
|
||||
@@ -1431,7 +1430,8 @@ future<> storage_service::drain_on_shutdown() {
|
||||
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
|
||||
slogger.info("Drain on shutdown: system distributed keyspace stopped");
|
||||
|
||||
get_storage_proxy().invoke_on_all([&ss] (storage_proxy& local_proxy) mutable {
|
||||
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.unregister_subscriber(&local_proxy);
|
||||
return local_proxy.drain_on_shutdown();
|
||||
}).get();
|
||||
@@ -1445,7 +1445,7 @@ future<> storage_service::drain_on_shutdown() {
|
||||
}).get();
|
||||
slogger.info("Drain on shutdown: shutdown commitlog done");
|
||||
|
||||
ss._mnotifier.local().unregister_listener(&ss);
|
||||
ss._mnotifier.local().unregister_listener(&ss).get();
|
||||
|
||||
slogger.info("Drain on shutdown: done");
|
||||
});
|
||||
|
||||
@@ -4702,3 +4702,45 @@ SEASTAR_TEST_CASE(test_describe_view_schema) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Test that tombstones with future timestamps work correctly
|
||||
// when a write with lower timestamp arrives - in such case,
|
||||
// if the base row is covered by such a tombstone, a view update
|
||||
// needs to take it into account. Refs #5793
|
||||
SEASTAR_TEST_CASE(test_views_with_future_tombstones) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
cquery_nofail(e, "CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY (a,b,c));");
|
||||
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT * FROM t"
|
||||
" WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (b,a,c);");
|
||||
|
||||
// Partition tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 10 where a=1;");
|
||||
auto msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (1,2,3,4,5) using timestamp 8;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
// Range tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 16 where a=2 and b > 1 and b < 4;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (2,3,4,5,6) using timestamp 12;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
// Row tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 24 where a=3 and b=4 and c=5;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (3,4,5,6,7) using timestamp 18;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -415,6 +415,8 @@ BOOST_AUTO_TEST_CASE(test_varint) {
|
||||
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00deadbeef"))), boost::multiprecision::cpp_int("0xdeadbeef"));
|
||||
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00ffffffffffffffffffffffffffffffff"))), boost::multiprecision::cpp_int("340282366920938463463374607431768211455"));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(from_hex("80000000"), varint_type->decompose(boost::multiprecision::cpp_int(-2147483648)));
|
||||
|
||||
test_parsing_fails(varint_type, "1A");
|
||||
}
|
||||
|
||||
|
||||
@@ -79,6 +79,20 @@ SEASTAR_TEST_CASE(test_user_function_out_of_memory) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_user_function_use_null) {
|
||||
return with_udf_enabled([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE my_table (key text PRIMARY KEY, val int);").get();
|
||||
e.execute_cql("INSERT INTO my_table (key, val) VALUES ('foo', null);").get();
|
||||
e.execute_cql("CREATE FUNCTION my_func1(val int) CALLED ON NULL INPUT RETURNS int LANGUAGE Lua AS 'return val + 1';").get();
|
||||
e.execute_cql("CREATE FUNCTION my_func2(val int) CALLED ON NULL INPUT RETURNS int LANGUAGE Lua AS 'return val';").get();
|
||||
BOOST_REQUIRE_EXCEPTION(e.execute_cql("SELECT my_func1(val) FROM my_table;").get0(), ire, message_equals("lua execution failed: ?:-1: attempt to perform arithmetic on a nil value"));
|
||||
auto res = e.execute_cql("SELECT my_func2(val) FROM my_table;").get0();
|
||||
assert_that(res).is_rows().with_rows({{std::nullopt}});
|
||||
res = e.execute_cql("SELECT val FROM my_table;").get0();
|
||||
assert_that(res).is_rows().with_rows({{std::nullopt}});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_user_function_wrong_return_type) {
|
||||
return with_udf_enabled([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE my_table (key text PRIMARY KEY, val int);").get();
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
inline
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 17) {
|
||||
size_t attempts = 0;
|
||||
while (true) {
|
||||
try {
|
||||
@@ -43,7 +43,7 @@ void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
|
||||
inline
|
||||
bool eventually_true(noncopyable_function<bool ()> f) {
|
||||
const unsigned max_attempts = 10;
|
||||
const unsigned max_attempts = 15;
|
||||
unsigned attempts = 0;
|
||||
while (true) {
|
||||
if (f()) {
|
||||
|
||||
@@ -133,6 +133,7 @@ fi
|
||||
tmpdir=$(mktemp -d)
|
||||
|
||||
docker_common_args+=(
|
||||
--pids-limit -1 \
|
||||
--network host \
|
||||
--cap-add SYS_PTRACE \
|
||||
-v "$PWD:$PWD:z" \
|
||||
|
||||
@@ -38,7 +38,13 @@ cql_server::event_notifier::event_notifier(service::migration_notifier& mn) : _m
|
||||
cql_server::event_notifier::~event_notifier()
|
||||
{
|
||||
service::get_local_storage_service().unregister_subscriber(this);
|
||||
_mnotifier.unregister_listener(this);
|
||||
assert(_stopped);
|
||||
}
|
||||
|
||||
future<> cql_server::event_notifier::stop() {
|
||||
return _mnotifier.unregister_listener(this).then([this]{
|
||||
_stopped = true;
|
||||
});
|
||||
}
|
||||
|
||||
void cql_server::event_notifier::register_event(event::event_type et, cql_server::connection* conn)
|
||||
|
||||
@@ -197,6 +197,8 @@ future<> cql_server::stop() {
|
||||
return c.shutdown().then([nr_conn, nr_conn_total] {
|
||||
clogger.debug("cql_server: shutdown connection {} out of {} done", ++(*nr_conn), nr_conn_total);
|
||||
});
|
||||
}).then([this] {
|
||||
return _notifier->stop();
|
||||
}).then([this] {
|
||||
return std::move(_stopped);
|
||||
});
|
||||
@@ -372,6 +374,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
|
||||
trace_props.set_if<tracing::trace_state_props::log_slow_query>(tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled());
|
||||
trace_props.set_if<tracing::trace_state_props::full_tracing>(tracing_request != tracing_request_type::not_requested);
|
||||
tracing::trace_state_ptr trace_state;
|
||||
|
||||
if (trace_props) {
|
||||
if (cqlop == cql_binary_opcode::QUERY ||
|
||||
@@ -379,15 +382,15 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
cqlop == cql_binary_opcode::EXECUTE ||
|
||||
cqlop == cql_binary_opcode::BATCH) {
|
||||
trace_props.set_if<tracing::trace_state_props::write_on_close>(tracing_request == tracing_request_type::write_on_close);
|
||||
client_state.create_tracing_session(tracing::trace_type::QUERY, trace_props);
|
||||
trace_state = tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, trace_props);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::set_request_size(client_state.get_trace_state(), fbuf.bytes_left());
|
||||
tracing::set_request_size(trace_state, fbuf.bytes_left());
|
||||
|
||||
auto linearization_buffer = std::make_unique<bytes_ostream>();
|
||||
auto linearization_buffer_ptr = linearization_buffer.get();
|
||||
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit)] () mutable {
|
||||
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable {
|
||||
// When using authentication, we need to ensure we are doing proper state transitions,
|
||||
// i.e. we cannot simply accept any query/exec ops unless auth is complete
|
||||
switch (client_state.get_auth_state()) {
|
||||
@@ -409,7 +412,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
break;
|
||||
}
|
||||
|
||||
tracing::set_username(client_state.get_trace_state(), client_state.user());
|
||||
tracing::set_username(trace_state, client_state.user());
|
||||
|
||||
auto wrap_in_foreign = [] (future<std::unique_ptr<cql_server::response>> f) {
|
||||
return f.then([] (std::unique_ptr<cql_server::response> p) {
|
||||
@@ -418,19 +421,19 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
};
|
||||
auto in = request_reader(std::move(fbuf), *linearization_buffer_ptr);
|
||||
switch (cqlop) {
|
||||
case cql_binary_opcode::STARTUP: return wrap_in_foreign(process_startup(stream, std::move(in), client_state));
|
||||
case cql_binary_opcode::AUTH_RESPONSE: return wrap_in_foreign(process_auth_response(stream, std::move(in), client_state));
|
||||
case cql_binary_opcode::OPTIONS: return wrap_in_foreign(process_options(stream, std::move(in), client_state));
|
||||
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit));
|
||||
case cql_binary_opcode::PREPARE: return wrap_in_foreign(process_prepare(stream, std::move(in), client_state));
|
||||
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit));
|
||||
case cql_binary_opcode::BATCH: return wrap_in_foreign(process_batch(stream, std::move(in), client_state, std::move(permit)));
|
||||
case cql_binary_opcode::REGISTER: return wrap_in_foreign(process_register(stream, std::move(in), client_state));
|
||||
case cql_binary_opcode::STARTUP: return wrap_in_foreign(process_startup(stream, std::move(in), client_state, trace_state));
|
||||
case cql_binary_opcode::AUTH_RESPONSE: return wrap_in_foreign(process_auth_response(stream, std::move(in), client_state, trace_state));
|
||||
case cql_binary_opcode::OPTIONS: return wrap_in_foreign(process_options(stream, std::move(in), client_state, trace_state));
|
||||
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit), trace_state);
|
||||
case cql_binary_opcode::PREPARE: return wrap_in_foreign(process_prepare(stream, std::move(in), client_state, trace_state));
|
||||
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit), trace_state);
|
||||
case cql_binary_opcode::BATCH: return wrap_in_foreign(process_batch(stream, std::move(in), client_state, std::move(permit), trace_state));
|
||||
case cql_binary_opcode::REGISTER: return wrap_in_foreign(process_register(stream, std::move(in), client_state, trace_state));
|
||||
default: throw exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop)));
|
||||
}
|
||||
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> f) -> foreign_ptr<std::unique_ptr<cql_server::response>> {
|
||||
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> f) -> foreign_ptr<std::unique_ptr<cql_server::response>> {
|
||||
auto stop_trace = defer([&] {
|
||||
tracing::stop_foreground(client_state.get_trace_state());
|
||||
tracing::stop_foreground(trace_state);
|
||||
});
|
||||
--_server._requests_serving;
|
||||
try {
|
||||
@@ -461,28 +464,28 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
break;
|
||||
}
|
||||
|
||||
tracing::set_response_size(client_state.get_trace_state(), response->size());
|
||||
tracing::set_response_size(trace_state, response->size());
|
||||
return response;
|
||||
} catch (const exceptions::unavailable_exception& ex) {
|
||||
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, client_state.get_trace_state());
|
||||
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, trace_state);
|
||||
} catch (const exceptions::read_timeout_exception& ex) {
|
||||
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, client_state.get_trace_state());
|
||||
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, trace_state);
|
||||
} catch (const exceptions::read_failure_exception& ex) {
|
||||
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, client_state.get_trace_state());
|
||||
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, trace_state);
|
||||
} catch (const exceptions::mutation_write_timeout_exception& ex) {
|
||||
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, client_state.get_trace_state());
|
||||
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, trace_state);
|
||||
} catch (const exceptions::mutation_write_failure_exception& ex) {
|
||||
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, client_state.get_trace_state());
|
||||
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, trace_state);
|
||||
} catch (const exceptions::already_exists_exception& ex) {
|
||||
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, client_state.get_trace_state());
|
||||
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, trace_state);
|
||||
} catch (const exceptions::prepared_query_not_found_exception& ex) {
|
||||
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, client_state.get_trace_state());
|
||||
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
|
||||
} catch (const exceptions::cassandra_exception& ex) {
|
||||
return make_error(stream, ex.code(), ex.what(), client_state.get_trace_state());
|
||||
return make_error(stream, ex.code(), ex.what(), trace_state);
|
||||
} catch (std::exception& ex) {
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), client_state.get_trace_state());
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), trace_state);
|
||||
} catch (...) {
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", client_state.get_trace_state());
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -695,8 +698,8 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
|
||||
return _buffer_reader.read_exactly(_read_buf, length);
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto options = in.read_string_map();
|
||||
auto compression_opt = options.find("COMPRESSION");
|
||||
if (compression_opt != options.end()) {
|
||||
@@ -712,33 +715,31 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
|
||||
}
|
||||
auto& a = client_state.get_auth_service()->underlying_authenticator();
|
||||
if (a.require_authentication()) {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), client_state.get_trace_state()));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), trace_state));
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, trace_state));
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
|
||||
auto buf = in.read_raw_bytes_view(in.bytes_left());
|
||||
auto challenge = sasl_challenge->evaluate_response(buf);
|
||||
if (sasl_challenge->is_complete()) {
|
||||
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge)](auth::authenticated_user user) mutable {
|
||||
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable {
|
||||
client_state.set_login(std::move(user));
|
||||
auto f = client_state.check_user_can_login();
|
||||
return f.then([this, stream, &client_state, challenge = std::move(challenge)]() mutable {
|
||||
auto tr_state = client_state.get_trace_state();
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), tr_state));
|
||||
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
|
||||
});
|
||||
});
|
||||
}
|
||||
auto tr_state = client_state.get_trace_state();
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), tr_state));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), trace_state));
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, client_state.get_trace_state()));
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, std::move(trace_state)));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -753,22 +754,23 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, bool init_trace) {
|
||||
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state,
|
||||
bool init_trace) {
|
||||
auto query = in.read_long_string_view();
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
q_state->options = in.read_options(version, serialization_format, timeout_config, cql_config);
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
|
||||
if (init_trace) {
|
||||
tracing::set_page_size(query_state.get_trace_state(), options.get_page_size());
|
||||
tracing::set_consistency_level(query_state.get_trace_state(), options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(query_state.get_trace_state(), options.get_serial_consistency());
|
||||
tracing::add_query(query_state.get_trace_state(), query);
|
||||
tracing::set_user_timestamp(query_state.get_trace_state(), options.get_specific_options().timestamp);
|
||||
tracing::set_page_size(trace_state, options.get_page_size());
|
||||
tracing::set_consistency_level(trace_state, options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
|
||||
tracing::add_query(trace_state, query);
|
||||
tracing::set_user_timestamp(trace_state, options.get_specific_options().timestamp);
|
||||
|
||||
tracing::begin(query_state.get_trace_state(), "Execute CQL3 query", client_state.get_client_address());
|
||||
tracing::begin(trace_state, "Execute CQL3 query", client_state.get_client_address());
|
||||
}
|
||||
|
||||
return qp.local().process(query, query_state, options).then([q_state = std::move(q_state), stream, skip_metadata, version] (auto msg) {
|
||||
@@ -783,16 +785,18 @@ process_query_internal(service::client_state& client_state, distributed<cql3::qu
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
cql_server::connection::process_query_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
|
||||
service::client_state& cs, service_permit permit) {
|
||||
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state) {
|
||||
return smp::submit_to(shard, _server._config.bounce_request_smp_service_group,
|
||||
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit)] () {
|
||||
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit),
|
||||
gt = tracing::global_trace_state_ptr(std::move(trace_state))] () {
|
||||
service::client_state client_state = cs.get();
|
||||
cql_server& server = s.get().local();
|
||||
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream]
|
||||
tracing::trace_state_ptr trace_state = gt;
|
||||
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream, trace_state]
|
||||
(bytes_ostream& linearization_buffer, service::client_state& client_state) {
|
||||
request_reader in(is, linearization_buffer);
|
||||
return process_query_internal(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
|
||||
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), false).then([] (auto msg) {
|
||||
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), trace_state, false).then([] (auto msg) {
|
||||
// result here has to be foreign ptr
|
||||
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
|
||||
});
|
||||
@@ -801,27 +805,27 @@ cql_server::connection::process_query_on_shard(unsigned shard, uint16_t stream,
|
||||
}
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
|
||||
cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
|
||||
{
|
||||
fragmented_temporary_buffer::istream is = in.get_stream();
|
||||
|
||||
return process_query_internal(client_state, _server._query_processor, in, stream,
|
||||
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, true)
|
||||
.then([stream, &client_state, this, is, permit] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
|
||||
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, trace_state, true)
|
||||
.then([stream, &client_state, this, is, permit, trace_state] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
|
||||
unsigned* shard = std::get_if<unsigned>(&msg);
|
||||
if (shard) {
|
||||
return process_query_on_shard(*shard, stream, is, client_state, std::move(permit));
|
||||
return process_query_on_shard(*shard, stream, is, client_state, std::move(permit), std::move(trace_state));
|
||||
}
|
||||
return make_ready_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg)));
|
||||
});
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto query = sstring(in.read_long_string_view());
|
||||
|
||||
tracing::add_query(client_state.get_trace_state(), query);
|
||||
tracing::begin(client_state.get_trace_state(), "Preparing CQL3 query", client_state.get_client_address());
|
||||
tracing::add_query(trace_state, query);
|
||||
tracing::begin(trace_state, "Preparing CQL3 query", client_state.get_client_address());
|
||||
|
||||
auto cpu_id = engine().cpu_id();
|
||||
auto cpus = boost::irange(0u, smp::count);
|
||||
@@ -833,13 +837,13 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}).then([this, query, stream, &client_state] () mutable {
|
||||
tracing::trace(client_state.get_trace_state(), "Done preparing on remote shards");
|
||||
return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state] (auto msg) {
|
||||
tracing::trace(client_state.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
|
||||
}).then([this, query, stream, &client_state, trace_state] () mutable {
|
||||
tracing::trace(trace_state, "Done preparing on remote shards");
|
||||
return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state, trace_state] (auto msg) {
|
||||
tracing::trace(trace_state, "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
|
||||
return messages::result_message::prepared::cql::get_id(msg);
|
||||
}));
|
||||
return make_result(stream, *msg, client_state.get_trace_state(), _version);
|
||||
return make_result(stream, *msg, trace_state, _version);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -847,7 +851,8 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
|
||||
future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, bool init_trace) {
|
||||
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit,
|
||||
tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
|
||||
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
|
||||
bool needs_authorization = false;
|
||||
@@ -864,7 +869,7 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
|
||||
throw exceptions::prepared_query_not_found_exception(id);
|
||||
}
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, client_state.get_trace_state(), std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
if (version == 1) {
|
||||
std::vector<cql3::raw_value_view> values;
|
||||
@@ -879,33 +884,33 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
|
||||
if (init_trace) {
|
||||
tracing::set_page_size(client_state.get_trace_state(), options.get_page_size());
|
||||
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
|
||||
tracing::add_query(client_state.get_trace_state(), prepared->raw_cql_statement);
|
||||
tracing::add_prepared_statement(client_state.get_trace_state(), prepared);
|
||||
tracing::set_page_size(trace_state, options.get_page_size());
|
||||
tracing::set_consistency_level(trace_state, options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
|
||||
tracing::add_query(trace_state, prepared->raw_cql_statement);
|
||||
tracing::add_prepared_statement(trace_state, prepared);
|
||||
|
||||
tracing::begin(client_state.get_trace_state(), seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
|
||||
tracing::begin(trace_state, seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
|
||||
client_state.get_client_address());
|
||||
}
|
||||
|
||||
auto stmt = prepared->statement;
|
||||
tracing::trace(query_state.get_trace_state(), "Checking bounds");
|
||||
tracing::trace(trace_state, "Checking bounds");
|
||||
if (stmt->get_bound_terms() != options.get_values_count()) {
|
||||
const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}",
|
||||
stmt->get_bound_terms(),
|
||||
options.get_values_count());
|
||||
tracing::trace(query_state.get_trace_state(), msg);
|
||||
tracing::trace(trace_state, msg);
|
||||
throw exceptions::invalid_request_exception(msg);
|
||||
}
|
||||
|
||||
options.prepare(prepared->bound_names);
|
||||
|
||||
if (init_trace) {
|
||||
tracing::add_prepared_query_options(client_state.get_trace_state(), options);
|
||||
tracing::add_prepared_query_options(trace_state, options);
|
||||
}
|
||||
|
||||
tracing::trace(query_state.get_trace_state(), "Processing a statement");
|
||||
tracing::trace(trace_state, "Processing a statement");
|
||||
return qp.local().process_statement_prepared(std::move(prepared), std::move(cache_key), query_state, options, needs_authorization)
|
||||
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version] (auto msg) {
|
||||
if (msg->move_to_shard()) {
|
||||
@@ -919,16 +924,18 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
cql_server::connection::process_execute_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
|
||||
service::client_state& cs, service_permit permit) {
|
||||
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state) {
|
||||
return smp::submit_to(shard, _server._config.bounce_request_smp_service_group,
|
||||
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit)] () {
|
||||
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit),
|
||||
gt = tracing::global_trace_state_ptr(std::move(trace_state))] () {
|
||||
service::client_state client_state = cs.get();
|
||||
tracing::trace_state_ptr trace_state = gt;
|
||||
cql_server& server = s.get().local();
|
||||
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream]
|
||||
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream, trace_state]
|
||||
(bytes_ostream& linearization_buffer, service::client_state& client_state) {
|
||||
request_reader in(is, linearization_buffer);
|
||||
return process_execute_internal(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
|
||||
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), false).then([] (auto msg) {
|
||||
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), trace_state, false).then([] (auto msg) {
|
||||
// result here has to be foreign ptr
|
||||
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
|
||||
});
|
||||
@@ -937,22 +944,22 @@ cql_server::connection::process_execute_on_shard(unsigned shard, uint16_t stream
|
||||
}
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connection::process_execute(uint16_t stream, request_reader in,
|
||||
service::client_state& client_state, service_permit permit) {
|
||||
service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) {
|
||||
fragmented_temporary_buffer::istream is = in.get_stream();
|
||||
|
||||
return process_execute_internal(client_state, _server._query_processor, in, stream,
|
||||
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, true)
|
||||
.then([stream, &client_state, this, is, permit] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
|
||||
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, trace_state, true)
|
||||
.then([stream, &client_state, this, is, permit, trace_state] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
|
||||
unsigned* shard = std::get_if<unsigned>(&msg);
|
||||
if (shard) {
|
||||
return process_execute_on_shard(*shard, stream, is, client_state, std::move(permit));
|
||||
return process_execute_on_shard(*shard, stream, is, client_state, std::move(permit), trace_state);
|
||||
}
|
||||
return make_ready_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg)));
|
||||
});
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>>
|
||||
cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
|
||||
cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
|
||||
{
|
||||
if (_version == 1) {
|
||||
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
|
||||
@@ -968,7 +975,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
modifications.reserve(n);
|
||||
values.reserve(n);
|
||||
|
||||
tracing::begin(client_state.get_trace_state(), "Execute batch of CQL3 queries", client_state.get_client_address());
|
||||
tracing::begin(trace_state, "Execute batch of CQL3 queries", client_state.get_client_address());
|
||||
|
||||
for ([[gnu::unused]] auto i : boost::irange(0u, n)) {
|
||||
const auto kind = in.read_byte();
|
||||
@@ -982,7 +989,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
auto query = in.read_long_string_view();
|
||||
stmt_ptr = _server._query_processor.local().get_statement(query, client_state);
|
||||
ps = stmt_ptr->checked_weak_from_this();
|
||||
tracing::add_query(client_state.get_trace_state(), query);
|
||||
tracing::add_query(trace_state, query);
|
||||
break;
|
||||
}
|
||||
case 1: {
|
||||
@@ -1001,7 +1008,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
needs_authorization = pending_authorization_entries.emplace(std::move(cache_key), ps->checked_weak_from_this()).second;
|
||||
}
|
||||
|
||||
tracing::add_query(client_state.get_trace_state(), ps->raw_cql_statement);
|
||||
tracing::add_query(trace_state, ps->raw_cql_statement);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
@@ -1015,8 +1022,8 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::statements::modification_statement> modif_statement_ptr = static_pointer_cast<cql3::statements::modification_statement>(ps->statement);
|
||||
tracing::add_table_name(client_state.get_trace_state(), modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
|
||||
tracing::add_prepared_statement(client_state.get_trace_state(), ps);
|
||||
tracing::add_table_name(trace_state, modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
|
||||
tracing::add_prepared_statement(trace_state, ps);
|
||||
|
||||
modifications.emplace_back(std::move(modif_statement_ptr), needs_authorization);
|
||||
|
||||
@@ -1031,16 +1038,16 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
values.emplace_back(std::move(tmp));
|
||||
}
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
// #563. CQL v2 encodes query_options in v1 format for batch requests.
|
||||
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values)));
|
||||
auto& options = *q_state->options;
|
||||
|
||||
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
|
||||
tracing::add_prepared_query_options(client_state.get_trace_state(), options);
|
||||
tracing::trace(client_state.get_trace_state(), "Creating a batch statement");
|
||||
tracing::set_consistency_level(trace_state, options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
|
||||
tracing::add_prepared_query_options(trace_state, options);
|
||||
tracing::trace(trace_state, "Creating a batch statement");
|
||||
|
||||
auto batch = ::make_shared<cql3::statements::batch_statement>(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), _server._query_processor.local().get_cql_stats());
|
||||
return _server._query_processor.local().process_batch(batch, query_state, options, std::move(pending_authorization_entries))
|
||||
@@ -1050,15 +1057,15 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>>
|
||||
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
std::vector<sstring> event_types;
|
||||
in.read_string_list(event_types);
|
||||
for (auto&& event_type : event_types) {
|
||||
auto et = parse_event_type(event_type);
|
||||
_server._notifier->register_event(et, this);
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, std::move(trace_state)));
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const
|
||||
|
||||
@@ -96,9 +96,6 @@ struct cql_query_state {
|
||||
service::query_state query_state;
|
||||
std::unique_ptr<cql3::query_options> options;
|
||||
|
||||
cql_query_state(service::client_state& client_state, service_permit permit)
|
||||
: query_state(client_state, std::move(permit))
|
||||
{ }
|
||||
cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit)
|
||||
: query_state(client_state, std::move(trace_state_ptr), std::move(permit))
|
||||
{ }
|
||||
@@ -197,14 +194,14 @@ private:
|
||||
cql_binary_frame_v3 parse_frame(temporary_buffer<char> buf) const;
|
||||
future<fragmented_temporary_buffer> read_and_decompress_frame(size_t length, uint8_t flags);
|
||||
future<std::optional<cql_binary_frame_v3>> read_frame();
|
||||
future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
|
||||
future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
|
||||
future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
|
||||
future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
|
||||
std::unique_ptr<cql_server::response> make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const;
|
||||
@@ -224,11 +221,12 @@ private:
|
||||
std::unique_ptr<cql_server::response> make_auth_challenge(int16_t, bytes, const tracing::trace_state_ptr& tr_state) const;
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
|
||||
process_execute_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
|
||||
service::client_state& cs, service_permit permit);
|
||||
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
process_query_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
|
||||
service::client_state& cs, service_permit permit);
|
||||
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
|
||||
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
|
||||
|
||||
@@ -266,7 +264,9 @@ class cql_server::event_notifier : public service::migration_listener,
|
||||
std::set<cql_server::connection*> _schema_change_listeners;
|
||||
std::unordered_map<gms::inet_address, event::status_change::status_type> _last_status_change;
|
||||
service::migration_notifier& _mnotifier;
|
||||
bool _stopped = false;
|
||||
public:
|
||||
future<> stop();
|
||||
event_notifier(service::migration_notifier& mn);
|
||||
~event_notifier();
|
||||
void register_event(cql_transport::event::event_type et, cql_server::connection* conn);
|
||||
|
||||
15
types.cc
15
types.cc
@@ -2082,12 +2082,19 @@ static size_t concrete_serialized_size(const string_type_impl::native_type& v) {
|
||||
static size_t concrete_serialized_size(const bytes_type_impl::native_type& v) { return v.size(); }
|
||||
static size_t concrete_serialized_size(const inet_addr_type_impl::native_type& v) { return v.get().size(); }
|
||||
|
||||
static size_t concrete_serialized_size(const boost::multiprecision::cpp_int& num) {
|
||||
if (!num) {
|
||||
static size_t concrete_serialized_size_aux(const boost::multiprecision::cpp_int& num) {
|
||||
if (num) {
|
||||
return align_up(boost::multiprecision::msb(num) + 2, 8u) / 8;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
auto pnum = abs(num);
|
||||
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
|
||||
}
|
||||
|
||||
static size_t concrete_serialized_size(const boost::multiprecision::cpp_int& num) {
|
||||
if (num < 0) {
|
||||
return concrete_serialized_size_aux(-num - 1);
|
||||
}
|
||||
return concrete_serialized_size_aux(num);
|
||||
}
|
||||
|
||||
static size_t concrete_serialized_size(const varint_type_impl::native_type& v) {
|
||||
|
||||
Reference in New Issue
Block a user