Compare commits
23 Commits
scylla-3.3
...
next-3.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd293768e7 | ||
|
|
22dfa48585 | ||
|
|
2f3d7f1408 | ||
|
|
76a08df939 | ||
|
|
6aa129d3b0 | ||
|
|
b4f781e4eb | ||
|
|
27594ca50e | ||
|
|
0f2f0d65d7 | ||
|
|
31c2f8a3ae | ||
|
|
ec12331f11 | ||
|
|
ccc463b5e5 | ||
|
|
4a9676f6b7 | ||
|
|
aaf4989c31 | ||
|
|
b29f954f20 | ||
|
|
5546d5df7b | ||
|
|
541c29677f | ||
|
|
06f18108c0 | ||
|
|
90002ca3d2 | ||
|
|
da23902311 | ||
|
|
2b0dc21f97 | ||
|
|
b544691493 | ||
|
|
d420b06844 | ||
|
|
b3a2cb2f68 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=3.3.2
|
||||
VERSION=3.3.4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -56,26 +56,22 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) {
|
||||
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
|
||||
}
|
||||
|
||||
static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
|
||||
std::vector<ss::token_range> res;
|
||||
for (auto d : service::get_local_storage_service().describe_ring(keyspace)) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
r.endpoint_details.push(ed);
|
||||
static ss::token_range token_range_endpoints_to_json(const dht::token_range_endpoints& d) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
r.end_token = d._end_token;
|
||||
r.endpoints = d._endpoints;
|
||||
r.rpc_endpoints = d._rpc_endpoints;
|
||||
for (auto det : d._endpoint_details) {
|
||||
ss::endpoint_detail ed;
|
||||
ed.host = det._host;
|
||||
ed.datacenter = det._datacenter;
|
||||
if (det._rack != "") {
|
||||
ed.rack = det._rack;
|
||||
}
|
||||
res.push_back(r);
|
||||
r.endpoint_details.push(ed);
|
||||
}
|
||||
return res;
|
||||
return r;
|
||||
}
|
||||
|
||||
void set_storage_service(http_context& ctx, routes& r) {
|
||||
@@ -177,13 +173,13 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
|
||||
ss::describe_any_ring.set(r, [&ctx](const_req req) {
|
||||
return describe_ring("");
|
||||
ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::describe_ring.set(r, [&ctx](const_req req) {
|
||||
auto keyspace = validate_keyspace(ctx, req.param);
|
||||
return describe_ring(keyspace);
|
||||
ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
auto keyspace = validate_keyspace(ctx, req->param);
|
||||
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
|
||||
});
|
||||
|
||||
ss::get_host_id_map.set(r, [](const_req req) {
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
|
||||
#include "auth/resource.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
@@ -52,9 +53,9 @@ struct role_config_update final {
|
||||
///
|
||||
/// A logical argument error for a role-management operation.
|
||||
///
|
||||
class roles_argument_exception : public std::invalid_argument {
|
||||
class roles_argument_exception : public exceptions::invalid_request_exception {
|
||||
public:
|
||||
using std::invalid_argument::invalid_argument;
|
||||
using exceptions::invalid_request_exception::invalid_request_exception;
|
||||
};
|
||||
|
||||
class role_already_exists : public roles_argument_exception {
|
||||
|
||||
@@ -430,14 +430,42 @@ operator<<(std::ostream& os, const cql3_type::raw& r) {
|
||||
namespace util {
|
||||
|
||||
sstring maybe_quote(const sstring& identifier) {
|
||||
static const std::regex unquoted_identifier_re("[a-z][a-z0-9_]*");
|
||||
if (std::regex_match(identifier.begin(), identifier.end(), unquoted_identifier_re)) {
|
||||
const auto* p = identifier.begin();
|
||||
const auto* ep = identifier.end();
|
||||
|
||||
// quote empty string
|
||||
if (__builtin_expect(p == ep, false)) {
|
||||
return "\"\"";
|
||||
}
|
||||
|
||||
// string needs no quoting if it matches [a-z][a-z0-9_]*
|
||||
// quotes ('"') in the string are doubled
|
||||
bool need_quotes;
|
||||
bool has_quotes;
|
||||
auto c = *p;
|
||||
if ('a' <= c && c <= 'z') {
|
||||
need_quotes = false;
|
||||
has_quotes = false;
|
||||
} else {
|
||||
need_quotes = true;
|
||||
has_quotes = (c == '"');
|
||||
}
|
||||
while ((++p != ep) && !has_quotes) {
|
||||
c = *p;
|
||||
if (!(('a' <= c && c <= 'z') || ('0' <= c && c <= '9') || (c == '_'))) {
|
||||
need_quotes = true;
|
||||
has_quotes = (c == '"');
|
||||
}
|
||||
}
|
||||
|
||||
if (!need_quotes) {
|
||||
return identifier;
|
||||
}
|
||||
if (!has_quotes) {
|
||||
return make_sstring("\"", identifier, "\"");
|
||||
}
|
||||
static const std::regex double_quote_re("\"");
|
||||
std::string result = identifier;
|
||||
std::regex_replace(result, double_quote_re, "\"\"");
|
||||
return '"' + result + '"';
|
||||
return '"' + std::regex_replace(identifier.c_str(), double_quote_re, "\"\"") + '"';
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1323,7 +1323,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
std::vector<iovec> v;
|
||||
v.reserve(n);
|
||||
size_t m = 0;
|
||||
while (m < rem && n < max_write) {
|
||||
while (m < rem && n--) {
|
||||
auto s = std::min(rem - m, buf_size);
|
||||
v.emplace_back(iovec{ buf.get_write(), s});
|
||||
m += s;
|
||||
|
||||
@@ -704,6 +704,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
|
||||
// (last_modification - manager::hints_timer_period) old.
|
||||
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -726,6 +727,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
|
||||
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
|
||||
++this->shard_stats().discarded;
|
||||
}
|
||||
ctx_ptr->rps_set.erase(rp);
|
||||
return make_ready_future<>();
|
||||
}).finally([units = std::move(units), ctx_ptr] {});
|
||||
}).handle_exception([this, ctx_ptr] (auto eptr) {
|
||||
|
||||
13
dist/common/scripts/scylla_raid_setup
vendored
13
dist/common/scripts/scylla_raid_setup
vendored
@@ -130,17 +130,14 @@ if __name__ == '__main__':
|
||||
|
||||
makedirs(mount_at)
|
||||
run('mount -t xfs -o noatime {raid} "{mount_at}"'.format(raid=fsdev, mount_at=mount_at))
|
||||
|
||||
makedirs('{}/data'.format(root))
|
||||
makedirs('{}/commitlog'.format(root))
|
||||
makedirs('{}/coredump'.format(root))
|
||||
|
||||
uid = pwd.getpwnam('scylla').pw_uid
|
||||
gid = grp.getgrnam('scylla').gr_gid
|
||||
os.chown(root, uid, gid)
|
||||
os.chown('{}/data'.format(root), uid, gid)
|
||||
os.chown('{}/commitlog'.format(root), uid, gid)
|
||||
os.chown('{}/coredump'.format(root), uid, gid)
|
||||
|
||||
for d in ['coredump', 'data', 'commitlog', 'hints', 'view_hints', 'saved_caches']:
|
||||
dpath = '{}/{}'.format(root, d)
|
||||
makedirs(dpath)
|
||||
os.chown(dpath, uid, gid)
|
||||
|
||||
if args.update_fstab:
|
||||
res = out('blkid {}'.format(fsdev))
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -182,7 +182,7 @@ class aws_instance:
|
||||
instance_size = self.instance_size()
|
||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||
return 'ixgbevf'
|
||||
if instance_class in ['c5', 'c5d', 'f1', 'g3', 'h1', 'i3', 'i3en', 'm5', 'm5d', 'p2', 'p3', 'r4', 'x1']:
|
||||
if instance_class in ['a1', 'c5', 'c5d', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||
return 'ena'
|
||||
if instance_class == 'm4':
|
||||
if instance_size == '16xlarge':
|
||||
|
||||
@@ -2171,25 +2171,24 @@ future<> gossiper::wait_for_range_setup() {
|
||||
}
|
||||
|
||||
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) {
|
||||
// We allow to bootstrap a new node in only two cases:
|
||||
// 1) The node is a completely new node and no state in gossip at all
|
||||
// 2) The node has state in gossip and it is already removed from the
|
||||
// cluster either by nodetool decommission or nodetool removenode
|
||||
auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
|
||||
// if there's no previous state, or the node was previously removed from the cluster, we're good
|
||||
if (!eps || is_dead_state(*eps)) {
|
||||
return true;
|
||||
bool allowed = true;
|
||||
if (!eps) {
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
sstring status = get_gossip_status(*eps);
|
||||
|
||||
logger.debug("is_safe_for_bootstrap: node {} status {}", endpoint, status);
|
||||
|
||||
// these states are not allowed to join the cluster as it would not be safe
|
||||
std::unordered_set<sstring> unsafe_statuses{
|
||||
sstring(""), // failed bootstrap but we did start gossiping
|
||||
sstring(versioned_value::STATUS_NORMAL), // node is legit in the cluster or it was stopped with kill -9
|
||||
sstring(versioned_value::SHUTDOWN) // node was shutdown
|
||||
std::unordered_set<sstring> allowed_statuses{
|
||||
sstring(versioned_value::STATUS_LEFT),
|
||||
sstring(versioned_value::REMOVED_TOKEN),
|
||||
};
|
||||
|
||||
return !unsafe_statuses.count(status);
|
||||
allowed = allowed_statuses.count(status);
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status={}, allowed_to_bootstrap={}", endpoint, status, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
std::set<sstring> to_feature_set(sstring features_string) {
|
||||
|
||||
@@ -69,7 +69,8 @@ std::ostream& gms::operator<<(std::ostream& os, const inet_address& x) {
|
||||
auto&& bytes = x.bytes();
|
||||
auto i = 0u;
|
||||
auto acc = 0u;
|
||||
for (auto b : bytes) {
|
||||
// extra paranoid sign extension evasion - #5808
|
||||
for (uint8_t b : bytes) {
|
||||
acc <<= 8;
|
||||
acc |= b;
|
||||
if ((++i & 1) == 0) {
|
||||
|
||||
@@ -2602,7 +2602,7 @@ void mutation_cleaner_impl::start_worker() {
|
||||
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
|
||||
auto&& region = snp.region();
|
||||
return with_allocator(region.allocator(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
{
|
||||
// Allocating sections require the region to be reclaimable
|
||||
// which means that they cannot be nested.
|
||||
// It is, however, possible, that if the snapshot is taken
|
||||
@@ -2614,13 +2614,15 @@ stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexce
|
||||
}
|
||||
try {
|
||||
return _worker_state->alloc_section(region, [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
return snp.merge_partition_versions(_app_stats);
|
||||
});
|
||||
});
|
||||
} catch (...) {
|
||||
// Merging failed, give up as there is no guarantee of forward progress.
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -21,8 +21,12 @@
|
||||
# At the end of the build we check that the build-id is indeed in the
|
||||
# first page. At install time we check that patchelf doesn't modify
|
||||
# the program headers.
|
||||
|
||||
# gdb has a SO_NAME_MAX_PATH_SIZE of 512, so limit the path size to
|
||||
# that. The 512 includes the null at the end, hence the 511 bellow.
|
||||
|
||||
ORIGINAL_DYNAMIC_LINKER=$(gcc -### /dev/null -o t 2>&1 | perl -n -e '/-dynamic-linker ([^ ]*) / && print $1')
|
||||
DYNAMIC_LINKER=$(printf "%2000s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
DYNAMIC_LINKER=$(printf "%511s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
|
||||
|
||||
COMMON_FLAGS="--enable-dpdk --cflags=-ffile-prefix-map=$PWD=. --ldflags=-Wl,--build-id=sha1,--dynamic-linker=$DYNAMIC_LINKER"
|
||||
|
||||
|
||||
@@ -452,6 +452,7 @@ class repair_writer {
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -561,11 +562,13 @@ public:
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
return with_semaphore(_sem, 1, [this, node_idx] {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -588,6 +591,10 @@ public:
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
named_semaphore& sem() {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
@@ -1187,6 +1194,23 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
|
||||
// Must run inside a seastar thread
|
||||
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
|
||||
@@ -1212,18 +1236,7 @@ private:
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
_repair_writer.create_writer(node_idx);
|
||||
for (auto& r : row_diff) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1234,15 +1247,7 @@ private:
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
_repair_writer.create_writer(node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
});
|
||||
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1925,22 +1930,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
current_set_diff,
|
||||
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1966,22 +1966,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
|
||||
current_rows,
|
||||
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_stream_cmd::error).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_stream_cmd::error).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2006,22 +2001,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
|
||||
error,
|
||||
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
|
||||
error = true;
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
|
||||
return sink.close();
|
||||
}).then([sink] {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
if (error) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return sink.close().then([sink] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}).finally([sink] () mutable {
|
||||
return sink.close().finally([sink] { });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
20
row_cache.cc
20
row_cache.cc
@@ -528,8 +528,12 @@ public:
|
||||
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
|
||||
{
|
||||
if (!mfopt) {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
return _cache._read_section(_cache._tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
|
||||
});
|
||||
});
|
||||
}
|
||||
_cache.on_partition_miss();
|
||||
const partition_start& ps = mfopt->as_partition_start();
|
||||
@@ -952,13 +956,15 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
// expensive and we need to amortize it somehow.
|
||||
do {
|
||||
STAP_PROBE(scylla, row_cache_update_partition_start);
|
||||
with_linearized_managed_bytes([&] {
|
||||
{
|
||||
if (!update) {
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
memtable_entry& mem_e = *m.partitions.begin();
|
||||
size_entry = mem_e.size_in_allocator_without_rows(_tracker.allocator());
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
|
||||
update = updater(_update_section, cache_i, mem_e, is_present, real_dirty_acc);
|
||||
});
|
||||
});
|
||||
}
|
||||
// We use cooperative deferring instead of futures so that
|
||||
@@ -970,14 +976,16 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
update = {};
|
||||
real_dirty_acc.unpin_memory(size_entry);
|
||||
_update_section(_tracker.region(), [&] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto i = m.partitions.begin();
|
||||
memtable_entry& mem_e = *i;
|
||||
m.partitions.erase(i);
|
||||
mem_e.partition().evict(_tracker.memtable_cleaner());
|
||||
current_allocator().destroy(&mem_e);
|
||||
});
|
||||
});
|
||||
++partition_count;
|
||||
});
|
||||
}
|
||||
STAP_PROBE(scylla, row_cache_update_partition_end);
|
||||
} while (!m.partitions.empty() && !need_preempt());
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
@@ -1124,8 +1132,8 @@ future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&
|
||||
seastar::thread::maybe_yield();
|
||||
|
||||
while (true) {
|
||||
auto done = with_linearized_managed_bytes([&] {
|
||||
return _update_section(_tracker.region(), [&] {
|
||||
auto done = _update_section(_tracker.region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
auto it = _partitions.lower_bound(*_prev_snapshot_pos, cmp);
|
||||
auto end = _partitions.lower_bound(dht::ring_position_view::for_range_end(range), cmp);
|
||||
|
||||
@@ -45,7 +45,7 @@ if [[ "$ID" = "debian" && "$VERSION_ID" = "8" ]] || [[ "$ID" = "ubuntu" && "$VER
|
||||
echo "scylla ALL=(ALL) NOPASSWD: /opt/scylladb/scripts/scylla_prepare,/opt/scylladb/scripts/scylla_stop,/opt/scylladb/scripts/scylla_io_setup,/opt/scylladb/scripts/scylla-ami/scylla_ami_setup" > /etc/sudoers.d/scylla
|
||||
else
|
||||
# AmbientCapabilities supported from v229 but it backported to v219-33 on RHEL7
|
||||
if [ $SYSTEMD_VER -ge 229 ] || [ $SYSTEMD_VER -eq 219 ] && [ $SYSTEMD_REL -ge 33 ]; then
|
||||
if [ $SYSTEMD_VER -ge 229 ] || [[ $SYSTEMD_VER -eq 219 && $SYSTEMD_REL -ge 33 ]]; then
|
||||
if [ $AMB_SUPPORT -eq 1 ]; then
|
||||
mkdir -p /etc/systemd/system/scylla-server.service.d/
|
||||
cat << EOS > /etc/systemd/system/scylla-server.service.d/capabilities.conf
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 0ebd89a858...30f03aeba9
@@ -3532,7 +3532,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
|
||||
// reordering of endpoints happens. The local endpoint, if
|
||||
// present, is always first in the list, as get_live_sorted_endpoints()
|
||||
// orders the list by proximity to the local endpoint.
|
||||
is_read_non_local |= all_replicas.front() != utils::fb_utilities::get_broadcast_address();
|
||||
is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address();
|
||||
|
||||
auto cf = _db.local().find_column_family(schema).shared_from_this();
|
||||
std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
|
||||
|
||||
@@ -903,12 +903,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
storage_service::get_range_to_address_map(const sstring& keyspace,
|
||||
const std::vector<token>& sorted_tokens) const {
|
||||
sstring ks = keyspace;
|
||||
// some people just want to get a visual representation of things. Allow null and set it to the first
|
||||
// non-system keyspace.
|
||||
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
if (keyspace == "") {
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
if (keyspaces.empty()) {
|
||||
throw std::runtime_error("No keyspace provided and no non system kespace exist");
|
||||
}
|
||||
ks = keyspaces[0];
|
||||
}
|
||||
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
|
||||
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
|
||||
@@ -85,7 +85,7 @@ private:
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
uint32_t _promoted_index_end;
|
||||
uint64_t _promoted_index_end;
|
||||
uint64_t _position;
|
||||
uint64_t _partition_header_length = 0;
|
||||
std::optional<deletion_time> _deletion_time;
|
||||
|
||||
@@ -31,6 +31,44 @@
|
||||
#include "cql3/statements/raw/parsed_statement.hh"
|
||||
#include "cql3/util.hh"
|
||||
|
||||
//
|
||||
// Test basic CQL string quoting
|
||||
//
|
||||
BOOST_AUTO_TEST_CASE(maybe_quote) {
|
||||
std::string s(65536, 'x');
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(s), s);
|
||||
s += " " + std::string(65536, 'y');
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(s), "\"" + s + "\"");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("a"), "a");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("z"), "z");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("b0"), "b0");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("y9"), "y9");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("c_d"), "c_d");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("x8_"), "x8_");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(""), "\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("0"), "\"0\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("9"), "\"9\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("_"), "\"_\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("A"), "\"A\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("To"), "\"To\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("zeD"), "\"zeD\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world"), "\"hello world\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello_world01234"), "hello_world01234");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world01234"), "\"hello world01234\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world\"1234"), "\"hello world\"\"1234\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello_world01234hello_world01234"), "hello_world01234hello_world01234");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world01234hello_world01234"), "\"hello world01234hello_world01234\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world\"1234hello_world\"1234"), "\"hello world\"\"1234hello_world\"\"1234\"");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\""), "\"\"\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("[\"]"), "\"[\"\"]\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"\""), "\"\"\"\"\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"hell0\""), "\"\"\"hell0\"\"\"");
|
||||
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello \"my\" world"), "\"hello \"\"my\"\" world\"");
|
||||
}
|
||||
|
||||
//
|
||||
// These tests verify that all excepted variations of CQL syntax related to access-control ("auth") functionality are
|
||||
// accepted by the parser. They do not verify that invalid syntax is rejected, nor do they verify the correctness of
|
||||
|
||||
@@ -200,5 +200,16 @@ BOOST_AUTO_TEST_CASE(inet_address) {
|
||||
auto res = ser::deserialize_from_buffer(buf, boost::type<gms::inet_address>{});
|
||||
BOOST_CHECK_EQUAL(res, ip);
|
||||
}
|
||||
|
||||
// stringify tests
|
||||
{
|
||||
for (sstring s : { "2001:6b0:8:2::232", "2a05:d018:223:f00:97af:f4d9:eac2:6a0f", "fe80::8898:3e04:215b:2cd6" }) {
|
||||
gms::inet_address ip(s);
|
||||
BOOST_CHECK(ip.addr().is_ipv6());
|
||||
auto s2 = boost::lexical_cast<std::string>(ip);
|
||||
gms::inet_address ip2(s);
|
||||
BOOST_CHECK_EQUAL(ip2, ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM fedora:31
|
||||
FROM docker.io/fedora:31
|
||||
ADD ./install-dependencies.sh ./
|
||||
ADD ./seastar/install-dependencies.sh ./seastar/
|
||||
ADD ./tools/toolchain/system-auth ./
|
||||
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-31-20200115
|
||||
docker.io/scylladb/scylla-toolchain:fedora-31-branch-3.3-20200615
|
||||
@@ -254,6 +254,14 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
|
||||
--_connections;
|
||||
return unadvertise_connection(conn);
|
||||
}).handle_exception([] (std::exception_ptr ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch(std::system_error& serr) {
|
||||
if (serr.code().category() == std::system_category() &&
|
||||
serr.code().value() == EPIPE) { // expected if another side closes a connection
|
||||
return;
|
||||
}
|
||||
} catch(...) {};
|
||||
clogger.info("exception while processing connection: {}", ep);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user