Files
scylladb/cdc/split.cc
Kamil Braun a1e235b1a4 CDC: Don't split collection tombstone away from base update
Overwriting a collection cell using timestamp T is a process with
the following steps:
1. inserting a row marker (if applicable) with timestamp T;
2. writing a collection tombstone with timestamp T-1;
3. writing the new collection value with timestamp T.
Since CDC does clustering of the operations by timestamp, this
would result in 3 separate calls to `transform` (in case of
INSERT, or 2 - in the case of UPDATE), which seems excessive,
especially when pre-/postimage is enabled. This patch makes
collection tombstones be treated as if they had the same TS as
the base write, and thus they are processed in one call to `transform`
(as long as TTLs are not used).

Also, `cdc_test` had to be updated in places that relied on the former
splitting strategy.

Fixes #6084
2020-06-07 17:09:05 +03:00

494 lines
19 KiB
C++

/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mutation.hh"
#include "schema.hh"
#include "split.hh"
#include "log.hh"
// A write to a single atomic (non-collection) column.
struct atomic_column_update {
// Id of the column within its column kind (static or regular).
column_id id;
// The cell that was written to that column.
atomic_cell cell;
};
// A write to a single non-atomic (collection) column.
struct nonatomic_column_update {
// Id of the column within its column kind (static or regular).
column_id id;
// Collection tombstone; optional - left default-constructed when the write
// carries only cells.
tombstone t;
// The (key, cell) pairs written to the collection.
utils::chunked_vector<std::pair<bytes, atomic_cell>> cells;
};
// All static-row writes of a mutation that share one (timestamp, TTL) pair.
struct static_row_update {
// TTL shared by the entries below; a zero duration stands for "no TTL".
gc_clock::duration ttl;
std::vector<atomic_column_update> atomic_entries;
std::vector<nonatomic_column_update> nonatomic_entries;
};
// Writes to one clustered row whose (timestamp, TTL) pair equals that of the
// row's live row marker; such a change is treated as an INSERT.
struct clustered_row_insert {
// TTL shared by the marker and the entries; a zero duration stands for "no TTL".
gc_clock::duration ttl;
clustering_key key;
// The row marker that makes this change an INSERT.
row_marker marker;
std::vector<atomic_column_update> atomic_entries;
// Only collection updates that carry a tombstone are stored here;
// cells-only collection updates go to a separate clustered_row_update.
std::vector<nonatomic_column_update> nonatomic_entries;
};
// Writes to one clustered row that share a (timestamp, TTL) pair and are not
// tied to a row marker; such a change is treated as an UPDATE.
struct clustered_row_update {
// TTL shared by the entries below; a zero duration stands for "no TTL".
gc_clock::duration ttl;
clustering_key key;
std::vector<atomic_column_update> atomic_entries;
std::vector<nonatomic_column_update> nonatomic_entries;
};
// Deletion of a single clustered row.
struct clustered_row_deletion {
// Key of the deleted row.
clustering_key key;
// The row tombstone; its timestamp decides which batch the deletion lands in.
tombstone t;
};
// Deletion of a range of clustered rows.
struct clustered_range_deletion {
// The range tombstone, copied from the base mutation.
range_tombstone rt;
};
// Deletion of the whole partition.
struct partition_deletion {
// The partition tombstone, copied from the base mutation.
tombstone t;
};
// Every change of a mutation that carries one particular timestamp, grouped
// by the kind of change.
struct batch {
std::vector<static_row_update> static_updates;
std::vector<clustered_row_insert> clustered_inserts;
std::vector<clustered_row_update> clustered_updates;
std::vector<clustered_row_deletion> clustered_row_deletions;
std::vector<clustered_range_deletion> clustered_range_deletions;
// A mutation has at most one partition tombstone, hence optional instead of a vector.
std::optional<partition_deletion> partition_deletions;
};
// Changes of one mutation keyed by timestamp; being a std::map, iteration
// visits the batches in increasing timestamp order.
using set_of_changes = std::map<api::timestamp_type, batch>;
// The column writes of a single row that fall into one (timestamp, TTL)
// bucket, as produced by extract_row_updates().
struct row_update {
std::vector<atomic_column_update> atomic_entries;
std::vector<nonatomic_column_update> nonatomic_entries;
};
static
std::map<std::pair<api::timestamp_type, gc_clock::duration>, row_update>
extract_row_updates(const row& r, column_kind ckind, const schema& schema) {
std::map<std::pair<api::timestamp_type, gc_clock::duration>, row_update> result;
r.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
auto& cdef = schema.column_at(ckind, id);
if (cdef.is_atomic()) {
auto view = cell.as_atomic_cell(cdef);
auto timestamp_and_ttl = std::pair(
view.timestamp(),
view.is_live_and_has_ttl() ? view.ttl() : gc_clock::duration(0)
);
result[timestamp_and_ttl].atomic_entries.push_back({id, atomic_cell(*cdef.type, view)});
return;
}
cell.as_collection_mutation().with_deserialized(*cdef.type, [&] (collection_mutation_view_description mview) {
auto desc = mview.materialize(*cdef.type);
for (auto& [k, v]: desc.cells) {
auto timestamp_and_ttl = std::pair(
v.timestamp(),
v.is_live_and_has_ttl() ? v.ttl() : gc_clock::duration(0)
);
auto& updates = result[timestamp_and_ttl].nonatomic_entries;
if (updates.empty() || updates.back().id != id) {
updates.push_back({id, {}});
}
updates.back().cells.push_back({std::move(k), std::move(v)});
}
if (desc.tomb) {
auto timestamp_and_ttl = std::pair(desc.tomb.timestamp + 1, gc_clock::duration(0));
auto& updates = result[timestamp_and_ttl].nonatomic_entries;
if (updates.empty() || updates.back().id != id) {
updates.push_back({id, {}});
}
updates.back().t = std::move(desc.tomb);
}
});
});
return result;
};
// Breaks `base_mutation` down into individual changes grouped by timestamp.
//
// For every timestamp the resulting batch lists static row updates, clustered
// row inserts/updates, row deletions, range deletions and the partition
// deletion that carry that timestamp.
set_of_changes extract_changes(const mutation& base_mutation, const schema& base_schema) {
    set_of_changes changes;
    auto& part = base_mutation.partition();

    // Static row: one static_row_update per (timestamp, TTL) bucket.
    auto static_buckets = extract_row_updates(part.static_row().get(), column_kind::static_column, base_schema);
    for (auto& [ts_and_ttl, upd]: static_buckets) {
        changes[ts_and_ttl.first].static_updates.push_back({
            ts_and_ttl.second,
            std::move(upd.atomic_entries),
            std::move(upd.nonatomic_entries)
        });
    }

    for (const rows_entry& cr : part.clustered_rows()) {
        auto row_buckets = extract_row_updates(cr.row().cells(), column_kind::regular_column, base_schema);

        const auto& marker = cr.row().marker();
        const bool marker_live = marker.is_live();
        const auto marker_ts = marker.timestamp();
        const auto marker_ttl = marker.is_expiring() ? marker.ttl() : gc_clock::duration(0);

        if (marker_live) {
            // A live marker must yield an INSERT even if no cell shares its
            // timestamp and TTL, so force the bucket into existence.
            (void)row_buckets[std::pair(marker_ts, marker_ttl)];
        }

        // The buckets are visited in key order, hence - within one timestamp -
        // in increasing TTL order. The resulting `set_of_changes` has to list
        // changes in that order; the reason is explained in a comment in
        // cdc/log.cc, search for "#6070".
        for (auto& [ts_and_ttl, upd]: row_buckets) {
            const auto timestamp = ts_and_ttl.first;
            const auto ttl = ts_and_ttl.second;
            auto& at_ts = changes[timestamp];

            const bool is_insert = marker_live && timestamp == marker_ts && ttl == marker_ttl;
            if (!is_insert) {
                at_ts.clustered_updates.push_back({
                    ttl,
                    cr.key(),
                    std::move(upd.atomic_entries),
                    std::move(upd.nonatomic_entries)
                });
                continue;
            }

            at_ts.clustered_inserts.push_back({
                ttl,
                cr.key(),
                marker,
                std::move(upd.atomic_entries),
                {}
            });
            auto& insert_change = at_ts.clustered_inserts.back();

            // Writing a collection column through an INSERT always produces a
            // collection tombstone. Given
            //     CREATE TABLE t (a int primary key, b map<int, int>);
            // the statement
            //     INSERT INTO t (a, b) VALUES (0, {0:0}) USING TIMESTAMP T;
            // creates a tombstone in b with timestamp T-1 plus the cell (0, 0)
            // with timestamp T. An INSERT cannot create the cell alone; only
            //     UPDATE t USING TIMESTAMP T SET b = b + {0:0} WHERE a = 0;
            // can (unlike `SET b = {0:0}`, which again writes the T-1 tombstone).
            //
            // Consequently a collection update that carries a tombstone may be
            // merged into the INSERT change, whereas one with cells only has to
            // be reported through a separate UPDATE change.
            //
            // All cells-only collection updates of this bucket share one UPDATE
            // change rather than getting one each. Example:
            //     CREATE TABLE t (a int primary key, b map<int, int>, c map <int, int>) with cdc = {'enabled':true};
            //     insert into t (a, b, c) values (0, {1:1}, {2:2}) USING TTL 5;
            // should create 3 delta rows:
            //     1. one for the row marker (indicating an INSERT), with TTL 5
            //     2. one for the b and c tombstones, without TTL (cdc$ttl = null)
            //     3. one for the b and c cells, with TTL 5
            // and the pointer below makes the b cells and c cells end up together in 3.
            clustered_row_update* cells_only_update = nullptr;
            for (auto& coll: upd.nonatomic_entries) {
                if (coll.t) {
                    insert_change.nonatomic_entries.push_back(std::move(coll));
                    continue;
                }
                if (cells_only_update == nullptr) {
                    at_ts.clustered_updates.push_back({
                        ttl,
                        cr.key(),
                        {},
                        {}
                    });
                    // Nothing else is appended to clustered_updates until this
                    // inner loop ends, so the pointer stays valid.
                    cells_only_update = &at_ts.clustered_updates.back();
                }
                cells_only_update->nonatomic_entries.push_back(std::move(coll));
            }
        }

        // Row tombstone, filed under its own timestamp.
        auto row_tomb = cr.row().deleted_at().regular();
        if (row_tomb) {
            changes[row_tomb.timestamp].clustered_row_deletions.push_back({cr.key(), row_tomb});
        }
    }

    // Range tombstones that actually carry a timestamp.
    for (const auto& rt: part.row_tombstones()) {
        if (rt.tomb.timestamp == api::missing_timestamp) {
            continue;
        }
        changes[rt.tomb.timestamp].clustered_range_deletions.push_back({rt});
    }

    // Partition tombstone, if any.
    const auto partition_tomb_ts = part.partition_tombstone().timestamp;
    if (partition_tomb_ts != api::missing_timestamp) {
        changes[partition_tomb_ts].partition_deletions = {part.partition_tombstone()};
    }

    return changes;
}
namespace cdc {
bool should_split(const mutation& base_mutation, const schema& base_schema) {
auto& p = base_mutation.partition();
api::timestamp_type found_ts = api::missing_timestamp;
std::optional<gc_clock::duration> found_ttl; // 0 = "no ttl"
auto check_or_set = [&] (api::timestamp_type ts, gc_clock::duration ttl) {
if (found_ts != api::missing_timestamp && found_ts != ts) {
return true;
}
found_ts = ts;
if (found_ttl && *found_ttl != ttl) {
return true;
}
found_ttl = ttl;
return false;
};
bool had_static_row = false;
bool should_split = false;
p.static_row().get().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
had_static_row = true;
auto& cdef = base_schema.column_at(column_kind::static_column, id);
if (cdef.is_atomic()) {
auto view = cell.as_atomic_cell(cdef);
if (check_or_set(view.timestamp(), view.is_live_and_has_ttl() ? view.ttl() : gc_clock::duration(0))) {
should_split = true;
}
return;
}
cell.as_collection_mutation().with_deserialized(*cdef.type, [&] (collection_mutation_view_description mview) {
auto desc = mview.materialize(*cdef.type);
for (auto& [k, v]: desc.cells) {
if (check_or_set(v.timestamp(), v.is_live_and_has_ttl() ? v.ttl() : gc_clock::duration(0))) {
should_split = true;
return;
}
}
if (desc.tomb) {
if (check_or_set(desc.tomb.timestamp + 1, gc_clock::duration(0))) {
should_split = true;
return;
}
}
});
});
if (should_split) {
return true;
}
bool had_clustered_row = false;
if (!p.clustered_rows().empty() && had_static_row) {
return true;
}
for (const rows_entry& cr : p.clustered_rows()) {
had_clustered_row = true;
const auto& marker = cr.row().marker();
if (marker.is_live() && check_or_set(marker.timestamp(), marker.is_expiring() ? marker.ttl() : gc_clock::duration(0))) {
return true;
}
bool is_insert = marker.is_live();
bool had_cells = false;
cr.row().cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
had_cells = true;
auto& cdef = base_schema.column_at(column_kind::regular_column, id);
if (cdef.is_atomic()) {
auto view = cell.as_atomic_cell(cdef);
if (check_or_set(view.timestamp(), view.is_live_and_has_ttl() ? view.ttl() : gc_clock::duration(0))) {
should_split = true;
}
return;
}
cell.as_collection_mutation().with_deserialized(*cdef.type, [&] (collection_mutation_view_description mview) {
for (auto& [k, v]: mview.cells) {
if (check_or_set(v.timestamp(), v.is_live_and_has_ttl() ? v.ttl() : gc_clock::duration(0))) {
should_split = true;
return;
}
if (is_insert) {
// nonatomic updates cannot be expressed with an INSERT.
should_split = true;
return;
}
}
if (mview.tomb) {
if (check_or_set(mview.tomb.timestamp + 1, gc_clock::duration(0))) {
should_split = true;
return;
}
}
});
});
if (should_split) {
return true;
}
auto row_tomb = cr.row().deleted_at().regular();
if (row_tomb) {
if (had_cells) {
return true;
}
// there were no cells, so no ttl
assert(!found_ttl);
if (found_ts != api::missing_timestamp && found_ts != row_tomb.timestamp) {
return true;
}
found_ts = row_tomb.timestamp;
}
}
if (!p.row_tombstones().empty() && (had_static_row || had_clustered_row)) {
return true;
}
for (const auto& rt: p.row_tombstones()) {
if (rt.tomb) {
if (found_ts != api::missing_timestamp && found_ts != rt.tomb.timestamp) {
return true;
}
found_ts = rt.tomb.timestamp;
}
}
if (p.partition_tombstone().timestamp != api::missing_timestamp
&& (!p.row_tombstones().empty() || had_static_row || had_clustered_row)) {
return true;
}
// A mutation with no timestamp will be split into 0 mutations
return found_ts == api::missing_timestamp;
}
void for_each_change(const mutation& base_mutation, const schema_ptr& base_schema,
seastar::noncopyable_function<void(mutation, api::timestamp_type, bytes, int&)> f) {
auto changes = extract_changes(base_mutation, *base_schema);
auto pk = base_mutation.key();
for (auto& [change_ts, btch] : changes) {
auto tuuid = timeuuid_type->decompose(generate_timeuuid(change_ts));
int batch_no = 0;
for (auto& sr_update : btch.static_updates) {
mutation m(base_schema, pk);
for (auto& atomic_update : sr_update.atomic_entries) {
auto& cdef = base_schema->column_at(column_kind::static_column, atomic_update.id);
m.set_static_cell(cdef, std::move(atomic_update.cell));
}
for (auto& nonatomic_update : sr_update.nonatomic_entries) {
auto& cdef = base_schema->column_at(column_kind::static_column, nonatomic_update.id);
m.set_static_cell(cdef, collection_mutation_description{nonatomic_update.t, std::move(nonatomic_update.cells)}.serialize(*cdef.type));
}
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& cr_insert : btch.clustered_inserts) {
mutation m(base_schema, pk);
auto& row = m.partition().clustered_row(*base_schema, cr_insert.key);
for (auto& atomic_update : cr_insert.atomic_entries) {
auto& cdef = base_schema->column_at(column_kind::regular_column, atomic_update.id);
row.cells().apply(cdef, std::move(atomic_update.cell));
}
for (auto& nonatomic_update : cr_insert.nonatomic_entries) {
auto& cdef = base_schema->column_at(column_kind::regular_column, nonatomic_update.id);
row.cells().apply(cdef, collection_mutation_description{nonatomic_update.t, std::move(nonatomic_update.cells)}.serialize(*cdef.type));
}
row.apply(cr_insert.marker);
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& cr_update : btch.clustered_updates) {
mutation m(base_schema, pk);
auto& row = m.partition().clustered_row(*base_schema, cr_update.key).cells();
for (auto& atomic_update : cr_update.atomic_entries) {
auto& cdef = base_schema->column_at(column_kind::regular_column, atomic_update.id);
row.apply(cdef, std::move(atomic_update.cell));
}
for (auto& nonatomic_update : cr_update.nonatomic_entries) {
auto& cdef = base_schema->column_at(column_kind::regular_column, nonatomic_update.id);
row.apply(cdef, collection_mutation_description{nonatomic_update.t, std::move(nonatomic_update.cells)}.serialize(*cdef.type));
}
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& cr_delete : btch.clustered_row_deletions) {
mutation m(base_schema, pk);
m.partition().apply_delete(*base_schema, cr_delete.key, cr_delete.t);
f(std::move(m), change_ts, tuuid, batch_no);
}
for (auto& crange_delete : btch.clustered_range_deletions) {
mutation m(base_schema, pk);
m.partition().apply_delete(*base_schema, crange_delete.rt);
f(std::move(m), change_ts, tuuid, batch_no);
}
if (btch.partition_deletions) {
mutation m(base_schema, pk);
m.partition().apply(btch.partition_deletions->t);
f(std::move(m), change_ts, tuuid, batch_no);
}
}
}
} // namespace cdc