Compare commits
21 Commits
scylla-4.0
...
next-4.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ae9a56466 | ||
|
|
0374c1d040 | ||
|
|
9cb0fe3b33 | ||
|
|
a813ff4da2 | ||
|
|
d5936147f4 | ||
|
|
a3d3b4e185 | ||
|
|
4ca2576c98 | ||
|
|
e99a0c7b89 | ||
|
|
f8c7605657 | ||
|
|
7b9e33dcd4 | ||
|
|
d86a31097a | ||
|
|
bd9d6f8e45 | ||
|
|
11ef23e97a | ||
|
|
2c0eac09ae | ||
|
|
713a7269d0 | ||
|
|
1724301d4d | ||
|
|
9971f2f5db | ||
|
|
ee328c22ca | ||
|
|
3a9c9a8a12 | ||
|
|
c03445871a | ||
|
|
565ac1b092 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.0.8
|
||||
VERSION=4.0.11
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -887,15 +887,24 @@ class attribute_collector {
|
||||
void add(bytes&& name, atomic_cell&& cell) {
|
||||
collected.emplace(std::move(name), std::move(cell));
|
||||
}
|
||||
void add(const bytes& name, atomic_cell&& cell) {
|
||||
collected.emplace(name, std::move(cell));
|
||||
}
|
||||
public:
|
||||
attribute_collector() : collected(attrs_type()->get_keys_type()->as_less_comparator()) { }
|
||||
void put(bytes&& name, bytes&& val, api::timestamp_type ts) {
|
||||
add(std::move(name), atomic_cell::make_live(*bytes_type, ts, std::move(val), atomic_cell::collection_member::yes));
|
||||
void put(bytes&& name, const bytes& val, api::timestamp_type ts) {
|
||||
add(std::move(name), atomic_cell::make_live(*bytes_type, ts, val, atomic_cell::collection_member::yes));
|
||||
|
||||
}
|
||||
void put(const bytes& name, const bytes& val, api::timestamp_type ts) {
|
||||
add(name, atomic_cell::make_live(*bytes_type, ts, val, atomic_cell::collection_member::yes));
|
||||
}
|
||||
void del(bytes&& name, api::timestamp_type ts) {
|
||||
add(std::move(name), atomic_cell::make_dead(ts, gc_clock::now()));
|
||||
}
|
||||
void del(const bytes& name, api::timestamp_type ts) {
|
||||
add(name, atomic_cell::make_dead(ts, gc_clock::now()));
|
||||
}
|
||||
collection_mutation_description to_mut() {
|
||||
collection_mutation_description ret;
|
||||
for (auto&& e : collected) {
|
||||
@@ -975,7 +984,7 @@ public:
|
||||
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item);
|
||||
// put_or_delete_item doesn't keep a reference to schema (so it can be
|
||||
// moved between shards for LWT) so it needs to be given again to build():
|
||||
mutation build(schema_ptr schema, api::timestamp_type ts);
|
||||
mutation build(schema_ptr schema, api::timestamp_type ts) const;
|
||||
const partition_key& pk() const { return _pk; }
|
||||
const clustering_key& ck() const { return _ck; }
|
||||
};
|
||||
@@ -1004,7 +1013,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
}
|
||||
}
|
||||
|
||||
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) const {
|
||||
mutation m(schema, _pk);
|
||||
// If there's no clustering key, a tombstone should be created directly
|
||||
// on a partition, not on a clustering row - otherwise it will look like
|
||||
@@ -1026,7 +1035,7 @@ mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||
for (auto& c : *_cells) {
|
||||
const column_definition* cdef = schema->get_column_definition(c.column_name);
|
||||
if (!cdef) {
|
||||
attrs_collector.put(std::move(c.column_name), std::move(c.value), ts);
|
||||
attrs_collector.put(c.column_name, c.value, ts);
|
||||
} else {
|
||||
row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, std::move(c.value)));
|
||||
}
|
||||
@@ -1326,7 +1335,7 @@ public:
|
||||
check_needs_read_before_write(_condition_expression) ||
|
||||
_returnvalues == returnvalues::ALL_OLD;
|
||||
}
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) override {
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const override {
|
||||
std::unordered_set<std::string> used_attribute_values;
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
if (!verify_expected(_request, previous_item) ||
|
||||
@@ -1338,6 +1347,7 @@ public:
|
||||
// efficient than throwing an exception.
|
||||
return {};
|
||||
}
|
||||
_return_attributes = {};
|
||||
if (_returnvalues == returnvalues::ALL_OLD && previous_item) {
|
||||
// previous_item is supposed to have been created with
|
||||
// describe_item(), so has the "Item" attribute:
|
||||
@@ -1404,7 +1414,7 @@ public:
|
||||
check_needs_read_before_write(_condition_expression) ||
|
||||
_returnvalues == returnvalues::ALL_OLD;
|
||||
}
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) override {
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const override {
|
||||
std::unordered_set<std::string> used_attribute_values;
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
if (!verify_expected(_request, previous_item) ||
|
||||
@@ -1416,6 +1426,7 @@ public:
|
||||
// efficient than throwing an exception.
|
||||
return {};
|
||||
}
|
||||
_return_attributes = {};
|
||||
if (_returnvalues == returnvalues::ALL_OLD && previous_item) {
|
||||
rjson::value* item = rjson::find(*previous_item, "Item");
|
||||
if (item) {
|
||||
@@ -1499,7 +1510,7 @@ public:
|
||||
virtual ~put_or_delete_item_cas_request() = default;
|
||||
virtual std::optional<mutation> apply(query::result& qr, const query::partition_slice& slice, api::timestamp_type ts) override {
|
||||
std::optional<mutation> ret;
|
||||
for (put_or_delete_item& mutation_builder : _mutation_builders) {
|
||||
for (const put_or_delete_item& mutation_builder : _mutation_builders) {
|
||||
// We assume all these builders have the same partition.
|
||||
if (ret) {
|
||||
ret->apply(mutation_builder.build(schema, ts));
|
||||
@@ -2324,7 +2335,7 @@ public:
|
||||
|
||||
update_item_operation(service::storage_proxy& proxy, rjson::value&& request);
|
||||
virtual ~update_item_operation() = default;
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) override;
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const override;
|
||||
bool needs_read_before_write() const;
|
||||
};
|
||||
|
||||
@@ -2388,7 +2399,7 @@ update_item_operation::needs_read_before_write() const {
|
||||
}
|
||||
|
||||
std::optional<mutation>
|
||||
update_item_operation::apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) {
|
||||
update_item_operation::apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const {
|
||||
std::unordered_set<std::string> used_attribute_values;
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
if (!verify_expected(_request, previous_item) ||
|
||||
|
||||
@@ -83,7 +83,11 @@ protected:
|
||||
// When _returnvalues != NONE, apply() should store here, in JSON form,
|
||||
// the values which are to be returned in the "Attributes" field.
|
||||
// The default null JSON means do not return an Attributes field at all.
|
||||
rjson::value _return_attributes;
|
||||
// This field is marked "mutable" so that the const apply() can modify
|
||||
// it (see explanation below), but note that because apply() may be
|
||||
// called more than once, if apply() will sometimes set this field it
|
||||
// must set it (even if just to the default empty value) every time.
|
||||
mutable rjson::value _return_attributes;
|
||||
public:
|
||||
// The constructor of a rmw_operation subclass should parse the request
|
||||
// and try to discover as many input errors as it can before really
|
||||
@@ -96,7 +100,12 @@ public:
|
||||
// conditional expression, apply() should return an empty optional.
|
||||
// apply() may throw if it encounters input errors not discovered during
|
||||
// the constructor.
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) = 0;
|
||||
// apply() may be called more than once in case of contention, so it must
|
||||
// not change the state saved in the object (issue #7218 was caused by
|
||||
// violating this). We mark apply() "const" to let the compiler validate
|
||||
// this for us. The output-only field _return_attributes is marked
|
||||
// "mutable" above so that apply() can still write to it.
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const = 0;
|
||||
// Convert the above apply() into the signature needed by cas_request:
|
||||
virtual std::optional<mutation> apply(query::result& qr, const query::partition_slice& slice, api::timestamp_type ts) override;
|
||||
virtual ~rmw_operation() = default;
|
||||
|
||||
@@ -381,6 +381,7 @@ scylla_tests = set([
|
||||
'test/boost/view_schema_ckey_test',
|
||||
'test/boost/vint_serialization_test',
|
||||
'test/boost/virtual_reader_test',
|
||||
'test/boost/stall_free_test',
|
||||
'test/manual/ec2_snitch_test',
|
||||
'test/manual/gce_snitch_test',
|
||||
'test/manual/gossip',
|
||||
|
||||
@@ -417,7 +417,7 @@ std::vector<const column_definition*> statement_restrictions::get_column_defs_fo
|
||||
_clustering_columns_restrictions->num_prefix_columns_that_need_not_be_filtered();
|
||||
for (auto&& cdef : _clustering_columns_restrictions->get_column_defs()) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
if (single_ck_restrs) {
|
||||
auto it = single_ck_restrs->restrictions().find(cdef);
|
||||
if (it != single_ck_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
|
||||
@@ -225,7 +225,7 @@ void alter_table_statement::add_column(const schema& schema, const table& cf, sc
|
||||
schema_builder builder(view);
|
||||
if (view->view_info()->include_all_columns()) {
|
||||
builder.with_column(column_name.name(), type);
|
||||
} else if (view->view_info()->base_non_pk_columns_in_view_pk().empty()) {
|
||||
} else if (!view->view_info()->has_base_non_pk_columns_in_view_pk()) {
|
||||
db::view::create_virtual_column(builder, column_name.name(), type);
|
||||
}
|
||||
view_updates.push_back(view_ptr(builder.build()));
|
||||
|
||||
@@ -55,6 +55,7 @@
|
||||
#include <limits>
|
||||
#include <cstddef>
|
||||
#include "schema_fwd.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include "db/schema_features.hh"
|
||||
#include "gms/feature.hh"
|
||||
#include "timestamp.hh"
|
||||
@@ -885,7 +886,7 @@ public:
|
||||
lw_shared_ptr<const sstable_list> get_sstables_including_compacted_undeleted() const;
|
||||
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const;
|
||||
std::vector<sstables::shared_sstable> select_sstables(const dht::partition_range& range) const;
|
||||
std::vector<sstables::shared_sstable> candidates_for_compaction() const;
|
||||
std::vector<sstables::shared_sstable> non_staging_sstables() const;
|
||||
std::vector<sstables::shared_sstable> sstables_need_rewrite() const;
|
||||
size_t sstables_count() const;
|
||||
std::vector<uint64_t> sstable_count_per_level() const;
|
||||
@@ -981,8 +982,9 @@ public:
|
||||
return *_config.sstables_manager;
|
||||
}
|
||||
|
||||
// Reader's schema must be the same as the base schema of each of the views.
|
||||
future<> populate_views(
|
||||
std::vector<view_ptr>,
|
||||
std::vector<db::view::view_and_base>,
|
||||
dht::token base_token,
|
||||
flat_mutation_reader&&);
|
||||
|
||||
@@ -998,7 +1000,7 @@ private:
|
||||
future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source, const io_priority_class& io_priority) const;
|
||||
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
|
||||
future<> generate_and_propagate_view_updates(const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views,
|
||||
std::vector<db::view::view_and_base>&& views,
|
||||
mutation&& m,
|
||||
flat_mutation_reader_opt existings) const;
|
||||
|
||||
|
||||
@@ -822,6 +822,14 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_s
|
||||
});
|
||||
}
|
||||
|
||||
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
|
||||
return merge_lock().then([&proxy, &feat] {
|
||||
return update_schema_version_and_announce(proxy, feat.cluster_schema_features());
|
||||
}).finally([] {
|
||||
return merge_unlock();
|
||||
});
|
||||
}
|
||||
|
||||
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
{
|
||||
return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] () mutable {
|
||||
|
||||
@@ -170,6 +170,13 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_s
|
||||
|
||||
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush);
|
||||
|
||||
// Recalculates the local schema version and publishes it in gossip.
|
||||
//
|
||||
// It is safe to call concurrently with recalculate_schema_version() and merge_schema() in which case it
|
||||
// is guaranteed that the schema version we end up with after all calls will reflect the most recent state
|
||||
// of feature_service and schema tables.
|
||||
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat);
|
||||
|
||||
future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after);
|
||||
|
||||
std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
|
||||
|
||||
@@ -130,17 +130,26 @@ const column_definition* view_info::view_column(const column_definition& base_de
|
||||
return _schema.get_column_definition(base_def.name());
|
||||
}
|
||||
|
||||
const std::vector<column_id>& view_info::base_non_pk_columns_in_view_pk() const {
|
||||
return _base_non_pk_columns_in_view_pk;
|
||||
void view_info::set_base_info(db::view::base_info_ptr base_info) {
|
||||
_base_info = std::move(base_info);
|
||||
}
|
||||
|
||||
void view_info::initialize_base_dependent_fields(const schema& base) {
|
||||
db::view::base_info_ptr view_info::make_base_dependent_view_info(const schema& base) const {
|
||||
std::vector<column_id> base_non_pk_columns_in_view_pk;
|
||||
for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) {
|
||||
auto* base_col = base.get_column_definition(view_col.name());
|
||||
if (base_col && !base_col->is_primary_key()) {
|
||||
_base_non_pk_columns_in_view_pk.push_back(base_col->id);
|
||||
base_non_pk_columns_in_view_pk.push_back(base_col->id);
|
||||
}
|
||||
}
|
||||
return make_lw_shared<db::view::base_dependent_view_info>({
|
||||
.base_schema = base.shared_from_this(),
|
||||
.base_non_pk_columns_in_view_pk = std::move(base_non_pk_columns_in_view_pk)
|
||||
});
|
||||
}
|
||||
|
||||
bool view_info::has_base_non_pk_columns_in_view_pk() const {
|
||||
return !_base_info->base_non_pk_columns_in_view_pk.empty();
|
||||
}
|
||||
|
||||
namespace db {
|
||||
@@ -188,11 +197,11 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de
|
||||
}
|
||||
|
||||
static bool update_requires_read_before_write(const schema& base,
|
||||
const std::vector<view_ptr>& views,
|
||||
const std::vector<view_and_base>& views,
|
||||
const dht::decorated_key& key,
|
||||
const rows_entry& update) {
|
||||
for (auto&& v : views) {
|
||||
view_info& vf = *v->view_info();
|
||||
view_info& vf = *v.view->view_info();
|
||||
if (may_be_affected_by(base, vf, key, update)) {
|
||||
return true;
|
||||
}
|
||||
@@ -239,12 +248,14 @@ class view_updates final {
|
||||
view_ptr _view;
|
||||
const view_info& _view_info;
|
||||
schema_ptr _base;
|
||||
base_info_ptr _base_info;
|
||||
std::unordered_map<partition_key, mutation_partition, partition_key::hashing, partition_key::equality> _updates;
|
||||
public:
|
||||
explicit view_updates(view_ptr view, schema_ptr base)
|
||||
: _view(std::move(view))
|
||||
explicit view_updates(view_and_base vab)
|
||||
: _view(std::move(vab.view))
|
||||
, _view_info(*_view->view_info())
|
||||
, _base(std::move(base))
|
||||
, _base(vab.base->base_schema)
|
||||
, _base_info(vab.base)
|
||||
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) {
|
||||
}
|
||||
|
||||
@@ -306,7 +317,7 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
// they share liveness information. It's true especially in the only case currently allowed by CQL,
|
||||
// which assumes there's up to one non-pk column in the view key. It's also true in alternator,
|
||||
// which does not carry TTL information.
|
||||
const auto& col_ids = _view_info.base_non_pk_columns_in_view_pk();
|
||||
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk;
|
||||
if (!col_ids.empty()) {
|
||||
auto& def = _base->regular_column_at(col_ids[0]);
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
@@ -537,7 +548,7 @@ void view_updates::delete_old_entry(const partition_key& base_key, const cluster
|
||||
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
|
||||
auto& r = get_view_row(base_key, existing);
|
||||
const auto& col_ids = _view_info.base_non_pk_columns_in_view_pk();
|
||||
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk;
|
||||
if (!col_ids.empty()) {
|
||||
// We delete the old row using a shadowable row tombstone, making sure that
|
||||
// the tombstone deletes everything in the row (or it might still show up).
|
||||
@@ -678,7 +689,7 @@ void view_updates::generate_update(
|
||||
return;
|
||||
}
|
||||
|
||||
const auto& col_ids = _view_info.base_non_pk_columns_in_view_pk();
|
||||
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk;
|
||||
if (col_ids.empty()) {
|
||||
// The view key is necessarily the same pre and post update.
|
||||
if (existing && existing->is_live(*_base)) {
|
||||
@@ -932,11 +943,16 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
|
||||
future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
|
||||
const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views_to_update,
|
||||
std::vector<view_and_base>&& views_to_update,
|
||||
flat_mutation_reader&& updates,
|
||||
flat_mutation_reader_opt&& existings) {
|
||||
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (auto&& v) {
|
||||
return view_updates(std::move(v), base);
|
||||
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (view_and_base v) {
|
||||
if (base->version() != v.base->base_schema->version()) {
|
||||
on_internal_error(vlogger, format("Schema version used for view updates ({}) does not match the current"
|
||||
" base schema version of the view ({}) for view {}.{} of {}.{}",
|
||||
base->version(), v.base->base_schema->version(), v.view->ks_name(), v.view->cf_name(), base->ks_name(), base->cf_name()));
|
||||
}
|
||||
return view_updates(std::move(v));
|
||||
}));
|
||||
auto builder = std::make_unique<view_update_builder>(base, std::move(vs), std::move(updates), std::move(existings));
|
||||
auto f = builder->build();
|
||||
@@ -946,18 +962,18 @@ future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
|
||||
query::clustering_row_ranges calculate_affected_clustering_ranges(const schema& base,
|
||||
const dht::decorated_key& key,
|
||||
const mutation_partition& mp,
|
||||
const std::vector<view_ptr>& views) {
|
||||
const std::vector<view_and_base>& views) {
|
||||
std::vector<nonwrapping_range<clustering_key_prefix_view>> row_ranges;
|
||||
std::vector<nonwrapping_range<clustering_key_prefix_view>> view_row_ranges;
|
||||
clustering_key_prefix_view::tri_compare cmp(base);
|
||||
if (mp.partition_tombstone() || !mp.row_tombstones().empty()) {
|
||||
for (auto&& v : views) {
|
||||
// FIXME: #2371
|
||||
if (v->view_info()->select_statement().get_restrictions()->has_unrestricted_clustering_columns()) {
|
||||
if (v.view->view_info()->select_statement().get_restrictions()->has_unrestricted_clustering_columns()) {
|
||||
view_row_ranges.push_back(nonwrapping_range<clustering_key_prefix_view>::make_open_ended_both_sides());
|
||||
break;
|
||||
}
|
||||
for (auto&& r : v->view_info()->partition_slice().default_row_ranges()) {
|
||||
for (auto&& r : v.view->view_info()->partition_slice().default_row_ranges()) {
|
||||
view_row_ranges.push_back(r.transform(std::mem_fn(&clustering_key_prefix::view)));
|
||||
}
|
||||
}
|
||||
@@ -1717,7 +1733,7 @@ public:
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
_fragments_memory_usage += cr.memory_usage(*_step.base->schema());
|
||||
_fragments_memory_usage += cr.memory_usage(*_step.reader.schema());
|
||||
_fragments.push_back(std::move(cr));
|
||||
if (_fragments_memory_usage > batch_memory_max) {
|
||||
// Although we have not yet completed the batch of base rows that
|
||||
@@ -1737,10 +1753,14 @@ public:
|
||||
_builder._as.check();
|
||||
if (!_fragments.empty()) {
|
||||
_fragments.push_front(partition_start(_step.current_key, tombstone()));
|
||||
auto base_schema = _step.base->schema();
|
||||
auto views = with_base_info_snapshot(_views_to_build);
|
||||
auto reader = make_flat_mutation_reader_from_fragments(_step.reader.schema(), std::move(_fragments));
|
||||
reader.upgrade_schema(base_schema);
|
||||
_step.base->populate_views(
|
||||
_views_to_build,
|
||||
std::move(views),
|
||||
_step.current_token(),
|
||||
make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
|
||||
std::move(reader)).get();
|
||||
_fragments.clear();
|
||||
_fragments_memory_usage = 0;
|
||||
}
|
||||
@@ -1887,5 +1907,11 @@ future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_d
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<db::view::view_and_base> with_base_info_snapshot(std::vector<view_ptr> vs) {
|
||||
return boost::copy_range<std::vector<db::view::view_and_base>>(vs | boost::adaptors::transformed([] (const view_ptr& v) {
|
||||
return db::view::view_and_base{v, v->view_info()->base_info()};
|
||||
}));
|
||||
}
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
|
||||
|
||||
@@ -43,6 +43,27 @@ namespace db {
|
||||
|
||||
namespace view {
|
||||
|
||||
// Part of the view description which depends on the base schema version.
|
||||
//
|
||||
// This structure may change even though the view schema doesn't change, so
|
||||
// it needs to live outside view_ptr.
|
||||
struct base_dependent_view_info {
|
||||
schema_ptr base_schema;
|
||||
|
||||
// Id of a regular base table column included in the view's PK, if any.
|
||||
// Scylla views only allow one such column, alternator can have up to two.
|
||||
std::vector<column_id> base_non_pk_columns_in_view_pk;
|
||||
};
|
||||
|
||||
// Immutable snapshot of view's base-schema-dependent part.
|
||||
using base_info_ptr = lw_shared_ptr<const base_dependent_view_info>;
|
||||
|
||||
// Snapshot of the view schema and its base-schema-dependent part.
|
||||
struct view_and_base {
|
||||
view_ptr view;
|
||||
base_info_ptr base;
|
||||
};
|
||||
|
||||
/**
|
||||
* Whether the view filter considers the specified partition key.
|
||||
*
|
||||
@@ -92,7 +113,7 @@ bool clustering_prefix_matches(const schema& base, const partition_key& key, con
|
||||
|
||||
future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
|
||||
const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views_to_update,
|
||||
std::vector<view_and_base>&& views_to_update,
|
||||
flat_mutation_reader&& updates,
|
||||
flat_mutation_reader_opt&& existings);
|
||||
|
||||
@@ -100,7 +121,7 @@ query::clustering_row_ranges calculate_affected_clustering_ranges(
|
||||
const schema& base,
|
||||
const dht::decorated_key& key,
|
||||
const mutation_partition& mp,
|
||||
const std::vector<view_ptr>& views);
|
||||
const std::vector<view_and_base>& views);
|
||||
|
||||
struct wait_for_all_updates_tag {};
|
||||
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
|
||||
@@ -128,6 +149,13 @@ future<> mutate_MV(
|
||||
*/
|
||||
void create_virtual_column(schema_builder& builder, const bytes& name, const data_type& type);
|
||||
|
||||
/**
|
||||
* Converts a collection of view schema snapshots into a collection of
|
||||
* view_and_base objects, which are snapshots of both the view schema
|
||||
* and the base-schema-dependent part of view description.
|
||||
*/
|
||||
std::vector<view_and_base> with_base_info_snapshot(std::vector<view_ptr>);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -182,7 +182,7 @@ class aws_instance:
|
||||
instance_size = self.instance_size()
|
||||
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
|
||||
return 'ixgbevf'
|
||||
if instance_class in ['a1', 'c5', 'c5d', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
|
||||
return 'ena'
|
||||
if instance_class == 'm4':
|
||||
if instance_size == '16xlarge':
|
||||
|
||||
@@ -487,6 +487,9 @@ public:
|
||||
size_t buffer_size() const {
|
||||
return _impl->buffer_size();
|
||||
}
|
||||
const circular_buffer<mutation_fragment>& buffer() const {
|
||||
return _impl->buffer();
|
||||
}
|
||||
// Detach the internal buffer of the reader.
|
||||
// Roughly equivalent to depleting it by calling pop_mutation_fragment()
|
||||
// until is_buffer_empty() returns true.
|
||||
|
||||
@@ -126,6 +126,7 @@ relocate_python3() {
|
||||
cp "$script" "$relocateddir"
|
||||
cat > "$install"<<EOF
|
||||
#!/usr/bin/env bash
|
||||
export LC_ALL=en_US.UTF-8
|
||||
x="\$(readlink -f "\$0")"
|
||||
b="\$(basename "\$x")"
|
||||
d="\$(dirname "\$x")"
|
||||
|
||||
@@ -1721,7 +1721,7 @@ void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
|
||||
// we erase the live cells according to the shadowable_tombstone rules.
|
||||
static bool dead_marker_shadows_row(const schema& s, column_kind kind, const row_marker& marker) {
|
||||
return s.is_view()
|
||||
&& !s.view_info()->base_non_pk_columns_in_view_pk().empty()
|
||||
&& s.view_info()->has_base_non_pk_columns_in_view_pk()
|
||||
&& !marker.is_live()
|
||||
&& kind == column_kind::regular_column; // not applicable to static rows
|
||||
}
|
||||
|
||||
@@ -113,9 +113,6 @@ class reconcilable_result_builder {
|
||||
const schema& _schema;
|
||||
const query::partition_slice& _slice;
|
||||
|
||||
utils::chunked_vector<partition> _result;
|
||||
uint32_t _live_rows{};
|
||||
|
||||
bool _return_static_content_on_partition_with_no_rows{};
|
||||
bool _static_row_is_alive{};
|
||||
uint32_t _total_live_rows = 0;
|
||||
@@ -123,6 +120,10 @@ class reconcilable_result_builder {
|
||||
stop_iteration _stop;
|
||||
bool _short_read_allowed;
|
||||
std::optional<streamed_mutation_freezer> _mutation_consumer;
|
||||
|
||||
uint32_t _live_rows{};
|
||||
// make this the last member so it is destroyed first. #7240
|
||||
utils::chunked_vector<partition> _result;
|
||||
public:
|
||||
reconcilable_result_builder(const schema& s, const query::partition_slice& slice,
|
||||
query::result_memory_accounter&& accounter)
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "schema_registry.hh"
|
||||
#include "mutation_compactor.hh"
|
||||
|
||||
logging::logger mrlog("mutation_reader");
|
||||
|
||||
static constexpr size_t merger_small_vector_size = 4;
|
||||
|
||||
@@ -1028,6 +1029,13 @@ private:
|
||||
bool _reader_created = false;
|
||||
bool _drop_partition_start = false;
|
||||
bool _drop_static_row = false;
|
||||
// Trim range tombstones on the start of the buffer to the start of the read
|
||||
// range (_next_position_in_partition). Set after reader recreation.
|
||||
// Also validate the first not-trimmed mutation fragment's position.
|
||||
bool _trim_range_tombstones = false;
|
||||
// Validate the partition key of the first emitted partition, set after the
|
||||
// reader was recreated.
|
||||
bool _validate_partition_key = false;
|
||||
position_in_partition::tri_compare _tri_cmp;
|
||||
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
@@ -1049,7 +1057,10 @@ private:
|
||||
void adjust_partition_slice();
|
||||
flat_mutation_reader recreate_reader();
|
||||
flat_mutation_reader resume_or_create_reader();
|
||||
void maybe_validate_partition_start(const circular_buffer<mutation_fragment>& buffer);
|
||||
void validate_position_in_partition(position_in_partition_view pos) const;
|
||||
bool should_drop_fragment(const mutation_fragment& mf);
|
||||
bool maybe_trim_range_tombstone(mutation_fragment& mf) const;
|
||||
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
future<> fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
|
||||
@@ -1122,16 +1133,11 @@ void evictable_reader::update_next_position(flat_mutation_reader& reader) {
|
||||
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
if (reader.is_buffer_empty()) {
|
||||
_next_position_in_partition = position_in_partition::after_key(last_pos);
|
||||
} else {
|
||||
const auto& next_frag = reader.peek_buffer();
|
||||
if (next_frag.is_end_of_partition()) {
|
||||
if (!reader.is_buffer_empty() && reader.peek_buffer().is_end_of_partition()) {
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
} else {
|
||||
_next_position_in_partition = position_in_partition(next_frag.position());
|
||||
}
|
||||
} else {
|
||||
_next_position_in_partition = position_in_partition::after_key(last_pos);
|
||||
}
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
@@ -1156,6 +1162,9 @@ flat_mutation_reader evictable_reader::recreate_reader() {
|
||||
const dht::partition_range* range = _pr;
|
||||
const query::partition_slice* slice = &_ps;
|
||||
|
||||
_range_override.reset();
|
||||
_slice_override.reset();
|
||||
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
@@ -1192,6 +1201,9 @@ flat_mutation_reader evictable_reader::recreate_reader() {
|
||||
range = &*_range_override;
|
||||
}
|
||||
|
||||
_trim_range_tombstones = true;
|
||||
_validate_partition_key = true;
|
||||
|
||||
return _ms.make_reader(
|
||||
_schema,
|
||||
no_reader_permit(),
|
||||
@@ -1218,6 +1230,78 @@ flat_mutation_reader evictable_reader::resume_or_create_reader() {
|
||||
return recreate_reader();
|
||||
}
|
||||
|
||||
template <typename... Arg>
|
||||
static void require(bool condition, const char* msg, const Arg&... arg) {
|
||||
if (!condition) {
|
||||
on_internal_error(mrlog, format(msg, arg...));
|
||||
}
|
||||
}
|
||||
|
||||
void evictable_reader::maybe_validate_partition_start(const circular_buffer<mutation_fragment>& buffer) {
|
||||
if (!_validate_partition_key || buffer.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// If this is set we can assume the first fragment is a partition-start.
|
||||
const auto& ps = buffer.front().as_partition_start();
|
||||
const auto tri_cmp = dht::ring_position_comparator(*_schema);
|
||||
// If we recreated the reader after fast-forwarding it we won't have
|
||||
// _last_pkey set. In this case it is enough to check if the partition
|
||||
// is in range.
|
||||
if (_last_pkey) {
|
||||
const auto cmp_res = tri_cmp(*_last_pkey, ps.key());
|
||||
if (_drop_partition_start) { // should be the same partition
|
||||
require(
|
||||
cmp_res == 0,
|
||||
"{}(): validation failed, expected partition with key equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
|
||||
__FUNCTION__,
|
||||
*_last_pkey,
|
||||
ps.key());
|
||||
} else { // should be a larger partition
|
||||
require(
|
||||
cmp_res < 0,
|
||||
"{}(): validation failed, expected partition with key larger than _last_pkey {} due to _drop_partition_start being unset, but got {}",
|
||||
__FUNCTION__,
|
||||
*_last_pkey,
|
||||
ps.key());
|
||||
}
|
||||
}
|
||||
const auto& prange = _range_override ? *_range_override : *_pr;
|
||||
require(
|
||||
// TODO: somehow avoid this copy
|
||||
prange.contains(ps.key(), tri_cmp),
|
||||
"{}(): validation failed, expected partition with key that falls into current range {}, but got {}",
|
||||
__FUNCTION__,
|
||||
prange,
|
||||
ps.key());
|
||||
|
||||
_validate_partition_key = false;
|
||||
}
|
||||
|
||||
void evictable_reader::validate_position_in_partition(position_in_partition_view pos) const {
|
||||
require(
|
||||
_tri_cmp(_next_position_in_partition, pos) <= 0,
|
||||
"{}(): validation failed, expected position in partition that is larger-than-equal than _next_position_in_partition {}, but got {}",
|
||||
__FUNCTION__,
|
||||
_next_position_in_partition,
|
||||
pos);
|
||||
|
||||
if (_slice_override && pos.region() == partition_region::clustered) {
|
||||
const auto ranges = _slice_override->row_ranges(*_schema, _last_pkey->key());
|
||||
const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) {
|
||||
// TODO: somehow avoid this copy
|
||||
auto range = position_range(cr);
|
||||
return range.contains(*_schema, pos);
|
||||
});
|
||||
require(
|
||||
any_contains,
|
||||
"{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}",
|
||||
__FUNCTION__,
|
||||
*_slice_override,
|
||||
pos);
|
||||
}
|
||||
}
|
||||
|
||||
bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
if (_drop_partition_start && mf.is_partition_start()) {
|
||||
_drop_partition_start = false;
|
||||
@@ -1230,12 +1314,50 @@ bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool evictable_reader::maybe_trim_range_tombstone(mutation_fragment& mf) const {
|
||||
// We either didn't read a partition yet (evicted after fast-forwarding) or
|
||||
// didn't stop in a clustering region. We don't need to trim range
|
||||
// tombstones in either case.
|
||||
if (!_last_pkey || _next_position_in_partition.region() != partition_region::clustered) {
|
||||
return false;
|
||||
}
|
||||
if (!mf.is_range_tombstone()) {
|
||||
validate_position_in_partition(mf.position());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_tri_cmp(mf.position(), _next_position_in_partition) >= 0) {
|
||||
validate_position_in_partition(mf.position());
|
||||
return false; // rt in range, no need to trim
|
||||
}
|
||||
|
||||
auto& rt = mf.as_mutable_range_tombstone();
|
||||
|
||||
require(
|
||||
_tri_cmp(_next_position_in_partition, rt.end_position()) <= 0,
|
||||
"{}(): validation failed, expected range tombstone with end pos larger than _next_position_in_partition {}, but got {}",
|
||||
__FUNCTION__,
|
||||
_next_position_in_partition,
|
||||
rt.end_position());
|
||||
|
||||
rt.set_start(*_schema, position_in_partition_view::before_key(_next_position_in_partition));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
if (!_drop_partition_start && !_drop_static_row) {
|
||||
return reader.fill_buffer(timeout);
|
||||
auto fill_buf_fut = reader.fill_buffer(timeout);
|
||||
if (_validate_partition_key) {
|
||||
fill_buf_fut = fill_buf_fut.then([this, &reader] {
|
||||
maybe_validate_partition_start(reader.buffer());
|
||||
});
|
||||
}
|
||||
return fill_buf_fut;
|
||||
}
|
||||
return repeat([this, &reader, timeout] {
|
||||
return reader.fill_buffer(timeout).then([this, &reader] {
|
||||
maybe_validate_partition_start(reader.buffer());
|
||||
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
@@ -1249,6 +1371,11 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout
|
||||
if (reader.is_buffer_empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
while (_trim_range_tombstones && !reader.is_buffer_empty()) {
|
||||
auto mf = reader.pop_mutation_fragment();
|
||||
_trim_range_tombstones = maybe_trim_range_tombstone(mf);
|
||||
push_mutation_fragment(std::move(mf));
|
||||
}
|
||||
reader.move_buffer_content_to(*this);
|
||||
auto stop = [this, &reader] {
|
||||
// The only problematic fragment kind is the range tombstone.
|
||||
@@ -1289,7 +1416,13 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout
|
||||
if (reader.is_buffer_empty()) {
|
||||
return do_fill_buffer(reader, timeout);
|
||||
}
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
if (_trim_range_tombstones) {
|
||||
auto mf = reader.pop_mutation_fragment();
|
||||
_trim_range_tombstones = maybe_trim_range_tombstone(mf);
|
||||
push_mutation_fragment(std::move(mf));
|
||||
} else {
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([this, &reader] {
|
||||
|
||||
@@ -163,6 +163,11 @@ public:
|
||||
return {partition_region::clustered, bound_weight::before_all_prefixed, &ck};
|
||||
}
|
||||
|
||||
// Returns a view to before_key(pos._ck) if pos.is_clustering_row() else returns pos as-is.
|
||||
static position_in_partition_view before_key(position_in_partition_view pos) {
|
||||
return {partition_region::clustered, pos._bound_weight == bound_weight::equal ? bound_weight::before_all_prefixed : pos._bound_weight, pos._ck};
|
||||
}
|
||||
|
||||
partition_region region() const { return _type; }
|
||||
bound_weight get_bound_weight() const { return _bound_weight; }
|
||||
bool is_partition_start() const { return _type == partition_region::partition_start; }
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
|
||||
|
||||
reader_permit::impl::impl(reader_concurrency_semaphore& semaphore, reader_resources base_cost) : semaphore(semaphore), base_cost(base_cost) {
|
||||
semaphore.consume(base_cost);
|
||||
}
|
||||
|
||||
reader_permit::impl::~impl() {
|
||||
@@ -88,7 +89,6 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
_resources += r;
|
||||
while (!_wait_list.empty() && has_available_units(_wait_list.front().res)) {
|
||||
auto& x = _wait_list.front();
|
||||
_resources -= x.res;
|
||||
try {
|
||||
x.pr.set_value(reader_permit(*this, x.res));
|
||||
} catch (...) {
|
||||
@@ -160,7 +160,6 @@ future<reader_permit> reader_concurrency_semaphore::wait_admission(size_t memory
|
||||
--_inactive_read_stats.population;
|
||||
}
|
||||
if (may_proceed(r)) {
|
||||
_resources -= r;
|
||||
return make_ready_future<reader_permit>(reader_permit(*this, r));
|
||||
}
|
||||
promise<reader_permit> pr;
|
||||
@@ -170,7 +169,6 @@ future<reader_permit> reader_concurrency_semaphore::wait_admission(size_t memory
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore::consume_resources(resources r) {
|
||||
_resources -= r;
|
||||
return reader_permit(*this, r);
|
||||
}
|
||||
|
||||
|
||||
@@ -128,6 +128,10 @@ private:
|
||||
return has_available_units(r) && _wait_list.empty();
|
||||
}
|
||||
|
||||
void consume(resources r) {
|
||||
_resources -= r;
|
||||
}
|
||||
|
||||
void consume_memory(size_t memory) {
|
||||
_resources.memory -= memory;
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@
|
||||
#include "gms/gossiper.hh"
|
||||
#include "repair/row_level.hh"
|
||||
#include "mutation_source_metadata.hh"
|
||||
#include "utils/stall_free.hh"
|
||||
|
||||
extern logging::logger rlogger;
|
||||
|
||||
@@ -1094,24 +1095,32 @@ private:
|
||||
});
|
||||
}
|
||||
|
||||
future<> clear_row_buf() {
|
||||
return utils::clear_gently(_row_buf);
|
||||
}
|
||||
|
||||
future<> clear_working_row_buf() {
|
||||
return utils::clear_gently(_working_row_buf).then([this] {
|
||||
_working_row_buf_combined_hash.clear();
|
||||
});
|
||||
}
|
||||
|
||||
// Read rows from disk until _max_row_buf_size of rows are filled into _row_buf.
|
||||
// Calculate the combined checksum of the rows
|
||||
// Calculate the total size of the rows in _row_buf
|
||||
future<get_sync_boundary_response>
|
||||
get_sync_boundary(std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
auto f = make_ready_future<>();
|
||||
if (skipped_sync_boundary) {
|
||||
_current_sync_boundary = skipped_sync_boundary;
|
||||
_row_buf.clear();
|
||||
_working_row_buf.clear();
|
||||
_working_row_buf_combined_hash.clear();
|
||||
} else {
|
||||
_working_row_buf.clear();
|
||||
_working_row_buf_combined_hash.clear();
|
||||
f = clear_row_buf();
|
||||
}
|
||||
// Here is the place we update _last_sync_boundary
|
||||
rlogger.trace("SET _last_sync_boundary from {} to {}", _last_sync_boundary, _current_sync_boundary);
|
||||
_last_sync_boundary = _current_sync_boundary;
|
||||
return row_buf_size().then([this, sb = std::move(skipped_sync_boundary)] (size_t cur_size) {
|
||||
return f.then([this, sb = std::move(skipped_sync_boundary)] () mutable {
|
||||
return clear_working_row_buf().then([this, sb = sb] () mutable {
|
||||
return row_buf_size().then([this, sb = std::move(sb)] (size_t cur_size) {
|
||||
return read_rows_from_disk(cur_size).then([this, sb = std::move(sb)] (std::list<repair_row> new_rows, size_t new_rows_size) mutable {
|
||||
size_t new_rows_nr = new_rows.size();
|
||||
_row_buf.splice(_row_buf.end(), new_rows);
|
||||
@@ -1128,6 +1137,8 @@ private:
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> move_row_buf_to_working_row_buf() {
|
||||
@@ -1231,19 +1242,28 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
|
||||
future<> do_apply_rows(std::list<repair_row>&& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return do_with(std::move(row_diff), [this, node_idx, update_buf] (std::list<repair_row>& row_diff) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return repeat([this, node_idx, update_buf, &row_diff] () mutable {
|
||||
if (row_diff.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
repair_row& r = row_diff.front();
|
||||
if (update_buf) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
}
|
||||
// The repair_row here is supposed to have
|
||||
// mutation_fragment attached because we have stored it in
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
|
||||
row_diff.pop_front();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1261,19 +1281,17 @@ private:
|
||||
stats().rx_row_nr += row_diff.size();
|
||||
stats().rx_row_nr_peer[from] += row_diff.size();
|
||||
if (update_buf) {
|
||||
std::list<repair_row> tmp;
|
||||
tmp.swap(_working_row_buf);
|
||||
// Both row_diff and _working_row_buf and are ordered, merging
|
||||
// two sored list to make sure the combination of row_diff
|
||||
// and _working_row_buf are ordered.
|
||||
std::merge(tmp.begin(), tmp.end(), row_diff.begin(), row_diff.end(), std::back_inserter(_working_row_buf),
|
||||
[this] (const repair_row& x, const repair_row& y) { thread::maybe_yield(); return _cmp(x.boundary(), y.boundary()) < 0; });
|
||||
utils::merge_to_gently(_working_row_buf, row_diff,
|
||||
[this] (const repair_row& x, const repair_row& y) { return _cmp(x.boundary(), y.boundary()) < 0; });
|
||||
}
|
||||
if (update_hash_set) {
|
||||
_peer_row_hash_sets[node_idx] = boost::copy_range<repair_hash_set>(row_diff |
|
||||
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
|
||||
}
|
||||
do_apply_rows(row_diff, node_idx, update_buf).get();
|
||||
do_apply_rows(std::move(row_diff), node_idx, update_buf).get();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1281,11 +1299,9 @@ private:
|
||||
if (rows.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
|
||||
});
|
||||
return to_repair_rows_list(std::move(rows)).then([this] (std::list<repair_row> row_diff) {
|
||||
unsigned node_idx = 0;
|
||||
return do_apply_rows(std::move(row_diff), node_idx, update_working_row_buf::no);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -42,6 +42,8 @@
|
||||
|
||||
constexpr int32_t schema::NAME_LENGTH;
|
||||
|
||||
extern logging::logger dblog;
|
||||
|
||||
sstring to_sstring(column_kind k) {
|
||||
switch (k) {
|
||||
case column_kind::partition_key: return "PARTITION_KEY";
|
||||
@@ -575,11 +577,15 @@ schema::get_column_definition(const bytes& name) const {
|
||||
|
||||
const column_definition&
|
||||
schema::column_at(column_kind kind, column_id id) const {
|
||||
return _raw._columns.at(column_offset(kind) + id);
|
||||
return column_at(static_cast<ordinal_column_id>(column_offset(kind) + id));
|
||||
}
|
||||
|
||||
const column_definition&
|
||||
schema::column_at(ordinal_column_id ordinal_id) const {
|
||||
if (size_t(ordinal_id) >= _raw._columns.size()) {
|
||||
on_internal_error(dblog, format("{}.{}@{}: column id {:d} >= {:d}",
|
||||
ks_name(), cf_name(), version(), size_t(ordinal_id), _raw._columns.size()));
|
||||
}
|
||||
return _raw._columns.at(static_cast<column_count_type>(ordinal_id));
|
||||
}
|
||||
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 861b7edd61...748428930a
@@ -92,7 +92,7 @@ void migration_manager::init_messaging_service()
|
||||
//FIXME: future discarded.
|
||||
(void)with_gate(_background_tasks, [this] {
|
||||
mlogger.debug("features changed, recalculating schema version");
|
||||
return update_schema_version_and_announce(get_storage_proxy(), _feat.cluster_schema_features());
|
||||
return db::schema_tables::recalculate_schema_version(get_storage_proxy(), _feat);
|
||||
});
|
||||
};
|
||||
|
||||
@@ -277,9 +277,9 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
|
||||
}).finally([me = shared_from_this()] {});
|
||||
}
|
||||
|
||||
future<> migration_manager::submit_migration_task(const gms::inet_address& endpoint)
|
||||
future<> migration_manager::submit_migration_task(const gms::inet_address& endpoint, bool can_ignore_down_node)
|
||||
{
|
||||
return service::migration_task::run_may_throw(endpoint);
|
||||
return service::migration_task::run_may_throw(endpoint, can_ignore_down_node);
|
||||
}
|
||||
|
||||
future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_addr id)
|
||||
@@ -1132,7 +1132,8 @@ future<> migration_manager::sync_schema(const database& db, const std::vector<gm
|
||||
}).then([this, &schema_map] {
|
||||
return parallel_for_each(schema_map, [this] (auto& x) {
|
||||
mlogger.debug("Pulling schema {} from {}", x.first, x.second.front());
|
||||
return submit_migration_task(x.second.front());
|
||||
bool can_ignore_down_node = false;
|
||||
return submit_migration_task(x.second.front(), can_ignore_down_node);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -82,7 +82,7 @@ public:
|
||||
|
||||
future<> maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint);
|
||||
|
||||
future<> submit_migration_task(const gms::inet_address& endpoint);
|
||||
future<> submit_migration_task(const gms::inet_address& endpoint, bool can_ignore_down_node = true);
|
||||
|
||||
// Makes sure that this node knows about all schema changes known by "nodes" that were made prior to this call.
|
||||
future<> sync_schema(const database& db, const std::vector<gms::inet_address>& nodes);
|
||||
|
||||
@@ -51,11 +51,12 @@ namespace service {
|
||||
|
||||
static logging::logger mlogger("migration_task");
|
||||
|
||||
future<> migration_task::run_may_throw(const gms::inet_address& endpoint)
|
||||
future<> migration_task::run_may_throw(const gms::inet_address& endpoint, bool can_ignore_down_node)
|
||||
{
|
||||
if (!gms::get_local_gossiper().is_alive(endpoint)) {
|
||||
mlogger.warn("Can't send migration request: node {} is down.", endpoint);
|
||||
return make_ready_future<>();
|
||||
auto msg = format("Can't send migration request: node {} is down.", endpoint);
|
||||
mlogger.warn("{}", msg);
|
||||
return can_ignore_down_node ? make_ready_future<>() : make_exception_future<>(std::runtime_error(msg));
|
||||
}
|
||||
netw::messaging_service::msg_addr id{endpoint, 0};
|
||||
return service::get_local_migration_manager().merge_schema_from(id).handle_exception([](std::exception_ptr e) {
|
||||
|
||||
@@ -47,7 +47,7 @@ namespace service {
|
||||
|
||||
class migration_task {
|
||||
public:
|
||||
static future<> run_may_throw(const gms::inet_address& endpoint);
|
||||
static future<> run_may_throw(const gms::inet_address& endpoint, bool can_ignore_down_node);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -218,7 +218,7 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const c
|
||||
auto& cs = cf.get_compaction_strategy();
|
||||
|
||||
// Filter out sstables that are being compacted.
|
||||
for (auto& sst : cf.candidates_for_compaction()) {
|
||||
for (auto& sst : cf.non_staging_sstables()) {
|
||||
if (_compacting_sstables.count(sst)) {
|
||||
continue;
|
||||
}
|
||||
@@ -680,7 +680,7 @@ future<> compaction_manager::perform_cleanup(database& db, column_family* cf) {
|
||||
auto& rs = db.find_keyspace(schema->ks_name()).get_replication_strategy();
|
||||
auto sorted_owned_ranges = rs.get_ranges_in_thread(utils::fb_utilities::get_broadcast_address());
|
||||
auto sstables = std::vector<sstables::shared_sstable>{};
|
||||
const auto candidates = cf->candidates_for_compaction();
|
||||
const auto candidates = get_candidates(*cf);
|
||||
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) {
|
||||
seastar::thread::maybe_yield();
|
||||
return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema);
|
||||
@@ -703,7 +703,7 @@ future<> compaction_manager::perform_sstable_upgrade(column_family* cf, bool exc
|
||||
return cf->run_with_compaction_disabled([this, cf, &tables, exclude_current_version] {
|
||||
auto last_version = cf->get_sstables_manager().get_highest_supported_format();
|
||||
|
||||
for (auto& sst : cf->candidates_for_compaction()) {
|
||||
for (auto& sst : get_candidates(*cf)) {
|
||||
// if we are a "normal" upgrade, we only care about
|
||||
// tables with older versions, but potentially
|
||||
// we are to actually rewrite everything. (-a)
|
||||
@@ -728,8 +728,8 @@ future<> compaction_manager::perform_sstable_upgrade(column_family* cf, bool exc
|
||||
|
||||
// Submit a column family to be scrubbed and wait for its termination.
|
||||
future<> compaction_manager::perform_sstable_scrub(column_family* cf, bool skip_corrupted) {
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_scrub(skip_corrupted), [] (const table& cf) {
|
||||
return cf.candidates_for_compaction();
|
||||
return rewrite_sstables(cf, sstables::compaction_options::make_scrub(skip_corrupted), [this] (const table& cf) {
|
||||
return get_candidates(cf);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
16
table.cc
16
table.cc
@@ -1466,7 +1466,7 @@ std::vector<sstables::shared_sstable> table::select_sstables(const dht::partitio
|
||||
return _sstables->select(range);
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> table::candidates_for_compaction() const {
|
||||
std::vector<sstables::shared_sstable> table::non_staging_sstables() const {
|
||||
return boost::copy_range<std::vector<sstables::shared_sstable>>(*get_sstables()
|
||||
| boost::adaptors::filtered([this] (auto& sst) {
|
||||
return !_sstables_need_rewrite.count(sst->generation()) && !_sstables_staging.count(sst->generation());
|
||||
@@ -2034,6 +2034,11 @@ void table::set_schema(schema_ptr s) {
|
||||
}
|
||||
_schema = std::move(s);
|
||||
|
||||
for (auto&& v : _views) {
|
||||
v->view_info()->set_base_info(
|
||||
v->view_info()->make_base_dependent_view_info(*_schema));
|
||||
}
|
||||
|
||||
set_compaction_strategy(_schema->compaction_strategy());
|
||||
trigger_compaction();
|
||||
}
|
||||
@@ -2045,7 +2050,8 @@ static std::vector<view_ptr>::iterator find_view(std::vector<view_ptr>& views, c
|
||||
}
|
||||
|
||||
void table::add_or_update_view(view_ptr v) {
|
||||
v->view_info()->initialize_base_dependent_fields(*schema());
|
||||
v->view_info()->set_base_info(
|
||||
v->view_info()->make_base_dependent_view_info(*_schema));
|
||||
auto existing = find_view(_views, v);
|
||||
if (existing != _views.end()) {
|
||||
*existing = std::move(v);
|
||||
@@ -2098,7 +2104,7 @@ static size_t memory_usage_of(const std::vector<frozen_mutation_and_schema>& ms)
|
||||
* @return a future resolving to the mutations to apply to the views, which can be empty.
|
||||
*/
|
||||
future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views,
|
||||
std::vector<db::view::view_and_base>&& views,
|
||||
mutation&& m,
|
||||
flat_mutation_reader_opt existings) const {
|
||||
auto base_token = m.token();
|
||||
@@ -2206,7 +2212,7 @@ table::local_base_lock(
|
||||
* @return a future that resolves when the updates have been acknowledged by the view replicas
|
||||
*/
|
||||
future<> table::populate_views(
|
||||
std::vector<view_ptr> views,
|
||||
std::vector<db::view::view_and_base> views,
|
||||
dht::token base_token,
|
||||
flat_mutation_reader&& reader) {
|
||||
auto& schema = reader.schema();
|
||||
@@ -2527,7 +2533,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
}
|
||||
auto& base = schema();
|
||||
m.upgrade(base);
|
||||
auto views = affected_views(base, m);
|
||||
auto views = db::view::with_base_info_snapshot(affected_views(base, m));
|
||||
if (views.empty()) {
|
||||
return make_ready_future<row_locker::lock_holder>();
|
||||
}
|
||||
|
||||
@@ -3443,10 +3443,13 @@ SEASTAR_TEST_CASE(test_select_with_mixed_order_table) {
|
||||
}
|
||||
|
||||
uint64_t
|
||||
run_and_examine_cache_stat_change(cql_test_env& e, uint64_t cache_tracker::stats::*metric, std::function<void (cql_test_env& e)> func) {
|
||||
run_and_examine_cache_read_stats_change(cql_test_env& e, std::string_view cf_name, std::function<void (cql_test_env& e)> func) {
|
||||
auto read_stat = [&] {
|
||||
auto local_read_metric = [metric] (database& db) { return db.row_cache_tracker().get_stats().*metric; };
|
||||
return e.db().map_reduce0(local_read_metric, uint64_t(0), std::plus<uint64_t>()).get0();
|
||||
return e.db().map_reduce0([&cf_name] (const database& db) {
|
||||
auto& t = db.find_column_family("ks", sstring(cf_name));
|
||||
auto& stats = t.get_row_cache().stats();
|
||||
return stats.reads_with_misses.count() + stats.reads_with_no_misses.count();
|
||||
}, uint64_t(0), std::plus<uint64_t>()).get0();
|
||||
};
|
||||
auto before = read_stat();
|
||||
func(e);
|
||||
@@ -3457,11 +3460,11 @@ run_and_examine_cache_stat_change(cql_test_env& e, uint64_t cache_tracker::stats
|
||||
SEASTAR_TEST_CASE(test_cache_bypass) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE t (k int PRIMARY KEY)").get();
|
||||
auto with_cache = run_and_examine_cache_stat_change(e, &cache_tracker::stats::reads, [] (cql_test_env& e) {
|
||||
auto with_cache = run_and_examine_cache_read_stats_change(e, "t", [] (cql_test_env& e) {
|
||||
e.execute_cql("SELECT * FROM t").get();
|
||||
});
|
||||
BOOST_REQUIRE(with_cache >= smp::count); // scan may make multiple passes per shard
|
||||
auto without_cache = run_and_examine_cache_stat_change(e, &cache_tracker::stats::reads, [] (cql_test_env& e) {
|
||||
auto without_cache = run_and_examine_cache_read_stats_change(e, "t", [] (cql_test_env& e) {
|
||||
e.execute_cql("SELECT * FROM t BYPASS CACHE").get();
|
||||
});
|
||||
BOOST_REQUIRE_EQUAL(without_cache, 0);
|
||||
|
||||
@@ -1134,6 +1134,9 @@ SEASTAR_TEST_CASE(test_filtering) {
|
||||
{ int32_type->decompose(8), int32_type->decompose(3) },
|
||||
{ int32_type->decompose(9), int32_type->decompose(3) },
|
||||
});
|
||||
require_rows(e, "SELECT k FROM cf WHERE k=12 AND (m,n)>=(4,0) ALLOW FILTERING;", {
|
||||
{ int32_type->decompose(12), int32_type->decompose(4), int32_type->decompose(5)},
|
||||
});
|
||||
}
|
||||
|
||||
// test filtering on clustering keys
|
||||
|
||||
@@ -2847,3 +2847,488 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source)
|
||||
|
||||
run_mutation_source_tests(make_populate);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
std::deque<mutation_fragment> copy_fragments(const schema& s, const std::deque<mutation_fragment>& o) {
|
||||
std::deque<mutation_fragment> buf;
|
||||
for (const auto& mf : o) {
|
||||
buf.emplace_back(s, mf);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& prange,
|
||||
const query::partition_slice& slice,
|
||||
std::deque<mutation_fragment> first_buffer,
|
||||
position_in_partition_view last_fragment_position,
|
||||
std::deque<mutation_fragment> second_buffer,
|
||||
size_t max_buffer_size) {
|
||||
class factory {
|
||||
schema_ptr _schema;
|
||||
std::optional<std::deque<mutation_fragment>> _first_buffer;
|
||||
std::optional<std::deque<mutation_fragment>> _second_buffer;
|
||||
size_t _max_buffer_size;
|
||||
|
||||
private:
|
||||
std::optional<std::deque<mutation_fragment>> copy_buffer(const std::optional<std::deque<mutation_fragment>>& o) {
|
||||
if (!o) {
|
||||
return {};
|
||||
}
|
||||
return copy_fragments(*_schema, *o);
|
||||
}
|
||||
|
||||
public:
|
||||
factory(schema_ptr schema, std::deque<mutation_fragment> first_buffer, std::deque<mutation_fragment> second_buffer, size_t max_buffer_size)
|
||||
: _schema(std::move(schema)), _first_buffer(std::move(first_buffer)), _second_buffer(std::move(second_buffer)), _max_buffer_size(max_buffer_size) {
|
||||
}
|
||||
|
||||
factory(const factory& o)
|
||||
: _schema(o._schema)
|
||||
, _first_buffer(copy_buffer(o._first_buffer))
|
||||
, _second_buffer(copy_buffer(o._second_buffer)) {
|
||||
}
|
||||
factory(factory&& o) = default;
|
||||
|
||||
flat_mutation_reader operator()(
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
BOOST_REQUIRE(s == _schema);
|
||||
if (_first_buffer) {
|
||||
auto buf = *std::exchange(_first_buffer, {});
|
||||
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf));
|
||||
rd.set_max_buffer_size(_max_buffer_size);
|
||||
return rd;
|
||||
}
|
||||
if (_second_buffer) {
|
||||
auto buf = *std::exchange(_second_buffer, {});
|
||||
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf));
|
||||
rd.set_max_buffer_size(_max_buffer_size);
|
||||
return rd;
|
||||
}
|
||||
return make_empty_flat_reader(_schema);
|
||||
}
|
||||
};
|
||||
auto ms = mutation_source(factory(schema, std::move(first_buffer), std::move(second_buffer), max_buffer_size));
|
||||
|
||||
auto [rd, handle] = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
schema,
|
||||
semaphore,
|
||||
prange,
|
||||
slice,
|
||||
seastar::default_priority_class(),
|
||||
nullptr,
|
||||
mutation_reader::forwarding::yes);
|
||||
|
||||
rd.set_max_buffer_size(max_buffer_size);
|
||||
|
||||
rd.fill_buffer(db::no_timeout).get0();
|
||||
|
||||
const auto eq_cmp = position_in_partition::equal_compare(*schema);
|
||||
BOOST_REQUIRE(rd.is_buffer_full());
|
||||
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position));
|
||||
BOOST_REQUIRE(!rd.is_end_of_stream());
|
||||
|
||||
rd.detach_buffer();
|
||||
|
||||
handle.pause();
|
||||
|
||||
while(semaphore.try_evict_one_inactive_read());
|
||||
|
||||
return std::move(rd);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Checks that when an evicted reader is resumed mid-partition, a range
// tombstone emitted by the recreated underlying reader that starts before the
// last position seen by the client is trimmed, so the reader never emits a
// fragment positioned at or before `last_fragment_position`.
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
    reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{});
    simple_schema s;

    const auto pkey = s.make_pkey();
    size_t max_buffer_size = 512;
    const int first_ck = 100;
    const int second_buffer_ck = first_ck + 100;

    size_t mem_usage = 0;

    // First buffer: an unfinished partition (no partition_end) so the reader
    // is evicted mid-partition. Note rows land on every other ckey: `i` is
    // incremented both in the loop header and in the make_ckey() call.
    std::deque<mutation_fragment> first_buffer;
    first_buffer.emplace_back(partition_start{pkey, {}});
    mem_usage = first_buffer.back().memory_usage(*s.schema());
    for (int i = 0; i < second_buffer_ck; ++i) {
        first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v"));
        mem_usage += first_buffer.back().memory_usage(*s.schema());
    }
    const auto last_fragment_position = position_in_partition(first_buffer.back().position());
    // Size the reader's buffer so it fills up exactly at the last row above;
    // the extra row below is what the second buffer resumes from.
    max_buffer_size = mem_usage;
    first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v"));

    // Second buffer (served after eviction + resume): same partition, but
    // beginning with a range tombstone whose range extends back before
    // `last_fragment_position` — the reader is expected to trim it.
    std::deque<mutation_fragment> second_buffer;
    second_buffer.emplace_back(partition_start{pkey, {}});
    mem_usage = second_buffer.back().memory_usage(*s.schema());
    second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10))));
    int ckey = second_buffer_ck;
    while (mem_usage <= max_buffer_size) {
        second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v"));
        mem_usage += second_buffer.back().memory_usage(*s.schema());
    }
    second_buffer.emplace_back(partition_end{});

    auto rd = create_evictable_reader_and_evict_after_first_buffer(s.schema(), semaphore, query::full_partition_range,
            s.schema()->full_slice(), std::move(first_buffer), last_fragment_position, std::move(second_buffer), max_buffer_size);

    rd.fill_buffer(db::no_timeout).get();

    const auto tri_cmp = position_in_partition::tri_compare(*s.schema());

    // The first fragment after resume must be strictly past the last position
    // already emitted to the client — i.e. the tombstone was trimmed.
    BOOST_REQUIRE(tri_cmp(last_fragment_position, rd.peek_buffer().position()) < 0);
}
|
||||
|
||||
namespace {
|
||||
|
||||
void check_evictable_reader_validation_is_triggered(
|
||||
std::string_view test_name,
|
||||
std::string_view error_prefix, // empty str if no exception is expected
|
||||
schema_ptr schema,
|
||||
reader_concurrency_semaphore& semaphore,
|
||||
const dht::partition_range& prange,
|
||||
const query::partition_slice& slice,
|
||||
std::deque<mutation_fragment> first_buffer,
|
||||
position_in_partition_view last_fragment_position,
|
||||
std::deque<mutation_fragment> second_buffer,
|
||||
size_t max_buffer_size) {
|
||||
|
||||
testlog.info("check_evictable_reader_validation_is_triggered(): checking {} test case: {}", error_prefix.empty() ? "positive" : "negative", test_name);
|
||||
|
||||
auto rd = create_evictable_reader_and_evict_after_first_buffer(std::move(schema), semaphore, prange, slice, std::move(first_buffer),
|
||||
last_fragment_position, std::move(second_buffer), max_buffer_size);
|
||||
|
||||
const bool fail = !error_prefix.empty();
|
||||
|
||||
try {
|
||||
rd.fill_buffer(db::no_timeout).get0();
|
||||
} catch (std::runtime_error& e) {
|
||||
if (fail) {
|
||||
if (error_prefix == std::string_view(e.what(), error_prefix.size())) {
|
||||
testlog.trace("Expected exception caught: {}", std::current_exception());
|
||||
return;
|
||||
} else {
|
||||
BOOST_FAIL(fmt::format("Exception with unexpected message caught: {}", std::current_exception()));
|
||||
}
|
||||
} else {
|
||||
BOOST_FAIL(fmt::format("Unexpected exception caught: {}", std::current_exception()));
|
||||
}
|
||||
}
|
||||
if (fail) {
|
||||
BOOST_FAIL(fmt::format("Expected exception not thrown"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Exercises the evictable reader's self-validation on resume: after eviction
// the reader is recreated and the fragments it produces are validated against
// the last partition key / position emitted before the eviction. Each sub-case
// below either expects a specific validation failure (identified by the
// message prefix passed to the checker) or expects the resume to succeed
// (empty prefix).
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
    // Validation failures abort on internal error by default; turn that into
    // a catchable exception for the duration of this test (restored on exit).
    set_abort_on_internal_error(false);
    auto reset_on_internal_abort = defer([] {
        set_abort_on_internal_error(true);
    });

    reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{});
    simple_schema s;

    auto pkeys = s.make_pkeys(4);
    boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));

    size_t max_buffer_size = 512;
    const int first_ck = 100;
    const int second_buffer_ck = first_ck + 100;
    const int last_ck = second_buffer_ck + 100;

    // Expected message prefixes of the three validation failure kinds.
    static const char partition_error_prefix[] = "maybe_validate_partition_start(): validation failed";
    static const char position_in_partition_error_prefix[] = "validate_position_in_partition(): validation failed";
    static const char trim_range_tombstones_error_prefix[] = "maybe_trim_range_tombstone(): validation failed";

    // The read covers pkeys[1]..pkeys[2] and ckeys first_ck..last_ck; pkeys[0],
    // pkeys[3] and out-of-range ckeys act as out-of-range probes below.
    const auto prange = dht::partition_range::make(
            dht::partition_range::bound(pkeys[1], true),
            dht::partition_range::bound(pkeys[2], true));

    const auto ckrange = query::clustering_range::make(
            query::clustering_range::bound(s.make_ckey(first_ck), true),
            query::clustering_range::bound(s.make_ckey(last_ck), true));

    const auto slice = partition_slice_builder(*s.schema()).with_range(ckrange).build();

    // First buffer: an unfinished partition (no partition_end), so the reader
    // is evicted mid-partition. Rows are emplaced on every other ckey (`i` is
    // incremented both in the loop header and in the make_ckey() call).
    std::deque<mutation_fragment> first_buffer;
    first_buffer.emplace_back(partition_start{pkeys[1], {}});
    size_t mem_usage = first_buffer.back().memory_usage(*s.schema());
    for (int i = 0; i < second_buffer_ck; ++i) {
        first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v"));
        mem_usage += first_buffer.back().memory_usage(*s.schema());
    }
    // Buffer size chosen so the first fill ends exactly on the last row above;
    // the extra row pushed afterwards belongs to the post-resume stream.
    max_buffer_size = mem_usage;
    auto last_fragment_position = position_in_partition(first_buffer.back().position());
    first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v"));

    // Builds the buffer served by the recreated reader: a partition starting
    // at `pkey` with rows from `first_ckey` (default: second_buffer_ck),
    // optionally preceded by a range tombstone ending at last_ck.
    // (`last_ck` is a constant-initialized const int, so it is usable here
    // without being odr-used/captured.)
    auto make_second_buffer = [&s, &max_buffer_size, second_buffer_ck] (dht::decorated_key pkey, std::optional<int> first_ckey = {},
            bool inject_range_tombstone = false) mutable {
        auto ckey = first_ckey ? *first_ckey : second_buffer_ck;
        std::deque<mutation_fragment> second_buffer;
        second_buffer.emplace_back(partition_start{std::move(pkey), {}});
        size_t mem_usage = second_buffer.back().memory_usage(*s.schema());
        if (inject_range_tombstone) {
            second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(last_ck))));
        }
        while (mem_usage <= max_buffer_size) {
            second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v"));
            mem_usage += second_buffer.back().memory_usage(*s.schema());
        }
        second_buffer.emplace_back(partition_end{});
        return second_buffer;
    };

    //
    // Continuing the same partition
    //

    check_evictable_reader_validation_is_triggered(
            "pkey < _last_pkey; pkey ∉ prange",
            partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[0]),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey",
            "",
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1]),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey; position_in_partition ∉ ckrange (<)",
            position_in_partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1], first_ck - 10),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey; position_in_partition ∉ ckrange (<); start with trimmable range-tombstone",
            position_in_partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1], first_ck - 10, true),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition",
            position_in_partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1], second_buffer_ck - 2),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition; start with trimmable range-tombstone",
            position_in_partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1], second_buffer_ck - 2, true),
            max_buffer_size);

    // Range tombstone ending strictly before the resume position: must be
    // caught by the tombstone-trimming validation.
    {
        auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck);
        second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck - 10)));
        check_evictable_reader_validation_is_triggered(
                "pkey == _last_pkey; end(range_tombstone) < _next_position_in_partition",
                trim_range_tombstones_error_prefix,
                s.schema(),
                semaphore,
                prange,
                slice,
                copy_fragments(*s.schema(), first_buffer),
                last_fragment_position,
                std::move(second_buffer),
                max_buffer_size);
    }

    // Range tombstone extending past the resume position: trimmable, no error.
    {
        auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck);
        second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10)));
        check_evictable_reader_validation_is_triggered(
                "pkey == _last_pkey; end(range_tombstone) > _next_position_in_partition",
                "",
                s.schema(),
                semaphore,
                prange,
                slice,
                copy_fragments(*s.schema(), first_buffer),
                last_fragment_position,
                std::move(second_buffer),
                max_buffer_size);
    }

    // Range tombstone starting past the end of the clustering range.
    {
        auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck);
        second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_starting_with(s.make_ckey(last_ck + 10)));
        check_evictable_reader_validation_is_triggered(
                "pkey == _last_pkey; start(range_tombstone) ∉ ckrange (>)",
                position_in_partition_error_prefix,
                s.schema(),
                semaphore,
                prange,
                slice,
                copy_fragments(*s.schema(), first_buffer),
                last_fragment_position,
                std::move(second_buffer),
                max_buffer_size);
    }

    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey; position_in_partition ∈ ckrange",
            "",
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1], second_buffer_ck),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey; position_in_partition ∉ ckrange (>)",
            position_in_partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1], last_ck + 10),
            max_buffer_size);

    // Resuming mid-partition but the recreated reader jumps to a different
    // partition: a partition-start validation failure regardless of whether
    // the new key is inside the partition range.
    check_evictable_reader_validation_is_triggered(
            "pkey > _last_pkey; pkey ∈ pkrange",
            partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[2]),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey > _last_pkey; pkey ∉ pkrange",
            partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[3]),
            max_buffer_size);

    //
    // Continuing from next partition
    //

    // Rebuild the first buffer so the pre-eviction stream ends on a
    // partition_end: the resume must now start a brand new partition.
    first_buffer.clear();

    first_buffer.emplace_back(partition_start{pkeys[1], {}});
    mem_usage = first_buffer.back().memory_usage(*s.schema());
    for (int i = 0; i < second_buffer_ck; ++i) {
        first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v"));
        mem_usage += first_buffer.back().memory_usage(*s.schema());
    }
    first_buffer.emplace_back(partition_end{});
    mem_usage += first_buffer.back().memory_usage(*s.schema());
    last_fragment_position = position_in_partition(first_buffer.back().position());
    max_buffer_size = mem_usage;
    first_buffer.emplace_back(partition_start{pkeys[2], {}});

    check_evictable_reader_validation_is_triggered(
            "pkey < _last_pkey; pkey ∉ pkrange",
            partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[0]),
            max_buffer_size);

    // Re-reading the already-finished partition is invalid too.
    check_evictable_reader_validation_is_triggered(
            "pkey == _last_pkey",
            partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[1]),
            max_buffer_size);

    // The only valid continuation: the next partition inside the range.
    check_evictable_reader_validation_is_triggered(
            "pkey > _last_pkey; pkey ∈ pkrange",
            "",
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[2]),
            max_buffer_size);

    check_evictable_reader_validation_is_triggered(
            "pkey > _last_pkey; pkey ∉ pkrange",
            partition_error_prefix,
            s.schema(),
            semaphore,
            prange,
            slice,
            copy_fragments(*s.schema(), first_buffer),
            last_fragment_position,
            make_second_buffer(pkeys[3]),
            max_buffer_size);
}
|
||||
|
||||
@@ -4733,8 +4733,8 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
|
||||
|
||||
table->add_sstable_and_update_cache(sst).get();
|
||||
|
||||
BOOST_REQUIRE(table->candidates_for_compaction().size() == 1);
|
||||
BOOST_REQUIRE(table->candidates_for_compaction().front() == sst);
|
||||
BOOST_REQUIRE(table->non_staging_sstables().size() == 1);
|
||||
BOOST_REQUIRE(table->non_staging_sstables().front() == sst);
|
||||
|
||||
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
|
||||
auto r = assert_that(sst->as_mutation_source().make_reader(schema));
|
||||
@@ -4755,7 +4755,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
|
||||
// We expect the scrub with skip_corrupted=false to stop on the first invalid fragment.
|
||||
compaction_manager.perform_sstable_scrub(table.get(), false).get();
|
||||
|
||||
BOOST_REQUIRE(table->candidates_for_compaction().size() == 1);
|
||||
BOOST_REQUIRE(table->non_staging_sstables().size() == 1);
|
||||
verify_fragments(sst, corrupt_fragments);
|
||||
|
||||
testlog.info("Scrub with --skip-corrupted=true");
|
||||
@@ -4763,9 +4763,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
|
||||
// We expect the scrub with skip_corrupted=true to get rid of all invalid data.
|
||||
compaction_manager.perform_sstable_scrub(table.get(), true).get();
|
||||
|
||||
BOOST_REQUIRE(table->candidates_for_compaction().size() == 1);
|
||||
BOOST_REQUIRE(table->candidates_for_compaction().front() != sst);
|
||||
verify_fragments(table->candidates_for_compaction().front(), scrubbed_fragments);
|
||||
BOOST_REQUIRE(table->non_staging_sstables().size() == 1);
|
||||
BOOST_REQUIRE(table->non_staging_sstables().front() != sst);
|
||||
verify_fragments(table->non_staging_sstables().front(), scrubbed_fragments);
|
||||
});
|
||||
}, test_cfg);
|
||||
}
|
||||
|
||||
55
test/boost/stall_free_test.cc
Normal file
55
test/boost/stall_free_test.cc
Normal file
@@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "utils/stall_free.hh"
|
||||
|
||||
// merge_to_gently() must splice a single source element into the middle of a
// longer, sorted destination list.
SEASTAR_THREAD_TEST_CASE(test_merge1) {
    std::list<int> dst{1, 2, 5, 8};
    const std::list<int> src{3};
    utils::merge_to_gently(dst, src, std::less<int>());
    const std::list<int> want{1, 2, 3, 5, 8};
    BOOST_CHECK(dst == want);
}
|
||||
|
||||
// When the destination runs out first, the remaining source tail must be
// appended at the end of the destination.
SEASTAR_THREAD_TEST_CASE(test_merge2) {
    std::list<int> dst{1};
    const std::list<int> src{3, 5, 6};
    utils::merge_to_gently(dst, src, std::less<int>());
    const std::list<int> want{1, 3, 5, 6};
    BOOST_CHECK(dst == want);
}
|
||||
|
||||
// Merging into an empty destination must simply copy the source over.
SEASTAR_THREAD_TEST_CASE(test_merge3) {
    std::list<int> dst;
    const std::list<int> src{3, 5, 6};
    utils::merge_to_gently(dst, src, std::less<int>());
    const std::list<int> want{3, 5, 6};
    BOOST_CHECK(dst == want);
}
|
||||
|
||||
// Merging an empty source must leave the destination unchanged.
SEASTAR_THREAD_TEST_CASE(test_merge4) {
    std::list<int> dst{1};
    const std::list<int> src;
    utils::merge_to_gently(dst, src, std::less<int>());
    const std::list<int> want{1};
    BOOST_CHECK(dst == want);
}
|
||||
@@ -118,6 +118,25 @@ SEASTAR_TEST_CASE(test_access_and_schema) {
|
||||
});
|
||||
}
|
||||
|
||||
// Regression test: dropping a base-table column that is NOT part of the view
// must not break view updates — inserts into the base after the drop must
// still be propagated to the materialized view.
SEASTAR_TEST_CASE(test_column_dropped_from_base) {
    return do_with_cql_env_thread([] (auto& e) {
        e.execute_cql("create table cf (p int, c ascii, a int, v int, primary key (p, c));").get();
        e.execute_cql("create materialized view vcf as select p, c, v from cf "
                "where v is not null and p is not null and c is not null "
                "primary key (v, p, c)").get();
        // Drop a regular column that the view does not select.
        e.execute_cql("alter table cf drop a;").get();
        e.execute_cql("insert into cf (p, c, v) values (0, 'foo', 1);").get();
        // View updates are asynchronous — poll until the row shows up.
        eventually([&] {
            auto msg = e.execute_cql("select v from vcf").get0();
            assert_that(msg).is_rows()
                    .with_size(1)
                    .with_row({
                        {int32_type->decompose(1)}
                    });
        });
    });
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_updates) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("create table base (k int, v int, primary key (k));").get();
|
||||
|
||||
@@ -289,9 +289,9 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
|
||||
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(record.started_at.time_since_epoch()).count();
|
||||
|
||||
// query command is stored on a parameters map with a 'query' key
|
||||
auto query_str_it = record.parameters.find("query");
|
||||
const auto query_str_it = record.parameters.find("query");
|
||||
if (query_str_it == record.parameters.end()) {
|
||||
throw std::logic_error("No \"query\" parameter set for a session requesting a slow_query_log record");
|
||||
tlogger.trace("No \"query\" parameter set for a session requesting a slow_query_log record");
|
||||
}
|
||||
|
||||
// parameters map
|
||||
@@ -312,7 +312,9 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
|
||||
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
|
||||
cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)),
|
||||
cql3::raw_value::make_value(timeuuid_type->decompose(start_time_id)),
|
||||
cql3::raw_value::make_value(utf8_type->decompose(query_str_it->second)),
|
||||
query_str_it != record.parameters.end()
|
||||
? cql3::raw_value::make_value(utf8_type->decompose(query_str_it->second))
|
||||
: cql3::raw_value::make_null(),
|
||||
cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))),
|
||||
cql3::raw_value::make_value(make_map_value(my_map_type, map_type_impl::native_type(std::move(parameters_values_vector))).serialize()),
|
||||
cql3::raw_value::make_value(inet_addr_type->decompose(record.client.addr())),
|
||||
|
||||
69
utils/stall_free.hh
Normal file
69
utils/stall_free.hh
Normal file
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <algorithm>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
|
||||
namespace utils {
|
||||
|
||||
|
||||
// Similar to std::merge but it does not stall. Must run inside a seastar
|
||||
// thread. It merges items from list2 into list1. Items from list2 can only be copied.
|
||||
template<class T, class Compare>
|
||||
void merge_to_gently(std::list<T>& list1, const std::list<T>& list2, Compare comp) {
|
||||
auto first1 = list1.begin();
|
||||
auto first2 = list2.begin();
|
||||
auto last1 = list1.end();
|
||||
auto last2 = list2.end();
|
||||
while (first2 != last2) {
|
||||
seastar::thread::maybe_yield();
|
||||
if (first1 == last1) {
|
||||
// Copy remaining items of list2 into list1
|
||||
std::copy_if(first2, last2, std::back_inserter(list1), [] (const auto&) { return true; });
|
||||
return;
|
||||
}
|
||||
if (comp(*first2, *first1)) {
|
||||
first1 = list1.insert(first1, *first2);
|
||||
++first2;
|
||||
} else {
|
||||
++first1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<class T>
|
||||
seastar::future<> clear_gently(std::list<T>& list) {
|
||||
return repeat([&list] () mutable {
|
||||
if (list.empty()) {
|
||||
return seastar::stop_iteration::yes;
|
||||
}
|
||||
list.pop_back();
|
||||
return seastar::stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
23
view_info.hh
23
view_info.hh
@@ -24,6 +24,7 @@
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "query-request.hh"
|
||||
#include "schema_fwd.hh"
|
||||
#include "db/view/view.hh"
|
||||
|
||||
namespace cql3::statements {
|
||||
class select_statement;
|
||||
@@ -35,9 +36,7 @@ class view_info final {
|
||||
// The following fields are used to select base table rows.
|
||||
mutable shared_ptr<cql3::statements::select_statement> _select_statement;
|
||||
mutable std::optional<query::partition_slice> _partition_slice;
|
||||
// Id of a regular base table column included in the view's PK, if any.
|
||||
// Scylla views only allow one such column, alternator can have up to two.
|
||||
mutable std::vector<column_id> _base_non_pk_columns_in_view_pk;
|
||||
db::view::base_info_ptr _base_info;
|
||||
public:
|
||||
view_info(const schema& schema, const raw_view_info& raw_view_info);
|
||||
|
||||
@@ -65,8 +64,22 @@ public:
|
||||
const query::partition_slice& partition_slice() const;
|
||||
const column_definition* view_column(const schema& base, column_id base_id) const;
|
||||
const column_definition* view_column(const column_definition& base_def) const;
|
||||
const std::vector<column_id>& base_non_pk_columns_in_view_pk() const;
|
||||
void initialize_base_dependent_fields(const schema& base);
|
||||
bool has_base_non_pk_columns_in_view_pk() const;
|
||||
|
||||
/// Returns a pointer to the base_dependent_view_info which matches the current
|
||||
/// schema of the base table.
|
||||
///
|
||||
/// base_dependent_view_info lives separately from the view schema.
|
||||
/// It can change without the view schema changing its value.
|
||||
/// This pointer is updated on base table schema changes as long as this view_info
|
||||
/// corresponds to the current schema of the view. After that the pointer stops tracking
|
||||
/// the base table schema.
|
||||
///
|
||||
/// The snapshot of both the view schema and base_dependent_view_info is represented
|
||||
/// by view_and_base. See with_base_info_snapshot().
|
||||
const db::view::base_info_ptr& base_info() const { return _base_info; }
|
||||
void set_base_info(db::view::base_info_ptr);
|
||||
db::view::base_info_ptr make_base_dependent_view_info(const schema& base_schema) const;
|
||||
|
||||
friend bool operator==(const view_info& x, const view_info& y) {
|
||||
return x._raw == y._raw;
|
||||
|
||||
Reference in New Issue
Block a user