Compare commits
63 Commits
copilot/fi
...
scylla-5.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d70751fee3 | ||
|
|
1fba43c317 | ||
|
|
e380c24c69 | ||
|
|
76a76a95f4 | ||
|
|
f6837afec7 | ||
|
|
6350c8836d | ||
|
|
5457948437 | ||
|
|
da41001b5c | ||
|
|
dd61e8634c | ||
|
|
b642b4c30e | ||
|
|
c013336121 | ||
|
|
b6b35ce061 | ||
|
|
069e38f02d | ||
|
|
61a8003ad1 | ||
|
|
8a17066961 | ||
|
|
487ba9f3e1 | ||
|
|
bd4f9e3615 | ||
|
|
c68deb2461 | ||
|
|
dd96d3017a | ||
|
|
6ca80ee118 | ||
|
|
eee8f750cc | ||
|
|
8d5206e6c6 | ||
|
|
cfa40402f4 | ||
|
|
2d170e51cf | ||
|
|
860e79e4b1 | ||
|
|
908a82bea0 | ||
|
|
39158f55d0 | ||
|
|
22c1685b3d | ||
|
|
9ba6fc73f1 | ||
|
|
f2e2c0127a | ||
|
|
363ea87f51 | ||
|
|
c49fd6f176 | ||
|
|
3114589a30 | ||
|
|
34f68a4c0f | ||
|
|
b336e11f59 | ||
|
|
9ef73d7e36 | ||
|
|
8700a72b4c | ||
|
|
886dd3e1d2 | ||
|
|
f565f3de06 | ||
|
|
76ff6d981c | ||
|
|
f924f59055 | ||
|
|
d5cef05810 | ||
|
|
e0f4e99e9b | ||
|
|
6795715011 | ||
|
|
aa9e91c376 | ||
|
|
ddfb9ebab2 | ||
|
|
d58a3e4d16 | ||
|
|
2ebac52d2d | ||
|
|
b536614913 | ||
|
|
85df0fd2b1 | ||
|
|
cdf9fe7023 | ||
|
|
8ff4717fd0 | ||
|
|
291b1f6e7f | ||
|
|
b2699743cc | ||
|
|
50ae73a4bd | ||
|
|
c3dd4a2b87 | ||
|
|
0f9fe61d91 | ||
|
|
59d30ff241 | ||
|
|
fb82dff89e | ||
|
|
b588b19620 | ||
|
|
608ef92a71 | ||
|
|
d2732b2663 | ||
|
|
34ab98e1be |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -72,7 +72,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.2.0-dev
|
||||
VERSION=5.2.0-rc4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -145,19 +145,24 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
auto table = find_table(_proxy, request);
|
||||
auto db = _proxy.data_dictionary();
|
||||
auto cfs = db.get_tables();
|
||||
auto i = cfs.begin();
|
||||
auto e = cfs.end();
|
||||
|
||||
if (limit < 1) {
|
||||
throw api_error::validation("Limit must be 1 or more");
|
||||
}
|
||||
|
||||
// TODO: the unordered_map here is not really well suited for partial
|
||||
// querying - we're sorting on local hash order, and creating a table
|
||||
// between queries may or may not miss info. But that should be rare,
|
||||
// and we can probably expect this to be a single call.
|
||||
// # 12601 (maybe?) - sort the set of tables on ID. This should ensure we never
|
||||
// generate duplicates in a paged listing here. Can obviously miss things if they
|
||||
// are added between paged calls and end up with a "smaller" UUID/ARN, but that
|
||||
// is to be expected.
|
||||
std::sort(cfs.begin(), cfs.end(), [](const data_dictionary::table& t1, const data_dictionary::table& t2) {
|
||||
return t1.schema()->id().uuid() < t2.schema()->id().uuid();
|
||||
});
|
||||
|
||||
auto i = cfs.begin();
|
||||
auto e = cfs.end();
|
||||
|
||||
if (streams_start) {
|
||||
i = std::find_if(i, e, [&](data_dictionary::table t) {
|
||||
i = std::find_if(i, e, [&](const data_dictionary::table& t) {
|
||||
return t.schema()->id().uuid() == streams_start
|
||||
&& cdc::get_base_table(db.real_database(), *t.schema())
|
||||
&& is_alternator_keyspace(t.schema()->ks_name())
|
||||
|
||||
@@ -409,7 +409,9 @@ public:
|
||||
l0_old_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
|
||||
if (l0_old_ssts.size() || l0_new_ssts.size()) {
|
||||
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -553,4 +553,16 @@ murmur3_partitioner_ignore_msb_bits: 12
|
||||
# WARNING: It's unsafe to set this to false if the node previously booted
|
||||
# with the schema commit log enabled. In such case, some schema changes
|
||||
# may be lost if the node was not cleanly stopped.
|
||||
force_schema_commit_log: true
|
||||
force_schema_commit_log: true
|
||||
|
||||
# Use Raft to consistently manage schema information in the cluster.
|
||||
# Refer to https://docs.scylladb.com/master/architecture/raft.html for more details.
|
||||
# The 'Handling Failures' section is especially important.
|
||||
#
|
||||
# Once enabled in a cluster, this cannot be turned off.
|
||||
# If you want to bootstrap a new cluster without Raft, make sure to set this to `false`
|
||||
# before starting your nodes for the first time.
|
||||
#
|
||||
# A cluster not using Raft can be 'upgraded' to use Raft. Refer to the aforementioned
|
||||
# documentation, section 'Enabling Raft in ScyllaDB 5.2 and further', for the procedure.
|
||||
consistent_cluster_management: true
|
||||
|
||||
@@ -1416,7 +1416,7 @@ expression search_and_replace(const expression& e,
|
||||
};
|
||||
},
|
||||
[&] (const binary_operator& oper) -> expression {
|
||||
return binary_operator(recurse(oper.lhs), oper.op, recurse(oper.rhs));
|
||||
return binary_operator(recurse(oper.lhs), oper.op, recurse(oper.rhs), oper.order);
|
||||
},
|
||||
[&] (const column_mutation_attribute& cma) -> expression {
|
||||
return column_mutation_attribute{cma.kind, recurse(cma.column)};
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "hashers.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -600,6 +601,14 @@ query_processor::get_statement(const sstring_view& query, const service::client_
|
||||
std::unique_ptr<raw::parsed_statement>
|
||||
query_processor::parse_statement(const sstring_view& query) {
|
||||
try {
|
||||
{
|
||||
const char* error_injection_key = "query_processor-parse_statement-test_failure";
|
||||
utils::get_local_injector().inject(error_injection_key, [&]() {
|
||||
if (query.find(error_injection_key) != sstring_view::npos) {
|
||||
throw std::runtime_error(error_injection_key);
|
||||
}
|
||||
});
|
||||
}
|
||||
auto statement = util::do_with_parser(query, std::mem_fn(&cql3_parser::CqlParser::query));
|
||||
if (!statement) {
|
||||
throw exceptions::syntax_exception("Parsing failed");
|
||||
|
||||
@@ -80,7 +80,7 @@ public:
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
auto&& name = _type->field_name(_field);
|
||||
auto sname = sstring(reinterpret_cast<const char*>(name.begin(), name.size()));
|
||||
auto sname = std::string_view(reinterpret_cast<const char*>(name.data()), name.size());
|
||||
return format("{}.{}", _selected, sname);
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ public:
|
||||
}
|
||||
|
||||
_end_of_stream = false;
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
return _underlying->fast_forward_to(std::move(pr));
|
||||
}
|
||||
|
||||
|
||||
@@ -2116,6 +2116,9 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
clogger.debug("Discarding segments {}", ftd);
|
||||
|
||||
for (auto& [f, mode] : ftd) {
|
||||
// `f.remove_file()` resets known_size to 0, so remember the size here,
|
||||
// in order to subtract it from total_size_on_disk accurately.
|
||||
size_t size = f.known_size();
|
||||
try {
|
||||
if (f) {
|
||||
co_await f.close();
|
||||
@@ -2132,7 +2135,6 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
}
|
||||
}
|
||||
|
||||
auto size = f.known_size();
|
||||
auto usage = totals.total_size_on_disk;
|
||||
auto next_usage = usage - size;
|
||||
|
||||
@@ -2165,7 +2167,7 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
// or had such an exception that we consider the file dead
|
||||
// anyway. In either case we _remove_ the file size from
|
||||
// footprint, because it is no longer our problem.
|
||||
totals.total_size_on_disk -= f.known_size();
|
||||
totals.total_size_on_disk -= size;
|
||||
}
|
||||
|
||||
// #8376 - if we had an error in recycling (disk rename?), and no elements
|
||||
|
||||
@@ -401,6 +401,10 @@ public:
|
||||
named_value<uint64_t> wasm_udf_yield_fuel;
|
||||
named_value<uint64_t> wasm_udf_total_fuel;
|
||||
named_value<size_t> wasm_udf_memory_limit;
|
||||
// wasm_udf_reserved_memory is static because the options in db::config
|
||||
// are parsed using seastar::app_template, while this option is used for
|
||||
// configuring the Seastar memory subsystem.
|
||||
static constexpr size_t wasm_udf_reserved_memory = 50 * 1024 * 1024;
|
||||
|
||||
seastar::logging_settings logging_settings(const log_cli::options&) const;
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
#include <seastar/core/print.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
@@ -55,11 +56,11 @@ void large_data_handler::start() {
|
||||
}
|
||||
|
||||
future<> large_data_handler::stop() {
|
||||
if (!running()) {
|
||||
return make_ready_future<>();
|
||||
if (running()) {
|
||||
_running = false;
|
||||
large_data_logger.info("Waiting for {} background handlers", max_concurrency - _sem.available_units());
|
||||
co_await _sem.wait(max_concurrency);
|
||||
}
|
||||
_running = false;
|
||||
return _sem.wait(max_concurrency);
|
||||
}
|
||||
|
||||
void large_data_handler::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
|
||||
|
||||
@@ -295,7 +295,7 @@ future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_ra
|
||||
}
|
||||
|
||||
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr) {
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
if (_partition_reader) {
|
||||
return _partition_reader->fast_forward_to(std::move(pr));
|
||||
|
||||
@@ -2276,7 +2276,10 @@ public:
|
||||
add_partition(mutation_sink, "trace_probability", format("{:.2}", tracing::tracing::get_local_tracing_instance().get_trace_probability()));
|
||||
co_await add_partition(mutation_sink, "memory", [this] () {
|
||||
struct stats {
|
||||
uint64_t total = 0;
|
||||
// take the pre-reserved memory into account, as seastar only returns
|
||||
// the stats of memory managed by the seastar allocator, but we instruct
|
||||
// it to reserve addition memory for system.
|
||||
uint64_t total = db::config::wasm_udf_reserved_memory;
|
||||
uint64_t free = 0;
|
||||
static stats reduce(stats a, stats b) { return stats{a.total + b.total, a.free + b.free}; }
|
||||
};
|
||||
|
||||
@@ -172,7 +172,7 @@ class build_progress_virtual_reader {
|
||||
}
|
||||
|
||||
virtual future<> fast_forward_to(position_range range) override {
|
||||
forward_buffer_to(range.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _underlying.fast_forward_to(std::move(range));
|
||||
}
|
||||
|
||||
@@ -85,29 +85,25 @@ future<row_locker::lock_holder>
|
||||
row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) {
|
||||
mylog.debug("taking shared lock on partition {}, and {} lock on row {} in it", pk, (exclusive ? "exclusive" : "shared"), cpk);
|
||||
auto tracker = latency_stats_tracker(exclusive ? stats.exclusive_row : stats.shared_row);
|
||||
auto ck = cpk;
|
||||
// Create a two-level lock entry for the partition if it doesn't exist already.
|
||||
auto i = _two_level_locks.try_emplace(pk, this).first;
|
||||
// The two-level lock entry we've just created is guaranteed to be kept alive as long as it's locked.
|
||||
// Initiating read locking in the background below ensures that even if the two-level lock is currently
|
||||
// write-locked, releasing the write-lock will synchronously engage any waiting
|
||||
// locks and will keep the entry alive.
|
||||
future<lock_type::holder> lock_partition = i->second._partition_lock.hold_read_lock(timeout);
|
||||
auto j = i->second._row_locks.find(cpk);
|
||||
if (j == i->second._row_locks.end()) {
|
||||
// Not yet locked, need to create the lock. This makes a copy of cpk.
|
||||
try {
|
||||
j = i->second._row_locks.emplace(cpk, lock_type()).first;
|
||||
} catch(...) {
|
||||
// If this emplace() failed, e.g., out of memory, we fail. We
|
||||
// could do nothing - the partition lock we already started
|
||||
// taking will be unlocked automatically after being locked.
|
||||
// But it's better form to wait for the work we started, and it
|
||||
// will also allow us to remove the hash-table row we added.
|
||||
return lock_partition.then([ex = std::current_exception()] (auto lock) {
|
||||
// The lock is automatically released when "lock" goes out of scope.
|
||||
// TODO: unlock (lock = {}) now, search for the partition in the
|
||||
// hash table (we know it's still there, because we held the lock until
|
||||
// now) and remove the unused lock from the hash table if still unused.
|
||||
return make_exception_future<row_locker::lock_holder>(std::current_exception());
|
||||
});
|
||||
return lock_partition.then([this, pk = &i->first, row_locks = &i->second._row_locks, ck = std::move(ck), exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable {
|
||||
auto j = row_locks->find(ck);
|
||||
if (j == row_locks->end()) {
|
||||
// Not yet locked, need to create the lock.
|
||||
j = row_locks->emplace(std::move(ck), lock_type()).first;
|
||||
}
|
||||
}
|
||||
return lock_partition.then([this, pk = &i->first, cpk = &j->first, &row_lock = j->second, exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable {
|
||||
auto* cpk = &j->first;
|
||||
auto& row_lock = j->second;
|
||||
// Like to the two-level lock entry above, the row_lock entry we've just created
|
||||
// is guaranteed to be kept alive as long as it's locked.
|
||||
// Initiating read/write locking in the background below ensures that.
|
||||
auto lock_row = exclusive ? row_lock.hold_write_lock(timeout) : row_lock.hold_read_lock(timeout);
|
||||
return lock_row.then([this, pk, cpk, exclusive, tracker = std::move(tracker), lock1 = std::move(lock1)] (auto lock2) mutable {
|
||||
lock1.release();
|
||||
|
||||
@@ -2523,24 +2523,28 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac
|
||||
return std::max(backlog, _max.load(std::memory_order_relaxed));
|
||||
}
|
||||
|
||||
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name) {
|
||||
return sys_dist_ks.view_status(ks_name, cf_name).then([] (std::unordered_map<locator::host_id, sstring>&& view_statuses) {
|
||||
return boost::algorithm::any_of(view_statuses | boost::adaptors::map_values, [] (const sstring& view_status) {
|
||||
return view_status == "STARTED";
|
||||
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name,
|
||||
const sstring& cf_name) {
|
||||
using view_statuses_type = std::unordered_map<locator::host_id, sstring>;
|
||||
return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
|
||||
return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) {
|
||||
// Only consider status of known hosts.
|
||||
return view_status.second == "STARTED" && tm.get_endpoint_for_host_id(view_status.first);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason) {
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
|
||||
streaming::stream_reason reason) {
|
||||
if (is_internal_keyspace(t.schema()->ks_name())) {
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
if (reason == streaming::stream_reason::repair && !t.views().empty()) {
|
||||
return make_ready_future<bool>(true);
|
||||
}
|
||||
return do_with(t.views(), [&sys_dist_ks] (auto& views) {
|
||||
return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) {
|
||||
return map_reduce(views,
|
||||
[&sys_dist_ks] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, view->ks_name(), view->cf_name()); },
|
||||
[&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); },
|
||||
false,
|
||||
std::logical_or<bool>());
|
||||
});
|
||||
|
||||
@@ -22,9 +22,13 @@ class system_distributed_keyspace;
|
||||
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
class token_metadata;
|
||||
}
|
||||
|
||||
namespace db::view {
|
||||
|
||||
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name);
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason);
|
||||
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
|
||||
streaming::stream_reason reason);
|
||||
|
||||
}
|
||||
|
||||
3
dist/common/scripts/scylla_coredump_setup
vendored
3
dist/common/scripts/scylla_coredump_setup
vendored
@@ -42,7 +42,8 @@ if __name__ == '__main__':
|
||||
if systemd_unit.available('systemd-coredump@.service'):
|
||||
dropin = '''
|
||||
[Service]
|
||||
TimeoutStartSec=infinity
|
||||
RuntimeMaxSec=infinity
|
||||
TimeoutSec=infinity
|
||||
'''[1:-1]
|
||||
os.makedirs('/etc/systemd/system/systemd-coredump@.service.d', exist_ok=True)
|
||||
with open('/etc/systemd/system/systemd-coredump@.service.d/timeout.conf', 'w') as f:
|
||||
|
||||
@@ -1112,14 +1112,14 @@ tls-ssl/index.html: /stable/operating-scylla/security
|
||||
/using-scylla/integrations/integration_kairos/index.html: /stable/using-scylla/integrations/integration-kairos
|
||||
/upgrade/ami_upgrade/index.html: /stable/upgrade/ami-upgrade
|
||||
|
||||
/scylla-cloud/cloud-setup/gcp-vpc-peering/index.html: /stable/scylla-cloud/cloud-setup/GCP/gcp-vpc-peering
|
||||
/scylla-cloud/cloud-setup/GCP/gcp-vcp-peering/index.html: /stable/scylla-cloud/cloud-setup/GCP/gcp-vpc-peering
|
||||
/scylla-cloud/cloud-setup/gcp-vpc-peering/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/gcp-vpc-peering.html
|
||||
/scylla-cloud/cloud-setup/GCP/gcp-vcp-peering/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/gcp-vpc-peering.html
|
||||
|
||||
# move scylla cloud for AWS to dedicated directory
|
||||
/scylla-cloud/cloud-setup/aws-vpc-peering/index.html: /stable/scylla-cloud/cloud-setup/AWS/aws-vpc-peering
|
||||
/scylla-cloud/cloud-setup/cloud-prom-proxy/index.html: /stable/scylla-cloud/cloud-setup/AWS/cloud-prom-proxy
|
||||
/scylla-cloud/cloud-setup/outposts/index.html: /stable/scylla-cloud/cloud-setup/AWS/outposts
|
||||
/scylla-cloud/cloud-setup/scylla-cloud-byoa/index.html: /stable/scylla-cloud/cloud-setup/AWS/scylla-cloud-byoa
|
||||
/scylla-cloud/cloud-setup/aws-vpc-peering/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/aws-vpc-peering.html
|
||||
/scylla-cloud/cloud-setup/cloud-prom-proxy/index.html: https://cloud.docs.scylladb.com/stable/monitoring/cloud-prom-proxy.html
|
||||
/scylla-cloud/cloud-setup/outposts/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/outposts.html
|
||||
/scylla-cloud/cloud-setup/scylla-cloud-byoa/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/scylla-cloud-byoa.html
|
||||
/scylla-cloud/cloud-services/scylla_cloud_costs/index.html: /stable/scylla-cloud/cloud-services/scylla-cloud-costs
|
||||
/scylla-cloud/cloud-services/scylla_cloud_managin_versions/index.html: /stable/scylla-cloud/cloud-services/scylla-cloud-managin-versions
|
||||
/scylla-cloud/cloud-services/scylla_cloud_support_alerts_sla/index.html: /stable/scylla-cloud/cloud-services/scylla-cloud-support-alerts-sla
|
||||
|
||||
@@ -161,6 +161,10 @@ events appear in the Streams API as normal deletions - without the
|
||||
distinctive marker on deletions which are really expirations.
|
||||
See <https://github.com/scylladb/scylla/issues/5060>.
|
||||
|
||||
<!--- REMOVE IN FUTURE VERSIONS - Remove the note below in version 5.3/2023.1 -->
|
||||
|
||||
> **Note** This feature is experimental in versions earlier than ScyllaDB Open Source 5.2 and ScyllaDB Enterprise 2022.2.
|
||||
|
||||
---
|
||||
|
||||
|
||||
|
||||
@@ -40,19 +40,19 @@ Enabling Raft
|
||||
Enabling Raft in ScyllaDB 5.0 and 5.1
|
||||
=====================================
|
||||
|
||||
.. warning::
|
||||
.. note::
|
||||
In ScyllaDB 5.0 and 5.1, Raft is an experimental feature.
|
||||
|
||||
It is not possible to enable Raft in an existing cluster in ScyllaDB 5.0 and 5.1.
|
||||
In order to have a Raft-enabled cluster in these versions, you must create a new cluster with Raft enabled from the start.
|
||||
|
||||
.. warning::
|
||||
.. note::
|
||||
|
||||
**Do not** use Raft in production clusters in ScyllaDB 5.0 and 5.1. Such clusters won't be able to correctly upgrade to ScyllaDB 5.2.
|
||||
|
||||
Use Raft only for testing and experimentation in clusters which can be thrown away.
|
||||
|
||||
.. warning::
|
||||
.. note::
|
||||
Once enabled, Raft cannot be disabled on your cluster. The cluster nodes will fail to restart if you remove the Raft feature.
|
||||
|
||||
When creating a new cluster, add ``raft`` to the list of experimental features in your ``scylla.yaml`` file:
|
||||
@@ -276,12 +276,8 @@ Examples
|
||||
- Schema updates are possible and safe.
|
||||
- Try restarting the node. If the node is dead, :doc:`replace it with a new node </operating-scylla/procedures/cluster-management/replace-dead-node/>`.
|
||||
* - 2 nodes
|
||||
- Cluster is not fully operational. The data is available for reads and writes, but schema changes are impossible.
|
||||
- Data is available for reads and writes, schema changes are impossible.
|
||||
- Restart at least 1 of the 2 nodes that are down to regain quorum. If you can’t recover at least 1 of the 2 nodes, consult the :ref:`manual Raft recovery section <recover-raft-procedure>`.
|
||||
* - 1 datacenter
|
||||
- Cluster is not fully operational. The data is available for reads and writes, but schema changes are impossible.
|
||||
- When the DC comes back online, restart the nodes. If the DC does not come back online and nodes are lost, consult the :ref:`manual Raft recovery section <recover-raft-procedure>`.
|
||||
|
||||
|
||||
.. list-table:: Cluster B: 2 datacenters, 6 nodes (3 nodes per DC)
|
||||
:widths: 20 40 40
|
||||
@@ -294,10 +290,10 @@ Examples
|
||||
- Schema updates are possible and safe.
|
||||
- Try restarting the node(s). If the node is dead, :doc:`replace it with a new node </operating-scylla/procedures/cluster-management/replace-dead-node/>`.
|
||||
* - 3 nodes
|
||||
- Cluster is not fully operational. The data is available for reads and writes, but schema changes are impossible.
|
||||
- Data is available for reads and writes, schema changes are impossible.
|
||||
- Restart 1 of the 3 nodes that are down to regain quorum. If you can’t recover at least 1 of the 3 failed nodes, consult the :ref:`manual Raft recovery section <recover-raft-procedure>`.
|
||||
* - 1DC
|
||||
- Cluster is not fully operational. The data is available for reads and writes, but schema changes are impossible.
|
||||
- Data is available for reads and writes, schema changes are impossible.
|
||||
- When the DCs come back online, restart the nodes. If the DC fails to come back online and the nodes are lost, consult the :ref:`manual Raft recovery section <recover-raft-procedure>`.
|
||||
|
||||
|
||||
@@ -315,7 +311,7 @@ Examples
|
||||
- Schema updates are possible and safe.
|
||||
- When the DC comes back online, try restarting the nodes in the cluster. If the nodes are dead, :doc:`add 3 new nodes in a new region </operating-scylla/procedures/cluster-management/add-dc-to-existing-dc/>`.
|
||||
* - 2 DCs
|
||||
- Cluster is not fully operational. The data is available for reads and writes, but schema changes are impossible.
|
||||
- Data is available for reads and writes, schema changes are impossible.
|
||||
- When the DCs come back online, restart the nodes. If at least one DC fails to come back online and the nodes are lost, consult the :ref:`manual Raft recovery section <recover-raft-procedure>`.
|
||||
|
||||
.. _recover-raft-procedure:
|
||||
@@ -334,7 +330,7 @@ The manual Raft recovery procedure applies to the following situations:
|
||||
|
||||
Perform the manual recovery procedure **only** if you're dealing with **irrecoverable** nodes. If it is possible to restart your nodes, do that instead of manual recovery.
|
||||
|
||||
.. warning::
|
||||
.. note::
|
||||
|
||||
Before proceeding, make sure that the irrecoverable nodes are truly dead, and not, for example, temporarily partitioned away due to a network failure. If it is possible for the 'dead' nodes to come back to life, they might communicate and interfere with the recovery procedure and cause unpredictable problems.
|
||||
|
||||
@@ -342,7 +338,7 @@ The manual Raft recovery procedure applies to the following situations:
|
||||
|
||||
During the manual recovery procedure you'll enter a special ``RECOVERY`` mode, remove all faulty nodes (using the standard :doc:`node removal procedure </operating-scylla/procedures/cluster-management/remove-node/>`), delete the internal Raft data, and restart the cluster. This will cause the cluster to perform the internal Raft upgrade procedure again, initializing the Raft algorithm from scratch. The manual recovery procedure is applicable both to clusters which were not running Raft in the past and then had Raft enabled, and to clusters which were bootstrapped using Raft.
|
||||
|
||||
.. warning::
|
||||
.. note::
|
||||
|
||||
Entering ``RECOVERY`` mode requires a node restart. Restarting an additional node while some nodes are already dead may lead to unavailability of data queries (assuming that you haven't lost it already). For example, if you're using the standard RF=3, CL=QUORUM setup, and you're recovering from a stuck of upgrade procedure because one of your nodes is dead, restarting another node will cause temporary data query unavailability (until the node finishes restarting). Prepare your service for downtime before proceeding.
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ Getting Started
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/>`_ - Links to the ScyllaDB Download Center
|
||||
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/#core>`_ - Links to the ScyllaDB Download Center
|
||||
|
||||
* :doc:`Configure ScyllaDB </getting-started/system-configuration/>`
|
||||
* :doc:`Run ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
|
||||
|
||||
@@ -20,7 +20,7 @@ Install ScyllaDB
|
||||
|
||||
Keep your versions up-to-date. The two latest versions are supported. Also always install the latest patches for your version.
|
||||
|
||||
* Download and install ScyllaDB Server, Drivers and Tools in `Scylla Download Center <https://www.scylladb.com/download/#server/>`_
|
||||
* Download and install ScyllaDB Server, Drivers and Tools in `ScyllaDB Download Center <https://www.scylladb.com/download/#core>`_
|
||||
* :doc:`ScyllaDB Web Installer for Linux <scylla-web-installer>`
|
||||
* :doc:`ScyllaDB Unified Installer (relocatable executable) <unified-installer>`
|
||||
* :doc:`Air-gapped Server Installation <air-gapped-install>`
|
||||
|
||||
@@ -4,7 +4,7 @@ ScyllaDB Web Installer for Linux
|
||||
|
||||
ScyllaDB Web Installer is a platform-agnostic installation script you can run with ``curl`` to install ScyllaDB on Linux.
|
||||
|
||||
See `ScyllaDB Download Center <https://www.scylladb.com/download/#server>`_ for information on manually installing ScyllaDB with platform-specific installation packages.
|
||||
See `ScyllaDB Download Center <https://www.scylladb.com/download/#core>`_ for information on manually installing ScyllaDB with platform-specific installation packages.
|
||||
|
||||
Prerequisites
|
||||
--------------
|
||||
|
||||
@@ -25,11 +25,7 @@ ScyllaDB Open Source
|
||||
|
||||
.. note::
|
||||
|
||||
Recommended OS and ScyllaDB AMI/Image OS for ScyllaDB Open Source:
|
||||
|
||||
- Ubuntu 20.04 for versions 4.6 and later.
|
||||
- CentOS 7 for versions earlier than 4.6.
|
||||
|
||||
The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
|
||||
+----------------------------+----------------------------------+-----------------------------+---------+-------+
|
||||
| Linux Distributions | Ubuntu | Debian | CentOS /| Rocky/|
|
||||
@@ -37,6 +33,8 @@ ScyllaDB Open Source
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| ScyllaDB Version / Version | 14.04| 16.04| 18.04|20.04 |22.04 | 8 | 9 | 10 | 11 | 7 | 8 |
|
||||
+============================+======+======+======+======+======+======+======+=======+=======+=========+=======+
|
||||
| 5.2 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 5.1 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
| 5.0 | |x| | |x| | |v| | |v| | |v| | |x| | |x| | |v| | |v| | |v| | |v| |
|
||||
@@ -63,17 +61,18 @@ ScyllaDB Open Source
|
||||
+----------------------------+------+------+------+------+------+------+------+-------+-------+---------+-------+
|
||||
|
||||
|
||||
All releases are available as a Docker container, EC2 AMI, and a GCP image (GCP image from version 4.3).
|
||||
All releases are available as a Docker container, EC2 AMI, and a GCP image (GCP image from version 4.3). Since
|
||||
version 5.2, the ScyllaDB AMI/Image OS for ScyllaDB Open Source is based on Ubuntu 22.04.
|
||||
|
||||
|
||||
|
||||
ScyllaDB Enterprise
|
||||
--------------------
|
||||
|
||||
.. note::
|
||||
Recommended OS and ScyllaDB AMI/Image OS for ScyllaDB Enterprise:
|
||||
|
||||
- Ubuntu 20.04 for versions 2021.1 and later.
|
||||
- CentOS 7 for versions earlier than 2021.1.
|
||||
The recommended OS for ScyllaDB Enterprise is Ubuntu 22.04.
|
||||
|
||||
|
||||
+----------------------------+-----------------------------------+---------------------------+--------+-------+
|
||||
| Linux Distributions | Ubuntu | Debian | CentOS/| Rocky/|
|
||||
@@ -95,4 +94,5 @@ ScyllaDB Enterprise
|
||||
+----------------------------+------+------+------+------+-------+------+------+------+------+--------+-------+
|
||||
|
||||
|
||||
All releases are available as a Docker container, EC2 AMI, and a GCP image (GCP image from version 2021.1).
|
||||
All releases are available as a Docker container, EC2 AMI, and a GCP image (GCP image from version 2021.1). Since
|
||||
version 2023.1, the ScyllaDB AMI/Image OS for ScyllaDB Enterprise is based on Ubuntu 22.04.
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
:image: /_static/img/mascots/scylla-docs.svg
|
||||
:search_box:
|
||||
|
||||
The most up-to-date documents for the fastest, best performing, high availability NoSQL database.
|
||||
New to ScyllaDB? Start `here <https://cloud.docs.scylladb.com/stable/scylladb-basics/>`_!
|
||||
|
||||
.. raw:: html
|
||||
|
||||
@@ -26,16 +26,7 @@
|
||||
<div class="grid-x grid-margin-x hs">
|
||||
|
||||
.. topic-box::
|
||||
:title: New to ScyllaDB? Start here!
|
||||
:link: https://cloud.docs.scylladb.com/stable/scylladb-basics/
|
||||
:class: large-4
|
||||
:anchor: ScyllaDB Basics
|
||||
|
||||
Learn the essentials of ScyllaDB.
|
||||
|
||||
|
||||
.. topic-box::
|
||||
:title: Let us manage your DB
|
||||
:title: ScyllaDB Cloud
|
||||
:link: https://cloud.docs.scylladb.com
|
||||
:class: large-4
|
||||
:anchor: ScyllaDB Cloud Documentation
|
||||
@@ -43,12 +34,20 @@
|
||||
Simplify application development with ScyllaDB Cloud - a fully managed database-as-a-service.
|
||||
|
||||
.. topic-box::
|
||||
:title: Manage your own DB
|
||||
:title: ScyllaDB Enterprise
|
||||
:link: https://enterprise.docs.scylladb.com
|
||||
:class: large-4
|
||||
:anchor: ScyllaDB Enterprise Documentation
|
||||
|
||||
Deploy and manage ScyllaDB's most stable enterprise-grade database with premium features and 24/7 support.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Open Source
|
||||
:link: getting-started
|
||||
:class: large-4
|
||||
:anchor: ScyllaDB Open Source and Enterprise Documentation
|
||||
:anchor: ScyllaDB Open Source Documentation
|
||||
|
||||
Deploy and manage your database in your own environment.
|
||||
Deploy and manage your database in your environment.
|
||||
|
||||
|
||||
.. raw:: html
|
||||
@@ -59,40 +58,16 @@
|
||||
|
||||
<div class="topics-grid topics-grid--products">
|
||||
|
||||
<h2 class="topics-grid__title">Our Products</h2>
|
||||
<h2 class="topics-grid__title">Other Products</h2>
|
||||
|
||||
<div class="grid-container full">
|
||||
<div class="grid-x grid-margin-x">
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Enterprise
|
||||
:link: getting-started
|
||||
:image: /_static/img/mascots/scylla-enterprise.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
|
||||
ScyllaDB’s most stable high-performance enterprise-grade NoSQL database.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Open Source
|
||||
:link: getting-started
|
||||
:image: /_static/img/mascots/scylla-opensource.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
|
||||
A high-performance NoSQL database with a close-to-the-hardware, shared-nothing approach.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Cloud
|
||||
:link: https://cloud.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-cloud.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
|
||||
A fully managed NoSQL database as a service powered by ScyllaDB Enterprise.
|
||||
|
||||
.. topic-box::
|
||||
:title: ScyllaDB Alternator
|
||||
:link: https://docs.scylladb.com/stable/alternator/alternator.html
|
||||
:image: /_static/img/mascots/scylla-alternator.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Open source Amazon DynamoDB-compatible API.
|
||||
|
||||
@@ -100,7 +75,7 @@
|
||||
:title: ScyllaDB Monitoring Stack
|
||||
:link: https://monitoring.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-monitor.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Complete open source monitoring solution for your ScyllaDB clusters.
|
||||
|
||||
@@ -108,7 +83,7 @@
|
||||
:title: ScyllaDB Manager
|
||||
:link: https://manager.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-manager.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Hassle-free ScyllaDB NoSQL database management for scale-out clusters.
|
||||
|
||||
@@ -116,7 +91,7 @@
|
||||
:title: ScyllaDB Drivers
|
||||
:link: https://docs.scylladb.com/stable/using-scylla/drivers/
|
||||
:image: /_static/img/mascots/scylla-drivers.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Shard-aware drivers for superior performance.
|
||||
|
||||
@@ -124,7 +99,7 @@
|
||||
:title: ScyllaDB Operator
|
||||
:link: https://operator.docs.scylladb.com
|
||||
:image: /_static/img/mascots/scylla-enterprise.svg
|
||||
:class: topic-box--product,large-3,small-6
|
||||
:class: topic-box--product,large-4,small-6
|
||||
|
||||
Easily run and manage your ScyllaDB cluster on Kubernetes.
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
* endpoint_snitch - ``grep endpoint_snitch /etc/scylla/scylla.yaml``
|
||||
* Scylla version - ``scylla --version``
|
||||
* Authenticator - ``grep authenticator /etc/scylla/scylla.yaml``
|
||||
* consistent_cluster_management - ``grep consistent_cluster_management /etc/scylla/scylla.yaml``
|
||||
|
||||
.. Note::
|
||||
|
||||
|
||||
@@ -119,6 +119,7 @@ Add New DC
|
||||
* **listen_address** - IP address that Scylla used to connect to the other Scylla nodes in the cluster.
|
||||
* **endpoint_snitch** - Set the selected snitch.
|
||||
* **rpc_address** - Address for client connections (Thrift, CQL).
|
||||
* **consistent_cluster_management** - set to the same value as used by your existing nodes.
|
||||
|
||||
The parameters ``seeds``, ``cluster_name`` and ``endpoint_snitch`` need to match the existing cluster.
|
||||
|
||||
|
||||
@@ -54,6 +54,8 @@ Procedure
|
||||
|
||||
* **seeds** - Specifies the IP address of an existing node in the cluster. The new node will use this IP to connect to the cluster and learn the cluster topology and state.
|
||||
|
||||
* **consistent_cluster_management** - set to the same value as used by your existing nodes.
|
||||
|
||||
.. note::
|
||||
|
||||
In earlier versions of ScyllaDB, seed nodes assisted in gossip. Starting with Scylla Open Source 4.3 and Scylla Enterprise 2021.1, the seed concept in gossip has been removed. If you are using an earlier version of ScyllaDB, you need to configure the seeds parameter in the following way:
|
||||
|
||||
@@ -70,6 +70,7 @@ the file can be found under ``/etc/scylla/``
|
||||
- **listen_address** - IP address that the Scylla use to connect to other Scylla nodes in the cluster
|
||||
- **endpoint_snitch** - Set the selected snitch
|
||||
- **rpc_address** - Address for client connection (Thrift, CQLSH)
|
||||
- **consistent_cluster_management** - ``true`` by default, can be set to ``false`` if you don't want to use Raft for consistent schema management in this cluster (will be mandatory in later versions). Check the :doc:`Raft in ScyllaDB document</architecture/raft/>` to learn more.
|
||||
|
||||
3. In the ``cassandra-rackdc.properties`` file, edit the rack and data center information.
|
||||
The file can be found under ``/etc/scylla/``.
|
||||
|
||||
@@ -26,6 +26,7 @@ The file can be found under ``/etc/scylla/``
|
||||
- **listen_address** - IP address that Scylla used to connect to other Scylla nodes in the cluster
|
||||
- **endpoint_snitch** - Set the selected snitch
|
||||
- **rpc_address** - Address for client connection (Thrift, CQL)
|
||||
- **consistent_cluster_management** - ``true`` by default, can be set to ``false`` if you don't want to use Raft for consistent schema management in this cluster (will be mandatory in later versions). Check the :doc:`Raft in ScyllaDB document</architecture/raft/>` to learn more.
|
||||
|
||||
3. This step needs to be done **only** if you are using the **GossipingPropertyFileSnitch**. If not, skip this step.
|
||||
In the ``cassandra-rackdc.properties`` file, edit the parameters listed below.
|
||||
|
||||
@@ -63,6 +63,7 @@ Perform the following steps for each node in the new cluster:
|
||||
* **rpc_address** - Address for client connection (Thrift, CQL).
|
||||
* **broadcast_address** - The IP address a node tells other nodes in the cluster to contact it by.
|
||||
* **broadcast_rpc_address** - Default: unset. The RPC address to broadcast to drivers and other Scylla nodes. It cannot be set to 0.0.0.0. If left blank, it will be set to the value of ``rpc_address``. If ``rpc_address`` is set to 0.0.0.0, ``broadcast_rpc_address`` must be explicitly configured.
|
||||
* **consistent_cluster_management** - ``true`` by default, can be set to ``false`` if you don't want to use Raft for consistent schema management in this cluster (will be mandatory in later versions). Check the :doc:`Raft in ScyllaDB document</architecture/raft/>` to learn more.
|
||||
|
||||
#. After you have installed and configured Scylla and edited ``scylla.yaml`` file on all the nodes, start the node specified with the ``seeds`` parameter. Then start the rest of the nodes in your cluster, one at a time, using
|
||||
``sudo systemctl start scylla-server``.
|
||||
|
||||
@@ -25,6 +25,7 @@ Login to one of the nodes in the cluster with (UN) status, collect the following
|
||||
* seeds - ``cat /etc/scylla/scylla.yaml | grep seeds:``
|
||||
* endpoint_snitch - ``cat /etc/scylla/scylla.yaml | grep endpoint_snitch``
|
||||
* Scylla version - ``scylla --version``
|
||||
* consistent_cluster_management - ``grep consistent_cluster_management /etc/scylla/scylla.yaml``
|
||||
|
||||
Procedure
|
||||
---------
|
||||
|
||||
@@ -66,6 +66,8 @@ Procedure
|
||||
|
||||
- **rpc_address** - Address for client connection (Thrift, CQL)
|
||||
|
||||
- **consistent_cluster_management** - set to the same value as used by your existing nodes.
|
||||
|
||||
#. Add the ``replace_node_first_boot`` parameter to the ``scylla.yaml`` config file on the new node. This line can be added to any place in the config file. After a successful node replacement, there is no need to remove it from the ``scylla.yaml`` file. (Note: The obsolete parameters "replace_address" and "replace_address_first_boot" are not supported and should not be used). The value of the ``replace_node_first_boot`` parameter should be the Host ID of the node to be replaced.
|
||||
|
||||
For example (using the Host ID of the failed node from above):
|
||||
|
||||
@@ -68,7 +68,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
@@ -92,13 +92,13 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
1. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in UN status.
|
||||
2. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version.
|
||||
3. Check scylla-enterprise-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
3. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
4. Check again after 2 minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in the cluster.
|
||||
@@ -130,7 +130,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Downgrade to the previous release
|
||||
----------------------------------
|
||||
@@ -164,7 +164,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -114,7 +114,7 @@ New io.conf format was introduced in ScyllaDB 2.3 and 2019.1. If your io.conf do
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
@@ -154,7 +154,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
|
||||
@@ -66,7 +66,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
@@ -16,13 +16,13 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in UN status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version.
|
||||
#. Check scylla-enterprise-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check again after 2 minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in the cluster.
|
||||
@@ -54,7 +54,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Downgrade to the previous release
|
||||
----------------------------------
|
||||
@@ -88,7 +88,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -7,7 +7,7 @@ This document is a step-by-step procedure for upgrading from ScyllaDB Enterprise
|
||||
|
||||
Applicable Versions
|
||||
===================
|
||||
This guide covers upgrading ScyllaDB Enterprise from version 2021.1.x to ScyllaDB Enterprise version 2022.1.y on |OS|. See :doc:`OS Support by Platform and Version </getting-started/os-support>` for information about supported versions.
|
||||
This guide covers upgrading ScyllaDB Enterprise from version **2021.1.8** or later to ScyllaDB Enterprise version 2022.1.y on |OS|. See :doc:`OS Support by Platform and Version </getting-started/os-support>` for information about supported versions.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
@@ -69,7 +69,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
@@ -36,13 +36,13 @@ A new io.conf format was introduced in Scylla 2.3 and 2019.1. If your io.conf do
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in UN status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version.
|
||||
#. Check scylla-enterprise-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check again after two minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in the cluster.
|
||||
@@ -75,7 +75,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
@@ -120,7 +120,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -8,8 +8,8 @@ Upgrading ScyllaDB images requires updating:
|
||||
* Underlying OS packages. Starting with ScyllaDB 4.6, each ScyllaDB version includes a list of 3rd party and
|
||||
OS packages tested with the ScyllaDB release. The list depends on the base OS:
|
||||
|
||||
* ScyllaDB Open Source **4.4** and ScyllaDB Enterprise **2020.1** or earlier are based on **CentOS 7**.
|
||||
* ScyllaDB Open Source **4.5** and ScyllaDB Enterprise **2021.1** or later are based on **Ubuntu 20.04**.
|
||||
* ScyllaDB Open Source **5.0 and 5.1** and ScyllaDB Enterprise **2021.1, 2022.1, and 2022.2** are based on **Ubuntu 20.04**.
|
||||
* ScyllaDB Open Source **5.2** and ScyllaDB Enterprise **2023.1** are based on **Ubuntu 22.04**.
|
||||
|
||||
If you're running ScyllaDB Open Source 5.0 or later or ScyllaDB Enterprise 2021.1.10 or later, you can
|
||||
automatically update 3rd party and OS packages together with the ScyllaDB packages - by running one command.
|
||||
|
||||
@@ -102,7 +102,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
.. _upgrade-debian-ubuntu-enterprise-2022.2:
|
||||
|
||||
@@ -138,7 +138,7 @@ Download and install the new release
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla-enterprise-server
|
||||
sudo apt-get dist-upgrade scylla-enterprise
|
||||
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
@@ -213,13 +213,13 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in ``UN`` status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
|
||||
#. Check scylla-enterprise-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no new errors in the log.
|
||||
#. Check scylla-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no new errors in the log.
|
||||
#. Check again after two minutes, to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
@@ -260,7 +260,7 @@ Drain and gracefully stop the node
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
@@ -359,7 +359,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -63,7 +63,7 @@ Stop ScyllaDB
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl stop scylla-enterprise-server
|
||||
sudo systemctl stop scylla-server
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
@@ -84,7 +84,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl start scylla-enterprise-server
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
Validate
|
||||
--------
|
||||
@@ -125,7 +125,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo systemctl stop scylla-enterprise-server
|
||||
sudo systemctl stop scylla-server
|
||||
|
||||
Downgrade to the previous release
|
||||
-----------------------------------
|
||||
@@ -149,7 +149,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl start scylla-enterprise-server
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
Scylla Metric Update - Scylla 5.1 to 5.2
|
||||
========================================
|
||||
ScyllaDB Metric Update - Scylla 5.1 to 5.2
|
||||
============================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
@@ -7,8 +7,8 @@ Scylla Metric Update - Scylla 5.1 to 5.2
|
||||
|
||||
Scylla 5.2 Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
The following metrics are new in Scylla 5.2
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The following metrics are new in ScyllaDB 5.2
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
@@ -16,5 +16,52 @@ The following metrics are new in Scylla 5.2
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - TODO
|
||||
- TODO
|
||||
* - scylla_database_disk_reads
|
||||
- Holds the number of currently active disk read operations.
|
||||
* - scylla_database_sstables_read
|
||||
- Holds the number of currently read sstables.
|
||||
* - scylla_memory_malloc_failed
|
||||
- Total count of failed memory allocations
|
||||
* - scylla_raft_group0_status
|
||||
- status of the raft group, 0 - disabled, 1 - normal, 2 - aborted
|
||||
* - scylla_storage_proxy_coordinator_cas_read_latency_summary
|
||||
- CAS read latency summary
|
||||
* - scylla_storage_proxy_coordinator_cas_write_latency_summary
|
||||
- CAS write latency summary
|
||||
* - scylla_storage_proxy_coordinator_read_latency_summary
|
||||
- Read latency summary
|
||||
* - scylla_storage_proxy_coordinator_write_latency_summary
|
||||
- Write latency summary
|
||||
* - scylla_streaming_finished_percentage
|
||||
- Finished percentage of node operation on this shard
|
||||
* - scylla_view_update_generator_sstables_pending_work
|
||||
- Number of bytes remaining to be processed from SSTables for view updates
|
||||
|
||||
|
||||
The following metrics are renamed in ScyllaDB 5.2
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - 5.1
|
||||
- 5.2
|
||||
* - scylla_database_active_reads_memory_consumption
|
||||
- scylla_database_reads_memory_consumption
|
||||
* - scylla_memory_regular_virtual_dirty_bytes
|
||||
- scylla_memory_regular_unspooled_dirty_bytes
|
||||
* - scylla_memory_system_virtual_dirty_bytes
|
||||
- scylla_memory_system_unspooled_dirty_bytes
|
||||
* - scylla_memory_virtual_dirty_bytes
|
||||
- scylla_memory_unspooled_dirty_bytes
|
||||
|
||||
Reporting Latencies
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
ScyllaDB 5.2 comes with a new approach to reporting latencies, which are reported using histograms and summaries:
|
||||
|
||||
* Histograms are reported per node.
|
||||
* Summaries are reported per shard and contain P50, P95, and P99 latency.
|
||||
|
||||
For more information on Prometheus histograms and summaries, see the `Prometheus documentation <https://prometheus.io/docs/practices/histograms/>`_.
|
||||
@@ -67,7 +67,11 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
If you enabled consistent cluster management in each node's configuration file, then as soon as every node has been upgraded to the new version, the cluster will start a procedure which initializes the Raft algorithm for consistent cluster metadata management.
|
||||
You must then :ref:`verify <validate-raft-setup>` that this procedure successfully finishes.
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note::
|
||||
|
||||
If you use the `ScyllaDB Monitoring Stack <https://monitoring.docs.scylladb.com/>`_, we recommend upgrading the Monitoring Stack to the latest version **before** upgrading ScyllaDB.
|
||||
|
||||
For ScyllaDB 5.2, you MUST upgrade the Monitoring Stack to version 4.3 or later.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
@@ -104,7 +104,7 @@ A new io.conf format was introduced in ScyllaDB 2.3 and 2019.1. If your io.conf
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl start scylla-enterprise-server
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -144,7 +144,7 @@ Download and install the new release
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get remove scylla\*
|
||||
sudo apt-get install scylla-enterprise-server
|
||||
sudo apt-get install scylla-enterprise
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
@@ -163,7 +163,7 @@ Download and install the new release
|
||||
sudo yum clean all
|
||||
sudo rm -rf /var/cache/yum
|
||||
sudo yum remove scylla\*
|
||||
sudo yum install scylla-enterprise-server
|
||||
sudo yum install scylla-enterprise
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
@@ -221,13 +221,13 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in ``UN`` status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
|
||||
#. Check scylla-enterprise-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no new errors in the log.
|
||||
#. Check scylla-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no new errors in the log.
|
||||
#. Check again after two minutes, to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
@@ -269,7 +269,7 @@ Drain and gracefully stop the node
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
|
||||
@@ -175,7 +175,7 @@ class built_indexes_virtual_reader {
|
||||
}
|
||||
|
||||
virtual future<> fast_forward_to(position_range range) override {
|
||||
forward_buffer_to(range.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
// range contains index names (e.g., xyz) but the underlying table
|
||||
// contains view names (e.g., xyz_index) so we need to add the
|
||||
|
||||
2
main.cc
2
main.cc
@@ -476,7 +476,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// We need to have the entire app config to run the app, but we need to
|
||||
// run the app to read the config file with UDF specific options so that
|
||||
// we know whether we need to reserve additional memory for UDFs.
|
||||
app_cfg.reserve_additional_memory = 50 * 1024 * 1024;
|
||||
app_cfg.reserve_additional_memory = db::config::wasm_udf_reserved_memory;
|
||||
app_template app(std::move(app_cfg));
|
||||
|
||||
auto ext = std::make_shared<db::extensions>();
|
||||
|
||||
@@ -177,7 +177,6 @@ private:
|
||||
template <typename Consumer, typename GCConsumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) {
|
||||
_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
|
||||
stop_iteration gc_consumer_stop = stop_iteration::no;
|
||||
stop_iteration consumer_stop = stop_iteration::no;
|
||||
if (rtc.tombstone() <= _partition_tombstone) {
|
||||
@@ -199,6 +198,7 @@ private:
|
||||
partition_is_not_empty(consumer);
|
||||
_current_emitted_tombstone = rtc.tombstone();
|
||||
consumer_stop = consumer.consume(std::move(rtc));
|
||||
_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
|
||||
}
|
||||
return gc_consumer_stop || consumer_stop;
|
||||
}
|
||||
|
||||
@@ -1144,7 +1144,7 @@ future<> server_impl::applier_fiber() {
|
||||
co_await _state_machine->apply(std::move(commands));
|
||||
} catch (abort_requested_exception& e) {
|
||||
logger.info("[{}] applier fiber stopped because state machine was aborted: {}", _id, e);
|
||||
co_return;
|
||||
throw stop_apply_fiber{};
|
||||
} catch (...) {
|
||||
std::throw_with_nested(raft::state_machine_error{});
|
||||
}
|
||||
|
||||
@@ -383,6 +383,10 @@ reader_concurrency_semaphore& reader_permit::semaphore() {
|
||||
return _impl->semaphore();
|
||||
}
|
||||
|
||||
reader_permit::state reader_permit::get_state() const {
|
||||
return _impl->get_state();
|
||||
}
|
||||
|
||||
bool reader_permit::needs_readmission() const {
|
||||
return _impl->needs_readmission();
|
||||
}
|
||||
@@ -771,10 +775,7 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read(evict_reason reas
|
||||
|
||||
void reader_concurrency_semaphore::clear_inactive_reads() {
|
||||
while (!_inactive_reads.empty()) {
|
||||
auto& ir = _inactive_reads.front();
|
||||
close_reader(std::move(ir.reader));
|
||||
// Destroying the read unlinks it too.
|
||||
std::unique_ptr<inactive_read> _(&*_inactive_reads.begin());
|
||||
evict(_inactive_reads.front(), evict_reason::manual);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -130,6 +130,8 @@ public:
|
||||
|
||||
reader_concurrency_semaphore& semaphore();
|
||||
|
||||
state get_state() const;
|
||||
|
||||
bool needs_readmission() const;
|
||||
|
||||
// Call only when needs_readmission() = true.
|
||||
@@ -184,6 +186,8 @@ public:
|
||||
reader_resources resources() const { return _resources; }
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, reader_permit::state s);
|
||||
|
||||
/// Mark a permit as used.
|
||||
///
|
||||
/// Conceptually, a permit is considered used, when at least one reader
|
||||
|
||||
@@ -733,7 +733,7 @@ future<> merging_reader<P>::fast_forward_to(const dht::partition_range& pr) {
|
||||
|
||||
template <FragmentProducer P>
|
||||
future<> merging_reader<P>::fast_forward_to(position_range pr) {
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _merger.fast_forward_to(std::move(pr));
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ public:
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr) override {
|
||||
_end_of_stream = false;
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
return _underlying->fast_forward_to(std::move(pr));
|
||||
}
|
||||
virtual future<> next_partition() override {
|
||||
|
||||
@@ -54,7 +54,7 @@ public:
|
||||
return _rd.fast_forward_to(pr);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr) override {
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _rd.fast_forward_to(std::move(pr));
|
||||
}
|
||||
|
||||
@@ -153,7 +153,6 @@ public:
|
||||
void reserve_additional(size_t n) {
|
||||
_buffer.reserve(_buffer.size() + n);
|
||||
}
|
||||
void forward_buffer_to(const position_in_partition& pos);
|
||||
void clear_buffer_to_next_partition();
|
||||
template<typename Source>
|
||||
future<bool> fill_buffer_from(Source&);
|
||||
@@ -722,7 +721,7 @@ flat_mutation_reader_v2 transform(flat_mutation_reader_v2 r, T t) {
|
||||
return _reader.fast_forward_to(pr);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr) override {
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(std::move(pr));
|
||||
}
|
||||
|
||||
@@ -158,7 +158,7 @@ future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) {
|
||||
}
|
||||
|
||||
future<> foreign_reader::fast_forward_to(position_range pr) {
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return forward_operation([reader = _reader.get(), pr = std::move(pr)] () {
|
||||
return reader->fast_forward_to(std::move(pr));
|
||||
|
||||
@@ -385,11 +385,6 @@ flat_mutation_reader_v2::~flat_mutation_reader_v2() {
|
||||
}
|
||||
}
|
||||
|
||||
void flat_mutation_reader_v2::impl::forward_buffer_to(const position_in_partition& pos) {
|
||||
clear_buffer();
|
||||
_buffer_size = compute_buffer_size(*_schema, _buffer);
|
||||
}
|
||||
|
||||
void flat_mutation_reader_v2::impl::clear_buffer_to_next_partition() {
|
||||
auto next_partition_start = std::find_if(_buffer.begin(), _buffer.end(), [] (const mutation_fragment_v2& mf) {
|
||||
return mf.is_partition_start();
|
||||
|
||||
@@ -167,16 +167,19 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) {
|
||||
_current = std::move(pr);
|
||||
_end_of_stream = false;
|
||||
_current_has_content = false;
|
||||
forward_buffer_to(_current.start());
|
||||
clear_buffer();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (!is_buffer_empty()) {
|
||||
co_return;
|
||||
}
|
||||
_end_of_stream = false;
|
||||
if (!_next || !_next->is_partition_start()) {
|
||||
co_await _underlying.next_partition();
|
||||
_next = {};
|
||||
}
|
||||
clear_buffer_to_next_partition();
|
||||
_current = {
|
||||
position_in_partition::for_partition_start(),
|
||||
position_in_partition(position_in_partition::after_static_row_tag_t())
|
||||
@@ -267,7 +270,7 @@ flat_mutation_reader_v2 make_slicing_filtering_reader(flat_mutation_reader_v2 rd
|
||||
}
|
||||
|
||||
virtual future<> fast_forward_to(position_range pr) override {
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _rd.fast_forward_to(std::move(pr));
|
||||
}
|
||||
@@ -411,25 +414,32 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
|
||||
flat_mutation_reader_v2 _underlying;
|
||||
bool _single_partition;
|
||||
bool _static_row_done = false;
|
||||
bool _partition_is_open = false;
|
||||
bool is_end_end_of_underlying_stream() const {
|
||||
return _underlying.is_buffer_empty() && _underlying.is_end_of_stream();
|
||||
}
|
||||
future<> on_end_of_underlying_stream() {
|
||||
if (!_static_row_done) {
|
||||
_static_row_done = true;
|
||||
return _underlying.fast_forward_to(position_range::all_clustered_rows());
|
||||
if (_partition_is_open) {
|
||||
if (!_static_row_done) {
|
||||
_static_row_done = true;
|
||||
return _underlying.fast_forward_to(position_range::all_clustered_rows());
|
||||
}
|
||||
push_mutation_fragment(*_schema, _permit, partition_end());
|
||||
reset_partition();
|
||||
}
|
||||
push_mutation_fragment(*_schema, _permit, partition_end());
|
||||
if (_single_partition) {
|
||||
_end_of_stream = true;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _underlying.next_partition().then([this] {
|
||||
_static_row_done = false;
|
||||
return _underlying.fill_buffer().then([this] {
|
||||
_end_of_stream = is_end_end_of_underlying_stream();
|
||||
return _underlying.next_partition().then([this] {
|
||||
return _underlying.fill_buffer().then([this] {
|
||||
_end_of_stream = is_end_end_of_underlying_stream();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
void reset_partition() {
|
||||
_partition_is_open = false;
|
||||
_static_row_done = false;
|
||||
}
|
||||
public:
|
||||
reader(flat_mutation_reader_v2 r, bool single_partition)
|
||||
@@ -440,6 +450,9 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
|
||||
virtual future<> fill_buffer() override {
|
||||
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
|
||||
return fill_buffer_from(_underlying).then([this] (bool underlying_finished) {
|
||||
if (!_partition_is_open && !is_buffer_empty()) {
|
||||
_partition_is_open = true;
|
||||
}
|
||||
if (underlying_finished) {
|
||||
return on_end_of_underlying_stream();
|
||||
}
|
||||
@@ -452,17 +465,27 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
|
||||
}
|
||||
virtual future<> next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
auto maybe_next_partition = make_ready_future<>();;
|
||||
auto maybe_next_partition = make_ready_future<>();
|
||||
if (is_buffer_empty()) {
|
||||
if (_end_of_stream || (_partition_is_open && _single_partition)) {
|
||||
_end_of_stream = true;
|
||||
return maybe_next_partition;
|
||||
}
|
||||
reset_partition();
|
||||
maybe_next_partition = _underlying.next_partition();
|
||||
}
|
||||
return maybe_next_partition.then([this] {
|
||||
_end_of_stream = is_end_end_of_underlying_stream();
|
||||
});
|
||||
return maybe_next_partition.then([this] {
|
||||
_end_of_stream = is_end_end_of_underlying_stream();
|
||||
});
|
||||
}
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
|
||||
_end_of_stream = false;
|
||||
clear_buffer();
|
||||
if (_single_partition) {
|
||||
_end_of_stream = true;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
reset_partition();
|
||||
_end_of_stream = false;
|
||||
return _underlying.fast_forward_to(pr);
|
||||
}
|
||||
virtual future<> close() noexcept override {
|
||||
@@ -1532,7 +1555,7 @@ public:
|
||||
return _reader.fast_forward_to(pr);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr) override {
|
||||
forward_buffer_to(pr.start());
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(std::move(pr));
|
||||
}
|
||||
|
||||
@@ -679,36 +679,6 @@ void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_
|
||||
}
|
||||
}
|
||||
|
||||
// For local reader: a permit is taken on the local shard.
|
||||
// For multi-shard reader: a permit is taken on each shard, smp::count permits
|
||||
// are taken in total.
|
||||
struct repair_reader_permit_meta {
|
||||
std::vector<foreign_ptr<lw_shared_ptr<semaphore_units<>>>> permits{smp::count};
|
||||
};
|
||||
|
||||
// If all_shards is set to true, permits on each shard are taken.
|
||||
// If all_shards is set to false, a single permit is taken from the specified shard.
|
||||
future<repair_reader_permit_meta> get_global_reader_permit(repair_service& rs, unsigned shard, bool all_shards) {
|
||||
repair_reader_permit_meta meta;
|
||||
// We need to serialize the process of taking permits. So the code to take
|
||||
// the permits are performed on a single shard. The last shard is chosen as
|
||||
// the coordinator shard.
|
||||
co_await rs.container().invoke_on(smp::count -1, [&meta, shard, all_shards] (repair_service& rs) -> future<> {
|
||||
co_await with_semaphore(rs.lock_sem(), 1, [&rs, &meta, shard, all_shards] () -> future<> {
|
||||
co_await rs.container().invoke_on_all([&meta, shard, all_shards] (repair_service& rs) -> future<> {
|
||||
if (all_shards || shard == this_shard_id()) {
|
||||
auto& reader_sem = rs.reader_sem();
|
||||
auto permit = co_await seastar::get_units(reader_sem, 1);
|
||||
auto ptr = make_lw_shared<semaphore_units<>>(std::move(permit));
|
||||
meta.permits[this_shard_id()] = make_foreign(std::move(ptr));
|
||||
}
|
||||
co_return;
|
||||
});
|
||||
});
|
||||
});
|
||||
co_return meta;
|
||||
};
|
||||
|
||||
class repair_meta {
|
||||
friend repair_meta_tracker;
|
||||
public:
|
||||
@@ -747,7 +717,6 @@ private:
|
||||
// follower nr peers is always one because repair master is the only peer.
|
||||
size_t _nr_peer_nodes= 1;
|
||||
repair_stats _stats;
|
||||
bool _is_local_reader;
|
||||
repair_reader _repair_reader;
|
||||
lw_shared_ptr<repair_writer> _repair_writer;
|
||||
// Contains rows read from disk
|
||||
@@ -773,7 +742,6 @@ private:
|
||||
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
|
||||
std::optional<shared_future<>> _stopped;
|
||||
repair_hasher _repair_hasher;
|
||||
std::optional<repair_reader_permit_meta> _reader_permit;
|
||||
public:
|
||||
std::vector<repair_node_state>& all_nodes() {
|
||||
return _all_node_states;
|
||||
@@ -849,7 +817,6 @@ public:
|
||||
, _remote_sharder(make_remote_sharder())
|
||||
, _same_sharding_config(is_same_sharding_config())
|
||||
, _nr_peer_nodes(nr_peer_nodes)
|
||||
, _is_local_reader(_repair_master || _same_sharding_config)
|
||||
, _repair_reader(
|
||||
_db,
|
||||
_cf,
|
||||
@@ -1086,12 +1053,7 @@ private:
|
||||
future<std::tuple<std::list<repair_row>, size_t>>
|
||||
read_rows_from_disk(size_t cur_size) {
|
||||
using value_type = std::tuple<std::list<repair_row>, size_t>;
|
||||
if (!_reader_permit) {
|
||||
bool all_shards = !_is_local_reader;
|
||||
auto permit = co_await get_global_reader_permit(_rs, this_shard_id(), all_shards);
|
||||
_reader_permit = std::optional<repair_reader_permit_meta>(std::move(permit));
|
||||
}
|
||||
auto ret = co_await do_with(cur_size, size_t(0), std::list<repair_row>(), [this] (size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
return do_with(cur_size, size_t(0), std::list<repair_row>(), [this] (size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
return repeat([this, &cur_size, &cur_rows, &new_rows_size] () mutable {
|
||||
if (cur_size >= _max_row_buf_size) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
@@ -1115,7 +1077,6 @@ private:
|
||||
return make_ready_future<value_type>(value_type(std::move(cur_rows), new_rows_size));
|
||||
});
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
future<> clear_row_buf() {
|
||||
@@ -2972,9 +2933,6 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
|
||||
, _node_ops_metrics(_repair_module)
|
||||
, _max_repair_memory(max_repair_memory)
|
||||
, _memory_sem(max_repair_memory)
|
||||
// The "10" below should be the same mas max_count_streaming_concurrent_reads.
|
||||
// FIXME: use that named constant instead of the number here.
|
||||
, _reader_sem(10)
|
||||
{
|
||||
tm.register_module("repair", _repair_module);
|
||||
if (this_shard_id() == 0) {
|
||||
|
||||
@@ -104,8 +104,6 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
|
||||
size_t _max_repair_memory;
|
||||
seastar::semaphore _memory_sem;
|
||||
seastar::semaphore _reader_sem;
|
||||
seastar::semaphore _lock_sem{1};
|
||||
|
||||
future<> init_ms_handlers();
|
||||
future<> uninit_ms_handlers();
|
||||
@@ -174,8 +172,6 @@ public:
|
||||
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
|
||||
size_t max_repair_memory() const { return _max_repair_memory; }
|
||||
seastar::semaphore& memory_sem() { return _memory_sem; }
|
||||
seastar::semaphore& reader_sem() { return _reader_sem; }
|
||||
seastar::semaphore& lock_sem() { return _lock_sem; }
|
||||
repair_module& get_repair_module() noexcept {
|
||||
return *_repair_module;
|
||||
}
|
||||
|
||||
@@ -2241,16 +2241,24 @@ future<> database::stop() {
|
||||
if (_schema_commitlog) {
|
||||
co_await _schema_commitlog->release();
|
||||
}
|
||||
dblog.info("Shutting down system dirty memory manager");
|
||||
co_await _system_dirty_memory_manager.shutdown();
|
||||
dblog.info("Shutting down dirty memory manager");
|
||||
co_await _dirty_memory_manager.shutdown();
|
||||
dblog.info("Shutting down memtable controller");
|
||||
co_await _memtable_controller.shutdown();
|
||||
dblog.info("Closing user sstables manager");
|
||||
co_await _user_sstables_manager->close();
|
||||
dblog.info("Closing system sstables manager");
|
||||
co_await _system_sstables_manager->close();
|
||||
dblog.info("Stopping querier cache");
|
||||
co_await _querier_cache.stop();
|
||||
dblog.info("Stopping concurrency semaphores");
|
||||
co_await _read_concurrency_sem.stop();
|
||||
co_await _streaming_concurrency_sem.stop();
|
||||
co_await _compaction_concurrency_sem.stop();
|
||||
co_await _system_read_concurrency_sem.stop();
|
||||
dblog.info("Joining memtable update action");
|
||||
co_await _update_memtable_flush_static_shares_action.join();
|
||||
}
|
||||
|
||||
|
||||
@@ -355,7 +355,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
|
||||
&error_handler_gen_for_upload_dir);
|
||||
}, [] (const sstables::shared_sstable&) { return true; }).get();
|
||||
|
||||
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0();
|
||||
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get0();
|
||||
|
||||
auto datadir = upload.parent_path();
|
||||
if (use_view_update_path) {
|
||||
|
||||
@@ -1253,10 +1253,13 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
|
||||
|
||||
new_sstables = make_lw_shared<sstables::sstable_set>(new_cs.make_sstable_set(t._schema));
|
||||
cg.main_sstables()->for_each_sstable([this] (const sstables::shared_sstable& s) {
|
||||
add_sstable_to_backlog_tracker(new_bt, s);
|
||||
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
|
||||
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->all()->size());
|
||||
cg.main_sstables()->for_each_sstable([this, &new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
|
||||
new_sstables->insert(s);
|
||||
new_sstables_for_backlog_tracker.push_back(s);
|
||||
});
|
||||
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
|
||||
}
|
||||
|
||||
void execute() noexcept {
|
||||
|
||||
24
row_cache.cc
24
row_cache.cc
@@ -347,8 +347,9 @@ future<> read_context::create_underlying() {
|
||||
});
|
||||
}
|
||||
|
||||
static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader) {
|
||||
static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader, mutation_fragment_v2 partition_start) {
|
||||
auto res = make_delegating_reader(reader.underlying().underlying());
|
||||
res.unpop_mutation_fragment(std::move(partition_start));
|
||||
res.upgrade_schema(reader.schema());
|
||||
return make_nonforwardable(std::move(res), true);
|
||||
}
|
||||
@@ -381,8 +382,7 @@ private:
|
||||
});
|
||||
} else {
|
||||
_cache._tracker.on_mispopulate();
|
||||
_reader = read_directly_from_underlying(*_read_context);
|
||||
this->push_mutation_fragment(std::move(*mfopt));
|
||||
_reader = read_directly_from_underlying(*_read_context, std::move(*mfopt));
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -507,15 +507,13 @@ public:
|
||||
, _read_context(ctx)
|
||||
{}
|
||||
|
||||
using read_result = std::tuple<flat_mutation_reader_v2_opt, mutation_fragment_v2_opt>;
|
||||
|
||||
future<read_result> operator()() {
|
||||
future<flat_mutation_reader_v2_opt> operator()() {
|
||||
return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable {
|
||||
{
|
||||
if (!mfopt) {
|
||||
return _cache._read_section(_cache._tracker.region(), [&] {
|
||||
this->handle_end_of_stream();
|
||||
return make_ready_future<read_result>(read_result(std::nullopt, std::nullopt));
|
||||
return make_ready_future<flat_mutation_reader_v2_opt>(std::nullopt);
|
||||
});
|
||||
}
|
||||
_cache.on_partition_miss();
|
||||
@@ -526,14 +524,12 @@ public:
|
||||
cache_entry& e = _cache.find_or_create_incomplete(ps, _reader.creation_phase(),
|
||||
this->can_set_continuity() ? &*_last_key : nullptr);
|
||||
_last_key = row_cache::previous_entry_pointer(key);
|
||||
return make_ready_future<read_result>(
|
||||
read_result(e.read(_cache, _read_context, _reader.creation_phase()), std::nullopt));
|
||||
return make_ready_future<flat_mutation_reader_v2_opt>(e.read(_cache, _read_context, _reader.creation_phase()));
|
||||
});
|
||||
} else {
|
||||
_cache._tracker.on_mispopulate();
|
||||
_last_key = row_cache::previous_entry_pointer(key);
|
||||
return make_ready_future<read_result>(
|
||||
read_result(read_directly_from_underlying(_read_context), std::move(mfopt)));
|
||||
return make_ready_future<flat_mutation_reader_v2_opt>(read_directly_from_underlying(_read_context, std::move(*mfopt)));
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -637,12 +633,8 @@ private:
|
||||
}
|
||||
|
||||
future<flat_mutation_reader_v2_opt> read_from_secondary() {
|
||||
return _secondary_reader().then([this] (range_populating_reader::read_result&& res) {
|
||||
auto&& [fropt, ps] = res;
|
||||
return _secondary_reader().then([this] (flat_mutation_reader_v2_opt&& fropt) {
|
||||
if (fropt) {
|
||||
if (ps) {
|
||||
push_mutation_fragment(std::move(*ps));
|
||||
}
|
||||
return make_ready_future<flat_mutation_reader_v2_opt>(std::move(fropt));
|
||||
} else {
|
||||
_secondary_in_progress = false;
|
||||
|
||||
@@ -63,4 +63,15 @@ MemoryLimit=$MEMORY_LIMIT
|
||||
EOS
|
||||
fi
|
||||
|
||||
if [ -e /etc/systemd/system/systemd-coredump@.service.d/timeout.conf ]; then
|
||||
COREDUMP_RUNTIME_MAX=$(grep RuntimeMaxSec /etc/systemd/system/systemd-coredump@.service.d/timeout.conf)
|
||||
if [ -z $COREDUMP_RUNTIME_MAX ]; then
|
||||
cat << EOS > /etc/systemd/system/systemd-coredump@.service.d/timeout.conf
|
||||
[Service]
|
||||
RuntimeMaxSec=infinity
|
||||
TimeoutSec=infinity
|
||||
EOS
|
||||
fi
|
||||
fi
|
||||
|
||||
systemctl --system daemon-reload >/dev/null || true
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 8889cbc198...1488aaf842
@@ -177,6 +177,10 @@ public:
|
||||
return _remote_address.port();
|
||||
}
|
||||
|
||||
const socket_address& get_remote_address() const {
|
||||
return _remote_address;
|
||||
}
|
||||
|
||||
const timeout_config& get_timeout_config() const {
|
||||
return _timeout_config;
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
|
||||
auto ex = f2.get_exception();
|
||||
logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
|
||||
}
|
||||
auto upgrade_if_needed = [schema = std::move(schema)] (std::optional<proposal> p) mutable {
|
||||
auto upgrade_if_needed = [schema = std::move(schema)] (std::optional<proposal> p) {
|
||||
if (!p || p->update.schema_version() == schema->version()) {
|
||||
return make_ready_future<std::optional<proposal>>(std::move(p));
|
||||
}
|
||||
@@ -115,7 +115,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
|
||||
// for that version and upgrade the mutation with it.
|
||||
logger.debug("Stored mutation references outdated schema version. "
|
||||
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
|
||||
return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema = std::move(schema), p = std::move(p)] (const column_mapping& cm) {
|
||||
return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) {
|
||||
return make_ready_future<std::optional<proposal>>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
|
||||
});
|
||||
};
|
||||
|
||||
@@ -969,7 +969,11 @@ with_timeout(abort_source& as, db::timeout_clock::duration d, F&& fun) {
|
||||
// FIXME: using lambda as workaround for clang bug #50345 (miscompiling coroutine templates).
|
||||
auto impl = [] (abort_source& as, db::timeout_clock::duration d, F&& fun) -> future_t {
|
||||
abort_source timeout_src;
|
||||
auto sub = as.subscribe([&timeout_src] () noexcept { timeout_src.request_abort(); });
|
||||
auto sub = as.subscribe([&timeout_src] () noexcept {
|
||||
if (!timeout_src.abort_requested()) {
|
||||
timeout_src.request_abort();
|
||||
}
|
||||
});
|
||||
if (!sub) {
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
|
||||
@@ -2097,20 +2097,16 @@ future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> d
|
||||
|
||||
auto cdc = _proxy->get_cdc_service();
|
||||
if (cdc && cdc->needs_cdc_augmentation(update_mut_vec)) {
|
||||
f_cdc = cdc->augment_mutation_call(_timeout, std::move(update_mut_vec), tr_state, _cl_for_learn)
|
||||
.then([this, base_tbl_id, cdc = cdc->shared_from_this()] (std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>&& t) {
|
||||
auto mutations = std::move(std::get<0>(t));
|
||||
auto tracker = std::move(std::get<1>(t));
|
||||
// Pick only the CDC ("augmenting") mutations
|
||||
std::erase_if(mutations, [base_tbl_id = std::move(base_tbl_id)] (const mutation& v) {
|
||||
return v.schema()->id() == base_tbl_id;
|
||||
});
|
||||
if (mutations.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _proxy->mutate_internal(std::move(mutations), _cl_for_learn, false, tr_state, _permit, _timeout, std::move(tracker))
|
||||
.then(utils::result_into_future<result<>>);
|
||||
auto cdc_shared = cdc->shared_from_this(); // keep CDC service alive
|
||||
auto [mutations, tracker] = co_await cdc->augment_mutation_call(_timeout, std::move(update_mut_vec), tr_state, _cl_for_learn);
|
||||
// Pick only the CDC ("augmenting") mutations
|
||||
std::erase_if(mutations, [base_tbl_id = std::move(base_tbl_id)] (const mutation& v) {
|
||||
return v.schema()->id() == base_tbl_id;
|
||||
});
|
||||
if (!mutations.empty()) {
|
||||
f_cdc = _proxy->mutate_internal(std::move(mutations), _cl_for_learn, false, tr_state, _permit, _timeout, std::move(tracker))
|
||||
.then(utils::result_into_future<result<>>);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2119,7 +2115,7 @@ future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> d
|
||||
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout)
|
||||
.then(utils::result_into_future<result<>>);
|
||||
|
||||
return when_all_succeed(std::move(f_cdc), std::move(f_lwt)).discard_result();
|
||||
co_await when_all_succeed(std::move(f_cdc), std::move(f_lwt)).discard_result();
|
||||
}
|
||||
|
||||
void paxos_response_handler::prune(utils::UUID ballot) {
|
||||
|
||||
@@ -1465,7 +1465,7 @@ public:
|
||||
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range cr) override {
|
||||
forward_buffer_to(cr.start());
|
||||
clear_buffer();
|
||||
if (!_partition_finished) {
|
||||
_end_of_stream = false;
|
||||
return advance_context(_consumer.fast_forward_to(std::move(cr)));
|
||||
|
||||
@@ -1653,7 +1653,7 @@ public:
|
||||
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range cr) override {
|
||||
forward_buffer_to(cr.start());
|
||||
clear_buffer();
|
||||
if (!_partition_finished) {
|
||||
_end_of_stream = false;
|
||||
return advance_context(_consumer.fast_forward_to(std::move(cr)));
|
||||
|
||||
@@ -2518,9 +2518,14 @@ static future<bool> do_validate_uncompressed(input_stream<char>& stream, const c
|
||||
offset += buf.size();
|
||||
}
|
||||
|
||||
if (!stream.eof()) {
|
||||
sstlog.error("Chunk count mismatch between CRC.db and Data.db at offset {}: expected {} chunks but data file has more", offset, checksum.checksums.size());
|
||||
valid = false;
|
||||
{
|
||||
// We should be at EOF here, but the flag might not be set yet. To ensure
|
||||
// it is set, try to read some more. This should return an empty buffer.
|
||||
auto buf = co_await stream.read();
|
||||
if (!buf.empty()) {
|
||||
sstlog.error("Chunk count mismatch between CRC.db and Data.db at offset {}: expected {} chunks but data file has more", offset, checksum.checksums.size());
|
||||
valid = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (actual_full_checksum != expected_digest) {
|
||||
|
||||
@@ -136,7 +136,9 @@ struct sstable_open_config {
|
||||
// fields respectively. Problematic sstables might fail to load. Set to
|
||||
// false if you want to disable this, to be able to read such sstables.
|
||||
// Should only be disabled for diagnostics purposes.
|
||||
bool load_first_and_last_position_metadata = true;
|
||||
// FIXME: Enable it by default once the root cause of large allocation when reading sstable in reverse is fixed.
|
||||
// Ref: https://github.com/scylladb/scylladb/issues/11642
|
||||
bool load_first_and_last_position_metadata = false;
|
||||
};
|
||||
|
||||
class sstable : public enable_lw_shared_from_this<sstable> {
|
||||
|
||||
@@ -29,7 +29,7 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
|
||||
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), *cf, reason);
|
||||
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);
|
||||
//FIXME: for better estimations this should be transmitted from remote
|
||||
auto metadata = mutation_source_metadata{};
|
||||
auto& cs = cf->get_compaction_strategy();
|
||||
|
||||
60
test.py
60
test.py
@@ -343,7 +343,16 @@ class PythonTestSuite(TestSuite):
|
||||
pool_size = cfg.get("pool_size", 2)
|
||||
|
||||
self.create_cluster = self.get_cluster_factory(cluster_size)
|
||||
self.clusters = Pool(pool_size, self.create_cluster)
|
||||
async def recycle_cluster(cluster: ScyllaCluster) -> None:
|
||||
"""When a dirty cluster is returned to the cluster pool,
|
||||
stop it and release the used IPs. We don't necessarily uninstall() it yet,
|
||||
which would delete the log file and directory - we might want to preserve
|
||||
these if it came from a failed test.
|
||||
"""
|
||||
await cluster.stop()
|
||||
await cluster.release_ips()
|
||||
|
||||
self.clusters = Pool(pool_size, self.create_cluster, recycle_cluster)
|
||||
|
||||
def get_cluster_factory(self, cluster_size: int) -> Callable[..., Awaitable]:
|
||||
def create_server(create_cfg: ScyllaCluster.CreateServerParams):
|
||||
@@ -686,7 +695,8 @@ class CQLApprovalTest(Test):
|
||||
if self.server_log is not None:
|
||||
logger.info("Server log:\n%s", self.server_log)
|
||||
|
||||
async with self.suite.clusters.instance(logger) as cluster:
|
||||
# TODO: consider dirty_on_exception=True
|
||||
async with self.suite.clusters.instance(False, logger) as cluster:
|
||||
try:
|
||||
cluster.before_test(self.uname)
|
||||
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
|
||||
@@ -842,26 +852,32 @@ class PythonTest(Test):
|
||||
|
||||
loggerPrefix = self.mode + '/' + self.uname
|
||||
logger = LogPrefixAdapter(logging.getLogger(loggerPrefix), {'prefix': loggerPrefix})
|
||||
async with self.suite.clusters.instance(logger) as cluster:
|
||||
try:
|
||||
cluster.before_test(self.uname)
|
||||
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
|
||||
self.args.insert(0, "--host={}".format(cluster.endpoint()))
|
||||
self.is_before_test_ok = True
|
||||
cluster.take_log_savepoint()
|
||||
status = await run_test(self, options)
|
||||
cluster.after_test(self.uname)
|
||||
self.is_after_test_ok = True
|
||||
self.success = status
|
||||
except Exception as e:
|
||||
self.server_log = cluster.read_server_log()
|
||||
self.server_log_filename = cluster.server_log_filename()
|
||||
if self.is_before_test_ok is False:
|
||||
print("Test {} pre-check failed: {}".format(self.name, str(e)))
|
||||
print("Server log of the first server:\n{}".format(self.server_log))
|
||||
# Don't try to continue if the cluster is broken
|
||||
raise
|
||||
logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
|
||||
cluster = await self.suite.clusters.get(logger)
|
||||
try:
|
||||
cluster.before_test(self.uname)
|
||||
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
|
||||
self.args.insert(0, "--host={}".format(cluster.endpoint()))
|
||||
self.is_before_test_ok = True
|
||||
cluster.take_log_savepoint()
|
||||
status = await run_test(self, options)
|
||||
cluster.after_test(self.uname)
|
||||
self.is_after_test_ok = True
|
||||
self.success = status
|
||||
except Exception as e:
|
||||
self.server_log = cluster.read_server_log()
|
||||
self.server_log_filename = cluster.server_log_filename()
|
||||
if self.is_before_test_ok is False:
|
||||
print("Test {} pre-check failed: {}".format(self.name, str(e)))
|
||||
print("Server log of the first server:\n{}".format(self.server_log))
|
||||
logger.info(f"Discarding cluster after failed start for test %s...", self.name)
|
||||
elif self.is_after_test_ok is False:
|
||||
print("Test {} post-check failed: {}".format(self.name, str(e)))
|
||||
print("Server log of the first server:\n{}".format(self.server_log))
|
||||
logger.info(f"Discarding cluster after failed test %s...", self.name)
|
||||
await self.suite.clusters.put(cluster, is_dirty=True)
|
||||
else:
|
||||
await self.suite.clusters.put(cluster, is_dirty=False)
|
||||
logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
|
||||
return self
|
||||
|
||||
def write_junit_failure_report(self, xml_res: ET.Element) -> None:
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
#include "readers/from_fragments_v2.hh"
|
||||
#include "readers/forwardable_v2.hh"
|
||||
#include "readers/compacting.hh"
|
||||
#include "readers/nonforwardable.hh"
|
||||
|
||||
struct mock_consumer {
|
||||
struct result {
|
||||
@@ -110,193 +111,187 @@ static size_t count_fragments(mutation m) {
|
||||
return res;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
size_t fragments_in_m = count_fragments(m);
|
||||
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
|
||||
auto close_reader = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
|
||||
r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
|
||||
if (result._fragments.empty()) {
|
||||
continue;
|
||||
}
|
||||
for (auto& mf : result._fragments) {
|
||||
r2.produces(*m.schema(), mf);
|
||||
}
|
||||
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
size_t fragments_in_m = count_fragments(m);
|
||||
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
|
||||
auto close_reader = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
|
||||
r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
|
||||
if (result._fragments.empty()) {
|
||||
continue;
|
||||
}
|
||||
});
|
||||
for (auto& mf : result._fragments) {
|
||||
r2.produces(*m.schema(), mf);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto test = [&semaphore] (mutation m1, mutation m2) {
|
||||
size_t fragments_in_m1 = count_fragments(m1);
|
||||
size_t fragments_in_m2 = count_fragments(m2);
|
||||
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto test = [&semaphore] (mutation m1, mutation m2) {
|
||||
size_t fragments_in_m1 = count_fragments(m1);
|
||||
size_t fragments_in_m2 = count_fragments(m2);
|
||||
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
|
||||
size_t tombstones_count = 0;
|
||||
if (m1.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
if (m2.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
|
||||
mfopt = r2().get0();
|
||||
}
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
}
|
||||
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
|
||||
size_t tombstones_count = 0;
|
||||
if (m1.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
};
|
||||
for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
|
||||
if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
|
||||
test(m, m2);
|
||||
} else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
|
||||
test(m2, m);
|
||||
if (m2.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
});
|
||||
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
|
||||
mfopt = r2().get0();
|
||||
}
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
}
|
||||
};
|
||||
for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
|
||||
if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
|
||||
test(m, m2);
|
||||
} else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
|
||||
test(m2, m);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
std::vector<frozen_mutation> fms;
|
||||
SEASTAR_THREAD_TEST_CASE(test_fragmenting_and_freezing) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
std::vector<frozen_mutation> fms;
|
||||
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
fms.emplace_back(std::move(fm));
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), 1);
|
||||
|
||||
auto m1 = fms.back().unfreeze(m.schema());
|
||||
BOOST_REQUIRE_EQUAL(m, m1);
|
||||
|
||||
fms.clear();
|
||||
|
||||
std::optional<bool> fragmented;
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!fragmented || *fragmented == frag);
|
||||
*fragmented = frag;
|
||||
fms.emplace_back(std::move(fm));
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, 1).get0();
|
||||
|
||||
auto&& rows = m.partition().non_dummy_rows();
|
||||
auto expected_fragments = std::distance(rows.begin(), rows.end())
|
||||
+ m.partition().row_tombstones().size()
|
||||
+ !m.partition().static_row().empty();
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
|
||||
BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
|
||||
|
||||
auto m2 = fms.back().unfreeze(m.schema());
|
||||
fms.pop_back();
|
||||
mutation_application_stats app_stats;
|
||||
while (!fms.empty()) {
|
||||
m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
|
||||
fms.pop_back();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(m, m2);
|
||||
});
|
||||
|
||||
auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
auto s = muts[0].schema();
|
||||
|
||||
std::vector<frozen_mutation> frozen;
|
||||
|
||||
// Freeze all
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
fms.emplace_back(std::move(fm));
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
|
||||
for (auto j = 0u; j < muts.size(); j++) {
|
||||
BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), 1);
|
||||
// Freeze first
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(frozen.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
|
||||
|
||||
auto m1 = fms.back().unfreeze(m.schema());
|
||||
BOOST_REQUIRE_EQUAL(m, m1);
|
||||
|
||||
fms.clear();
|
||||
|
||||
std::optional<bool> fragmented;
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!fragmented || *fragmented == frag);
|
||||
*fragmented = frag;
|
||||
fms.emplace_back(std::move(fm));
|
||||
// Fragment and freeze all
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, 1).get0();
|
||||
|
||||
auto&& rows = m.partition().non_dummy_rows();
|
||||
auto expected_fragments = std::distance(rows.begin(), rows.end())
|
||||
+ m.partition().row_tombstones().size()
|
||||
+ !m.partition().static_row().empty();
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
|
||||
BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
|
||||
|
||||
auto m2 = fms.back().unfreeze(m.schema());
|
||||
fms.pop_back();
|
||||
mutation_application_stats app_stats;
|
||||
while (!fms.empty()) {
|
||||
m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
|
||||
fms.pop_back();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(m, m2);
|
||||
});
|
||||
|
||||
auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
auto s = muts[0].schema();
|
||||
|
||||
std::vector<frozen_mutation> frozen;
|
||||
|
||||
// Freeze all
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
|
||||
for (auto j = 0u; j < muts.size(); j++) {
|
||||
BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
|
||||
std::vector<mutation> unfrozen;
|
||||
while (!frozen.empty()) {
|
||||
auto m = frozen.front().unfreeze(s);
|
||||
frozen.erase(frozen.begin());
|
||||
if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
|
||||
unfrozen.emplace_back(std::move(m));
|
||||
} else {
|
||||
unfrozen.back().apply(std::move(m));
|
||||
}
|
||||
|
||||
// Freeze first
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(frozen.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
|
||||
|
||||
// Fragment and freeze all
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, 1).get0();
|
||||
std::vector<mutation> unfrozen;
|
||||
while (!frozen.empty()) {
|
||||
auto m = frozen.front().unfreeze(s);
|
||||
frozen.erase(frozen.begin());
|
||||
if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
|
||||
unfrozen.emplace_back(std::move(m));
|
||||
} else {
|
||||
unfrozen.back().apply(std::move(m));
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(muts, unfrozen);
|
||||
}
|
||||
};
|
||||
BOOST_REQUIRE_EQUAL(muts, unfrozen);
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
});
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
|
||||
@@ -371,111 +366,109 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
|
||||
.is_equal_to(mut_orig);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multi_range_reader) {
|
||||
return seastar::async([] {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
SEASTAR_THREAD_TEST_CASE(test_multi_range_reader) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto keys = s.make_pkeys(10);
|
||||
auto ring = s.to_ring_positions(keys);
|
||||
auto keys = s.make_pkeys(10);
|
||||
auto ring = s.to_ring_positions(keys);
|
||||
|
||||
auto crs = boost::copy_range<std::vector<mutation_fragment>>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
auto crs = boost::copy_range<std::vector<mutation_fragment>>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
|
||||
auto ms = boost::copy_range<std::vector<mutation>>(keys | boost::adaptors::transformed([&] (auto& key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto& mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
auto ms = boost::copy_range<std::vector<mutation>>(keys | boost::adaptors::transformed([&] (auto& key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto& mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
|
||||
auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||
return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
|
||||
});
|
||||
|
||||
const auto empty_ranges = dht::partition_range_vector{};
|
||||
const auto single_ranges = dht::partition_range_vector{
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
};
|
||||
const auto multiple_ranges = dht::partition_range_vector {
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
dht::partition_range::make_singular(ring[4]),
|
||||
dht::partition_range::make(ring[6], ring[8]),
|
||||
};
|
||||
const auto empty_generator = [] { return std::optional<dht::partition_range>{}; };
|
||||
const auto single_generator = [r = std::optional<dht::partition_range>(single_ranges.front())] () mutable {
|
||||
return std::exchange(r, {});
|
||||
};
|
||||
const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional<dht::partition_range> {
|
||||
if (it == end) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return *(it++);
|
||||
};
|
||||
auto fft_range = dht::partition_range::make_starting_with(ring[9]);
|
||||
|
||||
// Generator ranges are single pass, so we need a new range each time they are used.
|
||||
auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
|
||||
testlog.info("empty ranges");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("single range");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read full partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces(ms[4])
|
||||
.produces(ms[6])
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read, skip partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces_partition_start(keys[1])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[2])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.next_partition()
|
||||
.produces(ms[4])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[6])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.produces_row_with_key(crs[1].as_clustering_row().key())
|
||||
.fast_forward_to(fft_range)
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[9])
|
||||
.next_partition()
|
||||
.produces_end_of_stream();
|
||||
};
|
||||
|
||||
testlog.info("vector version");
|
||||
run_test(
|
||||
[&] { return empty_ranges; },
|
||||
[&] { return single_ranges; },
|
||||
[&] { return multiple_ranges; });
|
||||
|
||||
testlog.info("generator version");
|
||||
run_test(
|
||||
[&] { return empty_generator; },
|
||||
[&] { return single_generator; },
|
||||
[&] { return multiple_generator; });
|
||||
auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||
return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
|
||||
});
|
||||
|
||||
const auto empty_ranges = dht::partition_range_vector{};
|
||||
const auto single_ranges = dht::partition_range_vector{
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
};
|
||||
const auto multiple_ranges = dht::partition_range_vector {
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
dht::partition_range::make_singular(ring[4]),
|
||||
dht::partition_range::make(ring[6], ring[8]),
|
||||
};
|
||||
const auto empty_generator = [] { return std::optional<dht::partition_range>{}; };
|
||||
const auto single_generator = [r = std::optional<dht::partition_range>(single_ranges.front())] () mutable {
|
||||
return std::exchange(r, {});
|
||||
};
|
||||
const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional<dht::partition_range> {
|
||||
if (it == end) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return *(it++);
|
||||
};
|
||||
auto fft_range = dht::partition_range::make_starting_with(ring[9]);
|
||||
|
||||
// Generator ranges are single pass, so we need a new range each time they are used.
|
||||
auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
|
||||
testlog.info("empty ranges");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("single range");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read full partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces(ms[4])
|
||||
.produces(ms[6])
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read, skip partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces_partition_start(keys[1])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[2])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.next_partition()
|
||||
.produces(ms[4])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[6])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.produces_row_with_key(crs[1].as_clustering_row().key())
|
||||
.fast_forward_to(fft_range)
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[9])
|
||||
.next_partition()
|
||||
.produces_end_of_stream();
|
||||
};
|
||||
|
||||
testlog.info("vector version");
|
||||
run_test(
|
||||
[&] { return empty_ranges; },
|
||||
[&] { return single_ranges; },
|
||||
[&] { return multiple_ranges; });
|
||||
|
||||
testlog.info("generator version");
|
||||
run_test(
|
||||
[&] { return empty_generator; },
|
||||
[&] { return single_generator; },
|
||||
[&] { return multiple_generator; });
|
||||
}
|
||||
|
||||
using reversed_partitions = seastar::bool_class<class reversed_partitions_tag>;
|
||||
@@ -648,95 +641,290 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_consume_flat) {
|
||||
return seastar::async([] {
|
||||
auto test_random_streams = [&] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
|
||||
}
|
||||
};
|
||||
SEASTAR_THREAD_TEST_CASE(test_consume_flat) {
|
||||
auto test_random_streams = [&] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
});
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_make_forwardable) {
|
||||
return seastar::async([] {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
SEASTAR_THREAD_TEST_CASE(test_make_forwardable) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto keys = s.make_pkeys(10);
|
||||
auto keys = s.make_pkeys(10);
|
||||
|
||||
auto crs = boost::copy_range < std::vector <
|
||||
mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
auto crs = boost::copy_range < std::vector <
|
||||
mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
|
||||
auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto &mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto &mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
|
||||
auto make_reader = [&] (auto& range) {
|
||||
return assert_that(
|
||||
make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
|
||||
};
|
||||
auto make_reader = [&] (auto& range) {
|
||||
return assert_that(
|
||||
make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
|
||||
};
|
||||
|
||||
auto test = [&] (auto& rd, auto& partition) {
|
||||
rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
|
||||
rd.produces_end_of_stream();
|
||||
rd.fast_forward_to(position_range::all_clustered_rows());
|
||||
for (auto &row : partition.partition().clustered_rows()) {
|
||||
rd.produces_row_with_key(row.key());
|
||||
}
|
||||
rd.produces_end_of_stream();
|
||||
auto test = [&] (auto& rd, auto& partition) {
|
||||
rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
|
||||
rd.produces_end_of_stream();
|
||||
rd.fast_forward_to(position_range::all_clustered_rows());
|
||||
for (auto &row : partition.partition().clustered_rows()) {
|
||||
rd.produces_row_with_key(row.key());
|
||||
}
|
||||
rd.produces_end_of_stream();
|
||||
rd.next_partition();
|
||||
};
|
||||
|
||||
auto rd = make_reader(query::full_partition_range);
|
||||
|
||||
for (auto& partition : ms) {
|
||||
test(rd, partition);
|
||||
}
|
||||
|
||||
auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
|
||||
|
||||
auto rd2 = make_reader(single_range);
|
||||
|
||||
rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
|
||||
rd2.produces_end_of_stream();
|
||||
rd2.fast_forward_to(position_range::all_clustered_rows());
|
||||
rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
|
||||
rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
|
||||
|
||||
auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
|
||||
|
||||
rd2.fast_forward_to(remaining_range);
|
||||
|
||||
for (auto i = size_t(1); i < ms.size(); ++i) {
|
||||
test(rd2, ms[i]);
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_make_forwardable_next_partition) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
const auto permit = semaphore.make_permit();
|
||||
|
||||
auto make_reader = [&](std::vector<mutation> mutations, const dht::partition_range& pr) {
|
||||
auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(),
|
||||
permit,
|
||||
std::move(mutations),
|
||||
pr,
|
||||
streamed_mutation::forwarding::yes);
|
||||
return assert_that(std::move(result)).exact();
|
||||
};
|
||||
|
||||
const auto pk1 = s.make_pkey(1);
|
||||
auto m1 = mutation(s.schema(), pk1);
|
||||
s.add_static_row(m1, "test-static-1");
|
||||
|
||||
const auto pk2 = s.make_pkey(2);
|
||||
auto m2 = mutation(s.schema(), pk2);
|
||||
s.add_static_row(m2, "test-static-2");
|
||||
|
||||
dht::ring_position_comparator cmp{*s.schema()};
|
||||
BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), std::strong_ordering::less);
|
||||
|
||||
auto rd = make_reader({m1, m2}, query::full_partition_range);
|
||||
rd.fill_buffer().get();
|
||||
rd.next_partition();
|
||||
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||
rd.produces_static_row(
|
||||
{{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-1")}});
|
||||
rd.produces_end_of_stream();
|
||||
|
||||
rd.next_partition();
|
||||
rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
|
||||
rd.produces_static_row(
|
||||
{{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-2")}});
|
||||
rd.produces_end_of_stream();
|
||||
|
||||
rd.next_partition();
|
||||
rd.produces_end_of_stream();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
const auto permit = semaphore.make_permit();
|
||||
|
||||
auto make_reader = [&](std::vector<mutation> mutations,
|
||||
bool single_partition,
|
||||
const dht::partition_range& pr)
|
||||
{
|
||||
auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(),
|
||||
permit,
|
||||
std::move(mutations),
|
||||
pr,
|
||||
streamed_mutation::forwarding::yes);
|
||||
result = make_nonforwardable(std::move(result), single_partition);
|
||||
return assert_that(std::move(result)).exact();
|
||||
};
|
||||
|
||||
const auto pk1 = s.make_pkey(1);
|
||||
auto m1 = mutation(s.schema(), pk1);
|
||||
m1.apply(s.make_row(permit, s.make_ckey(11), "value1"));
|
||||
|
||||
const auto pk2 = s.make_pkey(2);
|
||||
auto m2 = mutation(s.schema(), pk2);
|
||||
m2.apply(s.make_row(permit, s.make_ckey(22), "value2"));
|
||||
|
||||
const auto pk3 = s.make_pkey(3);
|
||||
auto m3 = mutation(s.schema(), pk3);
|
||||
m3.apply(s.make_row(permit, s.make_ckey(33), "value3"));
|
||||
|
||||
dht::ring_position_comparator cmp{*s.schema()};
|
||||
BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), std::strong_ordering::less);
|
||||
BOOST_CHECK_EQUAL(cmp(m2.decorated_key(), m3.decorated_key()), std::strong_ordering::less);
|
||||
|
||||
// no input -> no output
|
||||
{
|
||||
auto rd = make_reader({}, false, query::full_partition_range);
|
||||
rd.produces_end_of_stream();
|
||||
}
|
||||
|
||||
// next_partition()
|
||||
{
|
||||
auto check = [&] (flat_reader_assertions_v2 rd) {
|
||||
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||
rd.next_partition();
|
||||
rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
|
||||
rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key());
|
||||
rd.produces_partition_end();
|
||||
rd.produces_end_of_stream();
|
||||
};
|
||||
|
||||
auto rd = make_reader(query::full_partition_range);
|
||||
// buffer is not empty
|
||||
check(make_reader({m1, m2}, false, query::full_partition_range));
|
||||
|
||||
for (auto& partition : ms) {
|
||||
test(rd, partition);
|
||||
// buffer is empty
|
||||
{
|
||||
auto rd = make_reader({m1, m2}, false, query::full_partition_range);
|
||||
rd.set_max_buffer_size(1);
|
||||
check(std::move(rd));
|
||||
}
|
||||
}
|
||||
|
||||
auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
|
||||
// fast_forward_to()
|
||||
{
|
||||
const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
|
||||
auto rd = make_reader({m1, m2}, false, m1_range);
|
||||
rd.set_max_buffer_size(1);
|
||||
|
||||
auto rd2 = make_reader(single_range);
|
||||
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||
|
||||
rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
|
||||
rd2.produces_end_of_stream();
|
||||
rd2.fast_forward_to(position_range::all_clustered_rows());
|
||||
rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
|
||||
rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
|
||||
const auto m2_range = dht::partition_range::make_singular(m2.decorated_key());
|
||||
rd.fast_forward_to(m2_range);
|
||||
rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
|
||||
rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key());
|
||||
rd.produces_partition_end();
|
||||
|
||||
auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
|
||||
rd.next_partition();
|
||||
rd.produces_end_of_stream();
|
||||
}
|
||||
|
||||
rd2.fast_forward_to(remaining_range);
|
||||
// single_partition
|
||||
{
|
||||
auto rd = make_reader({m1, m2}, true, query::full_partition_range);
|
||||
rd.set_max_buffer_size(1);
|
||||
|
||||
for (auto i = size_t(1); i < ms.size(); ++i) {
|
||||
test(rd2, ms[i]);
|
||||
}
|
||||
});
|
||||
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||
rd.produces_row_with_key(m1.partition().clustered_rows().begin()->key());
|
||||
|
||||
rd.next_partition();
|
||||
rd.produces_end_of_stream();
|
||||
|
||||
rd.next_partition();
|
||||
rd.produces_end_of_stream();
|
||||
}
|
||||
|
||||
// single_partition with fast_forward_to
|
||||
{
|
||||
const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
|
||||
auto rd = make_reader({m1, m2}, true, m1_range);
|
||||
rd.set_max_buffer_size(1);
|
||||
|
||||
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||
|
||||
const auto m2_range = dht::partition_range::make_singular(m2.decorated_key());
|
||||
rd.fast_forward_to(m2_range);
|
||||
rd.produces_end_of_stream();
|
||||
|
||||
rd.next_partition();
|
||||
rd.produces_end_of_stream();
|
||||
}
|
||||
|
||||
// static row
|
||||
{
|
||||
s.add_static_row(m1, "test-static");
|
||||
const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
|
||||
auto rd = make_reader({m1, m2}, false, m1_range);
|
||||
rd.set_max_buffer_size(1);
|
||||
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||
rd.produces_static_row(
|
||||
{{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static")}});
|
||||
rd.produces_row(
|
||||
m1.partition().clustered_rows().begin()->key(),
|
||||
{{s.schema()->get_column_definition(to_bytes("v")), to_bytes("value1")}}
|
||||
);
|
||||
rd.produces_partition_end();
|
||||
rd.produces_end_of_stream();
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
|
||||
auto close_rd = deferred_close(rd);
|
||||
rd().get();
|
||||
rd().get();
|
||||
// We rely on AddressSanitizer telling us if nothing was leaked.
|
||||
SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable_from_mutations_as_mutation_source) {
|
||||
auto populate = [] (schema_ptr, const std::vector<mutation> &muts) {
|
||||
return mutation_source([=] (
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding) mutable {
|
||||
auto squashed_muts = squash_mutations(muts);
|
||||
const auto single_partition = squashed_muts.size() == 1;
|
||||
auto reader = make_flat_mutation_reader_from_mutations_v2(schema,
|
||||
std::move(permit),
|
||||
std::move(squashed_muts),
|
||||
range,
|
||||
slice,
|
||||
streamed_mutation::forwarding::yes);
|
||||
reader = make_nonforwardable(std::move(reader), single_partition);
|
||||
if (fwd_sm) {
|
||||
reader = make_forwardable(std::move(reader));
|
||||
}
|
||||
return reader;
|
||||
});
|
||||
};
|
||||
run_mutation_source_tests(populate);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
|
||||
auto close_rd = deferred_close(rd);
|
||||
rd().get();
|
||||
rd().get();
|
||||
// We rely on AddressSanitizer telling us if nothing was leaked.
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -24,22 +24,30 @@
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) {
|
||||
simple_schema s;
|
||||
std::vector<reader_permit> permits;
|
||||
std::vector<reader_concurrency_semaphore::inactive_read_handle> handles;
|
||||
|
||||
{
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
auto clear_permits = defer([&permits] { permits.clear(); });
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout))));
|
||||
permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout));
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permits.back())));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
|
||||
BOOST_REQUIRE(std::all_of(permits.begin(), permits.end(), [] (const reader_permit& permit) { return permit.get_state() == reader_permit::state::inactive; }));
|
||||
|
||||
semaphore.clear_inactive_reads();
|
||||
|
||||
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return !bool(handle); }));
|
||||
BOOST_REQUIRE(std::all_of(permits.begin(), permits.end(), [] (const reader_permit& permit) { return permit.get_state() == reader_permit::state::evicted; }));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
|
||||
permits.clear();
|
||||
handles.clear();
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
@@ -1077,3 +1085,33 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024));
|
||||
permit3_fut.get();
|
||||
}
|
||||
|
||||
|
||||
// Check that `stop()` correctly evicts all inactive reads.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_reads) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout).get());
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, *permit));
|
||||
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(permit->get_state(), reader_permit::state::inactive);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
// Using BOOST_CHECK_* because an exception thrown here causes a segfault,
|
||||
// due to the stop future not being waited for.
|
||||
auto stop_f = semaphore.stop();
|
||||
BOOST_CHECK(!stop_f.available());
|
||||
BOOST_CHECK(eventually_true([&] { return !semaphore.get_stats().inactive_reads; }));
|
||||
BOOST_CHECK(!handle);
|
||||
BOOST_CHECK_EQUAL(permit->get_state(), reader_permit::state::evicted);
|
||||
|
||||
// Stop waits on all permits, so we need to destroy the permit before we can
|
||||
// wait on the stop future.
|
||||
permit = {};
|
||||
stop_f.get();
|
||||
}
|
||||
|
||||
@@ -4926,6 +4926,7 @@ SEASTAR_TEST_CASE(test_large_partition_splitting_on_compaction) {
|
||||
position_in_partition::tri_compare pos_tri_cmp(*s);
|
||||
|
||||
for (auto& sst : ret.new_sstables) {
|
||||
sst = env.reusable_sst(s, tmp.path().string(), sst->generation().value()).get0();
|
||||
BOOST_REQUIRE(sst->may_have_partition_tombstones());
|
||||
|
||||
auto reader = sstable_reader(sst, s, env.make_reader_permit());
|
||||
|
||||
@@ -205,6 +205,7 @@ def run_scylla_cmd(pid, dir):
|
||||
'--max-networking-io-control-blocks', '100',
|
||||
'--unsafe-bypass-fsync', '1',
|
||||
'--kernel-page-cache', '1',
|
||||
'--commitlog-use-o-dsync', '0',
|
||||
'--flush-schema-tables-after-modification', 'false',
|
||||
'--api-address', ip,
|
||||
'--rpc-address', ip,
|
||||
|
||||
@@ -7,8 +7,10 @@
|
||||
# Tests for batch operations
|
||||
#############################################################################
|
||||
from cassandra import InvalidRequest
|
||||
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from util import new_test_table
|
||||
from rest_api import scylla_inject_error
|
||||
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -50,3 +52,27 @@ def test_error_is_raised_for_batch_size_above_threshold(cql, table1):
|
||||
from scylla.yaml."""
|
||||
with pytest.raises(InvalidRequest, match="Batch too large"):
|
||||
cql.execute(generate_big_batch(table1, 1025))
|
||||
|
||||
# Test checks unexpected errors handling in CQL server.
|
||||
#
|
||||
# The original problem was that std::bad_alloc exception occurred while parsing a large batch request.
|
||||
# This exception was caught by try/catch in cql_server::connection::process_request_one and
|
||||
# an attempt was made to construct the error response message via make_error function.
|
||||
# This attempt failed since the error message contained entire query and exceeded the limit of 64K
|
||||
# in cql_server::response::write_string, causing "Value too large" exception to be thrown.
|
||||
# This new exception reached the general handler in cql_server::connection::process_request, where
|
||||
# it was just logged and no information about the problem was sent to the client.
|
||||
# As a result, the client received a timeout exception after a while and
|
||||
# no other information about the cause of the error.
|
||||
#
|
||||
# It is quite difficult to reproduce OOM in a test, so we use error injection instead.
|
||||
# Passing injection_key in the body of the request ensures that the exception will be
|
||||
# thrown only for this test request and will not affect other requests that
|
||||
# the driver may send in the background.
|
||||
@pytest.mark.asyncio
|
||||
async def test_batch_with_error(cql, table1):
|
||||
injection_key = 'query_processor-parse_statement-test_failure'
|
||||
with scylla_inject_error(cql, injection_key, one_shot=False):
|
||||
# exceptions::exception_code::SERVER_ERROR, it gets converted to NoHostAvailable by the driver
|
||||
with pytest.raises(NoHostAvailable, match="Value too large"):
|
||||
cql.execute(generate_big_batch(table1, 100) + injection_key)
|
||||
|
||||
@@ -1,3 +1 @@
|
||||
type: Approval
|
||||
flaky:
|
||||
- cdc_with_lwt_test
|
||||
|
||||
@@ -106,6 +106,7 @@ cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
|
||||
db_config->add_per_partition_rate_limit_extension();
|
||||
|
||||
db_config->flush_schema_tables_after_modification.set(false);
|
||||
db_config->commitlog_use_o_dsync(false);
|
||||
}
|
||||
|
||||
cql_test_config::cql_test_config(const cql_test_config&) = default;
|
||||
|
||||
@@ -85,7 +85,8 @@ public:
|
||||
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, unsigned long generation,
|
||||
sstable::version_types version, sstable::format_types f = sstable::format_types::big) {
|
||||
auto sst = make_sstable(std::move(schema), dir, generation, version, f);
|
||||
return sst->load().then([sst = std::move(sst)] {
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
return sst->load(default_priority_class(), cfg).then([sst = std::move(sst)] {
|
||||
return make_ready_future<shared_sstable>(std::move(sst));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -43,7 +43,8 @@ sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_
|
||||
}
|
||||
}
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst->open_data().get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
std::set<mutation, mutation_decorated_key_less_comparator> merged;
|
||||
for (auto&& m : muts) {
|
||||
|
||||
@@ -72,7 +72,11 @@ class HostRegistry:
|
||||
self.next_host_id += 1
|
||||
return Host(self.subnet.format(self.next_host_id))
|
||||
|
||||
self.pool = Pool[Host](254, create_host)
|
||||
async def destroy_host(h: Host) -> None:
|
||||
# Doesn't matter, we never return hosts to the pool as 'dirty'.
|
||||
pass
|
||||
|
||||
self.pool = Pool[Host](254, create_host, destroy_host)
|
||||
|
||||
async def cleanup() -> None:
|
||||
if self.lock_filename:
|
||||
@@ -85,5 +89,5 @@ class HostRegistry:
|
||||
return await self.pool.get()
|
||||
|
||||
async def release_host(self, host: Host) -> None:
|
||||
return await self.pool.put(host)
|
||||
return await self.pool.put(host, is_dirty=False)
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import asyncio
|
||||
from typing import Generic, Callable, Awaitable, TypeVar, AsyncContextManager, Final
|
||||
from typing import Generic, Callable, Awaitable, TypeVar, AsyncContextManager, Final, Optional
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
@@ -10,12 +10,15 @@ class Pool(Generic[T]):
|
||||
on demand, so that if you use less, you don't create anything upfront.
|
||||
If there is no object in the pool and all N objects are in use, you want
|
||||
to wait until one of the object is returned to the pool. Expects a
|
||||
builder async function to build a new object.
|
||||
builder async function to build a new object and a destruction async
|
||||
function to clean up after a 'dirty' object (see below).
|
||||
|
||||
Usage example:
|
||||
async def start_server():
|
||||
return Server()
|
||||
pool = Pool(4, start_server)
|
||||
async def destroy_server(server):
|
||||
await server.free_resources()
|
||||
pool = Pool(4, start_server, destroy_server)
|
||||
|
||||
server = await pool.get()
|
||||
try:
|
||||
@@ -24,25 +27,51 @@ class Pool(Generic[T]):
|
||||
await pool.put(server)
|
||||
|
||||
Alternatively:
|
||||
async with pool.instance() as server:
|
||||
async with pool.instance(dirty_on_exception=False) as server:
|
||||
await run_test(test, server)
|
||||
|
||||
|
||||
If the object is considered no longer usable by other users of the pool
|
||||
you can 'steal' it, which frees up space in the pool.
|
||||
you can pass `is_dirty=True` flag to `put`, which will cause the object
|
||||
to be 'destroyed' (by calling the provided `destroy` function on it) and
|
||||
will free up space in the pool.
|
||||
server = await.pool.get()
|
||||
dirty = True
|
||||
try:
|
||||
dirty = await run_test(test, server)
|
||||
finally:
|
||||
if dirty:
|
||||
await pool.steal()
|
||||
else:
|
||||
await pool.put(server)
|
||||
await pool.put(server, is_dirty=dirty)
|
||||
|
||||
Alternatively:
|
||||
async with (cm := pool.instance(dirty_on_exception=True)) as server:
|
||||
cm.dirty = await run_test(test, server)
|
||||
# It will also be considered dirty if run_test throws an exception
|
||||
|
||||
|
||||
To atomically return a dirty object and use the freed space to obtain
|
||||
another object, you can use `replace_dirty`. This is different from a
|
||||
`put(is_dirty=True)` call followed by a `get` call, where a concurrent
|
||||
waiter might take the space freed up by `put`.
|
||||
server = await.pool.get()
|
||||
dirty = False
|
||||
try:
|
||||
for _ in range(num_runs):
|
||||
if dirty:
|
||||
srv = server
|
||||
server = None
|
||||
server = await pool.replace_dirty(srv)
|
||||
dirty = await run_test(test, server)
|
||||
finally:
|
||||
if server:
|
||||
await pool.put(is_dirty=dirty)
|
||||
"""
|
||||
def __init__(self, max_size: int, build: Callable[..., Awaitable[T]]):
|
||||
def __init__(self, max_size: int,
|
||||
build: Callable[..., Awaitable[T]],
|
||||
destroy: Callable[[T], Awaitable[None]]):
|
||||
assert(max_size >= 0)
|
||||
self.max_size: Final[int] = max_size
|
||||
self.build: Final[Callable[..., Awaitable[T]]] = build
|
||||
self.destroy: Final[Callable[[T], Awaitable]] = destroy
|
||||
self.cond: Final[asyncio.Condition] = asyncio.Condition()
|
||||
self.pool: list[T] = []
|
||||
self.total: int = 0 # len(self.pool) + leased objects
|
||||
@@ -64,6 +93,68 @@ class Pool(Generic[T]):
|
||||
# No object in pool, but total < max_size so we can construct one
|
||||
self.total += 1
|
||||
|
||||
return await self._build_and_get(*args, **kwargs)
|
||||
|
||||
async def put(self, obj: T, is_dirty: bool):
|
||||
"""Return a previously borrowed object to the pool
|
||||
if it's not dirty, otherwise destroy the object
|
||||
and free up space in the pool.
|
||||
"""
|
||||
if is_dirty:
|
||||
await self.destroy(obj)
|
||||
|
||||
async with self.cond:
|
||||
if is_dirty:
|
||||
self.total -= 1
|
||||
else:
|
||||
self.pool.append(obj)
|
||||
self.cond.notify()
|
||||
|
||||
async def replace_dirty(self, obj: T, *args, **kwargs) -> T:
|
||||
"""Atomically `put` a previously borrowed dirty object and `get` another one.
|
||||
The 'atomicity' guarantees that the space freed up by the returned object
|
||||
is used to return another object to the caller. The caller doesn't need
|
||||
to wait for space to be freed by another user of the pool.
|
||||
|
||||
Note: the returned object might have been constructed earlier or it might
|
||||
be built right now, as in `get`.
|
||||
*args and **kwargs are used as in `get`.
|
||||
"""
|
||||
await self.destroy(obj)
|
||||
|
||||
async with self.cond:
|
||||
if self.pool:
|
||||
self.total -= 1
|
||||
return self.pool.pop()
|
||||
|
||||
# Need to construct a new object.
|
||||
# The space for this object is already accounted for in self.total.
|
||||
|
||||
return await self._build_and_get(*args, **kwargs)
|
||||
|
||||
def instance(self, dirty_on_exception: bool, *args, **kwargs) -> AsyncContextManager[T]:
|
||||
class Instance:
|
||||
def __init__(self, pool: Pool[T], dirty_on_exception: bool):
|
||||
self.pool = pool
|
||||
self.dirty = False
|
||||
self.dirty_on_exception = dirty_on_exception
|
||||
|
||||
async def __aenter__(self):
|
||||
self.obj = await self.pool.get(*args, **kwargs)
|
||||
return self.obj
|
||||
|
||||
async def __aexit__(self, exc_type, exc, obj):
|
||||
if self.obj:
|
||||
self.dirty |= self.dirty_on_exception and exc is not None
|
||||
await self.pool.put(self.obj, is_dirty=self.dirty)
|
||||
self.obj = None
|
||||
|
||||
return Instance(self, dirty_on_exception)
|
||||
|
||||
async def _build_and_get(self, *args, **kwargs) -> T:
|
||||
"""Precondition: we allocated space for this object
|
||||
(it's included in self.total).
|
||||
"""
|
||||
try:
|
||||
obj = await self.build(*args, **kwargs)
|
||||
except:
|
||||
@@ -72,33 +163,3 @@ class Pool(Generic[T]):
|
||||
self.cond.notify()
|
||||
raise
|
||||
return obj
|
||||
|
||||
async def steal(self) -> None:
|
||||
"""Take ownership of a previously borrowed object.
|
||||
Frees up space in the pool.
|
||||
"""
|
||||
async with self.cond:
|
||||
self.total -= 1
|
||||
self.cond.notify()
|
||||
|
||||
async def put(self, obj: T):
|
||||
"""Return a previously borrowed object to the pool."""
|
||||
async with self.cond:
|
||||
self.pool.append(obj)
|
||||
self.cond.notify()
|
||||
|
||||
def instance(self, *args, **kwargs) -> AsyncContextManager[T]:
|
||||
class Instance:
|
||||
def __init__(self, pool):
|
||||
self.pool = pool
|
||||
|
||||
async def __aenter__(self):
|
||||
self.obj = await self.pool.get(*args, **kwargs)
|
||||
return self.obj
|
||||
|
||||
async def __aexit__(self, exc_type, exc, obj):
|
||||
if self.obj:
|
||||
await self.pool.put(self.obj)
|
||||
self.obj = None
|
||||
|
||||
return Instance(self)
|
||||
|
||||
@@ -21,14 +21,17 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HTTPError(Exception):
|
||||
def __init__(self, uri, code, message):
|
||||
def __init__(self, uri, code, params, json, message):
|
||||
super().__init__(message)
|
||||
self.uri = uri
|
||||
self.code = code
|
||||
self.params = params
|
||||
self.json = json
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
return f"HTTP error {self.code}: {self.message}, uri {self.uri}"
|
||||
return f"HTTP error {self.code}, uri: {self.uri}, " \
|
||||
f"params: {self.params}, json: {self.json}, body:\n{self.message}"
|
||||
|
||||
|
||||
# TODO: support ssl and verify_ssl
|
||||
@@ -63,7 +66,7 @@ class RESTClient(metaclass=ABCMeta):
|
||||
params = params, json = json, timeout = client_timeout) as resp:
|
||||
if resp.status != 200:
|
||||
text = await resp.text()
|
||||
raise HTTPError(uri, resp.status, f"{text}, params {params}, json {json}")
|
||||
raise HTTPError(uri, resp.status, params, json, text)
|
||||
if response_type is not None:
|
||||
# Return response.text() or response.json()
|
||||
return await getattr(resp, response_type)()
|
||||
|
||||
@@ -17,8 +17,10 @@ import pathlib
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
from typing import Optional, Dict, List, Set, Tuple, Callable, AsyncIterator, NamedTuple, Union
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from io import BufferedWriter
|
||||
from test.pylib.host_registry import Host, HostRegistry
|
||||
from test.pylib.pool import Pool
|
||||
@@ -111,6 +113,7 @@ SCYLLA_CMDLINE_OPTIONS = [
|
||||
'--max-networking-io-control-blocks', '100',
|
||||
'--unsafe-bypass-fsync', '1',
|
||||
'--kernel-page-cache', '1',
|
||||
'--commitlog-use-o-dsync', '0',
|
||||
'--abort-on-lsa-bad-alloc', '1',
|
||||
'--abort-on-seastar-bad-alloc',
|
||||
'--abort-on-internal-error', '1',
|
||||
@@ -173,6 +176,11 @@ def merge_cmdline_options(base: List[str], override: List[str]) -> List[str]:
|
||||
|
||||
return run()
|
||||
|
||||
class CqlUpState(Enum):
|
||||
NOT_CONNECTED = 1,
|
||||
CONNECTED = 2,
|
||||
QUERIED = 3
|
||||
|
||||
class ScyllaServer:
|
||||
"""Starts and handles a single Scylla server, managing logs, checking if responsive,
|
||||
and cleanup when finished."""
|
||||
@@ -295,7 +303,7 @@ class ScyllaServer:
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
return f"Exception when reading server log {self.log_filename}: {exc}"
|
||||
|
||||
async def cql_is_up(self) -> bool:
|
||||
async def cql_is_up(self) -> CqlUpState:
|
||||
"""Test that CQL is serving (a check we use at start up)."""
|
||||
caslog = logging.getLogger('cassandra')
|
||||
oldlevel = caslog.getEffectiveLevel()
|
||||
@@ -310,6 +318,7 @@ class ScyllaServer:
|
||||
# work, so rely on this "side effect".
|
||||
profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([self.ip_addr]),
|
||||
request_timeout=self.START_TIMEOUT)
|
||||
connected = False
|
||||
try:
|
||||
# In a cluster setup, it's possible that the CQL
|
||||
# here is directed to a node different from the initial contact
|
||||
@@ -321,16 +330,19 @@ class ScyllaServer:
|
||||
protocol_version=4,
|
||||
auth_provider=auth) as cluster:
|
||||
with cluster.connect() as session:
|
||||
session.execute("SELECT * FROM system.local")
|
||||
connected = True
|
||||
# See the comment above about `auth::standard_role_manager`. We execute
|
||||
# a 'real' query to ensure that the auth service has finished initializing.
|
||||
session.execute("SELECT key FROM system.local where key = 'local'")
|
||||
self.control_cluster = Cluster(execution_profiles=
|
||||
{EXEC_PROFILE_DEFAULT: profile},
|
||||
contact_points=[self.ip_addr],
|
||||
auth_provider=auth)
|
||||
self.control_connection = self.control_cluster.connect()
|
||||
return True
|
||||
return CqlUpState.QUERIED
|
||||
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
|
||||
self.logger.debug("Exception when checking if CQL is up: %s", exc)
|
||||
return False
|
||||
return CqlUpState.CONNECTED if connected else CqlUpState.NOT_CONNECTED
|
||||
finally:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
@@ -363,6 +375,7 @@ class ScyllaServer:
|
||||
|
||||
self.start_time = time.time()
|
||||
sleep_interval = 0.1
|
||||
cql_up_state = CqlUpState.NOT_CONNECTED
|
||||
|
||||
while time.time() < self.start_time + self.START_TIMEOUT:
|
||||
if self.cmd.returncode:
|
||||
@@ -377,20 +390,30 @@ class ScyllaServer:
|
||||
logpath = log_handler.baseFilename # type: ignore
|
||||
else:
|
||||
logpath = "?"
|
||||
raise RuntimeError(f"Failed to start server at host {self.ip_addr}.\n"
|
||||
raise RuntimeError(f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}.\n"
|
||||
"Check the log files:\n"
|
||||
f"{logpath}\n"
|
||||
f"{self.log_filename}")
|
||||
|
||||
if hasattr(self, "host_id") or await self.get_host_id(api):
|
||||
if await self.cql_is_up():
|
||||
cql_up_state = await self.cql_is_up()
|
||||
if cql_up_state == CqlUpState.QUERIED:
|
||||
return
|
||||
|
||||
# Sleep and retry
|
||||
await asyncio.sleep(sleep_interval)
|
||||
|
||||
raise RuntimeError(f"failed to start server {self.ip_addr}, "
|
||||
f"check server log at {self.log_filename}")
|
||||
err = f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}."
|
||||
if hasattr(self, "host_id"):
|
||||
err += f" Managed to obtain the server's Host ID ({self.host_id})"
|
||||
if cql_up_state == CqlUpState.CONNECTED:
|
||||
err += " and to connect the CQL driver, but failed to execute a query."
|
||||
else:
|
||||
err += " but failed to connect the CQL driver."
|
||||
else:
|
||||
err += " Failed to obtain the server's Host ID."
|
||||
err += f"\nCheck server log at {self.log_filename}."
|
||||
raise RuntimeError(err)
|
||||
|
||||
async def force_schema_migration(self) -> None:
|
||||
"""This is a hack to change schema hash on an existing cluster node
|
||||
@@ -705,6 +728,8 @@ class ScyllaCluster:
|
||||
to any specific test, throwing it here would stop a specific
|
||||
test."""
|
||||
if self.start_exception:
|
||||
# Mark as dirty so further test cases don't try to reuse this cluster.
|
||||
self.is_dirty = True
|
||||
raise self.start_exception
|
||||
|
||||
for server in self.running.values():
|
||||
@@ -729,11 +754,14 @@ class ScyllaCluster:
|
||||
if server_id not in self.running:
|
||||
return ScyllaCluster.ActionReturn(success=False, msg=f"Server {server_id} unknown")
|
||||
self.is_dirty = True
|
||||
server = self.running.pop(server_id)
|
||||
server = self.running[server_id]
|
||||
# Remove the server from `running` only after we successfully stop it.
|
||||
# Stopping may fail and if we removed it from `running` now it might leak.
|
||||
if gracefully:
|
||||
await server.stop_gracefully()
|
||||
else:
|
||||
await server.stop()
|
||||
self.running.pop(server_id)
|
||||
self.stopped[server_id] = server
|
||||
return ScyllaCluster.ActionReturn(success=True, msg=f"{server} stopped")
|
||||
|
||||
@@ -753,8 +781,10 @@ class ScyllaCluster:
|
||||
self.is_dirty = True
|
||||
server = self.stopped.pop(server_id)
|
||||
server.seeds = self._seeds()
|
||||
await server.start(self.api)
|
||||
# Put the server in `running` before starting it.
|
||||
# Starting may fail and if we didn't add it now it might leak.
|
||||
self.running[server_id] = server
|
||||
await server.start(self.api)
|
||||
return ScyllaCluster.ActionReturn(success=True, msg=f"{server} started")
|
||||
|
||||
async def server_restart(self, server_id: ServerNum) -> ActionReturn:
|
||||
@@ -817,7 +847,9 @@ class ScyllaClusterManager:
|
||||
self.is_after_test_ok: bool = False
|
||||
# API
|
||||
# NOTE: need to make a safe temp dir as tempfile can't make a safe temp sock name
|
||||
self.manager_dir: str = tempfile.mkdtemp(prefix="manager-", dir=base_dir)
|
||||
# Put the socket in /tmp, not base_dir, to avoid going over the length
|
||||
# limit of UNIX-domain socket addresses (issue #12622).
|
||||
self.manager_dir: str = tempfile.mkdtemp(prefix="manager-", dir="/tmp")
|
||||
self.sock_path: str = f"{self.manager_dir}/api"
|
||||
app = aiohttp.web.Application()
|
||||
self._setup_routes(app)
|
||||
@@ -828,7 +860,8 @@ class ScyllaClusterManager:
|
||||
if self.is_running:
|
||||
self.logger.warning("ScyllaClusterManager already running")
|
||||
return
|
||||
await self._get_cluster()
|
||||
self.cluster = await self.clusters.get(self.logger)
|
||||
self.logger.info("First Scylla cluster: %s", self.cluster)
|
||||
self.cluster.setLogger(self.logger)
|
||||
await self.runner.setup()
|
||||
self.site = aiohttp.web.UnixSite(self.runner, path=self.sock_path)
|
||||
@@ -839,12 +872,10 @@ class ScyllaClusterManager:
|
||||
self.current_test_case_full_name = f'{self.test_uname}::{test_case_name}'
|
||||
self.logger.info("Setting up %s", self.current_test_case_full_name)
|
||||
if self.cluster.is_dirty:
|
||||
self.logger.info(f"Current cluster %s is dirty after last test, stopping...", self.cluster.name)
|
||||
await self.clusters.steal()
|
||||
await self.cluster.stop()
|
||||
await self.cluster.release_ips()
|
||||
self.logger.info(f"Waiting for new cluster for test %s...", self.current_test_case_full_name)
|
||||
await self._get_cluster()
|
||||
self.logger.info(f"Current cluster %s is dirty after test %s, replacing with a new one...",
|
||||
self.cluster.name, self.current_test_case_full_name)
|
||||
self.cluster = await self.clusters.replace_dirty(self.cluster, self.logger)
|
||||
self.logger.info("Got new Scylla cluster: %s", self.cluster.name)
|
||||
self.cluster.setLogger(self.logger)
|
||||
self.logger.info("Leasing Scylla cluster %s for test %s", self.cluster, self.current_test_case_full_name)
|
||||
self.cluster.before_test(self.current_test_case_full_name)
|
||||
@@ -860,44 +891,56 @@ class ScyllaClusterManager:
|
||||
del self.site
|
||||
if not self.cluster.is_dirty:
|
||||
self.logger.info("Returning Scylla cluster %s for test %s", self.cluster, self.test_uname)
|
||||
await self.clusters.put(self.cluster)
|
||||
await self.clusters.put(self.cluster, is_dirty=False)
|
||||
else:
|
||||
self.logger.info("ScyllaManager: Scylla cluster %s is dirty after %s, stopping it",
|
||||
self.cluster, self.test_uname)
|
||||
await self.clusters.steal()
|
||||
await self.cluster.stop()
|
||||
await self.clusters.put(self.cluster, is_dirty=True)
|
||||
del self.cluster
|
||||
if os.path.exists(self.manager_dir):
|
||||
shutil.rmtree(self.manager_dir)
|
||||
self.is_running = False
|
||||
|
||||
async def _get_cluster(self) -> None:
|
||||
self.cluster = await self.clusters.get(self.logger)
|
||||
self.logger.info("Got new Scylla cluster %s", self.cluster)
|
||||
|
||||
|
||||
def _setup_routes(self, app: aiohttp.web.Application) -> None:
|
||||
app.router.add_get('/up', self._manager_up)
|
||||
app.router.add_get('/cluster/up', self._cluster_up)
|
||||
app.router.add_get('/cluster/is-dirty', self._is_dirty)
|
||||
app.router.add_get('/cluster/replicas', self._cluster_replicas)
|
||||
app.router.add_get('/cluster/running-servers', self._cluster_running_servers)
|
||||
app.router.add_get('/cluster/host-ip/{server_id}', self._cluster_server_ip_addr)
|
||||
app.router.add_get('/cluster/host-id/{server_id}', self._cluster_host_id)
|
||||
app.router.add_get('/cluster/before-test/{test_case_name}', self._before_test_req)
|
||||
app.router.add_get('/cluster/after-test', self._after_test)
|
||||
app.router.add_get('/cluster/mark-dirty', self._mark_dirty)
|
||||
app.router.add_get('/cluster/server/{server_id}/stop', self._cluster_server_stop)
|
||||
app.router.add_get('/cluster/server/{server_id}/stop_gracefully',
|
||||
self._cluster_server_stop_gracefully)
|
||||
app.router.add_get('/cluster/server/{server_id}/start', self._cluster_server_start)
|
||||
app.router.add_get('/cluster/server/{server_id}/restart', self._cluster_server_restart)
|
||||
app.router.add_put('/cluster/addserver', self._cluster_server_add)
|
||||
app.router.add_put('/cluster/remove-node/{initiator}', self._cluster_remove_node)
|
||||
app.router.add_get('/cluster/decommission-node/{server_id}',
|
||||
self._cluster_decommission_node)
|
||||
app.router.add_get('/cluster/server/{server_id}/get_config', self._server_get_config)
|
||||
app.router.add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
|
||||
def make_catching_handler(handler: Callable) -> Callable:
|
||||
async def catching_handler(request) -> aiohttp.web.Response:
|
||||
"""Catch all exceptions and return them to the client.
|
||||
Without this, the client would get an 'Internal server error' message
|
||||
without any details. Thanks to this the test log shows the actual error.
|
||||
"""
|
||||
try:
|
||||
return await handler(request)
|
||||
except Exception as e:
|
||||
tb = traceback.format_exc()
|
||||
self.logger.error(f'Exception when executing {handler.__name__}: {e}\n{tb}')
|
||||
return aiohttp.web.Response(status=500, text=str(e))
|
||||
return catching_handler
|
||||
|
||||
def add_get(route: str, handler: Callable):
|
||||
app.router.add_get(route, make_catching_handler(handler))
|
||||
|
||||
def add_put(route: str, handler: Callable):
|
||||
app.router.add_put(route, make_catching_handler(handler))
|
||||
|
||||
add_get('/up', self._manager_up)
|
||||
add_get('/cluster/up', self._cluster_up)
|
||||
add_get('/cluster/is-dirty', self._is_dirty)
|
||||
add_get('/cluster/replicas', self._cluster_replicas)
|
||||
add_get('/cluster/running-servers', self._cluster_running_servers)
|
||||
add_get('/cluster/host-ip/{server_id}', self._cluster_server_ip_addr)
|
||||
add_get('/cluster/host-id/{server_id}', self._cluster_host_id)
|
||||
add_get('/cluster/before-test/{test_case_name}', self._before_test_req)
|
||||
add_get('/cluster/after-test', self._after_test)
|
||||
add_get('/cluster/mark-dirty', self._mark_dirty)
|
||||
add_get('/cluster/server/{server_id}/stop', self._cluster_server_stop)
|
||||
add_get('/cluster/server/{server_id}/stop_gracefully', self._cluster_server_stop_gracefully)
|
||||
add_get('/cluster/server/{server_id}/start', self._cluster_server_start)
|
||||
add_get('/cluster/server/{server_id}/restart', self._cluster_server_restart)
|
||||
add_put('/cluster/addserver', self._cluster_server_add)
|
||||
add_put('/cluster/remove-node/{initiator}', self._cluster_remove_node)
|
||||
add_get('/cluster/decommission-node/{server_id}', self._cluster_decommission_node)
|
||||
add_get('/cluster/server/{server_id}/get_config', self._server_get_config)
|
||||
add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
|
||||
|
||||
async def _manager_up(self, _request) -> aiohttp.web.Response:
|
||||
return aiohttp.web.Response(text=f"{self.is_running}")
|
||||
|
||||
Submodule tools/java updated: 1c4e1e7a7d...83b2168b19
@@ -143,7 +143,8 @@ private:
|
||||
throw std::bad_function_call();
|
||||
}
|
||||
virtual const std::vector<view_ptr>& get_table_views(data_dictionary::table t) const override {
|
||||
return {};
|
||||
static const std::vector<view_ptr> empty;
|
||||
return empty;
|
||||
}
|
||||
virtual sstring get_available_index_name(data_dictionary::database db, std::string_view ks_name, std::string_view table_name,
|
||||
std::optional<sstring> index_name_root) const override {
|
||||
|
||||
@@ -667,14 +667,23 @@ future<> cql_server::connection::process_request() {
|
||||
_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
|
||||
process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
|
||||
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
try {
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
try {
|
||||
if (response_f.failed()) {
|
||||
const auto message = format("request processing failed, error [{}]", response_f.get_exception());
|
||||
clogger.error("{}: {}", _client_state.get_remote_address(), message);
|
||||
write_response(make_error(stream, exceptions::exception_code::SERVER_ERROR,
|
||||
message,
|
||||
tracing::trace_state_ptr()));
|
||||
} else {
|
||||
write_response(response_f.get0(), std::move(mem_permit), _compression);
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
|
||||
} catch (...) {
|
||||
clogger.error("request processing failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
|
||||
} catch (...) {
|
||||
clogger.error("{}: request processing failed: {}",
|
||||
_client_state.get_remote_address(), std::current_exception());
|
||||
}
|
||||
});
|
||||
|
||||
if (should_paralelize) {
|
||||
return make_ready_future<>();
|
||||
|
||||
6
types.cc
6
types.cc
@@ -735,6 +735,7 @@ bool abstract_type::is_collection() const {
|
||||
bool abstract_type::is_tuple() const {
|
||||
struct visitor {
|
||||
bool operator()(const abstract_type&) { return false; }
|
||||
bool operator()(const reversed_type_impl& t) { return t.underlying_type()->is_tuple(); }
|
||||
bool operator()(const tuple_type_impl&) { return true; }
|
||||
};
|
||||
return visit(*this, visitor{});
|
||||
@@ -1956,6 +1957,10 @@ data_value deserialize_aux(const tuple_type_impl& t, View v) {
|
||||
|
||||
template<FragmentedView View>
|
||||
utils::multiprecision_int deserialize_value(const varint_type_impl&, View v) {
|
||||
if (v.empty()) {
|
||||
throw marshal_exception("cannot deserialize multiprecision int - empty buffer");
|
||||
}
|
||||
skip_empty_fragments(v);
|
||||
bool negative = v.current_fragment().front() < 0;
|
||||
utils::multiprecision_int num;
|
||||
while (v.size_bytes()) {
|
||||
@@ -2052,6 +2057,7 @@ bool deserialize_value(const boolean_type_impl&, View v) {
|
||||
if (v.size_bytes() != 1) {
|
||||
throw marshal_exception(format("cannot deserialize boolean, size mismatch ({:d})", v.size_bytes()));
|
||||
}
|
||||
skip_empty_fragments(v);
|
||||
return v.current_fragment().front() != 0;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user