mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 09:00:35 +00:00
Compare commits
2 Commits
copilot/do
...
SCYLLADB-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0313d3e291 | ||
|
|
4a06935bb0 |
53
.github/workflows/call_backport_with_jira.yaml
vendored
53
.github/workflows/call_backport_with_jira.yaml
vendored
@@ -1,53 +0,0 @@
|
||||
name: Backport with Jira Integration
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- next-*.*
|
||||
- branch-*.*
|
||||
pull_request_target:
|
||||
types: [labeled, closed]
|
||||
branches:
|
||||
- master
|
||||
- next
|
||||
- next-*.*
|
||||
- branch-*.*
|
||||
|
||||
jobs:
|
||||
backport-on-push:
|
||||
if: github.event_name == 'push'
|
||||
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
|
||||
with:
|
||||
event_type: 'push'
|
||||
base_branch: ${{ github.ref }}
|
||||
commits: ${{ github.event.before }}..${{ github.sha }}
|
||||
secrets:
|
||||
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
backport-on-label:
|
||||
if: github.event_name == 'pull_request_target' && github.event.action == 'labeled'
|
||||
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
|
||||
with:
|
||||
event_type: 'labeled'
|
||||
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
|
||||
pull_request_number: ${{ github.event.pull_request.number }}
|
||||
head_commit: ${{ github.event.pull_request.base.sha }}
|
||||
label_name: ${{ github.event.label.name }}
|
||||
pr_state: ${{ github.event.pull_request.state }}
|
||||
secrets:
|
||||
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
backport-chain:
|
||||
if: github.event_name == 'pull_request_target' && github.event.action == 'closed' && github.event.pull_request.merged == true
|
||||
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
|
||||
with:
|
||||
event_type: 'chain'
|
||||
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
|
||||
pull_request_number: ${{ github.event.pull_request.number }}
|
||||
pr_body: ${{ github.event.pull_request.body }}
|
||||
secrets:
|
||||
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
26
.github/workflows/trigger-scylla-ci.yaml
vendored
26
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -12,24 +12,6 @@ jobs:
|
||||
if: (github.event_name == 'issue_comment' && github.event.comment.user.login != 'scylladbbot') || github.event.label.name == 'conflicts'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Verify Org Membership
|
||||
id: verify_author
|
||||
shell: bash
|
||||
run: |
|
||||
if [[ "${{ github.event_name }}" == "pull_request_target" ]]; then
|
||||
AUTHOR="${{ github.event.pull_request.user.login }}"
|
||||
ASSOCIATION="${{ github.event.pull_request.author_association }}"
|
||||
else
|
||||
AUTHOR="${{ github.event.comment.user.login }}"
|
||||
ASSOCIATION="${{ github.event.comment.author_association }}"
|
||||
fi
|
||||
if [[ "$ASSOCIATION" == "MEMBER" || "$ASSOCIATION" == "OWNER" ]]; then
|
||||
echo "member=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "::warning::${AUTHOR} is not a member of scylladb (association: ${ASSOCIATION}); skipping CI trigger."
|
||||
echo "member=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: Validate Comment Trigger
|
||||
if: github.event_name == 'issue_comment'
|
||||
id: verify_comment
|
||||
@@ -48,13 +30,13 @@ jobs:
|
||||
fi
|
||||
|
||||
- name: Trigger Scylla-CI-Route Jenkins Job
|
||||
if: steps.verify_author.outputs.member == 'true' && (github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true')
|
||||
if: github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true'
|
||||
env:
|
||||
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
|
||||
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
|
||||
JENKINS_URL: "https://jenkins.scylladb.com"
|
||||
PR_NUMBER: "${{ github.event.issue.number || github.event.pull_request.number }}"
|
||||
PR_REPO_NAME: "${{ github.event.repository.full_name }}"
|
||||
run: |
|
||||
PR_NUMBER=${{ github.event.issue.number || github.event.pull_request.number }}
|
||||
PR_REPO_NAME=${{ github.event.repository.full_name }}
|
||||
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
|
||||
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail
|
||||
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v
|
||||
|
||||
@@ -300,6 +300,7 @@ add_subdirectory(locator)
|
||||
add_subdirectory(message)
|
||||
add_subdirectory(mutation)
|
||||
add_subdirectory(mutation_writer)
|
||||
add_subdirectory(node_ops)
|
||||
add_subdirectory(readers)
|
||||
add_subdirectory(replica)
|
||||
add_subdirectory(raft)
|
||||
|
||||
@@ -63,7 +63,6 @@
|
||||
#include "types/types.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "cql3/statements/ks_prop_defs.hh"
|
||||
#include "alternator/ttl_tag.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -165,7 +164,7 @@ static map_type attrs_type() {
|
||||
|
||||
static const column_definition& attrs_column(const schema& schema) {
|
||||
const column_definition* cdef = schema.get_column_definition(bytes(executor::ATTRS_COLUMN_NAME));
|
||||
throwing_assert(cdef);
|
||||
SCYLLA_ASSERT(cdef);
|
||||
return *cdef;
|
||||
}
|
||||
|
||||
@@ -1650,7 +1649,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
|
||||
throwing_assert(this_shard_id() == 0);
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// We begin by parsing and validating the content of the CreateTable
|
||||
// command. We can't inspect the current database schema at this point
|
||||
@@ -2838,12 +2837,14 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
}
|
||||
} else if (_write_isolation != write_isolation::LWT_ALWAYS) {
|
||||
std::optional<mutation> m = apply(nullptr, api::new_timestamp(), cdc_opts);
|
||||
throwing_assert(m); // !needs_read_before_write, so apply() did not check a condition
|
||||
SCYLLA_ASSERT(m); // !needs_read_before_write, so apply() did not check a condition
|
||||
return proxy.mutate(utils::chunked_vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes, false, std::move(cdc_opts)).then([this, &wcu_total] () mutable {
|
||||
return rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total);
|
||||
});
|
||||
}
|
||||
throwing_assert(cas_shard);
|
||||
if (!cas_shard) {
|
||||
on_internal_error(elogger, "cas_shard is not set");
|
||||
}
|
||||
// If we're still here, we need to do this write using LWT:
|
||||
global_stats.write_using_lwt++;
|
||||
per_table_stats.write_using_lwt++;
|
||||
@@ -5412,7 +5413,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
}
|
||||
|
||||
static dht::token token_for_segment(int segment, int total_segments) {
|
||||
throwing_assert(total_segments > 1 && segment >= 0 && segment < total_segments);
|
||||
SCYLLA_ASSERT(total_segments > 1 && segment >= 0 && segment < total_segments);
|
||||
uint64_t delta = std::numeric_limits<uint64_t>::max() / total_segments;
|
||||
return dht::token::from_int64(std::numeric_limits<int64_t>::min() + delta * segment);
|
||||
}
|
||||
|
||||
@@ -710,7 +710,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
++_executor._stats.requests_blocked_memory;
|
||||
}
|
||||
auto units = co_await std::move(units_fut);
|
||||
throwing_assert(req->content_stream);
|
||||
SCYLLA_ASSERT(req->content_stream);
|
||||
chunked_content content = co_await read_entire_stream(*req->content_stream, request_content_length_limit);
|
||||
// If the request had no Content-Length, we reserved too many units
|
||||
// so need to return some
|
||||
|
||||
@@ -46,7 +46,6 @@
|
||||
#include "alternator/executor.hh"
|
||||
#include "alternator/controller.hh"
|
||||
#include "alternator/serialization.hh"
|
||||
#include "alternator/ttl_tag.hh"
|
||||
#include "dht/sharder.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/tags/utils.hh"
|
||||
@@ -58,10 +57,19 @@ static logging::logger tlogger("alternator_ttl");
|
||||
|
||||
namespace alternator {
|
||||
|
||||
// We write the expiration-time attribute enabled on a table in a
|
||||
// tag TTL_TAG_KEY.
|
||||
// Currently, the *value* of this tag is simply the name of the attribute,
|
||||
// and the expiration scanner interprets it as an Alternator attribute name -
|
||||
// It can refer to a real column or if that doesn't exist, to a member of
|
||||
// the ":attrs" map column. Although this is designed for Alternator, it may
|
||||
// be good enough for CQL as well (there, the ":attrs" column won't exist).
|
||||
extern const sstring TTL_TAG_KEY;
|
||||
|
||||
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.update_time_to_live++;
|
||||
if (!_proxy.features().alternator_ttl) {
|
||||
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Upgrade all nodes to a version that supports it.");
|
||||
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
|
||||
}
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
@@ -316,7 +324,9 @@ static future<std::vector<std::pair<dht::token_range, locator::host_id>>> get_se
|
||||
const auto& tm = *erm->get_token_metadata_ptr();
|
||||
const auto& sorted_tokens = tm.sorted_tokens();
|
||||
std::vector<std::pair<dht::token_range, locator::host_id>> ret;
|
||||
throwing_assert(!sorted_tokens.empty());
|
||||
if (sorted_tokens.empty()) {
|
||||
on_internal_error(tlogger, "Token metadata is empty");
|
||||
}
|
||||
auto prev_tok = sorted_tokens.back();
|
||||
for (const auto& tok : sorted_tokens) {
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -553,7 +563,7 @@ static future<> scan_table_ranges(
|
||||
expiration_service::stats& expiration_stats)
|
||||
{
|
||||
const schema_ptr& s = scan_ctx.s;
|
||||
throwing_assert(partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
|
||||
SCYLLA_ASSERT (partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
|
||||
auto p = service::pager::query_pagers::pager(proxy, s, scan_ctx.selection, *scan_ctx.query_state_ptr,
|
||||
*scan_ctx.query_options, scan_ctx.command, std::move(partition_ranges), nullptr);
|
||||
while (!p->is_exhausted()) {
|
||||
@@ -630,38 +640,13 @@ static future<> scan_table_ranges(
|
||||
}
|
||||
} else {
|
||||
// For a real column to contain an expiration time, it
|
||||
// must be a numeric type. We currently support decimal
|
||||
// (used by Alternator TTL) as well as bigint, int and
|
||||
// timestamp (used by CQL per-row TTL).
|
||||
switch (meta[*expiration_column]->type->get_kind()) {
|
||||
case abstract_type::kind::decimal:
|
||||
// Used by Alternator TTL for key columns not stored
|
||||
// in the map. The value is in seconds, fractional
|
||||
// part is ignored.
|
||||
expired = is_expired(value_cast<big_decimal>(v), now);
|
||||
break;
|
||||
case abstract_type::kind::long_kind:
|
||||
// Used by CQL per-row TTL. The value is in seconds.
|
||||
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int64_t>(v))), now);
|
||||
break;
|
||||
case abstract_type::kind::int32:
|
||||
// Used by CQL per-row TTL. The value is in seconds.
|
||||
// Using int type is not recommended because it will
|
||||
// overflow in 2038, but we support it to allow users
|
||||
// to use existing int columns for expiration.
|
||||
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int32_t>(v))), now);
|
||||
break;
|
||||
case abstract_type::kind::timestamp:
|
||||
// Used by CQL per-row TTL. The value is in milliseconds
|
||||
// but we truncate it to gc_clock's precision (whole seconds).
|
||||
expired = is_expired(gc_clock::time_point(std::chrono::duration_cast<gc_clock::duration>(value_cast<db_clock::time_point>(v).time_since_epoch())), now);
|
||||
break;
|
||||
default:
|
||||
// Should never happen - we verified the column's type
|
||||
// before starting the scan.
|
||||
[[unlikely]]
|
||||
on_internal_error(tlogger, format("expiration scanner value of unsupported type {} in column {}", meta[*expiration_column]->type->cql3_type_name(), scan_ctx.column_name) );
|
||||
}
|
||||
// must be a numeric type.
|
||||
// FIXME: Currently we only support decimal_type (which is
|
||||
// what Alternator uses), but other numeric types can be
|
||||
// supported as well to make this feature more useful in CQL.
|
||||
// Note that kind::decimal is also checked above.
|
||||
big_decimal n = value_cast<big_decimal>(v);
|
||||
expired = is_expired(n, now);
|
||||
}
|
||||
if (expired) {
|
||||
expiration_stats.items_deleted++;
|
||||
@@ -723,12 +708,16 @@ static future<bool> scan_table(
|
||||
co_return false;
|
||||
}
|
||||
// attribute_name may be one of the schema's columns (in Alternator, this
|
||||
// means a key column, in CQL it's a regular column), or an element in
|
||||
// Alternator's attrs map encoded in Alternator's JSON encoding (which we
|
||||
// decode). If attribute_name is a real column, in Alternator it will have
|
||||
// the type decimal, counting seconds since the UNIX epoch, while in CQL
|
||||
// it will one of the types bigint or int (counting seconds) or timestamp
|
||||
// (counting milliseconds).
|
||||
// means it's a key column), or an element in Alternator's attrs map
|
||||
// encoded in Alternator's JSON encoding.
|
||||
// FIXME: To make this less Alternators-specific, we should encode in the
|
||||
// single key's value three things:
|
||||
// 1. The name of a column
|
||||
// 2. Optionally if column is a map, a member in the map
|
||||
// 3. The deserializer for the value: CQL or Alternator (JSON).
|
||||
// The deserializer can be guessed: If the given column or map item is
|
||||
// numeric, it can be used directly. If it is a "bytes" type, it needs to
|
||||
// be deserialized using Alternator's deserializer.
|
||||
bytes column_name = to_bytes(*attribute_name);
|
||||
const column_definition *cd = s->get_column_definition(column_name);
|
||||
std::optional<std::string> member;
|
||||
@@ -747,14 +736,11 @@ static future<bool> scan_table(
|
||||
data_type column_type = cd->type;
|
||||
// Verify that the column has the right type: If "member" exists
|
||||
// the column must be a map, and if it doesn't, the column must
|
||||
// be decimal_type (Alternator), bigint, int or timestamp (CQL).
|
||||
// If the column has the wrong type nothing can get expired in
|
||||
// this table, and it's pointless to scan it.
|
||||
// (currently) be a decimal_type. If the column has the wrong type
|
||||
// nothing can get expired in this table, and it's pointless to
|
||||
// scan it.
|
||||
if ((member && column_type->get_kind() != abstract_type::kind::map) ||
|
||||
(!member && column_type->get_kind() != abstract_type::kind::decimal &&
|
||||
column_type->get_kind() != abstract_type::kind::long_kind &&
|
||||
column_type->get_kind() != abstract_type::kind::int32 &&
|
||||
column_type->get_kind() != abstract_type::kind::timestamp)) {
|
||||
(!member && column_type->get_kind() != abstract_type::kind::decimal)) {
|
||||
tlogger.info("table {} TTL column has unsupported type, not scanning", s->cf_name());
|
||||
co_return false;
|
||||
}
|
||||
@@ -781,7 +767,7 @@ static future<bool> scan_table(
|
||||
// by tasking another node to take over scanning of the dead node's primary
|
||||
// ranges. What we do here is that this node will also check expiration
|
||||
// on its *secondary* ranges - but only those whose primary owner is down.
|
||||
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet, erm->get_topology()); // throws if no secondary replica
|
||||
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica
|
||||
if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) {
|
||||
if (!gossiper.is_alive(tablet_primary_replica.host)) {
|
||||
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);
|
||||
@@ -892,10 +878,12 @@ future<> expiration_service::run() {
|
||||
future<> expiration_service::start() {
|
||||
// Called by main() on each shard to start the expiration-service
|
||||
// thread. Just runs run() in the background and allows stop().
|
||||
if (!shutting_down()) {
|
||||
_end = run().handle_exception([] (std::exception_ptr ep) {
|
||||
tlogger.error("expiration_service failed: {}", ep);
|
||||
});
|
||||
if (_db.features().alternator_ttl) {
|
||||
if (!shutting_down()) {
|
||||
_end = run().handle_exception([] (std::exception_ptr ep) {
|
||||
tlogger.error("expiration_service failed: {}", ep);
|
||||
});
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
/*
|
||||
* Copyright 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
namespace alternator {
|
||||
// We use the table tag TTL_TAG_KEY ("system:ttl_attribute") to remember
|
||||
// which attribute was chosen as the expiration-time attribute for
|
||||
// Alternator's TTL and CQL's per-row TTL features.
|
||||
// Currently, the *value* of this tag is simply the name of the attribute:
|
||||
// It can refer to a real column or if that doesn't exist, to a member of
|
||||
// the ":attrs" map column (which Alternator uses).
|
||||
extern const sstring TTL_TAG_KEY;
|
||||
} // namespace alternator
|
||||
|
||||
// let users use TTL_TAG_KEY without the "alternator::" prefix,
|
||||
// to make it easier to move it to a different namespace later.
|
||||
using alternator::TTL_TAG_KEY;
|
||||
@@ -3085,48 +3085,6 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
{
|
||||
"path":"/storage_service/tablets/snapshots",
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Takes the snapshot for the given keyspaces/tables. A snapshot name must be specified.",
|
||||
"type":"void",
|
||||
"nickname":"take_cluster_snapshot",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"tag",
|
||||
"description":"the tag given to the snapshot",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"Keyspace(s) to snapshot. Multiple keyspaces can be provided using a comma-separated list. If omitted, snapshot all keyspaces.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"table",
|
||||
"description":"Table(s) to snapshot. Multiple tables (in a single keyspace) can be provided using a comma-separated list. If omitted, snapshot all tables in the given keyspace(s).",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
{
|
||||
"path":"/storage_service/quiesce_topology",
|
||||
"operations":[
|
||||
|
||||
@@ -783,13 +783,17 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
|
||||
|
||||
apilog.info("cleanup_all global={}", global);
|
||||
|
||||
if (global) {
|
||||
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
|
||||
co_return co_await ss.do_clusterwide_vnodes_cleanup();
|
||||
});
|
||||
auto done = !global ? false : co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
|
||||
if (!ss.is_topology_coordinator_enabled()) {
|
||||
co_return false;
|
||||
}
|
||||
co_await ss.do_clusterwide_vnodes_cleanup();
|
||||
co_return true;
|
||||
});
|
||||
if (done) {
|
||||
co_return json::json_return_type(0);
|
||||
}
|
||||
// fall back to the local cleanup if local cleanup is requested
|
||||
// fall back to the local cleanup if topology coordinator is not enabled or local cleanup is requested
|
||||
auto& db = ctx.db;
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<compaction::global_cleanup_compaction_task_impl>({}, db);
|
||||
@@ -797,7 +801,9 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
|
||||
|
||||
// Mark this node as clean
|
||||
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
|
||||
co_await ss.reset_cleanup_needed();
|
||||
if (ss.is_topology_coordinator_enabled()) {
|
||||
co_await ss.reset_cleanup_needed();
|
||||
}
|
||||
});
|
||||
|
||||
co_return json::json_return_type(0);
|
||||
@@ -808,6 +814,9 @@ future<json::json_return_type>
|
||||
rest_reset_cleanup_needed(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
apilog.info("reset_cleanup_needed");
|
||||
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
|
||||
if (!ss.is_topology_coordinator_enabled()) {
|
||||
throw std::runtime_error("mark_node_as_clean is only supported when topology over raft is enabled");
|
||||
}
|
||||
return ss.reset_cleanup_needed();
|
||||
});
|
||||
co_return json_void();
|
||||
@@ -1565,7 +1574,16 @@ rest_reload_raft_topology_state(sharded<service::storage_service>& ss, service::
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_upgrade_to_raft_topology(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
apilog.info("Requested to schedule upgrade to raft topology, but this version does not need it since it uses raft topology by default.");
|
||||
apilog.info("Requested to schedule upgrade to raft topology");
|
||||
try {
|
||||
co_await ss.invoke_on(0, [] (auto& ss) {
|
||||
return ss.start_upgrade_to_raft_topology();
|
||||
});
|
||||
} catch (...) {
|
||||
auto ex = std::current_exception();
|
||||
apilog.error("Failed to schedule upgrade to raft topology: {}", ex);
|
||||
std::rethrow_exception(std::move(ex));
|
||||
}
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
@@ -2007,8 +2025,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
auto tag = req->get_query_param("tag");
|
||||
auto column_families = split(req->get_query_param("cf"), ",");
|
||||
auto sfopt = req->get_query_param("sf");
|
||||
auto tcopt = req->get_query_param("tc");
|
||||
|
||||
db::snapshot_options opts = {
|
||||
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
|
||||
};
|
||||
@@ -2033,27 +2049,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
}
|
||||
});
|
||||
|
||||
ss::take_cluster_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
apilog.info("take_cluster_snapshot: {}", req->get_query_params());
|
||||
auto tag = req->get_query_param("tag");
|
||||
auto column_families = split(req->get_query_param("table"), ",");
|
||||
// Note: not published/active. Retain as internal option, but...
|
||||
auto sfopt = req->get_query_param("skip_flush");
|
||||
|
||||
db::snapshot_options opts = {
|
||||
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
|
||||
};
|
||||
|
||||
std::vector<sstring> keynames = split(req->get_query_param("keyspace"), ",");
|
||||
try {
|
||||
co_await snap_ctl.local().take_cluster_column_family_snapshot(keynames, column_families, tag, opts);
|
||||
co_return json_void();
|
||||
} catch (...) {
|
||||
apilog.error("take_cluster_snapshot failed: {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
|
||||
ss::del_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
apilog.info("del_snapshot: {}", req->get_query_params());
|
||||
auto tag = req->get_query_param("tag");
|
||||
|
||||
@@ -110,23 +110,15 @@ future<> cache::prune(const resource& r) {
|
||||
future<> cache::reload_all_permissions() noexcept {
|
||||
SCYLLA_ASSERT(_permission_loader);
|
||||
auto units = co_await get_units(_loading_sem, 1, _as);
|
||||
auto copy_keys = [] (const std::unordered_map<resource, permission_set>& m) {
|
||||
std::vector<resource> keys;
|
||||
keys.reserve(m.size());
|
||||
for (const auto& [res, _] : m) {
|
||||
keys.push_back(res);
|
||||
}
|
||||
return keys;
|
||||
};
|
||||
const role_or_anonymous anon;
|
||||
for (const auto& res : copy_keys(_anonymous_permissions)) {
|
||||
_anonymous_permissions[res] = co_await _permission_loader(anon, res);
|
||||
for (auto& [res, perms] : _anonymous_permissions) {
|
||||
perms = co_await _permission_loader(anon, res);
|
||||
}
|
||||
for (auto& [role, entry] : _roles) {
|
||||
auto& perms_cache = entry->cached_permissions;
|
||||
auto r = role_or_anonymous(role);
|
||||
for (const auto& res : copy_keys(perms_cache)) {
|
||||
perms_cache[res] = co_await _permission_loader(r, res);
|
||||
for (auto& [res, perms] : perms_cache) {
|
||||
perms = co_await _permission_loader(r, res);
|
||||
}
|
||||
}
|
||||
logger.debug("Reloaded auth cache with {} entries", _roles.size());
|
||||
@@ -236,7 +228,6 @@ future<> cache::load_all() {
|
||||
co_await distribute_role(name, role);
|
||||
}
|
||||
co_await container().invoke_on_others([this](cache& c) -> future<> {
|
||||
auto units = co_await get_units(c._loading_sem, 1, c._as);
|
||||
c._current_version = _current_version;
|
||||
co_await c.prune_all();
|
||||
});
|
||||
@@ -296,11 +287,10 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
|
||||
|
||||
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
|
||||
auto role_ptr = role.get();
|
||||
co_await container().invoke_on_others([&name, role_ptr](cache& c) -> future<> {
|
||||
auto units = co_await get_units(c._loading_sem, 1, c._as);
|
||||
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
|
||||
if (!role_ptr) {
|
||||
c.remove_role(name);
|
||||
co_return;
|
||||
return;
|
||||
}
|
||||
auto role_copy = make_lw_shared<role_record>(*role_ptr);
|
||||
c.add_role(name, std::move(role_copy));
|
||||
|
||||
@@ -1519,9 +1519,7 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
|
||||
| std::views::transform(std::mem_fn(&sstables::sstable::run_identifier))
|
||||
| std::ranges::to<std::unordered_set>());
|
||||
};
|
||||
const auto injected_threshold = utils::get_local_injector().inject_parameter<size_t>("set_sstable_count_reduction_threshold");
|
||||
const auto threshold = injected_threshold.value_or(size_t(std::max(schema->max_compaction_threshold(), 32)));
|
||||
|
||||
const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32));
|
||||
auto count = co_await num_runs_for_compaction();
|
||||
if (count <= threshold) {
|
||||
cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}",
|
||||
@@ -1536,7 +1534,9 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
|
||||
auto& cstate = get_compaction_state(&t);
|
||||
try {
|
||||
while (can_perform_regular_compaction(t) && co_await num_runs_for_compaction() > threshold) {
|
||||
co_await cstate.compaction_done.when();
|
||||
co_await cstate.compaction_done.wait([this, &t] {
|
||||
return !can_perform_regular_compaction(t);
|
||||
});
|
||||
}
|
||||
} catch (const broken_condition_variable&) {
|
||||
co_return;
|
||||
|
||||
@@ -874,16 +874,7 @@ maintenance_socket: ignore
|
||||
# The `tablets` option cannot be changed using `ALTER KEYSPACE`.
|
||||
tablets_mode_for_new_keyspaces: enabled
|
||||
|
||||
# Require every tablet-enabled keyspace to be RF-rack-valid.
|
||||
#
|
||||
# A tablet-enabled keyspace is RF-rack-valid when, for each data center,
|
||||
# its replication factor (RF) is 0, 1, or exactly equal to the number of
|
||||
# racks in that data center. Setting the RF to the number of racks ensures
|
||||
# that a single rack failure never results in data unavailability.
|
||||
#
|
||||
# When set to true, CREATE KEYSPACE and ALTER KEYSPACE statements that
|
||||
# would produce an RF-rack-invalid keyspace are rejected.
|
||||
# When set to false, such statements are allowed but emit a warning.
|
||||
# Enforce RF-rack-valid keyspaces.
|
||||
rf_rack_valid_keyspaces: false
|
||||
|
||||
#
|
||||
|
||||
@@ -1192,7 +1192,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/azure/identity/default_credentials.cc',
|
||||
'utils/gcp/gcp_credentials.cc',
|
||||
'utils/gcp/object_storage.cc',
|
||||
'utils/gcp/object_storage_retry_strategy.cc',
|
||||
'gms/version_generator.cc',
|
||||
'gms/versioned_value.cc',
|
||||
'gms/gossiper.cc',
|
||||
@@ -1361,6 +1360,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'service/topology_state_machine.cc',
|
||||
'service/topology_mutation.cc',
|
||||
'service/topology_coordinator.cc',
|
||||
'node_ops/node_ops_ctl.cc',
|
||||
'node_ops/task_manager_module.cc',
|
||||
'reader_concurrency_semaphore_group.cc',
|
||||
'utils/disk_space_monitor.cc',
|
||||
|
||||
25
cql3/Cql.g
25
cql3/Cql.g
@@ -874,8 +874,8 @@ cfamDefinition[cql3::statements::create_table_statement::raw_statement& expr]
|
||||
;
|
||||
|
||||
cfamColumns[cql3::statements::create_table_statement::raw_statement& expr]
|
||||
@init { bool is_static=false, is_ttl=false; }
|
||||
: k=ident v=comparatorType (K_TTL {is_ttl = true;})? (K_STATIC {is_static = true;})? { $expr.add_definition(k, v, is_static, is_ttl); }
|
||||
@init { bool is_static=false; }
|
||||
: k=ident v=comparatorType (K_STATIC {is_static = true;})? { $expr.add_definition(k, v, is_static); }
|
||||
(K_PRIMARY K_KEY { $expr.add_key_aliases(std::vector<shared_ptr<cql3::column_identifier>>{k}); })?
|
||||
| K_PRIMARY K_KEY '(' pkDef[expr] (',' c=ident { $expr.add_column_alias(c); } )* ')'
|
||||
;
|
||||
@@ -1042,7 +1042,6 @@ alterTableStatement returns [std::unique_ptr<alter_table_statement::raw_statemen
|
||||
std::vector<alter_table_statement::column_change> column_changes;
|
||||
std::vector<std::pair<shared_ptr<cql3::column_identifier::raw>, shared_ptr<cql3::column_identifier::raw>>> renames;
|
||||
auto attrs = std::make_unique<cql3::attributes::raw>();
|
||||
shared_ptr<cql3::column_identifier::raw> ttl_change;
|
||||
}
|
||||
: K_ALTER K_COLUMNFAMILY cf=columnFamilyName
|
||||
( K_ALTER id=cident K_TYPE v=comparatorType { type = alter_table_statement::type::alter; column_changes.emplace_back(alter_table_statement::column_change{id, v}); }
|
||||
@@ -1061,11 +1060,9 @@ alterTableStatement returns [std::unique_ptr<alter_table_statement::raw_statemen
|
||||
| K_RENAME { type = alter_table_statement::type::rename; }
|
||||
id1=cident K_TO toId1=cident { renames.emplace_back(id1, toId1); }
|
||||
( K_AND idn=cident K_TO toIdn=cident { renames.emplace_back(idn, toIdn); } )*
|
||||
| K_TTL { type = alter_table_statement::type::ttl; }
|
||||
( id=cident { ttl_change = id; } | K_NULL )
|
||||
)
|
||||
{
|
||||
$expr = std::make_unique<alter_table_statement::raw_statement>(std::move(cf), type, std::move(column_changes), std::move(props), std::move(renames), std::move(attrs), std::move(ttl_change));
|
||||
$expr = std::make_unique<alter_table_statement::raw_statement>(std::move(cf), type, std::move(column_changes), std::move(props), std::move(renames), std::move(attrs));
|
||||
}
|
||||
;
|
||||
|
||||
@@ -2074,21 +2071,7 @@ vector_type returns [shared_ptr<cql3::cql3_type::raw> pt]
|
||||
{
|
||||
if ($d.text[0] == '-')
|
||||
throw exceptions::invalid_request_exception("Vectors must have a dimension greater than 0");
|
||||
unsigned long parsed_dimension;
|
||||
try {
|
||||
parsed_dimension = std::stoul($d.text);
|
||||
} catch (const std::exception& e) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid vector dimension: {}", $d.text));
|
||||
}
|
||||
static_assert(sizeof(unsigned long) >= sizeof(vector_dimension_t));
|
||||
if (parsed_dimension == 0) {
|
||||
throw exceptions::invalid_request_exception("Vectors must have a dimension greater than 0");
|
||||
}
|
||||
if (parsed_dimension > cql3::cql3_type::MAX_VECTOR_DIMENSION) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
format("Vectors must have a dimension less than or equal to {}", cql3::cql3_type::MAX_VECTOR_DIMENSION));
|
||||
}
|
||||
$pt = cql3::cql3_type::raw::vector(t, static_cast<vector_dimension_t>(parsed_dimension));
|
||||
$pt = cql3::cql3_type::raw::vector(t, std::stoul($d.text));
|
||||
}
|
||||
;
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ public:
|
||||
|
||||
struct vector_test_result {
|
||||
test_result result;
|
||||
std::optional<vector_dimension_t> dimension_opt;
|
||||
std::optional<size_t> dimension_opt;
|
||||
};
|
||||
|
||||
static bool is_assignable(test_result tr) {
|
||||
|
||||
@@ -307,14 +307,17 @@ public:
|
||||
|
||||
class cql3_type::raw_vector : public raw {
|
||||
shared_ptr<raw> _type;
|
||||
vector_dimension_t _dimension;
|
||||
size_t _dimension;
|
||||
|
||||
// This limitation is acquired from the maximum number of dimensions in OpenSearch.
|
||||
static constexpr size_t MAX_VECTOR_DIMENSION = 16000;
|
||||
|
||||
virtual sstring to_string() const override {
|
||||
return seastar::format("vector<{}, {}>", _type, _dimension);
|
||||
}
|
||||
|
||||
public:
|
||||
raw_vector(shared_ptr<raw> type, vector_dimension_t dimension)
|
||||
raw_vector(shared_ptr<raw> type, size_t dimension)
|
||||
: _type(std::move(type)), _dimension(dimension) {
|
||||
}
|
||||
|
||||
@@ -414,7 +417,7 @@ cql3_type::raw::tuple(std::vector<shared_ptr<raw>> ts) {
|
||||
}
|
||||
|
||||
shared_ptr<cql3_type::raw>
|
||||
cql3_type::raw::vector(shared_ptr<raw> t, vector_dimension_t dimension) {
|
||||
cql3_type::raw::vector(shared_ptr<raw> t, size_t dimension) {
|
||||
return ::make_shared<raw_vector>(std::move(t), dimension);
|
||||
}
|
||||
|
||||
|
||||
@@ -39,9 +39,6 @@ public:
|
||||
data_type get_type() const { return _type; }
|
||||
const sstring& to_string() const { return _type->cql3_type_name(); }
|
||||
|
||||
// This limitation is acquired from the maximum number of dimensions in OpenSearch.
|
||||
static constexpr vector_dimension_t MAX_VECTOR_DIMENSION = 16000;
|
||||
|
||||
// For UserTypes, we need to know the current keyspace to resolve the
|
||||
// actual type used, so Raw is a "not yet prepared" CQL3Type.
|
||||
class raw {
|
||||
@@ -67,7 +64,7 @@ public:
|
||||
static shared_ptr<raw> list(shared_ptr<raw> t);
|
||||
static shared_ptr<raw> set(shared_ptr<raw> t);
|
||||
static shared_ptr<raw> tuple(std::vector<shared_ptr<raw>> ts);
|
||||
static shared_ptr<raw> vector(shared_ptr<raw> t, vector_dimension_t dimension);
|
||||
static shared_ptr<raw> vector(shared_ptr<raw> t, size_t dimension);
|
||||
static shared_ptr<raw> frozen(shared_ptr<raw> t);
|
||||
friend sstring format_as(const raw& r) {
|
||||
return r.to_string();
|
||||
|
||||
@@ -502,8 +502,8 @@ vector_validate_assignable_to(const collection_constructor& c, data_dictionary::
|
||||
throw exceptions::invalid_request_exception(format("Invalid vector type literal for {} of type {}", *receiver.name, receiver.type->as_cql3_type()));
|
||||
}
|
||||
|
||||
vector_dimension_t expected_size = vt->get_dimension();
|
||||
if (expected_size == 0) {
|
||||
size_t expected_size = vt->get_dimension();
|
||||
if (!expected_size) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid vector type literal for {}: type {} expects at least one element",
|
||||
*receiver.name, receiver.type->as_cql3_type()));
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ namespace functions {
|
||||
|
||||
namespace detail {
|
||||
|
||||
std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension_t dimension) {
|
||||
std::vector<float> extract_float_vector(const bytes_opt& param, size_t dimension) {
|
||||
if (!param) {
|
||||
throw exceptions::invalid_request_exception("Cannot extract float vector from null parameter");
|
||||
}
|
||||
@@ -69,7 +69,7 @@ float compute_cosine_similarity(std::span<const float> v1, std::span<const float
|
||||
}
|
||||
|
||||
if (squared_norm_a == 0 || squared_norm_b == 0) {
|
||||
return std::numeric_limits<float>::quiet_NaN();
|
||||
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
|
||||
}
|
||||
|
||||
// The cosine similarity is in the range [-1, 1].
|
||||
@@ -156,7 +156,7 @@ std::vector<data_type> retrieve_vector_arg_types(const function_name& name, cons
|
||||
}
|
||||
}
|
||||
|
||||
vector_dimension_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
|
||||
size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
|
||||
auto type = vector_type_impl::get_instance(float_type, dimension);
|
||||
return {type, type};
|
||||
}
|
||||
@@ -170,7 +170,7 @@ bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters)
|
||||
|
||||
// Extract dimension from the vector type
|
||||
const auto& type = static_cast<const vector_type_impl&>(*arg_types()[0]);
|
||||
vector_dimension_t dimension = type.get_dimension();
|
||||
size_t dimension = type.get_dimension();
|
||||
|
||||
// Optimized path: extract floats directly from bytes, bypassing data_value overhead
|
||||
std::vector<float> v1 = detail::extract_float_vector(parameters[0], dimension);
|
||||
|
||||
@@ -39,7 +39,7 @@ namespace detail {
|
||||
// Extract float vector directly from serialized bytes, bypassing data_value overhead.
|
||||
// This is an internal API exposed for testing purposes.
|
||||
// Vector<float, N> wire format: N floats as big-endian uint32_t values, 4 bytes each.
|
||||
std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension_t dimension);
|
||||
std::vector<float> extract_float_vector(const bytes_opt& param, size_t dimension);
|
||||
|
||||
} // namespace detail
|
||||
|
||||
|
||||
@@ -137,6 +137,15 @@ public:
|
||||
return value_type();
|
||||
}
|
||||
|
||||
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
|
||||
cache_value_ptr vp = _cache.find(key.key());
|
||||
if (!vp) {
|
||||
return false;
|
||||
}
|
||||
(*vp)->update_result_metadata_id(std::move(metadata_id));
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename Pred>
|
||||
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
|
||||
void remove_if(Pred&& pred) {
|
||||
|
||||
@@ -481,6 +481,12 @@ public:
|
||||
|
||||
void update_authorized_prepared_cache_config();
|
||||
|
||||
/// Update the result metadata_id of a cached prepared statement.
|
||||
/// Returns true if the entry was found and updated, false if it was evicted.
|
||||
bool update_prepared_result_metadata_id(const cql3::prepared_cache_key_type& cache_key, cql3::cql_metadata_id_type metadata_id) {
|
||||
return _prepared_cache.update_result_metadata_id(cache_key, std::move(metadata_id));
|
||||
}
|
||||
|
||||
void reset_cache();
|
||||
|
||||
bool topology_global_queue_empty();
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <ostream>
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class result;
|
||||
|
||||
void print_query_results_text(std::ostream& os, const result& result);
|
||||
void print_query_results_json(std::ostream& os, const result& result);
|
||||
|
||||
} // namespace cql3
|
||||
@@ -9,10 +9,8 @@
|
||||
*/
|
||||
|
||||
#include <cstdint>
|
||||
#include "types/json_utils.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/hashers.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
|
||||
namespace cql3 {
|
||||
@@ -197,85 +195,4 @@ make_empty_metadata() {
|
||||
return empty_metadata_cache;
|
||||
}
|
||||
|
||||
void print_query_results_text(std::ostream& os, const cql3::result& result) {
|
||||
const auto& metadata = result.get_metadata();
|
||||
const auto& column_metadata = metadata.get_names();
|
||||
|
||||
struct column_values {
|
||||
size_t max_size{0};
|
||||
sstring header_format;
|
||||
sstring row_format;
|
||||
std::vector<sstring> values;
|
||||
|
||||
void add(sstring value) {
|
||||
max_size = std::max(max_size, value.size());
|
||||
values.push_back(std::move(value));
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<column_values> columns;
|
||||
columns.resize(column_metadata.size());
|
||||
|
||||
for (size_t i = 0; i < column_metadata.size(); ++i) {
|
||||
columns[i].add(column_metadata[i]->name->text());
|
||||
}
|
||||
|
||||
for (const auto& row : result.result_set().rows()) {
|
||||
for (size_t i = 0; i < row.size(); ++i) {
|
||||
if (row[i]) {
|
||||
columns[i].add(column_metadata[i]->type->to_string(linearized(managed_bytes_view(*row[i]))));
|
||||
} else {
|
||||
columns[i].add("");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<sstring> separators(columns.size(), sstring());
|
||||
for (size_t i = 0; i < columns.size(); ++i) {
|
||||
auto& col_values = columns[i];
|
||||
col_values.header_format = seastar::format(" {{:<{}}} ", col_values.max_size);
|
||||
col_values.row_format = seastar::format(" {{:>{}}} ", col_values.max_size);
|
||||
for (size_t c = 0; c < col_values.max_size; ++c) {
|
||||
separators[i] += "-";
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t r = 0; r < result.result_set().rows().size() + 1; ++r) {
|
||||
std::vector<sstring> row;
|
||||
row.reserve(columns.size());
|
||||
for (size_t i = 0; i < columns.size(); ++i) {
|
||||
const auto& format = r == 0 ? columns[i].header_format : columns[i].row_format;
|
||||
row.push_back(fmt::format(fmt::runtime(std::string_view(format)), columns[i].values[r]));
|
||||
}
|
||||
fmt::print(os, "{}\n", fmt::join(row, "|"));
|
||||
if (!r) {
|
||||
fmt::print(os, "-{}-\n", fmt::join(separators, "-+-"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void print_query_results_json(std::ostream& os, const cql3::result& result) {
|
||||
const auto& metadata = result.get_metadata();
|
||||
const auto& column_metadata = metadata.get_names();
|
||||
|
||||
rjson::streaming_writer writer(os);
|
||||
|
||||
writer.StartArray();
|
||||
for (const auto& row : result.result_set().rows()) {
|
||||
writer.StartObject();
|
||||
for (size_t i = 0; i < row.size(); ++i) {
|
||||
writer.Key(column_metadata[i]->name->text());
|
||||
if (!row[i] || row[i]->empty()) {
|
||||
writer.Null();
|
||||
continue;
|
||||
}
|
||||
const auto value = to_json_string(*column_metadata[i]->type, *row[i]);
|
||||
const auto type = to_json_type(*column_metadata[i]->type, *row[i]);
|
||||
writer.RawValue(value, type);
|
||||
}
|
||||
writer.EndObject();
|
||||
}
|
||||
writer.EndArray();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
|
||||
#include "cdc/log.hh"
|
||||
#include "index/vector_index.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "cql3/query_options.hh"
|
||||
@@ -31,9 +30,6 @@
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
#include "cdc/cdc_partitioner.hh"
|
||||
#include "db/tags/extension.hh"
|
||||
#include "db/tags/utils.hh"
|
||||
#include "alternator/ttl_tag.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -47,8 +43,7 @@ alter_table_statement::alter_table_statement(uint32_t bound_terms,
|
||||
std::vector<column_change> column_changes,
|
||||
std::optional<cf_prop_defs> properties,
|
||||
renames_type renames,
|
||||
std::unique_ptr<attributes> attrs,
|
||||
shared_ptr<column_identifier::raw> ttl_change)
|
||||
std::unique_ptr<attributes> attrs)
|
||||
: schema_altering_statement(std::move(name))
|
||||
, _bound_terms(bound_terms)
|
||||
, _type(t)
|
||||
@@ -56,7 +51,6 @@ alter_table_statement::alter_table_statement(uint32_t bound_terms,
|
||||
, _properties(std::move(properties))
|
||||
, _renames(std::move(renames))
|
||||
, _attrs(std::move(attrs))
|
||||
, _ttl_change(std::move(ttl_change))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -386,21 +380,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
|
||||
throw exceptions::invalid_request_exception("Cannot drop columns from a non-CQL3 table");
|
||||
}
|
||||
invoke_column_change_fn(std::mem_fn(&alter_table_statement::drop_column));
|
||||
|
||||
// If we dropped the column used for per-row TTL, we need to remove the tag.
|
||||
if (std::optional<std::string> ttl_column = db::find_tag(*s, TTL_TAG_KEY)) {
|
||||
for (auto& [raw_name, raw_validator, is_static] : _column_changes) {
|
||||
if (*ttl_column == raw_name->text()) {
|
||||
const std::map<sstring, sstring>* tags_ptr = db::get_tags_of_table(s);
|
||||
if (tags_ptr) {
|
||||
std::map<sstring, sstring> tags_map = *tags_ptr;
|
||||
tags_map.erase(TTL_TAG_KEY);
|
||||
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case alter_table_statement::type::opts:
|
||||
@@ -455,7 +434,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
|
||||
break;
|
||||
|
||||
case alter_table_statement::type::rename:
|
||||
{
|
||||
for (auto&& entry : _renames) {
|
||||
auto from = entry.first->prepare_column_identifier(*s);
|
||||
auto to = entry.second->prepare_column_identifier(*s);
|
||||
@@ -492,53 +470,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
|
||||
}
|
||||
return make_pair(std::move(new_base_schema), std::move(view_updates));
|
||||
}
|
||||
case alter_table_statement::type::ttl:
|
||||
if (!db.features().cql_row_ttl) {
|
||||
throw exceptions::invalid_request_exception("The CQL per-row TTL feature is not yet supported by this cluster. Upgrade all nodes to use it.");
|
||||
}
|
||||
if (_ttl_change) {
|
||||
// Enable per-row TTL with chosen column for expiration time
|
||||
const column_definition *cdef =
|
||||
s->get_column_definition(to_bytes(_ttl_change->text()));
|
||||
if (!cdef) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Column '{}' does not exist in table {}.{}", _ttl_change->text(), keyspace(), column_family()));
|
||||
}
|
||||
if (cdef->type != timestamp_type && cdef->type != long_type && cdef->type != int32_type) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("TTL column {} must be of type timestamp, bigint or int, can't be {}", _ttl_change->text(), cdef->type->as_cql3_type().to_string()));
|
||||
}
|
||||
if (cdef->is_primary_key()) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Cannot use a primary key column {} as a TTL column", _ttl_change->text()));
|
||||
}
|
||||
if (cdef->is_static()) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Cannot use a static column {} as a TTL column", _ttl_change->text()));
|
||||
}
|
||||
std::optional<std::string> old_ttl_column = db::find_tag(*s, TTL_TAG_KEY);
|
||||
if (old_ttl_column) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Cannot set TTL column, table {}.{} already has a TTL column defined: {}", keyspace(), column_family(), *old_ttl_column));
|
||||
}
|
||||
const std::map<sstring, sstring>* old_tags_ptr = db::get_tags_of_table(s);
|
||||
std::map<sstring, sstring> tags_map;
|
||||
if (old_tags_ptr) {
|
||||
// tags_ptr is a constant pointer to schema data. To modify
|
||||
// it, we must make a copy.
|
||||
tags_map = *old_tags_ptr;
|
||||
}
|
||||
tags_map[TTL_TAG_KEY] = _ttl_change->text();
|
||||
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
|
||||
} else {
|
||||
// Disable per-row TTL
|
||||
const std::map<sstring, sstring>* tags_ptr = db::get_tags_of_table(s);
|
||||
if (!tags_ptr || tags_ptr->find(TTL_TAG_KEY) == tags_ptr->end()) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Cannot unset TTL column, table {}.{} does not have a TTL column set", keyspace(), column_family()));
|
||||
}
|
||||
// tags_ptr is a constant pointer to schema data. To modify it, we
|
||||
// must make a copy.
|
||||
std::map<sstring, sstring> tags_map = *tags_ptr;
|
||||
tags_map.erase(TTL_TAG_KEY);
|
||||
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return make_pair(cfm.build(), std::move(view_updates));
|
||||
}
|
||||
@@ -577,15 +508,13 @@ alter_table_statement::raw_statement::raw_statement(cf_name name,
|
||||
std::vector<column_change> column_changes,
|
||||
std::optional<cf_prop_defs> properties,
|
||||
renames_type renames,
|
||||
std::unique_ptr<attributes::raw> attrs,
|
||||
shared_ptr<column_identifier::raw> ttl_change)
|
||||
std::unique_ptr<attributes::raw> attrs)
|
||||
: cf_statement(std::move(name))
|
||||
, _type(t)
|
||||
, _column_changes(std::move(column_changes))
|
||||
, _properties(std::move(properties))
|
||||
, _renames(std::move(renames))
|
||||
, _attrs(std::move(attrs))
|
||||
, _ttl_change(std::move(ttl_change))
|
||||
{}
|
||||
|
||||
std::unique_ptr<cql3::statements::prepared_statement>
|
||||
@@ -610,8 +539,7 @@ alter_table_statement::raw_statement::prepare(data_dictionary::database db, cql_
|
||||
_column_changes,
|
||||
_properties,
|
||||
_renames,
|
||||
std::move(prepared_attrs),
|
||||
_ttl_change
|
||||
std::move(prepared_attrs)
|
||||
),
|
||||
ctx,
|
||||
// since alter table is `cql_statement_no_metadata` (it doesn't return any metadata when preparing)
|
||||
|
||||
@@ -32,7 +32,6 @@ public:
|
||||
drop,
|
||||
opts,
|
||||
rename,
|
||||
ttl,
|
||||
};
|
||||
using renames_type = std::vector<std::pair<shared_ptr<column_identifier::raw>,
|
||||
shared_ptr<column_identifier::raw>>>;
|
||||
@@ -51,7 +50,6 @@ private:
|
||||
const std::optional<cf_prop_defs> _properties;
|
||||
const renames_type _renames;
|
||||
const std::unique_ptr<attributes> _attrs;
|
||||
shared_ptr<column_identifier::raw> _ttl_change;
|
||||
public:
|
||||
alter_table_statement(uint32_t bound_terms,
|
||||
cf_name name,
|
||||
@@ -59,8 +57,7 @@ public:
|
||||
std::vector<column_change> column_changes,
|
||||
std::optional<cf_prop_defs> properties,
|
||||
renames_type renames,
|
||||
std::unique_ptr<attributes> attrs,
|
||||
shared_ptr<column_identifier::raw> ttl_change);
|
||||
std::unique_ptr<attributes> attrs);
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
@@ -81,7 +78,6 @@ class alter_table_statement::raw_statement : public raw::cf_statement {
|
||||
const std::optional<cf_prop_defs> _properties;
|
||||
const alter_table_statement::renames_type _renames;
|
||||
const std::unique_ptr<attributes::raw> _attrs;
|
||||
shared_ptr<column_identifier::raw> _ttl_change;
|
||||
|
||||
public:
|
||||
raw_statement(cf_name name,
|
||||
@@ -89,8 +85,7 @@ public:
|
||||
std::vector<column_change> column_changes,
|
||||
std::optional<cf_prop_defs> properties,
|
||||
renames_type renames,
|
||||
std::unique_ptr<attributes::raw> attrs,
|
||||
shared_ptr<column_identifier::raw> ttl_change);
|
||||
std::unique_ptr<attributes::raw> attrs);
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
|
||||
@@ -30,9 +30,6 @@
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "db/config.hh"
|
||||
#include "compaction/time_window_compaction_strategy.hh"
|
||||
#include "db/tags/extension.hh"
|
||||
#include "db/tags/utils.hh"
|
||||
#include "alternator/ttl_tag.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -44,12 +41,10 @@ create_table_statement::create_table_statement(cf_name name,
|
||||
::shared_ptr<cf_prop_defs> properties,
|
||||
bool if_not_exists,
|
||||
column_set_type static_columns,
|
||||
::shared_ptr<column_identifier> ttl_column,
|
||||
const std::optional<table_id>& id)
|
||||
: schema_altering_statement{name}
|
||||
, _use_compact_storage(false)
|
||||
, _static_columns{static_columns}
|
||||
, _ttl_column{ttl_column}
|
||||
, _properties{properties}
|
||||
, _if_not_exists{if_not_exists}
|
||||
, _id(id)
|
||||
@@ -128,13 +123,6 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
|
||||
#endif
|
||||
|
||||
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true);
|
||||
// Remembering which column was designated as the TTL column for row-based
|
||||
// TTL column is done using a "tag" extension. If there is no TTL column,
|
||||
// we don't need this extension at all.
|
||||
if (_ttl_column) {
|
||||
std::map<sstring, sstring> tags_map = {{TTL_TAG_KEY, _ttl_column->text()}};
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
|
||||
}
|
||||
}
|
||||
|
||||
void create_table_statement::add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, const std::vector<data_type>& types, column_kind kind) const
|
||||
@@ -210,7 +198,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
|
||||
}
|
||||
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;
|
||||
|
||||
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _ttl_column, _properties.properties()->get_id());
|
||||
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());
|
||||
|
||||
bool ks_uses_tablets;
|
||||
try {
|
||||
@@ -415,27 +403,6 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
|
||||
}
|
||||
}
|
||||
|
||||
// If a TTL column is defined, it must be a regular column - not a static
|
||||
// column or part of the primary key.
|
||||
if (_ttl_column) {
|
||||
if (!db.features().cql_row_ttl) {
|
||||
throw exceptions::invalid_request_exception("The CQL per-row TTL feature is not yet supported by this cluster. Upgrade all nodes to use it.");
|
||||
}
|
||||
for (const auto& alias : key_aliases) {
|
||||
if (alias->text() == _ttl_column->text()) {
|
||||
throw exceptions::invalid_request_exception(format("TTL column {} cannot be part of the PRIMARY KEY", alias->text()));
|
||||
}
|
||||
}
|
||||
for (const auto& alias : _column_aliases) {
|
||||
if (alias->text() == _ttl_column->text()) {
|
||||
throw exceptions::invalid_request_exception(format("TTL column {} cannot be part of the PRIMARY KEY", alias->text()));
|
||||
}
|
||||
}
|
||||
if (_static_columns.contains(_ttl_column)) {
|
||||
throw exceptions::invalid_request_exception(format("TTL column {} cannot be a static column", _ttl_column->text()));
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_unique<prepared_statement>(audit_info(), stmt, std::move(stmt_warnings));
|
||||
}
|
||||
|
||||
@@ -458,23 +425,12 @@ data_type create_table_statement::raw_statement::get_type_and_remove(column_map_
|
||||
return _properties.get_reversable_type(*t, type);
|
||||
}
|
||||
|
||||
void create_table_statement::raw_statement::add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static, bool is_ttl) {
|
||||
void create_table_statement::raw_statement::add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static) {
|
||||
_defined_names.emplace(def);
|
||||
_definitions.emplace(def, type);
|
||||
if (is_static) {
|
||||
_static_columns.emplace(def);
|
||||
}
|
||||
if (is_ttl) {
|
||||
if (_ttl_column) {
|
||||
throw exceptions::invalid_request_exception(fmt::format("Cannot have more than one TTL column in a table. Saw {} and {}", _ttl_column->text(), def->text()));
|
||||
}
|
||||
// FIXME: find a way to check cql3_type::raw without fmt::format
|
||||
auto type_name = fmt::format("{}", type);
|
||||
if (type_name != "timestamp" && type_name != "bigint" && type_name != "int") {
|
||||
throw exceptions::invalid_request_exception(fmt::format("TTL column '{}' must be of type timestamp, bigint or int, can't be {}", def->text(), type_name));
|
||||
}
|
||||
_ttl_column = def;
|
||||
}
|
||||
}
|
||||
|
||||
void create_table_statement::raw_statement::add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases) {
|
||||
|
||||
@@ -57,7 +57,6 @@ class create_table_statement : public schema_altering_statement {
|
||||
shared_ptr_equal_by_value<column_identifier>>;
|
||||
column_map_type _columns;
|
||||
column_set_type _static_columns;
|
||||
::shared_ptr<column_identifier> _ttl_column; // for row-based TTL
|
||||
const ::shared_ptr<cf_prop_defs> _properties;
|
||||
const bool _if_not_exists;
|
||||
std::optional<table_id> _id;
|
||||
@@ -66,7 +65,6 @@ public:
|
||||
::shared_ptr<cf_prop_defs> properties,
|
||||
bool if_not_exists,
|
||||
column_set_type static_columns,
|
||||
::shared_ptr<column_identifier> ttl_column,
|
||||
const std::optional<table_id>& id);
|
||||
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
@@ -102,7 +100,6 @@ private:
|
||||
std::vector<std::vector<::shared_ptr<column_identifier>>> _key_aliases;
|
||||
std::vector<::shared_ptr<column_identifier>> _column_aliases;
|
||||
create_table_statement::column_set_type _static_columns;
|
||||
::shared_ptr<column_identifier> _ttl_column; // for row-based TTL
|
||||
|
||||
std::multiset<::shared_ptr<column_identifier>,
|
||||
indirect_less<::shared_ptr<column_identifier>, column_identifier::text_comparator>> _defined_names;
|
||||
@@ -119,7 +116,7 @@ public:
|
||||
|
||||
data_type get_type_and_remove(column_map_type& columns, ::shared_ptr<column_identifier> t);
|
||||
|
||||
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static, bool is_ttl);
|
||||
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static);
|
||||
|
||||
void add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases);
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@ public:
|
||||
std::vector<sstring> warnings;
|
||||
private:
|
||||
cql_metadata_id_type _metadata_id;
|
||||
bool _result_metadata_is_empty;
|
||||
|
||||
public:
|
||||
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
|
||||
@@ -71,6 +72,15 @@ public:
|
||||
void calculate_metadata_id();
|
||||
|
||||
cql_metadata_id_type get_metadata_id() const;
|
||||
|
||||
bool result_metadata_is_empty() const {
|
||||
return _result_metadata_is_empty;
|
||||
}
|
||||
|
||||
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
|
||||
_metadata_id = std::move(metadata_id);
|
||||
_result_metadata_is_empty = false;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
|
||||
, partition_key_bind_indices(std::move(partition_key_bind_indices))
|
||||
, warnings(std::move(warnings))
|
||||
, _metadata_id(bytes{})
|
||||
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
|
||||
{
|
||||
statement->set_audit_info(std::move(audit_info));
|
||||
}
|
||||
|
||||
@@ -2004,7 +2004,9 @@ static std::optional<ann_ordering_info> get_ann_ordering_info(
|
||||
|
||||
auto indexes = sim.list_indexes();
|
||||
auto it = std::find_if(indexes.begin(), indexes.end(), [&prepared_ann_ordering](const auto& ind) {
|
||||
return secondary_index::vector_index::is_vector_index_on_column(ind.metadata(), prepared_ann_ordering.first->name_as_text());
|
||||
return (ind.metadata().options().contains(db::index::secondary_index::custom_class_option_name) &&
|
||||
ind.metadata().options().at(db::index::secondary_index::custom_class_option_name) == ANN_CUSTOM_INDEX_OPTION) &&
|
||||
(ind.target_column() == prepared_ann_ordering.first->name_as_text());
|
||||
});
|
||||
|
||||
if (it == indexes.end()) {
|
||||
|
||||
@@ -55,21 +55,8 @@ int32_t batchlog_shard_of(db_clock::time_point written_at) {
|
||||
return hash & ((1ULL << batchlog_shard_bits) - 1);
|
||||
}
|
||||
|
||||
bool is_batchlog_v1(const schema& schema) {
|
||||
return schema.cf_name() == system_keyspace::BATCHLOG;
|
||||
}
|
||||
|
||||
std::pair<partition_key, clustering_key>
|
||||
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional<utils::UUID> id) {
|
||||
if (is_batchlog_v1(schema)) {
|
||||
if (!id) {
|
||||
on_internal_error(blogger, "get_batchlog_key(): key for batchlog v1 requires batchlog id");
|
||||
}
|
||||
auto pkey = partition_key::from_single_value(schema, {serialized(*id)});
|
||||
auto ckey = clustering_key::make_empty();
|
||||
return std::pair(std::move(pkey), std::move(ckey));
|
||||
}
|
||||
|
||||
auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)});
|
||||
|
||||
std::vector<bytes> ckey_components;
|
||||
@@ -98,14 +85,6 @@ mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_
|
||||
auto cdef_data = schema->get_column_definition(to_bytes("data"));
|
||||
m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
|
||||
|
||||
if (is_batchlog_v1(*schema)) {
|
||||
auto cdef_version = schema->get_column_definition(to_bytes("version"));
|
||||
m.set_cell(ckey, *cdef_version, atomic_cell::make_live(*cdef_version->type, timestamp, serialized(version)));
|
||||
|
||||
auto cdef_written_at = schema->get_column_definition(to_bytes("written_at"));
|
||||
m.set_cell(ckey, *cdef_written_at, atomic_cell::make_live(*cdef_written_at->type, timestamp, serialized(now)));
|
||||
}
|
||||
|
||||
return m;
|
||||
}
|
||||
|
||||
@@ -143,10 +122,9 @@ mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clo
|
||||
const std::chrono::seconds db::batchlog_manager::replay_interval;
|
||||
const uint32_t db::batchlog_manager::page_size;
|
||||
|
||||
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config)
|
||||
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
|
||||
: _qp(qp)
|
||||
, _sys_ks(sys_ks)
|
||||
, _fs(fs)
|
||||
, _replay_timeout(config.replay_timeout)
|
||||
, _replay_rate(config.replay_rate)
|
||||
, _delay(config.delay)
|
||||
@@ -322,206 +300,149 @@ future<> db::batchlog_manager::maybe_migrate_v1_to_v2() {
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
using clock_type = db_clock::rep;
|
||||
|
||||
struct replay_stats {
|
||||
std::optional<db_clock::time_point> min_too_fresh;
|
||||
bool need_cleanup = false;
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
static future<db::all_batches_replayed> process_batch(
|
||||
cql3::query_processor& qp,
|
||||
db::batchlog_manager::stats& stats,
|
||||
db::batchlog_manager::post_replay_cleanup cleanup,
|
||||
utils::rate_limiter& limiter,
|
||||
schema_ptr schema,
|
||||
std::unordered_map<int32_t, replay_stats>& replay_stats_per_shard,
|
||||
const db_clock::time_point now,
|
||||
db_clock::duration replay_timeout,
|
||||
std::chrono::seconds write_timeout,
|
||||
const cql3::untyped_result_set::row& row) {
|
||||
const bool is_v1 = db::is_batchlog_v1(*schema);
|
||||
const auto stage = is_v1 ? db::batchlog_stage::initial : static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
|
||||
const auto batch_shard = is_v1 ? 0 : row.get_as<int32_t>("shard");
|
||||
auto written_at = row.get_as<db_clock::time_point>("written_at");
|
||||
auto id = row.get_as<utils::UUID>("id");
|
||||
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
|
||||
auto timeout = replay_timeout;
|
||||
|
||||
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
|
||||
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
|
||||
co_return db::all_batches_replayed::no;
|
||||
}
|
||||
|
||||
auto data = row.get_blob_unfragmented("data");
|
||||
|
||||
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
|
||||
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
bool send_failed = false;
|
||||
|
||||
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
|
||||
|
||||
try {
|
||||
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
|
||||
auto in = ser::as_input_stream(data);
|
||||
while (in.size()) {
|
||||
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
|
||||
const auto tbl = qp.db().try_find_table(fm.column_family_id());
|
||||
if (!tbl) {
|
||||
continue;
|
||||
}
|
||||
if (written_at <= tbl->get_truncation_time()) {
|
||||
continue;
|
||||
}
|
||||
schema_ptr s = tbl->schema();
|
||||
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
|
||||
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
|
||||
}
|
||||
fms.emplace_back(std::move(fm), std::move(s));
|
||||
}
|
||||
|
||||
if (now < written_at + timeout) {
|
||||
blogger.debug("Skipping replay of {}, too fresh", id);
|
||||
|
||||
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
|
||||
|
||||
co_return db::all_batches_replayed::no;
|
||||
}
|
||||
|
||||
auto size = data.size();
|
||||
|
||||
for (const auto& [fm, s] : fms) {
|
||||
mutations.emplace_back(fm.to_mutation(s));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
if (!mutations.empty()) {
|
||||
const auto ttl = [written_at]() -> clock_type {
|
||||
/*
|
||||
* Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
|
||||
* This ensures that deletes aren't "undone" by an old batch replay.
|
||||
*/
|
||||
auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
|
||||
warn(unimplemented::cause::HINT);
|
||||
#if 0
|
||||
for (auto& m : *mutations) {
|
||||
unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
|
||||
}
|
||||
#endif
|
||||
return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
|
||||
}();
|
||||
|
||||
if (ttl > 0) {
|
||||
// Origin does the send manually, however I can't see a super great reason to do so.
|
||||
// Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
|
||||
// in both cases.
|
||||
// FIXME: verify that the above is reasonably true.
|
||||
co_await limiter.reserve(size);
|
||||
stats.write_attempts += mutations.size();
|
||||
auto timeout = db::timeout_clock::now() + write_timeout;
|
||||
if (cleanup) {
|
||||
co_await qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
|
||||
} else {
|
||||
co_await qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (data_dictionary::no_such_keyspace& ex) {
|
||||
// should probably ignore and drop the batch
|
||||
} catch (const data_dictionary::no_such_column_family&) {
|
||||
// As above -- we should drop the batch if the table doesn't exist anymore.
|
||||
} catch (...) {
|
||||
blogger.warn("Replay failed (will retry): {}", std::current_exception());
|
||||
// timeout, overload etc.
|
||||
// Do _not_ remove the batch, assuning we got a node write error.
|
||||
// Since we don't have hints (which origin is satisfied with),
|
||||
// we have to resort to keeping this batch to next lap.
|
||||
if (is_v1 || !cleanup || stage == db::batchlog_stage::failed_replay) {
|
||||
co_return db::all_batches_replayed::no;
|
||||
}
|
||||
send_failed = true;
|
||||
}
|
||||
|
||||
auto& sp = qp.proxy();
|
||||
|
||||
if (send_failed) {
|
||||
blogger.debug("Moving batch {} to stage failed_replay", id);
|
||||
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, db::batchlog_stage::failed_replay, written_at, id);
|
||||
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
}
|
||||
|
||||
// delete batch
|
||||
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
|
||||
co_await qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
shard_written_at.need_cleanup = true;
|
||||
|
||||
co_return db::all_batches_replayed(!send_failed);
|
||||
}
|
||||
|
||||
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v1(post_replay_cleanup) {
|
||||
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
|
||||
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
|
||||
// max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
|
||||
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
|
||||
utils::rate_limiter limiter(throttle);
|
||||
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
|
||||
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
|
||||
|
||||
// Use a stable `now` across all batches, so skip/replay decisions are the
|
||||
// same across a while prefix of written_at (across all ids).
|
||||
const auto now = db_clock::now();
|
||||
|
||||
auto batch = [this, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
|
||||
all_replayed = all_replayed && co_await process_batch(_qp, _stats, post_replay_cleanup::no, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
|
||||
co_return stop_iteration::no;
|
||||
};
|
||||
|
||||
co_await with_gate(_gate, [this, &all_replayed, batch = std::move(batch)] () mutable -> future<> {
|
||||
blogger.debug("Started replayAllFailedBatches");
|
||||
co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));
|
||||
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
|
||||
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
|
||||
db::consistency_level::ONE,
|
||||
{},
|
||||
page_size,
|
||||
batch);
|
||||
|
||||
blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
|
||||
});
|
||||
|
||||
co_return all_replayed;
|
||||
}
|
||||
|
||||
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v2(post_replay_cleanup cleanup) {
|
||||
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
|
||||
co_await maybe_migrate_v1_to_v2();
|
||||
|
||||
typedef db_clock::rep clock_type;
|
||||
|
||||
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
|
||||
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
|
||||
// max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
|
||||
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
|
||||
utils::rate_limiter limiter(throttle);
|
||||
auto limiter = make_lw_shared<utils::rate_limiter>(throttle);
|
||||
|
||||
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
|
||||
|
||||
struct replay_stats {
|
||||
std::optional<db_clock::time_point> min_too_fresh;
|
||||
bool need_cleanup = false;
|
||||
};
|
||||
|
||||
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
|
||||
|
||||
// Use a stable `now` across all batches, so skip/replay decisions are the
|
||||
// same across a while prefix of written_at (across all ids).
|
||||
const auto now = db_clock::now();
|
||||
|
||||
auto batch = [this, cleanup, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
|
||||
all_replayed = all_replayed && co_await process_batch(_qp, _stats, cleanup, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
|
||||
auto batch = [this, cleanup, limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
const auto stage = static_cast<batchlog_stage>(row.get_as<int8_t>("stage"));
|
||||
const auto batch_shard = row.get_as<int32_t>("shard");
|
||||
auto written_at = row.get_as<db_clock::time_point>("written_at");
|
||||
auto id = row.get_as<utils::UUID>("id");
|
||||
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
|
||||
auto timeout = _replay_timeout;
|
||||
|
||||
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
|
||||
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
|
||||
all_replayed = all_batches_replayed::no;
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto data = row.get_blob_unfragmented("data");
|
||||
|
||||
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
|
||||
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
bool send_failed = false;
|
||||
|
||||
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
|
||||
|
||||
try {
|
||||
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
|
||||
auto in = ser::as_input_stream(data);
|
||||
while (in.size()) {
|
||||
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
|
||||
const auto tbl = _qp.db().try_find_table(fm.column_family_id());
|
||||
if (!tbl) {
|
||||
continue;
|
||||
}
|
||||
if (written_at <= tbl->get_truncation_time()) {
|
||||
continue;
|
||||
}
|
||||
schema_ptr s = tbl->schema();
|
||||
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
|
||||
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
|
||||
}
|
||||
fms.emplace_back(std::move(fm), std::move(s));
|
||||
}
|
||||
|
||||
if (now < written_at + timeout) {
|
||||
blogger.debug("Skipping replay of {}, too fresh", id);
|
||||
|
||||
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
|
||||
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
auto size = data.size();
|
||||
|
||||
for (const auto& [fm, s] : fms) {
|
||||
mutations.emplace_back(fm.to_mutation(s));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
if (!mutations.empty()) {
|
||||
const auto ttl = [written_at]() -> clock_type {
|
||||
/*
|
||||
* Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
|
||||
* This ensures that deletes aren't "undone" by an old batch replay.
|
||||
*/
|
||||
auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
|
||||
warn(unimplemented::cause::HINT);
|
||||
#if 0
|
||||
for (auto& m : *mutations) {
|
||||
unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
|
||||
}
|
||||
#endif
|
||||
return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
|
||||
}();
|
||||
|
||||
if (ttl > 0) {
|
||||
// Origin does the send manually, however I can't see a super great reason to do so.
|
||||
// Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
|
||||
// in both cases.
|
||||
// FIXME: verify that the above is reasonably true.
|
||||
co_await limiter->reserve(size);
|
||||
_stats.write_attempts += mutations.size();
|
||||
auto timeout = db::timeout_clock::now() + write_timeout;
|
||||
if (cleanup) {
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
|
||||
} else {
|
||||
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (data_dictionary::no_such_keyspace& ex) {
|
||||
// should probably ignore and drop the batch
|
||||
} catch (const data_dictionary::no_such_column_family&) {
|
||||
// As above -- we should drop the batch if the table doesn't exist anymore.
|
||||
} catch (...) {
|
||||
blogger.warn("Replay failed (will retry): {}", std::current_exception());
|
||||
all_replayed = all_batches_replayed::no;
|
||||
// timeout, overload etc.
|
||||
// Do _not_ remove the batch, assuning we got a node write error.
|
||||
// Since we don't have hints (which origin is satisfied with),
|
||||
// we have to resort to keeping this batch to next lap.
|
||||
if (!cleanup || stage == batchlog_stage::failed_replay) {
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
send_failed = true;
|
||||
}
|
||||
|
||||
auto& sp = _qp.proxy();
|
||||
|
||||
if (send_failed) {
|
||||
blogger.debug("Moving batch {} to stage failed_replay", id);
|
||||
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, batchlog_stage::failed_replay, written_at, id);
|
||||
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
}
|
||||
|
||||
// delete batch
|
||||
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
|
||||
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
|
||||
|
||||
shard_written_at.need_cleanup = true;
|
||||
|
||||
co_return stop_iteration::no;
|
||||
};
|
||||
|
||||
@@ -580,10 +501,3 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
|
||||
|
||||
co_return all_replayed;
|
||||
}
|
||||
|
||||
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
|
||||
if (_fs.batchlog_v2) {
|
||||
return replay_all_failed_batches_v2(cleanup);
|
||||
}
|
||||
return replay_all_failed_batches_v1(cleanup);
|
||||
}
|
||||
|
||||
@@ -27,12 +27,6 @@ class query_processor;
|
||||
|
||||
} // namespace cql3
|
||||
|
||||
namespace gms {
|
||||
|
||||
class feature_service;
|
||||
|
||||
} // namespace gms
|
||||
|
||||
namespace db {
|
||||
|
||||
class system_keyspace;
|
||||
@@ -55,11 +49,6 @@ class batchlog_manager : public peering_sharded_service<batchlog_manager> {
|
||||
public:
|
||||
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
|
||||
|
||||
struct stats {
|
||||
uint64_t write_attempts = 0;
|
||||
};
|
||||
|
||||
|
||||
private:
|
||||
static constexpr std::chrono::seconds replay_interval = std::chrono::seconds(60);
|
||||
static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
|
||||
@@ -67,13 +56,14 @@ private:
|
||||
|
||||
using clock_type = lowres_clock;
|
||||
|
||||
stats _stats;
|
||||
struct stats {
|
||||
uint64_t write_attempts = 0;
|
||||
} _stats;
|
||||
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
|
||||
cql3::query_processor& _qp;
|
||||
db::system_keyspace& _sys_ks;
|
||||
gms::feature_service& _fs;
|
||||
db_clock::duration _replay_timeout;
|
||||
uint64_t _replay_rate;
|
||||
std::chrono::milliseconds _delay;
|
||||
@@ -94,14 +84,12 @@ private:
|
||||
|
||||
future<> maybe_migrate_v1_to_v2();
|
||||
|
||||
future<all_batches_replayed> replay_all_failed_batches_v1(post_replay_cleanup cleanup);
|
||||
future<all_batches_replayed> replay_all_failed_batches_v2(post_replay_cleanup cleanup);
|
||||
future<all_batches_replayed> replay_all_failed_batches(post_replay_cleanup cleanup);
|
||||
public:
|
||||
// Takes a QP, not a distributes. Because this object is supposed
|
||||
// to be per shard and does no dispatching beyond delegating the the
|
||||
// shard qp (which is what you feed here).
|
||||
batchlog_manager(cql3::query_processor&, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config);
|
||||
batchlog_manager(cql3::query_processor&, db::system_keyspace& sys_ks, batchlog_manager_config config);
|
||||
|
||||
// abort the replay loop and return its future.
|
||||
future<> drain();
|
||||
@@ -114,7 +102,7 @@ public:
|
||||
return _last_replay;
|
||||
}
|
||||
|
||||
const stats& get_stats() const {
|
||||
const stats& stats() const {
|
||||
return _stats;
|
||||
}
|
||||
private:
|
||||
|
||||
@@ -1292,7 +1292,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, fd_initial_value_ms(this, "fd_initial_value_ms", value_status::Used, 2 * 1000, "The initial failure_detector interval time in milliseconds.")
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, DEVELOPER_MODE_DEFAULT, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Deprecated, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, experimental_features_help_string())
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step.")
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
|
||||
#include "cql3/cql3_type.hh"
|
||||
#include "types/user.hh"
|
||||
#include "types/map.hh"
|
||||
#include "types/list.hh"
|
||||
@@ -114,7 +113,7 @@ std::vector<data_type> type_parser::get_type_parameters(bool multicell)
|
||||
throw parse_exception(_str, _idx, "unexpected end of string");
|
||||
}
|
||||
|
||||
std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
|
||||
std::tuple<data_type, size_t> type_parser::get_vector_parameters()
|
||||
{
|
||||
if (is_eos() || _str[_idx] != '(') {
|
||||
throw std::logic_error("internal error");
|
||||
@@ -129,7 +128,7 @@ std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
|
||||
}
|
||||
|
||||
data_type type = do_parse(true);
|
||||
vector_dimension_t size = 0;
|
||||
size_t size = 0;
|
||||
if (_str[_idx] == ',') {
|
||||
++_idx;
|
||||
skip_blank();
|
||||
@@ -143,20 +142,7 @@ std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
|
||||
throw parse_exception(_str, _idx, "expected digit or ')'");
|
||||
}
|
||||
|
||||
unsigned long parsed_size;
|
||||
try {
|
||||
parsed_size = std::stoul(_str.substr(i, _idx - i));
|
||||
} catch (const std::exception& e) {
|
||||
throw parse_exception(_str, i, format("Invalid vector dimension: {}", e.what()));
|
||||
}
|
||||
static_assert(sizeof(unsigned long) >= sizeof(vector_dimension_t));
|
||||
if (parsed_size == 0) {
|
||||
throw parse_exception(_str, _idx, "Vectors must have a dimension greater than 0");
|
||||
}
|
||||
if (parsed_size > cql3::cql3_type::MAX_VECTOR_DIMENSION) {
|
||||
throw parse_exception(_str, _idx, format("Vectors must have a dimension less than or equal to {}", cql3::cql3_type::MAX_VECTOR_DIMENSION));
|
||||
}
|
||||
size = static_cast<vector_dimension_t>(parsed_size);
|
||||
size = std::stoul(_str.substr(i, _idx - i));
|
||||
|
||||
++_idx; // skipping ')'
|
||||
return std::make_tuple(type, size);
|
||||
|
||||
@@ -97,7 +97,7 @@ public:
|
||||
}
|
||||
#endif
|
||||
std::vector<data_type> get_type_parameters(bool multicell=true);
|
||||
std::tuple<data_type, vector_dimension_t> get_vector_parameters();
|
||||
std::tuple<data_type, size_t> get_vector_parameters();
|
||||
std::tuple<sstring, bytes, std::vector<bytes>, std::vector<data_type>> get_user_type_parameters();
|
||||
data_type do_parse(bool multicell = true);
|
||||
|
||||
|
||||
@@ -21,16 +21,14 @@
|
||||
#include "replica/database.hh"
|
||||
#include "replica/global_table_ptr.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
logging::logger snap_log("snapshots");
|
||||
|
||||
namespace db {
|
||||
|
||||
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>& sp, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
|
||||
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
|
||||
: _config(std::move(cfg))
|
||||
, _db(db)
|
||||
, _sp(sp)
|
||||
, _ops("snapshot_ctl")
|
||||
, _task_manager_module(make_shared<snapshot::task_manager_module>(tm))
|
||||
, _storage_manager(sstm)
|
||||
@@ -106,45 +104,6 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
if (tag.empty()) {
|
||||
throw std::invalid_argument("You must supply a snapshot name.");
|
||||
}
|
||||
if (ks_names.size() != 1 && !tables.empty()) {
|
||||
throw std::invalid_argument("Cannot name tables when doing multiple keyspaces snapshot");
|
||||
}
|
||||
if (ks_names.empty()) {
|
||||
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(ks_names));
|
||||
}
|
||||
|
||||
return run_snapshot_modify_operation([this, ks_names = std::move(ks_names), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
|
||||
return do_take_cluster_column_family_snapshot(std::move(ks_names), std::move(tables), std::move(tag), opts);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
if (tables.empty()) {
|
||||
co_await coroutine::parallel_for_each(ks_names, [tag, this] (const auto& ks_name) {
|
||||
return check_snapshot_not_exist(ks_name, tag);
|
||||
});
|
||||
co_await _sp.local().snapshot_keyspace(
|
||||
ks_names | std::views::transform([&](auto& ks) { return std::make_pair(ks, sstring{}); })
|
||||
| std::ranges::to<std::unordered_multimap>(),
|
||||
tag, opts
|
||||
);
|
||||
co_return;
|
||||
};
|
||||
|
||||
auto ks = ks_names[0];
|
||||
co_await check_snapshot_not_exist(ks, tag, tables);
|
||||
|
||||
co_await _sp.local().snapshot_keyspace(
|
||||
tables | std::views::transform([&](auto& cf) { return std::make_pair(ks, cf); })
|
||||
| std::ranges::to<std::unordered_multimap>(),
|
||||
tag, opts
|
||||
);
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
co_await check_snapshot_not_exist(ks_name, tag, tables);
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
|
||||
@@ -226,4 +185,4 @@ future<int64_t> snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) {
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@
|
||||
using namespace seastar;
|
||||
|
||||
namespace sstables { class storage_manager; }
|
||||
namespace service { class storage_proxy; }
|
||||
|
||||
namespace db {
|
||||
|
||||
@@ -64,7 +63,7 @@ public:
|
||||
|
||||
using db_snapshot_details = std::vector<table_snapshot_details_ext>;
|
||||
|
||||
snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>&, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
|
||||
snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
|
||||
|
||||
future<> stop();
|
||||
|
||||
@@ -96,17 +95,6 @@ public:
|
||||
*/
|
||||
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
|
||||
/**
|
||||
* Takes the snapshot of multiple tables or a whole keyspace, or all keyspaces,
|
||||
* using global, clusterwide topology coordinated op.
|
||||
* A snapshot name must be specified.
|
||||
*
|
||||
* @param ks_names the keyspaces to snapshot
|
||||
* @param tables optional - a vector of tables names to snapshot
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
*/
|
||||
future<> take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
|
||||
/**
|
||||
* Remove the snapshot with the given name from the given keyspaces.
|
||||
* If no tag is specified we will remove all snapshots.
|
||||
@@ -123,7 +111,6 @@ public:
|
||||
private:
|
||||
config _config;
|
||||
sharded<replica::database>& _db;
|
||||
sharded<service::storage_proxy>& _sp;
|
||||
seastar::rwlock _lock;
|
||||
seastar::named_gate _ops;
|
||||
shared_ptr<snapshot::task_manager_module> _task_manager_module;
|
||||
@@ -146,7 +133,6 @@ private:
|
||||
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
future<> do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -770,6 +770,13 @@ system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) {
|
||||
co_return res;
|
||||
}
|
||||
|
||||
bool system_distributed_keyspace::workload_prioritization_tables_exists() {
|
||||
auto wp_table = get_updated_service_levels(_qp.db(), true);
|
||||
auto table = _qp.db().try_find_table(NAME, wp_table->cf_name());
|
||||
|
||||
return table && table->schema()->equal_columns(*wp_table);
|
||||
}
|
||||
|
||||
future<qos::service_levels_info> system_distributed_keyspace::get_service_levels(qos::query_context ctx) const {
|
||||
return qos::get_service_levels(_qp, NAME, SERVICE_LEVELS, db::consistency_level::ONE, ctx);
|
||||
}
|
||||
|
||||
@@ -117,6 +117,7 @@ public:
|
||||
future<qos::service_levels_info> get_service_level(sstring service_level_name) const;
|
||||
future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const;
|
||||
future<> drop_service_level(sstring service_level_name) const;
|
||||
bool workload_prioritization_tables_exists();
|
||||
|
||||
private:
|
||||
future<> create_tables(std::vector<schema_ptr> tables);
|
||||
|
||||
@@ -335,10 +335,6 @@ schema_ptr system_keyspace::topology_requests() {
|
||||
.with_column("truncate_table_id", uuid_type)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type)
|
||||
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.with_column("snapshot_table_ids", set_type_impl::get_instance(uuid_type, false))
|
||||
.with_column("snapshot_tag", utf8_type)
|
||||
.with_column("snapshot_expiry", timestamp_type)
|
||||
.with_column("snapshot_skip_flush", boolean_type)
|
||||
.set_comment("Topology request tracking")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
@@ -3585,18 +3581,6 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
|
||||
entry.new_keyspace_rf_change_ks_name = row.get_as<sstring>("new_keyspace_rf_change_ks_name");
|
||||
entry.new_keyspace_rf_change_data = row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
|
||||
}
|
||||
if (row.has("snapshot_table_ids")) {
|
||||
entry.snapshot_tag = row.get_as<sstring>("snapshot_tag");
|
||||
entry.snapshot_skip_flush = row.get_as<bool>("snapshot_skip_flush");
|
||||
entry.snapshot_table_ids = row.get_set<utils::UUID>("snapshot_table_ids")
|
||||
| std::views::transform([](auto& uuid) { return table_id(uuid); })
|
||||
| std::ranges::to<std::unordered_set>()
|
||||
;
|
||||
;
|
||||
if (row.has("snapshot_expiry")) {
|
||||
entry.snapshot_expiry = row.get_as<db_clock::time_point>("snapshot_expiry");
|
||||
}
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
@@ -417,10 +417,6 @@ public:
|
||||
std::optional<sstring> new_keyspace_rf_change_ks_name;
|
||||
// The KS options to be used when executing the scheduled ALTER KS statement
|
||||
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_data;
|
||||
std::optional<std::unordered_set<table_id>> snapshot_table_ids;
|
||||
std::optional<sstring> snapshot_tag;
|
||||
std::optional<db_clock::time_point> snapshot_expiry;
|
||||
bool snapshot_skip_flush;
|
||||
};
|
||||
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;
|
||||
|
||||
|
||||
@@ -2308,7 +2308,6 @@ future<> view_builder::drain() {
|
||||
vlogger.info("Draining view builder");
|
||||
_as.request_abort();
|
||||
co_await _mnotifier.unregister_listener(this);
|
||||
co_await _ops_gate.close();
|
||||
co_await _vug.drain();
|
||||
co_await _sem.wait();
|
||||
_sem.broken();
|
||||
@@ -2743,48 +2742,30 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na
|
||||
}
|
||||
|
||||
// Do it in the background, serialized and broadcast from shard 0.
|
||||
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
|
||||
return dispatch_create_view(std::move(ks_name), std::move(view_name));
|
||||
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
static_cast<void>(dispatch_create_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to dispatch view creation {}.{}: {}", ks_name, view_name, ep);
|
||||
}));
|
||||
}
|
||||
|
||||
future<> view_builder::dispatch_update_view(sstring ks_name, sstring view_name) {
|
||||
if (should_ignore_tablet_keyspace(_db, ks_name)) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::nullopt);
|
||||
|
||||
auto view = view_ptr(_db.find_schema(ks_name, view_name));
|
||||
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
|
||||
if (step_it == _base_to_build_step.end()) {
|
||||
co_return; // In case all the views for this CF have finished building already.
|
||||
}
|
||||
auto status_it = std::ranges::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
|
||||
return bs.view->id() == view->id();
|
||||
});
|
||||
if (status_it != step_it->second.build_status.end()) {
|
||||
status_it->view = std::move(view);
|
||||
}
|
||||
}
|
||||
|
||||
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
|
||||
if (should_ignore_tablet_keyspace(_db, ks_name)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Do it in the background, serialized.
|
||||
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
|
||||
return dispatch_update_view(std::move(ks_name), std::move(view_name));
|
||||
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const seastar::gate_closed_exception&) {
|
||||
vlogger.warn("Ignoring gate_closed_exception during view update {}.{}", ks_name, view_name);
|
||||
} catch (const seastar::broken_named_semaphore&) {
|
||||
vlogger.warn("Ignoring broken_named_semaphore during view update {}.{}", ks_name, view_name);
|
||||
} catch (const replica::no_such_column_family&) {
|
||||
vlogger.warn("Ignoring no_such_column_family during view update {}.{}", ks_name, view_name);
|
||||
(void)with_semaphore(_sem, view_builder_semaphore_units, [ks_name, view_name, this] {
|
||||
auto view = view_ptr(_db.find_schema(ks_name, view_name));
|
||||
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
|
||||
if (step_it == _base_to_build_step.end()) {
|
||||
return;// In case all the views for this CF have finished building already.
|
||||
}
|
||||
}));
|
||||
auto status_it = std::ranges::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
|
||||
return bs.view->id() == view->id();
|
||||
});
|
||||
if (status_it != step_it->second.build_status.end()) {
|
||||
status_it->view = std::move(view);
|
||||
}
|
||||
}).handle_exception_type([] (replica::no_such_column_family&) { });
|
||||
}
|
||||
|
||||
future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) {
|
||||
@@ -2846,9 +2827,7 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
|
||||
}
|
||||
|
||||
// Do it in the background, serialized and broadcast from shard 0.
|
||||
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
|
||||
return dispatch_drop_view(std::move(ks_name), std::move(view_name));
|
||||
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
static_cast<void>(dispatch_drop_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to dispatch view drop {}.{}: {}", ks_name, view_name, ep);
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
@@ -191,7 +190,6 @@ class view_builder final : public service::migration_listener::only_view_notific
|
||||
// Guard the whole startup routine with a semaphore so that it's not intercepted by
|
||||
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
|
||||
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
|
||||
seastar::gate _ops_gate;
|
||||
seastar::abort_source _as;
|
||||
future<> _step_fiber = make_ready_future<>();
|
||||
// Used to coordinate between shards the conclusion of the build process for a particular view.
|
||||
@@ -286,7 +284,6 @@ private:
|
||||
future<> mark_as_built(view_ptr);
|
||||
void setup_metrics();
|
||||
future<> dispatch_create_view(sstring ks_name, sstring view_name);
|
||||
future<> dispatch_update_view(sstring ks_name, sstring view_name);
|
||||
future<> dispatch_drop_view(sstring ks_name, sstring view_name);
|
||||
future<> handle_seed_view_build_progress(const sstring& ks_name, const sstring& view_name);
|
||||
future<> handle_create_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units);
|
||||
|
||||
@@ -98,13 +98,14 @@ public:
|
||||
auto hostid = eps.get_host_id();
|
||||
|
||||
set_cell(cr, "up", gossiper.is_alive(hostid));
|
||||
if (gossiper.is_shutdown(endpoint)) {
|
||||
if (!ss.raft_topology_change_enabled() || gossiper.is_shutdown(endpoint)) {
|
||||
set_cell(cr, "status", gossiper.get_gossip_status(endpoint));
|
||||
} else {
|
||||
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
|
||||
}
|
||||
set_cell(cr, "load", gossiper.get_application_state_value(endpoint, gms::application_state::LOAD));
|
||||
|
||||
if (ss.raft_topology_change_enabled() && !gossiper.is_shutdown(endpoint)) {
|
||||
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
|
||||
}
|
||||
set_cell(cr, "host_id", hostid.uuid());
|
||||
|
||||
if (tm.get_topology().has_node(hostid)) {
|
||||
|
||||
6
dist/docker/redhat/build_docker.sh
vendored
6
dist/docker/redhat/build_docker.sh
vendored
@@ -97,9 +97,7 @@ bcp LICENSE-ScyllaDB-Source-Available.md /licenses/
|
||||
|
||||
run microdnf clean all
|
||||
run microdnf --setopt=tsflags=nodocs -y update
|
||||
run microdnf --setopt=tsflags=nodocs -y install hostname kmod procps-ng python3 python3-pip cpio
|
||||
# Extract only systemctl binary from systemd package to avoid installing the whole systemd in the container.
|
||||
run bash -rc "microdnf download systemd && rpm2cpio systemd-*.rpm | cpio -idmv ./usr/bin/systemctl && rm -rf systemd-*.rpm"
|
||||
run microdnf --setopt=tsflags=nodocs -y install hostname kmod procps-ng python3 python3-pip
|
||||
run curl -L --output /etc/yum.repos.d/scylla.repo ${repo_file_url}
|
||||
run pip3 install --no-cache-dir --prefix /usr supervisor
|
||||
run bash -ec "echo LANG=C.UTF-8 > /etc/locale.conf"
|
||||
@@ -108,8 +106,6 @@ run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
|
||||
run mkdir -p /var/log/scylla
|
||||
run chown -R scylla:scylla /var/lib/scylla
|
||||
run sed -i -e 's/^SCYLLA_ARGS=".*"$/SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --network-stack posix"/' /etc/sysconfig/scylla-server
|
||||
# Cleanup packages not needed in the final image and clean package manager cache to reduce image size.
|
||||
run bash -rc "microdnf remove -y cpio && microdnf clean all"
|
||||
|
||||
run mkdir -p /opt/scylladb/supervisor
|
||||
run touch /opt/scylladb/SCYLLA-CONTAINER-FILE
|
||||
|
||||
@@ -142,6 +142,10 @@ want modify a non-top-level attribute directly (e.g., a.b[3].c) need RMW:
|
||||
Alternator implements such requests by reading the entire top-level
|
||||
attribute a, modifying only a.b[3].c, and then writing back a.
|
||||
|
||||
Currently, Alternator doesn't use Tablets. That's because Alternator relies
|
||||
on LWT (lightweight transactions), and LWT is not supported in keyspaces
|
||||
with Tablets enabled.
|
||||
|
||||
```{eval-rst}
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
|
||||
@@ -187,23 +187,6 @@ You can create a keyspace with tablets enabled with the ``tablets = {'enabled':
|
||||
the keyspace schema with ``tablets = { 'enabled': false }`` or
|
||||
``tablets = { 'enabled': true }``.
|
||||
|
||||
.. _keyspace-rf-rack-valid-to-enforce-rack-list:
|
||||
|
||||
Enforcing Rack-List Replication for Tablet Keyspaces
|
||||
------------------------------------------------------------------
|
||||
|
||||
The ``rf_rack_valid_keyspaces`` is a legacy option that ensures that all keyspaces with tablets enabled are
|
||||
:term:`RF-rack-valid <RF-rack-valid keyspace>`.
|
||||
|
||||
Requiring every tablet keyspace to use the rack list replication factor exclusively is enough to guarantee the keyspace is
|
||||
:term:`RF-rack-valid <RF-rack-valid keyspace>`. It reduces restrictions and provides stronger guarantees compared
|
||||
to ``rf_rack_valid_keyspaces`` option.
|
||||
|
||||
To enforce rack list in tablet keyspaces, use ``enforce_rack_list`` option. It can be set only if all tablet keyspaces use
|
||||
rack list. To ensure that, follow a procedure of :ref:`conversion to rack list replication factor <conversion-to-rack-list-rf>`.
|
||||
After that restart all nodes in the cluster, with ``enforce_rack_list`` enabled and ``rf_rack_valid_keyspaces`` disabled. Make
|
||||
sure to avoid setting or updating replication factor (with CREATE KEYSPACE or ALTER KEYSPACE) while nodes are being restarted.
|
||||
|
||||
.. _tablets-limitations:
|
||||
|
||||
Limitations and Unsupported Features
|
||||
|
||||
@@ -402,82 +402,3 @@ it also describes authentication/authorization and service levels. Additionally,
|
||||
statement: `DESCRIBE SCHEMA WITH INTERNALS AND PASSWORDS`, which also includes the information about hashed passwords of the roles.
|
||||
|
||||
For more details, see [the article on DESCRIBE SCHEMA](./describe-schema.rst).
|
||||
|
||||
## Per-row TTL
|
||||
|
||||
CQL's traditional time-to-live (TTL) feature attaches an expiration time to
|
||||
each cell - i.e., each value in each column. For example, the statement:
|
||||
```
|
||||
UPDATE tbl USING TTL 60 SET x = 1 WHERE p = 2
|
||||
```
|
||||
Sets a new value for the column `x` in row `p = 2`, and asks for this value to
|
||||
expire in 60 seconds. When a row is updated incrementally, with different
|
||||
columns set at different times, this can result in different pieces of the
|
||||
row expiring at different times. Applications rarely want partially-expired
|
||||
rows, so they often need to re-write an entire row each time the row needs
|
||||
updating. In particular, it is not possible to change the expiration time of
|
||||
an existing row without re-writing it.
|
||||
|
||||
Per-row time-to-live (TTL) is a new CQL feature that is an alternative to
|
||||
the traditional per-cell TTL. One column is designated as the "expiration
|
||||
time" column, and the value of this column determines when the entire row
|
||||
will expire. It becomes possible to update pieces of a row without changing
|
||||
its expiration time, and vice versa - to change a row's expiration time
|
||||
without rewriting its data.
|
||||
|
||||
The expiration-time column of a table can be chosen when it is created by
|
||||
adding the keyword "TTL" to one of the columns:
|
||||
```cql
|
||||
CREATE TABLE tab (
|
||||
id int PRIMARY KEY,
|
||||
t text,
|
||||
expiration timestamp TTL
|
||||
);
|
||||
```
|
||||
The TTL column's name, in this example `expiration`, can be anything.
|
||||
|
||||
Per-row TTL can also be enabled on an existing table by adding the "TTL"
|
||||
designation to one of the existing columns, with:
|
||||
```cql
|
||||
ALTER TABLE tab TTL colname
|
||||
```
|
||||
Or per-row TTL can be disabled (rows will never expire), with:
|
||||
```cql
|
||||
ALTER TABLE tab TTL NULL
|
||||
```
|
||||
|
||||
It is not possible to enable per-row TTL if it's already enabled, or disable
|
||||
it when already disabled. If you have TTL enabled on one column and want to
|
||||
enable it instead on a second column, you must do it in two steps: First
|
||||
disable TTL and then re-enable it on the second column.
|
||||
|
||||
The designated TTL column must have the type `timestamp` or `bigint`,
|
||||
and specifies the absolute time when the row should expire (the `bigint`
|
||||
type is interpreted as seconds since the UNIX epoch). It must be a regular
|
||||
column (not a primary key column or a static column), and there can only be
|
||||
one such column.
|
||||
|
||||
The 32-bit type `int` (specifying number of seconds since the UNIX epoch)
|
||||
is also supported, but not recommended because it will wrap around in 2038.
|
||||
Unless you must use the `int` type because of pre-existing expiration data
|
||||
with that type, please prefer `timestamp` or `bigint`.
|
||||
|
||||
Another important feature of per-row TTL is that if CDC is enabled, when a
|
||||
row expires a deletion event appears in the CDC log - something that doesn't
|
||||
happen in per-cell TTL. This deletion event can be distinguished from user-
|
||||
initiated deletes: Whereas user-initiated deletes have `cdc_operation` set to
|
||||
3 (`row_delete`) or 4 (`partition_delete`), those generated by expiration have
|
||||
`cdc_operation` -3 (`service_row_delete`) or -4 (`service_partition_delete`).
|
||||
|
||||
Unlike per-cell TTL where a value becomes unreadable at the precise specified
|
||||
second, the per-row TTL's expiration is _eventual_ - the row will expire
|
||||
some time _after_ its requested expiration time, where this "some time" can
|
||||
be controlled by the configuration `alternator_ttl_period_in_seconds`. Until
|
||||
the row is actually deleted, it can still be read, and even written.
|
||||
Importantly, the CDC event will appear immediately after the row is finally
|
||||
deleted.
|
||||
|
||||
It's important to re-iterate that the per-cell TTL and per-row TTL features
|
||||
are separate and distinct, use a different CQL syntax, have a different
|
||||
implementation and provide different guarantees. It is possible to use
|
||||
both features in the same table, or even the same row.
|
||||
|
||||
@@ -200,6 +200,8 @@ for two cases. One is setting replication factor to 0, in which case the number
|
||||
The other is when the numeric replication factor is equal to the current number of replicas
|
||||
for a given datacanter, in which case the current rack list is preserved.
|
||||
|
||||
Altering from a numeric replication factor to a rack list is not supported yet.
|
||||
|
||||
Note that when ``ALTER`` ing keyspaces and supplying ``replication_factor``,
|
||||
auto-expansion will only *add* new datacenters for safety, it will not alter
|
||||
existing datacenters or remove any even if they are no longer in the cluster.
|
||||
@@ -422,21 +424,6 @@ Altering from a rack list to a numeric replication factor is not supported.
|
||||
|
||||
Keyspaces which use rack lists are :term:`RF-rack-valid <RF-rack-valid keyspace>` if each rack in the rack list contains at least one node (excluding :doc:`zero-token nodes </architecture/zero-token-nodes>`).
|
||||
|
||||
.. _conversion-to-rack-list-rf:
|
||||
|
||||
Conversion to rack-list replication factor
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
To migrate a keyspace from a numeric replication factor to a rack-list replication factor, provide the rack-list replication factor explicitly in ALTER KEYSPACE statement. The number of racks in the list must be equal to the numeric replication factor. The replication factor can be converted in any number of DCs at once. In a statement that converts replication factor, no replication factor updates (increase or decrease) are allowed in any DC.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE KEYSPACE Excelsior
|
||||
WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 1} AND tablets = { 'enabled': true };
|
||||
|
||||
ALTER KEYSPACE Excelsior
|
||||
WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : ['RAC1', 'RAC2', 'RAC3'], 'dc2' : ['RAC4']} AND tablets = { 'enabled': true };
|
||||
|
||||
.. _drop-keyspace-statement:
|
||||
|
||||
DROP KEYSPACE
|
||||
@@ -476,7 +463,7 @@ Creating a new table uses the ``CREATE TABLE`` statement:
|
||||
: [ ',' PRIMARY KEY '(' `primary_key` ')' ]
|
||||
: ')' [ WITH `table_options` ]
|
||||
|
||||
column_definition: `column_name` `cql_type` [ TTL ] [ STATIC ] [ PRIMARY KEY]
|
||||
column_definition: `column_name` `cql_type` [ STATIC ] [ PRIMARY KEY]
|
||||
|
||||
primary_key: `partition_key` [ ',' `clustering_columns` ]
|
||||
|
||||
@@ -572,11 +559,6 @@ A :token:`column_definition` is primarily comprised of the name of the column de
|
||||
which restricts which values are accepted for that column. Additionally, a column definition can have the following
|
||||
modifiers:
|
||||
|
||||
``TTL``
|
||||
declares the column as being the expiration-time column for the
|
||||
`per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
|
||||
feature.
|
||||
|
||||
``STATIC``
|
||||
declares the column as being a :ref:`static column <static-columns>`.
|
||||
|
||||
@@ -1177,7 +1159,6 @@ Altering an existing table uses the ``ALTER TABLE`` statement:
|
||||
: | DROP '(' `column_name` ( ',' `column_name` )* ')' [ USING TIMESTAMP `timestamp` ]
|
||||
: | ALTER `column_name` TYPE `cql_type`
|
||||
: | WITH `options`
|
||||
: | TTL (`column_name` | NULL)
|
||||
: | scylla_encryption_options: '=' '{'[`cipher_algorithm` : <hash>]','[`secret_key_strength` : <len>]','[`key_provider`: <provider>]'}'
|
||||
|
||||
For instance:
|
||||
@@ -1221,11 +1202,6 @@ The ``ALTER TABLE`` statement can:
|
||||
- Change or add any of the ``Encryption options`` above.
|
||||
- Change or add any of the :ref:`CDC options <cdc-options>` above.
|
||||
- Change or add per-partition rate limits. See :ref:`Limiting the rate of requests per partition <ddl-per-parition-rate-limit>`.
|
||||
- Enable `per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
|
||||
using the given column as the expiration-time column, or disable per-row
|
||||
TTL on this table. If per-row TTL is already enabled, to change the choice
|
||||
of expiration-time column you must first disable per-row TTL and then
|
||||
re-enable it using the chosen column.
|
||||
|
||||
.. warning:: Dropping a column assumes that the timestamps used for the value of this column are "real" timestamp in
|
||||
microseconds. Using "real" timestamps in microseconds is the default is and is **strongly** recommended, but as
|
||||
|
||||
@@ -277,9 +277,6 @@ Dropping a secondary index uses the ``DROP INDEX`` statement:
|
||||
The ``DROP INDEX`` statement is used to drop an existing secondary index. The argument of the statement is the index
|
||||
name, which may optionally specify the keyspace of the index.
|
||||
|
||||
If the index is currently being built, the ``DROP INDEX`` can still be executed. Once the ``DROP INDEX`` command is issued,
|
||||
the system stops the build process and cleans up any partially built data associated with the index.
|
||||
|
||||
.. If the index does not exists, the statement will return an error, unless ``IF EXISTS`` is used in which case the
|
||||
.. operation is a no-op.
|
||||
|
||||
|
||||
@@ -13,19 +13,6 @@ The TTL can be set when defining a Table (CREATE), or when using the INSERT and
|
||||
The expiration works at the individual column level, which provides a lot of flexibility.
|
||||
By default, the TTL value is null, which means that the data will not expire.
|
||||
|
||||
This document is about CQL's classic per-write TTL feature, where individual
|
||||
columns from the same row can expire at separate times if written at
|
||||
different times. ScyllaDB also supports an alternative TTL feature,
|
||||
`Per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_.
|
||||
In *per-row TTL* each row has an expiration time for the entire row,
|
||||
defined by the value of the expiration-time column. In per-row TTL, the
|
||||
entire row expires together regardless of how its indivial columns were
|
||||
written, and the expiration time of an entire row can be modified by modifying
|
||||
the expiration-time column. Another benefit of per-row TTL is that it
|
||||
generates a CDC event when a row expires - in contrast in per-write TTL
|
||||
(the feature described in this document) where expiration events do not
|
||||
show up in CDC.
|
||||
|
||||
.. note::
|
||||
|
||||
The expiration time is always calculated as *now() on the Coordinator + TTL* where, *now()* is the wall clock during the corresponding write operation.
|
||||
|
||||
@@ -10,11 +10,6 @@ The CDC log table reflects operations that are performed on the base table. Diff
|
||||
* row range deletions,
|
||||
* partition deletions.
|
||||
|
||||
Note that TTL expirations are not operations, and not reflected in the CDC
|
||||
log tables. If you do need CDC events when entire rows expire, consider
|
||||
using `per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
|
||||
which does generate special CDC events when rows expire.
|
||||
|
||||
The following sections describe how each of these operations are handled by the CDC log.
|
||||
|
||||
.. include:: /features/cdc/_common/cdc-updates.rst
|
||||
|
||||
@@ -10,6 +10,7 @@ Install ScyllaDB |CURRENT_VERSION|
|
||||
/getting-started/install-scylla/launch-on-azure
|
||||
/getting-started/installation-common/scylla-web-installer
|
||||
/getting-started/install-scylla/install-on-linux
|
||||
/getting-started/installation-common/install-jmx
|
||||
/getting-started/install-scylla/run-in-docker
|
||||
/getting-started/installation-common/unified-installer
|
||||
/getting-started/installation-common/air-gapped-install
|
||||
@@ -23,9 +24,9 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`Launch ScyllaDB on AWS </getting-started/install-scylla/launch-on-aws>`
|
||||
* :doc:`Launch ScyllaDB on GCP </getting-started/install-scylla/launch-on-gcp>`
|
||||
* :doc:`Launch ScyllaDB on Azure </getting-started/install-scylla/launch-on-azure>`
|
||||
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on AWS </getting-started/install-scylla/launch-on-aws>`
|
||||
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on GCP </getting-started/install-scylla/launch-on-gcp>`
|
||||
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on Azure </getting-started/install-scylla/launch-on-azure>`
|
||||
|
||||
|
||||
.. panel-box::
|
||||
@@ -34,7 +35,8 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`Install ScyllaDB with Web Installer (recommended) </getting-started/installation-common/scylla-web-installer>`
|
||||
* :doc:`Install ScyllaDB Linux Packages </getting-started/install-scylla/install-on-linux>`
|
||||
* :doc:`Install ScyllaDB |CURRENT_VERSION| Linux Packages </getting-started/install-scylla/install-on-linux>`
|
||||
* :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
|
||||
* :doc:`Install ScyllaDB Without root Privileges </getting-started/installation-common/unified-installer>`
|
||||
* :doc:`Air-gapped Server Installation </getting-started/installation-common/air-gapped-install>`
|
||||
* :doc:`ScyllaDB Developer Mode </getting-started/installation-common/dev-mod>`
|
||||
|
||||
@@ -94,6 +94,16 @@ Install ScyllaDB
|
||||
|
||||
apt-get install scylla{,-server,-kernel-conf,-node-exporter,-conf,-python3,-cqlsh}=2025.3.1-0.20250907.2bbf3cf669bb-1
|
||||
|
||||
|
||||
#. (Ubuntu only) Set Java 11.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y openjdk-11-jre-headless
|
||||
sudo update-java-alternatives --jre-headless -s java-1.11.0-openjdk-amd64
|
||||
|
||||
|
||||
.. group-tab:: Centos/RHEL
|
||||
|
||||
#. Install the EPEL repository.
|
||||
@@ -147,6 +157,14 @@ Install ScyllaDB
|
||||
|
||||
sudo yum install scylla-5.2.3
|
||||
|
||||
(Optional) Install scylla-jmx
|
||||
-------------------------------
|
||||
|
||||
scylla-jmx is an optional package and is not installed by default.
|
||||
If you need JMX server, see :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`.
|
||||
|
||||
|
||||
|
||||
.. include:: /getting-started/_common/setup-after-install.rst
|
||||
|
||||
Next Steps
|
||||
|
||||
78
docs/getting-started/installation-common/install-jmx.rst
Normal file
78
docs/getting-started/installation-common/install-jmx.rst
Normal file
@@ -0,0 +1,78 @@
|
||||
|
||||
======================================
|
||||
Install scylla-jmx Package
|
||||
======================================
|
||||
|
||||
scylla-jmx is an optional package and is not installed by default.
|
||||
If you need JMX server, you can still install it from scylla-jmx GitHub page.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
#. Download .deb package from scylla-jmx page.
|
||||
|
||||
Access to https://github.com/scylladb/scylla-jmx, select latest
|
||||
release from "releases", download a file end with ".deb".
|
||||
|
||||
#. (Optional) Transfer the downloaded package to the install node.
|
||||
|
||||
If the pc from which you downloaded the package is different from
|
||||
the node where you install scylladb, you will need to transfer
|
||||
the files to the node.
|
||||
|
||||
#. Install scylla-jmx package.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt install -y ./scylla-jmx_<version>_all.deb
|
||||
|
||||
|
||||
.. group-tab:: Centos/RHEL
|
||||
|
||||
#. Download .rpm package from scylla-jmx page.
|
||||
|
||||
Access to https://github.com/scylladb/scylla-jmx, select latest
|
||||
release from "releases", download a file end with ".rpm".
|
||||
|
||||
#. (Optional) Transfer the downloaded package to the install node.
|
||||
|
||||
If the pc from which you downloaded the package is different from
|
||||
the node where you install scylladb, you will need to transfer
|
||||
the files to the node.
|
||||
|
||||
#. Install scylla-jmx package.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo yum install -y ./scylla-jmx-<version>.noarch.rpm
|
||||
|
||||
|
||||
.. group-tab:: Install without root privileges
|
||||
|
||||
#. Download .tar.gz package from scylla-jmx page.
|
||||
|
||||
Access to https://github.com/scylladb/scylla-jmx, select latest
|
||||
release from "releases", download a file end with ".tar.gz".
|
||||
|
||||
#. (Optional) Transfer the downloaded package to the install node.
|
||||
|
||||
If the pc from which you downloaded the package is different from
|
||||
the node where you install scylladb, you will need to transfer
|
||||
the files to the node.
|
||||
|
||||
#. Install scylla-jmx package.
|
||||
|
||||
.. code:: console
|
||||
|
||||
tar xpf scylla-jmx-<version>.noarch.tar.gz
|
||||
cd scylla-jmx
|
||||
./install.sh --nonroot
|
||||
|
||||
Next Steps
|
||||
-----------
|
||||
|
||||
* :doc:`Configure ScyllaDB </getting-started/system-configuration>`
|
||||
* Manage your clusters with `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_
|
||||
* Monitor your cluster and data with `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_
|
||||
* Get familiar with ScyllaDB’s :doc:`command line reference guide </operating-scylla/nodetool>`.
|
||||
* Learn about ScyllaDB at `ScyllaDB University <https://university.scylladb.com/>`_
|
||||
@@ -49,6 +49,11 @@ Download and Install
|
||||
|
||||
./install.sh --nonroot --python3 ~/scylladb/python3/bin/python3
|
||||
|
||||
#. (Optional) Install scylla-jmx
|
||||
|
||||
scylla-jmx is an optional package and is not installed by default.
|
||||
If you need JMX server, see :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`.
|
||||
|
||||
Configure and Run ScyllaDB
|
||||
----------------------------
|
||||
|
||||
|
||||
@@ -15,10 +15,6 @@ It is not always clear under which circumstances data is deleted when using Time
|
||||
This article clarifies what may not be apparent.
|
||||
It corrects some assumptions you may have that are not exactly true.
|
||||
|
||||
This document is about CQL's :doc:`per-write TTL feature </cql/time-to-live>`,
|
||||
the `per-row TTL <https://docs.scylladb.com/stable/cql/cql-extensions.html#per-row-ttl>`_
|
||||
feature behaves differently.
|
||||
|
||||
|
||||
Facts About Expiring Data
|
||||
-------------------------
|
||||
|
||||
@@ -898,63 +898,6 @@ By default, each input sstable is filtered individually. Use ``--merge`` to filt
|
||||
|
||||
Output sstables use the latest supported sstable format (can be changed with ``--sstable-version``).
|
||||
|
||||
split
|
||||
^^^^^
|
||||
|
||||
Split SSTable(s) into multiple output SSTables based on token boundaries.
|
||||
|
||||
This operation divides SSTable(s) according to the specified split tokens, creating one output SSTable per token range.
|
||||
This is useful for redistributing data across different token ranges, such as when preparing data for different nodes or shards.
|
||||
|
||||
Tokens should be provided via the ``--split-token`` (or ``-t``) option. Multiple tokens can be specified by repeating the option.
|
||||
The tokens will be sorted automatically to ensure proper ordering.
|
||||
|
||||
For N split tokens, N+1 output SSTables will be generated:
|
||||
|
||||
* First SSTable: from minimum token to first split token
|
||||
* Middle SSTables: between consecutive split tokens
|
||||
* Last SSTable: from last split token to maximum token
|
||||
|
||||
By default, each input SSTable is split individually. Use ``--merge`` to split the combined content of all input SSTables, producing a single set of output SSTables.
|
||||
|
||||
Output SSTables use the latest supported sstable format (can be changed with ``--sstable-version``) and are written to the directory specified by ``--output-dir``.
|
||||
|
||||
**Examples:**
|
||||
|
||||
Split a single SSTable at token boundaries 100 and 500:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla sstable split --split-token 100 --split-token 500 /path/to/md-123456-big-Data.db
|
||||
|
||||
Or using the short-hand form:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla sstable split -t 100 -t 500 /path/to/md-123456-big-Data.db
|
||||
|
||||
This will create 3 output SSTables:
|
||||
|
||||
* One containing partitions with tokens < 100
|
||||
* One containing partitions with tokens >= 100 and < 500
|
||||
* One containing partitions with tokens >= 500
|
||||
|
||||
Split multiple SSTables individually:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla sstable split -t 100 -t 500 /path/to/md-123456-big-Data.db /path/to/md-123457-big-Data.db
|
||||
|
||||
This will split each input SSTable separately, creating 6 output SSTables total (3 per input).
|
||||
|
||||
Split multiple SSTables as a combined stream:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla sstable split --merge -t 100 -t 500 /path/to/md-123456-big-Data.db /path/to/md-123457-big-Data.db
|
||||
|
||||
This will merge both input SSTables first, then split the combined data, creating 3 output SSTables.
|
||||
|
||||
Examples
|
||||
--------
|
||||
Dumping the content of the SStable:
|
||||
|
||||
@@ -25,8 +25,4 @@ For Example:
|
||||
|
||||
nodetool rebuild <source-dc-name>
|
||||
|
||||
``nodetool rebuild`` command works only for vnode keyspaces. For tablet keyspaces, use ``nodetool cluster repair`` instead.
|
||||
|
||||
See :doc:`Data Distribution with Tablets </architecture/tablets/>`.
|
||||
|
||||
.. include:: nodetool-index.rst
|
||||
|
||||
@@ -155,6 +155,7 @@ Add New DC
|
||||
UN 54.235.9.159 109.75 KB 256 ? 39798227-9f6f-4868-8193-08570856c09a RACK1
|
||||
UN 54.146.228.25 128.33 KB 256 ? 7a4957a1-9590-4434-9746-9c8a6f796a0c RACK1
|
||||
|
||||
.. TODO possibly provide additional information WRT how ALTER works with tablets
|
||||
|
||||
#. When all nodes are up and running ``ALTER`` the following Keyspaces in the new nodes:
|
||||
|
||||
@@ -170,68 +171,26 @@ Add New DC
|
||||
|
||||
DESCRIBE KEYSPACE mykeyspace;
|
||||
|
||||
CREATE KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3};
|
||||
CREATE KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3};
|
||||
|
||||
ALTER Command
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
|
||||
ALTER KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
|
||||
ALTER KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
|
||||
ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
|
||||
ALTER KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
|
||||
ALTER KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
|
||||
|
||||
After
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DESCRIBE KEYSPACE mykeyspace;
|
||||
CREATE KEYSPACE mykeyspace WITH REPLICATION = {'class': 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
|
||||
CREATE KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
|
||||
CREATE KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
|
||||
CREATE KEYSPACE mykeyspace WITH REPLICATION = {'class’: 'NetworkTopologyStrategy', <exiting_dc>:3, <new_dc>: 3};
|
||||
CREATE KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
|
||||
CREATE KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
|
||||
|
||||
For tablet keyspaces, update the replication factor one by one:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DESCRIBE KEYSPACE mykeyspace2;
|
||||
|
||||
CREATE KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3} AND tablets = { 'enabled': true };
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 1} AND tablets = { 'enabled': true };
|
||||
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 2} AND tablets = { 'enabled': true };
|
||||
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3} AND tablets = { 'enabled': true };
|
||||
|
||||
.. note::
|
||||
If ``rf_rack_valid_keyspaces`` option is set, a tablet keyspace needs to use rack list replication factor, so that a new DC (rack) can be added. See :ref:`the conversion procedure <conversion-to-rack-list-rf>`. In this case, to add a datacenter:
|
||||
|
||||
Before
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DESCRIBE KEYSPACE mykeyspace3;
|
||||
|
||||
CREATE KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>']} AND tablets = { 'enabled': true };
|
||||
|
||||
Add all the nodes to the new datacenter and then alter the keyspace one by one:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>']} AND tablets = { 'enabled': true };
|
||||
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>']} AND tablets = { 'enabled': true };
|
||||
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
|
||||
|
||||
After
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DESCRIBE KEYSPACE mykeyspace3;
|
||||
CREATE KEYSPACE mykeyspace3 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
|
||||
|
||||
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
|
||||
|
||||
#. If any vnode keyspace was altered, run ``nodetool rebuild`` on each node in the new datacenter, specifying the existing datacenter name in the rebuild command.
|
||||
#. Run ``nodetool rebuild`` on each node in the new datacenter, specify the existing datacenter name in the rebuild command.
|
||||
|
||||
For example:
|
||||
|
||||
@@ -239,7 +198,7 @@ Add New DC
|
||||
|
||||
The rebuild ensures that the new nodes that were just added to the cluster will recognize the existing datacenters in the cluster.
|
||||
|
||||
#. If any vnode keyspace was altered, run a full cluster repair, using :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair>` on each node, or using `ScyllaDB Manager ad-hoc repair <https://manager.docs.scylladb.com/stable/repair>`_
|
||||
#. Run a full cluster repair, using :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair>` on each node, or using `ScyllaDB Manager ad-hoc repair <https://manager.docs.scylladb.com/stable/repair>`_
|
||||
|
||||
#. If you are using ScyllaDB Monitoring, update the `monitoring stack <https://monitoring.docs.scylladb.com/stable/install/monitoring_stack.html#configure-scylla-nodes-from-files>`_ to monitor it. If you are using ScyllaDB Manager, make sure you install the `Manager Agent <https://manager.docs.scylladb.com/stable/install-scylla-manager-agent.html>`_ and Manager can access the new DC.
|
||||
|
||||
|
||||
@@ -40,14 +40,12 @@ Prerequisites
|
||||
Procedure
|
||||
---------
|
||||
|
||||
#. If there are vnode keyspaces in this DC, run the ``nodetool repair -pr`` command on each node in the data-center that is going to be decommissioned. This will verify that all the data is in sync between the decommissioned data-center and the other data-centers in the cluster.
|
||||
#. Run the ``nodetool repair -pr`` command on each node in the data-center that is going to be decommissioned. This will verify that all the data is in sync between the decommissioned data-center and the other data-centers in the cluster.
|
||||
|
||||
For example:
|
||||
|
||||
If the ASIA-DC cluster is to be removed, then, run the ``nodetool repair -pr`` command on all the nodes in the ASIA-DC
|
||||
|
||||
#. If there are tablet keyspaces in this DC, run the ``nodetool cluster repair`` on an arbitrary node. The reason for running repair is to ensure that any updates stored only on the about-to-be-decommissioned replicas are propagated to the other replicas, before the replicas on the decommissioned datacenter are dropped.
|
||||
|
||||
#. ALTER every cluster KEYSPACE, so that the keyspaces will no longer replicate data to the decommissioned data-center.
|
||||
|
||||
For example:
|
||||
@@ -75,33 +73,6 @@ Procedure
|
||||
|
||||
cqlsh> ALTER KEYSPACE nba WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 0, 'EUROPE-DC' : 3};
|
||||
|
||||
For tablet keyspaces, update the replication factor one by one:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh> DESCRIBE nba2
|
||||
cqlsh> CREATE KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 2, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh> ALTER KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 1, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
|
||||
cqlsh> ALTER KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 0, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
|
||||
|
||||
.. note::
|
||||
If ``rf_rack_valid_keyspaces`` option is set, a tablet keyspace needs to use rack list replication factor, so that the DC can be removed. See :ref:`the conversion procedure <conversion-to-rack-list-rf>`. In this case, to remove a datacenter:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh> DESCRIBE nba3
|
||||
cqlsh> CREATE KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : ['RAC4', 'RAC5'], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh> ALTER KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : ['RAC4'], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
|
||||
cqlsh> ALTER KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : [], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
|
||||
|
||||
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
|
||||
|
||||
.. note::
|
||||
|
||||
If table audit is enabled, the ``audit`` keyspace is automatically created with ``NetworkTopologyStrategy``.
|
||||
|
||||
8
docs/poetry.lock
generated
8
docs/poetry.lock
generated
@@ -954,14 +954,14 @@ markdown = ">=3.4"
|
||||
|
||||
[[package]]
|
||||
name = "sphinx-multiversion-scylla"
|
||||
version = "0.3.7"
|
||||
version = "0.3.4"
|
||||
description = "Add support for multiple versions to sphinx"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "sphinx_multiversion_scylla-0.3.7-py3-none-any.whl", hash = "sha256:6205d261a77c90b7ea3105311d1d56014736a5148966133c34344512bb8c4e4f"},
|
||||
{file = "sphinx_multiversion_scylla-0.3.7.tar.gz", hash = "sha256:fc1ddd58e82cfd8810c1be6db8717a244043c04c1c632e9bd1436415d1db0d3b"},
|
||||
{file = "sphinx_multiversion_scylla-0.3.4-py3-none-any.whl", hash = "sha256:e64d49d39a8eccf06a9cb8bbe88eecb3eb2082e6b91a478b55dc7d0268d8e0b6"},
|
||||
{file = "sphinx_multiversion_scylla-0.3.4.tar.gz", hash = "sha256:8f7c94a89c794334d78ef21761a8bf455aaa7361e71037cf2ac2ca51cb47a0ba"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -1592,4 +1592,4 @@ files = [
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.10"
|
||||
content-hash = "6cca33ab39b6d8817d0502530788dc74a1d3d7e6683d0eda25884d41e38fc741"
|
||||
content-hash = "9a17caa38b3c88f3fe3d1a60fdb73a96aa12ff1e30ecb00e2f9249e7ba9f859c"
|
||||
|
||||
@@ -13,7 +13,7 @@ sphinx-scylladb-theme = "^1.8.10"
|
||||
sphinx-sitemap = "^2.6.0"
|
||||
sphinx-autobuild = "^2024.4.19"
|
||||
Sphinx = "^8.0.0"
|
||||
sphinx-multiversion-scylla = "^0.3.7"
|
||||
sphinx-multiversion-scylla = "^0.3.4"
|
||||
sphinxcontrib-datatemplates = "^0.9.2"
|
||||
sphinx-scylladb-markdown = "^0.1.4"
|
||||
sphinx_collapse ="^0.1.3"
|
||||
|
||||
@@ -11,13 +11,9 @@ ScyllaDB. This means that:
|
||||
|
||||
* You should follow the upgrade policy:
|
||||
|
||||
* Starting with version **2025.4**, upgrades can **skip minor versions** if:
|
||||
|
||||
* They remain within the same major version (for example, upgrading
|
||||
directly from *2025.1 → 2025.4* is supported).
|
||||
* You upgrade to the next major version (for example, upgrading
|
||||
directly from *2025.3 → 2026.1* is supported).
|
||||
|
||||
* Starting with version **2025.4**, upgrades can skip minor versions as long
|
||||
as they remain within the same major version (for example, upgrading directly
|
||||
from 2025.1 → 2025.4 is supported).
|
||||
* For versions **prior to 2025.4**, upgrades must be performed consecutively—
|
||||
each successive X.Y version must be installed in order, **without skipping
|
||||
any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3
|
||||
|
||||
@@ -4,7 +4,8 @@ Upgrade ScyllaDB
|
||||
|
||||
.. toctree::
|
||||
|
||||
ScyllaDB 2025.x to ScyllaDB 2026.1 <upgrade-guide-from-2025.x-to-2026.1/index>
|
||||
ScyllaDB 2025.x to ScyllaDB 2025.4 <upgrade-guide-from-2025.x-to-2025.4/index>
|
||||
ScyllaDB 2025.4 Patch Upgrades <upgrade-guide-from-2025.4.x-to-2025.4.y>
|
||||
ScyllaDB Image <ami-upgrade>
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,266 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 2025.4.x
|
||||
.. |NEW_VERSION| replace:: 2025.4.y
|
||||
|
||||
==========================================================================
|
||||
Upgrade - |SCYLLA_NAME| |SRC_VERSION| to |NEW_VERSION| (Patch Upgrades)
|
||||
==========================================================================
|
||||
|
||||
This document describes a step-by-step procedure for upgrading from
|
||||
|SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION| (where "y" is
|
||||
the latest available version), and rolling back to version |SRC_VERSION|
|
||||
if necessary.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL),
|
||||
CentOS, Debian, and Ubuntu.
|
||||
See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
for information about supported versions.
|
||||
|
||||
It also applies to the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
.. note::
|
||||
Apply the following procedure **serially** on each node. Do not move to the next
|
||||
node before validating that the node is up and running the new version.
|
||||
|
||||
A ScyllaDB upgrade is a rolling procedure that does **not** require a full cluster
|
||||
shutdown. For each of the nodes in the cluster, you will:
|
||||
|
||||
#. Drain the node and back up the data.
|
||||
#. Backup configuration file.
|
||||
#. Stop ScyllaDB.
|
||||
#. Download and install new ScyllaDB packages.
|
||||
#. Start ScyllaDB.
|
||||
#. Validate that the upgrade was successful.
|
||||
|
||||
**Before** upgrading, check which version you are running now using
|
||||
``scylla --version``. Note the current version in case you want to roll back
|
||||
the upgrade.
|
||||
|
||||
**During** the rolling upgrade it is highly recommended:
|
||||
|
||||
* Not to use new |NEW_VERSION| features.
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add
|
||||
or remove nodes. See
|
||||
`sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
|
||||
ScyllaDB Manager's scheduled or running repairs.
|
||||
* Not to apply schema changes.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
Back up the data
|
||||
------------------------------
|
||||
|
||||
Back up all the data to an external device. We recommend using
|
||||
`ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
|
||||
to create backups.
|
||||
|
||||
Alternatively, you can use the ``nodetool snapshot`` command.
|
||||
For **each** node in the cluster, run the following:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
nodetool snapshot
|
||||
|
||||
Take note of the directory name that nodetool gives you, and copy all
|
||||
the directories with this name under ``/var/lib/scylla`` to a backup device.
|
||||
|
||||
When the upgrade is completed on all nodes, remove the snapshot with the
|
||||
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of
|
||||
space.
|
||||
|
||||
Back up the configuration file
|
||||
------------------------------
|
||||
|
||||
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
|
||||
in case you need to roll back the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
|
||||
|
||||
Gracefully stop the node
|
||||
------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
You don’t need to update the ScyllaDB DEB or RPM repo when you upgrade to
|
||||
a patch release.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
To install a patch version on Debian or Ubuntu, run:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
To install a patch version on RHEL or CentOS, run:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum update scylla\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you're using the ScyllaDB official image (recommended), see
|
||||
the **Debian/Ubuntu** tab for upgrade instructions.
|
||||
|
||||
If you're using your own image and have installed ScyllaDB packages for
|
||||
Ubuntu or Debian, you need to apply an extended upgrade procedure:
|
||||
|
||||
#. Install the new ScyllaDB version with the additional
|
||||
``scylla-machine-image`` package:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
sudo apt-get dist-upgrade scylla-machine-image
|
||||
#. Run ``scylla_setup`` without running ``io_setup``.
|
||||
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes,
|
||||
including the one you just upgraded, are in UN status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
|
||||
to check the ScyllaDB version.
|
||||
#. Use ``journalctl _COMM=scylla`` to check there are no new errors in the log.
|
||||
#. Check again after 2 minutes to validate that no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in
|
||||
the cluster.
|
||||
|
||||
Rollback Procedure
|
||||
==================
|
||||
|
||||
The following procedure describes a rollback from ScyllaDB release
|
||||
|NEW_VERSION| to |SRC_VERSION|. Apply this procedure if an upgrade from
|
||||
|SRC_VERSION| to |NEW_VERSION| failed before completing on all nodes.
|
||||
|
||||
* Use this procedure only on nodes you upgraded to |NEW_VERSION|.
|
||||
* Execute the following commands one node at a time, moving to the next node only
|
||||
after the rollback procedure is completed successfully.
|
||||
|
||||
ScyllaDB rollback is a rolling procedure that does **not** require a full
|
||||
cluster shutdown. For each of the nodes to roll back to |SRC_VERSION|, you will:
|
||||
|
||||
#. Drain the node and stop ScyllaDB.
|
||||
#. Downgrade to the previous release.
|
||||
#. Restore the configuration file.
|
||||
#. Restart ScyllaDB.
|
||||
#. Validate the rollback success.
|
||||
|
||||
Rollback Steps
|
||||
==============
|
||||
|
||||
Gracefully shutdown ScyllaDB
|
||||
-----------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-server stop
|
||||
|
||||
Downgrade to the previous release
|
||||
----------------------------------
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
To downgrade to |SRC_VERSION| on Debian or Ubuntu, run:
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo apt-get install scylla=|SRC_VERSION|\* scylla-server=|SRC_VERSION|\* scylla-tools=|SRC_VERSION|\* scylla-tools-core=|SRC_VERSION|\* scylla-kernel-conf=|SRC_VERSION|\* scylla-conf=|SRC_VERSION|\*
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
To downgrade to |SRC_VERSION| on RHEL or CentOS, run:
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo yum downgrade scylla\*-|SRC_VERSION|-\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see
|
||||
the **Debian/Ubuntu** tab for upgrade instructions.
|
||||
|
||||
If you’re using your own image and have installed ScyllaDB packages for
|
||||
Ubuntu or Debian, you need to additionally downgrade
|
||||
the ``scylla-machine-image`` package.
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo apt-get install scylla=|SRC_VERSION|\* scylla-server=|SRC_VERSION|\* scylla-tools=|SRC_VERSION|\* scylla-tools-core=|SRC_VERSION|\* scylla-kernel-conf=|SRC_VERSION|\* scylla-conf=|SRC_VERSION|\*
|
||||
sudo apt-get install scylla-machine-image=|SRC_VERSION|\*
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
|
||||
Restore the configuration file
|
||||
------------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/scylla/scylla.yaml
|
||||
sudo cp -a /etc/scylla/scylla.yaml.backup /etc/scylla/scylla.yaml
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
Check upgrade instruction above for validation. Once you are sure the node
|
||||
rollback is successful, move to the next node in the cluster.
|
||||
@@ -0,0 +1,13 @@
|
||||
==========================================================
|
||||
Upgrade - ScyllaDB 2025.x to ScyllaDB 2025.4
|
||||
==========================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-2025.x-to-2025.4>
|
||||
Metrics Update <metric-update-2025.x-to-2025.4>
|
||||
|
||||
* :doc:`Upgrade from ScyllaDB 2025.x to ScyllaDB 2025.4 <upgrade-guide-from-2025.x-to-2025.4>`
|
||||
* :doc:`Metrics Update Between 2025.x and 2025.4 <metric-update-2025.x-to-2025.4>`
|
||||
@@ -0,0 +1,68 @@
|
||||
.. |SRC_VERSION| replace:: 2025.x
|
||||
.. |NEW_VERSION| replace:: 2025.4
|
||||
.. |PRECEDING_VERSION| replace:: 2025.3
|
||||
|
||||
================================================================
|
||||
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
|
||||
New Metrics in |NEW_VERSION|
|
||||
--------------------------------------
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_database_total_view_updates_due_to_replica_count_mismatch
|
||||
- The total number of view updates for which there were more view replicas
|
||||
than base replicas and we had to generate an extra view update because
|
||||
the additional view replica wouldn't get paired with any base replica.
|
||||
It should only increase during the Replication Factor (RF) change. It
|
||||
should stop increasing shortly after finishing the RF change.
|
||||
* - scylla_database_total_writes_rejected_due_to_out_of_space_prevention
|
||||
- Counts write operations that were rejected due to disabled user tables
|
||||
writes.
|
||||
* - scylla_index_query_latencies
|
||||
- Index query latencies.
|
||||
* - scylla_reactor_aio_retries
|
||||
- The total number of IOCB-s re-submitted via thread-pool.
|
||||
* - scylla_reactor_io_threaded_fallbacks
|
||||
- The total number of io-threaded-fallbacks operations.
|
||||
* - scylla_repair_inc_sst_read_bytes
|
||||
- The total number of bytes read from SStables for incremental repair
|
||||
on this shard.
|
||||
* - scylla_repair_inc_sst_skipped_bytes
|
||||
- The total number of bytes skipped from SStables for incremental repair
|
||||
on this shard.
|
||||
* - scylla_repair_tablet_time_ms
|
||||
- The time spent on tablet repair on this shard (in milliseconds).
|
||||
* - scylla_s3_downloads_blocked_on_memory
|
||||
- Counts the number of times the S3 client downloads were delayed due to
|
||||
insufficient memory availability.
|
||||
* - scylla_s3_memory_usage
|
||||
- The total number of bytes consumed by the S3 client.
|
||||
* - scylla_s3_total_read_prefetch_bytes
|
||||
- The total number of bytes requested from object storage by read-ahead prefetching.
|
||||
* - scylla_storage_proxy_replica_fenced_out_requests
|
||||
- The number of requests that resulted in a stale_topology_exception.
|
||||
* - scylla_vector_store_dns_refreshes
|
||||
- The number of DNS refreshes.
|
||||
|
||||
New and Updated Metrics in Previous 2025.x Releases
|
||||
-------------------------------------------------------
|
||||
|
||||
* `Metrics Update Between 2025.2 and 2025.3 <https://docs.scylladb.com/manual/branch-2025.3/upgrade/upgrade-guides/upgrade-guide-from-2025.2-to-2025.3/metric-update-2025.2-to-2025.3.html>`_
|
||||
* `Metrics Update Between 2025.1 and 2025.2 <https://docs.scylladb.com/manual/branch-2025.2/upgrade/upgrade-guides/upgrade-guide-from-2025.1-to-2025.2/metric-update-2025.1-to-2025.2.html>`_
|
||||
|
||||
|
||||
@@ -0,0 +1,374 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 2025.x
|
||||
.. |NEW_VERSION| replace:: 2025.4
|
||||
|
||||
.. |ROLLBACK| replace:: rollback
|
||||
.. _ROLLBACK: ./#rollback-procedure
|
||||
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2025.x to 2025.4
|
||||
.. _SCYLLA_METRICS: ../metric-update-2025.x-to-2025.4
|
||||
|
||||
=======================================================================================
|
||||
Upgrade from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
|
||||
=======================================================================================
|
||||
|
||||
This document describes a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
|
||||
to |SCYLLA_NAME| |NEW_VERSION| and rollback to version |SRC_VERSION| if necessary.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian,
|
||||
and Ubuntu. See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
for information about supported versions.
|
||||
|
||||
It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
Before You Upgrade ScyllaDB
|
||||
==============================
|
||||
|
||||
**Upgrade Your Driver**
|
||||
|
||||
If you're using a `ScyllaDB driver <https://docs.scylladb.com/stable/drivers/index.html>`_,
|
||||
upgrade the driver before upgrading ScyllaDB. The latest two versions of each driver
|
||||
are supported. See `Driver Support <https://docs.scylladb.com/stable/versioning/driver-support.html>`_.
|
||||
|
||||
**Upgrade ScyllaDB Monitoring Stack**
|
||||
|
||||
If you're using the ScyllaDB Monitoring Stack, verify that your Monitoring Stack
|
||||
version supports the ScyllaDB version to which you want to upgrade. See
|
||||
`ScyllaDB Monitoring Stack Support Matrix <https://monitoring.docs.scylladb.com/stable/reference/matrix.html>`_.
|
||||
|
||||
We recommend upgrading the Monitoring Stack to the latest version.
|
||||
|
||||
**Check Feature Updates**
|
||||
|
||||
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
|
||||
at the `ScyllaDB Community Forum <https://forum.scylladb.com/c/scylladb-release-notes/>`_.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
A ScyllaDB upgrade is a rolling procedure that does **not** require full cluster shutdown.
|
||||
For each of the nodes in the cluster, serially (i.e., one node at a time), you will:
|
||||
|
||||
* Check that the cluster's schema is synchronized
|
||||
* Drain the node and backup the data
|
||||
* Backup the configuration file
|
||||
* Stop ScyllaDB
|
||||
* Download and install new ScyllaDB packages
|
||||
* Start ScyllaDB
|
||||
* Validate that the upgrade was successful
|
||||
|
||||
.. caution::
|
||||
|
||||
Apply the procedure **serially** on each node. Do not move to the next node before
|
||||
validating that the node you upgraded is up and running the new version.
|
||||
|
||||
**During** the rolling upgrade, it is highly recommended:
|
||||
|
||||
* Not to use the new |NEW_VERSION| features.
|
||||
* Not to run administration functions, such as repairs, refresh, rebuild, or add
|
||||
or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
|
||||
ScyllaDB Manager's scheduled or running repairs.
|
||||
* Not to apply schema changes.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
Check the cluster schema
|
||||
-------------------------
|
||||
|
||||
Make sure that all nodes have the schema synchronized before the upgrade. The upgrade
|
||||
procedure will fail if there is a schema disagreement between nodes.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool describecluster
|
||||
|
||||
Backup the data
|
||||
-----------------------------------
|
||||
|
||||
Before any major procedure, like an upgrade, it is recommended to backup all the data
|
||||
to an external device.
|
||||
We recommend using `ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
|
||||
to create backups.
|
||||
|
||||
Alternatively, you can use the ``nodetool snapshot`` command.
|
||||
For **each** node in the cluster, run the following:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
nodetool snapshot
|
||||
|
||||
Take note of the directory name that nodetool gives you, and copy all the directories
|
||||
having that name under ``/var/lib/scylla`` to an external backup device.
|
||||
|
||||
When the upgrade is completed on all nodes, remove the snapshot with the
|
||||
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of space.
|
||||
|
||||
Backup the configuration file
|
||||
------------------------------
|
||||
|
||||
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
|
||||
in case you need to rollback the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
|
||||
|
||||
|
||||
Gracefully stop the node
|
||||
------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
Before upgrading, check what version you are running now using ``scylla --version``.
|
||||
You should take note of the current version in case you want to |ROLLBACK|_ the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
#. Update the ScyllaDB deb repo to |NEW_VERSION|.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.4.list
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
#. Update the ScyllaDB rpm repo to |NEW_VERSION|.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.4.repo
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum update scylla\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
|
||||
tab for upgrade instructions.
|
||||
|
||||
If you’re using your own image and installed ScyllaDB packages for Ubuntu or Debian,
|
||||
you need to apply an extended upgrade procedure:
|
||||
|
||||
#. Update the ScyllaDB deb repo (see the **Debian/Ubuntu** tab).
|
||||
#. Install the new ScyllaDB version with the additional ``scylla-machine-image`` package:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
sudo apt-get dist-upgrade scylla-machine-image
|
||||
|
||||
#. Run ``scylla_setup`` without running ``io_setup``.
|
||||
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
|
||||
|
||||
|
||||
If you need the JMX server, see
|
||||
:doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
|
||||
and install the new version.
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including
|
||||
the one you just upgraded, are in ``UN`` status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
|
||||
to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
|
||||
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to
|
||||
validate there are no new errors in the log.
|
||||
#. Check again after two minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
|
||||
Rollback Procedure
|
||||
==================
|
||||
|
||||
.. warning::
|
||||
|
||||
The rollback procedure can be applied **only** if some nodes have not been
|
||||
upgraded to |NEW_VERSION| yet. As soon as the last node in the rolling upgrade
|
||||
procedure is started with |NEW_VERSION|, rollback becomes impossible. At that
|
||||
point, the only way to restore a cluster to |SRC_VERSION| is by restoring it
|
||||
from backup.
|
||||
|
||||
The following procedure describes a rollback from |SCYLLA_NAME| |NEW_VERSION| to
|
||||
|SRC_VERSION|. Apply this procedure if an upgrade from |SRC_VERSION| to
|
||||
|NEW_VERSION| fails before completing on all nodes.
|
||||
|
||||
* Use this procedure only on the nodes you upgraded to |NEW_VERSION|.
|
||||
* Execute the following commands one node at a time, moving to the next node
|
||||
only after the rollback procedure is completed successfully.
|
||||
|
||||
ScyllaDB rollback is a rolling procedure that does **not** require full cluster shutdown.
|
||||
For each of the nodes you rollback to |SRC_VERSION|, serially (i.e., one node
|
||||
at a time), you will:
|
||||
|
||||
* Drain the node and stop ScyllaDB
|
||||
* Retrieve the old ScyllaDB packages
|
||||
* Restore the configuration file
|
||||
* Reload systemd configuration
|
||||
* Restart ScyllaDB
|
||||
* Validate the rollback success
|
||||
|
||||
Apply the procedure **serially** on each node. Do not move to the next node
|
||||
before validating that the rollback was successful and the node is up and
|
||||
running the old version.
|
||||
|
||||
Rollback Steps
|
||||
==============
|
||||
|
||||
Drain and gracefully stop the node
|
||||
----------------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-server stop
|
||||
|
||||
Restore and install the old release
|
||||
------------------------------------
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp ~/scylla.list-backup /etc/apt/sources.list.d/scylla.list
|
||||
sudo chown root.root /etc/apt/sources.list.d/scylla.list
|
||||
sudo chmod 644 /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/yum.repos.d/scylla.repo
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade procedure.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo
|
||||
sudo chown root.root /etc/yum.repos.d/scylla.repo
|
||||
sudo chmod 644 /etc/yum.repos.d/scylla.repo
|
||||
|
||||
#. Install:
|
||||
|
||||
.. code:: console
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum remove scylla\*
|
||||
sudo yum install scylla
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
|
||||
tab for upgrade instructions.
|
||||
|
||||
If you’re using your own image and installed ScyllaDB packages for Ubuntu or Debian,
|
||||
you need to additionally restore the ``scylla-machine-image`` package.
|
||||
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade
|
||||
(see the **Debian/Ubuntu** tab).
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla
|
||||
sudo apt-get install scylla-machine-image
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
Restore the configuration file
|
||||
------------------------------
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/scylla/scylla.yaml
|
||||
sudo cp /etc/scylla/scylla.yaml.backup /etc/scylla/scylla.yaml
|
||||
|
||||
Reload systemd configuration
|
||||
----------------------------
|
||||
|
||||
You must reload the unit file if the systemd unit file is changed.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
Check the upgrade instructions above for validation. Once you are sure the node
|
||||
rollback is successful, move to the next node in the cluster.
|
||||
@@ -1,13 +0,0 @@
|
||||
==========================================================
|
||||
Upgrade - ScyllaDB 2025.x to ScyllaDB 2026.1
|
||||
==========================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-2025.x-to-2026.1>
|
||||
Metrics Update <metric-update-2025.x-to-2026.1>
|
||||
|
||||
* :doc:`Upgrade from ScyllaDB 2025.x to ScyllaDB 2026.1 <upgrade-guide-from-2025.x-to-2026.1>`
|
||||
* :doc:`Metrics Update Between 2025.x and 2026.1 <metric-update-2025.x-to-2026.1>`
|
||||
@@ -1,82 +0,0 @@
|
||||
.. |SRC_VERSION| replace:: 2025.x
|
||||
.. |NEW_VERSION| replace:: 2026.1
|
||||
.. |PRECEDING_VERSION| replace:: 2025.4
|
||||
|
||||
================================================================
|
||||
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
|
||||
New Metrics in |NEW_VERSION|
|
||||
--------------------------------------
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_alternator_operation_size_kb
|
||||
- Histogram of item sizes involved in a request.
|
||||
* - scylla_column_family_total_disk_space_before_compression
|
||||
- Hypothetical total disk space used if data files weren't compressed.
|
||||
* - scylla_group_name_auto_repair_enabled_nr
|
||||
- Number of tablets with auto repair enabled.
|
||||
* - scylla_group_name_auto_repair_needs_repair_nr
|
||||
- Number of tablets with auto repair enabled that currently need repair.
|
||||
* - scylla_lsa_compact_time_ms
|
||||
- Total time spent on segment compaction that was not accounted under ``reclaim_time_ms``.
|
||||
* - scylla_lsa_evict_time_ms
|
||||
- Total time spent on evicting objects that was not accounted under ``reclaim_time_ms``.
|
||||
* - scylla_lsa_reclaim_time_ms
|
||||
- Total time spent in reclaiming LSA memory back to std allocator.
|
||||
* - scylla_object_storage_memory_usage
|
||||
- Total number of bytes consumed by the object storage client.
|
||||
* - scylla_tablet_ops_failed
|
||||
- Number of failed tablet auto repair attempts.
|
||||
* - scylla_tablet_ops_succeeded
|
||||
- Number of successful tablet auto repair attempts.
|
||||
|
||||
Renamed Metrics in |NEW_VERSION|
|
||||
--------------------------------------
|
||||
|
||||
The following metrics are renamed in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric Name in |PRECEDING_VERSION|
|
||||
- Metric Name in |NEW_VERSION|
|
||||
* - scylla_s3_memory_usage
|
||||
- scylla_object_storage_memory_usage
|
||||
|
||||
Removed Metrics in |NEW_VERSION|
|
||||
--------------------------------------
|
||||
|
||||
The following metrics are removed in ScyllaDB |NEW_VERSION|.
|
||||
|
||||
* scylla_redis_current_connections
|
||||
* scylla_redis_op_latency
|
||||
* scylla_redis_operation
|
||||
* scylla_redis_requests_latency
|
||||
* scylla_redis_requests_served
|
||||
* scylla_redis_requests_serving
|
||||
|
||||
New and Updated Metrics in Previous Releases
|
||||
-------------------------------------------------------
|
||||
|
||||
* `Metrics Update Between 2025.3 and 2025.4 <https://docs.scylladb.com/manual/branch-2025.4/upgrade/upgrade-guides/upgrade-guide-from-2025.x-to-2025.4/metric-update-2025.x-to-2025.4.html>`_
|
||||
* `Metrics Update Between 2025.2 and 2025.3 <https://docs.scylladb.com/manual/branch-2025.3/upgrade/upgrade-guides/upgrade-guide-from-2025.2-to-2025.3/metric-update-2025.2-to-2025.3.html>`_
|
||||
* `Metrics Update Between 2025.1 and 2025.2 <https://docs.scylladb.com/manual/branch-2025.2/upgrade/upgrade-guides/upgrade-guide-from-2025.1-to-2025.2/metric-update-2025.1-to-2025.2.html>`_
|
||||
|
||||
|
||||
@@ -1,371 +0,0 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 2025.x
|
||||
.. |NEW_VERSION| replace:: 2026.1
|
||||
|
||||
.. |ROLLBACK| replace:: rollback
|
||||
.. _ROLLBACK: ./#rollback-procedure
|
||||
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2025.x to 2026.1
|
||||
.. _SCYLLA_METRICS: ../metric-update-2025.x-to-2026.1
|
||||
|
||||
=======================================================================================
|
||||
Upgrade from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
|
||||
=======================================================================================
|
||||
|
||||
This document describes a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
|
||||
to |SCYLLA_NAME| |NEW_VERSION| and rollback to version |SRC_VERSION| if necessary.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian,
|
||||
and Ubuntu. See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
for information about supported versions. It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
See :doc:`About Upgrade </upgrade/about-upgrade>` for the ScyllaDB upgrade policy.
|
||||
|
||||
Before You Upgrade ScyllaDB
|
||||
==============================
|
||||
|
||||
**Upgrade Your Driver**
|
||||
|
||||
If you're using a `ScyllaDB driver <https://docs.scylladb.com/stable/drivers/index.html>`_,
|
||||
upgrade the driver before upgrading ScyllaDB. The latest two versions of each driver
|
||||
are supported. See `Driver Support <https://docs.scylladb.com/stable/versioning/driver-support.html>`_.
|
||||
|
||||
**Upgrade ScyllaDB Monitoring Stack**
|
||||
|
||||
If you're using the ScyllaDB Monitoring Stack, verify that your Monitoring Stack
|
||||
version supports the ScyllaDB version to which you want to upgrade. See
|
||||
`ScyllaDB Monitoring Stack Support Matrix <https://monitoring.docs.scylladb.com/stable/reference/matrix.html>`_.
|
||||
|
||||
We recommend upgrading the Monitoring Stack to the latest version.
|
||||
|
||||
**Check Feature Updates**
|
||||
|
||||
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
|
||||
at the `ScyllaDB Community Forum <https://forum.scylladb.com/c/scylladb-release-notes/>`_.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
A ScyllaDB upgrade is a rolling procedure that does **not** require full cluster shutdown.
|
||||
For each of the nodes in the cluster, serially (i.e., one node at a time), you will:
|
||||
|
||||
* Check that the cluster's schema is synchronized
|
||||
* Drain the node and backup the data
|
||||
* Backup the configuration file
|
||||
* Stop ScyllaDB
|
||||
* Download and install new ScyllaDB packages
|
||||
* Start ScyllaDB
|
||||
* Validate that the upgrade was successful
|
||||
|
||||
.. caution::
|
||||
|
||||
Apply the procedure **serially** on each node. Do not move to the next node before
|
||||
validating that the node you upgraded is up and running the new version.
|
||||
|
||||
**During** the rolling upgrade, it is highly recommended:
|
||||
|
||||
* Not to use the new |NEW_VERSION| features.
|
||||
* Not to run administration functions, such as repairs, refresh, rebuild, or add
|
||||
or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
|
||||
ScyllaDB Manager's scheduled or running repairs.
|
||||
* Not to apply schema changes.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
Check the cluster schema
|
||||
-------------------------
|
||||
|
||||
Make sure that all nodes have the schema synchronized before the upgrade. The upgrade
|
||||
procedure will fail if there is a schema disagreement between nodes.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool describecluster
|
||||
|
||||
Backup the data
|
||||
-----------------------------------
|
||||
|
||||
Before any major procedure, like an upgrade, it is recommended to backup all the data
|
||||
to an external device.
|
||||
We recommend using `ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
|
||||
to create backups.
|
||||
|
||||
Alternatively, you can use the ``nodetool snapshot`` command.
|
||||
For **each** node in the cluster, run the following:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
nodetool snapshot
|
||||
|
||||
Take note of the directory name that nodetool gives you, and copy all the directories
|
||||
having that name under ``/var/lib/scylla`` to an external backup device.
|
||||
|
||||
When the upgrade is completed on all nodes, remove the snapshot with the
|
||||
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of space.
|
||||
|
||||
Backup the configuration file
|
||||
------------------------------
|
||||
|
||||
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
|
||||
in case you need to rollback the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
|
||||
|
||||
|
||||
Gracefully stop the node
|
||||
------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
Before upgrading, check what version you are running now using ``scylla --version``.
|
||||
You should take note of the current version in case you want to |ROLLBACK|_ the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
#. Update the ScyllaDB deb repo to |NEW_VERSION|.
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/|UBUNTU_SCYLLADB_LIST|
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
#. Update the ScyllaDB rpm repo to |NEW_VERSION|.
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/|CENTOS_SCYLLADB_REPO|
|
||||
|
||||
#. Install the new ScyllaDB version:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum update scylla\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
|
||||
tab for upgrade instructions.
|
||||
|
||||
If you’re using your own image and installed ScyllaDB packages for Ubuntu or Debian,
|
||||
you need to apply an extended upgrade procedure:
|
||||
|
||||
#. Update the ScyllaDB deb repo (see the **Debian/Ubuntu** tab).
|
||||
#. Install the new ScyllaDB version with the additional ``scylla-machine-image`` package:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
sudo apt-get dist-upgrade scylla-machine-image
|
||||
|
||||
#. Run ``scylla_setup`` without running ``io_setup``.
|
||||
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including
|
||||
the one you just upgraded, are in ``UN`` status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
|
||||
to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
|
||||
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to
|
||||
validate there are no new errors in the log.
|
||||
#. Check again after two minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
|
||||
Rollback Procedure
|
||||
==================
|
||||
|
||||
.. warning::
|
||||
|
||||
The rollback procedure can be applied **only** if some nodes have not been
|
||||
upgraded to |NEW_VERSION| yet. As soon as the last node in the rolling upgrade
|
||||
procedure is started with |NEW_VERSION|, rollback becomes impossible. At that
|
||||
point, the only way to restore a cluster to |SRC_VERSION| is by restoring it
|
||||
from backup.
|
||||
|
||||
The following procedure describes a rollback from |SCYLLA_NAME| |NEW_VERSION| to
|
||||
|SRC_VERSION|. Apply this procedure if an upgrade from |SRC_VERSION| to
|
||||
|NEW_VERSION| fails before completing on all nodes.
|
||||
|
||||
* Use this procedure only on the nodes you upgraded to |NEW_VERSION|.
|
||||
* Execute the following commands one node at a time, moving to the next node
|
||||
only after the rollback procedure is completed successfully.
|
||||
|
||||
ScyllaDB rollback is a rolling procedure that does **not** require full cluster shutdown.
|
||||
For each of the nodes you rollback to |SRC_VERSION|, serially (i.e., one node
|
||||
at a time), you will:
|
||||
|
||||
* Drain the node and stop ScyllaDB
|
||||
* Retrieve the old ScyllaDB packages
|
||||
* Restore the configuration file
|
||||
* Reload systemd configuration
|
||||
* Restart ScyllaDB
|
||||
* Validate the rollback success
|
||||
|
||||
Apply the procedure **serially** on each node. Do not move to the next node
|
||||
before validating that the rollback was successful and the node is up and
|
||||
running the old version.
|
||||
|
||||
Rollback Steps
|
||||
==============
|
||||
|
||||
Drain and gracefully stop the node
|
||||
----------------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-server stop
|
||||
|
||||
Restore and install the old release
|
||||
------------------------------------
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp ~/scylla.list-backup /etc/apt/sources.list.d/scylla.list
|
||||
sudo chown root.root /etc/apt/sources.list.d/scylla.list
|
||||
sudo chmod 644 /etc/apt/sources.list.d/scylla.list
|
||||
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
#. Remove the old repo file.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/yum.repos.d/scylla.repo
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade procedure.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo
|
||||
sudo chown root.root /etc/yum.repos.d/scylla.repo
|
||||
sudo chmod 644 /etc/yum.repos.d/scylla.repo
|
||||
|
||||
#. Install:
|
||||
|
||||
.. code:: console
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum remove scylla\*
|
||||
sudo yum install scylla
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
|
||||
tab for upgrade instructions.
|
||||
|
||||
If you’re using your own image and installed ScyllaDB packages for Ubuntu or Debian,
|
||||
you need to additionally restore the ``scylla-machine-image`` package.
|
||||
|
||||
|
||||
#. Restore the |SRC_VERSION| packages backed up during the upgrade
|
||||
(see the **Debian/Ubuntu** tab).
|
||||
#. Install:
|
||||
|
||||
.. code-block::
|
||||
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\* -y
|
||||
sudo apt-get install scylla
|
||||
sudo apt-get install scylla-machine-image
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
Restore the configuration file
|
||||
------------------------------
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/scylla/scylla.yaml
|
||||
sudo cp /etc/scylla/scylla.yaml.backup /etc/scylla/scylla.yaml
|
||||
|
||||
Reload systemd configuration
|
||||
----------------------------
|
||||
|
||||
You must reload the unit file if the systemd unit file is changed.
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
Check the upgrade instructions above for validation. Once you are sure the node
|
||||
rollback is successful, move to the next node in the cluster.
|
||||
@@ -187,6 +187,39 @@ std::set<sstring> feature_service::to_feature_set(sstring features_string) {
|
||||
return features;
|
||||
}
|
||||
|
||||
class persistent_feature_enabler : public i_endpoint_state_change_subscriber {
|
||||
gossiper& _g;
|
||||
feature_service& _feat;
|
||||
db::system_keyspace& _sys_ks;
|
||||
service::storage_service& _ss;
|
||||
|
||||
public:
|
||||
persistent_feature_enabler(gossiper& g, feature_service& f, db::system_keyspace& s, service::storage_service& ss)
|
||||
: _g(g)
|
||||
, _feat(f)
|
||||
, _sys_ks(s)
|
||||
, _ss(ss)
|
||||
{
|
||||
}
|
||||
future<> on_join(inet_address ep, locator::host_id id, endpoint_state_ptr state, gms::permit_id) override {
|
||||
return enable_features();
|
||||
}
|
||||
future<> on_change(inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) override {
|
||||
if (states.contains(application_state::SUPPORTED_FEATURES)) {
|
||||
return enable_features();
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> enable_features();
|
||||
};
|
||||
|
||||
future<> feature_service::enable_features_on_join(gossiper& g, db::system_keyspace& sys_ks, service::storage_service& ss) {
|
||||
auto enabler = make_shared<persistent_feature_enabler>(g, *this, sys_ks, ss);
|
||||
g.register_(enabler);
|
||||
return enabler->enable_features();
|
||||
}
|
||||
|
||||
future<> feature_service::on_system_tables_loaded(db::system_keyspace& sys_ks) {
|
||||
return enable_features_on_startup(sys_ks);
|
||||
}
|
||||
@@ -266,6 +299,31 @@ void feature_service::check_features(const std::set<sstring>& enabled_features,
|
||||
}
|
||||
}
|
||||
|
||||
future<> persistent_feature_enabler::enable_features() {
|
||||
if (_ss.raft_topology_change_enabled()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto loaded_peer_features = co_await _sys_ks.load_peer_features();
|
||||
auto&& features = _g.get_supported_features(loaded_peer_features, gossiper::ignore_features_of_local_node::no);
|
||||
|
||||
// Persist enabled feature in the `system.scylla_local` table under the "enabled_features" key.
|
||||
// The key itself is maintained as an `unordered_set<string>` and serialized via `to_string`
|
||||
// function to preserve readability.
|
||||
std::set<sstring> feats_set = co_await _sys_ks.load_local_enabled_features();
|
||||
for (feature& f : _feat.registered_features() | std::views::values) {
|
||||
if (!f && features.contains(f.name())) {
|
||||
feats_set.emplace(f.name());
|
||||
}
|
||||
}
|
||||
co_await _sys_ks.save_local_enabled_features(std::move(feats_set), true);
|
||||
|
||||
co_await _feat.container().invoke_on_all([&features] (feature_service& fs) -> future<> {
|
||||
auto features_v = features | std::ranges::to<std::set<std::string_view>>();
|
||||
co_await fs.enable(std::move(features_v));
|
||||
});
|
||||
}
|
||||
|
||||
future<> feature_service::enable(std::set<std::string_view> list) {
|
||||
// `gms::feature::enable` should be run within a seastar thread context
|
||||
return seastar::async([this, list = std::move(list)] {
|
||||
|
||||
@@ -81,7 +81,6 @@ public:
|
||||
gms::feature user_defined_functions { *this, "UDF"sv };
|
||||
gms::feature alternator_streams { *this, "ALTERNATOR_STREAMS"sv };
|
||||
gms::feature alternator_ttl { *this, "ALTERNATOR_TTL"sv };
|
||||
gms::feature cql_row_ttl { *this, "CQL_ROW_TTL"sv };
|
||||
gms::feature range_scan_data_variant { *this, "RANGE_SCAN_DATA_VARIANT"sv };
|
||||
gms::feature cdc_generations_v2 { *this, "CDC_GENERATIONS_V2"sv };
|
||||
gms::feature user_defined_aggregates { *this, "UDA"sv };
|
||||
@@ -112,7 +111,6 @@ public:
|
||||
gms::feature large_collection_detection { *this, "LARGE_COLLECTION_DETECTION"sv };
|
||||
gms::feature range_tombstone_and_dead_rows_detection { *this, "RANGE_TOMBSTONE_AND_DEAD_ROWS_DETECTION"sv };
|
||||
gms::feature truncate_as_topology_operation { *this, "TRUNCATE_AS_TOPOLOGY_OPERATION"sv };
|
||||
gms::feature snapshot_as_topology_operation { *this, "SNAPSHOT_AS_TOPOLOGY_OPERATION"sv };
|
||||
gms::feature secondary_indexes_on_static_columns { *this, "SECONDARY_INDEXES_ON_STATIC_COLUMNS"sv };
|
||||
gms::feature tablets { *this, "TABLETS"sv };
|
||||
gms::feature table_digest_insensitive_to_expiry { *this, "TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"sv };
|
||||
@@ -185,12 +183,12 @@ public:
|
||||
gms::feature size_based_load_balancing { *this, "SIZE_BASED_LOAD_BALANCING"sv };
|
||||
gms::feature topology_noop_request { *this, "TOPOLOGY_NOOP_REQUEST"sv };
|
||||
gms::feature tablets_intermediate_fallback_cleanup { *this, "TABLETS_INTERMEDIATE_FALLBACK_CLEANUP"sv };
|
||||
gms::feature batchlog_v2 { *this, "BATCHLOG_V2"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
static std::set<sstring> to_feature_set(sstring features_string);
|
||||
future<> enable_features_on_join(gossiper&, db::system_keyspace&, service::storage_service&);
|
||||
future<> on_system_tables_loaded(db::system_keyspace& sys_ks);
|
||||
|
||||
// Performs the feature check.
|
||||
|
||||
@@ -998,24 +998,6 @@ future<> gossiper::send_echo(locator::host_id host_id, std::chrono::milliseconds
|
||||
return ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + timeout_ms, _abort_source, generation_number, notify_up);
|
||||
}
|
||||
|
||||
future<> gossiper::failure_detector_loop_sleep(std::chrono::seconds duration) {
|
||||
bool sleep_expired = false;
|
||||
bool abort_requested = false;
|
||||
timer<> sleep_timer([&] {
|
||||
sleep_expired = true;
|
||||
_failure_detector_loop_cv.signal();
|
||||
});
|
||||
auto as_sub = _abort_source.subscribe([&] () noexcept {
|
||||
abort_requested = true;
|
||||
sleep_timer.cancel();
|
||||
_failure_detector_loop_cv.signal();
|
||||
});
|
||||
sleep_timer.arm(duration);
|
||||
while (is_enabled() && !sleep_expired && !abort_requested) {
|
||||
co_await _failure_detector_loop_cv.when();
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, generation_type gossip_generation, uint64_t live_endpoints_version) {
|
||||
auto last = gossiper::clk::now();
|
||||
auto diff = gossiper::clk::duration(0);
|
||||
@@ -1054,7 +1036,7 @@ future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, gene
|
||||
host_id, node, _live_endpoints, _live_endpoints_version, live_endpoints_version);
|
||||
co_return;
|
||||
} else {
|
||||
co_await failure_detector_loop_sleep(echo_interval);
|
||||
co_await sleep_abortable(echo_interval, _abort_source).handle_exception_type([] (const abort_requested_exception&) {});
|
||||
}
|
||||
}
|
||||
co_return;
|
||||
@@ -1070,7 +1052,7 @@ future<> gossiper::failure_detector_loop() {
|
||||
try {
|
||||
if (_live_endpoints.empty()) {
|
||||
logger.debug("failure_detector_loop: Wait until live_nodes={} is not empty", _live_endpoints);
|
||||
co_await failure_detector_loop_sleep(std::chrono::seconds(1));
|
||||
co_await sleep_abortable(std::chrono::seconds(1), _abort_source);
|
||||
continue;
|
||||
}
|
||||
auto nodes = _live_endpoints | std::ranges::to<std::vector>();
|
||||
@@ -2398,7 +2380,6 @@ future<> gossiper::do_stop_gossiping() {
|
||||
// Set disable flag and cancel the timer makes sure gossip loop will not be scheduled
|
||||
co_await container().invoke_on_all([] (gms::gossiper& g) {
|
||||
g._enabled = false;
|
||||
g._failure_detector_loop_cv.broadcast();
|
||||
});
|
||||
_scheduled_gossip_task.cancel();
|
||||
// Take the semaphore makes sure existing gossip loop is finished
|
||||
@@ -2514,6 +2495,26 @@ future<> gossiper::wait_alive(noncopyable_function<std::vector<locator::host_id>
|
||||
return wait_alive_helper(std::move(get_nodes), timeout);
|
||||
}
|
||||
|
||||
future<> gossiper::wait_for_live_nodes_to_show_up(size_t n) {
|
||||
logger::rate_limit rate_limit{std::chrono::seconds{5}};
|
||||
// Account for gossip slowness. 3 minutes is probably overkill but we don't want flaky tests.
|
||||
constexpr auto timeout_delay = std::chrono::minutes{3};
|
||||
auto timeout = gossiper::clk::now() + timeout_delay;
|
||||
while (get_live_members().size() < n) {
|
||||
if (timeout <= gossiper::clk::now()) {
|
||||
auto err = ::format("Timed out waiting for {} live nodes to show up in gossip", n);
|
||||
logger.error("{}", err);
|
||||
throw std::runtime_error{std::move(err)};
|
||||
}
|
||||
|
||||
logger.log(log_level::info, rate_limit,
|
||||
"Waiting for {} live nodes to show up in gossip, currently {} present...",
|
||||
n, get_live_members().size());
|
||||
co_await sleep_abortable(std::chrono::milliseconds(10), _abort_source);
|
||||
}
|
||||
logger.info("Live nodes seen in gossip: {}", get_live_members());
|
||||
}
|
||||
|
||||
const versioned_value* gossiper::get_application_state_ptr(locator::host_id endpoint, application_state appstate) const noexcept {
|
||||
auto eps = get_endpoint_state_ptr(std::move(endpoint));
|
||||
if (!eps) {
|
||||
@@ -2572,6 +2573,62 @@ std::string_view gossiper::get_gossip_status(const locator::host_id& endpoint) c
|
||||
return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS));
|
||||
}
|
||||
|
||||
future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std::optional<int32_t> force_after) const {
|
||||
static constexpr std::chrono::milliseconds GOSSIP_SETTLE_POLL_INTERVAL_MS{1000};
|
||||
static constexpr int32_t GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
|
||||
|
||||
if (force_after && *force_after == 0) {
|
||||
logger.warn("Skipped to wait for gossip to settle by user request since skip_wait_for_gossip_to_settle is set zero. Do not use this in production!");
|
||||
co_return;
|
||||
}
|
||||
|
||||
int32_t total_polls = 0;
|
||||
int32_t num_okay = 0;
|
||||
auto ep_size = _endpoint_state_map.size();
|
||||
|
||||
auto delay = initial_delay;
|
||||
|
||||
co_await sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source);
|
||||
while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
|
||||
co_await sleep_abortable(delay, _abort_source);
|
||||
delay = GOSSIP_SETTLE_POLL_INTERVAL_MS;
|
||||
|
||||
auto current_size = _endpoint_state_map.size();
|
||||
total_polls++;
|
||||
if (current_size == ep_size && _msg_processing == 0) {
|
||||
logger.debug("Gossip looks settled");
|
||||
num_okay++;
|
||||
} else {
|
||||
logger.info("Gossip not settled after {} polls.", total_polls);
|
||||
num_okay = 0;
|
||||
}
|
||||
ep_size = current_size;
|
||||
if (force_after && *force_after > 0 && total_polls > *force_after) {
|
||||
logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossp total polls: {}", total_polls);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
|
||||
logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
|
||||
} else {
|
||||
logger.info("No gossip backlog; proceeding");
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::wait_for_gossip_to_settle() const {
|
||||
auto force_after = _gcfg.skip_wait_for_gossip_to_settle;
|
||||
if (force_after != 0) {
|
||||
co_await wait_for_gossip(GOSSIP_SETTLE_MIN_WAIT_MS, force_after);
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::wait_for_range_setup() const {
|
||||
logger.info("Waiting for pending range setup...");
|
||||
auto ring_delay = std::chrono::milliseconds(_gcfg.ring_delay_ms);
|
||||
auto force_after = _gcfg.skip_wait_for_gossip_to_settle;
|
||||
return wait_for_gossip(ring_delay, force_after);
|
||||
}
|
||||
|
||||
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
|
||||
// We allow to bootstrap a new node in only two cases:
|
||||
// 1) The node is a completely new node and no state in gossip at all
|
||||
|
||||
@@ -66,6 +66,7 @@ struct gossip_config {
|
||||
uint32_t ring_delay_ms = 30 * 1000;
|
||||
uint32_t shadow_round_ms = 300 * 1000;
|
||||
uint32_t shutdown_announce_ms = 2 * 1000;
|
||||
uint32_t skip_wait_for_gossip_to_settle = -1;
|
||||
utils::updateable_value<uint32_t> failure_detector_timeout_ms;
|
||||
utils::updateable_value<int32_t> force_gossip_generation;
|
||||
utils::updateable_value<utils::UUID> recovery_leader;
|
||||
@@ -212,6 +213,8 @@ public:
|
||||
static constexpr std::chrono::milliseconds INTERVAL{1000};
|
||||
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
|
||||
|
||||
static constexpr std::chrono::milliseconds GOSSIP_SETTLE_MIN_WAIT_MS{5000};
|
||||
|
||||
// Maximum difference between remote generation value and generation
|
||||
// value this node would get if this node were restarted that we are
|
||||
// willing to accept about a peer.
|
||||
@@ -522,6 +525,9 @@ public:
|
||||
future<> wait_alive(std::vector<locator::host_id> nodes, std::chrono::milliseconds timeout);
|
||||
future<> wait_alive(noncopyable_function<std::vector<locator::host_id>()> get_nodes, std::chrono::milliseconds timeout);
|
||||
|
||||
// Wait for `n` live nodes to show up in gossip (including ourself).
|
||||
future<> wait_for_live_nodes_to_show_up(size_t n);
|
||||
|
||||
// Get live members synchronized to all shards
|
||||
future<std::set<inet_address>> get_live_members_synchronized();
|
||||
|
||||
@@ -657,7 +663,12 @@ public:
|
||||
public:
|
||||
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
|
||||
std::string_view get_gossip_status(const locator::host_id& endpoint) const noexcept;
|
||||
public:
|
||||
future<> wait_for_gossip_to_settle() const;
|
||||
future<> wait_for_range_setup() const;
|
||||
private:
|
||||
future<> wait_for_gossip(std::chrono::milliseconds, std::optional<int32_t> = {}) const;
|
||||
|
||||
uint64_t _nr_run = 0;
|
||||
uint64_t _msg_processing = 0;
|
||||
|
||||
@@ -668,7 +679,6 @@ private:
|
||||
netw::messaging_service& _messaging;
|
||||
gossip_address_map& _address_map;
|
||||
gossip_config _gcfg;
|
||||
condition_variable _failure_detector_loop_cv;
|
||||
// Get features supported by a particular node
|
||||
std::set<sstring> get_supported_features(locator::host_id endpoint) const;
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
|
||||
@@ -695,7 +705,6 @@ public:
|
||||
private:
|
||||
future<> failure_detector_loop();
|
||||
future<> failure_detector_loop_for_node(locator::host_id node, generation_type gossip_generation, uint64_t live_endpoints_version);
|
||||
future<> failure_detector_loop_sleep(std::chrono::seconds duration);
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -26,6 +26,14 @@ struct group0_peer_exchange {
|
||||
std::variant<std::monostate, service::group0_info, std::vector<service::discovery_peer>> info;
|
||||
};
|
||||
|
||||
enum class group0_upgrade_state : uint8_t {
|
||||
recovery = 0,
|
||||
use_pre_raft_procedures = 1,
|
||||
synchronize = 2,
|
||||
use_post_raft_procedures = 3,
|
||||
};
|
||||
|
||||
verb [[with_client_info, cancellable]] get_group0_upgrade_state () -> service::group0_upgrade_state;
|
||||
verb [[with_client_info, with_timeout, ip]] group0_peer_exchange (std::vector<service::discovery_peer> peers) -> service::group0_peer_exchange;
|
||||
verb [[with_client_info, with_timeout]] group0_modify_config (raft::group_id gid, std::vector<raft::config_member> add, std::vector<raft::server_id> del);
|
||||
|
||||
|
||||
@@ -35,7 +35,6 @@ verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command
|
||||
verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]], std::optional<full_position> [[version 5.2.0]];
|
||||
verb [[with_timeout]] truncate (sstring, sstring);
|
||||
verb [[]] truncate_with_tablets (sstring ks_name, sstring cf_name, service::frozen_topology_guard frozen_guard);
|
||||
verb [[]] snapshot_with_tablets (utils::chunked_vector<table_id> table_ids, sstring tag, gc_clock::time_point created_at, bool, std::optional<gc_clock::time_point> expiry, service::frozen_topology_guard frozen_guard);
|
||||
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> service::paxos::prepare_response [[unique_ptr]];
|
||||
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> bool;
|
||||
verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]], service::fencing_token fence [[version 2025.4]]);
|
||||
|
||||
@@ -94,10 +94,7 @@ void secondary_index_manager::reload() {
|
||||
auto index_name = it->first;
|
||||
if (!table_indices.contains(index_name)) {
|
||||
it = _indices.erase(it);
|
||||
if (auto mi = _metrics.find(index_name); mi != _metrics.end()) {
|
||||
mi->second->deregister();
|
||||
_metrics.erase(mi);
|
||||
}
|
||||
_metrics.erase(index_name);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
@@ -239,10 +236,6 @@ stats::stats(const sstring& ks_name, const sstring& index_name) {
|
||||
.set_skip_when_empty()});
|
||||
}
|
||||
|
||||
void stats::deregister() {
|
||||
metrics.clear();
|
||||
}
|
||||
|
||||
void stats::add_latency(std::chrono::steady_clock::duration d) {
|
||||
query_latency.add(d);
|
||||
}
|
||||
|
||||
@@ -118,7 +118,6 @@ public:
|
||||
stats(const sstring& ks_name, const sstring& index_name);
|
||||
stats(const stats&) = delete;
|
||||
stats& operator=(const stats&) = delete;
|
||||
void deregister();
|
||||
void add_latency(std::chrono::steady_clock::duration d);
|
||||
};
|
||||
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
#include "index/vector_index.hh"
|
||||
#include "index/secondary_index.hh"
|
||||
#include "index/secondary_index_manager.hh"
|
||||
#include "index/target_parser.hh"
|
||||
#include "types/concrete_types.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
@@ -105,19 +104,6 @@ const static std::unordered_map<sstring, std::function<void(const sstring&, cons
|
||||
{"rescoring", std::bind_front(validate_enumerated_option, boolean_values)},
|
||||
};
|
||||
|
||||
sstring get_vector_index_target_column(const sstring& targets) {
|
||||
std::optional<rjson::value> json_value = rjson::try_parse(targets);
|
||||
if (!json_value || !json_value->IsObject()) {
|
||||
return target_parser::get_target_column_name_from_string(targets);
|
||||
}
|
||||
|
||||
rjson::value* pk = rjson::find(*json_value, "pk");
|
||||
if (pk && pk->IsArray() && !pk->Empty()) {
|
||||
return sstring(rjson::to_string_view(pk->GetArray()[0]));
|
||||
}
|
||||
return target_parser::get_target_column_name_from_string(targets);
|
||||
}
|
||||
|
||||
bool vector_index::is_rescoring_enabled(const index_options_map& properties) {
|
||||
auto q = properties.find("quantization");
|
||||
auto r = properties.find("rescoring");
|
||||
@@ -334,23 +320,16 @@ bool vector_index::has_vector_index(const schema& s) {
|
||||
|
||||
bool vector_index::has_vector_index_on_column(const schema& s, const sstring& target_name) {
|
||||
for (const auto& index : s.indices()) {
|
||||
if (is_vector_index_on_column(index, target_name)) {
|
||||
return true;
|
||||
auto class_it = index.options().find(db::index::secondary_index::custom_class_option_name);
|
||||
auto target_it = index.options().find(cql3_parser::index_target::target_option_name);
|
||||
if (class_it != index.options().end() && target_it != index.options().end()) {
|
||||
auto custom_class = secondary_index_manager::get_custom_class_factory(class_it->second);
|
||||
return custom_class && dynamic_cast<vector_index*>((*custom_class)().get()) && target_it->second == target_name;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool vector_index::is_vector_index_on_column(const index_metadata& im, const sstring& target_name) {
|
||||
auto class_it = im.options().find(db::index::secondary_index::custom_class_option_name);
|
||||
auto target_it = im.options().find(cql3_parser::index_target::target_option_name);
|
||||
if (class_it != im.options().end() && target_it != im.options().end()) {
|
||||
auto custom_class = secondary_index_manager::get_custom_class_factory(class_it->second);
|
||||
return custom_class && dynamic_cast<vector_index*>((*custom_class)().get()) && get_vector_index_target_column(target_it->second) == target_name;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Returns the schema version of the base table at which the index was created.
|
||||
/// This is used to determine if the index needs to be rebuilt after a schema change.
|
||||
/// The CREATE INDEX and DROP INDEX statements does change the schema version.
|
||||
|
||||
@@ -34,7 +34,6 @@ public:
|
||||
table_schema_version index_version(const schema& schema) override;
|
||||
static bool has_vector_index(const schema& s);
|
||||
static bool has_vector_index_on_column(const schema& s, const sstring& target_name);
|
||||
static bool is_vector_index_on_column(const index_metadata& im, const sstring& target_name);
|
||||
static void check_cdc_options(const schema& schema);
|
||||
|
||||
static bool is_rescoring_enabled(const index_options_map& properties);
|
||||
|
||||
@@ -80,6 +80,8 @@ fedora_packages=(
|
||||
gdb
|
||||
lua-devel
|
||||
yaml-cpp-devel
|
||||
antlr3-tool
|
||||
antlr3-C++-devel
|
||||
jsoncpp-devel
|
||||
rapidjson-devel
|
||||
snappy-devel
|
||||
@@ -236,69 +238,6 @@ arch_packages=(
|
||||
snappy
|
||||
)
|
||||
|
||||
ANTLR3_VERSION=3.5.3
|
||||
ANTLR3_JAR_URL="https://repo1.maven.org/maven2/org/antlr/antlr-complete/${ANTLR3_VERSION}/antlr-complete-${ANTLR3_VERSION}.jar"
|
||||
ANTLR3_JAR_SHA256=e781de9b3e2cc1297dfdaf656da946a1fd22f449bd9e0ce1e12d488976887f83
|
||||
ANTLR3_SOURCE_URL="https://github.com/antlr/antlr3/archive/${ANTLR3_VERSION}/antlr3-${ANTLR3_VERSION}.tar.gz"
|
||||
ANTLR3_SOURCE_SHA256=a0892bcf164573d539b930e57a87ea45333141863a0dd3a49e5d8c919c8a58ab
|
||||
# Patches from Fedora 43 (src.fedoraproject.org) that apply to the C++ headers
|
||||
ANTLR3_PATCHES=(
|
||||
0006-antlr3memory.hpp-fix-for-C-20-mode.patch
|
||||
0008-unconst-cyclicdfa-gcc-14.patch
|
||||
)
|
||||
|
||||
install_antlr3() {
|
||||
local prefix=/usr/local
|
||||
local jardir="${prefix}/share/java"
|
||||
local bindir="${prefix}/bin"
|
||||
local includedir="${prefix}/include"
|
||||
|
||||
if [ -f "${jardir}/antlr-complete-${ANTLR3_VERSION}.jar" ] \
|
||||
&& [ -f "${bindir}/antlr3" ] \
|
||||
&& [ -f "${includedir}/antlr3.hpp" ]; then
|
||||
echo "antlr3 ${ANTLR3_VERSION} already installed, skipping"
|
||||
return
|
||||
fi
|
||||
|
||||
local tmpdir
|
||||
tmpdir=$(mktemp -d)
|
||||
|
||||
# Download and install the complete JAR
|
||||
mkdir -p "${jardir}"
|
||||
curl -fSL -o "${tmpdir}/antlr-complete-${ANTLR3_VERSION}.jar" "${ANTLR3_JAR_URL}"
|
||||
echo "${ANTLR3_JAR_SHA256} ${tmpdir}/antlr-complete-${ANTLR3_VERSION}.jar" | sha256sum --check
|
||||
mv "${tmpdir}/antlr-complete-${ANTLR3_VERSION}.jar" "${jardir}/"
|
||||
|
||||
# Create the antlr3 wrapper script
|
||||
mkdir -p "${bindir}"
|
||||
cat > "${bindir}/antlr3" <<'WRAPPER'
|
||||
#!/bin/bash
|
||||
exec java -cp /usr/local/share/java/antlr-complete-ANTLR3_VERSION.jar org.antlr.Tool "$@"
|
||||
WRAPPER
|
||||
sed -i "s/ANTLR3_VERSION/${ANTLR3_VERSION}/" "${bindir}/antlr3"
|
||||
chmod +x "${bindir}/antlr3"
|
||||
|
||||
# Download and extract the source for C++ headers
|
||||
curl -fSL -o "${tmpdir}/antlr3-${ANTLR3_VERSION}.tar.gz" "${ANTLR3_SOURCE_URL}"
|
||||
echo "${ANTLR3_SOURCE_SHA256} ${tmpdir}/antlr3-${ANTLR3_VERSION}.tar.gz" | sha256sum --check
|
||||
tar -xzf "${tmpdir}/antlr3-${ANTLR3_VERSION}.tar.gz" -C "${tmpdir}"
|
||||
|
||||
# Apply patches to C++ headers
|
||||
local srcdir="${tmpdir}/antlr3-${ANTLR3_VERSION}"
|
||||
local patchdir
|
||||
patchdir="$(dirname "$0")/tools/antlr3-patches"
|
||||
for patch in "${ANTLR3_PATCHES[@]}"; do
|
||||
patch -d "${srcdir}" -p1 < "${patchdir}/${patch}"
|
||||
done
|
||||
|
||||
# Install C++ headers (header-only library)
|
||||
mkdir -p "${includedir}"
|
||||
install -m 644 "${srcdir}"/runtime/Cpp/include/* "${includedir}/"
|
||||
|
||||
rm -rf "${tmpdir}"
|
||||
echo "antlr3 ${ANTLR3_VERSION} installed to ${prefix}"
|
||||
}
|
||||
|
||||
go_arch() {
|
||||
local -A GO_ARCH=(
|
||||
["x86_64"]=amd64
|
||||
@@ -451,8 +390,6 @@ elif [ "$ID" = "fedora" ]; then
|
||||
fi
|
||||
dnf install -y "${fedora_packages[@]}" "${fedora_python3_packages[@]}"
|
||||
|
||||
install_antlr3
|
||||
|
||||
# Fedora 45 tightened key checks, and cassandra-stress is not signed yet.
|
||||
dnf install --no-gpgchecks -y https://github.com/scylladb/cassandra-stress/releases/download/v3.18.1/cassandra-stress-java21-3.18.1-1.noarch.rpm
|
||||
|
||||
|
||||
@@ -347,8 +347,8 @@ install -d -m755 "$retc"/scylla.d
|
||||
scylla_yaml_dir=$(mktemp -d)
|
||||
scylla_yaml=$scylla_yaml_dir/scylla.yaml
|
||||
grep -v api_ui_dir conf/scylla.yaml | grep -v api_doc_dir > $scylla_yaml
|
||||
echo "api_ui_dir: $prefix/swagger-ui/dist/" >> $scylla_yaml
|
||||
echo "api_doc_dir: $prefix/api/api-doc/" >> $scylla_yaml
|
||||
echo "api_ui_dir: /opt/scylladb/swagger-ui/dist/" >> $scylla_yaml
|
||||
echo "api_doc_dir: /opt/scylladb/api/api-doc/" >> $scylla_yaml
|
||||
installconfig 644 $scylla_yaml "$retc"/scylla
|
||||
rm -rf $scylla_yaml_dir
|
||||
|
||||
|
||||
20
lang/lua.cc
20
lang/lua.cc
@@ -283,6 +283,17 @@ concept CanHandleLuaTypes = requires(Func f) {
|
||||
{ f(*static_cast<const lua_table*>(nullptr)) } -> std::same_as<lua_visit_ret_type<Func>>;
|
||||
};
|
||||
|
||||
// This is used to test if a double fits in a long long, so
|
||||
// we expect overflows. Prevent the sanitizer from complaining.
|
||||
#ifdef __clang__
|
||||
[[clang::no_sanitize("undefined")]]
|
||||
#endif
|
||||
static
|
||||
long long
|
||||
cast_to_long_long_allow_overflow(double v) {
|
||||
return (long long)v;
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
requires CanHandleLuaTypes<Func>
|
||||
static auto visit_lua_value(lua_State* l, int index, Func&& f) {
|
||||
@@ -293,10 +304,9 @@ static auto visit_lua_value(lua_State* l, int index, Func&& f) {
|
||||
auto operator()(const long long& v) { return f(utils::multiprecision_int(v)); }
|
||||
auto operator()(const utils::multiprecision_int& v) { return f(v); }
|
||||
auto operator()(const double& v) {
|
||||
auto min = double(std::numeric_limits<long long>::min());
|
||||
auto max = double(std::numeric_limits<long long>::max());
|
||||
if (min <= v && v <= max && std::trunc(v) == v) {
|
||||
return (*this)((long long)v);
|
||||
long long v2 = cast_to_long_long_allow_overflow(v);
|
||||
if (v2 == v) {
|
||||
return (*this)(v2);
|
||||
}
|
||||
// FIXME: We could use frexp to produce a decimal instead of a double
|
||||
return f(v);
|
||||
@@ -743,7 +753,7 @@ struct from_lua_visitor {
|
||||
}
|
||||
|
||||
const data_type& elements_type = t.get_elements_type();
|
||||
vector_dimension_t num_elements = t.get_dimension();
|
||||
size_t num_elements = t.get_dimension();
|
||||
|
||||
using table_pair = std::pair<utils::multiprecision_int, data_value>;
|
||||
std::vector<table_pair> pairs;
|
||||
|
||||
@@ -616,16 +616,12 @@ tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topo
|
||||
return maybe_get_primary_replica(id, replicas, topo, [&] (const auto& _) { return true; }).value();
|
||||
}
|
||||
|
||||
tablet_replica tablet_map::get_secondary_replica(tablet_id id, const locator::topology& topo) const {
|
||||
const auto& orig_replicas = get_tablet_info(id).replicas;
|
||||
if (orig_replicas.size() < 2) {
|
||||
tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
|
||||
if (get_tablet_info(id).replicas.size() < 2) {
|
||||
throw std::runtime_error(format("No secondary replica for tablet id {}", id));
|
||||
}
|
||||
tablet_replica_set replicas = orig_replicas;
|
||||
std::ranges::sort(replicas, tablet_replica_comparator(topo));
|
||||
// This formula must match the one in get_primary_replica(),
|
||||
// just with + 1.
|
||||
return replicas.at((size_t(id) + size_t(id) / replicas.size() + 1) % replicas.size());
|
||||
const auto& replicas = get_tablet_info(id).replicas;
|
||||
return replicas.at((size_t(id)+1) % replicas.size());
|
||||
}
|
||||
|
||||
std::optional<tablet_replica> tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const {
|
||||
|
||||
@@ -648,10 +648,9 @@ public:
|
||||
/// Returns the primary replica for the tablet
|
||||
tablet_replica get_primary_replica(tablet_id id, const locator::topology& topo) const;
|
||||
|
||||
/// Returns the secondary replica for the tablet: the replica that immediately follows the primary
|
||||
/// replica in the topology-sorted replica list.
|
||||
/// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector
|
||||
/// \throws std::runtime_error if the tablet has less than 2 replicas.
|
||||
tablet_replica get_secondary_replica(tablet_id id, const locator::topology& topo) const;
|
||||
tablet_replica get_secondary_replica(tablet_id id) const;
|
||||
|
||||
// Returns the replica that matches hosts and dcs filters for tablet_task_info.
|
||||
std::optional<tablet_replica> maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const;
|
||||
|
||||
38
main.cc
38
main.cc
@@ -1617,6 +1617,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
gcfg.ring_delay_ms = cfg->ring_delay_ms();
|
||||
gcfg.shadow_round_ms = cfg->shadow_round_ms();
|
||||
gcfg.shutdown_announce_ms = cfg->shutdown_announce_in_ms();
|
||||
gcfg.skip_wait_for_gossip_to_settle = cfg->skip_wait_for_gossip_to_settle();
|
||||
gcfg.group0_id = group0_id;
|
||||
gcfg.host_id = host_id;
|
||||
gcfg.failure_detector_timeout_ms = cfg->failure_detector_timeout_in_ms;
|
||||
@@ -1676,7 +1677,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
service::raft_group0 group0_service{
|
||||
stop_signal.as_local_abort_source(), raft_gr.local(), messaging,
|
||||
gossiper.local(), feature_service.local(), group0_client, dbcfg.gossip_scheduling_group};
|
||||
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client, dbcfg.gossip_scheduling_group};
|
||||
|
||||
checkpoint(stop_signal, "starting tablet allocator");
|
||||
service::tablet_allocator::config tacfg {
|
||||
@@ -2110,7 +2111,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
db::snapshot_ctl::config snap_cfg = {
|
||||
.backup_sched_group = dbcfg.streaming_scheduling_group,
|
||||
};
|
||||
snapshot_ctl.start(std::ref(db), std::ref(proxy), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
|
||||
snapshot_ctl.start(std::ref(db), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
|
||||
auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl] {
|
||||
snapshot_ctl.stop().get();
|
||||
});
|
||||
@@ -2414,7 +2415,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
bm_cfg.delay = std::chrono::milliseconds(cfg->ring_delay_ms());
|
||||
bm_cfg.replay_cleanup_after_replays = cfg->batchlog_replay_cleanup_after_replays();
|
||||
|
||||
bm.start(std::ref(qp), std::ref(sys_ks), std::ref(feature_service), bm_cfg).get();
|
||||
bm.start(std::ref(qp), std::ref(sys_ks), bm_cfg).get();
|
||||
auto stop_batchlog_manager = defer_verbose_shutdown("batchlog manager", [&bm] {
|
||||
bm.stop().get();
|
||||
});
|
||||
@@ -2447,6 +2448,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
view_backlog_broker.stop().get();
|
||||
});
|
||||
|
||||
if (!ss.local().raft_topology_change_enabled()) {
|
||||
startlog.info("Waiting for gossip to settle before accepting client requests...");
|
||||
gossiper.local().wait_for_gossip_to_settle().get();
|
||||
}
|
||||
|
||||
checkpoint(stop_signal, "allow replaying hints");
|
||||
proxy.invoke_on_all(&service::storage_proxy::allow_replaying_hints).get();
|
||||
|
||||
@@ -2506,18 +2512,22 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
sharded<alternator::expiration_service> es;
|
||||
std::any stop_expiration_service;
|
||||
|
||||
// Start the expiration service on all shards.
|
||||
// This service is used both by Alternator (for its TTL feature)
|
||||
// and by CQL (for its per-row TTL feature).
|
||||
checkpoint(stop_signal, "starting the expiration service");
|
||||
es.start(seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db)),
|
||||
if (cfg->alternator_port() || cfg->alternator_https_port()) {
|
||||
// Start the expiration service on all shards.
|
||||
// Currently we only run it if Alternator is enabled, because
|
||||
// only Alternator uses it for its TTL feature. But in the
|
||||
// future if we add a CQL interface to it, we may want to
|
||||
// start this outside the Alternator if().
|
||||
checkpoint(stop_signal, "starting the expiration service");
|
||||
es.start(seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db)),
|
||||
std::ref(proxy), std::ref(gossiper)).get();
|
||||
stop_expiration_service = defer_verbose_shutdown("expiration service", [&es] {
|
||||
es.stop().get();
|
||||
});
|
||||
with_scheduling_group(maintenance_scheduling_group, [&es] {
|
||||
return es.invoke_on_all(&alternator::expiration_service::start);
|
||||
}).get();
|
||||
stop_expiration_service = defer_verbose_shutdown("expiration service", [&es] {
|
||||
es.stop().get();
|
||||
});
|
||||
with_scheduling_group(maintenance_scheduling_group, [&es] {
|
||||
return es.invoke_on_all(&alternator::expiration_service::start);
|
||||
}).get();
|
||||
}
|
||||
|
||||
db.invoke_on_all(&replica::database::revert_initial_system_read_concurrency_boost).get();
|
||||
notify_set.notify_all(configurable::system_state::started).get();
|
||||
|
||||
@@ -728,7 +728,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::TABLE_LOAD_STATS_V1:
|
||||
case messaging_verb::TABLE_LOAD_STATS:
|
||||
case messaging_verb::WORK_ON_VIEW_BUILDING_TASKS:
|
||||
case messaging_verb::SNAPSHOT_WITH_TABLETS:
|
||||
return 1;
|
||||
case messaging_verb::CLIENT_ID:
|
||||
case messaging_verb::MUTATION:
|
||||
|
||||
@@ -210,8 +210,7 @@ enum class messaging_verb : int32_t {
|
||||
REPAIR_UPDATE_REPAIRED_AT_FOR_MERGE = 81,
|
||||
WORK_ON_VIEW_BUILDING_TASKS = 82,
|
||||
NOTIFY_BANNED = 83,
|
||||
SNAPSHOT_WITH_TABLETS = 84,
|
||||
LAST = 85,
|
||||
LAST = 84,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
|
||||
19
node_ops/CMakeLists.txt
Normal file
19
node_ops/CMakeLists.txt
Normal file
@@ -0,0 +1,19 @@
|
||||
add_library(node_ops STATIC)
|
||||
target_sources(node_ops
|
||||
PRIVATE
|
||||
node_ops_ctl.cc)
|
||||
target_include_directories(node_ops
|
||||
PUBLIC
|
||||
${CMAKE_SOURCE_DIR})
|
||||
target_link_libraries(node_ops
|
||||
PUBLIC
|
||||
utils
|
||||
Seastar::seastar
|
||||
PRIVATE
|
||||
service)
|
||||
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
||||
target_precompile_headers(node_ops REUSE_FROM scylla-precompiled-header)
|
||||
endif()
|
||||
|
||||
check_headers(check-headers node_ops
|
||||
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)
|
||||
210
node_ops/node_ops_ctl.cc
Normal file
210
node_ops/node_ops_ctl.cc
Normal file
@@ -0,0 +1,210 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "node_ops/node_ops_ctl.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "idl/node_ops.dist.hh"
|
||||
|
||||
static logging::logger nlogger("node_ops");
|
||||
|
||||
node_ops_ctl::node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid)
|
||||
: ss(ss_)
|
||||
, host_id(id)
|
||||
, endpoint(ep)
|
||||
, tmptr(ss.get_token_metadata_ptr())
|
||||
, req(cmd, uuid)
|
||||
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
|
||||
{}
|
||||
|
||||
node_ops_ctl::~node_ops_ctl() {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
on_internal_error_noexcept(nlogger, "node_ops_ctl destroyed without stopping");
|
||||
}
|
||||
}
|
||||
|
||||
const node_ops_id& node_ops_ctl::uuid() const noexcept {
|
||||
return req.ops_uuid;
|
||||
}
|
||||
|
||||
// may be called multiple times
|
||||
void node_ops_ctl::start(sstring desc_, std::function<bool(locator::host_id)> sync_to_node) {
|
||||
desc = std::move(desc_);
|
||||
|
||||
nlogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
|
||||
|
||||
refresh_sync_nodes(std::move(sync_to_node));
|
||||
}
|
||||
|
||||
void node_ops_ctl::refresh_sync_nodes(std::function<bool(locator::host_id)> sync_to_node) {
|
||||
// sync data with all normal token owners
|
||||
sync_nodes.clear();
|
||||
auto can_sync_with_node = [] (const locator::node& node) {
|
||||
// Sync with reachable token owners.
|
||||
// Note that although nodes in `being_replaced` and `being_removed`
|
||||
// are still token owners, they are known to be dead and can't be sync'ed with.
|
||||
switch (node.get_state()) {
|
||||
case locator::node::state::normal:
|
||||
case locator::node::state::being_decommissioned:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
};
|
||||
tmptr->for_each_token_owner([&] (const locator::node& node) {
|
||||
seastar::thread::maybe_yield();
|
||||
// FIXME: use node* rather than endpoint
|
||||
auto endpoint = node.host_id();
|
||||
if (!ignore_nodes.contains(endpoint) && can_sync_with_node(node) && sync_to_node(endpoint)) {
|
||||
sync_nodes.insert(endpoint);
|
||||
}
|
||||
});
|
||||
|
||||
for (auto& node : sync_nodes) {
|
||||
if (!ss.gossiper().is_alive(node)) {
|
||||
nodes_down.emplace(node);
|
||||
}
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
|
||||
nlogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
nlogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::stop() noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
}
|
||||
|
||||
// Caller should set the required req members before prepare
|
||||
future<> node_ops_ctl::prepare(node_ops_cmd cmd) noexcept {
|
||||
return send_to_all(cmd);
|
||||
}
|
||||
|
||||
void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
on_internal_error(nlogger, "heartbeat_updater already started");
|
||||
}
|
||||
heartbeat_updater_done_fut = heartbeat_updater(cmd);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::query_pending_op() {
|
||||
req.cmd = node_ops_cmd::query_pending_ops;
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [this] (const locator::host_id& node) -> future<> {
|
||||
auto resp = co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
|
||||
nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
|
||||
if (std::ranges::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
|
||||
throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::stop_heartbeat_updater() noexcept {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
as.request_abort();
|
||||
co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt);
|
||||
}
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::done(node_ops_cmd cmd) noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
co_await send_to_all(cmd);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::abort(node_ops_cmd cmd) noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
co_await send_to_all(cmd);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
|
||||
nlogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
|
||||
try {
|
||||
co_await abort(cmd);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception());
|
||||
}
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) {
|
||||
req.cmd = cmd;
|
||||
req.ignore_nodes = ignore_nodes |
|
||||
std::views::transform([&] (locator::host_id id) { return ss.gossiper().get_address_map().get(id); }) |
|
||||
std::ranges::to<std::list>();
|
||||
sstring op_desc = ::format("{}", cmd);
|
||||
nlogger.info("{}[{}]: Started {}", desc, uuid(), req);
|
||||
auto cmd_category = categorize_node_ops_cmd(cmd);
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const locator::host_id& node) -> future<> {
|
||||
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) ||
|
||||
(nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) {
|
||||
// Note that we still send abort commands to failed nodes.
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
|
||||
nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
|
||||
} catch (const seastar::rpc::unknown_verb_error&) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
nlogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc);
|
||||
} else {
|
||||
nlogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc);
|
||||
}
|
||||
nodes_unknown_verb.emplace(node);
|
||||
} catch (const seastar::rpc::closed_error&) {
|
||||
nlogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc);
|
||||
nodes_down.emplace(node);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception());
|
||||
nodes_failed.emplace(node);
|
||||
}
|
||||
});
|
||||
std::vector<sstring> errors;
|
||||
if (!nodes_failed.empty()) {
|
||||
errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed));
|
||||
}
|
||||
if (!nodes_unknown_verb.empty()) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb));
|
||||
} else {
|
||||
errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb));
|
||||
}
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down));
|
||||
}
|
||||
if (!errors.empty()) {
|
||||
co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; "))));
|
||||
}
|
||||
nlogger.info("{}[{}]: Finished {}", desc, uuid(), req);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) {
|
||||
nlogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
|
||||
while (!as.abort_requested()) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const locator::host_id& node) -> future<> {
|
||||
try {
|
||||
co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
|
||||
nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
|
||||
};
|
||||
});
|
||||
co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
|
||||
}
|
||||
nlogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
|
||||
}
|
||||
@@ -18,7 +18,15 @@
|
||||
#include <seastar/core/abort_source.hh>
|
||||
|
||||
#include <list>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace service {
|
||||
class storage_service;
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
class token_metadata;
|
||||
}
|
||||
|
||||
class node_ops_info {
|
||||
public:
|
||||
@@ -58,6 +66,17 @@ enum class node_ops_cmd : uint32_t {
|
||||
repair_updater,
|
||||
};
|
||||
|
||||
enum class node_ops_cmd_category {
|
||||
prepare,
|
||||
heartbeat,
|
||||
sync_data,
|
||||
abort,
|
||||
done,
|
||||
other
|
||||
};
|
||||
|
||||
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept;
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<node_ops_cmd> : fmt::formatter<string_view> {
|
||||
auto format(node_ops_cmd, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
@@ -108,6 +127,43 @@ struct node_ops_cmd_response {
|
||||
}
|
||||
};
|
||||
|
||||
class node_ops_ctl {
|
||||
std::unordered_set<locator::host_id> nodes_unknown_verb;
|
||||
std::unordered_set<locator::host_id> nodes_down;
|
||||
std::unordered_set<locator::host_id> nodes_failed;
|
||||
|
||||
public:
|
||||
const service::storage_service& ss;
|
||||
sstring desc;
|
||||
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
|
||||
gms::inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
|
||||
lw_shared_ptr<const locator::token_metadata> tmptr;
|
||||
std::unordered_set<locator::host_id> sync_nodes;
|
||||
std::unordered_set<locator::host_id> ignore_nodes;
|
||||
node_ops_cmd_request req;
|
||||
std::chrono::seconds heartbeat_interval;
|
||||
abort_source as;
|
||||
std::optional<future<>> heartbeat_updater_done_fut;
|
||||
|
||||
explicit node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id());
|
||||
~node_ops_ctl();
|
||||
const node_ops_id& uuid() const noexcept;
|
||||
// may be called multiple times
|
||||
void start(sstring desc_, std::function<bool(locator::host_id)> sync_to_node = [] (locator::host_id) { return true; });
|
||||
void refresh_sync_nodes(std::function<bool(locator::host_id)> sync_to_node = [] (locator::host_id) { return true; });
|
||||
future<> stop() noexcept;
|
||||
// Caller should set the required req members before prepare
|
||||
future<> prepare(node_ops_cmd cmd) noexcept;
|
||||
void start_heartbeat_updater(node_ops_cmd cmd);
|
||||
future<> query_pending_op();
|
||||
future<> stop_heartbeat_updater() noexcept;
|
||||
future<> done(node_ops_cmd cmd) noexcept;
|
||||
future<> abort(node_ops_cmd cmd) noexcept;
|
||||
future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept;
|
||||
future<> send_to_all(node_ops_cmd cmd);
|
||||
future<> heartbeat_updater(node_ops_cmd cmd);
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<node_ops_cmd_request> : fmt::formatter<string_view> {
|
||||
auto format(const node_ops_cmd_request&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
|
||||
size 6505524
|
||||
oid sha256:9034610470ff645fab03da5ad6c690e5b41f3307ea4b529c7e63b0786a1289ed
|
||||
size 6539600
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
|
||||
size 6502668
|
||||
oid sha256:0c4bbf51dbe01d684ea5b9a9157781988ed499604d2fde90143bad0b9a5594f0
|
||||
size 6543944
|
||||
|
||||
@@ -375,12 +375,9 @@ public:
|
||||
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
|
||||
}
|
||||
|
||||
auto ex = named_semaphore_aborted(_semaphore._name);
|
||||
_ex = std::make_exception_ptr(ex);
|
||||
|
||||
_ttl_timer.cancel();
|
||||
_state = reader_permit::state::preemptive_aborted;
|
||||
_aux_data.pr.set_exception(ex);
|
||||
_aux_data.pr.set_exception(named_semaphore_aborted(_semaphore._name));
|
||||
_semaphore.on_permit_preemptive_aborted();
|
||||
}
|
||||
|
||||
@@ -1526,18 +1523,12 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
// Do not admit the read as it is unlikely to finish before its timeout. The condition is:
|
||||
// permit's remaining time <= preemptive_abort_factor * permit's time budget
|
||||
//
|
||||
// The additional checks are needed:
|
||||
// + remaining_time > 0 -- to avoid preemptive aborting reads that already timed out but are still
|
||||
// in the wait list due to scheduling delays. It also effectively disables
|
||||
// preemptive aborting when the factor is set to 0
|
||||
// + permit.timeout() < db::no_timeout -- to avoid preemptively aborting reads without timeout.
|
||||
// Useful is tests when _preemptive_abort_factor is set to 1.0
|
||||
// to avoid additional sleeps to wait for the read to be shed.
|
||||
// The additional check for remaining_time > 0 is to avoid preemptive aborting reads
|
||||
// that already timed out but are still in the wait list due to scheduling delays.
|
||||
// It also effectively disables preemptive aborting when the factor is set to 0.
|
||||
const auto time_budget = permit.timeout() - permit.created();
|
||||
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
|
||||
if (remaining_time > db::timeout_clock::duration::zero() &&
|
||||
permit.timeout() < db::no_timeout &&
|
||||
remaining_time <= _preemptive_abort_factor() * time_budget) {
|
||||
if (remaining_time > db::timeout_clock::duration::zero() && remaining_time <= _preemptive_abort_factor() * time_budget) {
|
||||
permit.on_preemptive_aborted();
|
||||
using ms = std::chrono::milliseconds;
|
||||
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
|
||||
|
||||
@@ -2505,6 +2505,40 @@ future<std::optional<double>> repair::tablet_repair_task_impl::expected_total_wo
|
||||
co_return sz ? std::make_optional<double>(sz) : std::nullopt;
|
||||
}
|
||||
|
||||
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept {
|
||||
switch (cmd) {
|
||||
case node_ops_cmd::removenode_prepare:
|
||||
case node_ops_cmd::replace_prepare:
|
||||
case node_ops_cmd::decommission_prepare:
|
||||
case node_ops_cmd::bootstrap_prepare:
|
||||
return node_ops_cmd_category::prepare;
|
||||
|
||||
case node_ops_cmd::removenode_heartbeat:
|
||||
case node_ops_cmd::replace_heartbeat:
|
||||
case node_ops_cmd::decommission_heartbeat:
|
||||
case node_ops_cmd::bootstrap_heartbeat:
|
||||
return node_ops_cmd_category::heartbeat;
|
||||
|
||||
case node_ops_cmd::removenode_sync_data:
|
||||
return node_ops_cmd_category::sync_data;
|
||||
|
||||
case node_ops_cmd::removenode_abort:
|
||||
case node_ops_cmd::replace_abort:
|
||||
case node_ops_cmd::decommission_abort:
|
||||
case node_ops_cmd::bootstrap_abort:
|
||||
return node_ops_cmd_category::abort;
|
||||
|
||||
case node_ops_cmd::removenode_done:
|
||||
case node_ops_cmd::replace_done:
|
||||
case node_ops_cmd::decommission_done:
|
||||
case node_ops_cmd::bootstrap_done:
|
||||
return node_ops_cmd_category::done;
|
||||
|
||||
default:
|
||||
return node_ops_cmd_category::other;
|
||||
}
|
||||
}
|
||||
|
||||
auto fmt::formatter<node_ops_cmd>::format(node_ops_cmd cmd, fmt::format_context& ctx) const
|
||||
-> decltype(ctx.out()) {
|
||||
std::string_view name;
|
||||
|
||||
@@ -2633,7 +2633,7 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
|
||||
all_replayed = co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
|
||||
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time));
|
||||
}
|
||||
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={} all_replayed={}", req.repair_uuid, from, issue_flush, all_replayed);
|
||||
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={}", req.repair_uuid, from, issue_flush);
|
||||
}
|
||||
);
|
||||
if (!all_replayed) {
|
||||
|
||||
@@ -3304,13 +3304,6 @@ db::replay_position table::highest_flushed_replay_position() const {
|
||||
return _highest_flushed_rp;
|
||||
}
|
||||
|
||||
struct snapshot_tablet_info {
|
||||
size_t id;
|
||||
dht::token first_token, last_token;
|
||||
db_clock::time_point repair_time;
|
||||
int64_t repaired_at;
|
||||
};
|
||||
|
||||
struct manifest_json : public json::json_base {
|
||||
struct info : public json::json_base {
|
||||
json::json_element<sstring> version;
|
||||
@@ -3443,7 +3436,6 @@ struct manifest_json : public json::json_base {
|
||||
json::json_element<uint64_t> index_size;
|
||||
json::json_element<int64_t> first_token;
|
||||
json::json_element<int64_t> last_token;
|
||||
json::json_element<uint64_t> tablet_id;
|
||||
|
||||
sstable_info() {
|
||||
register_params();
|
||||
@@ -3456,9 +3448,6 @@ struct manifest_json : public json::json_base {
|
||||
index_size = e.index_size;
|
||||
first_token = e.first_token;
|
||||
last_token = e.last_token;
|
||||
if (e.tablet_id) {
|
||||
tablet_id = *e.tablet_id;
|
||||
}
|
||||
}
|
||||
sstable_info(const sstable_info& e) {
|
||||
register_params();
|
||||
@@ -3468,7 +3457,6 @@ struct manifest_json : public json::json_base {
|
||||
index_size = e.index_size;
|
||||
first_token = e.first_token;
|
||||
last_token = e.last_token;
|
||||
tablet_id = e.tablet_id;
|
||||
}
|
||||
sstable_info(sstable_info&& e) {
|
||||
register_params();
|
||||
@@ -3478,7 +3466,6 @@ struct manifest_json : public json::json_base {
|
||||
index_size = e.index_size;
|
||||
first_token = e.first_token;
|
||||
last_token = e.last_token;
|
||||
tablet_id = e.tablet_id;
|
||||
}
|
||||
sstable_info& operator=(sstable_info&& e) {
|
||||
id = e.id;
|
||||
@@ -3487,7 +3474,6 @@ struct manifest_json : public json::json_base {
|
||||
index_size = e.index_size;
|
||||
first_token = e.first_token;
|
||||
last_token = e.last_token;
|
||||
tablet_id = e.tablet_id;
|
||||
return *this;
|
||||
}
|
||||
private:
|
||||
@@ -3498,51 +3484,6 @@ struct manifest_json : public json::json_base {
|
||||
add(&index_size, "index_size");
|
||||
add(&first_token, "first_token");
|
||||
add(&last_token, "last_token");
|
||||
add(&tablet_id, "tablet_id");
|
||||
}
|
||||
};
|
||||
|
||||
struct tablet_info : public json::json_base {
|
||||
json::json_element<uint64_t> id;
|
||||
json::json_element<int64_t> first_token;
|
||||
json::json_element<int64_t> last_token;
|
||||
json::json_element<time_t> repair_time;
|
||||
json::json_element<int64_t> repaired_at;
|
||||
|
||||
tablet_info() {
|
||||
register_params();
|
||||
}
|
||||
tablet_info(const snapshot_tablet_info& e) {
|
||||
register_params();
|
||||
id = e.id;
|
||||
first_token = dht::token::to_int64(e.first_token);
|
||||
last_token = dht::token::to_int64(e.last_token);
|
||||
repair_time = db_clock::to_time_t(e.repair_time);
|
||||
repaired_at = e.repaired_at;
|
||||
}
|
||||
tablet_info(const tablet_info& e) {
|
||||
register_params();
|
||||
id = e.id;
|
||||
first_token = e.first_token;
|
||||
last_token = e.last_token;
|
||||
repair_time = e.repair_time;
|
||||
repaired_at = e.repaired_at;
|
||||
}
|
||||
tablet_info& operator=(tablet_info&& e) {
|
||||
id = e.id;
|
||||
first_token = e.first_token;
|
||||
last_token = e.last_token;
|
||||
repair_time = e.repair_time;
|
||||
repaired_at = e.repaired_at;
|
||||
return *this;
|
||||
}
|
||||
private:
|
||||
void register_params() {
|
||||
add(&id, "id");
|
||||
add(&first_token, "first_token");
|
||||
add(&last_token, "last_token");
|
||||
add(&repair_time, "repair_time");
|
||||
add(&repaired_at, "repaired_at");
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3551,7 +3492,6 @@ struct manifest_json : public json::json_base {
|
||||
json::json_element<snapshot_info> snapshot;
|
||||
json::json_element<table_info> table;
|
||||
json::json_chunked_list<sstable_info> sstables;
|
||||
json::json_chunked_list<tablet_info> tablets;
|
||||
|
||||
manifest_json() {
|
||||
register_params();
|
||||
@@ -3563,7 +3503,6 @@ struct manifest_json : public json::json_base {
|
||||
snapshot = std::move(e.snapshot);
|
||||
table = std::move(e.table);
|
||||
sstables = std::move(e.sstables);
|
||||
tablets = std::move(e.tablets);
|
||||
}
|
||||
manifest_json& operator=(manifest_json&& e) {
|
||||
if (this != &e) {
|
||||
@@ -3572,7 +3511,6 @@ struct manifest_json : public json::json_base {
|
||||
snapshot = std::move(e.snapshot);
|
||||
table = std::move(e.table);
|
||||
sstables = std::move(e.sstables);
|
||||
tablets = std::move(e.tablets);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -3583,7 +3521,6 @@ private:
|
||||
add(&snapshot, "snapshot");
|
||||
add(&table, "table");
|
||||
add(&sstables, "sstables");
|
||||
add(&tablets, "tablets");
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3597,7 +3534,7 @@ public:
|
||||
|
||||
using snapshot_sstable_set = foreign_ptr<std::unique_ptr<utils::chunked_vector<sstables::sstable_snapshot_metadata>>>;
|
||||
|
||||
static future<> write_manifest(const locator::topology& topology, snapshot_writer& writer, std::vector<snapshot_sstable_set> sstable_sets, std::vector<snapshot_tablet_info> tablets, sstring name, db::snapshot_options opts, schema_ptr schema, std::optional<int64_t> tablet_count) {
|
||||
static future<> write_manifest(const locator::topology& topology, snapshot_writer& writer, std::vector<snapshot_sstable_set> sstable_sets, sstring name, db::snapshot_options opts, schema_ptr schema, std::optional<int64_t> tablet_count) {
|
||||
manifest_json manifest;
|
||||
|
||||
manifest_json::info info;
|
||||
@@ -3633,11 +3570,6 @@ static future<> write_manifest(const locator::topology& topology, snapshot_write
|
||||
manifest.sstables.push(manifest_json::sstable_info(md));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& sti : tablets) {
|
||||
manifest.tablets.push(manifest_json::tablet_info(sti));
|
||||
}
|
||||
|
||||
auto streamer = json::stream_object(std::move(manifest));
|
||||
auto out = co_await writer.stream_for("manifest.json");
|
||||
std::exception_ptr ex;
|
||||
@@ -3762,33 +3694,11 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, c
|
||||
tlogger.debug("snapshot {}: seal_snapshot", name);
|
||||
const auto& topology = sharded_db.local().get_token_metadata().get_topology();
|
||||
std::optional<int64_t> min_tablet_count;
|
||||
std::vector<snapshot_tablet_info> tablets;
|
||||
std::unordered_set<size_t> tids;
|
||||
if (t.uses_tablets()) {
|
||||
SCYLLA_ASSERT(!tablet_counts.empty());
|
||||
min_tablet_count = *std::ranges::min_element(tablet_counts);
|
||||
|
||||
auto erm = t.get_effective_replication_map();
|
||||
auto& tm = erm->get_token_metadata().tablets().get_tablet_map(s->id());
|
||||
for (auto& ssts : sstable_sets) {
|
||||
for (auto& sst : *ssts) {
|
||||
auto tok = sst.first_token;
|
||||
auto tid = tm.get_tablet_id(dht::token::from_int64(tok));
|
||||
sst.tablet_id = tid.id;
|
||||
if (tids.emplace(tid.id).second) {
|
||||
auto& tinfo = tm.get_tablet_info(tid);
|
||||
tablets.emplace_back(snapshot_tablet_info{
|
||||
.id = tid.id,
|
||||
.first_token = tm.get_first_token(tid),
|
||||
.last_token = tm.get_last_token(tid),
|
||||
.repair_time = tinfo.repair_time,
|
||||
.repaired_at = tinfo.sstables_repaired_at,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, std::move(opts), s, min_tablet_count).handle_exception([&] (std::exception_ptr ptr) {
|
||||
co_await write_manifest(topology, *writer, std::move(sstable_sets), name, std::move(opts), s, min_tablet_count).handle_exception([&] (std::exception_ptr ptr) {
|
||||
tlogger.error("Failed to seal snapshot in {}: {}.", name, ptr);
|
||||
ex = std::move(ptr);
|
||||
});
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user