Compare commits
38 Commits
scylla-3.3
...
next-3.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd293768e7 | ||
|
|
22dfa48585 | ||
|
|
2f3d7f1408 | ||
|
|
76a08df939 | ||
|
|
6aa129d3b0 | ||
|
|
b4f781e4eb | ||
|
|
27594ca50e | ||
|
|
0f2f0d65d7 | ||
|
|
31c2f8a3ae | ||
|
|
ec12331f11 | ||
|
|
ccc463b5e5 | ||
|
|
4a9676f6b7 | ||
|
|
aaf4989c31 | ||
|
|
b29f954f20 | ||
|
|
5546d5df7b | ||
|
|
541c29677f | ||
|
|
06f18108c0 | ||
|
|
90002ca3d2 | ||
|
|
da23902311 | ||
|
|
2b0dc21f97 | ||
|
|
b544691493 | ||
|
|
d420b06844 | ||
|
|
b3a2cb2f68 | ||
|
|
c8c057f5f8 | ||
|
|
038bfc925c | ||
|
|
13a4e7db83 | ||
|
|
727d6cf8f3 | ||
|
|
6d6d7b4abe | ||
|
|
28f974b810 | ||
|
|
5fdadcaf3b | ||
|
|
a960394f27 | ||
|
|
3216a1a70a | ||
|
|
5a7fd41618 | ||
|
|
dd24ba7a62 | ||
|
|
204f6dd393 | ||
|
|
b1278adc15 | ||
|
|
ee9677ef71 | ||
|
|
2060e361cf |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=3.3.1
|
||||
VERSION=3.3.4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -388,6 +388,34 @@ def test_gsi_update_second_regular_base_column(test_table_gsi_3):
|
||||
KeyConditions={'a': {'AttributeValueList': [items[3]['a']], 'ComparisonOperator': 'EQ'},
|
||||
'b': {'AttributeValueList': [items[3]['b']], 'ComparisonOperator': 'EQ'}})
|
||||
|
||||
# Test that when a table has a GSI, if the indexed attribute is missing, the
|
||||
# item is added to the base table but not the index.
|
||||
# This is the same feature we already tested in test_gsi_missing_attribute()
|
||||
# above, but on a different table: In that test we used test_table_gsi_2,
|
||||
# with one indexed attribute, and in this test we use test_table_gsi_3 which
|
||||
# has two base regular attributes in the view key, and more possibilities
|
||||
# of which value might be missing. Reproduces issue #6008.
|
||||
def test_gsi_missing_attribute_3(test_table_gsi_3):
|
||||
p = random_string()
|
||||
a = random_string()
|
||||
b = random_string()
|
||||
# First, add an item with a missing "a" value. It should appear in the
|
||||
# base table, but not in the index:
|
||||
test_table_gsi_3.put_item(Item={'p': p, 'b': b})
|
||||
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p, 'b': b}
|
||||
# Note: with eventually consistent read, we can't really be sure that
|
||||
# an item will "never" appear in the index. We hope that if a bug exists
|
||||
# and such an item did appear, sometimes the delay here will be enough
|
||||
# for the unexpected item to become visible.
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
|
||||
# Same thing for an item with a missing "b" value:
|
||||
test_table_gsi_3.put_item(Item={'p': p, 'a': a})
|
||||
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p, 'a': a}
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
|
||||
# And for an item missing both:
|
||||
test_table_gsi_3.put_item(Item={'p': p})
|
||||
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p}
|
||||
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
|
||||
|
||||
# A fourth scenario of GSI. Two GSIs on a single base table.
|
||||
@pytest.fixture(scope="session")
|
||||
|
||||
@@ -66,8 +66,9 @@ static std::string format_time_point(db_clock::time_point tp) {
|
||||
time_t time_point_repr = db_clock::to_time_t(tp);
|
||||
std::string time_point_str;
|
||||
time_point_str.resize(17);
|
||||
::tm time_buf;
|
||||
// strftime prints the terminating null character as well
|
||||
std::strftime(time_point_str.data(), time_point_str.size(), "%Y%m%dT%H%M%SZ", std::gmtime(&time_point_repr));
|
||||
std::strftime(time_point_str.data(), time_point_str.size(), "%Y%m%dT%H%M%SZ", ::gmtime_r(&time_point_repr, &time_buf));
|
||||
time_point_str.resize(16);
|
||||
return time_point_str;
|
||||
}
|
||||
|
||||
@@ -215,6 +215,7 @@ future<> server::verify_signature(const request& req) {
|
||||
}
|
||||
|
||||
future<json::json_return_type> server::handle_api_request(std::unique_ptr<request>&& req) {
|
||||
_executor.local()._stats.total_operations++;
|
||||
sstring target = req->get_header(TARGET);
|
||||
std::vector<std::string_view> split_target = split(target, '.');
|
||||
//NOTICE(sarna): Target consists of Dynamo API version followed by a dot '.' and operation type (e.g. CreateTable)
|
||||
|
||||
@@ -56,26 +56,22 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) {
|
||||
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
|
||||
}
|
||||
|
||||
static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
|
||||
std::vector<ss::token_range> res;
|
||||
for (auto d : service::get_local_storage_service().describe_ring(keyspace)) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
r.endpoint_details.push(ed);
|
||||
static ss::token_range token_range_endpoints_to_json(const dht::token_range_endpoints& d) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
res.push_back(r);
|
||||
r.endpoint_details.push(ed);
|
||||
}
|
||||
return res;
|
||||
return r;
|
||||
}
|
||||
|
||||
void set_storage_service(http_context& ctx, routes& r) {
|
||||
@@ -177,13 +173,13 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
|
||||
ss::describe_any_ring.set(r, [&ctx](const_req req) {
|
||||
return describe_ring("");
|
||||
ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::describe_ring.set(r, [&ctx](const_req req) {
|
||||
auto keyspace = validate_keyspace(ctx, req.param);
|
||||
return describe_ring(keyspace);
|
||||
ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
auto keyspace = validate_keyspace(ctx, req->param);
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::get_host_id_map.set(r, [](const_req req) {
|
||||
@@ -255,6 +251,9 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
if (column_family.empty()) {
|
||||
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
}
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
|
||||
#include "atomic_cell.hh"
|
||||
#include "atomic_cell_or_collection.hh"
|
||||
#include "counters.hh"
|
||||
#include "types.hh"
|
||||
|
||||
/// LSA mirator for cells with irrelevant type
|
||||
@@ -218,7 +219,9 @@ std::ostream&
|
||||
operator<<(std::ostream& os, const atomic_cell_view& acv) {
|
||||
if (acv.is_live()) {
|
||||
return fmt_print(os, "atomic_cell{{{},ts={:d},expiry={:d},ttl={:d}}}",
|
||||
to_hex(acv.value().linearize()),
|
||||
acv.is_counter_update()
|
||||
? "counter_update_value=" + to_sstring(acv.counter_update_value())
|
||||
: to_hex(acv.value().linearize()),
|
||||
acv.timestamp(),
|
||||
acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1,
|
||||
acv.is_live_and_has_ttl() ? acv.ttl().count() : 0);
|
||||
@@ -238,8 +241,21 @@ operator<<(std::ostream& os, const atomic_cell_view::printer& acvp) {
|
||||
auto& type = acvp._type;
|
||||
auto& acv = acvp._cell;
|
||||
if (acv.is_live()) {
|
||||
std::ostringstream cell_value_string_builder;
|
||||
if (type.is_counter()) {
|
||||
if (acv.is_counter_update()) {
|
||||
cell_value_string_builder << "counter_update_value=" << acv.counter_update_value();
|
||||
} else {
|
||||
cell_value_string_builder << "shards: ";
|
||||
counter_cell_view::with_linearized(acv, [&cell_value_string_builder] (counter_cell_view& ccv) {
|
||||
cell_value_string_builder << ::join(", ", ccv.shards());
|
||||
});
|
||||
}
|
||||
} else {
|
||||
cell_value_string_builder << type.to_string(acv.value().linearize());
|
||||
}
|
||||
return fmt_print(os, "atomic_cell{{{},ts={:d},expiry={:d},ttl={:d}}}",
|
||||
type.to_string(acv.value().linearize()),
|
||||
cell_value_string_builder.str(),
|
||||
acv.timestamp(),
|
||||
acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1,
|
||||
acv.is_live_and_has_ttl() ? acv.ttl().count() : 0);
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
|
||||
#include "auth/resource.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
@@ -52,9 +53,9 @@ struct role_config_update final {
|
||||
///
|
||||
/// A logical argument error for a role-management operation.
|
||||
///
|
||||
class roles_argument_exception : public std::invalid_argument {
|
||||
class roles_argument_exception : public exceptions::invalid_request_exception {
|
||||
public:
|
||||
using std::invalid_argument::invalid_argument;
|
||||
using exceptions::invalid_request_exception::invalid_request_exception;
|
||||
};
|
||||
|
||||
class role_already_exists : public roles_argument_exception {
|
||||
|
||||
@@ -430,14 +430,42 @@ operator<<(std::ostream& os, const cql3_type::raw& r) {
|
||||
namespace util {
|
||||
|
||||
sstring maybe_quote(const sstring& identifier) {
|
||||
static const std::regex unquoted_identifier_re("[a-z][a-z0-9_]*");
|
||||
if (std::regex_match(identifier.begin(), identifier.end(), unquoted_identifier_re)) {
|
||||
const auto* p = identifier.begin();
|
||||
const auto* ep = identifier.end();
|
||||
|
||||
// quote empty string
|
||||
if (__builtin_expect(p == ep, false)) {
|
||||
return "\"\"";
|
||||
}
|
||||
|
||||
// string needs no quoting if it matches [a-z][a-z0-9_]*
|
||||
// quotes ('"') in the string are doubled
|
||||
bool need_quotes;
|
||||
bool has_quotes;
|
||||
auto c = *p;
|
||||
if ('a' <= c && c <= 'z') {
|
||||
need_quotes = false;
|
||||
has_quotes = false;
|
||||
} else {
|
||||
need_quotes = true;
|
||||
has_quotes = (c == '"');
|
||||
}
|
||||
while ((++p != ep) && !has_quotes) {
|
||||
c = *p;
|
||||
if (!(('a' <= c && c <= 'z') || ('0' <= c && c <= '9') || (c == '_'))) {
|
||||
need_quotes = true;
|
||||
has_quotes = (c == '"');
|
||||
}
|
||||
}
|
||||
|
||||
if (!need_quotes) {
|
||||
return identifier;
|
||||
}
|
||||
if (!has_quotes) {
|
||||
return make_sstring("\"", identifier, "\"");
|
||||
}
|
||||
static const std::regex double_quote_re("\"");
|
||||
std::string result = identifier;
|
||||
std::regex_replace(result, double_quote_re, "\"\"");
|
||||
return '"' + result + '"';
|
||||
return '"' + std::regex_replace(identifier.c_str(), double_quote_re, "\"\"") + '"';
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -116,6 +116,7 @@ namespace sstables {
|
||||
class sstable;
|
||||
class entry_descriptor;
|
||||
class compaction_descriptor;
|
||||
class compaction_completion_desc;
|
||||
class foreign_sstable_open_info;
|
||||
class sstables_manager;
|
||||
|
||||
@@ -592,9 +593,8 @@ private:
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables);
|
||||
|
||||
// Rebuilds the sstable set right away and schedule deletion of old sstables.
|
||||
void on_compaction_completion(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove);
|
||||
// Rebuild sstable set, delete input sstables right away, and update row cache and statistics.
|
||||
void on_compaction_completion(sstables::compaction_completion_desc& desc);
|
||||
|
||||
void rebuild_statistics();
|
||||
|
||||
|
||||
@@ -1323,7 +1323,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
std::vector<iovec> v;
|
||||
v.reserve(n);
|
||||
size_t m = 0;
|
||||
while (m < rem && n < max_write) {
|
||||
while (m < rem && n--) {
|
||||
auto s = std::min(rem - m, buf_size);
|
||||
v.emplace_back(iovec{ buf.get_write(), s});
|
||||
m += s;
|
||||
|
||||
@@ -704,6 +704,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||
// (last_modification - manager::hints_timer_period) old.
|
||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -726,6 +727,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||
++this->shard_stats().discarded;
|
||||
}
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}).finally([units = std::move(units), ctx_ptr] {});
|
||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||
|
||||
@@ -678,9 +678,14 @@ void view_updates::generate_update(
|
||||
return;
|
||||
}
|
||||
|
||||
bool should_update = false;
|
||||
bool should_replace = false;
|
||||
bool should_create = false;
|
||||
// If one of the key columns is missing, set has_new_row = false
|
||||
// meaning that after the update there will be no view row.
|
||||
// If one of the key columns is missing in the existing value,
|
||||
// set has_old_row = false meaning we don't have an old row to
|
||||
// delete.
|
||||
bool has_old_row = true;
|
||||
bool has_new_row = true;
|
||||
bool same_row = true;
|
||||
for (auto col_id : col_ids) {
|
||||
auto* after = update.cells().find_cell(col_id);
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
@@ -690,27 +695,31 @@ void view_updates::generate_update(
|
||||
if (before && before->as_atomic_cell(cdef).is_live()) {
|
||||
if (after && after->as_atomic_cell(cdef).is_live()) {
|
||||
auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(cdef), after->as_atomic_cell(cdef));
|
||||
if (cmp == 0) {
|
||||
should_update = true;
|
||||
} else {
|
||||
should_replace = true;
|
||||
if (cmp != 0) {
|
||||
same_row = false;
|
||||
}
|
||||
} else {
|
||||
delete_old_entry(base_key, *existing, update, now);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
has_old_row = false;
|
||||
}
|
||||
} else {
|
||||
has_old_row = false;
|
||||
}
|
||||
if (after && after->as_atomic_cell(cdef).is_live()) {
|
||||
should_create = true;
|
||||
if (!after || !after->as_atomic_cell(cdef).is_live()) {
|
||||
has_new_row = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (should_replace) {
|
||||
replace_entry(base_key, update, *existing, now);
|
||||
} else if (should_update) {
|
||||
update_entry(base_key, update, *existing, now);
|
||||
} else if (should_create) {
|
||||
if (has_old_row) {
|
||||
if (has_new_row) {
|
||||
if (same_row) {
|
||||
update_entry(base_key, update, *existing, now);
|
||||
} else {
|
||||
replace_entry(base_key, update, *existing, now);
|
||||
}
|
||||
} else {
|
||||
delete_old_entry(base_key, *existing, update, now);
|
||||
}
|
||||
} else if (has_new_row) {
|
||||
create_entry(base_key, update, now);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -543,6 +543,15 @@ to_partition_range(dht::token_range r) {
|
||||
return { std::move(start), std::move(end) };
|
||||
}
|
||||
|
||||
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges) {
|
||||
dht::partition_range_vector prs;
|
||||
prs.reserve(ranges.size());
|
||||
for (auto& range : ranges) {
|
||||
prs.push_back(dht::to_partition_range(range));
|
||||
}
|
||||
return prs;
|
||||
}
|
||||
|
||||
std::map<unsigned, dht::partition_range_vector>
|
||||
split_range_to_shards(dht::partition_range pr, const schema& s) {
|
||||
std::map<unsigned, dht::partition_range_vector> ret;
|
||||
|
||||
@@ -903,6 +903,7 @@ public:
|
||||
};
|
||||
|
||||
dht::partition_range to_partition_range(dht::token_range);
|
||||
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges);
|
||||
|
||||
// Each shard gets a sorted, disjoint vector of ranges
|
||||
std::map<unsigned, dht::partition_range_vector>
|
||||
|
||||
1
dist/common/scripts/scylla_fstrim_setup
vendored
1
dist/common/scripts/scylla_fstrim_setup
vendored
@@ -31,6 +31,7 @@ if __name__ == '__main__':
|
||||
sys.exit(1)
|
||||
if is_systemd():
|
||||
systemd_unit('scylla-fstrim.timer').unmask()
|
||||
systemd_unit('scylla-fstrim.timer').enable()
|
||||
if is_redhat_variant():
|
||||
systemd_unit('fstrim.timer').disable()
|
||||
if dist_name() == 'Ubuntu' and os.path.exists('/etc/cron.weekly/fstrim'):
|
||||
|
||||
13
dist/common/scripts/scylla_raid_setup
vendored
13
dist/common/scripts/scylla_raid_setup
vendored
@@ -130,17 +130,14 @@ if __name__ == '__main__':
|
||||
|
||||
makedirs(mount_at)
|
||||
run('mount -t xfs -o noatime {raid} "{mount_at}"'.format(raid=fsdev, mount_at=mount_at))
|
||||
|
||||
makedirs('{}/data'.format(root))
|
||||
makedirs('{}/commitlog'.format(root))
|
||||
makedirs('{}/coredump'.format(root))
|
||||
|
||||
uid = pwd.getpwnam('scylla').pw_uid
|
||||
gid = grp.getgrnam('scylla').gr_gid
|
||||
os.chown(root, uid, gid)
|
||||
os.chown('{}/data'.format(root), uid, gid)
|
||||
os.chown('{}/commitlog'.format(root), uid, gid)
|
||||
os.chown('{}/coredump'.format(root), uid, gid)
|
||||
|
||||
for d in ['coredump', 'data', 'commitlog', 'hints', 'view_hints', 'saved_caches']:
|
||||
dpath = '{}/{}'.format(root, d)
|
||||
makedirs(dpath)
|
||||
os.chown(dpath, uid, gid)
|
||||
|
||||
if args.update_fstab:
|
||||
res = out('blkid {}'.format(fsdev))
|
||||
|
||||
6
dist/common/scripts/scylla_util.py
vendored
6
dist/common/scripts/scylla_util.py
vendored
@@ -182,7 +182,7 @@ class aws_instance:
|
||||
instance_size = self.instance_size()
|
||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||
return 'ixgbevf'
|
||||
if instance_class in ['c5', 'c5d', 'f1', 'g3', 'h1', 'i3', 'i3en', 'm5', 'm5d', 'p2', 'p3', 'r4', 'x1']:
|
||||
if instance_class in ['a1', 'c5', 'c5d', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||
return 'ena'
|
||||
if instance_class == 'm4':
|
||||
if instance_size == '16xlarge':
|
||||
@@ -481,8 +481,8 @@ def parse_scylla_dirs_with_default(conf='/etc/scylla/scylla.yaml'):
|
||||
y['data_file_directories'] = [os.path.join(y['workdir'], 'data')]
|
||||
for t in [ "commitlog", "hints", "view_hints", "saved_caches" ]:
|
||||
key = "%s_directory" % t
|
||||
if key not in y or not y[k]:
|
||||
y[k] = os.path.join(y['workdir'], t)
|
||||
if key not in y or not y[key]:
|
||||
y[key] = os.path.join(y['workdir'], t)
|
||||
return y
|
||||
|
||||
|
||||
|
||||
@@ -2171,25 +2171,24 @@ future<> gossiper::wait_for_range_setup() {
|
||||
}
|
||||
|
||||
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) {
|
||||
// We allow to bootstrap a new node in only two cases:
|
||||
// 1) The node is a completely new node and no state in gossip at all
|
||||
// 2) The node has state in gossip and it is already removed from the
|
||||
// cluster either by nodetool decommission or nodetool removenode
|
||||
auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
|
||||
// if there's no previous state, or the node was previously removed from the cluster, we're good
|
||||
if (!eps || is_dead_state(*eps)) {
|
||||
return true;
|
||||
bool allowed = true;
|
||||
if (!eps) {
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
sstring status = get_gossip_status(*eps);
|
||||
|
||||
logger.debug("is_safe_for_bootstrap: node {} status {}", endpoint, status);
|
||||
|
||||
// these states are not allowed to join the cluster as it would not be safe
|
||||
std::unordered_set<sstring> unsafe_statuses{
|
||||
sstring(""), // failed bootstrap but we did start gossiping
|
||||
sstring(versioned_value::STATUS_NORMAL), // node is legit in the cluster or it was stopped with kill -9
|
||||
sstring(versioned_value::SHUTDOWN) // node was shutdown
|
||||
std::unordered_set<sstring> allowed_statuses{
|
||||
sstring(versioned_value::STATUS_LEFT),
|
||||
sstring(versioned_value::REMOVED_TOKEN),
|
||||
};
|
||||
|
||||
return !unsafe_statuses.count(status);
|
||||
allowed = allowed_statuses.count(status);
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status={}, allowed_to_bootstrap={}", endpoint, status, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
std::set<sstring> to_feature_set(sstring features_string) {
|
||||
|
||||
@@ -69,7 +69,8 @@ std::ostream& gms::operator<<(std::ostream& os, const inet_address& x) {
|
||||
auto&& bytes = x.bytes();
|
||||
auto i = 0u;
|
||||
auto acc = 0u;
|
||||
for (auto b : bytes) {
|
||||
// extra paranoid sign extension evasion - #5808
|
||||
for (uint8_t b : bytes) {
|
||||
acc <<= 8;
|
||||
acc |= b;
|
||||
if ((++i & 1) == 0) {
|
||||
|
||||
@@ -85,6 +85,10 @@ network_topology_strategy::network_topology_strategy(
|
||||
"NetworkTopologyStrategy");
|
||||
}
|
||||
|
||||
if (val.empty() || std::any_of(val.begin(), val.end(), [] (char c) {return !isdigit(c);})) {
|
||||
throw exceptions::configuration_exception(
|
||||
format("Replication factor must be numeric and non-negative, found '{}'", val));
|
||||
}
|
||||
_dc_rep_factor.emplace(key, std::stol(val));
|
||||
_datacenteres.push_back(key);
|
||||
}
|
||||
|
||||
@@ -2602,7 +2602,7 @@ void mutation_cleaner_impl::start_worker() {
|
||||
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
|
||||
auto&& region = snp.region();
|
||||
return with_allocator(region.allocator(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
{
|
||||
// Allocating sections require the region to be reclaimable
|
||||
// which means that they cannot be nested.
|
||||
// It is, however, possible, that if the snapshot is taken
|
||||
@@ -2614,13 +2614,15 @@ stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexce
|
||||
}
|
||||
try {
|
||||
return _worker_state->alloc_section(region, [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
return snp.merge_partition_versions(_app_stats);
|
||||
});
|
||||
});
|
||||
} catch (...) {
|
||||
// Merging failed, give up as there is no guarantee of forward progress.
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -177,6 +177,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
|
||||
return handle_end_of_stream();
|
||||
}
|
||||
});
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
for (auto& q : _queue_reader_handles) {
|
||||
if (q) {
|
||||
q->abort(ep);
|
||||
}
|
||||
}
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -21,8 +21,12 @@
|
||||
# At the end of the build we check that the build-id is indeed in the
|
||||
# first page. At install time we check that patchelf doesn't modify
|
||||
# the program headers.
|
||||
|
||||
# gdb has a SO_NAME_MAX_PATH_SIZE of 512, so limit the path size to
|
||||
# that. The 512 includes the null at the end, hence the 511 bellow.
|
||||
|
||||
ORIGINAL_DYNAMIC_LINKER=$(gcc -### /dev/null -o t 2>&1 | perl -n -e '/-dynamic-linker ([^ ]*) / && print $1')
|
||||
DYNAMIC_LINKER=$(printf "%2000s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
DYNAMIC_LINKER=$(printf "%511s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
|
||||
COMMON_FLAGS="--enable-dpdk --cflags=-ffile-prefix-map=$PWD=. --ldflags=-Wl,--build-id=sha1,--dynamic-linker=$DYNAMIC_LINKER"
|
||||
|
||||
|
||||
@@ -444,7 +444,7 @@ class repair_writer {
|
||||
uint64_t _estimated_partitions;
|
||||
size_t _nr_peer_nodes;
|
||||
// Needs more than one for repair master
|
||||
std::vector<std::optional<future<uint64_t>>> _writer_done;
|
||||
std::vector<std::optional<future<>>> _writer_done;
|
||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||
// Current partition written to disk
|
||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||
@@ -452,6 +452,7 @@ class repair_writer {
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -524,7 +525,15 @@ public:
|
||||
return consumer(std::move(reader));
|
||||
});
|
||||
},
|
||||
t.stream_in_progress());
|
||||
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
_mq[node_idx]->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
future<> write_partition_end(unsigned node_idx) {
|
||||
@@ -551,23 +560,41 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_wait_for_writer_done(unsigned node_idx) {
|
||||
if (_writer_done[node_idx]) {
|
||||
return std::move(*(_writer_done[node_idx]));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> wait_for_writer_done() {
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
||||
rlogger.debug("Managed to write partitions={} to sstable", partitions);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
@@ -1167,6 +1194,23 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
// Must run inside a seastar thread
|
||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||
@@ -1192,18 +1236,7 @@ private:
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
_repair_writer.create_writer(node_idx);
|
||||
for (auto& r : row_diff) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1214,15 +1247,7 @@ private:
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
_repair_writer.create_writer(node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1905,22 +1930,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1946,22 +1966,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1986,22 +2001,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
20
row_cache.cc
20
row_cache.cc
@@ -528,8 +528,12 @@ public:
|
||||
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
|
||||
{
|
||||
if (!mfopt) {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
return _cache._read_section(_cache._tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
});
|
||||
});
|
||||
}
|
||||
_cache.on_partition_miss();
|
||||
const partition_start& ps = mfopt->as_partition_start();
|
||||
@@ -952,13 +956,15 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
// expensive and we need to amortize it somehow.
|
||||
do {
|
||||
STAP_PROBE(scylla, row_cache_update_partition_start);
|
||||
with_linearized_managed_bytes([&] {
|
||||
{
|
||||
if (!update) {
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
memtable_entry& mem_e = *m.partitions.begin();
|
||||
size_entry = mem_e.size_in_allocator_without_rows(_tracker.allocator());
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
||||
update = updater(_update_section, cache_i, mem_e, is_present, real_dirty_acc);
|
||||
});
|
||||
});
|
||||
}
|
||||
// We use cooperative deferring instead of futures so that
|
||||
@@ -970,14 +976,16 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
update = {};
|
||||
real_dirty_acc.unpin_memory(size_entry);
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto i = m.partitions.begin();
|
||||
memtable_entry& mem_e = *i;
|
||||
m.partitions.erase(i);
|
||||
mem_e.partition().evict(_tracker.memtable_cleaner());
|
||||
current_allocator().destroy(&mem_e);
|
||||
});
|
||||
});
|
||||
++partition_count;
|
||||
});
|
||||
}
|
||||
STAP_PROBE(scylla, row_cache_update_partition_end);
|
||||
} while (!m.partitions.empty() && !need_preempt());
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
@@ -1124,8 +1132,8 @@ future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&
|
||||
seastar::thread::maybe_yield();
|
||||
|
||||
while (true) {
|
||||
auto done = with_linearized_managed_bytes([&] {
|
||||
return _update_section(_tracker.region(), [&] {
|
||||
auto done = _update_section(_tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
auto it = _partitions.lower_bound(*_prev_snapshot_pos, cmp);
|
||||
auto end = _partitions.lower_bound(dht::ring_position_view::for_range_end(range), cmp);
|
||||
|
||||
@@ -290,10 +290,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
|
||||
+ column_offset(column_kind::regular_column),
|
||||
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
||||
|
||||
std::sort(_raw._columns.begin(),
|
||||
std::stable_sort(_raw._columns.begin(),
|
||||
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
_raw._columns.begin() + column_offset(column_kind::static_column),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
|
||||
|
||||
@@ -33,9 +33,10 @@ import os
|
||||
procs = os.sysconf('SC_NPROCESSORS_ONLN')
|
||||
mem = os.sysconf('SC_PHYS_PAGES') * os.sysconf('SC_PAGESIZE')
|
||||
|
||||
mem_reserve = 1000000000
|
||||
job_mem = 4000000000
|
||||
|
||||
jobs = min(procs, mem // job_mem)
|
||||
jobs = min(procs, (mem-mem_reserve) // job_mem)
|
||||
jobs = max(jobs, 1)
|
||||
|
||||
print(jobs)
|
||||
|
||||
@@ -45,7 +45,7 @@ if [[ "$ID" = "debian" && "$VERSION_ID" = "8" ]] || [[ "$ID" = "ubuntu" && "$VER
|
||||
echo "scylla ALL=(ALL) NOPASSWD: /opt/scylladb/scripts/scylla_prepare,/opt/scylladb/scripts/scylla_stop,/opt/scylladb/scripts/scylla_io_setup,/opt/scylladb/scripts/scylla-ami/scylla_ami_setup" > /etc/sudoers.d/scylla
|
||||
else
|
||||
# AmbientCapabilities supported from v229 but it backported to v219-33 on RHEL7
|
||||
if [ $SYSTEMD_VER -ge 229 ] || [ $SYSTEMD_VER -eq 219 ] && [ $SYSTEMD_REL -ge 33 ]; then
|
||||
if [ $SYSTEMD_VER -ge 229 ] || [[ $SYSTEMD_VER -eq 219 && $SYSTEMD_REL -ge 33 ]]; then
|
||||
if [ $AMB_SUPPORT -eq 1 ]; then
|
||||
mkdir -p /etc/systemd/system/scylla-server.service.d/
|
||||
cat << EOS > /etc/systemd/system/scylla-server.service.d/capabilities.conf
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: a0bdc6cd85...30f03aeba9
@@ -3254,7 +3254,9 @@ protected:
|
||||
uint32_t original_partition_limit() const {
|
||||
return _cmd->partition_limit;
|
||||
}
|
||||
virtual void adjust_targets_for_reconciliation() {}
|
||||
void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
|
||||
adjust_targets_for_reconciliation();
|
||||
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
@@ -3481,6 +3483,9 @@ public:
|
||||
virtual void got_cl() override {
|
||||
_speculate_timer.cancel();
|
||||
}
|
||||
virtual void adjust_targets_for_reconciliation() override {
|
||||
_targets = used_targets();
|
||||
}
|
||||
};
|
||||
|
||||
class range_slice_read_executor : public never_speculating_read_executor {
|
||||
@@ -3527,7 +3532,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
|
||||
// reordering of endpoints happens. The local endpoint, if
|
||||
// present, is always first in the list, as get_live_sorted_endpoints()
|
||||
// orders the list by proximity to the local endpoint.
|
||||
is_read_non_local |= all_replicas.front() != utils::fb_utilities::get_broadcast_address();
|
||||
is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address();
|
||||
|
||||
auto cf = _db.local().find_column_family(schema).shared_from_this();
|
||||
std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
|
||||
|
||||
@@ -903,12 +903,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
|
||||
@@ -452,6 +452,11 @@ protected:
|
||||
encoding_stats get_encoding_stats() const {
|
||||
return _stats_collector.get();
|
||||
}
|
||||
|
||||
virtual compaction_completion_desc
|
||||
get_compaction_completion_desc(std::vector<shared_sstable> input_sstables, std::vector<shared_sstable> output_sstables) {
|
||||
return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables)};
|
||||
}
|
||||
public:
|
||||
compaction& operator=(const compaction&) = delete;
|
||||
compaction(const compaction&) = delete;
|
||||
@@ -828,7 +833,7 @@ private:
|
||||
_compacting->erase(sst);
|
||||
});
|
||||
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
|
||||
_replacer(exhausted_ssts, std::move(_new_unused_sstables));
|
||||
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
|
||||
_sstables.erase(exhausted, _sstables.end());
|
||||
backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
|
||||
}
|
||||
@@ -838,7 +843,7 @@ private:
|
||||
if (!_sstables.empty()) {
|
||||
std::vector<shared_sstable> sstables_compacted;
|
||||
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(sstables_compacted));
|
||||
_replacer(std::move(sstables_compacted), std::move(_new_unused_sstables));
|
||||
_replacer(get_compaction_completion_desc(std::move(sstables_compacted), std::move(_new_unused_sstables)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -869,9 +874,42 @@ private:
|
||||
};
|
||||
|
||||
class cleanup_compaction final : public regular_compaction {
|
||||
dht::token_range_vector _owned_ranges;
|
||||
private:
|
||||
dht::partition_range_vector
|
||||
get_ranges_for_invalidation(const std::vector<shared_sstable>& sstables) {
|
||||
auto owned_ranges = dht::to_partition_ranges(_owned_ranges);
|
||||
|
||||
auto non_owned_ranges = boost::copy_range<dht::partition_range_vector>(sstables
|
||||
| boost::adaptors::transformed([] (const shared_sstable& sst) {
|
||||
return dht::partition_range::make({sst->get_first_decorated_key(), true},
|
||||
{sst->get_last_decorated_key(), true});
|
||||
}));
|
||||
// optimize set of potentially overlapping ranges by deoverlapping them.
|
||||
non_owned_ranges = dht::partition_range::deoverlap(std::move(non_owned_ranges), dht::ring_position_comparator(*_schema));
|
||||
|
||||
// subtract *each* owned range from the partition range of *each* sstable*,
|
||||
// such that we'll be left only with a set of non-owned ranges.
|
||||
for (auto& owned_range : owned_ranges) {
|
||||
dht::partition_range_vector new_non_owned_ranges;
|
||||
for (auto& non_owned_range : non_owned_ranges) {
|
||||
auto ret = non_owned_range.subtract(owned_range, dht::ring_position_comparator(*_schema));
|
||||
new_non_owned_ranges.insert(new_non_owned_ranges.end(), ret.begin(), ret.end());
|
||||
}
|
||||
non_owned_ranges = std::move(new_non_owned_ranges);
|
||||
}
|
||||
return non_owned_ranges;
|
||||
}
|
||||
protected:
|
||||
virtual compaction_completion_desc
|
||||
get_compaction_completion_desc(std::vector<shared_sstable> input_sstables, std::vector<shared_sstable> output_sstables) override {
|
||||
auto ranges_for_for_invalidation = get_ranges_for_invalidation(input_sstables);
|
||||
return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)};
|
||||
}
|
||||
public:
|
||||
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, std::function<shared_sstable()> creator, replacer_fn replacer)
|
||||
: regular_compaction(cf, std::move(descriptor), std::move(creator), std::move(replacer))
|
||||
, _owned_ranges(service::get_local_storage_service().get_local_ranges(_schema->ks_name()))
|
||||
{
|
||||
_info->type = compaction_type::Cleanup;
|
||||
}
|
||||
@@ -885,15 +923,13 @@ public:
|
||||
}
|
||||
|
||||
flat_mutation_reader::filter make_partition_filter() const override {
|
||||
dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
|
||||
|
||||
return [this, owned_ranges = std::move(owned_ranges)] (const dht::decorated_key& dk) {
|
||||
return [this] (const dht::decorated_key& dk) {
|
||||
if (dht::shard_of(dk.token()) != engine().cpu_id()) {
|
||||
clogger.trace("Token {} does not belong to CPU {}, skipping", dk.token(), engine().cpu_id());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!belongs_to_current_node(dk.token(), owned_ranges)) {
|
||||
if (!belongs_to_current_node(dk.token(), _owned_ranges)) {
|
||||
clogger.trace("Token {} does not belong to this node, skipping", dk.token());
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "gc_clock.hh"
|
||||
#include "compaction_weight_registration.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <functional>
|
||||
|
||||
@@ -133,8 +134,15 @@ namespace sstables {
|
||||
}
|
||||
};
|
||||
|
||||
struct compaction_completion_desc {
|
||||
std::vector<shared_sstable> input_sstables;
|
||||
std::vector<shared_sstable> output_sstables;
|
||||
// Set of compacted partition ranges that should be invalidated in the cache.
|
||||
dht::partition_range_vector ranges_for_cache_invalidation;
|
||||
};
|
||||
|
||||
// Replaces old sstable(s) by new one(s) which contain all non-expired data.
|
||||
using replacer_fn = std::function<void(std::vector<shared_sstable> removed, std::vector<shared_sstable> added)>;
|
||||
using replacer_fn = std::function<void(compaction_completion_desc)>;
|
||||
|
||||
// Compact a list of N sstables into M sstables.
|
||||
// Returns info about the finished compaction, which includes vector to new sstables.
|
||||
|
||||
@@ -85,7 +85,7 @@ private:
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
uint32_t _promoted_index_end;
|
||||
uint64_t _promoted_index_end;
|
||||
uint64_t _position;
|
||||
uint64_t _partition_header_length = 0;
|
||||
std::optional<deletion_time> _deletion_time;
|
||||
|
||||
@@ -1310,7 +1310,7 @@ future<> sstable::open_data() {
|
||||
c->pop_back();
|
||||
return make_ready_future<>();
|
||||
}).then([this, c] () mutable {
|
||||
c = {};
|
||||
*c = {};
|
||||
_open = true;
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
@@ -69,15 +69,6 @@ stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, U
|
||||
|
||||
stream_transfer_task::~stream_transfer_task() = default;
|
||||
|
||||
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges) {
|
||||
dht::partition_range_vector prs;
|
||||
prs.reserve(ranges.size());
|
||||
for (auto& range : ranges) {
|
||||
prs.push_back(dht::to_partition_range(range));
|
||||
}
|
||||
return prs;
|
||||
}
|
||||
|
||||
struct send_info {
|
||||
database& db;
|
||||
utils::UUID plan_id;
|
||||
@@ -103,7 +94,7 @@ struct send_info {
|
||||
, reason(reason_)
|
||||
, cf(db.find_column_family(cf_id))
|
||||
, ranges(std::move(ranges_))
|
||||
, prs(to_partition_ranges(ranges))
|
||||
, prs(dht::to_partition_ranges(ranges))
|
||||
, reader(cf.make_streaming_reader(cf.schema(), prs)) {
|
||||
}
|
||||
future<bool> has_relevant_range_on_this_shard() {
|
||||
|
||||
30
table.cc
30
table.cc
@@ -1181,8 +1181,7 @@ table::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sst
|
||||
|
||||
// Note: must run in a seastar thread
|
||||
void
|
||||
table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
|
||||
// Build a new list of _sstables: We remove from the existing list the
|
||||
// tables we compacted (by now, there might be more sstables flushed
|
||||
// later), and we add the new tables generated by the compaction.
|
||||
@@ -1195,7 +1194,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
|
||||
// unbounded time, because all shards must agree on the deletion).
|
||||
|
||||
// make sure all old sstables belong *ONLY* to current shard before we proceed to their deletion.
|
||||
for (auto& sst : sstables_to_remove) {
|
||||
for (auto& sst : desc.input_sstables) {
|
||||
auto shards = sst->get_shards_for_this_sstable();
|
||||
if (shards.size() > 1) {
|
||||
throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used shared sstable {}. Only resharding work with those!",
|
||||
@@ -1209,9 +1208,12 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
|
||||
|
||||
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
|
||||
// rebuilding _sstables_compacted_but_not_deleted first to make the entire rebuild operation exception safe.
|
||||
new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), desc.input_sstables.begin(), desc.input_sstables.end());
|
||||
|
||||
rebuild_sstable_list(new_sstables, sstables_to_remove);
|
||||
_cache.invalidate([this, &desc] () noexcept {
|
||||
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
|
||||
rebuild_sstable_list(desc.output_sstables, desc.input_sstables);
|
||||
}, std::move(desc.ranges_for_cache_invalidation)).get();
|
||||
|
||||
// refresh underlying data source in row cache to prevent it from holding reference
|
||||
// to sstables files that are about to be deleted.
|
||||
@@ -1221,7 +1223,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
|
||||
|
||||
rebuild_statistics();
|
||||
|
||||
auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
|
||||
auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove = desc.input_sstables] {
|
||||
return with_semaphore(_sstable_deletion_sem, 1, [sstables_to_remove = std::move(sstables_to_remove)] {
|
||||
return sstables::delete_atomically(std::move(sstables_to_remove));
|
||||
});
|
||||
@@ -1239,7 +1241,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
|
||||
// or they could stay forever in the set, resulting in deleted files remaining
|
||||
// opened and disk space not being released until shutdown.
|
||||
std::unordered_set<sstables::shared_sstable> s(
|
||||
sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
desc.input_sstables.begin(), desc.input_sstables.end());
|
||||
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
|
||||
return s.count(sst);
|
||||
});
|
||||
@@ -1296,13 +1298,12 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) {
|
||||
sst->set_unshared();
|
||||
return sst;
|
||||
};
|
||||
auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (std::vector<sstables::shared_sstable> old_ssts,
|
||||
std::vector<sstables::shared_sstable> new_ssts) {
|
||||
_compaction_strategy.notify_completion(old_ssts, new_ssts);
|
||||
_compaction_manager.propagate_replacement(this, old_ssts, new_ssts);
|
||||
this->on_compaction_completion(new_ssts, old_ssts);
|
||||
auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) {
|
||||
_compaction_strategy.notify_completion(desc.input_sstables, desc.output_sstables);
|
||||
_compaction_manager.propagate_replacement(this, desc.input_sstables, desc.output_sstables);
|
||||
this->on_compaction_completion(desc);
|
||||
if (release_exhausted) {
|
||||
release_exhausted(old_ssts);
|
||||
release_exhausted(desc.input_sstables);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1889,6 +1890,9 @@ future<> table::flush_streaming_mutations(utils::UUID plan_id, dht::partition_ra
|
||||
return _streaming_memtables->seal_active_memtable_delayed().then([this] {
|
||||
return _streaming_flush_phaser.advance_and_await();
|
||||
}).then([this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable {
|
||||
if (sstables.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _cache.invalidate([this, sstables = std::move(sstables)] () mutable noexcept {
|
||||
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
|
||||
for (auto&& sst : sstables) {
|
||||
|
||||
@@ -31,6 +31,44 @@
|
||||
#include "cql3/statements/raw/parsed_statement.hh"
|
||||
#include "cql3/util.hh"
|
||||
|
||||
//
|
||||
// Test basic CQL string quoting
|
||||
//
|
||||
BOOST_AUTO_TEST_CASE(maybe_quote) {
|
||||
std::string s(65536, 'x');
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(s), s);
|
||||
s += " " + std::string(65536, 'y');
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(s), "\"" + s + "\"");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("a"), "a");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("z"), "z");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("b0"), "b0");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("y9"), "y9");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("c_d"), "c_d");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("x8_"), "x8_");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(""), "\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("0"), "\"0\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("9"), "\"9\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("_"), "\"_\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("A"), "\"A\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("To"), "\"To\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("zeD"), "\"zeD\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world"), "\"hello world\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello_world01234"), "hello_world01234");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world01234"), "\"hello world01234\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world\"1234"), "\"hello world\"\"1234\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello_world01234hello_world01234"), "hello_world01234hello_world01234");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world01234hello_world01234"), "\"hello world01234hello_world01234\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world\"1234hello_world\"1234"), "\"hello world\"\"1234hello_world\"\"1234\"");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\""), "\"\"\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("[\"]"), "\"[\"\"]\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"\""), "\"\"\"\"\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"hell0\""), "\"\"\"hell0\"\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello \"my\" world"), "\"hello \"\"my\"\" world\"");
|
||||
}
|
||||
|
||||
//
|
||||
// These tests verify that all excepted variations of CQL syntax related to access-control ("auth") functionality are
|
||||
// accepted by the parser. They do not verify that invalid syntax is rejected, nor do they verify the correctness of
|
||||
|
||||
@@ -4491,6 +4491,44 @@ SEASTAR_TEST_CASE(test_view_with_two_regular_base_columns_in_key) {
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{{int32_type->decompose(11), int32_type->decompose(13), int32_type->decompose(1), int32_type->decompose(2)}},
|
||||
});
|
||||
|
||||
// Reproduce issue #6008 - updates with not-previously-existing row,
|
||||
// not setting both v1 and v2 - should not create a view row, and
|
||||
// definitely not cause a crash as they did in #6008. Same for
|
||||
// deletes when no previous row exists.
|
||||
cquery_nofail(e, "DELETE FROM t WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "UPDATE t SET v1 = 17 WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
cquery_nofail(e, "DELETE FROM t WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "UPDATE t SET v2 = 7 WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
// Same tests as above, but with a row marker left behind, so there
|
||||
// is an existing base row - it's just empty.
|
||||
cquery_nofail(e, "INSERT INTO t (p, c, v1, v2) VALUES (1, 2, 3, 4)");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{{int32_type->decompose(3), int32_type->decompose(4), int32_type->decompose(1), int32_type->decompose(2)}},
|
||||
});
|
||||
cquery_nofail(e, "UPDATE t SET v1 = NULL, v2 = NULL WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "UPDATE t SET v1 = 17 WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
cquery_nofail(e, "UPDATE t SET v1 = NULL, v2 = NULL WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "UPDATE t SET v2 = 7 WHERE p = 1 AND c = 2");
|
||||
msg = cquery_nofail(e, "SELECT * FROM tv");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -118,6 +118,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto test_random_streams = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
|
||||
auto muts = gen(partition_nr);
|
||||
schema_ptr s = gen.schema();
|
||||
auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);
|
||||
int mf_produced = 0;
|
||||
auto get_next_mutation_fragment = [&source_reader, &mf_produced] () mutable {
|
||||
if (mf_produced++ > 800) {
|
||||
return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
|
||||
} else {
|
||||
return source_reader(db::no_timeout);
|
||||
}
|
||||
};
|
||||
auto& partitioner = dht::global_partitioner();
|
||||
try {
|
||||
distribute_reader_and_consume_on_shards(s, partitioner,
|
||||
make_generating_reader(s, std::move(get_next_mutation_fragment)),
|
||||
[&partitioner, error] (flat_mutation_reader reader) mutable {
|
||||
if (error) {
|
||||
return make_exception_future<>(std::runtime_error("Failed to write"));
|
||||
}
|
||||
return repeat([&partitioner, reader = std::move(reader), error] () mutable {
|
||||
return reader(db::no_timeout).then([&partitioner, error] (mutation_fragment_opt mf_opt) mutable {
|
||||
if (mf_opt) {
|
||||
if (mf_opt->is_partition_start()) {
|
||||
auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
|
||||
BOOST_REQUIRE_EQUAL(shard, this_shard_id());
|
||||
}
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
).get0();
|
||||
} catch (...) {
|
||||
// The distribute_reader_and_consume_on_shards is expected to fail and not block forever
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::no);
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::yes);
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class bucket_writer {
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include <sstream>
|
||||
#include <boost/range/algorithm/adjacent_find.hpp>
|
||||
#include <boost/algorithm/cxx11/iota.hpp>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
|
||||
static logging::logger nlogger("NetworkTopologyStrategyLogger");
|
||||
|
||||
@@ -607,4 +608,13 @@ SEASTAR_TEST_CASE(testCalculateEndpoints) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_invalid_dcs) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
for (auto& incorrect : std::vector<std::string>{"3\"", "", "!!!", "abcb", "!3", "-5", "0x123"}) {
|
||||
BOOST_REQUIRE_THROW(e.execute_cql("CREATE KEYSPACE abc WITH REPLICATION "
|
||||
"= {'class': 'NetworkTopologyStrategy', 'dc1':'" + incorrect + "'}").get(),
|
||||
exceptions::configuration_exception);
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -200,5 +200,16 @@ BOOST_AUTO_TEST_CASE(inet_address) {
|
||||
auto res = ser::deserialize_from_buffer(buf, boost::type<gms::inet_address>{});
|
||||
BOOST_CHECK_EQUAL(res, ip);
|
||||
}
|
||||
|
||||
// stringify tests
|
||||
{
|
||||
for (sstring s : { "2001:6b0:8:2::232", "2a05:d018:223:f00:97af:f4d9:eac2:6a0f", "fe80::8898:3e04:215b:2cd6" }) {
|
||||
gms::inet_address ip(s);
|
||||
BOOST_CHECK(ip.addr().is_ipv6());
|
||||
auto s2 = boost::lexical_cast<std::string>(ip);
|
||||
gms::inet_address ip2(s);
|
||||
BOOST_CHECK_EQUAL(ip2, ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4835,7 +4835,9 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
|
||||
| boost::adaptors::transformed([] (auto& sst) { return sst->generation(); }));
|
||||
auto expected_sst = sstable_run.begin();
|
||||
auto closed_sstables_tracker = sstable_run.begin();
|
||||
auto replacer = [&] (auto old_sstables, auto new_sstables) {
|
||||
auto replacer = [&] (sstables::compaction_completion_desc desc) {
|
||||
auto old_sstables = std::move(desc.input_sstables);
|
||||
auto new_sstables = std::move(desc.output_sstables);
|
||||
BOOST_REQUIRE(expected_sst != sstable_run.end());
|
||||
if (incremental_enabled) {
|
||||
do_incremental_replace(std::move(old_sstables), std::move(new_sstables), expected_sst, closed_sstables_tracker);
|
||||
@@ -5371,8 +5373,9 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
|
||||
cf->add_sstable_and_update_cache(expired_sst).get();
|
||||
BOOST_REQUIRE(is_partition_dead(alpha));
|
||||
|
||||
auto replacer = [&] (std::vector<sstables::shared_sstable> old_sstables, std::vector<sstables::shared_sstable> new_sstables) {
|
||||
|
||||
auto replacer = [&] (sstables::compaction_completion_desc desc) {
|
||||
auto old_sstables = std::move(desc.input_sstables);
|
||||
auto new_sstables = std::move(desc.output_sstables);
|
||||
// expired_sst is exhausted, and new sstable is written with mut 2.
|
||||
BOOST_REQUIRE(old_sstables.size() == 1);
|
||||
BOOST_REQUIRE(old_sstables.front() == expired_sst);
|
||||
|
||||
@@ -223,7 +223,7 @@ public:
|
||||
};
|
||||
|
||||
inline auto replacer_fn_no_op() {
|
||||
return [](std::vector<shared_sstable> removed, std::vector<shared_sstable> added) -> void {};
|
||||
return [](sstables::compaction_completion_desc desc) -> void {};
|
||||
}
|
||||
|
||||
inline sstring get_test_dir(const sstring& name, const sstring& ks, const sstring& cf)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM fedora:31
|
||||
FROM docker.io/fedora:31
|
||||
ADD ./install-dependencies.sh ./
|
||||
ADD ./seastar/install-dependencies.sh ./seastar/
|
||||
ADD ./tools/toolchain/system-auth ./
|
||||
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-31-20200115
|
||||
docker.io/scylladb/scylla-toolchain:fedora-31-branch-3.3-20200615
|
||||
@@ -254,6 +254,14 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
|
||||
--_connections;
|
||||
return unadvertise_connection(conn);
|
||||
}).handle_exception([] (std::exception_ptr ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch(std::system_error& serr) {
|
||||
if (serr.code().category() == std::system_category() &&
|
||||
serr.code().value() == EPIPE) { // expected if another side closes a connection
|
||||
return;
|
||||
}
|
||||
} catch(...) {};
|
||||
clogger.info("exception while processing connection: {}", ep);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user