Compare commits

...

8 Commits

Author SHA1 Message Date
Yaron Kaikov
4ae9a56466 release: prepare for 4.0.11 2020-10-26 18:12:47 +02:00
Avi Kivity
0374c1d040 Update seastar submodule
* seastar 065a40b34a...748428930a (1):
  > append_challenged_posix_file_impl: allow destructing file with no queued work

Fixes #7285.
2020-10-19 15:06:24 +03:00
Botond Dénes
9cb0fe3b33 reader_permit: reader_resources: make true RAII class
Currently in all cases we first deduct the to-be-consumed resources,
then construct the `reader_resources` class to protect it (release it on
destruction). This is error prone as it relies on no exception being
thrown while constructing the `reader_resources`. Albeit the
`reader_resources` constructor is `noexcept` right now this might change
in the future and as the call sites relying on this are disconnected
from the declaration, the one modifying them might not notice.
To make this safe going forward, make the `reader_resources` a true RAII
class, consuming the units in its constructor and releasing them in its
destructor.

Refs: #7256

Tests: unit(dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20200922150625.1253798-1-bdenes@scylladb.com>
(cherry picked from commit a0107ba1c6)
Message-Id: <20200924081408.236353-1-bdenes@scylladb.com>
2020-10-19 15:05:13 +03:00
Takuya ASADA
a813ff4da2 install.sh: set LC_ALL=en_US.UTF-8 on python3 thunk
scylla-python3 causes segfault when non-default locale specified.
As workaround for this, we need to set LC_ALL=en_US.UTF_8 on python3 thunk.

Fixes #7408

Closes #7414

(cherry picked from commit ff129ee030)
2020-10-18 15:03:04 +03:00
Avi Kivity
d5936147f4 Merge "materialized views: Fix undefined behavior on base table schema changes" from Tomasz
"
The view_info object, which is attached to the schema object of the
view, contains a data structure called
"base_non_pk_columns_in_view_pk". This data structure contains column
ids of the base table so is valid only for a particular version of the
base table schema. This data structure is used by materialized view
code to interpret mutations of the base table, those coming from base
table writes, or reads of the base table done as part of view updates
or view building.

The base table schema version of that data structure must match the
schema version of the mutation fragments, otherwise we hit undefined
behavior. This may include aborts, exceptions, segfaults, or data
corruption (e.g. writes landing in the wrong column in the view).

Before this patch, we could get schema version mismatch here after the
base table was altered. That's because the view schema did not change
when the base table was altered.

Another problem was that view building was using the current table's schema
to interpret the fragments and invoke view building. That's incorrect for two
reasons. First, fragments generated by a reader must be accessed only using
the reader's schema. Second, base_non_pk_columns_in_view_pk of the recorded
view ptrs may not longer match the current base table schema, which is used
to generate the view updates.

Part of the fix is to extract base_non_pk_columns_in_view_pk into a
third entity called base_dependent_view_info, which changes both on
base table schema changes and view schema changes.

It is managed by a shared pointer so that we can take immutable
snapshots of it, just like with schema_ptr. When starting the view
update, the base table schema_ptr and the corresponding
base_dependent_view_info have to match. So we must obtain them
atomically, and base_dependent_view_info cannot change during update.

Also, whenever the base table schema changes, we must update
base_dependent_view_infos of all attached views (atomically) so that
it matches the base table schema.

Fixes #7061.

Tests:

  - unit (dev)
  - [v1] manual (reproduced using scylla binary and cqlsh)
"

* tag 'mv-schema-mismatch-fix-v2' of github.com:tgrabiec/scylla:
  db: view: Refactor view_info::initialize_base_dependent_fields()
  tests: mv: Test dropping columns from base table
  db: view: Fix incorrect schema access during view building after base table schema changes
  schema: Call on_internal_error() when out of range id is passed to column_at()
  db: views: Fix undefined behavior on base table schema changes
  db: views: Introduce has_base_non_pk_columns_in_view_pk()

(cherry picked from commit 3daa49f098)
2020-10-06 17:12:28 +03:00
Juliusz Stasiewicz
a3d3b4e185 tracing: Fix error on slow batches
`trace_keyspace_helper::make_slow_query_mutation_data` expected a
"query" key in its parameters, which does not appear in case of
e.g. batches of prepared statements. This is example of failing
`record.parameters`:
```
...{"query[0]" : "INSERT INTO ks.tbl (pk, i) values (?, ?);"},
{"query[1]" : "INSERT INTO ks.tbl (pk, i) values (?, ?);"}...
```

In such case Scylla recorded no trace and said:
```
ERROR 2020-09-28 10:09:36,696 [shard 3] trace_keyspace_helper - No
"query" parameter set for a session requesting a slow_query_log record
```

Fix here is to leave query empty if not found. The users can still
retrieve the query contents from existing info.

Fixes #5843

Closes #7293

(cherry picked from commit 0afa738a8f)
2020-10-04 18:05:00 +03:00
Tomasz Grabiec
4ca2576c98 Merge "evictable_reader: validate buffer on reader recreation" from Botond
This series backports the evictable reader validation patchset (merged
as 97c99ea9f to master) to 4.1.

I only had to do changes to the tests.

Tests: unit(dev), some exception safety tests are failing with or
without my patchset

Fixes: #7208

* https://github.com/denesb/scylla.git denesb/evictable-reader-validate-buffer/backport-4.1:
  mutation_reader_test: add unit test for evictable reader self-validation
  evictable_reader: validate buffer after recreation the underlying
  evictable_reader: update_next_position(): only use peek'd position on partition boundary
  mutation_reader_test: add unit test for evictable reader range tombstone trimming
  evictable_reader: trim range tombstones to the read clustering range
  position_in_partition_view: add position_in_partition_view before_key() overload
  flat_mutation_reader: add buffer() accessor

(cherry picked from commit 7f3ffbc1c8)
2020-10-02 11:52:57 +02:00
Tomasz Grabiec
e99a0c7b89 schema: Fix race in schema version recalculation leading to stale schema version in gossip
Migration manager installs several feature change listeners:

    if (this_shard_id() == 0) {
        _feature_listeners.push_back(_feat.cluster_supports_view_virtual_columns().when_enabled(update_schema));
        _feature_listeners.push_back(_feat.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
        _feature_listeners.push_back(_feat.cluster_supports_cdc().when_enabled(update_schema));
        _feature_listeners.push_back(_feat.cluster_supports_per_table_partitioners().when_enabled(update_schema));
    }

They will call update_schema_version_and_announce() when features are enabled, which does this:

    return update_schema_version(proxy, features).then([] (utils::UUID uuid) {
        return announce_schema_version(uuid);
    });

So it first updates the schema version and then publishes it via
gossip in announce_schema_version(). It is possible that the
announce_schema_version() part of the first schema change will be
deferred and will execute after the other four calls to
update_schema_version_and_announce(). It will install the old schema
version in gossip instead of the more recent one.

The fix is to serialize schema digest calculation and publishing.

Fixes #7200

(cherry picked from commit 1a57d641d1)
2020-10-01 18:18:53 +02:00
22 changed files with 802 additions and 56 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=4.0.10
VERSION=4.0.11
if test -f version
then

View File

@@ -225,7 +225,7 @@ void alter_table_statement::add_column(const schema& schema, const table& cf, sc
schema_builder builder(view);
if (view->view_info()->include_all_columns()) {
builder.with_column(column_name.name(), type);
} else if (view->view_info()->base_non_pk_columns_in_view_pk().empty()) {
} else if (!view->view_info()->has_base_non_pk_columns_in_view_pk()) {
db::view::create_virtual_column(builder, column_name.name(), type);
}
view_updates.push_back(view_ptr(builder.build()));

View File

@@ -55,6 +55,7 @@
#include <limits>
#include <cstddef>
#include "schema_fwd.hh"
#include "db/view/view.hh"
#include "db/schema_features.hh"
#include "gms/feature.hh"
#include "timestamp.hh"
@@ -981,8 +982,9 @@ public:
return *_config.sstables_manager;
}
// Reader's schema must be the same as the base schema of each of the views.
future<> populate_views(
std::vector<view_ptr>,
std::vector<db::view::view_and_base>,
dht::token base_token,
flat_mutation_reader&&);
@@ -998,7 +1000,7 @@ private:
future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source, const io_priority_class& io_priority) const;
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
future<> generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
std::vector<db::view::view_and_base>&& views,
mutation&& m,
flat_mutation_reader_opt existings) const;

View File

@@ -822,6 +822,14 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_s
});
}
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
return merge_lock().then([&proxy, &feat] {
return update_schema_version_and_announce(proxy, feat.cluster_schema_features());
}).finally([] {
return merge_unlock();
});
}
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
{
return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] () mutable {

View File

@@ -170,6 +170,13 @@ future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_s
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush);
// Recalculates the local schema version and publishes it in gossip.
//
// It is safe to call concurrently with recalculate_schema_version() and merge_schema() in which case it
// is guaranteed that the schema version we end up with after all calls will reflect the most recent state
// of feature_service and schema tables.
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat);
future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after);
std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);

View File

@@ -130,17 +130,26 @@ const column_definition* view_info::view_column(const column_definition& base_de
return _schema.get_column_definition(base_def.name());
}
const std::vector<column_id>& view_info::base_non_pk_columns_in_view_pk() const {
return _base_non_pk_columns_in_view_pk;
void view_info::set_base_info(db::view::base_info_ptr base_info) {
_base_info = std::move(base_info);
}
void view_info::initialize_base_dependent_fields(const schema& base) {
db::view::base_info_ptr view_info::make_base_dependent_view_info(const schema& base) const {
std::vector<column_id> base_non_pk_columns_in_view_pk;
for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) {
auto* base_col = base.get_column_definition(view_col.name());
if (base_col && !base_col->is_primary_key()) {
_base_non_pk_columns_in_view_pk.push_back(base_col->id);
base_non_pk_columns_in_view_pk.push_back(base_col->id);
}
}
return make_lw_shared<db::view::base_dependent_view_info>({
.base_schema = base.shared_from_this(),
.base_non_pk_columns_in_view_pk = std::move(base_non_pk_columns_in_view_pk)
});
}
bool view_info::has_base_non_pk_columns_in_view_pk() const {
return !_base_info->base_non_pk_columns_in_view_pk.empty();
}
namespace db {
@@ -188,11 +197,11 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de
}
static bool update_requires_read_before_write(const schema& base,
const std::vector<view_ptr>& views,
const std::vector<view_and_base>& views,
const dht::decorated_key& key,
const rows_entry& update) {
for (auto&& v : views) {
view_info& vf = *v->view_info();
view_info& vf = *v.view->view_info();
if (may_be_affected_by(base, vf, key, update)) {
return true;
}
@@ -239,12 +248,14 @@ class view_updates final {
view_ptr _view;
const view_info& _view_info;
schema_ptr _base;
base_info_ptr _base_info;
std::unordered_map<partition_key, mutation_partition, partition_key::hashing, partition_key::equality> _updates;
public:
explicit view_updates(view_ptr view, schema_ptr base)
: _view(std::move(view))
explicit view_updates(view_and_base vab)
: _view(std::move(vab.view))
, _view_info(*_view->view_info())
, _base(std::move(base))
, _base(vab.base->base_schema)
, _base_info(vab.base)
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) {
}
@@ -306,7 +317,7 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
// they share liveness information. It's true especially in the only case currently allowed by CQL,
// which assumes there's up to one non-pk column in the view key. It's also true in alternator,
// which does not carry TTL information.
const auto& col_ids = _view_info.base_non_pk_columns_in_view_pk();
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk;
if (!col_ids.empty()) {
auto& def = _base->regular_column_at(col_ids[0]);
// Note: multi-cell columns can't be part of the primary key.
@@ -537,7 +548,7 @@ void view_updates::delete_old_entry(const partition_key& base_key, const cluster
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
auto& r = get_view_row(base_key, existing);
const auto& col_ids = _view_info.base_non_pk_columns_in_view_pk();
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk;
if (!col_ids.empty()) {
// We delete the old row using a shadowable row tombstone, making sure that
// the tombstone deletes everything in the row (or it might still show up).
@@ -678,7 +689,7 @@ void view_updates::generate_update(
return;
}
const auto& col_ids = _view_info.base_non_pk_columns_in_view_pk();
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk;
if (col_ids.empty()) {
// The view key is necessarily the same pre and post update.
if (existing && existing->is_live(*_base)) {
@@ -932,11 +943,16 @@ future<stop_iteration> view_update_builder::on_results() {
future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
const schema_ptr& base,
std::vector<view_ptr>&& views_to_update,
std::vector<view_and_base>&& views_to_update,
flat_mutation_reader&& updates,
flat_mutation_reader_opt&& existings) {
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (auto&& v) {
return view_updates(std::move(v), base);
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (view_and_base v) {
if (base->version() != v.base->base_schema->version()) {
on_internal_error(vlogger, format("Schema version used for view updates ({}) does not match the current"
" base schema version of the view ({}) for view {}.{} of {}.{}",
base->version(), v.base->base_schema->version(), v.view->ks_name(), v.view->cf_name(), base->ks_name(), base->cf_name()));
}
return view_updates(std::move(v));
}));
auto builder = std::make_unique<view_update_builder>(base, std::move(vs), std::move(updates), std::move(existings));
auto f = builder->build();
@@ -946,18 +962,18 @@ future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
query::clustering_row_ranges calculate_affected_clustering_ranges(const schema& base,
const dht::decorated_key& key,
const mutation_partition& mp,
const std::vector<view_ptr>& views) {
const std::vector<view_and_base>& views) {
std::vector<nonwrapping_range<clustering_key_prefix_view>> row_ranges;
std::vector<nonwrapping_range<clustering_key_prefix_view>> view_row_ranges;
clustering_key_prefix_view::tri_compare cmp(base);
if (mp.partition_tombstone() || !mp.row_tombstones().empty()) {
for (auto&& v : views) {
// FIXME: #2371
if (v->view_info()->select_statement().get_restrictions()->has_unrestricted_clustering_columns()) {
if (v.view->view_info()->select_statement().get_restrictions()->has_unrestricted_clustering_columns()) {
view_row_ranges.push_back(nonwrapping_range<clustering_key_prefix_view>::make_open_ended_both_sides());
break;
}
for (auto&& r : v->view_info()->partition_slice().default_row_ranges()) {
for (auto&& r : v.view->view_info()->partition_slice().default_row_ranges()) {
view_row_ranges.push_back(r.transform(std::mem_fn(&clustering_key_prefix::view)));
}
}
@@ -1717,7 +1733,7 @@ public:
return stop_iteration::yes;
}
_fragments_memory_usage += cr.memory_usage(*_step.base->schema());
_fragments_memory_usage += cr.memory_usage(*_step.reader.schema());
_fragments.push_back(std::move(cr));
if (_fragments_memory_usage > batch_memory_max) {
// Although we have not yet completed the batch of base rows that
@@ -1737,10 +1753,14 @@ public:
_builder._as.check();
if (!_fragments.empty()) {
_fragments.push_front(partition_start(_step.current_key, tombstone()));
auto base_schema = _step.base->schema();
auto views = with_base_info_snapshot(_views_to_build);
auto reader = make_flat_mutation_reader_from_fragments(_step.reader.schema(), std::move(_fragments));
reader.upgrade_schema(base_schema);
_step.base->populate_views(
_views_to_build,
std::move(views),
_step.current_token(),
make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
std::move(reader)).get();
_fragments.clear();
_fragments_memory_usage = 0;
}
@@ -1887,5 +1907,11 @@ future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_d
});
}
std::vector<db::view::view_and_base> with_base_info_snapshot(std::vector<view_ptr> vs) {
return boost::copy_range<std::vector<db::view::view_and_base>>(vs | boost::adaptors::transformed([] (const view_ptr& v) {
return db::view::view_and_base{v, v->view_info()->base_info()};
}));
}
} // namespace view
} // namespace db

View File

@@ -43,6 +43,27 @@ namespace db {
namespace view {
// Part of the view description which depends on the base schema version.
//
// This structure may change even though the view schema doesn't change, so
// it needs to live outside view_ptr.
struct base_dependent_view_info {
schema_ptr base_schema;
// Id of a regular base table column included in the view's PK, if any.
// Scylla views only allow one such column, alternator can have up to two.
std::vector<column_id> base_non_pk_columns_in_view_pk;
};
// Immutable snapshot of view's base-schema-dependent part.
using base_info_ptr = lw_shared_ptr<const base_dependent_view_info>;
// Snapshot of the view schema and its base-schema-dependent part.
struct view_and_base {
view_ptr view;
base_info_ptr base;
};
/**
* Whether the view filter considers the specified partition key.
*
@@ -92,7 +113,7 @@ bool clustering_prefix_matches(const schema& base, const partition_key& key, con
future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
const schema_ptr& base,
std::vector<view_ptr>&& views_to_update,
std::vector<view_and_base>&& views_to_update,
flat_mutation_reader&& updates,
flat_mutation_reader_opt&& existings);
@@ -100,7 +121,7 @@ query::clustering_row_ranges calculate_affected_clustering_ranges(
const schema& base,
const dht::decorated_key& key,
const mutation_partition& mp,
const std::vector<view_ptr>& views);
const std::vector<view_and_base>& views);
struct wait_for_all_updates_tag {};
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
@@ -128,6 +149,13 @@ future<> mutate_MV(
*/
void create_virtual_column(schema_builder& builder, const bytes& name, const data_type& type);
/**
* Converts a collection of view schema snapshots into a collection of
* view_and_base objects, which are snapshots of both the view schema
* and the base-schema-dependent part of view description.
*/
std::vector<view_and_base> with_base_info_snapshot(std::vector<view_ptr>);
}
}

View File

@@ -487,6 +487,9 @@ public:
size_t buffer_size() const {
return _impl->buffer_size();
}
const circular_buffer<mutation_fragment>& buffer() const {
return _impl->buffer();
}
// Detach the internal buffer of the reader.
// Roughly equivalent to depleting it by calling pop_mutation_fragment()
// until is_buffer_empty() returns true.

View File

@@ -126,6 +126,7 @@ relocate_python3() {
cp "$script" "$relocateddir"
cat > "$install"<<EOF
#!/usr/bin/env bash
export LC_ALL=en_US.UTF-8
x="\$(readlink -f "\$0")"
b="\$(basename "\$x")"
d="\$(dirname "\$x")"

View File

@@ -1721,7 +1721,7 @@ void row::apply_monotonically(const schema& s, column_kind kind, row&& other) {
// we erase the live cells according to the shadowable_tombstone rules.
static bool dead_marker_shadows_row(const schema& s, column_kind kind, const row_marker& marker) {
return s.is_view()
&& !s.view_info()->base_non_pk_columns_in_view_pk().empty()
&& s.view_info()->has_base_non_pk_columns_in_view_pk()
&& !marker.is_live()
&& kind == column_kind::regular_column; // not applicable to static rows
}

View File

@@ -30,6 +30,7 @@
#include "schema_registry.hh"
#include "mutation_compactor.hh"
logging::logger mrlog("mutation_reader");
static constexpr size_t merger_small_vector_size = 4;
@@ -1028,6 +1029,13 @@ private:
bool _reader_created = false;
bool _drop_partition_start = false;
bool _drop_static_row = false;
// Trim range tombstones on the start of the buffer to the start of the read
// range (_next_position_in_partition). Set after reader recreation.
// Also validate the first not-trimmed mutation fragment's position.
bool _trim_range_tombstones = false;
// Validate the partition key of the first emitted partition, set after the
// reader was recreated.
bool _validate_partition_key = false;
position_in_partition::tri_compare _tri_cmp;
std::optional<dht::decorated_key> _last_pkey;
@@ -1049,7 +1057,10 @@ private:
void adjust_partition_slice();
flat_mutation_reader recreate_reader();
flat_mutation_reader resume_or_create_reader();
void maybe_validate_partition_start(const circular_buffer<mutation_fragment>& buffer);
void validate_position_in_partition(position_in_partition_view pos) const;
bool should_drop_fragment(const mutation_fragment& mf);
bool maybe_trim_range_tombstone(mutation_fragment& mf) const;
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
future<> fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
@@ -1122,16 +1133,11 @@ void evictable_reader::update_next_position(flat_mutation_reader& reader) {
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
break;
case partition_region::clustered:
if (reader.is_buffer_empty()) {
_next_position_in_partition = position_in_partition::after_key(last_pos);
} else {
const auto& next_frag = reader.peek_buffer();
if (next_frag.is_end_of_partition()) {
if (!reader.is_buffer_empty() && reader.peek_buffer().is_end_of_partition()) {
push_mutation_fragment(reader.pop_mutation_fragment());
_next_position_in_partition = position_in_partition::for_partition_start();
} else {
_next_position_in_partition = position_in_partition(next_frag.position());
}
} else {
_next_position_in_partition = position_in_partition::after_key(last_pos);
}
break;
case partition_region::partition_end:
@@ -1156,6 +1162,9 @@ flat_mutation_reader evictable_reader::recreate_reader() {
const dht::partition_range* range = _pr;
const query::partition_slice* slice = &_ps;
_range_override.reset();
_slice_override.reset();
if (_last_pkey) {
bool partition_range_is_inclusive = true;
@@ -1192,6 +1201,9 @@ flat_mutation_reader evictable_reader::recreate_reader() {
range = &*_range_override;
}
_trim_range_tombstones = true;
_validate_partition_key = true;
return _ms.make_reader(
_schema,
no_reader_permit(),
@@ -1218,6 +1230,78 @@ flat_mutation_reader evictable_reader::resume_or_create_reader() {
return recreate_reader();
}
template <typename... Arg>
static void require(bool condition, const char* msg, const Arg&... arg) {
if (!condition) {
on_internal_error(mrlog, format(msg, arg...));
}
}
void evictable_reader::maybe_validate_partition_start(const circular_buffer<mutation_fragment>& buffer) {
if (!_validate_partition_key || buffer.empty()) {
return;
}
// If this is set we can assume the first fragment is a partition-start.
const auto& ps = buffer.front().as_partition_start();
const auto tri_cmp = dht::ring_position_comparator(*_schema);
// If we recreated the reader after fast-forwarding it we won't have
// _last_pkey set. In this case it is enough to check if the partition
// is in range.
if (_last_pkey) {
const auto cmp_res = tri_cmp(*_last_pkey, ps.key());
if (_drop_partition_start) { // should be the same partition
require(
cmp_res == 0,
"{}(): validation failed, expected partition with key equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
__FUNCTION__,
*_last_pkey,
ps.key());
} else { // should be a larger partition
require(
cmp_res < 0,
"{}(): validation failed, expected partition with key larger than _last_pkey {} due to _drop_partition_start being unset, but got {}",
__FUNCTION__,
*_last_pkey,
ps.key());
}
}
const auto& prange = _range_override ? *_range_override : *_pr;
require(
// TODO: somehow avoid this copy
prange.contains(ps.key(), tri_cmp),
"{}(): validation failed, expected partition with key that falls into current range {}, but got {}",
__FUNCTION__,
prange,
ps.key());
_validate_partition_key = false;
}
void evictable_reader::validate_position_in_partition(position_in_partition_view pos) const {
require(
_tri_cmp(_next_position_in_partition, pos) <= 0,
"{}(): validation failed, expected position in partition that is larger-than-equal than _next_position_in_partition {}, but got {}",
__FUNCTION__,
_next_position_in_partition,
pos);
if (_slice_override && pos.region() == partition_region::clustered) {
const auto ranges = _slice_override->row_ranges(*_schema, _last_pkey->key());
const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) {
// TODO: somehow avoid this copy
auto range = position_range(cr);
return range.contains(*_schema, pos);
});
require(
any_contains,
"{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}",
__FUNCTION__,
*_slice_override,
pos);
}
}
bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
if (_drop_partition_start && mf.is_partition_start()) {
_drop_partition_start = false;
@@ -1230,12 +1314,50 @@ bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
return false;
}
bool evictable_reader::maybe_trim_range_tombstone(mutation_fragment& mf) const {
// We either didn't read a partition yet (evicted after fast-forwarding) or
// didn't stop in a clustering region. We don't need to trim range
// tombstones in either case.
if (!_last_pkey || _next_position_in_partition.region() != partition_region::clustered) {
return false;
}
if (!mf.is_range_tombstone()) {
validate_position_in_partition(mf.position());
return false;
}
if (_tri_cmp(mf.position(), _next_position_in_partition) >= 0) {
validate_position_in_partition(mf.position());
return false; // rt in range, no need to trim
}
auto& rt = mf.as_mutable_range_tombstone();
require(
_tri_cmp(_next_position_in_partition, rt.end_position()) <= 0,
"{}(): validation failed, expected range tombstone with end pos larger than _next_position_in_partition {}, but got {}",
__FUNCTION__,
_next_position_in_partition,
rt.end_position());
rt.set_start(*_schema, position_in_partition_view::before_key(_next_position_in_partition));
return true;
}
future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
if (!_drop_partition_start && !_drop_static_row) {
return reader.fill_buffer(timeout);
auto fill_buf_fut = reader.fill_buffer(timeout);
if (_validate_partition_key) {
fill_buf_fut = fill_buf_fut.then([this, &reader] {
maybe_validate_partition_start(reader.buffer());
});
}
return fill_buf_fut;
}
return repeat([this, &reader, timeout] {
return reader.fill_buffer(timeout).then([this, &reader] {
maybe_validate_partition_start(reader.buffer());
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
reader.pop_mutation_fragment();
}
@@ -1249,6 +1371,11 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout
if (reader.is_buffer_empty()) {
return make_ready_future<>();
}
while (_trim_range_tombstones && !reader.is_buffer_empty()) {
auto mf = reader.pop_mutation_fragment();
_trim_range_tombstones = maybe_trim_range_tombstone(mf);
push_mutation_fragment(std::move(mf));
}
reader.move_buffer_content_to(*this);
auto stop = [this, &reader] {
// The only problematic fragment kind is the range tombstone.
@@ -1289,7 +1416,13 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout
if (reader.is_buffer_empty()) {
return do_fill_buffer(reader, timeout);
}
push_mutation_fragment(reader.pop_mutation_fragment());
if (_trim_range_tombstones) {
auto mf = reader.pop_mutation_fragment();
_trim_range_tombstones = maybe_trim_range_tombstone(mf);
push_mutation_fragment(std::move(mf));
} else {
push_mutation_fragment(reader.pop_mutation_fragment());
}
return make_ready_future<>();
});
}).then([this, &reader] {

View File

@@ -163,6 +163,11 @@ public:
return {partition_region::clustered, bound_weight::before_all_prefixed, &ck};
}
// Returns a view to before_key(pos._ck) if pos.is_clustering_row() else returns pos as-is.
static position_in_partition_view before_key(position_in_partition_view pos) {
return {partition_region::clustered, pos._bound_weight == bound_weight::equal ? bound_weight::before_all_prefixed : pos._bound_weight, pos._ck};
}
partition_region region() const { return _type; }
bound_weight get_bound_weight() const { return _bound_weight; }
bool is_partition_start() const { return _type == partition_region::partition_start; }

View File

@@ -27,6 +27,7 @@
reader_permit::impl::impl(reader_concurrency_semaphore& semaphore, reader_resources base_cost) : semaphore(semaphore), base_cost(base_cost) {
semaphore.consume(base_cost);
}
reader_permit::impl::~impl() {
@@ -88,7 +89,6 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
_resources += r;
while (!_wait_list.empty() && has_available_units(_wait_list.front().res)) {
auto& x = _wait_list.front();
_resources -= x.res;
try {
x.pr.set_value(reader_permit(*this, x.res));
} catch (...) {
@@ -160,7 +160,6 @@ future<reader_permit> reader_concurrency_semaphore::wait_admission(size_t memory
--_inactive_read_stats.population;
}
if (may_proceed(r)) {
_resources -= r;
return make_ready_future<reader_permit>(reader_permit(*this, r));
}
promise<reader_permit> pr;
@@ -170,7 +169,6 @@ future<reader_permit> reader_concurrency_semaphore::wait_admission(size_t memory
}
reader_permit reader_concurrency_semaphore::consume_resources(resources r) {
_resources -= r;
return reader_permit(*this, r);
}

View File

@@ -128,6 +128,10 @@ private:
return has_available_units(r) && _wait_list.empty();
}
void consume(resources r) {
_resources -= r;
}
void consume_memory(size_t memory) {
_resources.memory -= memory;
}

View File

@@ -42,6 +42,8 @@
constexpr int32_t schema::NAME_LENGTH;
extern logging::logger dblog;
sstring to_sstring(column_kind k) {
switch (k) {
case column_kind::partition_key: return "PARTITION_KEY";
@@ -575,11 +577,15 @@ schema::get_column_definition(const bytes& name) const {
const column_definition&
schema::column_at(column_kind kind, column_id id) const {
return _raw._columns.at(column_offset(kind) + id);
return column_at(static_cast<ordinal_column_id>(column_offset(kind) + id));
}
const column_definition&
schema::column_at(ordinal_column_id ordinal_id) const {
if (size_t(ordinal_id) >= _raw._columns.size()) {
on_internal_error(dblog, format("{}.{}@{}: column id {:d} >= {:d}",
ks_name(), cf_name(), version(), size_t(ordinal_id), _raw._columns.size()));
}
return _raw._columns.at(static_cast<column_count_type>(ordinal_id));
}

Submodule seastar updated: 065a40b34a...748428930a

View File

@@ -92,7 +92,7 @@ void migration_manager::init_messaging_service()
//FIXME: future discarded.
(void)with_gate(_background_tasks, [this] {
mlogger.debug("features changed, recalculating schema version");
return update_schema_version_and_announce(get_storage_proxy(), _feat.cluster_schema_features());
return db::schema_tables::recalculate_schema_version(get_storage_proxy(), _feat);
});
};

View File

@@ -2034,6 +2034,11 @@ void table::set_schema(schema_ptr s) {
}
_schema = std::move(s);
for (auto&& v : _views) {
v->view_info()->set_base_info(
v->view_info()->make_base_dependent_view_info(*_schema));
}
set_compaction_strategy(_schema->compaction_strategy());
trigger_compaction();
}
@@ -2045,7 +2050,8 @@ static std::vector<view_ptr>::iterator find_view(std::vector<view_ptr>& views, c
}
void table::add_or_update_view(view_ptr v) {
v->view_info()->initialize_base_dependent_fields(*schema());
v->view_info()->set_base_info(
v->view_info()->make_base_dependent_view_info(*_schema));
auto existing = find_view(_views, v);
if (existing != _views.end()) {
*existing = std::move(v);
@@ -2098,7 +2104,7 @@ static size_t memory_usage_of(const std::vector<frozen_mutation_and_schema>& ms)
* @return a future resolving to the mutations to apply to the views, which can be empty.
*/
future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
std::vector<db::view::view_and_base>&& views,
mutation&& m,
flat_mutation_reader_opt existings) const {
auto base_token = m.token();
@@ -2206,7 +2212,7 @@ table::local_base_lock(
* @return a future that resolves when the updates have been acknowledged by the view replicas
*/
future<> table::populate_views(
std::vector<view_ptr> views,
std::vector<db::view::view_and_base> views,
dht::token base_token,
flat_mutation_reader&& reader) {
auto& schema = reader.schema();
@@ -2527,7 +2533,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
}
auto& base = schema();
m.upgrade(base);
auto views = affected_views(base, m);
auto views = db::view::with_base_info_snapshot(affected_views(base, m));
if (views.empty()) {
return make_ready_future<row_locker::lock_holder>();
}

View File

@@ -2847,3 +2847,488 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source)
run_mutation_source_tests(make_populate);
}
namespace {
std::deque<mutation_fragment> copy_fragments(const schema& s, const std::deque<mutation_fragment>& o) {
std::deque<mutation_fragment> buf;
for (const auto& mf : o) {
buf.emplace_back(s, mf);
}
return buf;
}
flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& prange,
const query::partition_slice& slice,
std::deque<mutation_fragment> first_buffer,
position_in_partition_view last_fragment_position,
std::deque<mutation_fragment> second_buffer,
size_t max_buffer_size) {
class factory {
schema_ptr _schema;
std::optional<std::deque<mutation_fragment>> _first_buffer;
std::optional<std::deque<mutation_fragment>> _second_buffer;
size_t _max_buffer_size;
private:
std::optional<std::deque<mutation_fragment>> copy_buffer(const std::optional<std::deque<mutation_fragment>>& o) {
if (!o) {
return {};
}
return copy_fragments(*_schema, *o);
}
public:
factory(schema_ptr schema, std::deque<mutation_fragment> first_buffer, std::deque<mutation_fragment> second_buffer, size_t max_buffer_size)
: _schema(std::move(schema)), _first_buffer(std::move(first_buffer)), _second_buffer(std::move(second_buffer)), _max_buffer_size(max_buffer_size) {
}
factory(const factory& o)
: _schema(o._schema)
, _first_buffer(copy_buffer(o._first_buffer))
, _second_buffer(copy_buffer(o._second_buffer)) {
}
factory(factory&& o) = default;
flat_mutation_reader operator()(
schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
BOOST_REQUIRE(s == _schema);
if (_first_buffer) {
auto buf = *std::exchange(_first_buffer, {});
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf));
rd.set_max_buffer_size(_max_buffer_size);
return rd;
}
if (_second_buffer) {
auto buf = *std::exchange(_second_buffer, {});
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf));
rd.set_max_buffer_size(_max_buffer_size);
return rd;
}
return make_empty_flat_reader(_schema);
}
};
auto ms = mutation_source(factory(schema, std::move(first_buffer), std::move(second_buffer), max_buffer_size));
auto [rd, handle] = make_manually_paused_evictable_reader(
std::move(ms),
schema,
semaphore,
prange,
slice,
seastar::default_priority_class(),
nullptr,
mutation_reader::forwarding::yes);
rd.set_max_buffer_size(max_buffer_size);
rd.fill_buffer(db::no_timeout).get0();
const auto eq_cmp = position_in_partition::equal_compare(*schema);
BOOST_REQUIRE(rd.is_buffer_full());
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position));
BOOST_REQUIRE(!rd.is_end_of_stream());
rd.detach_buffer();
handle.pause();
while(semaphore.try_evict_one_inactive_read());
return std::move(rd);
}
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{});
simple_schema s;
const auto pkey = s.make_pkey();
size_t max_buffer_size = 512;
const int first_ck = 100;
const int second_buffer_ck = first_ck + 100;
size_t mem_usage = 0;
std::deque<mutation_fragment> first_buffer;
first_buffer.emplace_back(partition_start{pkey, {}});
mem_usage = first_buffer.back().memory_usage(*s.schema());
for (int i = 0; i < second_buffer_ck; ++i) {
first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v"));
mem_usage += first_buffer.back().memory_usage(*s.schema());
}
const auto last_fragment_position = position_in_partition(first_buffer.back().position());
max_buffer_size = mem_usage;
first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v"));
std::deque<mutation_fragment> second_buffer;
second_buffer.emplace_back(partition_start{pkey, {}});
mem_usage = second_buffer.back().memory_usage(*s.schema());
second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10))));
int ckey = second_buffer_ck;
while (mem_usage <= max_buffer_size) {
second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v"));
mem_usage += second_buffer.back().memory_usage(*s.schema());
}
second_buffer.emplace_back(partition_end{});
auto rd = create_evictable_reader_and_evict_after_first_buffer(s.schema(), semaphore, query::full_partition_range,
s.schema()->full_slice(), std::move(first_buffer), last_fragment_position, std::move(second_buffer), max_buffer_size);
rd.fill_buffer(db::no_timeout).get();
const auto tri_cmp = position_in_partition::tri_compare(*s.schema());
BOOST_REQUIRE(tri_cmp(last_fragment_position, rd.peek_buffer().position()) < 0);
}
namespace {
void check_evictable_reader_validation_is_triggered(
std::string_view test_name,
std::string_view error_prefix, // empty str if no exception is expected
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& prange,
const query::partition_slice& slice,
std::deque<mutation_fragment> first_buffer,
position_in_partition_view last_fragment_position,
std::deque<mutation_fragment> second_buffer,
size_t max_buffer_size) {
testlog.info("check_evictable_reader_validation_is_triggered(): checking {} test case: {}", error_prefix.empty() ? "positive" : "negative", test_name);
auto rd = create_evictable_reader_and_evict_after_first_buffer(std::move(schema), semaphore, prange, slice, std::move(first_buffer),
last_fragment_position, std::move(second_buffer), max_buffer_size);
const bool fail = !error_prefix.empty();
try {
rd.fill_buffer(db::no_timeout).get0();
} catch (std::runtime_error& e) {
if (fail) {
if (error_prefix == std::string_view(e.what(), error_prefix.size())) {
testlog.trace("Expected exception caught: {}", std::current_exception());
return;
} else {
BOOST_FAIL(fmt::format("Exception with unexpected message caught: {}", std::current_exception()));
}
} else {
BOOST_FAIL(fmt::format("Unexpected exception caught: {}", std::current_exception()));
}
}
if (fail) {
BOOST_FAIL(fmt::format("Expected exception not thrown"));
}
}
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
set_abort_on_internal_error(false);
auto reset_on_internal_abort = defer([] {
set_abort_on_internal_error(true);
});
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{});
simple_schema s;
auto pkeys = s.make_pkeys(4);
boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
size_t max_buffer_size = 512;
const int first_ck = 100;
const int second_buffer_ck = first_ck + 100;
const int last_ck = second_buffer_ck + 100;
static const char partition_error_prefix[] = "maybe_validate_partition_start(): validation failed";
static const char position_in_partition_error_prefix[] = "validate_position_in_partition(): validation failed";
static const char trim_range_tombstones_error_prefix[] = "maybe_trim_range_tombstone(): validation failed";
const auto prange = dht::partition_range::make(
dht::partition_range::bound(pkeys[1], true),
dht::partition_range::bound(pkeys[2], true));
const auto ckrange = query::clustering_range::make(
query::clustering_range::bound(s.make_ckey(first_ck), true),
query::clustering_range::bound(s.make_ckey(last_ck), true));
const auto slice = partition_slice_builder(*s.schema()).with_range(ckrange).build();
std::deque<mutation_fragment> first_buffer;
first_buffer.emplace_back(partition_start{pkeys[1], {}});
size_t mem_usage = first_buffer.back().memory_usage(*s.schema());
for (int i = 0; i < second_buffer_ck; ++i) {
first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v"));
mem_usage += first_buffer.back().memory_usage(*s.schema());
}
max_buffer_size = mem_usage;
auto last_fragment_position = position_in_partition(first_buffer.back().position());
first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v"));
auto make_second_buffer = [&s, &max_buffer_size, second_buffer_ck] (dht::decorated_key pkey, std::optional<int> first_ckey = {},
bool inject_range_tombstone = false) mutable {
auto ckey = first_ckey ? *first_ckey : second_buffer_ck;
std::deque<mutation_fragment> second_buffer;
second_buffer.emplace_back(partition_start{std::move(pkey), {}});
size_t mem_usage = second_buffer.back().memory_usage(*s.schema());
if (inject_range_tombstone) {
second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(last_ck))));
}
while (mem_usage <= max_buffer_size) {
second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v"));
mem_usage += second_buffer.back().memory_usage(*s.schema());
}
second_buffer.emplace_back(partition_end{});
return second_buffer;
};
//
// Continuing the same partition
//
check_evictable_reader_validation_is_triggered(
"pkey < _last_pkey; pkey ∉ prange",
partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[0]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey",
"",
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange (<)",
position_in_partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], first_ck - 10),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange (<); start with trimmable range-tombstone",
position_in_partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], first_ck - 10, true),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition",
position_in_partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], second_buffer_ck - 2),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition; start with trimmable range-tombstone",
position_in_partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], second_buffer_ck - 2, true),
max_buffer_size);
{
auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck);
second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck - 10)));
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; end(range_tombstone) < _next_position_in_partition",
trim_range_tombstones_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
std::move(second_buffer),
max_buffer_size);
}
{
auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck);
second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10)));
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; end(range_tombstone) > _next_position_in_partition",
"",
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
std::move(second_buffer),
max_buffer_size);
}
{
auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck);
second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_starting_with(s.make_ckey(last_ck + 10)));
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; start(range_tombstone) ∉ ckrange (>)",
position_in_partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
std::move(second_buffer),
max_buffer_size);
}
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∈ ckrange",
"",
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], second_buffer_ck),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange (>)",
position_in_partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], last_ck + 10),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∈ pkrange",
partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[2]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∉ pkrange",
partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[3]),
max_buffer_size);
//
// Continuing from next partition
//
first_buffer.clear();
first_buffer.emplace_back(partition_start{pkeys[1], {}});
mem_usage = first_buffer.back().memory_usage(*s.schema());
for (int i = 0; i < second_buffer_ck; ++i) {
first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v"));
mem_usage += first_buffer.back().memory_usage(*s.schema());
}
first_buffer.emplace_back(partition_end{});
mem_usage += first_buffer.back().memory_usage(*s.schema());
last_fragment_position = position_in_partition(first_buffer.back().position());
max_buffer_size = mem_usage;
first_buffer.emplace_back(partition_start{pkeys[2], {}});
check_evictable_reader_validation_is_triggered(
"pkey < _last_pkey; pkey ∉ pkrange",
partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[0]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey",
partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∈ pkrange",
"",
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[2]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∉ pkrange",
partition_error_prefix,
s.schema(),
semaphore,
prange,
slice,
copy_fragments(*s.schema(), first_buffer),
last_fragment_position,
make_second_buffer(pkeys[3]),
max_buffer_size);
}

View File

@@ -118,6 +118,25 @@ SEASTAR_TEST_CASE(test_access_and_schema) {
});
}
SEASTAR_TEST_CASE(test_column_dropped_from_base) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("create table cf (p int, c ascii, a int, v int, primary key (p, c));").get();
e.execute_cql("create materialized view vcf as select p, c, v from cf "
"where v is not null and p is not null and c is not null "
"primary key (v, p, c)").get();
e.execute_cql("alter table cf drop a;").get();
e.execute_cql("insert into cf (p, c, v) values (0, 'foo', 1);").get();
eventually([&] {
auto msg = e.execute_cql("select v from vcf").get0();
assert_that(msg).is_rows()
.with_size(1)
.with_row({
{int32_type->decompose(1)}
});
});
});
}
SEASTAR_TEST_CASE(test_updates) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("create table base (k int, v int, primary key (k));").get();

View File

@@ -289,9 +289,9 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(record.started_at.time_since_epoch()).count();
// query command is stored on a parameters map with a 'query' key
auto query_str_it = record.parameters.find("query");
const auto query_str_it = record.parameters.find("query");
if (query_str_it == record.parameters.end()) {
throw std::logic_error("No \"query\" parameter set for a session requesting a slow_query_log record");
tlogger.trace("No \"query\" parameter set for a session requesting a slow_query_log record");
}
// parameters map
@@ -312,7 +312,9 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
cql3::raw_value::make_value(timestamp_type->decompose(millis_since_epoch)),
cql3::raw_value::make_value(timeuuid_type->decompose(start_time_id)),
cql3::raw_value::make_value(utf8_type->decompose(query_str_it->second)),
query_str_it != record.parameters.end()
? cql3::raw_value::make_value(utf8_type->decompose(query_str_it->second))
: cql3::raw_value::make_null(),
cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))),
cql3::raw_value::make_value(make_map_value(my_map_type, map_type_impl::native_type(std::move(parameters_values_vector))).serialize()),
cql3::raw_value::make_value(inet_addr_type->decompose(record.client.addr())),

View File

@@ -24,6 +24,7 @@
#include "dht/i_partitioner.hh"
#include "query-request.hh"
#include "schema_fwd.hh"
#include "db/view/view.hh"
namespace cql3::statements {
class select_statement;
@@ -35,9 +36,7 @@ class view_info final {
// The following fields are used to select base table rows.
mutable shared_ptr<cql3::statements::select_statement> _select_statement;
mutable std::optional<query::partition_slice> _partition_slice;
// Id of a regular base table column included in the view's PK, if any.
// Scylla views only allow one such column, alternator can have up to two.
mutable std::vector<column_id> _base_non_pk_columns_in_view_pk;
db::view::base_info_ptr _base_info;
public:
view_info(const schema& schema, const raw_view_info& raw_view_info);
@@ -65,8 +64,22 @@ public:
const query::partition_slice& partition_slice() const;
const column_definition* view_column(const schema& base, column_id base_id) const;
const column_definition* view_column(const column_definition& base_def) const;
const std::vector<column_id>& base_non_pk_columns_in_view_pk() const;
void initialize_base_dependent_fields(const schema& base);
bool has_base_non_pk_columns_in_view_pk() const;
/// Returns a pointer to the base_dependent_view_info which matches the current
/// schema of the base table.
///
/// base_dependent_view_info lives separately from the view schema.
/// It can change without the view schema changing its value.
/// This pointer is updated on base table schema changes as long as this view_info
/// corresponds to the current schema of the view. After that the pointer stops tracking
/// the base table schema.
///
/// The snapshot of both the view schema and base_dependent_view_info is represented
/// by view_and_base. See with_base_info_snapshot().
const db::view::base_info_ptr& base_info() const { return _base_info; }
void set_base_info(db::view::base_info_ptr);
db::view::base_info_ptr make_base_dependent_view_info(const schema& base_schema) const;
friend bool operator==(const view_info& x, const view_info& y) {
return x._raw == y._raw;