Compare commits
32 Commits
copilot/co
...
scylla-5.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0295d0c5c8 | ||
|
|
fa94222662 | ||
|
|
dff7f3c5ba | ||
|
|
3723713130 | ||
|
|
03f8411e38 | ||
|
|
0e391d67d1 | ||
|
|
f76989285e | ||
|
|
9deeeb4db1 | ||
|
|
1f3196735f | ||
|
|
abb6817261 | ||
|
|
d3fd090429 | ||
|
|
3e7c57d162 | ||
|
|
f878a34da3 | ||
|
|
eaded57b2e | ||
|
|
25d2da08d1 | ||
|
|
9b1a570f6f | ||
|
|
426d045249 | ||
|
|
86dbbf12cc | ||
|
|
b05903eddd | ||
|
|
26ead53304 | ||
|
|
f60bab9471 | ||
|
|
66f34245fc | ||
|
|
4047528bd9 | ||
|
|
1a82c61452 | ||
|
|
3d9800eb1c | ||
|
|
c48e9b47dd | ||
|
|
2eadaad9f7 | ||
|
|
d10aee15e7 | ||
|
|
9e017cb1e6 | ||
|
|
b8504cc9b2 | ||
|
|
856703a85e | ||
|
|
86a6c1fb2b |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.1.0-dev
|
||||
VERSION=5.1.0-rc4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -438,6 +438,11 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
rjson::add(table_description, "BillingModeSummary", rjson::empty_object());
|
||||
rjson::add(table_description["BillingModeSummary"], "BillingMode", "PAY_PER_REQUEST");
|
||||
rjson::add(table_description["BillingModeSummary"], "LastUpdateToPayPerRequestDateTime", rjson::value(creation_date_seconds));
|
||||
// In PAY_PER_REQUEST billing mode, provisioned capacity should return 0
|
||||
rjson::add(table_description, "ProvisionedThroughput", rjson::empty_object());
|
||||
rjson::add(table_description["ProvisionedThroughput"], "ReadCapacityUnits", 0);
|
||||
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", 0);
|
||||
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
|
||||
|
||||
std::unordered_map<std::string,std::string> key_attribute_types;
|
||||
// Add base table's KeySchema and collect types for AttributeDefinitions:
|
||||
|
||||
@@ -842,6 +842,20 @@ future<> compaction_manager::really_do_stop() {
|
||||
cmlog.info("Stopped");
|
||||
}
|
||||
|
||||
template <typename Ex>
|
||||
requires std::is_base_of_v<std::exception, Ex> &&
|
||||
requires (const Ex& ex) {
|
||||
{ ex.code() } noexcept -> std::same_as<const std::error_code&>;
|
||||
}
|
||||
auto swallow_enospc(const Ex& ex) noexcept {
|
||||
if (ex.code().value() != ENOSPC) {
|
||||
return make_exception_future<>(std::make_exception_ptr(ex));
|
||||
}
|
||||
|
||||
cmlog.warn("Got ENOSPC on stop, ignoring...");
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void compaction_manager::do_stop() noexcept {
|
||||
if (_state == state::none || _state == state::stopped) {
|
||||
return;
|
||||
@@ -849,7 +863,10 @@ void compaction_manager::do_stop() noexcept {
|
||||
|
||||
try {
|
||||
_state = state::stopped;
|
||||
_stop_future = really_do_stop();
|
||||
_stop_future = really_do_stop()
|
||||
.handle_exception_type([] (const std::system_error& ex) { return swallow_enospc(ex); })
|
||||
.handle_exception_type([] (const storage_io_error& ex) { return swallow_enospc(ex); })
|
||||
;
|
||||
} catch (...) {
|
||||
cmlog.error("Failed to stop the manager: {}", std::current_exception());
|
||||
}
|
||||
@@ -1050,7 +1067,7 @@ public:
|
||||
bool performed() const noexcept {
|
||||
return _performed;
|
||||
}
|
||||
|
||||
private:
|
||||
future<> run_offstrategy_compaction(sstables::compaction_data& cdata) {
|
||||
// This procedure will reshape sstables in maintenance set until it's ready for
|
||||
// integration into main set.
|
||||
@@ -1083,6 +1100,7 @@ public:
|
||||
return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt;
|
||||
};
|
||||
|
||||
std::exception_ptr err;
|
||||
while (auto desc = get_next_job()) {
|
||||
desc->creator = [this, &new_unused_sstables, &t] (shard_id dummy) {
|
||||
auto sst = t.make_sstable();
|
||||
@@ -1091,7 +1109,16 @@ public:
|
||||
};
|
||||
auto input = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc->sstables);
|
||||
|
||||
auto ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t);
|
||||
sstables::compaction_result ret;
|
||||
try {
|
||||
ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t);
|
||||
} catch (sstables::compaction_stopped_exception&) {
|
||||
// If off-strategy compaction stopped on user request, let's not discard the partial work.
|
||||
// Therefore, both un-reshaped and reshaped data will be integrated into main set, allowing
|
||||
// regular compaction to continue from where off-strategy left off.
|
||||
err = std::current_exception();
|
||||
break;
|
||||
}
|
||||
_performed = true;
|
||||
|
||||
// update list of reshape candidates without input but with output added to it
|
||||
@@ -1128,6 +1155,9 @@ public:
|
||||
for (auto& sst : sstables_to_remove) {
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
if (err) {
|
||||
co_await coroutine::return_exception_ptr(std::move(err));
|
||||
}
|
||||
}
|
||||
protected:
|
||||
virtual future<compaction_stats_opt> do_run() override {
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include "tombstone_gc.hh"
|
||||
#include "db/per_partition_rate_limit_extension.hh"
|
||||
#include "db/per_partition_rate_limit_options.hh"
|
||||
#include "utils/bloom_calculations.hh"
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
@@ -152,6 +153,16 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
|
||||
throw exceptions::configuration_exception(KW_MAX_INDEX_INTERVAL + " must be greater than " + KW_MIN_INDEX_INTERVAL);
|
||||
}
|
||||
|
||||
if (get_simple(KW_BF_FP_CHANCE)) {
|
||||
double bloom_filter_fp_chance = get_double(KW_BF_FP_CHANCE, 0/*not used*/);
|
||||
double min_bloom_filter_fp_chance = utils::bloom_calculations::min_supported_bloom_filter_fp_chance();
|
||||
if (bloom_filter_fp_chance <= min_bloom_filter_fp_chance || bloom_filter_fp_chance > 1.0) {
|
||||
throw exceptions::configuration_exception(format(
|
||||
"{} must be larger than {} and less than or equal to 1.0 (got {})",
|
||||
KW_BF_FP_CHANCE, min_bloom_filter_fp_chance, bloom_filter_fp_chance));
|
||||
}
|
||||
}
|
||||
|
||||
speculative_retry::from_sstring(get_string(KW_SPECULATIVE_RETRY, speculative_retry(speculative_retry::type::NONE, 0).to_sstring()));
|
||||
}
|
||||
|
||||
|
||||
@@ -2031,7 +2031,7 @@ future<> db::commitlog::segment_manager::shutdown() {
|
||||
}
|
||||
}
|
||||
co_await _shutdown_promise->get_shared_future();
|
||||
clogger.info("Commitlog shutdown complete");
|
||||
clogger.debug("Commitlog shutdown complete");
|
||||
}
|
||||
|
||||
void db::commitlog::segment_manager::add_file_to_dispose(named_file f, dispose_mode mode) {
|
||||
|
||||
@@ -899,6 +899,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Ignore truncation record stored in system tables as if tables were never truncated.")
|
||||
, force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false,
|
||||
"Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.")
|
||||
, cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, true,
|
||||
"Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.")
|
||||
, default_log_level(this, "default_log_level", value_status::Used)
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used)
|
||||
, log_to_stdout(this, "log_to_stdout", value_status::Used)
|
||||
|
||||
@@ -379,6 +379,8 @@ public:
|
||||
named_value<bool> ignore_truncation_record;
|
||||
named_value<bool> force_schema_commit_log;
|
||||
|
||||
named_value<bool> cache_index_pages;
|
||||
|
||||
seastar::logging_settings logging_settings(const log_cli::options&) const;
|
||||
|
||||
const db::extensions& extensions() const;
|
||||
|
||||
@@ -868,13 +868,18 @@ void view_updates::generate_update(
|
||||
bool same_row = true;
|
||||
for (auto col_id : col_ids) {
|
||||
auto* after = update.cells().find_cell(col_id);
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto& cdef = _base->regular_column_at(col_id);
|
||||
if (existing) {
|
||||
auto* before = existing->cells().find_cell(col_id);
|
||||
// Note that this cell is necessarily atomic, because col_ids are
|
||||
// view key columns, and keys must be atomic.
|
||||
if (before && before->as_atomic_cell(cdef).is_live()) {
|
||||
if (after && after->as_atomic_cell(cdef).is_live()) {
|
||||
auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(cdef), after->as_atomic_cell(cdef));
|
||||
// We need to compare just the values of the keys, not
|
||||
// metadata like the timestamp. This is because below,
|
||||
// if the old and new view row have the same key, we need
|
||||
// to be sure to reach the update_entry() case.
|
||||
auto cmp = compare_unsigned(before->as_atomic_cell(cdef).value(), after->as_atomic_cell(cdef).value());
|
||||
if (cmp != 0) {
|
||||
same_row = false;
|
||||
}
|
||||
@@ -894,7 +899,13 @@ void view_updates::generate_update(
|
||||
if (same_row) {
|
||||
update_entry(base_key, update, *existing, now);
|
||||
} else {
|
||||
replace_entry(base_key, update, *existing, now);
|
||||
// This code doesn't work if the old and new view row have the
|
||||
// same key, because if they do we get both data and tombstone
|
||||
// for the same timestamp (now) and the tombstone wins. This
|
||||
// is why we need the "same_row" case above - it's not just a
|
||||
// performance optimization.
|
||||
delete_old_entry(base_key, *existing, update, now);
|
||||
create_entry(base_key, update, now);
|
||||
}
|
||||
} else {
|
||||
delete_old_entry(base_key, *existing, update, now);
|
||||
|
||||
@@ -154,10 +154,7 @@ private:
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
|
||||
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
|
||||
create_entry(base_key, update, now);
|
||||
delete_old_entry(base_key, existing, update, now);
|
||||
}
|
||||
void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
|
||||
};
|
||||
|
||||
class view_update_builder {
|
||||
|
||||
@@ -83,7 +83,7 @@ overloaded_exception::overloaded_exception(size_t c) noexcept
|
||||
{}
|
||||
|
||||
rate_limit_exception::rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept
|
||||
: cassandra_exception(exception_code::CONFIG_ERROR, prepare_message("Per-partition rate limit reached for {} in table {}.{}, rejected by {}", op_type_, ks, cf, rejected_by_coordinator_ ? "coordinator" : "replicas"))
|
||||
: cassandra_exception(exception_code::RATE_LIMIT_ERROR, prepare_message("Per-partition rate limit reached for {} in table {}.{}, rejected by {}", op_type_, ks, cf, rejected_by_coordinator_ ? "coordinator" : "replicas"))
|
||||
, op_type(op_type_)
|
||||
, rejected_by_coordinator(rejected_by_coordinator_)
|
||||
{ }
|
||||
|
||||
6
main.cc
6
main.cc
@@ -600,6 +600,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
cfg->broadcast_to_all_shards().get();
|
||||
|
||||
// We pass this piece of config through a global as a temporary hack.
|
||||
// See the comment at the definition of sstables::global_cache_index_pages.
|
||||
smp::invoke_on_all([&cfg] {
|
||||
sstables::global_cache_index_pages = cfg->cache_index_pages.operator utils::updateable_value<bool>();
|
||||
}).get();
|
||||
|
||||
::sighup_handler sighup_handler(opts, *cfg);
|
||||
auto stop_sighup_handler = defer_verbose_shutdown("sighup", [&] {
|
||||
sighup_handler.stop().get();
|
||||
|
||||
@@ -467,6 +467,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
// should not be blocked by any data requests.
|
||||
case messaging_verb::GROUP0_PEER_EXCHANGE:
|
||||
case messaging_verb::GROUP0_MODIFY_CONFIG:
|
||||
// ATTN -- if moving GOSSIP_ verbs elsewhere, mind updating the tcp_nodelay
|
||||
// setting in get_rpc_client(), which assumes gossiper verbs live in idx 0
|
||||
return 0;
|
||||
case messaging_verb::PREPARE_MESSAGE:
|
||||
case messaging_verb::PREPARE_DONE_MESSAGE:
|
||||
@@ -737,7 +739,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
}();
|
||||
|
||||
auto must_tcp_nodelay = [&] {
|
||||
if (idx == 1) {
|
||||
if (idx == 0) {
|
||||
return true; // gossip
|
||||
}
|
||||
if (_cfg.tcp_nodelay == tcp_nodelay_what::local) {
|
||||
|
||||
@@ -826,6 +826,7 @@ public:
|
||||
|
||||
void apply(tombstone deleted_at) {
|
||||
_deleted_at.apply(deleted_at);
|
||||
maybe_shadow();
|
||||
}
|
||||
|
||||
void apply(shadowable_tombstone deleted_at) {
|
||||
|
||||
@@ -444,7 +444,7 @@ public:
|
||||
// When throws, the cursor is invalidated and its position is not changed.
|
||||
bool advance_to(position_in_partition_view lower_bound) {
|
||||
maybe_advance_to(lower_bound);
|
||||
return no_clustering_row_between(_schema, lower_bound, position());
|
||||
return no_clustering_row_between_weak(_schema, lower_bound, position());
|
||||
}
|
||||
|
||||
// Call only when valid.
|
||||
|
||||
@@ -571,6 +571,20 @@ bool no_clustering_row_between(const schema& s, position_in_partition_view a, po
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if and only if there can't be any clustering_row with position >= a and < b.
|
||||
// It is assumed that a <= b.
|
||||
inline
|
||||
bool no_clustering_row_between_weak(const schema& s, position_in_partition_view a, position_in_partition_view b) {
|
||||
clustering_key_prefix::equality eq(s);
|
||||
if (a.has_key() && b.has_key()) {
|
||||
return eq(a.key(), b.key())
|
||||
&& (a.get_bound_weight() == bound_weight::after_all_prefixed
|
||||
|| b.get_bound_weight() != bound_weight::after_all_prefixed);
|
||||
} else {
|
||||
return !a.has_key() && !b.has_key();
|
||||
}
|
||||
}
|
||||
|
||||
// Includes all position_in_partition objects "p" for which: start <= p < end
|
||||
// And only those.
|
||||
class position_range {
|
||||
|
||||
19
querier.cc
19
querier.cc
@@ -413,25 +413,6 @@ future<bool> querier_cache::evict_one() noexcept {
|
||||
co_return false;
|
||||
}
|
||||
|
||||
future<> querier_cache::evict_all_for_table(const utils::UUID& schema_id) noexcept {
|
||||
for (auto ip : {&_data_querier_index, &_mutation_querier_index, &_shard_mutation_querier_index}) {
|
||||
auto& idx = *ip;
|
||||
for (auto it = idx.begin(); it != idx.end();) {
|
||||
if (it->second->schema().id() == schema_id) {
|
||||
auto reader_opt = it->second->permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(*it->second));
|
||||
it = idx.erase(it);
|
||||
--_stats.population;
|
||||
if (reader_opt) {
|
||||
co_await reader_opt->close();
|
||||
}
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> querier_cache::stop() noexcept {
|
||||
co_await _closing_gate.close();
|
||||
|
||||
|
||||
@@ -383,11 +383,6 @@ public:
|
||||
/// is empty).
|
||||
future<bool> evict_one() noexcept;
|
||||
|
||||
/// Evict all queriers that belong to a table.
|
||||
///
|
||||
/// Should be used when dropping a table.
|
||||
future<> evict_all_for_table(const utils::UUID& schema_id) noexcept;
|
||||
|
||||
/// Close all queriers and wait on background work.
|
||||
///
|
||||
/// Should be used before destroying the querier_cache.
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include <boost/range/adaptor/reversed.hpp>
|
||||
#include "range_tombstone_list.hh"
|
||||
#include "utils/allocation_strategy.hh"
|
||||
#include "utils/amortized_reserve.hh"
|
||||
#include <seastar/util/variant_utils.hh>
|
||||
|
||||
range_tombstone_list::range_tombstone_list(const range_tombstone_list& x)
|
||||
@@ -375,13 +376,13 @@ range_tombstone_list::reverter::insert(range_tombstones_type::iterator it, range
|
||||
|
||||
range_tombstone_list::range_tombstones_type::iterator
|
||||
range_tombstone_list::reverter::erase(range_tombstones_type::iterator it) {
|
||||
_ops.reserve(_ops.size() + 1);
|
||||
amortized_reserve(_ops, _ops.size() + 1);
|
||||
_ops.emplace_back(erase_undo_op(*it));
|
||||
return _dst._tombstones.erase(it);
|
||||
}
|
||||
|
||||
void range_tombstone_list::reverter::update(range_tombstones_type::iterator it, range_tombstone&& new_rt) {
|
||||
_ops.reserve(_ops.size() + 1);
|
||||
amortized_reserve(_ops, _ops.size() + 1);
|
||||
swap(it->tombstone(), new_rt);
|
||||
_ops.emplace_back(update_undo_op(std::move(new_rt), *it));
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "range_tombstone.hh"
|
||||
#include "query-request.hh"
|
||||
#include "utils/preempt.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include <iosfwd>
|
||||
#include <variant>
|
||||
|
||||
@@ -106,7 +107,7 @@ class range_tombstone_list final {
|
||||
class reverter {
|
||||
private:
|
||||
using op = std::variant<erase_undo_op, insert_undo_op, update_undo_op>;
|
||||
std::vector<op> _ops;
|
||||
utils::chunked_vector<op> _ops;
|
||||
const schema& _s;
|
||||
protected:
|
||||
range_tombstone_list& _dst;
|
||||
|
||||
@@ -749,6 +749,25 @@ void reader_concurrency_semaphore::clear_inactive_reads() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::evict_inactive_reads_for_table(utils::UUID id) noexcept {
|
||||
inactive_reads_type evicted_readers;
|
||||
auto it = _inactive_reads.begin();
|
||||
while (it != _inactive_reads.end()) {
|
||||
auto& ir = *it;
|
||||
++it;
|
||||
if (ir.reader.schema()->id() == id) {
|
||||
do_detach_inactive_reader(ir, evict_reason::manual);
|
||||
ir.ttl_timer.cancel();
|
||||
ir.unlink();
|
||||
evicted_readers.push_back(ir);
|
||||
}
|
||||
}
|
||||
while (!evicted_readers.empty()) {
|
||||
std::unique_ptr<inactive_read> irp(&evicted_readers.front());
|
||||
co_await irp->reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
std::runtime_error reader_concurrency_semaphore::stopped_exception() {
|
||||
return std::runtime_error(format("{} was stopped", _name));
|
||||
}
|
||||
@@ -771,11 +790,9 @@ future<> reader_concurrency_semaphore::stop() noexcept {
|
||||
co_return;
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
|
||||
auto reader = std::move(ir.reader);
|
||||
void reader_concurrency_semaphore::do_detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
|
||||
ir.detach();
|
||||
reader.permit()._impl->on_evicted();
|
||||
std::unique_ptr<inactive_read> irp(&ir);
|
||||
ir.reader.permit()._impl->on_evicted();
|
||||
try {
|
||||
if (ir.notify_handler) {
|
||||
ir.notify_handler(reason);
|
||||
@@ -794,7 +811,12 @@ flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(ina
|
||||
break;
|
||||
}
|
||||
--_stats.inactive_reads;
|
||||
return reader;
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
|
||||
std::unique_ptr<inactive_read> irp(&ir);
|
||||
do_detach_inactive_reader(ir, reason);
|
||||
return std::move(irp->reader);
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason) noexcept {
|
||||
|
||||
@@ -187,6 +187,7 @@ private:
|
||||
std::optional<future<>> _execution_loop_future;
|
||||
|
||||
private:
|
||||
void do_detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
|
||||
[[nodiscard]] flat_mutation_reader_v2 detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
|
||||
void evict(inactive_read&, evict_reason reason) noexcept;
|
||||
|
||||
@@ -302,6 +303,9 @@ public:
|
||||
|
||||
/// Clear all inactive reads.
|
||||
void clear_inactive_reads();
|
||||
|
||||
/// Evict all inactive reads that belong to the table designated by the id.
|
||||
future<> evict_inactive_reads_for_table(utils::UUID id) noexcept;
|
||||
private:
|
||||
// The following two functions are extension points for
|
||||
// future inheriting classes that need to run some stop
|
||||
|
||||
@@ -847,12 +847,12 @@ future<> shard_reader_v2::do_fill_buffer() {
|
||||
}
|
||||
|
||||
auto res = co_await(std::move(fill_buf_fut));
|
||||
_end_of_stream = res.end_of_stream;
|
||||
reserve_additional(res.buffer->size());
|
||||
for (const auto& mf : *res.buffer) {
|
||||
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
_end_of_stream = res.end_of_stream;
|
||||
}
|
||||
|
||||
future<> shard_reader_v2::fill_buffer() {
|
||||
|
||||
@@ -1017,7 +1017,9 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
|
||||
remove(*cf);
|
||||
cf->clear_views();
|
||||
co_await cf->await_pending_ops();
|
||||
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
|
||||
for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) {
|
||||
co_await sem->evict_inactive_reads_for_table(uuid);
|
||||
}
|
||||
auto f = co_await coroutine::as_future(truncate(ks, *cf, std::move(tsf), snapshot));
|
||||
co_await cf->stop();
|
||||
f.get(); // re-throw exception from truncate() if any
|
||||
@@ -2238,10 +2240,14 @@ future<> database::stop() {
|
||||
|
||||
// try to ensure that CL has done disk flushing
|
||||
if (_commitlog) {
|
||||
dblog.info("Shutting down commitlog");
|
||||
co_await _commitlog->shutdown();
|
||||
dblog.info("Shutting down commitlog complete");
|
||||
}
|
||||
if (_schema_commitlog) {
|
||||
dblog.info("Shutting down schema commitlog");
|
||||
co_await _schema_commitlog->shutdown();
|
||||
dblog.info("Shutting down schema commitlog complete");
|
||||
}
|
||||
co_await _view_update_concurrency_sem.wait(max_memory_pending_view_updates());
|
||||
if (_commitlog) {
|
||||
|
||||
@@ -586,7 +586,8 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
|
||||
auto permit = std::move(flush_permit);
|
||||
auto r = exponential_backoff_retry(100ms, 10s);
|
||||
// Try flushing for around half an hour (30 minutes every 10 seconds)
|
||||
int allowed_retries = 30 * 60 / 10;
|
||||
int default_retries = 30 * 60 / 10;
|
||||
int allowed_retries = default_retries;
|
||||
std::optional<utils::phased_barrier::operation> op;
|
||||
size_t memtable_size;
|
||||
future<> previous_flush = make_ready_future<>();
|
||||
@@ -599,7 +600,19 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
_config.cf_stats->failed_memtables_flushes_count++;
|
||||
auto abort_on_error = [ex] () {
|
||||
|
||||
if (try_catch<std::bad_alloc>(ex)) {
|
||||
// There is a chance something else will free the memory, so we can try again
|
||||
allowed_retries--;
|
||||
} else if (auto ep = try_catch<std::system_error>(ex)) {
|
||||
allowed_retries = ep->code().value() == ENOSPC ? default_retries : 0;
|
||||
} else if (auto ep = try_catch<storage_io_error>(ex)) {
|
||||
allowed_retries = ep->code().value() == ENOSPC ? default_retries : 0;
|
||||
} else {
|
||||
allowed_retries = 0;
|
||||
}
|
||||
|
||||
if (allowed_retries <= 0) {
|
||||
// At this point we don't know what has happened and it's better to potentially
|
||||
// take the node down and rely on commitlog to replay.
|
||||
//
|
||||
@@ -608,14 +621,6 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
|
||||
// may end up in an infinite crash loop.
|
||||
tlogger.error("Memtable flush failed due to: {}. Aborting, at {}", ex, current_backtrace());
|
||||
std::abort();
|
||||
};
|
||||
if (try_catch<std::bad_alloc>(ex)) {
|
||||
// There is a chance something else will free the memory, so we can try again
|
||||
if (allowed_retries-- <= 0) {
|
||||
abort_on_error();
|
||||
}
|
||||
} else {
|
||||
abort_on_error();
|
||||
}
|
||||
}
|
||||
if (_async_gate.is_closed()) {
|
||||
@@ -681,7 +686,7 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
|
||||
auto write_permit = permit.release_sstable_write_permit();
|
||||
|
||||
utils::get_local_injector().inject("table_seal_active_memtable_try_flush", []() {
|
||||
throw std::bad_alloc();
|
||||
throw std::system_error(ENOSPC, std::system_category(), "Injected error");
|
||||
});
|
||||
co_return co_await this->try_flush_memtable_to_sstable(old, std::move(write_permit));
|
||||
});
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: f9f5228b74...3aa91b4d2d
@@ -1189,7 +1189,7 @@ private:
|
||||
}
|
||||
index_reader& get_index_reader() {
|
||||
if (!_index_reader) {
|
||||
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
|
||||
_consumer.trace_state(), caching, _single_partition_read);
|
||||
}
|
||||
|
||||
@@ -1319,7 +1319,7 @@ private:
|
||||
}
|
||||
index_reader& get_index_reader() {
|
||||
if (!_index_reader) {
|
||||
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
|
||||
_consumer.trace_state(), caching, _single_partition_read);
|
||||
}
|
||||
@@ -1754,9 +1754,7 @@ public:
|
||||
_monitor.on_read_started(_context->reader_position());
|
||||
}
|
||||
public:
|
||||
void on_out_of_clustering_range() override {
|
||||
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
|
||||
}
|
||||
void on_out_of_clustering_range() override { }
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
|
||||
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
|
||||
}
|
||||
|
||||
@@ -87,6 +87,18 @@ thread_local disk_error_signal_type sstable_write_error;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
// The below flag governs the mode of index file page caching used by the index
|
||||
// reader.
|
||||
//
|
||||
// If set to true, the reader will read and/or populate a common global cache,
|
||||
// which shares its capacity with the row cache. If false, the reader will use
|
||||
// BYPASS CACHE semantics for index caching.
|
||||
//
|
||||
// This flag is intended to be a temporary hack. The goal is to eventually
|
||||
// solve index caching problems via a smart cache replacement policy.
|
||||
//
|
||||
thread_local utils::updateable_value<bool> global_cache_index_pages(false);
|
||||
|
||||
logging::logger sstlog("sstable");
|
||||
|
||||
// Because this is a noop and won't hold any state, it is better to use a global than a
|
||||
|
||||
@@ -50,6 +50,7 @@
|
||||
#include "mutation_fragment_stream_validator.hh"
|
||||
#include "readers/flat_mutation_reader_fwd.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
#include <seastar/util/optimized_optional.hh>
|
||||
|
||||
@@ -58,6 +59,8 @@ class cached_file;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
extern thread_local utils::updateable_value<bool> global_cache_index_pages;
|
||||
|
||||
namespace mc {
|
||||
class writer;
|
||||
}
|
||||
|
||||
@@ -94,9 +94,9 @@ def test_describe_table_size(test_table):
|
||||
# Test the ProvisionedThroughput attribute returned by DescribeTable.
|
||||
# This is a very partial test: Our test table is configured without
|
||||
# provisioned throughput, so obviously it will not have interesting settings
|
||||
# for it. DynamoDB returns zeros for some of the attributes, even though
|
||||
# the documentation suggests missing values should have been fine too.
|
||||
@pytest.mark.xfail(reason="DescribeTable does not return provisioned throughput")
|
||||
# for it. But DynamoDB documents that zeros be returned for WriteCapacityUnits
|
||||
# and ReadCapacityUnits, and does this in practice as well - and some
|
||||
# applications assume these numbers are always there (even if 0).
|
||||
def test_describe_table_provisioned_throughput(test_table):
|
||||
got = test_table.meta.client.describe_table(TableName=test_table.name)['Table']
|
||||
assert got['ProvisionedThroughput']['NumberOfDecreasesToday'] == 0
|
||||
|
||||
@@ -427,6 +427,126 @@ def test_gsi_update_second_regular_base_column(test_table_gsi_3):
|
||||
KeyConditions={'a': {'AttributeValueList': [items[3]['a']], 'ComparisonOperator': 'EQ'},
|
||||
'b': {'AttributeValueList': [items[3]['b']], 'ComparisonOperator': 'EQ'}})
|
||||
|
||||
# Test reproducing issue #11801: In issue #5006 we noticed that in the special
|
||||
# case of a GSI with two non-key attributes as keys (test_table_gsi_3),
|
||||
# an update of the second attribute forgot to delete the old row. We fixed
|
||||
# that bug, but a bug remained for updates which update the value to the *same*
|
||||
# value - in that case the old row shouldn't be deleted, but we did - as
|
||||
# noticed in issue #11801.
|
||||
def test_11801(test_table_gsi_3):
|
||||
p = random_string()
|
||||
a = random_string()
|
||||
b = random_string()
|
||||
item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
|
||||
test_table_gsi_3.put_item(Item=item)
|
||||
assert_index_query(test_table_gsi_3, 'hello', [item],
|
||||
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
|
||||
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
|
||||
# Update the attribute 'b' to the same value b that it already had.
|
||||
# This shouldn't change anything in the base table or in the GSI
|
||||
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'}})
|
||||
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
|
||||
# In issue #11801, the following assertion failed (the view row was
|
||||
# deleted and nothing matched the query).
|
||||
assert_index_query(test_table_gsi_3, 'hello', [item],
|
||||
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
|
||||
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
|
||||
# Above we checked that setting 'b' to the same value didn't remove
|
||||
# the old GSI row. But the same update may actually modify the GSI row
|
||||
# (e.g., an unrelated attribute d) - check this modification took place:
|
||||
item['d'] = random_string()
|
||||
test_table_gsi_3.update_item(Key={'p': p},
|
||||
AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'},
|
||||
'd': {'Value': item['d'], 'Action': 'PUT'}})
|
||||
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
|
||||
assert_index_query(test_table_gsi_3, 'hello', [item],
|
||||
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
|
||||
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
|
||||
|
||||
# This test is the same as test_11801, but updating the first attribute (a)
|
||||
# instead of the second (b). This test didn't fail, showing that issue #11801
|
||||
# is - like #5006 - specific to the case of updating the second attribute.
|
||||
def test_11801_variant1(test_table_gsi_3):
    """Rewrite GSI key attribute 'a' with the value it already has and
    check that the item is still returned by a query on the 'hello' index.
    """
    def eq(value):
        # A fresh condition dict is built for every query.
        return {'AttributeValueList': [value], 'ComparisonOperator': 'EQ'}

    p, a, b, d = (random_string() for _ in range(4))
    item = {'p': p, 'a': a, 'b': b, 'd': d}
    test_table_gsi_3.put_item(Item=item)
    assert_index_query(test_table_gsi_3, 'hello', [item],
        KeyConditions={'a': eq(a), 'b': eq(b)})
    # Overwrite 'a' with the very same value.
    same_a_update = {'a': {'Value': a, 'Action': 'PUT'}}
    test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates=same_a_update)
    assert_index_query(test_table_gsi_3, 'hello', [item],
        KeyConditions={'a': eq(a), 'b': eq(b)})
|
||||
|
||||
# This test is the same as test_11801, but updates b to a different value
|
||||
# (newb) instead of to the same one. This test didn't fail, showing that
|
||||
# issue #11801 is specific to updates to the same value. This test basically
|
||||
# reproduces the already-fixed #5006 (we also have another test above which
|
||||
# reproduces that issue - test_gsi_update_second_regular_base_column())
|
||||
def test_11801_variant2(test_table_gsi_3):
    """Change GSI key attribute 'b' to a different value and check that the
    'hello' index no longer returns the item under the old key but does
    return it under the new one.
    """
    def eq(value):
        # A fresh condition dict is built for every query.
        return {'AttributeValueList': [value], 'ComparisonOperator': 'EQ'}

    p, a, old_b = (random_string() for _ in range(3))
    item = {'p': p, 'a': a, 'b': old_b, 'd': random_string()}
    test_table_gsi_3.put_item(Item=item)
    assert_index_query(test_table_gsi_3, 'hello', [item],
        KeyConditions={'a': eq(a), 'b': eq(old_b)})
    new_b = random_string()
    item['b'] = new_b
    test_table_gsi_3.update_item(
        Key={'p': p},
        AttributeUpdates={'b': {'Value': new_b, 'Action': 'PUT'}})
    # Nothing may be found under the old (a, b) index key any more...
    assert_index_query(test_table_gsi_3, 'hello', [],
        KeyConditions={'a': eq(a), 'b': eq(old_b)})
    # ...and the updated item must be found under the new one.
    assert_index_query(test_table_gsi_3, 'hello', [item],
        KeyConditions={'a': eq(a), 'b': eq(new_b)})
|
||||
|
||||
# This test is the same as test_11801, but uses a different table schema
|
||||
# (test_table_gsi_5) where there is only one new key column in the view (x).
|
||||
# This test passed, showing that issue #11801 was specific to the special
|
||||
# case of a view with two new key columns (test_table_gsi_3).
|
||||
def test_11801_variant3(test_table_gsi_5):
    """Rewrite index key attribute 'x' with the value it already has and
    check that neither the base item nor the 'hello' index entry changes.
    """
    def eq(value):
        # A fresh condition dict is built for every query.
        return {'AttributeValueList': [value], 'ComparisonOperator': 'EQ'}

    p, c, x = (random_string() for _ in range(3))
    item = {'p': p, 'c': c, 'x': x, 'd': random_string()}
    base_key = {'p': p, 'c': c}
    test_table_gsi_5.put_item(Item=item)
    assert_index_query(test_table_gsi_5, 'hello', [item],
        KeyConditions={'p': eq(p), 'x': eq(x)})
    # Overwrite 'x' with the very same value.
    test_table_gsi_5.update_item(
        Key=base_key,
        AttributeUpdates={'x': {'Value': x, 'Action': 'PUT'}})
    stored = test_table_gsi_5.get_item(Key=base_key, ConsistentRead=True)['Item']
    assert item == stored
    assert_index_query(test_table_gsi_5, 'hello', [item],
        KeyConditions={'p': eq(p), 'x': eq(x)})
|
||||
|
||||
# Another test similar to test_11801, but instead of updating a view key
|
||||
# column to the same value it already has, simply don't update it at all
|
||||
# (and just modify some other regular column). This test passed, showing
|
||||
# that issue #11801 is specific to the case of updating a view key column
|
||||
# to the same value it already had.
|
||||
def test_11801_variant4(test_table_gsi_3):
    """Update only the regular attribute 'd' (leaving GSI keys 'a' and 'b'
    untouched) and check that both the base table and the 'hello' index
    return the modified item.
    """
    def eq(value):
        # A fresh condition dict is built for every query.
        return {'AttributeValueList': [value], 'ComparisonOperator': 'EQ'}

    p, a, b = (random_string() for _ in range(3))
    item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
    test_table_gsi_3.put_item(Item=item)
    assert_index_query(test_table_gsi_3, 'hello', [item],
        KeyConditions={'a': eq(a), 'b': eq(b)})
    # An update that doesn't change the GSI keys (a or b), just a regular
    # column d.
    item['d'] = random_string()
    test_table_gsi_3.update_item(
        Key={'p': p},
        AttributeUpdates={'d': {'Value': item['d'], 'Action': 'PUT'}})
    stored = test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
    assert item == stored
    assert_index_query(test_table_gsi_3, 'hello', [item],
        KeyConditions={'a': eq(a), 'b': eq(b)})
|
||||
|
||||
# Test that when a table has a GSI, if the indexed attribute is missing, the
|
||||
# item is added to the base table but not the index.
|
||||
# This is the same feature we already tested in test_gsi_missing_attribute()
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <deque>
|
||||
#include <random>
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/amortized_reserve.hh"
|
||||
|
||||
#include <boost/range/algorithm/sort.hpp>
|
||||
#include <boost/range/algorithm/equal.hpp>
|
||||
@@ -207,3 +208,37 @@ BOOST_AUTO_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
|
||||
v.emplace_back(std::make_unique<uint64_t>(i));
|
||||
}
|
||||
}
|
||||
|
||||
// Checks how amortized_reserve() changes the capacity of a chunked_vector:
// an existing larger reservation is kept, an empty vector grows to exactly
// the requested capacity, and later requests at least double the capacity.
BOOST_AUTO_TEST_CASE(test_amoritzed_reserve) {
    utils::chunked_vector<int> v;

    // A request below the current capacity leaves the vector untouched.
    v.reserve(10);
    amortized_reserve(v, 1);
    BOOST_REQUIRE_EQUAL(v.capacity(), 10);
    BOOST_REQUIRE_EQUAL(v.size(), 0);

    // Starting from a fresh vector, the first request is honoured exactly.
    v = {};
    amortized_reserve(v, 1);
    BOOST_REQUIRE_EQUAL(v.capacity(), 1);
    BOOST_REQUIRE_EQUAL(v.size(), 0);

    // Growth sequence: each step is (requested capacity, expected capacity).
    struct growth_step {
        size_t requested;
        size_t expected_capacity;
    };
    const growth_step steps[] = {
        {1, 1},
        {2, 2},
        {3, 4},
        {4, 4},
        {5, 8},
        {6, 8},
        {7, 8},
        {7, 8},
        {1, 8},
    };

    v = {};
    for (const growth_step& step : steps) {
        amortized_reserve(v, step.requested);
        BOOST_REQUIRE_EQUAL(v.capacity(), step.expected_capacity);
    }
}
|
||||
|
||||
@@ -1852,6 +1852,29 @@ SEASTAR_TEST_CASE(test_continuity_merging_of_complete_mutations) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Generates triples of random, fully continuous mutations and checks that
// summing them gives an equal result for all six orderings.
SEASTAR_TEST_CASE(test_commutativity_and_associativity) {
    random_mutation_generator gen(random_mutation_generator::generate_counters::no);
    gen.set_key_cardinality(7);

    // Draws the next mutation from the generator and makes it fully continuous.
    auto next_continuous_mutation = [&gen] {
        mutation m = gen();
        m.partition().make_fully_continuous();
        return m;
    };

    for (int round = 0; round != 10; ++round) {
        mutation m1 = next_continuous_mutation();
        mutation m2 = next_continuous_mutation();
        mutation m3 = next_continuous_mutation();

        assert_that(m1 + m2 + m3)
            .is_equal_to(m1 + m3 + m2)
            .is_equal_to(m2 + m1 + m3)
            .is_equal_to(m2 + m3 + m1)
            .is_equal_to(m3 + m1 + m2)
            .is_equal_to(m3 + m2 + m1);
    }

    return make_ready_future<>();
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_continuity_merging) {
|
||||
return seastar::async([] {
|
||||
simple_schema table;
|
||||
|
||||
@@ -641,7 +641,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
|
||||
static mutation_partition read_using_cursor(partition_snapshot& snap) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
partition_snapshot_row_cursor cur(*snap.schema(), snap);
|
||||
cur.maybe_refresh();
|
||||
cur.advance_to(position_in_partition::before_all_clustered_rows());
|
||||
auto mp = read_partition_from(*snap.schema(), cur);
|
||||
for (auto&& rt : snap.range_tombstones()) {
|
||||
mp.apply_delete(*snap.schema(), rt);
|
||||
|
||||
@@ -327,11 +327,6 @@ public:
|
||||
return *this;
|
||||
}
|
||||
|
||||
// Asks the cache to evict everything belonging to the test schema's table,
// waits for that to finish and returns *this so calls can be chained.
test_querier_cache& evict_all_for_table() {
    auto eviction_done = _cache.evict_all_for_table(get_schema()->id());
    eviction_done.get();
    return *this;
}
|
||||
|
||||
test_querier_cache& no_misses() {
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, _expected_stats.misses);
|
||||
return *this;
|
||||
@@ -727,21 +722,6 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
}, std::move(db_cfg_ptr)).get();
|
||||
}
|
||||
|
||||
// Saves a mutation querier, evicts everything for the table and checks that
// a later lookup misses without counting a drop or an eviction.
SEASTAR_THREAD_TEST_CASE(test_evict_all_for_table) {
    test_querier_cache cache;

    const auto saved = cache.produce_first_page_and_save_mutation_querier();

    cache.evict_all_for_table();

    auto&& after_lookup = cache.assert_cache_lookup_mutation_querier(
            saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice);
    after_lookup.misses()
        .no_drops()
        .no_evictions();

    // Check that the querier was removed from the semaphore too.
    BOOST_CHECK(!cache.get_semaphore().try_evict_one_inactive_read());
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
test_querier_cache t;
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/random_schema.hh"
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/testing/test_case.hh>
|
||||
@@ -915,3 +916,44 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Registers ten inactive reads for each of four random schemas, then evicts
// the inactive reads table by table. After each eviction, the evicted
// table's handles must be disengaged and the others must stay engaged.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_for_table) {
    using handle_type = reader_concurrency_semaphore::inactive_read_handle;
    using handle_list = std::vector<handle_type>;

    // True when every handle in the list is still engaged.
    const auto all_registered = [] (const handle_list& hs) {
        return std::all_of(hs.begin(), hs.end(), [] (const handle_type& h) { return bool(h); });
    };
    // True when every handle in the list has been disengaged.
    const auto all_evicted = [] (const handle_list& hs) {
        return std::all_of(hs.begin(), hs.end(), [] (const handle_type& h) { return !bool(h); });
    };

    auto spec = tests::make_random_schema_specification(get_name());

    std::list<tests::random_schema> schemas;
    std::unordered_map<tests::random_schema*, handle_list> schema_handles;
    for (unsigned seed = 0; seed < 4; ++seed) {
        auto& s = schemas.emplace_back(tests::random_schema(seed, *spec));
        schema_handles.emplace(&s, handle_list{});
    }

    reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
    auto stop_sem = deferred_stop(semaphore);

    // Register the inactive reads.
    for (auto& s : schemas) {
        auto& handles = schema_handles[&s];
        for (int n = 0; n < 10; ++n) {
            auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
            handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), std::move(permit))));
        }
    }

    // Nothing has been evicted yet.
    for (auto& s : schemas) {
        BOOST_REQUIRE(all_registered(schema_handles[&s]));
    }

    // Evict one table at a time and verify only that table was affected.
    for (auto& s : schemas) {
        auto& handles = schema_handles[&s];
        BOOST_REQUIRE(all_registered(handles));
        semaphore.evict_inactive_reads_for_table(s.schema()->id()).get();
        for (const auto& [owner, owned_handles] : schema_handles) {
            if (owner == &s) {
                BOOST_REQUIRE(all_evicted(owned_handles));
            } else if (!owned_handles.empty()) {
                BOOST_REQUIRE(all_registered(owned_handles));
            }
        }
        // Tables already processed are left with an empty handle list.
        handles.clear();
    }
}
|
||||
|
||||
@@ -1235,9 +1235,13 @@ SEASTAR_TEST_CASE(test_update_failure) {
|
||||
class throttle {
|
||||
unsigned _block_counter = 0;
|
||||
promise<> _p; // valid when _block_counter != 0, resolves when goes down to 0
|
||||
std::optional<promise<>> _entered;
|
||||
bool _one_shot;
|
||||
public:
|
||||
// If one_shot is true, only the first enter() after block() will block.
|
||||
throttle(bool one_shot = false) : _one_shot(one_shot) {}
|
||||
future<> enter() {
|
||||
if (_block_counter) {
|
||||
if (_block_counter && (!_one_shot || _entered)) {
|
||||
promise<> p1;
|
||||
promise<> p2;
|
||||
|
||||
@@ -1249,16 +1253,21 @@ public:
|
||||
p3.set_value();
|
||||
});
|
||||
_p = std::move(p2);
|
||||
|
||||
if (_entered) {
|
||||
_entered->set_value();
|
||||
_entered.reset();
|
||||
}
|
||||
return f1;
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
void block() {
|
||||
future<> block() {
|
||||
++_block_counter;
|
||||
_p = promise<>();
|
||||
_entered = promise<>();
|
||||
return _entered->get_future();
|
||||
}
|
||||
|
||||
void unblock() {
|
||||
@@ -1402,7 +1411,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
mt2->apply(m);
|
||||
}
|
||||
|
||||
thr.block();
|
||||
auto f = thr.block();
|
||||
|
||||
auto m0_range = dht::partition_range::make_singular(ring[0].ring_position());
|
||||
auto rd1 = cache.make_reader(s, semaphore.make_permit(), m0_range);
|
||||
@@ -1413,6 +1422,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
rd2.set_max_buffer_size(1);
|
||||
auto rd2_fill_buffer = rd2.fill_buffer();
|
||||
|
||||
f.get();
|
||||
sleep(10ms).get();
|
||||
|
||||
// This update should miss on all partitions
|
||||
@@ -1540,12 +1550,13 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
|
||||
mt2->apply(m);
|
||||
}
|
||||
|
||||
thr.block();
|
||||
auto f = thr.block();
|
||||
|
||||
auto rd1 = cache.make_reader(s, semaphore.make_permit());
|
||||
rd1.set_max_buffer_size(1);
|
||||
auto rd1_fill_buffer = rd1.fill_buffer();
|
||||
|
||||
f.get();
|
||||
sleep(10ms).get();
|
||||
|
||||
// This update should miss on all partitions
|
||||
@@ -3341,6 +3352,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_of_overlapping_tombstones_in_many_versi
|
||||
SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
|
||||
return seastar::async([] {
|
||||
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
|
||||
gen.set_key_cardinality(16);
|
||||
memtable_snapshot_source underlying(gen.schema());
|
||||
schema_ptr s = gen.schema();
|
||||
schema_ptr rev_s = s->make_reversed();
|
||||
@@ -3994,3 +4006,81 @@ SEASTAR_TEST_CASE(row_cache_is_populated_using_compacting_sstable_reader) {
|
||||
BOOST_ASSERT(rt.calculate_size() == 1);
|
||||
});
|
||||
}
|
||||
|
||||
// Starts a cache read of [0, 2] that is held up by the throttle, populates
// (2, 3] and evicts two rows while it is blocked, then unblocks it. Each
// read issued through the helper is compared against the memtable.
SEASTAR_TEST_CASE(test_eviction_of_upper_bound_of_population_range) {
    return seastar::async([] {
        simple_schema s;
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto cache_mt = make_lw_shared<replica::memtable>(s.schema());

        auto pkey = s.make_pkey("pk");

        mutation m1(s.schema(), pkey);
        s.add_row(m1, s.make_ckey(1), "v1");
        s.add_row(m1, s.make_ckey(2), "v2");
        cache_mt->apply(m1);

        cache_tracker tracker;
        throttle thr(true);
        auto cache_source = make_decorated_snapshot_source(
                snapshot_source([&] { return cache_mt->as_data_source(); }),
                [&] (mutation_source src) {
                    return throttled_mutation_source(thr, std::move(src));
                });
        row_cache cache(s.schema(), cache_source, tracker);

        auto pr = dht::partition_range::make_singular(pkey);

        // Reads clustering range [first, last] through the cache and directly
        // from the memtable, and requires both results to be equal.
        auto read_and_verify = [&] (int first, int last) {
            auto slice = partition_slice_builder(*s.schema())
                .with_range(query::clustering_range::make(s.make_ckey(first), s.make_ckey(last)))
                .build();
            auto rd = cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice);
            auto close_rd = deferred_close(rd);
            auto m_cache = read_mutation_from_flat_mutation_reader(rd).get0();
            close_rd.close_now();
            rd = cache_mt->make_flat_reader(s.schema(), semaphore.make_permit(), pr, slice);
            auto close_rd2 = deferred_close(rd);
            auto m_mt = read_mutation_from_flat_mutation_reader(rd).get0();
            BOOST_REQUIRE(m_mt);
            assert_that(m_cache).has_mutation().is_equal_to(*m_mt);
        };

        // Reads the given clustering range through the cache, which populates it.
        auto populate = [&] (query::clustering_range range) {
            auto slice = partition_slice_builder(*s.schema())
                .with_range(std::move(range))
                .build();
            assert_that(cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice))
                .has_monotonic_positions();
        };

        // populate [2]
        populate(query::clustering_range::make_singular(s.make_ckey(2)));

        auto reader_blocked = thr.block();

        // Read [0, 2]
        auto blocked_read = seastar::async([&] {
            read_and_verify(0, 2);
        });

        reader_blocked.get();

        // populate (2, 3]
        populate(query::clustering_range::make(
                query::clustering_range::bound(s.make_ckey(2), false),
                query::clustering_range::bound(s.make_ckey(3), true)));

        testlog.trace("Evicting");
        evict_one_row(tracker); // Evicts before(0)
        evict_one_row(tracker); // Evicts ck(2)
        testlog.trace("Unblocking");

        thr.unblock();
        blocked_read.get();

        read_and_verify(0, 3);
    });
}
|
||||
|
||||
@@ -3149,3 +3149,58 @@ SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
// Writes an sstable holding three rows plus a range tombstone that starts at
// the fourth clustering key and has an empty-key end bound, then checks that
// the crawling reader emits monotonic positions.
SEASTAR_TEST_CASE(test_crawling_reader_out_of_range_last_range_tombstone_change) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema table;

        auto mut = table.new_mutation("pk0");
        auto ckeys = table.make_ckeys(4);
        table.add_row(mut, ckeys[0], "v0");
        table.add_row(mut, ckeys[1], "v1");
        table.add_row(mut, ckeys[2], "v2");

        using bound = query::clustering_range::bound;
        auto deleted_range = query::clustering_range::make(
                bound{ckeys[3], true},
                bound{clustering_key::make_empty(), true});
        table.delete_range(mut, deleted_range, tombstone(1, gc_clock::now()));

        auto tmp = tmpdir();
        auto sst_gen = [&env, &table, &tmp] () {
            return env.make_sstable(table.schema(), tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
        };
        auto sst = make_sstable_containing(sst_gen, {mut});

        auto crawling_reader = sst->make_crawling_reader(table.schema(), env.make_reader_permit());
        assert_that(std::move(crawling_reader)).has_monotonic_positions();
    });
}
|
||||
|
||||
// Writes 20 random mutations of a random schema into an sstable and checks
// that the crawling reader produces them in order and that its positions
// are monotonic.
SEASTAR_TEST_CASE(test_crawling_reader_random_schema_random_mutations) {
    return test_env::do_with_async([this] (test_env& env) {
        using size_dist = std::uniform_int_distribution<size_t>;

        auto random_spec = tests::make_random_schema_specification(
                get_name(),
                size_dist(1, 4),
                size_dist(2, 4),
                size_dist(2, 8),
                size_dist(2, 8));
        auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
        auto schema = random_schema.schema();

        testlog.info("Random schema:\n{}", random_schema.cql());

        const auto muts = tests::generate_random_mutations(random_schema, 20).get();

        auto tmp = tmpdir();
        auto sst_gen = [&env, schema, &tmp] () {
            return env.make_sstable(schema, tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
        };
        auto sst = make_sstable_containing(sst_gen, muts);

        // First pass: the reader must produce every mutation, in order.
        {
            auto checker = assert_that(sst->make_crawling_reader(schema, env.make_reader_permit()));
            for (const auto& expected : muts) {
                checker.produces(expected);
            }
        }

        // Second pass, with a fresh reader: positions must be monotonic.
        assert_that(sst->make_crawling_reader(schema, env.make_reader_permit())).has_monotonic_positions();
    });
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import pytest
|
||||
import rest_api
|
||||
import nodetool
|
||||
from util import new_test_table
|
||||
from cassandra.protocol import ConfigurationException
|
||||
|
||||
# Test inserts `N` rows into table, flushes it
|
||||
# and tries to read `M` non-existing keys.
|
||||
@@ -29,3 +30,27 @@ def test_bloom_filter(scylla_only, cql, test_keyspace, N, M, fp_chance):
|
||||
ratio = fp / M
|
||||
assert ratio >= fp_chance * 0.7 and ratio <= fp_chance * 1.15
|
||||
|
||||
# Test very small bloom_filter_fp_chance settings.
|
||||
# The Cassandra documentation suggests that bloom_filter_fp_chance can be set
|
||||
# to anything between 0 and 1, and the Datastax documentation even goes further
|
||||
# and explains that 0 means "the largest possible Bloom filter".
|
||||
# But in practice, there is a minimal false-positive chance that the Bloom
|
||||
# filter can possibly achieve and Cassandra refuses lower settings (see
|
||||
# CASSANDRA-11920) and Scylla should do the same instead of crashing much
|
||||
# later during a memtable flush as it did in issue #11524.
|
||||
@pytest.mark.parametrize("fp_chance", [1e-5, 0])
def test_small_bloom_filter_fp_chance(cql, test_keyspace, fp_chance):
    """Expect a ConfigurationException when creating, writing to and
    flushing a table with a very small bloom_filter_fp_chance.
    """
    schema = 'a int PRIMARY KEY'
    options = f'WITH bloom_filter_fp_chance = {fp_chance}'
    with pytest.raises(ConfigurationException), \
            new_test_table(cql, test_keyspace, schema, options) as table:
        cql.execute(f'INSERT INTO {table} (a) VALUES (1)')
        # In issue #11524, Scylla used to crash during this flush after the
        # table creation succeeded above.
        nodetool.flush(cql, table)
|
||||
|
||||
# Check that bloom_filter_fp_chance outside [0, 1] (i.e., > 1 or < 0)
|
||||
# is, unsurprisingly, forbidden.
|
||||
@pytest.mark.parametrize("fp_chance", [-0.1, 1.1])
def test_invalid_bloom_filter_fp_chance(cql, test_keyspace, fp_chance):
    """Expect a ConfigurationException when creating a table whose
    bloom_filter_fp_chance is below 0 or above 1.
    """
    schema = 'a int PRIMARY KEY'
    options = f'WITH bloom_filter_fp_chance = {fp_chance}'
    with pytest.raises(ConfigurationException), \
            new_test_table(cql, test_keyspace, schema, options):
        pass
|
||||
|
||||
@@ -2013,6 +2013,11 @@ public:
|
||||
_blobs = boost::copy_range<std::vector<bytes>>(keys | boost::adaptors::transformed([this] (sstring& k) { return to_bytes(k); }));
|
||||
}
|
||||
|
||||
// Sets the clustering-key index distribution to pick uniformly from
// [0, n_keys - 1]; random_blob() uses the drawn index to select a blob.
//
// n_keys must be in [1, n_blobs].
void set_key_cardinality(size_t n_keys) {
    // With n_keys == 0 the upper bound "n_keys - 1" would wrap around to
    // SIZE_MAX and silently produce a distribution over the whole size_t
    // range, so reject it together with values above n_blobs.
    assert(n_keys > 0 && n_keys <= n_blobs);
    _ck_index_dist = std::uniform_int_distribution<size_t>{0, n_keys - 1};
}
|
||||
|
||||
// Returns a copy of one of the pre-generated blobs, chosen by drawing an
// index from _ck_index_dist and clamping it to the last valid position.
bytes random_blob() {
    const size_t last_index = _blobs.size() - 1;
    const size_t drawn_index = std::max<size_t>(0, _ck_index_dist(_gen));
    return _blobs[std::min(last_index, drawn_index)];
}
|
||||
@@ -2236,12 +2241,23 @@ public:
|
||||
};
|
||||
|
||||
size_t row_count = row_count_dist(_gen);
|
||||
for (size_t i = 0; i < row_count; ++i) {
|
||||
auto ckey = make_random_key();
|
||||
|
||||
std::unordered_set<clustering_key, clustering_key::hashing, clustering_key::equality> keys(
|
||||
0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema));
|
||||
while (keys.size() < row_count) {
|
||||
keys.emplace(make_random_key());
|
||||
}
|
||||
|
||||
for (auto&& ckey : keys) {
|
||||
is_continuous continuous = is_continuous(_bool_dist(_gen));
|
||||
if (_not_dummy_dist(_gen)) {
|
||||
deletable_row& row = m.partition().clustered_row(*_schema, ckey, is_dummy::no, continuous);
|
||||
row.apply(random_row_marker());
|
||||
if (!row.marker().is_missing() && !row.marker().is_live()) {
|
||||
// Mutations are not associative if dead marker is not matched with a dead row
|
||||
// due to shadowable tombstone merging rules. See #11307.
|
||||
row.apply(tombstone(row.marker().timestamp(), row.marker().deletion_time()));
|
||||
}
|
||||
if (_bool_dist(_gen)) {
|
||||
set_random_cells(row.cells(), column_kind::regular_column);
|
||||
} else {
|
||||
@@ -2332,6 +2348,10 @@ std::vector<query::clustering_range> random_mutation_generator::make_random_rang
|
||||
return _impl->make_random_ranges(n_ranges);
|
||||
}
|
||||
|
||||
void random_mutation_generator::set_key_cardinality(size_t n_keys) {
|
||||
_impl->set_key_cardinality(n_keys);
|
||||
}
|
||||
|
||||
void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mutation>&,
|
||||
schema_ptr, const std::vector<mutation>&)> fn) {
|
||||
auto map_of_int_to_int = map_type_impl::get_instance(int32_type, int32_type, true);
|
||||
|
||||
@@ -64,6 +64,8 @@ public:
|
||||
range_tombstone make_random_range_tombstone();
|
||||
std::vector<dht::decorated_key> make_partition_keys(size_t n);
|
||||
std::vector<query::clustering_range> make_random_ranges(unsigned n_ranges);
|
||||
// Sets the number of distinct clustering keys which will be used in generated mutations.
|
||||
void set_key_cardinality(size_t);
|
||||
};
|
||||
|
||||
bytes make_blob(size_t blob_size);
|
||||
|
||||
@@ -43,6 +43,9 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
|
||||
auto prefill_compacted = logalloc::memory_compacted();
|
||||
auto prefill_allocated = logalloc::memory_allocated();
|
||||
|
||||
scheduling_latency_measurer memtable_slm;
|
||||
memtable_slm.start();
|
||||
|
||||
auto mt = make_lw_shared<replica::memtable>(s);
|
||||
auto fill_d = duration_in_seconds([&] {
|
||||
while (mt->occupancy().total_space() < memtable_size) {
|
||||
@@ -54,7 +57,8 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
|
||||
}
|
||||
}
|
||||
});
|
||||
std::cout << format("Memtable fill took {:.6f} [ms]", fill_d.count() * 1000) << std::endl;
|
||||
memtable_slm.stop();
|
||||
std::cout << format("Memtable fill took {:.6f} [ms], {}", fill_d.count() * 1000, memtable_slm) << std::endl;
|
||||
|
||||
std::cout << "Draining..." << std::endl;
|
||||
auto drain_d = duration_in_seconds([&] {
|
||||
@@ -223,6 +227,40 @@ void test_partition_with_lots_of_range_tombstones() {
|
||||
});
|
||||
}
|
||||
|
||||
// This test case stresses handling of overlapping range tombstones
|
||||
void test_partition_with_lots_of_range_tombstones_with_residuals() {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", uuid_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v1", bytes_type, column_kind::regular_column)
|
||||
.with_column("v2", bytes_type, column_kind::regular_column)
|
||||
.with_column("v3", bytes_type, column_kind::regular_column)
|
||||
.build();
|
||||
|
||||
auto pk = dht::decorate_key(*s, partition_key::from_single_value(*s,
|
||||
serialized(utils::UUID_gen::get_time_UUID())));
|
||||
int ck_idx = 0;
|
||||
|
||||
run_test("Large partition, lots of range tombstones with residuals", s, [&] {
|
||||
mutation m(s, pk);
|
||||
auto val = data_value(bytes(bytes::initialized_later(), cell_size));
|
||||
auto ck = clustering_key::from_single_value(*s, serialized(ck_idx++));
|
||||
auto r = query::clustering_range::make({ck}, {ck});
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
m.partition().apply_row_tombstone(*s, range_tombstone(bound_view::from_range_start(r), bound_view::top(), tomb));
|
||||
|
||||
// Stress range tombstone overlapping with lots of range tombstones
|
||||
auto stride = 1'000'000;
|
||||
if (ck_idx == stride) {
|
||||
ck = clustering_key::from_single_value(*s, serialized(ck_idx - stride));
|
||||
r = query::clustering_range::make({ck}, {ck});
|
||||
m.partition().apply_row_tombstone(*s, range_tombstone(bound_view::from_range_start(r), bound_view::top(), tomb));
|
||||
}
|
||||
|
||||
return m;
|
||||
});
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
app_template app;
|
||||
return app.run(argc, argv, [&app] {
|
||||
@@ -236,6 +274,7 @@ int main(int argc, char** argv) {
|
||||
test_partition_with_few_small_rows();
|
||||
test_partition_with_lots_of_small_rows();
|
||||
test_partition_with_lots_of_range_tombstones();
|
||||
test_partition_with_lots_of_range_tombstones_with_residuals();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -81,6 +81,11 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<> error() {
|
||||
_barrier.abort();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
unsigned get_phase() const noexcept { return _phase.load(); }
|
||||
};
|
||||
|
||||
@@ -115,6 +120,16 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
w.stop().get();
|
||||
}
|
||||
|
||||
std::vector<int> count(64);
|
||||
parallel_for_each(count, [] (auto& cnt) -> future<> {
|
||||
std::vector<sharded<worker>> w(32);
|
||||
co_await parallel_for_each(w, [] (auto &sw) -> future<> {
|
||||
co_await sw.start(utils::cross_shard_barrier());
|
||||
co_await sw.invoke_on_all(&worker::error);
|
||||
co_await sw.stop();
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Submodule tools/java updated: ad6764b506...b3959948dd
@@ -727,7 +727,10 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
|
||||
if (ret < 0) {
|
||||
throw std::runtime_error("CQL frame LZ4 uncompression failure");
|
||||
}
|
||||
return out.size();
|
||||
if (ret != out.size()) {
|
||||
throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
|
||||
}
|
||||
return static_cast<size_t>(ret);
|
||||
});
|
||||
on_compression_buffer_use();
|
||||
return uncomp;
|
||||
|
||||
54
utils/amortized_reserve.hh
Normal file
54
utils/amortized_reserve.hh
Normal file
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <concepts>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
|
||||
/// Represents a container which can preallocate space for future insertions
|
||||
/// which can be used to reduce the number of overall memory re-allocation and item movement.
|
||||
///
|
||||
/// The number of items for which space is currently reserved is returned by capacity().
|
||||
/// This includes items currently present in the container.
|
||||
///
|
||||
/// The number of items currently present is returned by size().
|
||||
///
|
||||
/// Invariant:
|
||||
///
|
||||
/// size() <= capacity()
|
||||
///
|
||||
/// Space is reserved by calling reserve(desired_capacity).
|
||||
/// The post-condition of calling reserve() is:
|
||||
///
|
||||
/// capacity() >= desired_capacity
|
||||
///
|
||||
/// It is guaranteed insertion of (capacity() - size()) items does not
|
||||
/// throw if T::value_type constructor and move constructor do not throw.
|
||||
template <typename T>
|
||||
concept ContainerWithCapacity = requires (T x, size_t desired_capacity, typename T::value_type e) {
|
||||
{ x.reserve(desired_capacity) } -> std::same_as<void>;
|
||||
{ x.capacity() } -> std::same_as<size_t>;
|
||||
{ x.size() } -> std::same_as<size_t>;
|
||||
};
|
||||
|
||||
static_assert(ContainerWithCapacity<std::vector<int>>);
|
||||
|
||||
/// Reserves space for at least desired_capacity - v.size() elements.
|
||||
///
|
||||
/// Amortizes space expansion so that a series of N calls to amortized_reserve(v, v.size() + 1)
|
||||
/// starting from an empty container takes O(N) time overall.
|
||||
///
|
||||
/// Post-condition: v.capacity() >= desired_capacity
|
||||
template <ContainerWithCapacity T>
|
||||
void amortized_reserve(T& v, size_t desired_capacity) {
|
||||
if (desired_capacity > v.capacity()) {
|
||||
v.reserve(std::max(desired_capacity, v.capacity() * 2));
|
||||
}
|
||||
}
|
||||
@@ -123,6 +123,18 @@ namespace bloom_calculations {
|
||||
}
|
||||
return std::min(probs.size() - 1, size_t(v));
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the minimum supported bloom_filter_fp_chance value
|
||||
* if compute_bloom_spec() above is attempted with bloom_filter_fp_chance
|
||||
* lower than this, it will throw an unsupported_operation_exception.
|
||||
*/
|
||||
inline double min_supported_bloom_filter_fp_chance() {
|
||||
int max_buckets = probs.size() - 1;
|
||||
int max_K = probs[max_buckets].size() - 1;
|
||||
return probs[max_buckets][max_K];
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -126,9 +126,9 @@ private:
|
||||
future<> complete() {
|
||||
_b->counter.fetch_add(smp::count);
|
||||
bool alive = _b->alive.load(std::memory_order_relaxed);
|
||||
return smp::invoke_on_all([this, sid = this_shard_id(), alive] {
|
||||
return smp::invoke_on_all([b = _b, sid = this_shard_id(), alive] {
|
||||
if (this_shard_id() != sid) {
|
||||
std::optional<promise<>>& w = _b->wakeup[this_shard_id()];
|
||||
std::optional<promise<>>& w = b->wakeup[this_shard_id()];
|
||||
if (alive) {
|
||||
assert(w.has_value());
|
||||
w->set_value();
|
||||
|
||||
@@ -52,7 +52,7 @@ public:
|
||||
return _what.c_str();
|
||||
}
|
||||
|
||||
const std::error_code& code() const { return _code; }
|
||||
const std::error_code& code() const noexcept { return _code; }
|
||||
};
|
||||
|
||||
// Rethrow exception if not null
|
||||
|
||||
@@ -1329,6 +1329,12 @@ void reclaim_timer::sample_stats(stats& data) {
|
||||
}
|
||||
|
||||
void reclaim_timer::report() const noexcept {
|
||||
// The logger can allocate (and will recover from allocation failure), and
|
||||
// we're in a memory-sensitive situation here and allocation can easily fail.
|
||||
// Prevent --abort-on-seastar-bad-alloc from crashing us in a situation that
|
||||
// we're likely to recover from, by reclaiming more.
|
||||
auto guard = memory::disable_abort_on_alloc_failure_temporarily();
|
||||
|
||||
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
|
||||
auto info_level = _stall_detected ? log_level::info : log_level::debug;
|
||||
auto MiB = 1024*1024;
|
||||
|
||||
Reference in New Issue
Block a user