mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 19:35:12 +00:00
Compare commits
171 Commits
ykaul/comp
...
next-2026.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5231c77e8e | ||
|
|
d5efd1f676 | ||
|
|
d14d07a079 | ||
|
|
70261dc674 | ||
|
|
d280517e27 | ||
|
|
7634d3f7d4 | ||
|
|
b49cf6247f | ||
|
|
878f341338 | ||
|
|
67b3ad94a0 | ||
|
|
c97ce32f47 | ||
|
|
3468e8de8b | ||
|
|
3df951bc9c | ||
|
|
eb3326b417 | ||
|
|
e84e7dfb7a | ||
|
|
3aced88586 | ||
|
|
4043d95810 | ||
|
|
cc39b54173 | ||
|
|
183c6d120e | ||
|
|
dffb266b79 | ||
|
|
6a91d046f3 | ||
|
|
74dd33811e | ||
|
|
18ceeaf3ef | ||
|
|
f5eb99f149 | ||
|
|
cddde464ca | ||
|
|
b6cb025e9b | ||
|
|
f5bb9b6282 | ||
|
|
5f93d57d6e | ||
|
|
cf237e060a | ||
|
|
6f7bf30a14 | ||
|
|
74b523ea20 | ||
|
|
cb8253067d | ||
|
|
bcda39f716 | ||
|
|
6165124fcc | ||
|
|
d222e6e2a4 | ||
|
|
cfebe17592 | ||
|
|
935e6a495d | ||
|
|
cd79b99112 | ||
|
|
474e962e01 | ||
|
|
a50aa7e689 | ||
|
|
d18eb9479f | ||
|
|
69c58c6589 | ||
|
|
16ed338a89 | ||
|
|
5687a4840d | ||
|
|
e414b2b0b9 | ||
|
|
99ac36b353 | ||
|
|
c136b2e640 | ||
|
|
724b9e66ea | ||
|
|
9f11920b15 | ||
|
|
a50e6215aa | ||
|
|
6011cb8a4c | ||
|
|
073710a661 | ||
|
|
40740104ab | ||
|
|
ad7647c3c7 | ||
|
|
e5e6608f20 | ||
|
|
34adb0e069 | ||
|
|
d584bd7358 | ||
|
|
b7f86eaabc | ||
|
|
ece9af229d | ||
|
|
72da1207d7 | ||
|
|
b093477cf7 | ||
|
|
a725e39218 | ||
|
|
68c2e292ac | ||
|
|
c42397e995 | ||
|
|
1aafe0708a | ||
|
|
fa6f239cc7 | ||
|
|
25ba3bd649 | ||
|
|
fab90224b3 | ||
|
|
3bd308986a | ||
|
|
db28411548 | ||
|
|
a4608804d8 | ||
|
|
e9b16a11ba | ||
|
|
701366a8d1 | ||
|
|
da438507d0 | ||
|
|
1344278a19 | ||
|
|
14812ea1e0 | ||
|
|
ef005c10ba | ||
|
|
88bd5ea1b7 | ||
|
|
1071c39f17 | ||
|
|
aa6a0ad326 | ||
|
|
d4ff613c0a | ||
|
|
44b18f3399 | ||
|
|
b0c5eed384 | ||
|
|
afd68187ea | ||
|
|
440d9f2d82 | ||
|
|
e0eb3bde8d | ||
|
|
6892642176 | ||
|
|
ed5dd645e8 | ||
|
|
bfd1302311 | ||
|
|
736011b663 | ||
|
|
8faf62a1aa | ||
|
|
a28689a99a | ||
|
|
370f3fd2e8 | ||
|
|
92a43557dc | ||
|
|
694c1aed98 | ||
|
|
35f14544dc | ||
|
|
1965741914 | ||
|
|
1d631f7bac | ||
|
|
24cd98e454 | ||
|
|
be3239fc58 | ||
|
|
8990346c75 | ||
|
|
fa130051a6 | ||
|
|
63f9362c89 | ||
|
|
9cbb1b851e | ||
|
|
c1fc596203 | ||
|
|
b26e6f7330 | ||
|
|
c6f6e81fe5 | ||
|
|
e0445269e5 | ||
|
|
e42ad62561 | ||
|
|
96e8414963 | ||
|
|
135809d97b | ||
|
|
0a16d90acb | ||
|
|
56ae02d8a3 | ||
|
|
2c75123bbd | ||
|
|
e646b763e7 | ||
|
|
ea26186043 | ||
|
|
c60e3d5cf7 | ||
|
|
b520e74128 | ||
|
|
c4ab0ddb85 | ||
|
|
201ed53837 | ||
|
|
325497d460 | ||
|
|
dcdd2f7e72 | ||
|
|
1039ed9ed2 | ||
|
|
620df7103f | ||
|
|
6fce090e30 | ||
|
|
941011bb4a | ||
|
|
c73f3ac55f | ||
|
|
531f137ed3 | ||
|
|
fcf7c4c90d | ||
|
|
926886fcfb | ||
|
|
eec0b20dbc | ||
|
|
374be94faa | ||
|
|
dce0c24a02 | ||
|
|
b078cd1e72 | ||
|
|
9c4d3ce097 | ||
|
|
0b6b380b80 | ||
|
|
b10028e556 | ||
|
|
8a80e2c3be | ||
|
|
fb0974a329 | ||
|
|
a39fb9d29a | ||
|
|
638efedc3c | ||
|
|
465636bc53 | ||
|
|
0d05e3b4a4 | ||
|
|
930fb4c330 | ||
|
|
02d474fca8 | ||
|
|
68b783103e | ||
|
|
1ac910c2ab | ||
|
|
218f8adc8f | ||
|
|
4988077249 | ||
|
|
b4c0ad20cf | ||
|
|
88c55cf7ed | ||
|
|
2c0de7d9b3 | ||
|
|
1b2b453782 | ||
|
|
38bad5f316 | ||
|
|
1bafc8394c | ||
|
|
8fb91e245f | ||
|
|
89a17491db | ||
|
|
bcdab2e012 | ||
|
|
d41c5a7db4 | ||
|
|
dd83666733 | ||
|
|
72bb3113ac | ||
|
|
751af38f2a | ||
|
|
7cdf7d62a2 | ||
|
|
4657d9e32c | ||
|
|
61877e9dfb | ||
|
|
8d34127684 | ||
|
|
159675e975 | ||
|
|
d1a24aa16a | ||
|
|
9c82b76755 | ||
|
|
3726e31c03 | ||
|
|
8a16746e55 | ||
|
|
82460e7a38 |
4
.github/CODEOWNERS
vendored
4
.github/CODEOWNERS
vendored
@@ -32,8 +32,8 @@ counters* @nuivall
|
||||
tests/counter_test* @nuivall
|
||||
|
||||
# DOCS
|
||||
docs/* @annastuchlik @tzach
|
||||
docs/alternator @annastuchlik @tzach @nyh
|
||||
/docs/ @annastuchlik @tzach
|
||||
/docs/alternator/ @annastuchlik @tzach @nyh
|
||||
|
||||
# GOSSIP
|
||||
gms/* @tgrabiec @asias @kbr-scylla
|
||||
|
||||
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2026.2.0-dev
|
||||
VERSION=2026.2.0-rc0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1892,7 +1892,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
}
|
||||
if (vector_index_updates->Size() > 1) {
|
||||
// VectorIndexUpdates mirrors GlobalSecondaryIndexUpdates.
|
||||
// Since DynamoDB artifically limits the latter to just a
|
||||
// Since DynamoDB artificially limits the latter to just a
|
||||
// single operation (one Create or one Delete), we also
|
||||
// place the same artificial limit on VectorIndexUpdates,
|
||||
// and throw the same LimitExceeded error if the client
|
||||
|
||||
@@ -1354,7 +1354,7 @@ static future<executor::request_return_type> query_vector(
|
||||
std::unordered_set<std::string> used_attribute_values;
|
||||
// Parse the Select parameter and determine which attributes to return.
|
||||
// For a vector index, the default Select is ALL_ATTRIBUTES (full items).
|
||||
// ALL_PROJECTED_ATTRIBUTES is significantly more efficent because it
|
||||
// ALL_PROJECTED_ATTRIBUTES is significantly more efficient because it
|
||||
// returns what the vector store returned without looking up additional
|
||||
// base-table data. Currently only the primary key attributes are projected
|
||||
// but in the future we'll implement projecting additional attributes into
|
||||
|
||||
@@ -167,46 +167,8 @@ static schema_ptr get_schema_from_arn(service::storage_proxy& proxy, const strea
|
||||
}
|
||||
}
|
||||
|
||||
// ShardId. Must be between 28 and 65 characters inclusive.
|
||||
// UUID is 36 bytes as string (including dashes).
|
||||
// Prepend a version/type marker (`S`) -> 37
|
||||
class stream_shard_id : public utils::UUID {
|
||||
public:
|
||||
using UUID = utils::UUID;
|
||||
static constexpr char marker = 'S';
|
||||
|
||||
stream_shard_id() = default;
|
||||
stream_shard_id(const UUID& uuid)
|
||||
: UUID(uuid)
|
||||
{}
|
||||
stream_shard_id(const table_id& tid)
|
||||
: UUID(tid.uuid())
|
||||
{}
|
||||
stream_shard_id(std::string_view v)
|
||||
: UUID(v.substr(1))
|
||||
{
|
||||
if (v[0] != marker) {
|
||||
throw std::invalid_argument(std::string(v));
|
||||
}
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const stream_shard_id& arn) {
|
||||
const UUID& uuid = arn;
|
||||
return os << marker << uuid;
|
||||
}
|
||||
friend std::istream& operator>>(std::istream& is, stream_shard_id& arn) {
|
||||
std::string s;
|
||||
is >> s;
|
||||
arn = stream_shard_id(s);
|
||||
return is;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace alternator
|
||||
|
||||
template<typename ValueType>
|
||||
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_shard_id>
|
||||
: public from_string_helper<ValueType, alternator::stream_shard_id>
|
||||
{};
|
||||
template<typename ValueType>
|
||||
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_arn>
|
||||
: public from_string_helper<ValueType, alternator::stream_arn>
|
||||
@@ -218,7 +180,8 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
_stats.api_operations.list_streams++;
|
||||
|
||||
auto limit = rjson::get_opt<int>(request, "Limit").value_or(100);
|
||||
auto streams_start = rjson::get_opt<stream_shard_id>(request, "ExclusiveStartStreamArn");
|
||||
auto streams_start = rjson::get_opt<stream_arn>(request, "ExclusiveStartStreamArn");
|
||||
|
||||
auto table = find_table(_proxy, request);
|
||||
auto db = _proxy.data_dictionary();
|
||||
|
||||
@@ -244,34 +207,34 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
cfs = db.get_tables();
|
||||
}
|
||||
|
||||
// # 12601 (maybe?) - sort the set of tables on ID. This should ensure we never
|
||||
// generate duplicates in a paged listing here. Can obviously miss things if they
|
||||
// are added between paged calls and end up with a "smaller" UUID/ARN, but that
|
||||
// is to be expected.
|
||||
// We need to sort the tables to ensure a stable order for paging.
|
||||
// We sort by keyspace and table name, which will also allow us to skip to
|
||||
// the right position by ExclusiveStartStreamArn.
|
||||
auto cmp = [](std::string_view ks1, std::string_view cf1, std::string_view ks2, std::string_view cf2) {
|
||||
return ks1 == ks2 ? cf1 < cf2 : ks1 < ks2;
|
||||
};
|
||||
if (std::cmp_less(limit, cfs.size()) || streams_start) {
|
||||
std::sort(cfs.begin(), cfs.end(), [](const data_dictionary::table& t1, const data_dictionary::table& t2) {
|
||||
return t1.schema()->id().uuid() < t2.schema()->id().uuid();
|
||||
});
|
||||
std::sort(cfs.begin(), cfs.end(),
|
||||
[&cmp](const data_dictionary::table& t1, const data_dictionary::table& t2) {
|
||||
return cmp(t1.schema()->ks_name(), t1.schema()->cf_name(),
|
||||
t2.schema()->ks_name(), t2.schema()->cf_name());
|
||||
});
|
||||
}
|
||||
|
||||
auto i = cfs.begin();
|
||||
auto e = cfs.end();
|
||||
|
||||
if (streams_start) {
|
||||
i = std::find_if(i, e, [&](const data_dictionary::table& t) {
|
||||
return t.schema()->id().uuid() == streams_start
|
||||
&& cdc::get_base_table(db.real_database(), *t.schema())
|
||||
&& is_alternator_keyspace(t.schema()->ks_name())
|
||||
;
|
||||
});
|
||||
if (i != e) {
|
||||
++i;
|
||||
}
|
||||
i = std::upper_bound(i, e, *streams_start,
|
||||
[&cmp](const stream_arn& arn, const data_dictionary::table& t) {
|
||||
return cmp(arn.keyspace_name(), arn.table_name(),
|
||||
t.schema()->ks_name(), t.schema()->cf_name());
|
||||
});
|
||||
}
|
||||
|
||||
auto ret = rjson::empty_object();
|
||||
auto streams = rjson::empty_array();
|
||||
std::optional<stream_shard_id> last;
|
||||
std::optional<stream_arn> last;
|
||||
|
||||
for (;limit > 0 && i != e; ++i) {
|
||||
auto s = i->schema();
|
||||
@@ -282,19 +245,24 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
}
|
||||
if (cdc::is_log_for_some_table(db.real_database(), ks_name, cf_name)) {
|
||||
rjson::value new_entry = rjson::empty_object();
|
||||
last = i->schema()->id();
|
||||
|
||||
auto arn = stream_arn{ i->schema(), cdc::get_base_table(db.real_database(), *i->schema()) };
|
||||
rjson::add(new_entry, "StreamArn", arn);
|
||||
rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s)));
|
||||
rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(s->cf_name())));
|
||||
rjson::push_back(streams, std::move(new_entry));
|
||||
last = std::move(arn);
|
||||
--limit;
|
||||
}
|
||||
}
|
||||
|
||||
rjson::add(ret, "Streams", std::move(streams));
|
||||
|
||||
if (last) {
|
||||
// Only emit LastEvaluatedStreamArn when we stopped because we hit the
|
||||
// limit (limit == 0), meaning there may be more streams to list.
|
||||
// If we exhausted all tables naturally (limit > 0), there are no more
|
||||
// streams, so we must not emit a cookie.
|
||||
if (last && limit == 0) {
|
||||
rjson::add(ret, "LastEvaluatedStreamArn", *last);
|
||||
}
|
||||
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
||||
@@ -614,7 +582,7 @@ void stream_id_range::prepare_for_iterating()
|
||||
// the function returns `stream_id_range` that will allow iteration over children Streams shards for the Streams shard `parent`
|
||||
// a child Streams shard is defined as a Streams shard that touches token range that was previously covered by `parent` Streams shard
|
||||
// Streams shard contains a token, that represents end of the token range for that Streams shard (inclusive)
|
||||
// begginning of the token range is defined by previous Streams shard's token + 1
|
||||
// beginning of the token range is defined by previous Streams shard's token + 1
|
||||
// NOTE: With vnodes, ranges of Streams' shards wrap, while with tablets the biggest allowed token number is always a range end.
|
||||
// NOTE: both streams generation are guaranteed to cover whole range and be non-empty
|
||||
// NOTE: it's possible to get more than one stream shard with the same token value (thus some of those stream shards will be empty) -
|
||||
|
||||
@@ -856,7 +856,9 @@ rest_exclude_node(sharded<service::storage_service>& ss, std::unique_ptr<http::r
|
||||
}
|
||||
|
||||
apilog.info("exclude_node: hosts={}", hosts);
|
||||
co_await ss.local().mark_excluded(hosts);
|
||||
co_await ss.local().run_with_no_api_lock([hosts = std::move(hosts)] (service::storage_service& ss) {
|
||||
return ss.mark_excluded(hosts);
|
||||
});
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
@@ -1731,7 +1733,9 @@ rest_create_vnode_tablet_migration(http_context& ctx, sharded<service::storage_s
|
||||
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
|
||||
}
|
||||
auto keyspace = validate_keyspace(ctx, req);
|
||||
co_await ss.local().prepare_for_tablets_migration(keyspace);
|
||||
co_await ss.local().run_with_no_api_lock([keyspace] (service::storage_service& ss) {
|
||||
return ss.prepare_for_tablets_migration(keyspace);
|
||||
});
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
@@ -1743,7 +1747,9 @@ rest_get_vnode_tablet_migration(http_context& ctx, sharded<service::storage_serv
|
||||
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
|
||||
}
|
||||
auto keyspace = validate_keyspace(ctx, req);
|
||||
auto status = co_await ss.local().get_tablets_migration_status_with_node_details(keyspace);
|
||||
auto status = co_await ss.local().run_with_no_api_lock([keyspace] (service::storage_service& ss) {
|
||||
return ss.get_tablets_migration_status_with_node_details(keyspace);
|
||||
});
|
||||
|
||||
ss::vnode_tablet_migration_status result;
|
||||
result.keyspace = status.keyspace;
|
||||
@@ -1768,7 +1774,9 @@ rest_set_vnode_tablet_migration_node_storage_mode(http_context& ctx, sharded<ser
|
||||
}
|
||||
auto mode_str = req->get_query_param("intended_mode");
|
||||
auto mode = service::intended_storage_mode_from_string(mode_str);
|
||||
co_await ss.local().set_node_intended_storage_mode(mode);
|
||||
co_await ss.local().run_with_no_api_lock([mode] (service::storage_service& ss) {
|
||||
return ss.set_node_intended_storage_mode(mode);
|
||||
});
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
@@ -1782,7 +1790,9 @@ rest_finalize_vnode_tablet_migration(http_context& ctx, sharded<service::storage
|
||||
auto keyspace = validate_keyspace(ctx, req);
|
||||
validate_keyspace(ctx, keyspace);
|
||||
|
||||
co_await ss.local().finalize_tablets_migration(keyspace);
|
||||
co_await ss.local().run_with_no_api_lock([keyspace] (service::storage_service& ss) {
|
||||
return ss.finalize_tablets_migration(keyspace);
|
||||
});
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
@@ -1859,90 +1869,106 @@ rest_bind(FuncType func, BindArgs&... args) {
|
||||
return std::bind_front(func, std::ref(args)...);
|
||||
}
|
||||
|
||||
// Hold the storage_service async gate for the duration of async REST
|
||||
// handlers so stop() drains in-flight requests before teardown.
|
||||
// Synchronous handlers don't yield and need no gate.
|
||||
static seastar::httpd::future_json_function
|
||||
gated(sharded<service::storage_service>& ss, seastar::httpd::future_json_function fn) {
|
||||
return [fn = std::move(fn), &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto holder = ss.local().hold_async_gate();
|
||||
co_return co_await fn(std::move(req));
|
||||
};
|
||||
}
|
||||
|
||||
static seastar::httpd::json_request_function
|
||||
gated(sharded<service::storage_service>&, seastar::httpd::json_request_function fn) {
|
||||
return fn;
|
||||
}
|
||||
|
||||
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
|
||||
ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss));
|
||||
ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss));
|
||||
ss::get_scylla_release_version.set(r, rest_bind(rest_get_scylla_release_version, ss));
|
||||
ss::get_schema_version.set(r, rest_bind(rest_get_schema_version, ss));
|
||||
ss::get_range_to_endpoint_map.set(r, rest_bind(rest_get_range_to_endpoint_map, ctx, ss));
|
||||
ss::get_pending_range_to_endpoint_map.set(r, rest_bind(rest_get_pending_range_to_endpoint_map, ctx));
|
||||
ss::describe_ring.set(r, rest_bind(rest_describe_ring, ctx, ss));
|
||||
ss::get_current_generation_number.set(r, rest_bind(rest_get_current_generation_number, ss));
|
||||
ss::get_natural_endpoints.set(r, rest_bind(rest_get_natural_endpoints, ctx, ss));
|
||||
ss::get_natural_endpoints_v2.set(r, rest_bind(rest_get_natural_endpoints_v2, ctx, ss));
|
||||
ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss));
|
||||
ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss));
|
||||
ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
|
||||
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
|
||||
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
|
||||
ss::decommission.set(r, rest_bind(rest_decommission, ss, ssc));
|
||||
ss::logstor_compaction.set(r, rest_bind(rest_logstor_compaction, ctx));
|
||||
ss::logstor_flush.set(r, rest_bind(rest_logstor_flush, ctx));
|
||||
ss::move.set(r, rest_bind(rest_move, ss));
|
||||
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
|
||||
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
|
||||
ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss));
|
||||
ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss));
|
||||
ss::set_logging_level.set(r, rest_bind(rest_set_logging_level));
|
||||
ss::get_logging_levels.set(r, rest_bind(rest_get_logging_levels));
|
||||
ss::get_operation_mode.set(r, rest_bind(rest_get_operation_mode, ss));
|
||||
ss::is_starting.set(r, rest_bind(rest_is_starting, ss));
|
||||
ss::get_drain_progress.set(r, rest_bind(rest_get_drain_progress, ss));
|
||||
ss::drain.set(r, rest_bind(rest_drain, ss));
|
||||
ss::stop_gossiping.set(r, rest_bind(rest_stop_gossiping, ss));
|
||||
ss::start_gossiping.set(r, rest_bind(rest_start_gossiping, ss));
|
||||
ss::is_gossip_running.set(r, rest_bind(rest_is_gossip_running, ss));
|
||||
ss::stop_daemon.set(r, rest_bind(rest_stop_daemon));
|
||||
ss::is_initialized.set(r, rest_bind(rest_is_initialized, ss));
|
||||
ss::join_ring.set(r, rest_bind(rest_join_ring));
|
||||
ss::is_joined.set(r, rest_bind(rest_is_joined, ss));
|
||||
ss::is_incremental_backups_enabled.set(r, rest_bind(rest_is_incremental_backups_enabled, ctx));
|
||||
ss::set_incremental_backups_enabled.set(r, rest_bind(rest_set_incremental_backups_enabled, ctx));
|
||||
ss::rebuild.set(r, rest_bind(rest_rebuild, ss));
|
||||
ss::bulk_load.set(r, rest_bind(rest_bulk_load));
|
||||
ss::bulk_load_async.set(r, rest_bind(rest_bulk_load_async));
|
||||
ss::reschedule_failed_deletions.set(r, rest_bind(rest_reschedule_failed_deletions));
|
||||
ss::sample_key_range.set(r, rest_bind(rest_sample_key_range));
|
||||
ss::reset_local_schema.set(r, rest_bind(rest_reset_local_schema, ss));
|
||||
ss::set_trace_probability.set(r, rest_bind(rest_set_trace_probability));
|
||||
ss::get_trace_probability.set(r, rest_bind(rest_get_trace_probability));
|
||||
ss::get_slow_query_info.set(r, rest_bind(rest_get_slow_query_info));
|
||||
ss::set_slow_query.set(r, rest_bind(rest_set_slow_query));
|
||||
ss::deliver_hints.set(r, rest_bind(rest_deliver_hints));
|
||||
ss::get_cluster_name.set(r, rest_bind(rest_get_cluster_name, ss));
|
||||
ss::get_partitioner_name.set(r, rest_bind(rest_get_partitioner_name, ss));
|
||||
ss::get_tombstone_warn_threshold.set(r, rest_bind(rest_get_tombstone_warn_threshold));
|
||||
ss::set_tombstone_warn_threshold.set(r, rest_bind(rest_set_tombstone_warn_threshold));
|
||||
ss::get_tombstone_failure_threshold.set(r, rest_bind(rest_get_tombstone_failure_threshold));
|
||||
ss::set_tombstone_failure_threshold.set(r, rest_bind(rest_set_tombstone_failure_threshold));
|
||||
ss::get_batch_size_failure_threshold.set(r, rest_bind(rest_get_batch_size_failure_threshold));
|
||||
ss::set_batch_size_failure_threshold.set(r, rest_bind(rest_set_batch_size_failure_threshold));
|
||||
ss::set_hinted_handoff_throttle_in_kb.set(r, rest_bind(rest_set_hinted_handoff_throttle_in_kb));
|
||||
ss::get_exceptions.set(r, rest_bind(rest_get_exceptions, ss));
|
||||
ss::get_total_hints_in_progress.set(r, rest_bind(rest_get_total_hints_in_progress));
|
||||
ss::get_total_hints.set(r, rest_bind(rest_get_total_hints));
|
||||
ss::get_ownership.set(r, rest_bind(rest_get_ownership, ctx, ss));
|
||||
ss::get_effective_ownership.set(r, rest_bind(rest_get_effective_ownership, ctx, ss));
|
||||
ss::retrain_dict.set(r, rest_bind(rest_retrain_dict, ctx, ss, group0_client));
|
||||
ss::estimate_compression_ratios.set(r, rest_bind(rest_estimate_compression_ratios, ctx, ss));
|
||||
ss::sstable_info.set(r, rest_bind(rest_sstable_info, ctx));
|
||||
ss::logstor_info.set(r, rest_bind(rest_logstor_info, ctx));
|
||||
ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client));
|
||||
ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss));
|
||||
ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss));
|
||||
ss::raft_topology_get_cmd_status.set(r, rest_bind(rest_raft_topology_get_cmd_status, ss));
|
||||
ss::move_tablet.set(r, rest_bind(rest_move_tablet, ctx, ss));
|
||||
ss::add_tablet_replica.set(r, rest_bind(rest_add_tablet_replica, ctx, ss));
|
||||
ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss));
|
||||
ss::repair_tablet.set(r, rest_bind(rest_repair_tablet, ctx, ss));
|
||||
ss::tablet_balancing_enable.set(r, rest_bind(rest_tablet_balancing_enable, ss));
|
||||
ss::create_vnode_tablet_migration.set(r, rest_bind(rest_create_vnode_tablet_migration, ctx, ss));
|
||||
ss::get_vnode_tablet_migration.set(r, rest_bind(rest_get_vnode_tablet_migration, ctx, ss));
|
||||
ss::set_vnode_tablet_migration_node_storage_mode.set(r, rest_bind(rest_set_vnode_tablet_migration_node_storage_mode, ctx, ss));
|
||||
ss::finalize_vnode_tablet_migration.set(r, rest_bind(rest_finalize_vnode_tablet_migration, ctx, ss));
|
||||
ss::quiesce_topology.set(r, rest_bind(rest_quiesce_topology, ss));
|
||||
sp::get_schema_versions.set(r, rest_bind(rest_get_schema_versions, ss));
|
||||
ss::drop_quarantined_sstables.set(r, rest_bind(rest_drop_quarantined_sstables, ctx, ss));
|
||||
ss::get_token_endpoint.set(r, gated(ss, rest_bind(rest_get_token_endpoint, ctx, ss)));
|
||||
ss::get_release_version.set(r, gated(ss, rest_bind(rest_get_release_version, ss)));
|
||||
ss::get_scylla_release_version.set(r, gated(ss, rest_bind(rest_get_scylla_release_version, ss)));
|
||||
ss::get_schema_version.set(r, gated(ss, rest_bind(rest_get_schema_version, ss)));
|
||||
ss::get_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_range_to_endpoint_map, ctx, ss)));
|
||||
ss::get_pending_range_to_endpoint_map.set(r, gated(ss, rest_bind(rest_get_pending_range_to_endpoint_map, ctx)));
|
||||
ss::describe_ring.set(r, gated(ss, rest_bind(rest_describe_ring, ctx, ss)));
|
||||
ss::get_current_generation_number.set(r, gated(ss, rest_bind(rest_get_current_generation_number, ss)));
|
||||
ss::get_natural_endpoints.set(r, gated(ss, rest_bind(rest_get_natural_endpoints, ctx, ss)));
|
||||
ss::get_natural_endpoints_v2.set(r, gated(ss, rest_bind(rest_get_natural_endpoints_v2, ctx, ss)));
|
||||
ss::cdc_streams_check_and_repair.set(r, gated(ss, rest_bind(rest_cdc_streams_check_and_repair, ss)));
|
||||
ss::cleanup_all.set(r, gated(ss, rest_bind(rest_cleanup_all, ctx, ss)));
|
||||
ss::reset_cleanup_needed.set(r, gated(ss, rest_bind(rest_reset_cleanup_needed, ctx, ss)));
|
||||
ss::force_flush.set(r, gated(ss, rest_bind(rest_force_flush, ctx)));
|
||||
ss::force_keyspace_flush.set(r, gated(ss, rest_bind(rest_force_keyspace_flush, ctx)));
|
||||
ss::decommission.set(r, gated(ss, rest_bind(rest_decommission, ss, ssc)));
|
||||
ss::logstor_compaction.set(r, gated(ss, rest_bind(rest_logstor_compaction, ctx)));
|
||||
ss::logstor_flush.set(r, gated(ss, rest_bind(rest_logstor_flush, ctx)));
|
||||
ss::move.set(r, gated(ss, rest_bind(rest_move, ss)));
|
||||
ss::remove_node.set(r, gated(ss, rest_bind(rest_remove_node, ss)));
|
||||
ss::exclude_node.set(r, gated(ss, rest_bind(rest_exclude_node, ss)));
|
||||
ss::get_removal_status.set(r, gated(ss, rest_bind(rest_get_removal_status, ss)));
|
||||
ss::force_remove_completion.set(r, gated(ss, rest_bind(rest_force_remove_completion, ss)));
|
||||
ss::set_logging_level.set(r, gated(ss, rest_bind(rest_set_logging_level)));
|
||||
ss::get_logging_levels.set(r, gated(ss, rest_bind(rest_get_logging_levels)));
|
||||
ss::get_operation_mode.set(r, gated(ss, rest_bind(rest_get_operation_mode, ss)));
|
||||
ss::is_starting.set(r, gated(ss, rest_bind(rest_is_starting, ss)));
|
||||
ss::get_drain_progress.set(r, gated(ss, rest_bind(rest_get_drain_progress, ss)));
|
||||
ss::drain.set(r, gated(ss, rest_bind(rest_drain, ss)));
|
||||
ss::stop_gossiping.set(r, gated(ss, rest_bind(rest_stop_gossiping, ss)));
|
||||
ss::start_gossiping.set(r, gated(ss, rest_bind(rest_start_gossiping, ss)));
|
||||
ss::is_gossip_running.set(r, gated(ss, rest_bind(rest_is_gossip_running, ss)));
|
||||
ss::stop_daemon.set(r, gated(ss, rest_bind(rest_stop_daemon)));
|
||||
ss::is_initialized.set(r, gated(ss, rest_bind(rest_is_initialized, ss)));
|
||||
ss::join_ring.set(r, gated(ss, rest_bind(rest_join_ring)));
|
||||
ss::is_joined.set(r, gated(ss, rest_bind(rest_is_joined, ss)));
|
||||
ss::is_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_is_incremental_backups_enabled, ctx)));
|
||||
ss::set_incremental_backups_enabled.set(r, gated(ss, rest_bind(rest_set_incremental_backups_enabled, ctx)));
|
||||
ss::rebuild.set(r, gated(ss, rest_bind(rest_rebuild, ss)));
|
||||
ss::bulk_load.set(r, gated(ss, rest_bind(rest_bulk_load)));
|
||||
ss::bulk_load_async.set(r, gated(ss, rest_bind(rest_bulk_load_async)));
|
||||
ss::reschedule_failed_deletions.set(r, gated(ss, rest_bind(rest_reschedule_failed_deletions)));
|
||||
ss::sample_key_range.set(r, gated(ss, rest_bind(rest_sample_key_range)));
|
||||
ss::reset_local_schema.set(r, gated(ss, rest_bind(rest_reset_local_schema, ss)));
|
||||
ss::set_trace_probability.set(r, gated(ss, rest_bind(rest_set_trace_probability)));
|
||||
ss::get_trace_probability.set(r, gated(ss, rest_bind(rest_get_trace_probability)));
|
||||
ss::get_slow_query_info.set(r, gated(ss, rest_bind(rest_get_slow_query_info)));
|
||||
ss::set_slow_query.set(r, gated(ss, rest_bind(rest_set_slow_query)));
|
||||
ss::deliver_hints.set(r, gated(ss, rest_bind(rest_deliver_hints)));
|
||||
ss::get_cluster_name.set(r, gated(ss, rest_bind(rest_get_cluster_name, ss)));
|
||||
ss::get_partitioner_name.set(r, gated(ss, rest_bind(rest_get_partitioner_name, ss)));
|
||||
ss::get_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_warn_threshold)));
|
||||
ss::set_tombstone_warn_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_warn_threshold)));
|
||||
ss::get_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_get_tombstone_failure_threshold)));
|
||||
ss::set_tombstone_failure_threshold.set(r, gated(ss, rest_bind(rest_set_tombstone_failure_threshold)));
|
||||
ss::get_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_get_batch_size_failure_threshold)));
|
||||
ss::set_batch_size_failure_threshold.set(r, gated(ss, rest_bind(rest_set_batch_size_failure_threshold)));
|
||||
ss::set_hinted_handoff_throttle_in_kb.set(r, gated(ss, rest_bind(rest_set_hinted_handoff_throttle_in_kb)));
|
||||
ss::get_exceptions.set(r, gated(ss, rest_bind(rest_get_exceptions, ss)));
|
||||
ss::get_total_hints_in_progress.set(r, gated(ss, rest_bind(rest_get_total_hints_in_progress)));
|
||||
ss::get_total_hints.set(r, gated(ss, rest_bind(rest_get_total_hints)));
|
||||
ss::get_ownership.set(r, gated(ss, rest_bind(rest_get_ownership, ctx, ss)));
|
||||
ss::get_effective_ownership.set(r, gated(ss, rest_bind(rest_get_effective_ownership, ctx, ss)));
|
||||
ss::retrain_dict.set(r, gated(ss, rest_bind(rest_retrain_dict, ctx, ss, group0_client)));
|
||||
ss::estimate_compression_ratios.set(r, gated(ss, rest_bind(rest_estimate_compression_ratios, ctx, ss)));
|
||||
ss::sstable_info.set(r, gated(ss, rest_bind(rest_sstable_info, ctx)));
|
||||
ss::logstor_info.set(r, gated(ss, rest_bind(rest_logstor_info, ctx)));
|
||||
ss::reload_raft_topology_state.set(r, gated(ss, rest_bind(rest_reload_raft_topology_state, ss, group0_client)));
|
||||
ss::upgrade_to_raft_topology.set(r, gated(ss, rest_bind(rest_upgrade_to_raft_topology, ss)));
|
||||
ss::raft_topology_upgrade_status.set(r, gated(ss, rest_bind(rest_raft_topology_upgrade_status, ss)));
|
||||
ss::raft_topology_get_cmd_status.set(r, gated(ss, rest_bind(rest_raft_topology_get_cmd_status, ss)));
|
||||
ss::move_tablet.set(r, gated(ss, rest_bind(rest_move_tablet, ctx, ss)));
|
||||
ss::add_tablet_replica.set(r, gated(ss, rest_bind(rest_add_tablet_replica, ctx, ss)));
|
||||
ss::del_tablet_replica.set(r, gated(ss, rest_bind(rest_del_tablet_replica, ctx, ss)));
|
||||
ss::repair_tablet.set(r, gated(ss, rest_bind(rest_repair_tablet, ctx, ss)));
|
||||
ss::tablet_balancing_enable.set(r, gated(ss, rest_bind(rest_tablet_balancing_enable, ss)));
|
||||
ss::create_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_create_vnode_tablet_migration, ctx, ss)));
|
||||
ss::get_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_get_vnode_tablet_migration, ctx, ss)));
|
||||
ss::set_vnode_tablet_migration_node_storage_mode.set(r, gated(ss, rest_bind(rest_set_vnode_tablet_migration_node_storage_mode, ctx, ss)));
|
||||
ss::finalize_vnode_tablet_migration.set(r, gated(ss, rest_bind(rest_finalize_vnode_tablet_migration, ctx, ss)));
|
||||
ss::quiesce_topology.set(r, gated(ss, rest_bind(rest_quiesce_topology, ss)));
|
||||
sp::get_schema_versions.set(r, gated(ss, rest_bind(rest_get_schema_versions, ss)));
|
||||
ss::drop_quarantined_sstables.set(r, gated(ss, rest_bind(rest_drop_quarantined_sstables, ctx, ss)));
|
||||
}
|
||||
|
||||
void unset_storage_service(http_context& ctx, routes& r) {
|
||||
|
||||
@@ -113,8 +113,8 @@ static category_set parse_audit_categories(const sstring& data) {
|
||||
return result;
|
||||
}
|
||||
|
||||
static std::map<sstring, std::set<sstring>> parse_audit_tables(const sstring& data) {
|
||||
std::map<sstring, std::set<sstring>> result;
|
||||
static audit::audited_tables_t parse_audit_tables(const sstring& data) {
|
||||
audit::audited_tables_t result;
|
||||
if (!data.empty()) {
|
||||
std::vector<sstring> tokens;
|
||||
boost::split(tokens, data, boost::is_any_of(","));
|
||||
@@ -139,8 +139,8 @@ static std::map<sstring, std::set<sstring>> parse_audit_tables(const sstring& da
|
||||
return result;
|
||||
}
|
||||
|
||||
static std::set<sstring> parse_audit_keyspaces(const sstring& data) {
|
||||
std::set<sstring> result;
|
||||
static audit::audited_keyspaces_t parse_audit_keyspaces(const sstring& data) {
|
||||
audit::audited_keyspaces_t result;
|
||||
if (!data.empty()) {
|
||||
std::vector<sstring> tokens;
|
||||
boost::split(tokens, data, boost::is_any_of(","));
|
||||
@@ -156,8 +156,8 @@ audit::audit(locator::shared_token_metadata& token_metadata,
|
||||
cql3::query_processor& qp,
|
||||
service::migration_manager& mm,
|
||||
std::set<sstring>&& audit_modes,
|
||||
std::set<sstring>&& audited_keyspaces,
|
||||
std::map<sstring, std::set<sstring>>&& audited_tables,
|
||||
audited_keyspaces_t&& audited_keyspaces,
|
||||
audited_tables_t&& audited_tables,
|
||||
category_set&& audited_categories,
|
||||
const db::config& cfg)
|
||||
: _token_metadata(token_metadata)
|
||||
@@ -165,8 +165,8 @@ audit::audit(locator::shared_token_metadata& token_metadata,
|
||||
, _audited_tables(std::move(audited_tables))
|
||||
, _audited_categories(std::move(audited_categories))
|
||||
, _cfg(cfg)
|
||||
, _cfg_keyspaces_observer(cfg.audit_keyspaces.observe([this] (sstring const& new_value){ update_config<std::set<sstring>>(new_value, parse_audit_keyspaces, _audited_keyspaces); }))
|
||||
, _cfg_tables_observer(cfg.audit_tables.observe([this] (sstring const& new_value){ update_config<std::map<sstring, std::set<sstring>>>(new_value, parse_audit_tables, _audited_tables); }))
|
||||
, _cfg_keyspaces_observer(cfg.audit_keyspaces.observe([this] (sstring const& new_value){ update_config<audited_keyspaces_t>(new_value, parse_audit_keyspaces, _audited_keyspaces); }))
|
||||
, _cfg_tables_observer(cfg.audit_tables.observe([this] (sstring const& new_value){ update_config<audited_tables_t>(new_value, parse_audit_tables, _audited_tables); }))
|
||||
, _cfg_categories_observer(cfg.audit_categories.observe([this] (sstring const& new_value){ update_config<category_set>(new_value, parse_audit_categories, _audited_categories); }))
|
||||
{
|
||||
_storage_helper_ptr = create_storage_helper(std::move(audit_modes), qp, mm);
|
||||
@@ -181,8 +181,8 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
|
||||
return make_ready_future<>();
|
||||
}
|
||||
category_set audited_categories = parse_audit_categories(cfg.audit_categories());
|
||||
std::map<sstring, std::set<sstring>> audited_tables = parse_audit_tables(cfg.audit_tables());
|
||||
std::set<sstring> audited_keyspaces = parse_audit_keyspaces(cfg.audit_keyspaces());
|
||||
audit::audited_tables_t audited_tables = parse_audit_tables(cfg.audit_tables());
|
||||
audit::audited_keyspaces_t audited_keyspaces = parse_audit_keyspaces(cfg.audit_keyspaces());
|
||||
|
||||
logger.info("Audit is enabled. Auditing to: \"{}\", with the following categories: \"{}\", keyspaces: \"{}\", and tables: \"{}\"",
|
||||
cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables());
|
||||
@@ -304,7 +304,7 @@ future<> inspect_login(const sstring& username, socket_address client_ip, bool e
|
||||
return audit::local_audit_instance().log_login(username, client_ip, error);
|
||||
}
|
||||
|
||||
bool audit::should_log_table(const sstring& keyspace, const sstring& name) const {
|
||||
bool audit::should_log_table(std::string_view keyspace, std::string_view name) const {
|
||||
auto keyspace_it = _audited_tables.find(keyspace);
|
||||
return keyspace_it != _audited_tables.cend() && keyspace_it->second.find(name) != keyspace_it->second.cend();
|
||||
}
|
||||
@@ -319,8 +319,8 @@ bool audit::will_log(statement_category cat, std::string_view keyspace, std::str
|
||||
// so it is logged whenever the category matches.
|
||||
return _audited_categories.contains(cat)
|
||||
&& (keyspace.empty()
|
||||
|| _audited_keyspaces.find(sstring(keyspace)) != _audited_keyspaces.cend()
|
||||
|| should_log_table(sstring(keyspace), sstring(table))
|
||||
|| _audited_keyspaces.find(keyspace) != _audited_keyspaces.cend()
|
||||
|| should_log_table(keyspace, table)
|
||||
|| cat == statement_category::AUTH
|
||||
|| cat == statement_category::ADMIN
|
||||
|| cat == statement_category::DCL);
|
||||
|
||||
@@ -129,10 +129,15 @@ public:
|
||||
class storage_helper;
|
||||
|
||||
class audit final : public seastar::async_sharded_service<audit> {
|
||||
public:
|
||||
// Transparent comparator (std::less<>) enables heterogeneous lookup with
|
||||
// string_view keys.
|
||||
using audited_keyspaces_t = std::set<sstring, std::less<>>;
|
||||
using audited_tables_t = std::map<sstring, std::set<sstring, std::less<>>, std::less<>>;
|
||||
private:
|
||||
locator::shared_token_metadata& _token_metadata;
|
||||
std::set<sstring> _audited_keyspaces;
|
||||
// Maps keyspace name to set of table names in that keyspace
|
||||
std::map<sstring, std::set<sstring>> _audited_tables;
|
||||
audited_keyspaces_t _audited_keyspaces;
|
||||
audited_tables_t _audited_tables;
|
||||
category_set _audited_categories;
|
||||
|
||||
std::unique_ptr<storage_helper> _storage_helper_ptr;
|
||||
@@ -145,7 +150,7 @@ class audit final : public seastar::async_sharded_service<audit> {
|
||||
template<class T>
|
||||
void update_config(const sstring & new_value, std::function<T(const sstring&)> parse_func, T& cfg_parameter);
|
||||
|
||||
bool should_log_table(const sstring& keyspace, const sstring& name) const;
|
||||
bool should_log_table(std::string_view keyspace, std::string_view name) const;
|
||||
public:
|
||||
static seastar::sharded<audit>& audit_instance() {
|
||||
// FIXME: leaked intentionally to avoid shutdown problems, see #293
|
||||
@@ -164,8 +169,8 @@ public:
|
||||
cql3::query_processor& qp,
|
||||
service::migration_manager& mm,
|
||||
std::set<sstring>&& audit_modes,
|
||||
std::set<sstring>&& audited_keyspaces,
|
||||
std::map<sstring, std::set<sstring>>&& audited_tables,
|
||||
audited_keyspaces_t&& audited_keyspaces,
|
||||
audited_tables_t&& audited_tables,
|
||||
category_set&& audited_categories,
|
||||
const db::config& cfg);
|
||||
~audit();
|
||||
|
||||
@@ -1625,7 +1625,7 @@ struct process_change_visitor {
|
||||
if (_enable_updating_state) {
|
||||
if (_request_options.alternator && _alternator_schema_has_no_clustering_key && _clustering_row_states.empty()) {
|
||||
// Alternator's table can be with or without clustering key. If the clustering key exists,
|
||||
// delete request will be `clustered_row_delete` and will be hanlded there.
|
||||
// delete request will be `clustered_row_delete` and will be handled there.
|
||||
// If the clustering key doesn't exist, delete request will be `partition_delete` and will be handled here.
|
||||
// The no-clustering-key case is slightly tricky, because insert of such item is handled by `clustered_row_cells`
|
||||
// and has some value as clustering_key (the value currently seems to be empty bytes object).
|
||||
@@ -1933,7 +1933,7 @@ public:
|
||||
if (_options.alternator && !_alternator_clustering_keys_to_ignore.empty()) {
|
||||
// we filter mutations for Alternator's changes here.
|
||||
// We do it per mutation object (user might submit a batch of those in one go
|
||||
// and some might be splitted because of different timestamps),
|
||||
// and some might be split because of different timestamps),
|
||||
// ignore key set is cleared afterwards.
|
||||
// If single mutation object contains two separate changes to the same row
|
||||
// and at least one of them is ignored, all of them will be ignored.
|
||||
|
||||
@@ -240,7 +240,7 @@ static max_purgeable get_max_purgeable_timestamp(const compaction_group_view& ta
|
||||
// and if the memtable also contains the key we're calculating max purgeable timestamp for.
|
||||
// First condition helps to not penalize the common scenario where memtable only contains
|
||||
// newer data.
|
||||
if (memtable_min_timestamp <= compacting_max_timestamp && table_s.memtable_has_key(dk)) {
|
||||
if (!table_s.skip_memtable_for_tombstone_gc() && memtable_min_timestamp <= compacting_max_timestamp && table_s.memtable_has_key(dk)) {
|
||||
timestamp = memtable_min_timestamp;
|
||||
source = max_purgeable::timestamp_source::memtable_possibly_shadowing_data;
|
||||
}
|
||||
|
||||
@@ -39,6 +39,9 @@ public:
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const = 0;
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const = 0;
|
||||
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const = 0;
|
||||
// Returns true when tombstone GC considers only the repaired sstable set, meaning the
|
||||
// memtable does not need to be consulted (its data is always newer than any GC-eligible tombstone).
|
||||
virtual bool skip_memtable_for_tombstone_gc() const noexcept = 0;
|
||||
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const = 0;
|
||||
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept = 0;
|
||||
virtual compaction_strategy& get_compaction_strategy() const noexcept = 0;
|
||||
|
||||
@@ -406,7 +406,11 @@ commitlog_total_space_in_mb: -1
|
||||
# In short, `ms` needs more CPU during sstable writes,
|
||||
# but should behave better during reads,
|
||||
# although it might behave worse for very long clustering keys.
|
||||
#
|
||||
# `ms` sstable format works even better with `column_index_size_in_kb` set to 1,
|
||||
# so keep those two settings in sync (either both set, or both unset).
|
||||
sstable_format: ms
|
||||
column_index_size_in_kb: 1
|
||||
|
||||
# Auto-scaling of the promoted index prevents running out of memory
|
||||
# when the promoted index grows too large (due to partitions with many rows
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -23,15 +23,113 @@ namespace cql3 {
|
||||
|
||||
namespace restrictions {
|
||||
|
||||
/// A set of discrete values.
|
||||
using value_list = std::vector<managed_bytes>; // Sorted and deduped using value comparator.
|
||||
|
||||
/// General set of values. Empty set and single-element sets are always value_list. interval is
|
||||
/// never singular and never has start > end. Universal set is a interval with both bounds null.
|
||||
using value_set = std::variant<value_list, interval<managed_bytes>>;
|
||||
|
||||
// For some boolean expression (say (X = 3) = TRUE, this represents a function that solves for X.
|
||||
// (here, it would return 3). The expression is obtained by equating some factors of the WHERE
|
||||
// clause to TRUE.
|
||||
using solve_for_t = std::function<value_set (const query_options&)>;
|
||||
|
||||
struct on_row {
|
||||
bool operator==(const on_row&) const = default;
|
||||
};
|
||||
|
||||
struct on_column {
|
||||
const column_definition* column;
|
||||
|
||||
bool operator==(const on_column&) const = default;
|
||||
};
|
||||
|
||||
// Placeholder type indicating we're solving for the partition key token.
|
||||
struct on_partition_key_token {
|
||||
const ::schema* schema;
|
||||
|
||||
bool operator==(const on_partition_key_token&) const = default;
|
||||
};
|
||||
|
||||
struct on_clustering_key_prefix {
|
||||
std::vector<const column_definition*> columns;
|
||||
|
||||
bool operator==(const on_clustering_key_prefix&) const = default;
|
||||
};
|
||||
|
||||
// A predicate on a column or a combination of columns. The WHERE clause analyzer
|
||||
// will attempt to convert predicates (that return true or false for a particular row)
|
||||
// to solvers (that return the set of column values that satisfy the predicate) when possible.
|
||||
struct predicate {
|
||||
// A function that returns the set of values that satisfy the filter. Can be unset,
|
||||
// in which case the filter must be interpreted.
|
||||
solve_for_t solve_for;
|
||||
// The original filter for this column.
|
||||
expr::expression filter;
|
||||
// What column the predicate can be solved for
|
||||
std::variant<
|
||||
on_row, // cannot determine, so predicate is on entire row
|
||||
on_column, // solving for a single column: e.g. c1 = 3
|
||||
on_partition_key_token, // solving for the token, e.g. token(pk1, pk2) >= :var
|
||||
on_clustering_key_prefix // solving for a clustering key prefix: e.g. (ck1, ck2) >= (3, 4)
|
||||
> on;
|
||||
// Whether the returned value_set will resolve to a single value.
|
||||
bool is_singleton = false;
|
||||
// Whether the returned value_set follows CQL comparison semantics
|
||||
bool comparable = true;
|
||||
bool is_multi_column = false;
|
||||
bool is_not_null_single_column = false;
|
||||
bool equality = false; // operator is EQ
|
||||
bool is_in = false; // operator is IN
|
||||
bool is_slice = false; // operator is LT/LTE/GT/GTE
|
||||
bool is_upper_bound = false; // operator is LT/LTE
|
||||
bool is_lower_bound = false; // operator is GT/GTE
|
||||
expr::comparison_order order = expr::comparison_order::cql;
|
||||
std::optional<expr::oper_t> op; // the binary operator, if any
|
||||
bool is_subscript = false; // whether the LHS is a subscript (map element access)
|
||||
};
|
||||
|
||||
///In some cases checking if columns have indexes is undesired of even
|
||||
///impossible, because e.g. the query runs on a pseudo-table, which does not
|
||||
///have an index-manager, or even a table object.
|
||||
using check_indexes = bool_class<class check_indexes_tag>;
|
||||
|
||||
// A function that returns the partition key ranges for a query. It is the solver of
|
||||
// WHERE clause fragments such as WHERE token(pk) > 1 or WHERE pk1 IN :list1 AND pk2 IN :list2.
|
||||
using get_partition_key_ranges_fn_t = std::function<dht::partition_range_vector (const query_options&)>;
|
||||
|
||||
// A function that returns the clustering key ranges for a query. It is the solver of
|
||||
// WHERE clause fragments such as WHERE ck > 1 or WHERE (ck1, ck2) > (1, 2).
|
||||
using get_clustering_bounds_fn_t = std::function<std::vector<query::clustering_range> (const query_options& options)>;
|
||||
|
||||
// A function that returns a singleton value, usable for a key (e.g. bytes_opt)
|
||||
using get_singleton_value_fn_t = std::function<bytes_opt (const query_options&)>;
|
||||
|
||||
struct no_partition_range_restrictions {
|
||||
};
|
||||
|
||||
struct token_range_restrictions {
|
||||
predicate token_restrictions;
|
||||
};
|
||||
|
||||
struct single_column_partition_range_restrictions {
|
||||
std::vector<predicate> per_column_restrictions;
|
||||
};
|
||||
|
||||
using partition_range_restrictions = std::variant<
|
||||
no_partition_range_restrictions,
|
||||
token_range_restrictions,
|
||||
single_column_partition_range_restrictions>;
|
||||
|
||||
// A map of per-column predicate vectors, ordered by schema position.
|
||||
using single_column_predicate_vectors = std::map<const column_definition*, std::vector<predicate>, expr::schema_pos_column_definition_comparator>;
|
||||
|
||||
/**
|
||||
* The restrictions corresponding to the relations specified on the where-clause of CQL query.
|
||||
*/
|
||||
class statement_restrictions {
|
||||
struct private_tag {}; // Tag for private constructor
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
|
||||
@@ -81,7 +179,7 @@ private:
|
||||
bool _has_queriable_regular_index = false, _has_queriable_pk_index = false, _has_queriable_ck_index = false;
|
||||
bool _has_multi_column; ///< True iff _clustering_columns_restrictions has a multi-column restriction.
|
||||
|
||||
std::optional<expr::expression> _where; ///< The entire WHERE clause.
|
||||
std::vector<expr::expression> _where; ///< The entire WHERE clause (factorized).
|
||||
|
||||
/// Parts of _where defining the clustering slice.
|
||||
///
|
||||
@@ -96,7 +194,7 @@ private:
|
||||
/// 4.4 elements other than the last have only EQ or IN atoms
|
||||
/// 4.5 the last element has only EQ, IN, or is_slice() atoms
|
||||
/// 5. if multi-column, then each element is a binary_operator
|
||||
std::vector<expr::expression> _clustering_prefix_restrictions;
|
||||
std::vector<predicate> _clustering_prefix_restrictions;
|
||||
|
||||
/// Like _clustering_prefix_restrictions, but for the indexing table (if this is an index-reading statement).
|
||||
/// Recall that the index-table CK is (token, PK, CK) of the base table for a global index and (indexed column,
|
||||
@@ -105,7 +203,7 @@ private:
|
||||
/// Elements are conjunctions of single-column binary operators with the same LHS.
|
||||
/// Element order follows the indexing-table clustering key.
|
||||
/// In case of a global index the first element's (token restriction) RHS is a dummy value, it is filled later.
|
||||
std::optional<std::vector<expr::expression>> _idx_tbl_ck_prefix;
|
||||
std::optional<std::vector<predicate>> _idx_tbl_ck_prefix;
|
||||
|
||||
/// Parts of _where defining the partition range.
|
||||
///
|
||||
@@ -113,16 +211,25 @@ private:
|
||||
/// binary_operators on token. If single-column restrictions define the partition range, each element holds
|
||||
/// restrictions for one partition column. Each partition column has a corresponding element, but the elements
|
||||
/// are in arbitrary order.
|
||||
std::vector<expr::expression> _partition_range_restrictions;
|
||||
partition_range_restrictions _partition_range_restrictions;
|
||||
|
||||
bool _partition_range_is_simple; ///< False iff _partition_range_restrictions imply a Cartesian product.
|
||||
|
||||
|
||||
check_indexes _check_indexes = check_indexes::yes;
|
||||
/// Columns that appear on the LHS of an EQ restriction (not IN).
|
||||
/// For multi-column EQ like (ck1, ck2) = (1, 2), all columns in the tuple are included.
|
||||
std::unordered_set<const column_definition*> _columns_with_eq;
|
||||
std::vector<const column_definition*> _column_defs_for_filtering;
|
||||
schema_ptr _view_schema;
|
||||
std::optional<secondary_index::index> _idx_opt;
|
||||
expr::expression _idx_restrictions = expr::conjunction({});
|
||||
get_partition_key_ranges_fn_t _get_partition_key_ranges_fn;
|
||||
get_clustering_bounds_fn_t _get_clustering_bounds_fn;
|
||||
get_clustering_bounds_fn_t _get_global_index_clustering_ranges_fn;
|
||||
get_clustering_bounds_fn_t _get_global_index_token_clustering_ranges_fn;
|
||||
get_clustering_bounds_fn_t _get_local_index_clustering_ranges_fn;
|
||||
get_singleton_value_fn_t _value_for_index_partition_key_fn;
|
||||
public:
|
||||
/**
|
||||
* Creates a new empty <code>StatementRestrictions</code>.
|
||||
@@ -130,9 +237,10 @@ public:
|
||||
* @param cfm the column family meta data
|
||||
* @return a new empty <code>StatementRestrictions</code>.
|
||||
*/
|
||||
statement_restrictions(schema_ptr schema, bool allow_filtering);
|
||||
statement_restrictions(private_tag, schema_ptr schema, bool allow_filtering);
|
||||
|
||||
friend statement_restrictions analyze_statement_restrictions(
|
||||
public:
|
||||
friend shared_ptr<const statement_restrictions> analyze_statement_restrictions(
|
||||
data_dictionary::database db,
|
||||
schema_ptr schema,
|
||||
statements::statement_type type,
|
||||
@@ -142,9 +250,15 @@ public:
|
||||
bool for_view,
|
||||
bool allow_filtering,
|
||||
check_indexes do_check_indexes);
|
||||
friend shared_ptr<const statement_restrictions> make_trivial_statement_restrictions(
|
||||
schema_ptr schema,
|
||||
bool allow_filtering);
|
||||
|
||||
private:
|
||||
statement_restrictions(data_dictionary::database db,
|
||||
// Important: objects of this class captures `this` extensively and so must remain non-copyable.
|
||||
statement_restrictions(const statement_restrictions&) = delete;
|
||||
statement_restrictions& operator=(const statement_restrictions&) = delete;
|
||||
statement_restrictions(private_tag,
|
||||
data_dictionary::database db,
|
||||
schema_ptr schema,
|
||||
statements::statement_type type,
|
||||
const expr::expression& where_clause,
|
||||
@@ -211,10 +325,7 @@ public:
|
||||
|
||||
bool has_token_restrictions() const;
|
||||
|
||||
// Checks whether the given column has an EQ restriction.
|
||||
// EQ restriction is `col = ...` or `(col, col2) = ...`
|
||||
// IN restriction is NOT an EQ restriction, this function will not look for IN restrictions.
|
||||
// Uses column_defintion::operator== for comparison, columns with the same name but different schema will not be equal.
|
||||
// Checks whether the given column has an EQ restriction (not IN).
|
||||
bool has_eq_restriction_on_column(const column_definition&) const;
|
||||
|
||||
/**
|
||||
@@ -224,12 +335,6 @@ public:
|
||||
*/
|
||||
std::vector<const column_definition*> get_column_defs_for_filtering(data_dictionary::database db) const;
|
||||
|
||||
/**
|
||||
* Gives a score that the index has - index with the highest score will be chosen
|
||||
* in find_idx()
|
||||
*/
|
||||
int score(const secondary_index::index& index) const;
|
||||
|
||||
/**
|
||||
* Determines the index to be used with the restriction.
|
||||
* @param db - the data_dictionary::database context (for extracting index manager)
|
||||
@@ -250,18 +355,8 @@ public:
|
||||
|
||||
size_t partition_key_restrictions_size() const;
|
||||
|
||||
bool parition_key_restrictions_have_supporting_index(const secondary_index::secondary_index_manager& index_manager, expr::allow_local_index allow_local) const;
|
||||
|
||||
size_t clustering_columns_restrictions_size() const;
|
||||
|
||||
bool clustering_columns_restrictions_have_supporting_index(
|
||||
const secondary_index::secondary_index_manager& index_manager,
|
||||
expr::allow_local_index allow_local) const;
|
||||
|
||||
bool multi_column_clustering_restrictions_are_supported_by(const secondary_index::index& index) const;
|
||||
|
||||
bounds_slice get_clustering_slice() const;
|
||||
|
||||
/**
|
||||
* Checks if the clustering key has some unrestricted components.
|
||||
* @return <code>true</code> if the clustering key has some unrestricted components, <code>false</code> otherwise.
|
||||
@@ -279,15 +374,6 @@ public:
|
||||
|
||||
schema_ptr get_view_schema() const { return _view_schema; }
|
||||
private:
|
||||
std::pair<std::optional<secondary_index::index>, expr::expression> do_find_idx(const secondary_index::secondary_index_manager& sim) const;
|
||||
void add_restriction(const expr::binary_operator& restr, schema_ptr schema, bool allow_filtering, bool for_view);
|
||||
void add_is_not_restriction(const expr::binary_operator& restr, schema_ptr schema, bool for_view);
|
||||
void add_single_column_parition_key_restriction(const expr::binary_operator& restr, schema_ptr schema, bool allow_filtering, bool for_view);
|
||||
void add_token_partition_key_restriction(const expr::binary_operator& restr);
|
||||
void add_single_column_clustering_key_restriction(const expr::binary_operator& restr, schema_ptr schema, bool allow_filtering);
|
||||
void add_multi_column_clustering_key_restriction(const expr::binary_operator& restr);
|
||||
void add_single_column_nonprimary_key_restriction(const expr::binary_operator& restr);
|
||||
|
||||
void process_partition_key_restrictions(bool for_view, bool allow_filtering, statements::statement_type type);
|
||||
|
||||
/**
|
||||
@@ -315,7 +401,17 @@ private:
|
||||
void add_clustering_restrictions_to_idx_ck_prefix(const schema& idx_tbl_schema);
|
||||
|
||||
unsigned int num_clustering_prefix_columns_that_need_not_be_filtered() const;
|
||||
void calculate_column_defs_for_filtering_and_erase_restrictions_used_for_index(data_dictionary::database db);
|
||||
void calculate_column_defs_for_filtering_and_erase_restrictions_used_for_index(
|
||||
data_dictionary::database db,
|
||||
const single_column_predicate_vectors& sc_pk_pred_vectors,
|
||||
const single_column_predicate_vectors& sc_ck_pred_vectors,
|
||||
const single_column_predicate_vectors& sc_nonpk_pred_vectors);
|
||||
get_partition_key_ranges_fn_t build_partition_key_ranges_fn() const;
|
||||
get_clustering_bounds_fn_t build_get_clustering_bounds_fn() const;
|
||||
get_clustering_bounds_fn_t build_get_global_index_clustering_ranges_fn() const;
|
||||
get_clustering_bounds_fn_t build_get_global_index_token_clustering_ranges_fn() const;
|
||||
get_clustering_bounds_fn_t build_get_local_index_clustering_ranges_fn() const;
|
||||
get_singleton_value_fn_t build_value_for_index_partition_key_fn() const;
|
||||
public:
|
||||
/**
|
||||
* Returns the specified range of the partition key.
|
||||
@@ -389,7 +485,10 @@ public:
|
||||
private:
|
||||
/// Prepares internal data for evaluating index-table queries. Must be called before
|
||||
/// get_local_index_clustering_ranges().
|
||||
void prepare_indexed_local(const schema& idx_tbl_schema);
|
||||
void prepare_indexed_local(const schema& idx_tbl_schema,
|
||||
const single_column_predicate_vectors& sc_pk_pred_vectors,
|
||||
const single_column_predicate_vectors& sc_ck_pred_vectors,
|
||||
const single_column_predicate_vectors& sc_nonpk_pred_vectors);
|
||||
|
||||
/// Prepares internal data for evaluating index-table queries. Must be called before
|
||||
/// get_global_index_clustering_ranges() or get_global_index_token_clustering_ranges().
|
||||
@@ -398,15 +497,18 @@ private:
|
||||
public:
|
||||
/// Calculates clustering ranges for querying a global-index table.
|
||||
std::vector<query::clustering_range> get_global_index_clustering_ranges(
|
||||
const query_options& options, const schema& idx_tbl_schema) const;
|
||||
const query_options& options) const;
|
||||
|
||||
/// Calculates clustering ranges for querying a global-index table for queries with token restrictions present.
|
||||
std::vector<query::clustering_range> get_global_index_token_clustering_ranges(
|
||||
const query_options& options, const schema& idx_tbl_schema) const;
|
||||
const query_options& options) const;
|
||||
|
||||
/// Calculates clustering ranges for querying a local-index table.
|
||||
std::vector<query::clustering_range> get_local_index_clustering_ranges(
|
||||
const query_options& options, const schema& idx_tbl_schema) const;
|
||||
const query_options& options) const;
|
||||
|
||||
/// Finds the value of partition key of the index table
|
||||
bytes_opt value_for_index_partition_key(const query_options&) const;
|
||||
|
||||
sstring to_string() const;
|
||||
|
||||
@@ -416,7 +518,7 @@ public:
|
||||
bool is_empty() const;
|
||||
};
|
||||
|
||||
statement_restrictions analyze_statement_restrictions(
|
||||
shared_ptr<const statement_restrictions> analyze_statement_restrictions(
|
||||
data_dictionary::database db,
|
||||
schema_ptr schema,
|
||||
statements::statement_type type,
|
||||
@@ -427,23 +529,14 @@ statement_restrictions analyze_statement_restrictions(
|
||||
bool allow_filtering,
|
||||
check_indexes do_check_indexes);
|
||||
|
||||
|
||||
// Extracts all binary operators which have the given column on their left hand side.
|
||||
// Extracts only single-column restrictions.
|
||||
// Does not include multi-column restrictions.
|
||||
// Does not include token() restrictions.
|
||||
// Does not include boolean constant restrictions.
|
||||
// For example "WHERE c = 1 AND (a, c) = (2, 1) AND token(p) < 2 AND FALSE" will return {"c = 1"}.
|
||||
std::vector<expr::expression> extract_single_column_restrictions_for_column(const expr::expression&, const column_definition&);
|
||||
shared_ptr<const statement_restrictions> make_trivial_statement_restrictions(
|
||||
schema_ptr schema,
|
||||
bool allow_filtering);
|
||||
|
||||
|
||||
// Checks whether this expression is empty - doesn't restrict anything
|
||||
bool is_empty_restriction(const expr::expression&);
|
||||
|
||||
// Finds the value of the given column in the expression
|
||||
// In case of multpiple possible values calls on_internal_error
|
||||
bytes_opt value_for(const column_definition&, const expr::expression&, const query_options&);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -90,6 +90,20 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
|
||||
auto& current_rf_per_dc = ks.metadata()->strategy_options();
|
||||
auto new_rf_per_dc = _attrs->get_replication_options();
|
||||
new_rf_per_dc.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
// Check if multi-RF change is allowed: all DC changes must be 0->N or N->0.
|
||||
auto all_changes_are_0_N = [&] {
|
||||
for (const auto& [dc, new_rf] : new_rf_per_dc) {
|
||||
auto old_rf_val = size_t(0);
|
||||
if (auto it = current_rf_per_dc.find(dc); it != current_rf_per_dc.end()) {
|
||||
old_rf_val = locator::get_replication_factor(it->second);
|
||||
}
|
||||
auto new_rf_val = locator::get_replication_factor(new_rf);
|
||||
if (old_rf_val != new_rf_val && old_rf_val != 0 && new_rf_val != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
unsigned total_abs_rfs_diff = 0;
|
||||
for (const auto& [new_dc, new_rf] : new_rf_per_dc) {
|
||||
auto old_rf = locator::replication_strategy_config_option(sstring("0"));
|
||||
@@ -103,7 +117,9 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
|
||||
// first we need to report non-existing DCs, then if RFs aren't changed by too much.
|
||||
continue;
|
||||
}
|
||||
if (total_abs_rfs_diff += get_abs_rf_diff(old_rf, new_rf); total_abs_rfs_diff >= 2) {
|
||||
if (total_abs_rfs_diff += get_abs_rf_diff(old_rf, new_rf); total_abs_rfs_diff >= 2 &&
|
||||
!(qp.proxy().features().keyspace_multi_rf_change && locator::uses_rack_list_exclusively(current_rf_per_dc)
|
||||
&& locator::uses_rack_list_exclusively(new_ks->strategy_options()) && all_changes_are_0_N())) {
|
||||
throw exceptions::invalid_request_exception("Only one DC's RF can be changed at a time and not by more than 1");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,6 +89,10 @@ public:
|
||||
|
||||
const std::vector<single_statement>& statements() const { return _statements; }
|
||||
|
||||
audit::audit_info_ptr audit_info() const {
|
||||
return audit::audit::create_audit_info(audit::statement_category::DML, sstring(), sstring(), true);
|
||||
}
|
||||
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
@@ -411,10 +411,10 @@ bool ks_prop_defs::get_durable_writes() const {
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat, const db::config& cfg) {
|
||||
auto sc = get_replication_strategy_class().value();
|
||||
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0 for N.T.S. only
|
||||
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0. The strategy will
|
||||
// validate it and throw an error if it does not support tablets.
|
||||
auto enable_tablets = feat.tablets && cfg.enable_tablets_by_default();
|
||||
std::optional<unsigned> default_initial_tablets = enable_tablets && locator::abstract_replication_strategy::to_qualified_class_name(sc) == "org.apache.cassandra.locator.NetworkTopologyStrategy"
|
||||
? std::optional<unsigned>(0) : std::nullopt;
|
||||
std::optional<unsigned> default_initial_tablets = enable_tablets ? std::optional<unsigned>(0) : std::nullopt;
|
||||
auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets());
|
||||
bool uses_tablets = initial_tablets.has_value();
|
||||
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
|
||||
@@ -440,7 +440,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
|
||||
sc = old->strategy_name();
|
||||
options = old_options;
|
||||
}
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(old->name(), *sc, options, initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(old->name(), *sc, options, initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options(), {}, old->next_strategy_options_opt());
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -626,7 +626,7 @@ modification_statement::prepare(data_dictionary::database db, prepare_context& c
|
||||
// Since this cache is only meaningful for LWT queries, just clear the ids
|
||||
// if it's not a conditional statement so that the AST nodes don't
|
||||
// participate in the caching mechanism later.
|
||||
if (!prepared_stmt->has_conditions() && prepared_stmt->_restrictions.has_value()) {
|
||||
if (!prepared_stmt->has_conditions() && prepared_stmt->_restrictions) {
|
||||
ctx.clear_pk_function_calls_cache();
|
||||
}
|
||||
prepared_stmt->_may_use_token_aware_routing = ctx.get_partition_key_bind_indexes(*schema).size() != 0;
|
||||
|
||||
@@ -94,7 +94,7 @@ private:
|
||||
std::optional<bool> _is_raw_counter_shard_write;
|
||||
|
||||
protected:
|
||||
std::optional<restrictions::statement_restrictions> _restrictions;
|
||||
shared_ptr<const restrictions::statement_restrictions> _restrictions;
|
||||
public:
|
||||
typedef std::optional<std::unordered_map<sstring, bytes_opt>> json_cache_opt;
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ public:
|
||||
uint32_t bound_terms,
|
||||
lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
|
||||
bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator,
|
||||
|
||||
@@ -109,7 +109,7 @@ public:
|
||||
std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats, const cql_config& cfg, bool for_view);
|
||||
private:
|
||||
std::vector<selection::prepared_selector> maybe_jsonize_select_clause(std::vector<selection::prepared_selector> select, data_dictionary::database db, schema_ptr schema);
|
||||
::shared_ptr<restrictions::statement_restrictions> prepare_restrictions(
|
||||
::shared_ptr<const restrictions::statement_restrictions> prepare_restrictions(
|
||||
data_dictionary::database db,
|
||||
schema_ptr schema,
|
||||
prepare_context& ctx,
|
||||
|
||||
@@ -1027,7 +1027,7 @@ view_indexed_table_select_statement::prepare(data_dictionary::database db,
|
||||
uint32_t bound_terms,
|
||||
lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
|
||||
bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator,
|
||||
@@ -1139,7 +1139,7 @@ lw_shared_ptr<const service::pager::paging_state> view_indexed_table_select_stat
|
||||
auto& last_base_pk = last_pos.partition;
|
||||
auto* last_base_ck = last_pos.position.has_key() ? &last_pos.position.key() : nullptr;
|
||||
|
||||
bytes_opt indexed_column_value = restrictions::value_for(*cdef, _used_index_restrictions, options);
|
||||
bytes_opt indexed_column_value = _restrictions->value_for_index_partition_key(options);
|
||||
|
||||
auto index_pk = [&]() {
|
||||
if (_index.metadata().local()) {
|
||||
@@ -1350,12 +1350,7 @@ dht::partition_range_vector view_indexed_table_select_statement::get_partition_r
|
||||
dht::partition_range_vector view_indexed_table_select_statement::get_partition_ranges_for_global_index_posting_list(const query_options& options) const {
|
||||
dht::partition_range_vector partition_ranges;
|
||||
|
||||
const column_definition* cdef = _schema->get_column_definition(to_bytes(_index.target_column()));
|
||||
if (!cdef) {
|
||||
throw exceptions::invalid_request_exception("Indexed column not found in schema");
|
||||
}
|
||||
|
||||
bytes_opt value = restrictions::value_for(*cdef, _used_index_restrictions, options);
|
||||
bytes_opt value = _restrictions->value_for_index_partition_key(options);
|
||||
if (value) {
|
||||
auto pk = partition_key::from_single_value(*_view_schema, *value);
|
||||
auto dk = dht::decorate_key(*_view_schema, pk);
|
||||
@@ -1374,11 +1369,11 @@ query::partition_slice view_indexed_table_select_statement::get_partition_slice_
|
||||
// Only EQ restrictions on base partition key can be used in an index view query
|
||||
if (pk_restrictions_is_single && _restrictions->partition_key_restrictions_is_all_eq()) {
|
||||
partition_slice_builder.with_ranges(
|
||||
_restrictions->get_global_index_clustering_ranges(options, *_view_schema));
|
||||
_restrictions->get_global_index_clustering_ranges(options));
|
||||
} else if (_restrictions->has_token_restrictions()) {
|
||||
// Restrictions like token(p1, p2) < 0 have all partition key components restricted, but require special handling.
|
||||
partition_slice_builder.with_ranges(
|
||||
_restrictions->get_global_index_token_clustering_ranges(options, *_view_schema));
|
||||
_restrictions->get_global_index_token_clustering_ranges(options));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1389,7 +1384,7 @@ query::partition_slice view_indexed_table_select_statement::get_partition_slice_
|
||||
partition_slice_builder partition_slice_builder{*_view_schema};
|
||||
|
||||
partition_slice_builder.with_ranges(
|
||||
_restrictions->get_local_index_clustering_ranges(options, *_view_schema));
|
||||
_restrictions->get_local_index_clustering_ranges(options));
|
||||
|
||||
return partition_slice_builder.build();
|
||||
}
|
||||
@@ -1607,7 +1602,7 @@ public:
|
||||
uint32_t bound_terms,
|
||||
lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
|
||||
bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator,
|
||||
@@ -1645,7 +1640,7 @@ private:
|
||||
uint32_t bound_terms,
|
||||
lw_shared_ptr<const select_statement::parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
|
||||
bool is_reversed,
|
||||
parallelized_select_statement::ordering_comparator_type ordering_comparator,
|
||||
@@ -2076,7 +2071,7 @@ static select_statement::ordering_comparator_type get_similarity_ordering_compar
|
||||
|
||||
::shared_ptr<cql3::statements::select_statement> vector_indexed_table_select_statement::prepare(data_dictionary::database db, schema_ptr schema,
|
||||
uint32_t bound_terms, lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<attributes> attrs) {
|
||||
|
||||
@@ -2589,7 +2584,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
|
||||
return make_unique<prepared_statement>(audit_info(), std::move(stmt), ctx, std::move(partition_key_bind_indices), std::move(warnings));
|
||||
}
|
||||
|
||||
::shared_ptr<restrictions::statement_restrictions>
|
||||
::shared_ptr<const restrictions::statement_restrictions>
|
||||
select_statement::prepare_restrictions(data_dictionary::database db,
|
||||
schema_ptr schema,
|
||||
prepare_context& ctx,
|
||||
@@ -2599,8 +2594,8 @@ select_statement::prepare_restrictions(data_dictionary::database db,
|
||||
restrictions::check_indexes do_check_indexes)
|
||||
{
|
||||
try {
|
||||
return ::make_shared<restrictions::statement_restrictions>(restrictions::analyze_statement_restrictions(db, schema, statement_type::SELECT, _where_clause, ctx,
|
||||
selection->contains_only_static_columns(), for_view, allow_filtering, do_check_indexes));
|
||||
return restrictions::analyze_statement_restrictions(db, schema, statement_type::SELECT, _where_clause, ctx,
|
||||
selection->contains_only_static_columns(), for_view, allow_filtering, do_check_indexes);
|
||||
} catch (const exceptions::unrecognized_entity_exception& e) {
|
||||
if (contains_alias(e.entity)) {
|
||||
throw exceptions::invalid_request_exception(format("Aliases aren't allowed in the WHERE clause (name: '{}')", e.entity));
|
||||
|
||||
@@ -200,7 +200,7 @@ public:
|
||||
uint32_t bound_terms,
|
||||
lw_shared_ptr<const parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions,
|
||||
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
|
||||
bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator,
|
||||
@@ -372,7 +372,7 @@ public:
|
||||
|
||||
static ::shared_ptr<cql3::statements::select_statement> prepare(data_dictionary::database db, schema_ptr schema, uint32_t bound_terms,
|
||||
lw_shared_ptr<const parameters> parameters, ::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
|
||||
::shared_ptr<const restrictions::statement_restrictions> restrictions, ::shared_ptr<std::vector<size_t>> group_by_cell_indices, bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator, prepared_ann_ordering_type prepared_ann_ordering, std::optional<expr::expression> limit,
|
||||
std::optional<expr::expression> per_partition_limit, cql_stats& stats, const secondary_index::index& index, std::unique_ptr<cql3::attributes> attrs);
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ public:
|
||||
: update_statement(std::move(audit_info), statement_type::INSERT, bound_terms, s, std::move(attrs), stats)
|
||||
, _value(std::move(v))
|
||||
, _default_unset(default_unset) {
|
||||
_restrictions = restrictions::statement_restrictions(s, false);
|
||||
_restrictions = cql3::restrictions::make_trivial_statement_restrictions(s, false);
|
||||
}
|
||||
private:
|
||||
virtual void execute_operations_for_key(mutation& m, const clustering_key_prefix& prefix, const update_parameters& params, const json_cache_opt& json_cache) const override;
|
||||
|
||||
@@ -224,10 +224,12 @@ keyspace_metadata::keyspace_metadata(std::string_view name,
|
||||
bool durable_writes,
|
||||
std::vector<schema_ptr> cf_defs,
|
||||
user_types_metadata user_types,
|
||||
storage_options storage_opts)
|
||||
storage_options storage_opts,
|
||||
std::optional<locator::replication_strategy_config_options> next_options)
|
||||
: _name{name}
|
||||
, _strategy_name{locator::abstract_replication_strategy::to_qualified_class_name(strategy_name.empty() ? "NetworkTopologyStrategy" : strategy_name)}
|
||||
, _strategy_options{std::move(strategy_options)}
|
||||
, _next_strategy_options{std::move(next_options)}
|
||||
, _initial_tablets(initial_tablets)
|
||||
, _durable_writes{durable_writes}
|
||||
, _user_types{std::move(user_types)}
|
||||
@@ -273,14 +275,15 @@ keyspace_metadata::new_keyspace(std::string_view name,
|
||||
std::optional<consistency_config_option> consistency_option,
|
||||
bool durables_writes,
|
||||
storage_options storage_opts,
|
||||
std::vector<schema_ptr> cf_defs)
|
||||
std::vector<schema_ptr> cf_defs,
|
||||
std::optional<locator::replication_strategy_config_options> next_options)
|
||||
{
|
||||
return ::make_lw_shared<keyspace_metadata>(name, strategy_name, options, initial_tablets, consistency_option, durables_writes, cf_defs, user_types_metadata{}, storage_opts);
|
||||
return ::make_lw_shared<keyspace_metadata>(name, strategy_name, options, initial_tablets, consistency_option, durables_writes, cf_defs, user_types_metadata{}, storage_opts, next_options);
|
||||
}
|
||||
|
||||
lw_shared_ptr<keyspace_metadata>
|
||||
keyspace_metadata::new_keyspace(const keyspace_metadata& ksm) {
|
||||
return new_keyspace(ksm.name(), ksm.strategy_name(), ksm.strategy_options(), ksm.initial_tablets(), ksm.consistency_option(), ksm.durable_writes(), ksm.get_storage_options());
|
||||
return new_keyspace(ksm.name(), ksm.strategy_name(), ksm.strategy_options(), ksm.initial_tablets(), ksm.consistency_option(), ksm.durable_writes(), ksm.get_storage_options(), {}, ksm.next_strategy_options_opt());
|
||||
}
|
||||
|
||||
void keyspace_metadata::add_user_type(const user_type ut) {
|
||||
@@ -649,8 +652,8 @@ struct fmt::formatter<data_dictionary::user_types_metadata> {
|
||||
};
|
||||
|
||||
auto fmt::formatter<data_dictionary::keyspace_metadata>::format(const data_dictionary::keyspace_metadata& m, fmt::format_context& ctx) const -> decltype(ctx.out()) {
|
||||
fmt::format_to(ctx.out(), "KSMetaData{{name={}, strategyClass={}, strategyOptions={}, cfMetaData={}, durable_writes={}, tablets=",
|
||||
m.name(), m.strategy_name(), m.strategy_options(), m.cf_meta_data(), m.durable_writes());
|
||||
fmt::format_to(ctx.out(), "KSMetaData{{name={}, strategyClass={}, strategyOptions={}, nextStrategyOptions={}, cfMetaData={}, durable_writes={}, tablets=",
|
||||
m.name(), m.strategy_name(), m.strategy_options(), m.next_strategy_options_opt(), m.cf_meta_data(), m.durable_writes());
|
||||
if (m.initial_tablets()) {
|
||||
if (auto initial_tablets = m.initial_tablets().value()) {
|
||||
fmt::format_to(ctx.out(), "{{\"initial\":{}}}", initial_tablets);
|
||||
|
||||
@@ -28,7 +28,9 @@ namespace data_dictionary {
|
||||
class keyspace_metadata final {
|
||||
sstring _name;
|
||||
sstring _strategy_name;
|
||||
// If _next_strategy_options has value, there is ongoing rf change of this keyspace.
|
||||
locator::replication_strategy_config_options _strategy_options;
|
||||
std::optional<locator::replication_strategy_config_options> _next_strategy_options;
|
||||
std::optional<unsigned> _initial_tablets;
|
||||
std::unordered_map<sstring, schema_ptr> _cf_meta_data;
|
||||
bool _durable_writes;
|
||||
@@ -44,7 +46,8 @@ public:
|
||||
bool durable_writes,
|
||||
std::vector<schema_ptr> cf_defs = std::vector<schema_ptr>{},
|
||||
user_types_metadata user_types = user_types_metadata{},
|
||||
storage_options storage_opts = storage_options{});
|
||||
storage_options storage_opts = storage_options{},
|
||||
std::optional<locator::replication_strategy_config_options> next_options = std::nullopt);
|
||||
static lw_shared_ptr<keyspace_metadata>
|
||||
new_keyspace(std::string_view name,
|
||||
std::string_view strategy_name,
|
||||
@@ -53,7 +56,8 @@ public:
|
||||
std::optional<consistency_config_option> consistency_option,
|
||||
bool durables_writes = true,
|
||||
storage_options storage_opts = {},
|
||||
std::vector<schema_ptr> cf_defs = {});
|
||||
std::vector<schema_ptr> cf_defs = {},
|
||||
std::optional<locator::replication_strategy_config_options> next_options = std::nullopt);
|
||||
static lw_shared_ptr<keyspace_metadata>
|
||||
new_keyspace(const keyspace_metadata& ksm);
|
||||
void validate(const gms::feature_service&, const locator::topology&) const;
|
||||
@@ -66,6 +70,18 @@ public:
|
||||
const locator::replication_strategy_config_options& strategy_options() const {
|
||||
return _strategy_options;
|
||||
}
|
||||
void set_strategy_options(const locator::replication_strategy_config_options& options) {
|
||||
_strategy_options = options;
|
||||
}
|
||||
const std::optional<locator::replication_strategy_config_options>& next_strategy_options_opt() const {
|
||||
return _next_strategy_options;
|
||||
}
|
||||
void set_next_strategy_options(const locator::replication_strategy_config_options& options) {
|
||||
_next_strategy_options = options;
|
||||
}
|
||||
void clear_next_strategy_options() {
|
||||
_next_strategy_options = std::nullopt;
|
||||
}
|
||||
locator::replication_strategy_config_options strategy_options_v1() const;
|
||||
std::optional<unsigned> initial_tablets() const {
|
||||
return _initial_tablets;
|
||||
|
||||
@@ -277,7 +277,7 @@ filter_for_query(consistency_level cl,
|
||||
|
||||
host_id_vector_replica_set selected_endpoints;
|
||||
|
||||
// Pre-select endpoints based on client preference. If the endpoints
|
||||
// Preselect endpoints based on client preference. If the endpoints
|
||||
// selected this way aren't enough to satisfy CL requirements select the
|
||||
// remaining ones according to the load-balancing strategy as before.
|
||||
if (!preferred_endpoints.empty()) {
|
||||
|
||||
@@ -33,6 +33,11 @@ enum class schema_feature {
|
||||
|
||||
// Per-table tablet options
|
||||
TABLET_OPTIONS,
|
||||
|
||||
// When enabled, `system_schema.keyspaces` will keep three replication values:
|
||||
// the initial, the current, and the target replication factor,
|
||||
// which reflect the phases of the multi RF change.
|
||||
KEYSPACE_MULTI_RF_CHANGE,
|
||||
};
|
||||
|
||||
using schema_features = enum_set<super_enum<schema_feature,
|
||||
@@ -43,7 +48,8 @@ using schema_features = enum_set<super_enum<schema_feature,
|
||||
schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
schema_feature::GROUP0_SCHEMA_VERSIONING,
|
||||
schema_feature::IN_MEMORY_TABLES,
|
||||
schema_feature::TABLET_OPTIONS
|
||||
schema_feature::TABLET_OPTIONS,
|
||||
schema_feature::KEYSPACE_MULTI_RF_CHANGE
|
||||
>>;
|
||||
|
||||
}
|
||||
|
||||
@@ -216,6 +216,7 @@ schema_ptr keyspaces() {
|
||||
{"durable_writes", boolean_type},
|
||||
{"replication", map_type_impl::get_instance(utf8_type, utf8_type, false)},
|
||||
{"replication_v2", map_type_impl::get_instance(utf8_type, utf8_type, false)}, // with rack list RF
|
||||
{"next_replication", map_type_impl::get_instance(utf8_type, utf8_type, false)}, // target rack list RF for this RF change
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
@@ -1178,6 +1179,14 @@ utils::chunked_vector<mutation> make_create_keyspace_mutations(schema_features f
|
||||
// If the maps are different, the upgrade must be already done.
|
||||
store_map(m, ckey, "replication_v2", timestamp, cql3::statements::to_flattened_map(map));
|
||||
}
|
||||
if (features.contains<schema_feature::KEYSPACE_MULTI_RF_CHANGE>()) {
|
||||
const auto& next_map_opt = keyspace->next_strategy_options_opt();
|
||||
if (next_map_opt) {
|
||||
auto next_map = *next_map_opt;
|
||||
next_map["class"] = keyspace->strategy_name();
|
||||
store_map(m, ckey, "next_replication", timestamp, cql3::statements::to_flattened_map(next_map));
|
||||
}
|
||||
}
|
||||
|
||||
if (features.contains<schema_feature::SCYLLA_KEYSPACES>()) {
|
||||
schema_ptr scylla_keyspaces_s = scylla_keyspaces();
|
||||
@@ -1251,6 +1260,7 @@ future<lw_shared_ptr<keyspace_metadata>> create_keyspace_metadata(
|
||||
// (or screw up shared pointers)
|
||||
const auto& replication = row.get_nonnull<map_type_impl::native_type>("replication");
|
||||
const auto& replication_v2 = row.get<map_type_impl::native_type>("replication_v2");
|
||||
const auto& next_replication = row.get<map_type_impl::native_type>("next_replication");
|
||||
|
||||
cql3::statements::property_definitions::map_type flat_strategy_options;
|
||||
for (auto& p : replication_v2 ? *replication_v2 : replication) {
|
||||
@@ -1259,6 +1269,17 @@ future<lw_shared_ptr<keyspace_metadata>> create_keyspace_metadata(
|
||||
auto strategy_options = cql3::statements::from_flattened_map(flat_strategy_options);
|
||||
auto strategy_name = std::get<sstring>(strategy_options["class"]);
|
||||
strategy_options.erase("class");
|
||||
|
||||
std::optional<cql3::statements::property_definitions::extended_map_type> next_strategy_options = std::nullopt;
|
||||
if (next_replication) {
|
||||
cql3::statements::property_definitions::map_type flat_next_replication;
|
||||
for (auto& p : *next_replication) {
|
||||
flat_next_replication.emplace(value_cast<sstring>(p.first), value_cast<sstring>(p.second));
|
||||
}
|
||||
next_strategy_options = cql3::statements::from_flattened_map(flat_next_replication);
|
||||
next_strategy_options->erase("class");
|
||||
}
|
||||
|
||||
bool durable_writes = row.get_nonnull<bool>("durable_writes");
|
||||
|
||||
data_dictionary::storage_options storage_opts;
|
||||
@@ -1284,7 +1305,7 @@ future<lw_shared_ptr<keyspace_metadata>> create_keyspace_metadata(
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return keyspace_metadata::new_keyspace(keyspace_name, strategy_name, strategy_options, initial_tablets, consistency, durable_writes, storage_opts);
|
||||
co_return keyspace_metadata::new_keyspace(keyspace_name, strategy_name, strategy_options, initial_tablets, consistency, durable_writes, storage_opts, {}, next_strategy_options);
|
||||
}
|
||||
|
||||
template<typename V>
|
||||
|
||||
@@ -300,6 +300,7 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("upgrade_state", utf8_type, column_kind::static_column)
|
||||
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.with_column("ongoing_rf_changes", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.set_comment("Current state of topology change machine")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
@@ -3350,6 +3351,12 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
}
|
||||
}
|
||||
|
||||
if (some_row.has("ongoing_rf_changes")) {
|
||||
for (auto&& v : deserialize_set_column(*topology(), some_row, "ongoing_rf_changes")) {
|
||||
ret.ongoing_rf_changes.insert(value_cast<utils::UUID>(v));
|
||||
}
|
||||
}
|
||||
|
||||
if (some_row.has("enabled_features")) {
|
||||
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
}
|
||||
|
||||
@@ -1584,9 +1584,11 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
|
||||
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
||||
if (tombstone && _existing && !_existing->is_end_of_partition()) {
|
||||
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
|
||||
if (_existing->is_clustering_row()) {
|
||||
if (_existing->is_range_tombstone_change()) {
|
||||
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
|
||||
} else if (_existing->is_clustering_row()) {
|
||||
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
|
||||
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
} else if (_existing->is_static_row()) {
|
||||
@@ -1597,9 +1599,10 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
return should_stop_updates() ? stop() : advance_existings();
|
||||
}
|
||||
|
||||
// If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
|
||||
if (_update && !_update->is_end_of_partition()) {
|
||||
if (_update->is_clustering_row()) {
|
||||
if (_update->is_range_tombstone_change()) {
|
||||
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
|
||||
} else if (_update->is_clustering_row()) {
|
||||
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
||||
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
});
|
||||
|
||||
@@ -240,6 +240,9 @@ future<> view_update_generator::process_staging_sstables(lw_shared_ptr<replica::
|
||||
_progress_tracker->on_sstable_registration(sst);
|
||||
}
|
||||
|
||||
utils::get_local_injector().inject("view_update_generator_pause_before_processing",
|
||||
utils::wait_for_message(std::chrono::minutes(5))).get();
|
||||
|
||||
// Generate view updates from staging sstables
|
||||
auto start_time = db_clock::now();
|
||||
auto [result, input_size] = generate_updates_from_staging_sstables(table, sstables);
|
||||
|
||||
@@ -271,7 +271,7 @@ The json structure is as follows:
|
||||
}
|
||||
|
||||
The `manifest` member contains the following attributes:
|
||||
- `version` - respresenting the version of the manifest itself. It is incremented when members are added or removed from the manifest.
|
||||
- `version` - representing the version of the manifest itself. It is incremented when members are added or removed from the manifest.
|
||||
- `scope` - the scope of metadata stored in this manifest file. The following scopes are supported:
|
||||
- `node` - the manifest describes all SSTables owned by this node in this snapshot.
|
||||
|
||||
|
||||
@@ -12,7 +12,9 @@ Schema:
|
||||
CREATE TABLE system_schema.keyspaces (
|
||||
keyspace_name text PRIMARY KEY,
|
||||
durable_writes boolean,
|
||||
replication frozen<map<text, text>>
|
||||
replication frozen<map<text, text>>,
|
||||
replication_v2 frozen<map<text, text>>,
|
||||
next_replication frozen<map<text, text>>
|
||||
)
|
||||
```
|
||||
|
||||
@@ -31,6 +33,8 @@ Columns:
|
||||
stored as a flattened map of the extended options map (see below).
|
||||
|
||||
For `SimpleStrategy` there is a single option `"replication_factor"` specifying the replication factor.
|
||||
* `next_replication` - the target replication factor for the keyspace during rf change.
|
||||
If there is no ongoing rf change, `next_replication` value is not set.
|
||||
|
||||
Extended options map used by NetworkTopologyStrategy is a map where values can be either strings or lists of strings.
|
||||
|
||||
|
||||
@@ -146,6 +146,25 @@ AWS Security Token Service (STS) or the EC2 Instance Metadata Service.
|
||||
- When set, these values are used by the S3 client to sign requests.
|
||||
- If not set, requests are sent unsigned, which may not be accepted by all servers.
|
||||
|
||||
.. _admin-oci-object-storage:
|
||||
|
||||
Using Oracle OCI Object Storage
|
||||
=================================
|
||||
|
||||
Oracle Cloud Infrastructure (OCI) Object Storage is compatible with the Amazon
|
||||
S3 API, so it works with ScyllaDB without additional configuration.
|
||||
|
||||
To use OCI Object Storage, follow the same configuration as for AWS S3, and
|
||||
specify your OCI S3-compatible endpoint.
|
||||
|
||||
Example:
|
||||
|
||||
.. code:: yaml
|
||||
|
||||
object_storage_endpoints:
|
||||
- name: https://idedxcgnkfkt.compat.objectstorage.us-ashburn-1.oci.customer-oci.com:443
|
||||
aws_region: us-ashburn-1
|
||||
|
||||
.. _admin-compression:
|
||||
|
||||
Compression
|
||||
|
||||
@@ -231,6 +231,46 @@ Add New DC
|
||||
|
||||
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
|
||||
|
||||
If the keyspace uses rack list replication, update the replication factor in one ``ALTER KEYSPACE`` statement, under the following rules:
|
||||
* Existing datacenters must keep their current replication factor.
|
||||
* A new datacenter can be assigned a replication factor (**0 to N**).
|
||||
* An existing datacenter can be removed (**N to 0**).
|
||||
|
||||
.. warning::
|
||||
|
||||
While adding a new datacenter and altering keyspaces, do **not** perform any reads or writes that involve the new datacenter.
|
||||
In particular, avoid using global consistency levels (such as ``ALL``, ``EACH_QUORUM``) that would include the new datacenter in the operation.
|
||||
Use ``LOCAL_*`` consistency levels (e.g., ``LOCAL_QUORUM``, ``LOCAL_ONE``) until the new datacenter is fully operational.
|
||||
|
||||
Before
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DESCRIBE KEYSPACE mykeyspace4;
|
||||
|
||||
CREATE KEYSPACE mykeyspace4 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>']} AND tablets = { 'enabled': true };
|
||||
|
||||
The following is **not** allowed because it changes the replication factor of ``<existing_dc>`` (adds ``<existing_rack4>``) and adds ``<new_dc>`` in the same statement:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
ALTER KEYSPACE mykeyspace4 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>', '<existing_rack4>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
|
||||
|
||||
Add all the nodes to the new datacenter and then:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
ALTER KEYSPACE mykeyspace4 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
|
||||
|
||||
After
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DESCRIBE KEYSPACE mykeyspace4;
|
||||
CREATE KEYSPACE mykeyspace4 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
|
||||
|
||||
You can abort the keyspace alteration using :doc:`Task manager </operating-scylla/admin-tools/task-manager>`.
|
||||
|
||||
#. If any vnode keyspace was altered, run ``nodetool rebuild`` on each node in the new datacenter, specifying the existing datacenter name in the rebuild command.
|
||||
|
||||
For example:
|
||||
|
||||
@@ -102,6 +102,34 @@ Procedure
|
||||
|
||||
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
|
||||
|
||||
If the keyspace uses rack list replication, update the replication factor in one ``ALTER KEYSPACE`` statement, under the following rules:
|
||||
* Existing datacenters must keep their current replication factor.
|
||||
* An existing datacenter can be removed (**N to 0**).
|
||||
* A new datacenter can be assigned a replication factor (**0 to N**).
|
||||
|
||||
.. warning::
|
||||
|
||||
While removing a datacenter and altering keyspaces, do **not** perform any reads or writes that involve the datacenter being removed.
|
||||
In particular, avoid using global consistency levels (such as ``ALL``, ``EACH_QUORUM``) that would include the decommissioned datacenter in the operation.
|
||||
Use ``LOCAL_*`` consistency levels (e.g., ``LOCAL_QUORUM``, ``LOCAL_ONE``) until the datacenter is fully decommissioned.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh> DESCRIBE nba4
|
||||
cqlsh> CREATE KEYSPACE nba4 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : ['RAC4', 'RAC5'], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
|
||||
|
||||
The following is **not** allowed because it changes the replication factor of ``EUROPE-DC`` (adds ``RAC9``) and removes ``ASIA-DC`` in the same statement:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh> ALTER KEYSPACE nba4 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : [], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8', 'RAC9']} AND tablets = { 'enabled': true };
|
||||
|
||||
Remove all replicas from the decommissioned datacenter:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh> ALTER KEYSPACE nba4 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : [], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
|
||||
|
||||
.. note::
|
||||
|
||||
If table audit is enabled, the ``audit`` keyspace is automatically created with ``NetworkTopologyStrategy``.
|
||||
@@ -113,6 +141,10 @@ Procedure
|
||||
|
||||
Failure to do so will result in decommission errors such as "zero replica after the removal".
|
||||
|
||||
.. warning::
|
||||
|
||||
Removal of replicas from a datacenter cannot be aborted. To get back to the previous replication, wait until the ALTER KEYSPACE finishes and then add the replicas back by running another ALTER KEYSPACE statement.
|
||||
|
||||
#. Run :doc:`nodetool decommission </operating-scylla/nodetool-commands/decommission>` on every node in the data center that is to be removed.
|
||||
Refer to :doc:`Remove a Node from a ScyllaDB Cluster - Down Scale </operating-scylla/procedures/cluster-management/remove-node>` for further information.
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/smp.hh>
|
||||
#include "db/schema_features.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "gms/feature.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
@@ -179,6 +180,7 @@ db::schema_features feature_service::cluster_schema_features() const {
|
||||
f.set<db::schema_feature::GROUP0_SCHEMA_VERSIONING>();
|
||||
f.set_if<db::schema_feature::IN_MEMORY_TABLES>(bool(in_memory_tables));
|
||||
f.set_if<db::schema_feature::TABLET_OPTIONS>(bool(tablet_options));
|
||||
f.set_if<db::schema_feature::KEYSPACE_MULTI_RF_CHANGE>(bool(keyspace_multi_rf_change));
|
||||
return f;
|
||||
}
|
||||
|
||||
|
||||
@@ -182,6 +182,7 @@ public:
|
||||
gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv };
|
||||
gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv };
|
||||
gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv };
|
||||
gms::feature keyspace_multi_rf_change { *this, "KEYSPACE_MULTI_RF_CHANGE"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
@@ -381,6 +381,10 @@ public:
|
||||
return _nodes.at(node)._du.capacity;
|
||||
}
|
||||
|
||||
bool has_node(host_id node) const {
|
||||
return _nodes.contains(node);
|
||||
}
|
||||
|
||||
shard_id get_shard_count(host_id node) const {
|
||||
if (!_nodes.contains(node)) {
|
||||
return 0;
|
||||
|
||||
@@ -153,19 +153,27 @@ struct hash<locator::range_based_tablet_id> {
|
||||
|
||||
namespace locator {
|
||||
|
||||
/// Creates a new replica set with old_replica replaced by new_replica.
|
||||
/// If there is no old_replica, the set is returned unchanged.
|
||||
/// Returns a copy of the replica set with the following modifications:
|
||||
/// - If both old_replica and new_replica are set, old_replica is substituted
|
||||
/// with new_replica. If old_replica is not found in rs, the set is returned as-is.
|
||||
/// - If only old_replica is set, it is removed from the result.
|
||||
/// - If only new_replica is set, it is appended to the result.
|
||||
inline
|
||||
tablet_replica_set replace_replica(const tablet_replica_set& rs, tablet_replica old_replica, tablet_replica new_replica) {
|
||||
tablet_replica_set replace_replica(const tablet_replica_set& rs, std::optional<tablet_replica> old_replica, std::optional<tablet_replica> new_replica) {
|
||||
tablet_replica_set result;
|
||||
result.reserve(rs.size());
|
||||
for (auto&& r : rs) {
|
||||
if (r == old_replica) {
|
||||
result.push_back(new_replica);
|
||||
if (old_replica.has_value() && r == old_replica.value()) {
|
||||
if (new_replica.has_value()) {
|
||||
result.push_back(new_replica.value());
|
||||
}
|
||||
} else {
|
||||
result.push_back(r);
|
||||
}
|
||||
}
|
||||
if (!old_replica.has_value() && new_replica.has_value()) {
|
||||
result.push_back(new_replica.value());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -383,8 +391,8 @@ bool is_post_cleanup(tablet_replica replica, const tablet_info& tinfo, const tab
|
||||
struct tablet_migration_info {
|
||||
locator::tablet_transition_kind kind;
|
||||
locator::global_tablet_id tablet;
|
||||
locator::tablet_replica src;
|
||||
locator::tablet_replica dst;
|
||||
std::optional<locator::tablet_replica> src;
|
||||
std::optional<locator::tablet_replica> dst;
|
||||
};
|
||||
|
||||
class tablet_map;
|
||||
|
||||
2
main.cc
2
main.cc
@@ -942,7 +942,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
auto background_reclaim_scheduling_group = create_scheduling_group("background_reclaim", "bgre", 50).get();
|
||||
|
||||
// Maintenance supergroup -- the collection of background low-prio activites
|
||||
// Maintenance supergroup -- the collection of background low-prio activities
|
||||
auto maintenance_supergroup = create_scheduling_supergroup(200).get();
|
||||
auto bandwidth_updater = io_throughput_updater("maintenance supergroup", maintenance_supergroup,
|
||||
cfg->maintenance_io_throughput_mb_per_sec.is_set() ? cfg->maintenance_io_throughput_mb_per_sec : cfg->stream_io_throughput_mb_per_sec);
|
||||
|
||||
@@ -16,6 +16,8 @@ Usage:
|
||||
import argparse, os, sys
|
||||
from typing import Sequence
|
||||
|
||||
from test.pylib.driver_utils import safe_driver_shutdown
|
||||
|
||||
def read_statements(path: str) -> list[tuple[int, str]]:
|
||||
stms: list[tuple[int, str]] = []
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
@@ -56,7 +58,7 @@ def exec_statements(statements: list[tuple[int, str]], socket_path: str, timeout
|
||||
print(f"ERROR executing statement from file line {lineno}: {s}\n{e}", file=sys.stderr)
|
||||
return 1
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
safe_driver_shutdown(cluster)
|
||||
return 0
|
||||
|
||||
def main(argv: Sequence[str]) -> int:
|
||||
|
||||
@@ -269,6 +269,10 @@ public:
|
||||
// Gets the view a sstable currently belongs to.
|
||||
compaction::compaction_group_view& view_for_sstable(const sstables::shared_sstable& sst) const;
|
||||
utils::small_vector<compaction::compaction_group_view*, 3> all_views() const;
|
||||
// Returns true iff v is the repaired view of this compaction group.
|
||||
bool is_repaired_view(const compaction::compaction_group_view* v) const noexcept;
|
||||
// Returns an sstable set containing only repaired sstables (those classified as repaired).
|
||||
lw_shared_ptr<sstables::sstable_set> make_repaired_sstable_set() const;
|
||||
|
||||
seastar::condition_variable& get_staging_done_condition() noexcept {
|
||||
return _staging_done_condition;
|
||||
@@ -404,6 +408,8 @@ public:
|
||||
|
||||
// Make an sstable set spanning all sstables in the storage_group
|
||||
lw_shared_ptr<const sstables::sstable_set> make_sstable_set() const;
|
||||
// Like make_sstable_set(), but restricted to repaired sstables only across all compaction groups.
|
||||
lw_shared_ptr<const sstables::sstable_set> make_repaired_sstable_set() const;
|
||||
|
||||
future<utils::chunked_vector<logstor::segment_snapshot>> take_logstor_snapshot() const;
|
||||
|
||||
|
||||
@@ -1006,7 +1006,7 @@ future<database::keyspace_change_per_shard> database::prepare_update_keyspace_on
|
||||
co_await modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) -> future<> {
|
||||
auto& ks = db.find_keyspace(ksm.name());
|
||||
auto new_ksm = ::make_lw_shared<keyspace_metadata>(ksm.name(), ksm.strategy_name(), ksm.strategy_options(), ksm.initial_tablets(), ksm.consistency_option(), ksm.durable_writes(),
|
||||
ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to<std::vector>(), ks.metadata()->user_types(), ksm.get_storage_options());
|
||||
ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to<std::vector>(), ks.metadata()->user_types(), ksm.get_storage_options(), ksm.next_strategy_options_opt());
|
||||
|
||||
auto change = co_await db.prepare_update_keyspace(ks, new_ksm, pending_token_metadata.local());
|
||||
changes[this_shard_id()] = make_foreign(std::make_unique<keyspace_change>(std::move(change)));
|
||||
|
||||
@@ -757,6 +757,10 @@ private:
|
||||
// groups during tablet split with overlapping token range, and we need to include them all in a single
|
||||
// sstable set to allow safe tombstone gc.
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc(const compaction_group&) const;
|
||||
// Like sstable_set_for_tombstone_gc(), but restricted to repaired sstables only across all compaction
|
||||
// groups of the same tablet (storage group). Used by the tombstone_gc=repair optimization to avoid
|
||||
// scanning unrepaired sstables when looking for GC-blocking shadows.
|
||||
lw_shared_ptr<const sstables::sstable_set> make_repaired_sstable_set_for_tombstone_gc(const compaction_group&) const;
|
||||
|
||||
bool cache_enabled() const {
|
||||
return _config.enable_cache && _schema->caching_options().enabled();
|
||||
|
||||
@@ -1203,11 +1203,35 @@ future<utils::chunked_vector<logstor::segment_snapshot>> storage_group::take_log
|
||||
co_return std::move(snp);
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstables::sstable_set> storage_group::make_repaired_sstable_set() const {
|
||||
if (_split_ready_groups.empty() && _merging_groups.empty()) {
|
||||
return _main_cg->make_repaired_sstable_set();
|
||||
}
|
||||
const auto& schema = _main_cg->_t.schema();
|
||||
std::vector<lw_shared_ptr<sstables::sstable_set>> underlying;
|
||||
underlying.reserve(1 + _merging_groups.size() + _split_ready_groups.size());
|
||||
underlying.emplace_back(_main_cg->make_repaired_sstable_set());
|
||||
for (const auto& cg : _merging_groups) {
|
||||
if (!cg->empty()) {
|
||||
underlying.emplace_back(cg->make_repaired_sstable_set());
|
||||
}
|
||||
}
|
||||
for (const auto& cg : _split_ready_groups) {
|
||||
underlying.emplace_back(cg->make_repaired_sstable_set());
|
||||
}
|
||||
return make_lw_shared(sstables::make_compound_sstable_set(schema, std::move(underlying)));
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstables::sstable_set> table::sstable_set_for_tombstone_gc(const compaction_group& cg) const {
|
||||
auto& sg = storage_group_for_id(cg.group_id());
|
||||
return sg.make_sstable_set();
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstables::sstable_set> table::make_repaired_sstable_set_for_tombstone_gc(const compaction_group& cg) const {
|
||||
auto& sg = storage_group_for_id(cg.group_id());
|
||||
return sg.make_repaired_sstable_set();
|
||||
}
|
||||
|
||||
bool tablet_storage_group_manager::all_storage_groups_split() {
|
||||
auto& tmap = tablet_map();
|
||||
if (_split_ready_seq_number == tmap.resize_decision().sequence_number) {
|
||||
@@ -3000,9 +3024,47 @@ public:
|
||||
future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
|
||||
return make_sstable_set_for_this_view(_cg.maintenance_sstables(), [this] { return *_cg.make_maintenance_sstable_set(); });
|
||||
}
|
||||
private:
|
||||
// Returns true when tombstone GC is restricted to the repaired set:
|
||||
// tombstone_gc=repair mode and this view is the repaired view.
|
||||
//
|
||||
// The optimization is safe for materialized view tables as well as base tables.
|
||||
// The key invariant for MV: MV tablet repair calls flush_hints() before
|
||||
// take_storage_snapshot(). flush_hints() creates a sync point that covers BOTH
|
||||
// _hints_manager (base mutations) AND _hints_for_views_manager (view mutations).
|
||||
// It waits until all pending hints — including any D_view hint stored in
|
||||
// _hints_for_views_manager while the target node was down — have been replayed
|
||||
// to the target node. Only then is take_storage_snapshot() called, which flushes
|
||||
// the MV memtable and captures D_view in the repairing sstable. After repair
|
||||
// completes, D_view is in the repaired set.
|
||||
//
|
||||
// If a subsequent base repair later replays a D_base hint that causes another
|
||||
// D_view write (same key and timestamp), it is a no-op duplicate: the original
|
||||
// D_view already in the repaired set still prevents T_mv from being purged.
|
||||
//
|
||||
// USING TIMESTAMP with timestamps predating (gc_before + propagation_delay) is
|
||||
// explicitly UB and excluded from the safety argument.
|
||||
bool is_tombstone_gc_repaired_only() const noexcept {
|
||||
return _cg.is_repaired_view(this) &&
|
||||
_t.schema()->tombstone_gc_options().mode() == tombstone_gc_mode::repair;
|
||||
}
|
||||
public:
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
|
||||
// Optimization: when tombstone_gc=repair and this is the repaired view, only check
|
||||
// repaired sstables. The repair ordering guarantee ensures that by the time a tombstone
|
||||
// becomes GC-eligible (repair_time committed to Raft), any data it shadows has already
|
||||
// been promoted from repairing to repaired. Unrepaired data always has timestamps newer
|
||||
// than any GC-eligible tombstone (legitimate writes; USING TIMESTAMP abuse is UB).
|
||||
// For all other tombstone_gc modes this invariant does not hold, so we fall through to
|
||||
// the full storage-group set.
|
||||
if (is_tombstone_gc_repaired_only()) {
|
||||
return _t.make_repaired_sstable_set_for_tombstone_gc(_cg);
|
||||
}
|
||||
return _t.sstable_set_for_tombstone_gc(_cg);
|
||||
}
|
||||
bool skip_memtable_for_tombstone_gc() const noexcept override {
|
||||
return is_tombstone_gc_repaired_only();
|
||||
}
|
||||
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
|
||||
return compaction::get_fully_expired_sstables(*this, sstables, query_time);
|
||||
}
|
||||
@@ -5419,6 +5481,21 @@ compaction::compaction_group_view& compaction_group::view_for_unrepaired_data()
|
||||
return *_unrepaired_view;
|
||||
}
|
||||
|
||||
bool compaction_group::is_repaired_view(const compaction::compaction_group_view* v) const noexcept {
|
||||
return v == _repaired_view.get();
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> compaction_group::make_repaired_sstable_set() const {
|
||||
auto set = make_lw_shared<sstables::sstable_set>(make_main_sstable_set());
|
||||
auto sstables_repaired_at = get_sstables_repaired_at();
|
||||
for (auto& sst : *_main_sstables->all()) {
|
||||
if (repair::is_repaired(sstables_repaired_at, sst)) {
|
||||
set->insert(sst);
|
||||
}
|
||||
}
|
||||
return set;
|
||||
}
|
||||
|
||||
compaction::compaction_group_view& compaction_group::view_for_sstable(const sstables::shared_sstable& sst) const {
|
||||
switch (_repair_sstable_classifier(sst, get_sstables_repaired_at())) {
|
||||
case repair_sstable_classification::unrepaired: return *_unrepaired_view;
|
||||
|
||||
@@ -493,7 +493,7 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
|
||||
// If partition row limit is applied to paging, we still need to fall back
|
||||
// to filtering the results to avoid extraneous rows on page breaks.
|
||||
if (!filtering_restrictions && cmd->slice.partition_row_limit() < query::max_rows_if_set) {
|
||||
filtering_restrictions = ::make_shared<cql3::restrictions::statement_restrictions>(s, true);
|
||||
filtering_restrictions = cql3::restrictions::make_trivial_statement_restrictions(s, true);
|
||||
}
|
||||
if (filtering_restrictions) {
|
||||
return std::make_unique<filtering_query_pager>(proxy, std::move(s), std::move(selection), state,
|
||||
|
||||
@@ -1342,6 +1342,11 @@ future<bool> storage_service::ongoing_rf_change(const group0_guard& guard, sstri
|
||||
co_return true;
|
||||
}
|
||||
}
|
||||
for (auto request_id : _topology_state_machine._topology.ongoing_rf_changes) {
|
||||
if (co_await ongoing_ks_rf_change(request_id)) {
|
||||
co_return true;
|
||||
}
|
||||
}
|
||||
co_return false;
|
||||
}
|
||||
|
||||
@@ -2426,7 +2431,7 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
|
||||
}
|
||||
|
||||
future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
|
||||
return run_with_no_api_lock([this] (storage_service& ss) {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
const auto& tm = ss.get_token_metadata();
|
||||
auto token_map = dht::token::describe_ownership(tm.sorted_tokens());
|
||||
// describeOwnership returns tokens in an unspecified order, let's re-order them
|
||||
@@ -2434,7 +2439,7 @@ future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
|
||||
for (auto entry : token_map) {
|
||||
locator::host_id id = tm.get_endpoint(entry.first).value();
|
||||
auto token_ownership = entry.second;
|
||||
ownership[_address_map.get(id)] += token_ownership;
|
||||
ownership[ss._address_map.get(id)] += token_ownership;
|
||||
}
|
||||
return ownership;
|
||||
});
|
||||
@@ -2843,12 +2848,8 @@ future<> storage_service::raft_removenode(locator::host_id host_id, locator::hos
|
||||
}
|
||||
|
||||
future<> storage_service::mark_excluded(const std::vector<locator::host_id>& hosts) {
|
||||
if (this_shard_id() != 0) {
|
||||
// group0 is only set on shard 0.
|
||||
co_return co_await container().invoke_on(0, [&] (auto& ss) {
|
||||
return ss.mark_excluded(hosts);
|
||||
});
|
||||
}
|
||||
// Callers forward to shard 0 via run_with_no_api_lock (group0 is only set on shard 0).
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
@@ -3093,8 +3094,8 @@ future<sstring> storage_service::wait_for_topology_request_completion(utils::UUI
|
||||
}
|
||||
|
||||
future<> storage_service::abort_topology_request(utils::UUID request_id) {
|
||||
co_await container().invoke_on(0, [request_id, this] (storage_service& ss) {
|
||||
return _topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id);
|
||||
co_await container().invoke_on(0, [request_id] (storage_service& ss) {
|
||||
return ss._topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3107,13 +3108,13 @@ future<> storage_service::wait_for_topology_not_busy() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::abort_paused_rf_change(utils::UUID request_id) {
|
||||
future<> storage_service::abort_rf_change(utils::UUID request_id) {
|
||||
auto holder = _async_gate.hold();
|
||||
|
||||
if (this_shard_id() != 0) {
|
||||
// group0 is only set on shard 0.
|
||||
co_return co_await container().invoke_on(0, [&] (auto& ss) {
|
||||
return ss.abort_paused_rf_change(request_id);
|
||||
return ss.abort_rf_change(request_id);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3124,20 +3125,81 @@ future<> storage_service::abort_paused_rf_change(utils::UUID request_id) {
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
bool found = std::ranges::contains(_topology_state_machine._topology.paused_rf_change_requests, request_id);
|
||||
if (!found) {
|
||||
slogger.warn("RF change request with id '{}' is not paused, so it can't be aborted", request_id);
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
if (std::ranges::contains(_topology_state_machine._topology.paused_rf_change_requests, request_id)) { // keyspace_rf_change_kind::conversion_to_rack_list
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.resume_rf_change_request(_topology_state_machine._topology.paused_rf_change_requests, request_id).build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(request_id)
|
||||
.done("Aborted by user request")
|
||||
.build()));
|
||||
} else if (std::ranges::contains(_topology_state_machine._topology.ongoing_rf_changes, request_id)) { // keyspace_rf_change_kind::multi_rf_change
|
||||
auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
|
||||
if (!req_entry.error.empty()) {
|
||||
slogger.warn("RF change request with id '{}' was already aborted", request_id);
|
||||
co_return;
|
||||
}
|
||||
sstring ks_name = *req_entry.new_keyspace_rf_change_ks_name;
|
||||
if (!_db.local().has_keyspace(ks_name)) {
|
||||
co_return;
|
||||
}
|
||||
auto& ks = _db.local().find_keyspace(ks_name);
|
||||
// Check the tablet maps: if any tablet still has a missing replica
|
||||
// (i.e., needs extending), we can abort. Otherwise, we're in the
|
||||
// replica removal phase and aborting would require a rollback.
|
||||
auto next_replication = ks.metadata()->next_strategy_options_opt().value()
|
||||
| std::views::transform([] (const auto& pair) {
|
||||
return std::make_pair(pair.first, std::get<locator::rack_list>(pair.second));
|
||||
}) | std::ranges::to<std::unordered_map<sstring, std::vector<sstring>>>();
|
||||
|
||||
const auto& tm = *get_token_metadata_ptr();
|
||||
bool has_missing_replica = false;
|
||||
auto all_tables = ks.metadata()->tables();
|
||||
auto all_views = ks.metadata()->views()
|
||||
| std::views::transform([] (const auto& view) { return schema_ptr(view); })
|
||||
| std::ranges::to<std::vector<schema_ptr>>();
|
||||
all_tables.insert(all_tables.end(), all_views.begin(), all_views.end());
|
||||
for (const auto& table : all_tables) {
|
||||
if (!tm.tablets().has_tablet_map(table->id()) || !tm.tablets().is_base_table(table->id())) {
|
||||
continue;
|
||||
}
|
||||
const auto& tmap = tm.tablets().get_tablet_map(table->id());
|
||||
for (const auto& ti : tmap.tablets()) {
|
||||
std::unordered_map<sstring, std::vector<sstring>> dc_to_racks;
|
||||
for (const auto& r : ti.replicas) {
|
||||
const auto& node_dc_rack = tm.get_topology().get_node(r.host).dc_rack();
|
||||
dc_to_racks[node_dc_rack.dc].push_back(node_dc_rack.rack);
|
||||
}
|
||||
auto diff = subtract_replication(next_replication, dc_to_racks);
|
||||
if (!diff.empty()) {
|
||||
has_missing_replica = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (has_missing_replica) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (has_missing_replica) {
|
||||
auto ks_md = make_lw_shared<data_dictionary::keyspace_metadata>(*ks.metadata());
|
||||
ks_md->set_next_strategy_options(ks_md->strategy_options());
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db.local(), ks_md, guard.write_timestamp());
|
||||
for (auto& m : schema_muts) {
|
||||
updates.push_back(canonical_mutation(m));
|
||||
}
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(request_id)
|
||||
.abort("Aborted by user request")
|
||||
.build()));
|
||||
} else {
|
||||
slogger.warn("RF change request with id '{}' is ongoing, but it started removing replicas, so it can't be aborted", request_id);
|
||||
co_return;
|
||||
}
|
||||
} else {
|
||||
slogger.warn("RF change request with id '{}' can't be aborted", request_id);
|
||||
co_return;
|
||||
}
|
||||
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.resume_rf_change_request(_topology_state_machine._topology.paused_rf_change_requests, request_id).build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(request_id)
|
||||
.done("Aborted by user request")
|
||||
.build()));
|
||||
|
||||
topology_change change{std::move(updates)};
|
||||
mixed_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
format("aborting rf change request {}", request_id));
|
||||
|
||||
@@ -3895,11 +3957,8 @@ future<> storage_service::update_tablet_metadata(const locator::tablet_metadata_
|
||||
}
|
||||
|
||||
future<> storage_service::prepare_for_tablets_migration(const sstring& ks_name) {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return co_await container().invoke_on(0, [&] (auto& ss) {
|
||||
return ss.prepare_for_tablets_migration(ks_name);
|
||||
});
|
||||
}
|
||||
// Called via run_with_no_api_lock (forwards to shard 0).
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
@@ -4039,11 +4098,8 @@ future<> storage_service::prepare_for_tablets_migration(const sstring& ks_name)
|
||||
}
|
||||
|
||||
future<> storage_service::set_node_intended_storage_mode(intended_storage_mode mode) {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return co_await container().invoke_on(0, [mode] (auto& ss) {
|
||||
return ss.set_node_intended_storage_mode(mode);
|
||||
});
|
||||
}
|
||||
// Called via run_with_no_api_lock (forwards to shard 0).
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
@@ -4139,11 +4195,8 @@ storage_service::migration_status storage_service::get_tablets_migration_status(
|
||||
}
|
||||
|
||||
future<storage_service::keyspace_migration_status> storage_service::get_tablets_migration_status_with_node_details(const sstring& ks_name) {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
|
||||
return ss.get_tablets_migration_status_with_node_details(ks_name);
|
||||
});
|
||||
}
|
||||
// Called via run_with_no_api_lock (forwards to shard 0).
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
keyspace_migration_status result;
|
||||
result.keyspace = ks_name;
|
||||
@@ -4204,11 +4257,8 @@ future<storage_service::keyspace_migration_status> storage_service::get_tablets_
|
||||
}
|
||||
|
||||
future<> storage_service::finalize_tablets_migration(const sstring& ks_name) {
|
||||
if (this_shard_id() != 0) {
|
||||
co_return co_await container().invoke_on(0, [&ks_name] (auto& ss) {
|
||||
return ss.finalize_tablets_migration(ks_name);
|
||||
});
|
||||
}
|
||||
// Called via run_with_no_api_lock (forwards to shard 0).
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
slogger.info("Finalizing vnodes-to-tablets migration for keyspace '{}'", ks_name);
|
||||
|
||||
|
||||
@@ -780,13 +780,19 @@ private:
|
||||
*/
|
||||
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream_by_keyspace);
|
||||
|
||||
// REST handlers are gated at the registration site (see gated() in
|
||||
// api/storage_service.cc) so stop() drains in-flight requests before
|
||||
// teardown. run_with_api_lock_internal and run_with_no_api_lock hold
|
||||
// _async_gate on shard 0 as well, because REST requests arriving on
|
||||
// any shard are forwarded there for execution.
|
||||
template <typename Func>
|
||||
auto run_with_api_lock_internal(storage_service& ss, Func&& func, sstring& operation) {
|
||||
auto holder = ss._async_gate.hold();
|
||||
if (!ss._operation_in_progress.empty()) {
|
||||
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
|
||||
}
|
||||
ss._operation_in_progress = std::move(operation);
|
||||
return func(ss).finally([&ss] {
|
||||
return func(ss).finally([&ss, holder = std::move(holder)] {
|
||||
ss._operation_in_progress = sstring();
|
||||
});
|
||||
}
|
||||
@@ -794,6 +800,10 @@ private:
|
||||
public:
|
||||
int32_t get_exception_count();
|
||||
|
||||
auto hold_async_gate() {
|
||||
return _async_gate.hold();
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
auto run_with_api_lock(sstring operation, Func&& func) {
|
||||
return container().invoke_on(0, [operation = std::move(operation),
|
||||
@@ -804,8 +814,10 @@ public:
|
||||
|
||||
template <typename Func>
|
||||
auto run_with_no_api_lock(Func&& func) {
|
||||
return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
|
||||
return func(ss);
|
||||
return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable
|
||||
-> futurize_t<std::invoke_result_t<Func, storage_service&>> {
|
||||
auto holder = ss._async_gate.hold();
|
||||
co_return co_await futurize_invoke(func, ss);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -975,7 +987,7 @@ public:
|
||||
|
||||
future<> wait_for_topology_not_busy();
|
||||
|
||||
future<> abort_paused_rf_change(utils::UUID request_id);
|
||||
future<> abort_rf_change(utils::UUID request_id);
|
||||
|
||||
private:
|
||||
semaphore _do_sample_sstables_concurrency_limiter{1};
|
||||
|
||||
@@ -154,7 +154,7 @@ auto coordinator::create_operation_ctx(const schema& schema, const dht::token& t
|
||||
co_await utils::get_local_injector().inject("sc_coordinator_wait_before_acquire_server",
|
||||
utils::wait_for_message(5min));
|
||||
|
||||
auto raft_server = co_await _groups_manager.acquire_server(raft_info.group_id, as);
|
||||
auto raft_server = co_await _groups_manager.acquire_server(schema.id(), raft_info.group_id, as);
|
||||
|
||||
co_return operation_ctx {
|
||||
.erm = std::move(erm),
|
||||
|
||||
@@ -332,11 +332,27 @@ void groups_manager::update(token_metadata_ptr new_tm) {
|
||||
schedule_raft_groups_deletion(false);
|
||||
}
|
||||
|
||||
future<raft_server> groups_manager::acquire_server(raft::group_id group_id, abort_source& as) {
|
||||
future<raft_server> groups_manager::acquire_server(table_id table_id, raft::group_id group_id, abort_source& as) {
|
||||
if (!_features.strongly_consistent_tables) {
|
||||
on_internal_error(logger, "strongly consistent tables are not enabled on this shard");
|
||||
}
|
||||
|
||||
// A concurrent DROP TABLE may have already removed the table from database
|
||||
// registries and erased the raft group from _raft_groups via
|
||||
// schedule_raft_group_deletion. The schema.table() in create_operation_ctx()
|
||||
// might not fail though in this case because someone might be holding
|
||||
// lw_shared_ptr<table>, so that the table is dropped but the table object
|
||||
// is still alive.
|
||||
//
|
||||
// Check that the table still exists in the database to turn the
|
||||
// fatal on_internal_error below into a clean no_such_column_family
|
||||
// exception.
|
||||
//
|
||||
// When the table does exist, we proceed to acquire state.gate->hold().
|
||||
// This prevents schedule_raft_group_deletion (which co_awaits gate::close)
|
||||
// from erasing the group until the DML operation completes.
|
||||
_db.find_column_family(table_id);
|
||||
|
||||
const auto it = _raft_groups.find(group_id);
|
||||
if (it == _raft_groups.end()) {
|
||||
on_internal_error(logger, format("raft group {} not found", group_id));
|
||||
|
||||
@@ -110,7 +110,7 @@ public:
|
||||
void update(locator::token_metadata_ptr new_tm);
|
||||
|
||||
// The raft_server instance is used to submit write commands and perform read_barrier() before reads.
|
||||
future<raft_server> acquire_server(raft::group_id group_id, abort_source& as);
|
||||
future<raft_server> acquire_server(table_id table_id, raft::group_id group_id, abort_source& as);
|
||||
|
||||
// Called during node boot. Waits for all raft::server instances corresponding
|
||||
// to the latest group0 state to start.
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include <ranges>
|
||||
#include <utility>
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/switch_to.hh>
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
@@ -533,6 +534,38 @@ struct hash<migration_tablet_set> {
|
||||
|
||||
namespace service {
|
||||
|
||||
// Subtract right from left. The result contains only keys from left.
|
||||
std::unordered_map<sstring, std::vector<sstring>> subtract_replication(const std::unordered_map<sstring, std::vector<sstring>>& left, const std::unordered_map<sstring, std::vector<sstring>>& right) {
|
||||
std::unordered_map<sstring, std::vector<sstring>> res;
|
||||
for (const auto& [dc, rf_value] : left) {
|
||||
auto it = right.find(dc);
|
||||
if (it == right.end()) {
|
||||
res[dc] = rf_value;
|
||||
} else {
|
||||
std::vector<sstring> diff = rf_value | std::views::filter([&] (const sstring& rack) {
|
||||
return std::find(it->second.begin(), it->second.end(), rack) == it->second.end();
|
||||
}) | std::ranges::to<std::vector<sstring>>();
|
||||
if (!diff.empty()) {
|
||||
res[dc] = diff;
|
||||
}
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
bool rf_count_per_dc_equals(const locator::replication_strategy_config_options& current, const locator::replication_strategy_config_options& next) {
|
||||
if (current.size() != next.size()) {
|
||||
return false;
|
||||
}
|
||||
for (const auto& [dc, current_rf_value] : current) {
|
||||
auto it = next.find(dc);
|
||||
if (it == next.end() || get_replication_factor(it->second) != get_replication_factor(current_rf_value)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// The algorithm aims to equalize tablet count on each shard.
|
||||
/// This goal is based on the assumption that every shard has similar processing power and space capacity,
|
||||
/// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we
|
||||
@@ -1050,17 +1083,22 @@ public:
|
||||
return _topology != nullptr && _sys_ks != nullptr && !_topology->paused_rf_change_requests.empty();
|
||||
}
|
||||
|
||||
bool ongoing_rf_change() const {
|
||||
return _topology != nullptr && _sys_ks != nullptr && !_topology->ongoing_rf_changes.empty();
|
||||
}
|
||||
|
||||
future<migration_plan> make_plan() {
|
||||
const locator::topology& topo = _tm->get_topology();
|
||||
migration_plan plan;
|
||||
|
||||
auto rack_list_colocation = ongoing_rack_list_colocation();
|
||||
auto rf_change_prep = co_await prepare_per_rack_rf_change_plan(plan);
|
||||
|
||||
// Prepare plans for each DC separately and combine them to be executed in parallel.
|
||||
for (auto&& dc : topo.get_datacenters()) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation || !rf_change_prep.actions.empty()) {
|
||||
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
|
||||
auto rack_plan = co_await make_plan(dc, rack);
|
||||
auto rack_plan = co_await make_plan(dc, rack, rf_change_prep.actions[{dc, rack}]);
|
||||
auto level = rack_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
|
||||
lblogger.log(level, "Plan for {}/{}: {}", dc, rack, plan_summary(rack_plan));
|
||||
plan.merge(std::move(rack_plan));
|
||||
@@ -1450,6 +1488,387 @@ public:
|
||||
co_return std::move(plan);
|
||||
}
|
||||
|
||||
enum class rf_change_state {
|
||||
ready, // RF change is ready (succeed or failed).
|
||||
needs_extending,
|
||||
needs_shrinking,
|
||||
};
|
||||
|
||||
using process_views = bool_class<struct process_views_tag>;
|
||||
struct rf_change_action {
|
||||
sstring keyspace;
|
||||
rf_change_state state;
|
||||
process_views pv = process_views::no;
|
||||
};
|
||||
using rf_change_actions = std::unordered_map<locator::endpoint_dc_rack, std::vector<rf_change_action>>;
|
||||
struct rf_change_preparation {
|
||||
rf_change_actions actions;
|
||||
};
|
||||
|
||||
// Determines which dc+rack combinations need RF change actions for a given keyspace,
|
||||
// by comparing current tablet replicas against the target replication configuration.
|
||||
// Scans in priority order: extend tables, extend views, shrink views, shrink tables.
|
||||
// Returns the first non-empty set of per-rack actions; colocated tables are skipped.
|
||||
// An empty result means all tablets already match the target configuration.
|
||||
future<rf_change_preparation> determine_rf_change_actions_per_rack(const sstring& ks_name, const std::vector<schema_ptr>& tables, const std::vector<schema_ptr>& views, const locator::replication_strategy_config_options& next) {
|
||||
auto add_entry = [&ks_name] (rf_change_preparation& prep, const sstring& dc, const sstring& rack, rf_change_state state, process_views pv) {
|
||||
locator::endpoint_dc_rack key{dc, rack};
|
||||
auto& actions = prep.actions[key];
|
||||
if (std::none_of(actions.begin(), actions.end(), [&](const rf_change_action& a) { return a.keyspace == ks_name; })) {
|
||||
actions.push_back(rf_change_action{.keyspace = ks_name, .state = state, .pv = pv});
|
||||
}
|
||||
};
|
||||
|
||||
auto next_replication = next | std::views::transform([] (const auto& pair) {
|
||||
return std::make_pair(pair.first, std::get<rack_list>(pair.second));
|
||||
}) | std::ranges::to<std::unordered_map<sstring, std::vector<sstring>>>();
|
||||
|
||||
auto scan_tables = [&] (const std::vector<schema_ptr>& table_list, rf_change_state target_state, process_views pv) -> future<rf_change_preparation> {
|
||||
rf_change_preparation prep;
|
||||
for (const auto& table : table_list) {
|
||||
if (!_tm->tablets().is_base_table(table->id())) {
|
||||
continue;
|
||||
}
|
||||
const auto& tmap = _tm->tablets().get_tablet_map(table->id());
|
||||
for (const tablet_info& ti : tmap.tablets()) {
|
||||
std::unordered_map<sstring, std::vector<sstring>> dc_to_racks;
|
||||
for (const auto& r : ti.replicas) {
|
||||
const auto& node_dc_rack = _tm->get_topology().get_node(r.host).dc_rack();
|
||||
dc_to_racks[node_dc_rack.dc].push_back(node_dc_rack.rack);
|
||||
}
|
||||
|
||||
auto diff = (target_state == rf_change_state::needs_extending ?
|
||||
subtract_replication(next_replication, dc_to_racks) : subtract_replication(dc_to_racks, next_replication))
|
||||
| std::views::filter([] (const auto& pair) {
|
||||
return !pair.second.empty();
|
||||
}
|
||||
) | std::ranges::to<std::unordered_map<sstring, std::vector<sstring>>>();
|
||||
|
||||
for (const auto& [dc, racks] : diff) {
|
||||
for (const auto& rack : racks) {
|
||||
add_entry(prep, dc, rack, target_state, pv);
|
||||
}
|
||||
}
|
||||
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
co_return prep;
|
||||
};
|
||||
|
||||
// Extend base tables.
|
||||
if (auto prep = co_await scan_tables(tables, rf_change_state::needs_extending, process_views::no); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
if (utils::get_local_injector().enter("determine_rf_change_actions_per_rack_throw")) {
|
||||
lblogger.info("determine_rf_change_actions_per_rack_throw: entered");
|
||||
throw std::runtime_error("determine_rf_change_actions_per_rack_throw injection");
|
||||
}
|
||||
|
||||
// Extend views.
|
||||
if (auto prep = co_await scan_tables(views, rf_change_state::needs_extending, process_views::yes); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
// Shrink views.
|
||||
if (auto prep = co_await scan_tables(views, rf_change_state::needs_shrinking, process_views::yes); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
// Shrink base tables.
|
||||
if (auto prep = co_await scan_tables(tables, rf_change_state::needs_shrinking, process_views::no); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
co_return rf_change_preparation{};
|
||||
}
|
||||
|
||||
future<rf_change_preparation> prepare_per_rack_rf_change_plan(migration_plan& mplan) {
|
||||
lblogger.debug("In prepare_per_rack_rf_change_plan");
|
||||
|
||||
rf_change_preparation res;
|
||||
keyspace_rf_change_plan plan;
|
||||
if (!ongoing_rf_change()) {
|
||||
co_return res;
|
||||
}
|
||||
|
||||
for (const auto& request_id : _topology->ongoing_rf_changes) {
|
||||
auto req_entry = co_await _sys_ks->get_topology_request_entry(request_id);
|
||||
sstring ks_name = *req_entry.new_keyspace_rf_change_ks_name;
|
||||
|
||||
if (!_db.has_keyspace(ks_name)) {
|
||||
if (!plan.completion) {
|
||||
plan.completion = rf_change_completion_info{
|
||||
.request_id = request_id,
|
||||
.ks_name = ks_name,
|
||||
.error = format("Keyspace {} not found", ks_name),
|
||||
.saved_ks_props = req_entry.new_keyspace_rf_change_data.value(),
|
||||
};
|
||||
}
|
||||
continue;
|
||||
}
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
if (!ks.metadata()->next_strategy_options_opt()) {
|
||||
on_internal_error(lblogger, format("There is an ongoing rf change request {} for keyspace {}, "
|
||||
"but the keyspace does not have next replication settings", request_id, ks_name));
|
||||
}
|
||||
|
||||
auto tables = ks.metadata()->tables();
|
||||
auto views = ks.metadata()->views() | std::views::transform([] (const auto& view) { return schema_ptr(view); }) | std::ranges::to<std::vector<schema_ptr>>();
|
||||
auto rf_change_prep = co_await determine_rf_change_actions_per_rack(ks_name, tables, views, *ks.metadata()->next_strategy_options_opt());
|
||||
if (rf_change_prep.actions.empty()) {
|
||||
if (!plan.completion) {
|
||||
plan.completion = rf_change_completion_info{
|
||||
.request_id = request_id,
|
||||
.ks_name = ks_name,
|
||||
.error = req_entry.error,
|
||||
.saved_ks_props = req_entry.new_keyspace_rf_change_data.value()
|
||||
};
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if any extending action targets a dc+rack with no available nodes.
|
||||
// If so, the RF change can never complete and should be aborted.
|
||||
sstring error_msg = "";
|
||||
const auto& topo = _tm->get_topology();
|
||||
const auto& dc_rack_nodes = topo.get_datacenter_rack_nodes();
|
||||
for (const auto& [dc_rack, actions] : rf_change_prep.actions) {
|
||||
bool needs_extending = std::ranges::any_of(actions, [] (const rf_change_action& a) {
|
||||
return a.state == rf_change_state::needs_extending;
|
||||
});
|
||||
if (!needs_extending) {
|
||||
break;
|
||||
}
|
||||
bool has_live_node = false;
|
||||
bool has_down_node = false;
|
||||
auto dc_it = dc_rack_nodes.find(dc_rack.dc);
|
||||
if (dc_it != dc_rack_nodes.end()) {
|
||||
auto rack_it = dc_it->second.find(dc_rack.rack);
|
||||
if (rack_it != dc_it->second.end()) {
|
||||
for (const auto& node_ref : rack_it->second) {
|
||||
const auto& node = node_ref.get();
|
||||
if (_skiplist.contains(node.host_id())) {
|
||||
has_down_node = true;
|
||||
break;
|
||||
}
|
||||
if (!node.is_excluded()) {
|
||||
has_live_node = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (has_down_node) {
|
||||
lblogger.warn("RF change for keyspace {} requires extending to {}/{} but there are down nodes there; aborting",
|
||||
ks_name, dc_rack.dc, dc_rack.rack);
|
||||
error_msg = format("RF change aborted: there are down nodes in required rack {}/{}", dc_rack.dc, dc_rack.rack);
|
||||
break;
|
||||
}
|
||||
if (!has_live_node) {
|
||||
lblogger.warn("RF change for keyspace {} requires extending to {}/{} but no available nodes exist there; aborting",
|
||||
ks_name, dc_rack.dc, dc_rack.rack);
|
||||
error_msg = format("RF change aborted: no available nodes in required rack {}/{}", dc_rack.dc, dc_rack.rack);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!error_msg.empty()) {
|
||||
plan.aborts.push_back(rf_change_abort_info{
|
||||
.request_id = request_id,
|
||||
.ks_name = ks_name,
|
||||
.error = error_msg,
|
||||
.current_replication = ks.metadata()->strategy_options(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto& [dc_rack, actions] : rf_change_prep.actions) {
|
||||
auto& dst = res.actions[dc_rack];
|
||||
dst.insert(dst.end(), std::make_move_iterator(actions.begin()), std::make_move_iterator(actions.end()));
|
||||
}
|
||||
}
|
||||
mplan.set_rf_change_plan(std::move(plan));
|
||||
co_return res;
|
||||
}
|
||||
|
||||
future<migration_plan> make_rf_change_plan(node_load_map& nodes, std::vector<rf_change_action> actions, sstring dc, sstring rack) {
|
||||
lblogger.debug("In make_rf_change_plan");
|
||||
|
||||
migration_plan mplan;
|
||||
keyspace_rf_change_plan plan;
|
||||
|
||||
auto nodes_by_load_dst = nodes | std::views::filter([&] (const auto& host_load) {
|
||||
auto& [host, load] = host_load;
|
||||
auto& node = *load.node;
|
||||
return node.dc_rack().dc == dc && node.dc_rack().rack == rack;
|
||||
}) | std::views::keys | std::ranges::to<std::vector<host_id>>();
|
||||
|
||||
bool has_extending = std::ranges::any_of(actions, [] (const rf_change_action& a) {
|
||||
return a.state == rf_change_state::needs_extending;
|
||||
});
|
||||
if (has_extending) {
|
||||
// Check that all normal, non-excluded nodes in the target dc/rack are present in the
|
||||
// balanced node set. If any such node is missing, extending cannot safely proceed.
|
||||
const auto& topo = _tm->get_topology();
|
||||
const auto& dc_rack_nodes = topo.get_datacenter_rack_nodes();
|
||||
bool missing_node = false;
|
||||
auto dc_it = dc_rack_nodes.find(dc);
|
||||
if (dc_it != dc_rack_nodes.end()) {
|
||||
auto rack_it = dc_it->second.find(rack);
|
||||
if (rack_it != dc_it->second.end()) {
|
||||
for (const auto& node_ref : rack_it->second) {
|
||||
const auto& node = node_ref.get();
|
||||
if (node.is_normal() && !node.is_excluded() && !nodes.contains(node.host_id())) {
|
||||
missing_node = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (missing_node || nodes_by_load_dst.empty()) {
|
||||
lblogger.warn("Not all non-excluded nodes are available for RF change extending plan in dc {}, rack {}", dc, rack);
|
||||
// Filter out extending actions since not all nodes are available.
|
||||
// Shrinking actions can still proceed without target nodes.
|
||||
std::erase_if(actions, [] (const rf_change_action& a) {
|
||||
return a.state == rf_change_state::needs_extending;
|
||||
});
|
||||
if (actions.empty()) {
|
||||
co_return mplan;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto nodes_cmp = nodes_by_load_cmp(nodes);
|
||||
auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) {
|
||||
return nodes_cmp(b, a);
|
||||
};
|
||||
|
||||
// Ascending load heap of candidate target nodes.
|
||||
std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
|
||||
const locator::topology& topo = _tm->get_topology();
|
||||
locator::endpoint_dc_rack location{dc, rack};
|
||||
|
||||
for (const auto& action : actions) {
|
||||
const auto& ks_name = action.keyspace;
|
||||
const auto& rf_change_state = action.state;
|
||||
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
auto table_list = action.pv
|
||||
? ks.metadata()->views() | std::views::transform([] (const auto& view) { return schema_ptr(view); }) | std::ranges::to<std::vector<schema_ptr>>()
|
||||
: ks.metadata()->tables();
|
||||
for (const auto& table_or_mv : table_list) {
|
||||
const auto& tmap = _tm->tablets().get_tablet_map(table_or_mv->id());
|
||||
co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) -> future<> {
|
||||
if (!_tm->tablets().is_base_table(table_or_mv->id())) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto gid = locator::global_tablet_id{table_or_mv->id(), tid};
|
||||
|
||||
auto it = std::find_if(ti.replicas.begin(), ti.replicas.end(), [&] (const tablet_replica& r) {
|
||||
return topo.get_node(r.host).dc_rack() == location;
|
||||
});
|
||||
|
||||
auto replica = it != ti.replicas.end() ? std::optional<tablet_replica>{*it} : std::nullopt;
|
||||
|
||||
auto* tti = tmap.get_tablet_transition_info(tid);
|
||||
bool pending_replica_in_this_rack = false;
|
||||
bool leaving_replica_in_this_rack = false;
|
||||
if (tti) {
|
||||
auto leaving_replica = get_leaving_replica(ti, *tti);
|
||||
leaving_replica_in_this_rack = leaving_replica.has_value() && topo.get_node(leaving_replica->host).dc_rack() == location;
|
||||
pending_replica_in_this_rack = tti->pending_replica.has_value() && topo.get_node(tti->pending_replica->host).dc_rack() == location;
|
||||
}
|
||||
|
||||
if ((rf_change_state == rf_change_state::needs_extending && (replica && !leaving_replica_in_this_rack)) ||
|
||||
(rf_change_state == rf_change_state::needs_shrinking && (!replica && !pending_replica_in_this_rack))) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Skip tablet that is in transitions.
|
||||
if (tti) {
|
||||
lblogger.debug("Skipped rf change extending for tablet={} which is already in transition={} stage={}", gid, tti->transition, tti->stage);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Skip tablet that is about to be in transition.
|
||||
if (_scheduled_tablets.contains(gid)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
migration_tablet_set source_tablets {
|
||||
.tablet_s = gid, // Ignore the merge co-location.
|
||||
};
|
||||
if (rf_change_state == rf_change_state::needs_extending) {
|
||||
// Pick the least loaded node as target.
|
||||
std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
auto target = nodes_by_load_dst.back();
|
||||
|
||||
lblogger.debug("target node: {}, avg_load={}", target, nodes[target].avg_load);
|
||||
|
||||
auto dst = global_shard_id {target, _load_sketch->get_least_loaded_shard(target)};
|
||||
|
||||
lblogger.trace("target shard: {}, tablets={}, load={}", dst.shard,
|
||||
nodes[target].shards[dst.shard].tablet_count,
|
||||
nodes[target].shard_load(dst.shard, _target_tablet_size));
|
||||
|
||||
tablet_replica pending_replica{
|
||||
.host = target,
|
||||
.shard = dst.shard,
|
||||
};
|
||||
auto next = ti.replicas;
|
||||
next.push_back(pending_replica);
|
||||
tablet_migration_info mig{
|
||||
.kind = locator::tablet_transition_kind::rebuild_v2,
|
||||
.tablet = gid,
|
||||
.src = std::nullopt,
|
||||
.dst = pending_replica,
|
||||
};
|
||||
auto mig_streaming_info = get_migration_streaming_info(topo, ti, mig);
|
||||
pick(*_load_sketch, dst.host, dst.shard, source_tablets);
|
||||
if (can_accept_load(nodes, mig_streaming_info)) {
|
||||
lblogger.debug("Starting rebuild_v2 transition to {}.{} of tablet {}; new_replica = {}", dc, rack, gid, pending_replica);
|
||||
apply_load(nodes, mig_streaming_info);
|
||||
mark_as_scheduled(mig);
|
||||
mplan.add(std::move(mig));
|
||||
}
|
||||
increase_node_load(nodes, dst, source_tablets);
|
||||
std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
} else {
|
||||
auto next = ti.replicas | std::views::filter([&] (const tablet_replica& r) {
|
||||
return r != *replica;
|
||||
}) | std::ranges::to<tablet_replica_set>();
|
||||
tablet_migration_info mig{
|
||||
.kind = locator::tablet_transition_kind::rebuild_v2,
|
||||
.tablet = gid,
|
||||
.src = *replica,
|
||||
.dst = std::nullopt,
|
||||
};
|
||||
auto mig_streaming_info = get_migration_streaming_info(topo, ti, mig);
|
||||
// The node being shrunk may be excluded/down and lack complete tablet stats.
|
||||
// Since we're removing a replica (not placing one), accurate load data isn't needed.
|
||||
if (_load_sketch->has_node(replica->host)) {
|
||||
unload(*_load_sketch, replica->host, replica->shard, source_tablets);
|
||||
}
|
||||
if (can_accept_load(nodes, mig_streaming_info)) {
|
||||
apply_load(nodes, mig_streaming_info);
|
||||
mark_as_scheduled(mig);
|
||||
mplan.add(std::move(mig));
|
||||
}
|
||||
if (nodes.contains(replica->host)) {
|
||||
decrease_node_load(nodes, *replica, source_tablets);
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
}
|
||||
mplan.set_rf_change_plan(std::move(plan));
|
||||
co_return mplan;
|
||||
}
|
||||
|
||||
// Returns true if a table has replicas of all its sibling tablets co-located.
|
||||
// This is used for determining whether merge can be finalized, since co-location
|
||||
// is a strict requirement for sibling tablets to be merged.
|
||||
@@ -2658,14 +3077,13 @@ public:
|
||||
src_shard.dusage->used -= tablet_sizes;
|
||||
}
|
||||
|
||||
// Adjusts the load of the source and destination (host:shard) that were picked for the migration.
|
||||
void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, const migration_tablet_set& tablet_set) {
|
||||
void increase_node_load(node_load_map& nodes, tablet_replica replica, const migration_tablet_set& tablet_set) {
|
||||
auto tablet_count = tablet_set.tablets().size();
|
||||
auto tablet_sizes = tablet_set.tablet_set_disk_size;
|
||||
auto table = tablet_set.tablets().front().table;
|
||||
|
||||
auto& dst_node = nodes[dst.host];
|
||||
auto& dst_shard = dst_node.shards[dst.shard];
|
||||
auto& dst_node = nodes[replica.host];
|
||||
auto& dst_shard = dst_node.shards[replica.shard];
|
||||
dst_shard.tablet_count += tablet_count;
|
||||
dst_shard.tablet_count_per_table[table] += tablet_count;
|
||||
dst_shard.tablet_sizes_per_table[table] += tablet_sizes;
|
||||
@@ -2675,9 +3093,15 @@ public:
|
||||
dst_node.tablet_count += tablet_count;
|
||||
dst_node.dusage->used += tablet_sizes;
|
||||
dst_node.update();
|
||||
}
|
||||
|
||||
auto& src_node = nodes[src.host];
|
||||
auto& src_shard = src_node.shards[src.shard];
|
||||
void decrease_node_load(node_load_map& nodes, tablet_replica replica, const migration_tablet_set& tablet_set) {
|
||||
auto tablet_count = tablet_set.tablets().size();
|
||||
auto tablet_sizes = tablet_set.tablet_set_disk_size;
|
||||
auto table = tablet_set.tablets().front().table;
|
||||
|
||||
auto& src_node = nodes[replica.host];
|
||||
auto& src_shard = src_node.shards[replica.shard];
|
||||
src_shard.tablet_count -= tablet_count;
|
||||
src_shard.tablet_count_per_table[table] -= tablet_count;
|
||||
src_shard.tablet_sizes_per_table[table] -= tablet_sizes;
|
||||
@@ -2693,6 +3117,12 @@ public:
|
||||
src_node.update();
|
||||
}
|
||||
|
||||
// Adjusts the load of the source and destination (host:shard) that were picked for the migration.
|
||||
void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, const migration_tablet_set& tablet_set) {
|
||||
increase_node_load(nodes, dst, tablet_set);
|
||||
decrease_node_load(nodes, src, tablet_set);
|
||||
}
|
||||
|
||||
static void unload(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) {
|
||||
sketch.unload(host, shard, tablet_set.tablets().size(), tablet_set.tablet_set_disk_size);
|
||||
}
|
||||
@@ -3643,7 +4073,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<migration_plan> make_plan(dc_name dc, std::optional<sstring> rack = std::nullopt) {
|
||||
future<migration_plan> make_plan(dc_name dc, std::optional<sstring> rack = std::nullopt, std::vector<rf_change_action> rf_change_actions = {}) {
|
||||
migration_plan plan;
|
||||
|
||||
if (utils::get_local_injector().enter("tablet_migration_bypass")) {
|
||||
@@ -3761,12 +4191,6 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
if (nodes.empty()) {
|
||||
lblogger.debug("No nodes to balance.");
|
||||
_current_stats->stop_balance++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
// Detect finished drain.
|
||||
|
||||
for (auto i = nodes_to_drain.begin(); i != nodes_to_drain.end();) {
|
||||
@@ -3841,7 +4265,6 @@ public:
|
||||
}
|
||||
lblogger.debug("No candidate nodes");
|
||||
_current_stats->stop_no_candidates++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
// We want to saturate the target node so we migrate several tablets in parallel, one for each shard
|
||||
@@ -4003,7 +4426,7 @@ public:
|
||||
|
||||
print_node_stats(nodes, only_active::yes);
|
||||
|
||||
if (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) {
|
||||
if (has_dest_nodes && (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) && !nodes.empty()) {
|
||||
host_id target = *min_load_node;
|
||||
lblogger.info("target node: {}, avg_load: {}, max: {}", target, min_load, max_load);
|
||||
plan.merge(co_await make_internode_plan(nodes, nodes_to_drain, target));
|
||||
@@ -4015,6 +4438,10 @@ public:
|
||||
plan.merge(co_await make_intranode_plan(nodes, nodes_to_drain));
|
||||
}
|
||||
|
||||
if (!rf_change_actions.empty() && rack.has_value()) {
|
||||
plan.merge(co_await make_rf_change_plan(nodes, rf_change_actions, dc, rack.value()));
|
||||
}
|
||||
|
||||
if (_tm->tablets().balancing_enabled() && plan.empty() && !ongoing_rack_list_colocation()) {
|
||||
auto dc_merge_plan = co_await make_merge_colocation_plan(nodes);
|
||||
auto level = dc_merge_plan.tablet_migration_count() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
|
||||
@@ -8,8 +8,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "tablet_allocator_fwd.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include <seastar/core/metrics.hh>
|
||||
@@ -181,6 +183,34 @@ struct tablet_rack_list_colocation_plan {
|
||||
}
|
||||
};
|
||||
|
||||
struct rf_change_completion_info {
|
||||
utils::UUID request_id;
|
||||
sstring ks_name;
|
||||
sstring error;
|
||||
std::unordered_map<sstring, sstring> saved_ks_props;
|
||||
};
|
||||
|
||||
struct rf_change_abort_info {
|
||||
utils::UUID request_id;
|
||||
sstring ks_name;
|
||||
sstring error;
|
||||
locator::replication_strategy_config_options current_replication;
|
||||
};
|
||||
|
||||
struct keyspace_rf_change_plan {
|
||||
std::optional<rf_change_completion_info> completion;
|
||||
std::vector<rf_change_abort_info> aborts;
|
||||
|
||||
size_t size() const { return (completion ? 1 : 0) + aborts.size(); };
|
||||
|
||||
void merge(keyspace_rf_change_plan&& other) {
|
||||
if (!completion) {
|
||||
completion = std::move(other.completion);
|
||||
}
|
||||
std::move(other.aborts.begin(), other.aborts.end(), std::back_inserter(aborts));
|
||||
}
|
||||
};
|
||||
|
||||
class migration_plan {
|
||||
public:
|
||||
using migrations_vector = utils::chunked_vector<tablet_migration_info>;
|
||||
@@ -189,19 +219,22 @@ private:
|
||||
table_resize_plan _resize_plan;
|
||||
tablet_repair_plan _repair_plan;
|
||||
tablet_rack_list_colocation_plan _rack_list_colocation_plan;
|
||||
keyspace_rf_change_plan _rf_change_plan;
|
||||
bool _has_nodes_to_drain = false;
|
||||
std::vector<drain_failure> _drain_failures;
|
||||
public:
|
||||
/// Returns true iff there are decommissioning nodes which own some tablet replicas.
|
||||
bool has_nodes_to_drain() const { return _has_nodes_to_drain; }
|
||||
bool requires_schema_changes() const { return _rf_change_plan.size() > 0; }
|
||||
|
||||
const migrations_vector& migrations() const { return _migrations; }
|
||||
bool empty() const { return !size(); }
|
||||
size_t size() const { return _migrations.size() + _resize_plan.size() + _repair_plan.size() + _rack_list_colocation_plan.size() + _drain_failures.size(); }
|
||||
size_t size() const { return _migrations.size() + _resize_plan.size() + _repair_plan.size() + _rack_list_colocation_plan.size() + _drain_failures.size() + _rf_change_plan.size(); }
|
||||
size_t tablet_migration_count() const { return _migrations.size(); }
|
||||
size_t resize_decision_count() const { return _resize_plan.size(); }
|
||||
size_t tablet_repair_count() const { return _repair_plan.size(); }
|
||||
size_t tablet_rack_list_colocation_count() const { return _rack_list_colocation_plan.size(); }
|
||||
size_t keyspace_rf_change_count() const { return _rf_change_plan.size(); }
|
||||
const std::vector<drain_failure>& drain_failures() const { return _drain_failures; }
|
||||
|
||||
void add(tablet_migration_info info) {
|
||||
@@ -225,6 +258,7 @@ public:
|
||||
_resize_plan.merge(std::move(other._resize_plan));
|
||||
_repair_plan.merge(std::move(other._repair_plan));
|
||||
_rack_list_colocation_plan.merge(std::move(other._rack_list_colocation_plan));
|
||||
_rf_change_plan.merge(std::move(other._rf_change_plan));
|
||||
}
|
||||
|
||||
void set_has_nodes_to_drain(bool b) {
|
||||
@@ -249,6 +283,12 @@ public:
|
||||
_rack_list_colocation_plan = std::move(rack_list_colocation_plan);
|
||||
}
|
||||
|
||||
const keyspace_rf_change_plan& rf_change_plan() const { return _rf_change_plan; }
|
||||
|
||||
void set_rf_change_plan(keyspace_rf_change_plan rf_change_plan) {
|
||||
_rf_change_plan = std::move(rf_change_plan);
|
||||
}
|
||||
|
||||
future<std::unordered_set<locator::global_tablet_id>> get_migration_tablet_ids() const;
|
||||
};
|
||||
|
||||
@@ -317,6 +357,9 @@ future<bool> requires_rack_list_colocation(
|
||||
db::system_keyspace* sys_ks,
|
||||
utils::UUID request_id);
|
||||
|
||||
bool rf_count_per_dc_equals(const locator::replication_strategy_config_options& current, const locator::replication_strategy_config_options& next);
|
||||
std::unordered_map<sstring, std::vector<sstring>> subtract_replication(const std::unordered_map<sstring, std::vector<sstring>>& left, const std::unordered_map<sstring, std::vector<sstring>>& right);
|
||||
|
||||
}
|
||||
|
||||
template <>
|
||||
|
||||
@@ -452,7 +452,7 @@ future<std::optional<tasks::task_status>> global_topology_request_virtual_task::
|
||||
}
|
||||
|
||||
future<> global_topology_request_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint) noexcept {
|
||||
return _ss.abort_paused_rf_change(id.uuid());
|
||||
return _ss.abort_rf_change(id.uuid());
|
||||
}
|
||||
|
||||
future<std::vector<tasks::task_stats>> global_topology_request_virtual_task::get_stats() {
|
||||
|
||||
@@ -414,6 +414,20 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
}
|
||||
};
|
||||
|
||||
future<> update_topology_state_with_mixed_change(
|
||||
group0_guard guard, utils::chunked_vector<canonical_mutation>&& updates, const sstring& reason) {
|
||||
try {
|
||||
rtlogger.info("updating topology state with mixed change: {}", reason);
|
||||
rtlogger.trace("update_topology_state mutations: {}", updates);
|
||||
mixed_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("race while changing state: {}. Retrying", reason);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
raft::server_id parse_replaced_node(const std::optional<request_param>& req_param) const {
|
||||
return service::topology::parse_replaced_node(req_param);
|
||||
}
|
||||
@@ -961,6 +975,63 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
}
|
||||
}
|
||||
|
||||
enum class keyspace_rf_change_kind {
|
||||
default_rf_change,
|
||||
conversion_to_rack_list,
|
||||
multi_rf_change
|
||||
};
|
||||
|
||||
future<keyspace_rf_change_kind> choose_keyspace_rf_change_kind(utils::UUID req_id,
|
||||
lw_shared_ptr<keyspace_metadata> old_ks_md,
|
||||
lw_shared_ptr<keyspace_metadata> new_ks_md,
|
||||
const std::vector<schema_ptr>& tables_with_mvs) {
|
||||
const auto& new_replication_strategy_config = new_ks_md->strategy_options();
|
||||
const auto& old_replication_strategy_config = old_ks_md->strategy_options();
|
||||
auto check_needs_colocation = [&] () -> future<bool> {
|
||||
bool rack_list_conversion = false;
|
||||
for (const auto& [dc, rf_value] : new_replication_strategy_config) {
|
||||
if (std::holds_alternative<locator::rack_list>(rf_value)) {
|
||||
auto it = old_replication_strategy_config.find(dc);
|
||||
if (it != old_replication_strategy_config.end() && std::holds_alternative<sstring>(it->second)) {
|
||||
rack_list_conversion = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return rack_list_conversion ? co_await requires_rack_list_colocation(_db, get_token_metadata_ptr(), &_sys_ks, req_id) : false;
|
||||
};
|
||||
auto all_changes_are_0_N = [&] {
|
||||
auto all_dcs = old_replication_strategy_config | std::views::keys;
|
||||
auto new_dcs = new_replication_strategy_config | std::views::keys;
|
||||
std::set<sstring> dcs(all_dcs.begin(), all_dcs.end());
|
||||
dcs.insert(new_dcs.begin(), new_dcs.end());
|
||||
for (const auto& dc : dcs) {
|
||||
auto old_it = old_replication_strategy_config.find(dc);
|
||||
auto new_it = new_replication_strategy_config.find(dc);
|
||||
size_t old_rf = (old_it != old_replication_strategy_config.end()) ? locator::get_replication_factor(old_it->second) : 0;
|
||||
size_t new_rf = (new_it != new_replication_strategy_config.end()) ? locator::get_replication_factor(new_it->second) : 0;
|
||||
if (old_rf == new_rf) {
|
||||
continue;
|
||||
}
|
||||
if (old_rf != 0 && new_rf != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
if (tables_with_mvs.empty()) {
|
||||
co_return keyspace_rf_change_kind::default_rf_change;
|
||||
}
|
||||
if (co_await check_needs_colocation()) {
|
||||
co_return keyspace_rf_change_kind::conversion_to_rack_list;
|
||||
}
|
||||
if (_feature_service.keyspace_multi_rf_change && locator::uses_rack_list_exclusively(old_replication_strategy_config) && locator::uses_rack_list_exclusively(new_replication_strategy_config) && !rf_count_per_dc_equals(old_replication_strategy_config, new_replication_strategy_config) && all_changes_are_0_N()) {
|
||||
co_return keyspace_rf_change_kind::multi_rf_change;
|
||||
}
|
||||
co_return keyspace_rf_change_kind::default_rf_change;
|
||||
}
|
||||
|
||||
// Precondition: there is no node request and no ongoing topology transition
|
||||
// (checked under the guard we're holding).
|
||||
future<> handle_global_request(group0_guard guard) {
|
||||
@@ -1016,9 +1087,18 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
saved_ks_props = *req_entry.new_keyspace_rf_change_data;
|
||||
}
|
||||
|
||||
auto tbuilder_with_request_drop = [&] () {
|
||||
topology_mutation_builder tbuilder(guard.write_timestamp());
|
||||
tbuilder.set_transition_state(topology::transition_state::tablet_migration)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
|
||||
return tbuilder;
|
||||
};
|
||||
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
sstring error;
|
||||
bool needs_colocation = false;
|
||||
if (_db.has_keyspace(ks_name)) {
|
||||
try {
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
@@ -1030,82 +1110,93 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
size_t unimportant_init_tablet_count = 2; // must be a power of 2
|
||||
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
|
||||
|
||||
auto schedule_migrations = [&] () -> future<> {
|
||||
auto tables_with_mvs = ks.metadata()->tables();
|
||||
auto views = ks.metadata()->views();
|
||||
tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
|
||||
if (!tables_with_mvs.empty()) {
|
||||
auto table = tables_with_mvs.front();
|
||||
auto tablet_count = tmptr->tablets().get_tablet_map(table->id()).tablet_count();
|
||||
locator::replication_strategy_params params{ks_md->strategy_options(), tablet_count, ks.metadata()->consistency_option()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params, tmptr->get_topology());
|
||||
auto tables_with_mvs = ks.metadata()->tables();
|
||||
auto views = ks.metadata()->views();
|
||||
tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
|
||||
auto rf_change_kind = co_await choose_keyspace_rf_change_kind(req_id, ks.metadata(), ks_md, tables_with_mvs);
|
||||
switch (rf_change_kind) {
|
||||
case keyspace_rf_change_kind::default_rf_change: {
|
||||
if (!tables_with_mvs.empty()) {
|
||||
auto table = tables_with_mvs.front();
|
||||
auto tablet_count = tmptr->tablets().get_tablet_map(table->id()).tablet_count();
|
||||
locator::replication_strategy_params params{ks_md->strategy_options(), tablet_count, ks.metadata()->consistency_option()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params, tmptr->get_topology());
|
||||
|
||||
auto check_needs_colocation = [&] () -> future<bool> {
|
||||
const auto& new_replication_strategy_config = new_strategy->get_config_options();
|
||||
const auto& old_replication_strategy_config = ks.metadata()->strategy_options();
|
||||
bool rack_list_conversion = false;
|
||||
for (const auto& [dc, rf_value] : new_replication_strategy_config) {
|
||||
if (std::holds_alternative<locator::rack_list>(rf_value)) {
|
||||
auto it = old_replication_strategy_config.find(dc);
|
||||
if (it != old_replication_strategy_config.end() && std::holds_alternative<sstring>(it->second)) {
|
||||
rack_list_conversion = true;
|
||||
break;
|
||||
for (const auto& table_or_mv : tables_with_mvs) {
|
||||
if (!tmptr->tablets().is_base_table(table_or_mv->id())) {
|
||||
// Apply the transition only on base tables.
|
||||
// If this table has a base table then the transition will be applied on the base table, and
|
||||
// the base table will coordinate the transition for the entire group.
|
||||
continue;
|
||||
}
|
||||
auto old_tablets = co_await tmptr->tablets().get_tablet_map(table_or_mv->id()).clone_gently();
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, co_await old_tablets.clone_gently());
|
||||
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id());
|
||||
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
|
||||
auto last_token = new_tablet_map.get_last_token(tablet_id);
|
||||
auto old_tablet_info = old_tablets.get_tablet_info(last_token);
|
||||
auto abandoning_replicas = locator::substract_sets(old_tablet_info.replicas, tablet_info.replicas);
|
||||
auto new_replicas = locator::substract_sets(tablet_info.replicas, old_tablet_info.replicas);
|
||||
if (abandoning_replicas.size() + new_replicas.size() > 1) {
|
||||
throw std::runtime_error(fmt::format("Invalid state of a tablet {} of a table {}.{}. Expected replication factor: {}, but the tablet has replicas only on {}. "
|
||||
"Try again later or use the \"Fixing invalid replica state with RF change\" procedure to fix the problem.", tablet_id, ks_name, table_or_mv->cf_name(),
|
||||
ks.get_replication_strategy().get_replication_factor(*tmptr), old_tablet_info.replicas));
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return rack_list_conversion ? co_await requires_rack_list_colocation(_db, tmptr, &_sys_ks, req_id) : false;
|
||||
};
|
||||
if (needs_colocation = co_await check_needs_colocation(); needs_colocation) {
|
||||
co_return;
|
||||
}
|
||||
for (const auto& table_or_mv : tables_with_mvs) {
|
||||
if (!tmptr->tablets().is_base_table(table_or_mv->id())) {
|
||||
// Apply the transition only on base tables.
|
||||
// If this table has a base table then the transition will be applied on the base table, and
|
||||
// the base table will coordinate the transition for the entire group.
|
||||
continue;
|
||||
}
|
||||
auto old_tablets = co_await tmptr->tablets().get_tablet_map(table_or_mv->id()).clone_gently();
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, co_await old_tablets.clone_gently());
|
||||
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id());
|
||||
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
|
||||
auto last_token = new_tablet_map.get_last_token(tablet_id);
|
||||
auto old_tablet_info = old_tablets.get_tablet_info(last_token);
|
||||
auto abandoning_replicas = locator::substract_sets(old_tablet_info.replicas, tablet_info.replicas);
|
||||
auto new_replicas = locator::substract_sets(tablet_info.replicas, old_tablet_info.replicas);
|
||||
if (abandoning_replicas.size() + new_replicas.size() > 1) {
|
||||
throw std::runtime_error(fmt::format("Invalid state of a tablet {} of a table {}.{}. Expected replication factor: {}, but the tablet has replicas only on {}. "
|
||||
"Try again later or use the \"Fixing invalid replica state with RF change\" procedure to fix the problem.", tablet_id, ks_name, table_or_mv->cf_name(),
|
||||
ks.get_replication_strategy().get_replication_factor(*tmptr), old_tablet_info.replicas));
|
||||
}
|
||||
updates.emplace_back(co_await make_canonical_mutation_gently(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id())
|
||||
.set_new_replicas(last_token, tablet_info.replicas)
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
|
||||
.build()
|
||||
));
|
||||
|
||||
updates.emplace_back(co_await make_canonical_mutation_gently(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id())
|
||||
.set_new_replicas(last_token, tablet_info.replicas)
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
|
||||
.build()
|
||||
));
|
||||
|
||||
// Calculate abandoning replica and abort view building tasks on them
|
||||
if (!abandoning_replicas.empty()) {
|
||||
if (abandoning_replicas.size() != 1) {
|
||||
on_internal_error(rtlogger, fmt::format("Keyspace RF abandons {} replicas for table {} and tablet id {}", abandoning_replicas.size(), table_or_mv->id(), tablet_id));
|
||||
// Calculate abandoning replica and abort view building tasks on them
|
||||
if (!abandoning_replicas.empty()) {
|
||||
if (abandoning_replicas.size() != 1) {
|
||||
on_internal_error(rtlogger, fmt::format("Keyspace RF abandons {} replicas for table {} and tablet id {}", abandoning_replicas.size(), table_or_mv->id(), tablet_id));
|
||||
}
|
||||
_vb_coordinator->abort_tasks(updates, guard, table_or_mv->id(), *abandoning_replicas.begin(), last_token);
|
||||
}
|
||||
_vb_coordinator->abort_tasks(updates, guard, table_or_mv->id(), *abandoning_replicas.begin(), last_token);
|
||||
}
|
||||
|
||||
co_await coroutine::maybe_yield();
|
||||
});
|
||||
co_await coroutine::maybe_yield();
|
||||
});
|
||||
}
|
||||
}
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
updates.emplace_back(m);
|
||||
}
|
||||
|
||||
updates.push_back(canonical_mutation(tbuilder_with_request_drop().build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
|
||||
.done()
|
||||
.build()));
|
||||
break;
|
||||
}
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
updates.emplace_back(m);
|
||||
case keyspace_rf_change_kind::conversion_to_rack_list: {
|
||||
rtlogger.info("keyspace_rf_change for keyspace {} postponed for colocation", ks_name);
|
||||
topology_mutation_builder tbuilder = tbuilder_with_request_drop();
|
||||
tbuilder.pause_rf_change_request(req_id);
|
||||
updates.push_back(canonical_mutation(tbuilder.build()));
|
||||
break;
|
||||
}
|
||||
};
|
||||
co_await schedule_migrations();
|
||||
case keyspace_rf_change_kind::multi_rf_change: {
|
||||
rtlogger.info("keyspace_rf_change for keyspace {} will use multi-rf change procedure", ks_name);
|
||||
ks_md->set_next_strategy_options(ks_md->strategy_options());
|
||||
ks_md->set_strategy_options(ks.metadata()->strategy_options()); // start from the old strategy
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
updates.emplace_back(m);
|
||||
}
|
||||
|
||||
topology_mutation_builder tbuilder = tbuilder_with_request_drop();
|
||||
tbuilder.start_rf_change_migrations(req_id);
|
||||
updates.push_back(canonical_mutation(tbuilder.build()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, desired new ks opts: {}, error: {}",
|
||||
@@ -1116,22 +1207,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist";
|
||||
}
|
||||
|
||||
bool pause_request = needs_colocation && error.empty();
|
||||
topology_mutation_builder tbuilder(guard.write_timestamp());
|
||||
tbuilder.set_transition_state(topology::transition_state::tablet_migration)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
|
||||
if (pause_request) {
|
||||
rtlogger.info("keyspace_rf_change for keyspace {} postponed for colocation", ks_name);
|
||||
tbuilder.pause_rf_change_request(req_id);
|
||||
} else {
|
||||
if (error != "") {
|
||||
updates.push_back(canonical_mutation(tbuilder_with_request_drop().build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
|
||||
.done(error)
|
||||
.build()));
|
||||
}
|
||||
updates.push_back(canonical_mutation(tbuilder.build()));
|
||||
|
||||
sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props);
|
||||
rtlogger.trace("do update {} reason {}", updates, reason);
|
||||
@@ -1615,6 +1696,83 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
.build());
|
||||
}
|
||||
|
||||
// Updates keyspace properties; removes system_schema.keyspaces::next_replication;
|
||||
// finishes RF change request; Removes request from system.topology::ongoing_rf_changes.
|
||||
void generate_rf_change_completion_update(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const rf_change_completion_info& completion) {
|
||||
if (rtlogger.is_enabled(seastar::log_level::debug)) {
|
||||
sstring props_str;
|
||||
for (const auto& [key, value] : completion.saved_ks_props) {
|
||||
props_str += fmt::format(" {}={};", key, value);
|
||||
}
|
||||
rtlogger.debug("generate_rf_change_completion_update: request_id={}, ks_name={}, error='{}', saved_ks_props:{}",
|
||||
completion.request_id, completion.ks_name, completion.error, props_str);
|
||||
}
|
||||
sstring error = completion.error;
|
||||
if (_db.has_keyspace(completion.ks_name)) {
|
||||
auto& ks = _db.find_keyspace(completion.ks_name);
|
||||
if (error.empty()) {
|
||||
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{completion.saved_ks_props.begin(), completion.saved_ks_props.end()}};
|
||||
new_ks_props.validate();
|
||||
auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *get_token_metadata_ptr(), _db.features(), _db.get_config());
|
||||
ks_md->clear_next_strategy_options();
|
||||
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
out.emplace_back(m);
|
||||
}
|
||||
} else {
|
||||
auto ks_md = make_lw_shared<data_dictionary::keyspace_metadata>(*ks.metadata());
|
||||
ks_md->clear_next_strategy_options();
|
||||
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
out.emplace_back(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
out.emplace_back(topology_mutation_builder(guard.write_timestamp())
|
||||
.finish_rf_change_migrations(_topo_sm._topology.ongoing_rf_changes, completion.request_id)
|
||||
.build());
|
||||
|
||||
out.push_back(canonical_mutation(topology_request_tracking_mutation_builder(completion.request_id)
|
||||
.done(error)
|
||||
.build()));
|
||||
}
|
||||
|
||||
// Sets next_replication to current_replication and sets error on the topology request.
|
||||
// Similar to storage_service::abort_rf_change for the ongoing_rf_changes case.
|
||||
void generate_rf_change_abort_update(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const rf_change_abort_info& abort_info) {
|
||||
rtlogger.debug("generate_rf_change_abort_update: request_id={}, ks_name={}, error='{}'", abort_info.request_id, abort_info.ks_name, abort_info.error);
|
||||
|
||||
if (!_db.has_keyspace(abort_info.ks_name)) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto& ks = _db.find_keyspace(abort_info.ks_name);
|
||||
auto ks_md = make_lw_shared<data_dictionary::keyspace_metadata>(*ks.metadata());
|
||||
ks_md->set_next_strategy_options(abort_info.current_replication);
|
||||
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m : schema_muts) {
|
||||
out.emplace_back(m);
|
||||
}
|
||||
|
||||
out.push_back(canonical_mutation(topology_request_tracking_mutation_builder(abort_info.request_id)
|
||||
.abort(abort_info.error)
|
||||
.build()));
|
||||
}
|
||||
|
||||
future<> generate_rf_change_updates(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const keyspace_rf_change_plan& rf_change_plan) {
|
||||
for (const auto& abort_info : rf_change_plan.aborts) {
|
||||
co_await coroutine::maybe_yield();
|
||||
generate_rf_change_abort_update(out, guard, abort_info);
|
||||
}
|
||||
if (rf_change_plan.completion.has_value()) {
|
||||
generate_rf_change_completion_update(out, guard, *rf_change_plan.completion);
|
||||
}
|
||||
}
|
||||
|
||||
future<> generate_migration_updates(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const migration_plan& plan) {
|
||||
if (plan.resize_plan().finalize_resize.empty() || plan.has_nodes_to_drain()) {
|
||||
// schedule tablet migration only if there are no pending resize finalisations or if the node is draining.
|
||||
@@ -1637,6 +1795,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
if (auto request_to_resume = plan.rack_list_colocation_plan().request_to_resume(); request_to_resume) {
|
||||
generate_rf_change_resume_update(out, guard, request_to_resume);
|
||||
}
|
||||
|
||||
co_await generate_rf_change_updates(out, guard, plan.rf_change_plan());
|
||||
}
|
||||
|
||||
auto sched_time = db_clock::now();
|
||||
@@ -2225,9 +2385,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
}
|
||||
|
||||
bool has_nodes_to_drain = false;
|
||||
bool requires_schema_changes = false;
|
||||
if (!preempt) {
|
||||
auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr(), &_topo_sm._topology, &_sys_ks, {}, get_dead_nodes());
|
||||
has_nodes_to_drain = plan.has_nodes_to_drain();
|
||||
requires_schema_changes = plan.requires_schema_changes();
|
||||
if (!drain || plan.has_nodes_to_drain()) {
|
||||
co_await generate_migration_updates(updates, guard, plan);
|
||||
}
|
||||
@@ -2243,7 +2405,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), format("Tablet migration"));
|
||||
if (requires_schema_changes) {
|
||||
co_await update_topology_state_with_mixed_change(std::move(guard), std::move(updates), format("Tablet migration"));
|
||||
} else {
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), format("Tablet migration"));
|
||||
}
|
||||
}
|
||||
|
||||
if (needs_barrier) {
|
||||
@@ -4134,7 +4300,11 @@ future<std::optional<group0_guard>> topology_coordinator::maybe_start_tablet_mig
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "Starting tablet migration");
|
||||
if (plan.requires_schema_changes()) {
|
||||
co_await update_topology_state_with_mixed_change(std::move(guard), std::move(updates), "Starting tablet migration");
|
||||
} else {
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "Starting tablet migration");
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
|
||||
@@ -283,6 +283,20 @@ topology_mutation_builder& topology_mutation_builder::resume_rf_change_request(c
|
||||
}
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::start_rf_change_migrations(const utils::UUID& id) {
|
||||
return apply_set("ongoing_rf_changes", collection_apply_mode::update, std::vector<data_value>{id});
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::finish_rf_change_migrations(const std::unordered_set<utils::UUID>& values, const utils::UUID& id) {
|
||||
if (values.contains(id)) {
|
||||
auto new_values = values;
|
||||
new_values.erase(id);
|
||||
return apply_set("ongoing_rf_changes", collection_apply_mode::overwrite, new_values | std::views::transform([] (const auto& id) { return data_value{id}; }));
|
||||
} else {
|
||||
return *this;
|
||||
}
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_upgrade_state_done() {
|
||||
return apply_atomic("upgrade_state", "done");
|
||||
}
|
||||
@@ -353,6 +367,10 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
|
||||
return _set_type ? builder_base::set(cell, value) : *this;
|
||||
}
|
||||
|
||||
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::abort(sstring error) {
|
||||
return set("error", error);
|
||||
}
|
||||
|
||||
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
|
||||
set("end_time", db_clock::now());
|
||||
if (error) {
|
||||
|
||||
@@ -132,6 +132,8 @@ public:
|
||||
topology_mutation_builder& drop_first_global_topology_request_id(const std::vector<utils::UUID>&, const utils::UUID&);
|
||||
topology_mutation_builder& pause_rf_change_request(const utils::UUID&);
|
||||
topology_mutation_builder& resume_rf_change_request(const std::unordered_set<utils::UUID>&, const utils::UUID&);
|
||||
topology_mutation_builder& start_rf_change_migrations(const utils::UUID&);
|
||||
topology_mutation_builder& finish_rf_change_migrations(const std::unordered_set<utils::UUID>&, const utils::UUID&);
|
||||
topology_node_mutation_builder& with_node(raft::server_id);
|
||||
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
|
||||
};
|
||||
@@ -156,6 +158,7 @@ public:
|
||||
using builder_base::del;
|
||||
topology_request_tracking_mutation_builder& set(const char* cell, topology_request value);
|
||||
topology_request_tracking_mutation_builder& set(const char* cell, global_topology_request value);
|
||||
topology_request_tracking_mutation_builder& abort(sstring error);
|
||||
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
|
||||
topology_request_tracking_mutation_builder& set_truncate_table_data(const table_id& table_id);
|
||||
topology_request_tracking_mutation_builder& set_new_keyspace_rf_change_data(const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc);
|
||||
|
||||
@@ -209,6 +209,10 @@ struct topology {
|
||||
// It may happen during altering from numerical RF to rack list.
|
||||
std::unordered_set<utils::UUID> paused_rf_change_requests;
|
||||
|
||||
// The ids of ongoing RF change requests.
|
||||
// Here we keep the ids only for rf-changes using rack_lists.
|
||||
std::unordered_set<utils::UUID> ongoing_rf_changes;
|
||||
|
||||
// The IDs of the committed yet unpublished CDC generations sorted by timestamps.
|
||||
std::vector<cdc::generation_id> unpublished_cdc_generations;
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ public:
|
||||
// The exception will include a complete backtrace, so no need to add call-site identifiers to the message.
|
||||
inline void parse_assert(bool condition, std::optional<component_name> filename = {}, const char* message = nullptr) {
|
||||
if (!condition) [[unlikely]] {
|
||||
on_parse_error(message, filename);
|
||||
on_parse_error(message ? message : sstring(), filename);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2538,6 +2538,14 @@ future<> sstable::write_components(
|
||||
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats] () mutable {
|
||||
auto close_mr = deferred_close(mr);
|
||||
auto wr = get_writer(*schema, estimated_partitions, cfg, stats);
|
||||
utils::get_local_injector().inject("write_components_writer_created", [&schema] (auto& handler) -> future<> {
|
||||
if (schema->ks_name() != handler.get("ks_name") || schema->cf_name() != handler.get("cf_name")) {
|
||||
co_return;
|
||||
}
|
||||
sstlog.info("write_components_writer_created: waiting for message");
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds(30));
|
||||
sstlog.info("write_components_writer_created: message received");
|
||||
}).get();
|
||||
mr.consume_in_thread(std::move(wr));
|
||||
}).finally([this] {
|
||||
assert_large_data_handler_is_running();
|
||||
|
||||
@@ -538,6 +538,17 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
|
||||
}
|
||||
});
|
||||
// TemporaryHashes is not tracked in recognized_components (and thus
|
||||
// not in all_components()), because it is a transient file created
|
||||
// during SSTable writing and removed before sealing. If the write
|
||||
// failed before sealing, the file may still be on disk and must be
|
||||
// cleaned up explicitly.
|
||||
// Use file_exists() to avoid a C++ exception on the common path
|
||||
// where the file was already removed before sealing.
|
||||
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
|
||||
if (co_await file_exists(temp_hashes)) {
|
||||
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
|
||||
}
|
||||
if (sync) {
|
||||
co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
#include <cfloat>
|
||||
#include <algorithm>
|
||||
@@ -560,6 +561,9 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::
|
||||
size_t nr_sst_current = 0;
|
||||
|
||||
while (!sstables.empty()) {
|
||||
co_await utils::get_local_injector().inject("load_and_stream_before_streaming_batch",
|
||||
utils::wait_for_message(60s));
|
||||
|
||||
const size_t batch_sst_nr = std::min(16uz, sstables.size());
|
||||
auto sst_processed = sstables
|
||||
| std::views::reverse
|
||||
@@ -723,8 +727,18 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
// throughout its lifetime.
|
||||
auto erm = co_await await_topology_quiesced_and_get_erm(table_id);
|
||||
|
||||
// Obtain a phaser guard to prevent the table from being destroyed
|
||||
// while streaming is in progress. table::stop() calls
|
||||
// _pending_streams_phaser.close() which blocks until all outstanding
|
||||
// stream_in_progress() guards are released, so holding this guard
|
||||
// keeps the table alive for the entire streaming operation.
|
||||
// find_column_family throws no_such_column_family if the table was
|
||||
// already dropped before we got here.
|
||||
auto& tbl = _db.local().find_column_family(table_id);
|
||||
auto stream_guard = tbl.stream_in_progress();
|
||||
|
||||
std::unique_ptr<sstable_streamer> streamer;
|
||||
if (_db.local().find_column_family(table_id).uses_tablets()) {
|
||||
if (tbl.uses_tablets()) {
|
||||
streamer =
|
||||
std::make_unique<tablet_sstable_streamer>(_messaging, _db, table_id, std::move(erm), std::move(sstables), primary, unlink_sstables(unlink), scope);
|
||||
} else {
|
||||
|
||||
@@ -93,8 +93,6 @@
|
||||
#include <ctime>
|
||||
#include <deque>
|
||||
|
||||
#include "utils/rolling_max_tracker.hh"
|
||||
|
||||
#include <endian.h>
|
||||
#include <exception>
|
||||
#if __has_include(<execinfo.h>)
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "db/view/view_update_checks.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "replica/exceptions.hh"
|
||||
#include "streaming/stream_mutation_fragments_cmd.hh"
|
||||
#include "consumer.hh"
|
||||
#include "readers/generating.hh"
|
||||
@@ -147,9 +148,9 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
})));
|
||||
}
|
||||
|
||||
auto get_next_mutation_fragment = [guard = std::move(guard), &as, &sm = container(), source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
|
||||
auto get_next_mutation_fragment = [guard = std::move(guard), &as, &sm = container(), &db = _db, source, plan_id, from, s, cmd_status, offstrategy_update, permit] () mutable {
|
||||
guard.check();
|
||||
return source().then([&sm, &guard, &as, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
|
||||
return source().then([&sm, &db, &guard, &as, plan_id, from, s, cmd_status, offstrategy_update, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
|
||||
if (opt) {
|
||||
auto cmd = std::get<1>(*opt);
|
||||
if (cmd) {
|
||||
@@ -183,11 +184,14 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
co_await sleep_abortable(std::chrono::milliseconds(5), as_);
|
||||
}
|
||||
sslog.info("stream_mutation_fragments: released");
|
||||
}).then([mf = std::move(mf)] () mutable {
|
||||
}).then([mf = std::move(mf), &db] () mutable {
|
||||
if (utils::get_local_injector().is_enabled("stream_mutation_fragments_rx_error")) {
|
||||
sslog.info("stream_mutation_fragments_rx_error: throw");
|
||||
throw std::runtime_error("stream_mutation_fragments_rx_error");
|
||||
}
|
||||
if (db.local().is_in_critical_disk_utilization_mode()) {
|
||||
throw replica::critical_disk_utilization_exception("rejected streamed mutation fragment");
|
||||
}
|
||||
return mutation_fragment_opt(std::move(mf));
|
||||
});
|
||||
} else {
|
||||
|
||||
5
test.py
5
test.py
@@ -326,7 +326,10 @@ async def main() -> int:
|
||||
except Exception as e:
|
||||
print(palette.fail(e))
|
||||
raise
|
||||
|
||||
if exit_code == 5:
|
||||
print(palette.fail("No tests were collected. Please check the test names and modes you provided, as well as"
|
||||
"the test markers if you used the '--markers' option."
|
||||
"Alternatively you can check with --list option if there any errors."))
|
||||
if 'coverage' in options.modes:
|
||||
coverage.generate_coverage_report(path_to("coverage", "tests"))
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ from test.alternator.util import create_test_table, is_aws, scylla_log
|
||||
from test.conftest import dynamic_scope
|
||||
from test.cqlpy.conftest import host # add required fixtures
|
||||
from test.pylib.driver_utils import safe_driver_shutdown
|
||||
from test.pylib.skip_types import skip_env
|
||||
from test.pylib.suite.python import add_host_option
|
||||
from urllib.parse import urlparse
|
||||
from functools import cache
|
||||
@@ -198,7 +199,7 @@ def dynamodbstreams(request, get_valid_alternator_role):
|
||||
def dynamodb_test_connection(dynamodb, request, optional_rest_api):
|
||||
scylla_log(optional_rest_api, f'test/alternator: Starting {request.node.parent.name}::{request.node.name}', 'info')
|
||||
if dynamodb_test_connection.scylla_crashed:
|
||||
pytest.skip('Server down')
|
||||
skip_env('Server down')
|
||||
yield
|
||||
try:
|
||||
# We want to run a do-nothing DynamoDB command. The health-check
|
||||
@@ -345,7 +346,7 @@ def filled_test_table(dynamodb):
|
||||
@pytest.fixture(scope=dynamic_scope())
|
||||
def scylla_only(dynamodb):
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only feature not supported by AWS')
|
||||
skip_env('Scylla-only feature not supported by AWS')
|
||||
|
||||
# "dynamodb_bug" is similar to "scylla_only", except instead of skipping
|
||||
# the test, it is expected to fail (xfail) on AWS DynamoDB. It should be
|
||||
@@ -365,7 +366,7 @@ def dynamodb_bug(dynamodb):
|
||||
@pytest.fixture(scope=dynamic_scope())
|
||||
def rest_api(dynamodb, optional_rest_api):
|
||||
if optional_rest_api is None:
|
||||
pytest.skip('Cannot connect to Scylla REST API')
|
||||
skip_env('Cannot connect to Scylla REST API')
|
||||
return optional_rest_api
|
||||
@pytest.fixture(scope=dynamic_scope())
|
||||
def optional_rest_api(dynamodb):
|
||||
@@ -417,7 +418,7 @@ def xfail_tablets(request, has_tablets):
|
||||
@pytest.fixture(scope="function")
|
||||
def skip_tablets(has_tablets):
|
||||
if has_tablets:
|
||||
pytest.skip("Test may crash when Alternator tables use tablets")
|
||||
skip_env("Test may crash when Alternator tables use tablets")
|
||||
|
||||
# Alternator tests normally use only the DynamoDB API. However, a few tests
|
||||
# need to use CQL to set up Scylla-only features such as service levels or
|
||||
@@ -432,7 +433,7 @@ def cql(dynamodb):
|
||||
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
|
||||
from cassandra.policies import RoundRobinPolicy
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only CQL API not supported by AWS')
|
||||
skip_env('Scylla-only CQL API not supported by AWS')
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
host, = re.search(r'.*://([^:]*):', url).groups()
|
||||
profile = ExecutionProfile(
|
||||
@@ -453,6 +454,6 @@ def cql(dynamodb):
|
||||
# "BEGIN BATCH APPLY BATCH" is the closest to do-nothing I could find
|
||||
ret.execute("BEGIN BATCH APPLY BATCH")
|
||||
except NoHostAvailable:
|
||||
pytest.skip('Could not connect to Scylla-only CQL API')
|
||||
skip_env('Could not connect to Scylla-only CQL API')
|
||||
yield ret
|
||||
safe_driver_shutdown(cluster)
|
||||
|
||||
@@ -21,6 +21,7 @@ from functools import cache
|
||||
|
||||
import re
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
from .util import unique_table_name, random_string, new_test_table
|
||||
from .test_gsi_updatetable import wait_for_gsi, wait_for_gsi_gone
|
||||
from .test_gsi import assert_index_query
|
||||
@@ -1143,7 +1144,7 @@ def test_rbac_system_table_write(dynamodb, cql, test_table_s):
|
||||
ExpressionAttributeValues={':val': old_val})
|
||||
except Exception as e:
|
||||
if 'alternator_allow_system_table_write' in str(e):
|
||||
pytest.skip('need alternator_allow_system_table_write=true')
|
||||
skip_env('need alternator_allow_system_table_write=true')
|
||||
else:
|
||||
raise
|
||||
with new_role(cql) as (role, key):
|
||||
|
||||
@@ -33,6 +33,8 @@ import urllib.parse
|
||||
from contextlib import contextmanager
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
from .util import new_test_table, scylla_config_temporary
|
||||
from .test_cql_rbac import new_dynamodb, new_role
|
||||
|
||||
@@ -86,7 +88,7 @@ def local_process_id(ip, port):
|
||||
|
||||
# A fixture to find the Scylla log file, returning the log file's path.
|
||||
# If the log file cannot be found, or it's not Scylla, the fixture calls
|
||||
# pytest.skip() to skip any test which uses it. The fixture has module
|
||||
# skip_env() to skip any test which uses it. The fixture has module
|
||||
# scope, so looking for the log file only happens once. Individual tests
|
||||
# should use the function-scope fixture "logfile" below, which takes care
|
||||
# of opening the log file for reading in the right place.
|
||||
@@ -108,18 +110,18 @@ def logfile_path(dynamodb):
|
||||
port = 443 if p.scheme == 'https' else 80
|
||||
pid = local_process_id(ip, port)
|
||||
if not pid:
|
||||
pytest.skip("Can't find local process")
|
||||
skip_env("Can't find local process")
|
||||
# Now that we know the process id, use /proc to find if its standard
|
||||
# output is redirected to a file. If it is, that's the log file. If it
|
||||
# isn't a file, we don't known where the user is writing the log...
|
||||
try:
|
||||
log = os.readlink(f'/proc/{pid}/fd/1')
|
||||
except:
|
||||
pytest.skip("Can't find local log file")
|
||||
skip_env("Can't find local log file")
|
||||
# If the process's standard output is some pipe or device, it's
|
||||
# not the log file we were hoping for...
|
||||
if not log.startswith('/') or not os.path.isfile(log):
|
||||
pytest.skip("Can't find local log file")
|
||||
skip_env("Can't find local log file")
|
||||
# Scylla can be configured to put the log in syslog, not in the standard
|
||||
# output. So let's verify that the file which we found actually looks
|
||||
# like a Scylla log and isn't just empty or something... The Scylla log
|
||||
@@ -127,7 +129,7 @@ def logfile_path(dynamodb):
|
||||
with open(log, 'r') as f:
|
||||
head = f.read(7)
|
||||
if head != 'Scylla ':
|
||||
pytest.skip("Not a Scylla log file")
|
||||
skip_env("Not a Scylla log file")
|
||||
yield log
|
||||
|
||||
# The "logfile" fixture returns the log file open for reading at the end.
|
||||
|
||||
@@ -13,6 +13,8 @@ import urllib3
|
||||
from botocore.exceptions import BotoCoreError, ClientError
|
||||
from packaging.version import Version
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
from test.alternator.util import random_bytes, random_string, get_signed_request, manual_request, ManualRequestError
|
||||
|
||||
|
||||
@@ -102,7 +104,7 @@ def test_too_large_request(dynamodb, test_table):
|
||||
|
||||
def test_too_large_request_chunked(dynamodb, test_table):
|
||||
if Version(urllib3.__version__) < Version('1.26'):
|
||||
pytest.skip("urllib3 before 1.26.0 threw broken pipe and did not read response and cause issue #8195. Fixed by pull request urllib3/urllib3#1524")
|
||||
skip_env("urllib3 before 1.26.0 threw broken pipe and did not read response and cause issue #8195. Fixed by pull request urllib3/urllib3#1524")
|
||||
# To make a request very large, we just stuff it with a lot of spaces :-)
|
||||
spaces = ' ' * (17 * 1024 * 1024)
|
||||
req = get_signed_request(dynamodb, 'PutItem',
|
||||
@@ -126,7 +128,7 @@ def test_too_large_request_chunked(dynamodb, test_table):
|
||||
@pytest.mark.parametrize("mb", [17, 50])
|
||||
def test_too_large_request_content_length(dynamodb, test_table, mb):
|
||||
if Version(urllib3.__version__) < Version('1.26'):
|
||||
pytest.skip("urllib3 before 1.26.0 threw broken pipe and did not read response and cause issue #8195. Fixed by pull request urllib3/urllib3#1524")
|
||||
skip_env("urllib3 before 1.26.0 threw broken pipe and did not read response and cause issue #8195. Fixed by pull request urllib3/urllib3#1524")
|
||||
spaces = ' ' * (mb * 1024 * 1024)
|
||||
req = get_signed_request(dynamodb, 'PutItem',
|
||||
'{"TableName": "' + test_table.name + '", ' + spaces + '"Item": {"p": {"S": "x"}, "c": {"S": "x"}}}')
|
||||
|
||||
@@ -36,6 +36,7 @@ from botocore.exceptions import ClientError
|
||||
from test.alternator.test_cql_rbac import new_dynamodb, new_role
|
||||
from test.alternator.util import random_string, new_test_table, is_aws, scylla_config_read, scylla_config_temporary, get_signed_request
|
||||
from test.alternator.test_vector import vs
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
|
||||
# are not available on AWS (of course), but may also not be available for
|
||||
@@ -46,7 +47,7 @@ from test.alternator.test_vector import vs
|
||||
@pytest.fixture(scope="module")
|
||||
def metrics(dynamodb):
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only feature not supported by AWS')
|
||||
skip_env('Scylla-only feature not supported by AWS')
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
# The Prometheus API is on port 9180, and always http, not https.
|
||||
url = re.sub(r':[0-9]+(/|$)', ':9180', url)
|
||||
@@ -54,7 +55,7 @@ def metrics(dynamodb):
|
||||
url = url + '/metrics'
|
||||
resp = requests.get(url)
|
||||
if resp.status_code != 200:
|
||||
pytest.skip('Metrics port 9180 is not available')
|
||||
skip_env('Metrics port 9180 is not available')
|
||||
yield url
|
||||
|
||||
# Utility function for fetching all metrics from Scylla, using an HTTP request
|
||||
@@ -891,15 +892,15 @@ def test_total_operations(dynamodb, metrics):
|
||||
def alternator_ttl_period_in_seconds(dynamodb, request):
|
||||
# If not running on Scylla, skip the test
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only test skipped')
|
||||
skip_env('Scylla-only test skipped')
|
||||
# In Scylla, we can inspect the configuration via a system table
|
||||
# (which is also visible in Alternator)
|
||||
period = scylla_config_read(dynamodb, 'alternator_ttl_period_in_seconds')
|
||||
if period is None:
|
||||
pytest.skip('missing TTL feature, skipping test')
|
||||
skip_env('missing TTL feature, skipping test')
|
||||
period = float(period)
|
||||
if period > 1 and not request.config.getoption('runveryslow'):
|
||||
pytest.skip('need --runveryslow option to run')
|
||||
skip_env('need --runveryslow option to run')
|
||||
return period
|
||||
|
||||
# Test metrics of the background expiration thread run for Alternator's TTL
|
||||
|
||||
@@ -9,6 +9,7 @@ import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.alternator.util import random_string
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
|
||||
# Test trivial support for the ReturnValues parameter in PutItem, UpdateItem
|
||||
@@ -90,7 +91,7 @@ def skip_if_returnvalues_on_condition_check_failure_not_supported():
|
||||
from packaging.version import Version
|
||||
# This release added support for ReturnValuesOnConditionCheckFailure
|
||||
if (Version(botocore.__version__) < Version('1.29.164')):
|
||||
pytest.skip("Botocore version 1.29.164 or above required to run this test")
|
||||
skip_env("Botocore version 1.29.164 or above required to run this test")
|
||||
|
||||
# Testing ReturnValuesOnConditionCheckFailure feature which returns values only
|
||||
# on failed condition expression.
|
||||
|
||||
@@ -15,6 +15,7 @@ from boto3.dynamodb.types import TypeDeserializer
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.alternator.util import is_aws, scylla_config_temporary, unique_table_name, create_test_table, new_test_table, random_string, full_scan, freeze, list_tables, get_region, manual_request
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
TAGS = []
|
||||
# The following fixture is to ensure that tests in this module will be tested with both vnodes and tablets.
|
||||
@@ -29,7 +30,7 @@ TAGS = []
|
||||
], ids=["using vnodes", "using tablets"], autouse=True)
|
||||
def tags_param(request, dynamodb):
|
||||
if is_aws(dynamodb) and request.param[0].get('Value') != 'none':
|
||||
pytest.skip('vnodes/tablets parameterization not applicable on AWS')
|
||||
skip_env('vnodes/tablets parameterization not applicable on AWS')
|
||||
# Set TAGS in the global namespace of this module
|
||||
global TAGS
|
||||
TAGS = request.param
|
||||
@@ -179,6 +180,136 @@ def test_list_streams_paged(dynamodb, dynamodbstreams):
|
||||
break
|
||||
streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn'])
|
||||
|
||||
# The previous test (test_list_streams_paged) verifies that paging in
|
||||
# ListStreams works by passing the cookie returned as LastEvaluatedStreamArn
|
||||
# from one call, into ExclusiveStartStreamArn given to the next call. But that
|
||||
# test did not check what the value of this "ExclusiveStartStreamArn" looks
|
||||
# like. The DynamoDB documentation says that it should be "The stream ARN of
|
||||
# the item where the operation stopped", so in this test we check that it
|
||||
# indeed is the last returned ARN - and not some opaque cookie of a different
|
||||
# form.
|
||||
# This test also verifies that the final page has no LastEvaluatedStreamArn,
|
||||
# something which test_list_streams_paged also did not check.
|
||||
def test_list_streams_paged_cookie(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1:
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2:
|
||||
wait_for_active_stream(dynamodbstreams, table1)
|
||||
wait_for_active_stream(dynamodbstreams, table2)
|
||||
# Page through all streams one at a time, checking on every page
|
||||
# that LastEvaluatedStreamArn equals the StreamArn of the last
|
||||
# returned stream. The DynamoDB documentation says it should be
|
||||
# "The stream ARN of the item where the operation stopped" - i.e.,
|
||||
# the actual ARN, not an opaque cookie of a different form.
|
||||
# We also verify that the final page has no LastEvaluatedStreamArn,
|
||||
# indicating the end of the list.
|
||||
seen_arns = []
|
||||
response = dynamodbstreams.list_streams(Limit=1)
|
||||
while True:
|
||||
assert 'Streams' in response
|
||||
# All but the last page must return exactly one stream because
|
||||
# we used Limit=1, but the last page may return 0 or 1 stream.
|
||||
if 'LastEvaluatedStreamArn' not in response:
|
||||
# Reached the last page.
|
||||
assert len(response['Streams']) <= 1
|
||||
if response['Streams']:
|
||||
seen_arns.append(response['Streams'][0]['StreamArn'])
|
||||
break
|
||||
assert len(response['Streams']) == 1
|
||||
seen_arns.append(response['Streams'][0]['StreamArn'])
|
||||
# The cookie must equal the StreamArn of the last returned item.
|
||||
assert response['LastEvaluatedStreamArn'] == response['Streams'][-1]['StreamArn']
|
||||
response = dynamodbstreams.list_streams(
|
||||
Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn'])
|
||||
# For completeness, validate that both test tables were listed in
|
||||
# the result, and there were no duplicates.
|
||||
assert len(seen_arns) == len(set(seen_arns))
|
||||
for table in [table1, table2]:
|
||||
table_arns = [s['StreamArn'] for s in
|
||||
dynamodbstreams.list_streams(TableName=table.name)['Streams']]
|
||||
assert any(arn in seen_arns for arn in table_arns)
|
||||
|
||||
# If the last page of ListStreams results is not full because there are no
|
||||
# more streams to list, it shouldn't have LastEvaluatedStreamArn: It's silly
|
||||
# to return one and then force the user to retrieve another empty page).
|
||||
# Note that we focus on the case of the last page not being full because if
|
||||
# the last page *is* full (list Limit streams), the implementation may not be
|
||||
# aware that there are no more streams to list.
|
||||
def test_list_streams_unfull_last_page_no_cookie(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
|
||||
wait_for_active_stream(dynamodbstreams, table)
|
||||
# With a very high Limit the page is definitely not full, so
|
||||
# LastEvaluatedStreamArn must be absent. Limit=100 is not very
|
||||
# high, but DynamoDB doesn't allow any higher and it is very
|
||||
# unlikely we'll ever run this test with 100 live streams so
|
||||
# we can expect the page to not be full.
|
||||
response = dynamodbstreams.list_streams(Limit=100)
|
||||
assert 'Streams' in response
|
||||
assert len(response['Streams']) >= 1
|
||||
assert 'LastEvaluatedStreamArn' not in response
|
||||
|
||||
# Test what happens if we do a paging ListStreams, a page returns as
|
||||
# LastEvaluatedStreamArn an ARN of some table's stream, but then we delete
|
||||
# that table before retrieving the next page. The paging should be able to
|
||||
# continue without error, and see the rest of the streams that we haven't
|
||||
# seen yet.
|
||||
def test_list_streams_paged_resume_on_deleted_table(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1:
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2:
|
||||
wait_for_active_stream(dynamodbstreams, table1)
|
||||
wait_for_active_stream(dynamodbstreams, table2)
|
||||
# Page through the streams one at a time (Limit=1), waiting until
|
||||
# we get either table1 or table2.
|
||||
# Set found_table to the table found, other_table to the one not
|
||||
# yet found, and last_arn to LastEvaluatedStreamArn where we stopped.
|
||||
response = dynamodbstreams.list_streams(Limit=1)
|
||||
while True:
|
||||
assert 'Streams' in response
|
||||
assert len(response['Streams']) == 1
|
||||
last_arn = response['LastEvaluatedStreamArn']
|
||||
if response['Streams'][0]['TableName'] == table1.name:
|
||||
found_table, other_table = table1, table2
|
||||
break
|
||||
if response['Streams'][0]['TableName'] == table2.name:
|
||||
found_table, other_table = table2, table1
|
||||
break
|
||||
response = dynamodbstreams.list_streams(
|
||||
Limit=1, ExclusiveStartStreamArn=last_arn)
|
||||
# Delete found_table, the one which last_arn that we are holding
|
||||
# refers to.
|
||||
found_table.delete()
|
||||
found_table.meta.client.get_waiter('table_not_exists').wait(TableName=found_table.name)
|
||||
# Try to continue the paging, with last_arn that now refers to the
|
||||
# already deleted table. We expect to be able to continue without
|
||||
# error, and see other_table which we haven't seen yet (and not
|
||||
# see found_table again, of course). Other than that, it's not clear
|
||||
# what we expect to see - it would be nice not to have the list
|
||||
# restarted from scratch, but this test doesn't rule this
|
||||
# implementation out (and I'm not sure we should rule it out).
|
||||
response = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=last_arn)
|
||||
found_both = False
|
||||
while True:
|
||||
assert 'Streams' in response
|
||||
if response['Streams']:
|
||||
assert len(response['Streams']) == 1
|
||||
assert response['Streams'][0]['TableName'] != found_table.name
|
||||
if response['Streams'][0]['TableName'] == other_table.name:
|
||||
found_both = True
|
||||
if 'LastEvaluatedStreamArn' not in response:
|
||||
break
|
||||
response = dynamodbstreams.list_streams(
|
||||
Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn'])
|
||||
assert found_both
|
||||
# When we'll go out of scope on create_stream_test_table that
|
||||
# create found_table - which we deleted - it will expect this
|
||||
# table to exist and fail when it doesn't. So let's recreate the table.
|
||||
# It doesn't matter which schema we use, it just needs to exist.
|
||||
found_table = dynamodb.create_table(TableName=found_table.name,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ],
|
||||
BillingMode='PAY_PER_REQUEST')
|
||||
found_table.meta.client.get_waiter('table_exists').wait(TableName=found_table.name)
|
||||
|
||||
|
||||
# ListStreams with paging should be able correctly return a full list of
|
||||
# pre-existing streams even if additional tables were added between pages
|
||||
# and caused Scylla's hash table of tables to be reorganized.
|
||||
@@ -294,6 +425,36 @@ def test_describe_nonexistent_stream(dynamodb, dynamodbstreams):
|
||||
with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
|
||||
dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf')
|
||||
|
||||
# test_describe_nonexistent_stream checked the case where a StreamArn is
|
||||
# completely bogus. Here we want to check well-formed names that point to
|
||||
# table names that do not exist. We should return ResourceNotFoundException
|
||||
# in this case, not ValidationException.
|
||||
# To reduce our assumptions of what valid ARNs look like, we don't hardcode
|
||||
# any ARN formats here. Instead, we create a stream, get its real ARN, and
|
||||
# then modify it to point to a non-existent table.
|
||||
def test_describe_stream_nonexistent_table(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
|
||||
streams = dynamodbstreams.list_streams(TableName=table.name)
|
||||
arn = streams['Streams'][0]['StreamArn']
|
||||
# Replace the table name in the ARN with one that doesn't exist.
|
||||
# In both DynamoDB and Alternator, the table name appears in the ARN
|
||||
# at least once, and replacing it yields a well-formed but non-existent
|
||||
# ARN.
|
||||
# In Alternator, the table name appears twice - in both keyspace and
|
||||
# CDC log name parts - so we want to try both replacements to verify
|
||||
# that both incorrect keyspace name and incorrect table names are
|
||||
# handled.
|
||||
nonexistent = table.name + '_nonexistent_' + random_string()
|
||||
count = arn.count(table.name)
|
||||
assert count >= 1 # sanity check that the name appeared in the ARN at all
|
||||
for i in range(count):
|
||||
# Replace only the i-th occurrence of table.name.
|
||||
parts = arn.split(table.name)
|
||||
modified_arn = table.name.join(parts[:i+1]) + nonexistent + table.name.join(parts[i+1:])
|
||||
assert modified_arn != arn
|
||||
with pytest.raises(ClientError, match='ResourceNotFoundException'):
|
||||
dynamodbstreams.describe_stream(StreamArn=modified_arn)
|
||||
|
||||
def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
|
||||
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
|
||||
streams = dynamodbstreams.list_streams(TableName=table.name)
|
||||
|
||||
@@ -33,17 +33,20 @@ def get_base_table(cql, table_id):
|
||||
|
||||
# validate that streams tables are synchronized with tablets count - all new cdc shards have been created
|
||||
def assert_number_of_streams_is_equal_to_number_of_tablets(rest_api, cql, ks, table_name, cdc_log_table_name):
|
||||
# we'll try twice here, as in my tests occasionally i've got into a situation, where this function
|
||||
# failed, it seems streams were not yet fully updated (debug build).
|
||||
for x in range(0, 2):
|
||||
# Retry with exponential backoff, as on slow builds (e.g. debug aarch64)
|
||||
# the background CDC stream regeneration may take a while to catch up.
|
||||
sleep = 0.1
|
||||
for x in range(0, 7):
|
||||
# Don't sleep on the first try - we might be already in sync.
|
||||
if x > 0:
|
||||
time.sleep(sleep)
|
||||
sleep *= 2
|
||||
tablet_count = get_tablet_count_for_base_table_of_table(rest_api, cql, ks, cdc_log_table_name)
|
||||
ts = cql.execute(f"SELECT toUnixTimestamp(timestamp) AS ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='{table_name}' ORDER BY timestamp DESC LIMIT 1").one().ts
|
||||
CdcStreamState_CURRENT = 0 # from test.cluster.test_cdc_with_tablets.CdcStreamState.CURRENT
|
||||
count = cql.execute(f"SELECT count(*) FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='{table_name}' AND timestamp = {ts} AND stream_state = {CdcStreamState_CURRENT} limit 1").one().count
|
||||
if count == tablet_count:
|
||||
break
|
||||
# on debug occasionally we need more time
|
||||
time.sleep(0.1)
|
||||
assert count == tablet_count
|
||||
|
||||
# return tablet count for given cdc_log table (table that holds cdc data for given user table)
|
||||
|
||||
@@ -11,6 +11,7 @@ import requests
|
||||
from botocore.exceptions import ClientError
|
||||
from boto3.dynamodb.conditions import Key
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
from .util import full_scan, scylla_config_read, scylla_config_temporary
|
||||
|
||||
internal_prefix = '.scylla.alternator.'
|
||||
@@ -175,7 +176,7 @@ def test_write_to_config(scylla_only, dynamodb):
|
||||
print(str(e))
|
||||
print('alternator_allow_system_table_write' in str(e))
|
||||
if 'alternator_allow_system_table_write' in str(e):
|
||||
pytest.skip('need alternator_allow_system_table_write=true')
|
||||
skip_env('need alternator_allow_system_table_write=true')
|
||||
else:
|
||||
raise
|
||||
try:
|
||||
|
||||
@@ -13,6 +13,8 @@ import time
|
||||
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
from packaging.version import Version
|
||||
|
||||
from test.alternator.util import multiset, create_test_table, unique_table_name, random_string
|
||||
@@ -139,7 +141,7 @@ def test_table_tags(dynamodb):
|
||||
# so older versions of the library cannot run this test.
|
||||
import botocore
|
||||
if (Version(botocore.__version__) < Version('1.12.136')):
|
||||
pytest.skip("Botocore version 1.12.136 or above required to run this test")
|
||||
skip_env("Botocore version 1.12.136 or above required to run this test")
|
||||
|
||||
table = create_test_table(dynamodb,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
||||
@@ -220,7 +222,7 @@ def test_too_long_tags_from_creation(dynamodb):
|
||||
# so older versions of the library cannot run this test.
|
||||
import botocore
|
||||
if (Version(botocore.__version__) < Version('1.12.136')):
|
||||
pytest.skip("Botocore version 1.12.136 or above required to run this test")
|
||||
skip_env("Botocore version 1.12.136 or above required to run this test")
|
||||
name = unique_table_name()
|
||||
# Setting 100 tags is not allowed, the following table creation should fail:
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
@@ -245,7 +247,7 @@ def test_forbidden_tags_from_creation(scylla_only, dynamodb):
|
||||
# so older versions of the library cannot run this test.
|
||||
import botocore
|
||||
if (Version(botocore.__version__) < Version('1.12.136')):
|
||||
pytest.skip("Botocore version 1.12.136 or above required to run this test")
|
||||
skip_env("Botocore version 1.12.136 or above required to run this test")
|
||||
name = unique_table_name()
|
||||
# It is not allowed to set the system:write_isolation to "dog", so the
|
||||
# following table creation should fail:
|
||||
|
||||
@@ -22,12 +22,13 @@ import pytest
|
||||
import requests
|
||||
|
||||
from test.alternator.util import random_string, full_scan, full_query, create_test_table, scylla_config_temporary
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
|
||||
# The "with_tracing" fixture ensures that tracing is enabled throughout
|
||||
# the run of a test function, and disabled when it ends. If tracing cannot be
|
||||
# enabled, the test is pytest.skip()ed. This will of course happens if we run
|
||||
# the test with "--aws" (tracing is a Scylla-only feature).
|
||||
# enabled, the test is skipped via skip_env(). This will of course happens
|
||||
# if we run the test with "--aws" (tracing is a Scylla-only feature).
|
||||
# Note that to support (in the future) the ability for Alternator tests to
|
||||
# run in parallel, the tests here need to be prepared that completely
|
||||
# unrelated requests get traced during a test with with_tracing.
|
||||
@@ -35,15 +36,15 @@ from test.alternator.util import random_string, full_scan, full_query, create_te
|
||||
def with_tracing(rest_api):
|
||||
probability_resp = requests.get(rest_api+'/storage_service/trace_probability')
|
||||
if probability_resp.status_code != 200:
|
||||
pytest.skip('Failed to fetch tracing probability')
|
||||
skip_env('Failed to fetch tracing probability')
|
||||
probability = probability_resp.text
|
||||
response = requests.post(rest_api+'/storage_service/trace_probability?probability=1')
|
||||
if response.status_code != 200:
|
||||
pytest.skip('Failed to enable tracing')
|
||||
skip_env('Failed to enable tracing')
|
||||
# verify that tracing is really enabled
|
||||
response = requests.get(rest_api+'/storage_service/trace_probability')
|
||||
if response.status_code != 200 or response.content.decode('utf-8') != '1':
|
||||
pytest.skip('Failed to verify tracing')
|
||||
skip_env('Failed to verify tracing')
|
||||
yield
|
||||
print("with_tracing restoring tracing")
|
||||
response = requests.post(rest_api+'/storage_service/trace_probability?probability='+probability)
|
||||
@@ -51,7 +52,7 @@ def with_tracing(rest_api):
|
||||
pytest.fail('Failed to disable tracing after with_tracing test')
|
||||
response = requests.get(rest_api+'/storage_service/trace_probability')
|
||||
if response.status_code != 200 or response.content.decode('utf-8') != '0':
|
||||
pytest.skip('Failed to verify tracing disabled')
|
||||
skip_env('Failed to verify tracing disabled')
|
||||
|
||||
# Similarly to the fixture above, slow query logging is enabled only for the run of the
|
||||
# test function. Slow logging is set up with threshold equal to 0 microseconds,
|
||||
@@ -61,22 +62,22 @@ def with_slow_query_logging(rest_api):
|
||||
print("with_slow_query_logging enabling slow query logging")
|
||||
slow_query_info = requests.get(rest_api+'/storage_service/slow_query')
|
||||
if slow_query_info.status_code != 200:
|
||||
pytest.skip('Failed to fetch slow query logging info')
|
||||
skip_env('Failed to fetch slow query logging info')
|
||||
slow_query_json = json.loads(slow_query_info.text)
|
||||
print(slow_query_json)
|
||||
response = requests.post(rest_api+'/storage_service/slow_query?enable=true')
|
||||
if response.status_code != 200:
|
||||
pytest.skip('Failed to enable slow query logging')
|
||||
skip_env('Failed to enable slow query logging')
|
||||
response = requests.post(rest_api+'/storage_service/slow_query?threshold=0')
|
||||
if response.status_code != 200:
|
||||
pytest.skip('Failed to enable slow query logging threshold')
|
||||
skip_env('Failed to enable slow query logging threshold')
|
||||
# verify that logging is really enabled
|
||||
response = requests.get(rest_api+'/storage_service/slow_query')
|
||||
if response.status_code != 200:
|
||||
pytest.skip('Failed to verify slow query logging')
|
||||
skip_env('Failed to verify slow query logging')
|
||||
response_json = json.loads(response.text)
|
||||
if response_json['enable'] != True or response_json['threshold'] != 0:
|
||||
pytest.skip('Failed to verify slow query logging values')
|
||||
skip_env('Failed to verify slow query logging values')
|
||||
print(response_json)
|
||||
yield
|
||||
print("with_slow_query_logging restoring slow query logging")
|
||||
|
||||
@@ -13,6 +13,7 @@ from decimal import Decimal
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.pylib.skip_types import skip_bug, skip_env
|
||||
from .util import new_test_table, random_string, full_query, unique_table_name, is_aws, client_no_transform, multiset, scylla_config_read
|
||||
|
||||
# The following fixture is to ensure that Alternator TTL is being tested with both vnodes and tablets.
|
||||
@@ -70,12 +71,12 @@ def waits_for_expiration(dynamodb, request):
|
||||
if request.config.getoption('runveryslow'):
|
||||
return
|
||||
else:
|
||||
pytest.skip('need --runveryslow option to run')
|
||||
skip_env('need --runveryslow option to run')
|
||||
period = scylla_config_read(dynamodb, 'alternator_ttl_period_in_seconds')
|
||||
assert period is not None
|
||||
period = float(period)
|
||||
if period > 1 and not request.config.getoption('runveryslow'):
|
||||
pytest.skip('need --runveryslow option to run')
|
||||
skip_env('need --runveryslow option to run')
|
||||
|
||||
# The veryslow_on_aws says that this test is very slow on AWS, but
|
||||
# always reasonably fast on Scylla. If fastness on Scylla requires a
|
||||
@@ -84,7 +85,7 @@ def waits_for_expiration(dynamodb, request):
|
||||
@pytest.fixture(scope="module")
|
||||
def veryslow_on_aws(dynamodb, request):
|
||||
if is_aws(dynamodb) and not request.config.getoption('runveryslow'):
|
||||
pytest.skip('need --runveryslow option to run')
|
||||
skip_env('need --runveryslow option to run')
|
||||
|
||||
# Test the DescribeTimeToLive operation on a table where the time-to-live
|
||||
# feature was *not* enabled.
|
||||
@@ -655,6 +656,12 @@ def test_ttl_expiration_lsi_key(dynamodb, waits_for_expiration):
|
||||
# content), and a special userIdentity flag saying that this is not a regular
|
||||
# REMOVE but an expiration. Reproduces issue #11523.
|
||||
def test_ttl_expiration_streams(dynamodb, dynamodbstreams, waits_for_expiration):
|
||||
# Alternator Streams currently doesn't work with tablets, so until
|
||||
# #23838 is solved, skip this test on tablets.
|
||||
for tag in TAGS:
|
||||
if tag['Key'] == 'system:initial_tablets' and tag['Value'].isdigit():
|
||||
skip_bug("Streams test skipped on tablets due to #23838")
|
||||
|
||||
# In my experiments, a 30-minute (1800 seconds) is the typical
|
||||
# expiration delay in this test. If the test doesn't finish within
|
||||
# max_duration, we report a failure.
|
||||
|
||||
@@ -18,6 +18,7 @@ from botocore.exceptions import ClientError
|
||||
import boto3.dynamodb.types
|
||||
|
||||
from .util import random_string, new_test_table, unique_table_name, scylla_config_read, scylla_config_write, client_no_transform, is_aws
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
# Monkey-patch the boto3 library to stop doing its own error-checking on
|
||||
# numbers. This works around a bug https://github.com/boto/boto3/issues/2500
|
||||
@@ -41,7 +42,7 @@ boto3.dynamodb.types.DYNAMODB_CONTEXT = decimal.Context(prec=100)
|
||||
@pytest.fixture(scope="module")
|
||||
def vs(new_dynamodb_session, dynamodb):
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only: vector search extensions not available on DynamoDB')
|
||||
skip_env('Scylla-only: vector search extensions not available on DynamoDB')
|
||||
resource = new_dynamodb_session()
|
||||
client = resource.meta.client
|
||||
# Patch the client to support the new APIs:
|
||||
@@ -1039,7 +1040,7 @@ def vector_store_configured(table_vs):
|
||||
@pytest.fixture(scope="module")
|
||||
def needs_vector_store(table_vs):
|
||||
if not vector_store_configured(table_vs):
|
||||
pytest.skip('Vector Store is not configured (run with --vs)')
|
||||
skip_env('Vector Store is not configured (run with --vs)')
|
||||
|
||||
# The context manager unconfigured_vector_store() temporarily (for the
|
||||
# duration of the "with" block) un-configures the vector store in Scylla -
|
||||
@@ -1218,7 +1219,7 @@ def test_wait_for_vector_index_active(vs, needs_vector_store):
|
||||
# and this test used to fail before this was fixed.
|
||||
# To save a bit of time, we don't test all combinations of hash and range
|
||||
# key types but test each type at least once as a hash key and a range key.
|
||||
@pytest.mark.skip(reason="Bug in vector store for non-string keys, fails very slowly so let's skip")
|
||||
@pytest.mark.skip_bug(reason="Bug in vector store for non-string keys, fails very slowly so let's skip")
|
||||
@pytest.mark.parametrize('hash_type,range_type', [
|
||||
('N', None), ('B', None), ('S', 'N'), ('S', 'B'),
|
||||
], ids=[
|
||||
@@ -1654,7 +1655,7 @@ def test_deleteitem_vectorindex(vs, needs_vector_store, with_ck):
|
||||
def test_vector_with_ttl(vs, needs_vector_store, have_ck):
|
||||
period = scylla_config_read(vs, 'alternator_ttl_period_in_seconds')
|
||||
if period is None or float(period) > 1:
|
||||
pytest.skip('need alternator_ttl_period_in_seconds <= 1 to run this test quickly')
|
||||
skip_env('need alternator_ttl_period_in_seconds <= 1 to run this test quickly')
|
||||
key_schema = [{'AttributeName': 'p', 'KeyType': 'HASH'}]
|
||||
attr_defs = [{'AttributeName': 'p', 'AttributeType': 'S'}]
|
||||
if have_ck:
|
||||
|
||||
@@ -14,6 +14,8 @@ import pytest
|
||||
from contextlib import contextmanager
|
||||
from botocore.hooks import HierarchicalEmitter
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
# The "pytest-randomly" pytest plugins modifies the default "random" to repeat
|
||||
# the same pseudo-random sequence (with the same seed) in each separate test.
|
||||
# But we currently rely on random_string() at al. to return unique keys that
|
||||
@@ -247,7 +249,7 @@ def scylla_inject_error(rest_api, err, one_shot=False):
|
||||
response = requests.get(f'{rest_api}/v2/error_injection/injection')
|
||||
print("Enabled error injections:", response.content.decode('utf-8'))
|
||||
if response.content.decode('utf-8') == "[]":
|
||||
pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
|
||||
skip_env("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
|
||||
@@ -2179,13 +2179,17 @@ SEASTAR_TEST_CASE(test_commitlog_buffer_size_counter) {
|
||||
SEASTAR_TEST_CASE(test_commitlog_handle_replayed_segments) {
|
||||
commitlog::config cfg;
|
||||
|
||||
constexpr uint64_t max_size_mb = 8 * 1024;
|
||||
// Keep max_size_mb small: the test creates (4 + max_size_mb/seg_size) files
|
||||
// of seg_size each per iteration. With a large value (e.g. 8*1024) this
|
||||
// results in hundreds of 32 MB fallocate calls per iteration which can
|
||||
// easily exceed the CI timeout of 15 minutes (SCYLLADB-1496).
|
||||
constexpr uint64_t max_size_mb = 128;
|
||||
|
||||
cfg.commitlog_total_space_in_mb = max_size_mb * smp::count;
|
||||
cfg.allow_going_over_size_limit = false;
|
||||
cfg.commitlog_sync_period_in_ms = 1;
|
||||
|
||||
for (size_t k = 0; k < 100; ++k) {
|
||||
for (size_t k = 0; k < 10; ++k) {
|
||||
tmpdir tmp;
|
||||
cfg.commit_log_location = tmp.path().string();
|
||||
|
||||
|
||||
@@ -104,6 +104,7 @@ public:
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
|
||||
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
||||
virtual bool skip_memtable_for_tombstone_gc() const noexcept override { return false; }
|
||||
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
|
||||
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
|
||||
virtual compaction::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }
|
||||
|
||||
@@ -89,16 +89,7 @@ static future<> create_object_of_size(storage::client& c
|
||||
co_await sink.close();
|
||||
}
|
||||
|
||||
static future<> compare_object_data(const local_gcs_wrapper& env, std::string_view object_name, std::vector<temporary_buffer<char>>&& bufs) {
|
||||
auto& c = env.client();
|
||||
auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) {
|
||||
return s + buf.size();
|
||||
});
|
||||
|
||||
auto source = c.create_download_source(env.bucket, object_name);
|
||||
auto is1 = seastar::input_stream<char>(std::move(source));
|
||||
auto is2 = seastar::input_stream<char>(create_memory_source(std::move(bufs)));
|
||||
|
||||
static future<> compare_stream_data(seastar::input_stream<char>& is1, seastar::input_stream<char>& is2, size_t total) {
|
||||
uint64_t read = 0;
|
||||
while (!is1.eof()) {
|
||||
auto buf = co_await is1.read();
|
||||
@@ -113,6 +104,23 @@ static future<> compare_object_data(const local_gcs_wrapper& env, std::string_vi
|
||||
BOOST_REQUIRE_EQUAL(read, total);
|
||||
}
|
||||
|
||||
static std::tuple<seastar::input_stream<char>, size_t> stream_from_buffers(std::vector<temporary_buffer<char>>&& bufs) {
|
||||
auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) {
|
||||
return s + buf.size();
|
||||
});
|
||||
auto is = seastar::input_stream<char>(create_memory_source(std::move(bufs)));
|
||||
return std::make_tuple(std::move(is), total);
|
||||
}
|
||||
|
||||
static future<> compare_object_data(const local_gcs_wrapper& env, std::string_view object_name, std::vector<temporary_buffer<char>>&& bufs) {
|
||||
auto& c = env.client();
|
||||
auto source = c.create_download_source(env.bucket, object_name);
|
||||
auto is1 = seastar::input_stream<char>(std::move(source));
|
||||
auto [is2, total] = stream_from_buffers(std::move(bufs));
|
||||
|
||||
co_await compare_stream_data(is1, is2, total);
|
||||
}
|
||||
|
||||
using namespace std::string_literals;
|
||||
static constexpr auto prefix = "bork/ninja/"s;
|
||||
|
||||
@@ -277,4 +285,34 @@ SEASTAR_FIXTURE_TEST_CASE(test_merge_objects, local_gcs_wrapper, *check_gcp_stor
|
||||
co_await compare_object_data(env, name, std::move(bufs));
|
||||
}
|
||||
|
||||
|
||||
SEASTAR_FIXTURE_TEST_CASE(test_gcp_storage_read_large_object_iov, local_gcs_wrapper, *check_gcp_storage_test_enabled()) {
|
||||
auto& c = client();
|
||||
auto name = make_name();
|
||||
std::vector<temporary_buffer<char>> written;
|
||||
|
||||
auto dest_size = 32*1024*1024 + 357 + 1022*67;
|
||||
|
||||
// ensure we remove the object
|
||||
objects_to_delete.emplace_back(name);
|
||||
co_await create_object_of_size(c, bucket, name, dest_size, &written);
|
||||
auto source = c.create_download_source(bucket, name);
|
||||
auto f = create_file_for_seekable_source(std::move(source));
|
||||
auto [is2, total] = stream_from_buffers(std::move(std::move(written)));
|
||||
std::vector<temporary_buffer<char>> bufs;
|
||||
std::vector<iovec> vecs;
|
||||
for (size_t i = 0; i < total; ) {
|
||||
auto n = std::min(total - i, size_t(8192));
|
||||
bufs.emplace_back(n);
|
||||
vecs.emplace_back(iovec{ .iov_base = bufs.back().get_write(), .iov_len = n });
|
||||
i += n;
|
||||
}
|
||||
auto read = co_await f.dma_read(0, std::move(vecs));
|
||||
BOOST_REQUIRE_EQUAL(read, total);
|
||||
auto [is1, total2] = stream_from_buffers(std::move(std::move(bufs)));
|
||||
BOOST_REQUIRE_EQUAL(total, total2);
|
||||
co_await compare_stream_data(is1, is2, total);
|
||||
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -548,7 +548,7 @@ SEASTAR_THREAD_TEST_CASE(test_read_all) {
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
env.db().invoke_on_all([] (replica::database& db) {
|
||||
db.set_querier_cache_entry_ttl(2s);
|
||||
db.set_querier_cache_entry_ttl(60s);
|
||||
}).get();
|
||||
|
||||
const auto ks = create_vnodes_keyspace(env);
|
||||
@@ -605,7 +605,7 @@ SEASTAR_THREAD_TEST_CASE(test_read_all_multi_range) {
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
env.db().invoke_on_all([] (replica::database& db) {
|
||||
db.set_querier_cache_entry_ttl(2s);
|
||||
db.set_querier_cache_entry_ttl(60s);
|
||||
}).get();
|
||||
|
||||
const auto ks = create_vnodes_keyspace(env);
|
||||
@@ -667,7 +667,7 @@ SEASTAR_THREAD_TEST_CASE(test_read_with_partition_row_limits) {
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
env.db().invoke_on_all([] (replica::database& db) {
|
||||
db.set_querier_cache_entry_ttl(2s);
|
||||
db.set_querier_cache_entry_ttl(60s);
|
||||
}).get();
|
||||
|
||||
const auto ks = create_vnodes_keyspace(env);
|
||||
|
||||
@@ -5171,8 +5171,16 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
// Use a fixed partition key. The row-size thresholds below are chosen at exact
|
||||
// byte boundaries of the MC sstable row encoding: the first clustering row body
|
||||
// encodes prev_row_size as a vint, and prev_row_size includes the partition
|
||||
// header (which contains the partition key's serialized length+bytes). A
|
||||
// random-size partition key (as produced by simple_schema::make_pkey() /
|
||||
// tests::generate_partition_key(), which default to key_size{1,128}) would
|
||||
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
|
||||
// comparison, making this test flaky. Under smp=1 (which this test runs with),
|
||||
// a fixed key is always shard-local, so no sharding-metadata issue arises.
|
||||
mutation partition = s.new_mutation("pv");
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5244,8 +5252,16 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
|
||||
// byte boundaries of the MC sstable row encoding: the first clustering row body
|
||||
// encodes prev_row_size as a vint, and prev_row_size includes the partition
|
||||
// header (which contains the partition key's serialized length+bytes). A
|
||||
// random-size partition key (as produced by simple_schema::make_pkey() /
|
||||
// tests::generate_partition_key(), which default to key_size{1,128}) would
|
||||
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
|
||||
// comparison, making this test flaky. Under smp=1 (which this test runs with),
|
||||
// a fixed key is always shard-local, so no sharding-metadata issue arises.
|
||||
mutation partition = s.new_mutation("pv");
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5264,7 +5280,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
for (auto idx = 0; idx < rows - 1; idx++) {
|
||||
@@ -5326,7 +5341,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
int live_rows = 0;
|
||||
@@ -5436,7 +5450,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
|
||||
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
std::map<bytes, bytes> kv_map;
|
||||
for (auto i = 0; i < elements; i++) {
|
||||
@@ -5512,7 +5525,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
|
||||
// Create a mutation with a clustering row whose serialized cell value
|
||||
// exceeds the 1-byte thresholds, so partition_size, row_size, and
|
||||
// cell_size records are all generated.
|
||||
// Use make_pkey() (no argument) to generate a key on this shard.
|
||||
auto pk = ss.make_pkey();
|
||||
mutation m(s, pk);
|
||||
auto ck = ss.make_ckey("ck1");
|
||||
@@ -5622,7 +5634,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
|
||||
// Create 6 partitions, each with one row of increasing size.
|
||||
// Since each partition has exactly one row, we get 6 row_size records
|
||||
// competing for 3 slots.
|
||||
// Use make_pkeys() to generate shard-local keys.
|
||||
auto pkeys = ss.make_pkeys(6);
|
||||
utils::chunked_vector<mutation> muts;
|
||||
for (int i = 0; i < 6; i++) {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1997,10 +1997,10 @@ static
|
||||
future<> apply_plan(token_metadata& tm, const migration_plan& plan, service::topology& topology, shared_load_stats* load_stats) {
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
co_await tm.tablets().mutate_tablet_map_async(mig.tablet.table, [&] (tablet_map& tmap) {
|
||||
if (load_stats) {
|
||||
if (load_stats && mig.src && mig.dst) {
|
||||
global_tablet_id gid {mig.tablet.table, mig.tablet.tablet};
|
||||
dht::token_range trange {tmap.get_token_range(mig.tablet.tablet)};
|
||||
auto new_stats = load_stats->stats.migrate_tablet_size(mig.src.host, mig.dst.host, gid, trange);
|
||||
auto new_stats = load_stats->stats.migrate_tablet_size(mig.src->host, mig.dst->host, gid, trange);
|
||||
if (new_stats) {
|
||||
load_stats->stats = std::move(*new_stats);
|
||||
}
|
||||
@@ -2661,13 +2661,13 @@ SEASTAR_THREAD_TEST_CASE(test_rack_list_conversion) {
|
||||
for (auto& mig : plan.migrations()) {
|
||||
testlog.info("Rack list colocation migration: {}", mig);
|
||||
BOOST_REQUIRE(mig.kind == locator::tablet_transition_kind::migration);
|
||||
BOOST_REQUIRE(mig.src.host == host3 || mig.src.host == host4);
|
||||
if (mig.src.host == host3) {
|
||||
BOOST_REQUIRE(mig.src && (mig.src->host == host3 || mig.src->host == host4));
|
||||
if (mig.src->host == host3) {
|
||||
BOOST_REQUIRE(mig.tablet.tablet == A);
|
||||
BOOST_REQUIRE(mig.dst.host == host5 || mig.dst.host == host6);
|
||||
BOOST_REQUIRE(mig.dst && (mig.dst->host == host5 || mig.dst->host == host6));
|
||||
} else {
|
||||
BOOST_REQUIRE(mig.tablet.tablet == B);
|
||||
BOOST_REQUIRE(mig.dst.host == host1 || mig.dst.host == host2);
|
||||
BOOST_REQUIRE(mig.dst && (mig.dst->host == host1 || mig.dst->host == host2));
|
||||
}
|
||||
}
|
||||
}).get();
|
||||
@@ -2745,8 +2745,9 @@ SEASTAR_THREAD_TEST_CASE(test_colocation_skipped_on_excluded_nodes) {
|
||||
rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats(), [&] (const migration_plan& plan) {
|
||||
// Verify that only rebuilding migrations involve the excluded host.
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
BOOST_REQUIRE_NE(mig.dst.host, host1);
|
||||
if (mig.src.host == host1) {
|
||||
BOOST_REQUIRE(mig.src && mig.dst);
|
||||
BOOST_REQUIRE_NE(mig.dst->host, host1);
|
||||
if (mig.src->host == host1) {
|
||||
BOOST_REQUIRE(mig.kind == tablet_transition_kind::rebuild_v2);
|
||||
}
|
||||
}
|
||||
@@ -2790,8 +2791,9 @@ SEASTAR_THREAD_TEST_CASE(test_no_intranode_migration_on_draining_node) {
|
||||
rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats(), [&] (const migration_plan& plan) {
|
||||
// Verify no intra-node migrations on the draining host.
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
if (mig.src.host == host1) {
|
||||
BOOST_REQUIRE_NE(mig.dst.host, host1);
|
||||
BOOST_REQUIRE(mig.src && mig.dst);
|
||||
if (mig.src->host == host1) {
|
||||
BOOST_REQUIRE_NE(mig.dst->host, host1);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
@@ -4945,7 +4947,8 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_ignores_hosts_with_incomplete_stats)
|
||||
BOOST_REQUIRE(!plan.empty());
|
||||
BOOST_REQUIRE(!plan.migrations().empty());
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
BOOST_REQUIRE_EQUAL(mig.src.host, host2);
|
||||
BOOST_REQUIRE(mig.src);
|
||||
BOOST_REQUIRE_EQUAL(mig.src->host, host2);
|
||||
}
|
||||
}
|
||||
}).get();
|
||||
|
||||
@@ -58,7 +58,7 @@ async def get_ready_maintenance_session(socket_path: str, timeout: int = 60):
|
||||
session.execute("SELECT key FROM system.local LIMIT 1")
|
||||
return session
|
||||
except Exception:
|
||||
c.shutdown()
|
||||
safe_driver_shutdown(c)
|
||||
return None
|
||||
|
||||
session = await wait_for(try_connect, deadline)
|
||||
@@ -90,7 +90,7 @@ async def connect_with_credentials(ip: str, username: str, password: str, timeou
|
||||
try:
|
||||
return c.connect()
|
||||
except NoHostAvailable:
|
||||
c.shutdown()
|
||||
safe_driver_shutdown(c)
|
||||
return None
|
||||
return await wait_for(try_connect, time.time() + timeout)
|
||||
|
||||
@@ -240,7 +240,7 @@ async def test_no_default_superuser_maintenance_socket_ops(manager: ManagerClien
|
||||
except Unauthorized:
|
||||
return True
|
||||
finally:
|
||||
c.shutdown()
|
||||
safe_driver_shutdown(c)
|
||||
|
||||
await wait_for(check_superuser_revoked, time.time() + 60)
|
||||
|
||||
@@ -257,11 +257,11 @@ async def test_no_default_superuser_maintenance_socket_ops(manager: ManagerClien
|
||||
auth_provider=PlainTextAuthProvider(username=new_role, password=new_role_password))
|
||||
try:
|
||||
c.connect()
|
||||
c.shutdown()
|
||||
return None # Still cached, retry
|
||||
except NoHostAvailable:
|
||||
c.shutdown()
|
||||
return True
|
||||
finally:
|
||||
safe_driver_shutdown(c)
|
||||
|
||||
await wait_for(check_role_dropped, time.time() + 60)
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ from cassandra.policies import WhiteListRoundRobinPolicy
|
||||
from cassandra.protocol import ResultMessage
|
||||
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.manager_client import ManagerClient, safe_driver_shutdown
|
||||
from test.pylib.util import unique_name
|
||||
|
||||
|
||||
@@ -138,7 +138,7 @@ def _prepare_and_execute(host: str, query: str) -> tuple[bytes, bool, int]:
|
||||
return prepared_metadata_id, captured["metadata_changed"], len(rows)
|
||||
finally:
|
||||
session.shutdown()
|
||||
cluster.shutdown()
|
||||
safe_driver_shutdown(cluster)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -532,3 +532,115 @@ async def test_anonymous_user(manager: ManagerClient) -> None:
|
||||
return
|
||||
|
||||
assert False, f"None of clients use sl:default, rows={rows}"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_per_service_level_cql_requests_serving(manager: ManagerClient) -> None:
|
||||
"""Test that the per-service-level cql_requests_serving metric correctly
|
||||
reflects the number of in-flight CQL requests for each service level.
|
||||
|
||||
Uses error injection to pause requests mid-flight, then verifies the gauge
|
||||
shows exactly the right count per scheduling group. Sends two requests on
|
||||
a custom service level and one on sl:default (the cassandra superuser) to
|
||||
verify both per-SL counters independently.
|
||||
"""
|
||||
cmdline = ['--logger-log-level', 'debug_error_injection=debug']
|
||||
server = await manager.server_add(config=auth_config, cmdline=cmdline)
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
|
||||
# Create one new service level and one new user attached to it.
|
||||
# The cassandra superuser already uses sl:default.
|
||||
sl_a = unique_name()
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
|
||||
await cql.run_async(f"CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
|
||||
await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")
|
||||
|
||||
# Open dedicated driver sessions for user_a and the cassandra superuser.
|
||||
# Disable schema and token metadata refresh on both to prevent background
|
||||
# driver queries from interfering with the error injection and metrics.
|
||||
cluster_a = manager.con_gen([server.ip_addr],
|
||||
manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a'))
|
||||
cluster_a.schema_metadata_enabled = False
|
||||
cluster_a.token_metadata_enabled = False
|
||||
session_a = cluster_a.connect()
|
||||
|
||||
cluster_default = manager.con_gen([server.ip_addr],
|
||||
manager.port, manager.use_ssl, PlainTextAuthProvider(username='cassandra', password='cassandra'))
|
||||
cluster_default.schema_metadata_enabled = False
|
||||
cluster_default.token_metadata_enabled = False
|
||||
session_default = cluster_default.connect()
|
||||
|
||||
try:
|
||||
# Warm up both sessions to ensure the driver has established connections
|
||||
# before enabling injection. Without this, execute() may fail with
|
||||
# NoHostAvailable and the query never reaches the server.
|
||||
session_a.execute("SELECT key FROM system.local")
|
||||
session_default.execute("SELECT key FROM system.local")
|
||||
|
||||
# Enable error injection to pause CQL requests.
|
||||
await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False)
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
|
||||
# Send two requests from user_a (sl:sl_a) and one from the cassandra
|
||||
# superuser (sl:default). Each pauses at the injection point, keeping
|
||||
# the cql_requests_serving gauge incremented.
|
||||
mark = await log.mark()
|
||||
task_a1 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
|
||||
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
|
||||
|
||||
mark = await log.mark()
|
||||
task_a2 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
|
||||
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
|
||||
|
||||
mark = await log.mark()
|
||||
task_default = asyncio.ensure_future(asyncio.to_thread(session_default.execute, "SELECT * FROM system.local"))
|
||||
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
|
||||
|
||||
# All three requests are now paused. Verify the per-SL gauges.
|
||||
# Use >= because a stray request from the manager's CQL session
|
||||
# (whose metadata refresh we cannot disable) could also get paused,
|
||||
# inflating the sl:default counter.
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
serving_a = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': f'sl:{sl_a}'})
|
||||
serving_default = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': 'sl:default'})
|
||||
assert serving_a >= 2, \
|
||||
f"Expected at least 2 in-flight requests for sl:{sl_a}, got {serving_a}"
|
||||
assert serving_default >= 1, \
|
||||
f"Expected at least 1 in-flight request for sl:default, got {serving_default}"
|
||||
|
||||
# Release paused requests by sending messages. Each message_injection
|
||||
# call sends one message per shard, waking one handler per shard.
|
||||
# Send enough to cover our three requests plus any stray ones.
|
||||
total_paused = int(serving_a + serving_default)
|
||||
for _ in range(total_paused):
|
||||
await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
|
||||
# Wait for all requests to complete.
|
||||
await task_a1
|
||||
await task_a2
|
||||
await task_default
|
||||
|
||||
# Disable injection before checking metrics to avoid pausing any
|
||||
# stray requests that might arrive.
|
||||
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
|
||||
# Verify gauges are back to zero. Use a retry loop because a stray
|
||||
# request from the manager's CQL session may be briefly in-flight.
|
||||
async def metrics_are_zero():
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
serving_a = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': f'sl:{sl_a}'})
|
||||
serving_default = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': 'sl:default'})
|
||||
assert serving_a == 0, \
|
||||
f"Expected 0 in-flight requests for sl:{sl_a} after release, got {serving_a}"
|
||||
assert serving_default == 0, \
|
||||
f"Expected 0 in-flight requests for sl:default after release, got {serving_default}"
|
||||
return True
|
||||
await wait_for(metrics_are_zero, deadline=time.time() + 60)
|
||||
finally:
|
||||
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
safe_driver_shutdown(cluster_a)
|
||||
safe_driver_shutdown(cluster_default)
|
||||
|
||||
@@ -14,7 +14,7 @@ from unittest import mock
|
||||
from cassandra.cluster import Cluster, DefaultConnection, NoHostAvailable
|
||||
from cassandra import connection
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.manager_client import ManagerClient, safe_driver_shutdown
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -51,7 +51,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
|
||||
# We expect failure or timeout
|
||||
pass
|
||||
finally:
|
||||
c.shutdown()
|
||||
safe_driver_shutdown(c)
|
||||
|
||||
def attempt_good_connection():
|
||||
nonlocal connections_observed
|
||||
@@ -66,7 +66,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
|
||||
if count >= num_connections/2:
|
||||
connections_observed = True
|
||||
finally:
|
||||
c.shutdown()
|
||||
safe_driver_shutdown(c)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
|
||||
@@ -18,8 +18,9 @@ from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from multiprocessing import Event
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from test import TOP_SRC_DIR, path_to
|
||||
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.skip_types import skip_env
|
||||
from test.pylib.util import unique_name
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.async_cql import run_async
|
||||
@@ -381,7 +382,7 @@ async def prepare_3_racks_cluster(request, manager):
|
||||
@pytest.fixture(scope="function")
|
||||
def internet_dependency_enabled(request) -> None:
|
||||
if request.config.getoption('skip_internet_dependent_tests'):
|
||||
pytest.skip(reason="skip_internet_dependent_tests is set")
|
||||
skip_env(reason="skip_internet_dependent_tests is set")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -393,3 +394,8 @@ async def key_provider(request, tmpdir, scylla_binary):
|
||||
"""Encryption providers fixture"""
|
||||
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
|
||||
yield res
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def failure_detector_timeout(build_mode):
|
||||
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]
|
||||
|
||||
@@ -184,7 +184,7 @@ class TestBypassCache(Tester):
|
||||
session.execute(query.format(idx, varchar_c, varchar_v))
|
||||
return session
|
||||
|
||||
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/6045")
|
||||
@pytest.mark.skip_bug(reason="https://github.com/scylladb/scylladb/issues/6045")
|
||||
def test_range_scan_bypass_cache(self):
|
||||
session = self.insert_data_for_scan_range()
|
||||
node = self.cluster.nodelist()[0]
|
||||
|
||||
@@ -18,6 +18,7 @@ from ccmlib.scylla_cluster import ScyllaCluster
|
||||
from ccmlib.scylla_node import ScyllaNode
|
||||
|
||||
from dtest_class import Tester, create_cf, create_ks
|
||||
from test.pylib.skip_types import skip_env
|
||||
from tools.assertions import (
|
||||
assert_all,
|
||||
assert_almost_equal,
|
||||
@@ -712,7 +713,7 @@ class TestCommitLog(Tester):
|
||||
node1.stop(gently=False)
|
||||
|
||||
if self._get_commitlog_size()[0] > self.big_columns:
|
||||
pytest.skip("Skipping rollback scenario as data was written to commitlog before stopping node")
|
||||
skip_env("Skipping rollback scenario as data was written to commitlog before stopping node")
|
||||
|
||||
# restart the node
|
||||
node1.start(wait_for_binary_proto=True)
|
||||
|
||||
@@ -11,6 +11,8 @@ import pytest
|
||||
|
||||
from dtest_class import Tester, create_ks
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
# Those are ideal values according to c* specifications
|
||||
# they should pass
|
||||
@@ -271,7 +273,7 @@ class TestLimits(Tester):
|
||||
|
||||
def test_max_cells(self):
|
||||
if self.cluster.scylla_mode == "debug":
|
||||
pytest.skip("client times out in debug mode")
|
||||
skip_env("client times out in debug mode")
|
||||
cluster = self.prepare()
|
||||
cluster.set_configuration_options(values={"query_tombstone_page_limit": 9999999, "batch_size_warn_threshold_in_kb": 1024 * 1024, "batch_size_fail_threshold_in_kb": 1024 * 1024, "commitlog_segment_size_in_mb": 64})
|
||||
cluster.populate(1).start(jvm_args=["--smp", "1", "--memory", "2G", "--logger-log-level", "lsa-timing=debug"])
|
||||
|
||||
@@ -234,7 +234,7 @@ class TestScrubIndexes(TestHelper):
|
||||
"tablets_initial_scale_factor": 1,
|
||||
}
|
||||
)
|
||||
cluster.populate(1).start(jvm_args=["--smp", "1"])
|
||||
cluster.populate(1).start(jvm_args=["--smp", "1"], wait_for_binary_proto=True)
|
||||
node1 = cluster.nodelist()[0]
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
@@ -261,7 +261,7 @@ class TestScrubIndexes(TestHelper):
|
||||
|
||||
# Restart and check data again
|
||||
cluster.stop()
|
||||
cluster.start()
|
||||
cluster.start(jvm_args=["--smp", "1"], wait_for_binary_proto=True)
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
session.execute("USE %s" % (KEYSPACE))
|
||||
|
||||
@@ -62,7 +62,7 @@ async def test_delete_partition_rows_from_table_with_mv(manager: ManagerClient)
|
||||
node_count = 2
|
||||
await manager.servers_add(node_count, config={'error_injections_at_startup': ['view_update_limit', 'delay_before_remote_view_update']})
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}") as ks:
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, PRIMARY KEY (key, c))")
|
||||
await insert_with_concurrency(cql, f"{ks}.tab", 200, 100)
|
||||
|
||||
|
||||
@@ -19,9 +19,9 @@ from cassandra.query import SimpleStatement # type: ignore
|
||||
@pytest.mark.skip_mode(mode='release', reason="error injections aren't enabled in release mode")
|
||||
async def test_mv_fail_building(manager: ManagerClient) -> None:
|
||||
node_count = 3
|
||||
servers = await manager.servers_add(node_count)
|
||||
servers = await manager.servers_add(node_count, auto_rack_dc="dc")
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}") as ks:
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, PRIMARY KEY (key, c))")
|
||||
# Insert initial rows for building an index
|
||||
for i in range(10):
|
||||
|
||||
@@ -77,6 +77,7 @@ async def test_staging_backlog_processed_after_restart(manager: ManagerClient):
|
||||
# Restart node0
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
await manager.server_start(servers[0].server_id)
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Assert that node0 has no data for base table and MV
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
|
||||
@@ -84,6 +85,7 @@ async def test_staging_backlog_processed_after_restart(manager: ManagerClient):
|
||||
await assert_row_count_on_host(cql, hosts[0], ks, "tab", 0)
|
||||
await assert_row_count_on_host(cql, hosts[0], ks, "mv", 0)
|
||||
await manager.server_start(servers[1].server_id)
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Repair the base table
|
||||
s0_log = await manager.server_open_log(servers[0].server_id)
|
||||
@@ -98,11 +100,13 @@ async def test_staging_backlog_processed_after_restart(manager: ManagerClient):
|
||||
await assert_row_count_on_host(cql, hosts[0], ks, "tab", 1000)
|
||||
await assert_row_count_on_host(cql, hosts[0], ks, "mv", 0)
|
||||
await manager.server_start(servers[1].server_id)
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Restart node0 with staging backlog
|
||||
s0_mark = await s0_log.mark()
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
await manager.server_start(servers[0].server_id)
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
await s0_log.wait_for(f"Processed {ks}.tab", from_mark=s0_mark, timeout=60)
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user