Compare commits
23 Commits
debug_form
...
scylla-4.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49cd0b87f0 | ||
|
|
0977a73ab2 | ||
|
|
9fc582ee83 | ||
|
|
4be14c2249 | ||
|
|
3160dd4b59 | ||
|
|
50a8eab1a2 | ||
|
|
04615436a0 | ||
|
|
d1ab37654e | ||
|
|
b47bdb053d | ||
|
|
e11ae8c58f | ||
|
|
e4132edef3 | ||
|
|
492f0802fb | ||
|
|
34f22e1df1 | ||
|
|
acb921845f | ||
|
|
5b6c284281 | ||
|
|
7d15319a8a | ||
|
|
a06412fd24 | ||
|
|
2500dd1dc4 | ||
|
|
fd868722dd | ||
|
|
f470c5d4de | ||
|
|
3677a72a21 | ||
|
|
46e6273821 | ||
|
|
ce7e31013c |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.4.dev
|
||||
VERSION=4.4.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -159,23 +159,40 @@ static bool check_NE(const rjson::value* v1, const rjson::value& v2) {
|
||||
}
|
||||
|
||||
// Check if two JSON-encoded values match with the BEGINS_WITH relation
|
||||
static bool check_BEGINS_WITH(const rjson::value* v1, const rjson::value& v2) {
|
||||
// BEGINS_WITH requires that its single operand (v2) be a string or
|
||||
// binary - otherwise it's a validation error. However, problems with
|
||||
// the stored attribute (v1) will just return false (no match).
|
||||
if (!v2.IsObject() || v2.MemberCount() != 1) {
|
||||
throw api_error::validation(format("BEGINS_WITH operator encountered malformed AttributeValue: {}", v2));
|
||||
}
|
||||
auto it2 = v2.MemberBegin();
|
||||
if (it2->name != "S" && it2->name != "B") {
|
||||
throw api_error::validation(format("BEGINS_WITH operator requires String or Binary type in AttributeValue, got {}", it2->name));
|
||||
}
|
||||
|
||||
|
||||
bool check_BEGINS_WITH(const rjson::value* v1, const rjson::value& v2,
|
||||
bool v1_from_query, bool v2_from_query) {
|
||||
bool bad = false;
|
||||
if (!v1 || !v1->IsObject() || v1->MemberCount() != 1) {
|
||||
if (v1_from_query) {
|
||||
throw api_error::validation("begins_with() encountered malformed argument");
|
||||
} else {
|
||||
bad = true;
|
||||
}
|
||||
} else if (v1->MemberBegin()->name != "S" && v1->MemberBegin()->name != "B") {
|
||||
if (v1_from_query) {
|
||||
throw api_error::validation(format("begins_with supports only string or binary type, got: {}", *v1));
|
||||
} else {
|
||||
bad = true;
|
||||
}
|
||||
}
|
||||
if (!v2.IsObject() || v2.MemberCount() != 1) {
|
||||
if (v2_from_query) {
|
||||
throw api_error::validation("begins_with() encountered malformed argument");
|
||||
} else {
|
||||
bad = true;
|
||||
}
|
||||
} else if (v2.MemberBegin()->name != "S" && v2.MemberBegin()->name != "B") {
|
||||
if (v2_from_query) {
|
||||
throw api_error::validation(format("begins_with() supports only string or binary type, got: {}", v2));
|
||||
} else {
|
||||
bad = true;
|
||||
}
|
||||
}
|
||||
if (bad) {
|
||||
return false;
|
||||
}
|
||||
auto it1 = v1->MemberBegin();
|
||||
auto it2 = v2.MemberBegin();
|
||||
if (it1->name != it2->name) {
|
||||
return false;
|
||||
}
|
||||
@@ -279,24 +296,38 @@ static bool check_NOT_NULL(const rjson::value* val) {
|
||||
return val != nullptr;
|
||||
}
|
||||
|
||||
// Only types S, N or B (string, number or bytes) may be compared by the
|
||||
// various comparion operators - lt, le, gt, ge, and between.
|
||||
static bool check_comparable_type(const rjson::value& v) {
|
||||
if (!v.IsObject() || v.MemberCount() != 1) {
|
||||
return false;
|
||||
}
|
||||
const rjson::value& type = v.MemberBegin()->name;
|
||||
return type == "S" || type == "N" || type == "B";
|
||||
}
|
||||
|
||||
// Check if two JSON-encoded values match with cmp.
|
||||
template <typename Comparator>
|
||||
bool check_compare(const rjson::value* v1, const rjson::value& v2, const Comparator& cmp) {
|
||||
if (!v2.IsObject() || v2.MemberCount() != 1) {
|
||||
throw api_error::validation(
|
||||
format("{} requires a single AttributeValue of type String, Number, or Binary",
|
||||
cmp.diagnostic));
|
||||
bool check_compare(const rjson::value* v1, const rjson::value& v2, const Comparator& cmp,
|
||||
bool v1_from_query, bool v2_from_query) {
|
||||
bool bad = false;
|
||||
if (!v1 || !check_comparable_type(*v1)) {
|
||||
if (v1_from_query) {
|
||||
throw api_error::validation(format("{} allow only the types String, Number, or Binary", cmp.diagnostic));
|
||||
}
|
||||
bad = true;
|
||||
}
|
||||
const auto& kv2 = *v2.MemberBegin();
|
||||
if (kv2.name != "S" && kv2.name != "N" && kv2.name != "B") {
|
||||
throw api_error::validation(
|
||||
format("{} requires a single AttributeValue of type String, Number, or Binary",
|
||||
cmp.diagnostic));
|
||||
if (!check_comparable_type(v2)) {
|
||||
if (v2_from_query) {
|
||||
throw api_error::validation(format("{} allow only the types String, Number, or Binary", cmp.diagnostic));
|
||||
}
|
||||
bad = true;
|
||||
}
|
||||
if (!v1 || !v1->IsObject() || v1->MemberCount() != 1) {
|
||||
if (bad) {
|
||||
return false;
|
||||
}
|
||||
const auto& kv1 = *v1->MemberBegin();
|
||||
const auto& kv2 = *v2.MemberBegin();
|
||||
if (kv1.name != kv2.name) {
|
||||
return false;
|
||||
}
|
||||
@@ -310,7 +341,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
|
||||
if (kv1.name == "B") {
|
||||
return cmp(base64_decode(kv1.value), base64_decode(kv2.value));
|
||||
}
|
||||
clogger.error("check_compare panic: LHS type equals RHS type, but one is in {N,S,B} while the other isn't");
|
||||
// cannot reach here, as check_comparable_type() verifies the type is one
|
||||
// of the above options.
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -341,56 +373,71 @@ struct cmp_gt {
|
||||
static constexpr const char* diagnostic = "GT operator";
|
||||
};
|
||||
|
||||
// True if v is between lb and ub, inclusive. Throws if lb > ub.
|
||||
// True if v is between lb and ub, inclusive. Throws or returns false
|
||||
// (depending on bounds_from_query parameter) if lb > ub.
|
||||
template <typename T>
|
||||
static bool check_BETWEEN(const T& v, const T& lb, const T& ub) {
|
||||
static bool check_BETWEEN(const T& v, const T& lb, const T& ub, bool bounds_from_query) {
|
||||
if (cmp_lt()(ub, lb)) {
|
||||
throw api_error::validation(
|
||||
format("BETWEEN operator requires lower_bound <= upper_bound, but {} > {}", lb, ub));
|
||||
if (bounds_from_query) {
|
||||
throw api_error::validation(
|
||||
format("BETWEEN operator requires lower_bound <= upper_bound, but {} > {}", lb, ub));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return cmp_ge()(v, lb) && cmp_le()(v, ub);
|
||||
}
|
||||
|
||||
static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const rjson::value& ub) {
|
||||
if (!v) {
|
||||
static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const rjson::value& ub,
|
||||
bool v_from_query, bool lb_from_query, bool ub_from_query) {
|
||||
if ((v && v_from_query && !check_comparable_type(*v)) ||
|
||||
(lb_from_query && !check_comparable_type(lb)) ||
|
||||
(ub_from_query && !check_comparable_type(ub))) {
|
||||
throw api_error::validation("between allow only the types String, Number, or Binary");
|
||||
|
||||
}
|
||||
if (!v || !v->IsObject() || v->MemberCount() != 1 ||
|
||||
!lb.IsObject() || lb.MemberCount() != 1 ||
|
||||
!ub.IsObject() || ub.MemberCount() != 1) {
|
||||
return false;
|
||||
}
|
||||
if (!v->IsObject() || v->MemberCount() != 1) {
|
||||
throw api_error::validation(format("BETWEEN operator encountered malformed AttributeValue: {}", *v));
|
||||
}
|
||||
if (!lb.IsObject() || lb.MemberCount() != 1) {
|
||||
throw api_error::validation(format("BETWEEN operator encountered malformed AttributeValue: {}", lb));
|
||||
}
|
||||
if (!ub.IsObject() || ub.MemberCount() != 1) {
|
||||
throw api_error::validation(format("BETWEEN operator encountered malformed AttributeValue: {}", ub));
|
||||
}
|
||||
|
||||
const auto& kv_v = *v->MemberBegin();
|
||||
const auto& kv_lb = *lb.MemberBegin();
|
||||
const auto& kv_ub = *ub.MemberBegin();
|
||||
bool bounds_from_query = lb_from_query && ub_from_query;
|
||||
if (kv_lb.name != kv_ub.name) {
|
||||
throw api_error::validation(
|
||||
if (bounds_from_query) {
|
||||
throw api_error::validation(
|
||||
format("BETWEEN operator requires the same type for lower and upper bound; instead got {} and {}",
|
||||
kv_lb.name, kv_ub.name));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (kv_v.name != kv_lb.name) { // Cannot compare different types, so v is NOT between lb and ub.
|
||||
return false;
|
||||
}
|
||||
if (kv_v.name == "N") {
|
||||
const char* diag = "BETWEEN operator";
|
||||
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag));
|
||||
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "S") {
|
||||
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
|
||||
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
|
||||
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()));
|
||||
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
|
||||
bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "B") {
|
||||
return check_BETWEEN(base64_decode(kv_v.value), base64_decode(kv_lb.value), base64_decode(kv_ub.value));
|
||||
return check_BETWEEN(base64_decode(kv_v.value), base64_decode(kv_lb.value), base64_decode(kv_ub.value), bounds_from_query);
|
||||
}
|
||||
throw api_error::validation(
|
||||
format("BETWEEN operator requires AttributeValueList elements to be of type String, Number, or Binary; instead got {}",
|
||||
if (v_from_query) {
|
||||
throw api_error::validation(
|
||||
format("BETWEEN operator requires AttributeValueList elements to be of type String, Number, or Binary; instead got {}",
|
||||
kv_lb.name));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Verify one Expect condition on one attribute (whose content is "got")
|
||||
@@ -437,19 +484,19 @@ static bool verify_expected_one(const rjson::value& condition, const rjson::valu
|
||||
return check_NE(got, (*attribute_value_list)[0]);
|
||||
case comparison_operator_type::LT:
|
||||
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_lt{});
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_lt{}, false, true);
|
||||
case comparison_operator_type::LE:
|
||||
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_le{});
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_le{}, false, true);
|
||||
case comparison_operator_type::GT:
|
||||
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_gt{});
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_gt{}, false, true);
|
||||
case comparison_operator_type::GE:
|
||||
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_ge{});
|
||||
return check_compare(got, (*attribute_value_list)[0], cmp_ge{}, false, true);
|
||||
case comparison_operator_type::BEGINS_WITH:
|
||||
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
|
||||
return check_BEGINS_WITH(got, (*attribute_value_list)[0]);
|
||||
return check_BEGINS_WITH(got, (*attribute_value_list)[0], false, true);
|
||||
case comparison_operator_type::IN:
|
||||
verify_operand_count(attribute_value_list, nonempty(), *comparison_operator);
|
||||
return check_IN(got, *attribute_value_list);
|
||||
@@ -461,7 +508,8 @@ static bool verify_expected_one(const rjson::value& condition, const rjson::valu
|
||||
return check_NOT_NULL(got);
|
||||
case comparison_operator_type::BETWEEN:
|
||||
verify_operand_count(attribute_value_list, exact_size(2), *comparison_operator);
|
||||
return check_BETWEEN(got, (*attribute_value_list)[0], (*attribute_value_list)[1]);
|
||||
return check_BETWEEN(got, (*attribute_value_list)[0], (*attribute_value_list)[1],
|
||||
false, true, true);
|
||||
case comparison_operator_type::CONTAINS:
|
||||
{
|
||||
verify_operand_count(attribute_value_list, exact_size(1), *comparison_operator);
|
||||
@@ -573,7 +621,8 @@ static bool calculate_primitive_condition(const parsed::primitive_condition& con
|
||||
// Shouldn't happen unless we have a bug in the parser
|
||||
throw std::logic_error(format("Wrong number of values {} in BETWEEN primitive_condition", cond._values.size()));
|
||||
}
|
||||
return check_BETWEEN(&calculated_values[0], calculated_values[1], calculated_values[2]);
|
||||
return check_BETWEEN(&calculated_values[0], calculated_values[1], calculated_values[2],
|
||||
cond._values[0].is_constant(), cond._values[1].is_constant(), cond._values[2].is_constant());
|
||||
case parsed::primitive_condition::type::IN:
|
||||
return check_IN(calculated_values);
|
||||
case parsed::primitive_condition::type::VALUE:
|
||||
@@ -604,13 +653,17 @@ static bool calculate_primitive_condition(const parsed::primitive_condition& con
|
||||
case parsed::primitive_condition::type::NE:
|
||||
return check_NE(&calculated_values[0], calculated_values[1]);
|
||||
case parsed::primitive_condition::type::GT:
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_gt{});
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_gt{},
|
||||
cond._values[0].is_constant(), cond._values[1].is_constant());
|
||||
case parsed::primitive_condition::type::GE:
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_ge{});
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_ge{},
|
||||
cond._values[0].is_constant(), cond._values[1].is_constant());
|
||||
case parsed::primitive_condition::type::LT:
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_lt{});
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_lt{},
|
||||
cond._values[0].is_constant(), cond._values[1].is_constant());
|
||||
case parsed::primitive_condition::type::LE:
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_le{});
|
||||
return check_compare(&calculated_values[0], calculated_values[1], cmp_le{},
|
||||
cond._values[0].is_constant(), cond._values[1].is_constant());
|
||||
default:
|
||||
// Shouldn't happen unless we have a bug in the parser
|
||||
throw std::logic_error(format("Unknown type {} in primitive_condition object", (int)(cond._op)));
|
||||
|
||||
@@ -52,6 +52,7 @@ bool verify_expected(const rjson::value& req, const rjson::value* previous_item)
|
||||
bool verify_condition(const rjson::value& condition, bool require_all, const rjson::value* previous_item);
|
||||
|
||||
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2);
|
||||
bool check_BEGINS_WITH(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query);
|
||||
|
||||
bool verify_condition_expression(
|
||||
const parsed::condition_expression& condition_expression,
|
||||
|
||||
@@ -603,52 +603,8 @@ std::unordered_map<std::string_view, function_handler_type*> function_handlers {
|
||||
}
|
||||
rjson::value v1 = calculate_value(f._parameters[0], caller, previous_item);
|
||||
rjson::value v2 = calculate_value(f._parameters[1], caller, previous_item);
|
||||
// TODO: There's duplication here with check_BEGINS_WITH().
|
||||
// But unfortunately, the two functions differ a bit.
|
||||
|
||||
// If one of v1 or v2 is malformed or has an unsupported type
|
||||
// (not B or S), what we do depends on whether it came from
|
||||
// the user's query (is_constant()), or the item. Unsupported
|
||||
// values in the query result in an error, but if they are in
|
||||
// the item, we silently return false (no match).
|
||||
bool bad = false;
|
||||
if (!v1.IsObject() || v1.MemberCount() != 1) {
|
||||
bad = true;
|
||||
if (f._parameters[0].is_constant()) {
|
||||
throw api_error::validation(format("{}: begins_with() encountered malformed AttributeValue: {}", caller, v1));
|
||||
}
|
||||
} else if (v1.MemberBegin()->name != "S" && v1.MemberBegin()->name != "B") {
|
||||
bad = true;
|
||||
if (f._parameters[0].is_constant()) {
|
||||
throw api_error::validation(format("{}: begins_with() supports only string or binary in AttributeValue: {}", caller, v1));
|
||||
}
|
||||
}
|
||||
if (!v2.IsObject() || v2.MemberCount() != 1) {
|
||||
bad = true;
|
||||
if (f._parameters[1].is_constant()) {
|
||||
throw api_error::validation(format("{}: begins_with() encountered malformed AttributeValue: {}", caller, v2));
|
||||
}
|
||||
} else if (v2.MemberBegin()->name != "S" && v2.MemberBegin()->name != "B") {
|
||||
bad = true;
|
||||
if (f._parameters[1].is_constant()) {
|
||||
throw api_error::validation(format("{}: begins_with() supports only string or binary in AttributeValue: {}", caller, v2));
|
||||
}
|
||||
}
|
||||
bool ret = false;
|
||||
if (!bad) {
|
||||
auto it1 = v1.MemberBegin();
|
||||
auto it2 = v2.MemberBegin();
|
||||
if (it1->name == it2->name) {
|
||||
if (it2->name == "S") {
|
||||
std::string_view val1 = rjson::to_string_view(it1->value);
|
||||
std::string_view val2 = rjson::to_string_view(it2->value);
|
||||
ret = val1.starts_with(val2);
|
||||
} else /* it2->name == "B" */ {
|
||||
ret = base64_begins_with(rjson::to_string_view(it1->value), rjson::to_string_view(it2->value));
|
||||
}
|
||||
}
|
||||
}
|
||||
return to_bool_json(ret);
|
||||
return to_bool_json(check_BEGINS_WITH(v1.IsNull() ? nullptr : &v1, v2,
|
||||
f._parameters[0].is_constant(), f._parameters[1].is_constant()));
|
||||
}
|
||||
},
|
||||
{"contains", [] (calculate_value_caller caller, const rjson::value* previous_item, const parsed::value::function_call& f) {
|
||||
|
||||
@@ -1105,14 +1105,6 @@
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"ignore_nodes",
|
||||
"description":"List of dead nodes to ingore in removenode operation",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -656,7 +656,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_bloom_filter_disk_space_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst->filter_size();
|
||||
return s + sst->filter_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -664,7 +664,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_bloom_filter_disk_space_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst->filter_size();
|
||||
return s + sst->filter_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -672,7 +672,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_bloom_filter_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst->filter_memory_size();
|
||||
return s + sst->filter_memory_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -680,7 +680,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_bloom_filter_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst->filter_memory_size();
|
||||
return s + sst->filter_memory_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -688,7 +688,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_index_summary_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst->get_summary().memory_footprint();
|
||||
return s + sst->get_summary().memory_footprint();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -696,7 +696,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_index_summary_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst->get_summary().memory_footprint();
|
||||
return s + sst->get_summary().memory_footprint();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
#include <time.h>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/adaptor/filtered.hpp>
|
||||
#include <boost/algorithm/string/trim_all.hpp>
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/load_meter.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
@@ -497,22 +496,7 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
|
||||
ss::remove_node.set(r, [](std::unique_ptr<request> req) {
|
||||
auto host_id = req->get_query_param("host_id");
|
||||
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
|
||||
auto ignore_nodes = std::list<gms::inet_address>();
|
||||
for (std::string n : ignore_nodes_strs) {
|
||||
try {
|
||||
std::replace(n.begin(), n.end(), '\"', ' ');
|
||||
std::replace(n.begin(), n.end(), '\'', ' ');
|
||||
boost::trim_all(n);
|
||||
if (!n.empty()) {
|
||||
auto node = gms::inet_address(n);
|
||||
ignore_nodes.push_back(node);
|
||||
}
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
|
||||
}
|
||||
}
|
||||
return service::get_local_storage_service().removenode(host_id, std::move(ignore_nodes)).then([] {
|
||||
return service::get_local_storage_service().removenode(host_id).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
});
|
||||
|
||||
4
dist/common/scripts/scylla_io_setup
vendored
4
dist/common/scripts/scylla_io_setup
vendored
@@ -244,12 +244,12 @@ if __name__ == "__main__":
|
||||
# and https://cloud.google.com/compute/docs/disks/local-ssd#nvme
|
||||
# note that scylla iotune might measure more, this is GCP recommended
|
||||
mbs=1024*1024
|
||||
if nr_disks >= 1 & nr_disks < 4:
|
||||
if nr_disks >= 1 and nr_disks < 4:
|
||||
disk_properties["read_iops"] = 170000 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 660 * mbs * nr_disks
|
||||
disk_properties["write_iops"] = 90000 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 350 * mbs * nr_disks
|
||||
elif nr_disks >= 4 & nr_disks <= 8:
|
||||
elif nr_disks >= 4 and nr_disks <= 8:
|
||||
disk_properties["read_iops"] = 680000
|
||||
disk_properties["read_bandwidth"] = 2650 * mbs
|
||||
disk_properties["write_iops"] = 360000
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -380,6 +380,8 @@ class aws_instance:
|
||||
raise Exception("found more than one disk mounted at root'".format(root_dev_candidates))
|
||||
|
||||
root_dev = root_dev_candidates[0].device
|
||||
if root_dev == '/dev/root':
|
||||
root_dev = run('findmnt -n -o SOURCE /', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
nvmes_present = list(filter(nvme_re.match, os.listdir("/dev")))
|
||||
return {"root": [ root_dev ], "ephemeral": [ x for x in nvmes_present if not root_dev.startswith(os.path.join("/dev/", x)) ] }
|
||||
|
||||
|
||||
2
dist/debian/debian/rules
vendored
2
dist/debian/debian/rules
vendored
@@ -33,7 +33,7 @@ endif
|
||||
dh_installinit --no-start --name scylla-housekeeping-daily
|
||||
dh_installinit --no-start --name scylla-housekeeping-restart
|
||||
dh_installinit --no-start --name scylla-fstrim
|
||||
dh_installinit --no-start --name node-exporter
|
||||
dh_installinit --no-start --name scylla-node-exporter
|
||||
|
||||
override_dh_strip:
|
||||
# The binaries (ethtool...patchelf) don't pass dh_strip after going through patchelf. Since they are
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=666.development
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.4/latest/scylla.repo
|
||||
ARG VERSION=4.4
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -26,26 +26,31 @@ fi
|
||||
print_usage() {
|
||||
echo "build_offline_installer.sh --repo [URL]"
|
||||
echo " --repo repository for fetching scylla rpm, specify .repo file URL"
|
||||
echo " --releasever use specific minor version of the distribution repo (ex: 7.4)"
|
||||
echo " --image [IMAGE] Use the specified docker IMAGE"
|
||||
echo " --no-docker Build offline installer without using docker"
|
||||
exit 1
|
||||
}
|
||||
|
||||
is_rhel7_variant() {
|
||||
[ "$ID" = "rhel" -o "$ID" = "ol" -o "$ID" = "centos" ] && [[ "$VERSION_ID" =~ ^7 ]]
|
||||
}
|
||||
here="$(realpath $(dirname "$0"))"
|
||||
releasever=`rpm -q --provides $(rpm -q --whatprovides "system-release(releasever)") | grep "system-release(releasever)"| uniq | cut -d ' ' -f 3`
|
||||
|
||||
REPO=
|
||||
RELEASEVER=
|
||||
IMAGE=docker.io/centos:7
|
||||
NO_DOCKER=false
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--repo")
|
||||
REPO=$2
|
||||
shift 2
|
||||
;;
|
||||
"--releasever")
|
||||
RELEASEVER=$2
|
||||
"--image")
|
||||
IMAGE=$2
|
||||
shift 2
|
||||
;;
|
||||
"--no-docker")
|
||||
NO_DOCKER=true
|
||||
shift 1
|
||||
;;
|
||||
*)
|
||||
print_usage
|
||||
;;
|
||||
@@ -59,25 +64,17 @@ if [ -z $REPO ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! is_rhel7_variant; then
|
||||
echo "Unsupported distribution"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ "$ID" = "centos" ]; then
|
||||
if [ ! -f /etc/yum.repos.d/epel.repo ]; then
|
||||
sudo yum install -y epel-release
|
||||
if ! $NO_DOCKER; then
|
||||
if [[ -f ~/.config/scylladb/dbuild ]]; then
|
||||
. ~/.config/scylladb/dbuild
|
||||
fi
|
||||
RELEASE=7
|
||||
else
|
||||
if [ ! -f /etc/yum.repos.d/epel.repo ]; then
|
||||
sudo rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
|
||||
if which docker >/dev/null 2>&1 ; then
|
||||
tool=${DBUILD_TOOL-docker}
|
||||
elif which podman >/dev/null 2>&1 ; then
|
||||
tool=${DBUILD_TOOL-podman}
|
||||
else
|
||||
echo "Please make sure you install either podman or docker on this machine to run dbuild" && exit 1
|
||||
fi
|
||||
RELEASE=7Server
|
||||
fi
|
||||
|
||||
if [ ! -f /usr/bin/yumdownloader ]; then
|
||||
sudo yum -y install yum-utils
|
||||
fi
|
||||
|
||||
if [ ! -f /usr/bin/wget ]; then
|
||||
@@ -85,29 +82,55 @@ if [ ! -f /usr/bin/wget ]; then
|
||||
fi
|
||||
|
||||
if [ ! -f /usr/bin/makeself ]; then
|
||||
sudo yum -y install makeself
|
||||
if $NO_DOCKER; then
|
||||
# makeself on EPEL7 is too old, borrow it from EPEL8
|
||||
# since there is no dependency on the package, it just work
|
||||
if [ $release_major = '7' ]; then
|
||||
sudo rpm --import https://dl.fedoraproject.org/pub/epel/RPM-GPG-KEY-EPEL-8
|
||||
sudo cp "$here"/lib/epel8.repo /etc/yum.repos.d/
|
||||
YUM_OPTS="--enablerepo=epel8"
|
||||
elif [ $release_major = '8' ]; then
|
||||
yum -y install epel-release || yum -y install https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm
|
||||
fi
|
||||
fi
|
||||
sudo yum -y install "$YUM_OPTS" makeself
|
||||
fi
|
||||
|
||||
if [ ! -f /usr/bin/createrepo ]; then
|
||||
sudo yum -y install createrepo
|
||||
fi
|
||||
|
||||
sudo yum -y install yum-plugin-downloadonly
|
||||
makeself_ver=$(makeself --version|cut -d ' ' -f 3|sed -e 's/\.//g')
|
||||
if [ $makeself_ver -lt 240 ]; then
|
||||
echo "$(makeself --version) is too old, please install 2.4.0 or later"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
cd /etc/yum.repos.d/
|
||||
sudo wget -N $REPO
|
||||
cd -
|
||||
|
||||
sudo rm -rf build/installroot build/offline_installer build/scylla_offline_installer.sh
|
||||
sudo rm -rf build/installroot build/offline_docker build/offline_installer build/scylla_offline_installer.sh
|
||||
mkdir -p build/installroot
|
||||
mkdir -p build/installroot/etc/yum/vars
|
||||
sudo sh -c "echo $RELEASE >> build/installroot/etc/yum/vars/releasever"
|
||||
|
||||
mkdir -p build/offline_docker
|
||||
wget "$REPO" -O build/offline_docker/scylla.repo
|
||||
cp "$here"/lib/install_deps.sh build/offline_docker
|
||||
cp "$here"/lib/Dockerfile.in build/offline_docker/Dockerfile
|
||||
sed -i -e "s#@@IMAGE@@#$IMAGE#" build/offline_docker/Dockerfile
|
||||
|
||||
cd build/offline_docker
|
||||
if $NO_DOCKER; then
|
||||
sudo cp scylla.repo /etc/yum.repos.d/scylla.repo
|
||||
sudo ./install_deps.sh
|
||||
else
|
||||
image_id=$($tool build -q .)
|
||||
fi
|
||||
cd -
|
||||
|
||||
mkdir -p build/offline_installer
|
||||
cp dist/offline_installer/redhat/header build/offline_installer
|
||||
if [ -n "$RELEASEVER" ]; then
|
||||
YUMOPTS="--releasever=$RELEASEVER"
|
||||
cp "$here"/lib/header build/offline_installer
|
||||
if $NO_DOCKER; then
|
||||
"$here"/lib/construct_offline_repo.sh
|
||||
else
|
||||
./tools/toolchain/dbuild --image "$image_id" -- "$here"/lib/construct_offline_repo.sh
|
||||
fi
|
||||
sudo yum -y install $YUMOPTS --downloadonly --installroot=`pwd`/build/installroot --downloaddir=build/offline_installer scylla sudo ntp ntpdate net-tools kernel-tools
|
||||
(cd build/offline_installer; createrepo -v .)
|
||||
(cd build; makeself offline_installer scylla_offline_installer.sh "Scylla offline package" ./header)
|
||||
(cd build; makeself --keep-umask offline_installer scylla_offline_installer.sh "Scylla offline package" ./header)
|
||||
|
||||
5
dist/offline_installer/redhat/lib/Dockerfile.in
vendored
Normal file
5
dist/offline_installer/redhat/lib/Dockerfile.in
vendored
Normal file
@@ -0,0 +1,5 @@
|
||||
FROM @@IMAGE@@
|
||||
ADD install_deps.sh install_deps.sh
|
||||
RUN ./install_deps.sh
|
||||
ADD scylla.repo /etc/yum.repos.d/scylla.repo
|
||||
CMD /bin/bash
|
||||
9
dist/offline_installer/redhat/lib/construct_offline_repo.sh
vendored
Executable file
9
dist/offline_installer/redhat/lib/construct_offline_repo.sh
vendored
Executable file
@@ -0,0 +1,9 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
releasever=`rpm -q --provides $(rpm -q --whatprovides "system-release(releasever)") | grep "system-release(releasever)"| uniq | cut -d ' ' -f 3`
|
||||
|
||||
# Can ignore error since we only needed when files exists
|
||||
cp /etc/yum/vars/* build/installroot/etc/yum/vars/ ||:
|
||||
|
||||
# run yum in non-root mode using fakeroot
|
||||
fakeroot yum -y install --downloadonly --releasever="$releasever" --installroot=`pwd`/build/installroot --downloaddir=build/offline_installer scylla sudo chrony net-tools kernel-tools mdadm xfsprogs
|
||||
7
dist/offline_installer/redhat/lib/epel8.repo
vendored
Normal file
7
dist/offline_installer/redhat/lib/epel8.repo
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
[epel8]
|
||||
name=Extra Packages for Enterprise Linux 8 - $basearch
|
||||
#baseurl=https://download.fedoraproject.org/pub/epel/8/Everything/$basearch
|
||||
metalink=https://mirrors.fedoraproject.org/metalink?repo=epel-8&arch=$basearch&infra=$infra&content=$contentdir
|
||||
enabled=0
|
||||
gpgcheck=1
|
||||
countme=1
|
||||
12
dist/offline_installer/redhat/lib/install_deps.sh
vendored
Executable file
12
dist/offline_installer/redhat/lib/install_deps.sh
vendored
Executable file
@@ -0,0 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
. /etc/os-release
|
||||
|
||||
release_major=$(echo $VERSION_ID|sed -e 's/^\([0-9]*\)[^0-9]*.*/\1/')
|
||||
|
||||
if [ ! -f /etc/yum.repos.d/epel.repo ]; then
|
||||
yum -y install epel-release || yum -y install https://dl.fedoraproject.org/pub/epel/epel-release-latest-"$release_major".noarch.rpm
|
||||
fi
|
||||
if [ ! -f /usr/bin/fakeroot ]; then
|
||||
yum -y install fakeroot
|
||||
fi
|
||||
@@ -103,22 +103,3 @@ enum class repair_row_level_start_status: uint8_t {
|
||||
struct repair_row_level_start_response {
|
||||
repair_row_level_start_status status;
|
||||
};
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
};
|
||||
|
||||
struct node_ops_cmd_request {
|
||||
node_ops_cmd cmd;
|
||||
utils::UUID ops_uuid;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
};
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
bool ok;
|
||||
};
|
||||
|
||||
@@ -335,7 +335,6 @@ public:
|
||||
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
|
||||
|
||||
void add_leaving_endpoint(inet_address endpoint);
|
||||
void del_leaving_endpoint(inet_address endpoint);
|
||||
public:
|
||||
void remove_endpoint(inet_address endpoint);
|
||||
#if 0
|
||||
@@ -1658,10 +1657,6 @@ void token_metadata_impl::add_leaving_endpoint(inet_address endpoint) {
|
||||
_leaving_endpoints.emplace(endpoint);
|
||||
}
|
||||
|
||||
void token_metadata_impl::del_leaving_endpoint(inet_address endpoint) {
|
||||
_leaving_endpoints.erase(endpoint);
|
||||
}
|
||||
|
||||
void token_metadata_impl::add_replacing_endpoint(inet_address existing_node, inet_address replacing_node) {
|
||||
tlogger.info("Added node {} as pending replacing endpoint which replaces existing node {}",
|
||||
replacing_node, existing_node);
|
||||
@@ -1932,11 +1927,6 @@ token_metadata::add_leaving_endpoint(inet_address endpoint) {
|
||||
_impl->add_leaving_endpoint(endpoint);
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::del_leaving_endpoint(inet_address endpoint) {
|
||||
_impl->del_leaving_endpoint(endpoint);
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::remove_endpoint(inet_address endpoint) {
|
||||
_impl->remove_endpoint(endpoint);
|
||||
|
||||
@@ -238,7 +238,6 @@ public:
|
||||
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
|
||||
|
||||
void add_leaving_endpoint(inet_address endpoint);
|
||||
void del_leaving_endpoint(inet_address endpoint);
|
||||
|
||||
void remove_endpoint(inet_address endpoint);
|
||||
|
||||
|
||||
@@ -477,6 +477,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
// as well as reduce latency as there are potentially many requests
|
||||
// blocked on schema version request.
|
||||
case messaging_verb::GOSSIP_DIGEST_SYN:
|
||||
case messaging_verb::GOSSIP_DIGEST_ACK:
|
||||
case messaging_verb::GOSSIP_DIGEST_ACK2:
|
||||
case messaging_verb::GOSSIP_SHUTDOWN:
|
||||
case messaging_verb::GOSSIP_ECHO:
|
||||
@@ -504,7 +505,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
|
||||
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
|
||||
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
|
||||
case messaging_verb::NODE_OPS_CMD:
|
||||
case messaging_verb::HINT_MUTATION:
|
||||
return 1;
|
||||
case messaging_verb::CLIENT_ID:
|
||||
@@ -512,7 +512,6 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::READ_DATA:
|
||||
case messaging_verb::READ_MUTATION_DATA:
|
||||
case messaging_verb::READ_DIGEST:
|
||||
case messaging_verb::GOSSIP_DIGEST_ACK:
|
||||
case messaging_verb::DEFINITIONS_UPDATE:
|
||||
case messaging_verb::TRUNCATE:
|
||||
case messaging_verb::MIGRATION_REQUEST:
|
||||
@@ -1350,17 +1349,6 @@ future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_rep
|
||||
return send_message<future<std::vector<row_level_diff_detect_algorithm>>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id));
|
||||
}
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void messaging_service::register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) {
|
||||
register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_node_ops_cmd() {
|
||||
return unregister_handler(messaging_verb::NODE_OPS_CMD);
|
||||
}
|
||||
future<node_ops_cmd_response> messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) {
|
||||
return send_message<future<node_ops_cmd_response>>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req));
|
||||
}
|
||||
|
||||
void
|
||||
messaging_service::register_paxos_prepare(std::function<future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>(
|
||||
const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key key, utils::UUID ballot,
|
||||
|
||||
@@ -143,8 +143,7 @@ enum class messaging_verb : int32_t {
|
||||
HINT_MUTATION = 42,
|
||||
PAXOS_PRUNE = 43,
|
||||
GOSSIP_GET_ENDPOINT_STATES = 44,
|
||||
NODE_OPS_CMD = 45,
|
||||
LAST = 46,
|
||||
LAST = 45,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -395,11 +394,6 @@ public:
|
||||
future<> unregister_repair_get_diff_algorithms();
|
||||
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func);
|
||||
future<> unregister_node_ops_cmd();
|
||||
future<node_ops_cmd_response> send_node_ops_cmd(msg_addr id, node_ops_cmd_request);
|
||||
|
||||
// Wrapper for GOSSIP_ECHO verb
|
||||
void register_gossip_echo(std::function<future<> ()>&& func);
|
||||
future<> unregister_gossip_echo();
|
||||
|
||||
@@ -54,14 +54,6 @@ logging::logger rlogger("repair");
|
||||
|
||||
static sharded<netw::messaging_service>* _messaging;
|
||||
|
||||
void node_ops_info::check_abort() {
|
||||
if (abort) {
|
||||
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
|
||||
rlogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
}
|
||||
|
||||
class node_ops_metrics {
|
||||
public:
|
||||
node_ops_metrics() {
|
||||
@@ -443,16 +435,6 @@ void tracker::abort_all_repairs() {
|
||||
rlogger.info0("Aborted {} repair job(s)", count);
|
||||
}
|
||||
|
||||
void tracker::abort_repair_node_ops(utils::UUID ops_uuid) {
|
||||
for (auto& x : _repairs[this_shard_id()]) {
|
||||
auto& ri = x.second;
|
||||
if (ri->ops_uuid() && ri->ops_uuid().value() == ops_uuid) {
|
||||
rlogger.info0("Aborted repair jobs for ops_uuid={}", ops_uuid);
|
||||
ri->abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
float tracker::report_progress(streaming::stream_reason reason) {
|
||||
uint64_t nr_ranges_finished = 0;
|
||||
uint64_t nr_ranges_total = 0;
|
||||
@@ -811,8 +793,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
||||
repair_uniq_id id_,
|
||||
const std::vector<sstring>& data_centers_,
|
||||
const std::vector<sstring>& hosts_,
|
||||
streaming::stream_reason reason_,
|
||||
std::optional<utils::UUID> ops_uuid)
|
||||
streaming::stream_reason reason_)
|
||||
: db(db_)
|
||||
, messaging(ms_)
|
||||
, sharder(get_sharder_for_tables(db_, keyspace_, table_ids_))
|
||||
@@ -826,8 +807,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
||||
, hosts(hosts_)
|
||||
, reason(reason_)
|
||||
, nr_ranges_total(ranges.size())
|
||||
, _row_level_repair(db.local().features().cluster_supports_row_level_repair())
|
||||
, _ops_uuid(std::move(ops_uuid)) {
|
||||
, _row_level_repair(db.local().features().cluster_supports_row_level_repair()) {
|
||||
}
|
||||
|
||||
future<> repair_info::do_streaming() {
|
||||
@@ -1646,7 +1626,7 @@ static int do_repair_start(seastar::sharded<database>& db, seastar::sharded<netw
|
||||
_node_ops_metrics.repair_total_ranges_sum += ranges.size();
|
||||
auto ri = make_lw_shared<repair_info>(db, ms,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair, id.uuid);
|
||||
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
@@ -1716,15 +1696,14 @@ static future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||
sstring keyspace,
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
|
||||
streaming::stream_reason reason,
|
||||
std::optional<utils::UUID> ops_uuid) {
|
||||
streaming::stream_reason reason) {
|
||||
if (ranges.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
|
||||
return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||
repair_uniq_id id = repair_tracker().next_repair_command();
|
||||
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
|
||||
return repair_tracker().run(id, [id, &db, &ms, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable {
|
||||
return repair_tracker().run(id, [id, &db, &ms, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
|
||||
auto cfs = list_column_families(db.local(), keyspace);
|
||||
if (cfs.empty()) {
|
||||
rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace);
|
||||
@@ -1734,12 +1713,12 @@ static future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||
std::vector<future<>> repair_results;
|
||||
repair_results.reserve(smp::count);
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason, ops_uuid] (database& localdb) mutable {
|
||||
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason] (database& localdb) mutable {
|
||||
auto data_centers = std::vector<sstring>();
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ri = make_lw_shared<repair_info>(db, ms,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), reason, ops_uuid);
|
||||
id, std::move(data_centers), std::move(hosts), reason);
|
||||
ri->neighbors = std::move(neighbors);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
@@ -1933,16 +1912,16 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
|
||||
}
|
||||
}
|
||||
auto nr_ranges = desired_ranges.size();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, {}).get();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason).get();
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
|
||||
});
|
||||
}
|
||||
|
||||
static future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
static future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) {
|
||||
using inet_address = gms::inet_address;
|
||||
return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable {
|
||||
return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node)] () mutable {
|
||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
bool is_removenode = myip != leaving_node;
|
||||
@@ -2001,9 +1980,6 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
auto local_dc = get_local_dc();
|
||||
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
|
||||
for (auto&r : ranges) {
|
||||
if (ops) {
|
||||
ops->check_abort();
|
||||
}
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
const std::vector<inet_address> new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes);
|
||||
const std::vector<inet_address>& current_eps = current_replica_endpoints[r];
|
||||
@@ -2085,12 +2061,6 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
}
|
||||
neighbors_set.erase(myip);
|
||||
neighbors_set.erase(leaving_node);
|
||||
// Remove nodes in ignore_nodes
|
||||
if (ops) {
|
||||
for (const auto& node : ops->ignore_nodes) {
|
||||
neighbors_set.erase(node);
|
||||
}
|
||||
}
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors_set |
|
||||
boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) {
|
||||
return snitch_ptr->get_datacenter(node) == local_dc;
|
||||
@@ -2102,10 +2072,9 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, skipped",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors);
|
||||
} else {
|
||||
std::vector<gms::inet_address> mandatory_neighbors = is_removenode ? neighbors : std::vector<gms::inet_address>{};
|
||||
rlogger.info("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, mandatory_neighbor={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors, mandatory_neighbors);
|
||||
range_sources[r] = repair_neighbors(std::move(neighbors), std::move(mandatory_neighbors));
|
||||
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors);
|
||||
range_sources[r] = repair_neighbors(std::move(neighbors));
|
||||
if (is_removenode) {
|
||||
ranges_for_removenode.push_back(r);
|
||||
}
|
||||
@@ -2125,8 +2094,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
ranges.swap(ranges_for_removenode);
|
||||
}
|
||||
auto nr_ranges_synced = ranges.size();
|
||||
std::optional<utils::UUID> opt_uuid = ops ? std::make_optional<utils::UUID>(ops->ops_uuid) : std::nullopt;
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason, opt_uuid).get();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
||||
}
|
||||
@@ -2135,17 +2103,11 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
|
||||
}
|
||||
|
||||
future<> decommission_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr) {
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address(), {});
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node), std::move(ops));
|
||||
}
|
||||
|
||||
future<> abort_repair_node_ops(utils::UUID ops_uuid) {
|
||||
return smp::invoke_on_all([ops_uuid] {
|
||||
return repair_tracker().abort_repair_node_ops(ops_uuid);
|
||||
});
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) {
|
||||
return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node));
|
||||
}
|
||||
|
||||
static future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason) {
|
||||
@@ -2220,7 +2182,7 @@ static future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, s
|
||||
}).get();
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason, {}).get();
|
||||
sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||
@@ -2258,19 +2220,12 @@ static future<> init_messaging_service_handler(sharded<database>& db, sharded<ne
|
||||
return checksum_range(db, keyspace, cf, range, hv);
|
||||
});
|
||||
});
|
||||
ms.register_node_ops_cmd([] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [coordinator, req = std::move(req)] () mutable {
|
||||
return service::get_local_storage_service().node_ops_cmd_handler(coordinator, std::move(req));
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static future<> uninit_messaging_service_handler() {
|
||||
return _messaging->invoke_on_all([] (auto& ms) {
|
||||
return when_all_succeed(ms.unregister_repair_checksum_range(), ms.unregister_node_ops_cmd()).discard_result();
|
||||
return ms.unregister_repair_checksum_range();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -76,22 +76,13 @@ struct repair_uniq_id {
|
||||
};
|
||||
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
|
||||
|
||||
struct node_ops_info {
|
||||
utils::UUID ops_uuid;
|
||||
bool abort = false;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
void check_abort();
|
||||
};
|
||||
|
||||
// The tokens are the tokens assigned to the bootstrap node.
|
||||
future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr);
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node);
|
||||
future<> rebuild_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, sstring source_dc);
|
||||
future<> replace_with_repair(seastar::sharded<database>& db, seastar::sharded<netw::messaging_service>& ms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens);
|
||||
|
||||
future<> abort_repair_node_ops(utils::UUID ops_uuid);
|
||||
|
||||
// NOTE: repair_start() can be run on any node, but starts a node-global
|
||||
// operation.
|
||||
// repair_start() starts the requested repair on this node. It returns an
|
||||
@@ -253,7 +244,6 @@ public:
|
||||
bool _row_level_repair;
|
||||
uint64_t _sub_ranges_nr = 0;
|
||||
std::unordered_set<sstring> dropped_tables;
|
||||
std::optional<utils::UUID> _ops_uuid;
|
||||
public:
|
||||
repair_info(seastar::sharded<database>& db_,
|
||||
seastar::sharded<netw::messaging_service>& ms_,
|
||||
@@ -263,8 +253,7 @@ public:
|
||||
repair_uniq_id id_,
|
||||
const std::vector<sstring>& data_centers_,
|
||||
const std::vector<sstring>& hosts_,
|
||||
streaming::stream_reason reason_,
|
||||
std::optional<utils::UUID> ops_uuid);
|
||||
streaming::stream_reason reason_);
|
||||
future<> do_streaming();
|
||||
void check_failed_ranges();
|
||||
future<> request_transfer_ranges(const sstring& cf,
|
||||
@@ -283,9 +272,6 @@ public:
|
||||
const std::vector<sstring>& table_names() {
|
||||
return cfs;
|
||||
}
|
||||
const std::optional<utils::UUID>& ops_uuid() const {
|
||||
return _ops_uuid;
|
||||
};
|
||||
};
|
||||
|
||||
// The repair_tracker tracks ongoing repair operations and their progress.
|
||||
@@ -338,7 +324,6 @@ public:
|
||||
future<> run(repair_uniq_id id, std::function<void ()> func);
|
||||
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
|
||||
float report_progress(streaming::stream_reason reason);
|
||||
void abort_repair_node_ops(utils::UUID ops_uuid);
|
||||
};
|
||||
|
||||
future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
|
||||
@@ -479,27 +464,6 @@ enum class row_level_diff_detect_algorithm : uint8_t {
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo);
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
};
|
||||
|
||||
// The cmd and ops_uuid are mandatory for each request.
|
||||
// The ignore_nodes and leaving_node are optional.
|
||||
struct node_ops_cmd_request {
|
||||
node_ops_cmd cmd;
|
||||
utils::UUID ops_uuid;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
};
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
bool ok;
|
||||
};
|
||||
|
||||
namespace std {
|
||||
template<>
|
||||
struct hash<partition_checksum> {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: a287bb1a39...74ae29bc17
@@ -4931,10 +4931,12 @@ void storage_proxy::init_messaging_service() {
|
||||
tracing::trace(trace_state_ptr, "read_data: message received from /{}", src_addr.addr);
|
||||
}
|
||||
auto da = oda.value_or(query::digest_algorithm::MD5);
|
||||
auto sp = get_local_shared_storage_proxy();
|
||||
if (!cmd.max_result_size) {
|
||||
cmd.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
|
||||
auto& cfg = sp->local_db().get_config();
|
||||
cmd.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit());
|
||||
}
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
return do_with(std::move(pr), std::move(sp), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
p->get_stats().replica_data_reads++;
|
||||
auto src_ip = src_addr.addr;
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr), p->_messaging).then([cmd, da, &pr, &p, &trace_state_ptr, t] (schema_ptr s) {
|
||||
|
||||
@@ -107,7 +107,6 @@ storage_service::storage_service(abort_source& abort_source, distributed<databas
|
||||
, _service_memory_total(config.available_memory / 10)
|
||||
, _service_memory_limiter(_service_memory_total)
|
||||
, _for_testing(for_testing)
|
||||
, _node_ops_abort_thread(node_ops_abort_thread())
|
||||
, _shared_token_metadata(stm)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _view_update_generator(view_update_generator)
|
||||
@@ -1750,12 +1749,9 @@ future<> storage_service::gossip_sharder() {
|
||||
|
||||
future<> storage_service::stop() {
|
||||
// make sure nobody uses the semaphore
|
||||
node_ops_singal_abort(std::nullopt);
|
||||
return _service_memory_limiter.wait(_service_memory_total).finally([this] {
|
||||
_listeners.clear();
|
||||
return _schema_version_publisher.join();
|
||||
}).finally([this] {
|
||||
return std::move(_node_ops_abort_thread);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2177,192 +2173,102 @@ future<> storage_service::decommission() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
|
||||
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
|
||||
auto uuid = utils::make_random_uuid();
|
||||
auto tmptr = ss.get_token_metadata_ptr();
|
||||
future<> storage_service::removenode(sstring host_id_string) {
|
||||
return run_with_api_lock(sstring("removenode"), [host_id_string] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id_string] {
|
||||
slogger.debug("removenode: host_id = {}", host_id_string);
|
||||
auto my_address = ss.get_broadcast_address();
|
||||
auto tmlock = std::make_unique<token_metadata_lock>(ss.get_token_metadata_lock().get0());
|
||||
auto tmptr = ss.get_mutable_token_metadata_ptr().get0();
|
||||
auto local_host_id = tmptr->get_host_id(my_address);
|
||||
auto host_id = utils::UUID(host_id_string);
|
||||
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
|
||||
if (!endpoint_opt) {
|
||||
throw std::runtime_error(format("removenode[{}]: Host ID not found in the cluster", uuid));
|
||||
throw std::runtime_error("Host ID not found.");
|
||||
}
|
||||
auto endpoint = *endpoint_opt;
|
||||
|
||||
auto tokens = tmptr->get_tokens(endpoint);
|
||||
auto leaving_nodes = std::list<gms::inet_address>{endpoint};
|
||||
|
||||
future<> heartbeat_updater = make_ready_future<>();
|
||||
auto heartbeat_updater_done = make_lw_shared<bool>(false);
|
||||
slogger.debug("removenode: endpoint = {}", endpoint);
|
||||
|
||||
// Step 1: Decide who needs to sync data
|
||||
//
|
||||
// By default, we require all nodes in the cluster to participate
|
||||
// the removenode operation and sync data if needed. We fail the
|
||||
// removenode operation if any of them is down or fails.
|
||||
//
|
||||
// If the user want the removenode opeartion to succeed even if some of the nodes
|
||||
// are not available, the user has to explicitly pass a list of
|
||||
// node that can be skipped for the operation.
|
||||
std::vector<gms::inet_address> nodes;
|
||||
for (const auto& x : tmptr->get_endpoint_to_host_id_map_for_reading()) {
|
||||
seastar::thread::maybe_yield();
|
||||
if (x.first != endpoint && std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) {
|
||||
nodes.push_back(x.first);
|
||||
if (endpoint == my_address) {
|
||||
throw std::runtime_error("Cannot remove self");
|
||||
}
|
||||
|
||||
if (ss._gossiper.get_live_members().contains(endpoint)) {
|
||||
throw std::runtime_error(format("Node {} is alive and owns this ID. Use decommission command to remove it from the ring", endpoint));
|
||||
}
|
||||
|
||||
// A leaving endpoint that is dead is already being removed.
|
||||
if (tmptr->is_leaving(endpoint)) {
|
||||
slogger.warn("Node {} is already being removed, continuing removal anyway", endpoint);
|
||||
}
|
||||
|
||||
if (!ss._replicating_nodes.empty()) {
|
||||
throw std::runtime_error("This node is already processing a removal. Wait for it to complete, or use 'removenode force' if this has failed.");
|
||||
}
|
||||
|
||||
auto non_system_keyspaces = ss.db().local().get_non_system_keyspaces();
|
||||
// Find the endpoints that are going to become responsible for data
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
auto& ks = ss.db().local().find_keyspace(keyspace_name);
|
||||
// if the replication factor is 1 the data is lost so we shouldn't wait for confirmation
|
||||
if (ks.get_replication_strategy().get_replication_factor() == 1) {
|
||||
slogger.warn("keyspace={} has replication factor 1, the data is probably lost", keyspace_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// get all ranges that change ownership (that is, a node needs
|
||||
// to take responsibility for new range)
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges =
|
||||
ss.get_changed_ranges_for_leaving(keyspace_name, endpoint);
|
||||
for (auto& x: changed_ranges) {
|
||||
auto ep = x.second;
|
||||
if (ss._gossiper.is_alive(ep)) {
|
||||
ss._replicating_nodes.emplace(ep);
|
||||
} else {
|
||||
slogger.warn("Endpoint {} is down and will not receive data for re-replication of {}", ep, endpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
slogger.info("removenode[{}]: Started removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
|
||||
slogger.info("removenode: endpoint = {}, replicating_nodes = {}", endpoint, ss._replicating_nodes);
|
||||
ss._removing_node = endpoint;
|
||||
tmptr->add_leaving_endpoint(endpoint);
|
||||
ss.update_pending_ranges(tmptr, format("removenode {}", endpoint)).get();
|
||||
ss.replicate_to_all_cores(std::move(tmptr)).get();
|
||||
tmlock.reset();
|
||||
|
||||
// Step 2: Prepare to sync data
|
||||
std::unordered_set<gms::inet_address> nodes_unknown_verb;
|
||||
std::unordered_set<gms::inet_address> nodes_down;
|
||||
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes};
|
||||
try {
|
||||
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got prepare response from node={}", uuid, node);
|
||||
}).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) {
|
||||
slogger.warn("removenode[{}]: Node {} does not support removenode verb", uuid, node);
|
||||
nodes_unknown_verb.emplace(node);
|
||||
}).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) {
|
||||
slogger.warn("removenode[{}]: Node {} is down for node_ops_cmd verb", uuid, node);
|
||||
nodes_down.emplace(node);
|
||||
});
|
||||
}).get();
|
||||
if (!nodes_unknown_verb.empty()) {
|
||||
auto msg = format("removenode[{}]: Nodes={} do not support removenode verb. Please upgrade your cluster and run removenode again.", uuid, nodes_unknown_verb);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
auto msg = format("removenode[{}]: Nodes={} needed for removenode operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, run nodetool removenode --ignore-dead-nodes <list_of_dead_nodes> <host_id>. E.g., nodetool removenode --ignore-dead-nodes 127.0.0.1,127.0.0.2 817e9515-316f-4fe3-aaab-b00d6f12dddd", uuid, nodes_down);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
|
||||
// we add our own token so other nodes to let us know when they're done
|
||||
ss._gossiper.advertise_removing(endpoint, host_id, local_host_id).get();
|
||||
|
||||
// Step 3: Start heartbeat updater
|
||||
heartbeat_updater = seastar::async([&ss, &nodes, uuid, heartbeat_updater_done] {
|
||||
slogger.debug("removenode[{}]: Started heartbeat_updater", uuid);
|
||||
while (!(*heartbeat_updater_done)) {
|
||||
auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}};
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got heartbeat response from node={}", uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).handle_exception([uuid] (std::exception_ptr ep) {
|
||||
slogger.warn("removenode[{}]: Failed to send heartbeat", uuid);
|
||||
}).get();
|
||||
int nr_seconds = 10;
|
||||
while (!(*heartbeat_updater_done) && nr_seconds--) {
|
||||
sleep_abortable(std::chrono::seconds(1), ss._abort_source).get();
|
||||
}
|
||||
}
|
||||
slogger.debug("removenode[{}]: Stopped heartbeat_updater", uuid);
|
||||
});
|
||||
auto stop_heartbeat_updater = defer([&] {
|
||||
*heartbeat_updater_done = true;
|
||||
heartbeat_updater.get();
|
||||
});
|
||||
// kick off streaming commands
|
||||
// No need to wait for restore_replica_count to complete, since
|
||||
// when it completes, the node will be removed from _replicating_nodes,
|
||||
// and we wait for _replicating_nodes to become empty below
|
||||
//FIXME: discarded future.
|
||||
(void)ss.restore_replica_count(endpoint, my_address).handle_exception([endpoint, my_address] (auto ep) {
|
||||
slogger.info("Failed to restore_replica_count for node {} on node {}", endpoint, my_address);
|
||||
});
|
||||
|
||||
// Step 4: Start to sync data
|
||||
req.cmd = node_ops_cmd::removenode_sync_data;
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got sync_data response from node={}", uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
|
||||
|
||||
// Step 5: Announce the node has left
|
||||
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
|
||||
ss.excise(std::move(tmp), endpoint);
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
|
||||
// Step 6: Finish
|
||||
req.cmd = node_ops_cmd::removenode_done;
|
||||
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got done response from node={}", uuid, node);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).get();
|
||||
slogger.info("removenode[{}]: Finished removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
|
||||
} catch (...) {
|
||||
// we need to revert the effect of prepare verb the removenode ops is failed
|
||||
req.cmd = node_ops_cmd::removenode_abort;
|
||||
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
|
||||
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node)) {
|
||||
// No need to revert previous prepare cmd for those who do not apply prepare cmd.
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
slogger.debug("removenode[{}]: Got abort response from node={}", uuid, node);
|
||||
});
|
||||
}).get();
|
||||
slogger.info("removenode[{}]: Aborted removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
|
||||
throw;
|
||||
// wait for ReplicationFinishedVerbHandler to signal we're done
|
||||
while (!(ss._replicating_nodes.empty() || ss._force_remove_completion)) {
|
||||
sleep_abortable(std::chrono::milliseconds(100), ss._abort_source).get();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
|
||||
return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable {
|
||||
return seastar::async([&ss, coordinator, req = std::move(req)] () mutable {
|
||||
auto ops_uuid = req.ops_uuid;
|
||||
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid);
|
||||
if (req.cmd == node_ops_cmd::removenode_prepare) {
|
||||
if (req.leaving_nodes.size() > 1) {
|
||||
auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
|
||||
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); });
|
||||
ss._node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_done) {
|
||||
slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_sync_data) {
|
||||
auto it = ss._node_ops.find(ops_uuid);
|
||||
if (it == ss._node_ops.end()) {
|
||||
throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid));
|
||||
}
|
||||
auto ops = it->second.get_ops_info();
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={}, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
removenode_with_repair(ss._db, ss._messaging, ss.get_token_metadata_ptr(), node, ops).get();
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::removenode_abort) {
|
||||
ss.node_ops_abort(ops_uuid);
|
||||
} else {
|
||||
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd));
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
if (ss._force_remove_completion) {
|
||||
throw std::runtime_error("nodetool removenode force is called by user");
|
||||
}
|
||||
node_ops_cmd_response resp;
|
||||
resp.ok = true;
|
||||
return resp;
|
||||
|
||||
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
|
||||
ss.excise(std::move(tmp), endpoint);
|
||||
|
||||
// gossiper will indicate the token has left
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
|
||||
ss._replicating_nodes.clear();
|
||||
ss._removing_node = std::nullopt;
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2606,9 +2512,7 @@ void storage_service::unbootstrap() {
|
||||
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
if (is_repair_based_node_ops_enabled()) {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::list<gms::inet_address>()});
|
||||
return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
|
||||
return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint).finally([this, notify_endpoint] () {
|
||||
return send_replication_notification(notify_endpoint);
|
||||
});
|
||||
}
|
||||
@@ -3323,111 +3227,5 @@ bool storage_service::is_repair_based_node_ops_enabled() {
|
||||
return _db.local().get_config().enable_repair_based_node_ops();
|
||||
}
|
||||
|
||||
node_ops_meta_data::node_ops_meta_data(
|
||||
utils::UUID ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
shared_ptr<node_ops_info> ops,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func)
|
||||
: _ops_uuid(std::move(ops_uuid))
|
||||
, _coordinator(std::move(coordinator))
|
||||
, _abort(std::move(abort_func))
|
||||
, _signal(std::move(signal_func))
|
||||
, _ops(std::move(ops))
|
||||
, _watchdog([sig = _signal] { sig(); }) {
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::abort() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
|
||||
_aborted = true;
|
||||
if (_ops) {
|
||||
_ops->abort = true;
|
||||
}
|
||||
_watchdog.cancel();
|
||||
return _abort();
|
||||
}
|
||||
|
||||
void node_ops_meta_data::update_watchdog() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} update_watchdog", _ops_uuid);
|
||||
if (_aborted) {
|
||||
return;
|
||||
}
|
||||
_watchdog.cancel();
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
void node_ops_meta_data::cancel_watchdog() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} cancel_watchdog", _ops_uuid);
|
||||
_watchdog.cancel();
|
||||
}
|
||||
|
||||
shared_ptr<node_ops_info> node_ops_meta_data::get_ops_info() {
|
||||
return _ops;
|
||||
}
|
||||
|
||||
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.update_watchdog();
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.cancel_watchdog();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.abort().get();
|
||||
abort_repair_node_ops(ops_uuid).get();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void storage_service::node_ops_singal_abort(std::optional<utils::UUID> ops_uuid) {
|
||||
slogger.debug("node_ops_singal_abort: ops_uuid={}", ops_uuid);
|
||||
_node_ops_abort_queue.push_back(ops_uuid);
|
||||
_node_ops_abort_cond.signal();
|
||||
}
|
||||
|
||||
future<> storage_service::node_ops_abort_thread() {
|
||||
return seastar::async([this] {
|
||||
slogger.info("Started node_ops_abort_thread");
|
||||
for (;;) {
|
||||
_node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); }).get();
|
||||
slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue);
|
||||
while (!_node_ops_abort_queue.empty()) {
|
||||
auto uuid_opt = _node_ops_abort_queue.front();
|
||||
_node_ops_abort_queue.pop_front();
|
||||
if (!uuid_opt) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
storage_service::node_ops_abort(*uuid_opt);
|
||||
} catch (...) {
|
||||
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
} // namespace service
|
||||
|
||||
|
||||
@@ -63,12 +63,6 @@
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include "sstables/version.hh"
|
||||
#include "cdc/metadata.hh"
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/lowres_clock.hh>
|
||||
|
||||
class node_ops_cmd_request;
|
||||
class node_ops_cmd_response;
|
||||
class node_ops_info;
|
||||
|
||||
namespace cql_transport { class controller; }
|
||||
|
||||
@@ -109,28 +103,6 @@ struct storage_service_config {
|
||||
size_t available_memory;
|
||||
};
|
||||
|
||||
class node_ops_meta_data {
|
||||
utils::UUID _ops_uuid;
|
||||
gms::inet_address _coordinator;
|
||||
std::function<future<> ()> _abort;
|
||||
std::function<void ()> _signal;
|
||||
shared_ptr<node_ops_info> _ops;
|
||||
seastar::timer<lowres_clock> _watchdog;
|
||||
std::chrono::seconds _watchdog_interval{30};
|
||||
bool _aborted = false;
|
||||
public:
|
||||
explicit node_ops_meta_data(
|
||||
utils::UUID ops_uuid,
|
||||
gms::inet_address coordinator,
|
||||
shared_ptr<node_ops_info> ops,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func);
|
||||
shared_ptr<node_ops_info> get_ops_info();
|
||||
future<> abort();
|
||||
void update_watchdog();
|
||||
void cancel_watchdog();
|
||||
};
|
||||
|
||||
/**
|
||||
* This abstraction contains the token/identifier of this node
|
||||
* on the identifier space. This token gets gossiped around.
|
||||
@@ -186,17 +158,6 @@ private:
|
||||
* and would only slow down tests (by having them wait).
|
||||
*/
|
||||
bool _for_testing;
|
||||
|
||||
std::unordered_map<utils::UUID, node_ops_meta_data> _node_ops;
|
||||
std::list<std::optional<utils::UUID>> _node_ops_abort_queue;
|
||||
seastar::condition_variable _node_ops_abort_cond;
|
||||
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
|
||||
future<> _node_ops_abort_thread;
|
||||
void node_ops_update_heartbeat(utils::UUID ops_uuid);
|
||||
void node_ops_done(utils::UUID ops_uuid);
|
||||
void node_ops_abort(utils::UUID ops_uuid);
|
||||
void node_ops_singal_abort(std::optional<utils::UUID> ops_uuid);
|
||||
future<> node_ops_abort_thread();
|
||||
public:
|
||||
storage_service(abort_source& as, distributed<database>& db, gms::gossiper& gossiper, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, storage_service_config config, sharded<service::migration_notifier>& mn, locator::shared_token_metadata& stm, sharded<netw::messaging_service>& ms, /* only for tests */ bool for_testing = false);
|
||||
|
||||
@@ -810,8 +771,7 @@ public:
|
||||
*
|
||||
* @param hostIdString token for the node
|
||||
*/
|
||||
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
|
||||
future<> removenode(sstring host_id_string);
|
||||
|
||||
future<sstring> get_operation_mode();
|
||||
|
||||
|
||||
@@ -380,7 +380,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
try {
|
||||
db.find_column_family(ks, cf);
|
||||
} catch (no_such_column_family&) {
|
||||
auto err = format("[Stream #{{}}] prepare requested ks={{}} cf={{}} does not exist", ks, cf);
|
||||
auto err = format("[Stream #{{}}] prepare requested ks={{}} cf={{}} does not exist", plan_id, ks, cf);
|
||||
sslog.warn(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
|
||||
3
table.cc
3
table.cc
@@ -1677,7 +1677,8 @@ write_memtable_to_sstable(flat_mutation_reader reader,
|
||||
const io_priority_class& pc) {
|
||||
cfg.replay_position = mt.replay_position();
|
||||
cfg.monitor = &monitor;
|
||||
return sst->write_components(std::move(reader), mt.partition_count(), mt.schema(), cfg, mt.get_encoding_stats(), pc);
|
||||
schema_ptr s = reader.schema();
|
||||
return sst->write_components(std::move(reader), mt.partition_count(), s, cfg, mt.get_encoding_stats(), pc);
|
||||
}
|
||||
|
||||
future<>
|
||||
|
||||
@@ -136,7 +136,7 @@ def test_update_condition_eq_different(test_table_s):
|
||||
ConditionExpression='a = :val2',
|
||||
ExpressionAttributeValues={':val1': val1, ':val2': val2})
|
||||
|
||||
# Also check an actual case of same time, but inequality.
|
||||
# Also check an actual case of same type, but inequality.
|
||||
def test_update_condition_eq_unequal(test_table_s):
|
||||
p = random_string()
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
@@ -146,6 +146,13 @@ def test_update_condition_eq_unequal(test_table_s):
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='a = :oldval',
|
||||
ExpressionAttributeValues={':val1': 3, ':oldval': 2})
|
||||
# If the attribute being compared doesn't exist, it's considered a failed
|
||||
# condition, not an error:
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q = :oldval',
|
||||
ExpressionAttributeValues={':val1': 3, ':oldval': 2})
|
||||
|
||||
# Check that set equality is checked correctly. Unlike string equality (for
|
||||
# example), it cannot be done with just naive string comparison of the JSON
|
||||
@@ -269,15 +276,44 @@ def test_update_condition_lt(test_table_s):
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a < :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
# Trying to compare an unsupported type - e.g., in the following test
|
||||
# a boolean, is unfortunately caught by boto3 and cannot be tested here...
|
||||
#test_table_s.update_item(Key={'p': p},
|
||||
# AttributeUpdates={'d': {'Value': False, 'Action': 'PUT'}})
|
||||
#with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
# test_table_s.update_item(Key={'p': p},
|
||||
# UpdateExpression='SET z = :newval',
|
||||
# ConditionExpression='d < :oldval',
|
||||
# ExpressionAttributeValues={':newval': 2, ':oldval': True})
|
||||
# If the attribute being compared doesn't even exist, this is also
|
||||
# considered as a false condition - not an error.
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='q < :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval < q',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
# If a comparison parameter comes from a constant specified in the query,
|
||||
# and it has a type not supported by the comparison (e.g., a list), it's
|
||||
# not just a failed comparison - it is considered a ValidationException
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a < :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval < a',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
# However, if when the wrong type comes from an item attribute, not the
|
||||
# query, the comparison is simply false - not a ValidationException.
|
||||
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='x < :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval < x',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 4
|
||||
|
||||
# Test for ConditionExpression with operator "<="
|
||||
@@ -341,6 +377,44 @@ def test_update_condition_le(test_table_s):
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a <= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
# If the attribute being compared doesn't even exist, this is also
|
||||
# considered as a false condition - not an error.
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='q <= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval <= q',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
# If a comparison parameter comes from a constant specified in the query,
|
||||
# and it has a type not supported by the comparison (e.g., a list), it's
|
||||
# not just a failed comparison - it is considered a ValidationException
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a <= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval <= a',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
# However, if when the wrong type comes from an item attribute, not the
|
||||
# query, the comparison is simply false - not a ValidationException.
|
||||
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='x <= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval <= x',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 7
|
||||
|
||||
# Test for ConditionExpression with operator ">"
|
||||
@@ -404,6 +478,44 @@ def test_update_condition_gt(test_table_s):
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a > :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
# If the attribute being compared doesn't even exist, this is also
|
||||
# considered as a false condition - not an error.
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='q > :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval > q',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
# If a comparison parameter comes from a constant specified in the query,
|
||||
# and it has a type not supported by the comparison (e.g., a list), it's
|
||||
# not just a failed comparison - it is considered a ValidationException
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a > :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval > a',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
# However, if when the wrong type comes from an item attribute, not the
|
||||
# query, the comparison is simply false - not a ValidationException.
|
||||
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='x > :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval > x',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 4
|
||||
|
||||
# Test for ConditionExpression with operator ">="
|
||||
@@ -467,6 +579,44 @@ def test_update_condition_ge(test_table_s):
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a >= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '0'})
|
||||
# If the attribute being compared doesn't even exist, this is also
|
||||
# considered as a false condition - not an error.
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='q >= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval >= q',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': '17'})
|
||||
# If a comparison parameter comes from a constant specified in the query,
|
||||
# and it has a type not supported by the comparison (e.g., a list), it's
|
||||
# not just a failed comparison - it is considered a ValidationException
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a >= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval >= a',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': [1,2]})
|
||||
# However, if when the wrong type comes from an item attribute, not the
|
||||
# query, the comparison is simply false - not a ValidationException.
|
||||
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'}})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='x >= :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression=':oldval >= x',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 7
|
||||
|
||||
# Test for ConditionExpression with ternary operator "BETWEEN" (checking
|
||||
@@ -548,6 +698,60 @@ def test_update_condition_between(test_table_s):
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval1': '0', ':oldval2': '2'})
|
||||
# If the attribute being compared doesn't even exist, this is also
|
||||
# considered as a false condition - not an error.
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='q BETWEEN :oldval1 AND :oldval2',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval1': b'dog', ':oldval2': b'zebra'})
|
||||
# If an operand comes from the query, and it has a type not supported by the
|
||||
# comparison (e.g., a list), it's not just a failed condition - it is
|
||||
# considered a ValidationException
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval1': [1,2], ':oldval2': [2,3]})
|
||||
# However, when the wrong type comes from an item attribute, not the
|
||||
# query, the comparison is simply false - not a ValidationException.
|
||||
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'x': {'Value': [1,2,3], 'Action': 'PUT'},
|
||||
'y': {'Value': [2,3,4], 'Action': 'PUT'}})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN x and y',
|
||||
ExpressionAttributeValues={':newval': 2})
|
||||
# If the two operands come from the query (":val" references) then if they
|
||||
# have different types or the wrong order, this is a ValidationException.
|
||||
# But if one or more of the operands come from the item, this only causes
|
||||
# a false condition - not a ValidationException.
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval1': 2, ':oldval2': 1})
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN :oldval1 AND :oldval2',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval1': 2, ':oldval2': 'dog'})
|
||||
test_table_s.update_item(Key={'p': p}, AttributeUpdates={'two': {'Value': 2, 'Action': 'PUT'}})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN two AND :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN :oldval AND two',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 3})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET z = :newval',
|
||||
ConditionExpression='a BETWEEN two AND :oldval',
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 'dog'})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 9
|
||||
|
||||
# Test for ConditionExpression with multi-operand operator "IN", checking
|
||||
@@ -605,6 +809,13 @@ def test_update_condition_in(test_table_s):
|
||||
UpdateExpression='SET c = :val37',
|
||||
ConditionExpression='a IN ()',
|
||||
ExpressionAttributeValues=values)
|
||||
# If the attribute being compared doesn't even exist, this is also
|
||||
# considered as a false condition - not an error.
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET c = :val37',
|
||||
ConditionExpression='q IN ({})'.format(','.join(values.keys())),
|
||||
ExpressionAttributeValues=values)
|
||||
|
||||
# Beyond the above operators, there are also test functions supported -
|
||||
# attribute_exists, attribute_not_exists, attribute_type, begins_with,
|
||||
|
||||
@@ -237,6 +237,30 @@ def test_update_expected_1_le(test_table_s):
|
||||
'AttributeValueList': [2, 3]}}
|
||||
)
|
||||
|
||||
# Comparison operators like le work only on numbers, strings or bytes.
# As noted in issue #8043, if any other type is included in *the query*,
# the result should be a ValidationException, but if the wrong type appears
# in the item, not the query, the result is a failed condition.
def test_update_expected_1_le_validation(test_table_s):
    p = random_string()
    # Prepare an item with a numeric attribute 'a' and a list attribute 'b'.
    test_table_s.update_item(Key={'p': p},
        AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'},
                          'b': {'Value': [1,2], 'Action': 'PUT'}})
    # A list in the query's AttributeValueList is a ValidationException.
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_s.update_item(Key={'p': p},
            AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
            Expected={'a': {'ComparisonOperator': 'LE',
                            'AttributeValueList': [[1,2,3]]}})
    # A list stored in the *item* attribute 'b' only fails the condition.
    with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
        test_table_s.update_item(Key={'p': p},
            AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
            Expected={'b': {'ComparisonOperator': 'LE',
                            'AttributeValueList': [3]}})
    # Neither failed update should have written 'z'.
    assert not 'z' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
|
||||
|
||||
# Tests for Expected with ComparisonOperator = "LT":
|
||||
def test_update_expected_1_lt(test_table_s):
|
||||
p = random_string()
|
||||
@@ -894,6 +918,34 @@ def test_update_expected_1_between(test_table_s):
|
||||
AttributeUpdates={'z': {'Value': 2, 'Action': 'PUT'}},
|
||||
Expected={'d': {'ComparisonOperator': 'BETWEEN', 'AttributeValueList': [set([1]), set([2])]}})
|
||||
|
||||
# BETWEEN works only on numbers, strings or bytes. As noted in issue #8043,
# if any other type is included in *the query*, the result should be a
# ValidationException, but if the wrong type appears in the item, not the
# query, the result is a failed condition.
# BETWEEN should also generate ValidationException if the two ends of the
# range are not of the same type or not in the correct order, but this
# already is tested in the test above (test_update_expected_1_between).
def test_update_expected_1_between_validation(test_table_s):
    p = random_string()
    # Set up an item with a number 'a' and a list 'b'.
    test_table_s.update_item(Key={'p': p},
        AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'},
                          'b': {'Value': [1,2], 'Action': 'PUT'}})
    # Lists as the range ends come from the query - ValidationException.
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_s.update_item(Key={'p': p},
            AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
            Expected={'a': {'ComparisonOperator': 'BETWEEN',
                            'AttributeValueList': [[1,2,3], [2,3,4]]}})
    # The bad (list) type comes from the item's attribute 'b' - this is
    # merely a failed condition, not an error.
    with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
        test_table_s.update_item(Key={'p': p},
            AttributeUpdates={'z': {'Value': 17, 'Action': 'PUT'}},
            Expected={'b': {'ComparisonOperator': 'BETWEEN',
                            'AttributeValueList': [1,2]}})
    # Neither failed update should have written 'z'.
    assert not 'z' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
|
||||
|
||||
|
||||
##############################################################################
|
||||
# Instead of ComparisonOperator and AttributeValueList, one can specify either
|
||||
# Value or Exists:
|
||||
|
||||
@@ -235,6 +235,30 @@ def test_filter_expression_ge(test_table_sn_with_data):
|
||||
expected_items = [item for item in items if item[xn] >= xv]
|
||||
assert(got_items == expected_items)
|
||||
|
||||
# Comparison operators such as >= or BETWEEN only work on numbers, strings or
# bytes. When an expression's operand comes from the item and has a wrong type
# (e.g., a list), the result is that the item is skipped - aborting the scan
# with a ValidationException is a bug (this was issue #8043).
def test_filter_expression_le_bad_type(test_table_sn_with_data):
    table, p, items = test_table_sn_with_data
    # Try the badly-typed attribute 'l' on either side of the comparison;
    # in both cases the filter must simply match nothing.
    for filter_expr in ['l <= :xv', ':xv <= l']:
        matched = full_query(table, KeyConditionExpression='p=:p',
            FilterExpression=filter_expr,
            ExpressionAttributeValues={':p': p, ':xv': 3})
        assert matched == []
|
||||
def test_filter_expression_between_bad_type(test_table_sn_with_data):
    table, p, items = test_table_sn_with_data
    # A badly-typed item attribute in any of BETWEEN's three operand
    # positions must cause the item to be skipped, not an error.
    for filter_expr in ['s between :xv and l',
                        's between l and :xv',
                        's between i and :xv']:
        matched = full_query(table, KeyConditionExpression='p=:p',
            FilterExpression=filter_expr,
            ExpressionAttributeValues={':p': p, ':xv': 'cat'})
        assert matched == []
|
||||
|
||||
# Test the "BETWEEN/AND" ternary operator on a numeric, string and bytes
|
||||
# attribute. These keywords are case-insensitive.
|
||||
def test_filter_expression_between(test_table_sn_with_data):
|
||||
|
||||
@@ -49,6 +49,16 @@ def testTimeuuid(cql, test_keyspace):
|
||||
for i in range(4):
|
||||
uuid = rows[i][1]
|
||||
datetime = datetime_from_uuid1(uuid)
|
||||
# Before comparing this datetime to the result of dateOf(), we
|
||||
# must truncate the resolution of datetime to milliseconds.
|
||||
# The problem is that the dateOf(timeuuid) CQL function converts a
|
||||
# timeuuid to CQL's "timestamp" type, which has millisecond
|
||||
# resolution, but datetime *may* have finer resolution. It will
|
||||
# usually be whole milliseconds, because this is what the now()
|
||||
# implementation usually does, but when now() is called more than
|
||||
# once per millisecond, it *may* start incrementing the sub-
|
||||
# millisecond part.
|
||||
datetime = datetime.replace(microsecond=datetime.microsecond//1000*1000)
|
||||
timestamp = round(datetime.replace(tzinfo=timezone.utc).timestamp() * 1000)
|
||||
assert_rows(execute(cql, table, "SELECT dateOf(t), unixTimestampOf(t) FROM %s WHERE k = 0 AND t = ?", rows[i][1]),
|
||||
[datetime, timestamp])
|
||||
|
||||
@@ -136,7 +136,7 @@ def test_mix_per_query_timeout_with_other_params(scylla_only, cql, table1):
|
||||
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES ({key},1,1) USING TIMEOUT 60m AND TTL 1000000 AND TIMESTAMP 321")
|
||||
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES ({key},2,1) USING TIMESTAMP 42 AND TIMEOUT 30m")
|
||||
res = list(cql.execute(f"SELECT ttl(v), writetime(v) FROM {table} WHERE p = {key} and c = 1"))
|
||||
assert len(res) == 1 and res[0].ttl_v == 1000000 and res[0].writetime_v == 321
|
||||
assert len(res) == 1 and res[0].ttl_v > 0 and res[0].writetime_v == 321
|
||||
res = list(cql.execute(f"SELECT ttl(v), writetime(v) FROM {table} WHERE p = {key} and c = 2"))
|
||||
assert len(res) == 1 and not res[0].ttl_v and res[0].writetime_v == 42
|
||||
|
||||
|
||||
Submodule tools/python3 updated: 1763a1a834...199ac909b0
Reference in New Issue
Block a user