Compare commits

...

29 Commits

Author SHA1 Message Date
Hagit Segev
c2d9247574 release: prepare for 4.4.rc3 2021-03-04 13:38:43 +02:00
Raphael S. Carvalho
b4e393d215 compaction: Fix leak of expired sstable in the backlog tracker
expired sstables are skipped in the compaction setup phase, because they don't
need to be actually compacted, but rather only deleted at the end.
that is causing such sstables to not be removed from the backlog tracker,
meaning that backlog caused by expired sstables will not be removed even after
their deletion, which means shares will be higher than needed, making compaction
potentially more aggressive than it have to.

to fix this bug, let's manually register these sstables into the monitor,
such that they'll be removed from the tracker once compaction completes.

Fixes #6054.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210216203700.189362-1-raphaelsc@scylladb.com>
(cherry picked from commit 5206a97915)
2021-03-01 14:14:33 +02:00
Avi Kivity
1a2b7037cd Update seastar submodule
* seastar 74ae29bc17...2c884a7449 (1):
  > io_queue: Fix "delay" metrics

Fixes #8166.
2021-03-01 13:57:04 +02:00
Raphael S. Carvalho
048f5efe1c sstables: Fix TWCS reshape for windows with at least min_threshold sstables
TWCS reshape was silently ignoring windows which contain at least
min_threshold sstables (can happen with data segregation).
When resizing candidates, size of multi_window was incorrectly used and
it was always empty in this path, which means candidates was always
cleared.

Fixes #8147.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210224125322.637128-1-raphaelsc@scylladb.com>
(cherry picked from commit 21608bd677)
2021-02-28 17:20:26 +02:00
Takuya ASADA
056293b95f dist/debian: don't run dh_installinit for scylla-node-exporter when service name == package name
dh_installinit --name <service> is for forcing install debian/*.service
and debian/*.default that does not matches with package name.
And if we have subpackages, packager has responsibility to rename
debian/*.service to debian/<subpackage>.*service.

However, we currently mistakenly running
dh_installinit --name scylla-node-exporter for
debian/scylla-node-exporeter.service,
the packaging system tries to find destination package for the .service,
and does not find subpackage name on it, so it will pick first
subpackage ordered by name, scylla-conf.

To solve the issue, we just need to run dh_installinit without --name
when $product == 'scylla'.

Fixes #8163

Closes #8164

(cherry picked from commit aabc67e386)
2021-02-28 17:20:26 +02:00
Takuya ASADA
f96ea8e011 scylla_setup: allow running scylla_setup with strict umask setting
We currently deny running scylla_setup when umask != 0022.
To remove this limitation, run os.chmod(0o644) on every file creation
to allow reading from scylla user.

Note that perftune.yaml is not really needed to set 0644 since perftune.py is
running in root user, but setting it to align permission with other files.

Fixes #8049

Closes #8119

(cherry picked from commit f3a82f4685)
2021-02-26 08:49:59 +02:00
Hagit Segev
49cd0b87f0 release: prepare for 4.4.rc2 2021-02-24 19:15:29 +02:00
Asias He
0977a73ab2 messaging_service: Move gossip ack message verb to gossip group
Fix a scheduling group leak:

INFO [shard 0] gossip - gossiper::run sg=gossip
INFO [shard 0] gossip - gossiper::handle_ack_msg sg=statement
INFO [shard 0] gossip - gossiper::handle_syn_msg sg=gossip
INFO [shard 0] gossip - gossiper::handle_ack2_msg sg=gossip

After the fix:

INFO [shard 0] gossip - gossiper::run sg=gossip
INFO [shard 0] gossip - gossiper::handle_ack_msg sg=gossip
INFO [shard 0] gossip - gossiper::handle_syn_msg sg=gossip
INFO [shard 0] gossip - gossiper::handle_ack2_msg sg=gossip

Fixes #7986

Closes #8129

(cherry picked from commit 7018377bd7)
2021-02-24 14:11:16 +02:00
Pekka Enberg
9fc582ee83 Update seastar submodule
* seastar 572536ef...74ae29bc (3):
  > perftune.py: fix assignment after extend and add asserts
  > scripts/perftune.py: convert nic option in old perftune.yaml to list for compatibility
  > scripts/perftune.py: remove repeated items after merging options from file
Fixes #7968.
2021-02-23 15:18:00 +02:00
Avi Kivity
4be14c2249 Revert "repair: Make removenode safe by default"
This reverts commit 829b4c1438. It ended
up causing repair failures.

Fixes #7965.
2021-02-23 14:14:07 +02:00
Tomasz Grabiec
3160dd4b59 table: Fix schema mismatch between memtable reader and sstable writer
The schema used to create the sstable writer has to be the same as the
schema used by the reader, as the former is used to intrpret mutation
fragments produced by the reader.

Commit 9124a70 intorduced a deferring point between reader creation
and writer creation which can result in schema mismatch if there was a
concurrent alter.

This could lead to the sstable write to crash, or generate a corrupted
sstable.

Fixes #7994

Message-Id: <20210222153149.289308-1-tgrabiec@scylladb.com>
2021-02-23 13:48:33 +02:00
Avi Kivity
50a8eab1a2 Update seastar submodule
* seastar a287bb1a3...572536ef4 (1):
  > rpc: streaming sink: order outgoing messages

Fixes #7552.
2021-02-23 10:19:45 +02:00
Avi Kivity
04615436a0 Point seastar submodule at scylla-seastar.git
This allows us to backport Seastar fixes to this branch.
2021-02-23 10:17:22 +02:00
Takuya ASADA
d1ab37654e scylla_util.py: resolve /dev/root to get actual device on aws
When psutil.disk_paritions() reports / is /dev/root, aws_instance mistakenly
reports root partition is part of ephemeral disks, and RAID construction will
fail.
This prevents the error and reports correct free disks.

Fixes #8055

Closes #8040

(cherry picked from commit 32d4ec6b8a)
2021-02-21 16:22:51 +02:00
Nadav Har'El
b47bdb053d alternator: fix ValidationException in FilterExpression - and more
The first condition expressions we implemented in Alternator were the old
"Expected" syntax of conditional updates. That implementation had some
specific assumptions on how it handles errors: For example, in the "LT"
operator in "Expected", the second operand is always part of the query, so
an error in it (e.g., an unsupported type) resulted it a ValidationException
error.

When we implemented ConditionExpression and FilterExpression, we wrongly
used the same functions check_compare(), check_BETWEEN(), etc., to implement
them. This results in some inaccurate error handling. The worst example is
what happens when you use a FilterExpression with an expression such as
"x < y" - this filter is supposed to silently skip items whose "x" and "y"
attributes have unsupported or different types, but in our implementation
a bad type (e.g., a list) for y resulted in a ValidationException which
aborted the entire scan! Interestingly, in once case (that of BEGINS_WITH)
we actually noticed the slightly different behavior needed and implemented
the same operator twice - with ugly code duplication. But in other operators
we missed this problem completely.

This patch first adds extensive tests of how the different expressions
(Expected, QueryFilter, FilterExpression, ConditionExpression) and the
different operators handle various input errors - unsupported types,
missing items, incompatible types, etc. Importantly, the tests demonstrate
that there is often different behavior depending on whether the bad
input comes from the query, or from the item. Some of the new tests
fail before this patch, but others pass and were useful to verify that
the patch doesn't break anything that already worked correctly previously.
As usual, all the tests pass on Cassandra.

Finally, this patch *fixes* all these problems. The comparison functions
like check_compare() and check_BETWEEN() now not only take the operands,
they also take booleans saying if each of the operands came from the
query or from an item. The old-syntax caller (Expected or QueryFilter)
always say that the first operand is from the item and the second is
from the query - but in the new-syntax caller (ConditionExpression or
FilterExpression) any or all of the operands can come from the query
and need verification.

The old duplicated code for check_BEGINS_WITH() - which a TODO to remove
it - is finally removed. Instead we use the same idea of passing booleans
saying if each of its operands came from an item or from the query.

Fixes #8043

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 653610f4bc)
2021-02-21 09:25:01 +02:00
Piotr Sarna
e11ae8c58f test: fix a flaky timeout test depending on TTL
One of the USING TIMEOUT tests relied on a specific TTL value,
but that's fragile if the test runs on the boundary of 2 seconds.
Instead, the test case simply checks if the TTL value is present
and is greater than 0, which makes the test robust unless its execution
lasts for more than 1 million seconds, which is highly unlikely.

Fixes #8062

Closes #8063

(cherry picked from commit 2aa4631148)
2021-02-14 13:08:39 +02:00
Benny Halevy
e4132edef3 stream_session: prepare: fix missing string format argument
As seen in
mv_populating_from_existing_data_during_node_decommission_test dtest:
```
ERROR 2021-02-11 06:01:32,804 [shard 0] stream_session - failed to log message: fmt::v7::format_error (argument not found)
```

Fixes #8067

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210211100158.543952-1-bhalevy@scylladb.com>
(cherry picked from commit d01e7e7b58)
2021-02-14 13:08:20 +02:00
Shlomi Livne
492f0802fb scylla_io_setup did not configure pre tuned gce instances correctly
scylla_io_setup condition for nr_disks was using the bitwise operator
(&) instead of logical and operator (and) causing the io_properties
files to have incorrect values

Fixes #7341

Reviewed-by: Lubos Kosco <lubos@scylladb.com>
Signed-off-by: Shlomi Livne <shlomi@scylladb.com>

Closes #8019

(cherry picked from commit 718976e794)
2021-02-14 13:08:00 +02:00
Takuya ASADA
34f22e1df1 dist/debian: install scylla-node-exporter.service correctly
node-exporter systemd unit name is "scylla-node-exporter.service", not
"node-exporter.service".

Fixes #8054

Closes #8053

(cherry picked from commit 856fe12e13)
2021-02-14 13:07:29 +02:00
Nadav Har'El
acb921845f cql-pytest: fix flaky timeuuid_test.py
The test timeuuid_test.py::testTimeuuid sporadically failed, and it turns out
the reason was a bug in the test - which this patch fixes.

The buggy test created a timeuuid and then compared the time stored in it
to the result of the dateOf() CQL function. The problem is that dateOf()
returns a CQL "timestamp", which has millisecond resolution, while the
timeuuid *may* have finer than millisecond resolution. The reason why this
test rarely failed is that in our implementation, the timeuuid almost
always gets a millisecond-resolution timestamp. Only if now() gets called
more than once in one millisecond, does it pick a higher time incremented
by less than a millisecond.

What this patch does is to truncate the time read from the timeuuid to
millisecond resolution, and only then compare it to the result of dateOf().
We cannot hope for more.

Fixes #8060

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210211165046.878371-1-nyh@scylladb.com>
(cherry picked from commit a03a8a89a9)
2021-02-14 13:06:59 +02:00
Botond Dénes
5b6c284281 query: use local limit for non-limited queries in mixed cluster
Since fea5067df we enforce a limit on the memory consumption of
otherwise non-limited queries like reverse and non-paged queries. This
limit is sent down to the replicas by the coordinator, ensuring that
each replica is working with the same limit. This however doesn't work
in a mixed cluster, when upgrading from a version which doesn't have
this series. This has been worked around by falling back to the old
max_result_size constant of 1MB in mixed clusters. This however resulted
in a regression when upgrading from a pre fea5067df to a post fea5067df
one. Pre fea5067df already had a limit for reverse queries, which was
generalized to also cover non-paged ones too by fea5067df.
The regression manifested in previously working reverse queries being
aborted. This happened because even though the user has set a generous
limit for them before the upgrade, in the mix cluster replicas fall back
to the much stricter 1MB limit temporarily ignoring the configured limit
if the coordinator is an old node. This patch solves this problem by
using the locally configured limit instead of the max_result_size
constant. This means that the user has to take extra care to configure
the same limit on all replicas, but at least they will have working
reverse queries during the upgrade.

Fixes: #8022

Tests: unit(release), manual test by user who reported the issue
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20210209075947.1004164-1-bdenes@scylladb.com>
(cherry picked from commit 3d001b5587)
2021-02-09 18:06:43 +02:00
Yaron Kaikov
7d15319a8a release: prepare for 4.4
Update Docker parameters for the 4.4 release.

Closes #7932
2021-02-09 09:42:53 +02:00
Amnon Heiman
a06412fd24 API: Fix aggregation in column_familiy
Few method in column_familiy API were doing the aggregation wrong,
specifically, bloom filter disk size.

The issue is not always visible, it happens when there are multiple
filter files per shard.

Fixes #4513

Signed-off-by: Amnon Heiman <amnon@scylladb.com>

Closes #8007

(cherry picked from commit 4498bb0a48)
2021-02-08 17:03:45 +02:00
Avi Kivity
2500dd1dc4 Merge 'dist/offline_installer/redhat: fix umask error' from Takuya ASADA
Since makeself script changes current umask, scylla_setup causes
"scylla does not work with current umask setting (0077)" error.
To fix that we need use latest version of makeself, and specfiy --keep-umask
option.

Fixes #6243

Closes #6244

* github.com:scylladb/scylla:
  dist/offline_redhat: fix umask error
  dist/offline_installer/redhat: support cross build

(cherry picked from commit bb202db1ff)
2021-02-01 13:03:06 +02:00
Hagit Segev
fd868722dd release: prepare for 4.4.rc1 2021-01-31 14:09:44 +02:00
Pekka Enberg
f470c5d4de Update tools/python3 submodule
* tools/python3 c579207...199ac90 (1):
  > dist: debian: adjust .orig tarball name for .rc releases
2021-01-25 09:26:33 +02:00
Pekka Enberg
3677a72a21 Update tools/python3 submodule
* tools/python3 1763a1a...c579207 (1):
  > dist/debian: handle rc version correctly
2021-01-22 09:36:54 +02:00
Hagit Segev
46e6273821 release: prepare for 4.4.rc0 2021-01-18 20:29:53 +02:00
Jenkins Promoter
ce7e31013c release: prepare for 4.4 2021-01-18 15:49:55 +02:00
41 changed files with 662 additions and 684 deletions

2
.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=4.4.dev
VERSION=4.4.rc3
if test -f version
then

View File

@@ -159,23 +159,40 @@ static bool check_NE(const rjson::value* v1, const rjson::value& v2) {
}
// Check if two JSON-encoded values match with the BEGINS_WITH relation
static bool check_BEGINS_WITH(const rjson::value* v1, const rjson::value& v2) {
// BEGINS_WITH requires that its single operand (v2) be a string or
// binary - otherwise it's a validation error. However, problems with
// the stored attribute (v1) will just return false (no match).
if (!v2.IsObject() || v2.MemberCount() != 1) {
throw api_error::validation(format("BEGINS_WITH operator encountered malformed AttributeValue: {}", v2));
}
auto it2 = v2.MemberBegin();
if (it2->name != "S" && it2->name != "B") {
throw api_error::validation(format("BEGINS_WITH operator requires String or Binary type in AttributeValue, got {}", it2->name));
}
bool check_BEGINS_WITH(const rjson::value* v1, const rjson::value& v2,
bool v1_from_query, bool v2_from_query) {
bool bad = false;
if (!v1 || !v1->IsObject() || v1->MemberCount() != 1) {
if (v1_from_query) {
throw api_error::validation("begins_with() encountered malformed argument");
} else {
bad = true;
}
} else if (v1->MemberBegin()->name != "S" && v1->MemberBegin()->name != "B") {
if (v1_from_query) {
throw api_error::validation(format("begins_with supports only string or binary type, got: {}", *v1));
} else {
bad = true;
}
}
if (!v2.IsObject() || v2.MemberCount() != 1) {
if (v2_from_query) {
throw api_error::validation("begins_with() encountered malformed argument");
} else {
bad = true;
}
} else if (v2.MemberBegin()->name != "S" && v2.MemberBegin()->name != "B") {
if (v2_from_query) {
throw api_error::validation(format("begins_with() supports only string or binary type, got: {}", v2));
} else {
bad = true;
}
}
if (bad) {
return false;
}
auto it1 = v1->MemberBegin();
auto it2 = v2.MemberBegin();
if (it1->name != it2->name) {
return false;
}
@@ -279,24 +296,38 @@ static bool check_NOT_NULL(const rjson::value* val) {
return val != nullptr;
}
// Only types S, N or B (string, number or bytes) may be compared by the
// various comparion operators - lt, le, gt, ge, and between.
static bool check_comparable_type(const rjson::value& v) {
if (!v.IsObject() || v.MemberCount() != 1) {
return false;
}
const rjson::value& type = v.MemberBegin()->name;
return type == "S" || type == "N" || type == "B";
}
// Check if two JSON-encoded values match with cmp.
template <typename Comparator>
bool check_compare(const rjson::value* v1, const rjson::value& v2, const Comparator& cmp) {
if (!v2.IsObject() || v2.MemberCount() != 1) {
throw api_error::validation(
format("{} requires a single AttributeValue of type String, Number, or Binary",
cmp.diagnostic));
bool check_compare(const rjson::value* v1, const rjson::value& v2, const Comparator& cmp,
bool v1_from_query, bool v2_from_query) {
bool bad = false;
if (!v1 || !check_comparable_type(*v1)) {
if (v1_from_query) {
throw api_error::validation(format("{} allow only the types String, Number, or Binary", cmp.diagnostic));
}
bad = true;
}
const auto& kv2 = *v2.MemberBegin();
if (kv2.name != "S" && kv2.name != "N" && kv2.name != "B") {
throw api_error::validation(
format("{} requires a single AttributeValue of type String, Number, or Binary",
cmp.diagnostic));
if (!check_comparable_type(v2)) {
if (v2_from_query) {
throw api_error::validation(format("{} allow only the types String, Number, or Binary", cmp.diagnostic));
}
bad = true;
}
if (!v1 || !v1->IsObject() || v1->MemberCount() != 1) {
if (bad) {
return false;
}
const auto& kv1 = *v1->MemberBegin();
const auto& kv2 = *v2.MemberBegin();
if (kv1.name != kv2.name) {
return false;
}
@@ -310,7 +341,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
if (kv1.name == "B") {
return cmp(base64_decode(kv1.value), base64_decode(kv2.value));
}
clogger.error("check_compare panic: LHS type equals RHS type, but one is in {N,S,B} while the other isn't");
// cannot reach here, as check_comparable_type() verifies the type is one
// of the above options.
return false;
}
@@ -341,56 +373,71 @@ struct cmp_gt {
static constexpr const char* diagnostic = "GT operator";
};
// True if v is between lb and ub, inclusive. Throws if lb > ub.
// True if v is between lb and ub, inclusive. Throws or returns false
// (depending on bounds_from_query parameter) if lb > ub.
template <typename T>
static bool check_BETWEEN(const T& v, const T& lb, const T& ub) {
static bool check_BETWEEN(const T& v, const T& lb, const T& ub, bool bounds_from_query) {
if (cmp_lt()(ub, lb)) {
throw api_error::validation(
format("BETWEEN operator requires lower_bound <= upper_bound, but {} > {}", lb, ub));
if (bounds_from_query) {
throw api_error::validation(
format("BETWEEN operator requires lower_bound <= upper_bound, but {} > {}", lb, ub));
} else {
return false;
}
}
return cmp_ge()(v, lb) && cmp_le()(v, ub);
}
static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const rjson::value& ub) {
if (!v) {
static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const rjson::value& ub,
bool v_from_query, bool lb_from_query, bool ub_from_query) {
if ((v && v_from_query && !check_comparable_type(*v)) ||
(lb_from_query && !check_comparable_type(lb)) ||
(ub_from_query && !check_comparable_type(ub))) {
throw api_error::validation("between allow only the types String, Number, or Binary");
}
if (!v || !v->IsObject() || v->MemberCount() != 1 ||
!lb.IsObject() || lb.MemberCount() != 1 ||
!ub.IsObject() || ub.MemberCount() != 1) {
return false;
}
if (!v->IsObject() || v->MemberCount() != 1) {
throw api_error::validation(format("BETWEEN operator encountered malformed AttributeValue: {}", *v));
}
if (!lb.IsObject() || lb.MemberCount() != 1) {
throw api_error::validation(format("BETWEEN operator encountered malformed AttributeValue: {}", lb));
}
if (!ub.IsObject() || ub.MemberCount() != 1) {
throw api_error::validation(format("BETWEEN operator encountered malformed AttributeValue: {}", ub));
}
const auto& kv_v = *v->MemberBegin();
const auto& kv_lb = *lb.MemberBegin();
const auto& kv_ub = *ub.MemberBegin();
bool bounds_from_query = lb_from_query && ub_from_query;
if (kv_lb.name != kv_ub.name) {
throw api_error::validation(
if (bounds_from_query) {
throw api_error::validation(
format("BETWEEN operator requires the same type for lower and upper bound; instead got {} and {}",
kv_lb.name, kv_ub.name));
} else {
return false;
}
}
if (kv_v.name != kv_lb.name) { // Cannot compare different types, so v is NOT between lb and ub.
return false;
}
if (kv_v.name == "N") {
const char* diag = "BETWEEN operator";
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag));
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()));
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
bounds_from_query);
}
if (kv_v.name == "B") {
return check_BETWEEN(base64_decode(kv_v.value), base64_decode(kv_lb.value), base64_decode(kv_ub.value));
return check_BETWEEN(base64_decode(kv_v.value), base64_decode(kv_lb.value), base64_decode(kv_ub.value), bounds_from_query);
}
throw api_error::validation(
format("BETWEEN operator requires AttributeValueList elements to be of type String, Number, or Binary; instead got {}",
if (v_from_query) {
throw api_error::validation(
format("BETWEEN operator requires AttributeValueList elements to be of type String, Number, or Binary; instead got {}",
kv_lb.name));
} else {
return false;
}
}
// Verify one Expect condition on one attribute (whose content is "got")
@@ -437,19 +484,19 @@ static bool verify_expected_one(const rjson::value& condition, const rjson::valu
return check_NE(got, (*attribute_value_list)[0]);
case comparison_operator_type::LT:
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
return check_compare(got, (*attribute_value_list)[0], cmp_lt{});
return check_compare(got, (*attribute_value_list)[0], cmp_lt{}, false, true);
case comparison_operator_type::LE:
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
return check_compare(got, (*attribute_value_list)[0], cmp_le{});
return check_compare(got, (*attribute_value_list)[0], cmp_le{}, false, true);
case comparison_operator_type::GT:
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
return check_compare(got, (*attribute_value_list)[0], cmp_gt{});
return check_compare(got, (*attribute_value_list)[0], cmp_gt{}, false, true);
case comparison_operator_type::GE:
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
return check_compare(got, (*attribute_value_list)[0], cmp_ge{});
return check_compare(got, (*attribute_value_list)[0], cmp_ge{}, false, true);
case comparison_operator_type::BEGINS_WITH:
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
return check_BEGINS_WITH(got, (*attribute_value_list)[0]);
return check_BEGINS_WITH(got, (*attribute_value_list)[0], false, true);
case comparison_operator_type::IN:
verify_operand_count(attribute_value_list, nonempty(), *comparison_operator);
return check_IN(got, *attribute_value_list);
@@ -461,7 +508,8 @@ static bool verify_expected_one(const rjson::value& condition, const rjson::valu
return check_NOT_NULL(got);
case comparison_operator_type::BETWEEN:
verify_operand_count(attribute_value_list, exact_size(2), *comparison_operator);
return check_BETWEEN(got, (*attribute_value_list)[0], (*attribute_value_list)[1]);
return check_BETWEEN(got, (*attribute_value_list)[0], (*attribute_value_list)[1],
false, true, true);
case comparison_operator_type::CONTAINS:
{
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
@@ -573,7 +621,8 @@ static bool calculate_primitive_condition(const parsed::primitive_condition& con
// Shouldn't happen unless we have a bug in the parser
throw std::logic_error(format("Wrong number of values {} in BETWEEN primitive_condition", cond._values.size()));
}
return check_BETWEEN(&calculated_values[0], calculated_values[1], calculated_values[2]);
return check_BETWEEN(&calculated_values[0], calculated_values[1], calculated_values[2],
cond._values[0].is_constant(), cond._values[1].is_constant(), cond._values[2].is_constant());
case parsed::primitive_condition::type::IN:
return check_IN(calculated_values);
case parsed::primitive_condition::type::VALUE:
@@ -604,13 +653,17 @@ static bool calculate_primitive_condition(const parsed::primitive_condition& con
case parsed::primitive_condition::type::NE:
return check_NE(&calculated_values[0], calculated_values[1]);
case parsed::primitive_condition::type::GT:
return check_compare(&calculated_values[0], calculated_values[1], cmp_gt{});
return check_compare(&calculated_values[0], calculated_values[1], cmp_gt{},
cond._values[0].is_constant(), cond._values[1].is_constant());
case parsed::primitive_condition::type::GE:
return check_compare(&calculated_values[0], calculated_values[1], cmp_ge{});
return check_compare(&calculated_values[0], calculated_values[1], cmp_ge{},
cond._values[0].is_constant(), cond._values[1].is_constant());
case parsed::primitive_condition::type::LT:
return check_compare(&calculated_values[0], calculated_values[1], cmp_lt{});
return check_compare(&calculated_values[0], calculated_values[1], cmp_lt{},
cond._values[0].is_constant(), cond._values[1].is_constant());
case parsed::primitive_condition::type::LE:
return check_compare(&calculated_values[0], calculated_values[1], cmp_le{});
return check_compare(&calculated_values[0], calculated_values[1], cmp_le{},
cond._values[0].is_constant(), cond._values[1].is_constant());
default:
// Shouldn't happen unless we have a bug in the parser
throw std::logic_error(format("Unknown type {} in primitive_condition object", (int)(cond._op)));

View File

@@ -52,6 +52,7 @@ bool verify_expected(const rjson::value& req, const rjson::value* previous_item)
bool verify_condition(const rjson::value& condition, bool require_all, const rjson::value* previous_item);
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2);
bool check_BEGINS_WITH(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query);
bool verify_condition_expression(
const parsed::condition_expression& condition_expression,

View File

@@ -603,52 +603,8 @@ std::unordered_map<std::string_view, function_handler_type*> function_handlers {
}
rjson::value v1 = calculate_value(f._parameters[0], caller, previous_item);
rjson::value v2 = calculate_value(f._parameters[1], caller, previous_item);
// TODO: There's duplication here with check_BEGINS_WITH().
// But unfortunately, the two functions differ a bit.
// If one of v1 or v2 is malformed or has an unsupported type
// (not B or S), what we do depends on whether it came from
// the user's query (is_constant()), or the item. Unsupported
// values in the query result in an error, but if they are in
// the item, we silently return false (no match).
bool bad = false;
if (!v1.IsObject() || v1.MemberCount() != 1) {
bad = true;
if (f._parameters[0].is_constant()) {
throw api_error::validation(format("{}: begins_with() encountered malformed AttributeValue: {}", caller, v1));
}
} else if (v1.MemberBegin()->name != "S" && v1.MemberBegin()->name != "B") {
bad = true;
if (f._parameters[0].is_constant()) {
throw api_error::validation(format("{}: begins_with() supports only string or binary in AttributeValue: {}", caller, v1));
}
}
if (!v2.IsObject() || v2.MemberCount() != 1) {
bad = true;
if (f._parameters[1].is_constant()) {
throw api_error::validation(format("{}: begins_with() encountered malformed AttributeValue: {}", caller, v2));
}
} else if (v2.MemberBegin()->name != "S" && v2.MemberBegin()->name != "B") {
bad = true;
if (f._parameters[1].is_constant()) {
throw api_error::validation(format("{}: begins_with() supports only string or binary in AttributeValue: {}", caller, v2));
}
}
bool ret = false;
if (!bad) {
auto it1 = v1.MemberBegin();
auto it2 = v2.MemberBegin();
if (it1->name == it2->name) {
if (it2->name == "S") {
std::string_view val1 = rjson::to_string_view(it1->value);
std::string_view val2 = rjson::to_string_view(it2->value);
ret = val1.starts_with(val2);
} else /* it2->name == "B" */ {
ret = base64_begins_with(rjson::to_string_view(it1->value), rjson::to_string_view(it2->value));
}
}
}
return to_bool_json(ret);
return to_bool_json(check_BEGINS_WITH(v1.IsNull() ? nullptr : &v1, v2,
f._parameters[0].is_constant(), f._parameters[1].is_constant()));
}
},
{"contains", [] (calculate_value_caller caller, const rjson::value* previous_item, const parsed::value::function_call& f) {

View File

@@ -1105,14 +1105,6 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"ignore_nodes",
"description":"List of dead nodes to ingore in removenode operation",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}

View File

@@ -656,7 +656,7 @@ void set_column_family(http_context& ctx, routes& r) {
cf::get_bloom_filter_disk_space_used.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
return sst->filter_size();
return s + sst->filter_size();
});
}, std::plus<uint64_t>());
});
@@ -664,7 +664,7 @@ void set_column_family(http_context& ctx, routes& r) {
cf::get_all_bloom_filter_disk_space_used.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
return sst->filter_size();
return s + sst->filter_size();
});
}, std::plus<uint64_t>());
});
@@ -672,7 +672,7 @@ void set_column_family(http_context& ctx, routes& r) {
cf::get_bloom_filter_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
return sst->filter_memory_size();
return s + sst->filter_memory_size();
});
}, std::plus<uint64_t>());
});
@@ -680,7 +680,7 @@ void set_column_family(http_context& ctx, routes& r) {
cf::get_all_bloom_filter_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
return sst->filter_memory_size();
return s + sst->filter_memory_size();
});
}, std::plus<uint64_t>());
});
@@ -688,7 +688,7 @@ void set_column_family(http_context& ctx, routes& r) {
cf::get_index_summary_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
return sst->get_summary().memory_footprint();
return s + sst->get_summary().memory_footprint();
});
}, std::plus<uint64_t>());
});
@@ -696,7 +696,7 @@ void set_column_family(http_context& ctx, routes& r) {
cf::get_all_index_summary_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
return sst->get_summary().memory_footprint();
return s + sst->get_summary().memory_footprint();
});
}, std::plus<uint64_t>());
});

View File

@@ -27,7 +27,6 @@
#include <time.h>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/trim_all.hpp>
#include "service/storage_service.hh"
#include "service/load_meter.hh"
#include "db/commitlog/commitlog.hh"
@@ -497,22 +496,7 @@ void set_storage_service(http_context& ctx, routes& r) {
ss::remove_node.set(r, [](std::unique_ptr<request> req) {
auto host_id = req->get_query_param("host_id");
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
auto ignore_nodes = std::list<gms::inet_address>();
for (std::string n : ignore_nodes_strs) {
try {
std::replace(n.begin(), n.end(), '\"', ' ');
std::replace(n.begin(), n.end(), '\'', ' ');
boost::trim_all(n);
if (!n.empty()) {
auto node = gms::inet_address(n);
ignore_nodes.push_back(node);
}
} catch (...) {
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
}
}
return service::get_local_storage_service().removenode(host_id, std::move(ignore_nodes)).then([] {
return service::get_local_storage_service().removenode(host_id).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

View File

@@ -244,12 +244,12 @@ if __name__ == "__main__":
# and https://cloud.google.com/compute/docs/disks/local-ssd#nvme
# note that scylla iotune might measure more, this is GCP recommended
mbs=1024*1024
if nr_disks >= 1 & nr_disks < 4:
if nr_disks >= 1 and nr_disks < 4:
disk_properties["read_iops"] = 170000 * nr_disks
disk_properties["read_bandwidth"] = 660 * mbs * nr_disks
disk_properties["write_iops"] = 90000 * nr_disks
disk_properties["write_bandwidth"] = 350 * mbs * nr_disks
elif nr_disks >= 4 & nr_disks <= 8:
elif nr_disks >= 4 and nr_disks <= 8:
disk_properties["read_iops"] = 680000
disk_properties["read_bandwidth"] = 2650 * mbs
disk_properties["write_iops"] = 360000
@@ -281,3 +281,5 @@ if __name__ == "__main__":
run_iotune()
else:
run_iotune()
os.chmod(etcdir() + '/scylla.d/io_properties.yaml', 0o644)
os.chmod(etcdir() + '/scylla.d/io.conf', 0o644)

View File

@@ -82,6 +82,7 @@ def create_perftune_conf(cfg):
yaml = run('/opt/scylladb/scripts/perftune.py ' + params, shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
with open('/etc/scylla.d/perftune.yaml', 'w') as f:
f.write(yaml)
os.chmod('/etc/scylla.d/perftune.yaml', 0o644)
return True
else:
return False

View File

@@ -176,11 +176,6 @@ def warn_offline(setup):
def warn_offline_missing_pkg(setup, pkg):
colorprint('{red}{setup} disabled by default, since {pkg} not available.{nocolor}', setup=setup, pkg=pkg)
def current_umask():
current = os.umask(0)
os.umask(current)
return current
if __name__ == '__main__':
if not is_nonroot() and os.getuid() > 0:
print('Requires root permission.')
@@ -331,12 +326,6 @@ if __name__ == '__main__':
selinux_reboot_required = False
set_clocksource = False
umask = current_umask()
# files have to be world-readable
if not is_nonroot() and (umask & 0o7) != 0o2:
colorprint('{red}Scylla does not work with current umask setting ({umask}),\nplease restore umask to the default value (0022).{nocolor}', umask='{0:o}'.format(umask).zfill(4))
sys.exit(1)
if interactive:
colorprint('{green}Skip any of the following steps by answering \'no\'{nocolor}')
@@ -375,11 +364,13 @@ if __name__ == '__main__':
if version_check:
with open('/etc/scylla.d/housekeeping.cfg', 'w') as f:
f.write('[housekeeping]\ncheck-version: True\n')
os.chmod('/etc/scylla.d/housekeeping.cfg', 0o644)
systemd_unit('scylla-housekeeping-daily.timer').unmask()
systemd_unit('scylla-housekeeping-restart.timer').unmask()
else:
with open('/etc/scylla.d/housekeeping.cfg', 'w') as f:
f.write('[housekeeping]\ncheck-version: False\n')
os.chmod('/etc/scylla.d/housekeeping.cfg', 0o644)
hk_daily = systemd_unit('scylla-housekeeping-daily.timer')
hk_daily.mask()
hk_daily.stop()

View File

@@ -380,6 +380,8 @@ class aws_instance:
raise Exception("found more than one disk mounted at root'".format(root_dev_candidates))
root_dev = root_dev_candidates[0].device
if root_dev == '/dev/root':
root_dev = run('findmnt -n -o SOURCE /', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
nvmes_present = list(filter(nvme_re.match, os.listdir("/dev")))
return {"root": [ root_dev ], "ephemeral": [ x for x in nvmes_present if not root_dev.startswith(os.path.join("/dev/", x)) ] }

View File

@@ -29,11 +29,11 @@ ifeq ($(product),scylla)
dh_installinit --no-start
else
dh_installinit --no-start --name scylla-server
dh_installinit --no-start --name scylla-node-exporter
endif
dh_installinit --no-start --name scylla-housekeeping-daily
dh_installinit --no-start --name scylla-housekeeping-restart
dh_installinit --no-start --name scylla-fstrim
dh_installinit --no-start --name node-exporter
override_dh_strip:
# The binaries (ethtool...patchelf) don't pass dh_strip after going through patchelf. Since they are

View File

@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
ARG VERSION=666.development
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.4/latest/scylla.repo
ARG VERSION=4.4
ADD scylla_bashrc /scylla_bashrc

View File

@@ -26,26 +26,31 @@ fi
print_usage() {
echo "build_offline_installer.sh --repo [URL]"
echo " --repo repository for fetching scylla rpm, specify .repo file URL"
echo " --releasever use specific minor version of the distribution repo (ex: 7.4)"
echo " --image [IMAGE] Use the specified docker IMAGE"
echo " --no-docker Build offline installer without using docker"
exit 1
}
is_rhel7_variant() {
[ "$ID" = "rhel" -o "$ID" = "ol" -o "$ID" = "centos" ] && [[ "$VERSION_ID" =~ ^7 ]]
}
here="$(realpath $(dirname "$0"))"
releasever=`rpm -q --provides $(rpm -q --whatprovides "system-release(releasever)") | grep "system-release(releasever)"| uniq | cut -d ' ' -f 3`
REPO=
RELEASEVER=
IMAGE=docker.io/centos:7
NO_DOCKER=false
while [ $# -gt 0 ]; do
case "$1" in
"--repo")
REPO=$2
shift 2
;;
"--releasever")
RELEASEVER=$2
"--image")
IMAGE=$2
shift 2
;;
"--no-docker")
NO_DOCKER=true
shift 1
;;
*)
print_usage
;;
@@ -59,25 +64,17 @@ if [ -z $REPO ]; then
exit 1
fi
if ! is_rhel7_variant; then
echo "Unsupported distribution"
exit 1
fi
if [ "$ID" = "centos" ]; then
if [ ! -f /etc/yum.repos.d/epel.repo ]; then
sudo yum install -y epel-release
if ! $NO_DOCKER; then
if [[ -f ~/.config/scylladb/dbuild ]]; then
. ~/.config/scylladb/dbuild
fi
RELEASE=7
else
if [ ! -f /etc/yum.repos.d/epel.repo ]; then
sudo rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
if which docker >/dev/null 2>&1 ; then
tool=${DBUILD_TOOL-docker}
elif which podman >/dev/null 2>&1 ; then
tool=${DBUILD_TOOL-podman}
else
echo "Please make sure you install either podman or docker on this machine to run dbuild" && exit 1
fi
RELEASE=7Server
fi
if [ ! -f /usr/bin/yumdownloader ]; then
sudo yum -y install yum-utils
fi
if [ ! -f /usr/bin/wget ]; then
@@ -85,29 +82,55 @@ if [ ! -f /usr/bin/wget ]; then
fi
if [ ! -f /usr/bin/makeself ]; then
sudo yum -y install makeself
if $NO_DOCKER; then
# makeself on EPEL7 is too old, borrow it from EPEL8
# since there is no dependency on the package, it just work
if [ $release_major = '7' ]; then
sudo rpm --import https://dl.fedoraproject.org/pub/epel/RPM-GPG-KEY-EPEL-8
sudo cp "$here"/lib/epel8.repo /etc/yum.repos.d/
YUM_OPTS="--enablerepo=epel8"
elif [ $release_major = '8' ]; then
yum -y install epel-release || yum -y install https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm
fi
fi
sudo yum -y install "$YUM_OPTS" makeself
fi
if [ ! -f /usr/bin/createrepo ]; then
sudo yum -y install createrepo
fi
sudo yum -y install yum-plugin-downloadonly
makeself_ver=$(makeself --version|cut -d ' ' -f 3|sed -e 's/\.//g')
if [ $makeself_ver -lt 240 ]; then
echo "$(makeself --version) is too old, please install 2.4.0 or later"
exit 1
fi
cd /etc/yum.repos.d/
sudo wget -N $REPO
cd -
sudo rm -rf build/installroot build/offline_installer build/scylla_offline_installer.sh
sudo rm -rf build/installroot build/offline_docker build/offline_installer build/scylla_offline_installer.sh
mkdir -p build/installroot
mkdir -p build/installroot/etc/yum/vars
sudo sh -c "echo $RELEASE >> build/installroot/etc/yum/vars/releasever"
mkdir -p build/offline_docker
wget "$REPO" -O build/offline_docker/scylla.repo
cp "$here"/lib/install_deps.sh build/offline_docker
cp "$here"/lib/Dockerfile.in build/offline_docker/Dockerfile
sed -i -e "s#@@IMAGE@@#$IMAGE#" build/offline_docker/Dockerfile
cd build/offline_docker
if $NO_DOCKER; then
sudo cp scylla.repo /etc/yum.repos.d/scylla.repo
sudo ./install_deps.sh
else
image_id=$($tool build -q .)
fi
cd -
mkdir -p build/offline_installer
cp dist/offline_installer/redhat/header build/offline_installer
if [ -n "$RELEASEVER" ]; then
YUMOPTS="--releasever=$RELEASEVER"
cp "$here"/lib/header build/offline_installer
if $NO_DOCKER; then
"$here"/lib/construct_offline_repo.sh
else
./tools/toolchain/dbuild --image "$image_id" -- "$here"/lib/construct_offline_repo.sh
fi
sudo yum -y install $YUMOPTS --downloadonly --installroot=`pwd`/build/installroot --downloaddir=build/offline_installer scylla sudo ntp ntpdate net-tools kernel-tools
(cd build/offline_installer; createrepo -v .)
(cd build; makeself offline_installer scylla_offline_installer.sh "Scylla offline package" ./header)
(cd build; makeself --keep-umask offline_installer scylla_offline_installer.sh "Scylla offline package" ./header)

View File

@@ -0,0 +1,5 @@
FROM @@IMAGE@@
ADD install_deps.sh install_deps.sh
RUN ./install_deps.sh
ADD scylla.repo /etc/yum.repos.d/scylla.repo
CMD /bin/bash

View File

@@ -0,0 +1,9 @@
#!/bin/bash -e
releasever=`rpm -q --provides $(rpm -q --whatprovides "system-release(releasever)") | grep "system-release(releasever)"| uniq | cut -d ' ' -f 3`
# Can ignore error since we only needed when files exists
cp /etc/yum/vars/* build/installroot/etc/yum/vars/ ||:
# run yum in non-root mode using fakeroot
fakeroot yum -y install --downloadonly --releasever="$releasever" --installroot=`pwd`/build/installroot --downloaddir=build/offline_installer scylla sudo chrony net-tools kernel-tools mdadm xfsprogs

View File

@@ -0,0 +1,7 @@
[epel8]
name=Extra Packages for Enterprise Linux 8 - $basearch
#baseurl=https://download.fedoraproject.org/pub/epel/8/Everything/$basearch
metalink=https://mirrors.fedoraproject.org/metalink?repo=epel-8&arch=$basearch&infra=$infra&content=$contentdir
enabled=0
gpgcheck=1
countme=1

View File

@@ -0,0 +1,12 @@
#!/bin/bash
. /etc/os-release
release_major=$(echo $VERSION_ID|sed -e 's/^\([0-9]*\)[^0-9]*.*/\1/')
if [ ! -f /etc/yum.repos.d/epel.repo ]; then
yum -y install epel-release || yum -y install https://dl.fedoraproject.org/pub/epel/epel-release-latest-"$release_major".noarch.rpm
fi
if [ ! -f /usr/bin/fakeroot ]; then
yum -y install fakeroot
fi

View File

@@ -103,22 +103,3 @@ enum class repair_row_level_start_status: uint8_t {
struct repair_row_level_start_response {
repair_row_level_start_status status;
};
enum class node_ops_cmd : uint32_t {
removenode_prepare,
removenode_heartbeat,
removenode_sync_data,
removenode_abort,
removenode_done,
};
struct node_ops_cmd_request {
node_ops_cmd cmd;
utils::UUID ops_uuid;
std::list<gms::inet_address> ignore_nodes;
std::list<gms::inet_address> leaving_nodes;
};
struct node_ops_cmd_response {
bool ok;
};

View File

@@ -335,7 +335,6 @@ public:
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
void add_leaving_endpoint(inet_address endpoint);
void del_leaving_endpoint(inet_address endpoint);
public:
void remove_endpoint(inet_address endpoint);
#if 0
@@ -1658,10 +1657,6 @@ void token_metadata_impl::add_leaving_endpoint(inet_address endpoint) {
_leaving_endpoints.emplace(endpoint);
}
void token_metadata_impl::del_leaving_endpoint(inet_address endpoint) {
_leaving_endpoints.erase(endpoint);
}
void token_metadata_impl::add_replacing_endpoint(inet_address existing_node, inet_address replacing_node) {
tlogger.info("Added node {} as pending replacing endpoint which replaces existing node {}",
replacing_node, existing_node);
@@ -1932,11 +1927,6 @@ token_metadata::add_leaving_endpoint(inet_address endpoint) {
_impl->add_leaving_endpoint(endpoint);
}
void
token_metadata::del_leaving_endpoint(inet_address endpoint) {
_impl->del_leaving_endpoint(endpoint);
}
void
token_metadata::remove_endpoint(inet_address endpoint) {
_impl->remove_endpoint(endpoint);

View File

@@ -238,7 +238,6 @@ public:
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
void add_leaving_endpoint(inet_address endpoint);
void del_leaving_endpoint(inet_address endpoint);
void remove_endpoint(inet_address endpoint);

View File

@@ -477,6 +477,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
// as well as reduce latency as there are potentially many requests
// blocked on schema version request.
case messaging_verb::GOSSIP_DIGEST_SYN:
case messaging_verb::GOSSIP_DIGEST_ACK:
case messaging_verb::GOSSIP_DIGEST_ACK2:
case messaging_verb::GOSSIP_SHUTDOWN:
case messaging_verb::GOSSIP_ECHO:
@@ -504,7 +505,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
case messaging_verb::NODE_OPS_CMD:
case messaging_verb::HINT_MUTATION:
return 1;
case messaging_verb::CLIENT_ID:
@@ -512,7 +512,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::READ_DATA:
case messaging_verb::READ_MUTATION_DATA:
case messaging_verb::READ_DIGEST:
case messaging_verb::GOSSIP_DIGEST_ACK:
case messaging_verb::DEFINITIONS_UPDATE:
case messaging_verb::TRUNCATE:
case messaging_verb::MIGRATION_REQUEST:
@@ -1350,17 +1349,6 @@ future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_rep
return send_message<future<std::vector<row_level_diff_detect_algorithm>>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id));
}
// Wrapper for NODE_OPS_CMD
void messaging_service::register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) {
register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func));
}
future<> messaging_service::unregister_node_ops_cmd() {
return unregister_handler(messaging_verb::NODE_OPS_CMD);
}
future<node_ops_cmd_response> messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) {
return send_message<future<node_ops_cmd_response>>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req));
}
void
messaging_service::register_paxos_prepare(std::function<future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>(
const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot,

View File

@@ -143,8 +143,7 @@ enum class messaging_verb : int32_t {
HINT_MUTATION = 42,
PAXOS_PRUNE = 43,
GOSSIP_GET_ENDPOINT_STATES = 44,
NODE_OPS_CMD = 45,
LAST = 46,
LAST = 45,
};
} // namespace netw
@@ -395,11 +394,6 @@ public:
future<> unregister_repair_get_diff_algorithms();
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
// Wrapper for NODE_OPS_CMD
void register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func);
future<> unregister_node_ops_cmd();
future<node_ops_cmd_response> send_node_ops_cmd(msg_addr id, node_ops_cmd_request);
// Wrapper for GOSSIP_ECHO verb
void register_gossip_echo(std::function<future<> ()>&& func);
future<> unregister_gossip_echo();

View File

@@ -54,14 +54,6 @@ logging::logger rlogger("repair");
static sharded<netw::messaging_service>* _messaging;
void node_ops_info::check_abort() {
if (abort) {
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
rlogger.warn("{}", msg);
throw std::runtime_error(msg);
}
}
class node_ops_metrics {
public:
node_ops_metrics() {
@@ -443,16 +435,6 @@ void tracker::abort_all_repairs() {
rlogger.info0("Aborted {} repair job(s)", count);
}
void tracker::abort_repair_node_ops(utils::UUID ops_uuid) {
for (auto& x : _repairs[this_shard_id()]) {
auto& ri = x.second;
if (ri->ops_uuid() && ri->ops_uuid().value() == ops_uuid) {
rlogger.info0("Aborted repair jobs for ops_uuid={}", ops_uuid);
ri->abort();
}
}
}
float tracker::report_progress(streaming::stream_reason reason) {
uint64_t nr_ranges_finished = 0;
uint64_t nr_ranges_total = 0;
@@ -811,8 +793,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
repair_uniq_id id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_,
streaming::stream_reason reason_,
std::optional<utils::UUID> ops_uuid)
streaming::stream_reason reason_)
: db(db_)
, messaging(ms_)
, sharder(get_sharder_for_tables(db_, keyspace_, table_ids_))
@@ -826,8 +807,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
, hosts(hosts_)
, reason(reason_)
, nr_ranges_total(ranges.size())
, _row_level_repair(db.local().features().cluster_supports_row_level_repair())
, _ops_uuid(std::move(ops_uuid)) {
, _row_level_repair(db.local().features().cluster_supports_row_level_repair()) {
}
future<> repair_info::do_streaming() {
@@ -1646,7 +1626,7 @@ static int do_repair_start(seastar::sharded<database>& db, seastar::sharded<netw
_node_ops_metrics.repair_total_ranges_sum += ranges.size();
auto ri = make_lw_shared<repair_info>(db, ms,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair, id.uuid);
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair);
return repair_ranges(ri);
});
repair_results.push_back(std::move(f));
@@ -1716,15 +1696,14 @@ static future<> sync_data_using_repair(seastar::sharded<database>& db,
sstring keyspace,
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason,
std::optional<utils::UUID> ops_uuid) {
streaming::stream_reason reason) {
if (ranges.empty()) {
return make_ready_future<>();
}
return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
repair_uniq_id id = repair_tracker().next_repair_command();
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
return repair_tracker().run(id, [id, &db, &ms, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
return repair_tracker().run(id, [id, &db, &ms, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
auto cfs = list_column_families(db.local(), keyspace);
if (cfs.empty()) {
rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace);
@@ -1734,12 +1713,12 @@ static future<> sync_data_using_repair(seastar::sharded<database>& db,
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason, ops_uuid] (database& localdb) mutable {
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason] (database& localdb) mutable {
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ri = make_lw_shared<repair_info>(db, ms,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), reason, ops_uuid);
id, std::move(data_centers), std::move(hosts), reason);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});
@@ -1933,16 +1912,16 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
}
}
auto nr_ranges = desired_ranges.size();
sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, {}).get();
sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason).get();
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
}
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
});
}
static future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
static future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) {
using inet_address = gms::inet_address;
return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable {
return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node)] () mutable {
auto myip = utils::fb_utilities::get_broadcast_address();
auto keyspaces = db.local().get_non_system_keyspaces();
bool is_removenode = myip != leaving_node;
@@ -2001,9 +1980,6 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
auto local_dc = get_local_dc();
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
for (auto&r : ranges) {
if (ops) {
ops->check_abort();
}
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
const std::vector<inet_address> new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes);
const std::vector<inet_address>& current_eps = current_replica_endpoints[r];
@@ -2085,12 +2061,6 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
}
neighbors_set.erase(myip);
neighbors_set.erase(leaving_node);
// Remove nodes in ignore_nodes
if (ops) {
for (const auto& node : ops->ignore_nodes) {
neighbors_set.erase(node);
}
}
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors_set |
boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) {
return snitch_ptr->get_datacenter(node) == local_dc;
@@ -2102,10 +2072,9 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, skipped",
op, keyspace_name, r, current_eps, new_eps, neighbors);
} else {
std::vector<gms::inet_address> mandatory_neighbors = is_removenode ? neighbors : std::vector<gms::inet_address>{};
rlogger.info("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, mandatory_neighbor={}",
op, keyspace_name, r, current_eps, new_eps, neighbors, mandatory_neighbors);
range_sources[r] = repair_neighbors(std::move(neighbors), std::move(mandatory_neighbors));
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}",
op, keyspace_name, r, current_eps, new_eps, neighbors);
range_sources[r] = repair_neighbors(std::move(neighbors));
if (is_removenode) {
ranges_for_removenode.push_back(r);
}
@@ -2125,8 +2094,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
ranges.swap(ranges_for_removenode);
}
auto nr_ranges_synced = ranges.size();
std::optional<utils::UUID> opt_uuid = ops ? std::make_optional<utils::UUID>(ops->ops_uuid) : std::nullopt;
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason, opt_uuid).get();
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
}
@@ -2135,17 +2103,11 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
}
future<> decommission_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr) {
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address(), {});
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address());
}
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node), std::move(ops));
}
future<> abort_repair_node_ops(utils::UUID ops_uuid) {
return smp::invoke_on_all([ops_uuid] {
return repair_tracker().abort_repair_node_ops(ops_uuid);
});
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) {
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node));
}
static future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason) {
@@ -2220,7 +2182,7 @@ static future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, s
}).get();
}
auto nr_ranges = ranges.size();
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason, {}).get();
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
}
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
@@ -2258,19 +2220,12 @@ static future<> init_messaging_service_handler(sharded<database>& db, sharded<ne
return checksum_range(db, keyspace, cf, range, hv);
});
});
ms.register_node_ops_cmd([] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(src_cpu_id % smp::count, [coordinator, req = std::move(req)] () mutable {
return service::get_local_storage_service().node_ops_cmd_handler(coordinator, std::move(req));
});
});
});
}
static future<> uninit_messaging_service_handler() {
return _messaging->invoke_on_all([] (auto& ms) {
return when_all_succeed(ms.unregister_repair_checksum_range(), ms.unregister_node_ops_cmd()).discard_result();
return ms.unregister_repair_checksum_range();
});
}

View File

@@ -76,22 +76,13 @@ struct repair_uniq_id {
};
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
struct node_ops_info {
utils::UUID ops_uuid;
bool abort = false;
std::list<gms::inet_address> ignore_nodes;
void check_abort();
};
// The tokens are the tokens assigned to the bootstrap node.
future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
future<> decommission_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr);
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node);
future<> rebuild_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, sstring source_dc);
future<> replace_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens);
future<> abort_repair_node_ops(utils::UUID ops_uuid);
// NOTE: repair_start() can be run on any node, but starts a node-global
// operation.
// repair_start() starts the requested repair on this node. It returns an
@@ -253,7 +244,6 @@ public:
bool _row_level_repair;
uint64_t _sub_ranges_nr = 0;
std::unordered_set<sstring> dropped_tables;
std::optional<utils::UUID> _ops_uuid;
public:
repair_info(seastar::sharded<database>& db_,
seastar::sharded<netw::messaging_service>& ms_,
@@ -263,8 +253,7 @@ public:
repair_uniq_id id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_,
streaming::stream_reason reason_,
std::optional<utils::UUID> ops_uuid);
streaming::stream_reason reason_);
future<> do_streaming();
void check_failed_ranges();
future<> request_transfer_ranges(const sstring& cf,
@@ -283,9 +272,6 @@ public:
const std::vector<sstring>& table_names() {
return cfs;
}
const std::optional<utils::UUID>& ops_uuid() const {
return _ops_uuid;
};
};
// The repair_tracker tracks ongoing repair operations and their progress.
@@ -338,7 +324,6 @@ public:
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);
void abort_repair_node_ops(utils::UUID ops_uuid);
};
future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
@@ -479,27 +464,6 @@ enum class row_level_diff_detect_algorithm : uint8_t {
std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo);
enum class node_ops_cmd : uint32_t {
removenode_prepare,
removenode_heartbeat,
removenode_sync_data,
removenode_abort,
removenode_done,
};
// The cmd and ops_uuid are mandatory for each request.
// The ignore_nodes and leaving_node are optional.
struct node_ops_cmd_request {
node_ops_cmd cmd;
utils::UUID ops_uuid;
std::list<gms::inet_address> ignore_nodes;
std::list<gms::inet_address> leaving_nodes;
};
struct node_ops_cmd_response {
bool ok;
};
namespace std {
template<>
struct hash<partition_checksum> {

Submodule seastar updated: a287bb1a39...2c884a7449

View File

@@ -4931,10 +4931,12 @@ void storage_proxy::init_messaging_service() {
tracing::trace(trace_state_ptr, "read_data: message received from /{}", src_addr.addr);
}
auto da = oda.value_or(query::digest_algorithm::MD5);
auto sp = get_local_shared_storage_proxy();
if (!cmd.max_result_size) {
cmd.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
auto& cfg = sp->local_db().get_config();
cmd.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit());
}
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
return do_with(std::move(pr), std::move(sp), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
p->get_stats().replica_data_reads++;
auto src_ip = src_addr.addr;
return get_schema_for_read(cmd->schema_version, std::move(src_addr), p->_messaging).then([cmd, da, &pr, &p, &trace_state_ptr, t] (schema_ptr s) {

View File

@@ -107,7 +107,6 @@ storage_service::storage_service(abort_source& abort_source, distributed<databas
, _service_memory_total(config.available_memory / 10)
, _service_memory_limiter(_service_memory_total)
, _for_testing(for_testing)
, _node_ops_abort_thread(node_ops_abort_thread())
, _shared_token_metadata(stm)
, _sys_dist_ks(sys_dist_ks)
, _view_update_generator(view_update_generator)
@@ -1750,12 +1749,9 @@ future<> storage_service::gossip_sharder() {
future<> storage_service::stop() {
// make sure nobody uses the semaphore
node_ops_singal_abort(std::nullopt);
return _service_memory_limiter.wait(_service_memory_total).finally([this] {
_listeners.clear();
return _schema_version_publisher.join();
}).finally([this] {
return std::move(_node_ops_abort_thread);
});
}
@@ -2177,192 +2173,102 @@ future<> storage_service::decommission() {
});
}
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
auto uuid = utils::make_random_uuid();
auto tmptr = ss.get_token_metadata_ptr();
future<> storage_service::removenode(sstring host_id_string) {
return run_with_api_lock(sstring("removenode"), [host_id_string] (storage_service& ss) mutable {
return seastar::async([&ss, host_id_string] {
slogger.debug("removenode: host_id = {}", host_id_string);
auto my_address = ss.get_broadcast_address();
auto tmlock = std::make_unique<token_metadata_lock>(ss.get_token_metadata_lock().get0());
auto tmptr = ss.get_mutable_token_metadata_ptr().get0();
auto local_host_id = tmptr->get_host_id(my_address);
auto host_id = utils::UUID(host_id_string);
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
if (!endpoint_opt) {
throw std::runtime_error(format("removenode[{}]: Host ID not found in the cluster", uuid));
throw std::runtime_error("Host ID not found.");
}
auto endpoint = *endpoint_opt;
auto tokens = tmptr->get_tokens(endpoint);
auto leaving_nodes = std::list<gms::inet_address>{endpoint};
future<> heartbeat_updater = make_ready_future<>();
auto heartbeat_updater_done = make_lw_shared<bool>(false);
slogger.debug("removenode: endpoint = {}", endpoint);
// Step 1: Decide who needs to sync data
//
// By default, we require all nodes in the cluster to participate
// the removenode operation and sync data if needed. We fail the
// removenode operation if any of them is down or fails.
//
// If the user want the removenode opeartion to succeed even if some of the nodes
// are not available, the user has to explicitly pass a list of
// node that can be skipped for the operation.
std::vector<gms::inet_address> nodes;
for (const auto& x : tmptr->get_endpoint_to_host_id_map_for_reading()) {
seastar::thread::maybe_yield();
if (x.first != endpoint && std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) {
nodes.push_back(x.first);
if (endpoint == my_address) {
throw std::runtime_error("Cannot remove self");
}
if (ss._gossiper.get_live_members().contains(endpoint)) {
throw std::runtime_error(format("Node {} is alive and owns this ID. Use decommission command to remove it from the ring", endpoint));
}
// A leaving endpoint that is dead is already being removed.
if (tmptr->is_leaving(endpoint)) {
slogger.warn("Node {} is already being removed, continuing removal anyway", endpoint);
}
if (!ss._replicating_nodes.empty()) {
throw std::runtime_error("This node is already processing a removal. Wait for it to complete, or use 'removenode force' if this has failed.");
}
auto non_system_keyspaces = ss.db().local().get_non_system_keyspaces();
// Find the endpoints that are going to become responsible for data
for (const auto& keyspace_name : non_system_keyspaces) {
auto& ks = ss.db().local().find_keyspace(keyspace_name);
// if the replication factor is 1 the data is lost so we shouldn't wait for confirmation
if (ks.get_replication_strategy().get_replication_factor() == 1) {
slogger.warn("keyspace={} has replication factor 1, the data is probably lost", keyspace_name);
continue;
}
// get all ranges that change ownership (that is, a node needs
// to take responsibility for new range)
std::unordered_multimap<dht::token_range, inet_address> changed_ranges =
ss.get_changed_ranges_for_leaving(keyspace_name, endpoint);
for (auto& x: changed_ranges) {
auto ep = x.second;
if (ss._gossiper.is_alive(ep)) {
ss._replicating_nodes.emplace(ep);
} else {
slogger.warn("Endpoint {} is down and will not receive data for re-replication of {}", ep, endpoint);
}
}
}
slogger.info("removenode[{}]: Started removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
slogger.info("removenode: endpoint = {}, replicating_nodes = {}", endpoint, ss._replicating_nodes);
ss._removing_node = endpoint;
tmptr->add_leaving_endpoint(endpoint);
ss.update_pending_ranges(tmptr, format("removenode {}", endpoint)).get();
ss.replicate_to_all_cores(std::move(tmptr)).get();
tmlock.reset();
// Step 2: Prepare to sync data
std::unordered_set<gms::inet_address> nodes_unknown_verb;
std::unordered_set<gms::inet_address> nodes_down;
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes};
try {
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("removenode[{}]: Got prepare response from node={}", uuid, node);
}).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) {
slogger.warn("removenode[{}]: Node {} does not support removenode verb", uuid, node);
nodes_unknown_verb.emplace(node);
}).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) {
slogger.warn("removenode[{}]: Node {} is down for node_ops_cmd verb", uuid, node);
nodes_down.emplace(node);
});
}).get();
if (!nodes_unknown_verb.empty()) {
auto msg = format("removenode[{}]: Nodes={} do not support removenode verb. Please upgrade your cluster and run removenode again.", uuid, nodes_unknown_verb);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
if (!nodes_down.empty()) {
auto msg = format("removenode[{}]: Nodes={} needed for removenode operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, run nodetool removenode --ignore-dead-nodes <list_of_dead_nodes> <host_id>. E.g., nodetool removenode --ignore-dead-nodes 127.0.0.1,127.0.0.2 817e9515-316f-4fe3-aaab-b00d6f12dddd", uuid, nodes_down);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
// we add our own token so other nodes to let us know when they're done
ss._gossiper.advertise_removing(endpoint, host_id, local_host_id).get();
// Step 3: Start heartbeat updater
heartbeat_updater = seastar::async([&ss, &nodes, uuid, heartbeat_updater_done] {
slogger.debug("removenode[{}]: Started heartbeat_updater", uuid);
while (!(*heartbeat_updater_done)) {
auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}};
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("removenode[{}]: Got heartbeat response from node={}", uuid, node);
return make_ready_future<>();
});
}).handle_exception([uuid] (std::exception_ptr ep) {
slogger.warn("removenode[{}]: Failed to send heartbeat", uuid);
}).get();
int nr_seconds = 10;
while (!(*heartbeat_updater_done) && nr_seconds--) {
sleep_abortable(std::chrono::seconds(1), ss._abort_source).get();
}
}
slogger.debug("removenode[{}]: Stopped heartbeat_updater", uuid);
});
auto stop_heartbeat_updater = defer([&] {
*heartbeat_updater_done = true;
heartbeat_updater.get();
});
// kick off streaming commands
// No need to wait for restore_replica_count to complete, since
// when it completes, the node will be removed from _replicating_nodes,
// and we wait for _replicating_nodes to become empty below
//FIXME: discarded future.
(void)ss.restore_replica_count(endpoint, my_address).handle_exception([endpoint, my_address] (auto ep) {
slogger.info("Failed to restore_replica_count for node {} on node {}", endpoint, my_address);
});
// Step 4: Start to sync data
req.cmd = node_ops_cmd::removenode_sync_data;
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("removenode[{}]: Got sync_data response from node={}", uuid, node);
return make_ready_future<>();
});
}).get();
// Step 5: Announce the node has left
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
ss.excise(std::move(tmp), endpoint);
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
// Step 6: Finish
req.cmd = node_ops_cmd::removenode_done;
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("removenode[{}]: Got done response from node={}", uuid, node);
return make_ready_future<>();
});
}).get();
slogger.info("removenode[{}]: Finished removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
} catch (...) {
// we need to revert the effect of prepare verb the removenode ops is failed
req.cmd = node_ops_cmd::removenode_abort;
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node)) {
// No need to revert previous prepare cmd for those who do not apply prepare cmd.
return make_ready_future<>();
}
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("removenode[{}]: Got abort response from node={}", uuid, node);
});
}).get();
slogger.info("removenode[{}]: Aborted removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
throw;
// wait for ReplicationFinishedVerbHandler to signal we're done
while (!(ss._replicating_nodes.empty() || ss._force_remove_completion)) {
sleep_abortable(std::chrono::milliseconds(100), ss._abort_source).get();
}
});
});
}
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable {
return seastar::async([&ss, coordinator, req = std::move(req)] () mutable {
auto ops_uuid = req.ops_uuid;
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid);
if (req.cmd == node_ops_cmd::removenode_prepare) {
if (req.leaving_nodes.size() > 1) {
auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& node : req.leaving_nodes) {
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
tmptr->add_leaving_endpoint(node);
}
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
}).get();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& node : req.leaving_nodes) {
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
tmptr->del_leaving_endpoint(node);
}
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
});
},
[&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); });
ss._node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
ss.node_ops_update_heartbeat(ops_uuid);
} else if (req.cmd == node_ops_cmd::removenode_done) {
slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
ss.node_ops_done(ops_uuid);
} else if (req.cmd == node_ops_cmd::removenode_sync_data) {
auto it = ss._node_ops.find(ops_uuid);
if (it == ss._node_ops.end()) {
throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid));
}
auto ops = it->second.get_ops_info();
for (auto& node : req.leaving_nodes) {
slogger.info("removenode[{}]: Started to sync data for removing node={}, coordinator={}", req.ops_uuid, node, coordinator);
removenode_with_repair(ss._db, ss._messaging, ss.get_token_metadata_ptr(), node, ops).get();
}
} else if (req.cmd == node_ops_cmd::removenode_abort) {
ss.node_ops_abort(ops_uuid);
} else {
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd));
slogger.warn("{}", msg);
throw std::runtime_error(msg);
if (ss._force_remove_completion) {
throw std::runtime_error("nodetool removenode force is called by user");
}
node_ops_cmd_response resp;
resp.ok = true;
return resp;
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
ss.excise(std::move(tmp), endpoint);
// gossiper will indicate the token has left
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
ss._replicating_nodes.clear();
ss._removing_node = std::nullopt;
});
});
}
@@ -2606,9 +2512,7 @@ void storage_service::unbootstrap() {
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
if (is_repair_based_node_ops_enabled()) {
auto ops_uuid = utils::make_random_uuid();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::list<gms::inet_address>()});
return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint).finally([this, notify_endpoint] () {
return send_replication_notification(notify_endpoint);
});
}
@@ -3323,111 +3227,5 @@ bool storage_service::is_repair_based_node_ops_enabled() {
return _db.local().get_config().enable_repair_based_node_ops();
}
node_ops_meta_data::node_ops_meta_data(
utils::UUID ops_uuid,
gms::inet_address coordinator,
shared_ptr<node_ops_info> ops,
std::function<future<> ()> abort_func,
std::function<void ()> signal_func)
: _ops_uuid(std::move(ops_uuid))
, _coordinator(std::move(coordinator))
, _abort(std::move(abort_func))
, _signal(std::move(signal_func))
, _ops(std::move(ops))
, _watchdog([sig = _signal] { sig(); }) {
_watchdog.arm(_watchdog_interval);
}
future<> node_ops_meta_data::abort() {
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
_aborted = true;
if (_ops) {
_ops->abort = true;
}
_watchdog.cancel();
return _abort();
}
void node_ops_meta_data::update_watchdog() {
slogger.debug("node_ops_meta_data: ops_uuid={} update_watchdog", _ops_uuid);
if (_aborted) {
return;
}
_watchdog.cancel();
_watchdog.arm(_watchdog_interval);
}
void node_ops_meta_data::cancel_watchdog() {
slogger.debug("node_ops_meta_data: ops_uuid={} cancel_watchdog", _ops_uuid);
_watchdog.cancel();
}
shared_ptr<node_ops_info> node_ops_meta_data::get_ops_info() {
return _ops;
}
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
meta.update_watchdog();
}
}
void storage_service::node_ops_done(utils::UUID ops_uuid) {
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
meta.cancel_watchdog();
_node_ops.erase(it);
}
}
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
meta.abort().get();
abort_repair_node_ops(ops_uuid).get();
_node_ops.erase(it);
}
}
void storage_service::node_ops_singal_abort(std::optional<utils::UUID> ops_uuid) {
slogger.debug("node_ops_singal_abort: ops_uuid={}", ops_uuid);
_node_ops_abort_queue.push_back(ops_uuid);
_node_ops_abort_cond.signal();
}
future<> storage_service::node_ops_abort_thread() {
return seastar::async([this] {
slogger.info("Started node_ops_abort_thread");
for (;;) {
_node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); }).get();
slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue);
while (!_node_ops_abort_queue.empty()) {
auto uuid_opt = _node_ops_abort_queue.front();
_node_ops_abort_queue.pop_front();
if (!uuid_opt) {
return;
}
try {
storage_service::node_ops_abort(*uuid_opt);
} catch (...) {
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
}
}
}
slogger.info("Stopped node_ops_abort_thread");
});
}
} // namespace service

View File

@@ -63,12 +63,6 @@
#include <seastar/core/rwlock.hh>
#include "sstables/version.hh"
#include "cdc/metadata.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/lowres_clock.hh>
class node_ops_cmd_request;
class node_ops_cmd_response;
class node_ops_info;
namespace cql_transport { class controller; }
@@ -109,28 +103,6 @@ struct storage_service_config {
size_t available_memory;
};
class node_ops_meta_data {
utils::UUID _ops_uuid;
gms::inet_address _coordinator;
std::function<future<> ()> _abort;
std::function<void ()> _signal;
shared_ptr<node_ops_info> _ops;
seastar::timer<lowres_clock> _watchdog;
std::chrono::seconds _watchdog_interval{30};
bool _aborted = false;
public:
explicit node_ops_meta_data(
utils::UUID ops_uuid,
gms::inet_address coordinator,
shared_ptr<node_ops_info> ops,
std::function<future<> ()> abort_func,
std::function<void ()> signal_func);
shared_ptr<node_ops_info> get_ops_info();
future<> abort();
void update_watchdog();
void cancel_watchdog();
};
/**
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
@@ -186,17 +158,6 @@ private:
* and would only slow down tests (by having them wait).
*/
bool _for_testing;
std::unordered_map<utils::UUID, node_ops_meta_data> _node_ops;
std::list<std::optional<utils::UUID>> _node_ops_abort_queue;
seastar::condition_variable _node_ops_abort_cond;
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
future<> _node_ops_abort_thread;
void node_ops_update_heartbeat(utils::UUID ops_uuid);
void node_ops_done(utils::UUID ops_uuid);
void node_ops_abort(utils::UUID ops_uuid);
void node_ops_singal_abort(std::optional<utils::UUID> ops_uuid);
future<> node_ops_abort_thread();
public:
storage_service(abort_source& as, distributed<database>& db, gms::gossiper& gossiper, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, storage_service_config config, sharded<service::migration_notifier>& mn, locator::shared_token_metadata& stm, sharded<netw::messaging_service>& ms, /* only for tests */ bool for_testing = false);
@@ -810,8 +771,7 @@ public:
*
* @param hostIdString token for the node
*/
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
future<> removenode(sstring host_id_string);
future<sstring> get_operation_mode();

View File

@@ -310,6 +310,7 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
_generated_monitors.emplace_back(std::move(sst), _compaction_manager, _cf);
return _generated_monitors.back();
}
compaction_read_monitor_generator(compaction_manager& cm, column_family& cf)
: _compaction_manager(cm)
, _cf(cf) {}
@@ -570,6 +571,7 @@ private:
// Do not actually compact a sstable that is fully expired and can be safely
// dropped without ressurrecting old data.
if (tombstone_expiration_enabled() && fully_expired.contains(sst)) {
on_skipped_expired_sstable(sst);
continue;
}
@@ -676,6 +678,9 @@ private:
virtual void on_end_of_compaction() {};
// Inform about every expired sstable that was skipped during setup phase
virtual void on_skipped_expired_sstable(shared_sstable sstable) {}
// create a writer based on decorated key.
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) = 0;
// stop current writer
@@ -919,6 +924,12 @@ public:
}
replace_remaining_exhausted_sstables();
}
virtual void on_skipped_expired_sstable(shared_sstable sstable) override {
// manually register expired sstable into monitor, as it's not being actually compacted
// this will allow expired sstable to be removed from tracker once compaction completes
_monitor_generator(std::move(sstable));
}
private:
void backlog_tracker_incrementally_adjust_charges(std::vector<shared_sstable> exhausted_sstables) {
//

View File

@@ -162,7 +162,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
for (auto& pair : all_buckets.first) {
auto ssts = std::move(pair.second);
if (ssts.size() > offstrategy_threshold) {
ssts.resize(std::min(multi_window.size(), max_sstables));
ssts.resize(std::min(ssts.size(), max_sstables));
compaction_descriptor desc(std::move(ssts), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;

View File

@@ -380,7 +380,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
try {
db.find_column_family(ks, cf);
} catch (no_such_column_family&) {
auto err = format("[Stream #{{}}] prepare requested ks={{}} cf={{}} does not exist", ks, cf);
auto err = format("[Stream #{{}}] prepare requested ks={{}} cf={{}} does not exist", plan_id, ks, cf);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}

View File

@@ -1677,7 +1677,8 @@ write_memtable_to_sstable(flat_mutation_reader reader,
const io_priority_class& pc) {
cfg.replay_position = mt.replay_position();
cfg.monitor = &monitor;
return sst->write_components(std::move(reader), mt.partition_count(), mt.schema(), cfg, mt.get_encoding_stats(), pc);
schema_ptr s = reader.schema();
return sst->write_components(std::move(reader), mt.partition_count(), s, cfg, mt.get_encoding_stats(), pc);
}
future<>

View File

@@ -136,7 +136,7 @@ def test_update_condition_eq_different(test_table_s):
ConditionExpression='a = :val2',
ExpressionAttributeValues={':val1': val1, ':val2': val2})
# Also check an actual case of same time, but inequality.
# Also check an actual case of same type, but inequality.
def test_update_condition_eq_unequal(test_table_s):
p = random_string()
test_table_s.update_item(Key={'p': p},
@@ -146,6 +146,13 @@ def test_update_condition_eq_unequal(test_table_s):
UpdateExpression='SET a = :val1',
ConditionExpression='a = :oldval',
ExpressionAttributeValues={':val1': 3, ':oldval': 2})
# If the attribute being compared doesn't exist, it's considered a failed
# condition, not an error:
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q = :oldval',
ExpressionAttributeValues={':val1': 3, ':oldval': 2})
# Check that set equality is checked correctly. Unlike string equality (for
# example), it cannot be done with just naive string comparison of the JSON
@@ -269,15 +276,44 @@ def test_update_condition_lt(test_table_s):
UpdateExpression='SET z = :newval',
ConditionExpression='a < :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
# Trying to compare an unsupported type - e.g., in the following test
# a boolean, is unfortunately caught by boto3 and cannot be tested here...
#test_table_s.update_item(Key={'p': p},
# AttributeUpdates={'d': {'Value': False, 'Action': 'PUT'}})
#with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
# test_table_s.update_item(Key={'p': p},
# UpdateExpression='SET z = :newval',
# ConditionExpression='d < :oldval',
# ExpressionAttributeValues={':newval': 2, ':oldval': True})
# If the attribute being compared doesn't even exist, this is also
# considered as a false condition - not an error.
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='q < :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval < q',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
# If a comparison parameter comes from a constant specified in the query,
# and it has a type not supported by the comparison (e.g., a list), it's
# not just a failed comparison - it is considered a ValidationException
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a < :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval < a',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
# However, if when the wrong type comes from an item attribute, not the
# query, the comparison is simply false - not a ValidationException.
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='x < :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval < x',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 4
# Test for ConditionExpression with operator "<="
@@ -341,6 +377,44 @@ def test_update_condition_le(test_table_s):
UpdateExpression='SET z = :newval',
ConditionExpression='a <= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
# If the attribute being compared doesn't even exist, this is also
# considered as a false condition - not an error.
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='q <= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval <= q',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
# If a comparison parameter comes from a constant specified in the query,
# and it has a type not supported by the comparison (e.g., a list), it's
# not just a failed comparison - it is considered a ValidationException
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a <= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval <= a',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
# However, if when the wrong type comes from an item attribute, not the
# query, the comparison is simply false - not a ValidationException.
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='x <= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval <= x',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 7
# Test for ConditionExpression with operator ">"
@@ -404,6 +478,44 @@ def test_update_condition_gt(test_table_s):
UpdateExpression='SET z = :newval',
ConditionExpression='a > :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
# If the attribute being compared doesn't even exist, this is also
# considered as a false condition - not an error.
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='q > :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval > q',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
# If a comparison parameter comes from a constant specified in the query,
# and it has a type not supported by the comparison (e.g., a list), it's
# not just a failed comparison - it is considered a ValidationException
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a > :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval > a',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
# However, if when the wrong type comes from an item attribute, not the
# query, the comparison is simply false - not a ValidationException.
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='x > :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval > x',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 4
# Test for ConditionExpression with operator ">="
@@ -467,6 +579,44 @@ def test_update_condition_ge(test_table_s):
UpdateExpression='SET z = :newval',
ConditionExpression='a >= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '0'})
# If the attribute being compared doesn't even exist, this is also
# considered as a false condition - not an error.
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='q >= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval >= q',
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
# If a comparison parameter comes from a constant specified in the query,
# and it has a type not supported by the comparison (e.g., a list), it's
# not just a failed comparison - it is considered a ValidationException
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a >= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval >= a',
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
# However, if when the wrong type comes from an item attribute, not the
# query, the comparison is simply false - not a ValidationException.
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='x >= :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression=':oldval >= x',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 7
# Test for ConditionExpression with ternary operator "BETWEEN" (checking
@@ -548,6 +698,60 @@ def test_update_condition_between(test_table_s):
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
ExpressionAttributeValues={':newval': 2, ':oldval1': '0', ':oldval2': '2'})
# If the attribute being compared doesn't even exist, this is also
# considered as a false condition - not an error.
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='q BETWEEN :oldval1 AND :oldval2',
ExpressionAttributeValues={':newval': 2, ':oldval1': b'dog', ':oldval2': b'zebra'})
# If and operand from the query, and it has a type not supported by the
# comparison (e.g., a list), it's not just a failed condition - it is
# considered a ValidationException
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
ExpressionAttributeValues={':newval': 2, ':oldval1': [1,2], ':oldval2': [2,3]})
# However, if when the wrong type comes from an item attribute, not the
# query, the comparison is simply false - not a ValidationException.
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'},
'y': {'Value': [2,3,4], 'Action': 'PUT'}})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN x and y',
ExpressionAttributeValues={':newval': 2})
# If the two operands come from the query (":val" references) then if they
# have different types or the wrong order, this is a ValidationException.
# But if one or more of the operands come from the item, this only causes
# a false condition - not a ValidationException.
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
ExpressionAttributeValues={':newval': 2, ':oldval1': 2, ':oldval2': 1})
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
ExpressionAttributeValues={':newval': 2, ':oldval1': 2, ':oldval2': 'dog'})
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'two': {'Value': 2, 'Action': 'PUT'}})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN two AND :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN :oldval AND two',
ExpressionAttributeValues={':newval': 2, ':oldval': 3})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET z = :newval',
ConditionExpression='a BETWEEN two AND :oldval',
ExpressionAttributeValues={':newval': 2, ':oldval': 'dog'})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 9
# Test for ConditionExpression with multi-operand operator "IN", checking
@@ -605,6 +809,13 @@ def test_update_condition_in(test_table_s):
UpdateExpression='SET c = :val37',
ConditionExpression='a IN ()',
ExpressionAttributeValues=values)
# If the attribute being compared doesn't even exist, this is also
# considered as a false condition - not an error.
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET c = :val37',
ConditionExpression='q IN ({})'.format(','.join(values.keys())),
ExpressionAttributeValues=values)
# Beyond the above operators, there are also test functions supported -
# attribute_exists, attribute_not_exists, attribute_type, begins_with,

View File

@@ -237,6 +237,30 @@ def test_update_expected_1_le(test_table_s):
'AttributeValueList': [2, 3]}}
)
# Comparison operators like le work only on numbers, strings or bytes.
# As noted in issue #8043, if any other type is included in *the query*,
# the result should be a ValidationException, but if the wrong type appears
# in the item, not the query, the result is a failed condition.
def test_update_expected_1_le_validation(test_table_s):
p = random_string()
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'},
'b': {'Value': [1,2], 'Action': 'PUT'}})
# Bad type (a list) in the query. Result is ValidationException.
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
Expected={'a': {'ComparisonOperator': 'LE',
'AttributeValueList': [[1,2,3]]}}
)
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
Expected={'b': {'ComparisonOperator': 'LE',
'AttributeValueList': [3]}}
)
assert not 'z' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# Tests for Expected with ComparisonOperator = "LT":
def test_update_expected_1_lt(test_table_s):
p = random_string()
@@ -894,6 +918,34 @@ def test_update_expected_1_between(test_table_s):
AttributeUpdates={'z': {'Value': 2, 'Action': 'PUT'}},
Expected={'d': {'ComparisonOperator': 'BETWEEN', 'AttributeValueList': [set([1]), set([2])]}})
# BETWEEN work only on numbers, strings or bytes. As noted in issue #8043,
# if any other type is included in *the query*, the result should be a
# ValidationException, but if the wrong type appears in the item, not the
# query, the result is a failed condition.
# BETWEEN should also generate ValidationException if the two ends of the
# range are not of the same type or not in the correct order, but this
# already is tested in the test above (test_update_expected_1_between).
def test_update_expected_1_between_validation(test_table_s):
p = random_string()
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'},
'b': {'Value': [1,2], 'Action': 'PUT'}})
# Bad type (a list) in the query. Result is ValidationException.
with pytest.raises(ClientError, match='ValidationException'):
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
Expected={'a': {'ComparisonOperator': 'BETWEEN',
'AttributeValueList': [[1,2,3], [2,3,4]]}}
)
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
Expected={'b': {'ComparisonOperator': 'BETWEEN',
'AttributeValueList': [1,2]}}
)
assert not 'z' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
##############################################################################
# Instead of ComparisonOperator and AttributeValueList, one can specify either
# Value or Exists:

View File

@@ -235,6 +235,30 @@ def test_filter_expression_ge(test_table_sn_with_data):
expected_items = [item for item in items if item[xn] >= xv]
assert(got_items == expected_items)
# Comparison operators such as >= or BETWEEN only work on numbers, strings or
# bytes. When an expression's operands come from the item and has a wrong type
# (e.g., a list), the result is that the item is skipped - aborting the scan
# with a ValidationException is a bug (this was issue #8043).
def test_filter_expression_le_bad_type(test_table_sn_with_data):
table, p, items = test_table_sn_with_data
got_items = full_query(table, KeyConditionExpression='p=:p', FilterExpression='l <= :xv',
ExpressionAttributeValues={':p': p, ':xv': 3})
assert got_items == []
got_items = full_query(table, KeyConditionExpression='p=:p', FilterExpression=':xv <= l',
ExpressionAttributeValues={':p': p, ':xv': 3})
assert got_items == []
def test_filter_expression_between_bad_type(test_table_sn_with_data):
table, p, items = test_table_sn_with_data
got_items = full_query(table, KeyConditionExpression='p=:p', FilterExpression='s between :xv and l',
ExpressionAttributeValues={':p': p, ':xv': 'cat'})
assert got_items == []
got_items = full_query(table, KeyConditionExpression='p=:p', FilterExpression='s between l and :xv',
ExpressionAttributeValues={':p': p, ':xv': 'cat'})
assert got_items == []
got_items = full_query(table, KeyConditionExpression='p=:p', FilterExpression='s between i and :xv',
ExpressionAttributeValues={':p': p, ':xv': 'cat'})
assert got_items == []
# Test the "BETWEEN/AND" ternary operator on a numeric, string and bytes
# attribute. These keywords are case-insensitive.
def test_filter_expression_between(test_table_sn_with_data):

View File

@@ -49,6 +49,16 @@ def testTimeuuid(cql, test_keyspace):
for i in range(4):
uuid = rows[i][1]
datetime = datetime_from_uuid1(uuid)
# Before comparing this datetime to the result of dateOf(), we
# must truncate the resolution of datetime to milliseconds.
# he problem is that the dateOf(timeuuid) CQL function converts a
# timeuuid to CQL's "timestamp" type, which has millisecond
# resolution, but datetime *may* have finer resolution. It will
# usually be whole milliseconds, because this is what the now()
# implementation usually does, but when now() is called more than
# once per millisecond, it *may* start incrementing the sub-
# millisecond part.
datetime = datetime.replace(microsecond=datetime.microsecond//1000*1000)
timestamp = round(datetime.replace(tzinfo=timezone.utc).timestamp() * 1000)
assert_rows(execute(cql, table, "SELECT dateOf(t), unixTimestampOf(t) FROM %s WHERE k = 0 AND t = ?", rows[i][1]),
[datetime, timestamp])

View File

@@ -136,7 +136,7 @@ def test_mix_per_query_timeout_with_other_params(scylla_only, cql, table1):
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES ({key},1,1) USING TIMEOUT 60m AND TTL 1000000 AND TIMESTAMP 321")
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES ({key},2,1) USING TIMESTAMP 42 AND TIMEOUT 30m")
res = list(cql.execute(f"SELECT ttl(v), writetime(v) FROM {table} WHERE p = {key} and c = 1"))
assert len(res) == 1 and res[0].ttl_v == 1000000 and res[0].writetime_v == 321
assert len(res) == 1 and res[0].ttl_v > 0 and res[0].writetime_v == 321
res = list(cql.execute(f"SELECT ttl(v), writetime(v) FROM {table} WHERE p = {key} and c = 2"))
assert len(res) == 1 and not res[0].ttl_v and res[0].writetime_v == 42