Compare commits

..

3 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
91aab869b8 Fix exception name and remove unnecessary template keyword
- Use correct exception type: bufsize_mismatch_exception instead of bufsize_mismatch_error
- Remove unnecessary template keyword for read<pos_type>()
- Match parameter order with existing pattern (actual, expected)

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-11-06 12:01:01 +00:00
copilot-swe-agent[bot]
168e0a40e3 Fix oversized allocation in sstables::parse by using fragmented buffer
This addresses issue where reading summary positions could cause
large contiguous memory allocations (249856 bytes reported).
Added read_exactly_fragmented() method to random_access_reader to
support reading into fragmented buffers, avoiding oversized allocations.

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-11-06 11:57:10 +00:00
copilot-swe-agent[bot]
ac54f21504 Initial plan 2025-11-06 11:50:13 +00:00
174 changed files with 2711 additions and 7117 deletions

View File

@@ -142,31 +142,20 @@ def backport(repo, pr, version, commits, backport_base_branch, is_collaborator):
def with_github_keyword_prefix(repo, pr):
# GitHub issue pattern: #123, scylladb/scylladb#123, or full GitHub URLs
github_pattern = rf"(?:fix(?:|es|ed))\s*:?\s*(?:(?:(?:{repo.full_name})?#)|https://github\.com/{repo.full_name}/issues/)(\d+)"
# JIRA issue pattern: PKG-92 or https://scylladb.atlassian.net/browse/PKG-92
jira_pattern = r"(?:fix(?:|es|ed))\s*:?\s*(?:(?:https://scylladb\.atlassian\.net/browse/)?([A-Z]+-\d+))"
# Check PR body for GitHub issues
github_match = re.findall(github_pattern, pr.body, re.IGNORECASE)
# Check PR body for JIRA issues
jira_match = re.findall(jira_pattern, pr.body, re.IGNORECASE)
match = github_match or jira_match
if match:
pattern = rf"(?:fix(?:|es|ed))\s*:?\s*(?:(?:(?:{repo.full_name})?#)|https://github\.com/{repo.full_name}/issues/)(\d+)"
match = re.findall(pattern, pr.body, re.IGNORECASE)
if not match:
for commit in pr.get_commits():
match = re.findall(pattern, commit.commit.message, re.IGNORECASE)
if match:
print(f'{pr.number} has a valid close reference in commit message {commit.sha}')
break
if not match:
print(f'No valid close reference for {pr.number}')
return False
else:
return True
for commit in pr.get_commits():
github_match = re.findall(github_pattern, commit.commit.message, re.IGNORECASE)
jira_match = re.findall(jira_pattern, commit.commit.message, re.IGNORECASE)
if github_match or jira_match:
print(f'{pr.number} has a valid close reference in commit message {commit.sha}')
return True
print(f'No valid close reference for {pr.number}')
return False
def main():
args = parse_args()

View File

@@ -1,242 +0,0 @@
name: Trigger next gating
on:
pull_request_target:
types: [opened, reopened, synchronize]
issue_comment:
types: [created]
jobs:
trigger-ci:
runs-on: ubuntu-latest
steps:
- name: Dump GitHub context
env:
GITHUB_CONTEXT: ${{ toJson(github) }}
run: echo "$GITHUB_CONTEXT"
- name: Checkout PR code
uses: actions/checkout@v3
with:
fetch-depth: 0 # Needed to access full history
ref: ${{ github.event.pull_request.head.ref }}
- name: Fetch before commit if needed
run: |
if ! git cat-file -e ${{ github.event.before }} 2>/dev/null; then
echo "Fetching before commit ${{ github.event.before }}"
git fetch --depth=1 origin ${{ github.event.before }}
fi
- name: Compare commits for file changes
if: github.action == 'synchronize'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
echo "Base: ${{ github.event.before }}"
echo "Head: ${{ github.event.after }}"
TREE_BEFORE=$(git show -s --format=%T ${{ github.event.before }})
TREE_AFTER=$(git show -s --format=%T ${{ github.event.after }})
echo "TREE_BEFORE=$TREE_BEFORE" >> $GITHUB_ENV
echo "TREE_AFTER=$TREE_AFTER" >> $GITHUB_ENV
- name: Check if last push has file changes
run: |
if [[ "${{ env.TREE_BEFORE }}" == "${{ env.TREE_AFTER }}" ]]; then
echo "No file changes detected in the last push, only commit message edit."
echo "has_file_changes=false" >> $GITHUB_ENV
else
echo "File changes detected in the last push."
echo "has_file_changes=true" >> $GITHUB_ENV
fi
- name: Rule 1 - Check PR draft or conflict status
run: |
# Check if PR is in draft mode
IS_DRAFT="${{ github.event.pull_request.draft }}"
# Check if PR has 'conflict' label
HAS_CONFLICT_LABEL="false"
LABELS='${{ toJson(github.event.pull_request.labels) }}'
if echo "$LABELS" | jq -r '.[].name' | grep -q "^conflict$"; then
HAS_CONFLICT_LABEL="true"
fi
# Set draft_or_conflict variable
if [[ "$IS_DRAFT" == "true" || "$HAS_CONFLICT_LABEL" == "true" ]]; then
echo "draft_or_conflict=true" >> $GITHUB_ENV
echo "✅ Rule 1: PR is in draft mode or has conflict label - setting draft_or_conflict=true"
else
echo "draft_or_conflict=false" >> $GITHUB_ENV
echo "✅ Rule 1: PR is ready and has no conflict label - setting draft_or_conflict=false"
fi
echo "Draft status: $IS_DRAFT"
echo "Has conflict label: $HAS_CONFLICT_LABEL"
echo "Result: draft_or_conflict = $draft_or_conflict"
- name: Rule 2 - Check labels
run: |
# Check if PR has P0 or P1 labels
HAS_P0_P1_LABEL="false"
LABELS='${{ toJson(github.event.pull_request.labels) }}'
if echo "$LABELS" | jq -r '.[].name' | grep -E "^(P0|P1)$" > /dev/null; then
HAS_P0_P1_LABEL="true"
fi
# Check if PR already has force_on_cloud label
echo "HAS_FORCE_ON_CLOUD_LABEL=false" >> $GITHUB_ENV
if echo "$LABELS" | jq -r '.[].name' | grep -q "^force_on_cloud$"; then
HAS_FORCE_ON_CLOUD_LABEL="true"
echo "HAS_FORCE_ON_CLOUD_LABEL=true" >> $GITHUB_ENV
fi
echo "Has P0/P1 label: $HAS_P0_P1_LABEL"
echo "Has force_on_cloud label: $HAS_FORCE_ON_CLOUD_LABEL"
# Add force_on_cloud label if PR has P0/P1 and doesn't already have force_on_cloud
if [[ "$HAS_P0_P1_LABEL" == "true" && "$HAS_FORCE_ON_CLOUD_LABEL" == "false" ]]; then
echo "✅ Rule 2: PR has P0 or P1 label - adding force_on_cloud label"
curl -X POST \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels" \
-d '{"labels":["force_on_cloud"]}'
elif [[ "$HAS_P0_P1_LABEL" == "true" && "$HAS_FORCE_ON_CLOUD_LABEL" == "true" ]]; then
echo "✅ Rule 2: PR has P0 or P1 label and already has force_on_cloud label - no action needed"
else
echo "✅ Rule 2: PR does not have P0 or P1 label - no force_on_cloud label needed"
fi
SKIP_UNIT_TEST_CUSTOM="false"
if echo "$LABELS" | jq -r '.[].name' | grep -q "^ci/skip_unit-tests_custom$"; then
SKIP_UNIT_TEST_CUSTOM="true"
fi
echo "SKIP_UNIT_TEST_CUSTOM=$SKIP_UNIT_TEST_CUSTOM" >> $GITHUB_ENV
- name: Rule 3 - Analyze changed files and set build requirements
run: |
# Get list of changed files
CHANGED_FILES=$(git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.event.pull_request.head.sha }})
echo "Changed files:"
echo "$CHANGED_FILES"
echo ""
# Initialize all requirements to false
REQUIRE_BUILD="false"
REQUIRE_DTEST="false"
REQUIRE_UNITTEST="false"
REQUIRE_ARTIFACTS="false"
REQUIRE_SCYLLA_GDB="false"
# Check each file against patterns
while IFS= read -r file; do
if [[ -n "$file" ]]; then
echo "Checking file: $file"
# Build pattern: ^(?!scripts\/pull_github_pr.sh).*$
# Everything except scripts/pull_github_pr.sh
if [[ "$file" != "scripts/pull_github_pr.sh" ]]; then
REQUIRE_BUILD="true"
echo " ✓ Matches build pattern"
fi
# Dtest pattern: ^(?!test(.py|\/)|dist\/docker\/|dist\/common\/scripts\/).*$
# Everything except test files, dist/docker/, dist/common/scripts/
if [[ ! "$file" =~ ^test\.(py|/).*$ ]] && [[ ! "$file" =~ ^dist/docker/.*$ ]] && [[ ! "$file" =~ ^dist/common/scripts/.*$ ]]; then
REQUIRE_DTEST="true"
echo " ✓ Matches dtest pattern"
fi
# Unittest pattern: ^(?!dist\/docker\/|dist\/common\/scripts).*$
# Everything except dist/docker/, dist/common/scripts/
if [[ ! "$file" =~ ^dist/docker/.*$ ]] && [[ ! "$file" =~ ^dist/common/scripts.*$ ]]; then
REQUIRE_UNITTEST="true"
echo " ✓ Matches unittest pattern"
fi
# Artifacts pattern: ^(?:dist|tools\/toolchain).*$
# Files starting with dist or tools/toolchain
if [[ "$file" =~ ^dist.*$ ]] || [[ "$file" =~ ^tools/toolchain.*$ ]]; then
REQUIRE_ARTIFACTS="true"
echo " ✓ Matches artifacts pattern"
fi
# Scylla GDB pattern: ^(scylla-gdb.py).*$
# Files starting with scylla-gdb.py
if [[ "$file" =~ ^scylla-gdb\.py.*$ ]]; then
REQUIRE_SCYLLA_GDB="true"
echo " ✓ Matches scylla_gdb pattern"
fi
fi
done <<< "$CHANGED_FILES"
# Set environment variables
echo "requireBuild=$REQUIRE_BUILD" >> $GITHUB_ENV
echo "requireDtest=$REQUIRE_DTEST" >> $GITHUB_ENV
echo "requireUnittest=$REQUIRE_UNITTEST" >> $GITHUB_ENV
echo "requireArtifacts=$REQUIRE_ARTIFACTS" >> $GITHUB_ENV
echo "requireScyllaGdb=$REQUIRE_SCYLLA_GDB" >> $GITHUB_ENV
echo ""
echo "✅ Rule 3: File analysis complete"
echo "Build required: $REQUIRE_BUILD"
echo "Dtest required: $REQUIRE_DTEST"
echo "Unittest required: $REQUIRE_UNITTEST"
echo "Artifacts required: $REQUIRE_ARTIFACTS"
echo "Scylla GDB required: $REQUIRE_SCYLLA_GDB"
- name: Determine Jenkins Job Name
run: |
if [[ "${{ github.ref_name }}" == "next" ]]; then
FOLDER_NAME="scylla-master"
elif [[ "${{ github.ref_name }}" == "next-enterprise" ]]; then
FOLDER_NAME="scylla-enterprise"
else
VERSION=$(echo "${{ github.ref_name }}" | awk -F'-' '{print $2}')
if [[ "$VERSION" =~ ^202[0-4]\.[0-9]+$ ]]; then
FOLDER_NAME="enterprise-$VERSION"
elif [[ "$VERSION" =~ ^[0-9]+\.[0-9]+$ ]]; then
FOLDER_NAME="scylla-$VERSION"
fi
fi
echo "JOB_NAME=${FOLDER_NAME}/job/scylla-ci" >> $GITHUB_ENV
- name: Trigger Jenkins Job
if: env.draft_or_conflict == 'false' && env.has_file_changes == 'true' && github.action == 'opened' || github.action == 'reopened'
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
echo "Triggering Jenkins Job: $JOB_NAME"
curl -X POST \
"$JENKINS_URL/job/$JOB_NAME/buildWithParameters? \
PR_NUMBER=$PR_NUMBER& \
RUN_DTEST=$REQUIRE_DTEST& \
RUN_ONLY_SCYLLA_GDB=$REQUIRE_SCYLLA_GDB& \
RUN_UNIT_TEST=$REQUIRE_UNITTEST& \
FORCE_ON_CLOUD=$HAS_FORCE_ON_CLOUD_LABEL& \
SKIP_UNIT_TEST_CUSTOM=$SKIP_UNIT_TEST_CUSTOM& \
RUN_ARTIFACT_TESTS=$REQUIRE_ARTIFACTS" \
--fail \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" \
-i -v
trigger-ci-via-comment:
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI Jenkins Job
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/$JOB_NAME/buildWithParameters?PR_NUMBER=$PR_NUMBER" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

View File

@@ -109,20 +109,6 @@ extern const sstring TTL_TAG_KEY("system:ttl_attribute");
// following ones are base table's keys added as needed or range key list will be empty.
static const sstring SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY("system:spurious_range_key_added_to_gsi_and_user_didnt_specify_range_key");
// The following tags also have the "system:" prefix but are NOT used
// by Alternator to store table properties - only the user ever writes to
// them, as a way to configure the table. As such, these tags are writable
// (and readable) by the user, and not hidden by tag_key_is_internal().
// The reason why both hidden (internal) and user-configurable tags share the
// same "system:" prefix is historic.
// Setting the tag with a numeric value will enable a specific initial number
// of tablets (setting the value to 0 means enabling tablets with
// an automatic selection of the best number of tablets).
// Setting this tag to any non-numeric value (e.g., an empty string or the
// word "none") will ask to disable tablets.
static constexpr auto INITIAL_TABLETS_TAG_KEY = "system:initial_tablets";
enum class table_status {
active = 0,
@@ -145,8 +131,7 @@ static std::string_view table_status_to_sstring(table_status tbl_status) {
return "UNKNOWN";
}
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type,
const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat, const db::tablets_mode_t::mode tablets_mode);
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type, const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat);
static map_type attrs_type() {
static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
@@ -1246,13 +1231,12 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
// Alternator uses tags whose keys start with the "system:" prefix for
// internal purposes. Those should not be readable by ListTagsOfResource,
// nor writable with TagResource or UntagResource (see #24098).
// Only a few specific system tags, currently only "system:write_isolation"
// and "system:initial_tablets", are deliberately intended to be set and read
// by the user, so are not considered "internal".
// Only a few specific system tags, currently only system:write_isolation,
// are deliberately intended to be set and read by the user, so are not
// considered "internal".
static bool tag_key_is_internal(std::string_view tag_key) {
return tag_key.starts_with("system:")
&& tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY
&& tag_key != INITIAL_TABLETS_TAG_KEY;
return tag_key.starts_with("system:") &&
tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY;
}
enum class update_tags_action { add_tags, delete_tags };
@@ -1557,8 +1541,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
}
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
@@ -1785,7 +1768,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
utils::chunked_vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features());
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
if (stream_specification && stream_specification->IsObject()) {
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
@@ -1795,7 +1778,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
if (rs->uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
"If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
"If you want to use streams, create a table with vnodes by setting the tag 'experimental:initial_tablets' set to 'none'.");
}
}
}
@@ -1867,8 +1850,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats, std::move(tablets_mode));
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats);
});
}
@@ -1952,7 +1934,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
if (stream_enabled->GetBool()) {
if (p.local().local_db().find_keyspace(tab->ks_name()).get_replication_strategy().uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
"If you want to enable streams, re-create this table with vnodes (with the tag 'system:initial_tablets' set to 'none').");
"If you want to enable streams, re-create this table with vnodes (with the tag 'experimental:initial_tablets' set to 'none').");
}
if (tab->cdc_options().enabled()) {
co_return api_error::validation("Table already has an enabled stream: TableName: " + tab->cf_name());
@@ -2728,6 +2710,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
if (!cas_shard) {
on_internal_error(elogger, "cas_shard is not set");
}
// If we're still here, we need to do this write using LWT:
global_stats.write_using_lwt++;
per_table_stats.write_using_lwt++;
@@ -5972,20 +5955,22 @@ future<executor::request_return_type> executor::describe_continuous_backups(clie
// of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3.
// A smaller cluster (presumably, a test only), gets RF=1. The user may
// manually create the keyspace to override this predefined behavior.
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type ts,
const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat, const db::tablets_mode_t::mode tablets_mode) {
// Whether to use tablets for the table (actually for the keyspace of the
// table) is determined by tablets_mode (taken from the configuration
// option "tablets_mode_for_new_keyspaces"), as well as the presence and
// the value of a per-table tag system:initial_tablets
// (INITIAL_TABLETS_TAG_KEY).
// Setting the tag with a numeric value will enable a specific initial number
// of tablets (setting the value to 0 means enabling tablets with
// an automatic selection of the best number of tablets).
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type ts, const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat) {
// Even if the "tablets" experimental feature is available, we currently
// do not enable tablets by default on Alternator tables because LWT is
// not yet fully supported with tablets.
// The user can override the choice of whether or not to use tablets at
// table-creation time by supplying the following tag with a numeric value
// (setting the value to 0 means enabling tablets with automatic selection
// of the best number of tablets).
// Setting this tag to any non-numeric value (e.g., an empty string or the
// word "none") will ask to disable tablets.
// When vnodes are asked for by the tag value, but tablets are enforced by config,
// throw an exception to the client.
// If we make this tag a permanent feature, it will get a "system:" prefix -
// until then we give it the "experimental:" prefix to not commit to it.
static constexpr auto INITIAL_TABLETS_TAG_KEY = "experimental:initial_tablets";
// initial_tablets currently defaults to unset, so tablets will not be
// used by default on new Alternator tables. Change this initialization
// to 0 enable tablets by default, with automatic number of tablets.
std::optional<unsigned> initial_tablets;
if (feat.tablets) {
auto it = tags_map.find(INITIAL_TABLETS_TAG_KEY);
@@ -5996,20 +5981,7 @@ static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_vie
try {
initial_tablets = std::stol(tags_map.at(INITIAL_TABLETS_TAG_KEY));
} catch (...) {
if (tablets_mode == db::tablets_mode_t::mode::enforced) {
throw api_error::validation(format("Tag {} containing non-numerical value requests vnodes, but vnodes are forbidden by configuration option `tablets_mode_for_new_keyspaces: enforced`", INITIAL_TABLETS_TAG_KEY));
}
initial_tablets = std::nullopt;
elogger.trace("Following {} tag containing non-numerical value, Alternator will attempt to create a keyspace {} with vnodes.", INITIAL_TABLETS_TAG_KEY, keyspace_name);
}
} else {
// No per-table tag present, use the value from config
if (tablets_mode == db::tablets_mode_t::mode::enabled || tablets_mode == db::tablets_mode_t::mode::enforced) {
initial_tablets = 0;
elogger.trace("Following the `tablets_mode_for_new_keyspaces` flag from the settings, Alternator will attempt to create a keyspace {} with tablets.", keyspace_name);
} else {
initial_tablets = std::nullopt;
elogger.trace("Following the `tablets_mode_for_new_keyspaces` flag from the settings, Alternator will attempt to create a keyspace {} with vnodes.", keyspace_name);
}
}
}

View File

@@ -1073,7 +1073,9 @@ bool executor::add_stream_options(const rjson::value& stream_specification, sche
}
if (stream_enabled->GetBool()) {
if (!sp.features().alternator_streams) {
auto db = sp.data_dictionary();
if (!db.features().alternator_streams) {
throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster.");
}

View File

@@ -68,7 +68,7 @@ extern const sstring TTL_TAG_KEY;
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.update_time_to_live++;
if (!_proxy.features().alternator_ttl) {
if (!_proxy.data_dictionary().features().alternator_ttl) {
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
}
@@ -753,7 +753,7 @@ static future<bool> scan_table(
auto my_host_id = erm->get_topology().my_host_id();
const auto &tablet_map = erm->get_token_metadata().tablets().get_tablet_map(s->id());
for (std::optional tablet = tablet_map.first_tablet(); tablet; tablet = tablet_map.next_tablet(*tablet)) {
auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet, erm->get_topology());
auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet);
// check if this is the primary replica for the current tablet
if (tablet_primary_replica.host == my_host_id && tablet_primary_replica.shard == this_shard_id()) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);

View File

@@ -961,14 +961,6 @@
"type":"string",
"paramType":"query",
"enum": ["all", "dc", "rack", "node"]
},
{
"name":"primary_replica_only",
"description":"Load the sstables and stream to the primary replica node within the scope, if one is specified. If not, stream to the global primary replica.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
@@ -1055,7 +1047,7 @@
]
},
{
"path":"/storage_service/cleanup_all/",
"path":"/storage_service/cleanup_all",
"operations":[
{
"method":"POST",
@@ -1065,30 +1057,6 @@
"produces":[
"application/json"
],
"parameters":[
{
"name":"global",
"description":"true if cleanup of entire cluster is requested",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/mark_node_as_clean",
"operations":[
{
"method":"POST",
"summary":"Mark the node as clean. After that the node will not be considered as needing cleanup during automatic cleanup which is triggered by some topology operations",
"type":"void",
"nickname":"reset_cleanup_needed",
"produces":[
"application/json"
],
"parameters":[]
}
]

View File

@@ -20,7 +20,6 @@
#include "utils/hash.hh"
#include <optional>
#include <sstream>
#include <stdexcept>
#include <time.h>
#include <algorithm>
#include <functional>
@@ -505,7 +504,6 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto bucket = req->get_query_param("bucket");
auto prefix = req->get_query_param("prefix");
auto scope = parse_stream_scope(req->get_query_param("scope"));
auto primary_replica_only = validate_bool_x(req->get_query_param("primary_replica_only"), false);
rjson::chunked_content content = co_await util::read_entire_stream(*req->content_stream);
rjson::value parsed = rjson::parse(std::move(content));
@@ -515,7 +513,7 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto sstables = parsed.GetArray() |
std::views::transform([] (const auto& s) { return sstring(rjson::to_string_view(s)); }) |
std::ranges::to<std::vector>();
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket, scope, primary_replica_only);
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket, scope);
co_return json::json_return_type(fmt::to_string(task_id));
});
@@ -765,14 +763,8 @@ rest_cdc_streams_check_and_repair(sharded<service::storage_service>& ss, std::un
static
future<json::json_return_type>
rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
bool global = true;
if (auto global_param = req->get_query_param("global"); !global_param.empty()) {
global = validate_bool(global_param);
}
apilog.info("cleanup_all global={}", global);
auto done = !global ? false : co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
apilog.info("cleanup_all");
auto done = co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
if (!ss.is_topology_coordinator_enabled()) {
co_return false;
}
@@ -782,35 +774,14 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
if (done) {
co_return json::json_return_type(0);
}
// fall back to the local cleanup if topology coordinator is not enabled or local cleanup is requested
// fall back to the local global cleanup if topology coordinator is not enabled
auto& db = ctx.db;
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::global_cleanup_compaction_task_impl>({}, db);
co_await task->done();
// Mark this node as clean
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
if (ss.is_topology_coordinator_enabled()) {
co_await ss.reset_cleanup_needed();
}
});
co_return json::json_return_type(0);
}
static
future<json::json_return_type>
rest_reset_cleanup_needed(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("reset_cleanup_needed");
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
if (!ss.is_topology_coordinator_enabled()) {
throw std::runtime_error("mark_node_as_clean is only supported when topology over raft is enabled");
}
return ss.reset_cleanup_needed();
});
co_return json_void();
}
static
future<json::json_return_type>
rest_force_flush(http_context& ctx, std::unique_ptr<http::request> req) {
@@ -1812,7 +1783,6 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::get_natural_endpoints_v2.set(r, rest_bind(rest_get_natural_endpoints_v2, ctx, ss));
ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss));
ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss));
ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
ss::decommission.set(r, rest_bind(rest_decommission, ss));
@@ -1891,7 +1861,6 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::get_natural_endpoints.unset(r);
ss::cdc_streams_check_and_repair.unset(r);
ss::cleanup_all.unset(r);
ss::reset_cleanup_needed.unset(r);
ss::force_flush.unset(r);
ss::force_keyspace_flush.unset(r);
ss::decommission.unset(r);

View File

@@ -5,7 +5,6 @@ target_sources(scylla_audit
PRIVATE
audit.cc
audit_cf_storage_helper.cc
audit_composite_storage_helper.cc
audit_syslog_storage_helper.cc)
target_include_directories(scylla_audit
PUBLIC

View File

@@ -13,11 +13,9 @@
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "storage_helper.hh"
#include "audit_cf_storage_helper.hh"
#include "audit_syslog_storage_helper.hh"
#include "audit_composite_storage_helper.hh"
#include "audit.hh"
#include "../db/config.hh"
#include "utils/class_registrator.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
@@ -28,47 +26,6 @@ namespace audit {
logging::logger logger("audit");
static std::set<sstring> parse_audit_modes(const sstring& data) {
std::set<sstring> result;
if (!data.empty()) {
std::vector<sstring> audit_modes;
boost::split(audit_modes, data, boost::is_any_of(","));
if (audit_modes.empty()) {
return {};
}
for (sstring& audit_mode : audit_modes) {
boost::trim(audit_mode);
if (audit_mode == "none") {
return {};
}
if (audit_mode != "table" && audit_mode != "syslog") {
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", audit_mode));
}
result.insert(std::move(audit_mode));
}
}
return result;
}
static std::unique_ptr<storage_helper> create_storage_helper(const std::set<sstring>& audit_modes, cql3::query_processor& qp, service::migration_manager& mm) {
SCYLLA_ASSERT(!audit_modes.empty() && !audit_modes.contains("none"));
std::vector<std::unique_ptr<storage_helper>> helpers;
for (const sstring& audit_mode : audit_modes) {
if (audit_mode == "table") {
helpers.emplace_back(std::make_unique<audit_cf_storage_helper>(qp, mm));
} else if (audit_mode == "syslog") {
helpers.emplace_back(std::make_unique<audit_syslog_storage_helper>(qp, mm));
}
}
SCYLLA_ASSERT(!helpers.empty());
if (helpers.size() == 1) {
return std::move(helpers.front());
}
return std::make_unique<audit_composite_storage_helper>(std::move(helpers));
}
static sstring category_to_string(statement_category category)
{
switch (category) {
@@ -146,9 +103,7 @@ static std::set<sstring> parse_audit_keyspaces(const sstring& data) {
}
audit::audit(locator::shared_token_metadata& token_metadata,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
sstring&& storage_helper_name,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
@@ -157,21 +112,28 @@ audit::audit(locator::shared_token_metadata& token_metadata,
, _audited_keyspaces(std::move(audited_keyspaces))
, _audited_tables(std::move(audited_tables))
, _audited_categories(std::move(audited_categories))
, _storage_helper_class_name(std::move(storage_helper_name))
, _cfg(cfg)
, _cfg_keyspaces_observer(cfg.audit_keyspaces.observe([this] (sstring const& new_value){ update_config<std::set<sstring>>(new_value, parse_audit_keyspaces, _audited_keyspaces); }))
, _cfg_tables_observer(cfg.audit_tables.observe([this] (sstring const& new_value){ update_config<std::map<sstring, std::set<sstring>>>(new_value, parse_audit_tables, _audited_tables); }))
, _cfg_categories_observer(cfg.audit_categories.observe([this] (sstring const& new_value){ update_config<category_set>(new_value, parse_audit_categories, _audited_categories); }))
{
_storage_helper_ptr = create_storage_helper(std::move(audit_modes), qp, mm);
}
{ }
audit::~audit() = default;
future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
std::set<sstring> audit_modes = parse_audit_modes(cfg.audit());
if (audit_modes.empty()) {
future<> audit::create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm) {
sstring storage_helper_name;
if (cfg.audit() == "table") {
storage_helper_name = "audit_cf_storage_helper";
} else if (cfg.audit() == "syslog") {
storage_helper_name = "audit_syslog_storage_helper";
} else if (cfg.audit() == "none") {
// Audit is off
logger.info("Audit is disabled");
return make_ready_future<>();
} else {
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", cfg.audit()));
}
category_set audited_categories = parse_audit_categories(cfg.audit_categories());
std::map<sstring, std::set<sstring>> audited_tables = parse_audit_tables(cfg.audit_tables());
@@ -181,20 +143,19 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables());
return audit_instance().start(std::ref(stm),
std::ref(qp),
std::ref(mm),
std::move(audit_modes),
std::move(storage_helper_name),
std::move(audited_keyspaces),
std::move(audited_tables),
std::move(audited_categories),
std::cref(cfg))
.then([&cfg] {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit.start(cfg);
});
std::cref(cfg));
}
future<> audit::start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg, &qp, &mm] (audit& local_audit) {
return local_audit.start(cfg, qp.local(), mm.local());
});
}
@@ -220,7 +181,15 @@ audit_info_ptr audit::create_no_audit_info() {
return audit_info_ptr();
}
future<> audit::start(const db::config& cfg) {
future<> audit::start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm) {
try {
_storage_helper_ptr = create_object<storage_helper>(_storage_helper_class_name, qp, mm);
} catch (no_such_class& e) {
logger.error("Can't create audit storage helper {}: not supported", _storage_helper_class_name);
throw;
} catch (...) {
throw;
}
return _storage_helper_ptr->start(cfg);
}

View File

@@ -102,6 +102,7 @@ class audit final : public seastar::async_sharded_service<audit> {
std::map<sstring, std::set<sstring>> _audited_tables;
category_set _audited_categories;
sstring _storage_helper_class_name;
std::unique_ptr<storage_helper> _storage_helper_ptr;
const db::config& _cfg;
@@ -124,20 +125,18 @@ public:
static audit& local_audit_instance() {
return audit_instance().local();
}
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm);
static future<> start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table);
static audit_info_ptr create_no_audit_info();
audit(locator::shared_token_metadata& stm,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
audit(locator::shared_token_metadata& stm, sstring&& storage_helper_name,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
const db::config& cfg);
~audit();
future<> start(const db::config& cfg);
future<> start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm);
future<> stop();
future<> shutdown();
bool should_log(const audit_info* audit_info) const;

View File

@@ -11,6 +11,7 @@
#include "cql3/query_processor.hh"
#include "data_dictionary/keyspace_metadata.hh"
#include "utils/UUID_gen.hh"
#include "utils/class_registrator.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "service/migration_manager.hh"
@@ -197,4 +198,7 @@ cql3::query_options audit_cf_storage_helper::make_login_data(socket_address node
return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
using registry = class_registrator<storage_helper, audit_cf_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_cf_storage_helper");
}

View File

@@ -1,68 +0,0 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/loop.hh>
#include <seastar/core/future-util.hh>
#include "audit/audit_composite_storage_helper.hh"
#include "utils/class_registrator.hh"
namespace audit {
// Takes ownership of the already-constructed backend helpers; the composite
// simply fans every operation out to each of them.
audit_composite_storage_helper::audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&& storage_helpers)
: _storage_helpers(std::move(storage_helpers))
{}
// Start all wrapped backends concurrently; resolves once every backend's
// start() future has resolved.
future<> audit_composite_storage_helper::start(const db::config& cfg) {
    return seastar::parallel_for_each(_storage_helpers, [&cfg] (std::unique_ptr<storage_helper>& helper) {
        return helper->start(cfg);
    });
}
// Stop all wrapped backends concurrently; resolves once every backend's
// stop() future has resolved.
future<> audit_composite_storage_helper::stop() {
    return seastar::parallel_for_each(_storage_helpers, [] (std::unique_ptr<storage_helper>& helper) {
        return helper->stop();
    });
}
// Fan a single audit record out to every configured backend concurrently.
// `username` is captured by reference: it must stay alive until the returned
// future resolves, which parallel_for_each's call-site semantics guarantee
// here exactly as in any single-backend write.
future<> audit_composite_storage_helper::write(const audit_info* audit_info,
                                               socket_address node_ip,
                                               socket_address client_ip,
                                               db::consistency_level cl,
                                               const sstring& username,
                                               bool error) {
    auto forward = [audit_info, node_ip, client_ip, cl, &username, error] (std::unique_ptr<storage_helper>& helper) {
        return helper->write(audit_info, node_ip, client_ip, cl, username, error);
    };
    return seastar::parallel_for_each(_storage_helpers, forward);
}
// Fan a login audit record out to every configured backend concurrently.
future<> audit_composite_storage_helper::write_login(const sstring& username,
                                                     socket_address node_ip,
                                                     socket_address client_ip,
                                                     bool error) {
    auto forward = [&username, node_ip, client_ip, error] (std::unique_ptr<storage_helper>& helper) {
        return helper->write_login(username, node_ip, client_ip, error);
    };
    return seastar::parallel_for_each(_storage_helpers, forward);
}
} // namespace audit

View File

@@ -1,37 +0,0 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "audit/audit.hh"
#include <seastar/core/future.hh>
#include "storage_helper.hh"
namespace audit {
// Composite storage backend: owns a list of concrete storage_helper
// implementations (e.g. table-based and syslog-based) and forwards every
// lifecycle and write operation to all of them in parallel, so a single
// audit event can be recorded by multiple sinks at once.
class audit_composite_storage_helper : public storage_helper {
// Owned backends; operations are fanned out to each entry.
std::vector<std::unique_ptr<storage_helper>> _storage_helpers;
public:
// Takes ownership of the pre-constructed backend helpers.
explicit audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&&);
virtual ~audit_composite_storage_helper() = default;
// Start/stop all backends concurrently.
virtual future<> start(const db::config& cfg) override;
virtual future<> stop() override;
// Forward one audit record to every backend concurrently.
virtual future<> write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
const sstring& username,
bool error) override;
// Forward one login record to every backend concurrently.
virtual future<> write_login(const sstring& username,
socket_address node_ip,
socket_address client_ip,
bool error) override;
};
} // namespace audit

View File

@@ -21,6 +21,7 @@
#include <fmt/chrono.h>
#include "cql3/query_processor.hh"
#include "utils/class_registrator.hh"
namespace cql3 {
@@ -142,4 +143,7 @@ future<> audit_syslog_storage_helper::write_login(const sstring& username,
co_await syslog_send_helper(msg.c_str());
}
using registry = class_registrator<storage_helper, audit_syslog_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_syslog_storage_helper");
}

View File

@@ -68,15 +68,10 @@ shared_ptr<locator::abstract_replication_strategy> generate_replication_strategy
return locator::abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), params, topo);
}
// When dropping a column from a CDC log table, we set the drop timestamp
// `column_drop_leeway` seconds into the future to ensure that for writes concurrent
// with column drop, the write timestamp is before the column drop timestamp.
constexpr auto column_drop_leeway = std::chrono::seconds(5);
} // anonymous namespace
namespace cdc {
static schema_ptr create_log_schema(const schema&, const replica::database&, const keyspace_metadata&, api::timestamp_type,
static schema_ptr create_log_schema(const schema&, const replica::database&, const keyspace_metadata&,
std::optional<table_id> = {}, schema_ptr = nullptr);
}
@@ -188,7 +183,7 @@ public:
muts.emplace_back(std::move(mut));
}
void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms, api::timestamp_type ts) override {
void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms) override {
std::vector<schema_ptr> new_cfms;
for (auto sp : cfms) {
@@ -207,7 +202,7 @@ public:
}
// in seastar thread
auto log_schema = create_log_schema(schema, db, ksm, ts);
auto log_schema = create_log_schema(schema, db, ksm);
new_cfms.push_back(std::move(log_schema));
}
@@ -254,7 +249,7 @@ public:
}
std::optional<table_id> maybe_id = log_schema ? std::make_optional(log_schema->id()) : std::nullopt;
auto new_log_schema = create_log_schema(new_schema, db, *keyspace.metadata(), timestamp, std::move(maybe_id), log_schema);
auto new_log_schema = create_log_schema(new_schema, db, *keyspace.metadata(), std::move(maybe_id), log_schema);
auto log_mut = log_schema
? db::schema_tables::make_update_table_mutations(_ctxt._proxy, keyspace.metadata(), log_schema, new_log_schema, timestamp)
@@ -587,7 +582,7 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
}
static schema_ptr create_log_schema(const schema& s, const replica::database& db,
const keyspace_metadata& ksm, api::timestamp_type timestamp, std::optional<table_id> uuid, schema_ptr old)
const keyspace_metadata& ksm, std::optional<table_id> uuid, schema_ptr old)
{
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner(cdc::cdc_partitioner::classname);
@@ -623,28 +618,6 @@ static schema_ptr create_log_schema(const schema& s, const replica::database& db
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
b.set_caching_options(caching_options::get_disabled_caching_options());
auto validate_new_column = [&] (const sstring& name) {
// When dropping a column from a CDC log table, we set the drop timestamp to be
// `column_drop_leeway` seconds into the future (see `create_log_schema`).
// Therefore, when recreating a column with the same name, we need to validate
// that it's not recreated too soon and that the drop timestamp has passed.
if (old && old->dropped_columns().contains(name)) {
const auto& drop_info = old->dropped_columns().at(name);
auto create_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(timestamp));
auto drop_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(drop_info.timestamp));
if (drop_time > create_time) {
throw exceptions::invalid_request_exception(format("Cannot add column {} because a column with the same name was dropped too recently. Please retry after {} seconds",
name, std::chrono::duration_cast<std::chrono::seconds>(drop_time - create_time).count() + 1));
}
}
};
auto add_column = [&] (sstring name, data_type type) {
validate_new_column(name);
b.with_column(to_bytes(name), type);
};
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
for (const auto& column : columns) {
auto type = column.type;
@@ -666,9 +639,9 @@ static schema_ptr create_log_schema(const schema& s, const replica::database& db
}
));
}
add_column(log_data_column_name(column.name_as_text()), type);
b.with_column(log_data_column_name_bytes(column.name()), type);
if (is_data_col) {
add_column(log_data_column_deleted_name(column.name_as_text()), boolean_type);
b.with_column(log_data_column_deleted_name_bytes(column.name()), boolean_type);
}
if (column.type->is_multi_cell()) {
auto dtype = visit(*type, make_visitor(
@@ -684,7 +657,7 @@ static schema_ptr create_log_schema(const schema& s, const replica::database& db
throw std::invalid_argument("Should not reach");
}
));
add_column(log_data_column_deleted_elements_name(column.name_as_text()), dtype);
b.with_column(log_data_column_deleted_elements_name_bytes(column.name()), dtype);
}
}
};
@@ -710,8 +683,7 @@ static schema_ptr create_log_schema(const schema& s, const replica::database& db
// not super efficient, but we don't do this often.
for (auto& col : old->all_columns()) {
if (!b.has_column({col.name(), col.name_as_text() })) {
auto drop_ts = api::timestamp_clock::now() + column_drop_leeway;
b.without_column(col.name_as_text(), col.type, drop_ts.time_since_epoch().count());
b.without_column(col.name_as_text(), col.type, api::new_timestamp());
}
}
}
@@ -1618,7 +1590,7 @@ public:
: _ctx(ctx)
, _schema(std::move(s))
, _dk(std::move(dk))
, _log_schema(_schema->cdc_schema() ? _schema->cdc_schema() : ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
, _log_schema(ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
, _options(options)
, _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema))
, _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets())

View File

@@ -855,7 +855,7 @@ maintenance_socket: ignore
# enable_create_table_with_compact_storage: false
# Control tablets for new keyspaces.
# Can be set to: disabled|enabled|enforced
# Can be set to: disabled|enabled
#
# When enabled, newly created keyspaces will have tablets enabled by default.
# That can be explicitly disabled in the CREATE KEYSPACE query

View File

@@ -642,8 +642,7 @@ raft_tests = set([
vector_search_tests = set([
'test/vector_search/vector_store_client_test',
'test/vector_search/load_balancer_test',
'test/vector_search/client_test'
'test/vector_search/load_balancer_test'
])
wasms = set([
@@ -1196,7 +1195,6 @@ scylla_core = (['message/messaging_service.cc',
'table_helper.cc',
'audit/audit.cc',
'audit/audit_cf_storage_helper.cc',
'audit/audit_composite_storage_helper.cc',
'audit/audit_syslog_storage_helper.cc',
'tombstone_gc_options.cc',
'tombstone_gc.cc',
@@ -1267,8 +1265,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/disk_space_monitor.cc',
'vector_search/vector_store_client.cc',
'vector_search/dns.cc',
'vector_search/client.cc',
'vector_search/clients.cc'
] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core
)
@@ -1412,8 +1408,6 @@ scylla_tests_dependencies = scylla_core + alternator + idls + scylla_tests_gener
'test/lib/key_utils.cc',
'test/lib/proc_utils.cc',
'test/lib/gcs_fixture.cc',
'test/lib/aws_kms_fixture.cc',
'test/lib/azure_kms_fixture.cc',
]
scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc', 'utils/exceptions.cc']
@@ -1666,7 +1660,6 @@ deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vector_store_client_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
wasm_deps = {}

View File

@@ -1349,7 +1349,7 @@ static managed_bytes reserialize_value(View value_bytes,
if (type.is_map()) {
std::vector<std::pair<managed_bytes, managed_bytes>> elements = partially_deserialize_map(value_bytes);
const map_type_impl& mapt = dynamic_cast<const map_type_impl&>(type);
const map_type_impl mapt = dynamic_cast<const map_type_impl&>(type);
const abstract_type& key_type = mapt.get_keys_type()->without_reversed();
const abstract_type& value_type = mapt.get_values_type()->without_reversed();
@@ -1391,7 +1391,7 @@ static managed_bytes reserialize_value(View value_bytes,
const vector_type_impl& vtype = dynamic_cast<const vector_type_impl&>(type);
std::vector<managed_bytes> elements = vtype.split_fragmented(value_bytes);
const auto& elements_type = vtype.get_elements_type()->without_reversed();
auto elements_type = vtype.get_elements_type()->without_reversed();
if (elements_type.bound_value_needs_to_be_reserialized()) {
for (size_t i = 0; i < elements.size(); i++) {

View File

@@ -2016,9 +2016,7 @@ vector_indexed_table_select_statement::vector_indexed_table_select_statement(sch
future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::do_execute(
query_processor& qp, service::query_state& state, const query_options& options) const {
auto limit = get_limit(options, _limit);
auto result = co_await measure_index_latency(*_schema, _index, [this, &qp, &state, &options, &limit](this auto) -> future<shared_ptr<cql_transport::messages::result_message>> {
return measure_index_latency(*_schema, _index, [this, &qp, &state, &options](this auto) -> future<shared_ptr<cql_transport::messages::result_message>> {
tracing::add_table_name(state.get_trace_state(), keyspace(), column_family());
validate_for_read(options.get_consistency());
@@ -2026,6 +2024,8 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
update_stats();
auto limit = get_limit(options, _limit);
if (limit > max_ann_query_limit) {
co_await coroutine::return_exception(exceptions::invalid_request_exception(
fmt::format("Use of ANN OF in an ORDER BY clause requires a LIMIT that is not greater than {}. LIMIT was {}", max_ann_query_limit, limit)));
@@ -2040,12 +2040,6 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
co_return co_await query_base_table(qp, state, options, pkeys.value());
});
auto page_size = options.get_page_size();
if (page_size > 0 && (uint64_t) page_size < limit) {
result->add_warning("Paging is not supported for Vector Search queries. The entire result set has been returned.");
}
co_return result;
}
void vector_indexed_table_select_statement::update_stats() const {

View File

@@ -77,11 +77,9 @@ future<db::all_batches_replayed> db::batchlog_manager::do_batch_log_replay(post_
});
});
}
if (all_replayed == all_batches_replayed::yes) {
co_await bm.container().invoke_on_all([last_replay] (auto& bm) {
bm._last_replay = last_replay;
});
}
co_await bm.container().invoke_on_all([last_replay] (auto& bm) {
bm._last_replay = last_replay;
});
blogger.debug("Batchlog replay on shard {}: done", dest);
co_return all_replayed;
});
@@ -190,7 +188,6 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
all_replayed = all_batches_replayed::no;
co_return stop_iteration::no;
}

View File

@@ -1526,9 +1526,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, error_injections_at_startup(this, "error_injections_at_startup", error_injection_value_status, {}, "List of error injections that should be enabled on startup.")
, topology_barrier_stall_detector_threshold_seconds(this, "topology_barrier_stall_detector_threshold_seconds", value_status::Used, 2, "Report sites blocking topology barrier if it takes longer than this.")
, enable_tablets(this, "enable_tablets", value_status::Used, false, "Enable tablets for newly created keyspaces. (deprecated)")
, tablets_mode_for_new_keyspaces(this, "tablets_mode_for_new_keyspaces", liveness::LiveUpdate, value_status::Used, tablets_mode_t::mode::unset, "Control tablets for new keyspaces. Can be set to the following values:\n"
, tablets_mode_for_new_keyspaces(this, "tablets_mode_for_new_keyspaces", value_status::Used, tablets_mode_t::mode::unset, "Control tablets for new keyspaces. Can be set to the following values:\n"
"\tdisabled: New keyspaces use vnodes by default, unless enabled by the tablets={'enabled':true} option\n"
"\tenabled: New keyspaces use tablets by default, unless disabled by the tablets={'enabled':false} option\n"
"\tenabled: New keyspaces use tablets by default, unless disabled by the tablets={'disabled':true} option\n"
"\tenforced: New keyspaces must use tablets. Tablets cannot be disabled using the CREATE KEYSPACE option")
, view_flow_control_delay_limit_in_ms(this, "view_flow_control_delay_limit_in_ms", liveness::LiveUpdate, value_status::Used, 1000,
"The maximal amount of time that materialized-view update flow control may delay responses "

View File

@@ -30,8 +30,6 @@
#include "mutation/frozen_mutation.hh"
#include "schema/schema_fwd.hh"
#include "utils/assert.hh"
#include "cdc/log.hh"
#include "cdc/cdc_partitioner.hh"
#include "view_info.hh"
#include "replica/database.hh"
#include "lang/manager.hh"
@@ -594,48 +592,9 @@ future<> schema_applier::merge_tables_and_views()
// diffs bound to current shard
auto& local_views = _affected_tables_and_views.tables_and_views.local().views;
auto& local_tables = _affected_tables_and_views.tables_and_views.local().tables;
auto& local_cdc = _affected_tables_and_views.tables_and_views.local().cdc;
// Create CDC tables before non-CDC base tables, because we want the base tables with CDC enabled
// to point to their CDC tables.
local_cdc = diff_table_or_view(_proxy, _before.cdc, _after.cdc, _reload, [&] (schema_mutations sm, schema_diff_side) {
return create_table_from_mutations(_proxy, std::move(sm), user_types, nullptr);
});
local_tables = diff_table_or_view(_proxy, _before.tables, _after.tables, _reload, [&] (schema_mutations sm, schema_diff_side side) {
// If the table has CDC enabled, find the CDC schema version and set it in the table schema.
// If the table is created or altered with CDC enabled, then the CDC
// table is also created or altered in the same operation, so we can
// find its schema version in the CDC schemas we created above in
// local_cdc.
query::result_set rs(sm.columnfamilies_mutation());
const query::result_set_row& table_row = rs.row(0);
auto ks_name = table_row.get_nonnull<sstring>("keyspace_name");
auto cf_name = table_row.get_nonnull<sstring>("table_name");
auto cdc_name = cdc::log_name(cf_name);
schema_ptr cdc_schema; // optional CDC schema of this table
// we only need to set the cdc schema for created schemas and new altered schemas.
// old altered schemas that we create here will not be used for generating cdc mutations.
if (side == schema_diff_side::right) {
for (const auto& cdc_created : local_cdc.created) {
const auto& new_cdc_schema = cdc_created;
if (new_cdc_schema->ks_name() == ks_name && new_cdc_schema->cf_name() == cdc_name) {
cdc_schema = new_cdc_schema;
break;
}
}
for (const auto& cdc_altered : local_cdc.altered) {
const auto& new_cdc_schema = cdc_altered.new_schema;
if (new_cdc_schema->ks_name() == ks_name && new_cdc_schema->cf_name() == cdc_name) {
cdc_schema = new_cdc_schema;
break;
}
}
}
return create_table_from_mutations(_proxy, std::move(sm), user_types, cdc_schema);
local_tables = diff_table_or_view(_proxy, _before.tables, _after.tables, _reload, [&] (schema_mutations sm, schema_diff_side) {
return create_table_from_mutations(_proxy, std::move(sm), user_types);
});
local_views = diff_table_or_view(_proxy, _before.views, _after.views, _reload, [&] (schema_mutations sm, schema_diff_side side) {
// The view schema mutation should be created with reference to the base table schema because we definitely know it by now.
@@ -682,14 +641,11 @@ future<> schema_applier::merge_tables_and_views()
// create schema_ptrs for all shards
frozen_schema_diff tables_frozen = co_await local_tables.freeze();
frozen_schema_diff cdc_frozen = co_await local_cdc.freeze();
frozen_schema_diff views_frozen = co_await local_views.freeze();
co_await _affected_tables_and_views.tables_and_views.invoke_on_others([this, &tables_frozen, &cdc_frozen, &views_frozen] (affected_tables_and_views_per_shard& tables_and_views) -> future<> {
co_await _affected_tables_and_views.tables_and_views.invoke_on_others([this, &tables_frozen, &views_frozen] (affected_tables_and_views_per_shard& tables_and_views) -> future<> {
auto& db = _proxy.local().get_db().local();
tables_and_views.tables = co_await schema_diff_per_shard::copy_from(
db, _types_storage, tables_frozen);
tables_and_views.cdc = co_await schema_diff_per_shard::copy_from(
db, _types_storage, cdc_frozen);
tables_and_views.views = co_await schema_diff_per_shard::copy_from(
db, _types_storage, views_frozen);
});
@@ -705,28 +661,23 @@ future<> schema_applier::merge_tables_and_views()
_affected_tables_and_views.table_shards.insert({uuid,
co_await replica::database::prepare_drop_table_on_all_shards(db, uuid)});
});
co_await max_concurrent_for_each(local_cdc.dropped, max_concurrent, [&db, this] (schema_ptr& dt) -> future<> {
auto uuid = dt->id();
_affected_tables_and_views.table_shards.insert({uuid,
co_await replica::database::prepare_drop_table_on_all_shards(db, uuid)});
});
}
future<frozen_schema_diff> schema_diff_per_shard::freeze() const {
frozen_schema_diff result;
for (const auto& c : created) {
result.created.emplace_back(extended_frozen_schema(c));
result.created.emplace_back(frozen_schema_with_base_info(c));
co_await coroutine::maybe_yield();
}
for (const auto& a : altered) {
result.altered.push_back(frozen_schema_diff::altered_schema{
.old_schema = extended_frozen_schema(a.old_schema),
.new_schema = extended_frozen_schema(a.new_schema),
.old_schema = frozen_schema_with_base_info(a.old_schema),
.new_schema = frozen_schema_with_base_info(a.new_schema),
});
co_await coroutine::maybe_yield();
}
for (const auto& d : dropped) {
result.dropped.emplace_back(extended_frozen_schema(d));
result.dropped.emplace_back(frozen_schema_with_base_info(d));
co_await coroutine::maybe_yield();
}
co_return result;
@@ -764,20 +715,16 @@ static future<> notify_tables_and_views(service::migration_notifier& notifier, c
};
const auto& tables = diff.tables_and_views.local().tables;
const auto& cdc = diff.tables_and_views.local().cdc;
const auto& views = diff.tables_and_views.local().views;
// View drops are notified first, because a table can only be dropped if its views are already deleted
co_await notify(views.dropped, [&] (auto&& dt) { return notifier.drop_view(view_ptr(dt)); });
co_await notify(tables.dropped, [&] (auto&& dt) { return notifier.drop_column_family(dt); });
co_await notify(cdc.dropped, [&] (auto&& dt) { return notifier.drop_column_family(dt); });
// Table creations are notified first, in case a view is created right after the table
co_await notify(tables.created, [&] (auto&& gs) { return notifier.create_column_family(gs); });
co_await notify(cdc.created, [&] (auto&& gs) { return notifier.create_column_family(gs); });
co_await notify(views.created, [&] (auto&& gs) { return notifier.create_view(view_ptr(gs)); });
// Table altering is notified first, in case new base columns appear
co_await notify(tables.altered, [&] (auto&& altered) { return notifier.update_column_family(altered.new_schema, *it++); });
co_await notify(cdc.altered, [&] (auto&& altered) { return notifier.update_column_family(altered.new_schema, *it++); });
co_await notify(views.altered, [&] (auto&& altered) { return notifier.update_view(view_ptr(altered.new_schema), *it++); });
}
@@ -835,38 +782,13 @@ future<> schema_applier::merge_aggregates() {
});
}
struct extracted_cdc {
std::map<table_id, schema_mutations> tables_without_cdc;
std::map<table_id, schema_mutations> cdc_tables;
};
static extracted_cdc extract_cdc(std::map<table_id, schema_mutations> tables) {
std::map<table_id, schema_mutations> cdc_tables;
auto it = tables.begin();
while (it != tables.end()) {
if (it->second.partitioner() == cdc::cdc_partitioner::classname) {
auto node = tables.extract(it++);
cdc_tables.insert(std::move(node));
} else {
++it;
}
}
return extracted_cdc{std::move(tables), std::move(cdc_tables)};
}
future<schema_persisted_state> schema_applier::get_schema_persisted_state() {
auto tables_and_cdc = co_await read_tables_for_keyspaces(_proxy, _keyspaces, table_kind::table, _affected_tables);
auto [tables, cdc] = extract_cdc(std::move(tables_and_cdc));
schema_persisted_state v{
.keyspaces = co_await read_schema_for_keyspaces(_proxy, KEYSPACES, _keyspaces),
.scylla_keyspaces = co_await read_schema_for_keyspaces(_proxy, SCYLLA_KEYSPACES, _keyspaces),
.tables = std::move(tables),
.tables = co_await read_tables_for_keyspaces(_proxy, _keyspaces, table_kind::table, _affected_tables),
.types = co_await read_schema_for_keyspaces(_proxy, TYPES, _keyspaces),
.views = co_await read_tables_for_keyspaces(_proxy, _keyspaces, table_kind::view, _affected_tables),
.cdc = std::move(cdc),
.functions = co_await read_schema_for_keyspaces(_proxy, FUNCTIONS, _keyspaces),
.aggregates = co_await read_schema_for_keyspaces(_proxy, AGGREGATES, _keyspaces),
.scylla_aggregates = co_await read_schema_for_keyspaces(_proxy, SCYLLA_AGGREGATES, _keyspaces),
@@ -975,7 +897,6 @@ public:
};
auto& tables_and_views = _sa._affected_tables_and_views.tables_and_views.local();
co_await include_pending_changes(tables_and_views.tables);
co_await include_pending_changes(tables_and_views.cdc);
co_await include_pending_changes(tables_and_views.views);
for (auto& [id, schema] : table_schemas) {
@@ -1023,7 +944,6 @@ void schema_applier::commit_tables_and_views() {
auto& db = sharded_db.local();
auto& diff = _affected_tables_and_views;
const auto& tables = diff.tables_and_views.local().tables;
const auto& cdc = diff.tables_and_views.local().cdc;
const auto& views = diff.tables_and_views.local().views;
for (auto& dropped_view : views.dropped) {
@@ -1034,15 +954,6 @@ void schema_applier::commit_tables_and_views() {
auto s = dropped_table.get();
replica::database::drop_table(sharded_db, s->ks_name(), s->cf_name(), true, diff.table_shards[s->id()]);
}
for (auto& dropped_cdc : cdc.dropped) {
auto s = dropped_cdc.get();
replica::database::drop_table(sharded_db, s->ks_name(), s->cf_name(), true, diff.table_shards[s->id()]);
}
for (auto& schema : cdc.created) {
auto& ks = db.find_keyspace(schema->ks_name());
db.add_column_family(ks, schema, ks.make_column_family_config(*schema, db), replica::database::is_new_cf::yes, _pending_token_metadata.local());
}
for (auto& schema : tables.created) {
auto& ks = db.find_keyspace(schema->ks_name());
@@ -1054,11 +965,7 @@ void schema_applier::commit_tables_and_views() {
db.add_column_family(ks, schema, ks.make_column_family_config(*schema, db), replica::database::is_new_cf::yes, _pending_token_metadata.local());
}
diff.tables_and_views.local().columns_changed.reserve(tables.altered.size() + cdc.altered.size() + views.altered.size());
for (auto&& altered : cdc.altered) {
bool changed = db.update_column_family(altered.new_schema);
diff.tables_and_views.local().columns_changed.push_back(changed);
}
diff.tables_and_views.local().columns_changed.reserve(tables.altered.size() + views.altered.size());
for (auto&& altered : boost::range::join(tables.altered, views.altered)) {
bool changed = db.update_column_family(altered.new_schema);
diff.tables_and_views.local().columns_changed.push_back(changed);
@@ -1145,10 +1052,6 @@ future<> schema_applier::finalize_tables_and_views() {
auto s = dropped_table.get();
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}
for (auto& dropped_cdc : diff.tables_and_views.local().cdc.dropped) {
auto s = dropped_cdc.get();
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}
if (_tablet_hint) {
auto& db = sharded_db.local();
@@ -1159,11 +1062,7 @@ future<> schema_applier::finalize_tables_and_views() {
co_await sharded_db.invoke_on_all([&diff] (replica::database& db) -> future<> {
const auto& tables = diff.tables_and_views.local().tables;
const auto& cdc = diff.tables_and_views.local().cdc;
const auto& views = diff.tables_and_views.local().views;
for (auto& created_cdc : cdc.created) {
co_await db.make_column_family_directory(created_cdc);
}
for (auto& created_table : tables.created) {
co_await db.make_column_family_directory(created_table);
}

View File

@@ -48,7 +48,6 @@ struct schema_persisted_state {
std::map<table_id, schema_mutations> tables;
schema_tables::schema_result types;
std::map<table_id, schema_mutations> views;
std::map<table_id, schema_mutations> cdc;
schema_tables::schema_result functions;
schema_tables::schema_result aggregates;
schema_tables::schema_result scylla_aggregates;
@@ -106,12 +105,12 @@ public:
struct frozen_schema_diff {
struct altered_schema {
extended_frozen_schema old_schema;
extended_frozen_schema new_schema;
frozen_schema_with_base_info old_schema;
frozen_schema_with_base_info new_schema;
};
std::vector<extended_frozen_schema> created;
std::vector<frozen_schema_with_base_info> created;
std::vector<altered_schema> altered;
std::vector<extended_frozen_schema> dropped;
std::vector<frozen_schema_with_base_info> dropped;
};
// schema_diff represents what is happening with tables or views during schema merge
@@ -141,7 +140,6 @@ public:
struct affected_tables_and_views_per_shard {
schema_diff_per_shard tables;
schema_diff_per_shard cdc;
schema_diff_per_shard views;
std::vector<bool> columns_changed;
};

View File

@@ -28,7 +28,6 @@
#include "utils/log.hh"
#include "schema/frozen_schema.hh"
#include "schema/schema_registry.hh"
#include "cdc/cdc_options.hh"
#include "mutation_query.hh"
#include "system_keyspace.hh"
#include "system_distributed_keyspace.hh"
@@ -2078,9 +2077,7 @@ future<schema_ptr> create_table_from_name(sharded<service::storage_proxy>& proxy
co_await coroutine::return_exception(std::runtime_error(format("{}:{} not found in the schema definitions keyspace.", qn.keyspace_name, qn.table_name)));
}
const schema_ctxt& ctxt = proxy;
// The CDC schema is set to nullptr because we don't have it yet, but we will
// check and update it soon if needed in create_tables_from_tables_partition.
co_return create_table_from_mutations(ctxt, std::move(sm), ctxt.user_types(), nullptr);
co_return create_table_from_mutations(ctxt, std::move(sm), ctxt.user_types());
}
// Limit concurrency of user tables to prevent stalls.
@@ -2098,28 +2095,10 @@ constexpr size_t max_concurrent = 8;
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(sharded<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
{
auto tables = std::map<sstring, schema_ptr>();
auto tables_with_cdc = std::map<sstring, schema_ptr>();
co_await max_concurrent_for_each(result->rows().begin(), result->rows().end(), max_concurrent, [&] (const query::result_set_row& row) -> future<> {
schema_ptr cfm = co_await create_table_from_table_row(proxy, row);
if (!cfm->cdc_options().enabled()) {
tables.emplace(cfm->cf_name(), std::move(cfm));
} else {
// defer tables with CDC enabled. we want to construct all CDC tables first
// so then we can construct the schemas for these tables with the pointer to
// its CDC schema.
tables_with_cdc.emplace(cfm->cf_name(), std::move(cfm));
}
tables.emplace(cfm->cf_name(), std::move(cfm));
});
for (auto&& [name, cfm] : tables_with_cdc) {
schema_ptr cdc_schema;
if (auto it = tables.find(cdc::log_name(name)); it != tables.end()) {
cdc_schema = it->second;
} else {
slogger.warn("Did not find CDC log schema for table {}", name);
}
schema_ptr extended_cfm = cdc_schema ? cfm->make_with_cdc(cdc_schema) : cfm;
tables.emplace(std::move(name), std::move(extended_cfm));
}
co_return std::move(tables);
}
@@ -2268,7 +2247,7 @@ static void prepare_builder_from_scylla_tables_row(const schema_ctxt& ctxt, sche
}
}
schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, const data_dictionary::user_types_storage& user_types, schema_ptr cdc_schema, std::optional<table_schema_version> version)
schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, const data_dictionary::user_types_storage& user_types, std::optional<table_schema_version> version)
{
slogger.trace("create_table_from_mutations: version={}, {}", version, sm);
@@ -2352,10 +2331,6 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
builder.with_version(sm.digest(ctxt.features().cluster_schema_features()));
}
if (cdc_schema) {
builder.with_cdc_schema(cdc_schema);
}
if (auto partitioner = sm.partitioner()) {
builder.with_partitioner(*partitioner);
builder.with_sharder(smp::count, ctxt.murmur3_partitioner_ignore_msb_bits());

View File

@@ -286,7 +286,7 @@ future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(sharde
utils::chunked_vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage& user_types, schema_ptr cdc_schema, std::optional<table_schema_version> version = {});
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage& user_types, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage&, schema_ptr, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage&, std::optional<view::base_dependent_view_info> = {}, std::optional<table_schema_version> version = {});

View File

@@ -3681,11 +3681,6 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
if (some_row.has("ignore_nodes")) {
ret.ignored_nodes = decode_nodes_ids(deserialize_set_column(*topology(), some_row, "ignore_nodes"));
}
ret.excluded_tablet_nodes = ret.ignored_nodes;
for (const auto& [id, _]: ret.left_nodes_rs) {
ret.excluded_tablet_nodes.insert(id);
}
}
co_return ret;

View File

@@ -29,8 +29,6 @@
#include "db/view/view_building_task_mutation_builder.hh"
#include "utils/assert.hh"
#include "idl/view.dist.hh"
#include "utils/error_injection.hh"
#include "utils/log.hh"
static logging::logger vbc_logger("view_building_coordinator");
@@ -336,13 +334,10 @@ future<bool> view_building_coordinator::work_on_view_building(service::group0_gu
_remote_work.erase(replica);
}
}
const bool ignore_gossiper = utils::get_local_injector().enter("view_building_coordinator_ignore_gossiper");
if (!_gossiper.is_alive(replica.host) && !ignore_gossiper) {
if (!_gossiper.is_alive(replica.host)) {
vbc_logger.debug("Replica {} is dead", replica);
continue;
}
if (skip_work_on_this_replica) {
continue;
}
@@ -444,22 +439,11 @@ void view_building_coordinator::attach_to_started_tasks(const locator::tablet_re
}
future<std::optional<view_building_coordinator::remote_work_results>> view_building_coordinator::work_on_tasks(locator::tablet_replica replica, std::vector<utils::UUID> tasks) {
constexpr auto backoff_duration = std::chrono::seconds(1);
static thread_local logger::rate_limit rate_limit{backoff_duration};
std::vector<view_task_result> remote_results;
bool rpc_failed = false;
try {
remote_results = co_await ser::view_rpc_verbs::send_work_on_view_building_tasks(&_messaging, replica.host, _as, tasks);
} catch (...) {
vbc_logger.log(log_level::warn, rate_limit, "Work on tasks {} on replica {}, failed with error: {}",
tasks, replica, std::current_exception());
rpc_failed = true;
}
if (rpc_failed) {
co_await seastar::sleep(backoff_duration);
vbc_logger.warn("Work on tasks {} on replica {}, failed with error: {}", tasks, replica, std::current_exception());
_vb_sm.event.broadcast();
co_return std::nullopt;
}

View File

@@ -245,21 +245,23 @@ future<> view_building_worker::create_staging_sstable_tasks() {
// Firstly reorgenize `_sstables_to_register` for easier movement.
// This is done in separate loop after committing the group0 command, because we need to move values from `_sstables_to_register`
// (`staging_sstable_task_info` is non-copyable because of `foreign_ptr` field).
std::unordered_map<shard_id, std::unordered_map<table_id, std::vector<foreign_ptr<sstables::shared_sstable>>>> new_sstables_per_shard;
std::unordered_map<shard_id, std::unordered_map<table_id, std::unordered_map<dht::token, std::vector<foreign_ptr<sstables::shared_sstable>>>>> new_sstables_per_shard;
for (auto& [table_id, sst_infos]: _sstables_to_register) {
for (auto& sst_info: sst_infos) {
new_sstables_per_shard[sst_info.shard][table_id].push_back(std::move(sst_info.sst_foreign_ptr));
new_sstables_per_shard[sst_info.shard][table_id][sst_info.last_token].push_back(std::move(sst_info.sst_foreign_ptr));
}
}
for (auto& [shard, sstables_per_table]: new_sstables_per_shard) {
co_await container().invoke_on(shard, [sstables_for_this_shard = std::move(sstables_per_table)] (view_building_worker& local_vbw) mutable {
for (auto& [tid, ssts]: sstables_for_this_shard) {
auto unwrapped_ssts = ssts | std::views::as_rvalue | std::views::transform([] (auto&& fptr) {
return fptr.unwrap_on_owner_shard();
}) | std::ranges::to<std::vector>();
auto& tid_ssts = local_vbw._staging_sstables[tid];
tid_ssts.insert(tid_ssts.end(), std::make_move_iterator(unwrapped_ssts.begin()), std::make_move_iterator(unwrapped_ssts.end()));
for (auto& [tid, ssts_map]: sstables_for_this_shard) {
for (auto& [token, ssts]: ssts_map) {
auto unwrapped_ssts = ssts | std::views::as_rvalue | std::views::transform([] (auto&& fptr) {
return fptr.unwrap_on_owner_shard();
}) | std::ranges::to<std::vector>();
auto& tid_ssts = local_vbw._staging_sstables[tid][token];
tid_ssts.insert(tid_ssts.end(), std::make_move_iterator(unwrapped_ssts.begin()), std::make_move_iterator(unwrapped_ssts.end()));
}
}
});
}
@@ -326,7 +328,7 @@ std::unordered_map<table_id, std::vector<view_building_worker::staging_sstable_t
// or maybe it can be registered to view_update_generator directly.
tasks_to_create[table_id].emplace_back(table_id, shard, last_token, make_foreign(std::move(sstable)));
} else {
_staging_sstables[table_id].push_back(std::move(sstable));
_staging_sstables[table_id][last_token].push_back(std::move(sstable));
}
}
});
@@ -846,54 +848,13 @@ future<> view_building_worker::do_build_range(table_id base_id, std::vector<tabl
}
future<> view_building_worker::do_process_staging(table_id table_id, dht::token last_token) {
if (_staging_sstables[table_id].empty()) {
if (_staging_sstables[table_id][last_token].empty()) {
co_return;
}
auto table = _db.get_tables_metadata().get_table(table_id).shared_from_this();
auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
auto tid = tablet_map.get_tablet_id(last_token);
auto tablet_range = tablet_map.get_token_range(tid);
// Select sstables belonging to the tablet (identified by `last_token`)
std::vector<sstables::shared_sstable> sstables_to_process;
for (auto& sst: _staging_sstables[table_id]) {
auto sst_last_token = sst->get_last_decorated_key().token();
if (tablet_range.contains(sst_last_token, dht::token_comparator())) {
sstables_to_process.push_back(sst);
}
}
co_await _vug.process_staging_sstables(std::move(table), sstables_to_process);
try {
// Remove processed sstables from `_staging_sstables` map
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
std::unordered_set<sstables::shared_sstable> sstables_to_remove(sstables_to_process.begin(), sstables_to_process.end());
auto [first, last] = std::ranges::remove_if(_staging_sstables[table_id], [&] (auto& sst) {
return sstables_to_remove.contains(sst);
});
_staging_sstables[table_id].erase(first, last);
} catch (semaphore_aborted&) {
vbw_logger.warn("Semaphore was aborted while waiting to removed processed sstables for table {}", table_id);
}
}
void view_building_worker::load_sstables(table_id table_id, std::vector<sstables::shared_sstable> ssts) {
std::ranges::copy_if(std::move(ssts), std::back_inserter(_staging_sstables[table_id]), [] (auto& sst) {
return sst->state() == sstables::sstable_state::staging;
});
}
void view_building_worker::cleanup_staging_sstables(locator::effective_replication_map_ptr erm, table_id table_id, locator::tablet_id tid) {
auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(table_id);
auto tablet_range = tablet_map.get_token_range(tid);
auto [first, last] = std::ranges::remove_if(_staging_sstables[table_id], [&] (auto& sst) {
auto sst_last_token = sst->get_last_decorated_key().token();
return tablet_range.contains(sst_last_token, dht::token_comparator());
});
_staging_sstables[table_id].erase(first, last);
auto sstables = std::exchange(_staging_sstables[table_id][last_token], {});
co_await _vug.process_staging_sstables(std::move(table), std::move(sstables));
}
}

View File

@@ -14,7 +14,6 @@
#include <seastar/core/shared_future.hh>
#include <unordered_map>
#include <unordered_set>
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "seastar/core/gate.hh"
#include "db/view/view_building_state.hh"
@@ -161,7 +160,7 @@ private:
condition_variable _sstables_to_register_event;
semaphore _staging_sstables_mutex = semaphore(1);
std::unordered_map<table_id, std::vector<staging_sstable_task_info>> _sstables_to_register;
std::unordered_map<table_id, std::vector<sstables::shared_sstable>> _staging_sstables;
std::unordered_map<table_id, std::unordered_map<dht::token, std::vector<sstables::shared_sstable>>> _staging_sstables;
future<> _staging_sstables_registrator = make_ready_future<>();
public:
@@ -179,11 +178,6 @@ public:
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {};
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override;
// Used ONLY to load staging sstables migrated during intra-node tablet migration.
void load_sstables(table_id table_id, std::vector<sstables::shared_sstable> ssts);
// Used in cleanup/cleanup-target tablet transition stage
void cleanup_staging_sstables(locator::effective_replication_map_ptr erm, table_id table_id, locator::tablet_id tid);
private:
future<> run_view_building_state_observer();
future<> update_built_views();

View File

@@ -31,5 +31,4 @@ def parse():
parser.add_argument('--replace-address-first-boot', default=None, dest='replaceAddressFirstBoot', help="[[deprecated]] IP address of a dead node to replace.")
parser.add_argument('--dc', default=None, dest='dc', help="The datacenter name for this node, for use with the snitch GossipingPropertyFileSnitch.")
parser.add_argument('--rack', default=None, dest='rack', help="The rack name for this node, for use with the snitch GossipingPropertyFileSnitch.")
parser.add_argument('--blocked-reactor-notify-ms', default='25', dest='blocked_reactor_notify_ms', help="Set the blocked reactor notification timeout in milliseconds. Defaults to 25.")
return parser.parse_known_args()

View File

@@ -46,7 +46,6 @@ class ScyllaSetup:
self._extra_args = extra_arguments
self._dc = arguments.dc
self._rack = arguments.rack
self._blocked_reactor_notify_ms = arguments.blocked_reactor_notify_ms
def _run(self, *args, **kwargs):
logging.info('running: {}'.format(args))
@@ -206,7 +205,7 @@ class ScyllaSetup:
elif self._replaceAddressFirstBoot is not None:
args += ["--replace-address-first-boot %s" % self._replaceAddressFirstBoot]
args += ["--blocked-reactor-notify-ms %s" % self._blocked_reactor_notify_ms]
args += ["--blocked-reactor-notify-ms 999999999"]
with open("/etc/scylla.d/docker.conf", "w") as cqlshrc:
cqlshrc.write("SCYLLA_DOCKER_ARGS=\"%s\"\n" % (" ".join(args) + " " + " ".join(self._extra_args)))

View File

@@ -95,12 +95,11 @@ class DBConfigParser:
for match in config_matches:
name = match[1].strip()
liveness_value = match[3].strip() if match[3] else ""
property_data = {
"name": name,
"value_status": match[4].strip(),
"default": match[5].strip(),
"liveness": "True" if liveness_value == "LiveUpdate" else "False",
"liveness": "True" if match[3] else "False",
"description": match[6].strip(),
}
properties.append(property_data)
@@ -136,7 +135,7 @@ class DBConfigParser:
end_pos = next_group_match.start() if next_group_match else len(config_content)
config_group_content = config_content[group_match.end():end_pos]
current_group = self._parse_group(group_match, config_group_content)
groups.append(current_group)

View File

@@ -338,7 +338,7 @@ they should be easy to detect. Here is a list of these unimplemented features:
Currently, *all* Alternator tables are created as global tables and can
be accessed from all the DCs existing at the time of the table's creation.
If a DC is added after a table is created, the table won't be visible from
the new DC and changing that requires a CQL "ALTER KEYSPACE" statement to
the new DC and changing that requires a CQL "ALTER TABLE" statement to
modify the table's replication strategy.
<https://github.com/scylladb/scylla/issues/5062>

View File

@@ -181,18 +181,17 @@ entire data center, or other data centers, in that case.
## Tablets
"Tablets" are ScyllaDB's new approach to replicating data across a cluster.
It replaces the older approach which was named "vnodes". See
[Data Distribution with Tablets](../architecture/tablets.rst) for details.
It replaces the older approach which was named "vnodes". Compared to vnodes,
tablets are smaller pieces of tables that are easier to move between nodes,
and allow for faster growing or shrinking of the cluster when needed.
In this version, tablet support is almost complete, so new
Alternator tables default to following what the global configuration flag
[tablets_mode_for_new_keyspaces](../reference/configuration-parameters.rst#confval-tablets_mode_for_new_keyspaces)
tells them to.
In this version, tablet support is incomplete and not all of the features
which Alternator needs are supported with tablets. So currently, new
Alternator tables default to using vnodes - not tablets.
If you want to influence whether a specific Alternator table is created with tablets or vnodes,
you can do this by specifying the `system:initial_tablets` tag
(in earlier versions of Scylla the tag was `experimental:initial_tablets`)
in the CreateTable operation. The value of this tag can be:
However, if you do want to create an Alternator table which uses tablets,
you can do this by specifying the `experimental:initial_tablets` tag in
the CreateTable operation. The value of this tag can be:
* Any valid integer as the value of this tag enables tablets.
Typically the number "0" is used - which tells ScyllaDB to pick a reasonable
@@ -200,11 +199,9 @@ in the CreateTable operation. The value of this tag can be:
number overrides the default choice of initial number of tablets.
* Any non-integer value - e.g., the string "none" - creates the table
without tablets - i.e., using vnodes. However, when vnodes are asked for by the tag value,
but tablets are `enforced` by the `tablets_mode_for_new_keyspaces` configuration flag,
an exception will be thrown.
without tablets - i.e., using vnodes.
The `system:initial_tablets` tag only has any effect while creating
The `experimental:initial_tablets` tag only has any effect while creating
a new table with CreateTable - changing it later has no effect.
Because the tablets support is incomplete, when tablets are enabled for an

View File

@@ -883,8 +883,7 @@ when replicas are slow or unresponsive. The following are legal values (case-in
``XPERCENTILE`` 90.5PERCENTILE Coordinators record average per-table response times for all replicas.
If a replica takes longer than ``X`` percent of this table's average
response time, the coordinator queries an additional replica.
``X`` must be between 0 and 100, including those values.
The value is rounded to the nearest 0.1 (1 decimal place).
``X`` must be between 0 and 100.
``XP`` 90.5P Synonym for ``XPERCENTILE``
``Yms`` 25ms If a replica takes more than ``Y`` milliseconds to respond,
the coordinator queries an additional replica.

View File

@@ -274,8 +274,8 @@ in this case ``[0.1, 0.2, 0.3, 0.4]``.
.. warning::
Currently, vector queries do not support filtering with ``WHERE`` clause, returning similarity distances,
grouping with ``GROUP BY`` and paging. This will be added in the future releases.
Currently, vector queries do not support filtering with ``WHERE`` clause, grouping with ``GROUP BY`` and paging.
This will be added in the future releases.
.. _limit-clause:

View File

@@ -189,18 +189,3 @@ The command displays a table with: option name, effective service level the valu
workload_type | sl2 | batch
timeout | sl1 | 2s
```
## Implementation
### Integration with auth
Service levels ultimately depend on the state of `auth`. Since `auth::service` is initialized long after
`service_level_controller`, we register it separately once it's started, and unregister it right before
it's stopped. For that, we wrap it in a struct called `auth_integration` that manages access to it.
That ensures that `service_level_controller` will not try to reference it beyond its lifetime.
It's important to note that there may still be attempts to fetch an effective service level for a role
or indirectly access `auth::service` in some other way when `auth_integration` is absent. One important
situation to have in mind is when the user connects to Scylla via the maintenance socket. It's possible
early on, way before Scylla is fully initialized. Since we don't have access to `auth` yet, we need to
ensure that the semantics of the operations performed on `service_level_controller` still make sense
in that context.

View File

@@ -110,17 +110,14 @@ stateDiagram-v2
A node state may have additional parameters associated with it. For instance
'replacing' state has host id of a node been replaced as a parameter.
Additionally to specific node states, the entire topology can also be in one of the transitioning states listed below.
Note that these are not all states, as there are other states specific to tablets described in the following sections.
Additionally to specific node states, there entire topology can also be in a transitioning state:
- `join_group0` - a join request from a bootstrapping/replacing node has been accepted. The node joins group 0 and,
in the case of a bootstrapping node, receives bootstrap tokens.
- `commit_cdc_generation` - a new CDC generation data was written to internal tables earlier
and now we need to commit the generation - create a timestamp for it and tell every node
to start using it for CDC log table writes.
- `write_both_read_old` - one of the nodes is in a bootstrapping/decommissioning/removing/replacing state.
Writes to vnodes-based tables are going to both new and old replicas (new replicas means calculated according
to modified token ring), reads are using old replicas.
Writes are going to both new and old replicas (new replicas means calculated according to modified
token ring), reads are using old replicas.
- `write_both_read_new` - as above, but reads are using new replicas.
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
@@ -131,9 +128,8 @@ Note that these are not all states, as there are other states specific to tablet
requests from starting. Intended to be used in tests which want to prevent internally-triggered topology
operations during the test.
When a node bootstraps, we move the topology to `join_group0` state, where we add
the node to group 0, create new tokens for it, and create a new CDC generation.
Then, we enter the `commit_cdc_generation` state. Once the generation is committed,
When a node bootstraps, we create new tokens for it and a new CDC generation
and enter the `commit_cdc_generation` state. Once the generation is committed,
we enter `write_both_read_old` state. After the entire cluster learns about it,
streaming starts. When streaming finishes, we move to `write_both_read_new`
state and again the whole cluster needs to learn about it and make sure that no
@@ -176,13 +172,6 @@ are the currently supported global topology operations:
contain replicas of the table being truncated. It uses [sessions](#Topology guards)
to make sure that no stale RPCs are executed outside of the scope of the request.
## Zero-token nodes
Zero-token nodes (the nodes started with `join_ring=false`) never own tokens or become
tablet replicas. Hence, the logic described above is significantly simplified for them.
For example, a bootstrapping zero-token node completes the transition on the
`join_group0` state, as the following tasks (like creating a new CDC generation,
streaming, and tablet migrations) are unneeded.
# Load balancing
@@ -689,25 +678,15 @@ CREATE TABLE system.topology (
rebuild_option text,
release_version text,
replaced_id uuid,
ignore_nodes set<uuid> static,
ignore_nodes set<uuid>,
shard_count int,
tokens set<text>,
tokens_string text,
topology_request text,
transition_state text static,
cleanup_status text,
supported_features set<uuid>,
request_id timeuuid,
version bigint static,
fence_version bigint static,
committed_cdc_generations set<tuple<timestamp, timeuuid>> static,
unpublished_cdc_generations set<tuple<timestamp, timeuuid>> static,
global_topology_request text static,
global_topology_request_id timeuuid static,
enabled_features set<text> static,
session uuid static,
tablet_balancing_enabled boolean static,
upgrade_state text static,
new_cdc_generation_data_uuid timeuuid static,
new_keyspace_rf_change_ks_name text static,
new_keyspace_rf_change_data frozen<map<text, text>> static,
@@ -730,17 +709,12 @@ Each node has a clustering row in the table where its `host_id` is the clusterin
- `topology_request` - if set contains one of the supported node-specific topology requests
- `tokens` - if set contains a list of tokens that belongs to the node
- `replaced_id` - if the node replacing or replaced another node here will be the id of that node
- `ignore_nodes` - if set contains a list of ids of nodes ignored during the remove or replace operation
- `rebuild_option` - if the node is being rebuild contains datacenter name that is used as a rebuild source
- `num_tokens` - the requested number of tokens when the node bootstraps
- `tokens_string` - if set contains the `initial_token` value of the bootstrapping node
- `cleanup_status` - contains the cleanup status of the node (clean, needed, or running)
- `supported_features` - if set contains the list of cluster features supported by the node
- `request_id` - the ID of the current request for the node or the last one if there is no current request
There are also a few static columns for cluster-global properties:
- `ignore_nodes` - if set, contains a list of node IDs to be ignored during remove or replace topology operations
and tablet-related operations such as migration, split, and merge.
- `transition_state` - the transitioning state of the cluster (as described earlier), may be null
- `committed_cdc_generations` - the IDs of the committed CDC generations
- `unpublished_cdc_generations` - the IDs of the committed yet unpublished CDC generations
@@ -751,12 +725,7 @@ There are also a few static columns for cluster-global properties:
- `new_keyspace_rf_change_ks_name` - the name of the KS that is being the target of the scheduled ALTER KS statement
- `new_keyspace_rf_change_data` - the KS options to be used when executing the scheduled ALTER KS statement
- `global_requests` - contains a list of ids of pending global requests, the information about requests (type and parameters)
can be obtained from topology_requests table by using request's id as a look up key
- `version` - the current topology version
- `fence_version` - the current fence version
- `enabled_features` - the list of cluster features enabled by the cluster
- `session` - if set contains the ID of the current session
- `tablet_balancing_enabled` - if false, the tablet balancing has been disabled
can be obtained from topology_requests table by using request's id as a look up key.
# Join procedure
@@ -878,87 +847,20 @@ topology coordinator fiber and coordinates the remaining steps:
If a disaster happens and a majority of nodes are lost, changes to the group 0
state are no longer possible and a manual recovery procedure needs to be performed.
Our current procedure starts by switching all nodes to a special "recovery" mode
in which nodes do not use raft at all. In this mode, dead nodes are supposed
to be removed from the cluster via `nodetool removenode`. After all dead nodes
are removed, state related to group 0 is deleted and nodes are restarted in
regular mode, allowing the cluster to re-form group 0.
## The procedure
Topology on raft fits into this procedure in the following way:
Our current procedure starts by removing the persistent group 0 ID and the group 0
discovery state on all live nodes. This process ensures that live nodes will try to
join a new group 0 during the next restart.
The issue is that one of the live nodes has to create the new group 0, and not every
node is a safe choice. It turns out that we can choose only nodes with the latest
`commit_index` (see this [section](#choosing-the-recovery-leader) for a detailed
explanation). We call the chosen node the *recovery leader*.
Once the recovery leader is chosen, all live nodes can join the new group 0 during
a rolling restart. Nodes learn about the recovery leader through the
`recovery_leader` config option. Also, the recovery leader must be restarted first
to create the new group 0 before other nodes try to join it.
After successfully restarting all live nodes, dead nodes can be removed via
`nodetool removenode` or by replacing them.
Finally, the persisted internal state of the old group 0 can be cleaned up.
## Topology coordinator during recovery
After joining the new group 0 during the procedure, live nodes don't execute any
"special recovery code" related to topology.
In particular, the recovery leader normally starts the topology coordinator fiber.
This fiber is designed to ensure that a started topology operation never hangs
(it succeeds or is rolled back) regardless of the conditions. So, if the majority
has been lost in the middle of some work done by the topology coordinator, the new
topology coordinator (run on the recovery leader) will finish this work. It will
usually fail and be rolled back, e.g., due to `global_token_metadata_barrier`
failing after a global topology command sent to a dead normal node fails.
Note that this behavior is necessary to ensure that the new topology coordinator
will eventually be able to start handling the topology requests to remove/replace
dead nodes. Those requests will succeed thanks to the `ignore_dead_nodes` and
`ignore_dead_nodes_for_replace` options.
## Gossip during recovery
A node always includes its group 0 ID in `gossip_digest_syn`, and the receiving node
rejects the message if the ID is different from its local ID. However, nodes can
temporarily belong to two different group 0's during the recovery procedure. To keep
the gossip working, we've decided to additionally include the local `recovery_leader`
value in `gossip_digest_syn`. Nodes ignore group 0 ID mismatch if the sender or the
receiver has a non-empty `recovery_leader` (usually both have it).
## Choosing the recovery leader
The group 0 state persisted on the recovery leader becomes the initial state of
other nodes that join the new group 0 (which happens through the Raft snapshot
transfer). After all, the Raft state machine must be consistent at the beginning.
When a disaster happens, live nodes can have different commit indexes, and the nodes
that are behind have no way of catching up without a majority. Imagine there are two
live nodes - node A and node B, node A has `commit_index`=10, and node B has
`commit_index`=11. Also, assume that the log entry with index 11 is a schema change
that adds a new column to a table. Node B could have already handled some replica
writes to the new column. If node A became the recovery leader and node B joined the
new group 0, node B would receive a snapshot that regresses its schema version. Node
B could end up in an inconsistent state with data in a column that doesn't exist
according to group 0. Hence, node B must be the recovery leader.
## Loss of committed entries
It can happen that a group 0 entry has been committed by a majority consisting of
only dead nodes. Then, no matter what recovery leader we choose, it won't have this
entry. This is fine, assuming that the following group 0 causality property holds on
all live nodes: any persisted effect on the node's state is written only after
the group 0 state it depends on has already been persisted.
For example, the above property holds for schema changes and writes because
a replica persists a write only after applying the group 0 entry with the latest
schema, which is ensured by a read barrier.
It's critical for recovery safety to ensure that no subsystem breaks group 0 causality.
Fortunately, this property is natural and not very limiting.
Losing a committed entry can be observed by external systems. For example, the latest
schema version in the cluster can go back in time from the driver's perspective. This
is outside the scope of the recovery procedure, though, and it shouldn't cause
problems in practice.
- When nodes are restarted in recovery mode, they revert to gossiper-based
operations. This makes it possible to perform `nodetool removenode` without having
a majority of nodes. In this mode, `system.topology` is *not* updated, so
it becomes outdated at the end.
- Before disabling recovery mode on the nodes, the `system.topology` table
needs to be truncated on all nodes. This will cause nodes to revert to
legacy topology operations after exiting recovery mode.
- After re-forming group 0, the cluster needs to be upgraded again to raft
topology by the administrator.

View File

@@ -20,8 +20,6 @@ To clean up the data of a specific node and specific keyspace, use this command:
nodetool -h <host name> cleanup <keyspace>
To clean up entire cluster see :doc:`nodetool cluster cleanup </operating-scylla/nodetool-commands/cluster/cleanup/>`
.. warning::
Make sure there are no topology changes before running cleanup. To validate, run ``nodetool status``, all nodes should be in status Up Normal (``UN``).

View File

@@ -1,15 +0,0 @@
Nodetool cluster cleanup
========================
**cluster cleanup** - A process that runs in the background and removes data no longer owned by nodes. Used for non tablet (vnode-based) tables only.
Running ``cluster cleanup`` on a **single node** cleans up all non tablet tables on all nodes in the cluster (tablet enabled tables are cleaned up automatically).
For example:
::
nodetool cluster cleanup
See also `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_.

View File

@@ -5,7 +5,6 @@ Nodetool cluster
:hidden:
repair <repair>
cleanup <cleanup>
**cluster** - Nodetool supercommand for running cluster operations.
@@ -13,4 +12,3 @@ Supported cluster suboperations
-------------------------------
* :doc:`repair </operating-scylla/nodetool-commands/cluster/repair>` :code:`<keyspace>` :code:`<table>` - Repair one or more tablet tables.
* :doc:`cleanup </operating-scylla/nodetool-commands/cluster/cleanup>` - Clean up all non tablet (vnode-based) keyspaces in a cluster

View File

@@ -29,27 +29,16 @@ Load and Stream
.. code::
nodetool refresh <my_keyspace> <my_table> [(--load-and-stream | -las) [[(--primary-replica-only | -pro)] | [--scope <scope>]]]
The Load and Stream feature extends nodetool refresh.
The ``--load-and-stream`` option loads arbitrary sstables into the cluster by reading the sstable data and streaming each partition to the replica(s) that owns it. In addition, the ``--scope`` and ``--primary-replica-only`` options are applied to filter the set of target replicas for each partition. For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. One can copy the sstables from the old cluster to any of the new nodes and trigger refresh with load and stream.
nodetool refresh <my_keyspace> <my_table> [--load-and-stream | -las] [--scope <scope>]
The Load and Stream feature extends nodetool refresh. The new ``-las`` option loads arbitrary sstables that do not belong to a node into the cluster. It loads the sstables from the disk and calculates the data's owning nodes, and streams automatically.
For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. We can copy the sstables from the old cluster to any of the new nodes and trigger the load and stream process.
Load and Stream makes restores and migrations much easier:
* You can place sstables from any node on any node
* No need to run nodetool cleanup to remove unused data
With --primary-replica-only (or -pro) option, only the primary replica of each partition in an sstable will be used as the target.
--primary-replica-only must be applied together with --load-and-stream.
--primary-replica-only cannot be used with --scope, they are mutually exclusive.
--primary-replica-only requires repair to be run after the load and stream operation is completed.
Scope
-----

View File

@@ -53,7 +53,6 @@ Options
* ``--nowait`` - Don't wait on the restore process
* ``--scope <scope>`` - Use specified load-and-stream scope
* ``--sstables-file-list <file>`` - restore the sstables listed in the given <file>. The list should be newline-separated.
* ``--primary-replica-only`` - Load the sstables and stream to primary replica node that owns the data. Repair is needed after the restore process
* ``<sstables>`` - Remainder of keys of the TOC (Table of Contents) components of SSTables to restore, relative to the specified prefix
The `scope` parameter describes the subset of cluster nodes where you want to load data:

View File

@@ -165,7 +165,7 @@ bytes hmac_sha256(bytes_view msg, bytes_view key) {
return res;
}
future<temporary_buffer<char>> read_text_file_fully(const std::string& filename) {
future<temporary_buffer<char>> read_text_file_fully(const sstring& filename) {
return open_file_dma(filename, open_flags::ro).then([](file f) {
return f.size().then([f](size_t s) {
return do_with(make_file_input_stream(f), [s](input_stream<char>& in) {
@@ -179,7 +179,7 @@ future<temporary_buffer<char>> read_text_file_fully(const std::string& filename)
});
}
future<> write_text_file_fully(const std::string& filename, temporary_buffer<char> buf) {
future<> write_text_file_fully(const sstring& filename, temporary_buffer<char> buf) {
return open_file_dma(filename, open_flags::wo|open_flags::create).then([buf = std::move(buf)](file f) mutable {
return make_file_output_stream(f).then([buf = std::move(buf)] (output_stream<char> out) mutable {
return do_with(std::move(out), [buf = std::move(buf)](output_stream<char>& out) mutable {
@@ -193,7 +193,7 @@ future<> write_text_file_fully(const std::string& filename, temporary_buffer<cha
});
}
future<> write_text_file_fully(const std::string& filename, const std::string& s) {
future<> write_text_file_fully(const sstring& filename, const sstring& s) {
return write_text_file_fully(filename, temporary_buffer<char>(s.data(), s.size()));
}

View File

@@ -63,9 +63,9 @@ bytes calculate_sha256(const bytes&, size_t off = 0, size_t n = bytes::npos);
bytes calculate_sha256(bytes_view);
bytes hmac_sha256(bytes_view msg, bytes_view key);
future<temporary_buffer<char>> read_text_file_fully(const std::string&);
future<> write_text_file_fully(const std::string&, temporary_buffer<char>);
future<> write_text_file_fully(const std::string&, const std::string&);
future<temporary_buffer<char>> read_text_file_fully(const sstring&);
future<> write_text_file_fully(const sstring&, temporary_buffer<char>);
future<> write_text_file_fully(const sstring&, const sstring&);
std::optional<std::chrono::milliseconds> parse_expiry(std::optional<std::string>);

View File

@@ -32,7 +32,6 @@
#include "encryption_exceptions.hh"
#include "symmetric_key.hh"
#include "utils.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
@@ -46,43 +45,13 @@ using namespace std::string_literals;
logger kms_log("kms");
using httpclient = rest::httpclient;
static std::string get_response_error(httpclient::reply_status res) {
switch (res) {
case httpclient::reply_status::unauthorized: case httpclient::reply_status::forbidden: return "AccessDenied";
case httpclient::reply_status::not_found: return "ResourceNotFound";
case httpclient::reply_status::too_many_requests: return "SlowDown";
case httpclient::reply_status::internal_server_error: return "InternalError";
case httpclient::reply_status::service_unavailable: return "ServiceUnavailable";
case httpclient::reply_status::request_timeout: case httpclient::reply_status::gateway_timeout:
case httpclient::reply_status::network_connect_timeout: case httpclient::reply_status::network_read_timeout:
return "RequestTimeout";
default:
return format("{}", res);
}
};
class kms_error : public std::exception {
httpclient::reply_status _res;
std::string _type, _msg;
public:
kms_error(httpclient::reply_status res, std::string type, std::string_view msg)
: _res(res)
, _type(std::move(type))
, _msg(fmt::format("{}: {}", _type, msg))
kms_error(std::string_view type, std::string_view msg)
: _type(type)
, _msg(fmt::format("{}: {}", type, msg))
{}
kms_error(httpclient::reply_status res, std::string_view msg)
: _res(res)
, _type(get_response_error(res))
, _msg(fmt::format("{}: {}", _type, msg))
{}
kms_error(const httpclient::result_type& res, std::string_view msg)
: kms_error(res.result(), msg)
{}
httpclient::reply_status result() const {
return _res;
}
const std::string& type() const {
return _type;
}
@@ -232,9 +201,7 @@ private:
using result_type = httpclient::result_type;
future<result_type> post(aws_query);
future<rjson::value> post(std::string_view target, std::string_view aws_assume_role_arn, const rjson::value& query);
future<rjson::value> do_post(std::string_view target, std::string_view aws_assume_role_arn, const rjson::value& query);
future<key_and_id_type> create_key(const attr_cache_key&);
future<bytes> find_key(const id_cache_key&);
@@ -371,27 +338,21 @@ struct encryption::kms_host::impl::aws_query {
};
future<rjson::value> encryption::kms_host::impl::post(std::string_view target, std::string_view aws_assume_role_arn, const rjson::value& query) {
static constexpr auto max_retries = 10;
exponential_backoff_retry exr(10ms, 10000ms);
for (int retry = 0; ; ++retry) {
try {
co_return co_await do_post(target, aws_assume_role_arn, query);
} catch (kms_error& e) {
// Special case 503. This can be both actual service or ec2 metadata.
// In either case, do local backoff-retry here.
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html#instance-metadata-returns
if (e.result() != httpclient::reply_status::service_unavailable || retry >= max_retries) {
throw;
}
static auto get_response_error = [](const result_type& res) -> std::string {
switch (res.result()) {
case httpclient::reply_status::unauthorized: case httpclient::reply_status::forbidden: return "AccessDenied";
case httpclient::reply_status::not_found: return "ResourceNotFound";
case httpclient::reply_status::too_many_requests: return "SlowDown";
case httpclient::reply_status::internal_server_error: return "InternalError";
case httpclient::reply_status::service_unavailable: return "ServiceUnavailable";
case httpclient::reply_status::request_timeout: case httpclient::reply_status::gateway_timeout:
case httpclient::reply_status::network_connect_timeout: case httpclient::reply_status::network_read_timeout:
return "RequestTimeout";
default:
return format("{}", res.result());
}
};
co_await exr.retry();
}
}
future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target, std::string_view aws_assume_role_arn, const rjson::value& query) {
static auto query_ec2_meta = [](std::string_view target, std::string token = {}) -> future<std::tuple<httpclient::result_type, std::string>> {
static auto get_env_def = [](std::string_view var, std::string_view def) {
auto val = std::getenv(var.data());
@@ -421,7 +382,7 @@ future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target
}
kms_log.trace("Result: status={}, response={}", res.result_int(), res);
if (res.result() != httpclient::reply_status::ok) {
throw kms_error(res, "EC2 metadata query");
throw kms_error(get_response_error(res), "EC2 metadata query");
}
co_return res;
};
@@ -433,8 +394,13 @@ future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target
client.method(httpclient::method_type::PUT);
client.target("/latest/api/token");
auto res = co_await logged_send(client);
if (res.result() != httpclient::reply_status::ok) {
throw kms_error(get_response_error(res), "EC2 metadata token query");
}
token = res.body();
client.clear_headers();
}
@@ -575,7 +541,7 @@ future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target
aws_secret_access_key = rjson::get<std::string>(body, "SecretAccessKey");
session = rjson::get<std::string>(body, "Token");
} catch (rjson::malformed_value&) {
std::throw_with_nested(kms_error(httpclient::reply_status::forbidden, fmt::format("Code={}, Message={}"
std::throw_with_nested(kms_error("AccessDenied", fmt::format("Code={}, Message={}"
, rjson::get_opt<std::string>(body, "Code")
, rjson::get_opt<std::string>(body, "Message")
)));
@@ -607,7 +573,7 @@ future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target
});
if (res.result() != httpclient::reply_status::ok) {
throw kms_error(res, "AssumeRole");
throw kms_error(get_response_error(res), "AssumeRole");
}
rapidxml::xml_document<> doc;
@@ -620,7 +586,7 @@ future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target
static auto get_xml_node = [](node_type* node, const char* what) {
auto res = node->first_node(what);
if (!res) {
throw malformed_response_error(fmt::format("XML parse error", what));
throw kms_error("XML parse error", what);
}
return res;
};
@@ -637,7 +603,7 @@ future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target
session = token->value();
} catch (const rapidxml::parse_error& e) {
std::throw_with_nested(malformed_response_error("XML parse error: AssumeRole"));
std::throw_with_nested(kms_error("XML parse error", "AssumeRole"));
}
}
@@ -684,11 +650,9 @@ future<rjson::value> encryption::kms_host::impl::do_post(std::string_view target
o = rjson::get_opt<std::string>(body, type_header);
}
// this should never happen with aws, but...
if (!o) {
throw kms_error(res, msg);
}
auto type = o ? *o : get_response_error(res);
throw kms_error(res.result(), *o, msg);
throw kms_error(type, msg);
}
co_return body;

View File

@@ -233,10 +233,10 @@ go_arch() {
echo ${GO_ARCH["$(arch)"]}
}
NODE_EXPORTER_VERSION=1.10.2
NODE_EXPORTER_VERSION=1.9.0
declare -A NODE_EXPORTER_CHECKSUM=(
["x86_64"]=c46e5b6f53948477ff3a19d97c58307394a29fe64a01905646f026ddc32cb65b
["aarch64"]=de69ec8341c8068b7c8e4cfe3eb85065d24d984a3b33007f575d307d13eb89a6
["x86_64"]=e7b65ea30eec77180487d518081d3dcb121b975f6d95f1866dfb9156c5b24075
["aarch64"]=5314fae1efff19abf807cfc8bd7dadbd47a35565c1043c236ffb0689dc15ef4f
)
NODE_EXPORTER_DIR=/opt/scylladb/dependencies

View File

@@ -557,7 +557,7 @@ static_effective_replication_map::~static_effective_replication_map() {
vnode_effective_replication_map::~vnode_effective_replication_map() {
if (is_registered()) {
try {
_factory->submit_background_work(dispose_gently(std::move(_replication_map),
_factory->submit_background_work(clear_gently(std::move(_replication_map),
std::move(*_pending_endpoints),
std::move(*_read_endpoints),
std::move(_tmptr)));

View File

@@ -13,7 +13,6 @@
#include "locator/tablet_sharder.hh"
#include "locator/token_range_splitter.hh"
#include "db/system_keyspace.hh"
#include "locator/topology.hh"
#include "replica/database.hh"
#include "utils/stall_free.hh"
#include "utils/rjson.hh"
@@ -241,7 +240,7 @@ tablet_replica_set get_new_replicas(const tablet_info& tinfo, const tablet_migra
return replace_replica(tinfo.replicas, mig.src, mig.dst);
}
tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, tablet_id tid, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter) {
tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, tablet_id tid, std::function<bool(const tablet_replica&)> filter) {
const auto& info = tablet_map.get_tablet_info(tid);
const auto* transition = tablet_map.get_tablet_transition_info(tid);
@@ -251,8 +250,8 @@ tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, t
}
return transition->writes;
};
auto primary = [tid, filter = std::move(filter), &topo] (tablet_replica_set set) -> std::optional<tablet_replica> {
return maybe_get_primary_replica(tid, set, topo, filter);
auto primary = [tid, filter = std::move(filter)] (tablet_replica_set set) -> std::optional<tablet_replica> {
return maybe_get_primary_replica(tid, set, filter);
};
auto add = [] (tablet_replica r1, tablet_replica r2) -> tablet_replica_set {
// if primary replica is not the one leaving, then only primary will be streamed to.
@@ -556,30 +555,14 @@ dht::token_range tablet_map::get_token_range_after_split(const token& t) const n
return get_token_range(id_after_split, log2_tablets_after_split);
}
auto tablet_replica_comparator(const locator::topology& topo) {
return [&topo](const tablet_replica& a, const tablet_replica& b) {
const auto loc_a = topo.get_location(a.host);
const auto loc_b = topo.get_location(b.host);
if (loc_a.dc != loc_b.dc) {
return loc_a.dc < loc_b.dc;
}
if (loc_a.rack != loc_b.rack) {
return loc_a.rack < loc_b.rack;
}
return a.host < b.host;
};
}
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter) {
tablet_replica_set replica_set_copy = replica_set;
std::ranges::sort(replica_set_copy, tablet_replica_comparator(topo));
const auto replicas = replica_set_copy | std::views::filter(std::move(filter)) | std::ranges::to<tablet_replica_set>();
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, std::function<bool(const tablet_replica&)> filter) {
const auto replicas = replica_set | std::views::filter(std::move(filter)) | std::ranges::to<tablet_replica_set>();
return !replicas.empty() ? std::make_optional(replicas.at(size_t(id) % replicas.size())) : std::nullopt;
}
tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topology& topo) const {
tablet_replica tablet_map::get_primary_replica(tablet_id id) const {
const auto& replicas = get_tablet_info(id).replicas;
return maybe_get_primary_replica(id, replicas, topo, [&] (const auto& _) { return true; }).value();
return replicas.at(size_t(id) % replicas.size());
}
tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
@@ -591,7 +574,7 @@ tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
}
std::optional<tablet_replica> tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const {
return maybe_get_primary_replica(id, get_tablet_info(id).replicas, topo, [&] (const auto& tr) {
return maybe_get_primary_replica(id, get_tablet_info(id).replicas, [&] (const auto& tr) {
return tablet_task_info.selected_by_filters(tr, topo);
});
}
@@ -994,28 +977,6 @@ lw_shared_ptr<load_stats> load_stats::reconcile_tablets_resize(const std::unorde
return reconciled_stats;
}
lw_shared_ptr<load_stats> load_stats::migrate_tablet_size(locator::host_id leaving, locator::host_id pending, locator::global_tablet_id gid, const dht::token_range trange) const {
lw_shared_ptr<load_stats> result;
if (leaving != pending) {
range_based_tablet_id rb_tid {gid.table, trange};
if (get_tablet_size(leaving, rb_tid) && !get_tablet_size(pending, rb_tid) && tablet_stats.contains(pending)) {
tablet_logger.debug("Moving tablet size for tablet: {} from: {} to: {}", gid, leaving, pending);
result = make_lw_shared<locator::load_stats>(*this);
auto& new_leaving_ts = result->tablet_stats.at(leaving);
auto& new_pending_ts = result->tablet_stats.at(pending);
auto map_node = new_leaving_ts.tablet_sizes.at(gid.table).extract(trange);
new_pending_ts.tablet_sizes[gid.table].insert(std::move(map_node));
if (new_leaving_ts.tablet_sizes.at(gid.table).empty()) {
new_leaving_ts.tablet_sizes.erase(gid.table);
}
}
}
return result;
}
tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges)
: _schema(std::move(schema))
, _ranges(ranges)

View File

@@ -355,7 +355,7 @@ class tablet_map;
/// Returns the replica set which will become the replica set of the tablet after executing a given tablet transition.
tablet_replica_set get_new_replicas(const tablet_info&, const tablet_migration_info&);
// If filter returns true, the replica can be chosen as primary replica.
tablet_replica_set get_primary_replicas(const locator::tablet_map&, tablet_id, const locator::topology&, std::function<bool(const tablet_replica&)> filter);
tablet_replica_set get_primary_replicas(const locator::tablet_map&, tablet_id, std::function<bool(const tablet_replica&)> filter);
tablet_transition_info migration_to_transition_info(const tablet_info&, const tablet_migration_info&);
/// Describes streaming required for a given tablet transition.
@@ -490,15 +490,6 @@ struct load_stats {
// corresponding to the post-resize tablet_map.
// In case any pre-resize tablet replica is not found, the function returns nullptr
lw_shared_ptr<load_stats> reconcile_tablets_resize(const std::unordered_set<table_id>& tables, const token_metadata& old_tm, const token_metadata& new_tm) const;
// Modifies the tablet sizes in load_stats by moving the size of a tablet from leaving to pending host.
// The function returns modified load_stats if the tablet size was successfully migrated.
// It returns nullptr if any of the following is true:
// - tablet was not found on the leaving host
// - tablet was found on the pending host
// - pending and leaving hosts are equal (in case of intranode migration)
// - pending host is not found in load_stats.tablet_stats
lw_shared_ptr<load_stats> migrate_tablet_size(locator::host_id leaving, locator::host_id pending, locator::global_tablet_id gid, const dht::token_range trange) const;
};
using load_stats_v2 = load_stats;
@@ -615,7 +606,7 @@ public:
dht::token_range get_token_range(tablet_id id) const;
/// Returns the primary replica for the tablet
tablet_replica get_primary_replica(tablet_id id, const locator::topology& topo) const;
tablet_replica get_primary_replica(tablet_id id) const;
/// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector
/// \throws std::runtime_error if the tablet has less than 2 replicas.
@@ -803,7 +794,7 @@ public:
// Check that all tablets which have replicas on this host, have a valid replica shard (< smp::count).
future<bool> check_tablet_replica_shards(const tablet_metadata& tm, host_id this_host);
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter);
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, std::function<bool(const tablet_replica&)> filter);
struct tablet_routing_info {
tablet_replica_set tablet_replicas;
@@ -879,10 +870,6 @@ void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr
/// Returns the list of racks that can be used for placing replicas in a given DC.
rack_list get_allowed_racks(const locator::token_metadata&, const sstring& dc);
/// Returns a comparator function that can be used to sort tablet_replicas
/// according to <dc, rack, host_id> order in the given topology.
auto tablet_replica_comparator(const locator::topology& topo);
}
template <>

View File

@@ -1366,6 +1366,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto destroy_tracing = defer_verbose_shutdown("tracing instance", [&tracing] {
tracing.stop().get();
});
audit::audit::create_audit(*cfg, token_metadata).handle_exception([&] (auto&& e) {
startlog.error("audit creation failed: {}", e);
}).get();
stop_signal.check();
ctx.http_server.server().invoke_on_all([] (auto& server) { server.set_content_streaming(true); }).get();
@@ -1699,7 +1702,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
checkpoint(stop_signal, "starting migration manager");
debug::the_migration_manager = &mm;
mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(ss), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] {
mm.stop().get();
});
@@ -2504,9 +2507,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
seastar::set_abort_on_ebadf(cfg->abort_on_ebadf());
api::set_server_done(ctx).get();
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
audit::audit::start_audit(*cfg, qp, mm).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});

View File

@@ -364,25 +364,19 @@ sstring messaging_service::client_metrics_domain(unsigned idx, inet_address addr
return ret;
}
future<> messaging_service::ban_hosts(const utils::chunked_vector<locator::host_id>& ids) {
if (ids.empty()) {
return make_ready_future<>();
}
return container().invoke_on_all([&ids] (messaging_service& ms) {
if (ms.is_shutting_down()) {
future<> messaging_service::ban_host(locator::host_id id) {
return container().invoke_on_all([id] (messaging_service& ms) {
if (ms._banned_hosts.contains(id) || ms.is_shutting_down()) {
return;
}
for (const auto id: ids) {
if (const auto [it, inserted] = ms._banned_hosts.insert(id); !inserted) {
continue;
}
auto [start, end] = ms._host_connections.equal_range(id);
for (auto it = start; it != end; ++it) {
auto& conn_ref = it->second;
conn_ref.server.abort_connection(conn_ref.conn_id);
}
ms._host_connections.erase(start, end);
ms._banned_hosts.insert(id);
auto [start, end] = ms._host_connections.equal_range(id);
for (auto it = start; it != end; ++it) {
auto& conn_ref = it->second;
conn_ref.server.abort_connection(conn_ref.conn_id);
}
ms._host_connections.erase(start, end);
});
}

View File

@@ -440,11 +440,11 @@ public:
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
// Drops all connections from the given hosts and prevents further communication from it to happen.
// Drops all connections from the given host and prevents further communication from it to happen.
//
// No further RPC handlers will be called for that node,
// but we don't prevent handlers that were started concurrently from finishing.
future<> ban_hosts(const utils::chunked_vector<locator::host_id>& ids);
future<> ban_host(locator::host_id);
msg_addr addr_for_host_id(locator::host_id hid);
private:

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:875c2435bce1e93bab492bdad21b7efe586a4fa22149e9526d219df77f0c3dfd
size 6411264
oid sha256:012ccbeb5c93878bf260f751ff55faa723f235ee796dc7e31c4e14c1bcc0efae
size 6408088

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:57294d7a476c1bfba10f038f01e3b236ac45d11e94c71918e1e5d0ec3d6a9212
size 6420604
oid sha256:acb4310f476a7dac4a645ae6babae22af2541c37ed368eac666ff3bd24f1a56a
size 6406700

View File

@@ -757,8 +757,7 @@ private:
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& = sstables::default_sstable_predicate(),
sstables::integrity_check integrity = sstables::integrity_check::no) const;
const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const;
lw_shared_ptr<const sstables::sstable_set> make_compound_sstable_set() const;
// Compound sstable set must be refreshed whenever any of its managed sets are changed

View File

@@ -92,18 +92,17 @@ table::make_sstable_reader(schema_ptr s,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& predicate,
sstables::integrity_check integrity) const {
const sstables::sstable_predicate& predicate) const {
// CAVEAT: if make_sstable_reader() is called on a single partition
// we want to optimize and read exactly this partition. As a
// consequence, fast_forward_to() will *NOT* work on the result,
// regardless of what the fwd_mr parameter says.
if (pr.is_singular() && pr.start()->value().has_key()) {
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity);
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate);
} else {
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice,
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate, nullptr, integrity);
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), predicate);
}
}
@@ -311,8 +310,7 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
add_memtables_to_reader_list(readers, s, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
readers.reserve(memtable_count + 1);
});
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice,
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
});
@@ -333,8 +331,7 @@ mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit pe
add_memtables_to_reader_list(readers, schema, permit, range, slice, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
readers.reserve(memtable_count + 1);
});
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice,
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr));
return maybe_compact_for_streaming(
make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr),
get_compaction_manager(),
@@ -351,7 +348,7 @@ mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit pe
const auto fwd_mr = mutation_reader::forwarding::no;
return maybe_compact_for_streaming(
sstables->make_range_sstable_reader(std::move(schema), std::move(permit), range, slice,
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), sstables::integrity_check::yes),
std::move(trace_state), fwd, fwd_mr),
get_compaction_manager(),
compaction_time,
_config.enable_compacting_data_for_streaming_and_repair(),

View File

@@ -934,8 +934,7 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
const sstables::sstable_predicate&,
sstables::integrity_check integrity = sstables::integrity_check::no) const override;
const sstables::sstable_predicate&) const override;
// Will always return an engaged sstable set ptr.
const lw_shared_ptr<const sstables::sstable_set>& find_sstable_set(size_t i) const {
@@ -1172,8 +1171,7 @@ tablet_sstable_set::create_single_key_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& predicate,
sstables::integrity_check integrity) const {
const sstables::sstable_predicate& predicate) const {
// The singular partition_range start bound must be engaged.
auto idx = group_of(pr.start()->value().token());
const auto& set = find_sstable_set(idx);

View File

@@ -27,7 +27,7 @@ frozen_schema::frozen_schema(const schema_ptr& s)
}())
{ }
schema_ptr frozen_schema::unfreeze(const db::schema_ctxt& ctxt, schema_ptr cdc_schema, std::optional<db::view::base_dependent_view_info> base_info) const {
schema_ptr frozen_schema::unfreeze(const db::schema_ctxt& ctxt, std::optional<db::view::base_dependent_view_info> base_info) const {
auto in = ser::as_input_stream(_data);
auto sv = ser::deserialize(in, std::type_identity<ser::schema_view>());
auto sm = sv.mutations();
@@ -37,7 +37,7 @@ schema_ptr frozen_schema::unfreeze(const db::schema_ctxt& ctxt, schema_ptr cdc_s
if (base_info) {
throw std::runtime_error("Trying to unfreeze regular table schema with base info");
}
return db::schema_tables::create_table_from_mutations(ctxt, std::move(sm), ctxt.user_types(), std::move(cdc_schema), sv.version());
return db::schema_tables::create_table_from_mutations(ctxt, std::move(sm), ctxt.user_types(), sv.version());
}
}
@@ -50,24 +50,12 @@ const bytes_ostream& frozen_schema::representation() const
return _data;
}
extended_frozen_schema::extended_frozen_schema(const schema_ptr& c)
: fs(c),
base_info([&c] -> std::optional<db::view::base_dependent_view_info> {
if (c->is_view()) {
return c->view_info()->base_info();
}
return std::nullopt;
}()),
frozen_cdc_schema([&c] -> std::optional<frozen_schema> {
if (c->cdc_schema()) {
return frozen_schema(c->cdc_schema());
}
return std::nullopt;
}())
{
frozen_schema_with_base_info::frozen_schema_with_base_info(const schema_ptr& c) : frozen_schema(c) {
if (c->is_view()) {
base_info = c->view_info()->base_info();
}
}
schema_ptr extended_frozen_schema::unfreeze(const db::schema_ctxt& ctxt) const {
auto cdc_schema = frozen_cdc_schema ? frozen_cdc_schema->unfreeze(ctxt, nullptr, {}) : nullptr;
return fs.unfreeze(ctxt, std::move(cdc_schema), base_info);
schema_ptr frozen_schema_with_base_info::unfreeze(const db::schema_ctxt& ctxt) const {
return frozen_schema::unfreeze(ctxt, base_info);
}

View File

@@ -28,17 +28,17 @@ public:
frozen_schema(const frozen_schema&) = default;
frozen_schema& operator=(const frozen_schema&) = default;
frozen_schema& operator=(frozen_schema&&) = default;
schema_ptr unfreeze(const db::schema_ctxt&, schema_ptr cdc_schema, std::optional<db::view::base_dependent_view_info> base_info = {}) const;
schema_ptr unfreeze(const db::schema_ctxt&, std::optional<db::view::base_dependent_view_info> base_info = {}) const;
const bytes_ostream& representation() const;
};
// A frozen schema with additional information that is needed to be transported
// with it to be used for unfreezing it.
struct extended_frozen_schema {
extended_frozen_schema(const schema_ptr& c);
// To unfreeze view without base table added to schema registry
// we need base_info.
class frozen_schema_with_base_info : public frozen_schema {
public:
frozen_schema_with_base_info(const schema_ptr& c);
schema_ptr unfreeze(const db::schema_ctxt& ctxt) const;
frozen_schema fs;
std::optional<db::view::base_dependent_view_info> base_info; // Set only for views.
std::optional<frozen_schema> frozen_cdc_schema; // Set only for tables with CDC enabled.
private:
// Set only for views.
std::optional<db::view::base_dependent_view_info> base_info;
};

View File

@@ -70,7 +70,7 @@ speculative_retry::from_sstring(sstring str) {
try {
return boost::lexical_cast<double>(str.substr(0, str.size() - t.size()));
} catch (boost::bad_lexical_cast& e) {
throw exceptions::configuration_exception(format("cannot convert {} to speculative_retry\n", str));
throw std::invalid_argument(format("cannot convert {} to speculative_retry\n", str));
}
};
@@ -86,12 +86,12 @@ speculative_retry::from_sstring(sstring str) {
} else if (str.compare(str.size() - percentile.size(), percentile.size(), percentile) == 0) {
t = type::PERCENTILE;
v = convert(percentile) / 100;
if (v < 0.0 || v > 1.0) {
if (v <= 0.0 || v >= 1.0) {
throw exceptions::configuration_exception(
format("Invalid value {} for PERCENTILE option 'speculative_retry': must be between [0.0 and 100.0]", str));
format("Invalid value {} for PERCENTILE option 'speculative_retry': must be between (0.0 and 100.0)", str));
}
} else {
throw exceptions::configuration_exception(format("cannot convert {} to speculative_retry\n", str));
throw std::invalid_argument(format("cannot convert {} to speculative_retry\n", str));
}
return speculative_retry(t, v);
}
@@ -413,10 +413,9 @@ schema::raw_schema::raw_schema(table_id id)
, _sharder(::get_sharder(smp::count, default_partitioner_ignore_msb))
{ }
schema::schema(private_tag, const raw_schema& raw, const schema_static_props& props, schema_ptr cdc_schema, std::optional<std::variant<schema_ptr, db::view::base_dependent_view_info>> base)
schema::schema(private_tag, const raw_schema& raw, const schema_static_props& props, std::optional<std::variant<schema_ptr, db::view::base_dependent_view_info>> base)
: _raw(raw)
, _static_props(props)
, _cdc_schema(cdc_schema)
, _offsets([this] {
if (_raw._columns.size() > std::numeric_limits<column_count_type>::max()) {
throw std::runtime_error(format("Column count limit ({:d}) overflowed: {:d}",
@@ -519,7 +518,6 @@ schema::schema(private_tag, const raw_schema& raw, const schema_static_props& pr
schema::schema(const schema& o, const std::function<void(schema&)>& transform)
: _raw(o._raw)
, _static_props(o._static_props)
, _cdc_schema(o._cdc_schema)
, _offsets(o._offsets)
{
// Do the transformation after all the raw fields are initialized, but
@@ -551,13 +549,6 @@ schema::schema(reversed_tag, const schema& o)
{
}
schema::schema(with_cdc_schema_tag, const schema& o, schema_ptr cdc_schema)
: schema(o, [cdc_schema] (schema& s) {
s._cdc_schema = cdc_schema;
})
{
}
schema::~schema() {
if (_registry_entry) {
_registry_entry->detach_schema();
@@ -1311,10 +1302,6 @@ schema_builder::schema_builder(const schema_ptr s)
_base_info = s->view_info()->base_info();
_view_info = s->view_info()->raw();
}
if (s->cdc_schema()) {
_cdc_schema = s->cdc_schema();
}
}
schema_builder::schema_builder(const schema::raw_schema& raw)
@@ -1562,13 +1549,6 @@ schema_builder& schema_builder::with_view_info(table_id base_id, sstring base_na
return *this;
}
schema_builder& schema_builder::with_cdc_schema(schema_ptr cdc_schema) {
if (cdc_schema) {
_cdc_schema = std::move(cdc_schema);
}
return *this;
}
schema_builder& schema_builder::with_index(const index_metadata& im) {
_raw._indices_by_name.emplace(im.name(), im);
return *this;
@@ -1671,11 +1651,11 @@ schema_ptr schema_builder::build(schema::raw_schema& new_raw) {
), _version);
if (_base_info) {
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props, nullptr, _base_info);
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props, _base_info);
} else if (_base_schema) {
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props, nullptr, _base_schema);
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props, _base_schema);
}
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props, _cdc_schema ? *_cdc_schema : nullptr);
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props);
}
auto schema_builder::static_configurators() -> std::vector<static_configurator>& {
@@ -2103,14 +2083,14 @@ schema_ptr schema::make_reversed() const {
}
schema_ptr schema::get_reversed() const {
return local_schema_registry().get_or_load(reversed(_raw._version), [this] (table_schema_version) -> extended_frozen_schema {
return local_schema_registry().get_or_load(reversed(_raw._version), [this] (table_schema_version) -> view_schema_and_base_info {
auto s = make_reversed();
return extended_frozen_schema(s);
});
}
schema_ptr schema::make_with_cdc(schema_ptr cdc_schema) const {
return make_lw_shared<schema>(schema::with_cdc_schema_tag{}, *this, cdc_schema);
if (s->is_view()) {
return {frozen_schema(s), s->view_info()->base_info()};
}
return {frozen_schema(s)};
});
}
raw_view_info::raw_view_info(table_id base_id, sstring base_name, bool include_all_columns, sstring where_clause)

View File

@@ -579,7 +579,6 @@ private:
v3_columns _v3_columns;
mutable schema_registry_entry* _registry_entry = nullptr;
std::unique_ptr<::view_info> _view_info;
schema_ptr _cdc_schema;
const std::array<column_count_type, 3> _offsets;
@@ -636,7 +635,6 @@ public:
};
private:
struct reversed_tag { };
struct with_cdc_schema_tag { };
lw_shared_ptr<cql3::column_specification> make_column_specification(const column_definition& def) const;
void rebuild();
@@ -644,11 +642,10 @@ private:
schema(const schema&, const std::function<void(schema&)>&);
class private_tag{};
public:
schema(private_tag, const raw_schema&, const schema_static_props& props, schema_ptr cdc_schema, std::optional<std::variant<schema_ptr, db::view::base_dependent_view_info>> base = std::nullopt);
schema(private_tag, const raw_schema&, const schema_static_props& props, std::optional<std::variant<schema_ptr, db::view::base_dependent_view_info>> base = std::nullopt);
schema(const schema&);
// See \ref make_reversed().
schema(reversed_tag, const schema&);
schema(with_cdc_schema_tag, const schema&, schema_ptr cdc_schema);
~schema();
const schema_static_props& static_props() const {
return _static_props;
@@ -891,9 +888,6 @@ public:
bool is_view() const {
return bool(_view_info);
}
schema_ptr cdc_schema() const {
return _cdc_schema;
}
const query::partition_slice& full_slice() const {
return *_full_slice;
}
@@ -969,8 +963,6 @@ public:
// The schema's version is also reversed using UUID_gen::negate().
schema_ptr make_reversed() const;
schema_ptr make_with_cdc(schema_ptr cdc_schema) const;
// Get the reversed counterpart of this schema from the schema registry.
//
// If not present in the registry, create one (via \ref make_reversed()) and

View File

@@ -32,7 +32,6 @@ private:
std::optional<raw_view_info> _view_info;
std::optional<schema_ptr> _base_schema;
std::optional<db::view::base_dependent_view_info> _base_info;
std::optional<schema_ptr> _cdc_schema;
schema_builder(const schema::raw_schema&);
static std::vector<static_configurator>& static_configurators();
public:
@@ -280,8 +279,6 @@ public:
schema_builder& with_view_info(schema_ptr base_schema, bool include_all_columns, sstring where_clause);
schema_builder& with_view_info(table_id base_id, sstring base_name, bool include_all_columns, sstring where_clause, db::view::base_dependent_view_info base);
schema_builder& with_cdc_schema(schema_ptr cdc_schema);
schema_builder& with_index(const index_metadata& im);
schema_builder& without_index(const sstring& name);
schema_builder& without_indexes();

View File

@@ -77,13 +77,9 @@ void schema_registry::attach_table(schema_registry_entry& e) noexcept {
}
}
schema_ptr schema_registry::learn(schema_ptr s) {
auto learned_cdc_schema = s->cdc_schema() ? local_schema_registry().learn(s->cdc_schema()) : nullptr;
if (learned_cdc_schema != s->cdc_schema()) {
s = s->make_with_cdc(learned_cdc_schema);
}
schema_ptr schema_registry::learn(const schema_ptr& s) {
if (s->registry_entry()) {
return s;
return std::move(s);
}
auto i = _entries.find(s->version());
if (i != _entries.end()) {
@@ -175,8 +171,11 @@ void schema_registry::clear() {
_entries.clear();
}
schema_ptr schema_registry_entry::load(extended_frozen_schema fs) {
_extended_frozen_schema = std::move(fs);
schema_ptr schema_registry_entry::load(view_schema_and_base_info fs) {
_frozen_schema = std::move(fs.schema);
if (fs.base_info) {
_base_info = std::move(fs.base_info);
}
auto s = get_schema();
if (_state == state::LOADING) {
_schema_promise.set_value(s);
@@ -188,7 +187,10 @@ schema_ptr schema_registry_entry::load(extended_frozen_schema fs) {
}
schema_ptr schema_registry_entry::load(schema_ptr s) {
_extended_frozen_schema = extended_frozen_schema(s);
_frozen_schema = frozen_schema(s);
if (s->is_view()) {
_base_info = s->view_info()->base_info();
}
_schema = &*s;
_schema->_registry_entry = this;
_erase_timer.cancel();
@@ -208,7 +210,7 @@ future<schema_ptr> schema_registry_entry::start_loading(async_schema_loader load
_state = state::LOADING;
slogger.trace("Loading {}", _version);
// Move to background.
(void)f.then_wrapped([self = shared_from_this(), this] (future<extended_frozen_schema>&& f) {
(void)f.then_wrapped([self = shared_from_this(), this] (future<view_schema_and_base_info>&& f) {
_loader = {};
if (_state != state::LOADING) {
slogger.trace("Loading of {} aborted", _version);
@@ -234,7 +236,11 @@ schema_ptr schema_registry_entry::get_schema() {
if (!_schema) {
slogger.trace("Activating {}", _version);
schema_ptr s;
s = _extended_frozen_schema->unfreeze(*_registry._ctxt);
if (_base_info) {
s = _frozen_schema->unfreeze(*_registry._ctxt, *_base_info);
} else {
s = _frozen_schema->unfreeze(*_registry._ctxt);
}
if (s->version() != _version) {
throw std::runtime_error(format("Unfrozen schema version doesn't match entry version ({}): {}", _version, *s));
}
@@ -253,14 +259,9 @@ void schema_registry_entry::detach_schema() noexcept {
_erase_timer.arm(_registry.grace_period());
}
extended_frozen_schema schema_registry_entry::extended_frozen() const {
SCYLLA_ASSERT(_state >= state::LOADED);
return *_extended_frozen_schema;
}
frozen_schema schema_registry_entry::frozen() const {
SCYLLA_ASSERT(_state >= state::LOADED);
return _extended_frozen_schema->fs;
return *_frozen_schema;
}
future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
@@ -329,17 +330,18 @@ global_schema_ptr::global_schema_ptr(global_schema_ptr&& o) noexcept {
SCYLLA_ASSERT(o._cpu_of_origin == current);
_ptr = std::move(o._ptr);
_cpu_of_origin = current;
_base_info = std::move(o._base_info);
}
schema_ptr global_schema_ptr::get() const {
if (this_shard_id() == _cpu_of_origin) {
return _ptr;
} else {
auto registered_schema = [](const schema_registry_entry& e) -> schema_ptr {
auto registered_schema = [](const schema_registry_entry& e, std::optional<db::view::base_dependent_view_info> base_info = std::nullopt) -> schema_ptr {
schema_ptr ret = local_schema_registry().get_or_null(e.version());
if (!ret) {
ret = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) -> extended_frozen_schema {
return e.extended_frozen();
ret = local_schema_registry().get_or_load(e.version(), [&e, &base_info](table_schema_version) -> view_schema_and_base_info {
return {e.frozen(), base_info};
});
}
return ret;
@@ -350,7 +352,7 @@ schema_ptr global_schema_ptr::get() const {
// that _ptr will have a registry on the foreign shard where this
// object originated so as long as this object lives the registry entries lives too
// and it is safe to reference them on foreign shards.
schema_ptr s = registered_schema(*_ptr->registry_entry());
schema_ptr s = registered_schema(*_ptr->registry_entry(), _base_info);
if (_ptr->registry_entry()->is_synced()) {
s->registry_entry()->mark_synced();
}
@@ -367,11 +369,18 @@ global_schema_ptr::global_schema_ptr(const schema_ptr& ptr)
if (e) {
return s;
} else {
return local_schema_registry().get_or_load(s->version(), [&s] (table_schema_version) -> extended_frozen_schema {
return extended_frozen_schema(s);
return local_schema_registry().get_or_load(s->version(), [&s] (table_schema_version) -> view_schema_and_base_info {
if (s->is_view()) {
return {frozen_schema(s), s->view_info()->base_info()};
} else {
return {frozen_schema(s)};
}
});
}
};
_ptr = ensure_registry_entry(ptr);
if (_ptr->is_view()) {
_base_info = _ptr->view_info()->base_info();
}
}

View File

@@ -15,15 +15,20 @@
#include "schema_fwd.hh"
#include "frozen_schema.hh"
#include "replica/database_fwd.hh"
#include "db/view/base_info.hh"
namespace db {
class schema_ctxt;
}
class schema_registry;
using async_schema_loader = std::function<future<extended_frozen_schema>(table_schema_version)>;
using schema_loader = std::function<extended_frozen_schema(table_schema_version)>;
struct view_schema_and_base_info {
frozen_schema schema;
std::optional<db::view::base_dependent_view_info> base_info;
};
using async_schema_loader = std::function<future<view_schema_and_base_info>(table_schema_version)>;
using schema_loader = std::function<view_schema_and_base_info(table_schema_version)>;
class schema_version_not_found : public std::runtime_error {
public:
@@ -60,7 +65,8 @@ class schema_registry_entry : public enable_lw_shared_from_this<schema_registry_
async_schema_loader _loader; // valid when state == LOADING
shared_promise<schema_ptr> _schema_promise; // valid when state == LOADING
std::optional<extended_frozen_schema> _extended_frozen_schema; // engaged when state == LOADED
std::optional<frozen_schema> _frozen_schema; // engaged when state == LOADED
std::optional<db::view::base_dependent_view_info> _base_info;// engaged when state == LOADED for view schemas
// valid when state == LOADED
// This is != nullptr when there is an alive schema_ptr associated with this entry.
@@ -78,7 +84,7 @@ public:
schema_registry_entry(schema_registry_entry&&) = delete;
schema_registry_entry(const schema_registry_entry&) = delete;
~schema_registry_entry();
schema_ptr load(extended_frozen_schema);
schema_ptr load(view_schema_and_base_info);
schema_ptr load(schema_ptr);
future<schema_ptr> start_loading(async_schema_loader);
schema_ptr get_schema(); // call only when state >= LOADED
@@ -89,8 +95,6 @@ public:
// Marks this schema version as synced. Syncing cannot be in progress.
void mark_synced();
// Can be called from other shards
extended_frozen_schema extended_frozen() const;
// Can be called from other shards
frozen_schema frozen() const;
// Can be called from other shards
table_schema_version version() const { return _version; }
@@ -156,7 +160,7 @@ public:
// The schema instance pointed to by the argument will be attached to the registry
// entry and will keep it alive.
// If the schema refers to a view, it must have base_info set.
schema_ptr learn(schema_ptr);
schema_ptr learn(const schema_ptr&);
// Removes all entries from the registry. This in turn removes all dependencies
// on the Seastar reactor.
@@ -173,6 +177,7 @@ schema_registry& local_schema_registry();
// chain will last.
class global_schema_ptr {
schema_ptr _ptr;
std::optional<db::view::base_dependent_view_info> _base_info;
unsigned _cpu_of_origin;
public:
// Note: the schema_ptr must come from the current shard and can't be nullptr.

View File

@@ -81,7 +81,7 @@ public:
// of the column family's keyspace. The reason for this is that we sometimes create a keyspace
// and its column families together. Therefore, listeners can't load the keyspace from the
// database. Instead, they should use the `ksm` parameter if needed.
virtual void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&, api::timestamp_type) {}
virtual void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&) {}
virtual void on_before_create_column_family(const keyspace_metadata& ksm, const schema&, utils::chunked_vector<mutation>&, api::timestamp_type) {}
virtual void on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, utils::chunked_vector<mutation>& mutations, api::timestamp_type timestamp);
virtual void on_before_update_column_family(const schema& new_schema, const schema& old_schema, utils::chunked_vector<mutation>&, api::timestamp_type) {}
@@ -155,7 +155,7 @@ public:
// We want to do this before calling `before_create_column_families`,
// because in `before_create_column_families` we want the subscriber to get
// the final list of tables.
void pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&, api::timestamp_type);
void pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>&);
void before_create_column_family(const keyspace_metadata& ksm, const schema&, utils::chunked_vector<mutation>&, api::timestamp_type);
void before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>&, utils::chunked_vector<mutation>&, api::timestamp_type);

View File

@@ -54,7 +54,7 @@ const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;
static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& sp);
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
service::storage_proxy& storage_proxy, sharded<service::storage_service>& ss, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
_notifier(notifier)
, _group0_barrier(this_shard_id() == 0 ?
std::function<future<>()>([this] () -> future<> {
@@ -75,7 +75,7 @@ migration_manager::migration_manager(migration_notifier& notifier, gms::feature_
})
)
, _background_tasks("migration_manager::background_tasks")
, _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _ss("migration_manager::storage_service"), _gossiper(gossiper), _group0_client(group0_client)
, _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _ss(ss), _gossiper(gossiper), _group0_client(group0_client)
, _sys_ks(sysks)
, _schema_push([this] { return passive_announce(); })
, _concurrent_ddl_retries{10}
@@ -94,14 +94,6 @@ future<> migration_manager::stop() {
}
}
void migration_manager::plug_storage_service(service::storage_service& ss) {
_ss.plug(ss.shared_from_this());
}
future<> migration_manager::unplug_storage_service() {
return _ss.unplug();
}
future<> migration_manager::drain()
{
mlogger.info("stopping migration service");
@@ -400,13 +392,9 @@ future<> migration_manager::merge_schema_from(locator::host_id src, const utils:
mlogger.debug("Applying schema mutations from {}", src);
auto& proxy = _storage_proxy;
const auto& db = proxy.get_db().local();
auto ss = _ss.get_permit();
if (!ss) {
co_return;
}
if (_as.abort_requested()) {
throw abort_requested_exception{};
return make_exception_future<>(abort_requested_exception());
}
utils::chunked_vector<mutation> mutations;
@@ -419,19 +407,16 @@ future<> migration_manager::merge_schema_from(locator::host_id src, const utils:
}
} catch (replica::no_such_column_family& e) {
mlogger.error("Error while applying schema mutations from {}: {}", src, e);
throw std::runtime_error(fmt::format("Error while applying schema mutations: {}", e));
return make_exception_future<>(std::make_exception_ptr<std::runtime_error>(
std::runtime_error(fmt::format("Error while applying schema mutations: {}", e))));
}
co_await db::schema_tables::merge_schema(_sys_ks, proxy.container(), ss.get()->container(), _feat, std::move(mutations));
return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _ss, _feat, std::move(mutations));
}
future<> migration_manager::reload_schema() {
mlogger.info("Reloading schema");
auto ss = _ss.get_permit();
if (!ss) {
co_return;
}
utils::chunked_vector<mutation> mutations;
co_await db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), ss.get()->container(), _feat, std::move(mutations), true);
return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _ss, _feat, std::move(mutations), true);
}
bool migration_manager::has_compatible_schema_tables_version(const locator::host_id& endpoint) {
@@ -603,10 +588,10 @@ void migration_notifier::before_create_column_family(const keyspace_metadata& ks
});
}
void migration_notifier::pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms, api::timestamp_type timestamp) {
_listeners.thread_for_each([&ksm, &cfms, timestamp] (migration_listener* listener) {
void migration_notifier::pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms) {
_listeners.thread_for_each([&ksm, &cfms] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a create-table
listener->on_pre_create_column_families(ksm, cfms, timestamp);
listener->on_pre_create_column_families(ksm, cfms);
});
}
@@ -693,7 +678,7 @@ static future<utils::chunked_vector<mutation>> do_prepare_new_column_families_an
mlogger.info("Create new ColumnFamily: {}", cfm);
}
db.get_notifier().pre_create_column_families(ksm, cfms, timestamp);
db.get_notifier().pre_create_column_families(ksm, cfms);
utils::chunked_vector<mutation> mutations;
for (schema_ptr cfm : cfms) {
@@ -731,13 +716,6 @@ future<> prepare_new_column_family_announcement(utils::chunked_vector<mutation>&
future<> prepare_new_column_families_announcement(utils::chunked_vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
for (auto cfm : cfms) {
try {
co_await validate(cfm);
} catch (...) {
std::throw_with_nested(std::runtime_error(seastar::format("Validation of schema extensions failed for ColumnFamily: {}", cfm)));
}
}
auto& db = sp.local_db();
// If the keyspace exists, ensure that we use the current metadata.
const auto& current_ksm = db.has_keyspace(ksm.name()) ? *db.find_keyspace(ksm.name()).metadata() : ksm;
@@ -1096,11 +1074,7 @@ future<> migration_manager::announce_with_raft(utils::chunked_vector<mutation> s
}
future<> migration_manager::announce_without_raft(utils::chunked_vector<mutation> schema, group0_guard guard) {
auto ss = _ss.get_permit();
if (!ss) {
co_return;
}
auto f = db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), ss.get()->container(), _feat, schema);
auto f = db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _ss, _feat, schema);
try {
using namespace std::placeholders;
@@ -1238,9 +1212,7 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, locator:
// referenced by the incoming request.
// That means the column mapping for the schema should always be inserted
// with TTL (refresh TTL in case column mapping already existed prior to that).
// We don't set the CDC schema here because it's not included in the RPC and we're
// not using raft mode.
auto us = s.unfreeze(db::schema_ctxt(proxy), nullptr);
auto us = s.unfreeze(db::schema_ctxt(proxy));
// if this is a view - sanity check that its schema doesn't need fixing.
schema_ptr base_schema;
if (us->is_view()) {
@@ -1248,8 +1220,12 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, locator:
base_schema = db.find_schema(us->view_info()->base_id());
db::schema_tables::check_no_legacy_secondary_index_mv_schema(db, view_ptr(us), base_schema);
}
return db::schema_tables::store_column_mapping(proxy, us, true).then([us, base_schema] -> extended_frozen_schema {
return extended_frozen_schema(us);
return db::schema_tables::store_column_mapping(proxy, us, true).then([us, base_schema] -> view_schema_and_base_info {
if (us->is_view()) {
return {frozen_schema(us), us->view_info()->base_info()};
} else {
return {frozen_schema(us)};
}
});
});
});

View File

@@ -19,7 +19,6 @@
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "schema/schema_fwd.hh"
#include "service/storage_service.hh"
#include "utils/pluggable.hh"
#include "utils/serialized_action.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_group0_client.hh"
@@ -64,7 +63,7 @@ private:
gms::feature_service& _feat;
netw::messaging_service& _messaging;
service::storage_proxy& _storage_proxy;
utils::pluggable<storage_service> _ss;
sharded<service::storage_service>& _ss;
gms::gossiper& _gossiper;
seastar::abort_source _as;
service::raft_group0_client& _group0_client;
@@ -90,9 +89,7 @@ private:
friend class group0_state_machine; // needed for access to _messaging
size_t _concurrent_ddl_retries;
public:
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks);
void plug_storage_service(service::storage_service& ss);
future<> unplug_storage_service();
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, sharded<service::storage_service>& ss, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks);
migration_notifier& get_notifier() { return _notifier; }
const migration_notifier& get_notifier() const { return _notifier; }

View File

@@ -641,17 +641,6 @@ future<scheduling_group> service_level_controller::auth_integration::get_user_sc
}
future<scheduling_group> service_level_controller::get_user_scheduling_group(const std::optional<auth::authenticated_user>& usr) {
// Special case:
// -------------
// The maintenance socket can communicate with Scylla before `auth_integration`
// is registered, and we need to prepare for it.
// For the discussion, see: scylladb/scylladb#26816.
//
// TODO: Get rid of this.
if (!usr.has_value() || auth::is_anonymous(usr.value())) {
return make_ready_future<scheduling_group>(get_default_scheduling_group());
}
SCYLLA_ASSERT(_auth_integration != nullptr);
return _auth_integration->get_user_scheduling_group(usr);
}
@@ -1191,10 +1180,7 @@ future<std::vector<cql3::description>> service_level_controller::auth_integratio
}
future<std::vector<cql3::description>> service_level_controller::describe_service_levels() {
if (_auth_integration == nullptr) {
throw std::runtime_error("Describing service levels requires that `auth_integration` has been registered, "
"but it has not. One of the potential reasons is using the maintenance socket.");
}
SCYLLA_ASSERT(_auth_integration != nullptr);
std::vector<cql3::description> created_service_levels_descs = co_await describe_created_service_levels();
std::vector<cql3::description> attached_service_levels_descs = co_await _auth_integration->describe_attached_service_levels();

View File

@@ -17,7 +17,6 @@
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/core/lowres_clock.hh>
#include "auth/authenticated_user.hh"
#include "seastarx.hh"
#include "auth/service.hh"
#include "cql3/description.hh"
@@ -289,17 +288,6 @@ public:
template <typename Func, typename Ret = std::invoke_result_t<Func>>
requires std::invocable<Func>
futurize_t<Ret> with_user_service_level(const std::optional<auth::authenticated_user>& usr, Func&& func) {
// Special case:
// -------------
// The maintenance socket can communicate with Scylla before `auth_integration`
// is registered, and we need to prepare for it.
// For the discussion, see: scylladb/scylladb#26816.
//
// TODO: Get rid of this.
if (!usr.has_value() || auth::is_anonymous(usr.value())) {
return with_scheduling_group(get_default_scheduling_group(), std::forward<Func>(func));
}
SCYLLA_ASSERT(_auth_integration != nullptr);
return _auth_integration->with_user_service_level(usr, std::forward<Func>(func));
}

View File

@@ -1190,7 +1190,7 @@ static unsigned get_cas_shard(const schema& s, dht::token token, const locator::
if (const auto& rs = erm.get_replication_strategy(); rs.uses_tablets()) {
const auto& tablet_map = erm.get_token_metadata().tablets().get_tablet_map(s.id());
const auto tablet_id = tablet_map.get_tablet_id(token);
return tablet_map.get_primary_replica(tablet_id, erm.get_topology()).shard % smp::count;
return tablet_map.get_primary_replica(tablet_id).shard % smp::count;
} else {
on_internal_error(paxos::paxos_state::logger,
format("failed to detect shard for reads for non-tablet-based rs {}, table {}.{}",
@@ -1607,7 +1607,7 @@ protected:
service_permit _permit; // holds admission permit until operation completes
db::per_partition_rate_limit::info _rate_limit_info;
db::view::update_backlog _view_backlog; // max view update backlog of all participating targets
utils::small_vector<gate::holder, 2> _holders;
int _destroy_promise_index = -1;
protected:
virtual bool waited_for(locator::host_id from) = 0;
@@ -1639,11 +1639,6 @@ public:
if (cancellable) {
register_cancellable();
}
attach_to(_proxy->_write_handlers_gate);
}
void attach_to(gate& g) {
_holders.push_back(g.hold());
}
virtual ~abstract_write_response_handler() {
--_stats.writes;
@@ -1673,6 +1668,22 @@ public:
}
update_cancellable_live_iterators();
if (const auto index = _destroy_promise_index; index >= 0) {
auto& promises = _proxy->_write_handler_destroy_promises;
auto& p = promises[index];
p.on_destroy();
std::swap(p, promises.back());
p.handler()._destroy_promise_index = index;
promises.pop_back();
}
}
void ensure_destroy_promise() {
if (_destroy_promise_index < 0) {
auto& promises = _proxy->_write_handler_destroy_promises;
_destroy_promise_index = promises.size();
promises.emplace_back(*this);
}
}
bool is_counter() const {
return _type == db::write_type::COUNTER;
@@ -2692,7 +2703,29 @@ void storage_proxy::remove_response_handler(storage_proxy::response_id_type id)
remove_response_handler_entry(std::move(entry));
}
storage_proxy::write_handler_destroy_promise::write_handler_destroy_promise(abstract_write_response_handler& handler)
: _handler(&handler)
, _promise(std::nullopt)
{
}
future<> storage_proxy::write_handler_destroy_promise::get_future() {
if (!_promise) {
_promise.emplace();
}
co_await _promise->get_shared_future();
}
void storage_proxy::write_handler_destroy_promise::on_destroy() {
if (_promise) {
_promise->set_value();
}
}
void storage_proxy::remove_response_handler_entry(response_handlers_map::iterator entry) {
if (auto& handler = *entry->second; handler.use_count() > 1) {
handler.ensure_destroy_promise();
}
entry->second->on_released();
_response_handlers.erase(std::move(entry));
}
@@ -7282,7 +7315,7 @@ void storage_proxy::on_released(const locator::host_id& hid) {
}
future<> storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun) {
gate g;
std::vector<future<>> futures;
auto it = _cancellable_write_handlers_list->begin();
while (it != _cancellable_write_handlers_list->end()) {
// timeout_cb() may destroy the current handler. Since the list uses
@@ -7292,10 +7325,14 @@ future<> storage_proxy::cancel_write_handlers(noncopyable_function<bool(const ab
const auto next = std::next(it);
if (filter_fun(*it)) {
it->attach_to(g);
auto handler = it->shared_from_this();
if (_response_handlers.contains(it->id())) {
it->timeout_cb();
}
if (it->use_count() > 1) {
it->ensure_destroy_promise();
futures.push_back(_write_handler_destroy_promises[it->_destroy_promise_index].get_future());
}
}
it = next;
if (need_preempt() && it != _cancellable_write_handlers_list->end()) {
@@ -7307,10 +7344,10 @@ future<> storage_proxy::cancel_write_handlers(noncopyable_function<bool(const ab
// iterator_guard handles safe iterator updates while allowing prompt
// handler destruction and client response.
cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it};
co_await coroutine::maybe_yield();
co_await maybe_yield();
}
}
co_await g.close();
co_await when_all_succeed(std::move(futures));
}
void storage_proxy::on_down(const gms::inet_address& endpoint, locator::host_id id) {
@@ -7351,11 +7388,15 @@ future<utils::chunked_vector<dht::token_range_endpoints>> storage_proxy::describ
}
future<> storage_proxy::cancel_all_write_response_handlers() {
auto f = _write_handlers_gate.close();
while (!_response_handlers.empty()) {
_response_handlers.begin()->second->timeout_cb();
co_await coroutine::maybe_yield();
if (!_response_handlers.empty()) {
co_await maybe_yield();
}
}
while (!_write_handler_destroy_promises.empty()) {
co_await _write_handler_destroy_promises.front().get_future();
}
co_await std::move(f);
}
}

View File

@@ -320,7 +320,20 @@ private:
class cancellable_write_handlers_list;
std::unique_ptr<cancellable_write_handlers_list> _cancellable_write_handlers_list;
gate _write_handlers_gate;
// shared_ptr<abstract_write_response_handler> instances are captured in the lmutate/rmutate
// lambdas of send_to_live_endpoints(). As a result, an abstract_write_response_handler object
// may outlive its removal from the _response_handlers map. We use write_handler_destroy_promise to
// wait for such pending instances in cancel_write_handlers() and cancel_all_write_response_handlers().
class write_handler_destroy_promise {
abstract_write_response_handler* _handler;
std::optional<shared_promise<void>> _promise;
public:
write_handler_destroy_promise(abstract_write_response_handler& handler);
future<> get_future();
abstract_write_response_handler& handler() { return *_handler; }
void on_destroy();
};
std::vector<write_handler_destroy_promise> _write_handler_destroy_promises;
/* This is a pointer to the shard-local part of the sharded cdc_service:
* storage_proxy needs access to cdc_service to augment mutations.

View File

@@ -265,7 +265,6 @@ storage_service::storage_service(abort_source& abort_source,
}
init_messaging_service();
_migration_manager.local().plug_storage_service(*this);
}
storage_service::~storage_service() = default;
@@ -533,6 +532,9 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
update_topology(host_id, t.left_nodes_rs.at(id));
}
// However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
co_await _messaging.local().ban_host(host_id);
};
auto process_normal_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
@@ -643,7 +645,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr));
}
}
for (auto id : t.excluded_tablet_nodes) {
for (auto id : t.get_excluded_nodes()) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
if (n) {
n->set_excluded(true);
@@ -651,7 +653,9 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
}
auto nodes_to_release = t.left_nodes;
nodes_to_release.insert(t.ignored_nodes.begin(), t.ignored_nodes.end());
for (auto id: t.get_excluded_nodes()) {
nodes_to_release.insert(id);
}
for (const auto& id: nodes_to_release) {
auto host_id = locator::host_id(id.uuid());
if (!tmptr->get_topology().find_node(host_id)) {
@@ -700,11 +704,10 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
// read topology state from disk and recreate token_metadata from it
_topology_state_machine._topology = co_await _sys_ks.local().load_topology_state(tablet_hosts);
_topology_state_machine.reload_count++;
auto& topology = _topology_state_machine._topology;
set_topology_change_kind(upgrade_state_to_topology_op_kind(topology.upgrade_state));
set_topology_change_kind(upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state));
if (topology.upgrade_state != topology::upgrade_state_type::done) {
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::done) {
co_return;
}
@@ -738,13 +741,13 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
}
co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) {
return fs.enable(topology.enabled_features | std::ranges::to<std::set<std::string_view>>());
return fs.enable(_topology_state_machine._topology.enabled_features | std::ranges::to<std::set<std::string_view>>());
});
// Update the legacy `enabled_features` key in `system.scylla_local`.
// It's OK to update it after enabling features because `system.topology` now
// is the source of truth about enabled features.
co_await _sys_ks.local().save_local_enabled_features(topology.enabled_features, false);
co_await _sys_ks.local().save_local_enabled_features(_topology_state_machine._topology.enabled_features, false);
auto saved_tmpr = get_token_metadata_ptr();
{
@@ -752,7 +755,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
auto tmptr = _shared_token_metadata.make_token_metadata_ptr();
tmptr->invalidate_cached_rings();
tmptr->set_version(topology.version);
tmptr->set_version(_topology_state_machine._topology.version);
const auto read_new = std::invoke([](std::optional<topology::transition_state> state) {
using read_new_t = locator::token_metadata::read_new_t;
@@ -785,7 +788,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
case topology::transition_state::write_both_read_new:
return read_new_t::yes;
}
}, topology.tstate);
}, _topology_state_machine._topology.tstate);
tmptr->set_read_new(read_new);
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal));
@@ -799,7 +802,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
} else {
tablets = co_await replica::read_tablet_metadata(_qp);
}
tablets->set_balancing_enabled(topology.tablet_balancing_enabled);
tablets->set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
tmptr->set_tablets(std::move(*tablets));
co_await replicate_to_all_cores(std::move(tmptr));
@@ -807,7 +810,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
rtlogger.debug("topology_state_load: token metadata replication to all cores finished");
}
co_await update_fence_version(topology.fence_version);
co_await update_fence_version(_topology_state_machine._topology.fence_version);
// As soon as a node joins token_metadata.topology we
// need to drop all its rpc connections with ignored_topology flag.
@@ -822,25 +825,25 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
for (const auto& gen_id : topology.committed_cdc_generations) {
for (const auto& gen_id : _topology_state_machine._topology.committed_cdc_generations) {
rtlogger.trace("topology_state_load: process committed cdc generation {}", gen_id);
co_await utils::get_local_injector().inject("topology_state_load_before_update_cdc", [](auto& handler) -> future<> {
rtlogger.info("topology_state_load_before_update_cdc hit, wait for message");
co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
});
co_await _cdc_gens.local().handle_cdc_generation(gen_id);
if (gen_id == topology.committed_cdc_generations.back()) {
if (gen_id == _topology_state_machine._topology.committed_cdc_generations.back()) {
co_await _sys_ks.local().update_cdc_generation_id(gen_id);
rtlogger.debug("topology_state_load: the last committed CDC generation ID: {}", gen_id);
}
}
// Ban all left and ignord nodes. We do not allow them to go back online.
co_await _messaging.local().ban_hosts(boost::join(topology.left_nodes, topology.ignored_nodes)
| std::views::transform([] (auto id) { return locator::host_id{id.uuid()}; })
| std::ranges::to<utils::chunked_vector<locator::host_id>>());
for (auto& id : _topology_state_machine._topology.ignored_nodes) {
// Ban all ignored nodes. We do not allow them to go back online
co_await _messaging.local().ban_host(locator::host_id{id.uuid()});
}
slogger.debug("topology_state_load: excluded tablet nodes: {}", topology.excluded_tablet_nodes);
slogger.debug("topology_state_load: excluded nodes: {}", _topology_state_machine._topology.get_excluded_nodes());
}
future<> storage_service::topology_transition(state_change_hint hint) {
@@ -3420,7 +3423,6 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
}
future<> storage_service::stop() {
co_await _migration_manager.local().unplug_storage_service();
// if there is a background "isolate" shutdown
// in progress, we need to sync with it. Mostly
// relevant for tests
@@ -4962,39 +4964,6 @@ future<> storage_service::do_clusterwide_vnodes_cleanup() {
rtlogger.info("cluster-wide vnodes cleanup done");
}
future<> storage_service::reset_cleanup_needed() {
auto& server = _group0->group0_server();
auto holder = _group0->hold_group0_gate();
while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto me = _topology_state_machine._topology.find(server.id());
if (!me || me->second.state != node_state::normal) {
throw std::runtime_error(format("cannot mark the node as clean: local node {} is either not a member of the cluster or is not in a normal state", server.id()));
}
if (me->second.cleanup != cleanup_status::needed) {
rtlogger.info("cannot reset cleanup flag when it is {}", me->second.cleanup);
co_return;
}
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup status reset by force for {}", server.id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
} catch (group0_concurrent_modification&) {
rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
continue;
}
rtlogger.info("cleanup needed flag is reset by force");
break;
}
}
future<sstring> storage_service::wait_for_topology_request_completion(utils::UUID id, bool require_entry) {
co_return co_await _topology_state_machine.wait_for_request_completion(_sys_ks.local(), id, require_entry);
}
@@ -5727,7 +5696,7 @@ future<std::map<token, inet_address>> storage_service::get_tablet_to_endpoint_ma
const auto& tmap = tm.tablets().get_tablet_map(table);
std::map<token, inet_address> result;
for (std::optional<locator::tablet_id> tid = tmap.first_tablet(); tid; tid = tmap.next_tablet(*tid)) {
result.emplace(tmap.get_last_token(*tid), _address_map.get(tmap.get_primary_replica(*tid, tm.get_topology()).host));
result.emplace(tmap.get_last_token(*tid), _address_map.get(tmap.get_primary_replica(*tid).host));
co_await coroutine::maybe_yield();
}
co_return result;
@@ -6479,7 +6448,6 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
ssts.push_back(co_await load_sstable(sharder, table, std::move(sst_desc)));
}
co_await table.add_sstables_and_update_cache(ssts);
_view_building_worker.local().load_sstables(tablet.table, ssts);
});
rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending);
}
@@ -6719,9 +6687,8 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
throw std::runtime_error(fmt::format("Tablet {} stage is not at cleanup/cleanup_target", tablet));
}
}
co_await _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks, &vbw = _view_building_worker] (replica::database& db) {
co_await _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks] (replica::database& db) {
auto& table = db.find_column_family(tablet.table);
vbw.local().cleanup_staging_sstables(table.get_effective_replication_map(), tablet.table, tablet.tablet);
return table.cleanup_tablet(db, sys_ks.local(), tablet.tablet);
});
co_return tablet_operation_result();
@@ -8333,3 +8300,4 @@ future<> storage_service::query_cdc_streams(table_id table, noncopyable_function
}
} // namespace service

View File

@@ -1010,7 +1010,6 @@ public:
future<> load_cdc_streams(std::optional<std::unordered_set<table_id>> changed_tables = std::nullopt);
future<> do_clusterwide_vnodes_cleanup();
future<> reset_cleanup_needed();
// Starts the upgrade procedure to topology on raft.
// Must be called on shard 0.

View File

@@ -294,7 +294,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// it may still fail if down node has data for the rebuild process
return !dead_nodes.contains(req.first);
}
auto exclude_nodes = get_excluded_nodes_for_topology_request(topo, req.first, req.second);
auto exclude_nodes = get_excluded_nodes(topo, req.first, req.second);
for (auto id : dead_nodes) {
if (!exclude_nodes.contains(id)) {
return false;
@@ -448,11 +448,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
| std::views::filter([&cmd, &exclude_nodes] (const std::pair<const raft::server_id, replica_state>& n) {
// We must send barrier and barrier_and_drain to the decommissioning node
// as it might be accepting or coordinating requests.
// A joining node might be receiving mutations and coordinating view updates.
bool include_transitioning_node =
(n.second.state == node_state::decommissioning || n.second.state == node_state::bootstrapping || n.second.state == node_state::replacing)
bool include_decommissioning_node = n.second.state == node_state::decommissioning
&& (cmd.cmd == raft_topology_cmd::command::barrier || cmd.cmd == raft_topology_cmd::command::barrier_and_drain);
return !exclude_nodes.contains(n.first) && (n.second.state == node_state::normal || include_transitioning_node);
return !exclude_nodes.contains(n.first) && (n.second.state == node_state::normal || include_decommissioning_node);
})
| std::views::keys;
if (drop_and_retake) {
@@ -465,39 +463,20 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_return guard;
}
std::unordered_set<raft::server_id> get_excluded_nodes_for_topology_request(const topology_state_machine::topology_type& topo,
std::unordered_set<raft::server_id> get_excluded_nodes(const topology_state_machine::topology_type& topo,
raft::server_id id, const std::optional<topology_request>& req) const {
// ignored_nodes is not per request any longer, but for now consider ignored nodes only
// for remove and replace operations since only those operations support it on streaming level.
// Specifically, streaming for bootstrapping doesn't support ignored_nodes
// (see raft_topology_cmd::command::stream_ranges handler in storage_proxy, neither
// bootstrap_with_repair nor bs.bootstrap take an ignored_nodes parameter).
// If we ignored a dead node for a join request here, this request wouldn't be cancelled by
// get_next_task and the stream_ranges would fail with timeout after wasting some time
// trying to access the dead node.
const auto is_remove_or_replace = std::invoke([&]() {
if (req) {
return *req == topology_request::remove || *req == topology_request::replace;
}
const auto it = topo.transition_nodes.find(id);
if (it == topo.transition_nodes.end()) {
return false;
}
const auto s = it->second.state;
return s == node_state::removing || s == node_state::replacing;
});
std::unordered_set<raft::server_id> exclude_nodes;
if (is_remove_or_replace) {
exclude_nodes = topo.ignored_nodes;
}
return exclude_nodes;
return topo.get_excluded_nodes(id, req);
}
std::unordered_set<raft::server_id> get_excluded_nodes_for_topology_request(const node_to_work_on& node) const {
return get_excluded_nodes_for_topology_request(*node.topology, node.id, node.request);
std::unordered_set<raft::server_id> get_excluded_nodes(const node_to_work_on& node) const {
return node.topology->get_excluded_nodes(node.id, node.request);
}
future<node_to_work_on> exec_global_command(node_to_work_on&& node, const raft_topology_cmd& cmd) {
auto guard = co_await exec_global_command(std::move(node.guard), cmd, get_excluded_nodes(node), drop_guard_and_retake::yes);
co_return retake_node(std::move(guard), node.id);
};
future<group0_guard> remove_from_group0(group0_guard guard, const raft::server_id& id) {
rtlogger.info("removing node {} from group 0 configuration...", id);
release_guard(std::move(guard));
@@ -1186,7 +1165,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
future<group0_guard> global_tablet_token_metadata_barrier(group0_guard guard) {
// FIXME: Don't require all nodes to be up, only tablet replicas.
return global_token_metadata_barrier(std::move(guard), _topo_sm._topology.ignored_nodes);
return global_token_metadata_barrier(std::move(guard), _topo_sm._topology.get_excluded_nodes());
}
// Represents a two-state state machine which changes monotonically
@@ -1279,7 +1258,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
bool is_excluded(raft::server_id server_id) const {
return _topo_sm._topology.excluded_tablet_nodes.contains(server_id);
return _topo_sm._topology.get_excluded_nodes().contains(server_id);
}
void generate_migration_update(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const tablet_migration_info& mig) {
@@ -1508,8 +1487,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.info("Skipped tablet rebuild repair of {} as no tablet replica was found", gid);
return make_ready_future<>();
}
auto dst = locator::maybe_get_primary_replica(gid.tablet, {tsi.read_from.begin(), tsi.read_from.end()},
_db.get_token_metadata().get_topology(), [] (const auto& tr) { return true; }).value().host;
auto dst = locator::maybe_get_primary_replica(gid.tablet, {tsi.read_from.begin(), tsi.read_from.end()}, [] (const auto& tr) { return true; }).value().host;
rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
@@ -1735,7 +1713,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
const auto& topo = _db.get_token_metadata().get_topology();
locator::host_id dst;
if (hosts_filter.empty() && dcs_filter.empty()) {
auto primary = tmap.get_primary_replica(gid.tablet, topo);
auto primary = tmap.get_primary_replica(gid.tablet);
dst = primary.host;
} else {
auto dst_opt = tmap.maybe_get_selected_replica(gid.tablet, topo, tinfo.repair_task_info);
@@ -1912,10 +1890,34 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
void migrate_tablet_size(locator::host_id leaving, locator::host_id pending, locator::global_tablet_id gid, const dht::token_range trange) {
if (auto old_load_stats = _tablet_allocator.get_load_stats()) {
auto new_load_stats = old_load_stats->migrate_tablet_size(leaving, pending, gid, trange);
if (new_load_stats) {
_tablet_allocator.set_load_stats(std::move(new_load_stats));
auto has_tablet_size = [&] (const locator::load_stats& stats, locator::host_id host) {
if (auto host_i = stats.tablet_stats.find(host); host_i != stats.tablet_stats.end()) {
auto& tables = host_i->second.tablet_sizes;
if (auto table_i = tables.find(gid.table); table_i != tables.find(gid.table)) {
if (auto size_i = table_i->second.find(trange); size_i != table_i->second.find(trange)) {
return true;
}
}
}
return false;
};
if (leaving != pending) {
auto old_load_stats = _tablet_allocator.get_load_stats();
if (old_load_stats) {
const locator::load_stats& stats = *old_load_stats;
if (has_tablet_size(stats, leaving) && !has_tablet_size(stats, pending)) {
rtlogger.debug("Moving tablet size for tablet: {} from: {} to: {}", gid, leaving, pending);
auto new_load_stats = make_lw_shared<locator::load_stats>(*old_load_stats);
auto& new_leaving_ts = new_load_stats->tablet_stats.at(leaving);
auto& new_pending_ts = new_load_stats->tablet_stats.at(pending);
auto map_node = new_leaving_ts.tablet_sizes.at(gid.table).extract(trange);
new_pending_ts.tablet_sizes[gid.table].insert(std::move(map_node));
if (new_leaving_ts.tablet_sizes.at(gid.table).empty()) {
new_leaving_ts.tablet_sizes.erase(gid.table);
}
_tablet_allocator.set_load_stats(std::move(new_load_stats));
}
}
}
}
@@ -2023,10 +2025,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// Collect the IDs of the hosts with replicas, but ignore excluded nodes
std::unordered_set<locator::host_id> replica_hosts;
const std::unordered_set<raft::server_id> excluded_nodes = _topo_sm._topology.get_excluded_nodes();
const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
for (const locator::tablet_replica& replica: tinfo.replicas) {
if (!_topo_sm._topology.excluded_tablet_nodes.contains(raft::server_id(replica.host.uuid()))) {
if (!excluded_nodes.contains(raft::server_id(replica.host.uuid()))) {
replica_hosts.insert(replica.host);
}
}
@@ -2513,7 +2516,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// make sure all nodes know about new topology (we require all nodes to be alive for topo change for now)
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
@@ -2553,7 +2556,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
if (!node.rs->ring->tokens.empty()) {
if (node.rs->state == node_state::removing) {
// tell all token owners to stream data of the removed node to new range owners
auto exclude_nodes = get_excluded_nodes_for_topology_request(node);
auto exclude_nodes = get_excluded_nodes(node);
auto normal_zero_token_nodes = _topo_sm._topology.get_normal_zero_token_nodes();
std::move(normal_zero_token_nodes.begin(), normal_zero_token_nodes.end(),
std::inserter(exclude_nodes, exclude_nodes.begin()));
@@ -2597,7 +2600,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// In this state writes goes to old and new replicas but reads start to be done from new replicas
// Before we stop writing to old replicas we need to wait for all previous reads to complete
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
@@ -2727,39 +2730,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case topology::transition_state::left_token_ring: {
auto node = get_node_to_work_on(std::move(guard));
auto finish_left_token_ring_transition = [&](node_to_work_on& node) -> future<> {
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
utils::get_local_injector().inject("finish_left_token_ring_transition_throw", [] {
throw std::runtime_error("finish_left_token_ring_transition failed due to error injection");
});
utils::chunked_vector<canonical_mutation> muts;
topology_mutation_builder builder(node.guard.write_timestamp());
cleanup_ignored_nodes_on_left(builder, node.id);
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::left);
muts.push_back(builder.build());
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
auto str = node.rs->state == node_state::decommissioning
? ::format("finished decommissioning node {}", node.id)
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
};
// Denotes the case when this path is already executed before and first topology state update was successful.
// So we can skip all steps and perform the second topology state update operation (which modifies node state to left)
// and remove node conditionally from group0.
if (auto [done, error] = co_await _sys_ks.get_topology_request_state(node.rs->request_id, false); done) {
co_await finish_left_token_ring_transition(node);
break;
}
if (node.id == _raft.id()) {
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
@@ -2775,7 +2745,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
@@ -2832,7 +2802,24 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node = retake_node(co_await start_operation(), node_id);
}
co_await finish_left_token_ring_transition(node);
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
utils::chunked_vector<canonical_mutation> muts;
topology_mutation_builder builder(node.guard.write_timestamp());
cleanup_ignored_nodes_on_left(builder, node.id);
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::left);
muts.push_back(builder.build());
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
auto str = node.rs->state == node_state::decommissioning
? ::format("finished decommissioning node {}", node.id)
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
}
break;
case topology::transition_state::rollback_to_normal: {
@@ -2843,7 +2830,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
bool barrier_failed = false;
auto state = node.rs->state;
try {
node.guard = co_await exec_global_command(std::move(node.guard),raft_topology_cmd::command::barrier_and_drain, get_excluded_nodes_for_topology_request(node), drop_guard_and_retake::yes);
node.guard = co_await exec_global_command(std::move(node.guard),raft_topology_cmd::command::barrier_and_drain, get_excluded_nodes(node), drop_guard_and_retake::yes);
} catch (term_changed_error&) {
throw;
} catch (raft::request_aborted&) {
@@ -2919,7 +2906,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await wait_for_gossiper(node.id, _gossiper, _as);
node.guard = co_await start_operation();
} else {
auto exclude_nodes = get_excluded_nodes_for_topology_request(node);
auto exclude_nodes = get_excluded_nodes(node);
exclude_nodes.insert(node.id);
node.guard = co_await exec_global_command(std::move(node.guard),
raft_topology_cmd::command::wait_for_ip,
@@ -3194,7 +3181,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
bool failed = false;
try {
rtlogger.info("vnodes cleanup {}: running global_token_metadata_barrier", cleanup_reason);
guard = co_await global_token_metadata_barrier(std::move(guard), _topo_sm._topology.ignored_nodes, &fenced);
guard = co_await global_token_metadata_barrier(std::move(guard), _topo_sm._topology.get_excluded_nodes(), &fenced);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {

View File

@@ -52,6 +52,18 @@ raft::server_id topology::parse_replaced_node(const std::optional<request_param>
return {};
}
std::unordered_set<raft::server_id> topology::get_excluded_nodes(raft::server_id id,
        const std::optional<topology_request>& req) const {
    // ignored_nodes is no longer tracked per request, but for now it is honored
    // only for remove and replace operations, since only those operations
    // support ignoring nodes at the streaming level.
    const bool request_ignores = req
            && (*req == topology_request::remove || *req == topology_request::replace);
    const auto it = transition_nodes.find(id);
    const bool transition_ignores = it != transition_nodes.end()
            && (it->second.state == node_state::removing || it->second.state == node_state::replacing);
    if (request_ignores || transition_ignores) {
        return ignored_nodes;
    }
    return {};
}
std::optional<request_param> topology::get_request_param(raft::server_id id) const {
auto rit = req_param.find(id);
if (rit != req_param.end()) {
@@ -60,6 +72,14 @@ std::optional<request_param> topology::get_request_param(raft::server_id id) con
return std::nullopt;
};
std::unordered_set<raft::server_id> topology::get_excluded_nodes() const {
auto result = ignored_nodes;
for (auto& [id, rs] : left_nodes_rs) {
result.insert(id);
}
return result;
}
std::set<sstring> calculate_not_yet_enabled_features(const std::set<sstring>& enabled_features, const auto& supported_features) {
std::set<sstring> to_enable;
bool first = true;

View File

@@ -195,13 +195,6 @@ struct topology {
// The set of nodes that should be considered dead during topology operations
std::unordered_set<raft::server_id> ignored_nodes;
// The set of nodes currently excluded from synchronization in the tablets management code.
// The barrier should not wait for these nodes.
// This set is effectively equal to: ignored_nodes + keys(left_nodes_rs).
// Tablet replicas may temporarily include left nodes (e.g. when a node is replaced),
// hence the need for this field.
std::unordered_set<raft::server_id> excluded_tablet_nodes;
// Find only nodes in non 'left' state
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id) const;
// Return true if node exists in any state including 'left' one
@@ -214,8 +207,15 @@ struct topology {
// Returns false iff we can safely start a new topology change.
bool is_busy() const;
// Returns the set of nodes currently excluded from synchronization-with in the topology.
// Barrier should not wait for those nodes. Used for tablets migration only.
std::unordered_set<raft::server_id> get_excluded_nodes() const;
std::optional<request_param> get_request_param(raft::server_id) const;
static raft::server_id parse_replaced_node(const std::optional<request_param>&);
// Returns the set of nodes currently excluded, based on the global topology request.
// Used by topology coordinator code only.
std::unordered_set<raft::server_id> get_excluded_nodes(raft::server_id id, const std::optional<topology_request>& req) const;
// Calculates a set of features that are supported by all normal nodes but not yet enabled.
std::set<sstring> calculate_not_yet_enabled_features() const;

View File

@@ -81,8 +81,6 @@ struct sstable_open_config {
// Mimics behavior when a SSTable is streamed to a given shard, where SSTable
// writer considers the shard that created the SSTable as its owner.
bool current_shard_as_sstable_owner = false;
// Do not move the sharding metadata to the sharder, keeping it in the scylla metadata.
bool keep_sharding_metadata = false;
};
}

View File

@@ -11,6 +11,7 @@
#include "sstables/random_access_reader.hh"
#include "utils/disk-error-handler.hh"
#include "utils/log.hh"
#include "utils/fragmented_temporary_buffer.hh"
namespace sstables {
@@ -24,6 +25,15 @@ future <temporary_buffer<char>> random_access_reader::read_exactly(size_t n) noe
}
}
// Reads exactly `n` bytes from the underlying stream into a fragmented buffer,
// avoiding a single large contiguous allocation. Returns an exceptional future
// (rather than throwing) on failure, as required by the noexcept contract.
future<fragmented_temporary_buffer> random_access_reader::read_exactly_fragmented(size_t n) noexcept {
    try {
        // NOTE(review): fragmented_temporary_buffer::reader accumulates the
        // partially-read fragments in its own state while the read is in
        // flight, so it must outlive the returned future. The previous code
        // kept it as a stack local and returned the future, destroying the
        // reader while the read was still pending — a use-after-free. Keep it
        // alive with do_with instead (confirm against
        // utils/fragmented_temporary_buffer.hh).
        return do_with(fragmented_temporary_buffer::reader(), [this, n] (auto& reader) {
            return reader.read_exactly(*_in, n);
        });
    } catch (...) {
        return current_exception_as_future<fragmented_temporary_buffer>();
    }
}
static future<> close_if_needed(std::unique_ptr<input_stream<char>> in) {
if (!in) {
return make_ready_future<>();

View File

@@ -17,6 +17,7 @@
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include "seastarx.hh"
#include "utils/fragmented_temporary_buffer.hh"
namespace sstables {
@@ -33,6 +34,8 @@ protected:
public:
future <temporary_buffer<char>> read_exactly(size_t n) noexcept;
future<fragmented_temporary_buffer> read_exactly_fragmented(size_t n) noexcept;
future<> seek(uint64_t pos) noexcept;
bool eof() const noexcept { return _in->eof(); }

View File

@@ -966,8 +966,7 @@ sstable_set_impl::create_single_key_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate,
sstables::integrity_check integrity) const
const sstable_predicate& predicate) const
{
const auto& pos = pr.start()->value();
auto hash = utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(*schema, *pos.key())));
@@ -980,7 +979,7 @@ sstable_set_impl::create_single_key_sstable_reader(
| std::views::transform([&] (const shared_sstable& sstable) {
tracing::trace(trace_state, "Reading key {} from sstable {}", pos, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->make_reader(schema, permit, pr, slice, trace_state, fwd, mutation_reader::forwarding::yes,
default_read_monitor(), integrity, &hash);
default_read_monitor(), integrity_check::no, &hash);
})
| std::ranges::to<std::vector<mutation_reader>>();
@@ -1011,8 +1010,7 @@ time_series_sstable_set::create_single_key_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate,
sstables::integrity_check integrity) const {
const sstable_predicate& predicate) const {
const auto& pos = pr.start()->value();
// First check if the optimized algorithm for TWCS single partition queries can be applied.
// Multiple conditions must be satisfied:
@@ -1034,7 +1032,7 @@ time_series_sstable_set::create_single_key_sstable_reader(
// Some of the conditions were not satisfied so we use the standard query path.
return sstable_set_impl::create_single_key_sstable_reader(
cf, std::move(schema), std::move(permit), sstable_histogram,
pr, slice, std::move(trace_state), fwd_sm, fwd_mr, predicate, integrity);
pr, slice, std::move(trace_state), fwd_sm, fwd_mr, predicate);
}
auto hash = utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(*schema, *pos.key())));
@@ -1260,8 +1258,7 @@ compound_sstable_set::create_single_key_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate,
sstables::integrity_check integrity) const {
const sstable_predicate& predicate) const {
auto sets = _sets;
auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->size() > 0; });
auto non_empty_set_count = std::distance(sets.begin(), it);
@@ -1272,12 +1269,12 @@ compound_sstable_set::create_single_key_sstable_reader(
// optimize for common case where only 1 set is populated, avoiding the expensive combined reader
if (non_empty_set_count == 1) {
const auto& non_empty_set = *std::begin(sets);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate, integrity);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate);
}
auto readers = std::ranges::subrange(sets.begin(), it)
| std::views::transform([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate, integrity);
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate);
})
| std::ranges::to<std::vector<mutation_reader>>();
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
@@ -1294,11 +1291,10 @@ sstable_set::create_single_key_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate,
sstables::integrity_check integrity) const {
const sstable_predicate& predicate) const {
SCYLLA_ASSERT(pr.is_singular() && pr.start()->value().has_key());
return _impl->create_single_key_sstable_reader(cf, std::move(schema),
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity);
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate);
}
class auto_closed_sstable_reader final : public mutation_reader::impl {

View File

@@ -118,8 +118,7 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
const sstable_predicate&,
sstables::integrity_check integrity = sstables::integrity_check::no) const;
const sstable_predicate&) const;
};
class sstable_set : public enable_lw_shared_from_this<sstable_set> {
@@ -216,8 +215,7 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
const sstable_predicate& p = default_sstable_predicate(),
sstables::integrity_check integrity = sstables::integrity_check::no) const;
const sstable_predicate& p = default_sstable_predicate()) const;
/// Read a range from the sstable set.
///

View File

@@ -117,8 +117,7 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
const sstable_predicate&,
sstables::integrity_check integrity = sstables::integrity_check::no) const override;
const sstable_predicate&) const override;
friend class sstable_position_reader_queue;
};
@@ -153,8 +152,7 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
const sstable_predicate&,
sstables::integrity_check integrity = sstables::integrity_check::no) const override;
const sstable_predicate&) const override;
class incremental_selector;
};

View File

@@ -36,6 +36,7 @@
#include "utils/error_injection.hh"
#include "utils/to_string.hh"
#include "utils/fragmented_temporary_buffer.hh"
#include "data_dictionary/storage_options.hh"
#include "dht/sharder.hh"
#include "writer.hh"
@@ -518,14 +519,26 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read
s.header.memory_size,
s.header.sampling_level,
s.header.size_at_full_sampling);
auto len = s.header.size * sizeof(pos_type);
// Use fragmented buffer to avoid large contiguous allocations
auto frag_buf = co_await in.read_exactly_fragmented(len);
if (frag_buf.empty()) {
throw bufsize_mismatch_exception(0, len);
}
if (frag_buf.size_bytes() != len) {
throw bufsize_mismatch_exception(frag_buf.size_bytes(), len);
}
// Positions are encoded in little-endian.
auto stream = frag_buf.get_istream();
s.positions.reserve(s.header.size + 1);
while (s.positions.size() != s.header.size) {
// random_access_reader::read_exactly internally maintains
// a 128K buffer, so it is okay to read one position at a time.
auto buf = co_await in.read_exactly(sizeof(pos_type));
check_buf_size(buf, sizeof(pos_type));
s.positions.push_back(seastar::read_le<pos_type>(buf.get()));
auto pos_result = stream.read<pos_type>();
if (!pos_result) {
std::rethrow_exception(pos_result.assume_error());
}
s.positions.push_back(seastar::le_to_cpu(*pos_result));
co_await coroutine::maybe_yield();
}
// Since the keys in the index are not sized, we need to calculate
@@ -1402,7 +1415,7 @@ future<> sstable::open_data(sstable_open_config cfg) noexcept {
co_await update_info_for_opened_data(cfg);
parse_assert(!_shards.empty(), get_filename());
auto* sm = _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>();
if (sm && !cfg.keep_sharding_metadata) {
if (sm) {
// Sharding information uses a lot of memory and once we're done with this computation we will no longer use it.
co_await utils::clear_gently(sm->token_ranges.elements);
sm->token_ranges.elements = {};
@@ -2806,7 +2819,7 @@ future<std::optional<uint32_t>> sstable::read_digest() {
if (_components->digest) {
co_return *_components->digest;
}
if (!has_component(component_type::Digest) || _unlinked) {
if (!has_component(component_type::Digest)) {
co_return std::nullopt;
}
sstring digest_str;
@@ -2837,7 +2850,7 @@ future<lw_shared_ptr<checksum>> sstable::read_checksum() {
if (_components->checksum) {
co_return _components->checksum->shared_from_this();
}
if (!has_component(component_type::CRC) || _unlinked) {
if (!has_component(component_type::CRC)) {
co_return nullptr;
}
auto checksum = make_lw_shared<sstables::checksum>();
@@ -3345,7 +3358,6 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
future<>
sstable::unlink(storage::sync_dir sync) noexcept {
_unlinked = true;
_on_delete(*this);
auto remove_fut = _storage->wipe(*this, sync);

View File

@@ -624,7 +624,6 @@ private:
mutable std::optional<size_t> _total_reclaimable_memory{0};
// Total memory reclaimed so far from this sstable
size_t _total_memory_reclaimed{0};
bool _unlinked{false};
public:
bool has_component(component_type f) const;
sstables_manager& manager() { return _manager; }

View File

@@ -127,7 +127,7 @@ void bti_partition_index_writer_impl::write_last_key(size_t needed_prefix) {
for (auto frag : **_last_key) {
// The first fragment contains the entire token,
// and token collisions are rare, hence [[unlikely]].
if (i + frag.size() <= _last_key_mismatch) [[unlikely]] {
if (i + frag.size() < _last_key_mismatch) [[unlikely]] {
i += frag.size();
continue;
}

View File

@@ -248,8 +248,7 @@ template<typename W, typename First, typename Second, typename... Rest>
requires Writer<W>
inline void write(sstable_version_types v, W& out, const First& first, const Second& second, Rest&&... rest) {
write(v, out, first);
write(v, out, second);
(..., write(v, out, std::forward<Rest>(rest)));
write(v, out, second, std::forward<Rest>(rest)...);
}
template <class T, typename W>

View File

@@ -154,6 +154,9 @@ public:
, _unlink_sstables(unlink)
, _stream_scope(scope)
{
if (_primary_replica_only && _stream_scope != stream_scope::all) {
throw std::runtime_error("Scoped streaming of primary replica only is not supported yet");
}
// By sorting SSTables by their primary key, we allow SSTable runs to be
// incrementally streamed.
// Overlapping run fragments can have their content deduplicated, reducing
@@ -222,8 +225,8 @@ host_id_vector_replica_set sstable_streamer::get_endpoints(const dht::token& tok
};
if (_primary_replica_only) {
if (_stream_scope == stream_scope::node) {
throw std::runtime_error("Node scoped streaming of primary replica only is not supported");
if (_stream_scope != stream_scope::all) {
throw std::runtime_error("Scoped streaming of primary replica only is not supported yet");
}
return get_primary_endpoints(token, std::move(host_filter));
}
@@ -245,7 +248,7 @@ host_id_vector_replica_set sstable_streamer::get_primary_endpoints(const dht::to
host_id_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const {
auto tid = _tablet_map.get_tablet_id(token);
auto replicas = locator::get_primary_replicas(_tablet_map, tid, _erm->get_topology(), [filter = std::move(filter)] (const locator::tablet_replica& replica) {
auto replicas = locator::get_primary_replicas(_tablet_map, tid, [filter = std::move(filter)] (const locator::tablet_replica& replica) {
return filter(replica.host);
});
return to_replica_set(replicas);
@@ -564,7 +567,7 @@ future<locator::effective_replication_map_ptr> sstables_loader::await_topology_q
}
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, bool unlink, stream_scope scope,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink, stream_scope scope,
shared_ptr<stream_progress> progress) {
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
// throughout its lifetime.
@@ -572,7 +575,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
_messaging, _db.local(), table_id, std::move(erm), std::move(sstables),
primary, unlink_sstables(unlink), scope);
primary_replica_only(primary), unlink_sstables(unlink), scope);
co_await streamer->stream(progress);
}
@@ -581,7 +584,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
// All the global operations are going to happen here, and just the reloading happens
// in there.
future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
bool load_and_stream, bool primary, bool skip_cleanup, bool skip_reshape, stream_scope scope) {
bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope) {
if (_loading_new_sstables) {
throw std::runtime_error("Already loading SSTables. Try again later");
} else {
@@ -606,7 +609,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
}
llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, skip_cleanup={}",
ks_name, cf_name, load_and_stream_desc, primary, skip_cleanup);
ks_name, cf_name, load_and_stream_desc, primary_replica_only, skip_cleanup);
try {
if (load_and_stream) {
::table_id table_id;
@@ -617,20 +620,20 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
.load_bloom_filter = false,
};
std::tie(table_id, sstables_on_shards) = co_await replica::distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name, cfg);
co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary, scope] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only(primary), true, scope, {});
co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only, scope] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true, scope, {});
});
} else {
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, _view_building_worker, ks_name, cf_name, skip_cleanup, skip_reshape);
}
} catch (...) {
llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",
ks_name, cf_name, load_and_stream, primary, std::current_exception());
ks_name, cf_name, load_and_stream, primary_replica_only, std::current_exception());
_loading_new_sstables = false;
throw;
}
llog.info("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=succeeded",
ks_name, cf_name, load_and_stream, primary);
ks_name, cf_name, load_and_stream, primary_replica_only);
_loading_new_sstables = false;
co_return;
}
@@ -644,7 +647,6 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im
sstring _prefix;
sstables_loader::stream_scope _scope;
std::vector<sstring> _sstables;
const primary_replica_only _primary_replica;
struct progress_holder {
// Wrap stream_progress in a smart pointer to enable polymorphism.
// This allows derived progress types to be passed down for per-tablet
@@ -668,8 +670,8 @@ protected:
public:
download_task_impl(tasks::task_manager::module_ptr module, sharded<sstables_loader>& loader,
sstring endpoint, sstring bucket, sstring ks, sstring cf, sstring prefix, std::vector<sstring> sstables,
sstables_loader::stream_scope scope, primary_replica_only primary_replica) noexcept
sstring endpoint, sstring bucket,
sstring ks, sstring cf, sstring prefix, std::vector<sstring> sstables, sstables_loader::stream_scope scope) noexcept
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
, _loader(loader)
, _endpoint(std::move(endpoint))
@@ -679,7 +681,6 @@ public:
, _prefix(std::move(prefix))
, _scope(scope)
, _sstables(std::move(sstables))
, _primary_replica(primary_replica)
{
_status.progress_units = "batches";
}
@@ -766,7 +767,7 @@ future<> sstables_loader::download_task_impl::run() {
co_await _progress_per_shard.start();
_progress_state = progress_state::initialized;
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), _primary_replica, false, _scope,
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false, _scope,
_progress_per_shard.local().progress);
});
} catch (...) {
@@ -815,13 +816,12 @@ future<> sstables_loader::stop() {
future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, sstring cf_name,
sstring prefix, std::vector<sstring> sstables,
sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica) {
sstring endpoint, sstring bucket, stream_scope scope) {
if (!_storage_manager.is_known_endpoint(endpoint)) {
throw std::invalid_argument(format("endpoint {} not found", endpoint));
}
llog.info("Restore sstables from {}({}) to {}", endpoint, prefix, ks_name);
auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name),
std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(prefix), std::move(sstables), scope);
co_return task->id();
}

View File

@@ -90,7 +90,7 @@ private:
future<> load_and_stream(sstring ks_name, sstring cf_name,
table_id, std::vector<sstables::shared_sstable> sstables,
bool_class<struct primary_replica_only_tag> primary_replica_only, bool unlink_sstables, stream_scope scope,
bool primary_replica_only, bool unlink_sstables, stream_scope scope,
shared_ptr<stream_progress> progress);
future<seastar::shared_ptr<const locator::effective_replication_map>> await_topology_quiesced_and_get_erm(table_id table_id);
@@ -130,7 +130,7 @@ public:
*/
future<tasks::task_id> download_new_sstables(sstring ks_name, sstring cf_name,
sstring prefix, std::vector<sstring> sstables,
sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica);
sstring endpoint, sstring bucket, stream_scope scope);
class download_task_impl;
};

View File

@@ -920,7 +920,11 @@ def test_rbac_tagresource(dynamodb, cql):
def test_rbac_updatetimetolive(dynamodb, cql):
with new_test_table(dynamodb,
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }]
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }],
# Work around issue #16567 that Alternator TTL doesn't work with
# tablets. When that issue is solved, the following Tags should be
# removed.
Tags=[{'Key': 'experimental:initial_tablets', 'Value': 'none'}]
) as table:
with new_role(cql) as (role, key):
with new_dynamodb(dynamodb, role, key) as d:
@@ -989,7 +993,7 @@ def test_rbac_streams(dynamodb, cql):
# Work around issue #16137 that Alternator Streams doesn't work with
# tablets. When that issue is solved, the following Tags should be
# removed.
'Tags': [{'Key': 'system:initial_tablets', 'Value': 'none'}]
'Tags': [{'Key': 'experimental:initial_tablets', 'Value': 'none'}]
}
with new_test_table(dynamodb, **schema) as table:
with new_role(cql) as (role, key):
@@ -1029,7 +1033,7 @@ def test_rbac_streams_autogrant(dynamodb, cql, during_creation):
# Work around issue #16137 that Alternator Streams doesn't work with
# tablets. When that issue is solved, the following Tags should be
# removed.
'Tags': [{'Key': 'system:initial_tablets', 'Value': 'none'}]
'Tags': [{'Key': 'experimental:initial_tablets', 'Value': 'none'}]
}
enable_stream = {'StreamSpecification': {'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}}
if during_creation:
@@ -1065,7 +1069,7 @@ def test_rbac_streams_autorevoke(dynamodb, cql):
# Work around issue #16137 that Alternator Streams doesn't work with
# tablets. When that issue is solved, the following Tags should be
# removed.
'Tags': [{'Key': 'system:initial_tablets', 'Value': 'none'}]
'Tags': [{'Key': 'experimental:initial_tablets', 'Value': 'none'}]
}
table_name = unique_table_name()
with new_role(cql) as (role1, key1), new_role(cql) as (role2, key2):

View File

@@ -458,10 +458,10 @@ def test_streams_latency(dynamodb, dynamodbstreams, metrics):
with new_test_table(dynamodb,
# Alternator Streams is expected to fail with tablets due to #23838.
# To ensure that this test still runs, instead of xfailing it, we
# temporarily coerce Alternator to avoid using default tablets
# temporarily coerce Alternator to avoid using default tablets
# setting, even if it's available. We do this by using the following
# tags when creating the table:
Tags=[{'Key': 'system:initial_tablets', 'Value': 'none'}],
Tags=[{'Key': 'experimental:initial_tablets', 'Value': 'none'}],
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }],
StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}
@@ -834,10 +834,15 @@ def alternator_ttl_period_in_seconds(dynamodb, request):
# up to the setting of alternator_ttl_period_in_seconds. test/alternator/run
# sets this to 1 second, which becomes the maximum delay of this test, but
# if it is set higher we skip this test unless --runveryslow is enabled.
# This test fails with tablets due to #16567, so to temporarily ensure that
# Alternator TTL is still being tested, we use the following TAGS to
# coerce Alternator to create the test table without tablets.
TAGS = [{'Key': 'experimental:initial_tablets', 'Value': 'none'}]
def test_ttl_stats(dynamodb, metrics, alternator_ttl_period_in_seconds):
print(alternator_ttl_period_in_seconds)
with check_increases_metric(metrics, ['scylla_expiration_scan_passes', 'scylla_expiration_scan_table', 'scylla_expiration_items_deleted']):
with new_test_table(dynamodb,
Tags = TAGS,
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table:
# Insert one already-expired item, and then enable TTL:

View File

@@ -412,43 +412,31 @@ def test_scan_long_partition_tombstone_string(dynamodb, query_tombstone_page_lim
# Verify that even if no "Limit" is specified for a Scan, the size of a
# single returned page is still limited. DynamoDB specifies it should be
# limited to 1 MB. In Alternator the limit is usually also close to 1 MB,
# but it turns out (see issue #10327) that for small tables Scylla scans
# multiple vnodes or tablets in parallel and the returned page size can
# grow significantly above 1MB, depending on implementation details and the
# number of vnodes or tablets.
# Since the primary goal of this test was just to check that the Scan doesn't
# return the *entire* table, below we fill the data with enough data (KB=8000)
# that not all of it is returned in a single page, even if we have up to
# 16 vnodes or 8 tablets (per node).
# Additionally, we have another parametrized version of this test, where
# the table has just 1.5 MB of data, as we would like a Scan not to return
# all 1.5 MB in one page, but because of issue #10327 it does, so the test
# fails.
# limited to 1 MB. In Alternator the limit is close to 1 MB, but it turns
# out (see issue #10327) that for small tables the page size can grow up
# to 3 MB. The following test accepts this as ok. Note that for larger tables,
# the page size goes back to being closer to 1 MB.
#
# This test is for Scan paging on a table with many small partitions. We have
# a separate test for a Query over a single long partition with many rows -
# test_query.py::test_query_reverse_longish (the test's name suggests it
# checks reverse queries, but it also checks the unreversed unlimited query).
# For single-partition scans, the page size is more exactly 1 MB.
#
@pytest.mark.parametrize("KB", [
pytest.param(1500, marks=pytest.mark.xfail(
reason="Issue #10327: small tables don't respect paging limits")),
8000
])
def test_scan_paging_missing_limit(dynamodb, KB):
def test_scan_paging_missing_limit(dynamodb):
with new_test_table(dynamodb,
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
AttributeDefinitions=[
{ 'AttributeName': 'p', 'AttributeType': 'N' }]) as table:
# Insert data in multiple smaller partitions.
# Insert a 6 MB of data in multiple smaller partitions.
# Because of issue #10327 when the table is *small* Alternator may
# return significantly more than 1 MB - sometimes even 4 MB. This
# is why we need to use 6 MB of data here and 2 MB is not enough.
str = 'x' * 10240
N = KB // 10
N = 600
with table.batch_writer() as batch:
for i in range(N):
batch.put_item({'p': i, 's': str})
n = len(table.scan(ConsistentRead=True)['Items'])
# we don't know how big n should be (hopefully around 100)
# but definitely not N.
assert n < N, f"The response was not paged, {n*len(str)/1024} kB of data returned in a single page"
assert n < N

View File

@@ -21,7 +21,7 @@ from test.alternator.util import unique_table_name, create_test_table, new_test_
# xfailing these tests, we temporarily coerce the tests below to avoid
# using default tablets setting, even if it's available. We do this by
# using the following tags when creating each table below:
TAGS = [{'Key': 'system:initial_tablets', 'Value': 'none'}]
TAGS = [{'Key': 'experimental:initial_tablets', 'Value': 'none'}]
stream_types = [ 'OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES']

View File

@@ -697,3 +697,11 @@ def test_create_table_spurious_attribute_definitions(dynamodb):
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' },
{ 'AttributeName': 'c', 'AttributeType': 'S' }]) as table:
pass
# Currently, because of incomplete LWT support, Alternator tables do not use
# tablets by default - even if the tablets experimental feature is enabled.
# This test enshrines this fact - that an Alternator table doesn't use tablets.
# This is a temporary test: When we reverse this decision and tablets go back
# to being used by default on Alternator tables, this test should be deleted.
def test_alternator_doesnt_use_tablets(dynamodb, has_tablets):
assert not has_tablets

View File

@@ -8,7 +8,7 @@
# old vnodes), that the DynamoDB API user would not even be aware
# of. So there should be very few, if any, tests in this file.
# However, temporarily - while the tablets feature is only partially
# working, it is useful
# working and turned off by default (see issue #21989) - it is useful
# to have here a few tests that clarify the situation and how to
# override it. Most of these tests, or perhaps even this entire file,
# will probably go away eventually.
@@ -17,7 +17,7 @@ import pytest
import boto3
from botocore.exceptions import ClientError
from .util import new_test_table, wait_for_gsi, random_string, full_scan, full_query, multiset, scylla_config_read, scylla_config_temporary
from .util import new_test_table, wait_for_gsi, random_string, full_scan, full_query, multiset
# All tests in this file are scylla-only
@pytest.fixture(scope="function", autouse=True)
@@ -45,34 +45,27 @@ def uses_tablets(dynamodb, table):
return True
return False
# Utility function for checking whether using tablets by a given table
# is in-line with the global Scylla configuration flag regarding tablets.
def assert_tablets_usage_follows_config(dynamodb, table):
tablets_default = scylla_config_read(dynamodb, 'tablets_mode_for_new_keyspaces')
if tablets_default in ("\"enabled\"", "\"enforced\"", None):
assert uses_tablets(dynamodb, table)
else:
assert not uses_tablets(dynamodb, table)
# New Alternator tables are created with tablets or vnodes, according to the
# "tablets_mode_for_new_keyspaces" configuration flag.
# Right now, new Alternator tables are created *without* tablets.
# This test should be changed if this default ever changes.
def test_default_tablets(dynamodb):
schema = {
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
'AttributeDefinitions': [ { 'AttributeName': 'p', 'AttributeType': 'S' }]}
with new_test_table(dynamodb, **schema) as table:
assert_tablets_usage_follows_config(dynamodb, table)
# Change this assertion if Alternator's default changes!
assert not uses_tablets(dynamodb, table)
# Tests for the initial_tablets tag named "system:initial_tablets".
# This tag was earlier called "experimental:initial_tablets".
# Ref. #26211
initial_tablets_tag = 'system:initial_tablets'
# Tests for the initial_tablets tag. Currently, it is considered
# experimental, and named "experimental:initial_tablets", but perhaps
# in the future it will graduate out of experimental status and
# the prefix will be replaced by "system:".
initial_tablets_tag = 'experimental:initial_tablets'
# Check that a table created with a number as initial_tablets will use
# tablets. Different numbers have different meanings (0 asked to use
# default number, any other number overrides the default) but they
# all enable tablets.
def test_initial_tablets_int(dynamodb):
def test_initial_tablets_number(dynamodb):
for value in ['0', '4']:
schema = {
'Tags': [{'Key': initial_tablets_tag, 'Value': value}],
@@ -83,7 +76,7 @@ def test_initial_tablets_int(dynamodb):
# Check that a table created with a non-number (e.g., the string "none")
# as initial_tablets, will not use tablets.
def test_initial_tablets_not_int(dynamodb):
def test_initial_tablets_number(dynamodb):
schema = {
'Tags': [{'Key': initial_tablets_tag, 'Value': 'none'}],
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
@@ -91,47 +84,6 @@ def test_initial_tablets_not_int(dynamodb):
with new_test_table(dynamodb, **schema) as table:
assert not uses_tablets(dynamodb, table)
# Usage of tablets is determined by the configuration flag
# "tablets_mode_for_new_keyspaces", as well as by the per-table
# "system:initial_tablets" tag. The tag overrides the configuration,
# except when the configuration flag's value is "enforced" -
# then if the tag asks for vnodes, an error is generated.
def test_tablets_tag_vs_config(dynamodb):
schema = {
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
'AttributeDefinitions': [ { 'AttributeName': 'p', 'AttributeType': 'S' }]
}
schema_tablets = {**schema, 'Tags': [{'Key': initial_tablets_tag, 'Value': '0'}]}
schema_vnodes = {**schema, 'Tags': [{'Key': initial_tablets_tag, 'Value': 'none'}]}
# With tablets_mode_for_new_keyspaces=enabled, tablets are used unless
# the user explicitly asks for vnodes (schema_vnodes).
with scylla_config_temporary(dynamodb, 'tablets_mode_for_new_keyspaces', 'enabled'):
with new_test_table(dynamodb, **schema) as table:
assert uses_tablets(dynamodb, table)
with new_test_table(dynamodb, **schema_tablets) as table:
assert uses_tablets(dynamodb, table)
with new_test_table(dynamodb, **schema_vnodes) as table:
assert not uses_tablets(dynamodb, table)
# With tablets_mode_for_new_keyspaces=disabled, vnodes are used unless
# the user explicitly asks tablets (schema_tablets)
with scylla_config_temporary(dynamodb, 'tablets_mode_for_new_keyspaces', 'disabled'):
with new_test_table(dynamodb, **schema) as table:
assert not uses_tablets(dynamodb, table)
with new_test_table(dynamodb, **schema_tablets) as table:
assert uses_tablets(dynamodb, table)
with new_test_table(dynamodb, **schema_vnodes) as table:
assert not uses_tablets(dynamodb, table)
# With tablets_mode_for_new_keyspaces=enforced, tablets are used except
# when the user requests vnodes, which is a ValidationException.
with scylla_config_temporary(dynamodb, 'tablets_mode_for_new_keyspaces', 'enforced'):
with new_test_table(dynamodb, **schema) as table:
assert uses_tablets(dynamodb, table)
with new_test_table(dynamodb, **schema_tablets) as table:
assert uses_tablets(dynamodb, table)
with pytest.raises(ClientError, match='ValidationException.*tablets'):
with new_test_table(dynamodb, **schema_vnodes) as table:
pass
# Before Alternator Streams is supported with tablets (#23838), let's verify
# that enabling Streams results in an orderly error. This test should be
# deleted when #23838 is fixed.
@@ -155,7 +107,7 @@ def test_streams_enable_error_with_tablets(dynamodb):
# For a while (see #18068) it was possible to create an Alternator table with
# tablets enabled and choose LWT for write isolation (always_use_lwt)
# but the writes themselves failed. This test verifies that this is no longer
# the case, and the LWT writes succeed even when tablets are used.
# the case, and the LWT writes succeed even when tables are used.
def test_alternator_tablets_and_lwt(dynamodb):
schema = {
'Tags': [
@@ -164,12 +116,12 @@ def test_alternator_tablets_and_lwt(dynamodb):
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
'AttributeDefinitions': [ { 'AttributeName': 'p', 'AttributeType': 'S' }]}
with new_test_table(dynamodb, **schema) as table:
assert_tablets_usage_follows_config(dynamodb, table)
assert uses_tablets(dynamodb, table)
# This put_item() failed before #18068 was fixed:
table.put_item(Item={'p': 'hello'})
assert table.get_item(Key={'p': 'hello'}, ConsistentRead=True)['Item'] == {'p': 'hello'}
# An Alternator table created with tablets and with a write isolation
# An Alternator table created tablets and with a write isolation
# mode that doesn't use LWT ("forbid_rmw") works normally, even
# before #18068 is fixed.
def test_alternator_tablets_without_lwt(dynamodb):
@@ -180,6 +132,104 @@ def test_alternator_tablets_without_lwt(dynamodb):
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
'AttributeDefinitions': [ { 'AttributeName': 'p', 'AttributeType': 'S' }]}
with new_test_table(dynamodb, **schema) as table:
assert_tablets_usage_follows_config(dynamodb, table)
assert uses_tablets(dynamodb, table)
table.put_item(Item={'p': 'hello'})
assert table.get_item(Key={'p': 'hello'})['Item'] == {'p': 'hello'}
# Reproduces scylladb/scylladb#26615
def test_gsi_with_tablets(dynamodb):
schema = {
'Tags': [{'Key': initial_tablets_tag, 'Value': '4'}],
'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' },
{ 'AttributeName': 'c', 'KeyType': 'RANGE' }
],
'AttributeDefinitions': [
{ 'AttributeName': 'p', 'AttributeType': 'S' },
{ 'AttributeName': 'c', 'AttributeType': 'S' },
],
'GlobalSecondaryIndexes': [
{ 'IndexName': 'hello',
'KeySchema': [
{ 'AttributeName': 'c', 'KeyType': 'HASH' },
{ 'AttributeName': 'p', 'KeyType': 'RANGE' },
],
'Projection': { 'ProjectionType': 'ALL' }
}
],
}
with new_test_table(dynamodb, **schema) as table:
desc = table.meta.client.describe_table(TableName=table.name)
table_status = desc['Table']['TableStatus']
assert table_status == 'ACTIVE'
index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == 'hello']
assert len(index_desc) == 1
index_status = index_desc[0]['IndexStatus']
assert index_status == 'ACTIVE'
# When the index is ACTIVE, this must be after backfilling completed
assert not 'Backfilling' in index_desc[0]
# This test is copy of alternator/test_gsi_backfill.py::test_gsi_backfill but with enabled tablets.
def test_gsi_backfill_with_tablets(dynamodb):
# First create, and fill, a table without GSI. The items in items1
# will have the appropriate string type for 'x' and will later get
# indexed. Items in item2 have no value for 'x', and in items in
# items3 'x' is not a string; So the items in items2 and items3
# will be missing in the index that we'll create later.
with new_test_table(dynamodb,
Tags=[{'Key': initial_tablets_tag, 'Value': '4'}],
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ]) as table:
items1 = [{'p': random_string(), 'x': random_string(), 'y': random_string()} for i in range(10)]
items2 = [{'p': random_string(), 'y': random_string()} for i in range(10)]
items3 = [{'p': random_string(), 'x': i} for i in range(10)]
items = items1 + items2 + items3
with table.batch_writer() as batch:
for item in items:
batch.put_item(item)
assert multiset(items) == multiset(full_scan(table))
# Now use UpdateTable to create the GSI
dynamodb.meta.client.update_table(TableName=table.name,
AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }],
GlobalSecondaryIndexUpdates=[ { 'Create':
{ 'IndexName': 'hello',
'KeySchema': [{ 'AttributeName': 'x', 'KeyType': 'HASH' }],
'Projection': { 'ProjectionType': 'ALL' }
}}])
# update_table is an asynchronous operation. We need to wait until it
# finishes and the table is backfilled.
wait_for_gsi(table, 'hello')
# As explained above, only items in items1 got copied to the gsi,
# so this is what a full table scan on the GSI 'hello' should return.
# Note that we don't need to retry the reads here (i.e., use the
# assert_index_scan() function) because after we waited for
# backfilling to complete, we know all the pre-existing data is
# already in the index.
assert multiset(items1) == multiset(full_scan(table, ConsistentRead=False, IndexName='hello'))
# We can also use Query on the new GSI, to search on the attribute x:
assert multiset([items1[3]]) == multiset(full_query(table,
ConsistentRead=False, IndexName='hello',
KeyConditions={'x': {'AttributeValueList': [items1[3]['x']], 'ComparisonOperator': 'EQ'}}))
# Because the GSI now exists, we are no longer allowed to add to the
# base table items with a wrong type for x (like we were able to add
# earlier - see items3). But if x is missing (as in items2), we
# *are* allowed to add the item and it appears in the base table
# (but the view table doesn't change).
p = random_string()
y = random_string()
table.put_item(Item={'p': p, 'y': y})
assert table.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == {'p': p, 'y': y}
with pytest.raises(ClientError, match='ValidationException.*mismatch'):
table.put_item(Item={'p': random_string(), 'x': 3})
# Let's also test that we cannot add another index with the same name
# that already exists
with pytest.raises(ClientError, match='ValidationException.*already exists'):
dynamodb.meta.client.update_table(TableName=table.name,
AttributeDefinitions=[{ 'AttributeName': 'y', 'AttributeType': 'S' }],
GlobalSecondaryIndexUpdates=[ { 'Create':
{ 'IndexName': 'hello',
'KeySchema': [{ 'AttributeName': 'y', 'KeyType': 'HASH' }],
'Projection': { 'ProjectionType': 'ALL' }
}}])

View File

@@ -22,8 +22,8 @@ from .util import new_test_table, random_string, full_query, unique_table_name,
# both values. Thanks to this, the tests will run for both vnodes and tables without the need to change
# their argument list.
@pytest.fixture(params=[
[{'Key': 'system:initial_tablets', 'Value': 'none'}],
[{'Key': 'system:initial_tablets', 'Value': '0'}],
[{'Key': 'experimental:initial_tablets', 'Value': 'none'}],
[{'Key': 'experimental:initial_tablets', 'Value': '0'}],
], ids=["using vnodes", "using tablets"], autouse=True)
def tags_param(request):
# Set TAGS in the global namespace of this module
@@ -660,7 +660,7 @@ def test_ttl_expiration_streams(dynamodb, dynamodbstreams, waits_for_expiration)
# Alternator Streams currently doesn't work with tablets, so until
# #23838 is solved, skip this test on tablets.
for tag in TAGS:
if tag['Key'] == 'system:initial_tablets' and tag['Value'].isdigit():
if tag['Key'] == 'experimental:initial_tablets' and tag['Value'].isdigit():
pytest.skip("Streams test skipped on tablets due to #23838")
# In my experiments, a 30-minute (1800 seconds) is the typical

View File

@@ -61,8 +61,6 @@ add_scylla_test(compound_test
KIND SEASTAR)
add_scylla_test(compress_test
KIND BOOST)
add_scylla_test(gzip_test
KIND SEASTAR)
add_scylla_test(config_test
KIND SEASTAR)
add_scylla_test(continuous_data_consumer_test

View File

@@ -183,14 +183,8 @@ index_entry_dataset generate_random_dataset(const schema& the_schema, const rand
// Generate a few partition keys.
std::vector<partition_key> pks;
{
std::set<int16_t> pks_set;
while (pks_set.size() < static_cast<size_t>(cfg.partition_key_component_values)) {
pks_set.insert(tests::random::get_int<int16_t>());
}
for (auto& x : pks_set) {
pks.push_back(partition_key::from_deeply_exploded(s, std::vector<data_value>{data_value(x)}));
}
for (int i = 0; i < cfg.partition_key_component_values; ++i) {
pks.push_back(partition_key::from_deeply_exploded(s, std::vector<data_value>{data_value(int16_t(i))}));
}
// Generate the set of decorated keys participating in the test.

View File

@@ -623,34 +623,33 @@ future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name
}
}
future<std::set<sstring>> collect_files(fs::path path) {
std::set<sstring> ret;
directory_lister lister(path, lister::dir_entry_types::of<directory_entry_type::regular>());
while (auto de = co_await lister.get()) {
ret.insert(de->name);
}
co_return ret;
}
static future<> snapshot_works(const std::string& table_name) {
return do_with_some_data({"cf"}, [table_name] (cql_test_env& e) {
take_snapshot(e, "ks", table_name).get();
std::set<sstring> expected = {
"manifest.json",
"schema.cql"
};
auto& cf = e.local_db().find_column_family("ks", table_name);
auto table_directory = table_dir(cf);
auto snapshot_dir = table_directory / sstables::snapshots_dir / "test";
auto in_table_dir = collect_files(table_directory).get();
lister::scan_dir(table_directory, lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected](fs::path, directory_entry de) {
expected.insert(de.name);
return make_ready_future<>();
}).get();
// snapshot triggered a flush and wrote the data down.
BOOST_REQUIRE_GE(in_table_dir.size(), 9);
BOOST_REQUIRE_GE(expected.size(), 11);
auto in_snapshot_dir = collect_files(snapshot_dir).get();
in_table_dir.insert("manifest.json");
in_table_dir.insert("schema.cql");
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
lister::scan_dir(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected](fs::path, directory_entry de) {
expected.erase(de.name);
return make_ready_future<>();
}).get();
BOOST_REQUIRE_EQUAL(expected.size(), 0);
return make_ready_future<>();
}, true);
}
@@ -671,13 +670,26 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
take_snapshot(e, "ks", "cf", "test", true /* skip_flush */).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
std::set<sstring> expected = {
"manifest.json",
};
auto in_table_dir = collect_files(table_dir(cf)).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
expected.insert(de.name);
return make_ready_future<>();
}).get();
// Snapshot did not trigger a flush.
BOOST_REQUIRE(in_table_dir.empty());
auto in_snapshot_dir = collect_files(table_dir(cf) / sstables::snapshots_dir / "test").get();
BOOST_REQUIRE_EQUAL(in_snapshot_dir, std::set<sstring>({"manifest.json", "schema.cql"}));
// Only "manifest.json" is expected.
BOOST_REQUIRE_EQUAL(expected.size(), 1);
// all files were copied and manifest was generated
lister::scan_dir((table_dir(cf) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
expected.erase(de.name);
return make_ready_future<>();
}).get();
BOOST_REQUIRE_EQUAL(expected.size(), 0);
return make_ready_future<>();
});
}
@@ -694,10 +706,10 @@ SEASTAR_TEST_CASE(snapshot_list_okay) {
BOOST_REQUIRE_EQUAL(sd.live, 0);
BOOST_REQUIRE_GT(sd.total, 0);
auto table_directory = table_dir(cf);
for (auto& f : collect_files(table_directory).get()) {
fs::remove(table_directory / f);
}
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
fs::remove(parent_dir / de.name);
return make_ready_future<>();
}).get();
auto sd_post_deletion = cf.get_snapshot_details().get().at("test");
@@ -764,7 +776,11 @@ SEASTAR_TEST_CASE(clear_snapshot) {
take_snapshot(e).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
unsigned count = collect_files(table_dir(cf) / sstables::snapshots_dir / "test").get().size();
unsigned count = 0;
lister::scan_dir((table_dir(cf) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&count] (fs::path parent_dir, directory_entry de) {
count++;
return make_ready_future<>();
}).get();
BOOST_REQUIRE_GT(count, 1); // expect more than the manifest alone
e.local_db().clear_snapshot("test", {"ks"}, "").get();
@@ -795,8 +811,12 @@ SEASTAR_TEST_CASE(clear_multiple_snapshots) {
}
for (auto i = 0; i < num_snapshots; i++) {
unsigned count = 0;
testlog.debug("Verifying {}", snapshots_dir / snapshot_name(i));
unsigned count = collect_files(snapshots_dir / snapshot_name(i)).get().size();
lister::scan_dir(snapshots_dir / snapshot_name(i), lister::dir_entry_types::of<directory_entry_type::regular>(), [&count] (fs::path parent_dir, directory_entry de) {
count++;
return make_ready_future<>();
}).get();
BOOST_REQUIRE_GT(count, 1); // expect more than the manifest alone
}
@@ -879,10 +899,10 @@ SEASTAR_TEST_CASE(test_snapshot_ctl_details) {
BOOST_REQUIRE_EQUAL(sc_sd.details.live, sd.live);
BOOST_REQUIRE_EQUAL(sc_sd.details.total, sd.total);
auto table_directory = table_dir(cf);
for (auto& f : collect_files(table_directory).get()) {
fs::remove(table_directory / f);
}
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
fs::remove(parent_dir / de.name);
return make_ready_future<>();
}).get();
auto sd_post_deletion = cf.get_snapshot_details().get().at("test");
@@ -921,10 +941,10 @@ SEASTAR_TEST_CASE(test_snapshot_ctl_true_snapshots_size) {
auto sc_live_size = sc.local().true_snapshots_size().get();
BOOST_REQUIRE_EQUAL(sc_live_size, sd.live);
auto table_directory = table_dir(cf);
for (auto& f : collect_files(table_directory).get()) {
fs::remove(table_directory / f);
}
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
fs::remove(parent_dir / de.name);
return make_ready_future<>();
}).get();
auto sd_post_deletion = cf.get_snapshot_details().get().at("test");
@@ -1465,9 +1485,18 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
auto& cf = db.local().find_column_family("ks", "cf");
auto in_snap_dir = co_await collect_files(table_dir(cf) / sstables::snapshots_dir / "test");
// all files were copied and manifest was generated
BOOST_REQUIRE(std::includes(in_snap_dir.begin(), in_snap_dir.end(), expected.begin(), expected.end()));
co_await lister::scan_dir((table_dir(cf) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
testlog.debug("Found in snapshots: {}", de.name);
expected.erase(de.name);
return make_ready_future<>();
});
if (!expected.empty()) {
testlog.error("Not in snapshots: {}", expected);
}
BOOST_REQUIRE(expected.empty());
});
}

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More