Compare commits

..

9 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
5515d5fb7d test/alternator: fix delete_item_no_ts test, add LWT rejection tests for delete ops, simplify assertions, update docs
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-05 17:31:41 +00:00
copilot-swe-agent[bot]
328c263aed alternator: add custom timestamp support to DeleteItem and BatchWriteItem DeleteRequest
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-05 16:16:27 +00:00
copilot-swe-agent[bot]
a6d36a480d test/alternator: add explicit only_rmw_uses_lwt isolation to test_table_ts and test_table_ts_ss
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 22:17:17 +00:00
copilot-swe-agent[bot]
85354ae26a test/alternator: restore test_table_ts_lwt with always isolation; update docs
- Restore test_table_ts_lwt fixture with system:write_isolation=always to
  explicitly test that the timestamp attribute is rejected in LWT_ALWAYS mode
- Add test_timestamp_attribute_lwt_always_rejected which verifies that even
  a plain PutItem with a timestamp is rejected when always_use_lwt is set
- Keep test_timestamp_attribute_with_condition_rejected using test_table_ts
  (with the test runner's default only_rmw_uses_lwt isolation) to test
  that a ConditionExpression triggers LWT rejection
- Update docs: fix item 4 (non-numeric now rejected), improve Limitations
  section to clearly state always_use_lwt is incompatible with the feature
  and recommend system:write_isolation=only_rmw_uses_lwt

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 22:12:07 +00:00
copilot-swe-agent[bot]
786fa68faa test/alternator: address review comments on test_timestamp_attribute
- Use scope="module" instead of testpy_test_fixture_scope in fixtures
- Rename test_table_ts_sc to test_table_ts_ss (ss = string+string keys)
- Remove test_table_ts_lwt; use test_table_ts for LWT-rejection test
  (the test server runs with only_rmw_uses_lwt, so conditions trigger LWT)
- Add comment that fixtures make tests implicitly Scylla-only
- Change non-numeric timestamp attribute behavior: reject with
  ValidationException instead of silently storing (test + C++ implementation)
- Add test_timestamp_attribute_microseconds: verifies the timestamp unit
  is microseconds and tests interaction with default server timestamps
- Add import time for the new microseconds test

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 21:58:34 +00:00
copilot-swe-agent[bot]
a57f781852 test: refactor test_timestamp_attribute to use shared fixtures
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 21:38:52 +00:00
copilot-swe-agent[bot]
7f79b90e91 Fix timestamp_attribute: non-numeric handling, tag visibility, and clean up
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 20:35:48 +00:00
copilot-swe-agent[bot]
175b8a8a5e Add system:timestamp_attribute feature for custom write timestamps in Alternator
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 20:27:26 +00:00
copilot-swe-agent[bot]
c2ef8075ee Initial plan 2026-02-25 20:04:37 +00:00
3530 changed files with 7045 additions and 5732 deletions

View File

@@ -1,53 +0,0 @@
name: Backport with Jira Integration
on:
push:
branches:
- master
- next-*.*
- branch-*.*
pull_request_target:
types: [labeled, closed]
branches:
- master
- next
- next-*.*
- branch-*.*
jobs:
backport-on-push:
if: github.event_name == 'push'
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'push'
base_branch: ${{ github.ref }}
commits: ${{ github.event.before }}..${{ github.sha }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
backport-on-label:
if: github.event_name == 'pull_request_target' && github.event.action == 'labeled'
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'labeled'
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
pull_request_number: ${{ github.event.pull_request.number }}
head_commit: ${{ github.event.pull_request.base.sha }}
label_name: ${{ github.event.label.name }}
pr_state: ${{ github.event.pull_request.state }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
backport-chain:
if: github.event_name == 'pull_request_target' && github.event.action == 'closed' && github.event.pull_request.merged == true
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'chain'
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
pull_request_number: ${{ github.event.pull_request.number }}
pr_body: ${{ github.event.pull_request.body }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -12,24 +12,6 @@ jobs:
if: (github.event_name == 'issue_comment' && github.event.comment.user.login != 'scylladbbot') || github.event.label.name == 'conflicts'
runs-on: ubuntu-latest
steps:
- name: Verify Org Membership
id: verify_author
shell: bash
run: |
if [[ "${{ github.event_name }}" == "pull_request_target" ]]; then
AUTHOR="${{ github.event.pull_request.user.login }}"
ASSOCIATION="${{ github.event.pull_request.author_association }}"
else
AUTHOR="${{ github.event.comment.user.login }}"
ASSOCIATION="${{ github.event.comment.author_association }}"
fi
if [[ "$ASSOCIATION" == "MEMBER" || "$ASSOCIATION" == "OWNER" ]]; then
echo "member=true" >> $GITHUB_OUTPUT
else
echo "::warning::${AUTHOR} is not a member of scylladb (association: ${ASSOCIATION}); skipping CI trigger."
echo "member=false" >> $GITHUB_OUTPUT
fi
- name: Validate Comment Trigger
if: github.event_name == 'issue_comment'
id: verify_comment
@@ -48,13 +30,13 @@ jobs:
fi
- name: Trigger Scylla-CI-Route Jenkins Job
if: steps.verify_author.outputs.member == 'true' && (github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true')
if: github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true'
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
PR_NUMBER: "${{ github.event.issue.number || github.event.pull_request.number }}"
PR_REPO_NAME: "${{ github.event.repository.full_name }}"
run: |
PR_NUMBER=${{ github.event.issue.number || github.event.pull_request.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

View File

@@ -300,6 +300,7 @@ add_subdirectory(locator)
add_subdirectory(message)
add_subdirectory(mutation)
add_subdirectory(mutation_writer)
add_subdirectory(node_ops)
add_subdirectory(readers)
add_subdirectory(replica)
add_subdirectory(raft)

View File

@@ -7,6 +7,7 @@
*/
#include <fmt/ranges.h>
#include <cstdlib>
#include <seastar/core/on_internal_error.hh>
#include "alternator/executor.hh"
#include "alternator/consumed_capacity.hh"
@@ -63,7 +64,6 @@
#include "types/types.hh"
#include "db/system_keyspace.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "alternator/ttl_tag.hh"
using namespace std::chrono_literals;
@@ -109,6 +109,16 @@ const sstring TABLE_CREATION_TIME_TAG_KEY("system:table_creation_time");
// configured by UpdateTimeToLive to be the expiration-time attribute for
// this table.
extern const sstring TTL_TAG_KEY("system:ttl_attribute");
// If this tag is present, it stores the name of an attribute whose numeric
// value (in microseconds since the Unix epoch) is used as the write timestamp
// for PutItem and UpdateItem operations. When the named attribute is present
// in a PutItem or UpdateItem request, its value is used as the timestamp of
// the write, and the attribute itself is NOT stored in the item. This allows
// users to control write ordering for last-write-wins semantics. Because LWT
// does not allow setting a custom write timestamp, operations using this
// feature are incompatible with conditions (which require LWT), and with
// the LWT_ALWAYS write isolation mode; such operations are rejected.
static const sstring TIMESTAMP_TAG_KEY("system:timestamp_attribute");
// This will be set to 1 in the case where the user DID NOT specify a range key.
// The way GSI / LSI is implemented by Alternator assumes user specified keys will come first
// in materialized view's key list. Then, if needed missing keys are added (current implementation
@@ -165,7 +175,7 @@ static map_type attrs_type() {
static const column_definition& attrs_column(const schema& schema) {
const column_definition* cdef = schema.get_column_definition(bytes(executor::ATTRS_COLUMN_NAME));
throwing_assert(cdef);
SCYLLA_ASSERT(cdef);
return *cdef;
}
@@ -1338,13 +1348,14 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
// Alternator uses tags whose keys start with the "system:" prefix for
// internal purposes. Those should not be readable by ListTagsOfResource,
// nor writable with TagResource or UntagResource (see #24098).
// Only a few specific system tags, currently only "system:write_isolation"
// and "system:initial_tablets", are deliberately intended to be set and read
// by the user, so are not considered "internal".
// Only a few specific system tags, currently only "system:write_isolation",
// "system:initial_tablets", and "system:timestamp_attribute", are deliberately
// intended to be set and read by the user, so are not considered "internal".
static bool tag_key_is_internal(std::string_view tag_key) {
return tag_key.starts_with("system:")
&& tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY
&& tag_key != INITIAL_TABLETS_TAG_KEY;
&& tag_key != INITIAL_TABLETS_TAG_KEY
&& tag_key != TIMESTAMP_TAG_KEY;
}
enum class update_tags_action { add_tags, delete_tags };
@@ -1650,7 +1661,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
throwing_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
// command. We can't inspect the current database schema at this point
@@ -2299,8 +2310,11 @@ public:
// After calling pk_from_json() and ck_from_json() to extract the pk and ck
// components of a key, and if that succeeded, call check_key() to further
// check that the key doesn't have any spurious components.
static void check_key(const rjson::value& key, const schema_ptr& schema) {
if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
// allow_extra_attribute: set to true when the key may contain one extra
// non-key attribute (e.g., the timestamp pseudo-attribute for DeleteItem).
static void check_key(const rjson::value& key, const schema_ptr& schema, bool allow_extra_attribute = false) {
const unsigned expected = (schema->clustering_key_size() == 0 ? 1 : 2) + (allow_extra_attribute ? 1 : 0);
if (key.MemberCount() != expected) {
throw api_error::validation("Given key attribute not in schema");
}
}
@@ -2347,6 +2361,57 @@ void validate_value(const rjson::value& v, const char* caller) {
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
// Convert a DynamoDB number (big_decimal) to an api::timestamp_type
// (microseconds since the Unix epoch). Fractional microseconds are truncated.
// Returns nullopt if the value is negative, zero, or too large to be
// represented as a signed 64-bit microsecond count.
static std::optional<api::timestamp_type> bigdecimal_to_timestamp(const big_decimal& bd) {
// Largest value representable by api::timestamp_type (int64_t), spelled out
// so we can compare against the arbitrary-precision unscaled value and its
// 19-digit decimal form without pulling in extra headers.
static constexpr long long max_timestamp = 9223372036854775807LL; // 2^63 - 1
static constexpr const char* max_timestamp_str = "9223372036854775807";
if (bd.unscaled_value() <= 0) {
return std::nullopt;
}
if (bd.scale() == 0) {
// Fast path: integer value, no decimal adjustment needed.
// The unscaled value is arbitrary-precision; reject values that do
// not fit in a signed 64-bit timestamp instead of letting the cast
// below silently truncate them.
if (bd.unscaled_value() > max_timestamp) {
return std::nullopt;
}
return static_cast<api::timestamp_type>(bd.unscaled_value());
}
// General case: adjust for decimal scale.
// big_decimal stores value as unscaled_value * 10^(-scale).
// scale > 0 means divide by 10^scale (truncate fractional part).
// scale < 0 means multiply by 10^|scale| (add trailing zeros).
auto str = bd.unscaled_value().str();
if (bd.scale() > 0) {
int len = str.length();
if (len <= bd.scale()) {
return std::nullopt; // Number < 1
}
str = str.substr(0, len - bd.scale());
} else {
if (bd.scale() < -18) {
// Too large to represent as int64_t
return std::nullopt;
}
for (int i = 0; i < -bd.scale(); i++) {
str.push_back('0');
}
}
// Reject digit strings that exceed the int64_t range. Without this check
// strtoll() would saturate at LLONG_MAX (setting errno to ERANGE), and the
// saturated - but positive - result would be accepted as a bogus timestamp.
// str contains only decimal digits with no leading zeros (it came from a
// positive arbitrary-precision integer), so a length/lexicographic check
// is an exact range test.
if (str.length() > 19 ||
(str.length() == 19 && str > max_timestamp_str)) {
return std::nullopt;
}
long long result = strtoll(str.c_str(), nullptr, 10);
if (result <= 0) {
return std::nullopt;
}
return static_cast<api::timestamp_type>(result);
}
// Attempt to interpret a DynamoDB-typed value as a write timestamp.
// The value is expected to be a number ({"N": "..."}) counting microseconds
// since the Unix epoch. A disengaged optional is returned when the value is
// not numeric or does not encode a usable timestamp.
static std::optional<api::timestamp_type> try_get_timestamp(const rjson::value& attr_value) {
if (auto num = try_unwrap_number(attr_value)) {
return bigdecimal_to_timestamp(*num);
}
return std::nullopt;
}
class put_or_delete_item {
private:
partition_key _pk;
@@ -2362,11 +2427,17 @@ private:
// that length can have different meaning depends on the operation but the
// the calculation of length in bytes to WCU is the same.
uint64_t _length_in_bytes = 0;
// If the table has a system:timestamp_attribute tag, and the named
// attribute was found in the item with a valid numeric value, this holds
// the extracted timestamp. The attribute is not added to _cells.
std::optional<api::timestamp_type> _custom_timestamp;
public:
struct delete_item {};
struct put_item {};
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes);
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
// put_or_delete_item doesn't keep a reference to schema (so it can be
// moved between shards for LWT) so it needs to be given again to build():
mutation build(schema_ptr schema, api::timestamp_type ts) const;
@@ -2381,11 +2452,32 @@ public:
bool is_put_item() noexcept {
return _cells.has_value();
}
// Returns the custom write timestamp extracted from the timestamp attribute,
// if any. If not set, the caller should use api::new_timestamp() instead.
std::optional<api::timestamp_type> custom_timestamp() const noexcept {
return _custom_timestamp;
}
};
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item)
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item, const std::optional<bytes>& timestamp_attribute)
: _pk(pk_from_json(key, schema)), _ck(ck_from_json(key, schema)) {
check_key(key, schema);
if (timestamp_attribute) {
// The timestamp attribute may be provided as a "pseudo-key": it is
// not a real key column, but can be included in the "Key" object to
// carry the custom write timestamp. If found, extract the timestamp
// and don't store it in the item.
const rjson::value* ts_val = rjson::find(key, to_string_view(*timestamp_attribute));
if (ts_val) {
if (auto t = try_get_timestamp(*ts_val)) {
_custom_timestamp = t;
} else {
throw api_error::validation(fmt::format(
"The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)",
to_string_view(*timestamp_attribute)));
}
}
}
check_key(key, schema, _custom_timestamp.has_value());
}
// find_attribute() checks whether the named attribute is stored in the
@@ -2472,7 +2564,8 @@ static inline void validate_value_if_index_key(
}
}
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes)
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute)
: _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) {
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
@@ -2481,6 +2574,17 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
// If this is the timestamp attribute, it must be a valid numeric value
// (microseconds since epoch). Use it as the write timestamp and do not
// store it in the item data. Reject the write if the value is non-numeric.
if (timestamp_attribute && column_name == *timestamp_attribute) {
if (auto t = try_get_timestamp(it->value)) {
_custom_timestamp = t;
// The attribute is consumed as timestamp, not stored in _cells.
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*timestamp_attribute)));
}
_length_in_bytes += column_name.size();
if (!cdef) {
// This attribute may be a key column of one of the GSI or LSI,
@@ -2672,6 +2776,13 @@ rmw_operation::rmw_operation(service::storage_proxy& proxy, rjson::value&& reque
// _pk and _ck will be assigned later, by the subclass's constructor
// (each operation puts the key in a slightly different location in
// the request).
const auto tags_ptr = db::get_tags_of_table(_schema);
if (tags_ptr) {
auto it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (it != tags_ptr->end() && !it->second.empty()) {
_timestamp_attribute = to_bytes(it->second);
}
}
}
std::optional<mutation> rmw_operation::apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) {
@@ -2816,6 +2927,21 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
.alternator = true,
.alternator_streams_increased_compatibility = schema()->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
// If the operation uses a custom write timestamp (from the
// system:timestamp_attribute tag), LWT is incompatible because LWT
// requires the timestamp to be set by the Paxos protocol. Reject the
// operation if it would need to use LWT.
if (has_custom_timestamp()) {
bool would_use_lwt = _write_isolation == write_isolation::LWT_ALWAYS ||
(needs_read_before_write &&
_write_isolation != write_isolation::FORBID_RMW &&
_write_isolation != write_isolation::UNSAFE_RMW);
if (would_use_lwt) {
throw api_error::validation(
"Using the system:timestamp_attribute is not compatible with "
"conditional writes or the 'always' write isolation policy.");
}
}
if (needs_read_before_write) {
if (_write_isolation == write_isolation::FORBID_RMW) {
throw api_error::validation("Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
@@ -2838,12 +2964,14 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
}
} else if (_write_isolation != write_isolation::LWT_ALWAYS) {
std::optional<mutation> m = apply(nullptr, api::new_timestamp(), cdc_opts);
throwing_assert(m); // !needs_read_before_write, so apply() did not check a condition
SCYLLA_ASSERT(m); // !needs_read_before_write, so apply() did not check a condition
return proxy.mutate(utils::chunked_vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes, false, std::move(cdc_opts)).then([this, &wcu_total] () mutable {
return rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total);
});
}
throwing_assert(cas_shard);
if (!cas_shard) {
on_internal_error(elogger, "cas_shard is not set");
}
// If we're still here, we need to do this write using LWT:
global_stats.write_using_lwt++;
per_table_stats.write_using_lwt++;
@@ -2912,7 +3040,8 @@ public:
put_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{},
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name()))) {
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name())),
_timestamp_attribute) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -2944,6 +3073,9 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -2961,7 +3093,10 @@ public:
} else {
_return_attributes = {};
}
return _mutation_builder.build(_schema, ts);
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
}
virtual ~put_item_operation() = default;
};
@@ -3013,7 +3148,7 @@ public:
parsed::condition_expression _condition_expression;
delete_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}) {
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}, _timestamp_attribute) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -3044,6 +3179,9 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept override {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -3064,7 +3202,10 @@ public:
if (_consumed_capacity._total_bytes == 0) {
_consumed_capacity._total_bytes = 1;
}
return _mutation_builder.build(_schema, ts);
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
}
virtual ~delete_item_operation() = default;
};
@@ -3251,10 +3392,13 @@ future<> executor::do_batch_write(
// Do a normal write, without LWT:
utils::chunked_vector<mutation> mutations;
mutations.reserve(mutation_builders.size());
api::timestamp_type now = api::new_timestamp();
api::timestamp_type default_ts = api::new_timestamp();
bool any_cdc_enabled = false;
for (auto& b : mutation_builders) {
mutations.push_back(b.second.build(b.first, now));
// Use custom timestamp from the timestamp attribute if available,
// otherwise use the default timestamp for all items in this batch.
api::timestamp_type ts = b.second.custom_timestamp().value_or(default_ts);
mutations.push_back(b.second.build(b.first, ts));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
@@ -3354,6 +3498,16 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
1, primary_key_hash{schema}, primary_key_equal{schema});
// Look up the timestamp attribute tag once per table (shared by all
// PutRequests and DeleteRequests for this table).
std::optional<bytes> ts_attr;
const auto tags_ptr = db::get_tags_of_table(schema);
if (tags_ptr) {
auto tag_it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (tag_it != tags_ptr->end() && !tag_it->second.empty()) {
ts_attr = to_bytes(tag_it->second);
}
}
for (auto& request : it->value.GetArray()) {
auto& r = get_single_member(request, "RequestItems element");
const auto r_name = rjson::to_string_view(r.name);
@@ -3362,7 +3516,8 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
validate_is_object(item, "Item in PutRequest");
auto&& put_item = put_or_delete_item(
item, schema, put_or_delete_item::put_item{},
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())));
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())),
ts_attr);
mutation_builders.emplace_back(schema, std::move(put_item));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -3373,7 +3528,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
const rjson::value& key = get_member(r.value, "Key", "DeleteRequest");
validate_is_object(key, "Key in DeleteRequest");
mutation_builders.emplace_back(schema, put_or_delete_item(
key, schema, put_or_delete_item::delete_item{}));
key, schema, put_or_delete_item::delete_item{}, ts_attr));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(),
mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -3982,6 +4137,10 @@ public:
virtual ~update_item_operation() = default;
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override;
bool needs_read_before_write() const;
// Returns true if the timestamp attribute is being set in this update
// (via AttributeUpdates PUT or UpdateExpression SET). Used to detect
// whether a custom write timestamp will be used.
bool has_custom_timestamp() const noexcept;
private:
void delete_attribute(bytes&& column_name, const std::unique_ptr<rjson::value>& previous_item, const api::timestamp_type ts, deletable_row& row,
@@ -4116,6 +4275,44 @@ update_item_operation::needs_read_before_write() const {
(_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::UPDATED_NEW);
}
// Returns true if this UpdateItem will use a custom write timestamp taken
// from the table's system:timestamp_attribute. This is decided statically
// from the request alone (no read-before-write is performed here), so the
// caller can reject incompatible combinations (e.g. LWT) up front.
bool
update_item_operation::has_custom_timestamp() const noexcept {
// No timestamp attribute configured on the table - nothing to look for.
if (!_timestamp_attribute) {
return false;
}
// Check if the timestamp attribute is being set via AttributeUpdates PUT
// with a valid numeric value.
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
// Only consider it a custom timestamp if the value is numeric
if (try_get_timestamp((it->value)["Value"])) {
return true;
}
}
// The attribute can appear at most once in AttributeUpdates,
// so once we have seen it there is no point scanning further.
break;
}
}
}
// Check if the timestamp attribute is being set via UpdateExpression SET.
// We can't check the actual value type without resolving the expression
// (which requires previous_item), so we conservatively return true if the
// attribute appears in a SET action, and handle the non-numeric case in apply().
// A non-numeric value will cause apply() to throw a ValidationException.
if (!_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
return std::holds_alternative<parsed::update_expression::action::set>(action._action);
}
}
// Attribute is neither PUT via AttributeUpdates nor SET via
// UpdateExpression: the write uses the default server timestamp.
return false;
}
// action_result() returns the result of applying an UpdateItem action -
// this result is either a JSON object or an unset optional which indicates
// the action was a deletion. The caller (update_item_operation::apply()
@@ -4391,6 +4588,17 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
}
std::string action = rjson::to_string((it->value)["Action"]);
// If this is the timestamp attribute being PUT, it must be a valid
// numeric value (microseconds since epoch). Use it as the write
// timestamp and skip storing it. Reject if the value is non-numeric.
if (_timestamp_attribute && column_name == *_timestamp_attribute && action == "PUT") {
if (it->value.HasMember("Value")) {
if (try_get_timestamp((it->value)["Value"])) {
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -4494,6 +4702,20 @@ inline void update_item_operation::apply_update_expression(const std::unique_ptr
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(fmt::format("UpdateItem cannot update key column {}", column_name));
}
// If this is the timestamp attribute being set via UpdateExpression SET,
// it must be a valid numeric value (microseconds since epoch). Use it as
// the write timestamp and skip storing it. Reject if non-numeric.
if (_timestamp_attribute && to_bytes(column_name) == *_timestamp_attribute &&
actions.second.has_value() &&
std::holds_alternative<parsed::update_expression::action::set>(actions.second.get_value()._action)) {
std::optional<rjson::value> result = action_result(actions.second.get_value(), previous_item.get());
if (result) {
if (try_get_timestamp(*result)) {
continue; // Skip - already used as timestamp
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (actions.second.has_value()) {
// An action on a top-level attribute column_name. The single
// action is actions.second.get_value(). We can simply invoke
@@ -4542,6 +4764,44 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
return {};
}
// If the table has a timestamp attribute, look for it in the update
// (AttributeUpdates PUT or UpdateExpression SET). If found with a valid
// numeric value, use it as the write timestamp instead of the provided ts.
api::timestamp_type effective_ts = ts;
if (_timestamp_attribute) {
bool found_ts = false;
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
if (auto t = try_get_timestamp((it->value)["Value"])) {
effective_ts = *t;
found_ts = true;
}
}
break;
}
}
}
if (!found_ts && !_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
if (std::holds_alternative<parsed::update_expression::action::set>(action._action)) {
std::optional<rjson::value> result = action_result(action, previous_item.get());
if (result) {
if (auto t = try_get_timestamp(*result)) {
effective_ts = *t;
}
}
}
}
}
}
// In the ReturnValues=ALL_NEW case, we make a copy of previous_item into
// _return_attributes and parts of it will be overwritten by the new
// updates (in do_update() and do_delete()). We need to make a copy and
@@ -4570,10 +4830,10 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
auto& row = m.partition().clustered_row(*_schema, _ck);
auto modified_attrs = attribute_collector();
if (!_update_expression.empty()) {
apply_update_expression(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
apply_update_expression(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
}
if (_attribute_updates) {
apply_attribute_updates(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
apply_attribute_updates(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
}
if (!modified_attrs.empty()) {
auto serialized_map = modified_attrs.to_mut().serialize(*attrs_type());
@@ -4584,7 +4844,7 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
// marker. An update with only DELETE operations must not add a row marker
// (this was issue #5862) but any other update, even an empty one, should.
if (any_updates || !any_deletes) {
row.apply(row_marker(ts));
row.apply(row_marker(effective_ts));
} else if (_returnvalues == returnvalues::ALL_NEW && !previous_item) {
// There was no pre-existing item, and we're not creating one, so
// don't report the new item in the returned Attributes.
@@ -5412,7 +5672,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
}
// Map a segment number to the token at which that segment begins, when the
// full token ring is split into total_segments equal-width slices.
// NOTE(review): presumably backs Alternator's parallel Scan
// (Segment/TotalSegments request options) — confirm with callers outside
// this view.
static dht::token token_for_segment(int segment, int total_segments) {
// Precondition: more than one segment requested and segment is in range.
// (This diff view shows both the old throwing_assert line and its
// SCYLLA_ASSERT replacement; the asserted condition itself is unchanged.)
throwing_assert(total_segments > 1 && segment >= 0 && segment < total_segments);
SCYLLA_ASSERT(total_segments > 1 && segment >= 0 && segment < total_segments);
// Width of one segment in unsigned token space; the integer division
// truncates, so the final segment absorbs any remainder.
uint64_t delta = std::numeric_limits<uint64_t>::max() / total_segments;
// Offset from the minimum signed token by `segment` whole segment widths.
return dht::token::from_int64(std::numeric_limits<int64_t>::min() + delta * segment);
}

View File

@@ -18,6 +18,7 @@
#include "executor.hh"
#include "tracing/trace_state.hh"
#include "keys/keys.hh"
#include "bytes.hh"
namespace alternator {
@@ -72,6 +73,11 @@ protected:
clustering_key _ck = clustering_key::make_empty();
write_isolation _write_isolation;
mutable wcu_consumed_capacity_counter _consumed_capacity;
// If the table has a "system:timestamp_attribute" tag, this holds the
// name of the attribute (converted to bytes) whose numeric value should
// be used as the write timestamp instead of the current time. The
// attribute itself is NOT stored in the item data.
std::optional<bytes> _timestamp_attribute;
// All RMW operations can have a ReturnValues parameter from the following
// choices. But note that only UpdateItem actually supports all of them:
enum class returnvalues {
@@ -113,6 +119,9 @@ public:
// Convert the above apply() into the signature needed by cas_request:
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override;
virtual ~rmw_operation() = default;
// Returns true if the operation will use a custom write timestamp (from the
// system:timestamp_attribute tag). Subclasses override this as needed.
virtual bool has_custom_timestamp() const noexcept { return false; }
const wcu_consumed_capacity_counter& consumed_capacity() const noexcept { return _consumed_capacity; }
schema_ptr schema() const { return _schema; }
const rjson::value& request() const { return _request; }

View File

@@ -710,7 +710,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
++_executor._stats.requests_blocked_memory;
}
auto units = co_await std::move(units_fut);
throwing_assert(req->content_stream);
SCYLLA_ASSERT(req->content_stream);
chunked_content content = co_await read_entire_stream(*req->content_stream, request_content_length_limit);
// If the request had no Content-Length, we reserved too many units
// so need to return some

View File

@@ -46,7 +46,6 @@
#include "alternator/executor.hh"
#include "alternator/controller.hh"
#include "alternator/serialization.hh"
#include "alternator/ttl_tag.hh"
#include "dht/sharder.hh"
#include "db/config.hh"
#include "db/tags/utils.hh"
@@ -58,10 +57,19 @@ static logging::logger tlogger("alternator_ttl");
namespace alternator {
// We write the expiration-time attribute enabled on a table in a
// tag TTL_TAG_KEY.
// Currently, the *value* of this tag is simply the name of the attribute,
// and the expiration scanner interprets it as an Alternator attribute name -
// It can refer to a real column or if that doesn't exist, to a member of
// the ":attrs" map column. Although this is designed for Alternator, it may
// be good enough for CQL as well (there, the ":attrs" column won't exist).
extern const sstring TTL_TAG_KEY;
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.update_time_to_live++;
if (!_proxy.features().alternator_ttl) {
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Upgrade all nodes to a version that supports it.");
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
}
schema_ptr schema = get_table(_proxy, request);
@@ -316,7 +324,9 @@ static future<std::vector<std::pair<dht::token_range, locator::host_id>>> get_se
const auto& tm = *erm->get_token_metadata_ptr();
const auto& sorted_tokens = tm.sorted_tokens();
std::vector<std::pair<dht::token_range, locator::host_id>> ret;
throwing_assert(!sorted_tokens.empty());
if (sorted_tokens.empty()) {
on_internal_error(tlogger, "Token metadata is empty");
}
auto prev_tok = sorted_tokens.back();
for (const auto& tok : sorted_tokens) {
co_await coroutine::maybe_yield();
@@ -553,7 +563,7 @@ static future<> scan_table_ranges(
expiration_service::stats& expiration_stats)
{
const schema_ptr& s = scan_ctx.s;
throwing_assert(partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
SCYLLA_ASSERT (partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
auto p = service::pager::query_pagers::pager(proxy, s, scan_ctx.selection, *scan_ctx.query_state_ptr,
*scan_ctx.query_options, scan_ctx.command, std::move(partition_ranges), nullptr);
while (!p->is_exhausted()) {
@@ -630,38 +640,13 @@ static future<> scan_table_ranges(
}
} else {
// For a real column to contain an expiration time, it
// must be a numeric type. We currently support decimal
// (used by Alternator TTL) as well as bigint, int and
// timestamp (used by CQL per-row TTL).
switch (meta[*expiration_column]->type->get_kind()) {
case abstract_type::kind::decimal:
// Used by Alternator TTL for key columns not stored
// in the map. The value is in seconds, fractional
// part is ignored.
expired = is_expired(value_cast<big_decimal>(v), now);
break;
case abstract_type::kind::long_kind:
// Used by CQL per-row TTL. The value is in seconds.
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int64_t>(v))), now);
break;
case abstract_type::kind::int32:
// Used by CQL per-row TTL. The value is in seconds.
// Using int type is not recommended because it will
// overflow in 2038, but we support it to allow users
// to use existing int columns for expiration.
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int32_t>(v))), now);
break;
case abstract_type::kind::timestamp:
// Used by CQL per-row TTL. The value is in milliseconds
// but we truncate it to gc_clock's precision (whole seconds).
expired = is_expired(gc_clock::time_point(std::chrono::duration_cast<gc_clock::duration>(value_cast<db_clock::time_point>(v).time_since_epoch())), now);
break;
default:
// Should never happen - we verified the column's type
// before starting the scan.
[[unlikely]]
on_internal_error(tlogger, format("expiration scanner value of unsupported type {} in column {}", meta[*expiration_column]->type->cql3_type_name(), scan_ctx.column_name) );
}
// must be a numeric type.
// FIXME: Currently we only support decimal_type (which is
// what Alternator uses), but other numeric types can be
// supported as well to make this feature more useful in CQL.
// Note that kind::decimal is also checked above.
big_decimal n = value_cast<big_decimal>(v);
expired = is_expired(n, now);
}
if (expired) {
expiration_stats.items_deleted++;
@@ -723,12 +708,16 @@ static future<bool> scan_table(
co_return false;
}
// attribute_name may be one of the schema's columns (in Alternator, this
// means a key column, in CQL it's a regular column), or an element in
// Alternator's attrs map encoded in Alternator's JSON encoding (which we
// decode). If attribute_name is a real column, in Alternator it will have
// the type decimal, counting seconds since the UNIX epoch, while in CQL
// it will one of the types bigint or int (counting seconds) or timestamp
// (counting milliseconds).
// means it's a key column), or an element in Alternator's attrs map
// encoded in Alternator's JSON encoding.
// FIXME: To make this less Alternators-specific, we should encode in the
// single key's value three things:
// 1. The name of a column
// 2. Optionally if column is a map, a member in the map
// 3. The deserializer for the value: CQL or Alternator (JSON).
// The deserializer can be guessed: If the given column or map item is
// numeric, it can be used directly. If it is a "bytes" type, it needs to
// be deserialized using Alternator's deserializer.
bytes column_name = to_bytes(*attribute_name);
const column_definition *cd = s->get_column_definition(column_name);
std::optional<std::string> member;
@@ -747,14 +736,11 @@ static future<bool> scan_table(
data_type column_type = cd->type;
// Verify that the column has the right type: If "member" exists
// the column must be a map, and if it doesn't, the column must
// be decimal_type (Alternator), bigint, int or timestamp (CQL).
// If the column has the wrong type nothing can get expired in
// this table, and it's pointless to scan it.
// (currently) be a decimal_type. If the column has the wrong type
// nothing can get expired in this table, and it's pointless to
// scan it.
if ((member && column_type->get_kind() != abstract_type::kind::map) ||
(!member && column_type->get_kind() != abstract_type::kind::decimal &&
column_type->get_kind() != abstract_type::kind::long_kind &&
column_type->get_kind() != abstract_type::kind::int32 &&
column_type->get_kind() != abstract_type::kind::timestamp)) {
(!member && column_type->get_kind() != abstract_type::kind::decimal)) {
tlogger.info("table {} TTL column has unsupported type, not scanning", s->cf_name());
co_return false;
}
@@ -892,10 +878,12 @@ future<> expiration_service::run() {
future<> expiration_service::start() {
// Called by main() on each shard to start the expiration-service
// thread. Just runs run() in the background and allows stop().
if (!shutting_down()) {
_end = run().handle_exception([] (std::exception_ptr ep) {
tlogger.error("expiration_service failed: {}", ep);
});
if (_db.features().alternator_ttl) {
if (!shutting_down()) {
_end = run().handle_exception([] (std::exception_ptr ep) {
tlogger.error("expiration_service failed: {}", ep);
});
}
}
return make_ready_future<>();
}

View File

@@ -1,26 +0,0 @@
/*
 * Copyright 2026-present ScyllaDB
 */
/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */
#pragma once
#include "seastarx.hh"
#include <seastar/core/sstring.hh>
namespace alternator {
// We use the table tag TTL_TAG_KEY ("system:ttl_attribute") to remember
// which attribute was chosen as the expiration-time attribute for
// Alternator's TTL and CQL's per-row TTL features.
// Currently, the *value* of this tag is simply the name of the attribute:
// It can refer to a real column or if that doesn't exist, to a member of
// the ":attrs" map column (which Alternator uses).
// The constant itself is defined out-of-line (in a .cc file not visible
// here); this header only declares it.
extern const sstring TTL_TAG_KEY;
} // namespace alternator
// let users use TTL_TAG_KEY without the "alternator::" prefix,
// to make it easier to move it to a different namespace later.
using alternator::TTL_TAG_KEY;

View File

@@ -3085,48 +3085,6 @@
}
]
},
{
"path":"/storage_service/tablets/snapshots",
"operations":[
{
"method":"POST",
"summary":"Takes the snapshot for the given keyspaces/tables. A snapshot name must be specified.",
"type":"void",
"nickname":"take_cluster_snapshot",
"produces":[
"application/json"
],
"parameters":[
{
"name":"tag",
"description":"the tag given to the snapshot",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"keyspace",
"description":"Keyspace(s) to snapshot. Multiple keyspaces can be provided using a comma-separated list. If omitted, snapshot all keyspaces.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"Table(s) to snapshot. Multiple tables (in a single keyspace) can be provided using a comma-separated list. If omitted, snapshot all tables in the given keyspace(s).",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/quiesce_topology",
"operations":[

View File

@@ -783,13 +783,17 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
apilog.info("cleanup_all global={}", global);
if (global) {
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
co_return co_await ss.do_clusterwide_vnodes_cleanup();
});
auto done = !global ? false : co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
if (!ss.is_topology_coordinator_enabled()) {
co_return false;
}
co_await ss.do_clusterwide_vnodes_cleanup();
co_return true;
});
if (done) {
co_return json::json_return_type(0);
}
// fall back to the local cleanup if local cleanup is requested
// fall back to the local cleanup if topology coordinator is not enabled or local cleanup is requested
auto& db = ctx.db;
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::global_cleanup_compaction_task_impl>({}, db);
@@ -797,7 +801,9 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
// Mark this node as clean
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
co_await ss.reset_cleanup_needed();
if (ss.is_topology_coordinator_enabled()) {
co_await ss.reset_cleanup_needed();
}
});
co_return json::json_return_type(0);
@@ -808,6 +814,9 @@ future<json::json_return_type>
rest_reset_cleanup_needed(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("reset_cleanup_needed");
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
if (!ss.is_topology_coordinator_enabled()) {
throw std::runtime_error("mark_node_as_clean is only supported when topology over raft is enabled");
}
return ss.reset_cleanup_needed();
});
co_return json_void();
@@ -1565,7 +1574,16 @@ rest_reload_raft_topology_state(sharded<service::storage_service>& ss, service::
static
future<json::json_return_type>
rest_upgrade_to_raft_topology(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("Requested to schedule upgrade to raft topology, but this version does not need it since it uses raft topology by default.");
apilog.info("Requested to schedule upgrade to raft topology");
try {
co_await ss.invoke_on(0, [] (auto& ss) {
return ss.start_upgrade_to_raft_topology();
});
} catch (...) {
auto ex = std::current_exception();
apilog.error("Failed to schedule upgrade to raft topology: {}", ex);
std::rethrow_exception(std::move(ex));
}
co_return json_void();
}
@@ -2007,8 +2025,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
auto tcopt = req->get_query_param("tc");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
@@ -2033,27 +2049,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
}
});
ss::take_cluster_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
apilog.info("take_cluster_snapshot: {}", req->get_query_params());
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("table"), ",");
// Note: not published/active. Retain as internal option, but...
auto sfopt = req->get_query_param("skip_flush");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
std::vector<sstring> keynames = split(req->get_query_param("keyspace"), ",");
try {
co_await snap_ctl.local().take_cluster_column_family_snapshot(keynames, column_families, tag, opts);
co_return json_void();
} catch (...) {
apilog.error("take_cluster_snapshot failed: {}", std::current_exception());
throw;
}
});
ss::del_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
apilog.info("del_snapshot: {}", req->get_query_params());
auto tag = req->get_query_param("tag");

View File

@@ -1519,9 +1519,7 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
| std::views::transform(std::mem_fn(&sstables::sstable::run_identifier))
| std::ranges::to<std::unordered_set>());
};
const auto injected_threshold = utils::get_local_injector().inject_parameter<size_t>("set_sstable_count_reduction_threshold");
const auto threshold = injected_threshold.value_or(size_t(std::max(schema->max_compaction_threshold(), 32)));
const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32));
auto count = co_await num_runs_for_compaction();
if (count <= threshold) {
cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}",
@@ -1536,7 +1534,9 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
auto& cstate = get_compaction_state(&t);
try {
while (can_perform_regular_compaction(t) && co_await num_runs_for_compaction() > threshold) {
co_await cstate.compaction_done.when();
co_await cstate.compaction_done.wait([this, &t] {
return !can_perform_regular_compaction(t);
});
}
} catch (const broken_condition_variable&) {
co_return;

View File

@@ -874,16 +874,7 @@ maintenance_socket: ignore
# The `tablets` option cannot be changed using `ALTER KEYSPACE`.
tablets_mode_for_new_keyspaces: enabled
# Require every tablet-enabled keyspace to be RF-rack-valid.
#
# A tablet-enabled keyspace is RF-rack-valid when, for each data center,
# its replication factor (RF) is 0, 1, or exactly equal to the number of
# racks in that data center. Setting the RF to the number of racks ensures
# that a single rack failure never results in data unavailability.
#
# When set to true, CREATE KEYSPACE and ALTER KEYSPACE statements that
# would produce an RF-rack-invalid keyspace are rejected.
# When set to false, such statements are allowed but emit a warning.
# Enforce RF-rack-valid keyspaces.
rf_rack_valid_keyspaces: false
#

View File

@@ -1361,6 +1361,7 @@ scylla_core = (['message/messaging_service.cc',
'service/topology_state_machine.cc',
'service/topology_mutation.cc',
'service/topology_coordinator.cc',
'node_ops/node_ops_ctl.cc',
'node_ops/task_manager_module.cc',
'reader_concurrency_semaphore_group.cc',
'utils/disk_space_monitor.cc',

View File

@@ -874,8 +874,8 @@ cfamDefinition[cql3::statements::create_table_statement::raw_statement& expr]
;
cfamColumns[cql3::statements::create_table_statement::raw_statement& expr]
@init { bool is_static=false, is_ttl=false; }
: k=ident v=comparatorType (K_TTL {is_ttl = true;})? (K_STATIC {is_static = true;})? { $expr.add_definition(k, v, is_static, is_ttl); }
@init { bool is_static=false; }
: k=ident v=comparatorType (K_STATIC {is_static = true;})? { $expr.add_definition(k, v, is_static); }
(K_PRIMARY K_KEY { $expr.add_key_aliases(std::vector<shared_ptr<cql3::column_identifier>>{k}); })?
| K_PRIMARY K_KEY '(' pkDef[expr] (',' c=ident { $expr.add_column_alias(c); } )* ')'
;
@@ -1042,7 +1042,6 @@ alterTableStatement returns [std::unique_ptr<alter_table_statement::raw_statemen
std::vector<alter_table_statement::column_change> column_changes;
std::vector<std::pair<shared_ptr<cql3::column_identifier::raw>, shared_ptr<cql3::column_identifier::raw>>> renames;
auto attrs = std::make_unique<cql3::attributes::raw>();
shared_ptr<cql3::column_identifier::raw> ttl_change;
}
: K_ALTER K_COLUMNFAMILY cf=columnFamilyName
( K_ALTER id=cident K_TYPE v=comparatorType { type = alter_table_statement::type::alter; column_changes.emplace_back(alter_table_statement::column_change{id, v}); }
@@ -1061,11 +1060,9 @@ alterTableStatement returns [std::unique_ptr<alter_table_statement::raw_statemen
| K_RENAME { type = alter_table_statement::type::rename; }
id1=cident K_TO toId1=cident { renames.emplace_back(id1, toId1); }
( K_AND idn=cident K_TO toIdn=cident { renames.emplace_back(idn, toIdn); } )*
| K_TTL { type = alter_table_statement::type::ttl; }
( id=cident { ttl_change = id; } | K_NULL )
)
{
$expr = std::make_unique<alter_table_statement::raw_statement>(std::move(cf), type, std::move(column_changes), std::move(props), std::move(renames), std::move(attrs), std::move(ttl_change));
$expr = std::make_unique<alter_table_statement::raw_statement>(std::move(cf), type, std::move(column_changes), std::move(props), std::move(renames), std::move(attrs));
}
;
@@ -2074,21 +2071,7 @@ vector_type returns [shared_ptr<cql3::cql3_type::raw> pt]
{
if ($d.text[0] == '-')
throw exceptions::invalid_request_exception("Vectors must have a dimension greater than 0");
unsigned long parsed_dimension;
try {
parsed_dimension = std::stoul($d.text);
} catch (const std::exception& e) {
throw exceptions::invalid_request_exception(format("Invalid vector dimension: {}", $d.text));
}
static_assert(sizeof(unsigned long) >= sizeof(vector_dimension_t));
if (parsed_dimension == 0) {
throw exceptions::invalid_request_exception("Vectors must have a dimension greater than 0");
}
if (parsed_dimension > cql3::cql3_type::MAX_VECTOR_DIMENSION) {
throw exceptions::invalid_request_exception(
format("Vectors must have a dimension less than or equal to {}", cql3::cql3_type::MAX_VECTOR_DIMENSION));
}
$pt = cql3::cql3_type::raw::vector(t, static_cast<vector_dimension_t>(parsed_dimension));
$pt = cql3::cql3_type::raw::vector(t, std::stoul($d.text));
}
;

View File

@@ -27,7 +27,7 @@ public:
struct vector_test_result {
test_result result;
std::optional<vector_dimension_t> dimension_opt;
std::optional<size_t> dimension_opt;
};
static bool is_assignable(test_result tr) {

View File

@@ -307,14 +307,17 @@ public:
class cql3_type::raw_vector : public raw {
shared_ptr<raw> _type;
vector_dimension_t _dimension;
size_t _dimension;
// This limitation is acquired from the maximum number of dimensions in OpenSearch.
static constexpr size_t MAX_VECTOR_DIMENSION = 16000;
virtual sstring to_string() const override {
return seastar::format("vector<{}, {}>", _type, _dimension);
}
public:
raw_vector(shared_ptr<raw> type, vector_dimension_t dimension)
raw_vector(shared_ptr<raw> type, size_t dimension)
: _type(std::move(type)), _dimension(dimension) {
}
@@ -414,7 +417,7 @@ cql3_type::raw::tuple(std::vector<shared_ptr<raw>> ts) {
}
shared_ptr<cql3_type::raw>
cql3_type::raw::vector(shared_ptr<raw> t, vector_dimension_t dimension) {
cql3_type::raw::vector(shared_ptr<raw> t, size_t dimension) {
return ::make_shared<raw_vector>(std::move(t), dimension);
}

View File

@@ -39,9 +39,6 @@ public:
data_type get_type() const { return _type; }
const sstring& to_string() const { return _type->cql3_type_name(); }
// This limitation is acquired from the maximum number of dimensions in OpenSearch.
static constexpr vector_dimension_t MAX_VECTOR_DIMENSION = 16000;
// For UserTypes, we need to know the current keyspace to resolve the
// actual type used, so Raw is a "not yet prepared" CQL3Type.
class raw {
@@ -67,7 +64,7 @@ public:
static shared_ptr<raw> list(shared_ptr<raw> t);
static shared_ptr<raw> set(shared_ptr<raw> t);
static shared_ptr<raw> tuple(std::vector<shared_ptr<raw>> ts);
static shared_ptr<raw> vector(shared_ptr<raw> t, vector_dimension_t dimension);
static shared_ptr<raw> vector(shared_ptr<raw> t, size_t dimension);
static shared_ptr<raw> frozen(shared_ptr<raw> t);
friend sstring format_as(const raw& r) {
return r.to_string();

View File

@@ -502,8 +502,8 @@ vector_validate_assignable_to(const collection_constructor& c, data_dictionary::
throw exceptions::invalid_request_exception(format("Invalid vector type literal for {} of type {}", *receiver.name, receiver.type->as_cql3_type()));
}
vector_dimension_t expected_size = vt->get_dimension();
if (expected_size == 0) {
size_t expected_size = vt->get_dimension();
if (!expected_size) {
throw exceptions::invalid_request_exception(format("Invalid vector type literal for {}: type {} expects at least one element",
*receiver.name, receiver.type->as_cql3_type()));
}

View File

@@ -18,7 +18,7 @@ namespace functions {
namespace detail {
std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension_t dimension) {
std::vector<float> extract_float_vector(const bytes_opt& param, size_t dimension) {
if (!param) {
throw exceptions::invalid_request_exception("Cannot extract float vector from null parameter");
}
@@ -156,7 +156,7 @@ std::vector<data_type> retrieve_vector_arg_types(const function_name& name, cons
}
}
vector_dimension_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
auto type = vector_type_impl::get_instance(float_type, dimension);
return {type, type};
}
@@ -170,7 +170,7 @@ bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters)
// Extract dimension from the vector type
const auto& type = static_cast<const vector_type_impl&>(*arg_types()[0]);
vector_dimension_t dimension = type.get_dimension();
size_t dimension = type.get_dimension();
// Optimized path: extract floats directly from bytes, bypassing data_value overhead
std::vector<float> v1 = detail::extract_float_vector(parameters[0], dimension);

View File

@@ -39,7 +39,7 @@ namespace detail {
// Extract float vector directly from serialized bytes, bypassing data_value overhead.
// This is an internal API exposed for testing purposes.
// Vector<float, N> wire format: N floats as big-endian uint32_t values, 4 bytes each.
std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension_t dimension);
std::vector<float> extract_float_vector(const bytes_opt& param, size_t dimension);
} // namespace detail

View File

@@ -1,20 +0,0 @@
/*
 * Copyright 2025-present ScyllaDB
 */
/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */
#pragma once
#include <ostream>
namespace cql3 {
// Forward declaration only; the full cql3::result definition lives in the
// result-set headers and is not needed to declare these printers.
class result;
// Write a human-readable, column-aligned table of the query results to os.
void print_query_results_text(std::ostream& os, const result& result);
// Write the query results to os as a JSON array of one object per row.
void print_query_results_json(std::ostream& os, const result& result);
} // namespace cql3

View File

@@ -9,10 +9,8 @@
*/
#include <cstdint>
#include "types/json_utils.hh"
#include "utils/assert.hh"
#include "utils/hashers.hh"
#include "utils/rjson.hh"
#include "cql3/result_set.hh"
namespace cql3 {
@@ -197,85 +195,4 @@ make_empty_metadata() {
return empty_metadata_cache;
}
// Render the query results as a human-readable table: a header row with the
// column names, a dashed separator line, then one line per result row.
// Each column is padded to the width of its widest value; columns are
// joined with '|'. Null cells are rendered as empty strings.
void print_query_results_text(std::ostream& os, const cql3::result& result) {
const auto& metadata = result.get_metadata();
const auto& column_metadata = metadata.get_names();
// Per-column accumulator: collects the stringified values (header first)
// while tracking the maximum width seen, so the fmt format strings can be
// built once the full column is known.
struct column_values {
size_t max_size{0};
sstring header_format;
sstring row_format;
std::vector<sstring> values;
void add(sstring value) {
max_size = std::max(max_size, value.size());
values.push_back(std::move(value));
}
};
std::vector<column_values> columns;
columns.resize(column_metadata.size());
// values[0] of every column is its header (the column name).
for (size_t i = 0; i < column_metadata.size(); ++i) {
columns[i].add(column_metadata[i]->name->text());
}
// Stringify every cell using the column's CQL type; a disengaged cell
// (null) becomes an empty string.
for (const auto& row : result.result_set().rows()) {
for (size_t i = 0; i < row.size(); ++i) {
if (row[i]) {
columns[i].add(column_metadata[i]->type->to_string(linearized(managed_bytes_view(*row[i]))));
} else {
columns[i].add("");
}
}
}
// Build per-column fmt patterns (headers left-aligned, values
// right-aligned, both padded to max_size) and the dashed separator run
// used under the header. The doubled braces produce a literal {:<N}/{:>N}
// specifier in the resulting format string.
std::vector<sstring> separators(columns.size(), sstring());
for (size_t i = 0; i < columns.size(); ++i) {
auto& col_values = columns[i];
col_values.header_format = seastar::format(" {{:<{}}} ", col_values.max_size);
col_values.row_format = seastar::format(" {{:>{}}} ", col_values.max_size);
for (size_t c = 0; c < col_values.max_size; ++c) {
separators[i] += "-";
}
}
// Emit rows.size() + 1 lines: r == 0 is the header, r >= 1 are data rows
// (values[r] lines up because the header occupies index 0 of each column).
for (size_t r = 0; r < result.result_set().rows().size() + 1; ++r) {
std::vector<sstring> row;
row.reserve(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
const auto& format = r == 0 ? columns[i].header_format : columns[i].row_format;
// fmt::runtime: the pattern is built at run time, not a literal.
row.push_back(fmt::format(fmt::runtime(std::string_view(format)), columns[i].values[r]));
}
fmt::print(os, "{}\n", fmt::join(row, "|"));
// Print the dashed separator line right after the header only.
if (!r) {
fmt::print(os, "-{}-\n", fmt::join(separators, "-+-"));
}
}
}
// Render the query results as JSON: a top-level array containing one object
// per row, keyed by column name. Null or empty cells become JSON null;
// other cells are emitted as raw JSON produced by the column type's JSON
// serializer, streamed directly to os without building an in-memory DOM.
void print_query_results_json(std::ostream& os, const cql3::result& result) {
const auto& metadata = result.get_metadata();
const auto& column_metadata = metadata.get_names();
rjson::streaming_writer writer(os);
writer.StartArray();
for (const auto& row : result.result_set().rows()) {
writer.StartObject();
for (size_t i = 0; i < row.size(); ++i) {
writer.Key(column_metadata[i]->name->text());
// Note: both a missing (null) cell and a present-but-empty value are
// collapsed to JSON null here, unlike the text printer which shows
// empty only for null.
if (!row[i] || row[i]->empty()) {
writer.Null();
continue;
}
// Serialize the cell via the column's CQL type, then pass the result
// through as a pre-formatted raw value of the reported JSON type.
const auto value = to_json_string(*column_metadata[i]->type, *row[i]);
const auto type = to_json_type(*column_metadata[i]->type, *row[i]);
writer.RawValue(value, type);
}
writer.EndObject();
}
writer.EndArray();
}
}

View File

@@ -10,7 +10,6 @@
#include "cdc/log.hh"
#include "index/vector_index.hh"
#include "types/types.hh"
#include "utils/assert.hh"
#include <seastar/core/coroutine.hh>
#include "cql3/query_options.hh"
@@ -31,9 +30,6 @@
#include "cql3/query_processor.hh"
#include "cdc/cdc_extension.hh"
#include "cdc/cdc_partitioner.hh"
#include "db/tags/extension.hh"
#include "db/tags/utils.hh"
#include "alternator/ttl_tag.hh"
namespace cql3 {
@@ -47,8 +43,7 @@ alter_table_statement::alter_table_statement(uint32_t bound_terms,
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes> attrs,
shared_ptr<column_identifier::raw> ttl_change)
std::unique_ptr<attributes> attrs)
: schema_altering_statement(std::move(name))
, _bound_terms(bound_terms)
, _type(t)
@@ -56,7 +51,6 @@ alter_table_statement::alter_table_statement(uint32_t bound_terms,
, _properties(std::move(properties))
, _renames(std::move(renames))
, _attrs(std::move(attrs))
, _ttl_change(std::move(ttl_change))
{
}
@@ -386,21 +380,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
throw exceptions::invalid_request_exception("Cannot drop columns from a non-CQL3 table");
}
invoke_column_change_fn(std::mem_fn(&alter_table_statement::drop_column));
// If we dropped the column used for per-row TTL, we need to remove the tag.
if (std::optional<std::string> ttl_column = db::find_tag(*s, TTL_TAG_KEY)) {
for (auto& [raw_name, raw_validator, is_static] : _column_changes) {
if (*ttl_column == raw_name->text()) {
const std::map<sstring, sstring>* tags_ptr = db::get_tags_of_table(s);
if (tags_ptr) {
std::map<sstring, sstring> tags_map = *tags_ptr;
tags_map.erase(TTL_TAG_KEY);
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
}
break;
}
}
}
break;
case alter_table_statement::type::opts:
@@ -455,7 +434,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
break;
case alter_table_statement::type::rename:
{
for (auto&& entry : _renames) {
auto from = entry.first->prepare_column_identifier(*s);
auto to = entry.second->prepare_column_identifier(*s);
@@ -492,53 +470,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
}
return make_pair(std::move(new_base_schema), std::move(view_updates));
}
case alter_table_statement::type::ttl:
if (!db.features().cql_row_ttl) {
throw exceptions::invalid_request_exception("The CQL per-row TTL feature is not yet supported by this cluster. Upgrade all nodes to use it.");
}
if (_ttl_change) {
// Enable per-row TTL with chosen column for expiration time
const column_definition *cdef =
s->get_column_definition(to_bytes(_ttl_change->text()));
if (!cdef) {
throw exceptions::invalid_request_exception(fmt::format("Column '{}' does not exist in table {}.{}", _ttl_change->text(), keyspace(), column_family()));
}
if (cdef->type != timestamp_type && cdef->type != long_type && cdef->type != int32_type) {
throw exceptions::invalid_request_exception(fmt::format("TTL column {} must be of type timestamp, bigint or int, can't be {}", _ttl_change->text(), cdef->type->as_cql3_type().to_string()));
}
if (cdef->is_primary_key()) {
throw exceptions::invalid_request_exception(fmt::format("Cannot use a primary key column {} as a TTL column", _ttl_change->text()));
}
if (cdef->is_static()) {
throw exceptions::invalid_request_exception(fmt::format("Cannot use a static column {} as a TTL column", _ttl_change->text()));
}
std::optional<std::string> old_ttl_column = db::find_tag(*s, TTL_TAG_KEY);
if (old_ttl_column) {
throw exceptions::invalid_request_exception(fmt::format("Cannot set TTL column, table {}.{} already has a TTL column defined: {}", keyspace(), column_family(), *old_ttl_column));
}
const std::map<sstring, sstring>* old_tags_ptr = db::get_tags_of_table(s);
std::map<sstring, sstring> tags_map;
if (old_tags_ptr) {
// tags_ptr is a constant pointer to schema data. To modify
// it, we must make a copy.
tags_map = *old_tags_ptr;
}
tags_map[TTL_TAG_KEY] = _ttl_change->text();
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
} else {
// Disable per-row TTL
const std::map<sstring, sstring>* tags_ptr = db::get_tags_of_table(s);
if (!tags_ptr || tags_ptr->find(TTL_TAG_KEY) == tags_ptr->end()) {
throw exceptions::invalid_request_exception(fmt::format("Cannot unset TTL column, table {}.{} does not have a TTL column set", keyspace(), column_family()));
}
// tags_ptr is a constant pointer to schema data. To modify it, we
// must make a copy.
std::map<sstring, sstring> tags_map = *tags_ptr;
tags_map.erase(TTL_TAG_KEY);
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
}
break;
}
return make_pair(cfm.build(), std::move(view_updates));
}
@@ -577,15 +508,13 @@ alter_table_statement::raw_statement::raw_statement(cf_name name,
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes::raw> attrs,
shared_ptr<column_identifier::raw> ttl_change)
std::unique_ptr<attributes::raw> attrs)
: cf_statement(std::move(name))
, _type(t)
, _column_changes(std::move(column_changes))
, _properties(std::move(properties))
, _renames(std::move(renames))
, _attrs(std::move(attrs))
, _ttl_change(std::move(ttl_change))
{}
std::unique_ptr<cql3::statements::prepared_statement>
@@ -610,8 +539,7 @@ alter_table_statement::raw_statement::prepare(data_dictionary::database db, cql_
_column_changes,
_properties,
_renames,
std::move(prepared_attrs),
_ttl_change
std::move(prepared_attrs)
),
ctx,
// since alter table is `cql_statement_no_metadata` (it doesn't return any metadata when preparing)

View File

@@ -32,7 +32,6 @@ public:
drop,
opts,
rename,
ttl,
};
using renames_type = std::vector<std::pair<shared_ptr<column_identifier::raw>,
shared_ptr<column_identifier::raw>>>;
@@ -51,7 +50,6 @@ private:
const std::optional<cf_prop_defs> _properties;
const renames_type _renames;
const std::unique_ptr<attributes> _attrs;
shared_ptr<column_identifier::raw> _ttl_change;
public:
alter_table_statement(uint32_t bound_terms,
cf_name name,
@@ -59,8 +57,7 @@ public:
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes> attrs,
shared_ptr<column_identifier::raw> ttl_change);
std::unique_ptr<attributes> attrs);
virtual uint32_t get_bound_terms() const override;
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
@@ -81,7 +78,6 @@ class alter_table_statement::raw_statement : public raw::cf_statement {
const std::optional<cf_prop_defs> _properties;
const alter_table_statement::renames_type _renames;
const std::unique_ptr<attributes::raw> _attrs;
shared_ptr<column_identifier::raw> _ttl_change;
public:
raw_statement(cf_name name,
@@ -89,8 +85,7 @@ public:
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes::raw> attrs,
shared_ptr<column_identifier::raw> ttl_change);
std::unique_ptr<attributes::raw> attrs);
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -30,9 +30,6 @@
#include "service/storage_proxy.hh"
#include "db/config.hh"
#include "compaction/time_window_compaction_strategy.hh"
#include "db/tags/extension.hh"
#include "db/tags/utils.hh"
#include "alternator/ttl_tag.hh"
namespace cql3 {
@@ -44,12 +41,10 @@ create_table_statement::create_table_statement(cf_name name,
::shared_ptr<cf_prop_defs> properties,
bool if_not_exists,
column_set_type static_columns,
::shared_ptr<column_identifier> ttl_column,
const std::optional<table_id>& id)
: schema_altering_statement{name}
, _use_compact_storage(false)
, _static_columns{static_columns}
, _ttl_column{ttl_column}
, _properties{properties}
, _if_not_exists{if_not_exists}
, _id(id)
@@ -128,13 +123,6 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
#endif
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true);
// Remembering which column was designated as the TTL column for row-based
// TTL column is done using a "tag" extension. If there is no TTL column,
// we don't need this extension at all.
if (_ttl_column) {
std::map<sstring, sstring> tags_map = {{TTL_TAG_KEY, _ttl_column->text()}};
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
}
}
void create_table_statement::add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, const std::vector<data_type>& types, column_kind kind) const
@@ -210,7 +198,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
}
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _ttl_column, _properties.properties()->get_id());
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());
bool ks_uses_tablets;
try {
@@ -415,27 +403,6 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
}
}
// If a TTL column is defined, it must be a regular column - not a static
// column or part of the primary key.
if (_ttl_column) {
if (!db.features().cql_row_ttl) {
throw exceptions::invalid_request_exception("The CQL per-row TTL feature is not yet supported by this cluster. Upgrade all nodes to use it.");
}
for (const auto& alias : key_aliases) {
if (alias->text() == _ttl_column->text()) {
throw exceptions::invalid_request_exception(format("TTL column {} cannot be part of the PRIMARY KEY", alias->text()));
}
}
for (const auto& alias : _column_aliases) {
if (alias->text() == _ttl_column->text()) {
throw exceptions::invalid_request_exception(format("TTL column {} cannot be part of the PRIMARY KEY", alias->text()));
}
}
if (_static_columns.contains(_ttl_column)) {
throw exceptions::invalid_request_exception(format("TTL column {} cannot be a static column", _ttl_column->text()));
}
}
return std::make_unique<prepared_statement>(audit_info(), stmt, std::move(stmt_warnings));
}
@@ -458,23 +425,12 @@ data_type create_table_statement::raw_statement::get_type_and_remove(column_map_
return _properties.get_reversable_type(*t, type);
}
void create_table_statement::raw_statement::add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static, bool is_ttl) {
void create_table_statement::raw_statement::add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static) {
_defined_names.emplace(def);
_definitions.emplace(def, type);
if (is_static) {
_static_columns.emplace(def);
}
if (is_ttl) {
if (_ttl_column) {
throw exceptions::invalid_request_exception(fmt::format("Cannot have more than one TTL column in a table. Saw {} and {}", _ttl_column->text(), def->text()));
}
// FIXME: find a way to check cql3_type::raw without fmt::format
auto type_name = fmt::format("{}", type);
if (type_name != "timestamp" && type_name != "bigint" && type_name != "int") {
throw exceptions::invalid_request_exception(fmt::format("TTL column '{}' must be of type timestamp, bigint or int, can't be {}", def->text(), type_name));
}
_ttl_column = def;
}
}
void create_table_statement::raw_statement::add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases) {

View File

@@ -57,7 +57,6 @@ class create_table_statement : public schema_altering_statement {
shared_ptr_equal_by_value<column_identifier>>;
column_map_type _columns;
column_set_type _static_columns;
::shared_ptr<column_identifier> _ttl_column; // for row-based TTL
const ::shared_ptr<cf_prop_defs> _properties;
const bool _if_not_exists;
std::optional<table_id> _id;
@@ -66,7 +65,6 @@ public:
::shared_ptr<cf_prop_defs> properties,
bool if_not_exists,
column_set_type static_columns,
::shared_ptr<column_identifier> ttl_column,
const std::optional<table_id>& id);
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
@@ -102,7 +100,6 @@ private:
std::vector<std::vector<::shared_ptr<column_identifier>>> _key_aliases;
std::vector<::shared_ptr<column_identifier>> _column_aliases;
create_table_statement::column_set_type _static_columns;
::shared_ptr<column_identifier> _ttl_column; // for row-based TTL
std::multiset<::shared_ptr<column_identifier>,
indirect_less<::shared_ptr<column_identifier>, column_identifier::text_comparator>> _defined_names;
@@ -119,7 +116,7 @@ public:
data_type get_type_and_remove(column_map_type& columns, ::shared_ptr<column_identifier> t);
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static, bool is_ttl);
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static);
void add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases);

View File

@@ -2004,7 +2004,9 @@ static std::optional<ann_ordering_info> get_ann_ordering_info(
auto indexes = sim.list_indexes();
auto it = std::find_if(indexes.begin(), indexes.end(), [&prepared_ann_ordering](const auto& ind) {
return secondary_index::vector_index::is_vector_index_on_column(ind.metadata(), prepared_ann_ordering.first->name_as_text());
return (ind.metadata().options().contains(db::index::secondary_index::custom_class_option_name) &&
ind.metadata().options().at(db::index::secondary_index::custom_class_option_name) == ANN_CUSTOM_INDEX_OPTION) &&
(ind.target_column() == prepared_ann_ordering.first->name_as_text());
});
if (it == indexes.end()) {

View File

@@ -55,21 +55,8 @@ int32_t batchlog_shard_of(db_clock::time_point written_at) {
return hash & ((1ULL << batchlog_shard_bits) - 1);
}
// A batchlog entry belongs to the legacy v1 format iff its schema is the
// original system batchlog table; the v2 format lives in a separate table.
bool is_batchlog_v1(const schema& schema) {
    const auto& table_name = schema.cf_name();
    return table_name == system_keyspace::BATCHLOG;
}
std::pair<partition_key, clustering_key>
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional<utils::UUID> id) {
if (is_batchlog_v1(schema)) {
if (!id) {
on_internal_error(blogger, "get_batchlog_key(): key for batchlog v1 requires batchlog id");
}
auto pkey = partition_key::from_single_value(schema, {serialized(*id)});
auto ckey = clustering_key::make_empty();
return std::pair(std::move(pkey), std::move(ckey));
}
auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)});
std::vector<bytes> ckey_components;
@@ -98,14 +85,6 @@ mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_
auto cdef_data = schema->get_column_definition(to_bytes("data"));
m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
if (is_batchlog_v1(*schema)) {
auto cdef_version = schema->get_column_definition(to_bytes("version"));
m.set_cell(ckey, *cdef_version, atomic_cell::make_live(*cdef_version->type, timestamp, serialized(version)));
auto cdef_written_at = schema->get_column_definition(to_bytes("written_at"));
m.set_cell(ckey, *cdef_written_at, atomic_cell::make_live(*cdef_written_at->type, timestamp, serialized(now)));
}
return m;
}
@@ -143,10 +122,9 @@ mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clo
const std::chrono::seconds db::batchlog_manager::replay_interval;
const uint32_t db::batchlog_manager::page_size;
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config)
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
: _qp(qp)
, _sys_ks(sys_ks)
, _fs(fs)
, _replay_timeout(config.replay_timeout)
, _replay_rate(config.replay_rate)
, _delay(config.delay)
@@ -322,206 +300,149 @@ future<> db::batchlog_manager::maybe_migrate_v1_to_v2() {
});
}
namespace {
using clock_type = db_clock::rep;
struct replay_stats {
std::optional<db_clock::time_point> min_too_fresh;
bool need_cleanup = false;
};
} // anonymous namespace
// Replay a single batchlog row: deserialize its mutations, decide whether the
// batch is old enough to replay, send the mutations to all replicas (subject
// to the byte-rate limiter), and finally delete the batchlog entry — or, on a
// send failure in v2 cleanup mode, demote it to the failed_replay stage first.
//
// Returns all_batches_replayed::yes when the batch was fully replayed and
// deleted, ::no when it was skipped (injection, too fresh) or the send failed.
static future<db::all_batches_replayed> process_batch(
        cql3::query_processor& qp,
        db::batchlog_manager::stats& stats,
        db::batchlog_manager::post_replay_cleanup cleanup,
        utils::rate_limiter& limiter,
        schema_ptr schema,
        std::unordered_map<int32_t, replay_stats>& replay_stats_per_shard,
        const db_clock::time_point now,
        db_clock::duration replay_timeout,
        std::chrono::seconds write_timeout,
        const cql3::untyped_result_set::row& row) {
    // The v1 batchlog table has no "stage"/"shard" columns; use fixed defaults.
    const bool is_v1 = db::is_batchlog_v1(*schema);
    const auto stage = is_v1 ? db::batchlog_stage::initial : static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
    const auto batch_shard = is_v1 ? 0 : row.get_as<int32_t>("shard");
    auto written_at = row.get_as<db_clock::time_point>("written_at");
    auto id = row.get_as<utils::UUID>("id");
    // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
    auto timeout = replay_timeout;
    if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
        blogger.debug("Skipping batch replay due to skip_batch_replay injection");
        co_return db::all_batches_replayed::no;
    }
    auto data = row.get_blob_unfragmented("data");
    blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
    utils::chunked_vector<mutation> mutations;
    bool send_failed = false;
    auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
    try {
        utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
        auto in = ser::as_input_stream(data);
        while (in.size()) {
            auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
            const auto tbl = qp.db().try_find_table(fm.column_family_id());
            if (!tbl) {
                // Table was dropped since the batch was written; drop the mutation.
                continue;
            }
            if (written_at <= tbl->get_truncation_time()) {
                // Don't resurrect data written before the table was truncated.
                continue;
            }
            schema_ptr s = tbl->schema();
            if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
                // Repair-based tombstone GC: cap the freshness window by the
                // tombstone propagation delay of the affected table.
                timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
            }
            fms.emplace_back(std::move(fm), std::move(s));
        }
        if (now < written_at + timeout) {
            blogger.debug("Skipping replay of {}, too fresh", id);
            shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
            co_return db::all_batches_replayed::no;
        }
        auto size = data.size();
        for (const auto& [fm, s] : fms) {
            mutations.emplace_back(fm.to_mutation(s));
            co_await coroutine::maybe_yield();
        }
        if (!mutations.empty()) {
            const auto ttl = [written_at]() -> clock_type {
                /*
                 * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
                 * This ensures that deletes aren't "undone" by an old batch replay.
                 */
                auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
                warn(unimplemented::cause::HINT);
#if 0
                for (auto& m : *mutations) {
                    unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
                }
#endif
                return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
            }();
            if (ttl > 0) {
                // Origin does the send manually, however I can't see a super great reason to do so.
                // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
                // in both cases.
                // FIXME: verify that the above is reasonably true.
                co_await limiter.reserve(size);
                stats.write_attempts += mutations.size();
                auto timeout = db::timeout_clock::now() + write_timeout;
                if (cleanup) {
                    // Keep `mutations` alive: on failure we may need them below
                    // to rewrite the batch into the failed_replay stage.
                    co_await qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
                } else {
                    co_await qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
                }
            }
        }
    } catch (data_dictionary::no_such_keyspace& ex) {
        // should probably ignore and drop the batch
    } catch (const data_dictionary::no_such_column_family&) {
        // As above -- we should drop the batch if the table doesn't exist anymore.
    } catch (...) {
        blogger.warn("Replay failed (will retry): {}", std::current_exception());
        // timeout, overload etc.
        // Do _not_ remove the batch, assuming we got a node write error.
        // Since we don't have hints (which origin is satisfied with),
        // we have to resort to keeping this batch to next lap.
        if (is_v1 || !cleanup || stage == db::batchlog_stage::failed_replay) {
            co_return db::all_batches_replayed::no;
        }
        // v2 cleanup mode: fall through so the batch is demoted to the
        // failed_replay stage and the original entry removed.
        send_failed = true;
    }
    auto& sp = qp.proxy();
    if (send_failed) {
        blogger.debug("Moving batch {} to stage failed_replay", id);
        auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, db::batchlog_stage::failed_replay, written_at, id);
        co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
    }
    // delete batch
    auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
    co_await qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
    shard_written_at.need_cleanup = true;
    co_return db::all_batches_replayed(!send_failed);
}
// Replay every batch found in the legacy v1 batchlog table. The cleanup
// parameter is ignored for v1 (failed sends are simply retried next lap).
// Returns yes only if every scanned batch was successfully replayed.
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v1(post_replay_cleanup) {
    db::all_batches_replayed all_replayed = all_batches_replayed::yes;
    // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
    // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
    auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
    utils::rate_limiter limiter(throttle);
    auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
    std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
    // Use a stable `now` across all batches, so skip/replay decisions are the
    // same across a whole prefix of written_at (across all ids).
    const auto now = db_clock::now();
    // Per-row callback: feed each batchlog row through process_batch and
    // accumulate whether every batch so far has been replayed.
    auto batch = [this, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
        all_replayed = all_replayed && co_await process_batch(_qp, _stats, post_replay_cleanup::no, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
        co_return stop_iteration::no;
    };
    // Hold the gate for the duration of the paged scan so shutdown waits for us.
    co_await with_gate(_gate, [this, &all_replayed, batch = std::move(batch)] () mutable -> future<> {
        blogger.debug("Started replayAllFailedBatches");
        co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));
        auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
        co_await _qp.query_internal(
            format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
            db::consistency_level::ONE,
            {},
            page_size,
            batch);
        blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
    });
    co_return all_replayed;
}
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v2(post_replay_cleanup cleanup) {
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
co_await maybe_migrate_v1_to_v2();
typedef db_clock::rep clock_type;
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
// max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
utils::rate_limiter limiter(throttle);
auto limiter = make_lw_shared<utils::rate_limiter>(throttle);
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
struct replay_stats {
std::optional<db_clock::time_point> min_too_fresh;
bool need_cleanup = false;
};
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
// Use a stable `now` across all batches, so skip/replay decisions are the
// same across a whole prefix of written_at (across all ids).
const auto now = db_clock::now();
auto batch = [this, cleanup, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
all_replayed = all_replayed && co_await process_batch(_qp, _stats, cleanup, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
auto batch = [this, cleanup, limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
const auto stage = static_cast<batchlog_stage>(row.get_as<int8_t>("stage"));
const auto batch_shard = row.get_as<int32_t>("shard");
auto written_at = row.get_as<db_clock::time_point>("written_at");
auto id = row.get_as<utils::UUID>("id");
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
auto timeout = _replay_timeout;
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
all_replayed = all_batches_replayed::no;
co_return stop_iteration::no;
}
auto data = row.get_blob_unfragmented("data");
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
utils::chunked_vector<mutation> mutations;
bool send_failed = false;
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
try {
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
auto in = ser::as_input_stream(data);
while (in.size()) {
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
const auto tbl = _qp.db().try_find_table(fm.column_family_id());
if (!tbl) {
continue;
}
if (written_at <= tbl->get_truncation_time()) {
continue;
}
schema_ptr s = tbl->schema();
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
}
fms.emplace_back(std::move(fm), std::move(s));
}
if (now < written_at + timeout) {
blogger.debug("Skipping replay of {}, too fresh", id);
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
co_return stop_iteration::no;
}
auto size = data.size();
for (const auto& [fm, s] : fms) {
mutations.emplace_back(fm.to_mutation(s));
co_await coroutine::maybe_yield();
}
if (!mutations.empty()) {
const auto ttl = [written_at]() -> clock_type {
/*
* Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
* This ensures that deletes aren't "undone" by an old batch replay.
*/
auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
warn(unimplemented::cause::HINT);
#if 0
for (auto& m : *mutations) {
unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
}
#endif
return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
}();
if (ttl > 0) {
// Origin does the send manually, however I can't see a super great reason to do so.
// Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
// in both cases.
// FIXME: verify that the above is reasonably true.
co_await limiter->reserve(size);
_stats.write_attempts += mutations.size();
auto timeout = db::timeout_clock::now() + write_timeout;
if (cleanup) {
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
} else {
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
}
}
}
} catch (data_dictionary::no_such_keyspace& ex) {
// should probably ignore and drop the batch
} catch (const data_dictionary::no_such_column_family&) {
// As above -- we should drop the batch if the table doesn't exist anymore.
} catch (...) {
blogger.warn("Replay failed (will retry): {}", std::current_exception());
all_replayed = all_batches_replayed::no;
// timeout, overload etc.
// Do _not_ remove the batch, assuming we got a node write error.
// Since we don't have hints (which origin is satisfied with),
// we have to resort to keeping this batch to next lap.
if (!cleanup || stage == batchlog_stage::failed_replay) {
co_return stop_iteration::no;
}
send_failed = true;
}
auto& sp = _qp.proxy();
if (send_failed) {
blogger.debug("Moving batch {} to stage failed_replay", id);
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, batchlog_stage::failed_replay, written_at, id);
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
}
// delete batch
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
shard_written_at.need_cleanup = true;
co_return stop_iteration::no;
};
@@ -580,10 +501,3 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
co_return all_replayed;
}
// Dispatch batch replay to the v2 implementation when the cluster-wide
// batchlog_v2 feature is enabled, otherwise fall back to the legacy v1 path.
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
    return _fs.batchlog_v2 ? replay_all_failed_batches_v2(cleanup)
                           : replay_all_failed_batches_v1(cleanup);
}

View File

@@ -27,12 +27,6 @@ class query_processor;
} // namespace cql3
namespace gms {
class feature_service;
} // namespace gms
namespace db {
class system_keyspace;
@@ -55,11 +49,6 @@ class batchlog_manager : public peering_sharded_service<batchlog_manager> {
public:
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
struct stats {
uint64_t write_attempts = 0;
};
private:
static constexpr std::chrono::seconds replay_interval = std::chrono::seconds(60);
static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
@@ -67,13 +56,14 @@ private:
using clock_type = lowres_clock;
stats _stats;
struct stats {
uint64_t write_attempts = 0;
} _stats;
seastar::metrics::metric_groups _metrics;
cql3::query_processor& _qp;
db::system_keyspace& _sys_ks;
gms::feature_service& _fs;
db_clock::duration _replay_timeout;
uint64_t _replay_rate;
std::chrono::milliseconds _delay;
@@ -94,14 +84,12 @@ private:
future<> maybe_migrate_v1_to_v2();
future<all_batches_replayed> replay_all_failed_batches_v1(post_replay_cleanup cleanup);
future<all_batches_replayed> replay_all_failed_batches_v2(post_replay_cleanup cleanup);
future<all_batches_replayed> replay_all_failed_batches(post_replay_cleanup cleanup);
public:
// Takes a QP, not a distributes. Because this object is supposed
// to be per shard and does no dispatching beyond delegating the the
// shard qp (which is what you feed here).
batchlog_manager(cql3::query_processor&, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config);
batchlog_manager(cql3::query_processor&, db::system_keyspace& sys_ks, batchlog_manager_config config);
// abort the replay loop and return its future.
future<> drain();
@@ -114,7 +102,7 @@ public:
return _last_replay;
}
const stats& get_stats() const {
const stats& stats() const {
return _stats;
}
private:

View File

@@ -1292,7 +1292,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, fd_initial_value_ms(this, "fd_initial_value_ms", value_status::Used, 2 * 1000, "The initial failure_detector interval time in milliseconds.")
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
, developer_mode(this, "developer_mode", value_status::Used, DEVELOPER_MODE_DEFAULT, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Deprecated, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user.")
, experimental_features(this, "experimental_features", value_status::Used, {}, experimental_features_help_string())
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step.")

View File

@@ -16,7 +16,6 @@
#include <string>
#include <tuple>
#include "cql3/cql3_type.hh"
#include "types/user.hh"
#include "types/map.hh"
#include "types/list.hh"
@@ -114,7 +113,7 @@ std::vector<data_type> type_parser::get_type_parameters(bool multicell)
throw parse_exception(_str, _idx, "unexpected end of string");
}
std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
std::tuple<data_type, size_t> type_parser::get_vector_parameters()
{
if (is_eos() || _str[_idx] != '(') {
throw std::logic_error("internal error");
@@ -129,7 +128,7 @@ std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
}
data_type type = do_parse(true);
vector_dimension_t size = 0;
size_t size = 0;
if (_str[_idx] == ',') {
++_idx;
skip_blank();
@@ -143,20 +142,7 @@ std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
throw parse_exception(_str, _idx, "expected digit or ')'");
}
unsigned long parsed_size;
try {
parsed_size = std::stoul(_str.substr(i, _idx - i));
} catch (const std::exception& e) {
throw parse_exception(_str, i, format("Invalid vector dimension: {}", e.what()));
}
static_assert(sizeof(unsigned long) >= sizeof(vector_dimension_t));
if (parsed_size == 0) {
throw parse_exception(_str, _idx, "Vectors must have a dimension greater than 0");
}
if (parsed_size > cql3::cql3_type::MAX_VECTOR_DIMENSION) {
throw parse_exception(_str, _idx, format("Vectors must have a dimension less than or equal to {}", cql3::cql3_type::MAX_VECTOR_DIMENSION));
}
size = static_cast<vector_dimension_t>(parsed_size);
size = std::stoul(_str.substr(i, _idx - i));
++_idx; // skipping ')'
return std::make_tuple(type, size);

View File

@@ -97,7 +97,7 @@ public:
}
#endif
std::vector<data_type> get_type_parameters(bool multicell=true);
std::tuple<data_type, vector_dimension_t> get_vector_parameters();
std::tuple<data_type, size_t> get_vector_parameters();
std::tuple<sstring, bytes, std::vector<bytes>, std::vector<data_type>> get_user_type_parameters();
data_type do_parse(bool multicell = true);

View File

@@ -21,16 +21,14 @@
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "sstables/sstables_manager.hh"
#include "service/storage_proxy.hh"
logging::logger snap_log("snapshots");
namespace db {
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>& sp, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
: _config(std::move(cfg))
, _db(db)
, _sp(sp)
, _ops("snapshot_ctl")
, _task_manager_module(make_shared<snapshot::task_manager_module>(tm))
, _storage_manager(sstm)
@@ -106,45 +104,6 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
});
}
// Take a cluster-wide (topology-coordinated) snapshot of the given tables or
// keyspaces. Validates the arguments, expands an empty keyspace list to mean
// "all keyspaces", and then schedules the actual work under the
// snapshot-modify-operation serialization.
// Throws std::invalid_argument on an empty tag, or when individual tables are
// named together with more than one keyspace.
future<> snapshot_ctl::take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
// A snapshot must always be named; reject an empty tag up front.
if (tag.empty()) {
throw std::invalid_argument("You must supply a snapshot name.");
}
// Naming specific tables only makes sense for exactly one keyspace.
if (ks_names.size() != 1 && !tables.empty()) {
throw std::invalid_argument("Cannot name tables when doing multiple keyspaces snapshot");
}
// An empty keyspace list means "snapshot every keyspace in the database".
if (ks_names.empty()) {
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(ks_names));
}
// Serialize with other snapshot-modifying operations. The mutable lambda
// takes ownership of the (moved-in) arguments until the operation runs.
return run_snapshot_modify_operation([this, ks_names = std::move(ks_names), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
return do_take_cluster_column_family_snapshot(std::move(ks_names), std::move(tables), std::move(tag), opts);
});
}
// Perform the cluster-wide snapshot scheduled by
// take_cluster_column_family_snapshot(). Two shapes of request are handled:
// - no tables named: snapshot every listed keyspace in full (an empty table
//   name in the multimap denotes "whole keyspace");
// - tables named: snapshot only those tables in the single listed keyspace.
// In both cases the per-keyspace snapshot-does-not-already-exist check runs
// first, then the request is forwarded to storage_proxy::snapshot_keyspace.
future<> snapshot_ctl::do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
if (tables.empty()) {
// Verify in parallel that no keyspace already has a snapshot with this tag.
co_await coroutine::parallel_for_each(ks_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag);
});
// Map each keyspace to an empty table name, meaning "all of its tables".
co_await _sp.local().snapshot_keyspace(
ks_names | std::views::transform([&](auto& ks) { return std::make_pair(ks, sstring{}); })
| std::ranges::to<std::unordered_multimap>(),
tag, opts
);
co_return;
};
// Tables were named; the caller guaranteed exactly one keyspace here.
auto ks = ks_names[0];
co_await check_snapshot_not_exist(ks, tag, tables);
// Build (keyspace, table) pairs for just the requested tables.
co_await _sp.local().snapshot_keyspace(
tables | std::views::transform([&](auto& cf) { return std::make_pair(ks, cf); })
| std::ranges::to<std::unordered_multimap>(),
tag, opts
);
}
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
@@ -226,4 +185,4 @@ future<int64_t> snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) {
}));
}
}
}

View File

@@ -24,7 +24,6 @@
using namespace seastar;
namespace sstables { class storage_manager; }
namespace service { class storage_proxy; }
namespace db {
@@ -64,7 +63,7 @@ public:
using db_snapshot_details = std::vector<table_snapshot_details_ext>;
snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>&, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
future<> stop();
@@ -96,17 +95,6 @@ public:
*/
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Takes the snapshot of multiple tables or a whole keyspace, or all keyspaces,
* using global, clusterwide topology coordinated op.
* A snapshot name must be specified.
*
* @param ks_names the keyspaces to snapshot
* @param tables optional - a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Remove the snapshot with the given name from the given keyspaces.
* If no tag is specified we will remove all snapshots.
@@ -123,7 +111,6 @@ public:
private:
config _config;
sharded<replica::database>& _db;
sharded<service::storage_proxy>& _sp;
seastar::rwlock _lock;
seastar::named_gate _ops;
shared_ptr<snapshot::task_manager_module> _task_manager_module;
@@ -146,7 +133,6 @@ private:
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
};
}
}

View File

@@ -770,6 +770,13 @@ system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) {
co_return res;
}
// Returns true when the workload-prioritization service-levels table is
// present in this keyspace AND its column set matches the expected (updated)
// schema returned by get_updated_service_levels().
bool system_distributed_keyspace::workload_prioritization_tables_exists() {
    const auto expected_schema = get_updated_service_levels(_qp.db(), true);
    const auto existing_table = _qp.db().try_find_table(NAME, expected_schema->cf_name());
    if (!existing_table) {
        return false;
    }
    return existing_table->schema()->equal_columns(*expected_schema);
}
// Fetch all service levels from the distributed SERVICE_LEVELS table,
// delegating to the shared qos helper. `ctx` identifies the query context
// (e.g. internal vs. user-initiated) for the underlying read.
future<qos::service_levels_info> system_distributed_keyspace::get_service_levels(qos::query_context ctx) const {
    const auto read_cl = db::consistency_level::ONE;
    return qos::get_service_levels(_qp, NAME, SERVICE_LEVELS, read_cl, ctx);
}

View File

@@ -117,6 +117,7 @@ public:
future<qos::service_levels_info> get_service_level(sstring service_level_name) const;
future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const;
future<> drop_service_level(sstring service_level_name) const;
bool workload_prioritization_tables_exists();
private:
future<> create_tables(std::vector<schema_ptr> tables);

View File

@@ -335,10 +335,6 @@ schema_ptr system_keyspace::topology_requests() {
.with_column("truncate_table_id", uuid_type)
.with_column("new_keyspace_rf_change_ks_name", utf8_type)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_column("snapshot_table_ids", set_type_impl::get_instance(uuid_type, false))
.with_column("snapshot_tag", utf8_type)
.with_column("snapshot_expiry", timestamp_type)
.with_column("snapshot_skip_flush", boolean_type)
.set_comment("Topology request tracking")
.with_hash_version()
.build();
@@ -3585,18 +3581,6 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
entry.new_keyspace_rf_change_ks_name = row.get_as<sstring>("new_keyspace_rf_change_ks_name");
entry.new_keyspace_rf_change_data = row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
}
if (row.has("snapshot_table_ids")) {
entry.snapshot_tag = row.get_as<sstring>("snapshot_tag");
entry.snapshot_skip_flush = row.get_as<bool>("snapshot_skip_flush");
entry.snapshot_table_ids = row.get_set<utils::UUID>("snapshot_table_ids")
| std::views::transform([](auto& uuid) { return table_id(uuid); })
| std::ranges::to<std::unordered_set>()
;
;
if (row.has("snapshot_expiry")) {
entry.snapshot_expiry = row.get_as<db_clock::time_point>("snapshot_expiry");
}
}
return entry;
}

View File

@@ -417,10 +417,6 @@ public:
std::optional<sstring> new_keyspace_rf_change_ks_name;
// The KS options to be used when executing the scheduled ALTER KS statement
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_data;
std::optional<std::unordered_set<table_id>> snapshot_table_ids;
std::optional<sstring> snapshot_tag;
std::optional<db_clock::time_point> snapshot_expiry;
bool snapshot_skip_flush;
};
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;

View File

@@ -98,13 +98,14 @@ public:
auto hostid = eps.get_host_id();
set_cell(cr, "up", gossiper.is_alive(hostid));
if (gossiper.is_shutdown(endpoint)) {
if (!ss.raft_topology_change_enabled() || gossiper.is_shutdown(endpoint)) {
set_cell(cr, "status", gossiper.get_gossip_status(endpoint));
} else {
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
}
set_cell(cr, "load", gossiper.get_application_state_value(endpoint, gms::application_state::LOAD));
if (ss.raft_topology_change_enabled() && !gossiper.is_shutdown(endpoint)) {
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
}
set_cell(cr, "host_id", hostid.uuid());
if (tm.get_topology().has_node(hostid)) {

View File

@@ -97,9 +97,7 @@ bcp LICENSE-ScyllaDB-Source-Available.md /licenses/
run microdnf clean all
run microdnf --setopt=tsflags=nodocs -y update
run microdnf --setopt=tsflags=nodocs -y install hostname kmod procps-ng python3 python3-pip cpio
# Extract only systemctl binary from systemd package to avoid installing the whole systemd in the container.
run bash -rc "microdnf download systemd && rpm2cpio systemd-*.rpm | cpio -idmv ./usr/bin/systemctl && rm -rf systemd-*.rpm"
run microdnf --setopt=tsflags=nodocs -y install hostname kmod procps-ng python3 python3-pip
run curl -L --output /etc/yum.repos.d/scylla.repo ${repo_file_url}
run pip3 install --no-cache-dir --prefix /usr supervisor
run bash -ec "echo LANG=C.UTF-8 > /etc/locale.conf"
@@ -108,8 +106,6 @@ run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
run mkdir -p /var/log/scylla
run chown -R scylla:scylla /var/lib/scylla
run sed -i -e 's/^SCYLLA_ARGS=".*"$/SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --network-stack posix"/' /etc/sysconfig/scylla-server
# Cleanup packages not needed in the final image and clean package manager cache to reduce image size.
run bash -rc "microdnf remove -y cpio && microdnf clean all"
run mkdir -p /opt/scylladb/supervisor
run touch /opt/scylladb/SCYLLA-CONTAINER-FILE

View File

@@ -213,3 +213,71 @@ Alternator table, the following features will not work for this table:
* Enabling Streams with CreateTable or UpdateTable doesn't work
(results in an error).
See <https://github.com/scylladb/scylla/issues/23838>.
## Custom write timestamps
DynamoDB doesn't allow clients to set the write timestamp of updates. All
updates use the current server time as their timestamp, and ScyllaDB uses
these timestamps for last-write-wins conflict resolution when concurrent
writes reach different replicas.
ScyllaDB Alternator extends this with the `system:timestamp_attribute` tag,
which allows specifying a custom write timestamp for each PutItem,
UpdateItem, DeleteItem, or BatchWriteItem request. To use this feature:
1. Tag the table (at CreateTable time or using TagResource) with
`system:timestamp_attribute` set to the name of an attribute that will
hold the custom write timestamp.
2. When performing a PutItem or UpdateItem, include the named attribute
in the request with a numeric value. The value represents the write
timestamp in **microseconds since the Unix epoch** (this is the same
unit used internally by ScyllaDB for timestamps).
For a DeleteItem or a BatchWriteItem DeleteRequest, include the named
attribute in the `Key` parameter (it will be stripped from the key
before use).
3. The named attribute is **not stored** in the item data - it only
controls the write timestamp. If you also want to record the timestamp
as data, use a separate attribute for that purpose.
4. If the named attribute is absent, the write proceeds normally using the
current server time as the timestamp. If the named attribute is present
but has a non-numeric value, the write is rejected with a ValidationException.
### Limitations
- **Incompatible with conditions**: If the write includes a ConditionExpression
(or uses the `Expected` legacy condition), LWT is needed and the operation
is rejected with a ValidationException, because LWT requires the write
timestamp to be set by the Paxos protocol, not by the client.
- **Incompatible with `always` write isolation**: Tables using the `always`
(or `always_use_lwt`) write isolation policy cannot use the timestamp
attribute feature at all, because every write uses LWT in that mode.
When using `system:timestamp_attribute`, consider tagging the table with
`system:write_isolation=only_rmw_uses_lwt` (or `forbid_rmw`) so that
unconditional writes do not use LWT.
### Example use case
This feature is useful for ingesting data from multiple sources where each
record has a known logical timestamp. By setting the `system:timestamp_attribute`
tag, you can ensure that the record with the highest logical timestamp always
wins, regardless of ingestion order:
```python
# Create table with timestamp attribute
dynamodb.create_table(
TableName='my_table',
...
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'write_ts'}]
)
# Write a record with a specific timestamp (in microseconds since epoch)
table.put_item(Item={
'pk': 'my_key',
'data': 'new_value',
'write_ts': Decimal('1700000000000000'), # Nov 14, 2023 in microseconds
})
```

View File

@@ -402,82 +402,3 @@ it also describes authentication/authorization and service levels. Additionally,
statement: `DESCRIBE SCHEMA WITH INTERNALS AND PASSWORDS`, which also includes the information about hashed passwords of the roles.
For more details, see [the article on DESCRIBE SCHEMA](./describe-schema.rst).
## Per-row TTL
CQL's traditional time-to-live (TTL) feature attaches an expiration time to
each cell - i.e., each value in each column. For example, the statement:
```
UPDATE tbl USING TTL 60 SET x = 1 WHERE p = 2
```
Sets a new value for the column `x` in row `p = 2`, and asks for this value to
expire in 60 seconds. When a row is updated incrementally, with different
columns set at different times, this can result in different pieces of the
row expiring at different times. Applications rarely want partially-expired
rows, so they often need to re-write an entire row each time the row needs
updating. In particular, it is not possible to change the expiration time of
an existing row without re-writing it.
Per-row time-to-live (TTL) is a new CQL feature that is an alternative to
the traditional per-cell TTL. One column is designated as the "expiration
time" column, and the value of this column determines when the entire row
will expire. It becomes possible to update pieces of a row without changing
its expiration time, and vice versa - to change a row's expiration time
without rewriting its data.
The expiration-time column of a table can be chosen when it is created by
adding the keyword "TTL" to one of the columns:
```cql
CREATE TABLE tab (
id int PRIMARY KEY,
t text,
expiration timestamp TTL
);
```
The TTL column's name, in this example `expiration`, can be anything.
Per-row TTL can also be enabled on an existing table by adding the "TTL"
designation to one of the existing columns, with:
```cql
ALTER TABLE tab TTL colname
```
Or per-row TTL can be disabled (rows will never expire), with:
```cql
ALTER TABLE tab TTL NULL
```
It is not possible to enable per-row TTL if it's already enabled, or disable
it when already disabled. If you have TTL enabled on one column and want to
enable it instead on a second column, you must do it in two steps: First
disable TTL and then re-enable it on the second column.
The designated TTL column must have the type `timestamp` or `bigint`,
and specifies the absolute time when the row should expire (the `bigint`
type is interpreted as seconds since the UNIX epoch). It must be a regular
column (not a primary key column or a static column), and there can only be
one such column.
The 32-bit type `int` (specifying number of seconds since the UNIX epoch)
is also supported, but not recommended because it will wrap around in 2038.
Unless you must use the `int` type because of pre-existing expiration data
with that type, please prefer `timestamp` or `bigint`.
Another important feature of per-row TTL is that if CDC is enabled, when a
row expires a deletion event appears in the CDC log - something that doesn't
happen in per-cell TTL. This deletion event can be distinguished from user-
initiated deletes: Whereas user-initiated deletes have `cdc_operation` set to
3 (`row_delete`) or 4 (`partition_delete`), those generated by expiration have
`cdc_operation` -3 (`service_row_delete`) or -4 (`service_partition_delete`).
Unlike per-cell TTL where a value becomes unreadable at the precise specified
second, the per-row TTL's expiration is _eventual_ - the row will expire
some time _after_ its requested expiration time, where this "some time" can
be controlled by the configuration `alternator_ttl_period_in_seconds`. Until
the row is actually deleted, it can still be read, and even written.
Importantly, the CDC event will appear immediately after the row is finally
deleted.
It's important to re-iterate that the per-cell TTL and per-row TTL features
are separate and distinct, use a different CQL syntax, have a different
implementation and provide different guarantees. It is possible to use
both features in the same table, or even the same row.

View File

@@ -476,7 +476,7 @@ Creating a new table uses the ``CREATE TABLE`` statement:
: [ ',' PRIMARY KEY '(' `primary_key` ')' ]
: ')' [ WITH `table_options` ]
column_definition: `column_name` `cql_type` [ TTL ] [ STATIC ] [ PRIMARY KEY]
column_definition: `column_name` `cql_type` [ STATIC ] [ PRIMARY KEY]
primary_key: `partition_key` [ ',' `clustering_columns` ]
@@ -572,11 +572,6 @@ A :token:`column_definition` is primarily comprised of the name of the column de
which restricts which values are accepted for that column. Additionally, a column definition can have the following
modifiers:
``TTL``
declares the column as being the expiration-time column for the
`per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
feature.
``STATIC``
declares the column as being a :ref:`static column <static-columns>`.
@@ -1177,7 +1172,6 @@ Altering an existing table uses the ``ALTER TABLE`` statement:
: | DROP '(' `column_name` ( ',' `column_name` )* ')' [ USING TIMESTAMP `timestamp` ]
: | ALTER `column_name` TYPE `cql_type`
: | WITH `options`
: | TTL (`column_name` | NULL)
: | scylla_encryption_options: '=' '{'[`cipher_algorithm` : <hash>]','[`secret_key_strength` : <len>]','[`key_provider`: <provider>]'}'
For instance:
@@ -1221,11 +1215,6 @@ The ``ALTER TABLE`` statement can:
- Change or add any of the ``Encryption options`` above.
- Change or add any of the :ref:`CDC options <cdc-options>` above.
- Change or add per-partition rate limits. See :ref:`Limiting the rate of requests per partition <ddl-per-parition-rate-limit>`.
- Enable `per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
using the given column as the expiration-time column, or disable per-row
TTL on this table. If per-row TTL is already enabled, to change the choice
of expiration-time column you must first disable per-row TTL and then
re-enable it using the chosen column.
.. warning:: Dropping a column assumes that the timestamps used for the value of this column are "real" timestamp in
microseconds. Using "real" timestamps in microseconds is the default is and is **strongly** recommended, but as

View File

@@ -277,9 +277,6 @@ Dropping a secondary index uses the ``DROP INDEX`` statement:
The ``DROP INDEX`` statement is used to drop an existing secondary index. The argument of the statement is the index
name, which may optionally specify the keyspace of the index.
If the index is currently being built, the ``DROP INDEX`` can still be executed. Once the ``DROP INDEX`` command is issued,
the system stops the build process and cleans up any partially built data associated with the index.
.. If the index does not exist, the statement will return an error, unless ``IF EXISTS`` is used in which case the
.. operation is a no-op.

View File

@@ -13,19 +13,6 @@ The TTL can be set when defining a Table (CREATE), or when using the INSERT and
The expiration works at the individual column level, which provides a lot of flexibility.
By default, the TTL value is null, which means that the data will not expire.
This document is about CQL's classic per-write TTL feature, where individual
columns from the same row can expire at separate times if written at
different times. ScyllaDB also supports an alternative TTL feature,
`Per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_.
In *per-row TTL* each row has an expiration time for the entire row,
defined by the value of the expiration-time column. In per-row TTL, the
entire row expires together regardless of how its individual columns were
written, and the expiration time of an entire row can be modified by modifying
the expiration-time column. Another benefit of per-row TTL is that it
generates a CDC event when a row expires - in contrast in per-write TTL
(the feature described in this document) where expiration events do not
show up in CDC.
.. note::
The expiration time is always calculated as *now() on the Coordinator + TTL* where, *now()* is the wall clock during the corresponding write operation.

View File

@@ -10,11 +10,6 @@ The CDC log table reflects operations that are performed on the base table. Diff
* row range deletions,
* partition deletions.
Note that TTL expirations are not operations, and not reflected in the CDC
log tables. If you do need CDC events when entire rows expire, consider
using `per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
which does generate special CDC events when rows expire.
The following sections describe how each of these operations are handled by the CDC log.
.. include:: /features/cdc/_common/cdc-updates.rst

View File

@@ -15,10 +15,6 @@ It is not always clear under which circumstances data is deleted when using Time
This article clarifies what may not be apparent.
It corrects some assumptions you may have that are not exactly true.
This document is about CQL's :doc:`per-write TTL feature </cql/time-to-live>`,
the `per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
feature behaves differently.
Facts About Expiring Data
-------------------------

View File

@@ -898,63 +898,6 @@ By default, each input sstable is filtered individually. Use ``--merge`` to filt
Output sstables use the latest supported sstable format (can be changed with ``--sstable-version``).
split
^^^^^
Split SSTable(s) into multiple output SSTables based on token boundaries.
This operation divides SSTable(s) according to the specified split tokens, creating one output SSTable per token range.
This is useful for redistributing data across different token ranges, such as when preparing data for different nodes or shards.
Tokens should be provided via the ``--split-token`` (or ``-t``) option. Multiple tokens can be specified by repeating the option.
The tokens will be sorted automatically to ensure proper ordering.
For N split tokens, N+1 output SSTables will be generated:
* First SSTable: from minimum token to first split token
* Middle SSTables: between consecutive split tokens
* Last SSTable: from last split token to maximum token
By default, each input SSTable is split individually. Use ``--merge`` to split the combined content of all input SSTables, producing a single set of output SSTables.
Output SSTables use the latest supported sstable format (can be changed with ``--sstable-version``) and are written to the directory specified by ``--output-dir``.
**Examples:**
Split a single SSTable at token boundaries 100 and 500:
.. code-block:: console
scylla sstable split --split-token 100 --split-token 500 /path/to/md-123456-big-Data.db
Or using the short-hand form:
.. code-block:: console
scylla sstable split -t 100 -t 500 /path/to/md-123456-big-Data.db
This will create 3 output SSTables:
* One containing partitions with tokens < 100
* One containing partitions with tokens >= 100 and < 500
* One containing partitions with tokens >= 500
Split multiple SSTables individually:
.. code-block:: console
scylla sstable split -t 100 -t 500 /path/to/md-123456-big-Data.db /path/to/md-123457-big-Data.db
This will split each input SSTable separately, creating 6 output SSTables total (3 per input).
Split multiple SSTables as a combined stream:
.. code-block:: console
scylla sstable split --merge -t 100 -t 500 /path/to/md-123456-big-Data.db /path/to/md-123457-big-Data.db
This will merge both input SSTables first, then split the combined data, creating 3 output SSTables.
Examples
--------
Dumping the content of the SStable:

8
docs/poetry.lock generated
View File

@@ -954,14 +954,14 @@ markdown = ">=3.4"
[[package]]
name = "sphinx-multiversion-scylla"
version = "0.3.7"
version = "0.3.4"
description = "Add support for multiple versions to sphinx"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "sphinx_multiversion_scylla-0.3.7-py3-none-any.whl", hash = "sha256:6205d261a77c90b7ea3105311d1d56014736a5148966133c34344512bb8c4e4f"},
{file = "sphinx_multiversion_scylla-0.3.7.tar.gz", hash = "sha256:fc1ddd58e82cfd8810c1be6db8717a244043c04c1c632e9bd1436415d1db0d3b"},
{file = "sphinx_multiversion_scylla-0.3.4-py3-none-any.whl", hash = "sha256:e64d49d39a8eccf06a9cb8bbe88eecb3eb2082e6b91a478b55dc7d0268d8e0b6"},
{file = "sphinx_multiversion_scylla-0.3.4.tar.gz", hash = "sha256:8f7c94a89c794334d78ef21761a8bf455aaa7361e71037cf2ac2ca51cb47a0ba"},
]
[package.dependencies]
@@ -1592,4 +1592,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "6cca33ab39b6d8817d0502530788dc74a1d3d7e6683d0eda25884d41e38fc741"
content-hash = "9a17caa38b3c88f3fe3d1a60fdb73a96aa12ff1e30ecb00e2f9249e7ba9f859c"

View File

@@ -13,7 +13,7 @@ sphinx-scylladb-theme = "^1.8.10"
sphinx-sitemap = "^2.6.0"
sphinx-autobuild = "^2024.4.19"
Sphinx = "^8.0.0"
sphinx-multiversion-scylla = "^0.3.7"
sphinx-multiversion-scylla = "^0.3.4"
sphinxcontrib-datatemplates = "^0.9.2"
sphinx-scylladb-markdown = "^0.1.4"
sphinx_collapse ="^0.1.3"

View File

@@ -11,13 +11,9 @@ ScyllaDB. This means that:
* You should follow the upgrade policy:
* Starting with version **2025.4**, upgrades can **skip minor versions** if:
* They remain within the same major version (for example, upgrading
directly from *2025.1 → 2025.4* is supported).
* You upgrade to the next major version (for example, upgrading
directly from *2025.3 → 2026.1* is supported).
* Starting with version **2025.4**, upgrades can skip minor versions as long
as they remain within the same major version (for example, upgrading directly
from 2025.1 → 2025.4 is supported).
* For versions **prior to 2025.4**, upgrades must be performed consecutively—
each successive X.Y version must be installed in order, **without skipping
any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3

View File

@@ -4,7 +4,8 @@ Upgrade ScyllaDB
.. toctree::
ScyllaDB 2025.x to ScyllaDB 2026.1 <upgrade-guide-from-2025.x-to-2026.1/index>
ScyllaDB 2025.x to ScyllaDB 2025.4 <upgrade-guide-from-2025.x-to-2025.4/index>
ScyllaDB 2025.4 Patch Upgrades <upgrade-guide-from-2025.4.x-to-2025.4.y>
ScyllaDB Image <ami-upgrade>

View File

@@ -0,0 +1,266 @@
.. |SCYLLA_NAME| replace:: ScyllaDB
.. |SRC_VERSION| replace:: 2025.4.x
.. |NEW_VERSION| replace:: 2025.4.y
==========================================================================
Upgrade - |SCYLLA_NAME| |SRC_VERSION| to |NEW_VERSION| (Patch Upgrades)
==========================================================================
This document describes a step-by-step procedure for upgrading from
|SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION| (where "y" is
the latest available version), and rolling back to version |SRC_VERSION|
if necessary.
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL),
CentOS, Debian, and Ubuntu.
See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
for information about supported versions.
It also applies to the ScyllaDB official image on EC2, GCP, or Azure.
Upgrade Procedure
=================
.. note::
Apply the following procedure **serially** on each node. Do not move to the next
node before validating that the node is up and running the new version.
A ScyllaDB upgrade is a rolling procedure that does **not** require a full cluster
shutdown. For each of the nodes in the cluster, you will:
#. Drain the node and back up the data.
#. Backup configuration file.
#. Stop ScyllaDB.
#. Download and install new ScyllaDB packages.
#. Start ScyllaDB.
#. Validate that the upgrade was successful.
**Before** upgrading, check which version you are running now using
``scylla --version``. Note the current version in case you want to roll back
the upgrade.
**During** the rolling upgrade it is highly recommended:
* Not to use new |NEW_VERSION| features.
* Not to run administration functions, like repairs, refresh, rebuild or add
or remove nodes. See
`sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
ScyllaDB Manager's scheduled or running repairs.
* Not to apply schema changes.
Upgrade Steps
=============
Back up the data
------------------------------
Back up all the data to an external device. We recommend using
`ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
to create backups.
Alternatively, you can use the ``nodetool snapshot`` command.
For **each** node in the cluster, run the following:
.. code:: sh
nodetool drain
nodetool snapshot
Take note of the directory name that nodetool gives you, and copy all
the directories with this name under ``/var/lib/scylla`` to a backup device.
When the upgrade is completed on all nodes, remove the snapshot with the
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of
space.
Back up the configuration file
------------------------------
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
in case you need to roll back the upgrade.
.. tabs::
.. group-tab:: Debian/Ubuntu
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
.. group-tab:: RHEL/CentOS
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
Gracefully stop the node
------------------------
.. code:: sh
sudo service scylla-server stop
Download and install the new release
------------------------------------
You don't need to update the ScyllaDB DEB or RPM repo when you upgrade to
a patch release.
.. tabs::
.. group-tab:: Debian/Ubuntu
To install a patch version on Debian or Ubuntu, run:
.. code:: sh
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
To install a patch version on RHEL or CentOS, run:
.. code:: sh
sudo yum clean all
sudo yum update scylla\* -y
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If you're using the ScyllaDB official image (recommended), see
the **Debian/Ubuntu** tab for upgrade instructions.
If you're using your own image and have installed ScyllaDB packages for
Ubuntu or Debian, you need to apply an extended upgrade procedure:
#. Install the new ScyllaDB version with the additional
``scylla-machine-image`` package:
.. code-block:: console
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
sudo apt-get dist-upgrade scylla-machine-image
#. Run ``scylla_setup`` without running ``io_setup``.
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
#. Check cluster status with ``nodetool status`` and make sure **all** nodes,
including the one you just upgraded, are in UN status.
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
to check the ScyllaDB version.
#. Use ``journalctl _COMM=scylla`` to check there are no new errors in the log.
#. Check again after 2 minutes to validate that no new issues are introduced.
Once you are sure the node upgrade is successful, move to the next node in
the cluster.
Rollback Procedure
==================
The following procedure describes a rollback from ScyllaDB release
|NEW_VERSION| to |SRC_VERSION|. Apply this procedure if an upgrade from
|SRC_VERSION| to |NEW_VERSION| failed before completing on all nodes.
* Use this procedure only on nodes you upgraded to |NEW_VERSION|.
* Execute the following commands one node at a time, moving to the next node only
after the rollback procedure is completed successfully.
ScyllaDB rollback is a rolling procedure that does **not** require a full
cluster shutdown. For each of the nodes to roll back to |SRC_VERSION|, you will:
#. Drain the node and stop ScyllaDB.
#. Downgrade to the previous release.
#. Restore the configuration file.
#. Restart ScyllaDB.
#. Validate the rollback success.
Rollback Steps
==============
Gracefully shutdown ScyllaDB
-----------------------------
.. code:: sh
nodetool drain
sudo service scylla-server stop
Downgrade to the previous release
----------------------------------
.. tabs::
.. group-tab:: Debian/Ubuntu
To downgrade to |SRC_VERSION| on Debian or Ubuntu, run:
.. code-block:: console
:substitutions:
sudo apt-get install scylla=|SRC_VERSION|\* scylla-server=|SRC_VERSION|\* scylla-tools=|SRC_VERSION|\* scylla-tools-core=|SRC_VERSION|\* scylla-kernel-conf=|SRC_VERSION|\* scylla-conf=|SRC_VERSION|\*
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
To downgrade to |SRC_VERSION| on RHEL or CentOS, run:
.. code-block:: console
:substitutions:
sudo yum downgrade scylla\*-|SRC_VERSION|-\* -y
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If you're using the ScyllaDB official image (recommended), see
the **Debian/Ubuntu** tab for downgrade instructions.
If you're using your own image and have installed ScyllaDB packages for
Ubuntu or Debian, you need to additionally downgrade
the ``scylla-machine-image`` package.
.. code-block:: console
:substitutions:
sudo apt-get install scylla=|SRC_VERSION|\* scylla-server=|SRC_VERSION|\* scylla-tools=|SRC_VERSION|\* scylla-tools-core=|SRC_VERSION|\* scylla-kernel-conf=|SRC_VERSION|\* scylla-conf=|SRC_VERSION|\*
sudo apt-get install scylla-machine-image=|SRC_VERSION|\*
Answer y to the first two questions.
Restore the configuration file
------------------------------
.. code:: sh
sudo rm -rf /etc/scylla/scylla.yaml
sudo cp -a /etc/scylla/scylla.yaml.backup /etc/scylla/scylla.yaml
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
Check upgrade instruction above for validation. Once you are sure the node
rollback is successful, move to the next node in the cluster.

View File

@@ -0,0 +1,13 @@
==========================================================
Upgrade - ScyllaDB 2025.x to ScyllaDB 2025.4
==========================================================
.. toctree::
:maxdepth: 2
:hidden:
Upgrade ScyllaDB <upgrade-guide-from-2025.x-to-2025.4>
Metrics Update <metric-update-2025.x-to-2025.4>
* :doc:`Upgrade from ScyllaDB 2025.x to ScyllaDB 2025.4 <upgrade-guide-from-2025.x-to-2025.4>`
* :doc:`Metrics Update Between 2025.x and 2025.4 <metric-update-2025.x-to-2025.4>`

View File

@@ -0,0 +1,68 @@
.. |SRC_VERSION| replace:: 2025.x
.. |NEW_VERSION| replace:: 2025.4
.. |PRECEDING_VERSION| replace:: 2025.3
================================================================
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
================================================================
.. toctree::
:maxdepth: 2
:hidden:
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
New Metrics in |NEW_VERSION|
--------------------------------------
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
.. list-table::
:widths: 25 150
:header-rows: 1
* - Metric
- Description
* - scylla_database_total_view_updates_due_to_replica_count_mismatch
- The total number of view updates for which there were more view replicas
than base replicas and we had to generate an extra view update because
the additional view replica wouldn't get paired with any base replica.
It should only increase during the Replication Factor (RF) change. It
should stop increasing shortly after finishing the RF change.
* - scylla_database_total_writes_rejected_due_to_out_of_space_prevention
- Counts write operations that were rejected due to disabled user tables
writes.
* - scylla_index_query_latencies
- Index query latencies.
* - scylla_reactor_aio_retries
- The total number of IOCB-s re-submitted via thread-pool.
* - scylla_reactor_io_threaded_fallbacks
- The total number of io-threaded-fallbacks operations.
* - scylla_repair_inc_sst_read_bytes
- The total number of bytes read from SStables for incremental repair
on this shard.
* - scylla_repair_inc_sst_skipped_bytes
- The total number of bytes skipped from SStables for incremental repair
on this shard.
* - scylla_repair_tablet_time_ms
- The time spent on tablet repair on this shard (in milliseconds).
* - scylla_s3_downloads_blocked_on_memory
- Counts the number of times the S3 client downloads were delayed due to
insufficient memory availability.
* - scylla_s3_memory_usage
- The total number of bytes consumed by the S3 client.
* - scylla_s3_total_read_prefetch_bytes
- The total number of bytes requested from object storage.
* - scylla_storage_proxy_replica_fenced_out_requests
- The number of requests that resulted in a stale_topology_exception.
* - scylla_vector_store_dns_refreshes
- The number of DNS refreshes.
New and Updated Metrics in Previous 2025.x Releases
-------------------------------------------------------
* `Metrics Update Between 2025.2 and 2025.3 <https://docs.scylladb.com/manual/branch-2025.3/upgrade/upgrade-guides/upgrade-guide-from-2025.2-to-2025.3/metric-update-2025.2-to-2025.3.html>`_
* `Metrics Update Between 2025.1 and 2025.2 <https://docs.scylladb.com/manual/branch-2025.2/upgrade/upgrade-guides/upgrade-guide-from-2025.1-to-2025.2/metric-update-2025.1-to-2025.2.html>`_

View File

@@ -0,0 +1,371 @@
.. |SCYLLA_NAME| replace:: ScyllaDB
.. |SRC_VERSION| replace:: 2025.x
.. |NEW_VERSION| replace:: 2025.4
.. |ROLLBACK| replace:: rollback
.. _ROLLBACK: ./#rollback-procedure
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2025.x to 2025.4
.. _SCYLLA_METRICS: ../metric-update-2025.x-to-2025.4
=======================================================================================
Upgrade from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
=======================================================================================
This document describes a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
to |SCYLLA_NAME| |NEW_VERSION| and rollback to version |SRC_VERSION| if necessary.
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian,
and Ubuntu. See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
for information about supported versions.
It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
Before You Upgrade ScyllaDB
==============================
**Upgrade Your Driver**
If you're using a `ScyllaDB driver <https://docs.scylladb.com/stable/drivers/index.html>`_,
upgrade the driver before upgrading ScyllaDB. The latest two versions of each driver
are supported. See `Driver Support <https://docs.scylladb.com/stable/versioning/driver-support.html>`_.
**Upgrade ScyllaDB Monitoring Stack**
If you're using the ScyllaDB Monitoring Stack, verify that your Monitoring Stack
version supports the ScyllaDB version to which you want to upgrade. See
`ScyllaDB Monitoring Stack Support Matrix <https://monitoring.docs.scylladb.com/stable/reference/matrix.html>`_.
We recommend upgrading the Monitoring Stack to the latest version.
**Check Feature Updates**
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
at the `ScyllaDB Community Forum <https://forum.scylladb.com/c/scylladb-release-notes/>`_.
Upgrade Procedure
=================
A ScyllaDB upgrade is a rolling procedure that does **not** require full cluster shutdown.
For each of the nodes in the cluster, serially (i.e., one node at a time), you will:
* Check that the cluster's schema is synchronized
* Drain the node and backup the data
* Backup the configuration file
* Stop ScyllaDB
* Download and install new ScyllaDB packages
* Start ScyllaDB
* Validate that the upgrade was successful
.. caution::
Apply the procedure **serially** on each node. Do not move to the next node before
validating that the node you upgraded is up and running the new version.
**During** the rolling upgrade, it is highly recommended:
* Not to use the new |NEW_VERSION| features.
* Not to run administration functions, such as repairs, refresh, rebuild, or add
or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
ScyllaDB Manager's scheduled or running repairs.
* Not to apply schema changes.
Upgrade Steps
=============
Check the cluster schema
-------------------------
Make sure that all nodes have the schema synchronized before the upgrade. The upgrade
procedure will fail if there is a schema disagreement between nodes.
.. code:: sh
nodetool describecluster
Backup the data
-----------------------------------
Before any major procedure, like an upgrade, it is recommended to backup all the data
to an external device.
We recommend using `ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
to create backups.
Alternatively, you can use the ``nodetool snapshot`` command.
For **each** node in the cluster, run the following:
.. code:: sh
nodetool drain
nodetool snapshot
Take note of the directory name that nodetool gives you, and copy all the directories
having that name under ``/var/lib/scylla`` to an external backup device.
When the upgrade is completed on all nodes, remove the snapshot with the
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of space.
Backup the configuration file
------------------------------
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
in case you need to rollback the upgrade.
.. tabs::
.. group-tab:: Debian/Ubuntu
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
.. group-tab:: RHEL/CentOS
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
Gracefully stop the node
------------------------
.. code:: sh
sudo service scylla-server stop
Download and install the new release
------------------------------------
Before upgrading, check what version you are running now using ``scylla --version``.
You should take note of the current version in case you want to |ROLLBACK|_ the upgrade.
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Update the ScyllaDB deb repo to |NEW_VERSION|.
.. code-block:: console
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.4.list
#. Install the new ScyllaDB version:
.. code-block:: console
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
#. Update the ScyllaDB rpm repo to |NEW_VERSION|.
.. code-block:: console
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.4.repo
#. Install the new ScyllaDB version:
.. code:: sh
sudo yum clean all
sudo yum update scylla\* -y
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If you're using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
tab for upgrade instructions.
If you're using your own image and installed ScyllaDB packages for Ubuntu or Debian,
you need to apply an extended upgrade procedure:
#. Update the ScyllaDB deb repo (see the **Debian/Ubuntu** tab).
#. Install the new ScyllaDB version with the additional ``scylla-machine-image`` package:
.. code-block:: console
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
sudo apt-get dist-upgrade scylla-machine-image
#. Run ``scylla_setup`` without running ``io_setup``.
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including
the one you just upgraded, are in ``UN`` status.
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to
validate there are no new errors in the log.
#. Check again after two minutes to validate no new issues are introduced.
Once you are sure the node upgrade was successful, move to the next node in the cluster.
Rollback Procedure
==================
.. warning::
The rollback procedure can be applied **only** if some nodes have not been
upgraded to |NEW_VERSION| yet. As soon as the last node in the rolling upgrade
procedure is started with |NEW_VERSION|, rollback becomes impossible. At that
point, the only way to restore a cluster to |SRC_VERSION| is by restoring it
from backup.
The following procedure describes a rollback from |SCYLLA_NAME| |NEW_VERSION| to
|SRC_VERSION|. Apply this procedure if an upgrade from |SRC_VERSION| to
|NEW_VERSION| fails before completing on all nodes.
* Use this procedure only on the nodes you upgraded to |NEW_VERSION|.
* Execute the following commands one node at a time, moving to the next node
only after the rollback procedure is completed successfully.
ScyllaDB rollback is a rolling procedure that does **not** require full cluster shutdown.
For each of the nodes you rollback to |SRC_VERSION|, serially (i.e., one node
at a time), you will:
* Drain the node and stop ScyllaDB
* Retrieve the old ScyllaDB packages
* Restore the configuration file
* Reload systemd configuration
* Restart ScyllaDB
* Validate the rollback success
Apply the procedure **serially** on each node. Do not move to the next node
before validating that the rollback was successful and the node is up and
running the old version.
Rollback Steps
==============
Drain and gracefully stop the node
----------------------------------
.. code:: sh
nodetool drain
sudo service scylla-server stop
Restore and install the old release
------------------------------------
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Remove the old repo file.
.. code:: sh
sudo rm -rf /etc/apt/sources.list.d/scylla.list
#. Restore the |SRC_VERSION| packages backed up during the upgrade.
.. code:: sh
sudo cp ~/scylla.list-backup /etc/apt/sources.list.d/scylla.list
sudo chown root:root /etc/apt/sources.list.d/scylla.list
sudo chmod 644 /etc/apt/sources.list.d/scylla.list
#. Install:
.. code-block::
sudo apt-get update
sudo apt-get remove scylla\* -y
sudo apt-get install scylla
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
#. Remove the old repo file.
.. code:: sh
sudo rm -rf /etc/yum.repos.d/scylla.repo
#. Restore the |SRC_VERSION| packages backed up during the upgrade procedure.
.. code:: sh
sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo
sudo chown root:root /etc/yum.repos.d/scylla.repo
sudo chmod 644 /etc/yum.repos.d/scylla.repo
#. Install:
.. code:: console
sudo yum clean all
sudo yum remove scylla\*
sudo yum install scylla
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If you're using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
tab for rollback instructions.
If you're using your own image and installed ScyllaDB packages for Ubuntu or Debian,
you need to additionally restore the ``scylla-machine-image`` package.
#. Restore the |SRC_VERSION| packages backed up during the upgrade
(see the **Debian/Ubuntu** tab).
#. Install:
.. code-block::
sudo apt-get update
sudo apt-get remove scylla\* -y
sudo apt-get install scylla
sudo apt-get install scylla-machine-image
Answer y to the first two questions.
Restore the configuration file
------------------------------
.. code:: sh
sudo rm -rf /etc/scylla/scylla.yaml
sudo cp /etc/scylla/scylla.yaml.backup /etc/scylla/scylla.yaml
Reload systemd configuration
----------------------------
You must reload the unit file if the systemd unit file is changed.
.. code:: sh
sudo systemctl daemon-reload
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
Check the upgrade instructions above for validation. Once you are sure the node
rollback is successful, move to the next node in the cluster.

View File

@@ -1,13 +0,0 @@
==========================================================
Upgrade - ScyllaDB 2025.x to ScyllaDB 2026.1
==========================================================
.. toctree::
:maxdepth: 2
:hidden:
Upgrade ScyllaDB <upgrade-guide-from-2025.x-to-2026.1>
Metrics Update <metric-update-2025.x-to-2026.1>
* :doc:`Upgrade from ScyllaDB 2025.x to ScyllaDB 2026.1 <upgrade-guide-from-2025.x-to-2026.1>`
* :doc:`Metrics Update Between 2025.x and 2026.1 <metric-update-2025.x-to-2026.1>`

View File

@@ -1,82 +0,0 @@
.. |SRC_VERSION| replace:: 2025.x
.. |NEW_VERSION| replace:: 2026.1
.. |PRECEDING_VERSION| replace:: 2025.4
================================================================
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
================================================================
.. toctree::
:maxdepth: 2
:hidden:
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
New Metrics in |NEW_VERSION|
--------------------------------------
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
.. list-table::
:widths: 25 150
:header-rows: 1
* - Metric
- Description
* - scylla_alternator_operation_size_kb
- Histogram of item sizes involved in a request.
* - scylla_column_family_total_disk_space_before_compression
- Hypothetical total disk space used if data files weren't compressed
* - scylla_group_name_auto_repair_enabled_nr
- Number of tablets with auto repair enabled.
* - scylla_group_name_auto_repair_needs_repair_nr
- Number of tablets with auto repair enabled that currently need repair.
* - scylla_lsa_compact_time_ms
- Total time spent on segment compaction that was not accounted under ``reclaim_time_ms``.
* - scylla_lsa_evict_time_ms
- Total time spent on evicting objects that was not accounted under ``reclaim_time_ms``,
* - scylla_lsa_reclaim_time_ms
- Total time spent in reclaiming LSA memory back to std allocator.
* - scylla_object_storage_memory_usage
- Total number of bytes consumed by the object storage client.
* - scylla_tablet_ops_failed
- Number of failed tablet auto repair attempts.
* - scylla_tablet_ops_succeeded
- Number of successful tablet auto repair attempts.
Renamed Metrics in |NEW_VERSION|
--------------------------------------
The following metrics are renamed in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
.. list-table::
:widths: 25 150
:header-rows: 1
* - Metric Name in |PRECEDING_VERSION|
- Metric Name in |NEW_VERSION|
* - scylla_s3_memory_usage
- scylla_object_storage_memory_usage
Removed Metrics in |NEW_VERSION|
--------------------------------------
The following metrics are removed in ScyllaDB |NEW_VERSION|.
* scylla_redis_current_connections
* scylla_redis_op_latency
* scylla_redis_operation
* scylla_redis_operation
* scylla_redis_requests_latency
* scylla_redis_requests_served
* scylla_redis_requests_serving
New and Updated Metrics in Previous Releases
-------------------------------------------------------
* `Metrics Update Between 2025.3 and 2025.4 <https://docs.scylladb.com/manual/branch-2025.4/upgrade/upgrade-guides/upgrade-guide-from-2025.x-to-2025.4/metric-update-2025.x-to-2025.4.html>`_
* `Metrics Update Between 2025.2 and 2025.3 <https://docs.scylladb.com/manual/branch-2025.3/upgrade/upgrade-guides/upgrade-guide-from-2025.2-to-2025.3/metric-update-2025.2-to-2025.3.html>`_
* `Metrics Update Between 2025.1 and 2025.2 <https://docs.scylladb.com/manual/branch-2025.2/upgrade/upgrade-guides/upgrade-guide-from-2025.1-to-2025.2/metric-update-2025.1-to-2025.2.html>`_

View File

@@ -1,371 +0,0 @@
.. |SCYLLA_NAME| replace:: ScyllaDB
.. |SRC_VERSION| replace:: 2025.x
.. |NEW_VERSION| replace:: 2026.1
.. |ROLLBACK| replace:: rollback
.. _ROLLBACK: ./#rollback-procedure
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2025.x to 2026.1
.. _SCYLLA_METRICS: ../metric-update-2025.x-to-2026.1
=======================================================================================
Upgrade from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
=======================================================================================
This document describes a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
to |SCYLLA_NAME| |NEW_VERSION| and rollback to version |SRC_VERSION| if necessary.
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian,
and Ubuntu. See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
for information about supported versions. It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
See :doc:`About Upgrade </upgrade/about-upgrade/>` for the ScyllaDB upgrade policy.
Before You Upgrade ScyllaDB
==============================
**Upgrade Your Driver**
If you're using a `ScyllaDB driver <https://docs.scylladb.com/stable/drivers/index.html>`_,
upgrade the driver before upgrading ScyllaDB. The latest two versions of each driver
are supported. See `Driver Support <https://docs.scylladb.com/stable/versioning/driver-support.html>`_.
**Upgrade ScyllaDB Monitoring Stack**
If you're using the ScyllaDB Monitoring Stack, verify that your Monitoring Stack
version supports the ScyllaDB version to which you want to upgrade. See
`ScyllaDB Monitoring Stack Support Matrix <https://monitoring.docs.scylladb.com/stable/reference/matrix.html>`_.
We recommend upgrading the Monitoring Stack to the latest version.
**Check Feature Updates**
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
at the `ScyllaDB Community Forum <https://forum.scylladb.com/c/scylladb-release-notes/>`_.
Upgrade Procedure
=================
A ScyllaDB upgrade is a rolling procedure that does **not** require full cluster shutdown.
For each of the nodes in the cluster, serially (i.e., one node at a time), you will:
* Check that the cluster's schema is synchronized
* Drain the node and backup the data
* Backup the configuration file
* Stop ScyllaDB
* Download and install new ScyllaDB packages
* Start ScyllaDB
* Validate that the upgrade was successful
.. caution::
Apply the procedure **serially** on each node. Do not move to the next node before
validating that the node you upgraded is up and running the new version.
**During** the rolling upgrade, it is highly recommended:
* Not to use the new |NEW_VERSION| features.
* Not to run administration functions, such as repairs, refresh, rebuild, or add
or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
ScyllaDB Manager's scheduled or running repairs.
* Not to apply schema changes.
Upgrade Steps
=============
Check the cluster schema
-------------------------
Make sure that all nodes have the schema synchronized before the upgrade. The upgrade
procedure will fail if there is a schema disagreement between nodes.
.. code:: sh
nodetool describecluster
Backup the data
-----------------------------------
Before any major procedure, like an upgrade, it is recommended to backup all the data
to an external device.
We recommend using `ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
to create backups.
Alternatively, you can use the ``nodetool snapshot`` command.
For **each** node in the cluster, run the following:
.. code:: sh
nodetool drain
nodetool snapshot
Take note of the directory name that nodetool gives you, and copy all the directories
having that name under ``/var/lib/scylla`` to an external backup device.
When the upgrade is completed on all nodes, remove the snapshot with the
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of space.
Backup the configuration file
------------------------------
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
in case you need to rollback the upgrade.
.. tabs::
.. group-tab:: Debian/Ubuntu
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
.. group-tab:: RHEL/CentOS
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
Gracefully stop the node
------------------------
.. code:: sh
sudo service scylla-server stop
Download and install the new release
------------------------------------
Before upgrading, check what version you are running now using ``scylla --version``.
You should take note of the current version in case you want to |ROLLBACK|_ the upgrade.
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Update the ScyllaDB deb repo to |NEW_VERSION|.
.. code-block:: console
:substitutions:
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/|UBUNTU_SCYLLADB_LIST|
#. Install the new ScyllaDB version:
.. code-block:: console
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
#. Update the ScyllaDB rpm repo to |NEW_VERSION|.
.. code-block:: console
:substitutions:
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/|CENTOS_SCYLLADB_REPO|
#. Install the new ScyllaDB version:
.. code:: sh
sudo yum clean all
sudo yum update scylla\* -y
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If youre using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
tab for upgrade instructions.
If youre using your own image and installed ScyllaDB packages for Ubuntu or Debian,
you need to apply an extended upgrade procedure:
#. Update the ScyllaDB deb repo (see the **Debian/Ubuntu** tab).
#. Install the new ScyllaDB version with the additional ``scylla-machine-image`` package:
.. code-block:: console
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
sudo apt-get dist-upgrade scylla-machine-image
#. Run ``scylla_setup`` without ``running io_setup``.
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including
the one you just upgraded, are in ``UN`` status.
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to
validate there are no new errors in the log.
#. Check again after two minutes to validate no new issues are introduced.
Once you are sure the node upgrade was successful, move to the next node in the cluster.
Rollback Procedure
==================
.. warning::
The rollback procedure can be applied **only** if some nodes have not been
upgraded to |NEW_VERSION| yet. As soon as the last node in the rolling upgrade
procedure is started with |NEW_VERSION|, rollback becomes impossible. At that
point, the only way to restore a cluster to |SRC_VERSION| is by restoring it
from backup.
The following procedure describes a rollback from |SCYLLA_NAME| |NEW_VERSION| to
|SRC_VERSION|. Apply this procedure if an upgrade from |SRC_VERSION| to
|NEW_VERSION| fails before completing on all nodes.
* Use this procedure only on the nodes you upgraded to |NEW_VERSION|.
* Execute the following commands one node at a time, moving to the next node
only after the rollback procedure is completed successfully.
ScyllaDB rollback is a rolling procedure that does **not** require full cluster shutdown.
For each of the nodes you roll back to |SRC_VERSION|, serially (i.e., one node
at a time), you will:
* Drain the node and stop ScyllaDB
* Retrieve the old ScyllaDB packages
* Restore the configuration file
* Reload systemd configuration
* Restart ScyllaDB
* Validate the rollback success
Apply the procedure **serially** on each node. Do not move to the next node
before validating that the rollback was successful and the node is up and
running the old version.
Rollback Steps
==============
Drain and gracefully stop the node
----------------------------------
.. code:: sh
nodetool drain
sudo service scylla-server stop
Restore and install the old release
------------------------------------
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Remove the old repo file.
.. code:: sh
sudo rm -rf /etc/apt/sources.list.d/scylla.list
#. Restore the |SRC_VERSION| packages backed up during the upgrade.
.. code:: sh
sudo cp ~/scylla.list-backup /etc/apt/sources.list.d/scylla.list
sudo chown root.root /etc/apt/sources.list.d/scylla.list
sudo chmod 644 /etc/apt/sources.list.d/scylla.list
#. Install:
.. code-block::
sudo apt-get update
sudo apt-get remove scylla\* -y
sudo apt-get install scylla
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
#. Remove the old repo file.
.. code:: sh
sudo rm -rf /etc/yum.repos.d/scylla.repo
#. Restore the |SRC_VERSION| packages backed up during the upgrade procedure.
.. code:: sh
sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo
sudo chown root.root /etc/yum.repos.d/scylla.repo
sudo chmod 644 /etc/yum.repos.d/scylla.repo
#. Install:
.. code:: console
sudo yum clean all
sudo yum remove scylla\*
sudo yum install scylla
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If you're using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
tab for upgrade instructions.
If you're using your own image and installed ScyllaDB packages for Ubuntu or Debian,
you need to additionally restore the ``scylla-machine-image`` package.
#. Restore the |SRC_VERSION| packages backed up during the upgrade
(see the **Debian/Ubuntu** tab).
#. Install:
.. code-block::
sudo apt-get update
sudo apt-get remove scylla\* -y
sudo apt-get install scylla
sudo apt-get install scylla-machine-image
Answer y to the first two questions.
Restore the configuration file
------------------------------
.. code:: sh
sudo rm -rf /etc/scylla/scylla.yaml
sudo cp /etc/scylla/scylla.yaml-backup /etc/scylla/scylla.yaml
Reload systemd configuration
----------------------------
You must reload the unit file if the systemd unit file is changed.
.. code:: sh
sudo systemctl daemon-reload
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
Check the upgrade instructions above for validation. Once you are sure the node
rollback is successful, move to the next node in the cluster.

View File

@@ -187,6 +187,39 @@ std::set<sstring> feature_service::to_feature_set(sstring features_string) {
return features;
}
// Gossip subscriber that enables and persists cluster features once the
// local node learns which features all of its peers support.  Registered
// on the gossiper; reacts to nodes joining and to changes of the
// SUPPORTED_FEATURES application state.
class persistent_feature_enabler : public i_endpoint_state_change_subscriber {
    gossiper& _g;                  // source of peers' supported-feature sets
    feature_service& _feat;        // feature registry to enable features on
    db::system_keyspace& _sys_ks;  // persistence for the enabled-features set
    service::storage_service& _ss; // used to detect raft-topology mode
public:
    persistent_feature_enabler(gossiper& g, feature_service& f, db::system_keyspace& s, service::storage_service& ss)
        : _g(g)
        , _feat(f)
        , _sys_ks(s)
        , _ss(ss)
    {
    }
    // A node joined: re-evaluate which features can now be enabled.
    future<> on_join(inet_address ep, locator::host_id id, endpoint_state_ptr state, gms::permit_id) override {
        return enable_features();
    }
    // Only react when the SUPPORTED_FEATURES application state changed;
    // other state changes cannot affect the enabled-feature computation.
    future<> on_change(inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) override {
        if (states.contains(application_state::SUPPORTED_FEATURES)) {
            return enable_features();
        }
        return make_ready_future();
    }
    future<> enable_features();
};
// Hook feature enablement into gossip: subscribe a persistent_feature_enabler
// so future joins/changes re-trigger it, and run one initial pass right away.
future<> feature_service::enable_features_on_join(gossiper& g, db::system_keyspace& sys_ks, service::storage_service& ss) {
    auto subscriber = make_shared<persistent_feature_enabler>(g, *this, sys_ks, ss);
    g.register_(subscriber);
    return subscriber->enable_features();
}
// Callback invoked once the system tables become readable; replays the
// persisted enabled-features set via the startup path.
future<> feature_service::on_system_tables_loaded(db::system_keyspace& sys_ks) {
    return enable_features_on_startup(sys_ks);
}
@@ -266,6 +299,31 @@ void feature_service::check_features(const std::set<sstring>& enabled_features,
}
}
// Compute the set of features supported by every node seen in gossip,
// persist any newly-enabled ones, and enable them on all shards.
future<> persistent_feature_enabler::enable_features() {
    // In raft-topology mode feature enablement is driven elsewhere (by the
    // topology coordinator), so this gossip-driven path is a no-op.
    if (_ss.raft_topology_change_enabled()) {
        co_return;
    }
    auto loaded_peer_features = co_await _sys_ks.load_peer_features();
    auto&& features = _g.get_supported_features(loaded_peer_features, gossiper::ignore_features_of_local_node::no);
    // Persist enabled feature in the `system.scylla_local` table under the "enabled_features" key.
    // The key itself is maintained as an `unordered_set<string>` and serialized via `to_string`
    // function to preserve readability.
    std::set<sstring> feats_set = co_await _sys_ks.load_local_enabled_features();
    for (feature& f : _feat.registered_features() | std::views::values) {
        // Add features that are not yet enabled locally (!f) but are
        // supported by every node in the cluster.
        if (!f && features.contains(f.name())) {
            feats_set.emplace(f.name());
        }
    }
    co_await _sys_ks.save_local_enabled_features(std::move(feats_set), true);
    // Enable on every shard.  Capturing `features` by reference is safe:
    // invoke_on_all is co_await-ed, keeping this coroutine frame alive.
    co_await _feat.container().invoke_on_all([&features] (feature_service& fs) -> future<> {
        auto features_v = features | std::ranges::to<std::set<std::string_view>>();
        co_await fs.enable(std::move(features_v));
    });
}
future<> feature_service::enable(std::set<std::string_view> list) {
// `gms::feature::enable` should be run within a seastar thread context
return seastar::async([this, list = std::move(list)] {

View File

@@ -81,7 +81,6 @@ public:
gms::feature user_defined_functions { *this, "UDF"sv };
gms::feature alternator_streams { *this, "ALTERNATOR_STREAMS"sv };
gms::feature alternator_ttl { *this, "ALTERNATOR_TTL"sv };
gms::feature cql_row_ttl { *this, "CQL_ROW_TTL"sv };
gms::feature range_scan_data_variant { *this, "RANGE_SCAN_DATA_VARIANT"sv };
gms::feature cdc_generations_v2 { *this, "CDC_GENERATIONS_V2"sv };
gms::feature user_defined_aggregates { *this, "UDA"sv };
@@ -112,7 +111,6 @@ public:
gms::feature large_collection_detection { *this, "LARGE_COLLECTION_DETECTION"sv };
gms::feature range_tombstone_and_dead_rows_detection { *this, "RANGE_TOMBSTONE_AND_DEAD_ROWS_DETECTION"sv };
gms::feature truncate_as_topology_operation { *this, "TRUNCATE_AS_TOPOLOGY_OPERATION"sv };
gms::feature snapshot_as_topology_operation { *this, "SNAPSHOT_AS_TOPOLOGY_OPERATION"sv };
gms::feature secondary_indexes_on_static_columns { *this, "SECONDARY_INDEXES_ON_STATIC_COLUMNS"sv };
gms::feature tablets { *this, "TABLETS"sv };
gms::feature table_digest_insensitive_to_expiry { *this, "TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"sv };
@@ -185,12 +183,12 @@ public:
gms::feature size_based_load_balancing { *this, "SIZE_BASED_LOAD_BALANCING"sv };
gms::feature topology_noop_request { *this, "TOPOLOGY_NOOP_REQUEST"sv };
gms::feature tablets_intermediate_fallback_cleanup { *this, "TABLETS_INTERMEDIATE_FALLBACK_CLEANUP"sv };
gms::feature batchlog_v2 { *this, "BATCHLOG_V2"sv };
public:
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
static std::set<sstring> to_feature_set(sstring features_string);
future<> enable_features_on_join(gossiper&, db::system_keyspace&, service::storage_service&);
future<> on_system_tables_loaded(db::system_keyspace& sys_ks);
// Performs the feature check.

View File

@@ -998,24 +998,6 @@ future<> gossiper::send_echo(locator::host_id host_id, std::chrono::milliseconds
return ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + timeout_ms, _abort_source, generation_number, notify_up);
}
// Sleep for `duration`, waking early if gossip becomes disabled or an abort
// is requested.  Built on a timer plus the _failure_detector_loop_cv
// condition variable (rather than sleep_abortable) so other code can
// broadcast the CV to wake all sleepers promptly.
future<> gossiper::failure_detector_loop_sleep(std::chrono::seconds duration) {
    bool sleep_expired = false;
    bool abort_requested = false;
    timer<> sleep_timer([&] {
        sleep_expired = true;
        _failure_detector_loop_cv.signal();
    });
    // On abort: cancel the timer and signal the CV so the wait below exits.
    auto as_sub = _abort_source.subscribe([&] () noexcept {
        abort_requested = true;
        sleep_timer.cancel();
        _failure_detector_loop_cv.signal();
    });
    sleep_timer.arm(duration);
    // Re-check conditions after every wakeup; spurious signals just loop.
    while (is_enabled() && !sleep_expired && !abort_requested) {
        co_await _failure_detector_loop_cv.when();
    }
}
future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, generation_type gossip_generation, uint64_t live_endpoints_version) {
auto last = gossiper::clk::now();
auto diff = gossiper::clk::duration(0);
@@ -1054,7 +1036,7 @@ future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, gene
host_id, node, _live_endpoints, _live_endpoints_version, live_endpoints_version);
co_return;
} else {
co_await failure_detector_loop_sleep(echo_interval);
co_await sleep_abortable(echo_interval, _abort_source).handle_exception_type([] (const abort_requested_exception&) {});
}
}
co_return;
@@ -1070,7 +1052,7 @@ future<> gossiper::failure_detector_loop() {
try {
if (_live_endpoints.empty()) {
logger.debug("failure_detector_loop: Wait until live_nodes={} is not empty", _live_endpoints);
co_await failure_detector_loop_sleep(std::chrono::seconds(1));
co_await sleep_abortable(std::chrono::seconds(1), _abort_source);
continue;
}
auto nodes = _live_endpoints | std::ranges::to<std::vector>();
@@ -2398,7 +2380,6 @@ future<> gossiper::do_stop_gossiping() {
// Set disable flag and cancel the timer makes sure gossip loop will not be scheduled
co_await container().invoke_on_all([] (gms::gossiper& g) {
g._enabled = false;
g._failure_detector_loop_cv.broadcast();
});
_scheduled_gossip_task.cancel();
// Take the semaphore makes sure existing gossip loop is finished
@@ -2514,6 +2495,26 @@ future<> gossiper::wait_alive(noncopyable_function<std::vector<locator::host_id>
return wait_alive_helper(std::move(get_nodes), timeout);
}
// Poll gossip until at least `n` live members (including ourself) are
// visible, throwing std::runtime_error after a 3-minute timeout.
future<> gossiper::wait_for_live_nodes_to_show_up(size_t n) {
    // Rate-limit the progress log line to at most once every 5 seconds.
    logger::rate_limit rate_limit{std::chrono::seconds{5}};
    // Account for gossip slowness. 3 minutes is probably overkill but we don't want flaky tests.
    constexpr auto timeout_delay = std::chrono::minutes{3};
    auto timeout = gossiper::clk::now() + timeout_delay;
    while (get_live_members().size() < n) {
        if (timeout <= gossiper::clk::now()) {
            auto err = ::format("Timed out waiting for {} live nodes to show up in gossip", n);
            logger.error("{}", err);
            throw std::runtime_error{std::move(err)};
        }
        logger.log(log_level::info, rate_limit,
                "Waiting for {} live nodes to show up in gossip, currently {} present...",
                n, get_live_members().size());
        // Short abortable sleep between polls.
        co_await sleep_abortable(std::chrono::milliseconds(10), _abort_source);
    }
    logger.info("Live nodes seen in gossip: {}", get_live_members());
}
const versioned_value* gossiper::get_application_state_ptr(locator::host_id endpoint, application_state appstate) const noexcept {
auto eps = get_endpoint_state_ptr(std::move(endpoint));
if (!eps) {
@@ -2572,6 +2573,62 @@ std::string_view gossiper::get_gossip_status(const locator::host_id& endpoint) c
return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS));
}
// Wait until gossip looks settled: the endpoint-state map size is stable and
// no gossip messages are in flight for GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED
// consecutive polls.
//
// `initial_delay` is waited before the first poll; subsequent polls are
// GOSSIP_SETTLE_POLL_INTERVAL_MS apart.  If `force_after` is 0 the wait is
// skipped entirely; if it is > 0 the wait is forcibly ended after that many
// polls even if gossip has not settled.
//
// Fixes two defective log strings of the previous version: "Gossp" -> "Gossip"
// and "is set zero" -> "is set to zero".
future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std::optional<int32_t> force_after) const {
    static constexpr std::chrono::milliseconds GOSSIP_SETTLE_POLL_INTERVAL_MS{1000};
    static constexpr int32_t GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
    if (force_after && *force_after == 0) {
        logger.warn("Skipped waiting for gossip to settle by user request since skip_wait_for_gossip_to_settle is set to zero. Do not use this in production!");
        co_return;
    }
    int32_t total_polls = 0;
    int32_t num_okay = 0;
    auto ep_size = _endpoint_state_map.size();
    auto delay = initial_delay;
    co_await sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source);
    while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
        co_await sleep_abortable(delay, _abort_source);
        delay = GOSSIP_SETTLE_POLL_INTERVAL_MS;
        auto current_size = _endpoint_state_map.size();
        total_polls++;
        if (current_size == ep_size && _msg_processing == 0) {
            logger.debug("Gossip looks settled");
            num_okay++;
        } else {
            // Any instability resets the run of consecutive successful polls.
            logger.info("Gossip not settled after {} polls.", total_polls);
            num_okay = 0;
        }
        ep_size = current_size;
        if (force_after && *force_after > 0 && total_polls > *force_after) {
            logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossip total polls: {}", total_polls);
            break;
        }
    }
    if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
        logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
    } else {
        logger.info("No gossip backlog; proceeding");
    }
}
// Wait for gossip to settle unless the operator disabled the wait by
// setting skip_wait_for_gossip_to_settle to 0.
future<> gossiper::wait_for_gossip_to_settle() const {
    const auto force_after = _gcfg.skip_wait_for_gossip_to_settle;
    if (force_after == 0) {
        // Wait explicitly disabled by configuration.
        co_return;
    }
    co_await wait_for_gossip(GOSSIP_SETTLE_MIN_WAIT_MS, force_after);
}
// Wait out pending range setup: one gossip-settle wait with the configured
// ring delay as the initial delay.
future<> gossiper::wait_for_range_setup() const {
    logger.info("Waiting for pending range setup...");
    const std::chrono::milliseconds ring_delay{_gcfg.ring_delay_ms};
    return wait_for_gossip(ring_delay, _gcfg.skip_wait_for_gossip_to_settle);
}
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
// We allow to bootstrap a new node in only two cases:
// 1) The node is a completely new node and no state in gossip at all

View File

@@ -66,6 +66,7 @@ struct gossip_config {
uint32_t ring_delay_ms = 30 * 1000;
uint32_t shadow_round_ms = 300 * 1000;
uint32_t shutdown_announce_ms = 2 * 1000;
uint32_t skip_wait_for_gossip_to_settle = -1;
utils::updateable_value<uint32_t> failure_detector_timeout_ms;
utils::updateable_value<int32_t> force_gossip_generation;
utils::updateable_value<utils::UUID> recovery_leader;
@@ -212,6 +213,8 @@ public:
static constexpr std::chrono::milliseconds INTERVAL{1000};
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
static constexpr std::chrono::milliseconds GOSSIP_SETTLE_MIN_WAIT_MS{5000};
// Maximum difference between remote generation value and generation
// value this node would get if this node were restarted that we are
// willing to accept about a peer.
@@ -522,6 +525,9 @@ public:
future<> wait_alive(std::vector<locator::host_id> nodes, std::chrono::milliseconds timeout);
future<> wait_alive(noncopyable_function<std::vector<locator::host_id>()> get_nodes, std::chrono::milliseconds timeout);
// Wait for `n` live nodes to show up in gossip (including ourself).
future<> wait_for_live_nodes_to_show_up(size_t n);
// Get live members synchronized to all shards
future<std::set<inet_address>> get_live_members_synchronized();
@@ -657,7 +663,12 @@ public:
public:
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
std::string_view get_gossip_status(const locator::host_id& endpoint) const noexcept;
public:
future<> wait_for_gossip_to_settle() const;
future<> wait_for_range_setup() const;
private:
future<> wait_for_gossip(std::chrono::milliseconds, std::optional<int32_t> = {}) const;
uint64_t _nr_run = 0;
uint64_t _msg_processing = 0;
@@ -668,7 +679,6 @@ private:
netw::messaging_service& _messaging;
gossip_address_map& _address_map;
gossip_config _gcfg;
condition_variable _failure_detector_loop_cv;
// Get features supported by a particular node
std::set<sstring> get_supported_features(locator::host_id endpoint) const;
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
@@ -695,7 +705,6 @@ public:
private:
future<> failure_detector_loop();
future<> failure_detector_loop_for_node(locator::host_id node, generation_type gossip_generation, uint64_t live_endpoints_version);
future<> failure_detector_loop_sleep(std::chrono::seconds duration);
};

View File

@@ -26,6 +26,14 @@ struct group0_peer_exchange {
std::variant<std::monostate, service::group0_info, std::vector<service::discovery_peer>> info;
};
enum class group0_upgrade_state : uint8_t {
recovery = 0,
use_pre_raft_procedures = 1,
synchronize = 2,
use_post_raft_procedures = 3,
};
verb [[with_client_info, cancellable]] get_group0_upgrade_state () -> service::group0_upgrade_state;
verb [[with_client_info, with_timeout, ip]] group0_peer_exchange (std::vector<service::discovery_peer> peers) -> service::group0_peer_exchange;
verb [[with_client_info, with_timeout]] group0_modify_config (raft::group_id gid, std::vector<raft::config_member> add, std::vector<raft::server_id> del);

View File

@@ -35,7 +35,6 @@ verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command
verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]], std::optional<full_position> [[version 5.2.0]];
verb [[with_timeout]] truncate (sstring, sstring);
verb [[]] truncate_with_tablets (sstring ks_name, sstring cf_name, service::frozen_topology_guard frozen_guard);
verb [[]] snapshot_with_tablets (utils::chunked_vector<table_id> table_ids, sstring tag, gc_clock::time_point created_at, bool, std::optional<gc_clock::time_point> expiry, service::frozen_topology_guard frozen_guard);
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> service::paxos::prepare_response [[unique_ptr]];
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> bool;
verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]], service::fencing_token fence [[version 2025.4]]);

View File

@@ -94,10 +94,7 @@ void secondary_index_manager::reload() {
auto index_name = it->first;
if (!table_indices.contains(index_name)) {
it = _indices.erase(it);
if (auto mi = _metrics.find(index_name); mi != _metrics.end()) {
mi->second->deregister();
_metrics.erase(mi);
}
_metrics.erase(index_name);
} else {
++it;
}
@@ -239,10 +236,6 @@ stats::stats(const sstring& ks_name, const sstring& index_name) {
.set_skip_when_empty()});
}
void stats::deregister() {
metrics.clear();
}
void stats::add_latency(std::chrono::steady_clock::duration d) {
query_latency.add(d);
}

View File

@@ -118,7 +118,6 @@ public:
stats(const sstring& ks_name, const sstring& index_name);
stats(const stats&) = delete;
stats& operator=(const stats&) = delete;
void deregister();
void add_latency(std::chrono::steady_clock::duration d);
};

View File

@@ -16,7 +16,6 @@
#include "index/vector_index.hh"
#include "index/secondary_index.hh"
#include "index/secondary_index_manager.hh"
#include "index/target_parser.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "utils/managed_string.hh"
@@ -105,19 +104,6 @@ const static std::unordered_map<sstring, std::function<void(const sstring&, cons
{"rescoring", std::bind_front(validate_enumerated_option, boolean_values)},
};
sstring get_vector_index_target_column(const sstring& targets) {
std::optional<rjson::value> json_value = rjson::try_parse(targets);
if (!json_value || !json_value->IsObject()) {
return target_parser::get_target_column_name_from_string(targets);
}
rjson::value* pk = rjson::find(*json_value, "pk");
if (pk && pk->IsArray() && !pk->Empty()) {
return sstring(rjson::to_string_view(pk->GetArray()[0]));
}
return target_parser::get_target_column_name_from_string(targets);
}
bool vector_index::is_rescoring_enabled(const index_options_map& properties) {
auto q = properties.find("quantization");
auto r = properties.find("rescoring");
@@ -334,23 +320,16 @@ bool vector_index::has_vector_index(const schema& s) {
bool vector_index::has_vector_index_on_column(const schema& s, const sstring& target_name) {
for (const auto& index : s.indices()) {
if (is_vector_index_on_column(index, target_name)) {
return true;
auto class_it = index.options().find(db::index::secondary_index::custom_class_option_name);
auto target_it = index.options().find(cql3_parser::index_target::target_option_name);
if (class_it != index.options().end() && target_it != index.options().end()) {
auto custom_class = secondary_index_manager::get_custom_class_factory(class_it->second);
return custom_class && dynamic_cast<vector_index*>((*custom_class)().get()) && target_it->second == target_name;
}
}
return false;
}
bool vector_index::is_vector_index_on_column(const index_metadata& im, const sstring& target_name) {
auto class_it = im.options().find(db::index::secondary_index::custom_class_option_name);
auto target_it = im.options().find(cql3_parser::index_target::target_option_name);
if (class_it != im.options().end() && target_it != im.options().end()) {
auto custom_class = secondary_index_manager::get_custom_class_factory(class_it->second);
return custom_class && dynamic_cast<vector_index*>((*custom_class)().get()) && get_vector_index_target_column(target_it->second) == target_name;
}
return false;
}
/// Returns the schema version of the base table at which the index was created.
/// This is used to determine if the index needs to be rebuilt after a schema change.
/// The CREATE INDEX and DROP INDEX statements does change the schema version.

View File

@@ -34,7 +34,6 @@ public:
table_schema_version index_version(const schema& schema) override;
static bool has_vector_index(const schema& s);
static bool has_vector_index_on_column(const schema& s, const sstring& target_name);
static bool is_vector_index_on_column(const index_metadata& im, const sstring& target_name);
static void check_cdc_options(const schema& schema);
static bool is_rescoring_enabled(const index_options_map& properties);

View File

@@ -80,6 +80,8 @@ fedora_packages=(
gdb
lua-devel
yaml-cpp-devel
antlr3-tool
antlr3-C++-devel
jsoncpp-devel
rapidjson-devel
snappy-devel
@@ -236,69 +238,6 @@ arch_packages=(
snappy
)
ANTLR3_VERSION=3.5.3
ANTLR3_JAR_URL="https://repo1.maven.org/maven2/org/antlr/antlr-complete/${ANTLR3_VERSION}/antlr-complete-${ANTLR3_VERSION}.jar"
ANTLR3_JAR_SHA256=e781de9b3e2cc1297dfdaf656da946a1fd22f449bd9e0ce1e12d488976887f83
ANTLR3_SOURCE_URL="https://github.com/antlr/antlr3/archive/${ANTLR3_VERSION}/antlr3-${ANTLR3_VERSION}.tar.gz"
ANTLR3_SOURCE_SHA256=a0892bcf164573d539b930e57a87ea45333141863a0dd3a49e5d8c919c8a58ab
# Patches from Fedora 43 (src.fedoraproject.org) that apply to the C++ headers
ANTLR3_PATCHES=(
0006-antlr3memory.hpp-fix-for-C-20-mode.patch
0008-unconst-cyclicdfa-gcc-14.patch
)
# Install ANTLR3 (tool JAR, wrapper script, and C++ runtime headers) under
# /usr/local for distros whose repositories do not ship it.  Idempotent:
# returns early if the JAR, wrapper and headers are already present.
# Downloads are checksum-verified against pinned SHA-256 values.
install_antlr3() {
    local prefix=/usr/local
    local jardir="${prefix}/share/java"
    local bindir="${prefix}/bin"
    local includedir="${prefix}/include"
    # Already installed? Skip all downloads.
    if [ -f "${jardir}/antlr-complete-${ANTLR3_VERSION}.jar" ] \
        && [ -f "${bindir}/antlr3" ] \
        && [ -f "${includedir}/antlr3.hpp" ]; then
        echo "antlr3 ${ANTLR3_VERSION} already installed, skipping"
        return
    fi
    local tmpdir
    tmpdir=$(mktemp -d)
    # Download and install the complete JAR
    mkdir -p "${jardir}"
    curl -fSL -o "${tmpdir}/antlr-complete-${ANTLR3_VERSION}.jar" "${ANTLR3_JAR_URL}"
    echo "${ANTLR3_JAR_SHA256} ${tmpdir}/antlr-complete-${ANTLR3_VERSION}.jar" | sha256sum --check
    mv "${tmpdir}/antlr-complete-${ANTLR3_VERSION}.jar" "${jardir}/"
    # Create the antlr3 wrapper script
    mkdir -p "${bindir}"
    # Heredoc is quoted ('WRAPPER') so ANTLR3_VERSION is NOT expanded here;
    # the sed below substitutes it into the installed script.
    cat > "${bindir}/antlr3" <<'WRAPPER'
#!/bin/bash
exec java -cp /usr/local/share/java/antlr-complete-ANTLR3_VERSION.jar org.antlr.Tool "$@"
WRAPPER
    sed -i "s/ANTLR3_VERSION/${ANTLR3_VERSION}/" "${bindir}/antlr3"
    chmod +x "${bindir}/antlr3"
    # Download and extract the source for C++ headers
    curl -fSL -o "${tmpdir}/antlr3-${ANTLR3_VERSION}.tar.gz" "${ANTLR3_SOURCE_URL}"
    echo "${ANTLR3_SOURCE_SHA256} ${tmpdir}/antlr3-${ANTLR3_VERSION}.tar.gz" | sha256sum --check
    tar -xzf "${tmpdir}/antlr3-${ANTLR3_VERSION}.tar.gz" -C "${tmpdir}"
    # Apply patches to C++ headers
    local srcdir="${tmpdir}/antlr3-${ANTLR3_VERSION}"
    local patchdir
    # Patches live next to this script, under tools/antlr3-patches.
    patchdir="$(dirname "$0")/tools/antlr3-patches"
    for patch in "${ANTLR3_PATCHES[@]}"; do
        patch -d "${srcdir}" -p1 < "${patchdir}/${patch}"
    done
    # Install C++ headers (header-only library)
    mkdir -p "${includedir}"
    install -m 644 "${srcdir}"/runtime/Cpp/include/* "${includedir}/"
    rm -rf "${tmpdir}"
    echo "antlr3 ${ANTLR3_VERSION} installed to ${prefix}"
}
go_arch() {
local -A GO_ARCH=(
["x86_64"]=amd64
@@ -451,8 +390,6 @@ elif [ "$ID" = "fedora" ]; then
fi
dnf install -y "${fedora_packages[@]}" "${fedora_python3_packages[@]}"
install_antlr3
# Fedora 45 tightened key checks, and cassandra-stress is not signed yet.
dnf install --no-gpgchecks -y https://github.com/scylladb/cassandra-stress/releases/download/v3.18.1/cassandra-stress-java21-3.18.1-1.noarch.rpm

View File

@@ -347,8 +347,8 @@ install -d -m755 "$retc"/scylla.d
scylla_yaml_dir=$(mktemp -d)
scylla_yaml=$scylla_yaml_dir/scylla.yaml
grep -v api_ui_dir conf/scylla.yaml | grep -v api_doc_dir > $scylla_yaml
echo "api_ui_dir: $prefix/swagger-ui/dist/" >> $scylla_yaml
echo "api_doc_dir: $prefix/api/api-doc/" >> $scylla_yaml
echo "api_ui_dir: /opt/scylladb/swagger-ui/dist/" >> $scylla_yaml
echo "api_doc_dir: /opt/scylladb/api/api-doc/" >> $scylla_yaml
installconfig 644 $scylla_yaml "$retc"/scylla
rm -rf $scylla_yaml_dir

View File

@@ -743,7 +743,7 @@ struct from_lua_visitor {
}
const data_type& elements_type = t.get_elements_type();
vector_dimension_t num_elements = t.get_dimension();
size_t num_elements = t.get_dimension();
using table_pair = std::pair<utils::multiprecision_int, data_value>;
std::vector<table_pair> pairs;

38
main.cc
View File

@@ -1617,6 +1617,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
gcfg.ring_delay_ms = cfg->ring_delay_ms();
gcfg.shadow_round_ms = cfg->shadow_round_ms();
gcfg.shutdown_announce_ms = cfg->shutdown_announce_in_ms();
gcfg.skip_wait_for_gossip_to_settle = cfg->skip_wait_for_gossip_to_settle();
gcfg.group0_id = group0_id;
gcfg.host_id = host_id;
gcfg.failure_detector_timeout_ms = cfg->failure_detector_timeout_in_ms;
@@ -1676,7 +1677,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
service::raft_group0 group0_service{
stop_signal.as_local_abort_source(), raft_gr.local(), messaging,
gossiper.local(), feature_service.local(), group0_client, dbcfg.gossip_scheduling_group};
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client, dbcfg.gossip_scheduling_group};
checkpoint(stop_signal, "starting tablet allocator");
service::tablet_allocator::config tacfg {
@@ -2110,7 +2111,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
db::snapshot_ctl::config snap_cfg = {
.backup_sched_group = dbcfg.streaming_scheduling_group,
};
snapshot_ctl.start(std::ref(db), std::ref(proxy), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
snapshot_ctl.start(std::ref(db), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl] {
snapshot_ctl.stop().get();
});
@@ -2414,7 +2415,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
bm_cfg.delay = std::chrono::milliseconds(cfg->ring_delay_ms());
bm_cfg.replay_cleanup_after_replays = cfg->batchlog_replay_cleanup_after_replays();
bm.start(std::ref(qp), std::ref(sys_ks), std::ref(feature_service), bm_cfg).get();
bm.start(std::ref(qp), std::ref(sys_ks), bm_cfg).get();
auto stop_batchlog_manager = defer_verbose_shutdown("batchlog manager", [&bm] {
bm.stop().get();
});
@@ -2447,6 +2448,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
view_backlog_broker.stop().get();
});
if (!ss.local().raft_topology_change_enabled()) {
startlog.info("Waiting for gossip to settle before accepting client requests...");
gossiper.local().wait_for_gossip_to_settle().get();
}
checkpoint(stop_signal, "allow replaying hints");
proxy.invoke_on_all(&service::storage_proxy::allow_replaying_hints).get();
@@ -2506,18 +2512,22 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sharded<alternator::expiration_service> es;
std::any stop_expiration_service;
// Start the expiration service on all shards.
// This service is used both by Alternator (for its TTL feature)
// and by CQL (for its per-row TTL feature).
checkpoint(stop_signal, "starting the expiration service");
es.start(seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db)),
if (cfg->alternator_port() || cfg->alternator_https_port()) {
// Start the expiration service on all shards.
// Currently we only run it if Alternator is enabled, because
// only Alternator uses it for its TTL feature. But in the
// future if we add a CQL interface to it, we may want to
// start this outside the Alternator if().
checkpoint(stop_signal, "starting the expiration service");
es.start(seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db)),
std::ref(proxy), std::ref(gossiper)).get();
stop_expiration_service = defer_verbose_shutdown("expiration service", [&es] {
es.stop().get();
});
with_scheduling_group(maintenance_scheduling_group, [&es] {
return es.invoke_on_all(&alternator::expiration_service::start);
}).get();
stop_expiration_service = defer_verbose_shutdown("expiration service", [&es] {
es.stop().get();
});
with_scheduling_group(maintenance_scheduling_group, [&es] {
return es.invoke_on_all(&alternator::expiration_service::start);
}).get();
}
db.invoke_on_all(&replica::database::revert_initial_system_read_concurrency_boost).get();
notify_set.notify_all(configurable::system_state::started).get();

View File

@@ -728,7 +728,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::TABLE_LOAD_STATS_V1:
case messaging_verb::TABLE_LOAD_STATS:
case messaging_verb::WORK_ON_VIEW_BUILDING_TASKS:
case messaging_verb::SNAPSHOT_WITH_TABLETS:
return 1;
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:

View File

@@ -210,8 +210,7 @@ enum class messaging_verb : int32_t {
REPAIR_UPDATE_REPAIRED_AT_FOR_MERGE = 81,
WORK_ON_VIEW_BUILDING_TASKS = 82,
NOTIFY_BANNED = 83,
SNAPSHOT_WITH_TABLETS = 84,
LAST = 85,
LAST = 84,
};
} // namespace netw

19
node_ops/CMakeLists.txt Normal file
View File

@@ -0,0 +1,19 @@
# Static library for node-operations control logic (node_ops_ctl.cc).
add_library(node_ops STATIC)
target_sources(node_ops
  PRIVATE
    node_ops_ctl.cc)
# Sources include project headers relative to the repository root.
target_include_directories(node_ops
  PUBLIC
    ${CMAKE_SOURCE_DIR})
target_link_libraries(node_ops
  PUBLIC
    utils
    Seastar::seastar
  PRIVATE
    service)
# Reuse the project-wide precompiled header when enabled.
if (Scylla_USE_PRECOMPILED_HEADER_USE)
  target_precompile_headers(node_ops REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers node_ops
  GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

210
node_ops/node_ops_ctl.cc Normal file
View File

@@ -0,0 +1,210 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/config.hh"
#include "gms/gossiper.hh"
#include "message/messaging_service.hh"
#include "node_ops/node_ops_ctl.hh"
#include "service/storage_service.hh"
#include "replica/database.hh"
#include <fmt/ranges.h>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "idl/node_ops.dist.hh"
static logging::logger nlogger("node_ops");
// Construct a controller for one node operation (e.g. bootstrap, replace,
// decommission, removenode). Captures a snapshot of the token metadata and
// reads the heartbeat interval from the local database configuration.
//
// `id`/`ep` identify the node operand (the node being added, replaced, or
// removed); `uuid` identifies this operation across the cluster.
node_ops_ctl::node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid)
    : ss(ss_)
    , host_id(id)
    , endpoint(ep)
    , tmptr(ss.get_token_metadata_ptr())
    , req(cmd, uuid)
    , heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
{}
// Destructor sanity check: the heartbeat updater fiber must have been
// stopped (via stop()/stop_heartbeat_updater()) before destruction.
// Destroying the object with the fiber's future still pending would leak
// a running background task, so report an internal error instead.
node_ops_ctl::~node_ops_ctl() {
    if (heartbeat_updater_done_fut) {
        on_internal_error_noexcept(nlogger, "node_ops_ctl destroyed without stopping");
    }
}
// Returns the cluster-wide unique id of this operation (stored in the
// outgoing request structure).
const node_ops_id& node_ops_ctl::uuid() const noexcept {
    return req.ops_uuid;
}
// Begin (or restart) the operation: record its human-readable description
// (used in all subsequent log messages) and compute the initial set of
// nodes to synchronize with. `sync_to_node` lets the caller exclude
// specific peers.
// may be called multiple times
void node_ops_ctl::start(sstring desc_, std::function<bool(locator::host_id)> sync_to_node) {
    desc = std::move(desc_);
    nlogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
    refresh_sync_nodes(std::move(sync_to_node));
}
// Recompute the set of peers this operation must synchronize with: every
// token owner that is not in `ignore_nodes`, is in a syncable state, and
// passes the caller-supplied `sync_to_node` predicate. Any required peer
// that the gossiper reports as dead is recorded in `nodes_down` and causes
// a std::runtime_error, since the operation cannot proceed without it.
void node_ops_ctl::refresh_sync_nodes(std::function<bool(locator::host_id)> sync_to_node) {
    // sync data with all normal token owners
    sync_nodes.clear();
    auto is_syncable = [] (const locator::node& n) {
        // Sync with reachable token owners.
        // Note that although nodes in `being_replaced` and `being_removed`
        // are still token owners, they are known to be dead and can't be sync'ed with.
        return n.get_state() == locator::node::state::normal
            || n.get_state() == locator::node::state::being_decommissioned;
    };
    tmptr->for_each_token_owner([&] (const locator::node& n) {
        seastar::thread::maybe_yield();
        // FIXME: use node* rather than endpoint
        const auto id = n.host_id();
        if (ignore_nodes.contains(id)) {
            return;
        }
        if (is_syncable(n) && sync_to_node(id)) {
            sync_nodes.insert(id);
        }
    });
    for (const auto& id : sync_nodes) {
        if (!ss.gossiper().is_alive(id)) {
            nodes_down.emplace(id);
        }
    }
    if (!nodes_down.empty()) {
        auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
        nlogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
    nlogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
}
// Shut the controller down; currently this only means joining the
// heartbeat updater fiber, which is required before destruction.
future<> node_ops_ctl::stop() noexcept {
    co_await stop_heartbeat_updater();
}
// Send the "prepare" stage command to all sync nodes.
// Caller should set the required req members before prepare
future<> node_ops_ctl::prepare(node_ops_cmd cmd) noexcept {
    return send_to_all(cmd);
}
// Launch the background heartbeat fiber, which keeps pinging sync nodes
// with `cmd` until stopped. Starting it twice is an internal error: the
// completion future would be overwritten and the first fiber leaked.
void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) {
    if (heartbeat_updater_done_fut) {
        on_internal_error(nlogger, "heartbeat_updater already started");
    }
    heartbeat_updater_done_fut = heartbeat_updater(cmd);
}
// Ask every sync node, in parallel, whether it still tracks this
// operation's uuid in its pending-operations list; throws
// std::runtime_error if any node has forgotten it.
future<> node_ops_ctl::query_pending_op() {
    req.cmd = node_ops_cmd::query_pending_ops;
    co_await coroutine::parallel_for_each(sync_nodes, [this] (const locator::host_id& node) -> future<> {
        auto resp = co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
        nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
        if (std::ranges::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
            throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
        }
    });
}
// Stop the heartbeat fiber (if one is running) and wait for it to finish.
// Clears the stored future via std::exchange, so a subsequent call is a
// no-op and the destructor's sanity check passes.
future<> node_ops_ctl::stop_heartbeat_updater() noexcept {
    if (heartbeat_updater_done_fut) {
        as.request_abort();
        co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt);
    }
}
// Finish the operation: stop the heartbeat fiber, then broadcast the
// "done" stage command to all sync nodes.
future<> node_ops_ctl::done(node_ops_cmd cmd) noexcept {
    co_await stop_heartbeat_updater();
    co_await send_to_all(cmd);
}
// Abort the operation: stop the heartbeat fiber, then broadcast the
// "abort" stage command to all sync nodes.
future<> node_ops_ctl::abort(node_ops_cmd cmd) noexcept {
    co_await stop_heartbeat_updater();
    co_await send_to_all(cmd);
}
// Handle a failure of the operation: log it, best-effort send the abort
// command to the sync nodes, and finally rethrow the original exception
// `ex`. Errors from the abort itself are logged and swallowed so that the
// original error is the one propagated to the caller.
future<> node_ops_ctl::abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
    nlogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
    try {
        co_await abort(cmd);
    } catch (...) {
        nlogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception());
    }
    co_await coroutine::return_exception_ptr(std::move(ex));
}
// Broadcast `cmd` to all sync nodes in parallel, sorting per-node failures
// into three buckets: nodes that don't support the verb
// (nodes_unknown_verb), nodes whose connection dropped (nodes_down), and
// any other failure (nodes_failed). Nodes already recorded in a failure
// bucket are skipped on subsequent calls — except that abort commands are
// still sent to nodes in nodes_failed. Throws a std::runtime_error
// summarizing all non-empty buckets if any node did not succeed.
future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) {
    req.cmd = cmd;
    // The request carries ignored nodes as IP addresses, so translate the
    // host ids through the gossiper's address map.
    req.ignore_nodes = ignore_nodes |
        std::views::transform([&] (locator::host_id id) { return ss.gossiper().get_address_map().get(id); }) |
        std::ranges::to<std::list>();
    sstring op_desc = ::format("{}", cmd);
    nlogger.info("{}[{}]: Started {}", desc, uuid(), req);
    auto cmd_category = categorize_node_ops_cmd(cmd);
    co_await coroutine::parallel_for_each(sync_nodes, [&] (const locator::host_id& node) -> future<> {
        if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) ||
                (nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) {
            // Note that we still send abort commands to failed nodes.
            co_return;
        }
        try {
            co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
            nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
        } catch (const seastar::rpc::unknown_verb_error&) {
            // The peer doesn't implement this verb (e.g. an older version).
            if (cmd_category == node_ops_cmd_category::prepare) {
                nlogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc);
            } else {
                nlogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc);
            }
            nodes_unknown_verb.emplace(node);
        } catch (const seastar::rpc::closed_error&) {
            nlogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc);
            nodes_down.emplace(node);
        } catch (...) {
            nlogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception());
            nodes_failed.emplace(node);
        }
    });
    // Aggregate all failure buckets into one combined error message.
    std::vector<sstring> errors;
    if (!nodes_failed.empty()) {
        errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed));
    }
    if (!nodes_unknown_verb.empty()) {
        if (cmd_category == node_ops_cmd_category::prepare) {
            errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb));
        } else {
            errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb));
        }
    }
    if (!nodes_down.empty()) {
        errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down));
    }
    if (!errors.empty()) {
        co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; "))));
    }
    nlogger.info("{}[{}]: Finished {}", desc, uuid(), req);
}
// Background fiber that periodically sends the heartbeat command `cmd` to
// every sync node so peers keep considering this operation alive. Runs
// until an abort is requested through `as` (by stop_heartbeat_updater()).
// Per-node send failures are logged and otherwise ignored.
//
// Fix: dropped the stray empty statement (`;`) that followed the catch
// block inside the per-node lambda.
future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) {
    nlogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
    while (!as.abort_requested()) {
        // Build a fresh request each round; heartbeats carry no node lists.
        auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
        co_await coroutine::parallel_for_each(sync_nodes, [&] (const locator::host_id& node) -> future<> {
            try {
                co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
                nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
            } catch (...) {
                // Best effort: a missed heartbeat is only logged.
                nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
            }
        });
        // Swallow the abort exception from the sleep; the loop condition
        // above terminates the fiber on the next iteration.
        co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
    }
    nlogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
}

View File

@@ -18,7 +18,15 @@
#include <seastar/core/abort_source.hh>
#include <list>
#include <unordered_set>
namespace service {
class storage_service;
}
namespace locator {
class token_metadata;
}
class node_ops_info {
public:
@@ -58,6 +66,17 @@ enum class node_ops_cmd : uint32_t {
repair_updater,
};
// Generic classification of node_ops_cmd verbs by operation stage. Used
// by node_ops_ctl to decide how send failures are reported and whether a
// command should still be sent to already-failed nodes.
enum class node_ops_cmd_category {
    prepare,
    heartbeat,
    sync_data,
    abort,
    done,
    other
};

// Maps a concrete verb to its stage category; unrecognized verbs map to
// node_ops_cmd_category::other.
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept;
template <>
struct fmt::formatter<node_ops_cmd> : fmt::formatter<string_view> {
auto format(node_ops_cmd, fmt::format_context& ctx) const -> decltype(ctx.out());
@@ -108,6 +127,43 @@ struct node_ops_cmd_response {
}
};
// Controller for a single gossiper-based node operation (bootstrap,
// replace, decommission, removenode): tracks the peers to synchronize
// with, broadcasts the per-stage commands to them, and runs a background
// heartbeat fiber for the duration of the operation.
class node_ops_ctl {
    // Per-node failure bookkeeping accumulated by send_to_all().
    std::unordered_set<locator::host_id> nodes_unknown_verb;
    std::unordered_set<locator::host_id> nodes_down;
    std::unordered_set<locator::host_id> nodes_failed;

public:
    const service::storage_service& ss;
    // Human-readable operation name, used as a prefix in log messages.
    sstring desc;
    locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
    gms::inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
    // Snapshot of the token metadata taken at construction time.
    lw_shared_ptr<const locator::token_metadata> tmptr;
    std::unordered_set<locator::host_id> sync_nodes;
    std::unordered_set<locator::host_id> ignore_nodes;
    // Request template reused by all stage commands (cmd field is updated).
    node_ops_cmd_request req;
    std::chrono::seconds heartbeat_interval;
    // Aborts the heartbeat fiber in stop_heartbeat_updater().
    abort_source as;
    // Completion future of the heartbeat fiber; engaged while it runs.
    std::optional<future<>> heartbeat_updater_done_fut;

    explicit node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id());

    ~node_ops_ctl();

    const node_ops_id& uuid() const noexcept;

    // may be called multiple times
    void start(sstring desc_, std::function<bool(locator::host_id)> sync_to_node = [] (locator::host_id) { return true; });

    void refresh_sync_nodes(std::function<bool(locator::host_id)> sync_to_node = [] (locator::host_id) { return true; });

    future<> stop() noexcept;

    // Caller should set the required req members before prepare
    future<> prepare(node_ops_cmd cmd) noexcept;

    void start_heartbeat_updater(node_ops_cmd cmd);

    future<> query_pending_op();

    future<> stop_heartbeat_updater() noexcept;

    future<> done(node_ops_cmd cmd) noexcept;

    future<> abort(node_ops_cmd cmd) noexcept;

    future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept;

    future<> send_to_all(node_ops_cmd cmd);

    future<> heartbeat_updater(node_ops_cmd cmd);
};
template <>
struct fmt::formatter<node_ops_cmd_request> : fmt::formatter<string_view> {
auto format(const node_ops_cmd_request&, fmt::format_context& ctx) const -> decltype(ctx.out());

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
size 6505524
oid sha256:9034610470ff645fab03da5ad6c690e5b41f3307ea4b529c7e63b0786a1289ed
size 6539600

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
size 6502668
oid sha256:0c4bbf51dbe01d684ea5b9a9157781988ed499604d2fde90143bad0b9a5594f0
size 6543944

View File

@@ -375,12 +375,9 @@ public:
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
}
auto ex = named_semaphore_aborted(_semaphore._name);
_ex = std::make_exception_ptr(ex);
_ttl_timer.cancel();
_state = reader_permit::state::preemptive_aborted;
_aux_data.pr.set_exception(ex);
_aux_data.pr.set_exception(named_semaphore_aborted(_semaphore._name));
_semaphore.on_permit_preemptive_aborted();
}
@@ -1526,18 +1523,12 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
// Do not admit the read as it is unlikely to finish before its timeout. The condition is:
// permit's remaining time <= preemptive_abort_factor * permit's time budget
//
// The additional checks are needed:
// + remaining_time > 0 -- to avoid preemptive aborting reads that already timed out but are still
// in the wait list due to scheduling delays. It also effectively disables
// preemptive aborting when the factor is set to 0
// + permit.timeout() < db::no_timeout -- to avoid preemptively aborting reads without timeout.
// Useful is tests when _preemptive_abort_factor is set to 1.0
// to avoid additional sleeps to wait for the read to be shed.
// The additional check for remaining_time > 0 is to avoid preemptive aborting reads
// that already timed out but are still in the wait list due to scheduling delays.
// It also effectively disables preemptive aborting when the factor is set to 0.
const auto time_budget = permit.timeout() - permit.created();
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
if (remaining_time > db::timeout_clock::duration::zero() &&
permit.timeout() < db::no_timeout &&
remaining_time <= _preemptive_abort_factor() * time_budget) {
if (remaining_time > db::timeout_clock::duration::zero() && remaining_time <= _preemptive_abort_factor() * time_budget) {
permit.on_preemptive_aborted();
using ms = std::chrono::milliseconds;
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",

View File

@@ -2505,6 +2505,40 @@ future<std::optional<double>> repair::tablet_repair_task_impl::expected_total_wo
co_return sz ? std::make_optional<double>(sz) : std::nullopt;
}
// Map a concrete node_ops_cmd verb onto its generic stage category.
// Verbs outside the known stage groups fall into `other`.
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept {
    using verb = node_ops_cmd;
    using cat = node_ops_cmd_category;
    switch (cmd) {
    // "prepare" verbs start the operation on the peers.
    case verb::removenode_prepare:
    case verb::replace_prepare:
    case verb::decommission_prepare:
    case verb::bootstrap_prepare:
        return cat::prepare;
    // Periodic keep-alives while the operation is in flight.
    case verb::removenode_heartbeat:
    case verb::replace_heartbeat:
    case verb::decommission_heartbeat:
    case verb::bootstrap_heartbeat:
        return cat::heartbeat;
    // Only removenode has an explicit sync-data stage.
    case verb::removenode_sync_data:
        return cat::sync_data;
    case verb::removenode_abort:
    case verb::replace_abort:
    case verb::decommission_abort:
    case verb::bootstrap_abort:
        return cat::abort;
    case verb::removenode_done:
    case verb::replace_done:
    case verb::decommission_done:
    case verb::bootstrap_done:
        return cat::done;
    default:
        return cat::other;
    }
}
auto fmt::formatter<node_ops_cmd>::format(node_ops_cmd cmd, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
std::string_view name;

View File

@@ -2633,7 +2633,7 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
all_replayed = co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time));
}
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={} all_replayed={}", req.repair_uuid, from, issue_flush, all_replayed);
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={}", req.repair_uuid, from, issue_flush);
}
);
if (!all_replayed) {

View File

@@ -3304,13 +3304,6 @@ db::replay_position table::highest_flushed_replay_position() const {
return _highest_flushed_rp;
}
struct snapshot_tablet_info {
size_t id;
dht::token first_token, last_token;
db_clock::time_point repair_time;
int64_t repaired_at;
};
struct manifest_json : public json::json_base {
struct info : public json::json_base {
json::json_element<sstring> version;
@@ -3443,7 +3436,6 @@ struct manifest_json : public json::json_base {
json::json_element<uint64_t> index_size;
json::json_element<int64_t> first_token;
json::json_element<int64_t> last_token;
json::json_element<uint64_t> tablet_id;
sstable_info() {
register_params();
@@ -3456,9 +3448,6 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
if (e.tablet_id) {
tablet_id = *e.tablet_id;
}
}
sstable_info(const sstable_info& e) {
register_params();
@@ -3468,7 +3457,6 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
tablet_id = e.tablet_id;
}
sstable_info(sstable_info&& e) {
register_params();
@@ -3478,7 +3466,6 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
tablet_id = e.tablet_id;
}
sstable_info& operator=(sstable_info&& e) {
id = e.id;
@@ -3487,7 +3474,6 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
tablet_id = e.tablet_id;
return *this;
}
private:
@@ -3498,51 +3484,6 @@ struct manifest_json : public json::json_base {
add(&index_size, "index_size");
add(&first_token, "first_token");
add(&last_token, "last_token");
add(&tablet_id, "tablet_id");
}
};
struct tablet_info : public json::json_base {
json::json_element<uint64_t> id;
json::json_element<int64_t> first_token;
json::json_element<int64_t> last_token;
json::json_element<time_t> repair_time;
json::json_element<int64_t> repaired_at;
tablet_info() {
register_params();
}
tablet_info(const snapshot_tablet_info& e) {
register_params();
id = e.id;
first_token = dht::token::to_int64(e.first_token);
last_token = dht::token::to_int64(e.last_token);
repair_time = db_clock::to_time_t(e.repair_time);
repaired_at = e.repaired_at;
}
tablet_info(const tablet_info& e) {
register_params();
id = e.id;
first_token = e.first_token;
last_token = e.last_token;
repair_time = e.repair_time;
repaired_at = e.repaired_at;
}
tablet_info& operator=(tablet_info&& e) {
id = e.id;
first_token = e.first_token;
last_token = e.last_token;
repair_time = e.repair_time;
repaired_at = e.repaired_at;
return *this;
}
private:
void register_params() {
add(&id, "id");
add(&first_token, "first_token");
add(&last_token, "last_token");
add(&repair_time, "repair_time");
add(&repaired_at, "repaired_at");
}
};
@@ -3551,7 +3492,6 @@ struct manifest_json : public json::json_base {
json::json_element<snapshot_info> snapshot;
json::json_element<table_info> table;
json::json_chunked_list<sstable_info> sstables;
json::json_chunked_list<tablet_info> tablets;
manifest_json() {
register_params();
@@ -3563,7 +3503,6 @@ struct manifest_json : public json::json_base {
snapshot = std::move(e.snapshot);
table = std::move(e.table);
sstables = std::move(e.sstables);
tablets = std::move(e.tablets);
}
manifest_json& operator=(manifest_json&& e) {
if (this != &e) {
@@ -3572,7 +3511,6 @@ struct manifest_json : public json::json_base {
snapshot = std::move(e.snapshot);
table = std::move(e.table);
sstables = std::move(e.sstables);
tablets = std::move(e.tablets);
}
return *this;
}
@@ -3583,7 +3521,6 @@ private:
add(&snapshot, "snapshot");
add(&table, "table");
add(&sstables, "sstables");
add(&tablets, "tablets");
}
};
@@ -3597,7 +3534,7 @@ public:
using snapshot_sstable_set = foreign_ptr<std::unique_ptr<utils::chunked_vector<sstables::sstable_snapshot_metadata>>>;
static future<> write_manifest(const locator::topology& topology, snapshot_writer& writer, std::vector<snapshot_sstable_set> sstable_sets, std::vector<snapshot_tablet_info> tablets, sstring name, db::snapshot_options opts, schema_ptr schema, std::optional<int64_t> tablet_count) {
static future<> write_manifest(const locator::topology& topology, snapshot_writer& writer, std::vector<snapshot_sstable_set> sstable_sets, sstring name, db::snapshot_options opts, schema_ptr schema, std::optional<int64_t> tablet_count) {
manifest_json manifest;
manifest_json::info info;
@@ -3633,11 +3570,6 @@ static future<> write_manifest(const locator::topology& topology, snapshot_write
manifest.sstables.push(manifest_json::sstable_info(md));
}
}
for (const auto& sti : tablets) {
manifest.tablets.push(manifest_json::tablet_info(sti));
}
auto streamer = json::stream_object(std::move(manifest));
auto out = co_await writer.stream_for("manifest.json");
std::exception_ptr ex;
@@ -3762,33 +3694,11 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, c
tlogger.debug("snapshot {}: seal_snapshot", name);
const auto& topology = sharded_db.local().get_token_metadata().get_topology();
std::optional<int64_t> min_tablet_count;
std::vector<snapshot_tablet_info> tablets;
std::unordered_set<size_t> tids;
if (t.uses_tablets()) {
SCYLLA_ASSERT(!tablet_counts.empty());
min_tablet_count = *std::ranges::min_element(tablet_counts);
auto erm = t.get_effective_replication_map();
auto& tm = erm->get_token_metadata().tablets().get_tablet_map(s->id());
for (auto& ssts : sstable_sets) {
for (auto& sst : *ssts) {
auto tok = sst.first_token;
auto tid = tm.get_tablet_id(dht::token::from_int64(tok));
sst.tablet_id = tid.id;
if (tids.emplace(tid.id).second) {
auto& tinfo = tm.get_tablet_info(tid);
tablets.emplace_back(snapshot_tablet_info{
.id = tid.id,
.first_token = tm.get_first_token(tid),
.last_token = tm.get_last_token(tid),
.repair_time = tinfo.repair_time,
.repaired_at = tinfo.sstables_repaired_at,
});
}
}
}
}
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, std::move(opts), s, min_tablet_count).handle_exception([&] (std::exception_ptr ptr) {
co_await write_manifest(topology, *writer, std::move(sstable_sets), name, std::move(opts), s, min_tablet_count).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed to seal snapshot in {}: {}.", name, ptr);
ex = std::move(ptr);
});

View File

@@ -39,7 +39,6 @@
#include "utils/hashers.hh"
#include "alternator/extract_from_attrs.hh"
#include "utils/managed_string.hh"
#include "alternator/ttl_tag.hh"
#include <boost/lexical_cast.hpp>
@@ -1073,9 +1072,6 @@ managed_string schema::get_create_statement(const schema_describe_helper& helper
}
} else {
os << "TABLE " << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(cf_name()) << " (";
// Find the name of the per-row TTL column, if there is one, so we
// can mark it with "TTL".
std::optional<std::string> ttl_column = db::find_tag(*this, TTL_TAG_KEY);
for (auto& cdef : all_columns()) {
if (with_internals && dropped_columns().contains(cdef.name_as_text())) {
// If the column has been re-added after a drop, we don't include it right away. Instead, we'll add the
@@ -1086,9 +1082,6 @@ managed_string schema::get_create_statement(const schema_describe_helper& helper
os << "\n ";
column_definition_as_cql_key(os, cdef);
if (ttl_column && *ttl_column == cdef.name_as_text()) {
os << " TTL";
}
os << ",";
}

View File

@@ -50,6 +50,7 @@ target_link_libraries(service
PRIVATE
cql3
mutation
node_ops
raft
repair
streaming

View File

@@ -61,6 +61,10 @@ migration_manager::migration_manager(migration_notifier& notifier, gms::feature_
, _sys_ks(sysks)
, _group0_barrier(this_shard_id() == 0 ?
std::function<future<>()>([this] () -> future<> {
if ((co_await _group0_client.get_group0_upgrade_state()).second == group0_upgrade_state::use_pre_raft_procedures) {
on_internal_error(mlogger, "Trying to pull schema over raft while in pre raft procedures");
}
// This will run raft barrier and will sync schema with the leader
co_await with_scheduling_group(_gossiper.get_scheduling_group(), [this] {
return start_group0_operation().discard_result();

View File

@@ -57,7 +57,7 @@ enum class group0_upgrade_state : uint8_t {
// Schema changes may still arrive from other nodes for some time. However, if no failures occur
// during the upgrade procedure, eventually all nodes should enter `synchronize` state. Then
// the nodes ensure that schema is synchronized across the entire cluster before entering `use_post_raft_procedures`.
synchronize = 2, // Deprecated
synchronize = 2,
// In `use_post_raft_procedures` state the upgrade is finished. The node performs schema changes
// using group 0, i.e. by constructing appropriate Raft commands and sending them to the Raft group 0 cluster.
@@ -66,6 +66,8 @@ enum class group0_upgrade_state : uint8_t {
inline constexpr uint8_t group0_upgrade_state_last = 3;
std::ostream& operator<<(std::ostream&, group0_upgrade_state);
struct wrong_destination {
raft::server_id reached_id;
};
@@ -83,24 +85,3 @@ using raft_ticker_type = seastar::timer<lowres_clock>;
static constexpr raft_ticker_type::duration raft_tick_interval = std::chrono::milliseconds(100);
} // namespace service
template <> struct fmt::formatter<service::group0_upgrade_state> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const service::group0_upgrade_state& s, fmt::format_context& ctx) const {
switch (s) {
case service::group0_upgrade_state::recovery:
return fmt::format_to(ctx.out(), "recovery");
break;
case service::group0_upgrade_state::use_post_raft_procedures:
return fmt::format_to(ctx.out(), "use_post_raft_procedures");
break;
case service::group0_upgrade_state::synchronize:
return fmt::format_to(ctx.out(), "synchronize");
break;
case service::group0_upgrade_state::use_pre_raft_procedures:
return fmt::format_to(ctx.out(), "use_pre_raft_procedures");
break;
}
}
};

View File

@@ -55,11 +55,31 @@ namespace service {
static logging::logger slogger("group0_raft_sm");
group0_state_machine::group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss,
gms::gossiper& gossiper, gms::feature_service& feat)
gms::gossiper& gossiper, gms::feature_service& feat,
bool topology_change_enabled)
: _client(client), _mm(mm), _sp(sp), _ss(ss)
, _gate("group0_state_machine")
, _topology_change_enabled(topology_change_enabled)
, _state_id_handler(ss._topology_state_machine, sp.local_db(), gossiper)
, _feature_service(feat)
, _topology_on_raft_support_listener(feat.supports_consistent_topology_changes.when_enabled([this] () noexcept {
// Using features to decide whether to start fetching topology snapshots
// or not is technically not correct because we also use features to guard
// whether upgrade can be started, and upgrade starts by writing
// to the system.topology table (namely, to the `upgrade_state` column).
// If some node at that point didn't mark the feature as enabled
// locally, there is a risk that it might try to pull a snapshot
// and will decide not to use `raft_pull_snapshot` verb.
//
// The above issue is mitigated by requiring administrators to
// wait until the SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES feature
// is enabled on all nodes.
//
// The biggest value of using a cluster feature here is so that
// the node won't try to fetch a topology snapshot if the other
// node doesn't support it yet.
_topology_change_enabled = true;
}))
, _in_memory_state_machine_enabled(utils::get_local_injector().is_enabled("group0_enable_sm_immediately")) {
_state_id_handler.run();
}
@@ -475,26 +495,28 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
std::optional<service::raft_snapshot> topology_snp;
std::optional<service::raft_snapshot> raft_snp;
auto auth_tables = db::system_keyspace::auth_tables();
std::vector<table_id> tables;
tables.reserve(3);
tables.push_back(db::system_keyspace::topology()->id());
tables.push_back(db::system_keyspace::topology_requests()->id());
tables.push_back(db::system_keyspace::cdc_generations_v3()->id());
if (_topology_change_enabled) {
auto auth_tables = db::system_keyspace::auth_tables();
std::vector<table_id> tables;
tables.reserve(3);
tables.push_back(db::system_keyspace::topology()->id());
tables.push_back(db::system_keyspace::topology_requests()->id());
tables.push_back(db::system_keyspace::cdc_generations_v3()->id());
topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_snapshot(
&_mm._messaging, hid, as, from_id, service::raft_snapshot_pull_params{std::move(tables)});
topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_snapshot(
&_mm._messaging, hid, as, from_id, service::raft_snapshot_pull_params{std::move(tables)});
tables = std::vector<table_id>();
tables.reserve(auth_tables.size() + 1);
tables = std::vector<table_id>();
tables.reserve(auth_tables.size() + 1);
for (const auto& schema : auth_tables) {
tables.push_back(schema->id());
for (const auto& schema : auth_tables) {
tables.push_back(schema->id());
}
tables.push_back(db::system_keyspace::service_levels_v2()->id());
raft_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_snapshot(
&_mm._messaging, hid, as, from_id, service::raft_snapshot_pull_params{std::move(tables)});
}
tables.push_back(db::system_keyspace::service_levels_v2()->id());
raft_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_snapshot(
&_mm._messaging, hid, as, from_id, service::raft_snapshot_pull_params{std::move(tables)});
auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary());

View File

@@ -108,8 +108,10 @@ class group0_state_machine : public raft_state_machine {
storage_service& _ss;
seastar::named_gate _gate;
abort_source _abort_source;
bool _topology_change_enabled;
group0_state_id_handler _state_id_handler;
gms::feature_service& _feature_service;
gms::feature::listener_registration _topology_on_raft_support_listener;
// This boolean controls whether the in-memory data structures should be updated
// after snapshot transfer / command application.
@@ -140,7 +142,7 @@ class group0_state_machine : public raft_state_machine {
future<> reload_state();
public:
group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss,
gms::gossiper& gossiper, gms::feature_service& feat);
gms::gossiper& gossiper, gms::feature_service& feat, bool topology_change_enabled);
future<> apply(std::vector<raft::command_cref> command) override;
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;

File diff suppressed because it is too large Load Diff

View File

@@ -103,6 +103,7 @@ class raft_group0 {
sharded<netw::messaging_service>& _ms;
gms::gossiper& _gossiper;
gms::feature_service& _feat;
db::system_keyspace& _sys_ks;
raft_group0_client& _client;
seastar::scheduling_group _sg;
@@ -144,6 +145,7 @@ public:
sharded<netw::messaging_service>& ms,
gms::gossiper& gs,
gms::feature_service& feat,
db::system_keyspace& sys_ks,
raft_group0_client& client,
seastar::scheduling_group sg);
@@ -184,7 +186,7 @@ public:
//
// Also make sure to call `finish_setup_after_join` after the node has joined the cluster and entered NORMAL state.
future<> setup_group0(db::system_keyspace&, const std::unordered_set<gms::inet_address>& initial_contact_nodes, shared_ptr<group0_handshaker> handshaker,
service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm,
std::optional<replace_info>, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled,
const join_node_request_params& params);
// Call during the startup procedure before networking is enabled.
@@ -203,7 +205,18 @@ public:
//
// If the node has just bootstrapped, causes the group 0 server to become a voter.
//
future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
// If the node has just upgraded, enables a feature listener for the RAFT feature
// which will start a procedure to create group 0 and switch administrative operations to use it.
future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);
// If Raft is disabled or in RECOVERY mode, returns `false`.
// Otherwise:
// - waits for the Raft upgrade procedure to finish if it's currently in progress,
// - performs a Raft read barrier,
// - returns `true`.
//
// This is a prerequisite for performing group 0 configuration operations.
future<bool> wait_for_raft();
// Check whether the given Raft server is a member of group 0 configuration
// according to our current knowledge.
@@ -214,6 +227,18 @@ public:
// if wait_for_raft() was called earlier and returned `true`.
bool is_member(raft::server_id, bool include_voters_only);
// Become a non-voter in group 0.
//
// Assumes we've finished the startup procedure (`setup_group0()` finished earlier).
// `wait_for_raft` must've also been called earlier and returned `true`.
future<> become_nonvoter(abort_source& as, std::optional<raft_timeout> timeout = std::nullopt);
// Make the given server, other than us, a non-voter in group 0.
//
// Assumes we've finished the startup procedure (`setup_group0()` finished earlier).
// `wait_for_raft` must've also been called earlier and returned `true`.
future<> make_nonvoter(raft::server_id, abort_source&, std::optional<raft_timeout> timeout = std::nullopt);
// Modify the voter status of the given servers in group 0.
//
// The `voters_add` are changed to voters, the `voters_del` are changed to non-voters.
@@ -225,6 +250,29 @@ public:
future<> modify_voters(const std::unordered_set<raft::server_id>& voters_add, const std::unordered_set<raft::server_id>& voters_del, abort_source& as,
std::optional<raft_timeout> timeout = std::nullopt);
// Remove ourselves from group 0.
//
// Assumes we've finished the startup procedure (`setup_group0()` finished earlier).
// Assumes to run during decommission, after the node entered LEFT status.
// `wait_for_raft` must've also been called earlier and returned `true`.
//
// FIXME: make it retryable and do nothing if we're not a member.
// Currently if we call leave_group0 twice, it will get stuck the second time
// (it will try to forward an entry to a leader but never find the leader).
// Not sure how easy or hard it is and whether it's a problem worth solving; if decommission crashes,
// one can simply call `removenode` on another node to make sure we are removed (from group 0 too).
future<> leave_group0();
// Remove `host` from group 0.
//
// Assumes that either
// 1. we've finished bootstrapping and now running a `removenode` operation,
// 2. or we're currently bootstrapping and replacing an existing node.
//
// In both cases, `setup_group0()` must have finished earlier.
// `wait_for_raft` must've also been called earlier and returned `true`.
future<> remove_from_group0(raft::server_id);
// Assumes that this node's Raft server ID is already initialized and returns it.
// It's a fatal error if the id is missing.
//
@@ -242,6 +290,10 @@ public:
// or when joining an old cluster which does not support JOIN_NODE RPC).
shared_ptr<group0_handshaker> make_legacy_handshaker(raft::is_voter can_vote);
// Waits until all upgrade to raft group 0 finishes and all nodes switched
// to use_post_raft_procedures.
future<> wait_for_all_nodes_to_finish_upgrade(abort_source& as);
raft_group0_client& client() {
return _client;
}
@@ -275,13 +327,28 @@ private:
static void init_rpc_verbs(raft_group0& shard0_this);
static future<> uninit_rpc_verbs(netw::messaging_service& ms);
future<bool> raft_upgrade_complete() const;
future<> do_abort_and_drain();
// Handle peer_exchange RPC
future<group0_peer_exchange> peer_exchange(discovery::peer_list peers);
raft_server_for_group create_server_for_group0(raft::group_id id, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
service::migration_manager& mm);
service::migration_manager& mm, bool topology_change_enabled);
// Creates or joins group 0 and switches schema/topology changes to use group 0.
// Can be restarted after a crash. Does nothing if the procedure was already finished once.
//
// The main part of the procedure which may block (due to concurrent schema changes or communication with
// other nodes) runs in background, so it's safe to call `upgrade_to_group0` and wait for it to finish
// from places which must not block.
//
// Precondition: the SUPPORTS_RAFT cluster feature is enabled.
future<> upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);
// Blocking part of `upgrade_to_group0`, runs in background.
future<> do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);
// Start a Raft server for the cluster-wide group 0 and join it to the group.
// Called during bootstrap or upgrade.
@@ -310,7 +377,7 @@ private:
// Preconditions: Raft local feature enabled
// and we haven't initialized group 0 yet after last Scylla start (`joined_group0()` is false).
// Postcondition: `joined_group0()` is true.
future<> join_group0(std::vector<gms::inet_address> seeds, shared_ptr<group0_handshaker> handshaker, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, db::system_keyspace& sys_ks,
future<> join_group0(std::vector<gms::inet_address> seeds, shared_ptr<group0_handshaker> handshaker, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, db::system_keyspace& sys_ks, bool topology_change_enabled,
const join_node_request_params& params);
// Start an existing Raft server for the cluster-wide group 0.
@@ -324,7 +391,7 @@ private:
// XXX: perhaps it would be good to make this function callable multiple times,
// if we want to handle crashes of the group 0 server without crashing the entire Scylla process
// (we could then try restarting the server internally).
future<> start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
future<> start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);
// Modify the given server voter status in Raft group 0 configuration.
// Retries on raft::commit_status_unknown.

View File

@@ -258,33 +258,65 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source&
throw exceptions::configuration_exception{"cannot start group0 operation in the maintenance mode"};
}
group0_upgrade_state upgrade_state = get_group0_upgrade_state();
if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) {
// The version no longer supports pre raft procedures
on_internal_error(logger, format("unexpected group0 upgrade state {} in start_operation", upgrade_state));
std::pair<rwlock::holder, group0_upgrade_state> upgrade_lock_and_state = co_await get_group0_upgrade_state();
auto [upgrade_lock_holder, upgrade_state] = std::move(upgrade_lock_and_state);
switch (upgrade_state) {
case group0_upgrade_state::synchronize:
logger.info("start_operation: waiting until local node leaves synchronize state to start a group 0 operation");
upgrade_lock_holder.release();
co_await when_any(wait_until_group0_upgraded(as), sleep_abortable(std::chrono::seconds{10}, as));
// Check whether the above wait returned because the sleep timed out, which indicates the upgrade procedure is stuck.
// Throw the corresponding runtime error in that case.
upgrade_lock_and_state = co_await get_group0_upgrade_state();
upgrade_lock_holder = std::move(upgrade_lock_and_state.first);
upgrade_state = std::move(upgrade_lock_and_state.second);
upgrade_lock_holder.release();
if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) {
throw std::runtime_error{
"Cannot perform schema or topology changes during this time; the cluster is currently upgrading to use Raft for schema operations."
" If this error keeps happening, check the logs of your nodes to learn the state of upgrade. The upgrade procedure may get stuck"
" if there was a node failure."};
}
[[fallthrough]];
case group0_upgrade_state::use_post_raft_procedures: {
auto operation_holder = co_await get_units(_operation_mutex, 1, as);
co_await _raft_gr.group0_with_timeouts().read_barrier(&as, timeout);
// Take `_group0_read_apply_mutex` *after* read barrier.
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
auto read_apply_holder = co_await hold_read_apply_mutex(as);
auto observed_group0_state_id = co_await get_last_group0_state_id();
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
co_return group0_guard {
std::make_unique<group0_guard::impl>(
std::move(operation_holder),
std::move(read_apply_holder),
observed_group0_state_id,
new_group0_state_id,
// Not holding any lock in this case, but move the upgrade lock holder for consistent code
std::move(upgrade_lock_holder),
true
)
};
}
case group0_upgrade_state::recovery:
logger.warn("starting operation in RECOVERY mode (using old procedures)");
[[fallthrough]];
case group0_upgrade_state::use_pre_raft_procedures:
co_return group0_guard {
std::make_unique<group0_guard::impl>(
semaphore_units<>{},
semaphore_units<>{},
utils::UUID{},
generate_group0_state_id(utils::UUID{}),
std::move(upgrade_lock_holder),
false
)
};
}
auto operation_holder = co_await get_units(_operation_mutex, 1, as);
co_await _raft_gr.group0_with_timeouts().read_barrier(&as, timeout);
// Take `_group0_read_apply_mutex` *after* read barrier.
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
auto read_apply_holder = co_await hold_read_apply_mutex(as);
auto observed_group0_state_id = co_await get_last_group0_state_id();
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
co_return group0_guard {
std::make_unique<group0_guard::impl>(
std::move(operation_holder),
std::move(read_apply_holder),
observed_group0_state_id,
new_group0_state_id,
// Not holding any lock in this case, but move the upgrade lock holder for consistent code
rwlock::holder{},
true
)
};
}
template<typename Command>
@@ -373,11 +405,21 @@ future<> raft_group0_client::init() {
}
}
group0_upgrade_state raft_group0_client::get_group0_upgrade_state() {
return _upgrade_state;
future<std::pair<rwlock::holder, group0_upgrade_state>> raft_group0_client::get_group0_upgrade_state() {
auto holder = co_await _upgrade_lock.hold_read_lock();
if (_upgrade_state == group0_upgrade_state::use_pre_raft_procedures) {
co_return std::pair{std::move(holder), _upgrade_state};
}
co_return std::pair{rwlock::holder{}, _upgrade_state};
}
future<> raft_group0_client::set_group0_upgrade_state(group0_upgrade_state state) {
// We could explicitly handle abort here but we assume that if someone holds the lock,
// they will eventually finish (say, due to abort) and release it.
auto holder = co_await _upgrade_lock.hold_write_lock();
auto value = [] (group0_upgrade_state s) constexpr {
switch (s) {
case service::group0_upgrade_state::use_post_raft_procedures:
@@ -401,6 +443,24 @@ future<> raft_group0_client::set_group0_upgrade_state(group0_upgrade_state state
co_await _sys_ks.save_group0_upgrade_state(value(state));
_upgrade_state = state;
if (_upgrade_state == group0_upgrade_state::use_post_raft_procedures) {
_upgraded.broadcast();
}
}
future<> raft_group0_client::wait_until_group0_upgraded(abort_source& as) {
auto sub = as.subscribe([this] () noexcept { _upgraded.broadcast(); });
if (!sub) {
throw abort_requested_exception{};
}
co_await _upgraded.wait([this, &as, sub = std::move(sub)] {
return _upgrade_state == group0_upgrade_state::use_post_raft_procedures || as.abort_requested();
});
if (as.abort_requested()) {
throw abort_requested_exception{};
}
}
future<semaphore_units<>> raft_group0_client::hold_read_apply_mutex(abort_source& as) {

View File

@@ -96,6 +96,8 @@ class raft_group0_client {
// `_upgrade_state` is a cached (perhaps outdated) version of the upgrade state stored on disk.
group0_upgrade_state _upgrade_state{group0_upgrade_state::recovery}; // loaded from disk in `init()`
seastar::rwlock _upgrade_lock;
seastar::condition_variable _upgraded;
std::unordered_map<utils::UUID, std::optional<service::broadcast_tables::query_result>> _results;
@@ -165,10 +167,26 @@ public:
// Returns the current group 0 upgrade state.
//
// The possible transitions are: `use_pre_raft_procedures` -> `synchronize` -> `use_post_raft_procedures`.
// Once entering a state, we cannot rollback (even after a restart - the state is persisted).
//
// Possible values now are: recovery (maintenance mode only) and use_post_raft_procedures.
// Other values are no longer supported; if any is discovered during boot, the boot will fail.
group0_upgrade_state get_group0_upgrade_state();
// An exception to these rules is manual recovery, represented by `recovery` state.
// It can be entered by the user manually modifying the system.local table through CQL
// and restarting the node. In this state the node will not join group 0 or start the group 0 Raft server,
// it will perform all operations as in `use_pre_raft_procedures`, and not attempt to perform the upgrade.
//
// If the returned state is `use_pre_raft_procedures`, the returned `rwlock::holder`
// prevents the state from being changed (`set_group0_upgrade_state` will wait).
//
// When performing an operation that assumes the state to be `use_pre_raft_procedures`
// (e.g. a schema change using the old method), keep the holder until your operation finishes.
//
// Note that we don't need to hold the lock during `synchronize` or `use_post_raft_procedures`:
// in `synchronize` group 0 operations are disabled, and `use_post_raft_procedures` is the final
// state so it won't change (unless through manual recovery - which should not be required
// in the final state anyway). Thus, when `synchronize` or `use_post_raft_procedures` is returned,
// the holder does not actually hold any lock.
future<std::pair<rwlock::holder, group0_upgrade_state>> get_group0_upgrade_state();
// Ensures that nobody holds any `rwlock::holder`s returned from `get_group0_upgrade_state()`
// then changes the state to `s`.
@@ -177,6 +195,9 @@ public:
// and follow the correct sequence of states.
future<> set_group0_upgrade_state(group0_upgrade_state s);
// Wait until group 0 upgrade enters the `use_post_raft_procedures` state.
future<> wait_until_group0_upgraded(abort_source&);
future<semaphore_units<>> hold_read_apply_mutex(abort_source&);
bool in_recovery() const;

View File

@@ -242,10 +242,7 @@ class storage_proxy::remote {
const db::view::view_building_state_machine& _vb_state_machine;
abort_source _group0_as;
// These two could probably share, but nice to
// have named separations...
seastar::named_gate _truncate_gate;
seastar::named_gate _snapshot_gate;
netw::connection_drop_slot_t _connection_dropped;
netw::connection_drop_registration_t _condrop_registration;
@@ -257,7 +254,6 @@ public:
sharded<paxos::paxos_store>& paxos_store, raft_group0_client& group0_client, topology_state_machine& tsm, const db::view::view_building_state_machine& vbsm)
: _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks), _paxos_store(paxos_store), _group0_client(group0_client), _topology_state_machine(tsm), _vb_state_machine(vbsm)
, _truncate_gate("storage_proxy::remote::truncate_gate")
, _snapshot_gate("storage_proxy::remote::snapshot_gate")
, _connection_dropped(std::bind_front(&remote::connection_dropped, this))
, _condrop_registration(_ms.when_connection_drops(_connection_dropped))
{
@@ -272,7 +268,6 @@ public:
ser::storage_proxy_rpc_verbs::register_read_digest(&_ms, std::bind_front(&remote::handle_read_digest, this));
ser::storage_proxy_rpc_verbs::register_truncate(&_ms, std::bind_front(&remote::handle_truncate, this));
ser::storage_proxy_rpc_verbs::register_truncate_with_tablets(&_ms, std::bind_front(&remote::handle_truncate_with_tablets, this));
ser::storage_proxy_rpc_verbs::register_snapshot_with_tablets(&_ms, std::bind_front(&remote::handle_snapshot_with_tablets, this));
// Register PAXOS verb handlers
ser::storage_proxy_rpc_verbs::register_paxos_prepare(&_ms, std::bind_front(&remote::handle_paxos_prepare, this));
ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&remote::handle_paxos_accept, this));
@@ -287,7 +282,6 @@ public:
future<> stop() {
_group0_as.request_abort();
co_await _truncate_gate.close();
co_await _snapshot_gate.close();
co_await ser::storage_proxy_rpc_verbs::unregister(&_ms);
_stopped = true;
}
@@ -480,12 +474,6 @@ public:
}
}
future<> snapshot_with_tablets(const std::vector<std::pair<sstring, sstring>>& ks_cf_names, sstring tag, const db::snapshot_options& opts) {
co_await seastar::with_gate(_snapshot_gate, [&] () -> future<> {
co_await request_snapshot_with_tablets(ks_cf_names, tag, opts);
});
}
future<> send_truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms) {
auto s = _sp.local_db().find_schema(keyspace, cfname);
auto erm_ptr = s->table().get_effective_replication_map();
@@ -983,18 +971,6 @@ private:
co_await replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname);
}
future<> handle_snapshot_with_tablets(utils::chunked_vector<table_id> ids, sstring tag, gc_clock::time_point ts, bool skip_flush, std::optional<gc_clock::time_point> expiry, service::frozen_topology_guard frozen_guard) {
topology_guard guard(frozen_guard);
db::snapshot_options opts {
.skip_flush = skip_flush,
.created_at = ts,
.expires_at = expiry
};
co_await coroutine::parallel_for_each(ids, [&] (const table_id& id) -> future<> {
co_await replica::database::snapshot_table_on_all_shards(_sp._db, id, tag, opts);
});
}
future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>
handle_paxos_prepare(
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
@@ -1137,15 +1113,11 @@ private:
}
}
using begin_op_func = std::function<std::string()>;
using can_replace_op_with_ongoing_func = std::function<bool(const db::system_keyspace::topology_requests_entry&, const service::global_topology_request&)>;
using create_op_mutations_func = std::function<global_topology_request(topology_request_tracking_mutation_builder&)>;
future<> do_topology_request(std::string_view reason, begin_op_func begin, can_replace_op_with_ongoing_func can_replace_op, create_op_mutations_func create_mutations, std::string_view origin) {
future<> request_truncate_with_tablets(sstring ks_name, sstring cf_name) {
if (this_shard_id() != 0) {
// group0 is only set on shard 0
co_return co_await _sp.container().invoke_on(0, [&] (storage_proxy& sp) {
return sp.remote().do_topology_request(reason, begin, can_replace_op, create_mutations, origin);
return sp.remote().request_truncate_with_tablets(ks_name, cf_name);
});
}
@@ -1154,10 +1126,10 @@ private:
while (true) {
group0_guard guard = co_await _group0_client.start_operation(_group0_as, raft_timeout{});
auto desc = begin();
const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name);
if (!_sp._features.topology_global_request_queue) {
// Check if we already have a similar op for the same table. This can happen when a, say, truncate has timed out
// Check if we already have a truncate queued for the same table. This can happen when a truncate has timed out
// and the client retried by issuing the same truncate again. In this case, instead of failing the request with
// an "Another global topology request is ongoing" error, we can wait for the already queued request to complete.
// Note that we can not do this for a truncate which the topology coordinator has already started processing,
@@ -1166,15 +1138,21 @@ private:
utils::UUID ongoing_global_request_id = _topology_state_machine._topology.global_request_id.value();
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id);
auto global_request = std::get<service::global_topology_request>(topology_requests_entry.request_type);
if (can_replace_op(topology_requests_entry, global_request)) {
global_request_id = ongoing_global_request_id;
slogger.info("Ongoing {} (global request ID {}) detected; waiting for it to complete", desc, global_request_id);
break;
if (global_request == global_topology_request::truncate_table) {
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::truncate_table) {
if (topology_requests_entry.truncate_table_id == table_id) {
global_request_id = ongoing_global_request_id;
slogger.info("Ongoing TRUNCATE for table {}.{} (global request ID {}) detected; waiting for it to complete",
ks_name, cf_name, global_request_id);
break;
}
}
}
slogger.warn("Another global topology request ({}) is ongoing during attempt to {}", global_request, desc);
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to {}, please retry.", desc));
slogger.warn("Another global topology request ({}) is ongoing during attempt to TRUNCATE table {}.{}",
global_request, ks_name, cf_name);
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to TRUNCATE table {}.{}, please retry.",
ks_name, cf_name));
}
}
@@ -1182,24 +1160,28 @@ private:
topology_mutation_builder builder(guard.write_timestamp());
topology_request_tracking_mutation_builder trbuilder(global_request_id, _sp._features.topology_requests_type_column);
auto req = create_mutations(trbuilder);
trbuilder.set_truncate_table_data(table_id)
.set("done", false)
.set("start_time", db_clock::now());
if (!_sp._features.topology_global_request_queue) {
builder.set_global_topology_request(req)
builder.set_global_topology_request(global_topology_request::truncate_table)
.set_global_topology_request_id(global_request_id);
} else {
builder.queue_global_topology_request_id(global_request_id);
trbuilder.set("request_type", req);
trbuilder.set("request_type", global_topology_request::truncate_table);
}
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);
topology_change change{{builder.build(), trbuilder.build()}};
sstring reason = "Truncating table";
group0_command g0_cmd = _group0_client.prepare_command(std::move(change), guard, reason);
try {
co_await _group0_client.add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
slogger.debug("{}: concurrent modification, retrying", origin);
slogger.debug("request_truncate_with_tablets: concurrent modification, retrying");
}
}
@@ -1209,74 +1191,6 @@ private:
throw std::runtime_error(error);
}
}
future<> request_truncate_with_tablets(sstring ks_name, sstring cf_name) {
table_id id;
co_await do_topology_request("Truncating table"
, [&] {
id = _sp.local_db().find_uuid(ks_name, cf_name);
return fmt::format("TRUNCATE table {}.{}", ks_name, cf_name);
}
, [&](const db::system_keyspace::topology_requests_entry& entry, const service::global_topology_request& global_request) {
if (global_request == global_topology_request::truncate_table) {
const std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::truncate_table) {
return entry.truncate_table_id == id;
}
}
return false;
}
, [&](topology_request_tracking_mutation_builder& trbuilder) {
trbuilder.set_truncate_table_data(id)
.set("done", false)
.set("start_time", db_clock::now());
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);
return global_topology_request::truncate_table;
}
, "request_truncate_with_tablets"
);
}
future<> request_snapshot_with_tablets(const std::vector<std::pair<sstring, sstring>> ks_cf_names, sstring tag, const db::snapshot_options& opts) {
std::unordered_set<table_id> ids;
co_await do_topology_request("Snapshot table"
, [&] {
auto& db = _sp.local_db();
for (auto& [ks_name, cf_name] : ks_cf_names) {
if (cf_name.empty()) {
auto& ks = db.find_keyspace(ks_name);
auto id_range = ks.metadata()->cf_meta_data() | std::views::values | std::views::transform(std::mem_fn(&schema::id));
ids.insert(id_range.begin(), id_range.end());
} else {
ids.insert(db.find_uuid(ks_name, cf_name));
}
}
return fmt::format("SNAPSHOT tables {}", ks_cf_names);
}
, [&](const db::system_keyspace::topology_requests_entry& entry, const service::global_topology_request& global_request) {
if (global_request == global_topology_request::snapshot_tables) {
const std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::snapshot_tables) {
return entry.snapshot_table_ids == ids && entry.snapshot_tag == tag;
}
}
return false;
}
, [&](topology_request_tracking_mutation_builder& trbuilder) {
trbuilder.set_snapshot_tables_data(ids, tag, opts.skip_flush)
.set("done", false)
.set("start_time", db_clock::now());
if (opts.expires_at) {
trbuilder.set("snapshot_expiry", db_clock::from_time_t(gc_clock::to_time_t(*opts.expires_at)));
}
slogger.info("Creating SNAPSHOT global topology request for tables {}", ks_cf_names);
return global_topology_request::snapshot_tables;
}
, "request_snapshot_with_tablets"
);
}
};
using namespace exceptions;
@@ -4398,7 +4312,7 @@ storage_proxy::mutate_atomically_result(utils::chunked_vector<mutation> mutation
public:
context(storage_proxy & p, utils::chunked_vector<mutation>&& mutations, lw_shared_ptr<cdc::operation_result_tracker>&& cdc_tracker, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, coordinator_mutate_options options)
: _p(p)
, _schema(_p.local_db().find_schema(db::system_keyspace::NAME, _p.features().batchlog_v2 ? db::system_keyspace::BATCHLOG_V2 : db::system_keyspace::BATCHLOG))
, _schema(_p.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2))
, _ermp(_p.local_db().find_column_family(_schema->id()).get_effective_replication_map())
, _mutations(std::move(mutations))
, _cdc_tracker(std::move(cdc_tracker))
@@ -7156,29 +7070,6 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std:
}
}
future<> storage_proxy::snapshot_keyspace(std::unordered_multimap<sstring, sstring> ks_tables, sstring tag, const db::snapshot_options& opts) {
if (!features().snapshot_as_topology_operation) {
throw std::runtime_error("Cannot do cluster wide snapshot. Feature 'snapshot_as_topology_operation' is not available in cluster");
}
for (auto& [ksname, _] : ks_tables) {
const replica::keyspace& ks = local_db().find_keyspace(ksname);
if (ks.get_replication_strategy().is_local()) {
throw std::invalid_argument(fmt::format("Keyspace {} uses local replication", ksname));
}
if (!ks.uses_tablets()) {
throw std::invalid_argument(fmt::format("Keyspace {} does not use tablets", ksname));
}
}
slogger.debug("Starting a blocking snapshot operation on keyspaces {}", ks_tables);
auto table_pairs = ks_tables | std::views::transform([](auto& p) { return std::pair<sstring, sstring>(p.first, p.second); })
| std::ranges::to<std::vector>()
;
co_await remote().snapshot_with_tablets(table_pairs, tag, opts);
}
db::system_keyspace& storage_proxy::system_keyspace() {
return remote().system_keyspace();
}

View File

@@ -72,7 +72,6 @@ class feature_service;
namespace db {
class system_keyspace;
struct snapshot_options;
namespace view {
struct view_building_state_machine;
@@ -788,11 +787,6 @@ public:
*/
future<> truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms);
/**
* Performs snapshot on keyspace/tables. To snapshot all tables in a keyspace, put "ks: ''" in map
*/
future<> snapshot_keyspace(std::unordered_multimap<sstring, sstring> ks_tables, sstring tag, const db::snapshot_options& opts);
/*
* Executes data query on the whole cluster.
*

File diff suppressed because it is too large Load Diff

View File

@@ -56,6 +56,7 @@
class node_ops_cmd_request;
class node_ops_cmd_response;
struct node_ops_ctl;
class node_ops_info;
enum class node_ops_cmd : uint32_t;
class repair_service;
@@ -224,10 +225,22 @@ private:
utils::sequenced_set<table_id> _tablet_split_candidates;
future<> _tablet_split_monitor = make_ready_future<>();
std::unordered_map<node_ops_id, node_ops_meta_data> _node_ops;
std::list<std::optional<node_ops_id>> _node_ops_abort_queue;
seastar::condition_variable _node_ops_abort_cond;
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
future<> _node_ops_abort_thread;
shared_ptr<node_ops::task_manager_module> _node_ops_module;
shared_ptr<service::task_manager_module> _tablets_module;
shared_ptr<service::topo::task_manager_module> _global_topology_requests_module;
gms::gossip_address_map& _address_map;
void node_ops_insert(node_ops_id, gms::inet_address coordinator, std::list<inet_address> ignore_nodes,
std::function<future<>()> abort_func);
future<> node_ops_update_heartbeat(node_ops_id ops_uuid);
future<> node_ops_done(node_ops_id ops_uuid);
future<> node_ops_abort(node_ops_id ops_uuid);
void node_ops_signal_abort(std::optional<node_ops_id> ops_uuid);
future<> node_ops_abort_thread();
future<service::tablet_operation_result> do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<service::tablet_operation_result>(locator::tablet_metadata_guard&)> op);
@@ -340,6 +353,7 @@ private:
return _batchlog_manager;
}
friend struct ::node_ops_ctl;
friend void check_raft_rpc_scheduling_group(storage_service&, std::string_view);
friend class db::schema_tables::schema_applier;
public:
@@ -456,6 +470,11 @@ private:
future<replacement_info> prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<locator::host_id, sstring>& loaded_peer_features);
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens, replacement_info replace_info);
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);
future<> wait_for_ring_to_settle();
public:
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
@@ -469,6 +488,7 @@ public:
future<> init_address_map(gms::gossip_address_map& address_map);
future<> uninit_address_map();
bool is_topology_coordinator_enabled() const;
future<> drain_on_shutdown();
@@ -480,6 +500,7 @@ private:
bool should_bootstrap();
bool is_replacing();
bool is_first_node();
raft::server* get_group_server_if_raft_topolgy_enabled();
future<> start_sys_dist_ks() const;
future<> join_topology(sharded<service::storage_proxy>& proxy,
std::unordered_set<gms::inet_address> initial_contact_nodes,
@@ -495,6 +516,11 @@ public:
private:
void set_mode(mode m);
// Stream data for which we become a new replica.
// Before that, if we're not replacing another node, inform other nodes about our chosen tokens
// and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming.
future<> bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info);
public:
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(sstring keyspace, std::optional<table_id> tid) const;
@@ -610,6 +636,38 @@ private:
sharded<db::view::view_builder>& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
bool _isolated = false;
private:
/**
* Handle node bootstrap
*
* @param endpoint bootstrapping node
*/
future<> handle_state_bootstrap(inet_address endpoint, locator::host_id id, gms::permit_id);
/**
* Handle node move to normal state. That is, node is entering token ring and participating
* in reads.
*
* @param endpoint node
*/
future<> handle_state_normal(inet_address endpoint, locator::host_id id, gms::permit_id);
/**
* Handle node leaving the ring. This will happen when a node is decommissioned
*
* @param endpoint If reason for leaving is decommission, endpoint is the leaving node.
* @param pieces STATE_LEFT,token
*/
future<> handle_state_left(inet_address endpoint, locator::host_id id, std::vector<sstring> pieces, gms::permit_id);
/**
* Handle notification that a node being actively removed from the ring via 'removenode'
*
* @param endpoint node
* @param pieces is REMOVED_TOKEN (node is gone)
*/
future<> handle_state_removed(inet_address endpoint, locator::host_id id, std::vector<sstring> pieces, gms::permit_id);
private:
future<> excise(std::unordered_set<token> tokens, inet_address endpoint_ip, locator::host_id endpoint_hid,
gms::permit_id);
@@ -811,6 +869,17 @@ public:
});
}
template <typename Func>
auto run_with_api_lock_in_gossiper_mode_only(sstring operation, Func&& func) {
return container().invoke_on(0, [operation = std::move(operation),
func = std::forward<Func>(func)] (storage_service& ss) mutable {
if (ss.raft_topology_change_enabled()) {
return func(ss);
}
return ss.run_with_api_lock_internal(ss, std::forward<Func>(func), operation);
});
}
private:
void do_isolate_on_error(disk_error type);
future<> isolate();
@@ -830,6 +899,7 @@ public:
private:
std::unordered_set<locator::host_id> _normal_state_handled_on_boot;
bool is_normal_state_handled_on_boot(locator::host_id);
future<> wait_for_normal_state_handled_on_boot();
friend class group0_state_machine;
@@ -859,6 +929,7 @@ private:
public:
bool raft_topology_change_enabled() const;
bool legacy_topology_change_enabled() const;
private:
future<> _raft_state_monitor = make_ready_future<>();
@@ -955,6 +1026,10 @@ public:
future<> do_clusterwide_vnodes_cleanup();
future<> reset_cleanup_needed();
// Starts the upgrade procedure to topology on raft.
// Must be called on shard 0.
future<> start_upgrade_to_raft_topology();
// Must be called on shard 0.
topology::upgrade_state_type get_topology_upgrade_state() const;
@@ -970,6 +1045,7 @@ public:
private:
// Tracks progress of the upgrade to topology coordinator.
future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>();
future<> track_upgrade_progress_to_topology_coordinator(sharded<service::storage_proxy>& proxy);
future<> transit_tablet(table_id, dht::token, noncopyable_function<std::tuple<utils::chunked_vector<canonical_mutation>, sstring>(const locator::tablet_map& tmap, api::timestamp_type)> prepare_mutations);
future<service::group0_guard> get_guard_for_tablet_update();

View File

@@ -1158,17 +1158,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
co_await update_topology_state(std::move(guard), std::move(updates), "no-op request completed");
}
break;
case global_topology_request::snapshot_tables: {
rtlogger.info("SNAPSHOT TABLES requested");
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::snapshot_tables)
.set_global_topology_request(req)
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.set_session(session_id(req_id));
co_await update_topology_state(std::move(guard), {builder.build()}, "SNAPSHOT TABLES requested");
}
break;
}
}
@@ -2236,11 +2225,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
}
}
using get_table_ids_func = std::function<std::unordered_set<table_id>(const db::system_keyspace::topology_requests_entry&)>;
using send_rpc_func = std::function<future<>(locator::host_id, const service::frozen_topology_guard&)>;
using desc_func = std::function<std::string()>;
future<> handle_topology_ordered_op(group0_guard guard, get_table_ids_func get_table_ids, send_rpc_func send_rpc, desc_func desc, std::string_view what) {
future<> handle_truncate_table(group0_guard guard) {
// Execute a barrier to make sure the nodes we are performing truncate on see the session
// and are able to create a topology_guard using the frozen_guard we are sending over RPC
// TODO: Exclude nodes which don't contain replicas of the table we are truncating
@@ -2252,44 +2237,46 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
// handler performed the truncate and cleared the session, but crashed before finalizing the request
if (_topo_sm._topology.session) {
const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id);
std::unordered_set<table_id> tables;
try {
tables = get_table_ids(topology_requests_entry);
} catch (std::exception& e) {
error = e.what();
}
if (!tables.empty()) {
const table_id& table_id = topology_requests_entry.truncate_table_id;
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(table_id);
if (table) {
const sstring& ks_name = table->schema()->ks_name();
const sstring& cf_name = table->schema()->cf_name();
rtlogger.info("Performing TRUNCATE TABLE for {}.{}", ks_name, cf_name);
// Collect the IDs of the hosts with replicas, but ignore excluded nodes
std::unordered_set<locator::host_id> replica_hosts;
for (auto table_id : tables) {
const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
for (const locator::tablet_replica& replica: tinfo.replicas) {
if (!_topo_sm._topology.excluded_tablet_nodes.contains(raft::server_id(replica.host.uuid()))) {
replica_hosts.insert(replica.host);
}
const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
for (const locator::tablet_replica& replica: tinfo.replicas) {
if (!_topo_sm._topology.excluded_tablet_nodes.contains(raft::server_id(replica.host.uuid()))) {
replica_hosts.insert(replica.host);
}
return make_ready_future<>();
});
}
}
return make_ready_future<>();
});
// Release the guard to avoid blocking group0 for long periods of time while invoking RPCs
release_guard(std::move(guard));
co_await utils::get_local_injector().inject(fmt::format("{}_table_wait", what), utils::wait_for_message(std::chrono::minutes(2)));
co_await utils::get_local_injector().inject("truncate_table_wait", utils::wait_for_message(std::chrono::minutes(2)));
// Check if all the nodes with replicas are alive
for (const locator::host_id& replica_host: replica_hosts) {
if (!_gossiper.is_alive(replica_host)) {
throw std::runtime_error(::format("Cannot perform {} because host {} is down", desc(), replica_host));
throw std::runtime_error(::format("Cannot perform TRUNCATE on table {}.{} because host {} is down", ks_name, cf_name, replica_host));
}
}
// Send the RPC to all replicas
const service::frozen_topology_guard frozen_guard { _topo_sm._topology.session };
co_await coroutine::parallel_for_each(replica_hosts, [&] (const locator::host_id& host_id) -> future<> {
co_await send_rpc(host_id, frozen_guard);
co_await ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
});
} else {
error = ::format("Cannot TRUNCATE table with UUID {} because it does not exist.", table_id);
}
// Clear the session and save the error message
@@ -2309,15 +2296,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
}
try {
co_await update_topology_state(std::move(guard), std::move(updates), fmt::format("Clear {} session", what));
co_await update_topology_state(std::move(guard), std::move(updates), "Clear truncate session");
break;
} catch (group0_concurrent_modification&) {
}
}
}
utils::get_local_injector().inject(fmt::format("{}_crash_after_session_clear", what), [what] {
rtlogger.info("{}_crash_after_session_clear hit, killing the node", what);
utils::get_local_injector().inject("truncate_crash_after_session_clear", [] {
rtlogger.info("truncate_crash_after_session_clear hit, killing the node");
_exit(1);
});
@@ -2343,76 +2330,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
.build());
try {
co_await update_topology_state(std::move(guard), std::move(updates), fmt::format("{}{} has completed", ::toupper(what[0]), what.substr(1)));
co_await update_topology_state(std::move(guard), std::move(updates), "Truncate has completed");
break;
} catch (group0_concurrent_modification&) {
}
}
}
future<> handle_truncate_table(group0_guard guard) {
std::string ks_name, cf_name;
co_await handle_topology_ordered_op(std::move(guard)
, [&](const db::system_keyspace::topology_requests_entry& topology_requests_entry) {
const table_id& id = topology_requests_entry.truncate_table_id;
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(id);
if (table) {
ks_name = table->schema()->ks_name();
cf_name = table->schema()->cf_name();
rtlogger.info("Performing TRUNCATE TABLE for {}.{}", ks_name, cf_name);
return std::unordered_set<table_id>({ id });
}
throw std::invalid_argument(fmt::format("Cannot TRUNCATE table with UUID {} because it does not exist.", id));
}
, [&](locator::host_id host_id, const service::frozen_topology_guard& frozen_guard) {
return ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
}
, [&] {
return fmt::format("TRUNCATE on table {}.{}", ks_name, cf_name);
}
, "truncate"
);
}
future<> handle_snapshot_tables(group0_guard guard) {
utils::chunked_vector<table_id> ids;
sstring tag;
bool skip_flush;
gc_clock::time_point t;
std::optional<gc_clock::time_point> expiry;
co_await handle_topology_ordered_op(std::move(guard)
, [&](const db::system_keyspace::topology_requests_entry& topology_requests_entry) {
tag = *topology_requests_entry.snapshot_tag;
skip_flush = topology_requests_entry.snapshot_skip_flush;
t = gc_clock::from_time_t(db_clock::to_time_t(topology_requests_entry.start_time));
if (topology_requests_entry.snapshot_expiry) {
expiry = gc_clock::from_time_t(db_clock::to_time_t(*topology_requests_entry.snapshot_expiry));
}
for (auto& id : *topology_requests_entry.snapshot_table_ids) {
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(id);
if (!table) {
throw std::invalid_argument(fmt::format("Cannot SNAPSHOT table with UUID {} because it does not exist.", id));
}
ids.emplace_back(id);
}
rtlogger.info("Performing SNAPSHOT TABLES for {}", ids);
return *topology_requests_entry.snapshot_table_ids;
}
, [&](locator::host_id host_id, const service::frozen_topology_guard& frozen_guard) {
return ser::storage_proxy_rpc_verbs::send_snapshot_with_tablets(&_messaging, host_id, ids, tag, t, skip_flush, expiry, frozen_guard);
}
, [&] {
return fmt::format("SNAPSHOT on tables {}", ids);
}
, "snapshot"
);
}
// This function must not release and reacquire the guard, callers rely
// on the fact that the block which calls this is atomic.
// FIXME: Don't take the ownership of the guard to make the above guarantee explicit.
@@ -3289,9 +3213,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
case topology::transition_state::truncate_table:
co_await handle_truncate_table(std::move(guard));
break;
case topology::transition_state::snapshot_tables:
co_await handle_snapshot_tables(std::move(guard));
break;
}
co_return true;
};
@@ -3792,6 +3713,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
future<> refresh_tablet_load_stats();
future<> start_tablet_load_stats_refresher();
// Precondition: the state machine upgrade state is not at upgrade_state::done.
future<> do_upgrade_step(group0_guard);
future<> build_coordinator_state(group0_guard);
future<> await_event() {
_as.check();
co_await _topo_sm.event.when();
@@ -3842,6 +3767,8 @@ public:
_db.get_notifier().register_listener(this);
}
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
future<bool> maybe_run_upgrade();
future<> run();
future<> stop();
@@ -4082,6 +4009,147 @@ future<> topology_coordinator::start_tablet_load_stats_refresher() {
}
}
// Perform a single step of the gossiper->raft topology upgrade procedure,
// dispatched on the current upgrade_state. The only actionable state is
// build_coordinator_state; reaching either other state here is a logic error.
// Note: on_internal_error does not return normally, so the missing `break`
// in the first and last cases is intentional.
future<> topology_coordinator::do_upgrade_step(group0_guard guard) {
switch (_topo_sm._topology.upgrade_state) {
case topology::upgrade_state_type::not_upgraded:
// The coordinator must not be started before an upgrade was requested.
on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error(
"topology_coordinator was started even though upgrade to raft topology was not requested yet")));
case topology::upgrade_state_type::build_coordinator_state:
// Error-injection hook for tests: simulate a failure mid-upgrade.
utils::get_local_injector().inject("topology_coordinator_fail_to_build_state_during_upgrade", [] {
throw std::runtime_error("failed to build topology coordinator state due to error injection");
});
co_await build_coordinator_state(std::move(guard));
co_return;
case topology::upgrade_state_type::done:
// Precondition violated: callers must stop stepping once upgrade is done.
on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error(
"topology_coordinator::do_upgrade_step called after upgrade was completed")));
}
}
// Build the initial raft-topology state from gossip / system tables and commit
// it to group0 in a single update, completing the upgrade.
//
// Steps:
//  1. Wait for all nodes to finish the raft *schema* upgrade.
//  2. Migrate service-levels and system_auth data to their v2 formats if needed.
//  3. Create and broadcast a new CDC generation.
//  4. Translate each normal token owner's gossip state into per-node topology rows.
//  5. Commit CDC generation, per-node rows, enabled features and
//     upgrade_state=done together via update_topology_state().
future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
// Wait until all nodes reach use_post_raft_procedures
rtlogger.info("waiting for all nodes to finish upgrade to raft schema");
// Release the guard: the wait below may take a long time and must not
// block other group0 activity.
release_guard(std::move(guard));
co_await _group0.wait_for_all_nodes_to_finish_upgrade(_as);
auto tmptr = get_token_metadata_ptr();
// Migrate service-levels data to v2 if the cluster is still on v1 (or unversioned).
auto sl_version = co_await _sys_ks.get_service_levels_version();
if (!sl_version || *sl_version < 2) {
rtlogger.info("migrating service levels data");
co_await qos::service_level_controller::migrate_to_v2(tmptr->get_normal_token_owners().size(), _sys_ks, _sys_ks.query_processor(), _group0.client(), _as);
}
// Migrate system_auth keyspace data to the group0-backed v2 tables if needed.
auto auth_version = co_await _sys_ks.get_auth_version();
if (auth_version < db::system_keyspace::auth_version_t::v2) {
rtlogger.info("migrating system_auth keyspace data");
co_await auth::migrate_to_auth_v2(_sys_ks, _group0.client(),
[this] (abort_source&) { return start_operation();}, _as);
}
rtlogger.info("building initial raft topology state and CDC generation");
// The guard was released above; take a fresh one for the state update.
guard = co_await start_operation();
// Helper: read a required application state from a node's gossip entry,
// throwing (and thus failing this upgrade attempt) if it is missing.
auto get_application_state = [&] (locator::host_id host_id, const gms::application_state_map& epmap, gms::application_state app_state) -> sstring {
const auto it = epmap.find(app_state);
if (it == epmap.end()) {
throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {}: application state {} is missing in gossip",
host_id, app_state));
}
// it's versioned_value::value(), not std::optional::value() - it does not throw
return it->second.value();
};
// Create a new CDC generation
// Helper: per-host sharding info (shard count, msb-ignore bits) consumed by
// prepare_and_broadcast_cdc_generation_data below.
// NOTE(review): std::stoi throws on malformed gossip values; that aborts this
// upgrade attempt with an unhandled exception — confirm this is acceptable.
auto get_sharding_info_for_host_id = [&] (locator::host_id host_id) -> std::pair<size_t, uint8_t> {
const auto eptr = _gossiper.get_endpoint_state_ptr(host_id);
if (!eptr) {
throw std::runtime_error(format("no gossiper endpoint state for node {}", host_id));
}
const auto& epmap = eptr->get_application_state_map();
const auto shard_count = std::stoi(get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT));
const auto ignore_msb = std::stoi(get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS));
return std::make_pair<size_t, uint8_t>(shard_count, ignore_msb);
};
// The call returns the guard to continue with (guard_) and the CDC data mutation.
auto [cdc_gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), std::nullopt, get_sharding_info_for_host_id);
guard = std::move(guard_);
topology_mutation_builder builder(guard.write_timestamp());
std::set<sstring> enabled_features;
// Build per-node state
for (const auto& node: tmptr->get_topology().get_nodes()) {
// Only normal token owners (cluster members) get a topology row.
if (!node.get().is_member()) {
continue;
}
const auto& host_id = node.get().host_id();
const auto eptr = _gossiper.get_endpoint_state_ptr(host_id);
if (!eptr) {
throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {} as gossip contains no data for it", host_id));
}
const auto& epmap = eptr->get_application_state_map();
const auto datacenter = get_application_state(host_id, epmap, gms::application_state::DC);
const auto rack = get_application_state(host_id, epmap, gms::application_state::RACK);
const auto tokens_v = tmptr->get_tokens(host_id);
const std::unordered_set<dht::token> tokens(tokens_v.begin(), tokens_v.end());
const auto release_version = get_application_state(host_id, epmap, gms::application_state::RELEASE_VERSION);
const auto num_tokens = tokens.size();
const auto shard_count = get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT);
const auto ignore_msb = get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS);
const auto supported_features_s = get_application_state(host_id, epmap, gms::application_state::SUPPORTED_FEATURES);
const auto supported_features = gms::feature_service::to_feature_set(supported_features_s);
// The cluster's enabled feature set is the intersection of the
// supported-feature sets of all members.
if (enabled_features.empty()) {
enabled_features = supported_features;
} else {
std::erase_if(enabled_features, [&] (const sstring& f) { return !supported_features.contains(f); });
}
builder.with_node(raft::server_id(host_id.uuid()))
.set("datacenter", datacenter)
.set("rack", rack)
.set("tokens", tokens)
.set("node_state", node_state::normal)
.set("release_version", release_version)
.set("num_tokens", (uint32_t)num_tokens)
.set("tokens_string", "")
.set("shard_count", (uint32_t)std::stoi(shard_count))
.set("ignore_msb", (uint32_t)std::stoi(ignore_msb))
.set("cleanup_status", cleanup_status::clean)
.set("request_id", utils::UUID())
.set("supported_features", supported_features);
rtlogger.debug("node {} will contain the following parameters: "
"datacenter={}, rack={}, tokens={}, shard_count={}, ignore_msb={}, supported_features={}",
host_id, datacenter, rack, tokens, shard_count, ignore_msb, supported_features);
}
// Build the static columns
const bool add_ts_delay = true; // This is not the first generation, so add delay
auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay);
const cdc::generation_id_v2 cdc_gen_id {
.ts = cdc_gen_ts,
.id = cdc_gen_uuid,
};
builder.set_version(topology::initial_version)
.set_fence_version(topology::initial_version)
.add_new_committed_cdc_generation(cdc_gen_id)
.add_enabled_features(std::move(enabled_features));
// Commit everything atomically; marking upgrade_state done in the same
// update makes the upgrade idempotent across coordinator restarts.
builder.set_upgrade_state(topology::upgrade_state_type::done);
auto reason = "upgrade: build the initial state";
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
}
future<> topology_coordinator::fence_previous_coordinator() {
// Write empty change to make sure that a guard taken by any previous coordinator cannot
// be used to do a successful write any more. Otherwise the following can theoretically happen
@@ -4202,6 +4270,39 @@ bool topology_coordinator::handle_topology_coordinator_error(std::exception_ptr
return false;
}
// Drive the gossiper->raft topology upgrade to completion, one step at a
// time. Returns true once the upgrade is (or already was) done; returns
// false if the fiber was aborted before the upgrade finished.
future<bool> topology_coordinator::maybe_run_upgrade() {
    // Fast path: the cluster already runs raft topology.
    if (_topo_sm._topology.upgrade_state == topology::upgrade_state_type::done) {
        co_return true;
    }
    rtlogger.info("topology coordinator fiber is upgrading the cluster to raft topology mode");
    // Wake the state-machine event on abort so waiters notice promptly.
    auto abort_sub = _as.subscribe([this] () noexcept {
        _topo_sm.event.broadcast();
    });
    for (;;) {
        if (_as.abort_requested() || _topo_sm._topology.upgrade_state == topology::upgrade_state_type::done) {
            break;
        }
        bool backoff = false;
        try {
            auto guard = co_await start_operation();
            co_await do_upgrade_step(std::move(guard));
        } catch (...) {
            // The handler decides whether this error warrants backing off
            // before retrying.
            backoff = handle_topology_coordinator_error(std::current_exception());
        }
        if (backoff) {
            try {
                co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
            } catch (...) {
                // Sleep may be interrupted by abort; just log and re-check.
                rtlogger.debug("sleep failed: {}", std::current_exception());
            }
        }
        co_await coroutine::maybe_yield();
    }
    co_return !_as.abort_requested();
}
future<> topology_coordinator::run() {
auto abort = _as.subscribe([this] () noexcept {
_topo_sm.event.broadcast();
@@ -4346,9 +4447,12 @@ future<> run_topology_coordinator(
lifecycle_notifier.register_subscriber(&coordinator);
try {
rtlogger.info("start topology coordinator fiber");
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
return coordinator.run();
});
const bool upgrade_done = co_await coordinator.maybe_run_upgrade();
if (upgrade_done) {
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
return coordinator.run();
});
}
} catch (...) {
ex = std::current_exception();
}

View File

@@ -361,16 +361,6 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
return *this;
}
// Record the data of a SNAPSHOT TABLES request on the tracking row:
// the set of target table UUIDs, the snapshot tag and the skip-flush flag.
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_snapshot_tables_data(const std::unordered_set<table_id>& table_ids, const sstring& tag, bool skip_flush) {
    // Collect the raw UUIDs of the tables targeted by this snapshot request.
    set_type_impl::native_type uuid_values;
    uuid_values.reserve(table_ids.size());
    for (const table_id& tid : table_ids) {
        uuid_values.emplace_back(tid.uuid());
    }
    const auto* ids_col = schema().get_column_definition("snapshot_table_ids");
    apply_atomic("snapshot_table_ids", make_set_value(ids_col->type, std::move(uuid_values)));
    apply_atomic("snapshot_tag", tag);
    apply_atomic("snapshot_skip_flush", skip_flush);
    return *this;
}
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_new_keyspace_rf_change_data(
const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc) {
apply_atomic("new_keyspace_rf_change_ks_name", ks_name);

Some files were not shown because too many files have changed in this diff Show More