Compare commits
148 Commits
copilot/fi
...
mergify/co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bdf31d7f54 | ||
|
|
3d915cd091 | ||
|
|
abb26ff913 | ||
|
|
3f821b7f4f | ||
|
|
cab3b86240 | ||
|
|
a3e69cc8fb | ||
|
|
876017efee | ||
|
|
9eed1d1cbd | ||
|
|
d33538bdd4 | ||
|
|
073c9cbaa1 | ||
|
|
a3a0ffbcd0 | ||
|
|
8bf76d6be7 | ||
|
|
1e51ed88c6 | ||
|
|
ca5f938ed4 | ||
|
|
fb20ea7de1 | ||
|
|
d5eb12c25d | ||
|
|
291f568585 | ||
|
|
d5475fbc07 | ||
|
|
6916dbe822 | ||
|
|
f51a8ed541 | ||
|
|
127606f788 | ||
|
|
56a0fa922d | ||
|
|
c841a4a851 | ||
|
|
1a9721e93e | ||
|
|
1dded7e52f | ||
|
|
9082d66d8a | ||
|
|
c7a0876a73 | ||
|
|
917d40e600 | ||
|
|
1fd60424d9 | ||
|
|
af6ddebc7f | ||
|
|
fa71b82da4 | ||
|
|
1a5a6a0758 | ||
|
|
6ae5481de4 | ||
|
|
0bc22db3a9 | ||
|
|
b78675270e | ||
|
|
ea6fe4bfa1 | ||
|
|
30a2ed7488 | ||
|
|
dcddb1ff4a | ||
|
|
4ca0e31415 | ||
|
|
363bc7424e | ||
|
|
a5b11a3189 | ||
|
|
c0eba659f6 | ||
|
|
dbb1dc872d | ||
|
|
07b288b7d7 | ||
|
|
41a44ddc12 | ||
|
|
3f04df55eb | ||
|
|
8c8f97c280 | ||
|
|
a61ab7d02e | ||
|
|
775578af59 | ||
|
|
97f22f426f | ||
|
|
55a9605687 | ||
|
|
83cc3e4791 | ||
|
|
aef7e7db0b | ||
|
|
282cdfcfcc | ||
|
|
b661bc39df | ||
|
|
0805780064 | ||
|
|
3de8885161 | ||
|
|
29a0ce3b0a | ||
|
|
eebf97c545 | ||
|
|
a728695d10 | ||
|
|
82a34aa837 | ||
|
|
d10c6a86cc | ||
|
|
554838691b | ||
|
|
b691dddf6b | ||
|
|
85b1c64a33 | ||
|
|
6e67a993ba | ||
|
|
b8a9fd4e49 | ||
|
|
363cf881d4 | ||
|
|
68a55facdf | ||
|
|
9010d0a22f | ||
|
|
3c0f43b6eb | ||
|
|
a22e4476ac | ||
|
|
14650257c0 | ||
|
|
2a318817ba | ||
|
|
5f052a2b52 | ||
|
|
e018b38a54 | ||
|
|
14ce5e14d0 | ||
|
|
d1a31460a0 | ||
|
|
9175cc528b | ||
|
|
18be4f454e | ||
|
|
f35a083abe | ||
|
|
57affc7fad | ||
|
|
927e526e2d | ||
|
|
b224665575 | ||
|
|
9afb1afefa | ||
|
|
72153cac96 | ||
|
|
f988980260 | ||
|
|
1d11adf766 | ||
|
|
dae1d18145 | ||
|
|
e9588a8a53 | ||
|
|
c73d0ffbaa | ||
|
|
c7b5571766 | ||
|
|
92325073a9 | ||
|
|
f5c0969c06 | ||
|
|
90ced080a8 | ||
|
|
7674d80c31 | ||
|
|
06ceef34a7 | ||
|
|
ec83367b45 | ||
|
|
dfe2e20442 | ||
|
|
ad2191e84f | ||
|
|
855abd7368 | ||
|
|
086dc6d53c | ||
|
|
09b0b3f7d6 | ||
|
|
3bbb7a24b1 | ||
|
|
b43454c658 | ||
|
|
93700ff5d1 | ||
|
|
5e2b4a0e80 | ||
|
|
bb5dc0771c | ||
|
|
9ed8519362 | ||
|
|
077d7c06a0 | ||
|
|
5a1575678b | ||
|
|
2401f7f9ca | ||
|
|
906d085289 | ||
|
|
34dd3a6daa | ||
|
|
3afa8ee2ca | ||
|
|
3347152ff9 | ||
|
|
ff7bd937e2 | ||
|
|
50ea1dbe32 | ||
|
|
45125c4d7d | ||
|
|
9207f7823d | ||
|
|
711864687f | ||
|
|
faf11e5bc3 | ||
|
|
f9215b4d7e | ||
|
|
469ac9976a | ||
|
|
d341f1ef1e | ||
|
|
07dfcd1f64 | ||
|
|
f8d63b5572 | ||
|
|
ca83da91d1 | ||
|
|
f55081fb1a | ||
|
|
aa8cdec5bd | ||
|
|
75a2484dba | ||
|
|
37387135b4 | ||
|
|
ac24ab5141 | ||
|
|
729dc03e0c | ||
|
|
9d64ced982 | ||
|
|
ea6349a6f5 | ||
|
|
ed9122a84e | ||
|
|
c7d6b4a194 | ||
|
|
a35e138b22 | ||
|
|
3db67faa8a | ||
|
|
6a12174e2d | ||
|
|
ca0096ccb8 | ||
|
|
a71d4bc49c | ||
|
|
749399e4b8 | ||
|
|
bdd97b2950 | ||
|
|
1a056f0cab | ||
|
|
cf78a2caca | ||
|
|
cbc53f0e81 |
181
.github/scripts/auto-backport.py
vendored
Executable file
181
.github/scripts/auto-backport.py
vendored
Executable file
@@ -0,0 +1,181 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import tempfile
|
||||
import logging
|
||||
|
||||
from github import Github, GithubException
|
||||
from git import Repo, GitCommandError
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
try:
|
||||
github_token = os.environ["GITHUB_TOKEN"]
|
||||
except KeyError:
|
||||
print("Please set the 'GITHUB_TOKEN' environment variable")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def is_pull_request():
|
||||
return '--pull-request' in sys.argv[1:]
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--repo', type=str, required=True, help='Github repository name')
|
||||
parser.add_argument('--base-branch', type=str, default='refs/heads/master', help='Base branch')
|
||||
parser.add_argument('--commits', default=None, type=str, help='Range of promoted commits.')
|
||||
parser.add_argument('--pull-request', type=int, help='Pull request number to be backported')
|
||||
parser.add_argument('--head-commit', type=str, required=is_pull_request(), help='The HEAD of target branch after the pull request specified by --pull-request is merged')
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr_title, commits, is_draft=False):
|
||||
pr_body = f'{pr.body}\n\n'
|
||||
for commit in commits:
|
||||
pr_body += f'- (cherry picked from commit {commit})\n\n'
|
||||
pr_body += f'Parent PR: #{pr.number}'
|
||||
try:
|
||||
backport_pr = repo.create_pull(
|
||||
title=backport_pr_title,
|
||||
body=pr_body,
|
||||
head=f'scylladbbot:{new_branch_name}',
|
||||
base=base_branch_name,
|
||||
draft=is_draft
|
||||
)
|
||||
logging.info(f"Pull request created: {backport_pr.html_url}")
|
||||
backport_pr.add_to_assignees(pr.user)
|
||||
logging.info(f"Assigned PR to original author: {pr.user}")
|
||||
return backport_pr
|
||||
except GithubException as e:
|
||||
if 'A pull request already exists' in str(e):
|
||||
logging.warning(f'A pull request already exists for {pr.user}:{new_branch_name}')
|
||||
else:
|
||||
logging.error(f'Failed to create PR: {e}')
|
||||
|
||||
|
||||
def get_pr_commits(repo, pr, stable_branch, start_commit=None):
|
||||
commits = []
|
||||
if pr.merged:
|
||||
merge_commit = repo.get_commit(pr.merge_commit_sha)
|
||||
if len(merge_commit.parents) > 1: # Check if this merge commit includes multiple commits
|
||||
commits.append(pr.merge_commit_sha)
|
||||
else:
|
||||
if start_commit:
|
||||
promoted_commits = repo.compare(start_commit, stable_branch).commits
|
||||
else:
|
||||
promoted_commits = repo.get_commits(sha=stable_branch)
|
||||
for commit in pr.get_commits():
|
||||
for promoted_commit in promoted_commits:
|
||||
commit_title = commit.commit.message.splitlines()[0]
|
||||
# In Scylla-pkg and scylla-dtest, for example,
|
||||
# we don't create a merge commit for a PR with multiple commits,
|
||||
# according to the GitHub API, the last commit will be the merge commit,
|
||||
# which is not what we need when backporting (we need all the commits).
|
||||
# So here, we are validating the correct SHA for each commit so we can cherry-pick
|
||||
if promoted_commit.commit.message.startswith(commit_title):
|
||||
commits.append(promoted_commit.sha)
|
||||
|
||||
elif pr.state == 'closed':
|
||||
events = pr.get_issue_events()
|
||||
for event in events:
|
||||
if event.event == 'closed':
|
||||
commits.append(event.commit_id)
|
||||
return commits
|
||||
|
||||
|
||||
def create_pr_comment_and_remove_label(pr, comment_body):
|
||||
labels = pr.get_labels()
|
||||
pattern = re.compile(r"backport/\d+\.\d+$")
|
||||
for label in labels:
|
||||
if pattern.match(label.name):
|
||||
print(f"Removing label: {label.name}")
|
||||
comment_body += f'- {label.name}\n'
|
||||
pr.remove_from_labels(label)
|
||||
pr.create_issue_comment(comment_body)
|
||||
|
||||
|
||||
def backport(repo, pr, version, commits, backport_base_branch):
|
||||
new_branch_name = f'backport/{pr.number}/to-{version}'
|
||||
backport_pr_title = f'[Backport {version}] {pr.title}'
|
||||
repo_url = f'https://scylladbbot:{github_token}@github.com/{repo.full_name}.git'
|
||||
fork_repo = f'https://scylladbbot:{github_token}@github.com/scylladbbot/{repo.name}.git'
|
||||
with (tempfile.TemporaryDirectory() as local_repo_path):
|
||||
try:
|
||||
repo_local = Repo.clone_from(repo_url, local_repo_path, branch=backport_base_branch)
|
||||
repo_local.git.checkout(b=new_branch_name)
|
||||
is_draft = False
|
||||
for commit in commits:
|
||||
try:
|
||||
repo_local.git.cherry_pick(commit, '-m1', '-x')
|
||||
except GitCommandError as e:
|
||||
logging.warning(f'Cherry-pick conflict on commit {commit}: {e}')
|
||||
is_draft = True
|
||||
repo_local.git.add(A=True)
|
||||
repo_local.git.cherry_pick('--continue')
|
||||
if not repo.private and not repo.has_in_collaborators(pr.user.login):
|
||||
repo.add_to_collaborators(pr.user.login, permission="push")
|
||||
comment = f':warning: @{pr.user.login} you have been added as collaborator to scylladbbot fork '
|
||||
comment += f'Please check your inbox and approve the invitation, once it is done, please add the backport labels again'
|
||||
create_pr_comment_and_remove_label(pr, comment)
|
||||
return
|
||||
repo_local.git.push(fork_repo, new_branch_name, force=True)
|
||||
create_pull_request(repo, new_branch_name, backport_base_branch, pr, backport_pr_title, commits,
|
||||
is_draft=is_draft)
|
||||
|
||||
except GitCommandError as e:
|
||||
logging.warning(f"GitCommandError: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
base_branch = args.base_branch.split('/')[2]
|
||||
promoted_label = 'promoted-to-master'
|
||||
repo_name = args.repo
|
||||
if 'scylla-enterprise' in args.repo:
|
||||
promoted_label = 'promoted-to-enterprise'
|
||||
stable_branch = base_branch
|
||||
backport_branch = 'branch-'
|
||||
|
||||
backport_label_pattern = re.compile(r'backport/\d+\.\d+$')
|
||||
|
||||
g = Github(github_token)
|
||||
repo = g.get_repo(repo_name)
|
||||
closed_prs = []
|
||||
start_commit = None
|
||||
|
||||
if args.commits:
|
||||
start_commit, end_commit = args.commits.split('..')
|
||||
commits = repo.compare(start_commit, end_commit).commits
|
||||
for commit in commits:
|
||||
match = re.search(rf"Closes .*#([0-9]+)", commit.commit.message, re.IGNORECASE)
|
||||
if match:
|
||||
pr_number = int(match.group(1))
|
||||
pr = repo.get_pull(pr_number)
|
||||
closed_prs.append(pr)
|
||||
if args.pull_request:
|
||||
start_commit = args.head_commit
|
||||
pr = repo.get_pull(args.pull_request)
|
||||
closed_prs = [pr]
|
||||
|
||||
for pr in closed_prs:
|
||||
labels = [label.name for label in pr.labels]
|
||||
backport_labels = [label for label in labels if backport_label_pattern.match(label)]
|
||||
if promoted_label not in labels:
|
||||
print(f'no {promoted_label} label: {pr.number}')
|
||||
continue
|
||||
if not backport_labels:
|
||||
print(f'no backport label: {pr.number}')
|
||||
continue
|
||||
commits = get_pr_commits(repo, pr, stable_branch, start_commit)
|
||||
logging.info(f"Found PR #{pr.number} with commit {commits} and the following labels: {backport_labels}")
|
||||
for backport_label in backport_labels:
|
||||
version = backport_label.replace('backport/', '')
|
||||
backport_base_branch = backport_label.replace('backport/', backport_branch)
|
||||
backport(repo, pr, version, commits, backport_base_branch)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
20
.github/scripts/label_promoted_commits.py
vendored
20
.github/scripts/label_promoted_commits.py
vendored
@@ -16,13 +16,8 @@ def parser():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--repository', type=str, required=True,
|
||||
help='Github repository name (e.g., scylladb/scylladb)')
|
||||
parser.add_argument('--commit_before_merge', type=str, required=True, help='Git commit ID to start labeling from ('
|
||||
'newest commit).')
|
||||
parser.add_argument('--commit_after_merge', type=str, required=True,
|
||||
help='Git commit ID to end labeling at (oldest '
|
||||
'commit, exclusive).')
|
||||
parser.add_argument('--update_issue', type=bool, default=False, help='Set True to update issues when backport was '
|
||||
'done')
|
||||
parser.add_argument('--commits', type=str, required=True, help='Range of promoted commits.')
|
||||
parser.add_argument('--label', type=str, default='promoted-to-master', help='Label to use')
|
||||
parser.add_argument('--ref', type=str, required=True, help='PR target branch')
|
||||
return parser.parse_args()
|
||||
|
||||
@@ -53,10 +48,11 @@ def main():
|
||||
target_branch = re.search(r'branch-(\d+\.\d+)', args.ref)
|
||||
g = Github(github_token)
|
||||
repo = g.get_repo(args.repository, lazy=False)
|
||||
commits = repo.compare(head=args.commit_after_merge, base=args.commit_before_merge)
|
||||
start_commit, end_commit = args.commits.split('..')
|
||||
commits = repo.compare(start_commit, end_commit).commits
|
||||
processed_prs = set()
|
||||
# Print commit information
|
||||
for commit in commits.commits:
|
||||
for commit in commits:
|
||||
print(f'Commit sha is: {commit.sha}')
|
||||
match = pr_pattern.search(commit.commit.message)
|
||||
if match:
|
||||
@@ -66,13 +62,13 @@ def main():
|
||||
if target_branch:
|
||||
pr = repo.get_pull(pr_number)
|
||||
branch_name = target_branch[1]
|
||||
refs_pr = re.findall(r'Refs (?:#|https.*?)(\d+)', pr.body)
|
||||
refs_pr = re.findall(r'Parent PR: (?:#|https.*?)(\d+)', pr.body)
|
||||
if refs_pr:
|
||||
print(f'branch-{target_branch.group(1)}, pr number is: {pr_number}')
|
||||
# 1. change the backport label of the parent PR to note that
|
||||
# we've merge the corresponding backport PR
|
||||
# we've merged the corresponding backport PR
|
||||
# 2. close the backport PR and leave a comment on it to note
|
||||
# that it has been merged with a certain git commit,
|
||||
# that it has been merged with a certain git commit.
|
||||
ref_pr_number = refs_pr[0]
|
||||
mark_backport_done(repo, ref_pr_number, branch_name)
|
||||
comment = f'Closed via {commit.sha}'
|
||||
|
||||
51
.github/workflows/add-label-when-promoted.yaml
vendored
51
.github/workflows/add-label-when-promoted.yaml
vendored
@@ -5,9 +5,10 @@ on:
|
||||
branches:
|
||||
- master
|
||||
- branch-*.*
|
||||
|
||||
env:
|
||||
DEFAULT_BRANCH: 'master'
|
||||
- enterprise
|
||||
pull_request_target:
|
||||
types: [labeled]
|
||||
branches: [master, next, enterprise]
|
||||
|
||||
jobs:
|
||||
check-commit:
|
||||
@@ -20,17 +21,51 @@ jobs:
|
||||
env:
|
||||
GITHUB_CONTEXT: ${{ toJson(github) }}
|
||||
run: echo "$GITHUB_CONTEXT"
|
||||
- name: Set Default Branch
|
||||
id: set_branch
|
||||
run: |
|
||||
if [[ "${{ github.repository }}" == *enterprise* ]]; then
|
||||
echo "DEFAULT_BRANCH=enterprise" >> $GITHUB_ENV
|
||||
else
|
||||
echo "DEFAULT_BRANCH=master" >> $GITHUB_ENV
|
||||
fi
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: ${{ github.repository }}
|
||||
ref: ${{ env.DEFAULT_BRANCH }}
|
||||
token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
fetch-depth: 0 # Fetch all history for all tags and branches
|
||||
|
||||
- name: Set up Git identity
|
||||
run: |
|
||||
git config --global user.name "GitHub Action"
|
||||
git config --global user.email "action@github.com"
|
||||
git config --global merge.conflictstyle diff3
|
||||
- name: Install dependencies
|
||||
run: sudo apt-get install -y python3-github
|
||||
|
||||
run: sudo apt-get install -y python3-github python3-git
|
||||
- name: Run python script
|
||||
if: github.event_name == 'push'
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: python .github/scripts/label_promoted_commits.py --commit_before_merge ${{ github.event.before }} --commit_after_merge ${{ github.event.after }} --repository ${{ github.repository }} --ref ${{ github.ref }}
|
||||
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
run: python .github/scripts/label_promoted_commits.py --commits ${{ github.event.before }}..${{ github.sha }} --repository ${{ github.repository }} --ref ${{ github.ref }}
|
||||
- name: Run auto-backport.py when promotion completed
|
||||
if: github.event_name == 'push' && github.ref == 'refs/heads/${{ env.DEFAULT_BRANCH }}'
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
run: python .github/scripts/auto-backport.py --repo ${{ github.repository }} --base-branch ${{ github.ref }} --commits ${{ github.event.before }}..${{ github.sha }}
|
||||
- name: Check if label starts with 'backport/' and contains digits
|
||||
id: check_label
|
||||
run: |
|
||||
label_name="${{ github.event.label.name }}"
|
||||
if [[ "$label_name" =~ ^backport/[0-9]+\.[0-9]+$ ]]; then
|
||||
echo "Label matches backport/X.X pattern."
|
||||
echo "backport_label=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "Label does not match the required pattern."
|
||||
echo "backport_label=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
- name: Run auto-backport.py when label was added
|
||||
if: github.event_name == 'pull_request_target' && steps.check_label.outputs.backport_label == 'true' && (github.event.pull_request.state == 'closed' && github.event.pull_request.merged == true)
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
run: python .github/scripts/auto-backport.py --repo ${{ github.repository }} --base-branch ${{ github.ref }} --pull-request ${{ github.event.pull_request.number }} --head-commit ${{ github.event.pull_request.base.sha }}
|
||||
|
||||
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=6.2.0-dev
|
||||
VERSION=6.2.1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
@@ -104,7 +104,7 @@ else
|
||||
fi
|
||||
|
||||
if [ -f "$OUTPUT_DIR/SCYLLA-RELEASE-FILE" ]; then
|
||||
GIT_COMMIT_FILE=$(cat "$OUTPUT_DIR/SCYLLA-RELEASE-FILE" |cut -d . -f 3)
|
||||
GIT_COMMIT_FILE=$(cat "$OUTPUT_DIR/SCYLLA-RELEASE-FILE" | rev | cut -d . -f 1 | rev)
|
||||
if [ "$GIT_COMMIT" = "$GIT_COMMIT_FILE" ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
@@ -2195,7 +2195,6 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
mutation_builders.reserve(request_items.MemberCount());
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
batch_size++;
|
||||
schema_ptr schema = get_table_from_batch_request(_proxy, it);
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
|
||||
@@ -2216,6 +2215,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else if (r_name == "DeleteRequest") {
|
||||
const rjson::value& key = (r->value)["Key"];
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
@@ -2226,6 +2226,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
co_return api_error::validation("Provided list of item keys contains duplicates");
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
batch_size++;
|
||||
} else {
|
||||
co_return api_error::validation(fmt::format("Unknown BatchWriteItem request type: {}", r_name));
|
||||
}
|
||||
@@ -3483,7 +3484,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
};
|
||||
std::vector<table_requests> requests;
|
||||
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs(get_table_from_batch_request(_proxy, it));
|
||||
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
|
||||
@@ -3497,6 +3498,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rs.add(key);
|
||||
check_key(key, rs.schema);
|
||||
}
|
||||
batch_size += rs.requests.size();
|
||||
requests.emplace_back(std::move(rs));
|
||||
}
|
||||
|
||||
@@ -3504,7 +3506,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
co_await verify_permission(client_state, tr.schema, auth::permission::SELECT);
|
||||
}
|
||||
|
||||
_stats.api_operations.batch_get_item_batch_total += requests.size();
|
||||
_stats.api_operations.batch_get_item_batch_total += batch_size;
|
||||
// If we got here, all "requests" are valid, so let's start the
|
||||
// requests for the different partitions all in parallel.
|
||||
std::vector<future<std::vector<rjson::value>>> response_futures;
|
||||
|
||||
@@ -29,8 +29,6 @@ stats::stats() : api_operations{} {
|
||||
seastar::metrics::description("Latency summary of an operation via Alternator API"), [this]{return to_metrics_summary(api_operations.name.summary());})(op(CamelCaseName)).set_skip_when_empty(),
|
||||
OPERATION(batch_get_item, "BatchGetItem")
|
||||
OPERATION(batch_write_item, "BatchWriteItem")
|
||||
OPERATION(batch_get_item_batch_total, "BatchGetItemSize")
|
||||
OPERATION(batch_write_item_batch_total, "BatchWriteItemSize")
|
||||
OPERATION(create_backup, "CreateBackup")
|
||||
OPERATION(create_global_table, "CreateGlobalTable")
|
||||
OPERATION(create_table, "CreateTable")
|
||||
@@ -98,6 +96,10 @@ stats::stats() : api_operations{} {
|
||||
seastar::metrics::description("number of rows read and matched during filtering operations")),
|
||||
seastar::metrics::make_total_operations("filtered_rows_dropped_total", [this] { return cql_stats.filtered_rows_read_total - cql_stats.filtered_rows_matched_total; },
|
||||
seastar::metrics::description("number of rows read and dropped during filtering operations")),
|
||||
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchWriteItem")},
|
||||
api_operations.batch_write_item_batch_total).set_skip_when_empty(),
|
||||
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchGetItem")},
|
||||
api_operations.batch_get_item_batch_total).set_skip_when_empty(),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -898,7 +898,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
auto host_id = validate_host_id(req->get_query_param("host_id"));
|
||||
std::vector<sstring> ignore_nodes_strs = utils::split_comma_separated_list(req->get_query_param("ignore_nodes"));
|
||||
apilog.info("remove_node: host_id={} ignore_nodes={}", host_id, ignore_nodes_strs);
|
||||
auto ignore_nodes = std::list<locator::host_id_or_endpoint>();
|
||||
locator::host_id_or_endpoint_list ignore_nodes;
|
||||
ignore_nodes.reserve(ignore_nodes_strs.size());
|
||||
for (const sstring& n : ignore_nodes_strs) {
|
||||
try {
|
||||
auto hoep = locator::host_id_or_endpoint(n);
|
||||
|
||||
@@ -71,7 +71,7 @@ void set_token_metadata(http_context& ctx, routes& r, sharded<locator::shared_to
|
||||
|
||||
ss::get_host_id_map.set(r, [&tm](const_req req) {
|
||||
std::vector<ss::mapper> res;
|
||||
return map_to_key_value(tm.local().get()->get_endpoint_to_host_id_map_for_reading(), res);
|
||||
return map_to_key_value(tm.local().get()->get_endpoint_to_host_id_map(), res);
|
||||
});
|
||||
|
||||
static auto host_or_broadcast = [&tm](const_req req) {
|
||||
|
||||
@@ -76,7 +76,7 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
|
||||
continue;
|
||||
} catch (std::out_of_range&) {
|
||||
// just fallthrough
|
||||
} catch (std::regex_error&) {
|
||||
} catch (boost::regex_error&) {
|
||||
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,7 +226,8 @@ static api::timestamp_type get_max_purgeable_timestamp(const table_state& table_
|
||||
}
|
||||
|
||||
static std::vector<shared_sstable> get_uncompacting_sstables(const table_state& table_s, std::vector<shared_sstable> sstables) {
|
||||
auto all_sstables = boost::copy_range<std::vector<shared_sstable>>(*table_s.main_sstable_set().all());
|
||||
auto sstable_set = table_s.sstable_set_for_tombstone_gc();
|
||||
auto all_sstables = boost::copy_range<std::vector<shared_sstable>>(*sstable_set->all());
|
||||
auto& compacted_undeleted = table_s.compacted_undeleted_sstables();
|
||||
all_sstables.insert(all_sstables.end(), compacted_undeleted.begin(), compacted_undeleted.end());
|
||||
boost::sort(all_sstables, [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
|
||||
@@ -188,7 +188,7 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const {
|
||||
return 0;
|
||||
}
|
||||
auto largest_fan_in = std::ranges::max(_tasks | boost::adaptors::transformed([] (auto& task) {
|
||||
return task->compaction_running() ? task->compaction_data().compaction_fan_in : 0;
|
||||
return task.compaction_running() ? task.compaction_data().compaction_fan_in : 0;
|
||||
}));
|
||||
// conservatively limit fan-in threshold to 32, such that tons of small sstables won't accumulate if
|
||||
// running major on a leveled table, which can even have more than one thousand files.
|
||||
@@ -388,11 +388,26 @@ future<sstables::compaction_result> compaction_task_executor::compact_sstables_a
|
||||
|
||||
co_return res;
|
||||
}
|
||||
|
||||
future<sstables::sstable_set> compaction_task_executor::sstable_set_for_tombstone_gc(table_state& t) {
|
||||
auto compound_set = t.sstable_set_for_tombstone_gc();
|
||||
// Compound set will be linearized into a single set, since compaction might add or remove sstables
|
||||
// to it for incremental compaction to work.
|
||||
auto new_set = sstables::make_partitioned_sstable_set(t.schema(), false);
|
||||
co_await compound_set->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) {
|
||||
auto inserted = new_set.insert(sst);
|
||||
if (!inserted) {
|
||||
on_internal_error(cmlog, format("Unable to insert SSTable {} into set used for tombstone GC", sst->get_filename()));
|
||||
}
|
||||
});
|
||||
co_return std::move(new_set);
|
||||
}
|
||||
|
||||
future<sstables::compaction_result> compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, on_replacement& on_replace, compaction_manager::can_purge_tombstones can_purge,
|
||||
sstables::offstrategy offstrategy) {
|
||||
table_state& t = *_compacting_table;
|
||||
if (can_purge) {
|
||||
descriptor.enable_garbage_collection(t.main_sstable_set());
|
||||
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
|
||||
}
|
||||
descriptor.creator = [&t] (shard_id dummy) {
|
||||
auto sst = t.make_sstable();
|
||||
@@ -580,9 +595,9 @@ requires (compaction_manager& cm, throw_if_stopping do_throw_if_stopping, Args&&
|
||||
}
|
||||
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args) {
|
||||
auto task_executor = seastar::make_shared<TaskExecutor>(*this, do_throw_if_stopping, std::forward<Args>(args)...);
|
||||
_tasks.push_back(task_executor);
|
||||
auto unregister_task = defer([this, task_executor] {
|
||||
_tasks.remove(task_executor);
|
||||
_tasks.push_back(*task_executor);
|
||||
auto unregister_task = defer([task_executor] {
|
||||
task_executor->unlink();
|
||||
task_executor->switch_state(compaction_task_executor::state::none);
|
||||
});
|
||||
|
||||
@@ -884,10 +899,10 @@ public:
|
||||
explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {}
|
||||
|
||||
bool has_ongoing_compaction(table_state& table_s) const noexcept override {
|
||||
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr<compaction_task_executor>& task) {
|
||||
return task->compaction_running()
|
||||
&& task->compacting_table()->schema()->ks_name() == s->ks_name()
|
||||
&& task->compacting_table()->schema()->cf_name() == s->cf_name();
|
||||
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const compaction_task_executor& task) {
|
||||
return task.compaction_running()
|
||||
&& task.compacting_table()->schema()->ks_name() == s->ks_name()
|
||||
&& task.compacting_table()->schema()->cf_name() == s->cf_name();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1051,7 +1066,7 @@ void compaction_manager::postpone_compaction_for_table(table_state* t) {
|
||||
_postponed.insert(t);
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, sstring reason) {
|
||||
future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, sstring reason) noexcept {
|
||||
// To prevent compaction from being postponed while tasks are being stopped,
|
||||
// let's stop all tasks before the deferring point below.
|
||||
for (auto& t : tasks) {
|
||||
@@ -1059,14 +1074,16 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_e
|
||||
t->stop_compaction(reason);
|
||||
}
|
||||
co_await coroutine::parallel_for_each(tasks, [] (auto& task) -> future<> {
|
||||
auto unlink_task = deferred_action([task] { task->unlink(); });
|
||||
try {
|
||||
co_await task->compaction_done();
|
||||
} catch (sstables::compaction_stopped_exception&) {
|
||||
// swallow stop exception if a given procedure decides to propagate it to the caller,
|
||||
// as it happens with reshard and reshape.
|
||||
} catch (...) {
|
||||
// just log any other errors as the callers have nothing to do with them.
|
||||
cmlog.debug("Stopping {}: task returned error: {}", *task, std::current_exception());
|
||||
throw;
|
||||
co_return;
|
||||
}
|
||||
cmlog.debug("Stopping {}: done", *task);
|
||||
});
|
||||
@@ -1075,9 +1092,12 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_e
|
||||
future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_state* t, std::optional<sstables::compaction_type> type_opt) noexcept {
|
||||
try {
|
||||
auto ongoing_compactions = get_compactions(t).size();
|
||||
auto tasks = boost::copy_range<std::vector<shared_ptr<compaction_task_executor>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
|
||||
return (!t || task->compacting_table() == t) && (!type_opt || task->compaction_type() == *type_opt);
|
||||
}));
|
||||
auto tasks = _tasks
|
||||
| std::views::filter([t, type_opt] (const auto& task) {
|
||||
return (!t || task.compacting_table() == t) && (!type_opt || task.compaction_type() == *type_opt);
|
||||
})
|
||||
| std::views::transform([] (auto& task) { return task.shared_from_this(); })
|
||||
| std::ranges::to<std::vector<shared_ptr<compaction_task_executor>>>();
|
||||
logging::log_level level = tasks.empty() ? log_level::debug : log_level::info;
|
||||
if (cmlog.is_enabled(level)) {
|
||||
std::string scope = "";
|
||||
@@ -1091,8 +1111,9 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_stat
|
||||
}
|
||||
return stop_tasks(std::move(tasks), std::move(reason));
|
||||
} catch (...) {
|
||||
return current_exception_as_future<>();
|
||||
cmlog.error("Stopping ongoing compactions failed: {}. Ignored", std::current_exception());
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> compaction_manager::drain() {
|
||||
@@ -1109,17 +1130,17 @@ future<> compaction_manager::stop() {
|
||||
if (auto cm = std::exchange(_task_manager_module, nullptr)) {
|
||||
co_await cm->stop();
|
||||
}
|
||||
if (_state != state::none) {
|
||||
co_return co_await std::move(*_stop_future);
|
||||
if (_stop_future) {
|
||||
co_await std::exchange(*_stop_future, make_ready_future());
|
||||
}
|
||||
}
|
||||
|
||||
future<> compaction_manager::really_do_stop() {
|
||||
future<> compaction_manager::really_do_stop() noexcept {
|
||||
cmlog.info("Asked to stop");
|
||||
// Reset the metrics registry
|
||||
_metrics.clear();
|
||||
co_await stop_ongoing_compactions("shutdown");
|
||||
co_await coroutine::parallel_for_each(_compaction_state | boost::adaptors::map_values, [] (compaction_state& cs) -> future<> {
|
||||
co_await coroutine::parallel_for_each(_compaction_state | std::views::values, [] (compaction_state& cs) -> future<> {
|
||||
if (!cs.gate.is_closed()) {
|
||||
co_await cs.gate.close();
|
||||
}
|
||||
@@ -1618,6 +1639,9 @@ public:
|
||||
std::move(sstables), std::move(compacting), compaction_manager::can_purge_tombstones::yes)
|
||||
, _opt(options.as<sstables::compaction_type_options::split>())
|
||||
{
|
||||
if (utils::get_local_injector().is_enabled("split_sstable_rewrite")) {
|
||||
_do_throw_if_stopping = throw_if_stopping::yes;
|
||||
}
|
||||
}
|
||||
|
||||
static bool sstable_needs_split(const sstables::shared_sstable& sst, const sstables::compaction_type_options::split& opt) {
|
||||
@@ -1633,13 +1657,12 @@ private:
|
||||
bool sstable_needs_split(const sstables::shared_sstable& sst) const {
|
||||
return sstable_needs_split(sst, _opt);
|
||||
}
|
||||
|
||||
protected:
|
||||
sstables::compaction_descriptor make_descriptor(const sstables::shared_sstable& sst) const override {
|
||||
return make_descriptor(sst, _opt);
|
||||
}
|
||||
|
||||
future<sstables::compaction_result> rewrite_sstable(const sstables::shared_sstable sst) override {
|
||||
future<sstables::compaction_result> do_rewrite_sstable(const sstables::shared_sstable sst) {
|
||||
if (sstable_needs_split(sst)) {
|
||||
return rewrite_sstables_compaction_task_executor::rewrite_sstable(std::move(sst));
|
||||
}
|
||||
@@ -1652,6 +1675,20 @@ protected:
|
||||
return sstables::compaction_result{};
|
||||
});
|
||||
}
|
||||
|
||||
future<sstables::compaction_result> rewrite_sstable(const sstables::shared_sstable sst) override {
|
||||
co_await utils::get_local_injector().inject("split_sstable_rewrite", [this] (auto& handler) -> future<> {
|
||||
cmlog.info("split_sstable_rewrite: waiting");
|
||||
while (!handler.poll_for_message() && !_compaction_data.is_stop_requested()) {
|
||||
co_await sleep(std::chrono::milliseconds(5));
|
||||
}
|
||||
cmlog.info("split_sstable_rewrite: released");
|
||||
if (_compaction_data.is_stop_requested()) {
|
||||
throw make_compaction_stopped_exception();
|
||||
}
|
||||
}, false);
|
||||
co_return co_await do_rewrite_sstable(std::move(sst));
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
@@ -1979,7 +2016,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range
|
||||
future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t, tasks::task_info info) {
|
||||
auto check_for_cleanup = [this, &t] {
|
||||
return boost::algorithm::any_of(_tasks, [&t] (auto& task) {
|
||||
return task->compacting_table() == &t && task->compaction_type() == sstables::compaction_type::Cleanup;
|
||||
return task.compacting_table() == &t && task.compaction_type() == sstables::compaction_type::Cleanup;
|
||||
});
|
||||
};
|
||||
if (check_for_cleanup()) {
|
||||
@@ -2077,8 +2114,10 @@ compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, table_stat
|
||||
}
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
|
||||
co_await run_custom_job(t, sstables::compaction_type::Split, "Split SSTable",
|
||||
[&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& monitor) -> future<> {
|
||||
// FIXME: indentation.
|
||||
auto gate = get_compaction_state(&t).gate.hold();
|
||||
sstables::compaction_progress_monitor monitor;
|
||||
sstables::compaction_data info = create_compaction_data();
|
||||
sstables::compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
|
||||
desc.creator = [&t] (shard_id _) {
|
||||
return t.make_sstable();
|
||||
@@ -2089,7 +2128,6 @@ compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, table_stat
|
||||
|
||||
co_await sstables::compact_sstables(std::move(desc), info, t, monitor);
|
||||
co_await sst->unlink();
|
||||
}, tasks::task_info{}, throw_if_stopping::yes);
|
||||
|
||||
co_return ret;
|
||||
}
|
||||
@@ -2159,11 +2197,11 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept {
|
||||
auto found = false;
|
||||
sstring msg;
|
||||
for (auto& task : _tasks) {
|
||||
if (task->compacting_table() == &t) {
|
||||
if (task.compacting_table() == &t) {
|
||||
if (!msg.empty()) {
|
||||
msg += "\n";
|
||||
}
|
||||
msg += format("Found {} after remove", *task.get());
|
||||
msg += format("Found {} after remove", task);
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
@@ -2174,30 +2212,38 @@ future<> compaction_manager::remove(table_state& t, sstring reason) noexcept {
|
||||
}
|
||||
|
||||
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(table_state* t) const {
|
||||
auto to_info = [] (const shared_ptr<compaction_task_executor>& task) {
|
||||
auto to_info = [] (const compaction_task_executor& task) {
|
||||
sstables::compaction_info ret;
|
||||
ret.compaction_uuid = task->compaction_data().compaction_uuid;
|
||||
ret.type = task->compaction_type();
|
||||
ret.ks_name = task->compacting_table()->schema()->ks_name();
|
||||
ret.cf_name = task->compacting_table()->schema()->cf_name();
|
||||
ret.total_partitions = task->compaction_data().total_partitions;
|
||||
ret.total_keys_written = task->compaction_data().total_keys_written;
|
||||
ret.compaction_uuid = task.compaction_data().compaction_uuid;
|
||||
ret.type = task.compaction_type();
|
||||
ret.ks_name = task.compacting_table()->schema()->ks_name();
|
||||
ret.cf_name = task.compacting_table()->schema()->cf_name();
|
||||
ret.total_partitions = task.compaction_data().total_partitions;
|
||||
ret.total_keys_written = task.compaction_data().total_keys_written;
|
||||
return ret;
|
||||
};
|
||||
using ret = std::vector<sstables::compaction_info>;
|
||||
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const shared_ptr<compaction_task_executor>& task) {
|
||||
return (!t || task->compacting_table() == t) && task->compaction_running();
|
||||
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const compaction_task_executor& task) {
|
||||
return (!t || task.compacting_table() == t) && task.compaction_running();
|
||||
}) | boost::adaptors::transformed(to_info));
|
||||
}
|
||||
|
||||
bool compaction_manager::has_table_ongoing_compaction(const table_state& t) const {
|
||||
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<compaction_task_executor>& task) {
|
||||
return task->compacting_table() == &t && task->compaction_running();
|
||||
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const compaction_task_executor& task) {
|
||||
return task.compacting_table() == &t && task.compaction_running();
|
||||
});
|
||||
};
|
||||
|
||||
bool compaction_manager::compaction_disabled(table_state& t) const {
|
||||
return _compaction_state.contains(&t) && _compaction_state.at(&t).compaction_disabled();
|
||||
if (auto it = _compaction_state.find(&t); it != _compaction_state.end()) {
|
||||
return it->second.compaction_disabled();
|
||||
} else {
|
||||
cmlog.debug("compaction_disabled: {}:{} not in compaction_state", t.schema()->id(), t.get_group_id());
|
||||
// Compaction is not strictly disabled, but it is not enabled either.
|
||||
// The callers actually care about if it's enabled or not, not about the actual state of
|
||||
// compaction_state::compaction_disabled()
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop_compaction(sstring type, table_state* table) {
|
||||
@@ -2222,8 +2268,8 @@ future<> compaction_manager::stop_compaction(sstring type, table_state* table) {
|
||||
void compaction_manager::propagate_replacement(table_state& t,
|
||||
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
|
||||
for (auto& task : _tasks) {
|
||||
if (task->compacting_table() == &t && task->compaction_running()) {
|
||||
task->compaction_data().pending_replacements.push_back({ removed, added });
|
||||
if (task.compacting_table() == &t && task.compaction_running()) {
|
||||
task.compaction_data().pending_replacements.push_back({ removed, added });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,8 +94,13 @@ public:
|
||||
|
||||
private:
|
||||
shared_ptr<compaction::task_manager_module> _task_manager_module;
|
||||
|
||||
using compaction_task_executor_list_type = bi::list<
|
||||
compaction_task_executor,
|
||||
bi::base_hook<bi::list_base_hook<bi::link_mode<bi::auto_unlink>>>,
|
||||
bi::constant_time_size<false>>;
|
||||
// compaction manager may have N fibers to allow parallel compaction per shard.
|
||||
std::list<shared_ptr<compaction::compaction_task_executor>> _tasks;
|
||||
compaction_task_executor_list_type _tasks;
|
||||
|
||||
// Possible states in which the compaction manager can be found.
|
||||
//
|
||||
@@ -179,7 +184,7 @@ private:
|
||||
}
|
||||
future<compaction_manager::compaction_stats_opt> perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args);
|
||||
|
||||
future<> stop_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>> tasks, sstring reason);
|
||||
future<> stop_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>> tasks, sstring reason) noexcept;
|
||||
future<> update_throughput(uint32_t value_mbs);
|
||||
|
||||
// Return the largest fan-in of currently running compactions
|
||||
@@ -245,7 +250,7 @@ private:
|
||||
|
||||
// Stop all fibers, without waiting. Safe to be called multiple times.
|
||||
void do_stop() noexcept;
|
||||
future<> really_do_stop();
|
||||
future<> really_do_stop() noexcept;
|
||||
|
||||
// Propagate replacement of sstables to all ongoing compaction of a given table
|
||||
void propagate_replacement(compaction::table_state& t, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added);
|
||||
@@ -470,7 +475,9 @@ public:
|
||||
|
||||
namespace compaction {
|
||||
|
||||
class compaction_task_executor : public enable_shared_from_this<compaction_task_executor> {
|
||||
class compaction_task_executor
|
||||
: public enable_shared_from_this<compaction_task_executor>
|
||||
, public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
|
||||
public:
|
||||
enum class state {
|
||||
none, // initial and final state
|
||||
@@ -594,6 +601,8 @@ private:
|
||||
future<compaction_manager::compaction_stats_opt> compaction_done() noexcept {
|
||||
return _compaction_done.get_future();
|
||||
}
|
||||
|
||||
future<sstables::sstable_set> sstable_set_for_tombstone_gc(::compaction::table_state& t);
|
||||
public:
|
||||
bool stopping() const noexcept {
|
||||
return _compaction_data.abort.abort_requested();
|
||||
@@ -614,7 +623,7 @@ public:
|
||||
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args);
|
||||
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);
|
||||
friend fmt::formatter<compaction_task_executor>;
|
||||
friend future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, sstring reason);
|
||||
friend future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, sstring reason) noexcept;
|
||||
friend sstables::test_env_compaction_manager;
|
||||
};
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ public:
|
||||
virtual bool compaction_enforce_min_threshold() const noexcept = 0;
|
||||
virtual const sstables::sstable_set& main_sstable_set() const = 0;
|
||||
virtual const sstables::sstable_set& maintenance_sstable_set() const = 0;
|
||||
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const = 0;
|
||||
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const = 0;
|
||||
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept = 0;
|
||||
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0;
|
||||
|
||||
@@ -296,7 +296,8 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
// When trimming, let's keep sstables with overlapping time window, so as to reduce write amplification.
|
||||
// For example, if there are N sstables spanning window W, where N <= 32, then we can produce all data for W
|
||||
// in a single compaction round, removing the need to later compact W to reduce its number of files.
|
||||
boost::partial_sort(multi_window, multi_window.begin() + max_sstables, [](const shared_sstable &a, const shared_sstable &b) {
|
||||
auto sort_size = std::min(max_sstables, multi_window.size());
|
||||
boost::partial_sort(multi_window, multi_window.begin() + sort_size, [](const shared_sstable &a, const shared_sstable &b) {
|
||||
return a->get_stats_metadata().max_timestamp < b->get_stats_metadata().max_timestamp;
|
||||
});
|
||||
maybe_trim_job(multi_window, job_size, disjoint);
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <boost/range/algorithm.hpp>
|
||||
#include <fmt/format.h>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <stdexcept>
|
||||
#include "alter_keyspace_statement.hh"
|
||||
#include "prepared_statement.hh"
|
||||
@@ -43,18 +44,16 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo
|
||||
return state.has_keyspace_access(_name, auth::permission::ALTER);
|
||||
}
|
||||
|
||||
static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) {
|
||||
auto to_number = [] (const std::string_view rf) {
|
||||
int result;
|
||||
// We assume the passed string view represents a valid decimal number,
|
||||
// so we don't need the error code.
|
||||
(void) std::from_chars(rf.begin(), rf.end(), result);
|
||||
return result;
|
||||
};
|
||||
|
||||
// We want to ensure that each DC's RF is going to change by at most 1
|
||||
// because in that case the old and new quorums must overlap.
|
||||
return std::abs(to_number(curr_rf) - to_number(new_rf)) <= 1;
|
||||
static unsigned get_abs_rf_diff(const std::string& curr_rf, const std::string& new_rf) {
|
||||
try {
|
||||
return std::abs(std::stoi(curr_rf) - std::stoi(new_rf));
|
||||
} catch (std::invalid_argument const& ex) {
|
||||
on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments, "
|
||||
"but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
|
||||
} catch (std::out_of_range const& ex) {
|
||||
on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments to fit into `int` type, "
|
||||
"but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
|
||||
}
|
||||
}
|
||||
|
||||
void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const {
|
||||
@@ -84,11 +83,24 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
|
||||
auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());
|
||||
|
||||
if (ks.get_replication_strategy().uses_tablets()) {
|
||||
const std::map<sstring, sstring>& current_rfs = ks.metadata()->strategy_options();
|
||||
for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) {
|
||||
auto it = current_rfs.find(new_dc);
|
||||
if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) {
|
||||
throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time.");
|
||||
const std::map<sstring, sstring>& current_rf_per_dc = ks.metadata()->strategy_options();
|
||||
auto new_rf_per_dc = _attrs->get_replication_options();
|
||||
new_rf_per_dc.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
unsigned total_abs_rfs_diff = 0;
|
||||
for (const auto& [new_dc, new_rf] : new_rf_per_dc) {
|
||||
sstring old_rf = "0";
|
||||
if (auto new_dc_in_current_mapping = current_rf_per_dc.find(new_dc);
|
||||
new_dc_in_current_mapping != current_rf_per_dc.end()) {
|
||||
old_rf = new_dc_in_current_mapping->second;
|
||||
} else if (!qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters().contains(new_dc)) {
|
||||
// This means that the DC listed in ALTER doesn't exist. This error will be reported later,
|
||||
// during validation in abstract_replication_strategy::validate_replication_strategy.
|
||||
// We can't report this error now, because it'd change the order of errors reported:
|
||||
// first we need to report non-existing DCs, then if RFs aren't changed by too much.
|
||||
continue;
|
||||
}
|
||||
if (total_abs_rfs_diff += get_abs_rf_diff(old_rf, new_rf); total_abs_rfs_diff >= 2) {
|
||||
throw exceptions::invalid_request_exception("Only one DC's RF can be changed at a time and not by more than 1");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -118,6 +130,63 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
|
||||
return ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty();
|
||||
}
|
||||
|
||||
namespace {
|
||||
// These functions are used to flatten all the options in the keyspace definition into a single-level map<string, string>.
|
||||
// (Currently options are stored in a nested structure that looks more like a map<string, map<string, string>>).
|
||||
// Flattening is simply joining the keys of maps from both levels with a colon ':' character,
|
||||
// or in other words: prefixing the keys in the output map with the option type, e.g. 'replication', 'storage', etc.,
|
||||
// so that the output map contains entries like: "replication:dc1" -> "3".
|
||||
// This is done to avoid key conflicts and to be able to de-flatten the map back into the original structure.
|
||||
|
||||
void add_prefixed_key(const sstring& prefix, const std::map<sstring, sstring>& in, std::map<sstring, sstring>& out) {
|
||||
for (const auto& [in_key, in_value]: in) {
|
||||
out[prefix + ":" + in_key] = in_value;
|
||||
}
|
||||
};
|
||||
|
||||
std::map<sstring, sstring> get_current_options_flattened(const shared_ptr<cql3::statements::ks_prop_defs>& ks,
|
||||
bool include_tablet_options,
|
||||
const gms::feature_service& feat) {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
add_prefixed_key(ks->KW_REPLICATION, ks->get_replication_options(), all_options);
|
||||
add_prefixed_key(ks->KW_STORAGE, ks->get_storage_options().to_map(), all_options);
|
||||
// if no tablet options are specified in ATLER KS statement,
|
||||
// we want to preserve the old ones and hence cannot overwrite them with defaults
|
||||
if (include_tablet_options) {
|
||||
auto initial_tablets = ks->get_initial_tablets(std::nullopt);
|
||||
add_prefixed_key(ks->KW_TABLETS,
|
||||
{{"enabled", initial_tablets ? "true" : "false"},
|
||||
{"initial", std::to_string(initial_tablets.value_or(0))}},
|
||||
all_options);
|
||||
}
|
||||
add_prefixed_key(ks->KW_DURABLE_WRITES,
|
||||
{{sstring(ks->KW_DURABLE_WRITES), to_sstring(ks->get_boolean(ks->KW_DURABLE_WRITES, true))}},
|
||||
all_options);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> get_old_options_flattened(const data_dictionary::keyspace& ks, bool include_tablet_options) {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
using namespace cql3::statements;
|
||||
add_prefixed_key(ks_prop_defs::KW_REPLICATION, ks.get_replication_strategy().get_config_options(), all_options);
|
||||
add_prefixed_key(ks_prop_defs::KW_STORAGE, ks.metadata()->get_storage_options().to_map(), all_options);
|
||||
if (include_tablet_options) {
|
||||
add_prefixed_key(ks_prop_defs::KW_TABLETS,
|
||||
{{"enabled", ks.metadata()->initial_tablets() ? "true" : "false"},
|
||||
{"initial", std::to_string(ks.metadata()->initial_tablets().value_or(0))}},
|
||||
all_options);
|
||||
}
|
||||
add_prefixed_key(ks_prop_defs::KW_DURABLE_WRITES,
|
||||
{{sstring(ks_prop_defs::KW_DURABLE_WRITES), to_sstring(ks.metadata()->durable_writes())}},
|
||||
all_options);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
} // <anonymous> namespace
|
||||
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
|
||||
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
|
||||
using namespace cql_transport;
|
||||
@@ -130,11 +199,18 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat);
|
||||
std::vector<mutation> muts;
|
||||
std::vector<sstring> warnings;
|
||||
auto ks_options = _attrs->get_all_options_flattened(feat);
|
||||
bool include_tablet_options = _attrs->get_map(_attrs->KW_TABLETS).has_value();
|
||||
auto old_ks_options = get_old_options_flattened(ks, include_tablet_options);
|
||||
auto ks_options = get_current_options_flattened(_attrs, include_tablet_options, feat);
|
||||
ks_options.merge(old_ks_options);
|
||||
|
||||
auto ts = mc.write_timestamp();
|
||||
auto global_request_id = mc.new_group0_state_id();
|
||||
|
||||
// we only want to run the tablets path if there are actually any tablets changes, not only schema changes
|
||||
// TODO: the current `if (changes_tablets(qp))` is insufficient: someone may set the same RFs as before,
|
||||
// and we'll unnecessarily trigger the processing path for ALTER tablets KS,
|
||||
// when in reality nothing or only schema is being changed
|
||||
if (changes_tablets(qp)) {
|
||||
if (!qp.topology_global_queue_empty()) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
|
||||
|
||||
@@ -139,28 +139,22 @@ data_dictionary::storage_options ks_prop_defs::get_storage_options() const {
|
||||
return opts;
|
||||
}
|
||||
|
||||
ks_prop_defs::init_tablets_options ks_prop_defs::get_initial_tablets(const sstring& strategy_class, bool enabled_by_default) const {
|
||||
// FIXME -- this should be ignored somehow else
|
||||
init_tablets_options ret{ .enabled = false, .specified_count = std::nullopt };
|
||||
if (locator::abstract_replication_strategy::to_qualified_class_name(strategy_class) != "org.apache.cassandra.locator.NetworkTopologyStrategy") {
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::optional<unsigned> ks_prop_defs::get_initial_tablets(std::optional<unsigned> default_value) const {
|
||||
auto tablets_options = get_map(KW_TABLETS);
|
||||
if (!tablets_options) {
|
||||
return enabled_by_default ? init_tablets_options{ .enabled = true } : ret;
|
||||
return default_value;
|
||||
}
|
||||
|
||||
unsigned initial_count = 0;
|
||||
auto it = tablets_options->find("enabled");
|
||||
if (it != tablets_options->end()) {
|
||||
auto enabled = it->second;
|
||||
tablets_options->erase(it);
|
||||
|
||||
if (enabled == "true") {
|
||||
ret = init_tablets_options{ .enabled = true, .specified_count = 0 }; // even if 'initial' is not set, it'll start with auto-detection
|
||||
// nothing
|
||||
} else if (enabled == "false") {
|
||||
SCYLLA_ASSERT(!ret.enabled);
|
||||
return ret;
|
||||
return std::nullopt;
|
||||
} else {
|
||||
throw exceptions::configuration_exception(sstring("Tablets enabled value must be true or false; found: ") + enabled);
|
||||
}
|
||||
@@ -169,7 +163,7 @@ ks_prop_defs::init_tablets_options ks_prop_defs::get_initial_tablets(const sstri
|
||||
it = tablets_options->find("initial");
|
||||
if (it != tablets_options->end()) {
|
||||
try {
|
||||
ret = init_tablets_options{ .enabled = true, .specified_count = std::stol(it->second)};
|
||||
initial_count = std::stol(it->second);
|
||||
} catch (...) {
|
||||
throw exceptions::configuration_exception(sstring("Initial tablets value should be numeric; found ") + it->second);
|
||||
}
|
||||
@@ -180,7 +174,7 @@ ks_prop_defs::init_tablets_options ks_prop_defs::get_initial_tablets(const sstri
|
||||
throw exceptions::configuration_exception(sstring("Unrecognized tablets option ") + tablets_options->begin()->first);
|
||||
}
|
||||
|
||||
return ret;
|
||||
return initial_count;
|
||||
}
|
||||
|
||||
std::optional<sstring> ks_prop_defs::get_replication_strategy_class() const {
|
||||
@@ -191,32 +185,13 @@ bool ks_prop_defs::get_durable_writes() const {
|
||||
return get_boolean(KW_DURABLE_WRITES, true);
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
auto ingest_flattened_options = [&all_options](const std::map<sstring, sstring>& options, const sstring& prefix) {
|
||||
for (auto& option: options) {
|
||||
all_options[prefix + ":" + option.first] = option.second;
|
||||
}
|
||||
};
|
||||
ingest_flattened_options(get_replication_options(), KW_REPLICATION);
|
||||
ingest_flattened_options(get_storage_options().to_map(), KW_STORAGE);
|
||||
ingest_flattened_options(get_map(KW_TABLETS).value_or(std::map<sstring, sstring>{}), KW_TABLETS);
|
||||
ingest_flattened_options({{sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}}, KW_DURABLE_WRITES);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) {
|
||||
auto sc = get_replication_strategy_class().value();
|
||||
auto initial_tablets = get_initial_tablets(sc, feat.tablets);
|
||||
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0
|
||||
if (initial_tablets.enabled && !initial_tablets.specified_count) {
|
||||
initial_tablets.specified_count = 0;
|
||||
}
|
||||
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0 for N.T.S. only
|
||||
auto initial_tablets = get_initial_tablets(feat.tablets && locator::abstract_replication_strategy::to_qualified_class_name(sc) == "org.apache.cassandra.locator.NetworkTopologyStrategy" ? std::optional<unsigned>(0) : std::nullopt);
|
||||
auto options = prepare_options(sc, tm, get_replication_options());
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
|
||||
std::move(options), initial_tablets.specified_count, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
std::move(options), initial_tablets, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata& tm, const gms::feature_service& feat) {
|
||||
@@ -229,13 +204,9 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
|
||||
sc = old->strategy_name();
|
||||
options = old_options;
|
||||
}
|
||||
auto initial_tablets = get_initial_tablets(*sc, old->initial_tablets().has_value());
|
||||
// if tablets options have not been specified, inherit them if it's tablets-enabled KS
|
||||
if (initial_tablets.enabled && !initial_tablets.specified_count) {
|
||||
initial_tablets.specified_count = old->initial_tablets();
|
||||
}
|
||||
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(old->name(), *sc, options, initial_tablets.specified_count, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
auto initial_tablets = get_initial_tablets(old->initial_tablets());
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(old->name(), *sc, options, initial_tablets, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -49,21 +49,15 @@ public:
|
||||
private:
|
||||
std::optional<sstring> _strategy_class;
|
||||
public:
|
||||
struct init_tablets_options {
|
||||
bool enabled;
|
||||
std::optional<unsigned> specified_count;
|
||||
};
|
||||
|
||||
ks_prop_defs() = default;
|
||||
explicit ks_prop_defs(std::map<sstring, sstring> options);
|
||||
|
||||
void validate();
|
||||
std::map<sstring, sstring> get_replication_options() const;
|
||||
std::optional<sstring> get_replication_strategy_class() const;
|
||||
init_tablets_options get_initial_tablets(const sstring& strategy_class, bool enabled_by_default) const;
|
||||
std::optional<unsigned> get_initial_tablets(std::optional<unsigned> default_value) const;
|
||||
data_dictionary::storage_options get_storage_options() const;
|
||||
bool get_durable_writes() const;
|
||||
std::map<sstring, sstring> get_all_options_flattened(const gms::feature_service& feat) const;
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
|
||||
};
|
||||
|
||||
@@ -46,14 +46,14 @@ public:
|
||||
protected:
|
||||
std::optional<sstring> get_simple(const sstring& name) const;
|
||||
|
||||
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
|
||||
|
||||
void remove_from_map_if_exists(const sstring& name, const sstring& key) const;
|
||||
public:
|
||||
bool has_property(const sstring& name) const;
|
||||
|
||||
std::optional<value_type> get(const sstring& name) const;
|
||||
|
||||
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
|
||||
|
||||
sstring get_string(sstring key, sstring default_value) const;
|
||||
|
||||
// Return a property value, typed as a Boolean
|
||||
|
||||
@@ -1132,7 +1132,12 @@ public:
|
||||
write(out, uint64_t(0));
|
||||
}
|
||||
|
||||
buf.remove_suffix(buf.size_bytes() - size);
|
||||
auto to_remove = buf.size_bytes() - size;
|
||||
// #20862 - we decrement usage counter based on buf.size() below.
|
||||
// Since we are shrinking buffer here, we need to also decrement
|
||||
// counter already
|
||||
buf.remove_suffix(to_remove);
|
||||
_segment_manager->totals.buffer_list_bytes -= to_remove;
|
||||
|
||||
// Build sector checksums.
|
||||
auto id = net::hton(_desc.id);
|
||||
@@ -3826,6 +3831,10 @@ uint64_t db::commitlog::get_total_size() const {
|
||||
;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_buffer_size() const {
|
||||
return _segment_manager->totals.buffer_list_bytes;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_completed_tasks() const {
|
||||
return _segment_manager->totals.allocation_count;
|
||||
}
|
||||
|
||||
@@ -306,6 +306,7 @@ public:
|
||||
future<> delete_segments(std::vector<sstring>) const;
|
||||
|
||||
uint64_t get_total_size() const;
|
||||
uint64_t get_buffer_size() const;
|
||||
uint64_t get_completed_tasks() const;
|
||||
uint64_t get_flush_count() const;
|
||||
uint64_t get_pending_tasks() const;
|
||||
|
||||
11
db/config.cc
11
db/config.cc
@@ -1526,18 +1526,19 @@ future<> update_relabel_config_from_file(const std::string& name) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
std::vector<sstring> split_comma_separated_list(sstring comma_separated_list) {
|
||||
std::vector<sstring> split_comma_separated_list(const std::string_view comma_separated_list) {
|
||||
std::vector<sstring> strs, trimmed_strs;
|
||||
boost::split(strs, std::move(comma_separated_list), boost::is_any_of(","));
|
||||
for (sstring n : strs) {
|
||||
boost::split(strs, comma_separated_list, boost::is_any_of(","));
|
||||
trimmed_strs.reserve(strs.size());
|
||||
for (sstring& n : strs) {
|
||||
std::replace(n.begin(), n.end(), '\"', ' ');
|
||||
std::replace(n.begin(), n.end(), '\'', ' ');
|
||||
boost::trim_all(n);
|
||||
if (!n.empty()) {
|
||||
trimmed_strs.push_back(n);
|
||||
trimmed_strs.push_back(std::move(n));
|
||||
}
|
||||
}
|
||||
return trimmed_strs;
|
||||
}
|
||||
|
||||
}
|
||||
} // namespace utils
|
||||
|
||||
@@ -545,6 +545,6 @@ future<gms::inet_address> resolve(const config_file::named_value<sstring>&, gms:
|
||||
*/
|
||||
future<> update_relabel_config_from_file(const std::string& name);
|
||||
|
||||
std::vector<sstring> split_comma_separated_list(sstring comma_separated_list);
|
||||
std::vector<sstring> split_comma_separated_list(std::string_view comma_separated_list);
|
||||
|
||||
}
|
||||
} // namespace utils
|
||||
|
||||
@@ -36,7 +36,7 @@ size_t quorum_for(const locator::effective_replication_map& erm) {
|
||||
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
|
||||
using namespace locator;
|
||||
|
||||
auto& rs = erm.get_replication_strategy();
|
||||
const auto& rs = erm.get_replication_strategy();
|
||||
|
||||
if (rs.get_type() == replication_strategy_type::network_topology) {
|
||||
const network_topology_strategy* nrs =
|
||||
@@ -65,7 +65,7 @@ size_t block_for_local_serial(const locator::effective_replication_map& erm) {
|
||||
size_t block_for_each_quorum(const locator::effective_replication_map& erm) {
|
||||
using namespace locator;
|
||||
|
||||
auto& rs = erm.get_replication_strategy();
|
||||
const auto& rs = erm.get_replication_strategy();
|
||||
|
||||
if (rs.get_type() == replication_strategy_type::network_topology) {
|
||||
const network_topology_strategy* nrs =
|
||||
@@ -260,7 +260,7 @@ filter_for_query(consistency_level cl,
|
||||
size_t bf = block_for(erm, cl);
|
||||
|
||||
if (read_repair == read_repair_decision::DC_LOCAL) {
|
||||
bf = std::max(block_for(erm, cl), local_count);
|
||||
bf = std::max(bf, local_count);
|
||||
}
|
||||
|
||||
if (bf >= live_endpoints.size()) { // RRD.DC_LOCAL + CL.LOCAL or CL.ALL
|
||||
|
||||
@@ -35,8 +35,6 @@
|
||||
#include <span>
|
||||
#include <unordered_map>
|
||||
|
||||
class fragmented_temporary_buffer;
|
||||
|
||||
namespace utils {
|
||||
class directories;
|
||||
} // namespace utils
|
||||
|
||||
@@ -3154,16 +3154,16 @@ future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw
|
||||
return _vug.register_staging_sstable(std::move(sst), std::move(table));
|
||||
}
|
||||
|
||||
future<bool> check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t, streaming::stream_reason reason) {
|
||||
future<bool> check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, streaming::stream_reason reason) {
|
||||
if (is_internal_keyspace(t.schema()->ks_name())) {
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
if (reason == streaming::stream_reason::repair && !t.views().empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
return do_with(t.views(), [&vb, &tm] (auto& views) {
|
||||
return do_with(std::move(tmptr), t.views(), [&vb] (locator::token_metadata_ptr& tmptr, auto& views) {
|
||||
return map_reduce(views,
|
||||
[&vb, &tm] (const view_ptr& view) { return vb.check_view_build_ongoing(tm, view->ks_name(), view->cf_name()); },
|
||||
[&] (const view_ptr& view) { return vb.check_view_build_ongoing(*tmptr, view->ks_name(), view->cf_name()); },
|
||||
false,
|
||||
std::logical_or<bool>());
|
||||
});
|
||||
|
||||
@@ -10,20 +10,17 @@
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace replica {
|
||||
class table;
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
class token_metadata;
|
||||
}
|
||||
|
||||
namespace db::view {
|
||||
class view_builder;
|
||||
|
||||
future<bool> check_needs_view_update_path(view_builder& vb, const locator::token_metadata& tm, const replica::table& t,
|
||||
future<bool> check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t,
|
||||
streaming::stream_reason reason);
|
||||
|
||||
}
|
||||
|
||||
19
dist/common/scripts/scylla_coredump_setup
vendored
19
dist/common/scripts/scylla_coredump_setup
vendored
@@ -40,6 +40,25 @@ if __name__ == '__main__':
|
||||
help='enable compress on systemd-coredump')
|
||||
args = parser.parse_args()
|
||||
|
||||
# Seems like specific version of systemd pacakge on RHEL9 has a bug on
|
||||
# SELinux configuration, it introduced "systemd-container-coredump" module
|
||||
# to provide rule for systemd-coredump but not enabled by default.
|
||||
# We have to manually load it, otherwise it causes permission errror.
|
||||
# (#19325)
|
||||
if is_redhat_variant() and distro.major_version() == '9':
|
||||
if not shutil.which('getenforce'):
|
||||
pkg_install('libselinux-utils')
|
||||
if not shutil.which('semodule'):
|
||||
pkg_install('policycoreutils')
|
||||
enforce = out('getenforce')
|
||||
if enforce != "Disabled":
|
||||
if os.path.exists('/usr/share/selinux/packages/targeted/systemd-container-coredump.pp.bz2'):
|
||||
modules = out('semodule -l')
|
||||
match = re.match(r'^systemd-container-coredump$', modules, re.MULTILINE)
|
||||
if not match:
|
||||
run('semodule -v -i /usr/share/selinux/packages/targeted/systemd-container-coredump.pp.bz2', shell=True, check=True)
|
||||
run('semodule -v -e systemd-container-coredump', shell=True, check=True)
|
||||
|
||||
# abrt-ccpp.service needs to stop before enabling systemd-coredump,
|
||||
# since both will try to install kernel coredump handler
|
||||
# (This will only requires for abrt < 2.14)
|
||||
|
||||
16
dist/common/scripts/scylla_raid_setup
vendored
16
dist/common/scripts/scylla_raid_setup
vendored
@@ -333,3 +333,19 @@ WantedBy=local-fs.target
|
||||
LOGGER.error(f'Error detected, dumping udev env parameters on {fsdev}')
|
||||
udev_info.verify()
|
||||
udev_info.dump_variables()
|
||||
|
||||
if is_redhat_variant():
|
||||
if not shutil.which('getenforce'):
|
||||
pkg_install('libselinux-utils')
|
||||
if not shutil.which('restorecon'):
|
||||
pkg_install('policycoreutils')
|
||||
if not shutil.which('semanage'):
|
||||
pkg_install('policycoreutils-python-utils')
|
||||
selinux_status = out('getenforce')
|
||||
selinux_context = out('matchpathcon -n /var/lib/systemd/coredump')
|
||||
selinux_type = selinux_context.split(':')[2]
|
||||
run(f'semanage fcontext -a -t {selinux_type} "{root}/coredump(/.*)?"', shell=True, check=True)
|
||||
if selinux_status != 'Disabled':
|
||||
run(f'restorecon -F -v -R {root}', shell=True, check=True)
|
||||
else:
|
||||
Path('/.autorelabel').touch(exist_ok=True)
|
||||
|
||||
2
dist/common/sysconfig/scylla-node-exporter
vendored
2
dist/common/sysconfig/scylla-node-exporter
vendored
@@ -1 +1 @@
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts"
|
||||
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon"
|
||||
|
||||
4
docs/_templates/db_config.tmpl
vendored
4
docs/_templates/db_config.tmpl
vendored
@@ -2,7 +2,7 @@
|
||||
|
||||
{% for group in data %}
|
||||
{% if group.value_status_count[value_status] > 0 %}
|
||||
.. _confgroup_{{ group.name }}:
|
||||
.. _confgroup_{{ group.name|lower|replace(" ", "_") }}:
|
||||
|
||||
{{ group.name }}
|
||||
{{ '-' * (group.name|length) }}
|
||||
@@ -13,7 +13,7 @@
|
||||
|
||||
{% for item in group.properties %}
|
||||
{% if item.value_status == value_status %}
|
||||
.. _confprop_{{ item.name }}:
|
||||
.. _confprop_{{ item.name|lower|replace(" ", "_") }}:
|
||||
|
||||
.. confval:: {{ item.name }}
|
||||
{% endif %}
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
### a dictionary of redirections
|
||||
#old path: new path
|
||||
|
||||
|
||||
|
||||
# Move up the Features section
|
||||
# THESE REDIRECTIOSN SHOULD BE UNCOMMENTED WHEN 6.2 IS RELEASED
|
||||
# Before 6.2 documentation is available, these redirections result in 404
|
||||
|
||||
#/stable/troubleshooting/nodetool-memory-read-timeout.html: /stable/troubleshooting/index.html
|
||||
|
||||
# Move up the Features section
|
||||
|
||||
#/stable/using-scylla/features.html: /stable/features/index.html
|
||||
#/stable/using-scylla/lwt.html: /stable/features/lwt.html
|
||||
#/stable/using-scylla/secondary-indexes.html: /stable/features/secondary-indexes.html
|
||||
|
||||
@@ -50,6 +50,13 @@ Which yields, for `/proc/sys/fs/aio-max-nr`:
|
||||
$ docker run --name some-scylla --hostname some-scylla -d scylladb/scylla
|
||||
```
|
||||
|
||||
If you're on macOS and plan to start a multi-node cluster (3 nodes or more), start ScyllaDB with
|
||||
`–reactor-backend=epoll` to override the default `linux-aio` reactor backend:
|
||||
|
||||
```console
|
||||
$ docker run --name some-scylla --hostname some-scylla -d scylladb/scylla --reactor-backend=epoll
|
||||
```
|
||||
|
||||
### Run `nodetool` utility
|
||||
|
||||
```console
|
||||
@@ -77,6 +84,11 @@ cqlsh>
|
||||
```console
|
||||
$ docker run --name some-scylla2 --hostname some-scylla2 -d scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
```
|
||||
If you're on macOS, ensure to add the `–reactor-backend=epoll` option when adding new nodes:
|
||||
|
||||
```console
|
||||
$ docker run --name some-scylla2 --hostname some-scylla2 -d scylladb/scylla --reactor-backend=epoll --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
```
|
||||
|
||||
#### Make a cluster with Docker Compose
|
||||
|
||||
@@ -344,90 +356,6 @@ The `--authenticator` command lines option allows to provide the authenticator c
|
||||
|
||||
The `--authorizer` command lines option allows to provide the authorizer class ScyllaDB will use. By default ScyllaDB uses the `AllowAllAuthorizer` which allows any action to any user. The second option is using the `CassandraAuthorizer` parameter, which stores permissions in `system.permissions` table.
|
||||
|
||||
**Since: 2.3**
|
||||
|
||||
### JMX parameters
|
||||
|
||||
JMX ScyllaDB service is initialized from the `/scylla-jmx-service.sh` on
|
||||
container startup. By default the script uses `/etc/sysconfig/scylla-jmx`
|
||||
to read the default configuration. It then can be overridden by setting
|
||||
environmental parameters.
|
||||
|
||||
An example:
|
||||
|
||||
docker run -d -e "SCYLLA_JMX_ADDR=-ja 0.0.0.0" -e SCYLLA_JMX_REMOTE=-r --publish 7199:7199 scylladb/scylla
|
||||
|
||||
#### SCYLLA_JMX_PORT
|
||||
|
||||
Scylla JMX listening port.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_JMX_PORT="-jp 7199"
|
||||
|
||||
#### SCYLLA_API_PORT
|
||||
|
||||
Scylla API port for JMX to connect to.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_API_PORT="-p 10000"
|
||||
|
||||
#### SCYLLA_API_ADDR
|
||||
|
||||
Scylla API address for JMX to connect to.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_API_ADDR="-a localhost"
|
||||
|
||||
#### SCYLLA_JMX_ADDR
|
||||
|
||||
JMX address to bind on.
|
||||
|
||||
Default value:
|
||||
|
||||
SCYLLA_JMX_ADDR="-ja localhost"
|
||||
|
||||
For example, it is possible to make JMX available to the outer world
|
||||
by changing its bind address to `0.0.0.0`:
|
||||
|
||||
docker run -d -e "SCYLLA_JMX_ADDR=-ja 0.0.0.0" -e SCYLLA_JMX_REMOTE=-r --publish 7199:7199 scylladb/scylla
|
||||
|
||||
`cassandra-stress` requires direct access to the JMX.
|
||||
|
||||
#### SCYLLA_JMX_FILE
|
||||
|
||||
A JMX service configuration file path.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_FILE="-cf /etc/scylla.d/scylla-user.cfg"
|
||||
|
||||
#### SCYLLA_JMX_LOCAL
|
||||
|
||||
The location of the JMX executable.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_LOCAL="-l /opt/scylladb/jmx
|
||||
|
||||
#### SCYLLA_JMX_REMOTE
|
||||
|
||||
Allow JMX to run remotely.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_REMOTE="-r"
|
||||
|
||||
#### SCYLLA_JMX_DEBUG
|
||||
|
||||
Enable debugger.
|
||||
|
||||
Example value:
|
||||
|
||||
SCYLLA_JMX_DEBUG="-d"
|
||||
|
||||
### Related Links
|
||||
|
||||
* [Best practices for running ScyllaDB on docker](http://docs.scylladb.com/procedures/best_practices_scylla_on_docker/)
|
||||
|
||||
@@ -194,7 +194,7 @@ Alternatively, you can explicitly install **all** the ScyllaDB packages for the
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get install scylla-enterprise{,-server,-jmx,-tools,-tools-core,-kernel-conf,-node-exporter,-conf,-python3}=2021.1.0-0.20210511.9e8e7d58b-1
|
||||
sudo apt-get install scylla-enterprise{,-server,-tools,-tools-core,-kernel-conf,-node-exporter,-conf,-python3}=2021.1.0-0.20210511.9e8e7d58b-1
|
||||
sudo apt-get install scylla-enterprise-machine-image=2021.1.0-0.20210511.9e8e7d58b-1 # only execute on AMI instance
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
Features
|
||||
========================
|
||||
|
||||
This document highlights ScyllaDB's key data modeling features.
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
:hidden:
|
||||
|
||||
Lightweight Transactions </features/lwt/>
|
||||
Global Secondary Indexes </features/secondary-indexes/>
|
||||
@@ -12,6 +15,23 @@ Features
|
||||
Change Data Capture </features/cdc/index>
|
||||
Workload Attributes </features/workload-attributes>
|
||||
|
||||
`ScyllaDB Enterprise <https://enterprise.docs.scylladb.com/stable/overview.html#enterprise-only-features>`_
|
||||
provides additional features, including Encryption at Rest,
|
||||
workload prioritization, auditing, and more.
|
||||
.. panel-box::
|
||||
:title: ScyllaDB Features
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* Secondary Indexes and Materialized Views provide efficient search mechanisms
|
||||
on non-partition keys by creating an index.
|
||||
|
||||
* :doc:`Global Secondary Indexes </features/secondary-indexes/>`
|
||||
* :doc:`Local Secondary Indexes </features/local-secondary-indexes/>`
|
||||
* :doc:`Materialized Views </features/materialized-views/>`
|
||||
|
||||
* :doc:`Lightweight Transactions </features/lwt/>` provide conditional updates
|
||||
through linearizability.
|
||||
* :doc:`Counters </features/counters/>` are columns that only allow their values
|
||||
to be incremented, decremented, read, or deleted.
|
||||
* :doc:`Change Data Capture </features/cdc/index>` allows you to query the current
|
||||
state and the history of all changes made to tables in the database.
|
||||
* :doc:`Workload Attributes </features/workload-attributes>` assigned to your workloads
|
||||
specify how ScyllaDB will handle requests depending on the workload.
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-prerequisites>`_ on other x86_64 or aarch64 platforms, without any guarantees.
|
||||
|
||||
+----------------------------+--------------------+-------+---------------+
|
||||
| Linux Distributions |Ubuntu | Debian| Rocky / |
|
||||
| | | | RHEL |
|
||||
| Linux Distributions |Ubuntu | Debian|Rocky / CentOS |
|
||||
| | | |/ RHEL |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 11 | 8 | 9 |
|
||||
+============================+======+======+======+=======+=======+=======+
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.2 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
|
||||
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
@@ -18,4 +18,4 @@ Supported Architecture
|
||||
-----------------------------
|
||||
|
||||
ScyllaDB Open Source supports x86_64 for all versions and AArch64 starting from ScyllaDB 4.6 and nightly build.
|
||||
In particular, aarch64 support includes AWS EC2 Graviton.
|
||||
In particular, aarch64 support includes AWS EC2 Graviton.
|
||||
|
||||
@@ -78,7 +78,7 @@ Launching Instances from ScyllaDB AMI
|
||||
* The ``scylla.yaml`` file: ``/etc/scylla/scylla.yaml``
|
||||
* Data: ``/var/lib/scylla/``
|
||||
|
||||
To check that the ScyllaDB server and the JMX component are running, run:
|
||||
To check that the ScyllaDB server is running, run:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ Launching ScyllaDB on Azure
|
||||
|
||||
ssh -i ~/.ssh/ssh-key.pem scyllaadm@public-ip
|
||||
|
||||
To check that the ScyllaDB server and the JMX component are running, run:
|
||||
To check that the ScyllaDB server is running, run:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ Launching ScyllaDB on GCP
|
||||
|
||||
gcloud compute ssh scylla-node1
|
||||
|
||||
To check that the ScyllaDB server and the JMX component are running, run:
|
||||
To check that the ScyllaDB server is running, run:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
|
||||
@@ -1,8 +1,3 @@
|
||||
.. |SCYLLADB_VERSION| replace:: 5.2
|
||||
|
||||
.. update the version folder URL below (variables won't work):
|
||||
https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/
|
||||
|
||||
====================================================
|
||||
Install ScyllaDB Without root Privileges
|
||||
====================================================
|
||||
@@ -24,14 +19,17 @@ Note that if you're on CentOS 7, only root offline installation is supported.
|
||||
Download and Install
|
||||
-----------------------
|
||||
|
||||
#. Download the latest tar.gz file for ScyllaDB |SCYLLADB_VERSION| (x86 or ARM) from https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/.
|
||||
#. Download the latest tar.gz file for ScyllaDB version (x86 or ARM) from ``https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-<version>/``.
|
||||
|
||||
Example for version 6.1: https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-6.1/
|
||||
|
||||
#. Uncompress the downloaded package.
|
||||
|
||||
The following example shows the package for ScyllaDB 5.2.4 (x86):
|
||||
The following example shows the package for ScyllaDB 6.1.1 (x86):
|
||||
|
||||
.. code:: console
|
||||
|
||||
tar xvfz scylla-unified-5.2.4-0.20230623.cebbf6c5df2b.x86_64.tar.gz
|
||||
tar xvfz scylla-unified-6.1.1-0.20240814.8d90b817660a.x86_64.tar.gz
|
||||
|
||||
#. Install OpenJDK 8 or 11.
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ This will send ScyllaDB only logs to :code:`/var/log/scylla/scylla.log`
|
||||
|
||||
Logging on Docker
|
||||
-----------------
|
||||
Starting from ScyllaDB 1.3, `ScyllaDB Docker <https://hub.docker.com/r/scylladb/scylla/>`_, you should use :code:`docker logs` command to access ScyllaDB server and JMX proxy logs
|
||||
Starting from ScyllaDB 1.3, `ScyllaDB Docker <https://hub.docker.com/r/scylladb/scylla/>`_, you should use :code:`docker logs` command to access ScyllaDB server logs.
|
||||
|
||||
|
||||
.. include:: /rst_include/advance-index.rst
|
||||
|
||||
@@ -26,13 +26,6 @@ By default, ScyllaDB runs as user ``scylla`` in group ``scylla``. The following
|
||||
|
||||
4. Edit ``/etc/systemd/system/multi-user.target.wants/node-exporter.service``
|
||||
|
||||
.. code-block:: sh
|
||||
|
||||
User=test
|
||||
Group=test
|
||||
|
||||
5. Edit /usr/lib/systemd/system/scylla-jmx.service
|
||||
|
||||
.. code-block:: sh
|
||||
|
||||
User=test
|
||||
@@ -51,5 +44,4 @@ At this point, all services should be started as test:test user:
|
||||
.. code-block:: sh
|
||||
|
||||
test 8760 1 11 14:42 ? 00:00:01 /usr/bin/scylla --log-to-syslog 1 --log-to-std ...
|
||||
test 8765 1 12 14:42 ? 00:00:01 /opt/scylladb/jmx/symlinks/scylla-jmx -Xmx256m ...
|
||||
test 13638 1 0 14:30 ? 00:00:00 /usr/bin/node_exporter --collector.interrupts
|
||||
|
||||
@@ -11,7 +11,7 @@ For example:
|
||||
ScyllaDB uses available memory to cache your data. ScyllaDB knows how to dynamically manage memory for optimal performance, for example, if many clients connect to ScyllaDB, it will evict some data from the cache to make room for these connections, when the connection count drops again, this memory is returned to the cache.
|
||||
|
||||
To limit the memory usage you can start scylla with ``--memory`` parameter.
|
||||
Alternatively, you can specify the amount of memory ScyllaDB should leave to the OS with ``--reserve-memory`` parameter. Keep in mind that the amount of memory left to the operating system needs to suffice external scylla modules, such as ``scylla-jmx``, which runs on top of JVM.
|
||||
Alternatively, you can specify the amount of memory ScyllaDB should leave to the OS with ``--reserve-memory`` parameter. Keep in mind that the amount of memory left to the operating system needs to suffice external scylla modules.
|
||||
|
||||
On Ubuntu, edit the ``/etc/default/scylla-server``.
|
||||
|
||||
|
||||
@@ -14,8 +14,6 @@ Port Description Protocol
|
||||
------ -------------------------------------------- --------
|
||||
7001 SSL inter-node communication (RPC) TCP
|
||||
------ -------------------------------------------- --------
|
||||
7199 JMX management TCP
|
||||
------ -------------------------------------------- --------
|
||||
10000 ScyllaDB REST API TCP
|
||||
------ -------------------------------------------- --------
|
||||
9180 Prometheus API TCP
|
||||
|
||||
@@ -146,9 +146,7 @@ The ScyllaDB ports are detailed in the table below. For ScyllaDB Manager ports,
|
||||
|
||||
.. include:: /operating-scylla/_common/networking-ports.rst
|
||||
|
||||
All ports above need to be open to external clients (CQL), external admin systems (JMX), and other nodes (RPC). REST API port can be kept closed for incoming external connections.
|
||||
|
||||
The JMX service, :code:`scylla-jmx`, runs on port 7199. It is required in order to manage ScyllaDB using :code:`nodetool` and other Apache Cassandra-compatible utilities. The :code:`scylla-jmx` process must be able to connect to port 10000 on localhost. The JMX service listens for incoming JMX connections on all network interfaces on the system.
|
||||
All ports above need to be open to external clients (CQL) and other nodes (RPC). REST API port can be kept closed for incoming external connections.
|
||||
|
||||
Advanced networking
|
||||
-------------------
|
||||
@@ -223,10 +221,6 @@ Monitoring Stack
|
||||
|
||||
|mon_root|
|
||||
|
||||
JMX
|
||||
---
|
||||
ScyllaDB JMX is compatible with Apache Cassandra, exposing the relevant subset of MBeans.
|
||||
|
||||
.. REST
|
||||
|
||||
.. include:: /operating-scylla/rest.rst
|
||||
|
||||
@@ -31,7 +31,7 @@ Parameter Descriptio
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-kc <ktlist>, --kc.list <ktlist> The list of Keyspaces to take snapshot
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-p <port> / --port <port> Remote jmx agent port number
|
||||
-p <port> / --port <port> The port of the REST API of the ScyllaDB node.
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
|
||||
@@ -61,26 +61,14 @@ Nodetool
|
||||
nodetool-commands/viewbuildstatus
|
||||
nodetool-commands/version
|
||||
|
||||
The ``nodetool`` utility provides a simple command-line interface to the following exposed operations and attributes. ScyllaDB’s nodetool is a fork of `the Apache Cassandra nodetool <https://cassandra.apache.org/doc/latest/tools/nodetool/nodetool.html>`_ with the same syntax and a subset of the operations.
|
||||
The ``nodetool`` utility provides a simple command-line interface to the following exposed operations and attributes.
|
||||
|
||||
.. _nodetool-generic-options:
|
||||
|
||||
Nodetool generic options
|
||||
========================
|
||||
All options are supported:
|
||||
|
||||
|
||||
|
||||
* ``-p <port>`` or ``--port <port>`` - Remote JMX agent port number.
|
||||
|
||||
* ``-pp`` or ``--print-port`` - Operate in 4.0 mode with hosts disambiguated by port number.
|
||||
|
||||
* ``-pw <password>`` or ``--password <password>`` - Remote JMX agent password.
|
||||
|
||||
* ``-pwf <passwordFilePath>`` or ``--password-file <passwordFilePath>`` - Path to the JMX password file.
|
||||
|
||||
* ``-u <username>`` or ``--username <username>`` - Remote JMX agent username.
|
||||
|
||||
* ``-p <port>`` or ``--port <port>`` - The port of the REST API of the ScyllaDB node.
|
||||
* ``--`` - Separates command-line options from the list of argument(useful when an argument might be mistaken for a command-line option).
|
||||
|
||||
Supported Nodetool operations
|
||||
@@ -145,4 +133,4 @@ Operations that are not listed below are currently not available.
|
||||
* :doc:`viewbuildstatus </operating-scylla/nodetool-commands/viewbuildstatus/>` - Shows the progress of a materialized view build.
|
||||
* :doc:`version </operating-scylla/nodetool-commands/version>` - Print the DB version.
|
||||
|
||||
.. include:: /rst_include/apache-copyrights.rst
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ With the recent addition of the `ScyllaDB Advisor <http://monitoring.docs.scylla
|
||||
Install ScyllaDB Manager
|
||||
------------------------
|
||||
|
||||
Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>` together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
|
||||
Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>`_ together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
|
||||
ScyllaDB Manager provides automated backups and repairs of your database.
|
||||
ScyllaDB Manager can manage multiple ScyllaDB clusters and run cluster-wide tasks in a controlled and predictable way.
|
||||
For example, with ScyllaDB Manager you can control the intensity of a repair, increasing it to speed up the process, or lower the intensity to ensure it minimizes impact on ongoing operations.
|
||||
|
||||
@@ -22,6 +22,13 @@ To start a single ScyllaDB node instance in a Docker container, run:
|
||||
|
||||
docker run --name some-scylla -d scylladb/scylla
|
||||
|
||||
If you're on macOS and plan to start a multi-node cluster (3 nodes or more), start ScyllaDB with
|
||||
``–reactor-backend=epoll`` to override the default ``linux-aio`` reactor backend:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
docker run --name some-scylla -d scylladb/scylla --reactor-backend=epoll
|
||||
|
||||
The ``docker run`` command starts a new Docker instance in the background named some-scylla that runs the ScyllaDB server:
|
||||
|
||||
.. code-block:: console
|
||||
@@ -95,6 +102,12 @@ With a single ``some-scylla`` instance running, joining new nodes to form a clu
|
||||
|
||||
docker run --name some-scylla2 -d scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
|
||||
If you're on macOS, ensure to add the ``–reactor-backend=epoll`` option when adding new nodes:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
docker run --name some-scylla2 -d scylladb/scylla --reactor-backend=epoll --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' some-scylla)"
|
||||
|
||||
To query when the node is up and running (and view the status of the entire cluster) use the ``nodetool status`` command:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
@@ -6,8 +6,8 @@ ScyllaDB exposes a REST API to retrieve administrative information from a node a
|
||||
administrative operations. For example, it allows you to check or update configuration,
|
||||
retrieve cluster-level information, and more.
|
||||
|
||||
The :doc:`nodetool </operating-scylla/nodetool>` CLI tool interacts with a *scylla-jmx* process using JMX.
|
||||
The process, in turn, uses the REST API to interact with the ScyllaDB process.
|
||||
The :doc:`nodetool </operating-scylla/nodetool>` CLI tool uses the REST API
|
||||
to interact with the ScyllaDB process.
|
||||
|
||||
You can interact with the REST API directly using :code:`curl`, ScyllaDB's CLI for REST API, or the Swagger UI.
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ Procedure
|
||||
|
||||
#. Enable authentication
|
||||
|
||||
Enable authentication and define authorized roles in the cluster as described in the `Enable Authentication </operating-scylla/security/authentication/>`_ document.
|
||||
Enable authentication and define authorized roles in the cluster as described in the :doc:`Enable Authentication </operating-scylla/security/authentication/>` document.
|
||||
|
||||
#. Enable CQL transport TLS using client certificate verification
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ Encryption: Data in Transit Client to Node
|
||||
|
||||
Follow the procedures below to enable a client to node encryption.
|
||||
Once enabled, all communication between the client and the node is transmitted over TLS/SSL.
|
||||
The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 certified.
|
||||
The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 enabled.
|
||||
|
||||
Workflow
|
||||
^^^^^^^^
|
||||
|
||||
@@ -10,7 +10,6 @@ Cluster and Node
|
||||
Failed Decommission Problem </troubleshooting/failed-decommission/>
|
||||
Cluster Timeouts </troubleshooting/timeouts>
|
||||
Node Joined With No Data </troubleshooting/node-joined-without-any-data>
|
||||
SocketTimeoutException </troubleshooting/nodetool-memory-read-timeout/>
|
||||
NullPointerException </troubleshooting/nodetool-nullpointerexception/>
|
||||
Failed Schema Sync </troubleshooting/failed-schema-sync/>
|
||||
|
||||
@@ -28,7 +27,6 @@ Cluster and Node
|
||||
* :doc:`Failed Decommission Problem </troubleshooting/failed-decommission/>`
|
||||
* :doc:`Cluster Timeouts </troubleshooting/timeouts>`
|
||||
* :doc:`Node Joined With No Data </troubleshooting/node-joined-without-any-data>`
|
||||
* :doc:`Nodetool fails with SocketTimeoutException 'Read timed out' </troubleshooting/nodetool-memory-read-timeout>`
|
||||
* :doc:`Nodetool Throws NullPointerException </troubleshooting/nodetool-nullpointerexception>`
|
||||
* :doc:`Failed Schema Sync </troubleshooting/failed-schema-sync>`
|
||||
|
||||
|
||||
@@ -1,112 +0,0 @@
|
||||
Nodetool fails with SocketTimeoutException 'Read timed out'
|
||||
===========================================================
|
||||
|
||||
This troubleshooting article describes what to do when Nodetool fails with a 'Read timed out' error.
|
||||
|
||||
Problem
|
||||
^^^^^^^
|
||||
|
||||
When running any Nodetool command, users may see the following error:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
Failed to connect to '127.0.0.1:7199' - SocketTimeoutException: 'Read timed out'
|
||||
|
||||
Analysis
|
||||
^^^^^^^^
|
||||
Nodetool is a Java based application which requires memory. ScyllaDB by default consumes 93% of the node’s RAM (for MemTables + Cache) and leaves 7% for other applications, such as nodetool.
|
||||
|
||||
If cases where this is not enough memory (e.g. small instances with ~64GB RAM or lower), Nodetool may not be able to run due to insufficient memory. In this case an out of memory (OOM) error may appear and scylla-jmx will not run.
|
||||
|
||||
|
||||
Example
|
||||
-------
|
||||
|
||||
The error you will see is similar to:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000,
|
||||
671088640, 0) failed; error='Cannot allocate memory' (err no=12)
|
||||
|
||||
|
||||
In order to check if the issue is scylla-jmx, use the following command (systemd-based Linux distribution) to check the status of the service:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo systemctl status scylla-jmx
|
||||
|
||||
If the service is running you will see something similar to:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo service scylla-jmx status
|
||||
● scylla-jmx.service - ScyllaDB JMX
|
||||
Loaded: loaded (/lib/systemd/system/scylla-jmx.service; disabled; vendor preset: enabled)
|
||||
Active: active (running) since Wed 2018-07-18 20:59:08 UTC; 3s ago
|
||||
Main PID: 256050 (scylla-jmx)
|
||||
Tasks: 27
|
||||
Memory: 119.5M
|
||||
CPU: 1.959s
|
||||
CGroup: /system.slice/scylla-jmx.service
|
||||
└─256050 /usr/lib/scylla/jmx/symlinks/scylla-jmx -Xmx384m -XX:+UseSerialGC -Dcom.sun.management.jmxremote.auth
|
||||
|
||||
If it isn't, you will see an error similar to:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo systemctl status scylla-jmx
|
||||
● scylla-jmx.service - ScyllaDB JMX
|
||||
Loaded: loaded (/usr/lib/systemd/system/scylla-jmx.service; disabled; vendor preset: disabled)
|
||||
Active: failed (Result: exit-code) since Thu 2018-05-10 10:34:15 EDT; 3min 47s ago
|
||||
Process: 1417 ExecStart=/usr/lib/scylla/jmx/scylla-jmx $SCYLLA_JMX_PORT $SCYLLA_API_PORT $SCYLLA_API_ADDR $SCYLLA_JMX_ADDR
|
||||
$SCYLLA_JMX_FILE $SCYLLA_JMX_LOCAL $SCYLLA_JMX_REMOTE $SCYLLA_JMX_DEBUG (code=exited, status=127)
|
||||
Main PID: 1417 (code=exited, status=127)
|
||||
|
||||
or
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo service scylla-jmx status
|
||||
● scylla-jmx.service
|
||||
Loaded: not-found (Reason: No such file or directory)
|
||||
Active: failed (Result: exit-code) since Wed 2018-07-18 20:38:58 UTC; 12min ago
|
||||
Main PID: 141256 (code=exited, status=143)
|
||||
|
||||
You will need to restart the service or change the RAM allocation as per the Solution_ below.
|
||||
|
||||
Solution
|
||||
^^^^^^^^
|
||||
|
||||
There are two ways to fix this problem, one is faster but may not permanently fix the issue and the other solution is more robust.
|
||||
|
||||
**The immediate solution**
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
service scylla-jmx restart
|
||||
|
||||
.. note:: This is not a permanent fix as the problem might manifest again at a later time.
|
||||
|
||||
**The more robust solution**
|
||||
|
||||
1. Take the size of your node’s RAM, calculate 7% of that size, increase it by another 40%, and use this new size as your RAM requirement.
|
||||
|
||||
For example: on a GCP n1-highmem-8 instance (52GB RAM)
|
||||
|
||||
* 7% would be ~3.6GB.
|
||||
* Increasing it by ~40% means you need to increase your RAM ~5GB.
|
||||
2. Open one of the following files (as per your OS platform):
|
||||
|
||||
* Ubuntu: ``/etc/default/scylla-server``.
|
||||
* Red Hat/ CentOS: ``/etc/sysconfig/scylla-server``
|
||||
3. In the file you are editing, add to the ``SCYLLA_ARGS`` statement ``--reserve-memory 5G`` (the amount you calculated above). Save and exit.
|
||||
4. Restart ScyllaDB server
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
sudo systemctl restart scylla-server
|
||||
|
||||
|
||||
.. note:: If the initial calculation and reserve memory is not enough and problem persists and/or reappears, repeat the procedure from step 2 and increase the RAM in 1GB increments.
|
||||
|
||||
@@ -21,8 +21,8 @@ The following metrics are new in ScyllaDB |NEW_VERSION|:
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* -
|
||||
-
|
||||
* - scylla_alternator_batch_item_count
|
||||
- The total number of items processed across all batches
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -143,6 +143,7 @@ public:
|
||||
// whereas without it, it will fail the insert - i.e. for things like raft etc _all_ nodes should
|
||||
// have it or none, otherwise we can get partial failures on writes.
|
||||
gms::feature fragmented_commitlog_entries { *this, "FRAGMENTED_COMMITLOG_ENTRIES"sv };
|
||||
gms::feature maintenance_tenant { *this, "MAINTENANCE_TENANT"sv };
|
||||
|
||||
// A feature just for use in tests. It must not be advertised unless
|
||||
// the "features_enable_test_feature" injection is enabled.
|
||||
|
||||
@@ -2354,7 +2354,15 @@ future<> gossiper::do_stop_gossiping() {
|
||||
// Take the semaphore makes sure existing gossip loop is finished
|
||||
auto units = co_await get_units(_callback_running, 1);
|
||||
co_await container().invoke_on_all([] (auto& g) {
|
||||
return std::move(g._failure_detector_loop_done);
|
||||
// #21159
|
||||
// gossiper::shutdown can be called from more than once place - both
|
||||
// storage_service::isolate and normal gossip service stop. The former is
|
||||
// waited for in storage_service::stop, but if we, as was done in cql_test_env,
|
||||
// call shutdown independently, we could still end up here twite, and not hit
|
||||
// the _enabled guard (because we do waiting things before setting it, and setting it
|
||||
// is also waiting). However, making sure we don't leave an invalid future
|
||||
// here should ensure even if we reenter this method in such as way, we don't crash.
|
||||
return std::exchange(g._failure_detector_loop_done, make_ready_future<>());
|
||||
});
|
||||
logger.info("Gossip is now stopped");
|
||||
}
|
||||
|
||||
@@ -574,7 +574,7 @@ PYSCRIPTS=$(find dist/common/scripts/ -maxdepth 1 -type f -exec grep -Pls '\A#!/
|
||||
for i in $PYSCRIPTS; do
|
||||
relocate_python3 "$rprefix"/scripts "$i"
|
||||
done
|
||||
for i in seastar/scripts/perftune.py seastar/scripts/seastar-addr2line; do
|
||||
for i in seastar/scripts/{perftune.py,addr2line.py,seastar-addr2line}; do
|
||||
relocate_python3 "$rprefix"/scripts "$i"
|
||||
done
|
||||
relocate_python3 "$rprefix"/scyllatop tools/scyllatop/scyllatop.py
|
||||
|
||||
@@ -39,7 +39,11 @@ abstract_replication_strategy::abstract_replication_strategy(
|
||||
replication_strategy_params params,
|
||||
replication_strategy_type my_type)
|
||||
: _config_options(params.options)
|
||||
, _my_type(my_type) {}
|
||||
, _my_type(my_type) {
|
||||
if (params.initial_tablets.has_value()) {
|
||||
_uses_tablets = true;
|
||||
}
|
||||
}
|
||||
|
||||
abstract_replication_strategy::ptr_type abstract_replication_strategy::create_replication_strategy(const sstring& strategy_name, replication_strategy_params params) {
|
||||
try {
|
||||
|
||||
@@ -67,6 +67,7 @@ class vnode_effective_replication_map;
|
||||
class effective_replication_map_factory;
|
||||
class per_table_replication_strategy;
|
||||
class tablet_aware_replication_strategy;
|
||||
class effective_replication_map;
|
||||
|
||||
|
||||
class abstract_replication_strategy : public seastar::enable_shared_from_this<abstract_replication_strategy> {
|
||||
@@ -98,6 +99,9 @@ protected:
|
||||
public:
|
||||
using ptr_type = seastar::shared_ptr<abstract_replication_strategy>;
|
||||
|
||||
// Check that the read replica set does not exceed what's allowed by the schema.
|
||||
[[nodiscard]] virtual sstring sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const = 0;
|
||||
|
||||
abstract_replication_strategy(
|
||||
replication_strategy_params params,
|
||||
replication_strategy_type my_type);
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "locator/everywhere_replication_strategy.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
@@ -33,6 +34,21 @@ size_t everywhere_replication_strategy::get_replication_factor(const token_metad
|
||||
return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners();
|
||||
}
|
||||
|
||||
void everywhere_replication_strategy::validate_options(const gms::feature_service&) const {
|
||||
if (_uses_tablets) {
|
||||
throw exceptions::configuration_exception("EverywhereStrategy doesn't support tablet replication");
|
||||
}
|
||||
}
|
||||
|
||||
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const {
|
||||
const auto replication_factor = erm.get_replication_factor();
|
||||
if (read_replicas.size() > replication_factor) {
|
||||
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, replication_strategy_params>;
|
||||
static registry registrator("org.apache.cassandra.locator.EverywhereStrategy");
|
||||
static registry registrator_short_name("EverywhereStrategy");
|
||||
|
||||
@@ -20,7 +20,7 @@ public:
|
||||
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const override { /* noop */ }
|
||||
virtual void validate_options(const gms::feature_service&) const override;
|
||||
|
||||
std::optional<std::unordered_set<sstring>> recognized_options(const topology&) const override {
|
||||
// We explicitly allow all options
|
||||
@@ -32,5 +32,7 @@ public:
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
[[nodiscard]] sstring sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const override;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include <algorithm>
|
||||
#include "local_strategy.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
|
||||
namespace locator {
|
||||
@@ -23,6 +24,9 @@ future<host_id_set> local_strategy::calculate_natural_endpoints(const token& t,
|
||||
}
|
||||
|
||||
void local_strategy::validate_options(const gms::feature_service&) const {
|
||||
if (_uses_tablets) {
|
||||
throw exceptions::configuration_exception("LocalStrategy doesn't support tablet replication");
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::unordered_set<sstring>> local_strategy::recognized_options(const topology&) const {
|
||||
@@ -34,6 +38,13 @@ size_t local_strategy::get_replication_factor(const token_metadata&) const {
|
||||
return 1;
|
||||
}
|
||||
|
||||
sstring local_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const {
|
||||
if (read_replicas.size() > 1) {
|
||||
return seastar::format("local_strategy: the number of replicas for local_strategy is {}, cannot be higher than 1", read_replicas.size());
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, local_strategy, replication_strategy_params>;
|
||||
static registry registrator("org.apache.cassandra.locator.LocalStrategy");
|
||||
static registry registrator_short_name("LocalStrategy");
|
||||
|
||||
@@ -35,6 +35,8 @@ public:
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
[[nodiscard]] sstring sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
#include "locator/load_sketch.hh"
|
||||
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include "exceptions/exceptions.hh"
|
||||
@@ -554,6 +556,36 @@ tablet_replica_set network_topology_strategy::drop_tablets_in_dc(schema_ptr s, c
|
||||
return filtered;
|
||||
}
|
||||
|
||||
sstring network_topology_strategy::sanity_check_read_replicas(const effective_replication_map& erm,
|
||||
const inet_address_vector_replica_set& read_replicas) const {
|
||||
const auto& topology = erm.get_topology();
|
||||
|
||||
struct rf_node_count {
|
||||
size_t replication_factor{0};
|
||||
size_t node_count{0};
|
||||
};
|
||||
|
||||
absl::flat_hash_map<sstring, rf_node_count> data_centers_replication_factor;
|
||||
std::ranges::for_each(read_replicas, [&data_centers_replication_factor, &topology, this](const auto& node) {
|
||||
auto res = data_centers_replication_factor.emplace(topology.get_datacenter(node), rf_node_count{0, 0});
|
||||
if (res.second) {
|
||||
// For new item add replication factor.
|
||||
res.first->second.replication_factor = get_replication_factor(res.first->first);
|
||||
}
|
||||
++res.first->second.node_count;
|
||||
});
|
||||
|
||||
for (const auto& [key, item] : data_centers_replication_factor) {
|
||||
if (item.replication_factor < item.node_count) {
|
||||
return seastar::format("network_topology_strategy: ERM inconsistency, Datacenter [{}] has higher count of read replicas (accounting for "
|
||||
"current consistency level): [{}] than its replication factor [{}]",
|
||||
key, item.node_count, item.replication_factor);
|
||||
}
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, network_topology_strategy, replication_strategy_params>;
|
||||
static registry registrator("org.apache.cassandra.locator.NetworkTopologyStrategy");
|
||||
static registry registrator_short_name("NetworkTopologyStrategy");
|
||||
|
||||
@@ -42,6 +42,8 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
[[nodiscard]] sstring sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const override;
|
||||
|
||||
public: // tablet_aware_replication_strategy
|
||||
virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const override;
|
||||
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, unsigned initial_scale) const override;
|
||||
|
||||
@@ -70,12 +70,25 @@ void simple_strategy::validate_options(const gms::feature_service&) const {
|
||||
throw exceptions::configuration_exception("SimpleStrategy requires a replication_factor strategy option.");
|
||||
}
|
||||
parse_replication_factor(it->second);
|
||||
if (_uses_tablets) {
|
||||
throw exceptions::configuration_exception("SimpleStrategy doesn't support tablet replication");
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::unordered_set<sstring>>simple_strategy::recognized_options(const topology&) const {
|
||||
return {{ "replication_factor" }};
|
||||
}
|
||||
|
||||
sstring simple_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const {
|
||||
if (read_replicas.size() > _replication_factor) {
|
||||
return seastar::format("ERM inconsistency, the read replica set for simple strategy has higher count of"
|
||||
" read replicas [{}] than its replication factor [{}]",
|
||||
read_replicas.size(),
|
||||
_replication_factor);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, simple_strategy, replication_strategy_params>;
|
||||
static registry registrator("org.apache.cassandra.locator.SimpleStrategy");
|
||||
static registry registrator_short_name("SimpleStrategy");
|
||||
|
||||
@@ -26,6 +26,8 @@ public:
|
||||
}
|
||||
|
||||
virtual future<host_id_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
[[nodiscard]] sstring sanity_check_read_replicas(const effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) const override;
|
||||
private:
|
||||
size_t _replication_factor = 1;
|
||||
};
|
||||
|
||||
@@ -200,9 +200,6 @@ future<> tablet_metadata::mutate_tablet_map_async(table_id id, noncopyable_funct
|
||||
}
|
||||
|
||||
future<tablet_metadata> tablet_metadata::copy() const {
|
||||
if (_tablets.empty()) {
|
||||
co_return tablet_metadata{};
|
||||
}
|
||||
tablet_metadata copy;
|
||||
for (const auto& e : _tablets) {
|
||||
copy._tablets.emplace(e.first, co_await e.second.copy());
|
||||
@@ -851,9 +848,8 @@ void tablet_aware_replication_strategy::validate_tablet_options(const abstract_r
|
||||
void tablet_aware_replication_strategy::process_tablet_options(abstract_replication_strategy& ars,
|
||||
replication_strategy_config_options& opts,
|
||||
replication_strategy_params params) {
|
||||
if (params.initial_tablets.has_value()) {
|
||||
_initial_tablets = *params.initial_tablets;
|
||||
ars._uses_tablets = true;
|
||||
if (ars._uses_tablets) {
|
||||
_initial_tablets = params.initial_tablets.value_or(0);
|
||||
mark_as_per_table(ars);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +164,7 @@ public:
|
||||
inet_address get_endpoint_for_host_id(host_id) const;
|
||||
|
||||
/** @return a copy of the endpoint-to-id map for read-only operations */
|
||||
std::unordered_map<inet_address, host_id> get_endpoint_to_host_id_map_for_reading() const;
|
||||
std::unordered_map<inet_address, host_id> get_endpoint_to_host_id_map() const;
|
||||
|
||||
void add_bootstrap_token(token t, host_id endpoint);
|
||||
|
||||
@@ -565,19 +565,18 @@ inet_address token_metadata_impl::get_endpoint_for_host_id(host_id host_id) cons
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_map<inet_address, host_id> token_metadata_impl::get_endpoint_to_host_id_map_for_reading() const {
|
||||
std::unordered_map<inet_address, host_id> token_metadata_impl::get_endpoint_to_host_id_map() const {
|
||||
const auto& nodes = _topology.get_nodes_by_endpoint();
|
||||
std::unordered_map<inet_address, host_id> map;
|
||||
map.reserve(nodes.size());
|
||||
for (const auto& [endpoint, node] : nodes) {
|
||||
// Restrict to members
|
||||
if (!node->is_member()) {
|
||||
if (node->left() || node->is_none()) {
|
||||
continue;
|
||||
}
|
||||
if (const auto& host_id = node->host_id()) {
|
||||
map[endpoint] = host_id;
|
||||
} else {
|
||||
tlogger.info("get_endpoint_to_host_id_map_for_reading: endpoint {} has null host_id: state={}", endpoint, node->get_state());
|
||||
tlogger.info("get_endpoint_to_host_id_map: endpoint {} has null host_id: state={}", endpoint, node->get_state());
|
||||
}
|
||||
}
|
||||
return map;
|
||||
@@ -1044,8 +1043,8 @@ token_metadata::get_endpoint_for_host_id(host_id host_id) const {
|
||||
}
|
||||
|
||||
std::unordered_map<inet_address, host_id>
|
||||
token_metadata::get_endpoint_to_host_id_map_for_reading() const {
|
||||
return _impl->get_endpoint_to_host_id_map_for_reading();
|
||||
token_metadata::get_endpoint_to_host_id_map() const {
|
||||
return _impl->get_endpoint_to_host_id_map();
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -77,6 +77,12 @@ struct host_id_or_endpoint {
|
||||
gms::inet_address resolve_endpoint(const token_metadata&) const;
|
||||
};
|
||||
|
||||
using host_id_or_endpoint_list = std::vector<host_id_or_endpoint>;
|
||||
|
||||
[[nodiscard]] inline bool check_host_ids_contain_only_uuid(const auto& host_ids) {
|
||||
return std::ranges::none_of(host_ids, [](const auto& node_str) { return locator::host_id_or_endpoint{node_str}.has_endpoint(); });
|
||||
}
|
||||
|
||||
class token_metadata_impl;
|
||||
struct topology_change_info;
|
||||
|
||||
@@ -230,7 +236,7 @@ public:
|
||||
inet_address get_endpoint_for_host_id(locator::host_id host_id) const;
|
||||
|
||||
/** @return a copy of the endpoint-to-id map for read-only operations */
|
||||
std::unordered_map<inet_address, host_id> get_endpoint_to_host_id_map_for_reading() const;
|
||||
std::unordered_map<inet_address, host_id> get_endpoint_to_host_id_map() const;
|
||||
|
||||
/// Returns host_id of the local node.
|
||||
host_id get_my_id() const;
|
||||
|
||||
13
main.cc
13
main.cc
@@ -1389,7 +1389,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
scfg.statement_tenants = {
|
||||
{dbcfg.statement_scheduling_group, "$user"},
|
||||
{default_scheduling_group(), "$system"},
|
||||
{dbcfg.streaming_scheduling_group, "$maintenance"}
|
||||
{dbcfg.streaming_scheduling_group, "$maintenance", false}
|
||||
};
|
||||
scfg.streaming = dbcfg.streaming_scheduling_group;
|
||||
scfg.gossip = dbcfg.gossip_scheduling_group;
|
||||
@@ -1404,7 +1404,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}
|
||||
|
||||
// Delay listening messaging_service until gossip message handlers are registered
|
||||
messaging.start(mscfg, scfg, creds).get();
|
||||
messaging.start(mscfg, scfg, creds, std::ref(feature_service)).get();
|
||||
auto stop_ms = defer_verbose_shutdown("messaging service", [&messaging] {
|
||||
messaging.invoke_on_all(&netw::messaging_service::stop).get();
|
||||
});
|
||||
@@ -1511,7 +1511,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// group0 client exists only on shard 0.
|
||||
// The client has to be created before `stop_raft` since during
|
||||
// destruction it has to exist until raft_gr.stop() completes.
|
||||
service::raft_group0_client group0_client{raft_gr.local(), sys_ks.local(), maintenance_mode_enabled{cfg->maintenance_mode()}};
|
||||
service::raft_group0_client group0_client{raft_gr.local(), sys_ks.local(), token_metadata.local(), maintenance_mode_enabled{cfg->maintenance_mode()}};
|
||||
|
||||
service::raft_group0 group0_service{
|
||||
stop_signal.as_local_abort_source(), raft_gr.local(), messaging,
|
||||
@@ -1944,6 +1944,13 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
ss.local().uninit_address_map().get();
|
||||
});
|
||||
|
||||
// Need to make sure storage service stopped using group0 before running group0_service.abort()
|
||||
// Normally it is done in storage_service::do_drain(), but in case start up fail we need to do it
|
||||
// here as well
|
||||
auto stop_group0_usage_in_storage_service = defer_verbose_shutdown("group 0 usage in local storage", [&ss] {
|
||||
ss.local().wait_for_group0_stop().get();
|
||||
});
|
||||
|
||||
// Setup group0 early in case the node is bootstrapped already and the group exists.
|
||||
// Need to do it before allowing incoming messaging service connections since
|
||||
// storage proxy's and migration manager's verbs may access group0.
|
||||
|
||||
@@ -119,6 +119,7 @@
|
||||
#include "idl/mapreduce_request.dist.impl.hh"
|
||||
#include "idl/storage_service.dist.impl.hh"
|
||||
#include "idl/join_node.dist.impl.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
|
||||
namespace netw {
|
||||
|
||||
@@ -232,9 +233,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
|
||||
return _rpc->unregister_handler(verb);
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port)
|
||||
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service)
|
||||
: messaging_service(config{std::move(id), ip, ip, port},
|
||||
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
|
||||
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr, feature_service)
|
||||
{}
|
||||
|
||||
static
|
||||
@@ -419,13 +420,14 @@ void messaging_service::do_start_listen() {
|
||||
}
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials)
|
||||
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials, gms::feature_service& feature_service)
|
||||
: _cfg(std::move(cfg))
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
|
||||
, _clients(PER_SHARD_CONNECTION_COUNT + scfg.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT)
|
||||
, _scheduling_config(scfg)
|
||||
, _scheduling_info_for_connection_index(initial_scheduling_info())
|
||||
, _feature_service(feature_service)
|
||||
{
|
||||
_rpc->set_logger(&rpc_logger);
|
||||
|
||||
@@ -434,7 +436,8 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
|
||||
// which in turn relies on _connection_index_for_tenant to be initialized.
|
||||
_connection_index_for_tenant.reserve(_scheduling_config.statement_tenants.size());
|
||||
for (unsigned i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
|
||||
_connection_index_for_tenant.push_back({_scheduling_config.statement_tenants[i].sched_group, i});
|
||||
auto& tenant_cfg = _scheduling_config.statement_tenants[i];
|
||||
_connection_index_for_tenant.push_back({tenant_cfg.sched_group, i, tenant_cfg.enabled});
|
||||
}
|
||||
|
||||
register_handler(this, messaging_verb::CLIENT_ID, [this] (rpc::client_info& ci, gms::inet_address broadcast_address, uint32_t src_cpu_id, rpc::optional<uint64_t> max_result_size, rpc::optional<utils::UUID> host_id) {
|
||||
@@ -457,6 +460,7 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
|
||||
});
|
||||
|
||||
init_local_preferred_ip_cache(_cfg.preferred_ips);
|
||||
init_feature_listeners();
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
@@ -679,16 +683,22 @@ messaging_service::get_rpc_client_idx(messaging_verb verb) const {
|
||||
return idx;
|
||||
}
|
||||
|
||||
// A statement or statement-ack verb
|
||||
const auto curr_sched_group = current_scheduling_group();
|
||||
for (unsigned i = 0; i < _connection_index_for_tenant.size(); ++i) {
|
||||
if (_connection_index_for_tenant[i].sched_group == curr_sched_group) {
|
||||
// i == 0: the default tenant maps to the default client indexes belonging to the interval
|
||||
// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
|
||||
idx += i * PER_TENANT_CONNECTION_COUNT;
|
||||
break;
|
||||
if (_connection_index_for_tenant[i].enabled) {
|
||||
// i == 0: the default tenant maps to the default client indexes belonging to the interval
|
||||
// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
|
||||
idx += i * PER_TENANT_CONNECTION_COUNT;
|
||||
break;
|
||||
} else {
|
||||
// If the tenant is disable, immediately return current index to
|
||||
// use $system tenant.
|
||||
return idx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return idx;
|
||||
}
|
||||
|
||||
@@ -793,6 +803,22 @@ void messaging_service::cache_preferred_ip(gms::inet_address ep, gms::inet_addre
|
||||
remove_rpc_client(msg_addr(ep));
|
||||
}
|
||||
|
||||
void messaging_service::init_feature_listeners() {
|
||||
_maintenance_tenant_enabled_listener = _feature_service.maintenance_tenant.when_enabled([this] {
|
||||
enable_scheduling_tenant("$maintenance");
|
||||
});
|
||||
}
|
||||
|
||||
void messaging_service::enable_scheduling_tenant(std::string_view name) {
|
||||
for (size_t i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
|
||||
if (_scheduling_config.statement_tenants[i].name == name) {
|
||||
_scheduling_config.statement_tenants[i].enabled = true;
|
||||
_connection_index_for_tenant[i].enabled = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gms::inet_address messaging_service::get_public_endpoint_for(const gms::inet_address& ip) const {
|
||||
auto i = _preferred_to_endpoint.find(ip);
|
||||
return i != _preferred_to_endpoint.end() ? i->second : ip;
|
||||
|
||||
@@ -45,6 +45,7 @@ namespace gms {
|
||||
class gossip_digest_ack2;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
class feature_service;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
@@ -299,6 +300,7 @@ public:
|
||||
struct tenant {
|
||||
scheduling_group sched_group;
|
||||
sstring name;
|
||||
bool enabled = true;
|
||||
};
|
||||
// Must have at least one element. No two tenants should have the same
|
||||
// scheduling group. [0] is the default tenant, that all unknown
|
||||
@@ -319,6 +321,7 @@ private:
|
||||
struct tenant_connection_index {
|
||||
scheduling_group sched_group;
|
||||
unsigned cliend_idx;
|
||||
bool enabled;
|
||||
};
|
||||
private:
|
||||
config _cfg;
|
||||
@@ -337,6 +340,7 @@ private:
|
||||
scheduling_config _scheduling_config;
|
||||
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
|
||||
std::vector<tenant_connection_index> _connection_index_for_tenant;
|
||||
gms::feature_service& _feature_service;
|
||||
|
||||
struct connection_ref;
|
||||
std::unordered_multimap<locator::host_id, connection_ref> _host_connections;
|
||||
@@ -351,8 +355,8 @@ private:
|
||||
public:
|
||||
using clock_type = lowres_clock;
|
||||
|
||||
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port);
|
||||
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>);
|
||||
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service);
|
||||
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>, gms::feature_service& feature_service);
|
||||
~messaging_service();
|
||||
|
||||
future<> start();
|
||||
@@ -544,6 +548,12 @@ public:
|
||||
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
|
||||
unsigned get_rpc_client_idx(messaging_verb verb) const;
|
||||
static constexpr std::array<std::string_view, 3> _connection_types_prefix = {"statement:", "statement-ack:", "forward:"}; // "forward" is the old name for "mapreduce"
|
||||
|
||||
void init_feature_listeners();
|
||||
private:
|
||||
std::any _maintenance_tenant_enabled_listener;
|
||||
|
||||
void enable_scheduling_tenant(std::string_view name);
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
|
||||
@@ -215,6 +215,7 @@ public:
|
||||
mutation_partition as_mutation_partition(const schema&) const;
|
||||
private:
|
||||
// Erases the entry if it's safe to do so without changing the logical state of the partition.
|
||||
// (It's allowed to evict empty row entries, though).
|
||||
rows_type::iterator maybe_drop(const schema&, cache_tracker*, rows_type::iterator, mutation_application_stats&);
|
||||
void insert_row(const schema& s, const clustering_key& key, deletable_row&& row);
|
||||
void insert_row(const schema& s, const clustering_key& key, const deletable_row& row);
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/coroutine.hh"
|
||||
#include "real_dirty_memory_accounter.hh"
|
||||
#include "clustering_interval_set.hh"
|
||||
|
||||
static void remove_or_mark_as_unique_owner(partition_version* current, mutation_cleaner* cleaner)
|
||||
{
|
||||
@@ -638,6 +639,15 @@ mutation_partition_v2 partition_entry::squashed_v2(const schema& to, is_evictabl
|
||||
return mp;
|
||||
}
|
||||
|
||||
clustering_interval_set partition_entry::squashed_continuity(const schema& s)
|
||||
{
|
||||
clustering_interval_set result;
|
||||
for (auto&& v : _version->all_elements()) {
|
||||
result.add(s, v.partition().as_mutation_partition(*v.get_schema()).get_continuity(s));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
mutation_partition partition_entry::squashed(const schema& s, is_evictable evictable)
|
||||
{
|
||||
return squashed_v2(s, evictable).as_mutation_partition(s);
|
||||
|
||||
@@ -682,6 +682,7 @@ public:
|
||||
}
|
||||
|
||||
mutation_partition_v2 squashed_v2(const schema& to, is_evictable);
|
||||
clustering_interval_set squashed_continuity(const schema&);
|
||||
mutation_partition squashed(const schema&, is_evictable);
|
||||
tombstone partition_tombstone() const;
|
||||
|
||||
|
||||
@@ -186,6 +186,8 @@ std::set<gms::inet_address> task_manager_module::get_nodes() const noexcept {
|
||||
_ss._topology_state_machine._topology.transition_nodes
|
||||
) | boost::adaptors::transformed([&ss = _ss] (auto& node) {
|
||||
return ss.host2ip(locator::host_id{node.first.uuid()});
|
||||
}) | boost::adaptors::filtered([&ss = _ss] (auto& ip) {
|
||||
return ss._gossiper.is_alive(ip);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1367,7 +1367,7 @@ reader_concurrency_semaphore::can_admit_read(const reader_permit::impl& permit)
|
||||
}
|
||||
|
||||
if (!has_available_units(permit.base_resources())) {
|
||||
auto reason = _resources.memory >= permit.base_resources().memory ? reason::memory_resources : reason::count_resources;
|
||||
auto reason = _resources.memory >= permit.base_resources().memory ? reason::count_resources : reason::memory_resources;
|
||||
if (_inactive_reads.empty()) {
|
||||
return {can_admit::no, reason};
|
||||
} else {
|
||||
|
||||
@@ -1081,6 +1081,20 @@ multishard_combining_reader_v2::multishard_combining_reader_v2(
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: impl(std::move(s), std::move(permit)), _keep_alive_sharder(std::move(keep_alive_sharder)), _sharder(sharder) {
|
||||
|
||||
// The permit of the multishard reader is destroyed after the permits of its child readers.
|
||||
// Therefore its semaphore resources won't be automatically released
|
||||
// until children acquire their own resources.
|
||||
//
|
||||
// This creates a dependency (an edge in the "resource allocation graph"),
|
||||
// where the semaphore used by the multishard reader depends on the semaphores used by children.
|
||||
// When such dependencies create a cycle, and permits are acquired by different reads
|
||||
// in just the right order, a deadlock will happen.
|
||||
//
|
||||
// One way to prevent the deadlock is to avoid the resource dependency by ensuring
|
||||
// that the resources of multishard reader are released before the children attempt to acquire theirs.
|
||||
// We do this here.
|
||||
_permit.release_base_resources();
|
||||
|
||||
on_partition_range_change(pr);
|
||||
|
||||
_shard_readers.reserve(_sharder.shard_count());
|
||||
|
||||
@@ -1732,7 +1732,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
}
|
||||
auto nr_ranges = desired_ranges.size();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges * nr_tables);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | boost::adaptors::map_keys);
|
||||
});
|
||||
@@ -1914,12 +1914,12 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
}
|
||||
temp.clear_gently().get();
|
||||
if (reason == streaming::stream_reason::decommission) {
|
||||
container().invoke_on_all([nr_ranges_skipped] (repair_service& rs) {
|
||||
rs.get_metrics().decommission_finished_ranges += nr_ranges_skipped;
|
||||
container().invoke_on_all([nr_ranges_skipped, nr_tables] (repair_service& rs) {
|
||||
rs.get_metrics().decommission_finished_ranges += nr_ranges_skipped * nr_tables;
|
||||
}).get();
|
||||
} else if (reason == streaming::stream_reason::removenode) {
|
||||
container().invoke_on_all([nr_ranges_skipped] (repair_service& rs) {
|
||||
rs.get_metrics().removenode_finished_ranges += nr_ranges_skipped;
|
||||
container().invoke_on_all([nr_ranges_skipped, nr_tables] (repair_service& rs) {
|
||||
rs.get_metrics().removenode_finished_ranges += nr_ranges_skipped * nr_tables;
|
||||
}).get();
|
||||
}
|
||||
if (is_removenode) {
|
||||
@@ -1928,7 +1928,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
auto nr_ranges_synced = ranges.size();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, ops).get();
|
||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced * nr_tables, nr_ranges_skipped * nr_tables);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, leaving_node={}", op, ks_erms | boost::adaptors::map_keys, leaving_node);
|
||||
});
|
||||
@@ -2148,7 +2148,7 @@ future<> repair_service::do_rebuild_replace_with_repair(std::unordered_map<sstri
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(keyspace_name, erm, std::move(ranges), std::move(range_sources), reason, nullptr).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc_for_keyspace, nr_ranges);
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc_for_keyspace, nr_ranges * nr_tables);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, ks_erms | boost::adaptors::map_keys, source_dc);
|
||||
});
|
||||
|
||||
@@ -298,10 +298,6 @@ mutation_reader repair_reader::make_reader(
|
||||
return rd;
|
||||
}
|
||||
case read_strategy::multishard_split: {
|
||||
// We can't have two permits with count resource for 1 repair.
|
||||
// So we release the one on _permit so the only one is the one the
|
||||
// shard reader will obtain.
|
||||
_permit.release_base_resources();
|
||||
return make_multishard_streaming_reader(db, _schema, _permit, [this] {
|
||||
auto shard_range = _sharder.next();
|
||||
if (shard_range) {
|
||||
@@ -311,10 +307,6 @@ mutation_reader repair_reader::make_reader(
|
||||
}, compaction_time);
|
||||
}
|
||||
case read_strategy::multishard_filter: {
|
||||
// We can't have two permits with count resource for 1 repair.
|
||||
// So we release the one on _permit so the only one is the one the
|
||||
// shard reader will obtain.
|
||||
_permit.release_base_resources();
|
||||
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time),
|
||||
[&remote_sharder, remote_shard](const dht::decorated_key& k) {
|
||||
return remote_sharder.shard_for_reads(k.token()) == remote_shard;
|
||||
@@ -349,11 +341,6 @@ repair_reader::repair_reader(
|
||||
future<mutation_fragment_opt>
|
||||
repair_reader::read_mutation_fragment() {
|
||||
++_reads_issued;
|
||||
// Use a very long timeout for the reader to break out any eventual
|
||||
// deadlock within the reader. Thirty minutes should be more than
|
||||
// enough to read a single mutation fragment.
|
||||
auto timeout = db::timeout_clock::now() + std::chrono::minutes(30);
|
||||
_reader.set_timeout(timeout); // reset to db::no_timeout in pause()
|
||||
return _reader().then_wrapped([this] (future<mutation_fragment_opt> f) {
|
||||
try {
|
||||
auto mfopt = f.get();
|
||||
@@ -397,7 +384,6 @@ void repair_reader::check_current_dk() {
|
||||
}
|
||||
|
||||
void repair_reader::pause() {
|
||||
_reader.set_timeout(db::no_timeout);
|
||||
if (_reader_handle) {
|
||||
_reader_handle->pause();
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@ class compaction_group {
|
||||
seastar::condition_variable _staging_done_condition;
|
||||
// Gates async operations confined to a single group.
|
||||
seastar::gate _async_gate;
|
||||
bool _tombstone_gc_enabled = true;
|
||||
private:
|
||||
// Adds new sstable to the set of sstables
|
||||
// Doesn't update the cache. The cache must be synchronized in order for reads to see
|
||||
@@ -77,6 +78,15 @@ private:
|
||||
// An input SSTable remains linked if it wasn't actually compacted, yet compaction manager wants
|
||||
// it to be moved from its original sstable set (e.g. maintenance) into a new one (e.g. main).
|
||||
future<> delete_unused_sstables(sstables::compaction_completion_desc desc);
|
||||
// Tracks the maximum timestamp observed across all SSTables in this group.
|
||||
// This is used by the compacting reader to determine if a memtable contains entries
|
||||
// with timestamps that overlap with those in the SSTables of the compaction group.
|
||||
// For this purpose, tracking the maximum seen timestamp is sufficient rather than the
|
||||
// actual maximum across all SSTables. So, the variable is updated only when a new SSTable
|
||||
// is added to the group. While `set_main_sstables` and `set_maintenance_sstables` can
|
||||
// replace entire sstable sets, they are still called only by compaction, so the maximum
|
||||
// seen timestamp remains the same and there is no need to update the variable in those cases.
|
||||
api::timestamp_type _max_seen_timestamp = api::missing_timestamp;
|
||||
public:
|
||||
compaction_group(table& t, size_t gid, dht::token_range token_range);
|
||||
~compaction_group();
|
||||
@@ -114,6 +124,14 @@ public:
|
||||
return _token_range;
|
||||
}
|
||||
|
||||
void set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
|
||||
_tombstone_gc_enabled = tombstone_gc_enabled;
|
||||
}
|
||||
|
||||
bool tombstone_gc_enabled() const noexcept {
|
||||
return _tombstone_gc_enabled;
|
||||
}
|
||||
|
||||
void set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept;
|
||||
|
||||
lw_shared_ptr<memtable_list>& memtables() noexcept;
|
||||
@@ -130,6 +148,7 @@ public:
|
||||
void add_sstable(sstables::shared_sstable sstable);
|
||||
// Add sstable to maintenance set
|
||||
void add_maintenance_sstable(sstables::shared_sstable sst);
|
||||
api::timestamp_type max_seen_timestamp() const { return _max_seen_timestamp; }
|
||||
|
||||
// Update main sstable set based on info in completion descriptor, where input sstables
|
||||
// will be replaced by output ones, row cache ranges are possibly invalidated and
|
||||
@@ -246,6 +265,9 @@ public:
|
||||
bool no_compacted_sstable_undeleted() const;
|
||||
|
||||
future<> stop(sstring reason = "table removal") noexcept;
|
||||
|
||||
// Clear sstable sets
|
||||
void clear_sstables();
|
||||
};
|
||||
|
||||
using storage_group_ptr = lw_shared_ptr<storage_group>;
|
||||
@@ -305,6 +327,7 @@ public:
|
||||
const storage_group_map& storage_groups() const;
|
||||
|
||||
future<> stop_storage_groups() noexcept;
|
||||
void clear_storage_groups();
|
||||
void remove_storage_group(size_t id);
|
||||
storage_group& storage_group_for_id(const schema_ptr&, size_t i) const;
|
||||
storage_group* maybe_storage_group_for_id(const schema_ptr&, size_t i) const;
|
||||
|
||||
@@ -1139,6 +1139,18 @@ std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> databa
|
||||
return res;
|
||||
}
|
||||
|
||||
std::vector<sstring> database::get_tablets_keyspaces() const {
|
||||
std::vector<sstring> res;
|
||||
res.reserve(_keyspaces.size());
|
||||
for (auto const& [name, ks] : _keyspaces) {
|
||||
auto&& rs = ks.get_replication_strategy();
|
||||
if (rs.is_per_table()) {
|
||||
res.emplace_back(name);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
std::vector<lw_shared_ptr<column_family>> database::get_non_system_column_families() const {
|
||||
return boost::copy_range<std::vector<lw_shared_ptr<column_family>>>(
|
||||
get_tables_metadata().filter([] (auto uuid_and_cf) {
|
||||
@@ -2273,6 +2285,13 @@ future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, std:
|
||||
return flush_table_on_all_shards(sharded_db, sharded_db.local().find_uuid(ks_name, table_name));
|
||||
}
|
||||
|
||||
static future<> force_new_commitlog_segments(std::unique_ptr<db::commitlog>& cl1, std::unique_ptr<db::commitlog>& cl2) {
|
||||
co_await cl1->force_new_active_segment();
|
||||
if (cl2) {
|
||||
co_await cl2->force_new_active_segment();
|
||||
}
|
||||
}
|
||||
|
||||
future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names) {
|
||||
/**
|
||||
* #14870
|
||||
@@ -2283,7 +2302,7 @@ future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std
|
||||
* as sstable-ish a universe as we can, as soon as we can.
|
||||
*/
|
||||
return sharded_db.invoke_on_all([] (replica::database& db) {
|
||||
return db._commitlog->force_new_active_segment();
|
||||
return force_new_commitlog_segments(db._commitlog, db._schema_commitlog);
|
||||
}).then([&, ks_name, table_names = std::move(table_names)] {
|
||||
return parallel_for_each(table_names, [&, ks_name] (const auto& table_name) {
|
||||
return flush_table_on_all_shards(sharded_db, ks_name, table_name);
|
||||
@@ -2294,7 +2313,7 @@ future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std
|
||||
future<> database::flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
|
||||
// see above
|
||||
return sharded_db.invoke_on_all([] (replica::database& db) {
|
||||
return db._commitlog->force_new_active_segment();
|
||||
return force_new_commitlog_segments(db._commitlog, db._schema_commitlog);
|
||||
}).then([&, ks_name] {
|
||||
auto& ks = sharded_db.local().find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
|
||||
|
||||
@@ -620,6 +620,12 @@ private:
|
||||
// can wait on compaction (backpressure) which in turn takes deletion guard on completion.
|
||||
future<> safe_foreach_sstable(const sstables::sstable_set&, noncopyable_function<future<>(const sstables::shared_sstable&)> action);
|
||||
|
||||
// Returns a sstable set that can be safely used for purging any expired tombstone in a compaction group.
|
||||
// Only the sstables in the compaction group is not sufficient, since there might be other compaction
|
||||
// groups during tablet split with overlapping token range, and we need to include them all in a single
|
||||
// sstable set to allow safe tombstone gc.
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc(const compaction_group&) const;
|
||||
|
||||
bool cache_enabled() const {
|
||||
return _config.enable_cache && _schema->caching_options().enabled();
|
||||
}
|
||||
@@ -668,6 +674,7 @@ private:
|
||||
// Compound sstable set must be refreshed whenever any of its managed sets are changed
|
||||
void refresh_compound_sstable_set();
|
||||
|
||||
max_purgeable_fn get_max_purgeable_fn_for_cache_underlying_reader() const;
|
||||
snapshot_source sstables_as_snapshot_source();
|
||||
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<const sstables::sstable_set>);
|
||||
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
|
||||
@@ -1628,6 +1635,7 @@ public:
|
||||
}
|
||||
|
||||
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const { return _shared_token_metadata.get(); }
|
||||
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }
|
||||
|
||||
lang::manager& lang() noexcept { return _lang_manager; }
|
||||
@@ -1672,6 +1680,7 @@ public:
|
||||
std::vector<sstring> get_non_local_strategy_keyspaces() const;
|
||||
std::vector<sstring> get_non_local_vnode_based_strategy_keyspaces() const;
|
||||
std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> get_non_local_strategy_keyspaces_erms() const;
|
||||
std::vector<sstring> get_tablets_keyspaces() const;
|
||||
column_family& find_column_family(std::string_view ks, std::string_view name);
|
||||
const column_family& find_column_family(std::string_view ks, std::string_view name) const;
|
||||
column_family& find_column_family(const table_id&);
|
||||
|
||||
@@ -214,7 +214,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, shard
|
||||
[] (const sstables::shared_sstable&) { return true; }).get();
|
||||
|
||||
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
|
||||
const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get();
|
||||
const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get();
|
||||
|
||||
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb] (sstables::sstable_directory& dir) {
|
||||
return make_sstables_available(dir, db, vb, use_view_update_path, ks, cf);
|
||||
|
||||
@@ -541,6 +541,7 @@ compaction_group::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
if (backlog_tracker) {
|
||||
table::add_sstable_to_backlog_tracker(get_backlog_tracker(), sstable);
|
||||
}
|
||||
_max_seen_timestamp = std::max(_max_seen_timestamp, sstable->get_stats_metadata().max_timestamp);
|
||||
return new_sstables;
|
||||
}
|
||||
|
||||
@@ -627,7 +628,9 @@ future<> storage_group_manager::for_each_storage_group_gently(std::function<futu
|
||||
|
||||
void storage_group_manager::for_each_storage_group(std::function<void(size_t, storage_group&)> f) const {
|
||||
for (auto& [id, sg]: _storage_groups) {
|
||||
f(id, *sg);
|
||||
if (auto holder = try_hold_gate(sg->async_gate())) {
|
||||
f(id, *sg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -639,6 +642,12 @@ future<> storage_group_manager::stop_storage_groups() noexcept {
|
||||
return parallel_for_each(_storage_groups | boost::adaptors::map_values, [] (auto sg) { return sg->stop("table removal"); });
|
||||
}
|
||||
|
||||
void storage_group_manager::clear_storage_groups() {
|
||||
for (auto& [id, sg]: _storage_groups) {
|
||||
sg->clear_sstables();
|
||||
}
|
||||
}
|
||||
|
||||
void storage_group_manager::remove_storage_group(size_t id) {
|
||||
if (auto it = _storage_groups.find(id); it != _storage_groups.end()) {
|
||||
_storage_groups.erase(it);
|
||||
@@ -775,6 +784,15 @@ private:
|
||||
#endif
|
||||
return { idx, side };
|
||||
}
|
||||
|
||||
storage_group_ptr allocate_storage_group(const locator::tablet_map& tmap, locator::tablet_id tid, dht::token_range range) const {
|
||||
auto cg = make_lw_shared<compaction_group>(_t, tid.value(), std::move(range));
|
||||
auto sg = make_lw_shared<storage_group>(std::move(cg));
|
||||
if (tmap.needs_split()) {
|
||||
sg->set_split_mode();
|
||||
}
|
||||
return sg;
|
||||
}
|
||||
public:
|
||||
tablet_storage_group_manager(table& t, const locator::effective_replication_map& erm)
|
||||
: _t(t)
|
||||
@@ -791,8 +809,7 @@ public:
|
||||
|
||||
if (tmap.has_replica(tid, local_replica)) {
|
||||
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
|
||||
auto cg = make_lw_shared<compaction_group>(_t, tid.value(), std::move(range));
|
||||
ret[tid.value()] = make_lw_shared<storage_group>(std::move(cg));
|
||||
ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
|
||||
}
|
||||
}
|
||||
_storage_groups = std::move(ret);
|
||||
@@ -898,6 +915,7 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) {
|
||||
if (set_split_mode()) {
|
||||
co_return;
|
||||
}
|
||||
co_await utils::get_local_injector().inject("delay_split_compaction", 5s);
|
||||
|
||||
if (_main_cg->empty()) {
|
||||
co_return;
|
||||
@@ -923,6 +941,11 @@ lw_shared_ptr<const sstables::sstable_set> storage_group::make_sstable_set() con
|
||||
return make_lw_shared(sstables::make_compound_sstable_set(schema, std::move(underlying)));
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstables::sstable_set> table::sstable_set_for_tombstone_gc(const compaction_group& cg) const {
|
||||
auto& sg = storage_group_for_id(cg.group_id());
|
||||
return sg.make_sstable_set();
|
||||
}
|
||||
|
||||
bool tablet_storage_group_manager::all_storage_groups_split() {
|
||||
auto& tmap = tablet_map();
|
||||
if (_split_ready_seq_number == tmap.resize_decision().sequence_number) {
|
||||
@@ -1576,9 +1599,7 @@ table::stop() {
|
||||
co_await _sstable_deletion_gate.close();
|
||||
co_await std::move(gate_closed_fut);
|
||||
co_await get_row_cache().invalidate(row_cache::external_updater([this] {
|
||||
for_each_compaction_group([] (compaction_group& cg) {
|
||||
cg.clear_sstables();
|
||||
});
|
||||
_sg_manager->clear_storage_groups();
|
||||
_sstables = make_compound_sstable_set();
|
||||
}));
|
||||
_cache.refresh_snapshot();
|
||||
@@ -2183,6 +2204,9 @@ public:
|
||||
const sstables::sstable_set& maintenance_sstable_set() const override {
|
||||
return *_cg.maintenance_sstables();
|
||||
}
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
|
||||
return _t.sstable_set_for_tombstone_gc(_cg);
|
||||
}
|
||||
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
|
||||
return sstables::get_fully_expired_sstables(*this, sstables, query_time);
|
||||
}
|
||||
@@ -2232,7 +2256,7 @@ public:
|
||||
return _t.is_auto_compaction_disabled_by_user();
|
||||
}
|
||||
bool tombstone_gc_enabled() const noexcept override {
|
||||
return _t._tombstone_gc_enabled;
|
||||
return _t.tombstone_gc_enabled() && _cg.tombstone_gc_enabled();
|
||||
}
|
||||
const tombstone_gc_state& get_tombstone_gc_state() const noexcept override {
|
||||
return _t.get_compaction_manager().get_tombstone_gc_state();
|
||||
@@ -2288,6 +2312,12 @@ void compaction_group::clear_sstables() {
|
||||
_maintenance_sstables = _t.make_maintenance_sstable_set();
|
||||
}
|
||||
|
||||
void storage_group::clear_sstables() {
|
||||
for (auto cg : compaction_groups()) {
|
||||
cg->clear_sstables();
|
||||
}
|
||||
}
|
||||
|
||||
table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_options> sopts, compaction_manager& compaction_manager,
|
||||
sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker,
|
||||
locator::effective_replication_map_ptr erm)
|
||||
@@ -2425,11 +2455,22 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo
|
||||
auto transition_info = transition.second;
|
||||
if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) {
|
||||
auto range = new_tablet_map->get_token_range(tid);
|
||||
auto cg = make_lw_shared<compaction_group>(_t, tid.value(), std::move(range));
|
||||
_storage_groups[tid.value()] = make_lw_shared<storage_group>(std::move(cg));
|
||||
_storage_groups[tid.value()] = allocate_storage_group(*new_tablet_map, tid, std::move(range));
|
||||
tablet_migrating_in = true;
|
||||
}
|
||||
}
|
||||
|
||||
// update the per compaction group tombstone GC enabled flag
|
||||
for_each_storage_group([&] (size_t group_id, storage_group& sg) {
|
||||
const locator::tablet_id tid = static_cast<locator::tablet_id>(group_id);
|
||||
const locator::tablet_info& tinfo = new_tablet_map->get_tablet_info(tid);
|
||||
const bool tombstone_gc_enabled = std::ranges::contains(tinfo.replicas, this_replica);
|
||||
|
||||
sg.for_each_compaction_group([tombstone_gc_enabled] (const compaction_group_ptr& cg_ptr) {
|
||||
cg_ptr->set_tombstone_gc_enabled(tombstone_gc_enabled);
|
||||
});
|
||||
});
|
||||
|
||||
// TODO: possibly use row_cache::invalidate(external_updater) instead on all ranges of new replicas,
|
||||
// as underlying source will be refreshed and external_updater::execute can refresh the sstable set.
|
||||
// Also serves as a protection for clearing the cache on the new range, although it shouldn't be a
|
||||
@@ -2489,6 +2530,32 @@ table::make_partition_presence_checker(lw_shared_ptr<const sstables::sstable_set
|
||||
};
|
||||
}
|
||||
|
||||
max_purgeable_fn table::get_max_purgeable_fn_for_cache_underlying_reader() const {
|
||||
return [this](const dht::decorated_key& dk, ::is_shadowable is_shadowable) {
|
||||
auto& sg = storage_group_for_token(dk.token());
|
||||
auto max_purgeable_timestamp = api::max_timestamp;
|
||||
|
||||
sg.for_each_compaction_group([&dk, is_shadowable, &max_purgeable_timestamp] (const compaction_group_ptr& cg) {
|
||||
const auto& mt = cg->memtables()->active_memtable();
|
||||
// see get_max_purgeable_timestamp() in compaction.cc for comments on choosing min timestamp
|
||||
api::timestamp_type memtable_min_timestamp = is_shadowable ? mt.get_min_live_row_marker_timestamp() : mt.get_min_live_timestamp();
|
||||
if (memtable_min_timestamp > cg->max_seen_timestamp()) {
|
||||
// All the entries in the memtable are newer than the entries in the
|
||||
// SSTable within this compaction group. So, no need to check further.
|
||||
return;
|
||||
}
|
||||
|
||||
// If a memtable with a minimum timestamp lower than the current maximum
|
||||
// purgeable timestamp has the given key, the tombstone should not be purged.
|
||||
if (memtable_min_timestamp < max_purgeable_timestamp && mt.contains_partition(dk)) {
|
||||
max_purgeable_timestamp = memtable_min_timestamp;
|
||||
}
|
||||
});
|
||||
|
||||
return max_purgeable_timestamp;
|
||||
};
|
||||
}
|
||||
|
||||
snapshot_source
|
||||
table::sstables_as_snapshot_source() {
|
||||
return snapshot_source([this] () {
|
||||
@@ -2504,7 +2571,7 @@ table::sstables_as_snapshot_source() {
|
||||
return make_compacting_reader(
|
||||
std::move(reader),
|
||||
gc_clock::now(),
|
||||
can_always_purge,
|
||||
get_max_purgeable_fn_for_cache_underlying_reader(),
|
||||
_compaction_manager.get_tombstone_gc_state(),
|
||||
fwd);
|
||||
}, [this, sst_set] {
|
||||
@@ -3754,6 +3821,7 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato
|
||||
co_await clear_inactive_reads_for_tablet(db, sg);
|
||||
// compaction_group::stop takes care of flushing.
|
||||
co_await stop_compaction_groups(sg);
|
||||
co_await utils::get_local_injector().inject("delay_tablet_compaction_groups_cleanup", std::chrono::seconds(5));
|
||||
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
|
||||
_sg_manager->remove_storage_group(tid.value());
|
||||
}
|
||||
|
||||
@@ -245,6 +245,50 @@ static void do_update_tablet_metadata_change_hint(locator::tablet_metadata_chang
|
||||
}
|
||||
}
|
||||
|
||||
static std::optional<tablet_replica_set> maybe_deserialize_replica_set(const rows_entry& row, const column_definition& cdef) {
|
||||
const auto* cell = row.row().cells().find_cell(cdef.id);
|
||||
if (!cell) {
|
||||
return std::nullopt;
|
||||
}
|
||||
auto dv = cdef.type->deserialize_value(cell->as_atomic_cell(cdef).value());
|
||||
return tablet_replica_set_from_cell(dv);
|
||||
}
|
||||
|
||||
static void do_validate_tablet_metadata_change(const locator::tablet_metadata& tm, const schema& s, const mutation& m) {
|
||||
const auto table_id = to_tablet_metadata_key(s, m.key());
|
||||
const auto& mp = m.partition();
|
||||
|
||||
if (mp.partition_tombstone() || !mp.row_tombstones().empty() || !mp.static_row().empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto& r_cdef = *s.get_column_definition("replicas");
|
||||
auto& nr_cdef = *s.get_column_definition("new_replicas");
|
||||
|
||||
for (const auto& row : mp.clustered_rows()) {
|
||||
if (row.row().deleted_at()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto new_replicas = maybe_deserialize_replica_set(row, nr_cdef);
|
||||
if (!new_replicas) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto token = to_tablet_metadata_row_key(s, row.key());
|
||||
auto replicas = maybe_deserialize_replica_set(row, r_cdef);
|
||||
if (!replicas) {
|
||||
replicas = tm.get_tablet_map(table_id).get_tablet_info(token).replicas;
|
||||
}
|
||||
|
||||
std::unordered_set<tablet_replica> pending = substract_sets(*new_replicas, *replicas);
|
||||
if (pending.size() > 1) {
|
||||
throw std::runtime_error(fmt::format("Too many pending replicas for table {} last_token {}: {}",
|
||||
table_id, token, pending));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<locator::tablet_metadata_change_hint> get_tablet_metadata_change_hint(const std::vector<canonical_mutation>& mutations) {
|
||||
tablet_logger.trace("tablet_metadata_change_hint({})", mutations.size());
|
||||
auto s = db::system_keyspace::tablets();
|
||||
@@ -266,6 +310,18 @@ std::optional<locator::tablet_metadata_change_hint> get_tablet_metadata_change_h
|
||||
return hint;
|
||||
}
|
||||
|
||||
void validate_tablet_metadata_change(const locator::tablet_metadata& tm, const std::vector<canonical_mutation>& mutations) {
|
||||
auto s = db::system_keyspace::tablets();
|
||||
|
||||
for (const auto& cm : mutations) {
|
||||
if (cm.column_family_id() != s->id()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
do_validate_tablet_metadata_change(tm, *s, cm.to_mutation(s));
|
||||
}
|
||||
}
|
||||
|
||||
void update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hint, const mutation& m) {
|
||||
auto s = db::system_keyspace::tablets();
|
||||
if (m.column_family_id() != s->id()) {
|
||||
|
||||
@@ -92,4 +92,7 @@ future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<d
|
||||
/// Reads tablet transition stage (if any)
|
||||
future<std::optional<locator::tablet_transition_stage>> read_tablet_transition_stage(cql3::query_processor& qp, table_id tid, dht::token last_token);
|
||||
|
||||
/// Validates changes to system.tablets represented by mutations
|
||||
void validate_tablet_metadata_change(const locator::tablet_metadata& tm, const std::vector<canonical_mutation>& mutations);
|
||||
|
||||
} // namespace replica
|
||||
|
||||
@@ -5338,7 +5338,11 @@ class scylla_compaction_tasks(gdb.Command):
|
||||
pass
|
||||
task_hist = histogram(print_indicators=False)
|
||||
|
||||
task_list = list(std_list(cm['_tasks']))
|
||||
try:
|
||||
task_list = list(intrusive_list(cm['_tasks']))
|
||||
except gdb.error: # 6.2 compatibility
|
||||
task_list = list(std_list(cm['_tasks']))
|
||||
|
||||
for task in task_list:
|
||||
try:
|
||||
task = seastar_shared_ptr(task).get().dereference()
|
||||
|
||||
@@ -102,16 +102,27 @@ static const auto raft_manual_recovery_doc = "https://docs.scylladb.com/master/a
|
||||
|
||||
class group0_rpc: public service::raft_rpc {
|
||||
direct_failure_detector::failure_detector& _direct_fd;
|
||||
gms::gossiper& _gossiper;
|
||||
public:
|
||||
explicit group0_rpc(direct_failure_detector::failure_detector& direct_fd,
|
||||
raft_state_machine& sm, netw::messaging_service& ms,
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id)
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id, gms::gossiper& gossiper)
|
||||
: raft_rpc(sm, ms, address_map, std::move(raft_fd), gid, srv_id)
|
||||
, _direct_fd(direct_fd)
|
||||
, _direct_fd(direct_fd), _gossiper(gossiper)
|
||||
{}
|
||||
|
||||
virtual void on_configuration_change(raft::server_address_set add, raft::server_address_set del) override {
|
||||
for (const auto& addr: add) {
|
||||
auto ip_for_id = _address_map.find(addr.id);
|
||||
if (!ip_for_id) {
|
||||
// Make sure that the addresses of new nodes in the configuration are in the address map
|
||||
auto ips = _gossiper.get_nodes_with_host_id(locator::host_id(addr.id.uuid()));
|
||||
for (auto ip : ips) {
|
||||
if (_gossiper.is_normal(ip)) {
|
||||
_address_map.add_or_update_entry(addr.id, ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Entries explicitly managed via `rpc::on_configuration_change() should NOT be
|
||||
// expirable.
|
||||
_address_map.set_nonexpiring(addr.id);
|
||||
@@ -204,7 +215,7 @@ const raft::server_id& raft_group0::load_my_id() {
|
||||
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
|
||||
service::migration_manager& mm, bool topology_change_enabled) {
|
||||
auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), _feat, topology_change_enabled);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id, _gossiper);
|
||||
// Keep a reference to a specific RPC class.
|
||||
auto& rpc_ref = *rpc;
|
||||
auto storage = std::make_unique<raft_sys_table_storage>(qp, gid, my_id);
|
||||
@@ -382,9 +393,11 @@ future<> raft_group0::abort() {
|
||||
co_await smp::invoke_on_all([this]() {
|
||||
return uninit_rpc_verbs(_ms.local());
|
||||
});
|
||||
co_await _shutdown_gate.close();
|
||||
|
||||
_leadership_monitor_as.request_abort();
|
||||
|
||||
co_await _shutdown_gate.close();
|
||||
|
||||
co_await std::move(_leadership_monitor);
|
||||
|
||||
co_await stop_group0();
|
||||
@@ -429,6 +442,7 @@ future<> raft_group0::leadership_monitor_fiber() {
|
||||
}
|
||||
});
|
||||
|
||||
auto holder = hold_group0_gate();
|
||||
while (true) {
|
||||
while (!group0_server().is_leader()) {
|
||||
co_await group0_server().wait_for_state_change(&_leadership_monitor_as);
|
||||
|
||||
@@ -291,6 +291,11 @@ public:
|
||||
return _raft_gr.group0_with_timeouts();
|
||||
}
|
||||
|
||||
// Hold shutdown gate to be waited during shutdown
|
||||
gate::holder hold_group0_gate() {
|
||||
return _shutdown_gate.hold();
|
||||
}
|
||||
|
||||
// Returns true after the group 0 server has been started.
|
||||
bool joined_group0() const;
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "replica/database.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/to_string.hh"
|
||||
#include "replica/tablets.hh"
|
||||
|
||||
|
||||
namespace service {
|
||||
@@ -298,9 +299,14 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source&
|
||||
}
|
||||
}
|
||||
|
||||
void raft_group0_client::validate_change(const topology_change& change) {
|
||||
replica::validate_tablet_metadata_change(_token_metadata.get()->tablets(), change.mutations);
|
||||
}
|
||||
|
||||
template<typename Command>
|
||||
requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change> || std::same_as<Command, write_mutations> || std::same_as<Command, mixed_change>
|
||||
group0_command raft_group0_client::prepare_command(Command change, group0_guard& guard, std::string_view description) {
|
||||
validate_change(change);
|
||||
group0_command group0_cmd {
|
||||
.change{std::move(change)},
|
||||
.history_append{db::system_keyspace::make_group0_history_state_id_mutation(
|
||||
@@ -321,6 +327,7 @@ group0_command raft_group0_client::prepare_command(Command change, group0_guard&
|
||||
template<typename Command>
|
||||
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>
|
||||
group0_command raft_group0_client::prepare_command(Command change, std::string_view description) {
|
||||
validate_change(change);
|
||||
const auto new_group0_state_id = generate_group0_state_id(utils::UUID{});
|
||||
|
||||
group0_command group0_cmd {
|
||||
@@ -338,8 +345,8 @@ group0_command raft_group0_client::prepare_command(Command change, std::string_v
|
||||
return group0_cmd;
|
||||
}
|
||||
|
||||
raft_group0_client::raft_group0_client(service::raft_group_registry& raft_gr, db::system_keyspace& sys_ks, maintenance_mode_enabled maintenance_mode)
|
||||
: _raft_gr(raft_gr), _sys_ks(sys_ks), _maintenance_mode(maintenance_mode) {
|
||||
raft_group0_client::raft_group0_client(service::raft_group_registry& raft_gr, db::system_keyspace& sys_ks, locator::shared_token_metadata& tm, maintenance_mode_enabled maintenance_mode)
|
||||
: _raft_gr(raft_gr), _sys_ks(sys_ks), _token_metadata(tm), _maintenance_mode(maintenance_mode) {
|
||||
}
|
||||
|
||||
size_t raft_group0_client::max_command_size() const {
|
||||
|
||||
@@ -28,6 +28,10 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/maintenance_mode.hh"
|
||||
|
||||
namespace locator {
|
||||
class shared_token_metadata;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
// Obtaining this object means that all previously finished operations on group 0 are visible on this node.
|
||||
|
||||
@@ -72,6 +76,7 @@ public:
|
||||
class raft_group0_client {
|
||||
service::raft_group_registry& _raft_gr;
|
||||
db::system_keyspace& _sys_ks;
|
||||
locator::shared_token_metadata& _token_metadata;
|
||||
|
||||
// See `group0_guard::impl` for explanation of the purpose of these locks.
|
||||
semaphore _read_apply_mutex = semaphore(1);
|
||||
@@ -103,8 +108,12 @@ class raft_group0_client {
|
||||
service::broadcast_tables::query_result get();
|
||||
};
|
||||
|
||||
template <typename Command>
|
||||
void validate_change(const Command& change) {}
|
||||
void validate_change(const topology_change& change);
|
||||
|
||||
public:
|
||||
raft_group0_client(service::raft_group_registry&, db::system_keyspace&, maintenance_mode_enabled);
|
||||
raft_group0_client(service::raft_group_registry&, db::system_keyspace&, locator::shared_token_metadata&, maintenance_mode_enabled);
|
||||
|
||||
// Call after `system_keyspace` is initialized.
|
||||
future<> init();
|
||||
|
||||
@@ -77,7 +77,7 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
|
||||
}
|
||||
return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...)
|
||||
.handle_exception_type([loc= std::move(loc), id] (const seastar::rpc::closed_error& e) {;
|
||||
const auto msg = fmt::format("Failed to execute {} on leader {}: {}", loc.function_name(), id, e);
|
||||
const auto msg = fmt::format("Failed to execute {}, destination {}: {}", loc.function_name(), id, e);
|
||||
rlogger.trace("{}", msg);
|
||||
return make_exception_future<Ret>(raft::transport_error(msg));
|
||||
});
|
||||
|
||||
@@ -98,6 +98,7 @@
|
||||
#include "replica/exceptions.hh"
|
||||
#include "db/operation_type.hh"
|
||||
#include "locator/util.hh"
|
||||
#include "tools/build_info.hh"
|
||||
|
||||
namespace bi = boost::intrusive;
|
||||
|
||||
@@ -110,6 +111,25 @@ static logging::logger slogger("storage_proxy");
|
||||
static logging::logger qlogger("query_result");
|
||||
static logging::logger mlogger("mutation_data");
|
||||
|
||||
namespace {
|
||||
|
||||
// Check the effective replication map consistency:
|
||||
// we have an inconsistent effective replication map in case we the number of
|
||||
// read replicas is higher than the replication factor.
|
||||
void validate_read_replicas(const locator::effective_replication_map& erm, const inet_address_vector_replica_set& read_replicas) {
|
||||
// Skip for non-debug builds.
|
||||
if constexpr (!tools::build_info::is_debug_build()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const sstring error = erm.get_replication_strategy().sanity_check_read_replicas(erm, read_replicas);
|
||||
if (!error.empty()) {
|
||||
on_internal_error(slogger, error);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace storage_proxy_stats {
|
||||
static const sstring COORDINATOR_STATS_CATEGORY("storage_proxy_coordinator");
|
||||
static const sstring REPLICA_STATS_CATEGORY("storage_proxy_replica");
|
||||
@@ -1726,11 +1746,17 @@ public:
|
||||
// handler is being removed from the b::list, so if any live iterator points at it,
|
||||
// move it to the next object (this requires that the list is traversed in the forward
|
||||
// direction).
|
||||
bool drop_end = false;
|
||||
for (auto& itp : _live_iterators) {
|
||||
if (&**itp == handler) {
|
||||
++*itp;
|
||||
drop_end |= (*itp == end());
|
||||
}
|
||||
}
|
||||
if (drop_end) {
|
||||
const auto [first, last] = std::ranges::remove_if(_live_iterators, [this] (iterator* pit) { return *pit == end(); });
|
||||
_live_iterators.erase(first, last);
|
||||
}
|
||||
}
|
||||
class iterator_guard {
|
||||
cancellable_write_handlers_list& _handlers;
|
||||
@@ -5497,6 +5523,12 @@ class always_speculating_read_executor : public abstract_read_executor {
|
||||
public:
|
||||
using abstract_read_executor::abstract_read_executor;
|
||||
virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) {
|
||||
if (_targets.size() < 2) {
|
||||
on_internal_error(slogger,
|
||||
seastar::format("always_speculating_read_executor: received {} replica(s)"
|
||||
", required at least 2 replicas",
|
||||
_targets.size()));
|
||||
}
|
||||
resolver->add_wait_targets(_targets.size());
|
||||
// FIXME: consider disabling for CL=*ONE
|
||||
bool want_digest = true;
|
||||
@@ -5511,6 +5543,12 @@ class speculating_read_executor : public abstract_read_executor {
|
||||
public:
|
||||
using abstract_read_executor::abstract_read_executor;
|
||||
virtual void make_requests(digest_resolver_ptr resolver, storage_proxy::clock_type::time_point timeout) override {
|
||||
if (_targets.size() < 2) {
|
||||
on_internal_error(slogger,
|
||||
seastar::format("speculating_read_executor: received {} replica(s)"
|
||||
", required at least 2 replicas",
|
||||
_targets.size()));
|
||||
}
|
||||
_speculate_timer.set_callback([this, resolver, timeout] {
|
||||
if (!resolver->is_completed()) { // at the time the callback runs request may be completed already
|
||||
resolver->add_wait_targets(1); // we send one more request so wait for it too
|
||||
@@ -5564,14 +5602,14 @@ public:
|
||||
result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw_shared_ptr<query::read_command> cmd,
|
||||
locator::effective_replication_map_ptr erm,
|
||||
schema_ptr schema,
|
||||
dht::partition_range pr,
|
||||
dht::partition_range partition_range,
|
||||
db::consistency_level cl,
|
||||
db::read_repair_decision repair_decision,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
const inet_address_vector_replica_set& preferred_endpoints,
|
||||
bool& is_read_non_local,
|
||||
service_permit permit) {
|
||||
const dht::token& token = pr.start()->value().token();
|
||||
const dht::token& token = partition_range.start()->value().token();
|
||||
speculative_retry::type retry_type = schema->speculative_retry().get_type();
|
||||
std::optional<gms::inet_address> extra_replica;
|
||||
|
||||
@@ -5603,8 +5641,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
get_stats().read_repair_attempts++;
|
||||
}
|
||||
|
||||
size_t block_for = db::block_for(*erm, cl);
|
||||
auto p = shared_from_this();
|
||||
const size_t block_for = db::block_for(*erm, cl);
|
||||
|
||||
db::per_partition_rate_limit::info rate_limit_info;
|
||||
if (cmd->allow_limit && _db.local().can_apply_per_partition_rate_limit(*schema, db::operation_type::read)) {
|
||||
@@ -5623,7 +5660,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size()
|
||||
|| (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) {
|
||||
tracing::trace(trace_state, "Creating never_speculating_read_executor - speculative retry is disabled or there are no extra replicas to speculate with");
|
||||
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
return ::make_shared<never_speculating_read_executor>(schema, cf, shared_from_this(), std::move(erm), cmd, std::move(partition_range), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
}
|
||||
|
||||
if (target_replicas.size() == all_replicas.size()) {
|
||||
@@ -5631,7 +5668,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
// We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
|
||||
// (same amount of requests in total, but we turn 1 digest request into a full blown data request).
|
||||
tracing::trace(trace_state, "always_speculating_read_executor (all targets)");
|
||||
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
return ::make_shared<always_speculating_read_executor>(schema, cf, shared_from_this(), std::move(erm), cmd, std::move(partition_range), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
}
|
||||
|
||||
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
|
||||
@@ -5640,7 +5677,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) {
|
||||
slogger.trace("read executor no extra target to speculate");
|
||||
tracing::trace(trace_state, "Creating never_speculating_read_executor - there are no extra replicas to speculate with");
|
||||
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
return ::make_shared<never_speculating_read_executor>(schema, cf, shared_from_this(), std::move(erm), cmd, std::move(partition_range), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
} else {
|
||||
target_replicas.push_back(*extra_replica);
|
||||
slogger.trace("creating read executor with extra target {}", *extra_replica);
|
||||
@@ -5650,10 +5687,10 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
|
||||
if (retry_type == speculative_retry::type::ALWAYS) {
|
||||
tracing::trace(trace_state, "Creating always_speculating_read_executor");
|
||||
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
return ::make_shared<always_speculating_read_executor>(schema, cf, shared_from_this(), std::move(erm), cmd, std::move(partition_range), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
} else {// PERCENTILE or CUSTOM.
|
||||
tracing::trace(trace_state, "Creating speculating_read_executor");
|
||||
return ::make_shared<speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
return ::make_shared<speculating_read_executor>(schema, cf, shared_from_this(), std::move(erm), cmd, std::move(partition_range), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6509,6 +6546,7 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, i
|
||||
|
||||
inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const {
|
||||
auto endpoints = erm.get_endpoints_for_reading(token);
|
||||
validate_read_replicas(erm, endpoints);
|
||||
auto it = boost::range::remove_if(endpoints, std::not_fn(std::bind_front(&storage_proxy::is_alive, this)));
|
||||
endpoints.erase(it, endpoints.end());
|
||||
sort_endpoints_by_proximity(erm.get_topology(), endpoints);
|
||||
@@ -6780,7 +6818,7 @@ void storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstra
|
||||
it->timeout_cb();
|
||||
}
|
||||
++it;
|
||||
if (need_preempt()) {
|
||||
if (need_preempt() && it != _cancellable_write_handlers_list->end()) {
|
||||
cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it};
|
||||
seastar::thread::yield();
|
||||
}
|
||||
|
||||
@@ -126,6 +126,26 @@ session_manager& get_topology_session_manager() {
|
||||
return topology_session_manager;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
[[nodiscard]] locator::host_id_or_endpoint_list string_list_to_endpoint_list(const std::vector<sstring>& src_node_strings) {
|
||||
locator::host_id_or_endpoint_list resulting_node_list;
|
||||
resulting_node_list.reserve(src_node_strings.size());
|
||||
for (const sstring& n : src_node_strings) {
|
||||
try {
|
||||
resulting_node_list.emplace_back(n);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(::format("Failed to parse node list: {}: invalid node={}: {}", src_node_strings, n, std::current_exception()));
|
||||
}
|
||||
}
|
||||
return resulting_node_list;
|
||||
}
|
||||
|
||||
[[nodiscard]] locator::host_id_or_endpoint_list parse_node_list(const std::string_view comma_separated_list) {
|
||||
return string_list_to_endpoint_list(utils::split_comma_separated_list(comma_separated_list));
|
||||
}
|
||||
} // namespace
|
||||
|
||||
static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
|
||||
|
||||
storage_service::storage_service(abort_source& abort_source,
|
||||
@@ -572,11 +592,11 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
on_fatal_internal_error(rtlogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id));
|
||||
}
|
||||
SCYLLA_ASSERT(existing_ip);
|
||||
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
|
||||
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
|
||||
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
|
||||
if (rs.ring.has_value()) {
|
||||
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
|
||||
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
|
||||
update_topology(host_id, ip, rs);
|
||||
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
|
||||
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
|
||||
} else {
|
||||
// After adding replacing endpoint above the node will no longer be reported for reads and writes,
|
||||
@@ -1032,7 +1052,7 @@ public:
|
||||
|
||||
// }}} raft_ip_address_updater
|
||||
|
||||
future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
|
||||
future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
|
||||
while (!_group0_as.abort_requested()) {
|
||||
bool err = false;
|
||||
try {
|
||||
@@ -1134,7 +1154,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
std::optional<abort_source> as;
|
||||
|
||||
try {
|
||||
@@ -1175,7 +1195,7 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<d
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(const std::list<locator::host_id_or_endpoint>& hoeps) {
|
||||
std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const {
|
||||
std::unordered_set<raft::server_id> ids;
|
||||
for (const auto& hoep : hoeps) {
|
||||
std::optional<raft::server_id> id;
|
||||
@@ -1196,16 +1216,8 @@ std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(
|
||||
}
|
||||
|
||||
std::unordered_set<raft::server_id> storage_service::ignored_nodes_from_join_params(const join_node_request_params& params) {
|
||||
std::unordered_set<raft::server_id> ignored_nodes;
|
||||
|
||||
if (!params.ignore_nodes.empty()) {
|
||||
std::list<locator::host_id_or_endpoint> ignore_nodes_params;
|
||||
for (const auto& n : params.ignore_nodes) {
|
||||
ignore_nodes_params.emplace_back(n);
|
||||
}
|
||||
|
||||
ignored_nodes = find_raft_nodes_from_hoeps(ignore_nodes_params);
|
||||
}
|
||||
const locator::host_id_or_endpoint_list ignore_nodes_params = string_list_to_endpoint_list(params.ignore_nodes);
|
||||
std::unordered_set<raft::server_id> ignored_nodes{find_raft_nodes_from_hoeps(ignore_nodes_params)};
|
||||
|
||||
if (params.replaced_id) {
|
||||
// insert node that should be replaced to ignore list so that other topology operations
|
||||
@@ -1805,6 +1817,11 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
|
||||
if (raft_replace_info) {
|
||||
join_params.replaced_id = raft_replace_info->raft_id;
|
||||
join_params.ignore_nodes = utils::split_comma_separated_list(_db.local().get_config().ignore_dead_nodes_for_replace());
|
||||
if (!locator::check_host_ids_contain_only_uuid(join_params.ignore_nodes)) {
|
||||
slogger.warn("Warning: Using IP addresses for '--ignore-dead-nodes-for-replace' is deprecated and will"
|
||||
" be disabled in a future release. Please use host IDs instead. Provided values: {}",
|
||||
_db.local().get_config().ignore_dead_nodes_for_replace());
|
||||
}
|
||||
}
|
||||
|
||||
// if the node is bootstrapped the function will do nothing since we already created group0 in main.cc
|
||||
@@ -1867,9 +1884,9 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
|
||||
co_await raft_initialize_discovery_leader(join_params);
|
||||
|
||||
// start topology coordinator fiber
|
||||
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks);
|
||||
_raft_state_monitor = raft_state_monitor_fiber(*raft_server, _group0->hold_group0_gate(), sys_dist_ks);
|
||||
// start cleanup fiber
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, proxy);
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, _group0->hold_group0_gate(), proxy);
|
||||
|
||||
// Need to start system_distributed_keyspace before bootstrap because bootstrapping
|
||||
// process may access those tables.
|
||||
@@ -2150,7 +2167,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
||||
// Start the topology coordinator monitor fiber. If we are the leader, this will start
|
||||
// the topology coordinator which is responsible for driving the upgrade process.
|
||||
try {
|
||||
_raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), sys_dist_ks);
|
||||
_raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate(), sys_dist_ks);
|
||||
} catch (...) {
|
||||
// The calls above can theoretically fail due to coroutine frame allocation failure.
|
||||
// Abort in this case as the node should be in a pretty bad shape anyway.
|
||||
@@ -2176,7 +2193,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
||||
}
|
||||
|
||||
try {
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), proxy);
|
||||
_sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy);
|
||||
start_tablet_split_monitor();
|
||||
} catch (...) {
|
||||
rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception());
|
||||
@@ -2184,19 +2201,6 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
|
||||
}
|
||||
}
|
||||
|
||||
std::list<locator::host_id_or_endpoint> storage_service::parse_node_list(sstring comma_separated_list) {
|
||||
std::vector<sstring> ignore_nodes_strs = utils::split_comma_separated_list(std::move(comma_separated_list));
|
||||
std::list<locator::host_id_or_endpoint> ignore_nodes;
|
||||
for (const sstring& n : ignore_nodes_strs) {
|
||||
try {
|
||||
ignore_nodes.push_back(locator::host_id_or_endpoint(n));
|
||||
} catch (...) {
|
||||
throw std::runtime_error(::format("Failed to parse node list: {}: invalid node={}: {}", ignore_nodes_strs, n, std::current_exception()));
|
||||
}
|
||||
}
|
||||
return ignore_nodes;
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info) {
|
||||
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &replacement_info] {
|
||||
@@ -2713,7 +2717,9 @@ future<> storage_service::on_change(gms::inet_address endpoint, const gms::appli
|
||||
if (node && node->is_member() && node->endpoint() == endpoint) {
|
||||
if (!is_me(endpoint)) {
|
||||
slogger.debug("endpoint={}/{} on_change: updating system.peers table", endpoint, host_id);
|
||||
co_await _sys_ks.local().update_peer_info(endpoint, host_id, get_peer_info_for_update(endpoint, states));
|
||||
if (auto info = get_peer_info_for_update(endpoint, states)) {
|
||||
co_await _sys_ks.local().update_peer_info(endpoint, host_id, *info);
|
||||
}
|
||||
}
|
||||
if (states.contains(application_state::RPC_READY)) {
|
||||
slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready());
|
||||
@@ -2780,11 +2786,23 @@ db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_ad
|
||||
if (!ep_state) {
|
||||
return db::system_keyspace::peer_info{};
|
||||
}
|
||||
return get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
|
||||
auto info = get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
|
||||
if (!info) {
|
||||
on_internal_error_noexcept(slogger, seastar::format("get_peer_info_for_update({}): application state has no peer info: {}", endpoint, ep_state->get_application_state_map()));
|
||||
return db::system_keyspace::peer_info{};
|
||||
}
|
||||
return *info;
|
||||
}
|
||||
|
||||
db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map) {
|
||||
db::system_keyspace::peer_info ret;
|
||||
std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map) {
|
||||
std::optional<db::system_keyspace::peer_info> ret;
|
||||
|
||||
auto get_peer_info = [&] () -> db::system_keyspace::peer_info& {
|
||||
if (!ret) {
|
||||
ret.emplace();
|
||||
}
|
||||
return *ret;
|
||||
};
|
||||
|
||||
auto set_field = [&]<typename T> (std::optional<T>& field,
|
||||
const gms::versioned_value& value,
|
||||
@@ -2805,28 +2823,28 @@ db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_ad
|
||||
for (const auto& [state, value] : app_state_map) {
|
||||
switch (state) {
|
||||
case application_state::DC:
|
||||
set_field(ret.data_center, value, "data_center", true);
|
||||
set_field(get_peer_info().data_center, value, "data_center", true);
|
||||
break;
|
||||
case application_state::INTERNAL_IP:
|
||||
set_field(ret.preferred_ip, value, "preferred_ip", false);
|
||||
set_field(get_peer_info().preferred_ip, value, "preferred_ip", false);
|
||||
break;
|
||||
case application_state::RACK:
|
||||
set_field(ret.rack, value, "rack", true);
|
||||
set_field(get_peer_info().rack, value, "rack", true);
|
||||
break;
|
||||
case application_state::RELEASE_VERSION:
|
||||
set_field(ret.release_version, value, "release_version", true);
|
||||
set_field(get_peer_info().release_version, value, "release_version", true);
|
||||
break;
|
||||
case application_state::RPC_ADDRESS:
|
||||
set_field(ret.rpc_address, value, "rpc_address", false);
|
||||
set_field(get_peer_info().rpc_address, value, "rpc_address", false);
|
||||
break;
|
||||
case application_state::SCHEMA:
|
||||
set_field(ret.schema_version, value, "schema_version", false);
|
||||
set_field(get_peer_info().schema_version, value, "schema_version", false);
|
||||
break;
|
||||
case application_state::TOKENS:
|
||||
// tokens are updated separately
|
||||
break;
|
||||
case application_state::SUPPORTED_FEATURES:
|
||||
set_field(ret.supported_features, value, "supported_features", true);
|
||||
set_field(get_peer_info().supported_features, value, "supported_features", true);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@@ -3231,9 +3249,11 @@ future<> storage_service::stop() {
|
||||
}
|
||||
|
||||
future<> storage_service::wait_for_group0_stop() {
|
||||
_group0_as.request_abort();
|
||||
_topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
|
||||
co_await when_all(std::move(_raft_state_monitor), std::move(_sstable_cleanup_fiber), std::move(_upgrade_to_topology_coordinator_fiber));
|
||||
if (!_group0_as.abort_requested()) {
|
||||
_group0_as.request_abort();
|
||||
_topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
|
||||
co_await when_all(std::move(_raft_state_monitor), std::move(_sstable_cleanup_fiber), std::move(_upgrade_to_topology_coordinator_fiber));
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
|
||||
@@ -3373,12 +3393,14 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
|
||||
.address = replace_address,
|
||||
};
|
||||
|
||||
bool node_ip_specified = false;
|
||||
for (auto& hoep : parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace())) {
|
||||
locator::host_id host_id;
|
||||
gms::loaded_endpoint_state st;
|
||||
// Resolve both host_id and endpoint
|
||||
if (hoep.has_endpoint()) {
|
||||
st.endpoint = hoep.endpoint();
|
||||
node_ip_specified = true;
|
||||
} else {
|
||||
host_id = hoep.id();
|
||||
auto res = _gossiper.get_nodes_with_host_id(host_id);
|
||||
@@ -3404,6 +3426,12 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
|
||||
ri.ignore_nodes.emplace(host_id, std::move(st));
|
||||
}
|
||||
|
||||
if (node_ip_specified) {
|
||||
slogger.warn("Warning: Using IP addresses for '--ignore-dead-nodes-for-replace' is deprecated and will"
|
||||
" be disabled in the next release. Please use host IDs instead. Provided values: {}",
|
||||
_db.local().get_config().ignore_dead_nodes_for_replace());
|
||||
}
|
||||
|
||||
slogger.info("Host {}/{} is replacing {}/{} ignore_nodes={}", get_token_metadata().get_my_id(), get_broadcast_address(), replace_host_id, replace_address,
|
||||
fmt::join(ri.ignore_nodes | boost::adaptors::transformed ([] (const auto& x) {
|
||||
return fmt::format("{}/{}", x.first, x.second.endpoint);
|
||||
@@ -3647,6 +3675,7 @@ static size_t count_normal_token_owners(const topology& topology) {
|
||||
|
||||
future<> storage_service::raft_decommission() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
@@ -3993,7 +4022,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::raft_removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
|
||||
future<> storage_service::raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
|
||||
auto id = raft::server_id{host_id.uuid()};
|
||||
utils::UUID request_id;
|
||||
|
||||
@@ -4062,6 +4091,11 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
|
||||
|
||||
request_id = guard.new_group0_state_id();
|
||||
|
||||
if (auto itr = _topology_state_machine._topology.requests.find(id);
|
||||
itr != _topology_state_machine._topology.requests.end() && itr->second == topology_request::remove) {
|
||||
throw std::runtime_error("Removenode failed. Concurrent request for removal already in progress");
|
||||
}
|
||||
try {
|
||||
// Make non voter during request submission for better HA
|
||||
co_await _group0->make_nonvoters(ignored_ids, _group0_as, raft_timeout{});
|
||||
@@ -4094,7 +4128,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
|
||||
future<> storage_service::removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
|
||||
return run_with_api_lock_conditionally(sstring("removenode"), !raft_topology_change_enabled(), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
|
||||
ss.check_ability_to_perform_topology_operation("removenode");
|
||||
@@ -4643,6 +4677,7 @@ future<> storage_service::do_drain() {
|
||||
|
||||
future<> storage_service::do_cluster_cleanup() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
@@ -4713,6 +4748,7 @@ future<> storage_service::wait_for_topology_not_busy() {
|
||||
|
||||
future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
@@ -4825,6 +4861,12 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
future<> storage_service::rebuild(utils::optional_param source_dc) {
|
||||
return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) -> future<> {
|
||||
ss.check_ability_to_perform_topology_operation("rebuild");
|
||||
if (auto tablets_keyspaces = ss._db.local().get_tablets_keyspaces(); !tablets_keyspaces.empty()) {
|
||||
std::ranges::sort(tablets_keyspaces);
|
||||
slogger.warn("Rebuild is not supported for the following tablets-enabled keyspaces: {}: "
|
||||
"Rebuild is not required for tablets-enabled keyspace after increasing replication factor. "
|
||||
"However, recovering from local data loss on this node requires running repair on all nodes in the datacenter", tablets_keyspaces);
|
||||
}
|
||||
if (ss.raft_topology_change_enabled()) {
|
||||
co_await ss.raft_rebuild(source_dc);
|
||||
} else {
|
||||
@@ -5375,9 +5417,9 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep
|
||||
};
|
||||
|
||||
exponential_backoff_retry split_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
|
||||
bool sleep = false;
|
||||
|
||||
while (!_async_gate.is_closed() && !_group0_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
// Ensures that latest changes to tablet metadata, in group0, are visible
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
@@ -5395,11 +5437,16 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep
|
||||
release_guard(std::move(guard));
|
||||
co_await split_all_compaction_groups();
|
||||
}
|
||||
} catch (const seastar::abort_requested_exception& ex) {
|
||||
slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
|
||||
break;
|
||||
} catch (raft::request_aborted& ex) {
|
||||
slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
|
||||
break;
|
||||
} catch (...) {
|
||||
slogger.error("Failed to complete splitting of table {} due to {}, retrying after {} seconds",
|
||||
table, std::current_exception(), split_retry.sleep_time());
|
||||
sleep = true;
|
||||
break;
|
||||
}
|
||||
if (sleep) {
|
||||
co_await split_retry.retry(_group0_as);
|
||||
@@ -5467,12 +5514,15 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
|
||||
try {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto group0_holder = _group0->hold_group0_gate();
|
||||
// do barrier to make sure we always see the latest topology
|
||||
co_await raft_server.read_barrier(&_group0_as);
|
||||
if (raft_server.get_current_term() != term) {
|
||||
// Return an error since the command is from outdated leader
|
||||
co_return result;
|
||||
}
|
||||
auto id = raft_server.id();
|
||||
group0_holder.release();
|
||||
|
||||
{
|
||||
auto& state = _raft_topology_cmd_handler_state;
|
||||
@@ -5584,7 +5634,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
break;
|
||||
case raft_topology_cmd::command::stream_ranges: {
|
||||
co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> {
|
||||
const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
|
||||
const auto rs = _topology_state_machine._topology.find(id)->second;
|
||||
auto tstate = _topology_state_machine._topology.tstate;
|
||||
if (!rs.ring || rs.ring->tokens.empty()) {
|
||||
rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state);
|
||||
@@ -5633,11 +5683,11 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
utils::get_local_injector().inject("stop_after_streaming",
|
||||
[] { std::raise(SIGSTOP); });
|
||||
} else {
|
||||
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
|
||||
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
|
||||
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
|
||||
parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &raft_server, replaced_id] () -> future<> {
|
||||
if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) {
|
||||
on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id()));
|
||||
parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &id, replaced_id] () -> future<> {
|
||||
if (!_topology_state_machine._topology.req_param.contains(id)) {
|
||||
on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id));
|
||||
}
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
|
||||
auto ignored_nodes = boost::copy_range<std::unordered_set<locator::host_id>>(_topology_state_machine._topology.ignored_nodes | boost::adaptors::transformed([] (const auto& id) {
|
||||
@@ -5721,7 +5771,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
break;
|
||||
case node_state::rebuilding: {
|
||||
auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc;
|
||||
auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[id]).source_dc;
|
||||
rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
|
||||
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
|
||||
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
|
||||
@@ -5768,7 +5818,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
case node_state::none:
|
||||
case node_state::removing:
|
||||
on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
|
||||
raft_server.id(), rs.state));
|
||||
id, rs.state));
|
||||
break;
|
||||
}
|
||||
}));
|
||||
@@ -6484,6 +6534,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
});
|
||||
|
||||
auto& g0_server = _group0->group0_server();
|
||||
auto g0_holder = _group0->hold_group0_gate();
|
||||
if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) {
|
||||
// There is a peculiar case that can happen if the leader is killed
|
||||
// and then replaced very quickly:
|
||||
@@ -6673,6 +6724,10 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
|
||||
co_return join_node_response_result{};
|
||||
}
|
||||
|
||||
if (utils::get_local_injector().enter("join_node_response_drop_expiring")) {
|
||||
_group0->modifiable_address_map().force_drop_expiring_entries();
|
||||
}
|
||||
|
||||
try {
|
||||
co_return co_await std::visit(overloaded_functor {
|
||||
[&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {
|
||||
@@ -7320,8 +7375,8 @@ bool storage_service::is_repair_based_node_ops_enabled(streaming::stream_reason
|
||||
{"removenode", streaming::stream_reason::removenode},
|
||||
{"rebuild", streaming::stream_reason::rebuild},
|
||||
};
|
||||
auto enabled_list_str = _db.local().get_config().allowed_repair_based_node_ops();
|
||||
std::vector<sstring> enabled_list = utils::split_comma_separated_list(std::move(enabled_list_str));
|
||||
const sstring& enabled_list_str = _db.local().get_config().allowed_repair_based_node_ops();
|
||||
std::vector<sstring> enabled_list = utils::split_comma_separated_list(enabled_list_str);
|
||||
std::unordered_set<streaming::stream_reason> enabled_set;
|
||||
for (const sstring& op : enabled_list) {
|
||||
try {
|
||||
|
||||
@@ -373,8 +373,6 @@ private:
|
||||
|
||||
public:
|
||||
|
||||
static std::list<locator::host_id_or_endpoint> parse_node_list(sstring comma_separated_list);
|
||||
|
||||
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
|
||||
|
||||
@@ -528,7 +526,8 @@ public:
|
||||
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
|
||||
private:
|
||||
db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint);
|
||||
db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map);
|
||||
// return an engaged value iff app_state_map has changes to the peer info
|
||||
std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map);
|
||||
|
||||
std::unordered_set<token> get_tokens_for(inet_address endpoint);
|
||||
std::optional<locator::endpoint_dc_rack> get_dc_rack_for(const gms::endpoint_state& ep_state);
|
||||
@@ -699,7 +698,7 @@ public:
|
||||
*
|
||||
* @param hostIdString token for the node
|
||||
*/
|
||||
future<> removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes);
|
||||
future<> removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req);
|
||||
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
|
||||
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
@@ -835,7 +834,7 @@ private:
|
||||
future<> _raft_state_monitor = make_ready_future<>();
|
||||
// This fibers monitors raft state and start/stops the topology change
|
||||
// coordinator fiber
|
||||
future<> raft_state_monitor_fiber(raft::server&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||
future<> raft_state_monitor_fiber(raft::server&, gate::holder, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||
|
||||
public:
|
||||
bool topology_global_queue_empty() const {
|
||||
@@ -865,13 +864,13 @@ private:
|
||||
// as well as the system.peers table.
|
||||
shared_ptr<raft_ip_address_updater> _raft_ip_address_updater;
|
||||
|
||||
std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const std::list<locator::host_id_or_endpoint>& hoeps);
|
||||
std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const;
|
||||
|
||||
future<raft_topology_cmd_result> raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd);
|
||||
|
||||
future<> raft_initialize_discovery_leader(const join_node_request_params& params);
|
||||
future<> raft_decommission();
|
||||
future<> raft_removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params);
|
||||
future<> raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params);
|
||||
future<> raft_rebuild(utils::optional_param source_dc);
|
||||
future<> raft_check_and_repair_cdc_streams();
|
||||
future<> update_topology_with_local_metadata(raft::server&);
|
||||
@@ -976,7 +975,7 @@ private:
|
||||
semaphore _join_node_response_handler_mutex{1};
|
||||
|
||||
future<> _sstable_cleanup_fiber = make_ready_future<>();
|
||||
future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
|
||||
future<> sstable_cleanup_fiber(raft::server& raft, gate::holder, sharded<service::storage_proxy>& proxy) noexcept;
|
||||
|
||||
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
|
||||
abort_source _group0_as;
|
||||
|
||||
@@ -109,6 +109,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
std::chrono::milliseconds _ring_delay;
|
||||
|
||||
gate::holder _group0_holder;
|
||||
|
||||
using drop_guard_and_retake = bool_class<class retake_guard_tag>;
|
||||
|
||||
// Engaged if an ongoing topology change should be rolled back. The string inside
|
||||
@@ -767,86 +769,87 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
case global_topology_request::keyspace_rf_change: {
|
||||
rtlogger.info("keyspace_rf_change requested");
|
||||
while (true) {
|
||||
sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
|
||||
std::unordered_map<sstring, sstring> saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
|
||||
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
|
||||
sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
|
||||
std::unordered_map<sstring, sstring> saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
|
||||
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
|
||||
|
||||
auto repl_opts = new_ks_props.get_replication_options();
|
||||
repl_opts.erase(cql3::statements::ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
utils::UUID req_uuid = *_topo_sm._topology.global_request_id;
|
||||
std::vector<canonical_mutation> updates;
|
||||
sstring error;
|
||||
if (_db.has_keyspace(ks_name)) {
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
size_t unimportant_init_tablet_count = 2; // must be a power of 2
|
||||
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
|
||||
auto repl_opts = new_ks_props.get_replication_options();
|
||||
repl_opts.erase(cql3::statements::ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
utils::UUID req_uuid = *_topo_sm._topology.global_request_id;
|
||||
std::vector<canonical_mutation> updates;
|
||||
sstring error;
|
||||
if (_db.has_keyspace(ks_name)) {
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
size_t unimportant_init_tablet_count = 2; // must be a power of 2
|
||||
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
|
||||
|
||||
for (const auto& table : ks.metadata()->tables()) {
|
||||
try {
|
||||
locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table->id());
|
||||
locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params);
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table, tmptr, old_tablets);
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {},"
|
||||
"desired new ks opts: {}", error, new_ks_props.get_replication_options());
|
||||
updates.clear(); // remove all tablets mutations ...
|
||||
break; // ... and only create mutations deleting the global req
|
||||
}
|
||||
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table->id());
|
||||
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
|
||||
auto last_token = new_tablet_map.get_last_token(tablet_id);
|
||||
updates.emplace_back(co_await make_canonical_mutation_gently(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table->id())
|
||||
.set_new_replicas(last_token, tablet_info.replicas)
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::tablet_transition_kind::rebuild)
|
||||
.build()
|
||||
));
|
||||
co_await coroutine::maybe_yield();
|
||||
});
|
||||
auto tables_with_mvs = ks.metadata()->tables();
|
||||
auto views = ks.metadata()->views();
|
||||
tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
|
||||
for (const auto& table_or_mv : tables_with_mvs) {
|
||||
try {
|
||||
locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table_or_mv->id());
|
||||
locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params);
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, old_tablets);
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {},"
|
||||
"desired new ks opts: {}", error, new_ks_props.get_replication_options());
|
||||
updates.clear(); // remove all tablets mutations ...
|
||||
break; // ... and only create mutations deleting the global req
|
||||
}
|
||||
} else {
|
||||
error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist";
|
||||
}
|
||||
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(topology::transition_state::tablet_migration)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_uuid)
|
||||
.done(error)
|
||||
.build()));
|
||||
if (error.empty()) {
|
||||
const sstring strategy_name = "NetworkTopologyStrategy";
|
||||
auto ks_md = keyspace_metadata::new_keyspace(ks_name, strategy_name, repl_opts,
|
||||
new_ks_props.get_initial_tablets(strategy_name, true).specified_count,
|
||||
new_ks_props.get_durable_writes(), new_ks_props.get_storage_options());
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
updates.emplace_back(m);
|
||||
}
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id());
|
||||
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
|
||||
auto last_token = new_tablet_map.get_last_token(tablet_id);
|
||||
updates.emplace_back(co_await make_canonical_mutation_gently(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id())
|
||||
.set_new_replicas(last_token, tablet_info.replicas)
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::tablet_transition_kind::rebuild)
|
||||
.build()
|
||||
));
|
||||
co_await coroutine::maybe_yield();
|
||||
});
|
||||
}
|
||||
} else {
|
||||
error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist";
|
||||
}
|
||||
|
||||
sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props);
|
||||
rtlogger.trace("do update {} reason {}", updates, reason);
|
||||
mixed_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("handle_global_request(): concurrent modification, retrying");
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(topology::transition_state::tablet_migration)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_uuid)
|
||||
.done(error)
|
||||
.build()));
|
||||
if (error.empty()) {
|
||||
const sstring strategy_name = "NetworkTopologyStrategy";
|
||||
auto ks_md = keyspace_metadata::new_keyspace(ks_name, strategy_name, repl_opts,
|
||||
new_ks_props.get_initial_tablets(std::nullopt),
|
||||
new_ks_props.get_durable_writes(), new_ks_props.get_storage_options());
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
updates.emplace_back(m);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props);
|
||||
rtlogger.trace("do update {} reason {}", updates, reason);
|
||||
mixed_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", [] (auto& handler) -> future<> {
|
||||
rtlogger.info("wait-before-committing-rf-change-event injection hit");
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30});
|
||||
rtlogger.info("wait-before-committing-rf-change-event injection done");
|
||||
});
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1577,7 +1580,30 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtlogger.info("entered `{}` transition state", *tstate);
|
||||
switch (*tstate) {
|
||||
case topology::transition_state::join_group0: {
|
||||
auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard)));
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
if (node.rs->state == node_state::replacing) {
|
||||
// Make sure all nodes are no longer trying to write to a node being replaced. This is important
|
||||
// if the new node have the same IP, so that old write will not go to the new node by mistake after this point.
|
||||
// It is important to do so before the call to finish_accepting_node() below since after this call the new node becomes
|
||||
// a full member of the cluster and it starts loading an initial snapshot. Since snapshot loading is not atomic any queries
|
||||
// that are done in parallel may see a partial state.
|
||||
try {
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::join_group0, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
std::current_exception());
|
||||
_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool accepted;
|
||||
std::tie(node, accepted) = co_await finish_accepting_node(std::move(node));
|
||||
|
||||
// If responding to the joining node failed, move the node to the left state and
|
||||
// stop the topology transition.
|
||||
@@ -1649,22 +1675,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
case node_state::replacing: {
|
||||
SCYLLA_ASSERT(!node.rs->ring);
|
||||
// Make sure all nodes are no longer trying to write to a node being replaced. This is important if the new node have the same IP, so that old write will not
|
||||
// go to the new node by mistake
|
||||
try {
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("transition_state::join_group0, "
|
||||
"global_token_metadata_barrier failed, error {}",
|
||||
std::current_exception());
|
||||
_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
|
||||
break;
|
||||
}
|
||||
|
||||
auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
|
||||
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
|
||||
SCYLLA_ASSERT(it != _topo_sm._topology.normal_nodes.end());
|
||||
@@ -1927,6 +1937,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtbuilder.done();
|
||||
switch(node.rs->state) {
|
||||
case node_state::bootstrapping: {
|
||||
co_await utils::get_local_injector().inject("delay_node_bootstrap", [](auto& handler) {
|
||||
rtlogger.info("delay_node_bootstrap: waiting for message");
|
||||
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
|
||||
});
|
||||
std::vector<canonical_mutation> muts;
|
||||
// Since after bootstrapping a new node some nodes lost some ranges they need to cleanup
|
||||
muts = mark_nodes_as_cleanup_needed(node, false);
|
||||
@@ -1940,8 +1954,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
"bootstrap: read fence completed");
|
||||
}
|
||||
break;
|
||||
case node_state::removing:
|
||||
case node_state::removing: {
|
||||
co_await utils::get_local_injector().inject("delay_node_removal", [](auto& handler) {
|
||||
rtlogger.info("delay_node_removal: waiting for message");
|
||||
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
|
||||
});
|
||||
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
|
||||
}
|
||||
[[fallthrough]];
|
||||
case node_state::decommissioning: {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp());
|
||||
@@ -2494,6 +2513,7 @@ public:
|
||||
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
|
||||
, _tablet_allocator(tablet_allocator)
|
||||
, _ring_delay(ring_delay)
|
||||
, _group0_holder(_group0.hold_group0_gate())
|
||||
{}
|
||||
|
||||
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
|
||||
@@ -2934,6 +2954,7 @@ bool topology_coordinator::handle_topology_coordinator_error(std::exception_ptr
|
||||
} catch (raft::commit_status_unknown&) {
|
||||
rtlogger.warn("topology change coordinator fiber got commit_status_unknown");
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("topology change coordinator fiber got group0_concurrent_modification");
|
||||
} catch (topology_coordinator::term_changed_error&) {
|
||||
// Term changed. We may no longer be a leader
|
||||
rtlogger.debug("topology change coordinator fiber notices term change {} -> {}", _term, _raft.get_current_term());
|
||||
|
||||
@@ -23,8 +23,8 @@
|
||||
|
||||
#include <variant>
|
||||
|
||||
template<typename T>
|
||||
static inline T consume_be(temporary_buffer<char>& p) {
|
||||
template<typename T, ContiguousSharedBuffer Buffer>
|
||||
static inline T consume_be(Buffer& p) {
|
||||
T i = read_be<T>(p.get());
|
||||
p.trim_front(sizeof(T));
|
||||
return i;
|
||||
@@ -60,7 +60,9 @@ enum class read_status { ready, waiting };
|
||||
// }
|
||||
// return pc._u32;
|
||||
//
|
||||
class primitive_consumer {
|
||||
template<ContiguousSharedBuffer Buffer>
|
||||
class primitive_consumer_impl {
|
||||
using FragmentedBuffer = basic_fragmented_buffer<Buffer>;
|
||||
private:
|
||||
// state machine progress:
|
||||
enum class prestate {
|
||||
@@ -103,20 +105,26 @@ private:
|
||||
|
||||
// state for READING_BYTES prestate
|
||||
size_t _read_bytes_len = 0;
|
||||
utils::small_vector<temporary_buffer<char>, 1> _read_bytes;
|
||||
temporary_buffer<char> _read_bytes_buf; // for contiguous reading.
|
||||
utils::small_vector<Buffer, 1> _read_bytes;
|
||||
temporary_buffer<char>* _read_bytes_where_contiguous; // which buffer to set, _key, _val, _cell_path or _pk?
|
||||
fragmented_temporary_buffer* _read_bytes_where;
|
||||
FragmentedBuffer* _read_bytes_where;
|
||||
|
||||
// Alloc-free
|
||||
inline read_status read_partial_int(temporary_buffer<char>& data, prestate next_state) noexcept {
|
||||
inline read_status read_partial_int(Buffer& data, prestate next_state) noexcept {
|
||||
std::copy(data.begin(), data.end(), _read_int.bytes);
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
_prestate = next_state;
|
||||
return read_status::waiting;
|
||||
}
|
||||
inline read_status read_partial_int(prestate next_state) noexcept {
|
||||
_pos = 0;
|
||||
_prestate = next_state;
|
||||
return read_status::waiting;
|
||||
}
|
||||
template <typename VintType, prestate ReadingVint, prestate ReadingVintWithLen>
|
||||
inline read_status read_vint(temporary_buffer<char>& data, typename VintType::value_type& dest) {
|
||||
inline read_status read_vint(Buffer& data, typename VintType::value_type& dest) {
|
||||
if (data.empty()) {
|
||||
_prestate = ReadingVint;
|
||||
return read_status::waiting;
|
||||
@@ -128,9 +136,8 @@ private:
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -140,23 +147,23 @@ private:
|
||||
}
|
||||
}
|
||||
template <typename VintType>
|
||||
inline read_status read_vint_with_len(temporary_buffer<char>& data, typename VintType::value_type& dest) {
|
||||
inline read_status read_vint_with_len(Buffer& data, typename VintType::value_type& dest) {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
dest = VintType::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
return read_status::waiting;
|
||||
};
|
||||
public:
|
||||
primitive_consumer(reader_permit permit) : _permit(std::move(permit)) {}
|
||||
primitive_consumer_impl(reader_permit permit) : _permit(std::move(permit)) {}
|
||||
|
||||
inline read_status read_8(temporary_buffer<char>& data) {
|
||||
inline read_status read_8(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint8_t)) {
|
||||
_u8 = consume_be<uint8_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -170,7 +177,7 @@ public:
|
||||
// (this is the common case), do this immediately. Otherwise, remember
|
||||
// what we have in the buffer, and remember to continue later by using
|
||||
// a "prestate":
|
||||
inline read_status read_16(temporary_buffer<char>& data) {
|
||||
inline read_status read_16(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint16_t)) {
|
||||
_u16 = consume_be<uint16_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -179,7 +186,7 @@ public:
|
||||
}
|
||||
}
|
||||
// Alloc-free
|
||||
inline read_status read_32(temporary_buffer<char>& data) noexcept {
|
||||
inline read_status read_32(Buffer& data) noexcept {
|
||||
if (data.size() >= sizeof(uint32_t)) {
|
||||
_u32 = consume_be<uint32_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -187,7 +194,10 @@ public:
|
||||
return read_partial_int(data, prestate::READING_U32);
|
||||
}
|
||||
}
|
||||
inline read_status read_64(temporary_buffer<char>& data) {
|
||||
inline read_status read_32() noexcept {
|
||||
return read_partial_int(prestate::READING_U32);
|
||||
}
|
||||
inline read_status read_64(Buffer& data) {
|
||||
if (data.size() >= sizeof(uint64_t)) {
|
||||
_u64 = consume_be<uint64_t>(data);
|
||||
return read_status::ready;
|
||||
@@ -195,16 +205,24 @@ public:
|
||||
return read_partial_int(data, prestate::READING_U64);
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes_contiguous(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
temporary_buffer<char> share(Buffer& data, uint32_t offset, uint32_t len) {
|
||||
if constexpr(std::is_same_v<Buffer, temporary_buffer<char>>) {
|
||||
return data.share(offset, len);
|
||||
} else {
|
||||
auto ret = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin() + offset, data.begin() + offset + len, ret.get_write());
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes_contiguous(Buffer& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
if (data.size() >= len) {
|
||||
where = data.share(0, len);
|
||||
where = share(data, 0, len);
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
// copy what we have so far, read the rest later
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
_pos = data.size();
|
||||
@@ -213,12 +231,12 @@ public:
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, fragmented_temporary_buffer& where) {
|
||||
inline read_status read_bytes(Buffer& data, uint32_t len, FragmentedBuffer& where) {
|
||||
if (data.size() >= len) {
|
||||
auto fragments = std::move(where).release();
|
||||
fragments.clear();
|
||||
fragments.push_back(data.share(0, len));
|
||||
where = fragmented_temporary_buffer(std::move(fragments), len);
|
||||
where = FragmentedBuffer(std::move(fragments), len);
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
@@ -233,7 +251,7 @@ public:
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
inline read_status read_short_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
inline read_status read_short_length_bytes(Buffer& data, temporary_buffer<char>& where) {
|
||||
if (data.size() >= sizeof(uint16_t)) {
|
||||
_u16 = consume_be<uint16_t>(data);
|
||||
} else {
|
||||
@@ -242,19 +260,19 @@ public:
|
||||
}
|
||||
return read_bytes_contiguous(data, uint32_t{_u16}, where);
|
||||
}
|
||||
inline read_status read_unsigned_vint(temporary_buffer<char>& data) {
|
||||
inline read_status read_unsigned_vint(Buffer& data) {
|
||||
return read_vint<
|
||||
unsigned_vint,
|
||||
prestate::READING_UNSIGNED_VINT,
|
||||
prestate::READING_UNSIGNED_VINT_WITH_LEN>(data, _u64);
|
||||
}
|
||||
inline read_status read_signed_vint(temporary_buffer<char>& data) {
|
||||
inline read_status read_signed_vint(Buffer& data) {
|
||||
return read_vint<
|
||||
signed_vint,
|
||||
prestate::READING_SIGNED_VINT,
|
||||
prestate::READING_SIGNED_VINT_WITH_LEN>(data, _i64);
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes_contiguous(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
inline read_status read_unsigned_vint_length_bytes_contiguous(Buffer& data, temporary_buffer<char>& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
@@ -267,9 +285,8 @@ public:
|
||||
data.trim_front(len);
|
||||
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -279,7 +296,7 @@ public:
|
||||
}
|
||||
}
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, fragmented_temporary_buffer& where) {
|
||||
inline read_status read_unsigned_vint_length_bytes(Buffer& data, FragmentedBuffer& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
|
||||
_read_bytes_where = &where;
|
||||
@@ -292,9 +309,8 @@ public:
|
||||
data.trim_front(len);
|
||||
return read_bytes(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -307,7 +323,7 @@ public:
|
||||
private:
|
||||
// Reads bytes belonging to an integer of size len. Returns true
|
||||
// if a full integer is now available.
|
||||
bool process_int(temporary_buffer<char>& data, unsigned len) {
|
||||
bool process_int(Buffer& data, unsigned len) {
|
||||
SCYLLA_ASSERT(_pos < len);
|
||||
auto n = std::min((size_t)(len - _pos), data.size());
|
||||
std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos);
|
||||
@@ -316,9 +332,18 @@ private:
|
||||
return _pos == len;
|
||||
}
|
||||
public:
|
||||
read_status consume_u32(Buffer& data) {
|
||||
if (process_int(data, sizeof(uint32_t))) {
|
||||
_u32 = net::ntoh(_read_int.uint32);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
return read_status::waiting;
|
||||
}
|
||||
|
||||
// Feeds data into the state machine.
|
||||
// After the call, when data is not empty then active() can be assumed to be false.
|
||||
read_status consume(temporary_buffer<char>& data) {
|
||||
read_status consume(Buffer& data) {
|
||||
if (__builtin_expect(_prestate == prestate::NONE, true)) {
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -360,12 +385,12 @@ public:
|
||||
return read_vint_with_len<signed_vint>(data, _i64);
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
if (read_bytes_contiguous(data, _u64, *_read_bytes_where_contiguous) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
@@ -375,12 +400,12 @@ public:
|
||||
}
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
|
||||
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
@@ -390,11 +415,11 @@ public:
|
||||
}
|
||||
case prestate::READING_BYTES_CONTIGUOUS: {
|
||||
auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy(data.begin(), data.begin() + n, _read_bytes.front().get_write() + _pos);
|
||||
std::copy(data.begin(), data.begin() + n, _read_bytes_buf.get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
*_read_bytes_where_contiguous = std::move(_read_bytes.front());
|
||||
*_read_bytes_where_contiguous = std::move(_read_bytes_buf);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -406,8 +431,8 @@ public:
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
std::vector<temporary_buffer<char>> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
|
||||
*_read_bytes_where = fragmented_temporary_buffer(std::move(fragments), _read_bytes_len);
|
||||
std::vector<Buffer> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
|
||||
*_read_bytes_where = FragmentedBuffer(std::move(fragments), _read_bytes_len);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -435,12 +460,7 @@ public:
|
||||
}
|
||||
break;
|
||||
case prestate::READING_U32:
|
||||
if (process_int(data, sizeof(uint32_t))) {
|
||||
_u32 = net::ntoh(_read_int.uint32);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
break;
|
||||
return consume_u32(data);
|
||||
case prestate::READING_U64:
|
||||
if (process_int(data, sizeof(uint64_t))) {
|
||||
_u64 = net::ntoh(_read_int.uint64);
|
||||
@@ -461,6 +481,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using primitive_consumer = primitive_consumer_impl<temporary_buffer<char>>;
|
||||
|
||||
template <typename StateProcessor>
|
||||
class continuous_data_consumer : protected primitive_consumer {
|
||||
using proceed = data_consumer::proceed;
|
||||
|
||||
@@ -1110,7 +1110,7 @@ public:
|
||||
_consumer.consume_row_end();
|
||||
return;
|
||||
}
|
||||
if (_state != state::ROW_START || primitive_consumer::active()) {
|
||||
if (_state != state::ROW_START || data_consumer::primitive_consumer::active()) {
|
||||
throw malformed_sstable_exception("end of input, but not end of row");
|
||||
}
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user