Compare commits
11 Commits
debug_form
...
scylla-5.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66f34245fc | ||
|
|
4047528bd9 | ||
|
|
1a82c61452 | ||
|
|
3d9800eb1c | ||
|
|
c48e9b47dd | ||
|
|
2eadaad9f7 | ||
|
|
d10aee15e7 | ||
|
|
9e017cb1e6 | ||
|
|
b8504cc9b2 | ||
|
|
856703a85e | ||
|
|
86a6c1fb2b |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.1.0-dev
|
||||
VERSION=5.1.0-rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2031,7 +2031,7 @@ future<> db::commitlog::segment_manager::shutdown() {
|
||||
}
|
||||
}
|
||||
co_await _shutdown_promise->get_shared_future();
|
||||
clogger.info("Commitlog shutdown complete");
|
||||
clogger.debug("Commitlog shutdown complete");
|
||||
}
|
||||
|
||||
void db::commitlog::segment_manager::add_file_to_dispose(named_file f, dispose_mode mode) {
|
||||
|
||||
@@ -899,6 +899,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Ignore truncation record stored in system tables as if tables were never truncated.")
|
||||
, force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false,
|
||||
"Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.")
|
||||
, cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, true,
|
||||
"Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.")
|
||||
, default_log_level(this, "default_log_level", value_status::Used)
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used)
|
||||
, log_to_stdout(this, "log_to_stdout", value_status::Used)
|
||||
|
||||
@@ -379,6 +379,8 @@ public:
|
||||
named_value<bool> ignore_truncation_record;
|
||||
named_value<bool> force_schema_commit_log;
|
||||
|
||||
named_value<bool> cache_index_pages;
|
||||
|
||||
seastar::logging_settings logging_settings(const log_cli::options&) const;
|
||||
|
||||
const db::extensions& extensions() const;
|
||||
|
||||
6
main.cc
6
main.cc
@@ -600,6 +600,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
cfg->broadcast_to_all_shards().get();
|
||||
|
||||
// We pass this piece of config through a global as a temporary hack.
|
||||
// See the comment at the definition of sstables::global_cache_index_pages.
|
||||
smp::invoke_on_all([&cfg] {
|
||||
sstables::global_cache_index_pages = cfg->cache_index_pages.operator utils::updateable_value<bool>();
|
||||
}).get();
|
||||
|
||||
::sighup_handler sighup_handler(opts, *cfg);
|
||||
auto stop_sighup_handler = defer_verbose_shutdown("sighup", [&] {
|
||||
sighup_handler.stop().get();
|
||||
|
||||
@@ -444,7 +444,7 @@ public:
|
||||
// When throws, the cursor is invalidated and its position is not changed.
|
||||
bool advance_to(position_in_partition_view lower_bound) {
|
||||
maybe_advance_to(lower_bound);
|
||||
return no_clustering_row_between(_schema, lower_bound, position());
|
||||
return no_clustering_row_between_weak(_schema, lower_bound, position());
|
||||
}
|
||||
|
||||
// Call only when valid.
|
||||
|
||||
@@ -571,6 +571,20 @@ bool no_clustering_row_between(const schema& s, position_in_partition_view a, po
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if and only if there can't be any clustering_row with position >= a and < b.
|
||||
// It is assumed that a <= b.
|
||||
inline
|
||||
bool no_clustering_row_between_weak(const schema& s, position_in_partition_view a, position_in_partition_view b) {
|
||||
clustering_key_prefix::equality eq(s);
|
||||
if (a.has_key() && b.has_key()) {
|
||||
return eq(a.key(), b.key())
|
||||
&& (a.get_bound_weight() == bound_weight::after_all_prefixed
|
||||
|| b.get_bound_weight() != bound_weight::after_all_prefixed);
|
||||
} else {
|
||||
return !a.has_key() && !b.has_key();
|
||||
}
|
||||
}
|
||||
|
||||
// Includes all position_in_partition objects "p" for which: start <= p < end
|
||||
// And only those.
|
||||
class position_range {
|
||||
|
||||
19
querier.cc
19
querier.cc
@@ -413,25 +413,6 @@ future<bool> querier_cache::evict_one() noexcept {
|
||||
co_return false;
|
||||
}
|
||||
|
||||
future<> querier_cache::evict_all_for_table(const utils::UUID& schema_id) noexcept {
|
||||
for (auto ip : {&_data_querier_index, &_mutation_querier_index, &_shard_mutation_querier_index}) {
|
||||
auto& idx = *ip;
|
||||
for (auto it = idx.begin(); it != idx.end();) {
|
||||
if (it->second->schema().id() == schema_id) {
|
||||
auto reader_opt = it->second->permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(*it->second));
|
||||
it = idx.erase(it);
|
||||
--_stats.population;
|
||||
if (reader_opt) {
|
||||
co_await reader_opt->close();
|
||||
}
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> querier_cache::stop() noexcept {
|
||||
co_await _closing_gate.close();
|
||||
|
||||
|
||||
@@ -383,11 +383,6 @@ public:
|
||||
/// is empty).
|
||||
future<bool> evict_one() noexcept;
|
||||
|
||||
/// Evict all queriers that belong to a table.
|
||||
///
|
||||
/// Should be used when dropping a table.
|
||||
future<> evict_all_for_table(const utils::UUID& schema_id) noexcept;
|
||||
|
||||
/// Close all queriers and wait on background work.
|
||||
///
|
||||
/// Should be used before destroying the querier_cache.
|
||||
|
||||
@@ -749,6 +749,25 @@ void reader_concurrency_semaphore::clear_inactive_reads() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::evict_inactive_reads_for_table(utils::UUID id) noexcept {
|
||||
inactive_reads_type evicted_readers;
|
||||
auto it = _inactive_reads.begin();
|
||||
while (it != _inactive_reads.end()) {
|
||||
auto& ir = *it;
|
||||
++it;
|
||||
if (ir.reader.schema()->id() == id) {
|
||||
do_detach_inactive_reader(ir, evict_reason::manual);
|
||||
ir.ttl_timer.cancel();
|
||||
ir.unlink();
|
||||
evicted_readers.push_back(ir);
|
||||
}
|
||||
}
|
||||
while (!evicted_readers.empty()) {
|
||||
std::unique_ptr<inactive_read> irp(&evicted_readers.front());
|
||||
co_await irp->reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
std::runtime_error reader_concurrency_semaphore::stopped_exception() {
|
||||
return std::runtime_error(format("{} was stopped", _name));
|
||||
}
|
||||
@@ -771,11 +790,9 @@ future<> reader_concurrency_semaphore::stop() noexcept {
|
||||
co_return;
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
|
||||
auto reader = std::move(ir.reader);
|
||||
void reader_concurrency_semaphore::do_detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
|
||||
ir.detach();
|
||||
reader.permit()._impl->on_evicted();
|
||||
std::unique_ptr<inactive_read> irp(&ir);
|
||||
ir.reader.permit()._impl->on_evicted();
|
||||
try {
|
||||
if (ir.notify_handler) {
|
||||
ir.notify_handler(reason);
|
||||
@@ -794,7 +811,12 @@ flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(ina
|
||||
break;
|
||||
}
|
||||
--_stats.inactive_reads;
|
||||
return reader;
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
|
||||
std::unique_ptr<inactive_read> irp(&ir);
|
||||
do_detach_inactive_reader(ir, reason);
|
||||
return std::move(irp->reader);
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason) noexcept {
|
||||
|
||||
@@ -187,6 +187,7 @@ private:
|
||||
std::optional<future<>> _execution_loop_future;
|
||||
|
||||
private:
|
||||
void do_detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
|
||||
[[nodiscard]] flat_mutation_reader_v2 detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
|
||||
void evict(inactive_read&, evict_reason reason) noexcept;
|
||||
|
||||
@@ -302,6 +303,9 @@ public:
|
||||
|
||||
/// Clear all inactive reads.
|
||||
void clear_inactive_reads();
|
||||
|
||||
/// Evict all inactive reads the belong to the table designated by the id.
|
||||
future<> evict_inactive_reads_for_table(utils::UUID id) noexcept;
|
||||
private:
|
||||
// The following two functions are extension points for
|
||||
// future inheriting classes that needs to run some stop
|
||||
|
||||
@@ -1017,7 +1017,9 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
|
||||
remove(*cf);
|
||||
cf->clear_views();
|
||||
co_await cf->await_pending_ops();
|
||||
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
|
||||
for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) {
|
||||
co_await sem->evict_inactive_reads_for_table(uuid);
|
||||
}
|
||||
auto f = co_await coroutine::as_future(truncate(ks, *cf, std::move(tsf), snapshot));
|
||||
co_await cf->stop();
|
||||
f.get(); // re-throw exception from truncate() if any
|
||||
@@ -2238,10 +2240,14 @@ future<> database::stop() {
|
||||
|
||||
// try to ensure that CL has done disk flushing
|
||||
if (_commitlog) {
|
||||
dblog.info("Shutting down commitlog");
|
||||
co_await _commitlog->shutdown();
|
||||
dblog.info("Shutting down commitlog complete");
|
||||
}
|
||||
if (_schema_commitlog) {
|
||||
dblog.info("Shutting down schema commitlog");
|
||||
co_await _schema_commitlog->shutdown();
|
||||
dblog.info("Shutting down schema commitlog complete");
|
||||
}
|
||||
co_await _view_update_concurrency_sem.wait(max_memory_pending_view_updates());
|
||||
if (_commitlog) {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: f9f5228b74...3aa91b4d2d
@@ -1189,7 +1189,7 @@ private:
|
||||
}
|
||||
index_reader& get_index_reader() {
|
||||
if (!_index_reader) {
|
||||
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
|
||||
_consumer.trace_state(), caching, _single_partition_read);
|
||||
}
|
||||
|
||||
@@ -1319,7 +1319,7 @@ private:
|
||||
}
|
||||
index_reader& get_index_reader() {
|
||||
if (!_index_reader) {
|
||||
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
|
||||
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
|
||||
_consumer.trace_state(), caching, _single_partition_read);
|
||||
}
|
||||
|
||||
@@ -87,6 +87,18 @@ thread_local disk_error_signal_type sstable_write_error;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
// The below flag governs the mode of index file page caching used by the index
|
||||
// reader.
|
||||
//
|
||||
// If set to true, the reader will read and/or populate a common global cache,
|
||||
// which shares its capacity with the row cache. If false, the reader will use
|
||||
// BYPASS CACHE semantics for index caching.
|
||||
//
|
||||
// This flag is intended to be a temporary hack. The goal is to eventually
|
||||
// solve index caching problems via a smart cache replacement policy.
|
||||
//
|
||||
thread_local utils::updateable_value<bool> global_cache_index_pages(false);
|
||||
|
||||
logging::logger sstlog("sstable");
|
||||
|
||||
// Because this is a noop and won't hold any state, it is better to use a global than a
|
||||
|
||||
@@ -50,6 +50,7 @@
|
||||
#include "mutation_fragment_stream_validator.hh"
|
||||
#include "readers/flat_mutation_reader_fwd.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
#include <seastar/util/optimized_optional.hh>
|
||||
|
||||
@@ -58,6 +59,8 @@ class cached_file;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
extern thread_local utils::updateable_value<bool> global_cache_index_pages;
|
||||
|
||||
namespace mc {
|
||||
class writer;
|
||||
}
|
||||
|
||||
@@ -641,7 +641,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
|
||||
static mutation_partition read_using_cursor(partition_snapshot& snap) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
partition_snapshot_row_cursor cur(*snap.schema(), snap);
|
||||
cur.maybe_refresh();
|
||||
cur.advance_to(position_in_partition::before_all_clustered_rows());
|
||||
auto mp = read_partition_from(*snap.schema(), cur);
|
||||
for (auto&& rt : snap.range_tombstones()) {
|
||||
mp.apply_delete(*snap.schema(), rt);
|
||||
|
||||
@@ -327,11 +327,6 @@ public:
|
||||
return *this;
|
||||
}
|
||||
|
||||
test_querier_cache& evict_all_for_table() {
|
||||
_cache.evict_all_for_table(get_schema()->id()).get();
|
||||
return *this;
|
||||
}
|
||||
|
||||
test_querier_cache& no_misses() {
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, _expected_stats.misses);
|
||||
return *this;
|
||||
@@ -727,21 +722,6 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
}, std::move(db_cfg_ptr)).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evict_all_for_table) {
|
||||
test_querier_cache t;
|
||||
|
||||
const auto entry = t.produce_first_page_and_save_mutation_querier();
|
||||
|
||||
t.evict_all_for_table();
|
||||
t.assert_cache_lookup_mutation_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice)
|
||||
.misses()
|
||||
.no_drops()
|
||||
.no_evictions();
|
||||
|
||||
// Check that the querier was removed from the semaphore too.
|
||||
BOOST_CHECK(!t.get_semaphore().try_evict_one_inactive_read());
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
test_querier_cache t;
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/random_schema.hh"
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/testing/test_case.hh>
|
||||
@@ -915,3 +916,44 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_for_table) {
|
||||
auto spec = tests::make_random_schema_specification(get_name());
|
||||
|
||||
std::list<tests::random_schema> schemas;
|
||||
std::unordered_map<tests::random_schema*, std::vector<reader_concurrency_semaphore::inactive_read_handle>> schema_handles;
|
||||
for (unsigned i = 0; i < 4; ++i) {
|
||||
auto& s = schemas.emplace_back(tests::random_schema(i, *spec));
|
||||
schema_handles.emplace(&s, std::vector<reader_concurrency_semaphore::inactive_read_handle>{});
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
for (auto& s : schemas) {
|
||||
auto& handles = schema_handles[&s];
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout))));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& s : schemas) {
|
||||
auto& handles = schema_handles[&s];
|
||||
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
|
||||
}
|
||||
|
||||
for (auto& s : schemas) {
|
||||
auto& handles = schema_handles[&s];
|
||||
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
|
||||
semaphore.evict_inactive_reads_for_table(s.schema()->id()).get();
|
||||
for (const auto& [k, v] : schema_handles) {
|
||||
if (k == &s) {
|
||||
BOOST_REQUIRE(std::all_of(v.begin(), v.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return !bool(handle); }));
|
||||
} else if (!v.empty()) {
|
||||
BOOST_REQUIRE(std::all_of(v.begin(), v.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
|
||||
}
|
||||
}
|
||||
handles.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1235,9 +1235,13 @@ SEASTAR_TEST_CASE(test_update_failure) {
|
||||
class throttle {
|
||||
unsigned _block_counter = 0;
|
||||
promise<> _p; // valid when _block_counter != 0, resolves when goes down to 0
|
||||
std::optional<promise<>> _entered;
|
||||
bool _one_shot;
|
||||
public:
|
||||
// one_shot means whether only the first enter() after block() will block.
|
||||
throttle(bool one_shot = false) : _one_shot(one_shot) {}
|
||||
future<> enter() {
|
||||
if (_block_counter) {
|
||||
if (_block_counter && (!_one_shot || _entered)) {
|
||||
promise<> p1;
|
||||
promise<> p2;
|
||||
|
||||
@@ -1249,16 +1253,21 @@ public:
|
||||
p3.set_value();
|
||||
});
|
||||
_p = std::move(p2);
|
||||
|
||||
if (_entered) {
|
||||
_entered->set_value();
|
||||
_entered.reset();
|
||||
}
|
||||
return f1;
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
void block() {
|
||||
future<> block() {
|
||||
++_block_counter;
|
||||
_p = promise<>();
|
||||
_entered = promise<>();
|
||||
return _entered->get_future();
|
||||
}
|
||||
|
||||
void unblock() {
|
||||
@@ -1402,7 +1411,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
mt2->apply(m);
|
||||
}
|
||||
|
||||
thr.block();
|
||||
auto f = thr.block();
|
||||
|
||||
auto m0_range = dht::partition_range::make_singular(ring[0].ring_position());
|
||||
auto rd1 = cache.make_reader(s, semaphore.make_permit(), m0_range);
|
||||
@@ -1413,6 +1422,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
rd2.set_max_buffer_size(1);
|
||||
auto rd2_fill_buffer = rd2.fill_buffer();
|
||||
|
||||
f.get();
|
||||
sleep(10ms).get();
|
||||
|
||||
// This update should miss on all partitions
|
||||
@@ -1540,12 +1550,13 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
|
||||
mt2->apply(m);
|
||||
}
|
||||
|
||||
thr.block();
|
||||
auto f = thr.block();
|
||||
|
||||
auto rd1 = cache.make_reader(s, semaphore.make_permit());
|
||||
rd1.set_max_buffer_size(1);
|
||||
auto rd1_fill_buffer = rd1.fill_buffer();
|
||||
|
||||
f.get();
|
||||
sleep(10ms).get();
|
||||
|
||||
// This update should miss on all partitions
|
||||
@@ -3994,3 +4005,81 @@ SEASTAR_TEST_CASE(row_cache_is_populated_using_compacting_sstable_reader) {
|
||||
BOOST_ASSERT(rt.calculate_size() == 1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_eviction_of_upper_bound_of_population_range) {
|
||||
return seastar::async([] {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto cache_mt = make_lw_shared<replica::memtable>(s.schema());
|
||||
|
||||
auto pkey = s.make_pkey("pk");
|
||||
|
||||
mutation m1(s.schema(), pkey);
|
||||
s.add_row(m1, s.make_ckey(1), "v1");
|
||||
s.add_row(m1, s.make_ckey(2), "v2");
|
||||
cache_mt->apply(m1);
|
||||
|
||||
cache_tracker tracker;
|
||||
throttle thr(true);
|
||||
auto cache_source = make_decorated_snapshot_source(snapshot_source([&] { return cache_mt->as_data_source(); }),
|
||||
[&] (mutation_source src) {
|
||||
return throttled_mutation_source(thr, std::move(src));
|
||||
});
|
||||
row_cache cache(s.schema(), cache_source, tracker);
|
||||
|
||||
auto pr = dht::partition_range::make_singular(pkey);
|
||||
|
||||
auto read = [&] (int start, int end) {
|
||||
auto slice = partition_slice_builder(*s.schema())
|
||||
.with_range(query::clustering_range::make(s.make_ckey(start), s.make_ckey(end)))
|
||||
.build();
|
||||
auto rd = cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice);
|
||||
auto close_rd = deferred_close(rd);
|
||||
auto m_cache = read_mutation_from_flat_mutation_reader(rd).get0();
|
||||
close_rd.close_now();
|
||||
rd = cache_mt->make_flat_reader(s.schema(), semaphore.make_permit(), pr, slice);
|
||||
auto close_rd2 = deferred_close(rd);
|
||||
auto m_mt = read_mutation_from_flat_mutation_reader(rd).get0();
|
||||
BOOST_REQUIRE(m_mt);
|
||||
assert_that(m_cache).has_mutation().is_equal_to(*m_mt);
|
||||
};
|
||||
|
||||
// populate [2]
|
||||
{
|
||||
auto slice = partition_slice_builder(*s.schema())
|
||||
.with_range(query::clustering_range::make_singular(s.make_ckey(2)))
|
||||
.build();
|
||||
assert_that(cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice))
|
||||
.has_monotonic_positions();
|
||||
}
|
||||
|
||||
auto arrived = thr.block();
|
||||
|
||||
// Read [0, 2]
|
||||
auto f = seastar::async([&] {
|
||||
read(0, 2);
|
||||
});
|
||||
|
||||
arrived.get();
|
||||
|
||||
// populate (2, 3]
|
||||
{
|
||||
auto slice = partition_slice_builder(*s.schema())
|
||||
.with_range(query::clustering_range::make(query::clustering_range::bound(s.make_ckey(2), false),
|
||||
query::clustering_range::bound(s.make_ckey(3), true)))
|
||||
.build();
|
||||
assert_that(cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice))
|
||||
.has_monotonic_positions();
|
||||
}
|
||||
|
||||
testlog.trace("Evicting");
|
||||
evict_one_row(tracker); // Evicts before(0)
|
||||
evict_one_row(tracker); // Evicts ck(2)
|
||||
testlog.trace("Unblocking");
|
||||
|
||||
thr.unblock();
|
||||
f.get();
|
||||
|
||||
read(0, 3);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -727,7 +727,10 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
|
||||
if (ret < 0) {
|
||||
throw std::runtime_error("CQL frame LZ4 uncompression failure");
|
||||
}
|
||||
return out.size();
|
||||
if (ret != out.size()) {
|
||||
throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
|
||||
}
|
||||
return static_cast<size_t>(ret);
|
||||
});
|
||||
on_compression_buffer_use();
|
||||
return uncomp;
|
||||
|
||||
@@ -1329,6 +1329,12 @@ void reclaim_timer::sample_stats(stats& data) {
|
||||
}
|
||||
|
||||
void reclaim_timer::report() const noexcept {
|
||||
// The logger can allocate (and will recover from allocation failure), and
|
||||
// we're in a memory-sensitive situation here and allocation can easily fail.
|
||||
// Prevent --abort-on-seastar-bad-alloc from crashing us in a situation that
|
||||
// we're likely to recover from, by reclaiming more.
|
||||
auto guard = memory::disable_abort_on_alloc_failure_temporarily();
|
||||
|
||||
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
|
||||
auto info_level = _stall_detected ? log_level::info : log_level::debug;
|
||||
auto MiB = 1024*1024;
|
||||
|
||||
Reference in New Issue
Block a user