Compare commits

...

21 Commits

Author SHA1 Message Date
Jenkins Promoter
6f1efcff31 Update ScyllaDB version to: 2025.2.0-rc1 2025-05-13 22:48:32 +03:00
Botond Dénes
7673a17365 Merge 'compress: fix an internal error when a specific debug log is enabled' from Michał Chojnowski
compress: fix an internal error when a specific debug log is enabled
While iterating over the recent 69684e16d8 series,
I shot myself in the foot by defining `algorithm_to_name(algorithm::none)`
to be an internal error, and later calling it anyway in a debug log.

(Tests didn't catch it because there's no test which simultaneously
enables the debug log and configures some table to have no compression).

This proves that `algorithm_to_name` is too much of a footgun.
Fix it so that calling `algorithm_to_name(algorithm::none)` is legal.
In hindsight, I should have done that immediately.

Fixes #23624

Fix for recently-added code, no backporting needed.

Closes scylladb/scylladb#23625

* github.com:scylladb/scylladb:
  test_sstable_compression_dictionaries: reproduce an internal error in debug logging
  compress: fix an internal error when a specific debug log is enabled

(cherry picked from commit 746382257c)
2025-05-12 23:13:59 +03:00
Avi Kivity
ae05d62b97 Merge '[Backport 2025.2] compress: make sstable compression dictionaries NUMA-aware ' from Scylladb[bot]
compress: distribute compression dictionaries over shards
We don't want each shard to have its own copy of each dictionary.
It would put unnecessary pressure on cache and memory.
Instead, we want to share dictionaries between shards.

Before this commit, all dictionaries live on shard 0.
All other shards borrow foreign shared pointers from shard 0.

There's a problem with this setup: dictionary blobs receive many random
accesses. If shard 0 is on a remote NUMA node, this could pose
a performance problem.

Therefore, for each dictionary, we would like to have one copy per NUMA node,
not one copy per the entire machine. And each shard should use the copy
belonging to its own NUMA node. This is the main goal of this patch.

There is another issue with putting all dicts on shard 0: it eats
an asymmetric amount of memory from shard 0.
This commit spreads the ownership of dicts over all shards within
the NUMA group, to make the situation more symmetric.
(Dict owner is decided based on the hash of dict contents).

It should be noted that the last part isn't necessarily a good thing,
though.
While it makes the situation more symmetric within each node,
it makes it less symmetric across the cluster, if different node
sizes are present.

If dicts occupy 1% of memory on each shard of a 100-shard node,
then the same dicts would occupy 100% of memory on a 1-shard node.

So for the sake of cluster-wide symmetry, we might later want to consider
e.g. making the memory limit for dictionaries inversely proportional
to the number of shards.

New functionality, added to a feature which isn't in any stable branch yet. No backporting.

Edit: no backporting to <=2025.1, but need backporting to 2025.2, where the feature is introduced.

Fixes #24108

- (cherry picked from commit 0e4d0ded8d)

- (cherry picked from commit 8649adafa8)

- (cherry picked from commit 1bcf77951c)

- (cherry picked from commit 6b831aaf1b)

- (cherry picked from commit e952992560)

- (cherry picked from commit 66a454f61d)

- (cherry picked from commit 518f04f1c4)

- (cherry picked from commit f075674ebe)

Parent PR: #23590

Closes scylladb/scylladb#24109

* github.com:scylladb/scylladb:
  test: add test/boost/sstable_compressor_factory_test
  compress: add some test-only APIs
  compress: rename sstable_compressor_factory_impl to dictionary_holder
  compress: fix indentation
  compress: remove sstable_compressor_factory_impl::_owner_shard
  compress: distribute compression dictionaries over shards
  test: switch uses of make_sstable_compressor_factory() to a seastar::thread-dependent version
  test: remove sstables::test_env::do_with()
2025-05-12 23:11:12 +03:00
Michał Chojnowski
732321e3b8 test: add test/boost/sstable_compressor_factory_test
Add a basic test for NUMA awareness of `default_sstable_compressor_factory`.

(cherry picked from commit f075674ebe)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
a2622e1919 compress: add some test-only APIs
Will be needed by the test added in the next patch.

(cherry picked from commit 518f04f1c4)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
270bf34846 compress: rename sstable_compressor_factory_impl to dictionary_holder
Since sstable_compressor_factory_impl no longer
implements sstable_compressor_factory, the name can be
misleading. Rename it to something closer to its new role.

(cherry picked from commit 66a454f61d)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
168f694c5d compress: fix indentation
Purely cosmetic.

(cherry picked from commit e952992560)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
b5579be915 compress: remove sstable_compressor_factory_impl::_owner_shard
Before the series, sstable_compressor_factory_impl was directly
accessed by multiple shards. Now, it's a part of a `sharded`
data structure and is never directly accessed from other shards,
so there's no need to check for that. Remove the leftover logic.

(cherry picked from commit 6b831aaf1b)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
ad60d765f9 compress: distribute compression dictionaries over shards
We don't want each shard to have its own copy of each dictionary.
It would put unnecessary pressure on cache and memory.
Instead, we want to share dictionaries between shards.

Before this commit, all dictionaries live on shard 0.
All other shards borrow foreign shared pointers from shard 0.

There's a problem with this setup: dictionary blobs receive many random
accesses. If shard 0 is on a remote NUMA node, this could pose
a performance problem.

Therefore, for each dictionary, we would like to have one copy per NUMA node,
not one copy per the entire machine. And each shard should use the copy
belonging to its own NUMA node. This is the main goal of this patch.

There is another issue with putting all dicts on shard 0: it eats
an asymmetric amount of memory from shard 0.
This commit spreads the ownership of dicts over all shards within
the NUMA group, to make the situation more symmetric.
(Dict owner is decided based on the hash of dict contents).

It should be noted that the last part isn't necessarily a good thing,
though.
While it makes the situation more symmetric within each node,
it makes it less symmetric across the cluster, if different node
sizes are present.

If dicts occupy 1% of memory on each shard of a 100-shard node,
then the same dicts would occupy 100% of memory on a 1-shard node.

So for the sake of cluster-wide symmetry, we might later want to consider
e.g. making the memory limit for dictionaries inversely proportional
to the number of shards.

(cherry picked from commit 1bcf77951c)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
68d2086fa5 test: switch uses of make_sstable_compressor_factory() to a seastar::thread-dependent version
In the next patches, make_sstable_compressor_factory() will have to
disappear.
In preparation for that, we switch to a seastar::thread-dependent
replacement.

(cherry picked from commit 8649adafa8)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
403d43093f test: remove sstables::test_env::do_with()
`sstable_manager` depends on `sstable_compressor_factory&`.
Currently, `test_env` obtains an implementation of this
interface with the synchronous `make_sstable_compressor_factory()`.

But after this patch, the only implementation of the
`sstable_compressor_factory` interface will use `sharded<...>`,
so its construction will become asynchronous,
and the synchronous `make_sstable_compressor_factory()` must disappear.

There are several possible ways to deal with this, but I think the
easiest one is to write an asynchronous replacement for
`make_sstable_compressor_factory()`
that will keep the same signature but will be only usable
in a `seastar::thread`.

All other uses of `make_sstable_compressor_factory()` outside of
`test_env::do_with()` are already in seastar threads,
so if we just get rid of `test_env::do_with()`, then we will
be able to use that thread-dependent replacement. This is the
purpose of this commit.

We shouldn't be losing much.

(cherry picked from commit 0e4d0ded8d)
2025-05-12 09:12:04 +00:00
Patryk Jędrzejczak
2b1b4d1dfc Merge '[Backport 2025.2] Correctly skip updating node's own ip address due to outdated gossiper data ' from Scylladb[bot]
Use the host ID to check whether the update is for the node itself. Using the IP is unreliable, since if a node is restarted with a different IP, a gossiper message carrying the previous IP can be misinterpreted as belonging to a different node.

Fixes: #22777

Backport to 2025.1 since this fixes a crash. Older versions do not have the code.

- (cherry picked from commit a2178b7c31)

- (cherry picked from commit ecd14753c0)

- (cherry picked from commit 7403de241c)

Parent PR: #24000

Closes scylladb/scylladb#24089

* https://github.com/scylladb/scylladb:
  test: add reproducer for #22777
  storage_service: Do not remove gossiper entry on address change
  storage_service: use id to check for local node
2025-05-12 09:31:20 +02:00
Gleb Natapov
827563902c test: add reproducer for #22777
Add a sleep before starting the gossiper to increase the chance of
getting an old gossiper entry about the node itself before the local
gossiper info is updated with the new IP address.

(cherry picked from commit 7403de241c)
2025-05-09 12:56:15 +00:00
Gleb Natapov
ccf194bd89 storage_service: Do not remove gossiper entry on address change
When the gossiper indexed entries by IP, an old entry had to be removed
on an address change; but the index is now ID-based, so even if the IP
changes, the entry should stay. The gossiper simply updates the IP
address there.

(cherry picked from commit ecd14753c0)
2025-05-09 12:56:15 +00:00
Gleb Natapov
9b735bb4dc storage_service: use id to check for local node
The IP may change, and an old gossiper message with the previous IP may
be processed when it shouldn't be.

Fixes: #22777
(cherry picked from commit a2178b7c31)
2025-05-09 12:56:15 +00:00
Michał Chojnowski
f29b87970a test/boost/mvcc_test: fix an overly-strong assertion in test_snapshot_cursor_is_consistent_with_merging
The test checks that merging the partition versions on-the-fly using the
cursor gives the same results as merging them destructively with apply_monotonically.

In particular, it tests that the continuity of both results is equal.
However, there's a subtlety which makes this not true.
The cursor puts empty dummy rows (i.e. dummies shadowed by the partition
tombstone) in the output.
But the destructive merge is allowed (as an exception to the general
rule, for optimization reasons) to remove those dummies and thus reduce
the continuity.

So after this patch we instead check that the output of the cursor
has continuity equal to the merged continuities of the versions
(rather than to the continuity of the merged versions, which can be
smaller, as described above).

Refs https://github.com/scylladb/scylladb/pull/21459, a patch which did
the same in a different test.
Fixes https://github.com/scylladb/scylladb/issues/13642

Closes scylladb/scylladb#24044

(cherry picked from commit 746ec1d4e4)

Closes scylladb/scylladb#24083
2025-05-09 13:00:34 +02:00
Botond Dénes
17a76b6264 Merge '[Backport 2025.2] test/cluster/test_read_repair.py: improve trace logging test (again)' from Scylladb[bot]
The test test_read_repair_with_trace_logging wants to test read repair with trace logging. It turns out that node restart + trace-level logging + debug mode is too much, and even with a 1 minute timeout, the read repair sometimes times out. Refactor the test to use an injection point instead of a restart. To make sure the test still tests what it is supposed to test, use tracing to assert that read repair did indeed happen.

Fixes: scylladb/scylladb#23968

Needs backport to 2025.1 and 6.2, both have the flaky test

- (cherry picked from commit 51025de755)

- (cherry picked from commit 29eedaa0e5)

Parent PR: #23989

Closes scylladb/scylladb#24051

* github.com:scylladb/scylladb:
  test/cluster/test_read_repair.py: improve trace logging test (again)
  test/cluster: extract execute_with_tracing() into pylib/util.py
2025-05-08 11:01:18 +03:00
Aleksandra Martyniuk
ab45df1aa1 streaming: skip dropped tables
Currently, stream_session::prepare throws when a table in requests
or summaries is dropped. However, we do not want to fail streaming
if the table is dropped.

Delete table checks from stream_session::prepare. Further streaming
steps can handle the dropped table and finish the streaming successfully.

Fixes: #15257.

Closes scylladb/scylladb#23915

(cherry picked from commit 20c2d6210e)

Closes scylladb/scylladb#24053
2025-05-08 11:00:27 +03:00
Botond Dénes
97f0f312e0 test/cluster/test_read_repair.py: improve trace logging test (again)
The test test_read_repair_with_trace_logging wants to test read repair
with trace logging. It turns out that node restart + trace-level logging
+ debug mode is too much, and even with a 1 minute timeout, the read
repair sometimes times out.
Refactor the test to use an injection point instead of a restart. To
make sure the test still tests what it is supposed to test, use tracing
to assert that read repair did indeed happen.

(cherry picked from commit 29eedaa0e5)
2025-05-07 13:26:08 +00:00
Botond Dénes
4df6a17d30 test/cluster: extract execute_with_tracing() into pylib/util.py
To allow reuse in other tests.

(cherry picked from commit 51025de755)
2025-05-07 13:26:08 +00:00
Anna Mikhlin
b3dbfaf27a Update ScyllaDB version to: 2025.2.0-rc0 2025-05-07 11:41:33 +03:00
30 changed files with 637 additions and 252 deletions


@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2025.2.0-dev
VERSION=2025.2.0-rc1
if test -f version
then


@@ -15,6 +15,8 @@
#include <seastar/core/metrics.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/reactor.hh>
#include "utils/reusable_buffer.hh"
#include "sstables/compress.hh"
#include "sstables/exceptions.hh"
@@ -27,7 +29,7 @@
// SHA256
using dict_id = std::array<std::byte, 32>;
class sstable_compressor_factory_impl;
class dictionary_holder;
static seastar::logger compressor_factory_logger("sstable_compressor_factory");
@@ -41,11 +43,11 @@ template <> struct fmt::formatter<compression_parameters::algorithm> : fmt::form
// raw dicts might be used (and kept alive) directly by compressors (in particular, lz4 decompressor)
// or referenced by algorithm-specific dicts.
class raw_dict : public enable_lw_shared_from_this<raw_dict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
dict_id _id;
std::vector<std::byte> _dict;
public:
raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict);
raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict);
~raw_dict();
const std::span<const std::byte> raw() const { return _dict; }
dict_id id() const { return _id; }
@@ -79,13 +81,13 @@ struct zstd_callback_allocator {
// (which internally holds a pointer to the raw dictionary blob
// and parsed entropy tables).
class zstd_ddict : public enable_lw_shared_from_this<zstd_ddict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_DDict, decltype(&ZSTD_freeDDict)> _dict;
public:
zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~zstd_ddict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -100,14 +102,14 @@ public:
// so the level of compression is decided at the time of construction
// of this dict.
class zstd_cdict : public enable_lw_shared_from_this<zstd_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
int _level;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_CDict, decltype(&ZSTD_freeCDict)> _dict;
public:
zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level);
zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level);
~zstd_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -119,11 +121,11 @@ public:
// and a hash index over the substrings of the blob).
//
class lz4_cdict : public enable_lw_shared_from_this<lz4_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
std::unique_ptr<LZ4_stream_t, decltype(&LZ4_freeStream)> _dict;
public:
lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~lz4_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -164,6 +166,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
std::map<sstring, sstring> options() const override;
algorithm get_algorithm() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};
class snappy_processor: public compressor {
@@ -266,6 +269,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
algorithm get_algorithm() const override;
std::map<sstring, sstring> options() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};
zstd_processor::zstd_processor(const compression_parameters& opts, cdict_ptr cdict, ddict_ptr ddict) {
@@ -323,6 +327,16 @@ auto zstd_processor::get_algorithm() const -> algorithm {
return (_cdict || _ddict) ? algorithm::zstd_with_dicts : algorithm::zstd;
}
std::optional<unsigned> zstd_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}
const std::string_view DICTIONARY_OPTION = ".dictionary.";
static std::map<sstring, sstring> dict_as_options(std::span<const std::byte> d) {
@@ -384,6 +398,10 @@ std::map<sstring, sstring> compressor::options() const {
return {};
}
std::optional<unsigned> compressor::get_dict_owner_for_test() const {
return std::nullopt;
}
std::string compressor::name() const {
return compression_parameters::algorithm_to_qualified_name(get_algorithm());
}
@@ -434,7 +452,7 @@ std::string_view compression_parameters::algorithm_to_name(algorithm alg) {
case algorithm::snappy: return "SnappyCompressor";
case algorithm::zstd: return "ZstdCompressor";
case algorithm::zstd_with_dicts: return "ZstdWithDictsCompressor";
case algorithm::none: on_internal_error(compressor_factory_logger, "algorithm_to_name(): called with algorithm::none");
case algorithm::none: return "none"; // Name used only for logging purposes, can't be chosen by the user.
}
abort();
}
@@ -660,6 +678,16 @@ std::map<sstring, sstring> lz4_processor::options() const {
}
}
std::optional<unsigned> lz4_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}
compressor_ptr make_lz4_sstable_compressor_for_tests() {
return std::make_unique<lz4_processor>();
}
@@ -751,21 +779,12 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
return snappy_max_compressed_length(input_len);
}
// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// across nodes.
//
// Holds weak pointers to all live dictionaries
// (so that they can be cheaply shared with new SSTables if an identical dict is requested),
// and shared (lifetime-extending) pointers to the current writer ("recommended")
// dict for each table (so that they can be shared with new SSTables without consulting
// `system.dicts`).
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// (and are borrowed by foreign shared pointers) and all requests for a given dict ID go through its owner.
// (Note: this shouldn't pose a performance problem because a dict is only requested once per an opening of an SSTable).
// (Note: at the moment of this writing, one shard owns all. Later we can spread the ownership. (E.g. shard it by dict hash)).
//
// Whenever a dictionary dies (because its refcount reaches 0), its weak pointer
// is removed from the factory.
//
@@ -774,10 +793,10 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
// Has a configurable memory budget for live dicts. If the budget is exceeded,
// will return null dicts to new writers (to avoid making the memory usage even worse)
// and print warnings.
class sstable_compressor_factory_impl : public sstable_compressor_factory, public weakly_referencable<sstable_compressor_factory_impl> {
class dictionary_holder : public weakly_referencable<dictionary_holder> {
mutable logger::rate_limit budget_warning_rate_limit{std::chrono::minutes(10)};
shard_id _owner_shard;
config _cfg;
using config = default_sstable_compressor_factory::config;
const config& _cfg;
uint64_t _total_live_dict_memory = 0;
metrics::metric_groups _metrics;
struct zstd_cdict_id {
@@ -789,7 +808,7 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
std::map<zstd_cdict_id, const zstd_cdict*> _zstd_cdicts;
std::map<dict_id, const zstd_ddict*> _zstd_ddicts;
std::map<dict_id, const lz4_cdict*> _lz4_cdicts;
std::map<table_id, lw_shared_ptr<const raw_dict>> _recommended;
std::map<table_id, foreign_ptr<lw_shared_ptr<const raw_dict>>> _recommended;
size_t memory_budget() const {
return _cfg.memory_fraction_starting_at_which_we_stop_writing_dicts() * seastar::memory::stats().total_memory();
@@ -806,8 +825,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
memory_budget()
);
}
public:
lw_shared_ptr<const raw_dict> get_canonical_ptr(std::span<const std::byte> dict) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (dict.empty()) {
return nullptr;
}
auto id = get_sha256(dict);
if (auto it = _raw_dicts.find(id); it != _raw_dicts.end()) {
return it->second->shared_from_this();
@@ -819,7 +841,9 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
using foreign_zstd_ddict = foreign_ptr<lw_shared_ptr<const zstd_ddict>>;
foreign_zstd_ddict get_zstd_dict_for_reading(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!raw) {
return nullptr;
}
lw_shared_ptr<const zstd_ddict> ddict;
// For reading, we must allocate a new dict, even if the memory budget is exceeded. We have no other choice.
// In any case, if the budget is exceeded, we print a rate-limited warning about it.
@@ -835,15 +859,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(ddict));
}
future<foreign_zstd_ddict> get_zstd_dict_for_reading(std::span<const std::byte> dict, int level) {
return smp::submit_to(_owner_shard, [this, dict, level] -> foreign_zstd_ddict {
auto raw = get_canonical_ptr(dict);
return get_zstd_dict_for_reading(raw, level);
});
}
using foreign_zstd_cdict = foreign_ptr<lw_shared_ptr<const zstd_cdict>>;
foreign_zstd_cdict get_zstd_dict_for_writing(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const zstd_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -859,19 +879,6 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_zstd_cdict> get_zstd_dict_for_writing(table_id t, int level) {
return smp::submit_to(_owner_shard, [this, t, level] -> foreign_zstd_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_zstd_dict_for_writing(rec_it->second, level);
} else {
return {};
}
});
}
using lz4_dicts = std::pair<
foreign_ptr<lw_shared_ptr<const raw_dict>>,
foreign_ptr<lw_shared_ptr<const lz4_cdict>>
@@ -879,18 +886,12 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
using foreign_lz4_ddict = foreign_ptr<lw_shared_ptr<const raw_dict>>;
using foreign_lz4_cdict = foreign_ptr<lw_shared_ptr<const lz4_cdict>>;
foreign_lz4_ddict get_lz4_dict_for_reading(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
lw_shared_ptr<const raw_dict> ddict;
return make_foreign(std::move(raw));
}
future<foreign_lz4_ddict> get_lz4_dicts_for_reading(std::span<const std::byte> dict) {
return smp::submit_to(_owner_shard, [this, dict] -> foreign_lz4_ddict {
auto raw = get_canonical_ptr(dict);
return get_lz4_dict_for_reading(raw);
});
}
foreign_lz4_cdict get_lz4_dict_for_writing(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const lz4_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -905,24 +906,10 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_lz4_cdict> get_lz4_dicts_for_writing(table_id t) {
return smp::submit_to(_owner_shard, [this, t] -> foreign_lz4_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_lz4_dict_for_writing(rec_it->second);
} else {
return {};
}
});
}
public:
sstable_compressor_factory_impl(config cfg)
: _owner_shard(this_shard_id())
, _cfg(std::move(cfg))
dictionary_holder(const config& cfg)
: _cfg(cfg)
{
if (_cfg.register_metrics) {
namespace sm = seastar::metrics;
@@ -931,8 +918,8 @@ public:
});
}
}
sstable_compressor_factory_impl(sstable_compressor_factory_impl&&) = delete;
~sstable_compressor_factory_impl() {
dictionary_holder(dictionary_holder&&) = delete;
~dictionary_holder() {
// Note: `_recommended` might be the only thing keeping some dicts alive,
// so clearing it will destroy them.
//
@@ -948,39 +935,36 @@ public:
_recommended.clear();
}
void forget_raw_dict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_raw_dicts.erase(id);
}
void forget_zstd_cdict(dict_id id, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_cdicts.erase({id, level});
}
void forget_zstd_ddict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_ddicts.erase(id);
}
void forget_lz4_cdict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_lz4_cdicts.erase(id);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> dict) override {
return smp::submit_to(_owner_shard, [this, t, dict] {
_recommended.erase(t);
if (dict.size()) {
auto canonical_ptr = get_canonical_ptr(dict);
_recommended.emplace(t, canonical_ptr);
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict.size(), fmt_hex(canonical_ptr->id()));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
});
void set_recommended_dict(table_id t, foreign_ptr<lw_shared_ptr<const raw_dict>> dict) {
_recommended.erase(t);
if (dict) {
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict->raw().size(), fmt_hex(dict->id()));
_recommended.emplace(t, std::move(dict));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
}
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t) {
auto rec_it = _recommended.find(t);
if (rec_it == _recommended.end()) {
co_return nullptr;
}
co_return co_await rec_it->second.copy();
}
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
void account_memory_delta(ssize_t n) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (static_cast<ssize_t>(_total_live_dict_memory) + n < 0) {
compressor_factory_logger.error(
"Error in dictionary memory accounting: delta {} brings live memory {} below 0",
@@ -990,19 +974,85 @@ public:
}
};
default_sstable_compressor_factory::default_sstable_compressor_factory(config cfg)
: _cfg(std::move(cfg))
, _holder(std::make_unique<dictionary_holder>(_cfg))
{
for (shard_id i = 0; i < smp::count; ++i) {
auto numa_id = _cfg.numa_config[i];
_numa_groups.resize(std::max<size_t>(_numa_groups.size(), numa_id + 1));
_numa_groups[numa_id].push_back(i);
}
}
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writing(schema_ptr s) {
const auto params = s->get_compressor_params();
default_sstable_compressor_factory::~default_sstable_compressor_factory() {
}
std::vector<unsigned> default_sstable_compressor_factory_config::get_default_shard_to_numa_node_mapping() {
auto sp = local_engine->smp().shard_to_numa_node_mapping();
return std::vector<unsigned>(sp.begin(), sp.end());
}
unsigned default_sstable_compressor_factory::local_numa_id() {
return _cfg.numa_config[this_shard_id()];
}
shard_id default_sstable_compressor_factory::get_dict_owner(unsigned numa_id, const sha256_type& sha) {
auto hash = read_unaligned<uint64_t>(sha.data());
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
on_internal_error(compressor_factory_logger, "get_dict_owner called on an empty NUMA group");
}
return group[hash % group.size()];
}
future<> default_sstable_compressor_factory::set_recommended_dict_local(table_id t, std::span<const std::byte> dict) {
if (_leader_shard != this_shard_id()) {
on_internal_error(compressor_factory_logger, fmt::format("set_recommended_dict_local called on wrong shard. Expected: {}, got {}", _leader_shard, this_shard_id()));
}
auto units = co_await get_units(_recommendation_setting_sem, 1);
auto sha = get_sha256(dict);
for (unsigned numa_id = 0; numa_id < _numa_groups.size(); ++numa_id) {
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
continue;
}
auto r = get_dict_owner(numa_id, sha);
auto d = co_await container().invoke_on(r, [dict](self& local) {
return make_foreign(local._holder->get_canonical_ptr(dict));
});
auto local_coordinator = group[0];
co_await container().invoke_on(local_coordinator, coroutine::lambda([t, d = std::move(d)](self& local) mutable {
local._holder->set_recommended_dict(t, std::move(d));
}));
}
}
future<> default_sstable_compressor_factory::set_recommended_dict(table_id t, std::span<const std::byte> dict) {
return container().invoke_on(_leader_shard, &self::set_recommended_dict_local, t, dict);
}
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> default_sstable_compressor_factory::get_recommended_dict(table_id t) {
const auto local_coordinator = _numa_groups[local_numa_id()][0];
return container().invoke_on(local_coordinator, [t](self& local) {
return local._holder->get_recommended_dict(t);
});
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_impl(const compression_parameters& params, table_id id) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", s->id(), algo);
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", id, algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
holder::foreign_lz4_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_lz4_dict_for_writing(recommended.release());
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1015,9 +1065,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
case algorithm::zstd:
co_return std::make_unique<zstd_processor>(params, nullptr, nullptr);
case algorithm::zstd_with_dicts: {
holder::foreign_zstd_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [level, recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_zstd_dict_for_writing(recommended.release(), level);
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1029,17 +1083,28 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
abort();
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing(schema_ptr s) {
return make_compressor_for_writing_impl(s->get_compressor_params(), s->id());
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_for_tests(const compression_parameters& params, table_id id) {
return make_compressor_for_writing_impl(params, id);
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_impl(const compression_parameters& params, std::span<const std::byte> dict) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading_impl: algo={}", algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_lz4_dict_for_reading(std::move(d));
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1054,8 +1119,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
}
case algorithm::zstd_with_dicts: {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [level, dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_zstd_dict_for_reading(std::move(d), level);
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1067,7 +1137,19 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
abort();
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading(sstables::compression& c) {
const auto params = compression_parameters(sstables::options_from_compression(c));
auto dict = dict_from_options(c);
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
co_return co_await make_compressor_for_reading_impl(params, std::as_bytes(std::span(*dict)));
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_for_tests(const compression_parameters& params, std::span<const std::byte> dict) {
return make_compressor_for_reading_impl(params, dict);
}
raw_dict::raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict)
: _owner(owner.weak_from_this())
, _id(key)
, _dict(dict.begin(), dict.end())
@@ -1082,7 +1164,7 @@ raw_dict::~raw_dict() {
}
}
zstd_cdict::zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level)
: _owner(owner.weak_from_this())
, _raw(raw)
, _level(level)
@@ -1114,7 +1196,7 @@ zstd_cdict::~zstd_cdict() {
}
}
zstd_ddict::zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _alloc([this] (ssize_t n) {
@@ -1143,7 +1225,7 @@ zstd_ddict::~zstd_ddict() {
}
}
lz4_cdict::lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _dict(LZ4_createStream(), LZ4_freeStream)
@@ -1162,6 +1244,28 @@ lz4_cdict::~lz4_cdict() {
}
}
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
SCYLLA_ASSERT(thread::running_in_thread());
struct wrapper : sstable_compressor_factory {
using impl = default_sstable_compressor_factory;
sharded<impl> _impl;
future<compressor_ptr> make_compressor_for_writing(schema_ptr s) override {
return _impl.local().make_compressor_for_writing(s);
}
future<compressor_ptr> make_compressor_for_reading(sstables::compression& c) override {
return _impl.local().make_compressor_for_reading(c);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> d) override {
return _impl.local().set_recommended_dict(t, d);
};
wrapper(wrapper&&) = delete;
wrapper() {
_impl.start().get();
}
~wrapper() {
_impl.stop().get();
}
};
return std::make_unique<wrapper>();
}


@@ -64,6 +64,8 @@ public:
virtual algorithm get_algorithm() const = 0;
virtual std::optional<unsigned> get_dict_owner_for_test() const;
using ptr_type = std::unique_ptr<compressor>;
};


@@ -1538,6 +1538,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/secondary_index_test.cc',
'test/boost/sessions_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc',
'test/boost/statement_restrictions_test.cc',

init.cc

@@ -13,6 +13,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <seastar/core/coroutine.hh>
#include "sstables/sstable_compressor_factory.hh"
logging::logger startlog("init");
@@ -129,3 +130,17 @@ void service_set::add(std::any value) {
std::any service_set::find(const std::type_info& type) const {
return _impl->find(type);
}
// Placed here to avoid dependency on db::config in compress.cc,
// where the rest of default_sstable_compressor_factory_config is.
auto default_sstable_compressor_factory_config::from_db_config(
const db::config& cfg,
std::span<const unsigned> numa_config) -> self
{
return self {
.register_metrics = true,
.enable_writing_dictionaries = cfg.sstable_compression_dictionaries_enable_writing,
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg.sstable_compression_dictionaries_memory_budget_fraction,
.numa_config{numa_config.begin(), numa_config.end()},
};
}

main.cc

@@ -1236,17 +1236,19 @@ sharded<locator::shared_token_metadata> token_metadata;
auto stop_lang_man = defer_verbose_shutdown("lang manager", [] { langman.invoke_on_all(&lang::manager::stop).get(); });
langman.invoke_on_all(&lang::manager::start).get();
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
sstable_compressor_factory.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config,
std::cref(*cfg), std::cref(numa_groups))).get();
auto stop_compressor_factory = defer_verbose_shutdown("sstable_compressor_factory", [&sstable_compressor_factory] {
sstable_compressor_factory.stop().get();
});
checkpoint(stop_signal, "starting database");
debug::the_database = &db;
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(sstable_compressor_factory),
std::ref(stop_signal.as_sharded_abort_source()), utils::cross_shard_barrier()).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
@@ -1717,7 +1719,7 @@ sharded<locator::shared_token_metadata> token_metadata;
auto sstables_prefix = std::string_view("sstables/");
if (name.starts_with(sstables_prefix)) {
auto table = table_id(utils::UUID(name.substr(sstables_prefix.size())));
co_await sstable_compressor_factory.local().set_recommended_dict(table, std::move(dict.data));
} else if (name == dictionary_service::rpc_compression_dict_name) {
co_await utils::announce_dict_to_shards(compressor_tracker, std::move(dict));
}


@@ -457,7 +457,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
switch (rs.state) {
case node_state::normal: {
if (is_me(id)) {
co_return;
}
// In replace-with-same-ip scenario the replaced node IP will be the same
@@ -490,8 +490,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
auto old_ip = it->second;
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
co_await _gossiper.force_remove_endpoint(id, gms::null_permit_id);
}
}
break;
@@ -945,22 +943,13 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
if (prev_ip == endpoint) {
co_return;
}
if (_address_map.find(id) != endpoint) {
// Address map refused to update IP for the host_id,
// this means prev_ip has higher generation than endpoint.
// We can immediately remove endpoint from gossiper
// since it represents an old IP (before an IP change)
// for the given host_id. This is not strictly
// necessary, but it reduces the noise circulated
// in gossiper messages and allows for clearer
// expectations of the gossiper state in tests.
co_await _ss._gossiper.force_remove_endpoint(id, permit_id);
// Do not update address.
co_return;
}
// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadata and erm.
if (_ss.raft_topology_change_enabled()) {
rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
@@ -1691,6 +1680,8 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
slogger.info("Starting up server gossip");
co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});
co_await _gossiper.start_gossiping(new_generation, app_states);
utils::get_local_injector().inject("stop_after_starting_gossiping",


@@ -189,7 +189,19 @@ future<float> try_one_compression_config(
const compression_parameters& params,
const utils::chunked_vector<temporary_buffer<char>>& validation_samples
) {
sharded<default_sstable_compressor_factory> factory;
co_await factory.start();
std::exception_ptr ex;
float result;
try {
co_await factory.local().set_recommended_dict(initial_schema->id(), dict);
result = co_await try_one_compression_config(factory.local(), initial_schema, params, validation_samples);
} catch (...) {
ex = std::current_exception();
}
co_await factory.stop();
if (ex) {
co_return coroutine::exception(ex);
}
co_return result;
}


@@ -10,20 +10,89 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "compress.hh"
#include "schema/schema_fwd.hh"
#include "utils/updateable_value.hh"
#include <span>
namespace db {
class config;
} // namespace db
struct dictionary_holder;
class raw_dict;
struct sstable_compressor_factory {
virtual ~sstable_compressor_factory() {}
virtual future<compressor_ptr> make_compressor_for_writing(schema_ptr) = 0;
virtual future<compressor_ptr> make_compressor_for_reading(sstables::compression&) = 0;
virtual future<> set_recommended_dict(table_id, std::span<const std::byte> dict) = 0;
};
// Note: I couldn't make this an inner class of default_sstable_compressor_factory,
// because the compiler then emits confusing complaints about default member initializers on the line
// ```
// default_sstable_compressor_factory(config = config{});
// ```
// apparently due to a compiler bug related to default member initializers.
struct default_sstable_compressor_factory_config {
using self = default_sstable_compressor_factory_config;
static std::vector<unsigned> get_default_shard_to_numa_node_mapping();
bool register_metrics = false;
utils::updateable_value<bool> enable_writing_dictionaries{true};
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
std::vector<unsigned> numa_config{get_default_shard_to_numa_node_mapping()};
static default_sstable_compressor_factory_config from_db_config(
const db::config&,
std::span<const unsigned> numa_config = get_default_shard_to_numa_node_mapping());
};
// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// between all shards within the same NUMA group.
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// decided by a content hash of the dictionary.
// All requests for a given dict ID go through the owner of this ID and return a foreign shared pointer
// to that dict.
//
// (Note: this centralization shouldn't pose a performance problem, because a dict is only
// requested once per opening of an SSTable.)
struct default_sstable_compressor_factory : peering_sharded_service<default_sstable_compressor_factory>, sstable_compressor_factory {
using holder = dictionary_holder;
public:
using self = default_sstable_compressor_factory;
using config = default_sstable_compressor_factory_config;
private:
config _cfg;
// Maps NUMA node ID to the array of shards on that node.
std::vector<std::vector<shard_id>> _numa_groups;
// Holds dictionaries owned by this shard.
std::unique_ptr<dictionary_holder> _holder;
// All recommended dictionary updates are serialized by a single "leader shard".
// We do this to avoid dealing with concurrent updates altogether.
semaphore _recommendation_setting_sem{1};
constexpr static shard_id _leader_shard = 0;
private:
using sha256_type = std::array<std::byte, 32>;
unsigned local_numa_id();
shard_id get_dict_owner(unsigned numa_id, const sha256_type& sha);
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t);
future<> set_recommended_dict_local(table_id, std::span<const std::byte> dict);
future<compressor_ptr> make_compressor_for_writing_impl(const compression_parameters&, table_id);
future<compressor_ptr> make_compressor_for_reading_impl(const compression_parameters&, std::span<const std::byte> dict);
public:
default_sstable_compressor_factory(config = config{});
~default_sstable_compressor_factory();
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
future<compressor_ptr> make_compressor_for_writing_for_tests(const compression_parameters&, table_id);
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
future<compressor_ptr> make_compressor_for_reading_for_tests(const compression_parameters&, std::span<const std::byte> dict);
future<> set_recommended_dict(table_id, std::span<const std::byte> dict) override;
};
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread();


@@ -392,35 +392,14 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, nr_requests, summaries.size());
// prepare tasks
set_state(stream_session_state::PREPARING);
for (auto& request : requests) {
// always flush on stream request
sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request);
add_transfer_ranges(std::move(request.keyspace), std::move(request.ranges), std::move(request.column_families));
co_await coroutine::maybe_yield();
}
for (auto& summary : summaries) {
sslog.debug("[Stream #{}] prepare stream_summary={}", plan_id, summary);
prepare_receiving(summary);
}


@@ -335,6 +335,7 @@ add_scylla_test(combined_tests
secondary_index_test.cc
sessions_test.cc
sstable_compaction_test.cc
sstable_compressor_factory_test.cc
sstable_directory_test.cc
sstable_set_test.cc
statement_restrictions_test.cc


@@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
}
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("v", bytes_type)
@@ -581,7 +581,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
return seastar::async([&env, s, &cf] {
// populate
auto new_key = [&] {
@@ -645,7 +645,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
flushed.get();
});
}).get();
});
}


@@ -530,6 +530,22 @@ static void reverse(schema_ptr s, mutation_partition& m) {
m = std::move(reverse(mutation(s, std::move(dk), std::move(m))).partition());
}
void assert_has_same_squashed_continuity(const mutation_partition& actual, mvcc_partition& expected) {
const schema& s = *expected.schema();
auto expected_cont = expected.entry().squashed_continuity(s);
auto actual_cont = actual.get_continuity(s);
bool actual_static_cont = actual.static_row_continuous();
bool expected_static_cont = expected.squashed().static_row_continuous();
if (actual_static_cont != expected_static_cont) {
BOOST_FAIL(format("Static row continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
expected_static_cont, actual_static_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
}
if (!expected_cont.equals(s, actual_cont)) {
BOOST_FAIL(format("Continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
expected_cont, actual_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
}
}
SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
// Tests that reading many versions using a cursor gives the logical mutation back.
return seastar::async([] {
@@ -558,7 +574,21 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
auto snap = e.read();
auto actual = read_using_cursor(*snap);
// Checks that the squashed continuity of `e` is equal to continuity of `actual`.
// Note: squashed continuity of an entry is slightly different than the continuity
// of a squashed entry.
//
// Squashed continuity is the union of continuities of all versions in the entry,
// and in particular it includes empty dummy rows resulting in the logical merge
// of version.
// The process of actually squashing an entry is allowed to
// remove those empty dummies, so the squashed entry can have slightly
// smaller continuity.
//
// Since a cursor isn't allowed to remove dummy rows, the strongest test
// we can do here is to compare the continuity of the cursor-read mutation
// with the squashed continuity of the entry.
assert_has_same_squashed_continuity(actual, e);
assert_that(s, actual).is_equal_to_compacted(expected);
// Reversed iteration


@@ -1666,7 +1666,7 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
}
SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
return test_env::do_with_async([] (test_env& env) {
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
auto ts_in_ms = std::chrono::milliseconds(ts);
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);
@@ -1702,7 +1702,6 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
BOOST_REQUIRE(ret.second == expected);
}
});
}
@@ -2319,6 +2318,7 @@ public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
private:
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<test_env> _env;
uint32_t _seed;
std::unique_ptr<tests::random_schema_specification> _random_schema_spec;
@@ -2336,7 +2336,7 @@ public:
compress))
, _random_schema(_seed, *_random_schema_spec)
{
_env.start(test_env_config(), std::ref(*scf)).get();
testlog.info("random_schema: {}", _random_schema.cql());
}
@@ -2402,12 +2402,14 @@ public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
private:
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<test_env> _env;
public:
scrub_test_framework()
{
_env.start(test_env_config(), std::ref(*scf)).get();
}
~scrub_test_framework() {


@@ -0,0 +1,133 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include <seastar/testing/thread_test_case.hh>
#include "sstables/sstable_compressor_factory.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
// 1. Create a random message.
// 2. Set this random message as the recommended dict.
// 3. On all shards, create compressors.
// 4. Check that they are using the recommended dict (i.e. that the original message compresses perfectly).
// 5. Check that the used dictionaries are owned by shards on the same NUMA node.
// 6. Check that the number of dictionary copies is equal to number of NUMA nodes.
// 7. Repeat this a few times for both lz4 and zstd.
void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
// Create a compressor factory.
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
auto config = default_sstable_compressor_factory::config{
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
};
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
sstable_compressor_factory.start(std::cref(config)).get();
auto stop_compressor_factory = defer([&sstable_compressor_factory] { sstable_compressor_factory.stop().get(); });
// The factory keeps recommended dicts (i.e. dicts for writing) per table ID.
auto table = table_id::create_random_id();
// Retry a few times just to check that it works more than once.
for (int retry = 0; retry < 3; ++retry) {
// Generate a random (and hence incompressible without a dict) message.
auto message = tests::random::get_sstring(4096);
auto dict_view = std::as_bytes(std::span(message));
// Set the message as the dict to make the message perfectly compressible.
sstable_compressor_factory.local().set_recommended_dict(table, dict_view).get();
// We'll put the owners here to check that the number of owners matches the number of NUMA nodes.
std::vector<unsigned> compressor_numa_nodes(smp::count);
std::vector<unsigned> decompressor_numa_nodes(smp::count);
// Try for both algorithms, just in case there are some differences in how dictionary
// distribution over shards is implemented between them.
for (const auto algo : {compressor::algorithm::lz4_with_dicts, compressor::algorithm::zstd_with_dicts}) {
sstable_compressor_factory.invoke_on_all(coroutine::lambda([&] (default_sstable_compressor_factory& local) -> seastar::future<> {
// Validate that the dictionaries work as intended,
// and check that their owner is as expected.
auto params = compression_parameters(algo);
auto compressor = co_await local.make_compressor_for_writing_for_tests(params, table);
auto decompressor = co_await local.make_compressor_for_reading_for_tests(params, dict_view);
auto our_numa_node = shard_to_numa_mapping[this_shard_id()];
auto compressor_numa_node = shard_to_numa_mapping[compressor->get_dict_owner_for_test().value()];
auto decompressor_numa_node = shard_to_numa_mapping[decompressor->get_dict_owner_for_test().value()];
// Check that the dictionary used by this shard lies on the same NUMA node.
// This is important to avoid cross-node memory accesses on the hot path.
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
decompressor_numa_nodes[this_shard_id()] = decompressor_numa_node;
auto output_max_size = compressor->compress_max_size(message.size());
auto compressed = std::vector<char>(output_max_size);
auto compressed_size = compressor->compress(
reinterpret_cast<const char*>(message.data()), message.size(),
reinterpret_cast<char*>(compressed.data()), compressed.size());
BOOST_REQUIRE_GE(compressed_size, 0);
compressed.resize(compressed_size);
// Validate that the recommended dict was actually used.
BOOST_CHECK(compressed.size() < message.size() / 10);
auto decompressed = std::vector<char>(message.size());
auto decompressed_size = decompressor->uncompress(
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
BOOST_REQUIRE_GE(decompressed_size, 0);
decompressed.resize(decompressed_size);
// Validate that the roundtrip through compressor and decompressor
// resulted in the original message.
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
})).get();
}
// Check that the number of owners (and hence, copies) is equal to the number
// of NUMA nodes.
// This isn't that important, but we don't want to duplicate dictionaries
// within a NUMA node unnecessarily.
BOOST_CHECK_EQUAL(
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);
BOOST_CHECK_EQUAL(
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);
}
}
SEASTAR_THREAD_TEST_CASE(test_numa_awareness) {
{
std::vector<unsigned> one_numa_node(smp::count);
test_one_numa_topology(one_numa_node);
}
{
std::vector<unsigned> two_numa_nodes(smp::count);
for (size_t i = 0; i < two_numa_nodes.size(); ++i) {
two_numa_nodes[i] = i % 2;
}
test_one_numa_topology(two_numa_nodes);
}
{
std::vector<unsigned> n_numa_nodes(smp::count);
for (size_t i = 0; i < n_numa_nodes.size(); ++i) {
n_numa_nodes[i] = i;
}
test_one_numa_topology(n_numa_nodes);
}
}


@@ -466,8 +466,8 @@ static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_p
}
SEASTAR_TEST_CASE(check_read_indexes) {
return test_env::do_with_async([] (test_env& env) {
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return seastar::async([&env, version] {
auto builder = schema_builder("test", "summary_test")
.with_column("a", int32_type, column_kind::partition_key);
@@ -478,7 +478,7 @@ SEASTAR_TEST_CASE(check_read_indexes) {
auto list = sstables::test(sst).read_indexes(env.make_reader_permit()).get();
BOOST_REQUIRE(list.size() == 130);
});
}).get();
});
}
@@ -499,8 +499,8 @@ SEASTAR_TEST_CASE(check_multi_schema) {
// d int,
// e blob
//);
return test_env::do_with_async([] (test_env& env) {
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return seastar::async([&env, version] {
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
auto builder = schema_builder("test", "test_multi_schema")
@@ -532,7 +532,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
BOOST_REQUIRE(!m);
});
});
}).get();
});
}
@@ -2413,7 +2413,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
}
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
@@ -2441,8 +2441,6 @@ SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
BOOST_REQUIRE(insert(2, 2) == true);
BOOST_REQUIRE(insert(5, 5) == true);
BOOST_REQUIRE(run.all().size() == 5);
return make_ready_future<>();
});
}


@@ -32,7 +32,9 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
 SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
     tmpdir tmp;
-    auto env = test_env();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+    auto env = test_env({}, *scf);
     auto stop_env = defer([&env] { env.stop().get(); });
     sstables::sstable_generation_generator gen_generator{0};
@@ -56,7 +58,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
 SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
     tmpdir tmp;
-    auto env = test_env();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+    auto env = test_env({}, *scf);
     auto stop_env = defer([&env] { env.stop().get(); });
     sstables::sstable_generation_generator gen_generator{0};
@@ -100,7 +104,8 @@ static bool partial_create_links(sstable_ptr sst, fs::path dst_path, sstables::g
 SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
     tmpdir tmp;
-    auto env = test_env();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+    auto env = test_env({}, *scf);
     auto stop_env = defer([&env] { env.stop().get(); });
     sstables::sstable_generation_generator gen_generator{0};
@@ -121,7 +126,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
 SEASTAR_THREAD_TEST_CASE(test_sstable_move_exists_failure) {
     tmpdir tmp;
-    auto env = test_env();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+    auto env = test_env({}, *scf);
     auto stop_env = defer([&env] { env.stop().get(); });
     // please note, the SSTables used by this test are stored under


@@ -117,6 +117,9 @@ async def test_change_two(manager, random_tables, build_mode):
     # IP-s before they are send back to servers[1] and servers[2],
     # and the mentioned above code is not exercised by this test.
     await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
+    # sleep_before_start_gossiping injections are needed to reproduce #22777
+    await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
+    await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
     await manager.server_start(servers[1].server_id)
     servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
     if build_mode != 'release':


@@ -15,7 +15,7 @@ from cassandra.query import SimpleStatement # type: ignore
 from test.cluster.conftest import skip_mode
 from test.cluster.util import new_test_keyspace
 from test.pylib.manager_client import ManagerClient
-from test.pylib.util import wait_for_cql_and_get_hosts
+from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
 logger = logging.getLogger(__name__)
@@ -47,19 +47,6 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
     host1, host2, host3 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)
-    def execute_with_tracing(cql, statement, *args, **kwargs):
-        kwargs['trace'] = True
-        query_result = cql.execute(statement, *args, **kwargs)
-        tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
-        page_traces = []
-        for trace in tracing:
-            trace_events = []
-            for event in trace.events:
-                trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
-            page_traces.append("\n".join(trace_events))
-        logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
     async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = { 'enabled': true }") as ks:
         cql.execute(f"CREATE TABLE {ks}.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
                     " WITH speculative_retry = 'NONE'"
@@ -67,9 +54,9 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
                     " AND compaction = {'class': 'NullCompactionStrategy'}")
         for write_statement, delete_statement in statement_pairs:
-            execute_with_tracing(cql, write_statement.format(ks=ks))
+            execute_with_tracing(cql, write_statement.format(ks=ks), log = True)
             await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False)
-            execute_with_tracing(cql, delete_statement.format(ks=ks))
+            execute_with_tracing(cql, delete_statement.format(ks=ks), log = True)
             await manager.api.disable_injection(node3.ip_addr, "database_apply")
         def check_data(host, data):


@@ -15,9 +15,10 @@ from cassandra.cluster import ConsistencyLevel, Session # type: ignore
 from cassandra.query import SimpleStatement # type: ignore
 from cassandra.pool import Host # type: ignore
-from test.pylib.util import wait_for_cql_and_get_hosts
+from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
 from test.pylib.internal_types import ServerInfo
 from test.pylib.manager_client import ManagerClient
 from test.cluster.conftest import skip_mode
 from test.cluster.util import new_test_keyspace
@@ -309,13 +310,13 @@ async def test_incremental_read_repair(data_class: DataClass, manager: ManagerCl
 @pytest.mark.asyncio
 @skip_mode('release', 'error injections are not supported in release mode')
 async def test_read_repair_with_trace_logging(request, manager):
     logger.info("Creating a new cluster")
-    cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace"]
+    cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace:debug_error_injection=trace"]
     config = {"read_request_timeout_in_ms": 60000}
-    for i in range(2):
-        await manager.server_add(cmdline=cmdline, config=config)
+    [node1, node2] = await manager.servers_add(2, cmdline=cmdline, config=config)
     cql = manager.get_cql()
     srvs = await manager.running_servers()
@@ -326,13 +327,15 @@ async def test_read_repair_with_trace_logging(request, manager):
         await cql.run_async(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 0)")
         await manager.server_stop(srvs[0].server_id)
-        prepared = cql.prepare(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)")
-        prepared.consistency_level = ConsistencyLevel.ONE
-        await cql.run_async(prepared)
+        await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=True)
+        await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)", consistency_level = ConsistencyLevel.ONE))
         await manager.server_start(srvs[0].server_id)
-        prepared = cql.prepare(f"SELECT * FROM {ks}.t WHERE pk = 0")
-        prepared.consistency_level = ConsistencyLevel.ALL
-        await cql.run_async(prepared)
+        tracing = execute_with_tracing(cql, SimpleStatement(f"SELECT * FROM {ks}.t WHERE pk = 0", consistency_level = ConsistencyLevel.ALL), log = True)
+        assert len(tracing) == 1 # 1 page
+        found_read_repair = False
+        for event in tracing[0]:
+            found_read_repair |= "digest mismatch, starting read repair" == event.description
+        assert found_read_repair


@@ -346,13 +346,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
     LZ4WithDictsCompressor and ZstdWithDictsCompressor
     to the Cassandra-compatible LZ4Compressor and ZstdCompressor.
     """
-    servers = await manager.servers_add(1)
+    servers = await manager.servers_add(1, [
+        *common_debug_cli_options,
+    ])
     # Create keyspace and table
     logger.info("Creating tables")
     cql = manager.get_cql()
-    algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
+    dict_algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
+    nondict_algorithms = ['Snappy', 'LZ4', 'Deflate', 'Zstd']
+    algorithms = dict_algorithms + nondict_algorithms
+    no_compression = 'NoCompression'
+    all_tables = dict_algorithms + nondict_algorithms + [no_compression]
     await cql.run_async("""
         CREATE KEYSPACE test
@@ -363,14 +369,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
             CREATE TABLE test."{algo}" (pk int PRIMARY KEY, c blob)
             WITH COMPRESSION = {{'sstable_compression': '{algo}Compressor'}};
         ''')
-        for algo in algorithms
-    ])
+        for algo in algorithms],
+        cql.run_async(f'''
+            CREATE TABLE test."{no_compression}" (pk int PRIMARY KEY, c blob)
+            WITH COMPRESSION = {{}}
+        ''')
+    )
     # Populate data with
     blob = random.randbytes(16*1024);
     logger.info("Populating table")
     n_blobs = 100
-    for algo in algorithms:
+    for algo in all_tables:
         insert = cql.prepare(f'''INSERT INTO test."{algo}" (pk, c) VALUES (?, ?);''')
         insert.consistency_level = ConsistencyLevel.ALL;
         for pks in itertools.batched(range(n_blobs), n=100):
@@ -381,7 +392,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
     async def validate_select():
         cql = manager.get_cql()
-        for algo in algorithms:
+        for algo in all_tables:
             select = cql.prepare(f'''SELECT c FROM test."{algo}" WHERE pk = ? BYPASS CACHE;''')
             results = await cql.run_async(select, [42])
             assert results[0][0] == blob
@@ -424,7 +435,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
         names = set()
         for table_info in sstable_info:
             for sstable in table_info["sstables"]:
-                for prop in sstable["extended_properties"]:
+                for prop in sstable.get("extended_properties", []):
                     if prop["group"] == "compression_parameters":
                         for attr in prop["attributes"]:
                             if attr["key"] == "sstable_compression":
@@ -433,18 +444,24 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
     await asyncio.gather(*[
         manager.api.retrain_dict(servers[0].ip_addr, "test", algo)
-        for algo in algorithms
+        for algo in all_tables
     ])
     await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
     await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
-    for algo in algorithms:
+    name_prefix = "org.apache.cassandra.io.compress."
+    for algo in dict_algorithms:
         assert (await get_compressor_names(algo)) == {f"{algo}Compressor"}
+    for algo in nondict_algorithms:
+        assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
+    assert (await get_compressor_names(no_compression)) == set()
     await live_update_config(manager, servers, 'sstable_compression_dictionaries_enable_writing', "false")
     await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
-    name_prefix = "org.apache.cassandra.io.compress."
     assert (await get_compressor_names("LZ4WithDicts")) == {name_prefix + "LZ4Compressor"}
     assert (await get_compressor_names("ZstdWithDicts")) == {name_prefix + "ZstdCompressor"}
+    for algo in nondict_algorithms:
+        assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
+    assert (await get_compressor_names(no_compression)) == set()


@@ -128,7 +128,7 @@ public:
     static constexpr std::string_view ks_name = "ks";
     static std::atomic<bool> active;
 private:
-    std::unique_ptr<sstable_compressor_factory> _scf;
+    sharded<default_sstable_compressor_factory> _scf;
     sharded<replica::database> _db;
     sharded<gms::feature_service> _feature_service;
     sharded<sstables::storage_manager> _sstm;
@@ -657,10 +657,14 @@ private:
         auto stop_lang_manager = defer_verbose_shutdown("lang manager", [this] { _lang_manager.stop().get(); });
         _lang_manager.invoke_on_all(&lang::manager::start).get();
-        _scf = make_sstable_compressor_factory();
+        auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
+        _scf.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config, std::cref(*cfg), std::cref(numa_groups))).get();
+        auto stop_scf = defer_verbose_shutdown("sstable_compressor_factory", [this] {
+            _scf.stop().get();
+        });
         _db_config = &*cfg;
-        _db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(*_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
+        _db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
         auto stop_db = defer_verbose_shutdown("database", [this] {
             _db.stop().get();
         });


@@ -109,7 +109,7 @@ public:
     void maybe_start_compaction_manager(bool enable = true);
-    explicit test_env(test_env_config cfg = {}, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
+    explicit test_env(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
     ~test_env();
     test_env(test_env&&) noexcept;
@@ -176,15 +176,6 @@ public:
     replica::table::config make_table_config();
-    template <typename Func>
-    static inline auto do_with(Func&& func, test_env_config cfg = {}) {
-        return seastar::do_with(test_env(std::move(cfg)), [func = std::move(func)] (test_env& env) mutable {
-            return futurize_invoke(func, env).finally([&env] {
-                return env.stop();
-            });
-        });
-    }
     static future<> do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg = {});
     static future<> do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func);
@@ -192,7 +183,8 @@ public:
     template <typename T>
     static future<T> do_with_async_returning(noncopyable_function<T (test_env&)> func) {
         return seastar::async([func = std::move(func)] {
-            test_env env;
+            auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+            test_env env({}, *scf);
             auto stop = defer([&] { env.stop().get(); });
             return func(env);
         });


@@ -201,7 +201,7 @@ struct test_env::impl {
     ::cache_tracker cache_tracker;
     gms::feature_service feature_service;
     db::nop_large_data_handler nop_ld_handler;
-    std::unique_ptr<sstable_compressor_factory> scf;
+    sstable_compressor_factory& scf;
     test_env_sstables_manager mgr;
     std::unique_ptr<test_env_compaction_manager> cmgr;
     reader_concurrency_semaphore semaphore;
@@ -210,7 +210,7 @@ struct test_env::impl {
     data_dictionary::storage_options storage;
     abort_source abort;
-    impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir);
+    impl(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm, tmpdir* tdir);
     impl(impl&&) = delete;
     impl(const impl&) = delete;
@@ -219,16 +219,16 @@ struct test_env::impl {
     }
 };
-test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir)
+test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, sstables::storage_manager* sstm, tmpdir* tdir)
     : local_dir(tdir == nullptr ? std::optional<tmpdir>(std::in_place) : std::optional<tmpdir>(std::nullopt))
     , dir(tdir == nullptr ? local_dir.value() : *tdir)
     , db_config(make_db_config(dir.path().native(), cfg.storage))
     , dir_sem(1)
     , feature_service(gms::feature_config_from_db_config(*db_config))
-    , scf(make_sstable_compressor_factory())
+    , scf(scfarg)
     , mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
         feature_service, cache_tracker, cfg.available_memory, dir_sem,
-        [host_id = locator::host_id::create_random_id()]{ return host_id; }, *scf, abort, current_scheduling_group(), sstm)
+        [host_id = locator::host_id::create_random_id()]{ return host_id; }, scf, abort, current_scheduling_group(), sstm)
     , semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
     , use_uuid(cfg.use_uuid)
     , storage(std::move(cfg.storage))
@@ -242,8 +242,8 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdi
     }
 }
-test_env::test_env(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tmp)
-    : _impl(std::make_unique<impl>(std::move(cfg), sstm, tmp))
+test_env::test_env(test_env_config cfg, sstable_compressor_factory& scf, sstables::storage_manager* sstm, tmpdir* tmp)
+    : _impl(std::make_unique<impl>(std::move(cfg), scf, sstm, tmp))
 {
 }
@@ -325,7 +325,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
         sharded<sstables::storage_manager> sstm;
         sstm.start(std::ref(*db_cfg), sstables::storage_manager::config{}).get();
         auto stop_sstm = defer([&] { sstm.stop().get(); });
-        test_env env(std::move(cfg), &sstm.local());
+        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+        test_env env(std::move(cfg), *scf, &sstm.local());
         auto close_env = defer([&] { env.stop().get(); });
         env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
         auto unplu = defer([&env] { env.manager().unplug_sstables_registry(); });
@@ -334,7 +335,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
     }
     return seastar::async([func = std::move(func), cfg = std::move(cfg)] () mutable {
-        test_env env(std::move(cfg));
+        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+        test_env env(std::move(cfg), *scf);
         auto close_env = defer([&] { env.stop().get(); });
         func(env);
     });
@@ -476,7 +478,8 @@ test_env::do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)>
     return seastar::async([func = std::move(func)] {
         tmpdir tdir;
         sharded<test_env> env;
-        env.start(test_env_config{}, nullptr, &tdir).get();
+        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+        env.start(test_env_config{}, std::ref(*scf), nullptr, &tdir).get();
         auto stop = defer([&] { env.stop().get(); });
         func(env);
     });


@@ -144,7 +144,8 @@ int scylla_sstable_main(int argc, char** argv) {
     }
     cfg.compaction_strategy = sstables::compaction_strategy::type(app.configuration()["compaction-strategy"].as<sstring>());
     cfg.timestamp_range = app.configuration()["timestamp-range"].as<api::timestamp_type>();
-    test.start(std::move(cfg)).get();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
+    test.start(std::move(cfg), std::ref(*scf)).get();
     auto stop_test = deferred_stop(test);
     switch (mode) {


@@ -195,7 +195,9 @@ private:
     }
 public:
-    perf_sstable_test_env(conf cfg) : _cfg(std::move(cfg))
+    perf_sstable_test_env(conf cfg, sstable_compressor_factory& scf)
+        : _env({}, scf)
+        , _cfg(std::move(cfg))
         , s(create_schema(cfg.compaction_strategy))
         , _distribution('@', '~')
         , _mt(make_lw_shared<replica::memtable>(s))


@@ -24,6 +24,7 @@ from typing import Optional, TypeVar, Any
 from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module
 from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module
 from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
+from cassandra.query import Statement # type: ignore # pylint: disable=no-name-in-module
 from cassandra import DriverException, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
 from test import BUILD_DIR, TOP_SRC_DIR
@@ -315,3 +316,29 @@ async def gather_safely(*awaitables: Awaitable):
 def get_xdist_worker_id() -> str | None:
     return os.environ.get("PYTEST_XDIST_WORKER")
+def execute_with_tracing(cql : Session, statement : str | Statement, log : bool = False, *cql_execute_extra_args, **cql_execute_extra_kwargs):
+    """ Execute statement via cql session and log the tracing output. """
+    cql_execute_extra_kwargs['trace'] = True
+    query_result = cql.execute(statement, *cql_execute_extra_args, **cql_execute_extra_kwargs)
+    tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
+    ret = []
+    page_traces = []
+    for trace in tracing:
+        ret.append(trace.events)
+        if not log:
+            continue
+        trace_events = []
+        for event in trace.events:
+            trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
+        page_traces.append("\n".join(trace_events))
+    if log:
+        logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
+    return ret
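The per-event formatting done by `execute_with_tracing` above can be exercised in isolation. The `Event` stub below is an assumption standing in for the driver's trace-event objects (the real ones come from the Python cassandra driver), and `format_page_trace` is a hypothetical helper mirroring the f-string used above:

```python
from dataclasses import dataclass

@dataclass
class Event:
    # Stub with the three attributes execute_with_tracing reads.
    source: str
    source_elapsed: int
    description: str

def format_page_trace(events: list[Event]) -> str:
    # Same " {source} {elapsed} {description}" shape as in the helper above.
    return "\n".join(f" {e.source} {e.source_elapsed} {e.description}" for e in events)

page = format_page_trace([Event("127.0.0.1", 42, "digest mismatch, starting read repair")])
```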


@@ -49,7 +49,7 @@ tools::tablets_t do_load_system_tablets(const db::config& dbcfg,
     std::string_view table_name,
     reader_permit permit) {
     sharded<sstable_manager_service> sst_man;
-    auto scf = make_sstable_compressor_factory();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
     sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
     auto stop_sst_man_service = deferred_stop(sst_man);


@@ -396,7 +396,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
     reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__, reader_concurrency_semaphore::register_metrics::no);
     auto stop_semaphore = deferred_stop(rcs_sem);
-    auto scf = make_sstable_compressor_factory();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
     sharded<sstable_manager_service> sst_man;
     sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
     auto stop_sst_man_service = deferred_stop(sst_man);
@@ -500,7 +500,7 @@ schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem:
     cache_tracker tracker;
     sstables::directory_semaphore dir_sem(1);
     abort_source abort;
-    auto scf = make_sstable_compressor_factory();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
     sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, dbcfg, feature_service, tracker,
         memory::stats().total_memory(), dir_sem,
         [host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort);


@@ -3559,7 +3559,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
     }
     gms::feature_service feature_service(gms::feature_config_from_db_config(dbcfg));
-    auto scf = make_sstable_compressor_factory();
+    auto scf = make_sstable_compressor_factory_for_tests_in_thread();
     cache_tracker tracker;
     sstables::directory_semaphore dir_sem(1);
     abort_source abort;