Compare commits

..

4 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
bb1ab98fc9 Remove unused unimplemented::cause enum values and document remaining ones
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-05 22:26:58 +00:00
copilot-swe-agent[bot]
bdbc47a333 Restore unimplemented::cause::SUPER - still needed for error reporting
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-05 21:01:46 +00:00
copilot-swe-agent[bot]
5407b6b43d Remove dead code for super table handling
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-05 20:59:18 +00:00
copilot-swe-agent[bot]
8582156257 Initial plan 2025-12-05 20:48:35 +00:00
105 changed files with 1990 additions and 1495 deletions

9
.github/CODEOWNERS vendored
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall
auth/* @nuivall @ptrsmrn
# CACHE
row_cache* @tgrabiec
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall
cql3/* @tgrabiec @nuivall @ptrsmrn
# COUNTERS
counters* @nuivall
tests/counter_test* @nuivall
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
# DOCS
docs/* @annastuchlik @tzach
@@ -57,6 +57,7 @@ repair/* @tgrabiec @asias
# SCHEMA MANAGEMENT
db/schema_tables* @tgrabiec
db/legacy_schema_migrator* @tgrabiec
service/migration* @tgrabiec
schema* @tgrabiec

View File

@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
if is_draft:
labels_to_add.append("conflicts")
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
pr_comment += "Please resolve them and mark this PR as ready for review"
backport_pr.create_issue_comment(pr_comment)
# Apply all labels at once if we have any

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {

View File

@@ -3,13 +3,10 @@ name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
pull_request_target:
types:
- unlabeled
jobs:
trigger-jenkins:
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI-Route Jenkins Job

View File

@@ -0,0 +1,109 @@
# Analysis of unimplemented::cause Enum Values
This document provides an analysis of the `unimplemented::cause` enum values after cleanup.
## Removed Unused Enum Values (20 values removed)
The following enum values had **zero usages** in the codebase and have been removed:
- `LWT` - Lightweight transactions
- `PAGING` - Query result paging
- `AUTH` - Authentication
- `PERMISSIONS` - Permission checking
- `COUNTERS` - Counter columns
- `MIGRATIONS` - Schema migrations
- `GOSSIP` - Gossip protocol
- `TOKEN_RESTRICTION` - Token-based restrictions
- `LEGACY_COMPOSITE_KEYS` - Legacy composite key handling
- `COLLECTION_RANGE_TOMBSTONES` - Collection range tombstones
- `RANGE_DELETES` - Range deletion operations
- `COMPRESSION` - Compression features
- `NONATOMIC` - Non-atomic operations
- `CONSISTENCY` - Consistency level handling
- `WRAP_AROUND` - Token wrap-around handling
- `STORAGE_SERVICE` - Storage service operations
- `SCHEMA_CHANGE` - Schema change operations
- `MIXED_CF` - Mixed column family operations
- `SSTABLE_FORMAT_M` - SSTable format M
## Remaining Enum Values (8 values kept)
### 1. `API` (4 usages)
**Impact**: REST API features that are not fully implemented.
**Usages**:
- `api/column_family.cc:1052` - Fails when `split_output` parameter is used in major compaction
- `api/compaction_manager.cc:100,146,216` - Warns when force_user_defined_compaction or related operations are called
**User Impact**: Some REST API endpoints for compaction management are stubs and will warn or fail.
### 2. `INDEXES` (6 usages)
**Impact**: Secondary index features not fully supported.
**Usages**:
- `api/column_family.cc:433,440,449,456` - Warns about index-related operations
- `cql3/restrictions/statement_restrictions.cc:1158` - Fails when attempting filtering on collection columns without proper indexing
- `cql3/statements/update_statement.cc:149` - Warns about index operations
**User Impact**: Some advanced secondary index features (especially filtering on collections) are not available.
### 3. `TRIGGERS` (2 usages)
**Impact**: Trigger support is not implemented.
**Usages**:
- `db/schema_tables.cc:2017` - Warns when loading trigger metadata from schema tables
- `service/storage_proxy.cc:4166` - Warns when processing trigger-related operations
**User Impact**: Cassandra triggers (stored procedures that execute on data changes) are not supported.
### 4. `METRICS` (1 usage)
**Impact**: Some query processor metrics are not collected.
**Usages**:
- `cql3/query_processor.cc:585` - Warns about missing metrics implementation
**User Impact**: Minor - some internal metrics may not be available.
### 5. `VALIDATION` (4 usages)
**Impact**: Schema validation checks are partially implemented.
**Usages**:
- `cql3/functions/token_fct.hh:38` - Warns about validation in token functions
- `cql3/statements/drop_keyspace_statement.cc:40` - Warns when dropping keyspace
- `cql3/statements/truncate_statement.cc:87` - Warns when truncating table
- `service/migration_manager.cc:750` - Warns during schema migrations
**User Impact**: Some schema validation checks are skipped (with warnings logged).
### 6. `REVERSED` (1 usage)
**Impact**: Reversed type support in CQL protocol.
**Usages**:
- `transport/server.cc:2085` - Fails when trying to use reversed types in CQL protocol
**User Impact**: Reversed types are not supported in the CQL protocol implementation.
### 7. `HINT` (1 usage)
**Impact**: Hint replaying is not implemented.
**Usages**:
- `db/batchlog_manager.cc:251` - Warns when attempting to replay hints
**User Impact**: Cassandra hints (temporary storage of writes when nodes are down) are not supported.
### 8. `SUPER` (2 usages)
**Impact**: Super column families are not supported.
**Usages**:
- `db/legacy_schema_migrator.cc:157` - Fails when encountering super column family in legacy schema
- `db/schema_tables.cc:2288` - Fails when encountering super column family in schema tables
**User Impact**: Super column families (legacy Cassandra feature) will cause errors if encountered in legacy data or schema migrations.
## Summary
- **Removed**: 20 unused enum values (76% reduction)
- **Kept**: 8 actively used enum values (24% remaining)
- **Total lines removed**: ~40 lines from enum definition and switch statement
The remaining enum values represent actual unimplemented features that users may encounter, with varying impacts ranging from warnings (TRIGGERS, METRICS, VALIDATION, HINT) to failures (API split_output, INDEXES on collections, REVERSED types, SUPER tables).

View File

@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
if (!comparison_operator.IsString()) {
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
}
std::string op = rjson::to_string(comparison_operator);
std::string op = comparison_operator.GetString();
auto it = ops.find(op);
if (it == ops.end()) {
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
}
if (kv1.name == "S") {
return cmp(rjson::to_string_view(kv1.value),
rjson::to_string_view(kv2.value));
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
}
if (kv1.name == "B") {
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(rjson::to_string_view(kv_v.value),
rjson::to_string_view(kv_lb.value),
rjson::to_string_view(kv_ub.value),
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
bounds_from_query);
}
if (kv_v.name == "B") {

View File

@@ -8,8 +8,6 @@
#include "consumed_capacity.hh"
#include "error.hh"
#include "utils/rjson.hh"
#include <fmt/format.h>
namespace alternator {
@@ -34,12 +32,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
if (!return_consumed->IsString()) {
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
}
std::string_view consumed = rjson::to_string_view(*return_consumed);
std::string consumed = return_consumed->GetString();
if (consumed == "INDEXES") {
throw api_error::validation("INDEXES consumed capacity is not supported");
}
if (consumed != "TOTAL") {
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
throw api_error::validation("Unknown consumed capacity "+ consumed);
}
return true;
}

View File

@@ -419,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
if (!table_name_value->IsString()) {
throw api_error::validation("Non-string TableName field in request");
}
std::string table_name = rjson::to_string(*table_name_value);
std::string table_name = table_name_value->GetString();
return table_name;
}
@@ -546,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
// does exist but the index does not (ValidationException).
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
throw api_error::validation(
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
} else {
throw api_error::resource_not_found(
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
@@ -587,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
attribute_name, value));
}
return rjson::to_string(*attribute_value);
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
}
// Convenience function for getting the value of a boolean attribute, or a
@@ -1080,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
}
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == name) {
auto type = attribute_info["AttributeType"].GetString();
data_type dt = parse_key_type(type);
if (computed_column) {
// Computed column for GSI (doesn't choose a real column as-is
@@ -1116,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First element of KeySchema must be an object");
}
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
throw api_error::validation("First key in KeySchema must be a HASH key");
}
v = rjson::find((*key_schema)[0], "AttributeName");
@@ -1124,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First key in KeySchema must have string AttributeName");
}
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
std::string hash_key = rjson::to_string(*v);
std::string hash_key = v->GetString();
std::string range_key;
if (key_schema->Size() == 2) {
if (!(*key_schema)[1].IsObject()) {
throw api_error::validation("Second element of KeySchema must be an object");
}
v = rjson::find((*key_schema)[1], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
throw api_error::validation("Second key in KeySchema must be a RANGE key");
}
v = rjson::find((*key_schema)[1], "AttributeName");
@@ -1887,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string def_type = type_to_string(def.type);
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
auto type = attribute_info["AttributeType"].GetString();
if (type != def_type) {
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
}
@@ -2223,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
// The put_or_delete_item class builds the mutations needed by the PutItem and
// DeleteItem operations - either as stand-alone commands or part of a list
// of commands in BatchWriteItem.
// of commands in BatchWriteItems.
// put_or_delete_item splits each operation into two stages: Constructing the
// object parses and validates the user input (throwing exceptions if there
// are input errors). Later, build() generates the actual mutation, with a
// specified timestamp. This split is needed because of the peculiar needs of
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
@@ -2362,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
@@ -2739,7 +2739,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
auto read_command = needs_read_before_write ?
previous_item_read_command(proxy, schema(), _ck, selection) :
nullptr;
return proxy.cas(schema(), std::move(*cas_shard), *this, read_command, to_partition_ranges(*schema(), _pk),
return proxy.cas(schema(), std::move(*cas_shard), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout, true, std::move(cdc_opts)).then([this, read_command, &wcu_total] (bool is_applied) mutable {
if (!is_applied) {
@@ -2783,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
return;
}
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
if (!used.contains(rjson::to_string(it->name))) {
if (!used.contains(it->name.GetString())) {
throw api_error::validation(
format("{} has spurious '{}', not used in {}",
field_name, rjson::to_string_view(it->name), operation));
field_name, it->name.GetString(), operation));
}
}
}
@@ -3000,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
}
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
try {
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
} catch(data_dictionary::no_such_column_family&) {
@@ -3026,20 +3026,17 @@ struct primary_key_equal {
};
// This is a cas_request subclass for applying given put_or_delete_items to
// one partition using LWT as part as BatchWriteItem. This is a write-only
// one partition using LWT as part as BatchWriteItems. This is a write-only
// operation, not needing the previous value of the item (the mutation to be
// done is known prior to starting the operation). Nevertheless, we want to
// do this mutation via LWT to ensure that it is serialized with other LWT
// mutations to the same partition.
//
// The std::vector<put_or_delete_item> must remain alive until the
// storage_proxy::cas() future is resolved.
class put_or_delete_item_cas_request : public service::cas_request {
schema_ptr schema;
const std::vector<put_or_delete_item>& _mutation_builders;
std::vector<put_or_delete_item> _mutation_builders;
public:
put_or_delete_item_cas_request(schema_ptr s, const std::vector<put_or_delete_item>& b) :
schema(std::move(s)), _mutation_builders(b) { }
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
schema(std::move(s)), _mutation_builders(std::move(b)) { }
virtual ~put_or_delete_item_cas_request() = default;
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override {
std::optional<mutation> ret;
@@ -3055,48 +3052,20 @@ public:
}
};
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit)
{
if (!cas_shard.this_shard()) {
_stats.shard_bounce_for_lwt++;
return container().invoke_on(cas_shard.shard(), _ssg,
[cs = client_state.move_to_other_shard(),
&mb = mutation_builders,
&dk,
ks = schema->ks_name(),
cf = schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(executor& self) mutable {
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt), &self]
(service::client_state& client_state) mutable {
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
});
}
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
auto timeout = executor::default_timeout();
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
timeout, timeout, true, std::move(cdc_opts)).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
// does not need to support conditional updates.
}
@@ -3118,11 +3087,13 @@ struct schema_decorated_key_equal {
// FIXME: if we failed writing some of the mutations, need to return a list
// of these failed mutations rather than fail the whole write (issue #5650).
future<> executor::do_batch_write(
static future<> do_batch_write(service::storage_proxy& proxy,
smp_service_group ssg,
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit) {
service_permit permit,
stats& stats) {
if (mutation_builders.empty()) {
return make_ready_future<>();
}
@@ -3144,7 +3115,7 @@ future<> executor::do_batch_write(
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
return proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
@@ -3153,48 +3124,55 @@ future<> executor::do_batch_write(
false,
cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
});
} else {
// Do the write via LWT:
// Multiple mutations may be destined for the same partition, adding
// or deleting different items of one partition. Join them together
// because we can do them in one cas() call.
using map_type = std::unordered_map<schema_decorated_key,
std::vector<put_or_delete_item>,
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto&& b : std::move(mutation_builders)) {
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
.schema = b.first,
.dk = dht::decorate_key(*b.first, b.second.pk())
});
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders.try_emplace(schema_decorated_key{b.first, dk});
it->second.push_back(std::move(b.second));
}
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
_stats.write_using_lwt++;
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
auto s = e.first.schema;
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, std::move(e.second), client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
mb = e.second,
dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
static const auto* injection_name = "alternator_executor_batch_write_wait";
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
const auto ks = handler.get("keyspace");
const auto cf = handler.get("table");
const auto shard = std::atoll(handler.get("shard")->data());
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
elogger.info("{}: hit", injection_name);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
elogger.info("{}: continue", injection_name);
}
}).then([&e, desired_shard = std::move(desired_shard),
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
{
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
});
}).finally([key_builders = std::move(key_builders)]{});
// The desired_shard on the original shard remains alive for the duration
// of cas_write on this shard and prevents any tablet operations.
// However, we need a local instance of cas_shard on this shard
// to pass it to sp::cas, so we just create a new one.
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, std::move(mb), client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
});
}
}
@@ -3341,7 +3319,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
_stats.api_operations.batch_write_item_batch_total += total_items;
_stats.api_operations.batch_write_item_histogram.add(total_items);
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
// FIXME: Issue #5650: If we failed writing some of the updates,
// need to return a list of these failed updates in UnprocessedItems
// rather than fail the whole write (issue #5650).
@@ -3386,7 +3364,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
}
rjson::value newv = rjson::empty_object();
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
std::string attr = rjson::to_string(it->name);
std::string attr = it->name.GetString();
auto x = members.find(attr);
if (x != members.end()) {
if (x->second) {
@@ -3606,7 +3584,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
const rjson::value& attributes_to_get = req["AttributesToGet"];
attrs_to_get ret;
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
attribute_path_map_add("AttributesToGet", ret, it->GetString());
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
}
if (ret.empty()) {
@@ -4272,12 +4250,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
// Note that it.key() is the name of the column, *it is the operation
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
const column_definition* cdef = _schema->get_column_definition(column_name);
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
}
std::string action = rjson::to_string((it->value)["Action"]);
std::string action = (it->value)["Action"].GetString();
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -5474,7 +5452,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
std::vector<query::clustering_range> ck_bounds;
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
sstring key = rjson::to_sstring(it->name);
std::string key = it->name.GetString();
const rjson::value& condition = it->value;
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
@@ -5482,13 +5460,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
if (key == pk_cdef.name_as_text()) {
if (sstring(key) == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
}
if (ck_cdef && key == ck_cdef->name_as_text()) {
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
@@ -5889,7 +5867,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
int limit = limit_json ? limit_json->GetInt() : 100;
if (limit < 1 || limit > 100) {
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");

View File

@@ -40,7 +40,6 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
}
namespace cdc {
@@ -58,7 +57,6 @@ class schema_builder;
namespace alternator {
class rmw_operation;
class put_or_delete_item;
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
@@ -221,16 +219,6 @@ private:
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit);
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit);
public:
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);

View File

@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
return {"", nullptr};
}
auto it = v.MemberBegin();
const std::string it_key = rjson::to_string(it->name);
const std::string it_key = it->name.GetString();
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
return {std::move(it_key), nullptr};
}

View File

@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
}
sstring attribute_name = rjson::to_sstring(*v);
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {

View File

@@ -3051,7 +3051,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"required":false,
"allowMultiple":false,
"type":"string",

View File

@@ -8,7 +8,6 @@
*/
#include "auth/certificate_authenticator.hh"
#include "auth/cache.hh"
#include <boost/regex.hpp>
#include <fmt/ranges.h>
@@ -35,14 +34,13 @@ static const class_registrator<auth::authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();
@@ -77,9 +75,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
}
continue;
} catch (const std::out_of_range&) {
} catch (std::out_of_range&) {
// just fallthrough
} catch (const boost::regex_error&) {
} catch (boost::regex_error&) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
}
}

View File

@@ -26,15 +26,13 @@ class raft_group0_client;
namespace auth {
class cache;
extern const std::string_view certificate_authenticator_name;
class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
~certificate_authenticator();
future<> start() override;

View File

@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
try {
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
} catch (const exceptions::already_exists_exception&) {}
} catch (exceptions::already_exists_exception&) {}
}
}

View File

@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
} else {
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
}
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
}
}
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
[resource](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}

View File

@@ -337,13 +337,13 @@ future<authenticated_user> password_authenticator::authenticate(
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
co_return username;
} catch (const std::system_error &) {
} catch (std::system_error &) {
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (const exceptions::authentication_exception& e) {
} catch (exceptions::authentication_exception& e) {
std::throw_with_nested(e);
} catch (const exceptions::unavailable_exception& e) {
} catch (exceptions::unavailable_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));

View File

@@ -226,7 +226,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (const ::service::group0_concurrent_modification&) {
} catch (::service::group0_concurrent_modification&) {
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}

View File

@@ -192,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
log.info("Created default superuser role '{}'.", _superuser);
} catch (const exceptions::unavailable_exception& e) {
} catch(const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}

View File

@@ -81,7 +81,7 @@ public:
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -126,7 +126,7 @@ public:
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
_complete = true;
return {};
}
@@ -141,7 +141,7 @@ public:
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}

View File

@@ -1062,6 +1062,7 @@ scylla_core = (['message/messaging_service.cc',
'db/hints/resource_manager.cc',
'db/hints/sync_point.cc',
'db/large_data_handler.cc',
'db/legacy_schema_migrator.cc',
'db/marshal/type_parser.cc',
'db/per_partition_rate_limit_options.cc',
'db/rate_limiter.cc',

View File

@@ -186,10 +186,6 @@ void alter_table_statement::add_column(const query_options&, const schema& schem
if (!schema.is_compound()) {
throw exceptions::invalid_request_exception("Cannot use non-frozen collections with a non-composite PRIMARY KEY");
}
if (schema.is_super()) {
throw exceptions::invalid_request_exception("Cannot use non-frozen collections with super column families");
}
// If there used to be a non-frozen collection column with the same name (that has been dropped),
// we could still have some data using the old type, and so we can't allow adding a collection

View File

@@ -331,7 +331,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (!cl_for_paxos) [[unlikely]] {
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(std::move(cl_for_paxos).assume_error());
}
std::unique_ptr<cas_request> request;
seastar::shared_ptr<cas_request> request;
schema_ptr schema;
db::timeout_clock::time_point now = db::timeout_clock::now();
@@ -354,9 +354,9 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (keys.empty()) {
continue;
}
if (!request) {
if (request.get() == nullptr) {
schema = statement.s;
request = std::make_unique<cas_request>(schema, std::move(keys));
request = seastar::make_shared<cas_request>(schema, std::move(keys));
} else if (keys.size() != 1 || keys.front().equal(request->key().front(), dht::ring_position_comparator(*schema)) == false) {
throw exceptions::invalid_request_exception("BATCH with conditions cannot span multiple partitions");
}
@@ -366,7 +366,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
request->add_row_update(statement, std::move(ranges), std::move(json_cache), statement_options);
}
if (!request) {
if (request.get() == nullptr) {
throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional BATCH"));
}
@@ -377,10 +377,9 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
);
}
auto* request_ptr = request.get();
return qp.proxy().cas(schema, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
return qp.proxy().cas(schema, std::move(cas_shard), request, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request = std::move(request)] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request] (bool is_applied) {
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
});
}

View File

@@ -401,8 +401,7 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
type.is_update() ? "update" : "deletion"));
}
auto request = std::make_unique<cas_request>(s, std::move(keys));
auto* request_ptr = request.get();
auto request = seastar::make_shared<cas_request>(s, std::move(keys));
// cas_request can be used for batches as well single statements; Here we have just a single
// modification in the list of CAS commands, since we're handling single-statement execution.
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);
@@ -428,9 +427,9 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
tablet_info = erm->check_locality(token);
}
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
return qp.proxy().cas(s, std::move(cas_shard), request, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request, tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
result->add_tablet_info(tablet_replicas, token_range);
return result;

View File

@@ -10,6 +10,7 @@ target_sources(db
schema_applier.cc
schema_tables.cc
cql_type_parser.cc
legacy_schema_migrator.cc
commitlog/commitlog.cc
commitlog/commitlog_replayer.cc
commitlog/commitlog_entry.cc

View File

@@ -0,0 +1,597 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
// Since Scylla 2.0, we use system tables whose schemas were introduced in
// Cassandra 3. If Scylla boots to find a data directory with system tables
// with older schemas - produced by pre-2.0 Scylla or by pre-3.0 Cassandra,
// we need to migrate these old tables to the new format.
//
// We provide here a function, db::legacy_schema_migrator::migrate(),
// for a one-time migration from old to new system tables. The function
// reads old system tables, write them back in the new format, and finally
// delete the old system tables. Scylla's main should call this function and
// wait for the returned future, before starting to serve the database.
#include <boost/iterator/filter_iterator.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/util/log.hh>
#include <map>
#include <unordered_set>
#include <chrono>
#include "replica/database.hh"
#include "legacy_schema_migrator.hh"
#include "system_keyspace.hh"
#include "schema_tables.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "utils/rjson.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/util.hh"
#include "cql3/statements/property_definitions.hh"
static seastar::logger mlogger("legacy_schema_migrator");
namespace db {
namespace legacy_schema_migrator {
// local data carriers
class migrator {
public:
static const std::unordered_set<sstring> legacy_schema_tables;
migrator(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp)
: _sp(sp), _db(db), _sys_ks(sys_ks), _qp(qp) {
}
migrator(migrator&&) = default;
typedef db_clock::time_point time_point;
// TODO: we don't support triggers.
// this is a placeholder.
struct trigger {
time_point timestamp;
sstring name;
std::unordered_map<sstring, sstring> options;
};
struct table {
time_point timestamp;
schema_ptr metadata;
std::vector<trigger> triggers;
};
struct type {
time_point timestamp;
user_type metadata;
};
struct function {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
bool called_on_null_input;
sstring language;
sstring body;
};
struct aggregate {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
sstring final_func;
sstring initcond;
sstring state_func;
sstring state_type;
};
struct keyspace {
time_point timestamp;
sstring name;
bool durable_writes;
std::map<sstring, sstring> replication_params;
std::vector<table> tables;
std::vector<type> types;
std::vector<function> functions;
std::vector<aggregate> aggregates;
};
class unsupported_feature : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
static sstring fmt_query(const char* fmt, const char* table) {
return fmt::format(fmt::runtime(fmt), db::system_keyspace::NAME, table);
}
typedef ::shared_ptr<cql3::untyped_result_set> result_set_type;
typedef const cql3::untyped_result_set::row row_type;
future<> read_table(keyspace& dst, sstring cf_name, time_point timestamp) {
auto fmt = "SELECT * FROM {}.{} WHERE keyspace_name = ? AND columnfamily_name = ?";
auto tq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNFAMILIES);
auto cq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNS);
auto zq = fmt_query(fmt, db::system_keyspace::legacy::TRIGGERS);
typedef std::tuple<future<result_set_type>, future<result_set_type>, future<result_set_type>, future<db::schema_tables::legacy::schema_mutations>> result_tuple;
return when_all(_qp.execute_internal(tq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(cq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(zq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
db::schema_tables::legacy::read_table_mutations(_sp, dst.name, cf_name, db::system_keyspace::legacy::column_families()))
.then([&dst, cf_name, timestamp](result_tuple&& t) {
result_set_type tables = std::get<0>(t).get();
result_set_type columns = std::get<1>(t).get();
result_set_type triggers = std::get<2>(t).get();
db::schema_tables::legacy::schema_mutations sm = std::get<3>(t).get();
row_type& td = tables->one();
auto ks_name = td.get_as<sstring>("keyspace_name");
auto cf_name = td.get_as<sstring>("columnfamily_name");
auto id = table_id(td.get_or("cf_id", generate_legacy_id(ks_name, cf_name).uuid()));
schema_builder builder(dst.name, cf_name, id);
builder.with_version(sm.digest());
auto type_str = td.get_or("type", sstring("standard"));
if (type_str == "Super") {
fail(unimplemented::cause::SUPER);
}
auto comparator = td.get_as<sstring>("comparator");
bool is_compound = cell_comparator::check_compound(comparator);
builder.set_is_compound(is_compound);
cell_comparator::read_collections(builder, comparator);
bool filter_sparse = false;
data_type default_validator = {};
if (td.has("default_validator")) {
default_validator = db::schema_tables::parse_type(td.get_as<sstring>("default_validator"));
if (default_validator->is_counter()) {
builder.set_is_counter(true);
}
builder.set_default_validation_class(default_validator);
}
/*
* Determine whether or not the table is *really* dense
* We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively),
* but we can trust is_dense value of false.
*/
auto is_dense = td.get_opt<bool>("is_dense");
if (!is_dense || *is_dense) {
is_dense = [&] {
/*
* As said above, this method is only here because we need to deal with thrift upgrades.
* Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
* then we'll have saved the "is_dense" value and will be good to go.
*
* But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need
* to infer that information without relying on it in that case. And for the most part this is
* easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
* having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
* PRIMARY KEY defined.
*
* So we need to recognize those special case CQL3 table with only a primary key. If we have some
* clustering columns, we're fine as said above. So the only problem is that we cannot decide for
* sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
* has been created in CQL3 by say:
* CREATE TABLE test (k int PRIMARY KEY)
* in which case it should not be dense. However, we can limit our margin of error by assuming we are
* in the latter case only if the comparator is exactly CompositeType(UTF8Type).
*/
std::optional<column_id> max_cl_idx;
const cql3::untyped_result_set::row * regular = nullptr;
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
if (kind_str == "compact_value") {
continue;
}
auto kind = db::schema_tables::deserialize_kind(kind_str);
if (kind == column_kind::regular_column) {
if (regular != nullptr) {
return false;
}
regular = &row;
continue;
}
if (kind == column_kind::clustering_key) {
max_cl_idx = std::max(column_id(row.get_or("component_index", 0)), max_cl_idx.value_or(column_id()));
}
}
auto is_cql3_only_pk_comparator = [](const sstring& comparator) {
if (!cell_comparator::check_compound(comparator)) {
return false;
}
// CMH. We don't have composites, nor a parser for it. This is a simple way of c
// checking the same.
auto comma = comparator.find(',');
if (comma != sstring::npos) {
return false;
}
auto off = comparator.find('(');
auto end = comparator.find(')');
return comparator.compare(off, end - off, utf8_type->name()) == 0;
};
if (max_cl_idx) {
auto n = std::count(comparator.begin(), comparator.end(), ','); // num comp - 1
return *max_cl_idx == n;
}
if (regular) {
return false;
}
return !is_cql3_only_pk_comparator(comparator);
}();
// now, if switched to sparse, remove redundant compact_value column and the last clustering column,
// directly copying CASSANDRA-11502 logic. See CASSANDRA-11315.
filter_sparse = !*is_dense;
}
builder.set_is_dense(*is_dense);
auto is_cql = !*is_dense && is_compound;
auto is_static_compact = !*is_dense && !is_compound;
// org.apache.cassandra.schema.LegacySchemaMigrator#isEmptyCompactValueColumn
auto is_empty_compact_value = [](const cql3::untyped_result_set::row& column_row) {
auto kind_str = column_row.get_as<sstring>("type");
// Cassandra only checks for "compact_value", but Scylla generates "regular" instead (#2586)
return (kind_str == "compact_value" || kind_str == "regular")
&& column_row.get_as<sstring>("column_name").empty();
};
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
auto kind = db::schema_tables::deserialize_kind(kind_str);
auto component_index = kind > column_kind::clustering_key ? 0 : column_id(row.get_or("component_index", 0));
auto name = row.get_or<sstring>("column_name", sstring());
auto validator = db::schema_tables::parse_type(row.get_as<sstring>("validator"));
if (is_empty_compact_value(row)) {
continue;
}
if (filter_sparse) {
if (kind_str == "compact_value") {
continue;
}
if (kind == column_kind::clustering_key && !is_compound) {
continue;
}
}
std::optional<index_metadata_kind> index_kind;
sstring index_name;
index_options_map options;
if (row.has("index_type")) {
index_kind = schema_tables::deserialize_index_kind(row.get_as<sstring>("index_type"));
}
if (row.has("index_name")) {
index_name = row.get_as<sstring>("index_name");
}
if (row.has("index_options")) {
sstring index_options_str = row.get_as<sstring>("index_options");
options = rjson::parse_to_map<index_options_map>(std::string_view(index_options_str));
sstring type;
auto i = options.find("index_keys");
if (i != options.end()) {
options.erase(i);
type = "KEYS";
}
i = options.find("index_keys_and_values");
if (i != options.end()) {
options.erase(i);
type = "KEYS_AND_VALUES";
}
if (type.empty()) {
if (validator->is_collection() && validator->is_multi_cell()) {
type = "FULL";
} else {
type = "VALUES";
}
}
auto column = cql3::util::maybe_quote(name);
options["target"] = validator->is_collection()
? type + "(" + column + ")"
: column;
}
if (index_kind) {
// Origin assumes index_name is always set, so let's do the same
builder.with_index(index_metadata(index_name, options, *index_kind, index_metadata::is_local_index::no));
}
data_type column_name_type = [&] {
if (is_static_compact && kind == column_kind::regular_column) {
return db::schema_tables::parse_type(comparator);
}
return utf8_type;
}();
auto column_name = [&] {
try {
return column_name_type->from_string(name);
} catch (marshal_exception&) {
// #2597: Scylla < 2.0 writes names in serialized form, try to recover
column_name_type->validate(to_bytes_view(name));
return to_bytes(name);
}
}();
builder.with_column_ordered(column_definition(std::move(column_name), std::move(validator), kind, component_index));
}
if (is_static_compact) {
builder.set_regular_column_name_type(db::schema_tables::parse_type(comparator));
}
if (td.has("gc_grace_seconds")) {
builder.set_gc_grace_seconds(td.get_as<int32_t>("gc_grace_seconds"));
}
if (td.has("min_compaction_threshold")) {
builder.set_min_compaction_threshold(td.get_as<int32_t>("min_compaction_threshold"));
}
if (td.has("max_compaction_threshold")) {
builder.set_max_compaction_threshold(td.get_as<int32_t>("max_compaction_threshold"));
}
if (td.has("comment")) {
builder.set_comment(td.get_as<sstring>("comment"));
}
if (td.has("memtable_flush_period_in_ms")) {
builder.set_memtable_flush_period(td.get_as<int32_t>("memtable_flush_period_in_ms"));
}
if (td.has("caching")) {
builder.set_caching_options(caching_options::from_sstring(td.get_as<sstring>("caching")));
}
if (td.has("default_time_to_live")) {
builder.set_default_time_to_live(gc_clock::duration(td.get_as<int32_t>("default_time_to_live")));
}
if (td.has("speculative_retry")) {
builder.set_speculative_retry(td.get_as<sstring>("speculative_retry"));
}
if (td.has("compaction_strategy_class")) {
auto strategy = td.get_as<sstring>("compaction_strategy_class");
try {
builder.set_compaction_strategy(compaction::compaction_strategy::type(strategy));
} catch (const exceptions::configuration_exception& e) {
// If compaction strategy class isn't supported, fallback to incremental.
mlogger.warn("Falling back to incremental compaction strategy after the problem: {}", e.what());
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
}
}
if (td.has("compaction_strategy_options")) {
sstring strategy_options_str = td.get_as<sstring>("compaction_strategy_options");
builder.set_compaction_strategy_options(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options_str)));
}
auto comp_param = td.get_as<sstring>("compression_parameters");
compression_parameters cp(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(comp_param)));
builder.set_compressor_params(cp);
if (td.has("min_index_interval")) {
builder.set_min_index_interval(td.get_as<int32_t>("min_index_interval"));
} else if (td.has("index_interval")) { // compatibility
builder.set_min_index_interval(td.get_as<int32_t>("index_interval"));
}
if (td.has("max_index_interval")) {
builder.set_max_index_interval(td.get_as<int32_t>("max_index_interval"));
}
if (td.has("bloom_filter_fp_chance")) {
builder.set_bloom_filter_fp_chance(td.get_as<double>("bloom_filter_fp_chance"));
} else {
builder.set_bloom_filter_fp_chance(builder.get_bloom_filter_fp_chance());
}
if (td.has("dropped_columns")) {
auto map = td.get_map<sstring, int64_t>("dropped_columns");
for (auto&& e : map) {
builder.without_column(e.first, api::timestamp_type(e.second));
};
}
// ignore version. we're transient
if (!triggers->empty()) {
throw unsupported_feature("triggers");
}
dst.tables.emplace_back(table{timestamp, builder.build() });
});
}
future<> read_tables(keyspace& dst) {
auto query = fmt_query("SELECT columnfamily_name, writeTime(type) AS timestamp FROM {}.{} WHERE keyspace_name = ?",
db::system_keyspace::legacy::COLUMNFAMILIES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
return read_table(dst, row.get_as<sstring>("columnfamily_name"), row.get_as<time_point>("timestamp"));
}).finally([result] {});
});
}
future<time_point> read_type_timestamp(keyspace& dst, sstring type_name) {
// TODO: Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot
// use the writeTime() CQL function, and must resort to a lower level.
// Origin digs up the actual cells of target partition and gets timestamp from there.
// We should do the same, but g-dam that's messy. Lets give back dung value for now.
return make_ready_future<time_point>(dst.timestamp);
}
future<> read_types(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::USERTYPES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
auto name = row.get_blob_unfragmented("type_name");
auto columns = row.get_list<bytes>("field_names");
auto types = row.get_list<sstring>("field_types");
std::vector<data_type> field_types;
for (auto&& value : types) {
field_types.emplace_back(db::schema_tables::parse_type(value));
}
auto ut = user_type_impl::get_instance(dst.name, name, columns, field_types, false);
return read_type_timestamp(dst, value_cast<sstring>(utf8_type->deserialize(name))).then([ut = std::move(ut), &dst](time_point timestamp) {
dst.types.emplace_back(type{timestamp, ut});
});
}).finally([result] {});
});
}
future<> read_functions(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::FUNCTIONS);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("functions");
}
});
}
future<> read_aggregates(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::AGGREGATES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("aggregates");
}
});
}
future<keyspace> read_keyspace(sstring ks_name, bool durable_writes, sstring strategy_class, sstring strategy_options, time_point timestamp) {
auto map = rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options));
map.emplace("class", std::move(strategy_class));
auto ks = ::make_lw_shared<keyspace>(keyspace{timestamp, std::move(ks_name), durable_writes, std::move(map) });
return read_tables(*ks).then([this, ks] {
//Collection<Type> types = readTypes(keyspaceName);
return read_types(*ks);
}).then([this, ks] {
return read_functions(*ks);
}).then([this, ks] {
return read_aggregates(*ks);
}).then([ks] {
return make_ready_future<keyspace>(std::move(*ks));
});
}
future<> read_all_keyspaces() {
static auto ks_filter = [](row_type& row) {
auto ks_name = row.get_as<sstring>("keyspace_name");
return ks_name != db::system_keyspace::NAME && ks_name != db::schema_tables::v3::NAME;
};
auto query = fmt_query("SELECT keyspace_name, durable_writes, strategy_options, strategy_class, writeTime(durable_writes) AS timestamp FROM {}.{}",
db::system_keyspace::legacy::KEYSPACES);
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([this](result_set_type result) {
auto i = boost::make_filter_iterator(ks_filter, result->begin(), result->end());
auto e = boost::make_filter_iterator(ks_filter, result->end(), result->end());
return parallel_for_each(i, e, [this](row_type& row) {
return read_keyspace(row.get_as<sstring>("keyspace_name")
, row.get_as<bool>("durable_writes")
, row.get_as<sstring>("strategy_class")
, row.get_as<sstring>("strategy_options")
, row.get_as<db_clock::time_point>("timestamp")
).then([this](keyspace ks) {
_keyspaces.emplace_back(std::move(ks));
});
}).finally([result] {});
});
}
future<> drop_legacy_tables() {
mlogger.info("Dropping legacy schema tables");
auto with_snapshot = !_keyspaces.empty();
for (const sstring& cfname : legacy_schema_tables) {
co_await replica::database::legacy_drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
}
}
future<> store_keyspaces_in_new_schema_tables() {
mlogger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
_keyspaces.size(), db::schema_tables::v3::NAME);
utils::chunked_vector<mutation> mutations;
for (auto& ks : _keyspaces) {
auto ksm = ::make_lw_shared<keyspace_metadata>(ks.name
, ks.replication_params["class"] // TODO, make ksm like c3?
, cql3::statements::property_definitions::to_extended_map(ks.replication_params)
, std::nullopt
, std::nullopt
, ks.durable_writes);
// we want separate time stamps for tables/types, so cannot bulk them into the ksm.
for (auto&& m : db::schema_tables::make_create_keyspace_mutations(schema_features::full(), ksm, ks.timestamp.time_since_epoch().count(), false)) {
mutations.emplace_back(std::move(m));
}
for (auto& t : ks.tables) {
db::schema_tables::add_table_or_view_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), true, mutations);
}
for (auto& t : ks.types) {
db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations);
}
}
return _qp.proxy().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
}
future<> flush_schemas() {
auto& db = _qp.db().real_database().container();
return replica::database::flush_tables_on_all_shards(db, db::schema_tables::all_table_infos(schema_features::full()));
}
future<> migrate() {
return read_all_keyspaces().then([this]() {
// write metadata to the new schema tables
return store_keyspaces_in_new_schema_tables()
.then(std::bind(&migrator::flush_schemas, this))
.then(std::bind(&migrator::drop_legacy_tables, this))
.then([] { mlogger.info("Completed migration of legacy schema tables"); });
});
}
sharded<service::storage_proxy>& _sp;
sharded<replica::database>& _db;
sharded<db::system_keyspace>& _sys_ks;
cql3::query_processor& _qp;
std::vector<keyspace> _keyspaces;
};
const std::unordered_set<sstring> migrator::legacy_schema_tables = {
db::system_keyspace::legacy::KEYSPACES,
db::system_keyspace::legacy::COLUMNFAMILIES,
db::system_keyspace::legacy::COLUMNS,
db::system_keyspace::legacy::TRIGGERS,
db::system_keyspace::legacy::USERTYPES,
db::system_keyspace::legacy::FUNCTIONS,
db::system_keyspace::legacy::AGGREGATES,
};
}
}
future<>
db::legacy_schema_migrator::migrate(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp) {
return do_with(migrator(sp, db, sys_ks, qp), std::bind(&migrator::migrate, std::placeholders::_1));
}

View File

@@ -0,0 +1,37 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "seastarx.hh"
namespace replica {
class database;
}
namespace cql3 {
class query_processor;
}
namespace service {
class storage_proxy;
}
namespace db {
class system_keyspace;
namespace legacy_schema_migrator {
future<> migrate(sharded<service::storage_proxy>&, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor&);
}
}

View File

@@ -542,7 +542,6 @@ public:
// Returns the range tombstone for the key range adjacent to the cursor's position from the side of smaller keys.
// Excludes the range for the row itself. That information is returned by range_tombstone_for_row().
// It's possible that range_tombstone() is empty and range_tombstone_for_row() is not empty.
// Note that this is different from the meaning of rows_entry::range_tombstone(), which includes the row itself.
tombstone range_tombstone() const { return _range_tombstone; }
// Can be called when cursor is pointing at a row.

View File

@@ -1287,15 +1287,6 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
, _partitions(dht::raw_token_less_comparator{})
, _underlying(src())
, _snapshot_source(std::move(src))
, _update_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.update {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _populate_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.populate {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _read_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.read {}.{}", _schema->ks_name(), _schema->cf_name());
}))
{
try {
with_allocator(_tracker.allocator(), [this, cont] {

View File

@@ -143,7 +143,7 @@ static computed_columns_map get_computed_columns(const schema_mutations& sm);
static std::vector<column_definition> create_columns_from_column_rows(
const query::result_set& rows, const sstring& keyspace,
const sstring& table, bool is_super, column_view_virtual is_view_virtual, const computed_columns_map& computed_columns,
const sstring& table, column_view_virtual is_view_virtual, const computed_columns_map& computed_columns,
const data_dictionary::user_types_storage& user_types);
@@ -404,7 +404,10 @@ const std::unordered_set<table_id>& schema_tables_holding_schema_mutations() {
computed_columns(),
dropped_columns(),
indexes(),
scylla_tables()}) {
scylla_tables(),
db::system_keyspace::legacy::column_families(),
db::system_keyspace::legacy::columns(),
db::system_keyspace::legacy::triggers()}) {
SCYLLA_ASSERT(s->clustering_key_size() > 0);
auto&& first_column_name = s->clustering_column_at(0).name_as_text();
SCYLLA_ASSERT(first_column_name == "table_name"
@@ -1801,9 +1804,6 @@ static schema_mutations make_table_mutations(schema_ptr table, api::timestamp_ty
auto scylla_tables_mutation = make_scylla_tables_mutation(table, timestamp);
list_type_impl::native_type flags;
if (table->is_super()) {
flags.emplace_back("super");
}
if (table->is_dense()) {
flags.emplace_back("dense");
}
@@ -2277,7 +2277,6 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
auto id = table_id(table_row.get_nonnull<utils::UUID>("id"));
schema_builder builder{ks_name, cf_name, id};
auto cf = cf_type::standard;
auto is_dense = false;
auto is_counter = false;
auto is_compound = false;
@@ -2286,7 +2285,6 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
if (flags) {
for (auto& s : *flags) {
if (s == "super") {
// cf = cf_type::super;
fail(unimplemented::cause::SUPER);
} else if (s == "dense") {
is_dense = true;
@@ -2302,9 +2300,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
std::vector<column_definition> column_defs = create_columns_from_column_rows(
query::result_set(sm.columns_mutation()),
ks_name,
cf_name,/*,
fullRawComparator, */
cf == cf_type::super,
cf_name,
column_view_virtual::no,
computed_columns,
user_types);
@@ -2483,9 +2479,7 @@ static computed_columns_map get_computed_columns(const schema_mutations& sm) {
static std::vector<column_definition> create_columns_from_column_rows(
const query::result_set& rows,
const sstring& keyspace,
const sstring& table, /*,
AbstractType<?> rawComparator, */
bool is_super,
const sstring& table,
column_view_virtual is_view_virtual,
const computed_columns_map& computed_columns,
const data_dictionary::user_types_storage& user_types)
@@ -2562,12 +2556,12 @@ static schema_builder prepare_view_schema_builder_from_mutations(const schema_ct
}
auto computed_columns = get_computed_columns(sm);
auto column_defs = create_columns_from_column_rows(query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns, user_types);
auto column_defs = create_columns_from_column_rows(query::result_set(sm.columns_mutation()), ks_name, cf_name, column_view_virtual::no, computed_columns, user_types);
for (auto&& cdef : column_defs) {
builder.with_column_ordered(cdef);
}
if (sm.view_virtual_columns_mutation()) {
column_defs = create_columns_from_column_rows(query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns, user_types);
column_defs = create_columns_from_column_rows(query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, column_view_virtual::yes, computed_columns, user_types);
for (auto&& cdef : column_defs) {
builder.with_column_ordered(cdef);
}
@@ -2837,6 +2831,26 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
}
namespace legacy {
table_schema_version schema_mutations::digest() const {
md5_hasher h;
const db::schema_features no_features;
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, no_features);
db::schema_tables::feed_hash_for_schema_digest(h, _columns, no_features);
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s)
{
mutation cf_m = co_await read_schema_partition_for_table(proxy, s, keyspace_name, table_name);
mutation col_m = co_await read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name);
co_return schema_mutations{std::move(cf_m), std::move(col_m)};
}
} // namespace legacy
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);

View File

@@ -155,6 +155,24 @@ schema_ptr scylla_table_schema_history();
const std::unordered_set<table_id>& schema_tables_holding_schema_mutations();
}
namespace legacy {
class schema_mutations {
mutation _columnfamilies;
mutation _columns;
public:
schema_mutations(mutation columnfamilies, mutation columns)
: _columnfamilies(std::move(columnfamilies))
, _columns(std::move(columns))
{ }
table_schema_version digest() const;
};
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s);
}
struct qualified_name {
sstring keyspace_name;
sstring table_name;

View File

@@ -847,6 +847,8 @@ schema_ptr system_keyspace::corrupt_data() {
return corrupt_data;
}
static constexpr auto schema_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
/*static*/ schema_ptr system_keyspace::scylla_local() {
static thread_local auto scylla_local = [] {
schema_builder builder(generate_legacy_id(NAME, SCYLLA_LOCAL), NAME, SCYLLA_LOCAL,
@@ -1358,6 +1360,289 @@ schema_ptr system_keyspace::role_permissions() {
return schema;
}
schema_ptr system_keyspace::legacy::hints() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
// partition key
{{"target_id", uuid_type}},
// clustering key
{{"hint_id", timeuuid_type}, {"message_version", int32_type}},
// regular columns
{{"mutation", bytes_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* hints awaiting delivery"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"enabled", "false"}});
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::batchlog() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG,
// partition key
{{"id", uuid_type}},
// clustering key
{},
// regular columns
{{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* batchlog entries"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"min_threshold", "2"}});
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::keyspaces() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, KEYSPACES), NAME, KEYSPACES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{},
// regular columns
{
{"durable_writes", boolean_type},
{"strategy_class", utf8_type},
{"strategy_options", utf8_type}
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* keyspace definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::column_families() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNFAMILIES), NAME, COLUMNFAMILIES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}},
// regular columns
{
{"bloom_filter_fp_chance", double_type},
{"caching", utf8_type},
{"cf_id", uuid_type},
{"comment", utf8_type},
{"compaction_strategy_class", utf8_type},
{"compaction_strategy_options", utf8_type},
{"comparator", utf8_type},
{"compression_parameters", utf8_type},
{"default_time_to_live", int32_type},
{"default_validator", utf8_type},
{"dropped_columns", map_type_impl::get_instance(utf8_type, long_type, true)},
{"gc_grace_seconds", int32_type},
{"is_dense", boolean_type},
{"key_validator", utf8_type},
{"max_compaction_threshold", int32_type},
{"max_index_interval", int32_type},
{"memtable_flush_period_in_ms", int32_type},
{"min_compaction_threshold", int32_type},
{"min_index_interval", int32_type},
{"speculative_retry", utf8_type},
{"subcomparator", utf8_type},
{"type", utf8_type},
// The following 4 columns are only present up until 2.1.8 tables
{"key_aliases", utf8_type},
{"value_alias", utf8_type},
{"column_aliases", utf8_type},
{"index_interval", int32_type},},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* table definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::columns() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNS), NAME, COLUMNS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"column_name", utf8_type}},
// regular columns
{
{"component_index", int32_type},
{"index_name", utf8_type},
{"index_options", utf8_type},
{"index_type", utf8_type},
{"type", utf8_type},
{"validator", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"column definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::triggers() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, TRIGGERS), NAME, TRIGGERS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"trigger_name", utf8_type}},
// regular columns
{
{"trigger_options", map_type_impl::get_instance(utf8_type, utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"trigger definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::usertypes() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, USERTYPES), NAME, USERTYPES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"type_name", utf8_type}},
// regular columns
{
{"field_names", list_type_impl::get_instance(utf8_type, true)},
{"field_types", list_type_impl::get_instance(utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::functions() {
/**
* Note: we have our own "legacy" version of this table (in schema_tables),
* but it is (afaik) not used, and differs slightly from the origin one.
* This is based on the origin schema, since we're more likely to encounter
* installations of that to migrate, rather than our own (if we dont use the table).
*/
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, FUNCTIONS), NAME, FUNCTIONS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"function_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_names", list_type_impl::get_instance(utf8_type, true)},
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"body", utf8_type},
{"language", utf8_type},
{"return_type", utf8_type},
{"called_on_null_input", boolean_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::aggregates() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, AGGREGATES), NAME, AGGREGATES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"aggregate_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"final_func", utf8_type},
{"initcond", bytes_type},
{"return_type", utf8_type},
{"state_func", utf8_type},
{"state_type", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined aggregate definition"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::dicts() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, DICTS);
@@ -2330,6 +2615,13 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
r.insert(r.end(), {sstables_registry()});
}
// legacy schema
r.insert(r.end(), {
// TODO: once we migrate hints/batchlog and add converter
// legacy::hints(), legacy::batchlog(),
legacy::keyspaces(), legacy::column_families(),
legacy::columns(), legacy::triggers(), legacy::usertypes(),
legacy::functions(), legacy::aggregates(), });
return r;
}

View File

@@ -241,6 +241,28 @@ public:
static schema_ptr cdc_local();
};
struct legacy {
static constexpr auto HINTS = "hints";
static constexpr auto BATCHLOG = "batchlog";
static constexpr auto KEYSPACES = "schema_keyspaces";
static constexpr auto COLUMNFAMILIES = "schema_columnfamilies";
static constexpr auto COLUMNS = "schema_columns";
static constexpr auto TRIGGERS = "schema_triggers";
static constexpr auto USERTYPES = "schema_usertypes";
static constexpr auto FUNCTIONS = "schema_functions";
static constexpr auto AGGREGATES = "schema_aggregates";
static schema_ptr keyspaces();
static schema_ptr column_families();
static schema_ptr columns();
static schema_ptr triggers();
static schema_ptr usertypes();
static schema_ptr functions();
static schema_ptr aggregates();
static schema_ptr hints();
static schema_ptr batchlog();
};
// Partition estimates for a given range of tokens.
struct range_estimates {
schema_ptr schema;

View File

@@ -1744,115 +1744,6 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
&& std::ranges::contains(shards, this_shard_id());
}
static endpoints_to_update get_view_natural_endpoint_vnodes(
locator::host_id me,
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
locator::endpoint_dc_rack my_location,
const locator::network_topology_strategy* network_topology,
replica::cf_stats& cf_stats) {
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector base_endpoints, view_endpoints;
auto& my_datacenter = my_location.dc;
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (!network_topology || node.get().dc() == my_datacenter) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
view_endpoints.push_back(view_node);
}
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
}
static std::optional<locator::host_id> get_unpaired_view_endpoint(
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
replica::cf_stats& cf_stats) {
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
for (auto&& base_node : base_nodes) {
if (base_dc_racks.contains(base_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
base_node.get().dc(), base_node.get().rack());
return std::nullopt;
}
base_dc_racks.insert(base_node.get().dc_rack());
}
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
for (auto&& view_node : view_nodes) {
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
view_node.get().dc(), view_node.get().rack());
return std::nullopt;
}
// Track unpaired replicas in both sets
if (base_dc_racks.contains(view_node.get().dc_rack())) {
paired_view_dc_racks.insert(view_node.get().dc_rack());
} else {
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
}
}
if (unpaired_view_dc_rack_replicas.size() > 0) {
// There are view replicas that can't be paired with any base replica
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
if (unpaired_view_dc_rack_replicas.size() > 0) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
unpaired_view_dc_rack_replicas | std::views::values);
}
return extra_replica;
}
return std::nullopt;
}
// Calculate the node ("natural endpoint") to which this node should send
// a view update.
//
@@ -1865,19 +1756,29 @@ static std::optional<locator::host_id> get_unpaired_view_endpoint(
// of this function is to find, assuming that this node is one of the base
// replicas for a given partition, the paired view replica.
//
// When using vnodes, we have an optimization called "self-pairing" - if a single
// node is both a base replica and a view replica for a write, the pairing is
// modified so that this node sends the update to itself and this node is removed
// from the lists of nodes paired by index. This self-pairing optimization can
// cause the pairing to change after view ranges are moved between nodes.
// In the past, we used an optimization called "self-pairing" that if a single
// node was both a base replica and a view replica for a write, the pairing is
// modified so that this node would send the update to itself. This self-
// pairing optimization could cause the pairing to change after view ranges
// are moved between nodes, so currently we only use it if
// use_legacy_self_pairing is set to true. When using tablets - where range
// movements are common - it is strongly recommended to set it to false.
//
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
// we pair only nodes in the same datacenter.
//
// If the table uses tablets, then pairing is rack-aware. In this case, in each
// rack where we have a base replica there is also one replica of each view tablet.
// Therefore, the base replicas are naturally paired with the view replicas that
// are in the same rack.
// When use_legacy_self_pairing is enabled, if one of the base replicas
// also happens to be a view replica, it is paired with itself
// (with the other nodes paired by order in the list
// after taking this node out).
//
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
// and the replication factor in the node's datacenter is a multiple of the number
// of racks in the datacenter, then pairing is rack-aware. In this case,
// all racks have the same number of replicas, and those are never migrated
// outside their racks. Therefore, the base replicas are naturally paired with the
// view replicas that are in the same rack, based on the ordinal position.
// Note that typically, there is a single replica per rack and pairing is trivial.
//
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
@@ -1905,12 +1806,19 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_rack_aware_view_pairing,
replica::cf_stats& cf_stats) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
auto& my_location = topology.get_location(me);
auto& my_datacenter = my_location.dc;
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
bool simple_rack_aware_pairing = false;
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector orig_base_endpoints, orig_view_endpoints;
node_vector base_endpoints, view_endpoints;
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
if (auto* np = topology.find_node(ep)) {
@@ -1921,7 +1829,6 @@ endpoints_to_update get_view_natural_endpoint(
// We need to use get_replicas() for pairing to be stable in case base or view tablet
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
return resolve(topology, ep, false);
}) | std::ranges::to<node_vector>();
@@ -1945,43 +1852,231 @@ endpoints_to_update get_view_natural_endpoint(
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
auto leaving_base = it->get().host_id();
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
view_token, use_tablets, cf_stats);
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
}
}
}
if (!use_tablets) {
return get_view_natural_endpoint_vnodes(
me,
base_nodes,
view_nodes,
my_location,
network_topology,
cf_stats);
std::function<bool(const locator::node&)> is_candidate;
if (network_topology) {
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
} else {
is_candidate = [&] (const locator::node&) { return true; };
}
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (is_candidate(node)) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
std::optional<locator::host_id> paired_replica;
for (auto&& view_node : view_nodes) {
if (view_node.get().dc_rack() == my_location) {
paired_replica = view_node.get().host_id();
break;
if (use_legacy_self_pairing) {
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (is_candidate(view_node)) {
view_endpoints.push_back(view_node);
}
}
} else {
for (auto&& view_node : view_nodes) {
process_candidate(view_endpoints, view_node);
}
}
if (paired_replica && base_nodes.size() == view_nodes.size()) {
// We don't need to find any extra replicas, so we can return early
return {.natural_endpoint = paired_replica};
// Try optimizing for simple rack-aware pairing
// If the numbers of base and view replica differ, that means an RF change is taking place
// and we can't use simple rack-aware pairing.
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
// Simple rack-aware pairing is possible when the datacenter replication factor
// is a multiple of the number of racks in the datacenter.
if (dc_rf % racks.size() == 0) {
simple_rack_aware_pairing = true;
size_t rack_rf = dc_rf / racks.size();
// If any rack doesn't have enough nodes to satisfy the per-rack rf
// simple rack-aware pairing is disabled.
for (const auto& [rack, nodes] : racks) {
if (nodes.size() < rack_rf) {
simple_rack_aware_pairing = false;
break;
}
}
}
if (dc_rf != base_endpoints.size()) {
// If the datacenter replication factor is not equal to the number of base replicas,
// we're in progress of a RF change and we can't use simple rack-aware pairing.
simple_rack_aware_pairing = false;
}
if (simple_rack_aware_pairing) {
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
}
}
if (!paired_replica) {
// We couldn't find any view replica in our rack
orig_base_endpoints = base_endpoints;
orig_view_endpoints = view_endpoints;
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
// Use best-match, for the minimum number of base and view replicas in each rack,
// and ordinal match for the rest.
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
if (rack_aware_pairing && !simple_rack_aware_pairing) {
struct indexed_replica {
size_t idx;
std::reference_wrapper<const locator::node> node;
};
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
// First, index all replicas by rack
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
size_t idx = 0;
for (const auto& r: replicas) {
racks[r.get().rack()].emplace_back(idx++, r);
}
};
index_replica_set(base_racks, base_endpoints);
index_replica_set(view_racks, view_endpoints);
// Try optimistically pairing `me` first
const auto& my_base_replicas = base_racks[my_location.rack];
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
if (base_it == my_base_replicas.end()) {
return {};
}
const auto& my_view_replicas = view_racks[my_location.rack];
size_t idx = base_it - my_base_replicas.begin();
if (idx < my_view_replicas.size()) {
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
} else {
// If the number of view replicas is larger than the number of base replicas,
// we need to find the unpaired view replica, so we can't return yet.
paired_replica = my_view_replicas[idx].node;
}
}
// Collect all unpaired base and view replicas,
// where the number of replicas in the base rack is different than the respective view rack
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
for (const auto& [rack, base_replicas] : base_racks) {
const auto& view_replicas = view_racks[rack];
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
unpaired_base_replicas.emplace_back(base_replicas[i]);
}
}
for (const auto& [rack, view_replicas] : view_racks) {
const auto& base_replicas = base_racks[rack];
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
unpaired_view_replicas.emplace_back(view_replicas[i]);
}
}
// Sort by the original ordinality, and copy the sorted results
// back into {base,view}_endpoints, for backward compatible processing below.
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
base_endpoints.clear();
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
view_endpoints.clear();
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (!paired_replica && base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
if (!paired_replica && idx >= view_endpoints.size()) {
// There are fewer view replicas than base replicas
// FIXME: This might still happen when reducing replication factor with tablets,
// see https://github.com/scylladb/scylladb/issues/21492
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
me,
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
} else if (base_endpoints.size() < view_endpoints.size()) {
// There are fewer base replicas than view replicas.
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_endpoints.back();
if (base_endpoints.size() < view_endpoints.size() - 1) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
}
}
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
return {.natural_endpoint = paired_replica,
.endpoint_with_no_pairing = no_pairing_replica};
if (!paired_replica) {
paired_replica = view_endpoints[idx];
}
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
// This can happen when the view replica with no pairing is in another DC.
// We need to send an update to it if there are no base replicas in that DC yet,
// as it won't receive updates otherwise.
std::unordered_set<sstring> dcs_with_base_replicas;
for (const auto& base_node : base_nodes) {
dcs_with_base_replicas.insert(base_node.get().dc());
}
for (const auto& view_node : view_nodes) {
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_node;
break;
}
}
}
// https://github.com/scylladb/scylladb/issues/19439
// With tablets, a node being replaced might transition to "left" state
// but still be kept as a replica.
// As of writing this hints are not prepared to handle nodes that are left
// but are still replicas. Therefore, there is no other sensible option
// right now but to give up attempt to send the update or write a hint
// to the paired, permanently down replica.
// We use the same workaround for the extra replica.
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
if (!replica) {
return std::nullopt;
}
const auto& node = replica->get();
if (!node.left()) {
return node.host_id();
} else {
return std::nullopt;
}
};
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
}
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
@@ -2041,6 +2136,12 @@ future<> view_update_generator::mutate_MV(
{
auto& ks = _db.find_keyspace(base->ks_name());
auto& replication = ks.get_replication_strategy();
// We set legacy self-pairing for old vnode-based tables (for backward
// compatibility), and unset it for tablets - where range movements
// are more frequent and backward compatibility is less important.
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
// on a view, like we have the synchronous_updates_flag.
bool use_legacy_self_pairing = !ks.uses_tablets();
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
auto get_erm = [&] (table_id id) {
auto it = erms.find(id);
@@ -2053,6 +2154,10 @@ future<> view_update_generator::mutate_MV(
for (const auto& mut : view_updates) {
(void)get_erm(mut.s->id());
}
// Enable rack-aware view updates pairing for tablets
// when the cluster feature is enabled so that all replicas agree
// on the pairing algorithm.
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
auto me = base_ermp->get_topology().my_host_id();
static constexpr size_t max_concurrent_updates = 128;
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
@@ -2060,7 +2165,7 @@ future<> view_update_generator::mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto view_ermp = erms.at(mut.s->id());
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
ks.uses_tablets(), cf_stats);
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
if (no_pairing_endpoint) {

View File

@@ -305,7 +305,8 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_basic_rack_aware_view_pairing,
replica::cf_stats& cf_stats);
/// Verify that the provided keyspace is eligible for storing materialized views.

View File

@@ -1 +1 @@
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --collector.ethtool.metrics-include='(bw_in_allowance_exceeded|bw_out_allowance_exceeded|conntrack_allowance_exceeded|conntrack_allowance_available|linklocal_allowance_exceeded)' --collector.ethtool --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"

View File

@@ -28,8 +28,7 @@ Incremental Repair is only supported for tables that use the tablets architectur
Incremental Repair Modes
------------------------
Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation.
The available modes are:

View File

@@ -53,13 +53,13 @@ ScyllaDB nodetool cluster repair command supports the following options:
nodetool cluster repair --tablet-tokens 1,10474535988
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
For example:
::
nodetool cluster repair --incremental-mode disabled
nodetool cluster repair --incremental-mode regular
- ``keyspace`` executes a repair on a specific keyspace. The default is all keyspaces.

View File

@@ -110,6 +110,7 @@ To display the log classes (output changes with each version so your display may
keys
keyspace_utils
large_data
legacy_schema_migrator
lister
load_balancer
load_broadcaster

8
docs/poetry.lock generated
View File

@@ -1018,14 +1018,14 @@ sphinx-markdown-tables = "0.0.17"
[[package]]
name = "sphinx-scylladb-theme"
version = "1.8.10"
version = "1.8.9"
description = "A Sphinx Theme for ScyllaDB documentation projects"
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "sphinx_scylladb_theme-1.8.10-py3-none-any.whl", hash = "sha256:8b930f33bec7308ccaa92698ebb5ad85059bcbf93a463f92917aeaf473fce632"},
{file = "sphinx_scylladb_theme-1.8.10.tar.gz", hash = "sha256:8a78a9b692d9a946be2c4a64aa472fd82204cc8ea0b1ee7f60de6db35b356326"},
{file = "sphinx_scylladb_theme-1.8.9-py3-none-any.whl", hash = "sha256:f8649a7753a29494fd2b417d1cb855035dddb9ebd498ea033fd73f5f9338271e"},
{file = "sphinx_scylladb_theme-1.8.9.tar.gz", hash = "sha256:ab7cda4c10a0d067c5c3a45f7b1f68cb8ebefe135a0be0738bfa282a344769b6"},
]
[package.dependencies]
@@ -1603,4 +1603,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "0ae673106f45d3465cbdabbf511e165ca44feadd34d7753f2e68093afaa95c79"
content-hash = "74912627a3f424290ed7889451c0bdb1a862ab85b1d07c85f4f3b8c34f32a020"

View File

@@ -9,7 +9,7 @@ package-mode = false
python = "^3.10"
pygments = "^2.18.0"
redirects_cli ="^0.1.3"
sphinx-scylladb-theme = "^1.8.10"
sphinx-scylladb-theme = "^1.8.9"
sphinx-sitemap = "^2.6.0"
sphinx-autobuild = "^2024.4.19"
Sphinx = "^7.3.7"

View File

@@ -129,6 +129,6 @@ struct direct_fd_ping_reply {
std::variant<std::monostate, service::wrong_destination, service::group_liveness_info> result;
};
verb [[with_client_info, with_timeout, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
verb [[with_client_info, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
} // namespace service

View File

@@ -38,7 +38,6 @@ debian_base_packages=(
python3-aiohttp
python3-pyparsing
python3-colorama
python3-dev
python3-tabulate
python3-pytest
python3-pytest-asyncio
@@ -66,7 +65,6 @@ debian_base_packages=(
git-lfs
e2fsprogs
fuse3
libev-dev # for python driver
)
fedora_packages=(
@@ -92,7 +90,6 @@ fedora_packages=(
patchelf
python3
python3-aiohttp
python3-devel
python3-pip
python3-file-magic
python3-colorama
@@ -157,8 +154,6 @@ fedora_packages=(
https://github.com/scylladb/cassandra-stress/releases/download/v3.18.1/cassandra-stress-java21-3.18.1-1.noarch.rpm
elfutils
jq
libev-devel # for python driver
)
fedora_python3_packages=(

View File

@@ -200,10 +200,7 @@ enum class tablet_repair_incremental_mode : uint8_t {
disabled,
};
// FIXME: Incremental repair is disabled by default due to
// https://github.com/scylladb/scylladb/issues/26041 and
// https://github.com/scylladb/scylladb/issues/27414
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::disabled};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);

View File

@@ -39,6 +39,7 @@
#include "api/api_init.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/legacy_schema_migrator.hh"
#include "service/storage_service.hh"
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"
@@ -1640,7 +1641,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
fd.start(
std::ref(fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count(), dbcfg.gossip_scheduling_group).get();
service::direct_fd_clock::base::duration{std::chrono::milliseconds{cfg->direct_failure_detector_ping_timeout_in_ms()}}.count()).get();
auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] {
fd.stop().get();
@@ -1850,6 +1851,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
group0_client.init().get();
checkpoint(stop_signal, "initializing system schema");
// schema migration, if needed, is also done on shard 0
db::legacy_schema_migrator::migrate(proxy, db, sys_ks, qp.local()).get();
db::schema_tables::save_system_schema(qp.local()).get();
db::schema_tables::recalculate_schema_version(sys_ks, proxy, feature_service.local()).get();

View File

@@ -686,7 +686,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::RAFT_PULL_SNAPSHOT:
case messaging_verb::NOTIFY_BANNED:
case messaging_verb::DIRECT_FD_PING:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.
@@ -748,6 +747,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
case messaging_verb::DIRECT_FD_PING:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:

View File

@@ -575,15 +575,10 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
}
res.row.set_range_tombstone(cur.range_tombstone_for_row() + src_cur.range_tombstone());
if (need_preempt()) {
lb = position_in_partition(cur.position());
++tracker.get_stats().rows_covered_by_range_tombstones_from_memtable;
return stop_iteration::no;
}
// FIXME: Compact the row
++tracker.get_stats().rows_covered_by_range_tombstones_from_memtable;
cur.next();
// FIXME: preempt
}
}
{

View File

@@ -297,17 +297,17 @@ public:
const dht::token_range& token_range() const noexcept;
size_t memtable_count() const;
size_t memtable_count() const noexcept;
const compaction_group_ptr& main_compaction_group() const noexcept;
const std::vector<compaction_group_ptr>& split_ready_compaction_groups() const;
compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept;
uint64_t live_disk_space_used() const;
uint64_t live_disk_space_used() const noexcept;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const;
utils::small_vector<compaction_group_ptr, 3> compaction_groups();
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept;
utils::small_vector<compaction_group_ptr, 3> compaction_groups() noexcept;
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const noexcept;
utils::small_vector<compaction_group_ptr, 3> split_unready_groups() const;
bool split_unready_groups_are_empty() const;
@@ -430,7 +430,7 @@ public:
virtual storage_group& storage_group_for_token(dht::token) const = 0;
virtual utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;

View File

@@ -1133,7 +1133,7 @@ public:
// The tablet filter is used to not double account migrating tablets, so it's important that
// only one of pending or leaving replica is accounted based on current migration stage.
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept;
const db::view::stats& get_view_stats() const {
return _view_stats;

View File

@@ -234,12 +234,18 @@ distributed_loader::get_sstables_from_upload_dir(sharded<replica::database>& db,
}
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, type, prefix, sstables=std::move(sstables), &get_abort_src] (auto& global_table, auto& directory) {
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables), &get_abort_src, &db] (auto& global_table, auto& directory) {
return directory.start(global_table.as_sharded_parameter(),
sharded_parameter([bucket, endpoint, type, prefix, &get_abort_src] {
sharded_parameter([bucket, endpoint, prefix, &get_abort_src, &db] {
auto eps = db.local().get_config().object_storage_endpoints()
| std::views::filter([&endpoint](auto& ep) { return ep.key() == endpoint; })
;
if (eps.empty()) {
throw std::invalid_argument(fmt::format("Undefined endpoint {}", endpoint));
}
seastar::abort_source* as = get_abort_src ? get_abort_src() : nullptr;
auto opts = data_dictionary::make_object_storage_options(endpoint, type, bucket, prefix, as);
auto opts = data_dictionary::make_object_storage_options(endpoint, eps.front().type(), bucket, prefix, as);
return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
}),
sstables,

View File

@@ -92,7 +92,7 @@ public:
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_upload_dir(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
static future<> process_upload_dir(sharded<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
};

View File

@@ -708,7 +708,7 @@ public:
return *_single_sg;
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const override {
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const noexcept override {
return locator::combined_load_stats{
.table_ls = locator::table_load_stats{
.size_in_bytes = _single_sg->live_disk_space_used(),
@@ -874,7 +874,7 @@ public:
return storage_group_for_id(storage_group_of(token).first);
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const override;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
@@ -922,7 +922,7 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran
return _main_cg;
}
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const {
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept {
action(_main_cg);
for (auto& cg : _merging_groups) {
action(cg);
@@ -932,7 +932,7 @@ void storage_group::for_each_compaction_group(std::function<void(const compactio
}
}
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() {
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() noexcept {
utils::small_vector<compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -940,7 +940,7 @@ utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups()
return cgs;
}
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const {
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const noexcept {
utils::small_vector<const_compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -1890,7 +1890,7 @@ sstables::file_size_stats compaction_group::live_disk_space_used_full_stats() co
return _main_sstables->get_file_size_stats() + _maintenance_sstables->get_file_size_stats();
}
uint64_t storage_group::live_disk_space_used() const {
uint64_t storage_group::live_disk_space_used() const noexcept {
auto cgs = const_cast<storage_group&>(*this).compaction_groups();
return std::ranges::fold_left(cgs | std::views::transform(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0), std::plus{});
}
@@ -2813,7 +2813,7 @@ void table::on_flush_timer() {
});
}
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::table_load_stats table_stats;
table_stats.split_ready_seq_number = _split_ready_seq_number;
@@ -2836,7 +2836,7 @@ locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std:
};
}
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
return _sg_manager->table_load_stats(std::move(tablet_filter));
}
@@ -3453,7 +3453,7 @@ size_t compaction_group::memtable_count() const noexcept {
return _memtables->size();
}
size_t storage_group::memtable_count() const {
size_t storage_group::memtable_count() const noexcept {
return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
}

View File

@@ -134,14 +134,11 @@ bool is_compatible(column_kind k1, column_kind k2);
enum class cf_type : uint8_t {
standard,
super,
};
inline sstring cf_type_to_sstring(cf_type t) {
if (t == cf_type::standard) {
return "Standard";
} else if (t == cf_type::super) {
return "Super";
}
throw std::invalid_argument(format("unknown type: {:d}\n", uint8_t(t)));
}
@@ -149,8 +146,6 @@ inline sstring cf_type_to_sstring(cf_type t) {
inline cf_type sstring_to_cf_type(sstring name) {
if (name == "Standard") {
return cf_type::standard;
} else if (name == "Super") {
return cf_type::super;
}
throw std::invalid_argument(format("unknown type: {}\n", name));
}
@@ -688,13 +683,13 @@ public:
}
bool is_cql3_table() const {
return !is_super() && !is_dense() && is_compound();
return !is_dense() && is_compound();
}
bool is_compact_table() const {
return !is_cql3_table();
}
bool is_static_compact_table() const {
return !is_super() && !is_dense() && !is_compound();
return !is_dense() && !is_compound();
}
const table_id& id() const {
@@ -711,10 +706,6 @@ public:
return _raw._type;
}
bool is_super() const {
return _raw._type == cf_type::super;
}
gc_clock::duration gc_grace_seconds() const {
auto seconds = std::chrono::seconds(_raw._props.gc_grace_seconds);
return std::chrono::duration_cast<gc_clock::duration>(seconds);

View File

@@ -38,9 +38,8 @@ for required in jq curl; do
fi
done
FORCE=0
ALLOW_SUBMODULE=0
ALLOW_UNSTABLE=0
ALLOW_ANY_BRANCH=0
function print_usage {
cat << EOF
@@ -61,18 +60,12 @@ Options:
-h
Print this help message and exit.
--allow-submodule
Allow a PR to update a submudule
--allow-unstable
--force
Do not check current branch to be next*
Do not check jenkins job status
--allow-any-branch
Merge PR even if target branch is not next
--force
Sets all above --allow-* options
--allow-submodule
Allow a PR to update a submudule
EOF
}
@@ -80,23 +73,13 @@ while [[ $# -gt 0 ]]
do
case $1 in
"--force"|"-f")
ALLOW_UNSTABLE=1
ALLOW_SUBMODULE=1
ALLOW_ANY_BRANCH=1
FORCE=1
shift 1
;;
--allow-submodule)
ALLOW_SUBMODULE=1
shift
;;
--allow-unstable)
ALLOW_UNSTABLE=1
shift
;;
--allow-any-branch)
ALLOW_ANY_BRANCH=1
shift
;;
+([0-9]))
PR_NUM=$1
shift 1
@@ -164,7 +147,7 @@ check_jenkins_job_status() {
fi
}
if [[ $ALLOW_UNSTABLE -eq 0 ]]; then
if [[ $FORCE -eq 0 ]]; then
check_jenkins_job_status
fi
@@ -196,19 +179,17 @@ echo -n "Fetching full name of author $PR_LOGIN... "
USER_NAME=$(curl -s "https://api.github.com/users/$PR_LOGIN" | jq -r .name)
echo "$USER_NAME"
if [[ $ALLOW_ANY_BRANCH -eq 0 ]]; then
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
TARGET_BASE="unknown"
if [[ ${BASE_BRANCH} == master ]]; then
TARGET_BASE="next"
elif [[ ${BASE_BRANCH} == branch-* ]]; then
TARGET_BASE=${BASE_BRANCH//branch/next}
fi
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}. Use --allow-any-branch or --force to skip this check"
exit 1
fi
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
TARGET_BASE="unknown"
if [[ ${BASE_BRANCH} == master ]]; then
TARGET_BASE="next"
elif [[ ${BASE_BRANCH} == branch-* ]]; then
TARGET_BASE=${BASE_BRANCH//branch/next}
fi
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}"
exit 1
fi
git fetch "$REMOTE" pull/$PR_NUM/head

View File

@@ -6,7 +6,6 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "seastar/core/scheduling.hh"
#include "utils/assert.hh"
#include <unordered_set>
@@ -18,7 +17,6 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/switch_to.hh>
#include "utils/log.hh"
@@ -120,7 +118,7 @@ struct failure_detector::impl {
// Fetches endpoint updates from _endpoint_queue and performs the add/remove operation.
// Runs on shard 0 only.
future<> update_endpoint_fiber(seastar::scheduling_group sg);
future<> update_endpoint_fiber();
future<> _update_endpoint_fiber = make_ready_future<>();
// Workers running on this shard.
@@ -142,7 +140,7 @@ struct failure_detector::impl {
// The unregistering process requires cross-shard operations which we perform on this fiber.
future<> _destroy_subscriptions = make_ready_future<>();
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg);
impl(failure_detector& parent, pinger&, clock&, clock::interval_t ping_period, clock::interval_t ping_timeout);
~impl();
// Inform update_endpoint_fiber() about an added/removed endpoint.
@@ -179,19 +177,19 @@ struct failure_detector::impl {
};
failure_detector::failure_detector(
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout, sg))
pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
: _impl(std::make_unique<impl>(*this, pinger, clock, ping_period, ping_timeout))
{}
failure_detector::impl::impl(
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout, seastar::scheduling_group sg)
failure_detector& parent, pinger& pinger, clock& clock, clock::interval_t ping_period, clock::interval_t ping_timeout)
: _parent(parent), _pinger(pinger), _clock(clock), _ping_period(ping_period), _ping_timeout(ping_timeout) {
if (this_shard_id() != 0) {
return;
}
_num_workers.resize(smp::count, 0);
_update_endpoint_fiber = update_endpoint_fiber(sg);
_update_endpoint_fiber = update_endpoint_fiber();
}
void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoint_update update) {
@@ -207,9 +205,9 @@ void failure_detector::impl::send_update_endpoint(pinger::endpoint_id ep, endpoi
_endpoint_changed.signal();
}
future<> failure_detector::impl::update_endpoint_fiber(seastar::scheduling_group sg) {
future<> failure_detector::impl::update_endpoint_fiber() {
SCYLLA_ASSERT(this_shard_id() == 0);
co_await coroutine::switch_to(sg);
while (true) {
co_await _endpoint_changed.wait([this] { return !_endpoint_updates.empty(); });
@@ -482,7 +480,7 @@ static future<bool> ping_with_timeout(pinger::endpoint_id id, clock::timepoint_t
}
});
auto f = pinger.ping(id, timeout, timeout_as, c);
auto f = pinger.ping(id, timeout_as);
auto sleep_and_abort = [] (clock::timepoint_t timeout, abort_source& timeout_as, clock& c) -> future<> {
co_await c.sleep_until(timeout, timeout_as).then_wrapped([&timeout_as] (auto&& f) {
// Avoid throwing if sleep was aborted.

View File

@@ -19,6 +19,26 @@ class abort_source;
namespace direct_failure_detector {
class pinger {
public:
// Opaque endpoint ID.
// A specific implementation of `pinger` maps those IDs to 'real' addresses.
using endpoint_id = utils::UUID;
// Send a message to `ep` and wait until it responds.
// The wait can be aborted using `as`.
// Abort should be signalized with `abort_requested_exception`.
//
// If the ping fails in an expected way (e.g. the endpoint is down and refuses to connect),
// returns `false`. If it succeeds, returns `true`.
virtual future<bool> ping(endpoint_id ep, abort_source& as) = 0;
protected:
// The `pinger` object must not be destroyed through the `pinger` interface.
// `failure_detector` does not take ownership of `pinger`, only a non-owning reference.
~pinger() = default;
};
// A clock that uses abstract units to measure time.
// The implementation is responsible for periodically advancing the clock.
//
@@ -40,33 +60,12 @@ public:
// Aborts should be signalized using `seastar::sleep_aborted`.
virtual future<> sleep_until(timepoint_t tp, abort_source& as) = 0;
virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const = 0;
protected:
// The `clock` object must not be destroyed through the `clock` interface.
// `failure_detector` does not take ownership of `clock`, only a non-owning reference.
~clock() = default;
};
class pinger {
public:
// Opaque endpoint ID.
// A specific implementation of `pinger` maps those IDs to 'real' addresses.
using endpoint_id = utils::UUID;
// Send a message to `ep` and wait until it responds.
// The wait can be aborted using `as`.
// Abort should be signalized with `abort_requested_exception`.
//
// If the ping fails in an expected way (e.g. the endpoint is down and refuses to connect),
// returns `false`. If it succeeds, returns `true`.
virtual future<bool> ping(endpoint_id ep, clock::timepoint_t timeout, abort_source& as, clock& c) = 0;
protected:
// The `pinger` object must not be destroyed through the `pinger` interface.
// `failure_detector` does not take ownership of `pinger`, only a non-owning reference.
~pinger() = default;
};
class listener {
public:
// Called when an endpoint in the detected set (added by `failure_detector::add_endpoint`) responds to a ping
@@ -128,10 +127,7 @@ public:
// Duration after which a ping is aborted, so that next ping can be started
// (pings are sent sequentially).
clock::interval_t ping_timeout,
// Scheduling group used for fibers inside the failure detector.
seastar::scheduling_group sg
clock::interval_t ping_timeout
);
~failure_detector();

View File

@@ -18,7 +18,6 @@
#include "utils/error_injection.hh"
#include "seastar/core/shared_future.hh"
#include <chrono>
#include <seastar/core/coroutine.hh>
#include <seastar/core/when_all.hh>
#include <seastar/core/sleep.hh>
@@ -203,11 +202,8 @@ void raft_group_registry::init_rpc_verbs() {
});
ser::raft_rpc_verbs::register_direct_fd_ping(&_ms,
[this] (const rpc::client_info&, rpc::opt_time_point timeout, raft::server_id dst) -> future<direct_fd_ping_reply> {
if (timeout && *timeout <= netw::messaging_service::clock_type::now()) {
throw timed_out_error{};
}
[this] (const rpc::client_info&, raft::server_id dst) -> future<direct_fd_ping_reply> {
// XXX: update address map here as well?
if (_my_id != dst) {
return make_ready_future<direct_fd_ping_reply>(direct_fd_ping_reply {
@@ -217,10 +213,19 @@ void raft_group_registry::init_rpc_verbs() {
});
}
return make_ready_future<direct_fd_ping_reply>(direct_fd_ping_reply {
.result = service::group_liveness_info{
.group0_alive = _group0_is_alive,
return container().invoke_on(0, [] (raft_group_registry& me) -> future<direct_fd_ping_reply> {
bool group0_alive = false;
if (me._group0_id) {
auto* group0_server = me.find_server(*me._group0_id);
if (group0_server && group0_server->is_alive()) {
group0_alive = true;
}
}
co_return direct_fd_ping_reply {
.result = service::group_liveness_info{
.group0_alive = group0_alive,
}
};
});
});
}
@@ -375,12 +380,6 @@ future<> raft_group_registry::start_server_for_group(raft_server_for_group new_g
co_await server.abort();
std::rethrow_exception(ex);
}
if (gid == _group0_id) {
co_await container().invoke_on_all([] (raft_group_registry& rg) {
rg._group0_is_alive = true;
});
}
}
future<> raft_group_registry::abort_server(raft::group_id gid, sstring reason) {
@@ -390,18 +389,14 @@ future<> raft_group_registry::abort_server(raft::group_id gid, sstring reason) {
if (const auto it = _servers.find(gid); it != _servers.end()) {
auto& [gid, s] = *it;
if (!s.aborted) {
if (gid == _group0_id) {
co_await container().invoke_on_all([] (raft_group_registry& rg) {
rg._group0_is_alive = false;
});
}
s.aborted = s.server->abort(std::move(reason))
.handle_exception([gid] (std::exception_ptr ex) {
rslog.warn("Failed to abort raft group server {}: {}", gid, ex);
});
}
co_await s.aborted->get_future();
return s.aborted->get_future();
}
return make_ready_future<>();
}
unsigned raft_group_registry::shard_for_group(const raft::group_id& gid) const {
@@ -522,13 +517,11 @@ future<> raft_server_with_timeouts::read_barrier(seastar::abort_source* as, std:
}, "read_barrier", as, timeout);
}
future<bool> direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) {
future<bool> direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) {
auto dst_id = raft::server_id{id};
try {
std::chrono::milliseconds timeout_ms = c.to_milliseconds(timeout);
netw::messaging_service::clock_type::time_point deadline = netw::messaging_service::clock_type::now() + timeout_ms;
auto reply = co_await ser::raft_rpc_verbs::send_direct_fd_ping(&_ms, locator::host_id{id}, deadline, as, dst_id);
auto reply = co_await ser::raft_rpc_verbs::send_direct_fd_ping(&_ms, locator::host_id{id}, as, dst_id);
if (auto* wrong_dst = std::get_if<wrong_destination>(&reply.result)) {
// FIXME: after moving to host_id based verbs we will not get `wrong_destination`
// any more since the connection will fail
@@ -561,11 +554,4 @@ future<> direct_fd_clock::sleep_until(direct_failure_detector::clock::timepoint_
return sleep_abortable(t - n, as);
}
std::chrono::milliseconds direct_fd_clock::to_milliseconds(direct_failure_detector::clock::timepoint_t tp) const {
auto t = base::time_point{base::duration{tp}};
auto n = base::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(t - n);
}
} // end of namespace service

View File

@@ -127,7 +127,6 @@ private:
// My Raft ID. Shared between different Raft groups.
raft::server_id _my_id;
bool _group0_is_alive = false;
public:
raft_group_registry(raft::server_id my_id, netw::messaging_service& ms,
direct_failure_detector::failure_detector& fd);
@@ -182,9 +181,6 @@ public:
unsigned shard_for_group(const raft::group_id& gid) const;
shared_ptr<raft::failure_detector> failure_detector();
direct_failure_detector::failure_detector& direct_fd() { return _direct_fd; }
bool is_group0_alive() const {
return _group0_is_alive;
}
};
// Implementation of `direct_failure_detector::pinger` which uses DIRECT_FD_PING verb for pinging.
@@ -202,7 +198,7 @@ public:
direct_fd_pinger(const direct_fd_pinger&) = delete;
direct_fd_pinger(direct_fd_pinger&&) = delete;
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) override;
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override;
};
// XXX: find a better place to put this?
@@ -211,7 +207,6 @@ struct direct_fd_clock : public direct_failure_detector::clock {
direct_failure_detector::clock::timepoint_t now() noexcept override;
future<> sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) override;
std::chrono::milliseconds to_milliseconds(direct_failure_detector::clock::timepoint_t tp) const override;
};
} // end of namespace service

View File

@@ -6688,11 +6688,10 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
}
};
auto request = std::make_unique<read_cas_request>();
auto* request_ptr = request.get();
auto request = seastar::make_shared<read_cas_request>();
return cas(std::move(s), std::move(cas_shard), *request_ptr, cmd, std::move(partition_ranges), std::move(query_options),
cl, db::consistency_level::ANY, timeout, cas_timeout, false).then([request = std::move(request)] (bool is_applied) mutable {
return cas(std::move(s), std::move(cas_shard), request, cmd, std::move(partition_ranges), std::move(query_options),
cl, db::consistency_level::ANY, timeout, cas_timeout, false).then([request] (bool is_applied) mutable {
return make_ready_future<coordinator_query_result>(std::move(request->res));
});
}
@@ -6755,13 +6754,11 @@ static mutation_write_failure_exception read_failure_to_write(read_failure_excep
* NOTE: `cmd` argument can be nullptr, in which case it's guaranteed that this function would not perform
* any reads of committed values (in case user of the function is not interested in them).
*
* NOTE: The `request` object must be guaranteed to be alive until the returned future is resolved.
*
* WARNING: the function must be called on a shard that owns the key cas() operates on.
* The cas_shard must be created *before* selecting the shard, to protect against
* concurrent tablet migrations.
*/
future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, cas_request& request, lw_shared_ptr<query::read_command> cmd,
future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, shared_ptr<cas_request> request, lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges, storage_proxy::coordinator_query_options query_options,
db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn,
clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write, cdc::per_request_options cdc_opts) {
@@ -6862,7 +6859,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, cas_requ
qr = std::move(cqr.query_result);
}
auto mutation = request.apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot), cdc_opts);
auto mutation = request->apply(std::move(qr), cmd->slice, utils::UUID_gen::micros_timestamp(ballot), cdc_opts);
condition_met = true;
if (!mutation) {
if (write) {

View File

@@ -829,7 +829,7 @@ public:
clock_type::time_point timeout,
tracing::trace_state_ptr trace_state = nullptr);
future<bool> cas(schema_ptr schema, cas_shard cas_shard, cas_request& request, lw_shared_ptr<query::read_command> cmd,
future<bool> cas(schema_ptr schema, cas_shard cas_shard, shared_ptr<cas_request> request, lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges, coordinator_query_options query_options,
db::consistency_level cl_for_paxos, db::consistency_level cl_for_learn,
clock_type::time_point write_timeout, clock_type::time_point cas_timeout, bool write = true, cdc::per_request_options cdc_opts = {});

View File

@@ -57,10 +57,7 @@ public:
index_list indexes;
index_consumer(logalloc::region& r, schema_ptr s)
: _s(s)
, _alloc_section(abstract_formatter([s] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "index_consumer {}.{}", s->ks_name(), s->cf_name());
}))
: _s(std::move(s))
, _region(r)
{ }
@@ -788,9 +785,6 @@ public:
_sstable->manager().get_cache_tracker().region(),
_sstable->manager().get_cache_tracker().get_partition_index_cache_stats()))
, _index_cache(caching ? *_sstable->_index_cache : *_local_index_cache)
, _alloc_section(abstract_formatter([sst = _sstable] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "index_reader {}", sst->get_filename());
}))
, _region(_sstable->manager().get_cache_tracker().region())
, _use_caching(caching)
, _single_page_read(single_partition_read) // all entries for a given partition are within a single page

View File

@@ -284,9 +284,6 @@ public:
, _clustering_parser(s, permit, _ctr.clustering_column_value_fix_legths(), true)
, _block_parser(s, permit, _ctr.clustering_column_value_fix_legths())
, _permit(std::move(permit))
, _as(abstract_formatter([s] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "cached_promoted_index {}.{}", s.ks_name(), s.cf_name());
}))
{ }
~cached_promoted_index() {

View File

@@ -2485,6 +2485,11 @@ void sstable::validate_originating_host_id() const {
}
return;
}
if (*originating_host_id != local_host_id) {
// FIXME refrain from throwing an exception because of #10148
sstlog.warn("Host id {} does not match local host id {} while validating SSTable: {}. Load foreign SSTables via the upload dir instead.", *originating_host_id, local_host_id, get_filename());
}
}
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,

View File

@@ -135,17 +135,13 @@ future<> storage_manager::update_config(const db::config& cfg) {
co_return;
}
auto storage_manager::get_endpoint(const sstring& endpoint) -> object_storage_endpoint& {
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
auto found = _object_storage_endpoints.find(endpoint);
if (found == _object_storage_endpoints.end()) {
smlogger.error("unable to find {} in configured object-storage endpoints", endpoint);
throw std::invalid_argument(format("endpoint {} not found", endpoint));
}
return found->second;
}
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
auto& ep = get_endpoint(endpoint);
auto& ep = found->second;
if (ep.client == nullptr) {
ep.client = make_object_storage_client(ep.cfg, _object_storage_clients_memory, [&ct = container()] (std::string ep) {
return ct.local().get_endpoint_client(ep);
@@ -154,10 +150,6 @@ shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client
return ep.client;
}
sstring storage_manager::get_endpoint_type(sstring endpoint) {
return get_endpoint(endpoint).cfg.type();
}
bool storage_manager::is_known_endpoint(sstring endpoint) const {
return _object_storage_endpoints.contains(endpoint);
}

View File

@@ -70,7 +70,6 @@ class storage_manager : public peering_sharded_service<storage_manager> {
seastar::metrics::metric_groups metrics;
future<> update_config(const db::config&);
object_storage_endpoint& get_endpoint(const sstring& ep);
public:
struct config {
@@ -81,7 +80,6 @@ public:
storage_manager(const db::config&, config cfg);
shared_ptr<object_storage_client> get_endpoint_client(sstring endpoint);
bool is_known_endpoint(sstring endpoint) const;
sstring get_endpoint_type(sstring endpoint);
future<> stop();
std::vector<sstring> endpoints(sstring type = "") const noexcept;
};

View File

@@ -205,13 +205,6 @@ private:
}
bool tablet_in_scope(locator::tablet_id) const;
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
// Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from
// the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else.
static future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
};
host_id_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const {
@@ -350,52 +343,55 @@ public:
}
};
future<std::vector<tablet_sstable_collection>> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
auto tablets_sstables =
tablets_ranges | std::views::transform([](auto range) { return tablet_sstable_collection{.tablet_range = range}; }) | std::ranges::to<std::vector>();
if (sstables.empty() || tablets_sstables.empty()) {
co_return std::move(tablets_sstables);
}
// sstables are sorted by first key in reverse order.
auto reversed_sstables = sstables | std::views::reverse;
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : tablets_sstables) {
for (const auto& sst : reversed_sstables) {
auto sst_first = sst->get_first_decorated_key().token();
auto sst_last = sst->get_last_decorated_key().token();
// SSTable entirely after tablet -> no further SSTables (larger keys) can overlap
if (tablet_range.after(sst_first, dht::token_comparator{})) {
break;
}
// SSTable entirely before tablet -> skip and continue scanning later (larger keys)
if (tablet_range.before(sst_last, dht::token_comparator{})) {
continue;
}
if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) {
sstables_fully_contained.push_back(sst);
} else {
sstables_partially_contained.push_back(sst);
}
co_await coroutine::maybe_yield();
}
}
co_return std::move(tablets_sstables);
}
future<> tablet_sstable_streamer::stream(shared_ptr<stream_progress> progress) {
if (progress) {
progress->start(_tablet_map.tablet_count());
}
auto classified_sstables = co_await get_sstables_for_tablets(
_sstables, _tablet_map.tablet_ids() | std::views::filter([this](auto tid) { return tablet_in_scope(tid); }) | std::views::transform([this](auto tid) {
return _tablet_map.get_token_range(tid);
}) | std::ranges::to<std::vector>());
// sstables are sorted by first key in reverse order.
auto sstable_it = _sstables.rbegin();
for (auto tablet_id : _tablet_map.tablet_ids() | std::views::filter([this] (auto tid) { return tablet_in_scope(tid); })) {
auto tablet_range = _tablet_map.get_token_range(tablet_id);
auto sstable_token_range = [] (const sstables::shared_sstable& sst) {
return dht::token_range(sst->get_first_decorated_key().token(),
sst->get_last_decorated_key().token());
};
std::vector<sstables::shared_sstable> sstables_fully_contained;
std::vector<sstables::shared_sstable> sstables_partially_contained;
// sstable is exhausted if its last key is before the current tablet range
auto exhausted = [&tablet_range] (const sstables::shared_sstable& sst) {
return tablet_range.before(sst->get_last_decorated_key().token(), dht::token_comparator{});
};
while (sstable_it != _sstables.rend() && exhausted(*sstable_it)) {
sstable_it++;
}
for (auto sst_it = sstable_it; sst_it != _sstables.rend(); sst_it++) {
auto sst_token_range = sstable_token_range(*sst_it);
// sstables are sorted by first key, so should skip this SSTable since it
// doesn't overlap with the current tablet range.
if (!tablet_range.overlaps(sst_token_range, dht::token_comparator{})) {
// If the start of the next SSTable's token range lies beyond the current tablet's token
// range, we can safely conclude that no more relevant SSTables remain for this tablet.
if (tablet_range.after(sst_token_range.start()->value(), dht::token_comparator{})) {
break;
}
continue;
}
if (tablet_range.contains(sst_token_range, dht::token_comparator{})) {
sstables_fully_contained.push_back(*sst_it);
} else {
sstables_partially_contained.push_back(*sst_it);
}
co_await coroutine::maybe_yield();
}
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : classified_sstables) {
auto per_tablet_progress = make_shared<per_tablet_stream_progress>(
progress,
sstables_fully_contained.size() + sstables_partially_contained.size());
@@ -755,9 +751,8 @@ future<> sstables_loader::download_task_impl::run() {
};
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
auto ep_type = _loader.local()._storage_manager.get_endpoint_type(_endpoint);
std::vector<seastar::abort_source> shard_aborts(smp::count);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, ep_type, _bucket, _prefix, cfg, [&] {
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg, [&] {
return &shard_aborts[this_shard_id()];
});
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
@@ -837,7 +832,3 @@ future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, s
std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
co_return task->id();
}
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges));
}

View File

@@ -10,8 +10,6 @@
#include <vector>
#include <seastar/core/sharded.hh>
#include "dht/i_partitioner_fwd.hh"
#include "dht/token.hh"
#include "schema/schema_fwd.hh"
#include "sstables/shared_sstable.hh"
#include "tasks/task_manager.hh"
@@ -154,18 +152,3 @@ struct fmt::formatter<sstables_loader::stream_scope> : fmt::formatter<string_vie
}
}
};
struct tablet_sstable_collection {
dht::token_range tablet_range;
std::vector<sstables::shared_sstable> sstables_fully_contained;
std::vector<sstables::shared_sstable> sstables_partially_contained;
};
// This function is intended for test purposes only.
// It assigns the given sstables to the given tablet ranges based on token containment.
// It returns a vector of tablet_sstable_collection, each containing the tablet range
// and the sstables that are fully or partially contained within that range.
// The prerequisite is the tablet ranges are sorted by the range in ascending order and non-overlapping.
// Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order.
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);

View File

@@ -205,7 +205,7 @@ def test_batch_write_invalid_operation(test_table_s):
# In test_item.py we have a bunch of test_empty_* tests on different ways to
# create an empty item (which in Scylla requires the special CQL row marker
# to be supported correctly). BatchWriteItem provides yet another way of
# to be supported correctly). BatchWriteItems provides yet another way of
# creating items, so check the empty case here too:
def test_empty_batch_write(test_table):
p = random_string()
@@ -214,7 +214,7 @@ def test_empty_batch_write(test_table):
batch.put_item({'p': p, 'c': c})
assert test_table.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item'] == {'p': p, 'c': c}
# Test that BatchWriteItem allows writing to multiple tables in one operation
# Test that BatchWriteItems allows writing to multiple tables in one operation
def test_batch_write_multiple_tables(test_table_s, test_table):
p1 = random_string()
c1 = random_string()

View File

@@ -370,7 +370,6 @@ add_scylla_test(combined_tests
sstable_compression_config_test.cc
sstable_directory_test.cc
sstable_set_test.cc
sstable_tablet_streaming.cc
statement_restrictions_test.cc
storage_proxy_test.cc
tablets_test.cc

View File

@@ -1450,7 +1450,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
std::map<sstring, replication_strategy_config_option> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto rf = num_racks;
auto max_rf_factor = std::ranges::min(std::ranges::views::transform(node_count_per_rack.at(dc), [] (auto& x) { return x.second; }));
auto rf = num_racks * tests::random::get_int(1UL, max_rf_factor);
options.emplace(dc, fmt::to_string(rf));
}
return options;
@@ -1486,7 +1487,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_tablets = true;
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
@@ -1500,7 +1502,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
*ars_ptr,
base_token,
view_token,
use_tablets,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
cf_stats).natural_endpoint;
// view pair must be found
@@ -1522,6 +1525,181 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
}
}
// Called in a seastar thread
void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
std::map<sstring, size_t> node_count_per_dc;
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
std::vector<ring_point> ring_points;
auto& random_engine = seastar::testing::local_random_engine;
unsigned shard_count = 2;
size_t num_dcs = 1 + tests::random::get_int(3);
// Generate a random cluster
double point = 1;
for (size_t dc = 0; dc < num_dcs; ++dc) {
sstring dc_name = fmt::format("{}", 100 + dc);
size_t num_racks = 2 + tests::random::get_int(4);
for (size_t rack = 0; rack < num_racks; ++rack) {
sstring rack_name = fmt::format("{}", 10 + rack);
size_t rack_nodes = 1 + tests::random::get_int(2);
for (size_t i = 1; i <= rack_nodes; ++i) {
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
node_count_per_dc[dc_name]++;
node_count_per_rack[dc_name][rack_name]++;
point++;
}
}
}
testlog.debug("node_count_per_rack={}", node_count_per_rack);
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
co_await tm.update_normal_tokens(std::move(tokens), id);
}
}).get();
auto base_schema = schema_builder("ks", "base")
.with_column("k", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();
auto view_schema = schema_builder("ks", "view")
.with_column("v", utf8_type, column_kind::partition_key)
.with_column("k", utf8_type)
.build();
auto tmptr = stm.get();
// Create the replication strategy
auto make_random_options = [&] () {
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
std::map<sstring, replication_strategy_config_option> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto rf = more_or_less ?
tests::random::get_int(num_racks, node_count_per_dc[dc]) :
tests::random::get_int(1UL, num_racks);
options.emplace(dc, fmt::to_string(rf));
}
return options;
};
auto options = make_random_options();
size_t tablet_count = 1 + tests::random::get_int(99);
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", params, tmptr->get_topology());
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
auto base_table_id = base_schema->id();
testlog.debug("base_table_id={}", base_table_id);
auto view_table_id = view_schema->id();
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
testlog.debug("view_table_id={}", view_table_id);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
tm.tablets().set_tablet_map(base_table_id, co_await base_tmap.clone_gently());
tm.tablets().set_tablet_map(view_table_id, co_await view_tmap.clone_gently());
}).get();
tmptr = stm.get();
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
auto& topology = tmptr->get_topology();
testlog.debug("topology: {}", topology.get_datacenter_racks());
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
std::unordered_map<sstring, size_t> same_rack_pairs;
std::unordered_map<sstring, size_t> cross_rack_pairs;
for (const auto& base_replica : base_replicas) {
auto& base_host = base_replica.host;
auto view_ep_opt = db::view::get_view_natural_endpoint(
base_host,
base_erm,
view_erm,
*ars_ptr,
base_token,
view_token,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
cf_stats).natural_endpoint;
// view pair must be found
if (!view_ep_opt) {
BOOST_FAIL(format("Could not pair base_host={} base_token={} view_token={}", base_host, base_token, view_token));
}
BOOST_REQUIRE(view_ep_opt);
auto& view_ep = *view_ep_opt;
// Assert pairing uniqueness
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
BOOST_REQUIRE(inserted_base_pair);
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
BOOST_REQUIRE(inserted_view_pair);
auto& base_location = topology.find_node(base_host)->dc_rack();
auto& view_location = topology.find_node(view_ep)->dc_rack();
// Assert dc- and rack- aware pairing
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
if (base_location.rack == view_location.rack) {
same_rack_pairs[base_location.dc]++;
} else {
cross_rack_pairs[base_location.dc]++;
}
}
for (const auto& [dc, rf_opt] : options) {
auto rf = locator::get_replication_factor(rf_opt);
BOOST_REQUIRE_EQUAL(same_rack_pairs[dc] + cross_rack_pairs[dc], rf);
}
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_lt_racks) {
test_complex_rack_aware_view_pairing_test(false);
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_gt_racks) {
test_complex_rack_aware_view_pairing_test(true);
}
SEASTAR_THREAD_TEST_CASE(test_rack_diff) {
BOOST_REQUIRE(diff_racks({}, {}).empty());

View File

@@ -1,367 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "dht/token.hh"
#include "sstable_test.hh"
#include "sstables_loader.hh"
#include "test/lib/sstable_test_env.hh"
BOOST_AUTO_TEST_SUITE(sstable_tablet_streaming_test)
using namespace sstables;
std::vector<shared_sstable> make_sstables_with_ranges(test_env& env, const std::vector<std::pair<int64_t, int64_t>>& ranges) {
std::vector<shared_sstable> ssts;
for (const auto& [first, last] : ranges) {
auto sst = env.make_sstable(uncompressed_schema(), uncompressed_dir());
test(sst).set_first_and_last_keys(dht::decorated_key(dht::token{first}, partition_key(std::vector<bytes>{"1"})),
dht::decorated_key(dht::token{last}, partition_key(std::vector<bytes>{"1"})));
ssts.push_back(std::move(sst));
}
// By sorting SSTables by their primary key, we enable runs to be
// streamed incrementally. Overlapping fragments can be deduplicated,
// reducing the amount of data sent over the wire. Elements are
// popped from the back of the vector, so we sort in descending
// order to begin with the smaller tokens.
// See sstable_streamer constructor for more details.
std::ranges::sort(ssts, [](const shared_sstable& x, const shared_sstable& y) { return x->compare_by_first_key(*y) > 0; });
return ssts;
}
std::vector<dht::token_range> get_tablet_sstable_collection(auto&&... tablet_ranges) {
// tablet ranges are left-non-inclusive, see `tablet_map::get_token_range` for details
std::vector<dht::token_range> collections{dht::token_range::make({tablet_ranges.start()->value(), false}, {tablet_ranges.end()->value(), true})...};
std::sort(collections.begin(), collections.end(), [](auto const& a, auto const& b) { return a.start()->value() < b.start()->value(); });
return collections;
}
#define REQUIRE_WITH_CONTEXT(sstables, expected_size) \
BOOST_TEST_CONTEXT("Testing with ranges: " << [&] { \
std::stringstream ss; \
for (const auto& sst : (sstables)) { \
ss << dht::token_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token()) << ", "; \
} \
return ss.str(); \
}()) \
BOOST_REQUIRE_EQUAL(sstables.size(), expected_size)
SEASTAR_TEST_CASE(test_streaming_ranges_distribution) {
return test_env::do_with_async([](test_env& env) {
// 1) Exact boundary equality: SSTable == tablet
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{5, 10},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 2) Single-point overlaps at start/end
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{4, 5}, // touches start, non-inclusive, skip
{10, 11}, // touches end
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 3) Tablet fully inside a large SSTable
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 20},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 4) Multiple SSTables fully contained in tablet
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{6, 7},
{7, 8},
{8, 9},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 3);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 5) Two overlapping but not fully contained SSTables
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 6}, // overlaps at left
{9, 15}, // overlaps at right
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 6) Unsorted input (helper sorts) + mixed overlaps
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{50}, dht::token{100}});
// Intentionally unsorted by first token
auto ssts = make_sstables_with_ranges(env,
{
{120, 130},
{0, 10},
{60, 70}, // fully contained
{40, 55}, // partial
{95, 105}, // partial
{80, 90}, // fully contained
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 7) Empty SSTable list
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
std::vector<shared_sstable> ssts;
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 8) Tablet outside all SSTables
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
auto ssts = make_sstables_with_ranges(env,
{
{1, 2},
{3, 4},
{10, 20},
{300, 400},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 9) Boundary adjacency with multiple fragments
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
auto ssts = make_sstables_with_ranges(env,
{
{50, 100}, // touches start -> non-inclusive, skip
{100, 120}, // starts at start -> partially contained
{180, 200}, // ends at end -> fully contained
{200, 220}, // touches end -> partial
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 1);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 10) Large SSTable set where early break should occur
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{1000}, dht::token{2000}});
auto ssts = make_sstables_with_ranges(env,
{
{100, 200},
{300, 400},
{900, 950},
{1001, 1100}, // fully contained
{1500, 1600}, // fully contained
{2101, 2200}, // entirely after -> should trigger early break in ascending scan
{1999, 2100}, // overlap, partially contained
{3000, 3100},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 10) https://github.com/scylladb/scylladb/pull/26980 example, tested
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{4}, dht::token{5}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 5},
{0, 3},
{2, 5},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
// None fully contained; three partial overlaps
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
});
}
SEASTAR_TEST_CASE(test_streaming_ranges_distribution_in_tablets) {
return test_env::do_with_async([](test_env& env) {
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}}, dht::token_range{dht::token{11}, dht::token{15}});
auto ssts = make_sstables_with_ranges(env,
{
{5, 10},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
}
{
// Multiple tablets with a hole between [10,11]
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}},
dht::token_range{dht::token{5}, dht::token{9}},
dht::token_range{dht::token{12}, dht::token{15}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 4}, // T.start==S.start, but non-inclusive -> partial
{5, 9}, // same as above
{6, 8}, // fully in second tablet
{10, 11}, // falls in the hole, should be rejected
{8, 13}, // overlaps second and third tablets (partial in both)
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 2);
REQUIRE_WITH_CONTEXT(res[2].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
}
{
// SSTables outside any tablet range
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{20}, dht::token{25}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 5}, // before
{30, 35}, // after
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
{
// Edge case: SSTable touching tablet boundary
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{4, 5}, // touches start, non-inclusive, skip
{10, 11}, // touches end
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
{
// No tablets, but some SSTables
auto collection = get_tablet_sstable_collection();
auto ssts = make_sstables_with_ranges(env,
{
{0, 5},
{10, 15},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
BOOST_REQUIRE_EQUAL(res.size(), 0); // no tablets → nothing to classify
}
{
// No SSTables, but some tablets
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{10}, dht::token{15}});
std::vector<shared_sstable> ssts; // empty
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
}
{
// No tablets and no SSTables
auto collection = get_tablet_sstable_collection();
std::vector<shared_sstable> ssts; // empty
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
BOOST_REQUIRE_EQUAL(res.size(), 0);
}
{
// SSTable spanning two tablets
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}}, dht::token_range{dht::token{5}, dht::token{9}});
auto ssts = make_sstables_with_ranges(env,
{
{2, 7}, // spans both tablets
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
// Tablet [0,4] sees partial overlap
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
// Tablet [5,9] sees partial overlap
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
}
{
// SSTable spanning three tablets with a hole in between
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{3}},
dht::token_range{dht::token{4}, dht::token{6}},
dht::token_range{dht::token{8}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{2, 9}, // spans across tablets 1,2,3 and hole [7]
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
}
{
// SSTable fully covering one tablet and partially overlapping another
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{6}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 7}, // fully covers first tablet, partial in second
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
}
});
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -395,7 +395,7 @@ SEASTAR_TEST_CASE(test_builder_with_concurrent_drop) {
assert_that(msg).is_rows().is_empty();
msg = e.execute_cql("select * from system_distributed.view_build_status").get();
assert_that(msg).is_rows().is_empty();
}, 30);
});
});
}

View File

@@ -13,8 +13,7 @@ import ssl
import tempfile
import platform
import urllib.parse
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from multiprocessing import Event, Process
from pathlib import Path
from typing import TYPE_CHECKING
from test.pylib.runner import testpy_test_fixture_scope
@@ -187,14 +186,15 @@ async def manager_api_sock_path(request: pytest.FixtureRequest, testpy_test: Tes
await asyncio.get_running_loop().run_in_executor(None, stop_event.wait)
finally:
await mgr.stop()
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, run_manager())
start_event.wait()
yield sock_path
manager_process = Process(target=lambda: asyncio.run(run_manager()))
manager_process.start()
start_event.wait()
stop_event.set()
future.result()
yield sock_path
stop_event.set()
manager_process.join()
@pytest.fixture(scope=testpy_test_fixture_scope)

View File

@@ -25,7 +25,7 @@ from typing import Any, Optional, override
import pytest
import requests
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.cluster import NoHostAvailable, Session
from cassandra.query import SimpleStatement, named_tuple_factory
from ccmlib.scylla_node import ScyllaNode, NodeError
@@ -1135,14 +1135,6 @@ class TestCQLAudit(AuditTester):
session.execute("DROP TABLE test1")
def _get_attempt_count(self, session: Session, *, execution_profile=EXEC_PROFILE_DEFAULT, consistency_level: ConsistencyLevel = ConsistencyLevel.ONE) -> int:
# dtest env is using FlakyRetryPolicy which has `max_retries` attribute
cl_profile = session.execution_profile_clone_update(execution_profile, consistency_level=consistency_level)
policy = cl_profile.retry_policy
retries = getattr(policy, "max_retries", None)
assert retries is not None
return 1 + retries
def _test_insert_failure_doesnt_report_success_assign_nodes(self, session: Session = None):
all_nodes: set[ScyllaNode] = set(self.cluster.nodelist())
assert len(all_nodes) == 7
@@ -1162,7 +1154,6 @@ class TestCQLAudit(AuditTester):
for i in range(256):
stmt = SimpleStatement(f"INSERT INTO ks.test1 (k, v1) VALUES ({i}, 1337)", consistency_level=ConsistencyLevel.THREE)
session.execute(stmt)
attempt_count = self._get_attempt_count(session, consistency_level=ConsistencyLevel.THREE)
token = rows_to_list(session.execute(f"SELECT token(k) FROM ks.test1 WHERE k = {i}"))[0][0]
@@ -1177,9 +1168,9 @@ class TestCQLAudit(AuditTester):
audit_partition_nodes = [address_to_node[address] for address in audit_nodes]
insert_node = address_to_node[insert_node.pop()]
kill_node = address_to_node[partitions.pop()]
return audit_partition_nodes, insert_node, kill_node, stmt.query_string, attempt_count
return audit_partition_nodes, insert_node, kill_node, stmt.query_string
return [], [], None, None, None
return [], [], None, None
@pytest.mark.exclude_errors("audit - Unexpected exception when writing log with: node_ip")
def test_insert_failure_doesnt_report_success(self):
@@ -1201,7 +1192,7 @@ class TestCQLAudit(AuditTester):
with self.assert_exactly_n_audit_entries_were_added(session, 1):
conn.execute(stmt)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail, query_fail_count = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
# TODO: remove the loop when scylladb#24473 is fixed
# We call get_host_id only to cache host_id
@@ -1240,8 +1231,8 @@ class TestCQLAudit(AuditTester):
# If any audit mode is not done yet, continue polling.
all_modes_done = True
for mode, rows in rows_dict.items():
rows_with_error = [row for row in rows if row.error and row.operation == query_to_fail]
if len(rows_with_error) == query_fail_count:
rows_with_error = list(filter(lambda r: r.error, rows))
if len(rows_with_error) == 6:
logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
assert rows_with_error[0].error is True
assert rows_with_error[0].consistency == "THREE"

View File

@@ -16,26 +16,16 @@ from test.cluster.util import get_topology_coordinator, new_test_keyspace, recon
logger = logging.getLogger(__name__)
# This test makes sure that view building is done mainly in the streaming
# scheduling group. We check that by grepping all relevant logs in TRACE mode
# and verifying that they come from the streaming scheduling group.
#
# For more context, see: https://github.com/scylladb/scylladb/issues/21232.
# This test reproduces the issue in non-tablet mode.
# This test makes sure that view building is done mainly in the streaming scheduling group
# and not the gossip scheduling group. We do that by measuring the time each group was
# busy during the view building process and confirming that the gossip group was busy
# much less than the streaming group.
# Reproduces https://github.com/scylladb/scylladb/issues/21232
@pytest.mark.asyncio
@skip_mode('debug', 'the test needs to do some work which takes too much time in debug mode')
async def test_view_building_scheduling_group(manager: ManagerClient):
# Note: The view building coordinator works in the gossiping scheduling group,
# and we intentionally omit it here.
# Note: We include "view" for keyspaces that don't use the view building coordinator
# and will follow the legacy path instead.
loggers = ["view_building_worker", "view_consumer", "view_update_generator", "view"]
# Flatten the list of lists.
cmdline = sum([["--logger-log-level", f"{logger}=trace"] for logger in loggers], [])
server = await manager.server_add(cmdline=cmdline)
server = await manager.server_add()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (p int, c int, PRIMARY KEY (p, c))")
@@ -45,30 +35,21 @@ async def test_view_building_scheduling_group(manager: ManagerClient):
batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(inserts) + "\nAPPLY BATCH\n"
await manager.cql.run_async(batch)
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
metrics_before = await manager.metrics.query(server.ip_addr)
ms_gossip_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv AS SELECT p, c FROM {ks}.tab WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
await wait_for_view(cql, 'mv', 1)
logger_alternative = "|".join(loggers)
pattern = rf"\[shard [0-9]+:(.+)\] ({logger_alternative}) - "
results = await log.grep(pattern, from_mark=mark)
# Sanity check. If there are no logs, something's wrong.
assert len(results) > 0
# In case of non-tablet keyspaces, we won't use the view building coordinator.
# Instead, view updates will follow the legacy path. Along the way, we'll observe
# this message, which will be printed using another scheduling group, so let's
# filter it out.
predicate = lambda result: f"Building view {ks}.mv, starting at token" not in result[0]
results = list(filter(predicate, results))
# Take the first parenthesized match for each result, i.e. the scheduling group.
sched_groups = [matches[1] for _, matches in results]
assert all(sched_group == "strm" for sched_group in sched_groups)
metrics_after = await manager.metrics.query(server.ip_addr)
ms_gossip_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
ms_streaming = ms_streaming_after - ms_streaming_before
ms_statement = ms_gossip_after - ms_gossip_before
ratio = ms_statement / ms_streaming
print(f"ms_streaming: {ms_streaming}, ms_statement: {ms_statement}, ratio: {ratio}")
assert ratio < 0.1
# A sanity check test ensures that starting and shutting down Scylla when view building is
# disabled is conducted properly and we don't run into any issues.

View File

@@ -25,14 +25,12 @@ import json
from cassandra.auth import PlainTextAuthProvider
import threading
import random
import re
from test.cluster.util import get_replication
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.conftest import skip_mode
from test.pylib.tablets import get_tablet_replica
logger = logging.getLogger(__name__)
@@ -971,118 +969,3 @@ async def test_alternator_concurrent_rmw_same_partition_different_server(manager
t.join()
finally:
table.delete()
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
"""
Reproducer for issue #27353.
LWT requires that storage_proxy::cas() is invoked on a valid shard — the one
returned by sharder.try_get_shard_for_reads() for a tablets-based table.
The bug: if the current shard is invalid and we jump to the valid shard, that
new shard may become invalid again by the time we attempt to capture the ERM.
This leads to a failure of the CAS path.
The fix: retry the validity check and jump again if the current shard is already
invalid. We should exit the loop once the shard is valid *and* we hold a strong pointer
to the ERM — which prevents further tablet movements until the ERM is released.
This problem is specific to BatchWriteItem; other commands are already handled
correctly.
"""
config = alternator_config.copy()
config['alternator_write_isolation'] = 'always_use_lwt'
cmdline = [
'--logger-log-level', 'alternator-executor=trace',
'--logger-log-level', 'alternator_controller=trace',
'--logger-log-level', 'paxos=trace'
]
server = await manager.server_add(config=config, cmdline=cmdline)
alternator = get_alternator(server.ip_addr)
logger.info("Creating alternator test table")
table = alternator.create_table(TableName=unique_table_name(),
Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],
BillingMode='PAY_PER_REQUEST',
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}])
table_name = table.name
ks_name = 'alternator_' + table_name
last_token = 7 # Any token works since we have only one tablet
(src_host_id, src_shard) = await get_tablet_replica(manager, server, ks_name, table_name, last_token)
dst_shard = 0 if src_shard == 1 else 1
logger.info("Inject 'intranode_migration_streaming_wait'")
await manager.api.enable_injection(server.ip_addr,
"intranode_migration_streaming_wait",
one_shot=False)
logger.info("Start tablet migration")
intranode_migration_task = asyncio.create_task(
manager.api.move_tablet(server.ip_addr, ks_name, table_name,
src_host_id, src_shard,
src_host_id, dst_shard, last_token))
logger.info("Open server logs")
log = await manager.server_open_log(server.server_id)
logger.info("Wait for intranode_migration_streaming_wait")
await log.wait_for("intranode_migration_streaming: waiting")
logger.info("Inject 'alternator_executor_batch_write_wait'")
await manager.api.enable_injection(server.ip_addr,
"alternator_executor_batch_write_wait",
one_shot=False,
parameters={
'table': table_name,
'keyspace': ks_name,
'shard': dst_shard
})
m = await log.mark()
# Start a background thread, which tries to hit the alternator_executor_batch_write_wait
# injection on the destination shard.
logger.info("Start a batch_write thread")
stop_event = threading.Event()
def run_batch():
alternator = get_alternator(server.ip_addr)
table = alternator.Table(table_name)
while not stop_event.is_set():
with table.batch_writer() as batch:
batch.put_item(Item={'p': 1, 'x': 'hellow world'})
t = ThreadWrapper(target=run_batch)
t.start()
logger.info("Waiting for 'alternator_executor_batch_write_wait: hit'")
await log.wait_for("alternator_executor_batch_write_wait: hit", from_mark=m)
# We have a batch request with "streaming" cas_shard on the destination shard.
# This means we have already made a decision to jump to the src_shard.
# Now we're releasing the tablet migration so that it reaches write_both_read_new and
# and invaldiates this decision.
m = await log.mark()
await manager.api.message_injection(server.ip_addr, "intranode_migration_streaming_wait")
# The next barrier must be for the write_both_read_new, we need a guarantee
# that the src_shard observed it
logger.info("Waiting for the next barrier")
await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
from_mark=m)
# Now we have a guarantee that a new barrier succeeded on the src_shard,
# this means the src_shard has already transitioned to write_both_read_new,
# and our batch write will have to jump back to the destination shard.
logger.info("Release the 'alternator_executor_batch_write_wait'")
await manager.api.message_injection(server.ip_addr, "alternator_executor_batch_write_wait")
logger.info("Waiting for migratino task to finish")
await intranode_migration_task
stop_event.set()
t.join()

View File

@@ -220,14 +220,14 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 0, 100)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
skipped_bytes = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes = get_incremental_repair_sst_read_bytes(servers[0])
# Nothing to skip. Repair all data.
assert skipped_bytes == 0
assert read_bytes > 0
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
skipped_bytes2 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes2 = get_incremental_repair_sst_read_bytes(servers[0])
# Skip all. Nothing to repair
@@ -236,7 +236,7 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 200, 300)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
skipped_bytes3 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes3 = get_incremental_repair_sst_read_bytes(servers[0])
# Both skipped and read bytes should grow
@@ -272,7 +272,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert get_sstables_repaired_at(map0, token) == sstables_repaired_at
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
map1 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map1={map1}')
# Check sstables_repaired_at is increased by 1
@@ -288,7 +288,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert len(enable) == 1
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
map2 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map2={map2}')
# Check sstables_repaired_at is increased by 1
@@ -313,7 +313,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
# Repair should not finish with error
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
try:
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental', timeout=10)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, timeout=10)
assert False # Check the tablet repair is not supposed to finish
except TimeoutError:
logger.info("Repair timeout as expected")
@@ -329,7 +329,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# 1 add 0 skip 1 mark
for log in logs:
sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
@@ -355,7 +355,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
else:
assert False # Wrong ops
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# 1 add 1 skip 1 mark
for log in logs:
@@ -394,7 +394,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -402,7 +402,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -436,7 +436,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
@@ -445,7 +445,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 3
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -505,7 +505,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
await manager.server_start(servers[1].server_id)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
scylla_path = get_scylla_path(cql)
@@ -521,8 +521,8 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -532,7 +532,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
# Second repair
await inject_error_on(manager, "repair_tablet_no_update_sstables_repair_at", servers)
# some sstable repaired_at = 3, but sstables_repaired_at = 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await inject_error_off(manager, "repair_tablet_no_update_sstables_repair_at", servers)
scylla_path = get_scylla_path(cql)
@@ -561,8 +561,8 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -574,7 +574,7 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
last_tokens = [t.last_token for t in replicas]
for t in last_tokens[0::2]:
logging.info(f"Start repair for token={t}");
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t, incremental_mode='incremental') # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t) # sstables_repaired_at 3
scylla_path = get_scylla_path(cql)
@@ -595,7 +595,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -659,18 +659,13 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
assert read1 == 0
assert skip2 == 0
assert read2 > 0
await do_repair_and_check('incremental', 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
def check2(skip1, read1, skip2, read2):
assert skip1 == skip2
assert read1 == read2
await do_repair_and_check('disabled', 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
# FIXME: Incremental repair is disabled by default due to
# https://github.com/scylladb/scylladb/issues/26041 and
# https://github.com/scylladb/scylladb/issues/27414
await do_repair_and_check(None, 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
def check3(skip1, read1, skip2, read2):
assert skip1 < skip2
assert read1 == read2
@@ -682,14 +677,14 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
await do_repair_and_check('full', 1, rf'Starting tablet repair by API .* incremental_mode=full.*', check4)
@pytest.mark.asyncio
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
for s in servers:
time1 += get_repair_tablet_time_ms(s)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
for s in servers:
time2 += get_repair_tablet_time_ms(s)
@@ -699,7 +694,7 @@ async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
# Reproducer for https://github.com/scylladb/scylladb/issues/26346
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_incremental_repair_finishes_when_tablet_skips_end_repair_stage(manager):
async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
servers = await manager.servers_add(3, auto_rack_dc="dc1")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
@@ -724,7 +719,7 @@ async def test_incremental_repair_finishes_when_tablet_skips_end_repair_stage(ma
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_incremental_repair_rejoin_do_tablet_operation(manager):
async def test_repair_rejoin_do_tablet_operation(manager):
cmdline = ['--logger-log-level', 'raft_topology=debug']
servers = await manager.servers_add(3, auto_rack_dc="dc1", cmdline=cmdline)

View File

@@ -10,7 +10,6 @@ import logging
from test.pylib.rest_client import inject_error_one_shot
from test.cluster.util import new_test_keyspace
from test.pylib.util import gather_safely
logger = logging.getLogger(__name__)
@@ -34,12 +33,25 @@ async def test_broken_bootstrap(manager: ManagerClient):
except Exception:
pass
await gather_safely(*(manager.server_stop(srv.server_id) for srv in [server_a, server_b]))
await manager.server_stop(server_b.server_id)
await manager.server_stop(server_a.server_id)
stop_event = asyncio.Event()
async def worker():
logger.info("Worker started")
while not stop_event.is_set():
for i in range(100):
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
assert response[0].b == i
await asyncio.sleep(0.1)
logger.info("Worker stopped")
await manager.server_start(server_a.server_id)
await manager.driver_connect()
for i in range(100):
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
assert response[0].b == i
worker_task = asyncio.create_task(worker())
await asyncio.sleep(20)
stop_event.set()
await worker_task

View File

@@ -4,8 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import ConsistencyLevel, SimpleStatement
from cassandra.policies import FallthroughRetryPolicy
from cassandra.query import ConsistencyLevel
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
@@ -1597,7 +1596,7 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
async def truncate_table():
await asyncio.sleep(10)
logger.info("Executing truncate during bootstrap")
await cql.run_async(SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy()))
await cql.run_async(f"TRUNCATE {ks}.test USING TIMEOUT 1m")
truncate_task = asyncio.create_task(truncate_table())
logger.info("Adding fourth node")

View File

@@ -67,11 +67,11 @@ nodetool_cmd.conf = False
# Run the external "nodetool" executable (can be overridden by the NODETOOL
# environment variable). Only call this if the REST API doesn't work.
def run_nodetool(cql, *args, **subprocess_kwargs):
def run_nodetool(cql, *args):
# TODO: We may need to change this function or its callers to add proper
# support for testing on multi-node clusters.
host = cql.cluster.contact_points[0]
return subprocess.run([nodetool_cmd(), '-h', host, *args], **subprocess_kwargs)
subprocess.run([nodetool_cmd(), '-h', host, *args])
def flush(cql, table):
ks, cf = table.split('.')
@@ -157,28 +157,6 @@ def disablebinary(cql):
else:
run_nodetool(cql, "disablebinary")
def getlogginglevel(cql, logger):
if has_rest_api(cql):
resp = requests.get(f'{rest_api_url(cql)}/system/logger/{logger}')
if resp.ok:
return resp.text.strip()
raise RuntimeError(f"failed to fetch logging level for {logger}: {resp.status_code} {resp.text}")
result = run_nodetool(
cql,
"getlogginglevels",
capture_output=True,
text=True,
check=True,
)
for line in result.stdout.splitlines():
stripped = line.strip()
parts = stripped.split()
if len(parts) >= 2 and parts[0] == logger:
return parts[-1]
raise RuntimeError(f"logger {logger} not found in getlogginglevels output")
def setlogginglevel(cql, logger, level):
if has_rest_api(cql):
requests.post(f'{rest_api_url(cql)}/system/logger/{logger}', params={'level': level})

View File

@@ -10,7 +10,6 @@ import re
import requests
import socket
import struct
from test.cqlpy import nodetool
from test.cqlpy.util import cql_session
def get_protocol_error_metrics(host) -> int:
@@ -59,50 +58,11 @@ def try_connect(host, port, creds, protocol_version):
with cql_with_protocol(host, port, creds, protocol_version) as session:
return 1 if session else 0
@pytest.fixture
def debug_exceptions_logging(request, cql):
def _read_level() -> str | None:
try:
level = nodetool.getlogginglevel(cql, "exception")
if level:
level = level.strip().strip('"').lower()
return level
except Exception as exc:
print(f"Failed to read exception logger level: {exc}")
return None
def _set_and_verify(level: str) -> bool:
try:
nodetool.setlogginglevel(cql, "exception", level)
except Exception as exc:
print(f"Failed to set exception logger level to '{level}': {exc}")
return False
observed = _read_level()
if observed == level:
return True
print(f"Exception logger level observed as '{observed}' while expecting '{level}'")
return False
def _restore_logging():
if not enabled and previous_level is None:
return
target_level = previous_level or "info"
_set_and_verify(target_level)
previous_level = _read_level()
enabled = _set_and_verify("debug")
yield
_restore_logging()
# If there is a protocol version mismatch, the server should
# raise a protocol error, which is counted in the metrics.
def test_protocol_version_mismatch(scylla_only, debug_exceptions_logging, request, host):
run_count = 200
cpp_exception_threshold = 20
def test_protocol_version_mismatch(scylla_only, request, host):
run_count = 100
cpp_exception_threshold = 10
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)
@@ -284,8 +244,8 @@ def _protocol_error_impl(
s.close()
def _test_impl(host, flag):
run_count = 200
cpp_exception_threshold = 20
run_count = 100
cpp_exception_threshold = 10
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)
@@ -307,47 +267,47 @@ def no_ssl(request):
yield
# Malformed BATCH with an invalid kind triggers a protocol error.
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, host):
_test_impl(host, "trigger_bad_batch")
# Send OPTIONS during AUTHENTICATE to trigger auth-state error.
def test_unexpected_message_during_auth(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_unexpected_message_during_auth(scylla_only, no_ssl, host):
_test_impl(host, "trigger_unexpected_auth")
# STARTUP with an invalid/missing string-map entry should produce a protocol error.
def test_process_startup_invalid_string_map(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_process_startup_invalid_string_map(scylla_only, no_ssl, host):
_test_impl(host, "trigger_process_startup_invalid_string_map")
# STARTUP with unknown COMPRESSION option should produce a protocol error.
def test_unknown_compression_algorithm(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_unknown_compression_algorithm(scylla_only, no_ssl, host):
_test_impl(host, "trigger_unknown_compression")
# QUERY long-string truncation: declared length > provided bytes triggers protocol error.
def test_process_query_internal_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_process_query_internal_malformed_query(scylla_only, no_ssl, host):
_test_impl(host, "trigger_process_query_internal_malformed_query")
# QUERY options malformed: PAGE_SIZE flag set but page_size truncated triggers protocol error.
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, host):
_test_impl(host, "trigger_process_query_internal_fail_read_options")
# PREPARE long-string truncation: declared length > provided bytes triggers protocol error.
def test_process_prepare_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_process_prepare_malformed_query(scylla_only, no_ssl, host):
_test_impl(host, "trigger_process_prepare_malformed_query")
# EXECUTE cache-key malformed: short-bytes length > provided bytes triggers protocol error.
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, host):
_test_impl(host, "trigger_process_execute_internal_malformed_cache_key")
# REGISTER malformed string list: declared string length > provided bytes triggers protocol error.
def test_process_register_malformed_string_list(scylla_only, no_ssl, debug_exceptions_logging, host):
def test_process_register_malformed_string_list(scylla_only, no_ssl, host):
_test_impl(host, "trigger_process_register_malformed_string_list")
# Test if the protocol exceptions do not decrease after running the test happy path.
# This is to ensure that the protocol exceptions are not cleared or reset
# during the test execution.
def test_no_protocol_exceptions(scylla_only, no_ssl, debug_exceptions_logging, host):
run_count = 200
cpp_exception_threshold = 20
def test_no_protocol_exceptions(scylla_only, no_ssl, host):
run_count = 100
cpp_exception_threshold = 10
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)

View File

@@ -881,7 +881,7 @@ private:
_fd.start(
std::ref(_fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{600}}.count(), gcfg.gossip_scheduling_group).get();
service::direct_fd_clock::base::duration{std::chrono::milliseconds{600}}.count()).get();
auto stop_fd = defer_verbose_shutdown("direct failure detector", [this] {
_fd.stop().get();

View File

@@ -163,11 +163,6 @@ public:
_sst->_shards.push_back(this_shard_id());
}
void set_first_and_last_keys(const dht::decorated_key& first_key, const dht::decorated_key& last_key) {
_sst->_first = first_key;
_sst->_last = last_key;
}
void rewrite_toc_without_component(component_type component) {
SCYLLA_ASSERT(component != component_type::TOC);
_sst->_recognized_components.erase(component);

View File

@@ -30,7 +30,7 @@ static const int cell_size = 128;
static bool cancelled = false;
template<typename MutationGenerator>
void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen, std::function<mutation()> before_flush = {}) {
void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
tests::reader_concurrency_semaphore_wrapper semaphore;
cache_tracker tracker;
row_cache cache(s, make_empty_snapshot_source(), tracker, is_continuous::yes);
@@ -58,10 +58,6 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen, std::f
return;
}
}
if (before_flush) {
mutation m = before_flush();
mt->apply(m);
}
});
memtable_slm.stop();
std::cout << format("Memtable fill took {:.6f} [ms], {}", fill_d.count() * 1000, memtable_slm) << std::endl;
@@ -185,43 +181,6 @@ static void test_partition_with_lots_of_small_rows() {
});
}
static void test_partition_with_lots_of_small_rows_covered_by_tombstone() {
auto s = schema_builder("ks", "cf")
.with_column("pk", uuid_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v1", bytes_type, column_kind::regular_column)
.with_column("v2", bytes_type, column_kind::regular_column)
.with_column("v3", bytes_type, column_kind::regular_column)
.build();
auto pk = dht::decorate_key(*s, partition_key::from_single_value(*s,
serialized(utils::UUID_gen::get_time_UUID())));
int ck_idx = 0;
int flush_ck_idx = 0;
run_test("Large partition, lots of small rows covered by single tombstone", s, [&] {
mutation m(s, pk);
auto val = data_value(bytes(bytes::initialized_later(), cell_size));
auto ck = clustering_key::from_single_value(*s, serialized(ck_idx++));
auto ts = api::new_timestamp();
m.set_clustered_cell(ck, "v1", val, ts);
m.set_clustered_cell(ck, "v2", val, ts);
m.set_clustered_cell(ck, "v3", val, ts);
return m;
}, [&] { // before_flush
// Delete key range [-inf, flush_ck_idx)
std::cout << "Generated " << (ck_idx - flush_ck_idx) << " rows\n";
auto m = mutation(s, pk);
auto ck = clustering_key::from_single_value(*s, serialized(flush_ck_idx));
m.partition().apply_row_tombstone(*s, range_tombstone(
position_in_partition_view::before_all_clustered_rows(),
position_in_partition_view::before_key(ck),
tombstone(api::new_timestamp(), gc_clock::now())));
flush_ck_idx = ck_idx;
return m;
});
}
static void test_partition_with_few_small_rows() {
auto s = schema_builder("ks", "cf")
.with_column("pk", uuid_type, column_kind::partition_key)
@@ -316,7 +275,6 @@ int scylla_row_cache_update_main(int argc, char** argv) {
cancelled = true;
});
logalloc::prime_segment_pool(memory::stats().total_memory(), memory::min_free_memory()).get();
test_partition_with_lots_of_small_rows_covered_by_tombstone();
test_small_partitions();
test_partition_with_few_small_rows();
test_partition_with_lots_of_small_rows();

View File

@@ -109,7 +109,6 @@ class ResourceGather(ABC):
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise

View File

@@ -789,7 +789,7 @@ class ScyllaServer:
while time.time() < self.start_time + self.TOPOLOGY_TIMEOUT and not self.stop_event.is_set():
assert self.cmd is not None
if self.cmd.returncode is not None:
if self.cmd.returncode:
self.cmd = None
if expected_error is not None:
with self.log_filename.open("r", encoding="utf-8") as log_file:

View File

@@ -31,7 +31,7 @@ struct test_pinger: public direct_failure_detector::pinger {
std::unordered_map<endpoint_id, size_t> _pings;
bool _block = false;
virtual future<bool> ping(endpoint_id ep, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) override {
virtual future<bool> ping(endpoint_id ep, abort_source& as) override {
bool ret = false;
co_await invoke_abortable_on(0, [this, ep, &ret] (abort_source& as) -> future<> {
++_pings[ep];
@@ -91,9 +91,6 @@ struct test_clock : public direct_failure_detector::clock {
throw sleep_aborted{};
}
}
virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const override {
throw std::logic_error("to_milliseconds is not implemented");
}
};
struct test_listener : public direct_failure_detector::listener {
@@ -132,7 +129,7 @@ SEASTAR_TEST_CASE(failure_detector_test) {
test_pinger pinger;
test_clock clock;
sharded<direct_failure_detector::failure_detector> fd;
co_await fd.start(std::ref(pinger), std::ref(clock), 10, 30, seastar::current_scheduling_group());
co_await fd.start(std::ref(pinger), std::ref(clock), 10, 30);
test_listener l1, l2;
auto sub1 = co_await fd.local().register_listener(l1, 95);

View File

@@ -1065,7 +1065,7 @@ public:
}
// Can be called on any shard.
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) override {
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override {
try {
co_await invoke_abortable_on(0, [this, id] (abort_source& as) {
return _rpc.ping(raft::server_id{id}, as);
@@ -1127,10 +1127,6 @@ public:
throw sleep_aborted{};
}
}
virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const override {
throw std::logic_error("to_milliseconds is not implemented");
}
};
class direct_fd_listener : public raft::failure_detector, public direct_failure_detector::listener {
@@ -1440,7 +1436,7 @@ public:
// _fd_service must be started before raft server,
// because as soon as raft server is started, it may start adding endpoints to the service.
// _fd_service is using _server's RPC, but not until the first endpoint is added.
co_await _fd_service->start(std::ref(*_fd_pinger), std::ref(*_fd_clock), fd_ping_period.count(), fd_ping_timeout.count(), seastar::current_scheduling_group());
co_await _fd_service->start(std::ref(*_fd_pinger), std::ref(*_fd_clock), fd_ping_period.count(), fd_ping_timeout.count());
_fd_subscription.emplace(co_await _fd_service->local().register_listener(*_fd_listener, _fd_convict_threshold.count()));
co_await _server->start();
}

View File

@@ -654,7 +654,7 @@ void cluster_repair_operation(scylla_rest_client& client, const bpo::variables_m
for (const auto& table : tables.empty() ? ks_to_cfs[keyspace] : tables) {
repair_params["table"] = table;
try {
sstring task_id = rjson::to_sstring(client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"]);
sstring task_id = client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"].GetString();
log("Starting repair with task_id={} keyspace={} table={}", task_id, keyspace, table);

View File

@@ -1,4 +1,4 @@
FROM registry.fedoraproject.org/fedora:43
FROM docker.io/fedora:42
ARG CLANG_BUILD="SKIP"
ARG CLANG_ARCHIVES

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-43-20251208
docker.io/scylladb/scylla-toolchain:fedora-42-20251122

View File

@@ -65,7 +65,7 @@ SCYLLA_BUILD_DIR_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_BUILD_DIR}"
SCYLLA_NINJA_FILE_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_NINJA_FILE}"
# Which LLVM release to build in order to compile Scylla
LLVM_CLANG_TAG=21.1.6
LLVM_CLANG_TAG=20.1.8
CLANG_ARCHIVE=$(cd "${SCYLLA_DIR}" && realpath -m "${CLANG_ARCHIVE}")
@@ -186,3 +186,7 @@ if [[ $? -ne 0 ]]; then
fi
set -e
tar -C / -xpzf "${CLANG_ARCHIVE}"
dnf remove -y clang clang-libs
# above package removal might have removed those symbolic links, which will cause ccache not to work later on. Manually restore them.
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang++

View File

@@ -29,8 +29,11 @@ class counted_data_source_impl : public data_source_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () {
return fun();
}).finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};
@@ -57,8 +60,11 @@ class counted_data_sink_impl : public data_sink_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () mutable {
return fun();
}).finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};

View File

@@ -832,12 +832,6 @@ to_bytes(bytes_view x) {
return bytes(x.begin(), x.size());
}
inline
bytes
to_bytes(std::string_view x) {
return to_bytes(to_bytes_view(x));
}
inline
bytes_opt
to_bytes_opt(bytes_view_opt bv) {

View File

@@ -21,33 +21,14 @@ static logging::logger ulogger("unimplemented");
std::string_view format_as(cause c) {
switch (c) {
case cause::API: return "API";
case cause::INDEXES: return "INDEXES";
case cause::LWT: return "LWT";
case cause::PAGING: return "PAGING";
case cause::AUTH: return "AUTH";
case cause::PERMISSIONS: return "PERMISSIONS";
case cause::TRIGGERS: return "TRIGGERS";
case cause::COUNTERS: return "COUNTERS";
case cause::METRICS: return "METRICS";
case cause::MIGRATIONS: return "MIGRATIONS";
case cause::GOSSIP: return "GOSSIP";
case cause::TOKEN_RESTRICTION: return "TOKEN_RESTRICTION";
case cause::LEGACY_COMPOSITE_KEYS: return "LEGACY_COMPOSITE_KEYS";
case cause::COLLECTION_RANGE_TOMBSTONES: return "COLLECTION_RANGE_TOMBSTONES";
case cause::RANGE_DELETES: return "RANGE_DELETES";
case cause::VALIDATION: return "VALIDATION";
case cause::REVERSED: return "REVERSED";
case cause::COMPRESSION: return "COMPRESSION";
case cause::NONATOMIC: return "NONATOMIC";
case cause::CONSISTENCY: return "CONSISTENCY";
case cause::HINT: return "HINT";
case cause::SUPER: return "SUPER";
case cause::WRAP_AROUND: return "WRAP_AROUND";
case cause::STORAGE_SERVICE: return "STORAGE_SERVICE";
case cause::API: return "API";
case cause::SCHEMA_CHANGE: return "SCHEMA_CHANGE";
case cause::MIXED_CF: return "MIXED_CF";
case cause::SSTABLE_FORMAT_M: return "SSTABLE_FORMAT_M";
}
abort();
}

View File

@@ -15,33 +15,14 @@
namespace unimplemented {
enum class cause {
API,
INDEXES,
LWT,
PAGING,
AUTH,
PERMISSIONS,
TRIGGERS,
COUNTERS,
METRICS,
MIGRATIONS,
GOSSIP,
TOKEN_RESTRICTION,
LEGACY_COMPOSITE_KEYS,
COLLECTION_RANGE_TOMBSTONES,
RANGE_DELETES,
VALIDATION,
REVERSED,
COMPRESSION,
NONATOMIC,
CONSISTENCY,
HINT,
SUPER,
WRAP_AROUND, // Support for handling wrap around ranges in queries on database level and below
STORAGE_SERVICE,
SCHEMA_CHANGE,
MIXED_CF,
SSTABLE_FORMAT_M,
API, // REST API features not implemented (force_user_defined_compaction, split_output in major compaction)
INDEXES, // Secondary index features (filtering on collections, clustering columns)
TRIGGERS, // Trigger support in schema tables and storage proxy
METRICS, // Query processor metrics
VALIDATION, // Schema validation in DDL statements (drop keyspace, truncate, token functions)
REVERSED, // Reversed types in CQL protocol
HINT, // Hint replaying in batchlog manager
SUPER, // Super column families (legacy Cassandra feature, never supported)
};
[[noreturn]] void fail(cause what);

View File

@@ -1,41 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <fmt/format.h>
#include <functional>
/// Type-erased formatter.
/// Allows passing formattable objects without exposing their types.
class abstract_formatter {
std::function<void(fmt::format_context&)> _formatter;
public:
abstract_formatter() = default;
template<typename Func>
requires std::is_invocable_v<Func, fmt::format_context&>
explicit abstract_formatter(Func&& f) : _formatter(std::forward<Func>(f)) {}
fmt::format_context::iterator format_to(fmt::format_context& ctx) const {
if (_formatter) {
_formatter(ctx);
}
return ctx.out();
}
explicit operator bool() const noexcept { return bool(_formatter); }
};
template <> struct fmt::formatter<abstract_formatter> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const abstract_formatter& formatter, fmt::format_context& ctx) const {
return formatter.format_to(ctx);
}
};

View File

@@ -461,9 +461,6 @@ public:
, _metrics(m)
, _lru(l)
, _region(reg)
, _as(abstract_formatter([this] (fmt::format_context& ctx) {
fmt::format_to(ctx.out(), "cached_file {}", _file_name);
}))
, _cache(page_idx_less_comparator())
, _size(size)
{

View File

@@ -204,7 +204,7 @@ public:
public:
template <typename Clock, typename Duration>
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr, std::source_location loc = std::source_location::current()) {
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr) {
if (!_shared_data) {
on_internal_error(errinj_logger, "injection_shared_data is not initialized");
}
@@ -234,8 +234,7 @@ public:
throw;
}
catch (const std::exception& e) {
on_internal_error(errinj_logger, fmt::format("Error injection [{}] wait_for_message timeout: Called from `{}` @ {}:{}:{:d}: {}",
_shared_data->injection_name, loc.function_name(), loc.file_name(), loc.line(), loc.column(), e.what()));
on_internal_error(errinj_logger, "Error injection wait_for_message timeout: " + std::string(e.what()));
}
++_read_messages_counter;
}

Some files were not shown because too many files have changed in this diff Show More