Compare commits
39 Commits
next
...
scylla-4.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c5ed14bff6 | ||
|
|
8366eda943 | ||
|
|
5d0b0dd4c4 | ||
|
|
6f259be5f1 | ||
|
|
16e512e21c | ||
|
|
c61dc4e87d | ||
|
|
af76a3ba79 | ||
|
|
8fb5ebb2c6 | ||
|
|
bfb11defdd | ||
|
|
2d1ddcbb6a | ||
|
|
4c560b63f0 | ||
|
|
00155e32b1 | ||
|
|
b06dffcc19 | ||
|
|
508e58ef9e | ||
|
|
776faa809f | ||
|
|
7037f43a17 | ||
|
|
bd713959ce | ||
|
|
b7c5a918cb | ||
|
|
fb2ae9e66b | ||
|
|
7a7ed8c65d | ||
|
|
7b9be752ec | ||
|
|
903e967a16 | ||
|
|
b84946895c | ||
|
|
a27188886a | ||
|
|
51d4efc321 | ||
|
|
0847eea8d6 | ||
|
|
35ad57cb9c | ||
|
|
42b0b9ad08 | ||
|
|
68b95bf2ac | ||
|
|
fea83f6ae0 | ||
|
|
76618a7e06 | ||
|
|
189a08ac72 | ||
|
|
a3e9915a83 | ||
|
|
e4bc14ec1a | ||
|
|
972acb6d56 | ||
|
|
7fbfedf025 | ||
|
|
5f175f8103 | ||
|
|
674ad6656a | ||
|
|
58498b4b6c |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.2.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -267,10 +267,13 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/// The same as `impl_max_function_for' but without knowledge of `Type'.
|
||||
/// The same as `impl_max_function_for' but without compile-time dependency on `Type'.
|
||||
class impl_max_dynamic_function final : public aggregate_function::aggregate {
|
||||
data_type _io_type;
|
||||
opt_bytes _max;
|
||||
public:
|
||||
impl_max_dynamic_function(data_type io_type) : _io_type(std::move(io_type)) {}
|
||||
|
||||
virtual void reset() override {
|
||||
_max = {};
|
||||
}
|
||||
@@ -278,12 +281,11 @@ public:
|
||||
return _max.value_or(bytes{});
|
||||
}
|
||||
virtual void add_input(cql_serialization_format sf, const std::vector<opt_bytes>& values) override {
|
||||
if (!values[0]) {
|
||||
if (values.empty() || !values[0]) {
|
||||
return;
|
||||
}
|
||||
const auto val = *values[0];
|
||||
if (!_max || *_max < val) {
|
||||
_max = val;
|
||||
if (!_max || _io_type->less(*_max, *values[0])) {
|
||||
_max = values[0];
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -298,10 +300,13 @@ public:
|
||||
};
|
||||
|
||||
class max_dynamic_function final : public native_aggregate_function {
|
||||
data_type _io_type;
|
||||
public:
|
||||
max_dynamic_function(data_type io_type) : native_aggregate_function("max", io_type, { io_type }) {}
|
||||
max_dynamic_function(data_type io_type)
|
||||
: native_aggregate_function("max", io_type, { io_type })
|
||||
, _io_type(std::move(io_type)) {}
|
||||
virtual std::unique_ptr<aggregate> new_aggregate() override {
|
||||
return std::make_unique<impl_max_dynamic_function>();
|
||||
return std::make_unique<impl_max_dynamic_function>(_io_type);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -358,10 +363,13 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/// The same as `impl_min_function_for' but without knowledge of `Type'.
|
||||
/// The same as `impl_min_function_for' but without compile-time dependency on `Type'.
|
||||
class impl_min_dynamic_function final : public aggregate_function::aggregate {
|
||||
data_type _io_type;
|
||||
opt_bytes _min;
|
||||
public:
|
||||
impl_min_dynamic_function(data_type io_type) : _io_type(std::move(io_type)) {}
|
||||
|
||||
virtual void reset() override {
|
||||
_min = {};
|
||||
}
|
||||
@@ -369,12 +377,11 @@ public:
|
||||
return _min.value_or(bytes{});
|
||||
}
|
||||
virtual void add_input(cql_serialization_format sf, const std::vector<opt_bytes>& values) override {
|
||||
if (!values[0]) {
|
||||
if (values.empty() || !values[0]) {
|
||||
return;
|
||||
}
|
||||
const auto val = *values[0];
|
||||
if (!_min || val < *_min) {
|
||||
_min = val;
|
||||
if (!_min || _io_type->less(*values[0], *_min)) {
|
||||
_min = values[0];
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -389,10 +396,13 @@ public:
|
||||
};
|
||||
|
||||
class min_dynamic_function final : public native_aggregate_function {
|
||||
data_type _io_type;
|
||||
public:
|
||||
min_dynamic_function(data_type io_type) : native_aggregate_function("min", io_type, { io_type }) {}
|
||||
min_dynamic_function(data_type io_type)
|
||||
: native_aggregate_function("min", io_type, { io_type })
|
||||
, _io_type(std::move(io_type)) {}
|
||||
virtual std::unique_ptr<aggregate> new_aggregate() override {
|
||||
return std::make_unique<impl_min_dynamic_function>();
|
||||
return std::make_unique<impl_min_dynamic_function>(_io_type);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -88,16 +88,13 @@ static data_value castas_fctn_simple(data_value from) {
|
||||
template<typename ToType>
|
||||
static data_value castas_fctn_from_decimal_to_float(data_value from) {
|
||||
auto val_from = value_cast<big_decimal>(from);
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
boost::multiprecision::cpp_rational r = val_from.unscaled_value();
|
||||
r /= boost::multiprecision::pow(ten, val_from.scale());
|
||||
return static_cast<ToType>(r);
|
||||
return static_cast<ToType>(val_from.as_rational());
|
||||
}
|
||||
|
||||
static utils::multiprecision_int from_decimal_to_cppint(const data_value& from) {
|
||||
const auto& val_from = value_cast<big_decimal>(from);
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
return boost::multiprecision::cpp_int(val_from.unscaled_value() / boost::multiprecision::pow(ten, val_from.scale()));
|
||||
auto r = val_from.as_rational();
|
||||
return utils::multiprecision_int(numerator(r)/denominator(r));
|
||||
}
|
||||
|
||||
template<typename ToType>
|
||||
|
||||
@@ -357,7 +357,12 @@ lists::setter_by_uuid::execute(mutation& m, const clustering_key_prefix& prefix,
|
||||
|
||||
collection_mutation_description mut;
|
||||
mut.cells.reserve(1);
|
||||
mut.cells.emplace_back(to_bytes(*index), params.make_cell(*ltype->value_comparator(), *value, atomic_cell::collection_member::yes));
|
||||
|
||||
if (!value) {
|
||||
mut.cells.emplace_back(to_bytes(*index), params.make_dead_cell());
|
||||
} else {
|
||||
mut.cells.emplace_back(to_bytes(*index), params.make_cell(*ltype->value_comparator(), *value, atomic_cell::collection_member::yes));
|
||||
}
|
||||
|
||||
m.set_cell(prefix, column, mut.serialize(*ltype));
|
||||
}
|
||||
|
||||
@@ -688,6 +688,11 @@ static query::range<bytes_view> to_range(const term_slice& slice, const query_op
|
||||
extract_bound(statements::bound::END));
|
||||
}
|
||||
|
||||
static bool contains_without_wraparound(
|
||||
const query::range<bytes_view>& range, bytes_view value, const serialized_tri_compare& cmp) {
|
||||
return !range.is_wrap_around(cmp) && range.contains(value, cmp);
|
||||
}
|
||||
|
||||
bool single_column_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
const partition_key& key,
|
||||
const clustering_key_prefix& ckey,
|
||||
@@ -702,13 +707,13 @@ bool single_column_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
return false;
|
||||
}
|
||||
return cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return to_range(_slice, options, _column_def.name_as_text()).contains(
|
||||
return contains_without_wraparound(to_range(_slice, options, _column_def.name_as_text()),
|
||||
cell_value_bv, _column_def.type->as_tri_comparator());
|
||||
});
|
||||
}
|
||||
|
||||
bool single_column_restriction::slice::is_satisfied_by(bytes_view data, const query_options& options) const {
|
||||
return to_range(_slice, options, _column_def.name_as_text()).contains(
|
||||
return contains_without_wraparound(to_range(_slice, options, _column_def.name_as_text()),
|
||||
data, _column_def.type->underlying_type()->as_tri_comparator());
|
||||
}
|
||||
|
||||
|
||||
19
database.hh
19
database.hh
@@ -1099,6 +1099,10 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
/// Read a range from the passed-in sstables.
|
||||
///
|
||||
/// The reader is unrestricted, but will account its resource usage on the
|
||||
/// semaphore belonging to the passed-in permit.
|
||||
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
@@ -1110,6 +1114,21 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
/// Read a range from the passed-in sstables.
|
||||
///
|
||||
/// The reader is restricted, that is it will wait for admission on the semaphore
|
||||
/// belonging to the passed-in permit, before starting to read.
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
class user_types_metadata;
|
||||
|
||||
class keyspace_metadata final {
|
||||
|
||||
@@ -304,7 +304,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
mutation m(cf.schema(), fm.decorated_key(*cf.schema()));
|
||||
converting_mutation_partition_applier v(cm, *cf.schema(), m.partition());
|
||||
fm.partition().accept(cm, v);
|
||||
return do_with(std::move(m), [&db, &cf] (mutation m) {
|
||||
return do_with(std::move(m), [&db, &cf] (const mutation& m) {
|
||||
return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
} else {
|
||||
|
||||
@@ -58,6 +58,7 @@
|
||||
#include "cql3/util.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "db/view/view_updating_consumer.hh"
|
||||
#include "db/system_keyspace_view_types.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "frozen_mutation.hh"
|
||||
@@ -1909,5 +1910,48 @@ future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_d
|
||||
});
|
||||
}
|
||||
|
||||
const size_t view_updating_consumer::buffer_size_soft_limit{1 * 1024 * 1024};
|
||||
const size_t view_updating_consumer::buffer_size_hard_limit{2 * 1024 * 1024};
|
||||
|
||||
void view_updating_consumer::do_flush_buffer() {
|
||||
_staging_reader_handle.pause();
|
||||
|
||||
if (_buffer.front().partition().empty()) {
|
||||
// If we flushed mid-partition we can have an empty mutation if we
|
||||
// flushed right before getting the end-of-partition fragment.
|
||||
_buffer.pop_front();
|
||||
}
|
||||
|
||||
while (!_buffer.empty()) {
|
||||
try {
|
||||
auto lock_holder = _view_update_pusher(std::move(_buffer.front())).get();
|
||||
} catch (...) {
|
||||
vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception());
|
||||
}
|
||||
_buffer.pop_front();
|
||||
}
|
||||
|
||||
_buffer_size = 0;
|
||||
_m = nullptr;
|
||||
}
|
||||
|
||||
void view_updating_consumer::maybe_flush_buffer_mid_partition() {
|
||||
if (_buffer_size >= buffer_size_hard_limit) {
|
||||
auto m = mutation(_schema, _m->decorated_key(), mutation_partition(_schema));
|
||||
do_flush_buffer();
|
||||
_buffer.emplace_back(std::move(m));
|
||||
_m = &_buffer.back();
|
||||
}
|
||||
}
|
||||
|
||||
view_updating_consumer::view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
|
||||
evictable_reader_handle& staging_reader_handle)
|
||||
: view_updating_consumer(std::move(schema), as, staging_reader_handle,
|
||||
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable {
|
||||
auto s = m.schema();
|
||||
return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables);
|
||||
})
|
||||
{ }
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
|
||||
|
||||
@@ -44,33 +44,49 @@ future<> view_update_generator::start() {
|
||||
|
||||
// If we got here, we will process all tables we know about so far eventually so there
|
||||
// is no starvation
|
||||
for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) {
|
||||
for (auto table_it = _sstables_with_tables.begin(); table_it != _sstables_with_tables.end(); table_it = _sstables_with_tables.erase(table_it)) {
|
||||
auto& [t, t_sstables] = *table_it;
|
||||
schema_ptr s = t->schema();
|
||||
|
||||
vug_logger.trace("Processing {}.{}: {} sstables", s->ks_name(), s->cf_name(), t_sstables.size());
|
||||
|
||||
// Copy what we have so far so we don't miss new updates
|
||||
auto sstables = std::exchange(_sstables_with_tables[t], {});
|
||||
auto sstables = std::exchange(t_sstables, {});
|
||||
|
||||
const auto num_sstables = sstables.size();
|
||||
|
||||
try {
|
||||
// temporary: need an sstable set for the flat mutation reader, but the
|
||||
// compaction_descriptor takes a vector. Soon this will become a compaction
|
||||
// so the transformation to the SSTable set will not be needed.
|
||||
auto ssts = make_lw_shared(t->get_compaction_strategy().make_sstable_set(s));
|
||||
// Exploit the fact that sstables in the staging directory
|
||||
// are usually non-overlapping and use a partitioned set for
|
||||
// the read.
|
||||
auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, make_lw_shared<sstable_list>(sstable_list{}), false));
|
||||
for (auto& sst : sstables) {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
|
||||
flat_mutation_reader staging_sstable_reader = ::make_range_sstable_reader(s,
|
||||
auto ms = mutation_source([this, ssts] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
});
|
||||
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
s,
|
||||
_db.make_query_class_config().semaphore.make_permit(),
|
||||
std::move(ssts),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
service::get_local_streaming_priority(),
|
||||
nullptr,
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
inject_failure("view_update_generator_consume_staging_sstable");
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout);
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as, staging_sstable_reader_handle), db::no_timeout);
|
||||
if (result == stop_iteration::yes) {
|
||||
break;
|
||||
}
|
||||
@@ -89,7 +105,7 @@ future<> view_update_generator::start() {
|
||||
// Move from staging will be retried upon restart.
|
||||
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
|
||||
}
|
||||
_registration_sem.signal();
|
||||
_registration_sem.signal(num_sstables);
|
||||
}
|
||||
// For each table, move the processed staging sstables into the table's base dir.
|
||||
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {
|
||||
|
||||
@@ -32,7 +32,10 @@
|
||||
namespace db::view {
|
||||
|
||||
class view_update_generator {
|
||||
public:
|
||||
static constexpr size_t registration_queue_size = 5;
|
||||
|
||||
private:
|
||||
database& _db;
|
||||
seastar::abort_source _as;
|
||||
future<> _started = make_ready_future<>();
|
||||
@@ -51,6 +54,8 @@ public:
|
||||
future<> start();
|
||||
future<> stop();
|
||||
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<table> table);
|
||||
|
||||
ssize_t available_register_units() const { return _registration_sem.available_units(); }
|
||||
private:
|
||||
bool should_throttle() const;
|
||||
};
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "database.hh"
|
||||
|
||||
class evictable_reader_handle;
|
||||
|
||||
namespace db::view {
|
||||
|
||||
/*
|
||||
@@ -34,22 +36,46 @@ namespace db::view {
|
||||
* It is expected to be run in seastar::async threaded context through consume_in_thread()
|
||||
*/
|
||||
class view_updating_consumer {
|
||||
schema_ptr _schema;
|
||||
lw_shared_ptr<table> _table;
|
||||
std::vector<sstables::shared_sstable> _excluded_sstables;
|
||||
const seastar::abort_source* _as;
|
||||
std::optional<mutation> _m;
|
||||
public:
|
||||
view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as)
|
||||
// We prefer flushing on partition boundaries, so at the end of a partition,
|
||||
// we flush on reaching the soft limit. Otherwise we continue accumulating
|
||||
// data. We flush mid-partition if we reach the hard limit.
|
||||
static const size_t buffer_size_soft_limit;
|
||||
static const size_t buffer_size_hard_limit;
|
||||
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
const seastar::abort_source* _as;
|
||||
evictable_reader_handle& _staging_reader_handle;
|
||||
circular_buffer<mutation> _buffer;
|
||||
mutation* _m{nullptr};
|
||||
size_t _buffer_size{0};
|
||||
noncopyable_function<future<row_locker::lock_holder>(mutation)> _view_update_pusher;
|
||||
|
||||
private:
|
||||
void do_flush_buffer();
|
||||
void maybe_flush_buffer_mid_partition();
|
||||
|
||||
public:
|
||||
// Push updates with a custom pusher. Mainly for tests.
|
||||
view_updating_consumer(schema_ptr schema, const seastar::abort_source& as, evictable_reader_handle& staging_reader_handle,
|
||||
noncopyable_function<future<row_locker::lock_holder>(mutation)> view_update_pusher)
|
||||
: _schema(std::move(schema))
|
||||
, _table(table.shared_from_this())
|
||||
, _excluded_sstables(std::move(excluded_sstables))
|
||||
, _as(&as)
|
||||
, _m()
|
||||
, _staging_reader_handle(staging_reader_handle)
|
||||
, _view_update_pusher(std::move(view_update_pusher))
|
||||
{ }
|
||||
|
||||
view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
|
||||
evictable_reader_handle& staging_reader_handle);
|
||||
|
||||
view_updating_consumer(view_updating_consumer&&) = default;
|
||||
|
||||
view_updating_consumer& operator=(view_updating_consumer&&) = delete;
|
||||
|
||||
void consume_new_partition(const dht::decorated_key& dk) {
|
||||
_m = mutation(_schema, dk, mutation_partition(_schema));
|
||||
_buffer.emplace_back(_schema, dk, mutation_partition(_schema));
|
||||
_m = &_buffer.back();
|
||||
}
|
||||
|
||||
void consume(tombstone t) {
|
||||
@@ -60,7 +86,9 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_buffer_size += sr.memory_usage(*_schema);
|
||||
_m->partition().apply(*_schema, std::move(sr));
|
||||
maybe_flush_buffer_mid_partition();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
@@ -68,7 +96,9 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_buffer_size += cr.memory_usage(*_schema);
|
||||
_m->partition().apply(*_schema, std::move(cr));
|
||||
maybe_flush_buffer_mid_partition();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
@@ -76,14 +106,27 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_buffer_size += rt.memory_usage(*_schema);
|
||||
_m->partition().apply(*_schema, std::move(rt));
|
||||
maybe_flush_buffer_mid_partition();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
// Expected to be run in seastar::async threaded context (consume_in_thread())
|
||||
stop_iteration consume_end_of_partition();
|
||||
stop_iteration consume_end_of_partition() {
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
if (_buffer_size >= buffer_size_soft_limit) {
|
||||
do_flush_buffer();
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration consume_end_of_stream() {
|
||||
if (!_buffer.empty()) {
|
||||
do_flush_buffer();
|
||||
}
|
||||
return stop_iteration(_as->abort_requested());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -59,7 +59,12 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason) {
|
||||
return make_exception_future<>(std::runtime_error("Wrong stream_reason provided: it can only be replace or bootstrap"));
|
||||
}
|
||||
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _abort_source, _tokens, _address, description, reason);
|
||||
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(gms::get_local_gossiper().get_unreachable_members()));
|
||||
auto nodes_to_filter = gms::get_local_gossiper().get_unreachable_members();
|
||||
if (reason == streaming::stream_reason::replace && _db.local().get_replace_address()) {
|
||||
nodes_to_filter.insert(_db.local().get_replace_address().value());
|
||||
}
|
||||
blogger.debug("nodes_to_filter={}", nodes_to_filter);
|
||||
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(nodes_to_filter));
|
||||
auto keyspaces = make_lw_shared<std::vector<sstring>>(_db.local().get_non_system_keyspaces());
|
||||
return do_for_each(*keyspaces, [this, keyspaces, streamer] (sstring& keyspace_name) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
|
||||
15
dist/common/scripts/scylla-housekeeping
vendored
15
dist/common/scripts/scylla-housekeeping
vendored
@@ -61,7 +61,15 @@ def sh_command(*args):
|
||||
return out
|
||||
|
||||
def get_url(path):
|
||||
return urllib.request.urlopen(path).read().decode('utf-8')
|
||||
# If server returns any error, like 403, or 500 urllib.request throws exception, which is not serializable.
|
||||
# When multiprocessing routines fail to serialize it, it throws ambiguous serialization exception
|
||||
# from get_json_from_url.
|
||||
# In order to see legit error we catch it from the inside of process, covert to string and
|
||||
# pass it as part of return value
|
||||
try:
|
||||
return 0, urllib.request.urlopen(path).read().decode('utf-8')
|
||||
except Exception as exc:
|
||||
return 1, str(exc)
|
||||
|
||||
def get_json_from_url(path):
|
||||
pool = mp.Pool(processes=1)
|
||||
@@ -71,13 +79,16 @@ def get_json_from_url(path):
|
||||
# to enforce a wallclock timeout.
|
||||
result = pool.apply_async(get_url, args=(path,))
|
||||
try:
|
||||
retval = result.get(timeout=5)
|
||||
status, retval = result.get(timeout=5)
|
||||
except mp.TimeoutError as err:
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
raise
|
||||
if status == 1:
|
||||
raise RuntimeError(f'Failed to get "{path}" due to the following error: {retval}')
|
||||
return json.loads(retval)
|
||||
|
||||
|
||||
def get_api(path):
|
||||
return get_json_from_url("http://" + api_address + path)
|
||||
|
||||
|
||||
13
dist/common/scripts/scylla_setup
vendored
13
dist/common/scripts/scylla_setup
vendored
@@ -27,6 +27,7 @@ import glob
|
||||
import shutil
|
||||
import io
|
||||
import stat
|
||||
import distro
|
||||
from scylla_util import *
|
||||
|
||||
interactive = False
|
||||
@@ -385,6 +386,9 @@ if __name__ == '__main__':
|
||||
if not stat.S_ISBLK(os.stat(dsk).st_mode):
|
||||
print('{} is not block device'.format(dsk))
|
||||
continue
|
||||
if dsk in selected:
|
||||
print(f'{dsk} is already added')
|
||||
continue
|
||||
selected.append(dsk)
|
||||
devices.remove(dsk)
|
||||
disks = ','.join(selected)
|
||||
@@ -468,5 +472,10 @@ if __name__ == '__main__':
|
||||
print('Please restart your machine before using ScyllaDB, as you have disabled')
|
||||
print(' SELinux.')
|
||||
|
||||
if dist_name() == 'Ubuntu':
|
||||
run('apt-get install -y hugepages')
|
||||
if distro.id() == 'ubuntu':
|
||||
# Ubuntu version is 20.04 or later
|
||||
if int(distro.major_version()) >= 20:
|
||||
hugepkg = 'libhugetlbfs-bin'
|
||||
else:
|
||||
hugepkg = 'hugepages'
|
||||
run(f'apt-get install -y {hugepkg}')
|
||||
|
||||
4
dist/common/scripts/scylla_swap_setup
vendored
4
dist/common/scripts/scylla_swap_setup
vendored
@@ -40,6 +40,10 @@ if __name__ == '__main__':
|
||||
sys.exit(1)
|
||||
|
||||
memtotal = get_memtotal_gb()
|
||||
if memtotal == 0:
|
||||
print('memory too small: {} KB'.format(get_memtotal()))
|
||||
sys.exit(1)
|
||||
|
||||
# Scylla document says 'swap size should be set to either total_mem/3 or
|
||||
# 16GB - lower of the two', so we need to compare 16g vs memtotal/3 and
|
||||
# choose lower one
|
||||
|
||||
18
dist/common/scripts/scylla_util.py
vendored
18
dist/common/scripts/scylla_util.py
vendored
@@ -331,7 +331,7 @@ class scylla_cpuinfo:
|
||||
|
||||
# When a CLI tool is not installed, use relocatable CLI tool provided by Scylla
|
||||
scylla_env = os.environ.copy()
|
||||
scylla_env['PATH'] = '{}:{}'.format(scylla_env['PATH'], scyllabindir())
|
||||
scylla_env['PATH'] = '{}:{}'.format(scyllabindir(), scylla_env['PATH'])
|
||||
|
||||
def run(cmd, shell=False, silent=False, exception=True):
|
||||
stdout = subprocess.DEVNULL if silent else None
|
||||
@@ -446,6 +446,19 @@ def dist_ver():
|
||||
return distro.version()
|
||||
|
||||
|
||||
SYSTEM_PARTITION_UUIDS = [
|
||||
'21686148-6449-6e6f-744e-656564454649', # BIOS boot partition
|
||||
'c12a7328-f81f-11d2-ba4b-00a0c93ec93b', # EFI system partition
|
||||
'024dee41-33e7-11d3-9d69-0008c781f39f' # MBR partition scheme
|
||||
]
|
||||
|
||||
def get_partition_uuid(dev):
|
||||
return out(f'lsblk -n -oPARTTYPE {dev}')
|
||||
|
||||
def is_system_partition(dev):
|
||||
uuid = get_partition_uuid(dev)
|
||||
return (uuid in SYSTEM_PARTITION_UUIDS)
|
||||
|
||||
def is_unused_disk(dev):
|
||||
# dev is not in /sys/class/block/, like /dev/nvme[0-9]+
|
||||
if not os.path.isdir('/sys/class/block/{dev}'.format(dev=dev.replace('/dev/', ''))):
|
||||
@@ -453,7 +466,8 @@ def is_unused_disk(dev):
|
||||
try:
|
||||
fd = os.open(dev, os.O_EXCL)
|
||||
os.close(fd)
|
||||
return True
|
||||
# dev is not reserved for system
|
||||
return not is_system_partition(dev)
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=666.development
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/scylla-4.2/latest/scylla.repo
|
||||
ARG VERSION=4.2
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -98,6 +98,7 @@ fedora_packages=(
|
||||
debhelper
|
||||
fakeroot
|
||||
file
|
||||
dpkg-dev
|
||||
)
|
||||
|
||||
centos_packages=(
|
||||
|
||||
10
lua.cc
10
lua.cc
@@ -262,14 +262,12 @@ static auto visit_lua_raw_value(lua_State* l, int index, Func&& f) {
|
||||
|
||||
template <typename Func>
|
||||
static auto visit_decimal(const big_decimal &v, Func&& f) {
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
const auto& dividend = v.unscaled_value();
|
||||
auto divisor = boost::multiprecision::pow(ten, v.scale());
|
||||
boost::multiprecision::cpp_rational r = v.as_rational();
|
||||
const boost::multiprecision::cpp_int& dividend = numerator(r);
|
||||
const boost::multiprecision::cpp_int& divisor = denominator(r);
|
||||
if (dividend % divisor == 0) {
|
||||
return f(utils::multiprecision_int(boost::multiprecision::cpp_int(dividend/divisor)));
|
||||
return f(utils::multiprecision_int(dividend/divisor));
|
||||
}
|
||||
boost::multiprecision::cpp_rational r = dividend;
|
||||
r /= divisor;
|
||||
return f(r.convert_to<double>());
|
||||
}
|
||||
|
||||
|
||||
@@ -572,7 +572,12 @@ messaging_service::initial_scheduling_info() const {
|
||||
|
||||
scheduling_group
|
||||
messaging_service::scheduling_group_for_verb(messaging_verb verb) const {
|
||||
return _scheduling_info_for_connection_index[get_rpc_client_idx(verb)].sched_group;
|
||||
// We are not using get_rpc_client_idx() because it figures out the client
|
||||
// index based on the current scheduling group, which is relevant when
|
||||
// selecting the right client for sending a message, but is not relevant
|
||||
// when registering handlers.
|
||||
const auto idx = s_rpc_client_idx_table[static_cast<size_t>(verb)];
|
||||
return _scheduling_info_for_connection_index[idx].sched_group;
|
||||
}
|
||||
|
||||
scheduling_group
|
||||
@@ -1199,14 +1204,14 @@ future<partition_checksum> messaging_service::send_repair_checksum_range(
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void messaging_service::register_repair_get_full_row_hashes(std::function<future<std::unordered_set<repair_hash>> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
|
||||
void messaging_service::register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_full_row_hashes() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES);
|
||||
}
|
||||
future<std::unordered_set<repair_hash>> messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id) {
|
||||
return send_message<future<std::unordered_set<repair_hash>>>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id);
|
||||
future<repair_hash_set> messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id) {
|
||||
return send_message<future<repair_hash_set>>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
@@ -1231,13 +1236,13 @@ future<get_sync_boundary_response> messaging_service::send_repair_get_sync_bound
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void messaging_service::register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows)>&& func) {
|
||||
void messaging_service::register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_row_diff() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF);
|
||||
}
|
||||
future<repair_rows_on_wire> messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
|
||||
future<repair_rows_on_wire> messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows) {
|
||||
return send_message<future<repair_rows_on_wire>>(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(id), repair_meta_id, std::move(set_diff), needs_all_rows);
|
||||
}
|
||||
|
||||
|
||||
@@ -339,9 +339,9 @@ public:
|
||||
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version);
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void register_repair_get_full_row_hashes(std::function<future<std::unordered_set<repair_hash>> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
|
||||
void register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
|
||||
future<> unregister_repair_get_full_row_hashes();
|
||||
future<std::unordered_set<repair_hash>> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id);
|
||||
future<repair_hash_set> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
void register_repair_get_combined_row_hash(std::function<future<get_combined_row_hash_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary)>&& func);
|
||||
@@ -354,9 +354,9 @@ public:
|
||||
future<get_sync_boundary_response> send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary);
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows)>&& func);
|
||||
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows)>&& func);
|
||||
future<> unregister_repair_get_row_diff();
|
||||
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, std::unordered_set<repair_hash> set_diff, bool needs_all_rows);
|
||||
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows);
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF
|
||||
void register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff)>&& func);
|
||||
|
||||
@@ -300,10 +300,9 @@ flat_mutation_reader read_context::create_reader(
|
||||
}
|
||||
|
||||
auto& table = _db.local().find_column_family(schema);
|
||||
auto class_config = _db.local().make_query_class_config();
|
||||
|
||||
if (!rm.rparts) {
|
||||
rm.rparts = make_foreign(std::make_unique<reader_meta::remote_parts>(class_config.semaphore));
|
||||
rm.rparts = make_foreign(std::make_unique<reader_meta::remote_parts>(semaphore()));
|
||||
}
|
||||
|
||||
rm.rparts->range = std::make_unique<const dht::partition_range>(pr);
|
||||
@@ -513,18 +512,28 @@ future<> read_context::lookup_readers() {
|
||||
}
|
||||
|
||||
return parallel_for_each(boost::irange(0u, smp::count), [this] (shard_id shard) {
|
||||
return _db.invoke_on(shard, [shard, cmd = &_cmd, ranges = &_ranges, gs = global_schema_ptr(_schema),
|
||||
return _db.invoke_on(shard, [this, shard, cmd = &_cmd, ranges = &_ranges, gs = global_schema_ptr(_schema),
|
||||
gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable {
|
||||
auto schema = gs.get();
|
||||
auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, gts.get());
|
||||
auto& table = db.find_column_family(schema);
|
||||
auto& semaphore = db.make_query_class_config().semaphore;
|
||||
auto& semaphore = this->semaphore();
|
||||
|
||||
if (!querier_opt) {
|
||||
return reader_meta(reader_state::inexistent, reader_meta::remote_parts(semaphore));
|
||||
}
|
||||
|
||||
auto& q = *querier_opt;
|
||||
|
||||
if (&q.permit().semaphore() != &semaphore) {
|
||||
on_internal_error(mmq_log, format("looked-up reader belongs to different semaphore than the one appropriate for this query class: "
|
||||
"looked-up reader belongs to {} (0x{:x}) the query class appropriate is {} (0x{:x})",
|
||||
q.permit().semaphore().name(),
|
||||
reinterpret_cast<uintptr_t>(&q.permit().semaphore()),
|
||||
semaphore.name(),
|
||||
reinterpret_cast<uintptr_t>(&semaphore)));
|
||||
}
|
||||
|
||||
auto handle = pause(semaphore, std::move(q).reader());
|
||||
return reader_meta(
|
||||
reader_state::successful_lookup,
|
||||
|
||||
@@ -659,6 +659,8 @@ flat_mutation_reader make_combined_reader(schema_ptr schema,
|
||||
return make_combined_reader(std::move(schema), std::move(v), fwd_sm, fwd_mr);
|
||||
}
|
||||
|
||||
const ssize_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
class restricting_mutation_reader : public flat_mutation_reader::impl {
|
||||
struct mutation_source_and_params {
|
||||
mutation_source _ms;
|
||||
@@ -685,8 +687,6 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
|
||||
};
|
||||
std::variant<pending_state, admitted_state> _state;
|
||||
|
||||
static const ssize_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
template<typename Function>
|
||||
requires std::is_move_constructible<Function>::value
|
||||
&& requires(Function fn, flat_mutation_reader& reader) {
|
||||
|
||||
@@ -304,6 +304,8 @@ public:
|
||||
mutation_source make_empty_mutation_source();
|
||||
snapshot_source make_empty_snapshot_source();
|
||||
|
||||
extern const ssize_t new_reader_base_cost;
|
||||
|
||||
// Creates a restricted reader whose resource usages will be tracked
|
||||
// during it's lifetime. If there are not enough resources (dues to
|
||||
// existing readers) to create the new reader, it's construction will
|
||||
|
||||
@@ -104,7 +104,7 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore:
|
||||
const auto [it, _] = _inactive_reads.emplace(_next_id++, std::move(ir));
|
||||
(void)_;
|
||||
++_inactive_read_stats.population;
|
||||
return inactive_read_handle(it->first);
|
||||
return inactive_read_handle(*this, it->first);
|
||||
}
|
||||
|
||||
// The evicted reader will release its permit, hopefully allowing us to
|
||||
@@ -115,6 +115,17 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore:
|
||||
}
|
||||
|
||||
std::unique_ptr<reader_concurrency_semaphore::inactive_read> reader_concurrency_semaphore::unregister_inactive_read(inactive_read_handle irh) {
|
||||
if (irh && irh._sem != this) {
|
||||
throw std::runtime_error(fmt::format(
|
||||
"reader_concurrency_semaphore::unregister_inactive_read(): "
|
||||
"attempted to unregister an inactive read with a handle belonging to another semaphore: "
|
||||
"this is {} (0x{:x}) but the handle belongs to {} (0x{:x})",
|
||||
name(),
|
||||
reinterpret_cast<uintptr_t>(this),
|
||||
irh._sem->name(),
|
||||
reinterpret_cast<uintptr_t>(irh._sem)));
|
||||
}
|
||||
|
||||
if (auto it = _inactive_reads.find(irh._id); it != _inactive_reads.end()) {
|
||||
auto ir = std::move(it->second);
|
||||
_inactive_reads.erase(it);
|
||||
|
||||
@@ -60,18 +60,20 @@ public:
|
||||
};
|
||||
|
||||
class inactive_read_handle {
|
||||
reader_concurrency_semaphore* _sem = nullptr;
|
||||
uint64_t _id = 0;
|
||||
|
||||
friend class reader_concurrency_semaphore;
|
||||
|
||||
explicit inactive_read_handle(uint64_t id)
|
||||
: _id(id) {
|
||||
explicit inactive_read_handle(reader_concurrency_semaphore& sem, uint64_t id)
|
||||
: _sem(&sem), _id(id) {
|
||||
}
|
||||
public:
|
||||
inactive_read_handle() = default;
|
||||
inactive_read_handle(inactive_read_handle&& o) : _id(std::exchange(o._id, 0)) {
|
||||
inactive_read_handle(inactive_read_handle&& o) : _sem(std::exchange(o._sem, nullptr)), _id(std::exchange(o._id, 0)) {
|
||||
}
|
||||
inactive_read_handle& operator=(inactive_read_handle&& o) {
|
||||
_sem = std::exchange(o._sem, nullptr);
|
||||
_id = std::exchange(o._id, 0);
|
||||
return *this;
|
||||
}
|
||||
@@ -105,6 +107,7 @@ private:
|
||||
};
|
||||
|
||||
private:
|
||||
const resources _initial_resources;
|
||||
resources _resources;
|
||||
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _wait_list;
|
||||
@@ -135,7 +138,8 @@ public:
|
||||
sstring name,
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max(),
|
||||
std::function<void()> prethrow_action = nullptr)
|
||||
: _resources(count, memory)
|
||||
: _initial_resources(count, memory)
|
||||
, _resources(count, memory)
|
||||
, _wait_list(expiry_handler(name))
|
||||
, _name(std::move(name))
|
||||
, _max_queue_length(max_queue_length)
|
||||
@@ -144,11 +148,11 @@ public:
|
||||
/// Create a semaphore with practically unlimited count and memory.
|
||||
///
|
||||
/// And conversely, no queue limit either.
|
||||
explicit reader_concurrency_semaphore(no_limits)
|
||||
explicit reader_concurrency_semaphore(no_limits, sstring name = "unlimited reader_concurrency_semaphore")
|
||||
: reader_concurrency_semaphore(
|
||||
std::numeric_limits<int>::max(),
|
||||
std::numeric_limits<ssize_t>::max(),
|
||||
"unlimited reader_concurrency_semaphore") {}
|
||||
std::move(name)) {}
|
||||
|
||||
~reader_concurrency_semaphore();
|
||||
|
||||
@@ -158,6 +162,13 @@ public:
|
||||
reader_concurrency_semaphore(reader_concurrency_semaphore&&) = delete;
|
||||
reader_concurrency_semaphore& operator=(reader_concurrency_semaphore&&) = delete;
|
||||
|
||||
/// Returns the name of the semaphore
|
||||
///
|
||||
/// If the semaphore has no name, "unnamed reader concurrency semaphore" is returned.
|
||||
std::string_view name() const {
|
||||
return _name.empty() ? "unnamed reader concurrency semaphore" : std::string_view(_name);
|
||||
}
|
||||
|
||||
/// Register an inactive read.
|
||||
///
|
||||
/// The semaphore will evict this read when there is a shortage of
|
||||
@@ -193,6 +204,10 @@ public:
|
||||
|
||||
reader_permit make_permit();
|
||||
|
||||
const resources initial_resources() const {
|
||||
return _initial_resources;
|
||||
}
|
||||
|
||||
const resources available_resources() const {
|
||||
return _resources;
|
||||
}
|
||||
|
||||
@@ -42,12 +42,20 @@ struct reader_resources {
|
||||
return count >= other.count && memory >= other.memory;
|
||||
}
|
||||
|
||||
reader_resources operator-(const reader_resources& other) const {
|
||||
return reader_resources{count - other.count, memory - other.memory};
|
||||
}
|
||||
|
||||
reader_resources& operator-=(const reader_resources& other) {
|
||||
count -= other.count;
|
||||
memory -= other.memory;
|
||||
return *this;
|
||||
}
|
||||
|
||||
reader_resources operator+(const reader_resources& other) const {
|
||||
return reader_resources{count + other.count, memory + other.memory};
|
||||
}
|
||||
|
||||
reader_resources& operator+=(const reader_resources& other) {
|
||||
count += other.count;
|
||||
memory += other.memory;
|
||||
|
||||
@@ -44,15 +44,15 @@ mkdir -p $BUILDDIR/scylla-package
|
||||
tar -C $BUILDDIR/scylla-package -xpf $RELOC_PKG
|
||||
cd $BUILDDIR/scylla-package
|
||||
|
||||
PRODUCT=$(cat scylla/SCYLLA-PRODUCT-FILE)
|
||||
SCYLLA_VERSION=$(cat scylla/SCYLLA-VERSION-FILE)
|
||||
SCYLLA_RELEASE=$(cat scylla/SCYLLA-RELEASE-FILE)
|
||||
|
||||
ln -fv $RELOC_PKG ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz
|
||||
|
||||
if $DIST; then
|
||||
export DEB_BUILD_OPTIONS="housekeeping"
|
||||
fi
|
||||
|
||||
mv scylla/debian debian
|
||||
|
||||
PKG_NAME=$(dpkg-parsechangelog --show-field Source)
|
||||
# XXX: Drop revision number from version string.
|
||||
# Since it always '1', this should be okay for now.
|
||||
PKG_VERSION=$(dpkg-parsechangelog --show-field Version |sed -e 's/-1$//')
|
||||
ln -fv $RELOC_PKG ../"$PKG_NAME"_"$PKG_VERSION".orig.tar.gz
|
||||
debuild -rfakeroot -us -uc
|
||||
|
||||
@@ -1633,6 +1633,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
|
||||
auto& ks = db.local().find_keyspace(keyspace_name);
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tm, tokens, myip);
|
||||
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
|
||||
|
||||
//Active ranges
|
||||
auto metadata_clone = tm.clone_only_token_map();
|
||||
@@ -1719,6 +1720,9 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
|
||||
mandatory_neighbors = get_node_losing_the_ranges(old_endpoints, new_endpoints);
|
||||
neighbors = mandatory_neighbors;
|
||||
} else if (old_endpoints.size() < strat.get_replication_factor()) {
|
||||
if (!find_node_in_local_dc_only) {
|
||||
neighbors = old_endpoints;
|
||||
} else {
|
||||
if (old_endpoints_in_local_dc.size() == rf_in_local_dc) {
|
||||
// Local DC has enough replica nodes.
|
||||
mandatory_neighbors = get_node_losing_the_ranges(old_endpoints_in_local_dc, new_endpoints);
|
||||
@@ -1746,6 +1750,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
|
||||
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints_in_local_dc={}, rf_in_local_dc={}",
|
||||
keyspace_name, desired_range, old_endpoints_in_local_dc.size(), rf_in_local_dc));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints={}, rf={}",
|
||||
keyspace_name, desired_range, old_endpoints, strat.get_replication_factor()));
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
|
||||
#include <unordered_map>
|
||||
#include <exception>
|
||||
#include <absl/container/btree_set.h>
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
@@ -339,6 +340,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using repair_hash_set = absl::btree_set<repair_hash>;
|
||||
|
||||
enum class repair_row_level_start_status: uint8_t {
|
||||
ok,
|
||||
no_such_column_family,
|
||||
|
||||
@@ -529,7 +529,7 @@ public:
|
||||
sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write();
|
||||
schema_ptr s = reader.schema();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s,
|
||||
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
|
||||
t->get_sstables_manager().configure_writer(),
|
||||
encoding_stats{}, pc).then([sst] {
|
||||
return sst->open_data();
|
||||
@@ -666,7 +666,7 @@ private:
|
||||
// Tracks current sync boundary
|
||||
std::optional<repair_sync_boundary> _current_sync_boundary;
|
||||
// Contains the hashes of rows in the _working_row_buffor for all peer nodes
|
||||
std::vector<std::unordered_set<repair_hash>> _peer_row_hash_sets;
|
||||
std::vector<repair_hash_set> _peer_row_hash_sets;
|
||||
// Gate used to make sure pending operation of meta data is done
|
||||
seastar::gate _gate;
|
||||
sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes;
|
||||
@@ -754,11 +754,12 @@ public:
|
||||
public:
|
||||
future<> stop() {
|
||||
auto gate_future = _gate.close();
|
||||
auto writer_future = _repair_writer.wait_for_writer_done();
|
||||
auto f1 = _sink_source_for_get_full_row_hashes.close();
|
||||
auto f2 = _sink_source_for_get_row_diff.close();
|
||||
auto f3 = _sink_source_for_put_row_diff.close();
|
||||
return when_all_succeed(std::move(gate_future), std::move(writer_future), std::move(f1), std::move(f2), std::move(f3)).discard_result();
|
||||
return when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
|
||||
return _repair_writer.wait_for_writer_done();
|
||||
});
|
||||
}
|
||||
|
||||
static std::unordered_map<node_repair_meta_id, lw_shared_ptr<repair_meta>>& repair_meta_map() {
|
||||
@@ -886,9 +887,9 @@ public:
|
||||
}
|
||||
|
||||
// Must run inside a seastar thread
|
||||
static std::unordered_set<repair_hash>
|
||||
get_set_diff(const std::unordered_set<repair_hash>& x, const std::unordered_set<repair_hash>& y) {
|
||||
std::unordered_set<repair_hash> set_diff;
|
||||
static repair_hash_set
|
||||
get_set_diff(const repair_hash_set& x, const repair_hash_set& y) {
|
||||
repair_hash_set set_diff;
|
||||
// Note std::set_difference needs x and y are sorted.
|
||||
std::copy_if(x.begin(), x.end(), std::inserter(set_diff, set_diff.end()),
|
||||
[&y] (auto& item) { thread::maybe_yield(); return y.find(item) == y.end(); });
|
||||
@@ -906,14 +907,14 @@ public:
|
||||
|
||||
}
|
||||
|
||||
std::unordered_set<repair_hash>& peer_row_hash_sets(unsigned node_idx) {
|
||||
repair_hash_set& peer_row_hash_sets(unsigned node_idx) {
|
||||
return _peer_row_hash_sets[node_idx];
|
||||
}
|
||||
|
||||
// Get a list of row hashes in _working_row_buf
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
working_row_hashes() {
|
||||
return do_with(std::unordered_set<repair_hash>(), [this] (std::unordered_set<repair_hash>& hashes) {
|
||||
return do_with(repair_hash_set(), [this] (repair_hash_set& hashes) {
|
||||
return do_for_each(_working_row_buf, [&hashes] (repair_row& r) {
|
||||
hashes.emplace(r.hash());
|
||||
}).then([&hashes] {
|
||||
@@ -1199,9 +1200,9 @@ private:
|
||||
}
|
||||
|
||||
future<std::list<repair_row>>
|
||||
copy_rows_from_working_row_buf_within_set_diff(std::unordered_set<repair_hash> set_diff) {
|
||||
copy_rows_from_working_row_buf_within_set_diff(repair_hash_set set_diff) {
|
||||
return do_with(std::list<repair_row>(), std::move(set_diff),
|
||||
[this] (std::list<repair_row>& rows, std::unordered_set<repair_hash>& set_diff) {
|
||||
[this] (std::list<repair_row>& rows, repair_hash_set& set_diff) {
|
||||
return do_for_each(_working_row_buf, [this, &set_diff, &rows] (const repair_row& r) {
|
||||
if (set_diff.count(r.hash()) > 0) {
|
||||
rows.push_back(r);
|
||||
@@ -1216,7 +1217,7 @@ private:
|
||||
// Give a set of row hashes, return the corresponding rows
|
||||
// If needs_all_rows is set, return all the rows in _working_row_buf, ignore the set_diff
|
||||
future<std::list<repair_row>>
|
||||
get_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) {
|
||||
get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) {
|
||||
if (needs_all_rows) {
|
||||
if (!_repair_master || _nr_peer_nodes == 1) {
|
||||
return make_ready_future<std::list<repair_row>>(std::move(_working_row_buf));
|
||||
@@ -1266,7 +1267,7 @@ private:
|
||||
[this] (const repair_row& x, const repair_row& y) { thread::maybe_yield(); return _cmp(x.boundary(), y.boundary()) < 0; });
|
||||
}
|
||||
if (update_hash_set) {
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<repair_hash_set>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
@@ -1360,13 +1361,13 @@ private:
|
||||
public:
|
||||
// RPC API
|
||||
// Return the hashes of the rows in _working_row_buf
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
get_full_row_hashes(gms::inet_address remote_node) {
|
||||
if (remote_node == _myip) {
|
||||
return get_full_row_hashes_handler();
|
||||
}
|
||||
return netw::get_local_messaging_service().send_repair_get_full_row_hashes(msg_addr(remote_node),
|
||||
_repair_meta_id).then([this, remote_node] (std::unordered_set<repair_hash> hashes) {
|
||||
_repair_meta_id).then([this, remote_node] (repair_hash_set hashes) {
|
||||
rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size());
|
||||
_metrics.rx_hashes_nr += hashes.size();
|
||||
stats().rx_hashes_nr += hashes.size();
|
||||
@@ -1377,7 +1378,7 @@ public:
|
||||
|
||||
private:
|
||||
future<> get_full_row_hashes_source_op(
|
||||
lw_shared_ptr<std::unordered_set<repair_hash>> current_hashes,
|
||||
lw_shared_ptr<repair_hash_set> current_hashes,
|
||||
gms::inet_address remote_node,
|
||||
unsigned node_idx,
|
||||
rpc::source<repair_hash_with_cmd>& source) {
|
||||
@@ -1415,12 +1416,12 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (remote_node == _myip) {
|
||||
return get_full_row_hashes_handler();
|
||||
}
|
||||
auto current_hashes = make_lw_shared<std::unordered_set<repair_hash>>();
|
||||
auto current_hashes = make_lw_shared<repair_hash_set>();
|
||||
return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx).then(
|
||||
[this, current_hashes, remote_node, node_idx]
|
||||
(rpc::sink<repair_stream_cmd>& sink, rpc::source<repair_hash_with_cmd>& source) mutable {
|
||||
@@ -1435,7 +1436,7 @@ public:
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
future<std::unordered_set<repair_hash>>
|
||||
future<repair_hash_set>
|
||||
get_full_row_hashes_handler() {
|
||||
return with_gate(_gate, [this] {
|
||||
return working_row_hashes();
|
||||
@@ -1585,7 +1586,7 @@ public:
|
||||
// RPC API
|
||||
// Return rows in the _working_row_buf with hash within the given sef_diff
|
||||
// Must run inside a seastar thread
|
||||
void get_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, unsigned node_idx) {
|
||||
void get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (needs_all_rows || !set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return;
|
||||
@@ -1654,11 +1655,11 @@ private:
|
||||
}
|
||||
|
||||
future<> get_row_diff_sink_op(
|
||||
std::unordered_set<repair_hash> set_diff,
|
||||
repair_hash_set set_diff,
|
||||
needs_all_rows_t needs_all_rows,
|
||||
rpc::sink<repair_hash_with_cmd>& sink,
|
||||
gms::inet_address remote_node) {
|
||||
return do_with(std::move(set_diff), [needs_all_rows, remote_node, &sink] (std::unordered_set<repair_hash>& set_diff) mutable {
|
||||
return do_with(std::move(set_diff), [needs_all_rows, remote_node, &sink] (repair_hash_set& set_diff) mutable {
|
||||
if (inject_rpc_stream_error) {
|
||||
return make_exception_future<>(std::runtime_error("get_row_diff: Inject sender error in sink loop"));
|
||||
}
|
||||
@@ -1685,7 +1686,7 @@ private:
|
||||
public:
|
||||
// Must run inside a seastar thread
|
||||
void get_row_diff_with_rpc_stream(
|
||||
std::unordered_set<repair_hash> set_diff,
|
||||
repair_hash_set set_diff,
|
||||
needs_all_rows_t needs_all_rows,
|
||||
update_peer_row_hash_sets update_hash_set,
|
||||
gms::inet_address remote_node,
|
||||
@@ -1711,7 +1712,7 @@ public:
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
future<repair_rows_on_wire> get_row_diff_handler(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows) {
|
||||
future<repair_rows_on_wire> get_row_diff_handler(repair_hash_set set_diff, needs_all_rows_t needs_all_rows) {
|
||||
return with_gate(_gate, [this, set_diff = std::move(set_diff), needs_all_rows] () mutable {
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return to_repair_rows_on_wire(std::move(row_diff));
|
||||
@@ -1721,15 +1722,16 @@ public:
|
||||
|
||||
// RPC API
|
||||
// Send rows in the _working_row_buf with hash within the given sef_diff
|
||||
future<> put_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node) {
|
||||
future<> put_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node) {
|
||||
if (!set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto sz = set_diff.size();
|
||||
size_t sz = set_diff.size();
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, sz] (std::list<repair_row> row_diff) {
|
||||
if (row_diff.size() != sz) {
|
||||
throw std::runtime_error("row_diff.size() != set_diff.size()");
|
||||
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
|
||||
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
|
||||
}
|
||||
return do_with(std::move(row_diff), [this, remote_node] (std::list<repair_row>& row_diff) {
|
||||
return get_repair_rows_size(row_diff).then([this, remote_node, &row_diff] (size_t row_bytes) mutable {
|
||||
@@ -1796,17 +1798,18 @@ private:
|
||||
|
||||
public:
|
||||
future<> put_row_diff_with_rpc_stream(
|
||||
std::unordered_set<repair_hash> set_diff,
|
||||
repair_hash_set set_diff,
|
||||
needs_all_rows_t needs_all_rows,
|
||||
gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (!set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto sz = set_diff.size();
|
||||
size_t sz = set_diff.size();
|
||||
return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, node_idx, sz] (std::list<repair_row> row_diff) {
|
||||
if (row_diff.size() != sz) {
|
||||
throw std::runtime_error("row_diff.size() != set_diff.size()");
|
||||
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
|
||||
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
|
||||
}
|
||||
return do_with(std::move(row_diff), [this, remote_node, node_idx] (std::list<repair_row>& row_diff) {
|
||||
return get_repair_rows_size(row_diff).then([this, remote_node, node_idx, &row_diff] (size_t row_bytes) mutable {
|
||||
@@ -1845,7 +1848,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
rpc::sink<repair_row_on_wire_with_cmd> sink,
|
||||
rpc::source<repair_hash_with_cmd> source,
|
||||
bool &error,
|
||||
std::unordered_set<repair_hash>& current_set_diff,
|
||||
repair_hash_set& current_set_diff,
|
||||
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
|
||||
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
|
||||
rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd));
|
||||
@@ -1858,7 +1861,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
}
|
||||
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
|
||||
_metrics.rx_hashes_nr += current_set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(current_set_diff)));
|
||||
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(current_set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
if (fp.get_owner_shard() == this_shard_id()) {
|
||||
@@ -1936,12 +1939,12 @@ static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process
|
||||
if (status == repair_stream_cmd::get_full_row_hashes) {
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
return rm->get_full_row_hashes_handler().then([] (std::unordered_set<repair_hash> hashes) {
|
||||
return rm->get_full_row_hashes_handler().then([] (repair_hash_set hashes) {
|
||||
_metrics.tx_hashes_nr += hashes.size();
|
||||
return hashes;
|
||||
});
|
||||
}).then([sink] (std::unordered_set<repair_hash> hashes) mutable {
|
||||
return do_with(std::move(hashes), [sink] (std::unordered_set<repair_hash>& hashes) mutable {
|
||||
}).then([sink] (repair_hash_set hashes) mutable {
|
||||
return do_with(std::move(hashes), [sink] (repair_hash_set& hashes) mutable {
|
||||
return do_for_each(hashes, [sink] (const repair_hash& hash) mutable {
|
||||
return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
|
||||
}).then([sink] () mutable {
|
||||
@@ -1964,7 +1967,7 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
|
||||
uint32_t repair_meta_id,
|
||||
rpc::sink<repair_row_on_wire_with_cmd> sink,
|
||||
rpc::source<repair_hash_with_cmd> source) {
|
||||
return do_with(false, std::unordered_set<repair_hash>(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, std::unordered_set<repair_hash>& current_set_diff) mutable {
|
||||
return do_with(false, repair_hash_set(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, repair_hash_set& current_set_diff) mutable {
|
||||
return repeat([from, src_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] () mutable {
|
||||
return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) mutable {
|
||||
if (hash_cmd_opt) {
|
||||
@@ -2107,7 +2110,7 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
return rm->get_full_row_hashes_handler().then([] (std::unordered_set<repair_hash> hashes) {
|
||||
return rm->get_full_row_hashes_handler().then([] (repair_hash_set hashes) {
|
||||
_metrics.tx_hashes_nr += hashes.size();
|
||||
return hashes;
|
||||
});
|
||||
@@ -2135,11 +2138,11 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_row_diff([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
|
||||
repair_hash_set set_diff, bool needs_all_rows) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
_metrics.rx_hashes_nr += set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(set_diff)));
|
||||
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
if (fp.get_owner_shard() == this_shard_id()) {
|
||||
@@ -2439,7 +2442,7 @@ private:
|
||||
// sequentially because the rows from repair follower 1 to
|
||||
// repair master might reduce the amount of missing data
|
||||
// between repair master and repair follower 2.
|
||||
std::unordered_set<repair_hash> set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes().get0());
|
||||
repair_hash_set set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes().get0());
|
||||
// Request missing sets from peer node
|
||||
rlogger.debug("Before get_row_diff to node {}, local={}, peer={}, set_diff={}",
|
||||
node, master.working_row_hashes().get0().size(), master.peer_row_hash_sets(node_idx).size(), set_diff.size());
|
||||
@@ -2462,9 +2465,9 @@ private:
|
||||
// So we can figure out which rows peer node are missing and send the missing rows to them
|
||||
check_in_shutdown();
|
||||
_ri.check_in_abort();
|
||||
std::unordered_set<repair_hash> local_row_hash_sets = master.working_row_hashes().get0();
|
||||
repair_hash_set local_row_hash_sets = master.working_row_hashes().get0();
|
||||
auto sz = _all_live_peer_nodes.size();
|
||||
std::vector<std::unordered_set<repair_hash>> set_diffs(sz);
|
||||
std::vector<repair_hash_set> set_diffs(sz);
|
||||
for (size_t idx : boost::irange(size_t(0), sz)) {
|
||||
set_diffs[idx] = repair_meta::get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx));
|
||||
}
|
||||
|
||||
@@ -92,7 +92,8 @@ executables = ['build/{}/scylla'.format(args.mode),
|
||||
'/usr/sbin/ethtool',
|
||||
'/usr/bin/netstat',
|
||||
'/usr/bin/hwloc-distrib',
|
||||
'/usr/bin/hwloc-calc']
|
||||
'/usr/bin/hwloc-calc',
|
||||
'/usr/bin/lsblk']
|
||||
|
||||
output = args.dest
|
||||
|
||||
|
||||
@@ -63,6 +63,17 @@ MemoryHigh=1200M
|
||||
MemoryMax=1400M
|
||||
MemoryLimit=1400M
|
||||
EOS
|
||||
|
||||
# On CentOS7, systemd does not support percentage-based parameter.
|
||||
# To apply memory parameter on CentOS7, we need to override the parameter
|
||||
# in bytes, instead of percentage.
|
||||
elif [ "$RHEL" -a "$VERSION_ID" = "7" ]; then
|
||||
MEMORY_LIMIT=$((MEMTOTAL_BYTES / 100 * 5))
|
||||
mkdir -p /etc/systemd/system/scylla-helper.slice.d/
|
||||
cat << EOS > /etc/systemd/system/scylla-helper.slice.d/memory.conf
|
||||
[Slice]
|
||||
MemoryLimit=$MEMORY_LIMIT
|
||||
EOS
|
||||
fi
|
||||
|
||||
systemctl --system daemon-reload >/dev/null || true
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 11e86172ba...4641f4f2d3
@@ -25,6 +25,7 @@
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include <boost/range/algorithm/for_each.hpp>
|
||||
#include "utils/small_vector.hh"
|
||||
#include <absl/container/btree_set.h>
|
||||
|
||||
namespace ser {
|
||||
|
||||
@@ -81,6 +82,17 @@ static inline void serialize_array(Output& out, const Container& v) {
|
||||
template<typename Container>
|
||||
struct container_traits;
|
||||
|
||||
template<typename T>
|
||||
struct container_traits<absl::btree_set<T>> {
|
||||
struct back_emplacer {
|
||||
absl::btree_set<T>& c;
|
||||
back_emplacer(absl::btree_set<T>& c_) : c(c_) {}
|
||||
void operator()(T&& v) {
|
||||
c.emplace(std::move(v));
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
struct container_traits<std::unordered_set<T>> {
|
||||
struct back_emplacer {
|
||||
@@ -253,6 +265,27 @@ struct serializer<std::list<T>> {
|
||||
}
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
struct serializer<absl::btree_set<T>> {
|
||||
template<typename Input>
|
||||
static absl::btree_set<T> read(Input& in) {
|
||||
auto sz = deserialize(in, boost::type<uint32_t>());
|
||||
absl::btree_set<T> v;
|
||||
deserialize_array_helper<false, T>::doit(in, v, sz);
|
||||
return v;
|
||||
}
|
||||
template<typename Output>
|
||||
static void write(Output& out, const absl::btree_set<T>& v) {
|
||||
safe_serialize_as_uint32(out, v.size());
|
||||
serialize_array_helper<false, T>::doit(out, v);
|
||||
}
|
||||
template<typename Input>
|
||||
static void skip(Input& in) {
|
||||
auto sz = deserialize(in, boost::type<uint32_t>());
|
||||
skip_array<T>(in, sz);
|
||||
}
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
struct serializer<std::unordered_set<T>> {
|
||||
template<typename Input>
|
||||
|
||||
@@ -602,7 +602,7 @@ private:
|
||||
// - add support to merge summary (message: Partition merge counts were {%s}.).
|
||||
// - there is no easy way, currently, to know the exact number of total partitions.
|
||||
// By the time being, using estimated key count.
|
||||
sstring formatted_msg = fmt::format("{} sstables to [{}]. {} to {} (~{} of original) in {}ms = {}. " \
|
||||
sstring formatted_msg = fmt::format("{} sstables to [{}]. {} to {} (~{}% of original) in {}ms = {}. " \
|
||||
"~{} total partitions merged to {}.",
|
||||
_info->sstables, new_sstables_msg, pretty_printed_data_size(_info->start_size), pretty_printed_data_size(_info->end_size), int(ratio * 100),
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), pretty_printed_throughput(_info->end_size, duration),
|
||||
@@ -1236,11 +1236,8 @@ private:
|
||||
// return estimated partitions per sstable for a given shard
|
||||
uint64_t partitions_per_sstable(shard_id s) const {
|
||||
uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size)));
|
||||
// As we adjust this estimate downwards from the compaction strategy, it can get to 0 so
|
||||
// make sure we're returning at least 1.
|
||||
return std::max(uint64_t(1),
|
||||
std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)),
|
||||
_cf.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions)));
|
||||
return std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)),
|
||||
_cf.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions));
|
||||
}
|
||||
public:
|
||||
resharding_compaction(column_family& cf, sstables::compaction_descriptor descriptor)
|
||||
|
||||
@@ -92,6 +92,9 @@ public:
|
||||
void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true);
|
||||
void revert_charges(sstables::shared_sstable sst);
|
||||
private:
|
||||
// Returns true if this SSTable can be added or removed from the tracker.
|
||||
bool sstable_belongs_to_tracker(const sstables::shared_sstable& sst);
|
||||
|
||||
void disable() {
|
||||
_disabled = true;
|
||||
_ongoing_writes = {};
|
||||
|
||||
@@ -857,7 +857,7 @@ double compaction_backlog_tracker::backlog() const {
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled) {
|
||||
if (_disabled || !sstable_belongs_to_tracker(sst)) {
|
||||
return;
|
||||
}
|
||||
_ongoing_writes.erase(sst);
|
||||
@@ -870,7 +870,7 @@ void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled) {
|
||||
if (_disabled || !sstable_belongs_to_tracker(sst)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -883,6 +883,10 @@ void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool compaction_backlog_tracker::sstable_belongs_to_tracker(const sstables::shared_sstable& sst) {
|
||||
return !sst->requires_view_building();
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp) {
|
||||
if (_disabled) {
|
||||
return;
|
||||
|
||||
@@ -438,8 +438,8 @@ std::unique_ptr<sstable_set_impl> leveled_compaction_strategy::make_sstable_set(
|
||||
return std::make_unique<partitioned_sstable_set>(std::move(schema));
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) {
|
||||
return std::make_unique<partitioned_sstable_set>(std::move(schema), use_level_metadata);
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata) {
|
||||
return sstables::sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
|
||||
}
|
||||
|
||||
compaction_descriptor compaction_strategy_impl::get_major_compaction_job(column_family& cf, std::vector<sstables::shared_sstable> candidates) {
|
||||
|
||||
@@ -453,9 +453,16 @@ private:
|
||||
auto indexes = std::move(entries_reader->_consumer.indexes);
|
||||
return entries_reader->_context.close().then([indexes = std::move(indexes), ex = std::move(ex)] () mutable {
|
||||
if (ex) {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
return do_with(std::move(indexes), [ex = std::move(ex)] (index_list& indexes) mutable {
|
||||
return parallel_for_each(indexes, [] (index_entry& ie) mutable {
|
||||
return ie.close_pi_stream();
|
||||
}).then_wrapped([ex = std::move(ex)] (future<>&& fut) mutable {
|
||||
fut.ignore_ready_future();
|
||||
return make_exception_future<index_list>(std::move(ex));
|
||||
});
|
||||
});
|
||||
}
|
||||
return std::move(indexes);
|
||||
return make_ready_future<index_list>(std::move(indexes));
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
@@ -741,6 +741,11 @@ public:
|
||||
, _run_identifier(cfg.run_identifier)
|
||||
, _write_regular_as_static(cfg.correctly_serialize_static_compact_in_mc && s.is_static_compact_table())
|
||||
{
|
||||
// This can be 0 in some cases, which is albeit benign, can wreak havoc
|
||||
// in lower-level writer code, so clamp it to [1, +inf) here, which is
|
||||
// exactly what callers used to do anyway.
|
||||
estimated_partitions = std::max(uint64_t(1), estimated_partitions);
|
||||
|
||||
_sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance());
|
||||
_sst.write_toc(_pc);
|
||||
_sst.create_data().get();
|
||||
|
||||
@@ -101,7 +101,7 @@ public:
|
||||
incremental_selector make_incremental_selector() const;
|
||||
};
|
||||
|
||||
std::unique_ptr<sstable_set_impl> make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true);
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata = true);
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run);
|
||||
|
||||
|
||||
@@ -2012,6 +2012,11 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer&
|
||||
, _tombstone_written(false)
|
||||
, _range_tombstones(s)
|
||||
{
|
||||
// This can be 0 in some cases, which is albeit benign, can wreak havoc
|
||||
// in lower-level writer code, so clamp it to [1, +inf) here, which is
|
||||
// exactly what callers used to do anyway.
|
||||
estimated_partitions = std::max(uint64_t(1), estimated_partitions);
|
||||
|
||||
_sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance(), utils::filter_format::k_l_format);
|
||||
_sst._pi_write.desired_block_size = cfg.promoted_index_block_size;
|
||||
_sst._correctly_serialize_non_compound_range_tombstones = cfg.correctly_serialize_non_compound_range_tombstones;
|
||||
|
||||
@@ -229,7 +229,7 @@ void stream_session::init_messaging_service_handler() {
|
||||
schema_ptr s = reader.schema();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
return sst->write_components(std::move(reader), std::max(1ul, adjusted_estimated_partitions), s,
|
||||
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
|
||||
cf->get_sstables_manager().configure_writer(),
|
||||
encoding_stats{}, pc).then([sst] {
|
||||
return sst->open_data();
|
||||
|
||||
40
table.cc
40
table.cc
@@ -23,7 +23,6 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "db/view/view_updating_consumer.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "cell_locking.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
@@ -326,6 +325,32 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator)
|
||||
{
|
||||
auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc,
|
||||
std::move(trace_state), fwd, fwd_mr, monitor_generator);
|
||||
});
|
||||
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
table::make_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
@@ -2588,16 +2613,3 @@ table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts)
|
||||
return this->make_reader_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
stop_iteration db::view::view_updating_consumer::consume_end_of_partition() {
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
try {
|
||||
auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get();
|
||||
} catch (...) {
|
||||
tlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception());
|
||||
}
|
||||
_m.reset();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
3
test.py
3
test.py
@@ -447,6 +447,9 @@ async def run_test(test, options, gentle_kill=False, env=dict()):
|
||||
env=dict(os.environ,
|
||||
UBSAN_OPTIONS=":".join(filter(None, UBSAN_OPTIONS)),
|
||||
ASAN_OPTIONS=":".join(filter(None, ASAN_OPTIONS)),
|
||||
# TMPDIR env variable is used by any seastar/scylla
|
||||
# test for directory to store test temporary data.
|
||||
TMPDIR=os.path.join(options.tmpdir, test.mode),
|
||||
**env,
|
||||
),
|
||||
preexec_fn=os.setsid,
|
||||
|
||||
@@ -28,8 +28,8 @@ fi
|
||||
SCYLLA_IP=127.1.$(($$ >> 8 & 255)).$(($$ & 255))
|
||||
echo "Running Scylla on $SCYLLA_IP"
|
||||
|
||||
tmp_dir=/tmp/alternator-test-$$
|
||||
mkdir $tmp_dir
|
||||
tmp_dir="$(readlink -e ${TMPDIR-/tmp})"/alternator-test-$$
|
||||
mkdir "$tmp_dir"
|
||||
|
||||
# We run the cleanup() function on exit for any reason - successful finish
|
||||
# of the script, an error (since we have "set -e"), or a signal.
|
||||
@@ -76,7 +76,7 @@ done
|
||||
# argv[0] isn't good enough - because killall inspects the actual executable
|
||||
# filename in /proc/<pid>/stat. So we need to name the executable differently.
|
||||
# Luckily, using a symbolic link is good enough.
|
||||
SCYLLA_LINK=$tmp_dir/test_scylla
|
||||
SCYLLA_LINK="$tmp_dir"/test_scylla
|
||||
ln -s "$SCYLLA" "$SCYLLA_LINK"
|
||||
|
||||
"$SCYLLA_LINK" --options-file "$source_path/conf/scylla.yaml" \
|
||||
|
||||
@@ -157,6 +157,13 @@ BOOST_AUTO_TEST_CASE(test_big_decimal_div) {
|
||||
test_div("-0.25", 10, "-0.02");
|
||||
test_div("-0.26", 10, "-0.03");
|
||||
test_div("-10E10", 3, "-3E10");
|
||||
|
||||
// Document a small oddity, 1e1 has -1 decimal places, so dividing
|
||||
// it by 2 produces 0. This is not the behavior in cassandra, but
|
||||
// scylla doesn't expose arithmetic operations, so this doesn't
|
||||
// seem to be visible from CQL.
|
||||
test_div("10", 2, "5");
|
||||
test_div("1e1", 2, "0e1");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_big_decimal_assignadd) {
|
||||
|
||||
@@ -142,6 +142,19 @@ SEASTAR_TEST_CASE(test_decimal_to_bigint) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_decimal_to_float) {
|
||||
return do_with_cql_env_thread([&](auto& e) {
|
||||
e.execute_cql("CREATE TABLE test (key text primary key, value decimal)").get();
|
||||
e.execute_cql("INSERT INTO test (key, value) VALUES ('k1', 10)").get();
|
||||
e.execute_cql("INSERT INTO test (key, value) VALUES ('k2', 1e1)").get();
|
||||
auto v = e.execute_cql("SELECT key, CAST(value as float) from test").get0();
|
||||
assert_that(v).is_rows().with_rows_ignore_order({
|
||||
{{serialized("k1")}, {serialized(float(10))}},
|
||||
{{serialized("k2")}, {serialized(float(10))}},
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_varint_to_bigint) {
|
||||
return do_with_cql_env_thread([&](auto& e) {
|
||||
e.execute_cql("CREATE TABLE test (key text primary key, value varint)").get();
|
||||
|
||||
@@ -4583,3 +4583,21 @@ SEASTAR_TEST_CASE(test_internal_alter_table_on_a_distributed_table) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_impossible_where) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
cquery_nofail(e, "CREATE TABLE t(p int PRIMARY KEY, r int)");
|
||||
cquery_nofail(e, "INSERT INTO t(p,r) VALUES (0, 0)");
|
||||
cquery_nofail(e, "INSERT INTO t(p,r) VALUES (1, 10)");
|
||||
cquery_nofail(e, "INSERT INTO t(p,r) VALUES (2, 20)");
|
||||
require_rows(e, "SELECT * FROM t WHERE r>10 AND r<10 ALLOW FILTERING", {});
|
||||
require_rows(e, "SELECT * FROM t WHERE r>=10 AND r<=0 ALLOW FILTERING", {});
|
||||
|
||||
cquery_nofail(e, "CREATE TABLE t2(p int, c int, PRIMARY KEY(p, c)) WITH CLUSTERING ORDER BY (c DESC)");
|
||||
cquery_nofail(e, "INSERT INTO t2(p,c) VALUES (0, 0)");
|
||||
cquery_nofail(e, "INSERT INTO t2(p,c) VALUES (1, 10)");
|
||||
cquery_nofail(e, "INSERT INTO t2(p,c) VALUES (2, 20)");
|
||||
require_rows(e, "SELECT * FROM t2 WHERE c>10 AND c<10 ALLOW FILTERING", {});
|
||||
require_rows(e, "SELECT * FROM t2 WHERE c>=10 AND c<=0 ALLOW FILTERING", {});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -870,8 +870,6 @@ SEASTAR_TEST_CASE(reader_selector_fast_forwarding_test) {
|
||||
});
|
||||
}
|
||||
|
||||
static const std::size_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
sstables::shared_sstable create_sstable(sstables::test_env& env, simple_schema& sschema, const sstring& path) {
|
||||
std::vector<mutation> mutations;
|
||||
mutations.reserve(1 << 14);
|
||||
@@ -2588,6 +2586,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
|
||||
|
||||
BOOST_REQUIRE_THROW(handle.push(partition_end{}).get(), std::runtime_error);
|
||||
BOOST_REQUIRE_THROW(handle.push_end_of_stream(), std::runtime_error);
|
||||
BOOST_REQUIRE_THROW(fill_buffer_fut.get(), broken_promise);
|
||||
}
|
||||
|
||||
// Abandoned handle aborts, move-assignment
|
||||
|
||||
@@ -769,3 +769,27 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
|
||||
fut.get();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class inactive_read : public reader_concurrency_semaphore::inactive_read {
|
||||
public:
|
||||
virtual void evict() override {
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
|
||||
reader_concurrency_semaphore sem1(reader_concurrency_semaphore::no_limits{}, "sem1");
|
||||
reader_concurrency_semaphore sem2(reader_concurrency_semaphore::no_limits{}, ""); // to see the message for an unnamed semaphore
|
||||
|
||||
auto sem1_h1 = sem1.register_inactive_read(std::make_unique<inactive_read>());
|
||||
auto sem2_h1 = sem2.register_inactive_read(std::make_unique<inactive_read>());
|
||||
|
||||
// Sanity check that lookup still works with empty handle.
|
||||
BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{}));
|
||||
|
||||
BOOST_REQUIRE_THROW(sem1.unregister_inactive_read(std::move(sem2_h1)), std::runtime_error);
|
||||
BOOST_REQUIRE_THROW(sem2.unregister_inactive_read(std::move(sem1_h1)), std::runtime_error);
|
||||
}
|
||||
|
||||
@@ -5847,3 +5847,111 @@ SEASTAR_TEST_CASE(test_bug_6472) {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
|
||||
return test_setup::do_with_tmp_directory([] (test_env& env, sstring tmpdir_path) {
|
||||
auto builder = schema_builder("tests", "test_bug_6472")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map<sstring, sstring> opts = {
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS" },
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1" },
|
||||
};
|
||||
builder.set_compaction_strategy_options(opts);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
auto s = builder.build();
|
||||
|
||||
const auto rows_per_partition = 200;
|
||||
|
||||
auto sst_gen = [&env, s, tmpdir_path, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
return env.make_sstable(s, tmpdir_path, (*gen)++, la, big);
|
||||
};
|
||||
|
||||
auto next_timestamp = [] (int sstable_idx, int ck_idx) {
|
||||
using namespace std::chrono;
|
||||
auto window = hours(sstable_idx * rows_per_partition + ck_idx);
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(window)).count();
|
||||
};
|
||||
|
||||
auto tokens = token_generation_for_shard(4, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto make_sstable = [&] (int sstable_idx) {
|
||||
static thread_local int32_t value = 1;
|
||||
|
||||
auto key_str = tokens[sstable_idx].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
for (auto ck = 0; ck < rows_per_partition; ++ck) {
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(sstable_idx, ck));
|
||||
}
|
||||
return make_sstable_containing(sst_gen, {m});
|
||||
};
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.datadir = tmpdir_path;
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
|
||||
std::vector<shared_sstable> sstables_spanning_many_windows = {
|
||||
make_sstable(0),
|
||||
make_sstable(1),
|
||||
make_sstable(2),
|
||||
make_sstable(3),
|
||||
};
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(sstables_spanning_many_windows,
|
||||
cf->get_sstable_set(), default_priority_class()), *cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
// The real test here is that we don't assert() in
|
||||
// sstables::prepare_summary() with the compact_sstables() call above,
|
||||
// this is only here as a sanity check.
|
||||
BOOST_REQUIRE_EQUAL(ret.new_sstables.size(), std::min(sstables_spanning_many_windows.size() * rows_per_partition,
|
||||
sstables::time_window_compaction_strategy::max_data_segregation_window_count));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
|
||||
return test_setup::do_with_tmp_directory([] (test_env& env, sstring tmpdir_path) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto pk = ss.make_pkey(make_local_key(s));
|
||||
auto mut = mutation(s, pk);
|
||||
ss.add_row(mut, ss.make_ckey(0), "val");
|
||||
|
||||
for (const auto version : all_sstable_versions) {
|
||||
testlog.info("version={}", sstables::to_string(version));
|
||||
|
||||
auto mr = flat_mutation_reader_from_mutations({mut});
|
||||
|
||||
auto sst = env.make_sstable(s, tmpdir_path, 0, version, big);
|
||||
sstable_writer_config cfg = test_sstables_manager.configure_writer();
|
||||
sst->write_components(std::move(mr), 0, s, cfg, encoding_stats{}).get();
|
||||
sst->load().get();
|
||||
|
||||
auto sst_mr = sst->as_mutation_source().make_reader(s, tests::make_permit(), query::full_partition_range, s->full_slice());
|
||||
auto sst_mut = read_mutation_from_flat_mutation_reader(sst_mr, db::no_timeout).get0();
|
||||
|
||||
// The real test here is that we don't assert() in
|
||||
// sstables::prepare_summary() with the write_components() call above,
|
||||
// this is only here as a sanity check.
|
||||
BOOST_REQUIRE(sst_mr.is_buffer_empty());
|
||||
BOOST_REQUIRE(sst_mr.is_end_of_stream());
|
||||
BOOST_REQUIRE_EQUAL(mut, sst_mut);
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -26,12 +26,16 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "test/lib/test_services.hh"
|
||||
#include "test/lib/data_model.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "utils/ranges.hh"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
@@ -421,23 +425,49 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
auto& view_update_generator = e.local_view_update_generator();
|
||||
auto s = test_table_schema();
|
||||
|
||||
std::vector<shared_sstable> ssts;
|
||||
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
|
||||
auto write_to_sstable = [&] (mutation m) {
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
return sst;
|
||||
};
|
||||
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key1)});
|
||||
mutation m(s, key);
|
||||
auto col = s->get_column_definition("v");
|
||||
for (int i = 1024; i < 1280; ++i) {
|
||||
auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))}));
|
||||
row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(sstring(fmt::format("v{}", i)))));
|
||||
// Scatter the data in a bunch of different sstables, so we
|
||||
// can test the registration semaphore of the view update
|
||||
// generator
|
||||
if (!(i % 10)) {
|
||||
ssts.push_back(write_to_sstable(std::exchange(m, mutation(s, key))));
|
||||
}
|
||||
}
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
ssts.push_back(write_to_sstable(std::move(m)));
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
view_update_generator.register_staging_sstable(sst, t).get();
|
||||
parallel_for_each(ssts.begin(), ssts.begin() + 10, [&] (shared_sstable& sst) {
|
||||
return view_update_generator.register_staging_sstable(sst, t);
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
parallel_for_each(ssts.begin() + 10, ssts.end(), [&] (shared_sstable& sst) {
|
||||
return view_update_generator.register_staging_sstable(sst, t);
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
eventually([&, key1, key2] {
|
||||
auto msg = e.execute_cql(fmt::format("SELECT * FROM t WHERE p = '{}'", key1)).get0();
|
||||
@@ -464,5 +494,261 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
cql_test_config test_cfg;
|
||||
auto& db_cfg = *test_cfg.db_config;
|
||||
|
||||
db_cfg.enable_cache(false);
|
||||
db_cfg.enable_commitlog(false);
|
||||
|
||||
test_cfg.dbcfg.emplace();
|
||||
test_cfg.dbcfg->available_memory = memory::stats().total_memory();
|
||||
test_cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0();
|
||||
test_cfg.dbcfg->streaming_scheduling_group = seastar::create_scheduling_group("streaming", 200).get0();
|
||||
|
||||
do_with_cql_env([] (cql_test_env& e) -> future<> {
|
||||
e.execute_cql("create table t (p text, c text, v text, primary key (p, c))").get();
|
||||
e.execute_cql("create materialized view tv as select * from t "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
auto msb = e.local_db().get_config().murmur3_partitioner_ignore_msb_bits();
|
||||
auto key1 = token_generation_for_shard(1, this_shard_id(), msb).front().first;
|
||||
|
||||
for (auto i = 0; i < 1024; ++i) {
|
||||
e.execute_cql(fmt::format("insert into t (p, c, v) values ('{}', 'c{}', 'x')", key1, i)).get();
|
||||
}
|
||||
|
||||
// We need data on the disk so that the pre-image reader is forced to go to disk.
|
||||
e.db().invoke_on_all([] (database& db) {
|
||||
return db.flush_all_memtables();
|
||||
}).get();
|
||||
|
||||
auto& view_update_generator = e.local_view_update_generator();
|
||||
auto s = test_table_schema();
|
||||
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key1)});
|
||||
mutation m(s, key);
|
||||
auto col = s->get_column_definition("v");
|
||||
const auto filler_val_size = 4 * 1024;
|
||||
const auto filler_val = sstring(filler_val_size, 'a');
|
||||
for (int i = 0; i < 1024; ++i) {
|
||||
auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))}));
|
||||
row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(filler_val)));
|
||||
}
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
|
||||
auto& sem = *with_scheduling_group(e.local_db().get_streaming_scheduling_group(), [&] () {
|
||||
return &e.local_db().make_query_class_config().semaphore;
|
||||
}).get0();
|
||||
|
||||
// consume all units except what is needed to admit a single reader.
|
||||
sem.consume(sem.initial_resources() - reader_resources{1, new_reader_base_cost});
|
||||
|
||||
testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(sem.get_inactive_read_stats().permit_based_evictions, 0);
|
||||
|
||||
view_update_generator.register_staging_sstable(sst, t).get();
|
||||
|
||||
eventually_true([&] {
|
||||
return sem.get_inactive_read_stats().permit_based_evictions > 0;
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
}, std::move(test_cfg)).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
using partition_size_map = std::map<dht::decorated_key, size_t, dht::ring_position_less_comparator>;
|
||||
|
||||
class consumer_verifier {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
const partition_size_map& _partition_rows;
|
||||
std::vector<mutation>& _collected_muts;
|
||||
bool& _failed;
|
||||
std::unique_ptr<row_locker> _rl;
|
||||
std::unique_ptr<row_locker::stats> _rl_stats;
|
||||
clustering_key::less_compare _less_cmp;
|
||||
const size_t _max_rows_soft;
|
||||
const size_t _max_rows_hard;
|
||||
size_t _buffer_rows = 0;
|
||||
|
||||
private:
|
||||
static size_t rows_in_limit(size_t l) {
|
||||
const size_t _100kb = 100 * 1024;
|
||||
// round up
|
||||
return l / _100kb + std::min(size_t(1), l % _100kb);
|
||||
}
|
||||
|
||||
static size_t rows_in_mut(const mutation& m) {
|
||||
return std::distance(m.partition().clustered_rows().begin(), m.partition().clustered_rows().end());
|
||||
}
|
||||
|
||||
void check(mutation mut) {
|
||||
// First we check that we would be able to create a reader, even
|
||||
// though the staging reader consumed all resources.
|
||||
auto fut = _permit.wait_admission(new_reader_base_cost, db::timeout_clock::now());
|
||||
BOOST_REQUIRE(!fut.failed());
|
||||
auto res_units = fut.get0();
|
||||
|
||||
const size_t current_rows = rows_in_mut(mut);
|
||||
const auto total_rows = _partition_rows.at(mut.decorated_key());
|
||||
_buffer_rows += current_rows;
|
||||
|
||||
testlog.trace("consumer_verifier::check(): key={}, rows={}/{}, _buffer={}",
|
||||
partition_key::with_schema_wrapper(*_schema, mut.key()),
|
||||
current_rows,
|
||||
total_rows,
|
||||
_buffer_rows);
|
||||
|
||||
BOOST_REQUIRE(current_rows);
|
||||
BOOST_REQUIRE(current_rows <= _max_rows_hard);
|
||||
BOOST_REQUIRE(_buffer_rows <= _max_rows_hard);
|
||||
|
||||
// The current partition doesn't have all of its rows yet, verify
|
||||
// that the new mutation contains the next rows for the same
|
||||
// partition
|
||||
if (!_collected_muts.empty() && rows_in_mut(_collected_muts.back()) < _partition_rows.at(_collected_muts.back().decorated_key())) {
|
||||
BOOST_REQUIRE(_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
|
||||
const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key();
|
||||
const auto& next_ckey = mut.partition().clustered_rows().begin()->key();
|
||||
BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey));
|
||||
mutation_application_stats stats;
|
||||
_collected_muts.back().partition().apply(*_schema, mut.partition(), *mut.schema(), stats);
|
||||
// The new mutation is a new partition.
|
||||
} else {
|
||||
if (!_collected_muts.empty()) {
|
||||
BOOST_REQUIRE(!_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
|
||||
}
|
||||
_collected_muts.push_back(std::move(mut));
|
||||
}
|
||||
|
||||
if (_buffer_rows >= _max_rows_hard) { // buffer flushed on hard limit
|
||||
_buffer_rows = 0;
|
||||
testlog.trace("consumer_verifier::check(): buffer ends on hard limit");
|
||||
} else if (_buffer_rows >= _max_rows_soft) { // buffer flushed on soft limit
|
||||
_buffer_rows = 0;
|
||||
testlog.trace("consumer_verifier::check(): buffer ends on soft limit");
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
consumer_verifier(schema_ptr schema, reader_permit permit, const partition_size_map& partition_rows, std::vector<mutation>& collected_muts, bool& failed)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _partition_rows(partition_rows)
|
||||
, _collected_muts(collected_muts)
|
||||
, _failed(failed)
|
||||
, _rl(std::make_unique<row_locker>(_schema))
|
||||
, _rl_stats(std::make_unique<row_locker::stats>())
|
||||
, _less_cmp(*_schema)
|
||||
, _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit))
|
||||
, _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit))
|
||||
{ }
|
||||
|
||||
future<row_locker::lock_holder> operator()(mutation mut) {
|
||||
try {
|
||||
check(std::move(mut));
|
||||
} catch (...) {
|
||||
testlog.error("consumer_verifier::operator(): caught unexpected exception {}", std::current_exception());
|
||||
_failed |= true;
|
||||
}
|
||||
return _rl->lock_pk(_collected_muts.back().decorated_key(), true, db::no_timeout, *_rl_stats);
|
||||
}
|
||||
};
|
||||
|
||||
reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name());
|
||||
|
||||
auto schema = schema_builder("ks", "cf")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", bytes_type)
|
||||
.build();
|
||||
|
||||
const auto blob_100kb = bytes(100 * 1024, bytes::value_type(0xab));
|
||||
const abort_source as;
|
||||
|
||||
const auto partition_size_sets = std::vector<std::vector<int>>{{12}, {8, 4}, {8, 16}, {22}, {8, 8, 8, 8}, {8, 8, 8, 16, 8}, {8, 20, 16, 16}, {50}, {21}, {21, 2}};
|
||||
const auto max_partition_set_size = std::ranges::max_element(partition_size_sets, [] (const std::vector<int>& a, const std::vector<int>& b) { return a.size() < b.size(); })->size();
|
||||
auto pkeys = ranges::to<std::vector<dht::decorated_key>>(std::views::iota(size_t{0}, max_partition_set_size) | std::views::transform([schema] (int i) {
|
||||
return dht::decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(data_value(i))));
|
||||
}));
|
||||
std::ranges::sort(pkeys, dht::ring_position_less_comparator(*schema));
|
||||
|
||||
for (auto partition_sizes_100kb : partition_size_sets) {
|
||||
testlog.debug("partition_sizes_100kb={}", partition_sizes_100kb);
|
||||
partition_size_map partition_rows{dht::ring_position_less_comparator(*schema)};
|
||||
std::vector<mutation> muts;
|
||||
auto pk = 0;
|
||||
for (auto partition_size_100kb : partition_sizes_100kb) {
|
||||
auto mut_desc = tests::data_model::mutation_description(pkeys.at(pk++).key().explode(*schema));
|
||||
for (auto ck = 0; ck < partition_size_100kb; ++ck) {
|
||||
mut_desc.add_clustered_cell({int32_type->decompose(data_value(ck))}, "v", tests::data_model::mutation_description::value(blob_100kb));
|
||||
}
|
||||
muts.push_back(mut_desc.build(schema));
|
||||
partition_rows.emplace(muts.back().decorated_key(), partition_size_100kb);
|
||||
}
|
||||
|
||||
std::ranges::sort(muts, [less = dht::ring_position_less_comparator(*schema)] (const mutation& a, const mutation& b) {
|
||||
return less(a.decorated_key(), b.decorated_key());
|
||||
});
|
||||
|
||||
auto permit = sem.make_permit();
|
||||
|
||||
auto mt = make_lw_shared<memtable>(schema);
|
||||
for (const auto& mut : muts) {
|
||||
mt->apply(mut);
|
||||
}
|
||||
|
||||
auto ms = mutation_source([mt] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_restricted_flat_reader(mt->as_data_source(), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
});
|
||||
auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
schema,
|
||||
permit,
|
||||
query::full_partition_range,
|
||||
schema->full_slice(),
|
||||
service::get_local_streaming_priority(),
|
||||
nullptr,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
std::vector<mutation> collected_muts;
|
||||
bool failed = false;
|
||||
|
||||
staging_reader.consume_in_thread(db::view::view_updating_consumer(schema, as, staging_reader_handle,
|
||||
consumer_verifier(schema, permit, partition_rows, collected_muts, failed)), db::no_timeout);
|
||||
|
||||
BOOST_REQUIRE(!failed);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(muts.size(), collected_muts.size());
|
||||
for (size_t i = 0; i < muts.size(); ++i) {
|
||||
testlog.trace("compare mutation {}", i);
|
||||
BOOST_REQUIRE_EQUAL(muts[i], collected_muts[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -459,7 +459,11 @@ public:
|
||||
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
|
||||
|
||||
database_config dbcfg;
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
if (cfg_in.dbcfg) {
|
||||
dbcfg = std::move(*cfg_in.dbcfg);
|
||||
} else {
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
}
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
auto stop_db = defer([&db] {
|
||||
db.stop().get();
|
||||
|
||||
@@ -64,6 +64,7 @@ namespace db {
|
||||
class cql_test_config {
|
||||
public:
|
||||
seastar::shared_ptr<db::config> db_config;
|
||||
std::optional<database_config> dbcfg;
|
||||
std::set<sstring> disabled_features;
|
||||
|
||||
cql_test_config();
|
||||
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-32-20200610
|
||||
docker.io/scylladb/scylla-toolchain:fedora-32-branch-4.2-20200731
|
||||
|
||||
@@ -36,6 +36,9 @@ uint64_t from_varint_to_integer(const utils::multiprecision_int& varint) {
|
||||
return static_cast<uint64_t>(~static_cast<uint64_t>(0) & boost::multiprecision::cpp_int(varint));
|
||||
}
|
||||
|
||||
big_decimal::big_decimal() : big_decimal(0, 0) {}
|
||||
big_decimal::big_decimal(int32_t scale, boost::multiprecision::cpp_int unscaled_value)
|
||||
: _scale(scale), _unscaled_value(std::move(unscaled_value)) {}
|
||||
|
||||
big_decimal::big_decimal(sstring_view text)
|
||||
{
|
||||
@@ -82,6 +85,20 @@ big_decimal::big_decimal(sstring_view text)
|
||||
_scale += fraction.size();
|
||||
}
|
||||
|
||||
boost::multiprecision::cpp_rational big_decimal::as_rational() const {
|
||||
boost::multiprecision::cpp_int ten(10);
|
||||
auto unscaled_value = static_cast<const boost::multiprecision::cpp_int&>(_unscaled_value);
|
||||
boost::multiprecision::cpp_rational r = unscaled_value;
|
||||
int32_t abs_scale = std::abs(_scale);
|
||||
auto pow = boost::multiprecision::pow(ten, abs_scale);
|
||||
if (_scale < 0) {
|
||||
r *= pow;
|
||||
} else {
|
||||
r /= pow;
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
sstring big_decimal::to_string() const
|
||||
{
|
||||
if (!_unscaled_value) {
|
||||
|
||||
@@ -39,13 +39,12 @@ public:
|
||||
};
|
||||
|
||||
explicit big_decimal(sstring_view text);
|
||||
big_decimal() : big_decimal(0, 0) {}
|
||||
big_decimal(int32_t scale, boost::multiprecision::cpp_int unscaled_value)
|
||||
: _scale(scale), _unscaled_value(unscaled_value)
|
||||
{ }
|
||||
big_decimal();
|
||||
big_decimal(int32_t scale, boost::multiprecision::cpp_int unscaled_value);
|
||||
|
||||
int32_t scale() const { return _scale; }
|
||||
const boost::multiprecision::cpp_int& unscaled_value() const { return _unscaled_value; }
|
||||
boost::multiprecision::cpp_rational as_rational() const;
|
||||
|
||||
sstring to_string() const;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user