Compare commits

...

11 Commits

Author SHA1 Message Date
Yaron Kaikov
66f34245fc release: prepare for 5.1.0-rc2 2022-09-19 14:35:28 +03:00
Michał Chojnowski
4047528bd9 db: commitlog: don't print INFO logs on shutdown
The intention was for these logs to be printed during the
database shutdown sequence, but it was overlooked that it's not
the only place where commitlog::shutdown is called.
Commitlogs are started and shut down periodically by hinted handoff.
When that happens, these messages spam the log.

Fix that by adding INFO commitlog shutdown logs to database::stop,
and change the level of the commitlog::shutdown log call to DEBUG.

Fixes #11508

Closes #11536

(cherry picked from commit 9b6fc553b4)
2022-09-18 13:33:05 +03:00
Michał Chojnowski
1a82c61452 sstables: add a flag for disabling long-term index caching
Long-term index caching in the global cache, as introduced in 4.6, is a major
pessimization for workloads where accesses to the index are (spacially) sparse.
We want to have a way to disable it for the affected workloads.

There is already infrastructure in place for disabling it for BYPASS CACHE
queries. One way of solving the issue is hijacking that infrastructure.

This patch adds a global flag (and a corresponding CLI option) which controls
index caching. Setting the flag to `false` causes all index reads to behave
like they would in BYPASS CACHE queries.

Consequences of this choice:

- The per-SSTable partition_index_cache is unused. Every index_reader has
  its own, and they die together. Independent reads can no longer reuse the
  work of other reads which hit the same index pages. This is not crucial,
  since partition accesses have no (natural) spatial locality. Note that
  the original reason for partition_index_cache -- the ability to share
  reads for the lower and upper bound of the query -- is unaffected.
- The per-SSTable cached_file is unused. Every index_reader has its own
  (uncached) input stream from the index file, and every
  bsearch_clustered_cursor has its own cached_file, which dies together with
  the cursor. Note that the cursor still can perform its binary search with
  caching. However, it won't be able to reuse the file pages read by
  index_reader. In particular, if the promoted index is small, and fits inside
  the same file page as its index_entry, that page will be re-read.
  It can also happen that index_reader will read the same index file page
  multiple times. When the summary is so dense that multiple index pages fit in
  one index file page, advancing the upper bound, which reads the next index
  page, will read the same index file page. Since summary:disk ratio is 1:2000,
  this is expected to happen for partitions with size greater than 2000
  partition keys.

Fixes #11202

(cherry picked from commit cdb3e71045)
2022-09-18 13:27:46 +03:00
Avi Kivity
3d9800eb1c logalloc: don't crash while reporting reclaim stalls if --abort-on-seastar-bad-alloc is specified
The logger is proof against allocation failures, except if
--abort-on-seastar-bad-alloc is specified. If it is, it will crash.

The reclaim stall report is likely to be called in low memory conditions
(reclaim's job is to alleviate these conditions after all), so we're
likely to crash here if we're reclaiming a very low memory condition
and have a large stall simultaneously (AND we're running in a debug
environment).

Prevent all this by disabling --abort-on-seastar-bad-alloc temporarily.

Fixes #11549

Closes #11555

(cherry picked from commit d3b8c0c8a6)
2022-09-18 13:24:21 +03:00
Karol Baryła
c48e9b47dd transport/server.cc: Return correct size of decompressed lz4 buffer
An incorrect size is returned from the function, which could lead to
crashes or undefined behavior. Fix by erroring out in these cases.

Fixes #11476

(cherry picked from commit 1c2eef384d)
2022-09-07 10:58:30 +03:00
Avi Kivity
2eadaad9f7 Merge 'database: evict all inactive reads for table when detaching table' from Botond Dénes
Currently, when detaching the table from the database, we force-evict all queriers for said table. This series broadens the scope of this force-evict to include all inactive reads registered at the semaphore. This ensures that any regular inactive read "forgotten" for any reason in the semaphore, will not end up in said readers accessing a dangling table reference when destroyed later.

Fixes: https://github.com/scylladb/scylladb/issues/11264

Closes #11273

* github.com:scylladb/scylladb:
  querier: querier_cache: remove now unused evict_all_for_table()
  database: detach_column_family(): use reader_concurrency_semaphore::evict_inactive_reads_for_table()
  reader_concurrency_semaphore: add evict_inactive_reads_for_table()

(cherry picked from commit afa7960926)
2022-09-02 10:41:22 +03:00
Yaron Kaikov
d10aee15e7 release: prepare for 5.1.0-rc1 2022-09-02 06:15:05 +03:00
Avi Kivity
9e017cb1e6 Update seastar submodule (tls error handling)
* seastar f9f5228b74...3aa91b4d2d (1):
  > Merge 'tls: vec_push: handle async errors rather than throwing on_internal_error' from Benny Halevy

Fixes #11252
2022-09-01 13:10:13 +03:00
Avi Kivity
b8504cc9b2 .gitmodules: switch seastar to scylla-seastar.git
This allows us to backport seastar patches to branch-5.1 on
scylla-seastar.git.
2022-09-01 13:08:22 +03:00
Avi Kivity
856703a85e Merge 'row_cache: Fix missing row if upper bound of population range is evicted and has adjacent dummy' from Tomasz Grabiec
Scenario:

cache = [
    row(pos=2, continuous=false),
    row(pos=after(2), dummy=true)
]

Scanning read starts, starts populating [-inf, before(2)] from sstables.

row(pos=2) is evicted.

cache = [
    row(pos=after(2), dummy=true)
]

Scanning read finishes reading from sstables.

Refreshes cache cursor via
partition_snapshot_row_cursor::maybe_refresh(), which calls
partition_snapshot_row_cursor::advance_to() because iterators are
invalidated. This advances the cursor to
after(2). no_clustering_row_between(2, after(2)) returns true, so
advance_to() returns true, and maybe_refresh() returns true. This is
interpreted by the cache reader as "the cursor has not moved forward",
so it marks the range as complete, without emitting the row with
pos=2. Also, it marks row(pos=after(2)) as continuous, so later reads
will also miss the row.

The bug is in advance_to(), which is using
no_clustering_row_between(a, b) to determine its result, which by
definition excludes the starting key.

Discovered by row_cache_test.cc::test_concurrent_reads_and_eviction
with reduced key range in the random_mutation_generator (1024 -> 16).

Fixes #11239

Closes #11240

* github.com:scylladb/scylladb:
  test: mvcc: Fix illegal use of maybe_refresh()
  tests: row_cache_test: Add test_eviction_of_upper_bound_of_population_range()
  tests: row_cache_test: Introduce one_shot mode to throttle
  row_cache: Fix missing row if upper bound of population range is evicted and has adjacent dummy
2022-08-11 16:51:59 +02:00
Yaron Kaikov
86a6c1fb2b release: prepare for 5.1.0-rc0 2022-08-09 18:48:43 +03:00
24 changed files with 231 additions and 64 deletions

2
.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -60,7 +60,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=5.1.0-dev
VERSION=5.1.0-rc2
if test -f version
then

View File

@@ -2031,7 +2031,7 @@ future<> db::commitlog::segment_manager::shutdown() {
}
}
co_await _shutdown_promise->get_shared_future();
clogger.info("Commitlog shutdown complete");
clogger.debug("Commitlog shutdown complete");
}
void db::commitlog::segment_manager::add_file_to_dispose(named_file f, dispose_mode mode) {

View File

@@ -899,6 +899,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Ignore truncation record stored in system tables as if tables were never truncated.")
, force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false,
"Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.")
, cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, true,
"Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.")
, default_log_level(this, "default_log_level", value_status::Used)
, logger_log_level(this, "logger_log_level", value_status::Used)
, log_to_stdout(this, "log_to_stdout", value_status::Used)

View File

@@ -379,6 +379,8 @@ public:
named_value<bool> ignore_truncation_record;
named_value<bool> force_schema_commit_log;
named_value<bool> cache_index_pages;
seastar::logging_settings logging_settings(const log_cli::options&) const;
const db::extensions& extensions() const;

View File

@@ -600,6 +600,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
cfg->broadcast_to_all_shards().get();
// We pass this piece of config through a global as a temporary hack.
// See the comment at the definition of sstables::global_cache_index_pages.
smp::invoke_on_all([&cfg] {
sstables::global_cache_index_pages = cfg->cache_index_pages.operator utils::updateable_value<bool>();
}).get();
::sighup_handler sighup_handler(opts, *cfg);
auto stop_sighup_handler = defer_verbose_shutdown("sighup", [&] {
sighup_handler.stop().get();

View File

@@ -444,7 +444,7 @@ public:
// When throws, the cursor is invalidated and its position is not changed.
bool advance_to(position_in_partition_view lower_bound) {
maybe_advance_to(lower_bound);
return no_clustering_row_between(_schema, lower_bound, position());
return no_clustering_row_between_weak(_schema, lower_bound, position());
}
// Call only when valid.

View File

@@ -571,6 +571,20 @@ bool no_clustering_row_between(const schema& s, position_in_partition_view a, po
}
}
// Returns true if and only if there can't be any clustering_row with position >= a and < b.
// It is assumed that a <= b.
inline
bool no_clustering_row_between_weak(const schema& s, position_in_partition_view a, position_in_partition_view b) {
clustering_key_prefix::equality eq(s);
if (a.has_key() && b.has_key()) {
return eq(a.key(), b.key())
&& (a.get_bound_weight() == bound_weight::after_all_prefixed
|| b.get_bound_weight() != bound_weight::after_all_prefixed);
} else {
return !a.has_key() && !b.has_key();
}
}
// Includes all position_in_partition objects "p" for which: start <= p < end
// And only those.
class position_range {

View File

@@ -413,25 +413,6 @@ future<bool> querier_cache::evict_one() noexcept {
co_return false;
}
future<> querier_cache::evict_all_for_table(const utils::UUID& schema_id) noexcept {
for (auto ip : {&_data_querier_index, &_mutation_querier_index, &_shard_mutation_querier_index}) {
auto& idx = *ip;
for (auto it = idx.begin(); it != idx.end();) {
if (it->second->schema().id() == schema_id) {
auto reader_opt = it->second->permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(*it->second));
it = idx.erase(it);
--_stats.population;
if (reader_opt) {
co_await reader_opt->close();
}
} else {
++it;
}
}
}
co_return;
}
future<> querier_cache::stop() noexcept {
co_await _closing_gate.close();

View File

@@ -383,11 +383,6 @@ public:
/// is empty).
future<bool> evict_one() noexcept;
/// Evict all queriers that belong to a table.
///
/// Should be used when dropping a table.
future<> evict_all_for_table(const utils::UUID& schema_id) noexcept;
/// Close all queriers and wait on background work.
///
/// Should be used before destroying the querier_cache.

View File

@@ -749,6 +749,25 @@ void reader_concurrency_semaphore::clear_inactive_reads() {
}
}
future<> reader_concurrency_semaphore::evict_inactive_reads_for_table(utils::UUID id) noexcept {
inactive_reads_type evicted_readers;
auto it = _inactive_reads.begin();
while (it != _inactive_reads.end()) {
auto& ir = *it;
++it;
if (ir.reader.schema()->id() == id) {
do_detach_inactive_reader(ir, evict_reason::manual);
ir.ttl_timer.cancel();
ir.unlink();
evicted_readers.push_back(ir);
}
}
while (!evicted_readers.empty()) {
std::unique_ptr<inactive_read> irp(&evicted_readers.front());
co_await irp->reader.close();
}
}
std::runtime_error reader_concurrency_semaphore::stopped_exception() {
return std::runtime_error(format("{} was stopped", _name));
}
@@ -771,11 +790,9 @@ future<> reader_concurrency_semaphore::stop() noexcept {
co_return;
}
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
auto reader = std::move(ir.reader);
void reader_concurrency_semaphore::do_detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
ir.detach();
reader.permit()._impl->on_evicted();
std::unique_ptr<inactive_read> irp(&ir);
ir.reader.permit()._impl->on_evicted();
try {
if (ir.notify_handler) {
ir.notify_handler(reason);
@@ -794,7 +811,12 @@ flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(ina
break;
}
--_stats.inactive_reads;
return reader;
}
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
std::unique_ptr<inactive_read> irp(&ir);
do_detach_inactive_reader(ir, reason);
return std::move(irp->reader);
}
void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason) noexcept {

View File

@@ -187,6 +187,7 @@ private:
std::optional<future<>> _execution_loop_future;
private:
void do_detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
[[nodiscard]] flat_mutation_reader_v2 detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
void evict(inactive_read&, evict_reason reason) noexcept;
@@ -302,6 +303,9 @@ public:
/// Clear all inactive reads.
void clear_inactive_reads();
/// Evict all inactive reads the belong to the table designated by the id.
future<> evict_inactive_reads_for_table(utils::UUID id) noexcept;
private:
// The following two functions are extension points for
// future inheriting classes that needs to run some stop

View File

@@ -1017,7 +1017,9 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
remove(*cf);
cf->clear_views();
co_await cf->await_pending_ops();
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) {
co_await sem->evict_inactive_reads_for_table(uuid);
}
auto f = co_await coroutine::as_future(truncate(ks, *cf, std::move(tsf), snapshot));
co_await cf->stop();
f.get(); // re-throw exception from truncate() if any
@@ -2238,10 +2240,14 @@ future<> database::stop() {
// try to ensure that CL has done disk flushing
if (_commitlog) {
dblog.info("Shutting down commitlog");
co_await _commitlog->shutdown();
dblog.info("Shutting down commitlog complete");
}
if (_schema_commitlog) {
dblog.info("Shutting down schema commitlog");
co_await _schema_commitlog->shutdown();
dblog.info("Shutting down schema commitlog complete");
}
co_await _view_update_concurrency_sem.wait(max_memory_pending_view_updates());
if (_commitlog) {

Submodule seastar updated: f9f5228b74...3aa91b4d2d

View File

@@ -1189,7 +1189,7 @@ private:
}
index_reader& get_index_reader() {
if (!_index_reader) {
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
_consumer.trace_state(), caching, _single_partition_read);
}

View File

@@ -1319,7 +1319,7 @@ private:
}
index_reader& get_index_reader() {
if (!_index_reader) {
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
_consumer.trace_state(), caching, _single_partition_read);
}

View File

@@ -87,6 +87,18 @@ thread_local disk_error_signal_type sstable_write_error;
namespace sstables {
// The below flag governs the mode of index file page caching used by the index
// reader.
//
// If set to true, the reader will read and/or populate a common global cache,
// which shares its capacity with the row cache. If false, the reader will use
// BYPASS CACHE semantics for index caching.
//
// This flag is intended to be a temporary hack. The goal is to eventually
// solve index caching problems via a smart cache replacement policy.
//
thread_local utils::updateable_value<bool> global_cache_index_pages(false);
logging::logger sstlog("sstable");
// Because this is a noop and won't hold any state, it is better to use a global than a

View File

@@ -50,6 +50,7 @@
#include "mutation_fragment_stream_validator.hh"
#include "readers/flat_mutation_reader_fwd.hh"
#include "tracing/trace_state.hh"
#include "utils/updateable_value.hh"
#include <seastar/util/optimized_optional.hh>
@@ -58,6 +59,8 @@ class cached_file;
namespace sstables {
extern thread_local utils::updateable_value<bool> global_cache_index_pages;
namespace mc {
class writer;
}

View File

@@ -641,7 +641,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
static mutation_partition read_using_cursor(partition_snapshot& snap) {
tests::reader_concurrency_semaphore_wrapper semaphore;
partition_snapshot_row_cursor cur(*snap.schema(), snap);
cur.maybe_refresh();
cur.advance_to(position_in_partition::before_all_clustered_rows());
auto mp = read_partition_from(*snap.schema(), cur);
for (auto&& rt : snap.range_tombstones()) {
mp.apply_delete(*snap.schema(), rt);

View File

@@ -327,11 +327,6 @@ public:
return *this;
}
test_querier_cache& evict_all_for_table() {
_cache.evict_all_for_table(get_schema()->id()).get();
return *this;
}
test_querier_cache& no_misses() {
BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, _expected_stats.misses);
return *this;
@@ -727,21 +722,6 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
}, std::move(db_cfg_ptr)).get();
}
SEASTAR_THREAD_TEST_CASE(test_evict_all_for_table) {
test_querier_cache t;
const auto entry = t.produce_first_page_and_save_mutation_querier();
t.evict_all_for_table();
t.assert_cache_lookup_mutation_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice)
.misses()
.no_drops()
.no_evictions();
// Check that the querier was removed from the semaphore too.
BOOST_CHECK(!t.get_semaphore().try_evict_one_inactive_read());
}
SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
test_querier_cache t;

View File

@@ -10,6 +10,7 @@
#include "test/lib/simple_schema.hh"
#include "test/lib/eventually.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/random_schema.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/testing/test_case.hh>
@@ -915,3 +916,44 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
}
}
}
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_for_table) {
auto spec = tests::make_random_schema_specification(get_name());
std::list<tests::random_schema> schemas;
std::unordered_map<tests::random_schema*, std::vector<reader_concurrency_semaphore::inactive_read_handle>> schema_handles;
for (unsigned i = 0; i < 4; ++i) {
auto& s = schemas.emplace_back(tests::random_schema(i, *spec));
schema_handles.emplace(&s, std::vector<reader_concurrency_semaphore::inactive_read_handle>{});
}
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
auto stop_sem = deferred_stop(semaphore);
for (auto& s : schemas) {
auto& handles = schema_handles[&s];
for (int i = 0; i < 10; ++i) {
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout))));
}
}
for (auto& s : schemas) {
auto& handles = schema_handles[&s];
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
}
for (auto& s : schemas) {
auto& handles = schema_handles[&s];
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
semaphore.evict_inactive_reads_for_table(s.schema()->id()).get();
for (const auto& [k, v] : schema_handles) {
if (k == &s) {
BOOST_REQUIRE(std::all_of(v.begin(), v.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return !bool(handle); }));
} else if (!v.empty()) {
BOOST_REQUIRE(std::all_of(v.begin(), v.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
}
}
handles.clear();
}
}

View File

@@ -1235,9 +1235,13 @@ SEASTAR_TEST_CASE(test_update_failure) {
class throttle {
unsigned _block_counter = 0;
promise<> _p; // valid when _block_counter != 0, resolves when goes down to 0
std::optional<promise<>> _entered;
bool _one_shot;
public:
// one_shot means whether only the first enter() after block() will block.
throttle(bool one_shot = false) : _one_shot(one_shot) {}
future<> enter() {
if (_block_counter) {
if (_block_counter && (!_one_shot || _entered)) {
promise<> p1;
promise<> p2;
@@ -1249,16 +1253,21 @@ public:
p3.set_value();
});
_p = std::move(p2);
if (_entered) {
_entered->set_value();
_entered.reset();
}
return f1;
} else {
return make_ready_future<>();
}
}
void block() {
future<> block() {
++_block_counter;
_p = promise<>();
_entered = promise<>();
return _entered->get_future();
}
void unblock() {
@@ -1402,7 +1411,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
mt2->apply(m);
}
thr.block();
auto f = thr.block();
auto m0_range = dht::partition_range::make_singular(ring[0].ring_position());
auto rd1 = cache.make_reader(s, semaphore.make_permit(), m0_range);
@@ -1413,6 +1422,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
rd2.set_max_buffer_size(1);
auto rd2_fill_buffer = rd2.fill_buffer();
f.get();
sleep(10ms).get();
// This update should miss on all partitions
@@ -1540,12 +1550,13 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
mt2->apply(m);
}
thr.block();
auto f = thr.block();
auto rd1 = cache.make_reader(s, semaphore.make_permit());
rd1.set_max_buffer_size(1);
auto rd1_fill_buffer = rd1.fill_buffer();
f.get();
sleep(10ms).get();
// This update should miss on all partitions
@@ -3994,3 +4005,81 @@ SEASTAR_TEST_CASE(row_cache_is_populated_using_compacting_sstable_reader) {
BOOST_ASSERT(rt.calculate_size() == 1);
});
}
SEASTAR_TEST_CASE(test_eviction_of_upper_bound_of_population_range) {
return seastar::async([] {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto cache_mt = make_lw_shared<replica::memtable>(s.schema());
auto pkey = s.make_pkey("pk");
mutation m1(s.schema(), pkey);
s.add_row(m1, s.make_ckey(1), "v1");
s.add_row(m1, s.make_ckey(2), "v2");
cache_mt->apply(m1);
cache_tracker tracker;
throttle thr(true);
auto cache_source = make_decorated_snapshot_source(snapshot_source([&] { return cache_mt->as_data_source(); }),
[&] (mutation_source src) {
return throttled_mutation_source(thr, std::move(src));
});
row_cache cache(s.schema(), cache_source, tracker);
auto pr = dht::partition_range::make_singular(pkey);
auto read = [&] (int start, int end) {
auto slice = partition_slice_builder(*s.schema())
.with_range(query::clustering_range::make(s.make_ckey(start), s.make_ckey(end)))
.build();
auto rd = cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice);
auto close_rd = deferred_close(rd);
auto m_cache = read_mutation_from_flat_mutation_reader(rd).get0();
close_rd.close_now();
rd = cache_mt->make_flat_reader(s.schema(), semaphore.make_permit(), pr, slice);
auto close_rd2 = deferred_close(rd);
auto m_mt = read_mutation_from_flat_mutation_reader(rd).get0();
BOOST_REQUIRE(m_mt);
assert_that(m_cache).has_mutation().is_equal_to(*m_mt);
};
// populate [2]
{
auto slice = partition_slice_builder(*s.schema())
.with_range(query::clustering_range::make_singular(s.make_ckey(2)))
.build();
assert_that(cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice))
.has_monotonic_positions();
}
auto arrived = thr.block();
// Read [0, 2]
auto f = seastar::async([&] {
read(0, 2);
});
arrived.get();
// populate (2, 3]
{
auto slice = partition_slice_builder(*s.schema())
.with_range(query::clustering_range::make(query::clustering_range::bound(s.make_ckey(2), false),
query::clustering_range::bound(s.make_ckey(3), true)))
.build();
assert_that(cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice))
.has_monotonic_positions();
}
testlog.trace("Evicting");
evict_one_row(tracker); // Evicts before(0)
evict_one_row(tracker); // Evicts ck(2)
testlog.trace("Unblocking");
thr.unblock();
f.get();
read(0, 3);
});
}

View File

@@ -727,7 +727,10 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
if (ret < 0) {
throw std::runtime_error("CQL frame LZ4 uncompression failure");
}
return out.size();
if (ret != out.size()) {
throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
}
return static_cast<size_t>(ret);
});
on_compression_buffer_use();
return uncomp;

View File

@@ -1329,6 +1329,12 @@ void reclaim_timer::sample_stats(stats& data) {
}
void reclaim_timer::report() const noexcept {
// The logger can allocate (and will recover from allocation failure), and
// we're in a memory-sensitive situation here and allocation can easily fail.
// Prevent --abort-on-seastar-bad-alloc from crashing us in a situation that
// we're likely to recover from, by reclaiming more.
auto guard = memory::disable_abort_on_alloc_failure_temporarily();
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
auto info_level = _stall_detected ? log_level::info : log_level::debug;
auto MiB = 1024*1024;