merge: cdc: Remove post-filterings for keys-only/off cdc delta generation

Merged pull request https://github.com/scylladb/scylla/pull/7121
By Calle Wilund:

Refs #7095
Fixes #7128

CDC delta!=full both relied on post-filtering
to remove generated log row and/or cells. This is inefficient.
Instead, simply check if the data should be created in the
visitors.

Also removed delta_mode=off mode.

  cdc: Remove post-filterings for keys-only/off cdc delta generation
  cdc: Remove cdc delta_mode::off
This commit is contained in:
Nadav Har'El
2020-08-31 11:22:09 +03:00
4 changed files with 38 additions and 126 deletions

View File

@@ -28,7 +28,6 @@
namespace cdc {
enum class delta_mode : uint8_t {
off,
keys,
full,
};

View File

@@ -295,17 +295,15 @@ future<> cdc::cdc_service::stop() {
cdc::cdc_service::~cdc_service() = default;
namespace {
static const sstring delta_mode_string_off = "off";
static const sstring delta_mode_string_keys = "keys";
static const sstring delta_mode_string_full = "full";
static const std::string_view image_mode_string_on = "on";
static const std::string_view image_mode_string_off = delta_mode_string_off;
static const std::string_view image_mode_string_off = "off";
static const std::string_view image_mode_string_full = delta_mode_string_full;
sstring to_string(cdc::delta_mode dm) {
switch (dm) {
case cdc::delta_mode::off : return delta_mode_string_off;
case cdc::delta_mode::keys : return delta_mode_string_keys;
case cdc::delta_mode::full : return delta_mode_string_full;
}
@@ -363,8 +361,6 @@ cdc::options::options(const std::map<sstring, sstring>& map) {
} else if (key == "delta") {
if (val == delta_mode_string_keys) {
_delta_mode = delta_mode::keys;
} else if (val == delta_mode_string_off) {
_delta_mode = delta_mode::off;
} else if (val != delta_mode_string_full) {
throw exceptions::configuration_exception("Invalid value for CDC option \"delta\": " + p.second);
}
@@ -377,11 +373,6 @@ cdc::options::options(const std::map<sstring, sstring>& map) {
throw exceptions::configuration_exception("Invalid CDC option: " + p.first);
}
}
if (_enabled && !preimage() && !postimage() && get_delta_mode() == delta_mode::off) {
throw exceptions::configuration_exception("Invalid combination of CDC options: neither of"
" {preimage, postimage, delta} is enabled");
}
}
std::map<sstring, sstring> cdc::options::to_map() const {
@@ -985,6 +976,13 @@ static ttl_opt get_ttl(const row_marker& rm) {
return rm.is_expiring() ? std::optional{rm.ttl()} : std::nullopt;
}
/**
* Returns whether we should generate cdc delta values (beyond keys)
*/
static bool generate_delta_values(const schema& s) {
return s.cdc_options().get_delta_mode() == cdc::delta_mode::full;
}
/* Visits the cells and tombstones of a single base mutation row and constructs corresponding delta-row cells
* for the corresponding log mutation.
*
@@ -1016,13 +1014,16 @@ struct process_row_visitor {
// We need to keep a reference to it since we might insert new row_states during the visitation.
row_states_map& _clustering_row_states;
const bool _generate_delta_values = true;
process_row_visitor(
const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder,
bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state,
row_states_map& clustering_row_states)
row_states_map& clustering_row_states, bool generate_delta_values)
: _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder),
_enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state),
_clustering_row_states(clustering_row_states)
_clustering_row_states(clustering_row_states),
_generate_delta_values(generate_delta_values)
{}
void update_row_state(const column_definition& cdef, bytes_opt value) {
@@ -1040,7 +1041,9 @@ struct process_row_visitor {
bytes value = get_bytes(cell);
// delta
_builder.set_value(_log_ck, cdef, value);
if (_generate_delta_values) {
_builder.set_value(_log_ck, cdef, value);
}
// images
if (_enable_updating_state) {
@@ -1050,8 +1053,9 @@ struct process_row_visitor {
void dead_atomic_cell(const column_definition& cdef, const atomic_cell_view&) {
// delta
_builder.set_deleted(_log_ck, cdef);
if (_generate_delta_values) {
_builder.set_deleted(_log_ck, cdef);
}
// images
if (_enable_updating_state) {
update_row_state(cdef, std::nullopt);
@@ -1183,16 +1187,18 @@ struct process_row_visitor {
}
// delta
if (is_column_delete) {
_builder.set_deleted(_log_ck, cdef);
}
if (_generate_delta_values) {
if (is_column_delete) {
_builder.set_deleted(_log_ck, cdef);
}
if (deleted_elements) {
_builder.set_deleted_elements(_log_ck, cdef, *deleted_elements);
}
if (deleted_elements) {
_builder.set_deleted_elements(_log_ck, cdef, *deleted_elements);
}
if (added_cells) {
_builder.set_value(_log_ck, cdef, *added_cells);
if (added_cells) {
_builder.set_value(_log_ck, cdef, *added_cells);
}
}
// images
@@ -1225,6 +1231,8 @@ struct process_change_visitor {
row_states_map& _clustering_row_states;
cell_map& _static_row_state;
const bool _generate_delta_values = true;
void static_row_cells(auto&& visit_row_cells) {
_touched_parts.set<stats::part_type::STATIC_ROW>();
@@ -1232,7 +1240,7 @@ struct process_change_visitor {
process_row_visitor v(
log_ck, _touched_parts, _builder,
_enable_updating_state, nullptr, &_static_row_state, _clustering_row_states);
_enable_updating_state, nullptr, &_static_row_state, _clustering_row_states, _generate_delta_values);
visit_row_cells(v);
_builder.set_ttl(log_ck, v._ttl_column);
@@ -1242,20 +1250,12 @@ struct process_change_visitor {
_touched_parts.set<stats::part_type::CLUSTERING_ROW>();
auto log_ck = _builder.allocate_new_log_row();
_builder.set_clustering_columns(log_ck, ckey);
struct clustering_row_cells_visitor : public process_row_visitor {
operation _cdc_op = operation::update;
clustering_row_cells_visitor(
const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder,
bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state,
row_states_map& clustering_row_states)
: process_row_visitor(
log_ck, touched_parts, builder,
enable_updating_state, base_ck, row_state, clustering_row_states)
{}
using process_row_visitor::process_row_visitor;
void marker(const row_marker& rm) {
_ttl_column = get_ttl(rm);
@@ -1266,7 +1266,7 @@ struct process_change_visitor {
clustering_row_cells_visitor v(
log_ck, _touched_parts, _builder,
_enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey),
_clustering_row_states);
_clustering_row_states, _generate_delta_values);
visit_row_cells(v);
_builder.set_operation(log_ck, v._cdc_op);
@@ -1320,7 +1320,6 @@ struct process_change_visitor {
void partition_delete(const tombstone&) {
_touched_parts.set<stats::part_type::PARTITION_DELETE>();
auto log_ck = _builder.allocate_new_log_row(operation::partition_delete);
if (_enable_updating_state) {
_clustering_row_states.clear();
@@ -1422,56 +1421,6 @@ private:
stats::part_type_set _touched_parts;
// Remove non-key columns or entire delta rows, according to `delta` setting in `cdc` options.
void adjust_or_delete_deltas() {
if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::full) {
return;
}
static const auto& op_col = *_log_schema->get_column_definition(log_meta_column_name_bytes("operation"));
static const auto preimg_op_bytes = op_col.type->decompose(operation_native_type(operation::pre_image));
static const auto postimg_op_bytes = op_col.type->decompose(operation_native_type(operation::post_image));
for (auto& m : _result_mutations) {
auto& clustered_rows = m.partition().clustered_rows();
int deleted_rows_cnt = 0;
for (auto it = clustered_rows.begin(); it != clustered_rows.end(); /* no increment */) {
const auto& op_cell = it->row().cells().cell_at(op_col.id).as_atomic_cell(op_col);
const auto op_val = op_cell.value().linearize();
if (op_val != preimg_op_bytes && op_val != postimg_op_bytes) {
if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::off) {
it = m.partition().clustered_rows().erase_and_dispose(it, current_deleter<rows_entry>());
++deleted_rows_cnt;
continue;
}
// The case of `get_delta_mode() == delta_mode::keys`:
it->row().cells().remove_if([this, log_s = m.schema()] (column_id id, atomic_cell_or_collection& acoc) {
const auto& log_cdef = log_s->column_at(column_kind::regular_column, id);
if (is_cdc_metacolumn_name(log_cdef.name_as_text())) {
return false;
}
const auto* base_cdef = _schema->get_column_definition(log_cdef.name());
// Remove columns from delta that correspond to non-PK/CK columns in the base table.
return base_cdef != nullptr
&& (base_cdef->kind != column_kind::partition_key && base_cdef->kind != column_kind::clustering_key);
});
}
// Deletion of deltas might leave gaps in `batch_seq_no` - let's fix them.
if (deleted_rows_cnt > 0) {
const auto* batch_seq_no_cdef = m.schema()->get_column_definition(log_meta_column_name_bytes("batch_seq_no"));
assert(batch_seq_no_cdef != nullptr);
auto exploded_ck = it->key().explode();
const size_t batch_seq_no_idx = batch_seq_no_cdef->component_index();
const auto old_batch_seq_no = value_cast<int32_t>(int32_type->deserialize(exploded_ck[batch_seq_no_idx]));
exploded_ck[batch_seq_no_idx] = int32_type->decompose(old_batch_seq_no - deleted_rows_cnt);
it->key() = clustering_key::from_exploded(std::move(exploded_ck));
}
++it;
}
}
}
public:
transformer(db_context ctx, schema_ptr s, dht::decorated_key dk)
: _ctx(ctx)
@@ -1561,7 +1510,8 @@ public:
._builder = *_builder,
._enable_updating_state = _enable_updating_state,
._clustering_row_states = _clustering_row_states,
._static_row_state = _static_row_state
._static_row_state = _static_row_state,
._generate_delta_values = generate_delta_values(_builder->base_schema())
};
cdc::inspect_mutation(m, v);
}
@@ -1569,7 +1519,6 @@ public:
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {
adjust_or_delete_deltas();
return std::make_pair<std::vector<mutation>, stats::part_type_set>(std::move(_result_mutations), std::move(_touched_parts));
}

View File

@@ -1,11 +1,4 @@
create table tb1 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': false, 'postimage': false, 'delta': 'off'};
create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'off'};
-- Should add 1 row (postimage)
insert into tb2 (pk, ck, i1) VALUES (1, 11, 111) USING TTL 1111;
select "cdc$batch_seq_no", "cdc$operation", pk, ck, i1 from tb2_scylla_cdc_log where pk = 1 and ck = 11 allow filtering;
alter table tb2 with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'};
create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'};
-- Should add 3 rows (preimage + postimage + delta). Delta has only key columns and "pk" + "ck"
insert into tb2 (pk, ck, i1) VALUES (2, 22, 222) USING TTL 2222;
select "cdc$batch_seq_no", "cdc$operation", "cdc$ttl", pk, ck, i1 from tb2_scylla_cdc_log where pk = 2 and ck = 22 allow filtering;

View File

@@ -1,33 +1,4 @@
create table tb1 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': false, 'postimage': false, 'delta': 'off'};
{
"message" : "exceptions::configuration_exception (Invalid combination of CDC options: neither of {preimage, postimage, delta} is enabled)",
"status" : "error"
}
create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'off'};
{
"status" : "ok"
}
-- Should add 1 row (postimage)
insert into tb2 (pk, ck, i1) VALUES (1, 11, 111) USING TTL 1111;
{
"status" : "ok"
}
select "cdc$batch_seq_no", "cdc$operation", pk, ck, i1 from tb2_scylla_cdc_log where pk = 1 and ck = 11 allow filtering;
{
"rows" :
[
{
"cdc$batch_seq_no" : "0",
"cdc$operation" : "9",
"ck" : "11",
"i1" : "111",
"pk" : "1"
}
]
}
alter table tb2 with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'};
create table tb2 (pk int, ck int, i1 int, PRIMARY KEY (pk, ck)) with cdc = {'enabled': true, 'preimage': true, 'postimage': true, 'delta': 'keys'};
{
"status" : "ok"
}