cdc: use stream generations
Change the CDC code to use the global CDC stream generations. The per-base-table CDC description tables were removed; the code instead looks up streams in cdc::metadata, which is updated on gossip events. The per-table description tables were replaced by a single global description table, which clients use when searching for streams.
This commit is contained in:
433
cdc/cdc.cc
433
cdc/cdc.cc
@@ -27,6 +27,7 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
#include "cdc/cdc.hh"
|
||||
#include "cdc/generation.hh"
|
||||
#include "bytes.hh"
|
||||
#include "database.hh"
|
||||
#include "db/config.hh"
|
||||
@@ -43,13 +44,6 @@
|
||||
#include "log.hh"
|
||||
#include "json.hh"
|
||||
|
||||
using locator::snitch_ptr;
|
||||
using locator::token_metadata;
|
||||
using locator::topology;
|
||||
using seastar::sstring;
|
||||
using service::migration_notifier;
|
||||
using service::storage_proxy;
|
||||
|
||||
namespace std {
|
||||
|
||||
template<> struct hash<std::pair<net::inet_address, unsigned int>> {
|
||||
@@ -66,8 +60,6 @@ logging::logger cdc_log("cdc");
|
||||
|
||||
namespace cdc {
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
|
||||
static schema_ptr create_stream_description_table_schema(const schema&, std::optional<utils::UUID> = {});
|
||||
static future<> populate_desc(db_context ctx, const schema& s);
|
||||
}
|
||||
|
||||
class cdc::cdc_service::impl : service::migration_listener::empty_listener {
|
||||
@@ -97,14 +89,11 @@ public:
|
||||
if (!db.has_schema(schema.ks_name(), logname)) {
|
||||
// in seastar thread
|
||||
auto log_schema = create_log_schema(schema);
|
||||
auto stream_desc_schema = create_stream_description_table_schema(schema);
|
||||
auto& keyspace = db.find_keyspace(schema.ks_name());
|
||||
|
||||
auto log_mut = db::schema_tables::make_create_table_mutations(keyspace.metadata(), log_schema, timestamp);
|
||||
auto stream_mut = db::schema_tables::make_create_table_mutations(keyspace.metadata(), stream_desc_schema, timestamp);
|
||||
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(stream_mut.begin()), std::make_move_iterator(stream_mut.end()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -118,74 +107,41 @@ public:
|
||||
// etc.
|
||||
if (was_cdc || is_cdc) {
|
||||
auto logname = log_name(old_schema.cf_name());
|
||||
auto descname = desc_name(old_schema.cf_name());
|
||||
auto& db = _ctxt._proxy.get_db().local();
|
||||
auto& keyspace = db.find_keyspace(old_schema.ks_name());
|
||||
auto log_schema = was_cdc ? db.find_column_family(old_schema.ks_name(), logname).schema() : nullptr;
|
||||
auto stream_desc_schema = was_cdc ? db.find_column_family(old_schema.ks_name(), descname).schema() : nullptr;
|
||||
|
||||
if (!is_cdc) {
|
||||
auto log_mut = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), log_schema, timestamp);
|
||||
auto stream_mut = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), stream_desc_schema, timestamp);
|
||||
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(stream_mut.begin()), std::make_move_iterator(stream_mut.end()));
|
||||
return;
|
||||
}
|
||||
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
|
||||
auto new_stream_desc_schema = create_stream_description_table_schema(new_schema, stream_desc_schema ? std::make_optional(stream_desc_schema->id()) : std::nullopt);
|
||||
|
||||
auto log_mut = log_schema
|
||||
? db::schema_tables::make_update_table_mutations(keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
|
||||
: db::schema_tables::make_create_table_mutations(keyspace.metadata(), new_log_schema, timestamp)
|
||||
;
|
||||
auto stream_mut = stream_desc_schema
|
||||
? db::schema_tables::make_update_table_mutations(keyspace.metadata(), stream_desc_schema, new_stream_desc_schema, timestamp, false)
|
||||
: db::schema_tables::make_create_table_mutations(keyspace.metadata(), new_stream_desc_schema, timestamp)
|
||||
;
|
||||
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(stream_mut.begin()), std::make_move_iterator(stream_mut.end()));
|
||||
}
|
||||
}
|
||||
|
||||
void on_before_drop_column_family(const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
|
||||
if (schema.cdc_options().enabled()) {
|
||||
auto logname = log_name(schema.cf_name());
|
||||
auto descname = desc_name(schema.cf_name());
|
||||
auto& db = _ctxt._proxy.get_db().local();
|
||||
auto& keyspace = db.find_keyspace(schema.ks_name());
|
||||
auto log_schema = db.find_column_family(schema.ks_name(), logname).schema();
|
||||
auto stream_desc_schema = db.find_column_family(schema.ks_name(), descname).schema();
|
||||
|
||||
auto log_mut = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), log_schema, timestamp);
|
||||
auto stream_mut = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), stream_desc_schema, timestamp);
|
||||
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(log_mut.begin()), std::make_move_iterator(log_mut.end()));
|
||||
mutations.insert(mutations.end(), std::make_move_iterator(stream_mut.begin()), std::make_move_iterator(stream_mut.end()));
|
||||
}
|
||||
}
|
||||
|
||||
void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
// This callback is done on all shards. Only do the work once.
|
||||
if (engine().cpu_id() != 0) {
|
||||
return;
|
||||
}
|
||||
auto& db = _ctxt._proxy.get_db().local();
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
auto schema = cf.schema();
|
||||
if (schema->cdc_options().enabled()) {
|
||||
populate_desc(_ctxt, *schema).get();
|
||||
}
|
||||
}
|
||||
|
||||
void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) override {
|
||||
on_create_column_family(ks_name, cf_name);
|
||||
}
|
||||
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
|
||||
|
||||
future<std::tuple<std::vector<mutation>, result_callback>> augment_mutation_call(
|
||||
lowres_clock::time_point timeout,
|
||||
std::vector<mutation>&& mutations
|
||||
@@ -264,16 +220,12 @@ sstring log_name(const sstring& table_name) {
|
||||
return table_name + cdc_log_suffix;
|
||||
}
|
||||
|
||||
sstring desc_name(const sstring& table_name) {
|
||||
static constexpr auto cdc_desc_suffix = "_scylla_cdc_desc";
|
||||
return table_name + cdc_desc_suffix;
|
||||
}
|
||||
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
|
||||
schema_builder b(s.ks_name(), log_name(s.cf_name()));
|
||||
b.set_default_time_to_live(gc_clock::duration{s.cdc_options().ttl()});
|
||||
b.set_comment(sprint("CDC log for %s.%s", s.ks_name(), s.cf_name()));
|
||||
b.with_column("stream_id", uuid_type, column_kind::partition_key);
|
||||
b.with_column("stream_id_1", long_type, column_kind::partition_key);
|
||||
b.with_column("stream_id_2", long_type, column_kind::partition_key);
|
||||
b.with_column("time", timeuuid_type, column_kind::clustering_key);
|
||||
b.with_column("batch_seq_no", int32_type, column_kind::clustering_key);
|
||||
b.with_column("operation", data_type_for<operation_native_type>());
|
||||
@@ -299,78 +251,6 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
return b.build();
|
||||
}
|
||||
|
||||
static schema_ptr create_stream_description_table_schema(const schema& s, std::optional<utils::UUID> uuid) {
|
||||
schema_builder b(s.ks_name(), desc_name(s.cf_name()));
|
||||
b.set_comment(sprint("CDC description for %s.%s", s.ks_name(), s.cf_name()));
|
||||
b.with_column("node_ip", inet_addr_type, column_kind::partition_key);
|
||||
b.with_column("shard_id", int32_type, column_kind::partition_key);
|
||||
b.with_column("created_at", timestamp_type, column_kind::clustering_key);
|
||||
b.with_column("stream_id", uuid_type);
|
||||
|
||||
if (uuid) {
|
||||
b.set_uuid(*uuid);
|
||||
}
|
||||
|
||||
return b.build();
|
||||
}
|
||||
|
||||
// This function assumes setup_stream_description_table was called on |s| before the call to this
|
||||
// function.
|
||||
static future<> populate_desc(db_context ctx, const schema& s) {
|
||||
auto& db = ctx._proxy.get_db().local();
|
||||
auto desc_schema =
|
||||
db.find_schema(s.ks_name(), desc_name(s.cf_name()));
|
||||
auto log_schema =
|
||||
db.find_schema(s.ks_name(), log_name(s.cf_name()));
|
||||
auto belongs_to = [&](const gms::inet_address& endpoint,
|
||||
const unsigned int shard_id,
|
||||
const int shard_count,
|
||||
const unsigned int ignore_msb_bits,
|
||||
const utils::UUID& stream_id) {
|
||||
const auto log_pk = partition_key::from_singular(*log_schema,
|
||||
data_value(stream_id));
|
||||
const auto token = ctx._partitioner.decorate_key(*log_schema, log_pk).token();
|
||||
if (ctx._token_metadata.get_endpoint(ctx._token_metadata.first_token(token)) != endpoint) {
|
||||
return false;
|
||||
}
|
||||
const auto owning_shard_id = dht::murmur3_partitioner(shard_count, ignore_msb_bits).shard_of(token);
|
||||
return owning_shard_id == shard_id;
|
||||
};
|
||||
|
||||
std::vector<mutation> mutations;
|
||||
const auto ts = api::new_timestamp();
|
||||
const auto ck = clustering_key::from_single_value(
|
||||
*desc_schema, timestamp_type->decompose(ts));
|
||||
auto cdef = desc_schema->get_column_definition(to_bytes("stream_id"));
|
||||
|
||||
for (const auto& dc : ctx._token_metadata.get_topology().get_datacenter_endpoints()) {
|
||||
for (const auto& endpoint : dc.second) {
|
||||
const auto decomposed_ip = inet_addr_type->decompose(endpoint.addr());
|
||||
const unsigned int shard_count = ctx._snitch->get_shard_count(endpoint);
|
||||
const unsigned int ignore_msb_bits = ctx._snitch->get_ignore_msb_bits(endpoint);
|
||||
for (unsigned int shard_id = 0; shard_id < shard_count; ++shard_id) {
|
||||
const auto pk = partition_key::from_exploded(
|
||||
*desc_schema, { decomposed_ip, int32_type->decompose(static_cast<int>(shard_id)) });
|
||||
mutations.emplace_back(desc_schema, pk);
|
||||
|
||||
auto stream_id = utils::make_random_uuid();
|
||||
while (!belongs_to(endpoint, shard_id, shard_count, ignore_msb_bits, stream_id)) {
|
||||
stream_id = utils::make_random_uuid();
|
||||
}
|
||||
auto value = atomic_cell::make_live(*uuid_type,
|
||||
ts,
|
||||
uuid_type->decompose(stream_id));
|
||||
mutations.back().set_cell(ck, *cdef, std::move(value));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx._proxy.mutate(std::move(mutations),
|
||||
db::consistency_level::QUORUM,
|
||||
db::no_timeout,
|
||||
nullptr,
|
||||
empty_service_permit());
|
||||
}
|
||||
|
||||
db_context::builder::builder(service::storage_proxy& proxy)
|
||||
: _proxy(proxy)
|
||||
{}
|
||||
@@ -395,38 +275,144 @@ db_context::builder& db_context::builder::with_partitioner(dht::i_partitioner& p
|
||||
return *this;
|
||||
}
|
||||
|
||||
db_context::builder& db_context::builder::with_cdc_metadata(cdc::metadata& cdc_metadata) {
|
||||
_cdc_metadata = cdc_metadata;
|
||||
return *this;
|
||||
}
|
||||
|
||||
db_context db_context::builder::build() {
|
||||
return db_context{
|
||||
_proxy,
|
||||
_migration_notifier ? _migration_notifier->get() : service::get_local_storage_service().get_migration_notifier(),
|
||||
_token_metadata ? _token_metadata->get() : service::get_local_storage_service().get_token_metadata(),
|
||||
_cdc_metadata ? _cdc_metadata->get() : service::get_local_storage_service().get_cdc_metadata(),
|
||||
_snitch ? _snitch->get() : locator::i_endpoint_snitch::get_local_snitch_ptr(),
|
||||
_partitioner ? _partitioner->get() : dht::global_partitioner()
|
||||
};
|
||||
}
|
||||
|
||||
/* Find some timestamp inside the given mutation.
|
||||
*
|
||||
* If this mutation was created using a single insert/update/delete statement, then it will have a single,
|
||||
* well-defined timestamp (even if this timestamp occurs multiple times, e.g. in a cell and row_marker).
|
||||
*
|
||||
* This function shouldn't be used for mutations that have multiple different timestamps: the function
|
||||
* would only find one of them. When dealing with such mutations, the caller should first split the mutation
|
||||
* into multiple ones, each with a single timestamp.
|
||||
*/
|
||||
// TODO: We need to
|
||||
// - in the code that calls `augment_mutation_call`, or inside `augment_mutation_call`,
|
||||
// split each mutation to a set of mutations, each with a single timestamp.
|
||||
// - optionally: here, throw error if multiple timestamps are encountered (may degrade performance).
|
||||
static api::timestamp_type find_timestamp(const schema& s, const mutation& m) {
|
||||
auto& p = m.partition();
|
||||
api::timestamp_type t = api::missing_timestamp;
|
||||
|
||||
t = p.partition_tombstone().timestamp;
|
||||
if (t != api::missing_timestamp) {
|
||||
return t;
|
||||
}
|
||||
|
||||
for (auto& rt: p.row_tombstones()) {
|
||||
t = rt.tomb.timestamp;
|
||||
if (t != api::missing_timestamp) {
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
auto walk_row = [&t, &s] (const row& r, column_kind ckind) {
|
||||
r.for_each_cell_until([&t, &s, ckind] (column_id id, const atomic_cell_or_collection& cell) {
|
||||
auto& cdef = s.column_at(ckind, id);
|
||||
|
||||
if (cdef.is_atomic()) {
|
||||
t = cell.as_atomic_cell(cdef).timestamp();
|
||||
if (t != api::missing_timestamp) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
return cell.as_collection_mutation().with_deserialized(*cdef.type,
|
||||
[&] (collection_mutation_view_description mview) {
|
||||
t = mview.tomb.timestamp;
|
||||
if (t != api::missing_timestamp) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
for (auto& kv : mview.cells) {
|
||||
t = kv.second.timestamp();
|
||||
if (t != api::missing_timestamp) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
}
|
||||
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
walk_row(p.static_row().get(), column_kind::static_column);
|
||||
if (t != api::missing_timestamp) {
|
||||
return t;
|
||||
}
|
||||
|
||||
for (const rows_entry& cr : p.clustered_rows()) {
|
||||
const deletable_row& r = cr.row();
|
||||
|
||||
t = r.deleted_at().regular().timestamp;
|
||||
if (t != api::missing_timestamp) {
|
||||
return t;
|
||||
}
|
||||
|
||||
t = r.deleted_at().shadowable().tomb().timestamp;
|
||||
if (t != api::missing_timestamp) {
|
||||
return t;
|
||||
}
|
||||
|
||||
t = r.created_at();
|
||||
if (t != api::missing_timestamp) {
|
||||
return t;
|
||||
}
|
||||
|
||||
walk_row(r.cells(), column_kind::regular_column);
|
||||
if (t != api::missing_timestamp) {
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
throw std::runtime_error("cdc: could not find timestamp of mutation");
|
||||
}
|
||||
|
||||
/* Given a timestamp, generates a timeuuid with the following properties:
|
||||
* 1. `t1` < `t2` implies timeuuid_type->less(timeuuid_type->decompose(generate_timeuuid(`t1`)),
|
||||
* timeuuid_type->decompose(generate_timeuuid(`t2`))),
|
||||
* 2. utils::UUID_gen::micros_timestamp(generate_timeuuid(`t`)) == `t`.
|
||||
*
|
||||
* If `t1` == `t2`, then generate_timeuuid(`t1`) != generate_timeuuid(`t2`),
|
||||
* with unspecified nondeterministic ordering.
|
||||
*/
|
||||
// external linkage for testing
|
||||
utils::UUID generate_timeuuid(api::timestamp_type t) {
|
||||
return utils::UUID_gen::get_random_time_UUID_from_micros(t);
|
||||
}
|
||||
|
||||
class transformer final {
|
||||
public:
|
||||
using streams_type = std::unordered_map<std::pair<net::inet_address, unsigned int>, utils::UUID>;
|
||||
private:
|
||||
db_context _ctx;
|
||||
schema_ptr _schema;
|
||||
schema_ptr _log_schema;
|
||||
utils::UUID _time;
|
||||
bytes _decomposed_time;
|
||||
::shared_ptr<const transformer::streams_type> _streams;
|
||||
const column_definition& _op_col;
|
||||
|
||||
clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) const {
|
||||
clustering_key set_pk_columns(const partition_key& pk, api::timestamp_type ts, bytes decomposed_tuuid, int batch_no, mutation& m) const {
|
||||
const auto log_ck = clustering_key::from_exploded(
|
||||
*m.schema(), { _decomposed_time, int32_type->decompose(batch_no) });
|
||||
*m.schema(), { decomposed_tuuid, int32_type->decompose(batch_no) });
|
||||
auto pk_value = pk.explode(*_schema);
|
||||
size_t pos = 0;
|
||||
for (const auto& column : _schema->partition_key_columns()) {
|
||||
assert (pos < pk_value.size());
|
||||
auto cdef = m.schema()->get_column_definition(to_bytes("_" + column.name()));
|
||||
auto value = atomic_cell::make_live(*column.type,
|
||||
_time.timestamp(),
|
||||
ts,
|
||||
bytes_view(pk_value[pos]));
|
||||
m.set_cell(log_ck, *cdef, std::move(value));
|
||||
++pos;
|
||||
@@ -434,44 +420,31 @@ private:
|
||||
return log_ck;
|
||||
}
|
||||
|
||||
void set_operation(const clustering_key& ck, operation op, mutation& m) const {
|
||||
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _time.timestamp(), _op_col.type->decompose(operation_native_type(op))));
|
||||
void set_operation(const clustering_key& ck, api::timestamp_type ts, operation op, mutation& m) const {
|
||||
m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, ts, _op_col.type->decompose(operation_native_type(op))));
|
||||
}
|
||||
|
||||
partition_key stream_id(const net::inet_address& ip, unsigned int shard_id) const {
|
||||
auto it = _streams->find(std::make_pair(ip, shard_id));
|
||||
if (it == std::end(*_streams)) {
|
||||
throw std::runtime_error(format("No stream found for node {} and shard {}", ip, shard_id));
|
||||
}
|
||||
return partition_key::from_exploded(*_log_schema, { uuid_type->decompose(it->second) });
|
||||
}
|
||||
public:
|
||||
transformer(db_context ctx, schema_ptr s, ::shared_ptr<const transformer::streams_type> streams)
|
||||
transformer(db_context ctx, schema_ptr s)
|
||||
: _ctx(ctx)
|
||||
, _schema(std::move(s))
|
||||
, _log_schema(ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
|
||||
, _time(utils::UUID_gen::get_time_UUID())
|
||||
, _decomposed_time(timeuuid_type->decompose(_time))
|
||||
, _streams(std::move(streams))
|
||||
, _op_col(*_log_schema->get_column_definition(to_bytes("operation")))
|
||||
{}
|
||||
|
||||
// TODO: is pre-image data based on query enough? We only have actual column data. Do we need
|
||||
// more details like tombstones/ttl? Probably not but keep in mind.
|
||||
mutation transform(const mutation& m, const cql3::untyped_result_set* rs = nullptr) const {
|
||||
auto& t = m.token();
|
||||
auto&& ep = _ctx._token_metadata.get_endpoint(
|
||||
_ctx._token_metadata.first_token(t));
|
||||
if (!ep) {
|
||||
throw std::runtime_error(format("No owner found for key {}", m.decorated_key()));
|
||||
}
|
||||
auto shard_id = dht::murmur3_partitioner(_ctx._snitch->get_shard_count(*ep), _ctx._snitch->get_ignore_msb_bits(*ep)).shard_of(t);
|
||||
mutation res(_log_schema, stream_id(ep->addr(), shard_id));
|
||||
auto ts = find_timestamp(*_schema, m);
|
||||
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token(), _ctx._partitioner);
|
||||
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
|
||||
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
|
||||
|
||||
auto& p = m.partition();
|
||||
if (p.partition_tombstone()) {
|
||||
// Partition deletion
|
||||
auto log_ck = set_pk_columns(m.key(), 0, res);
|
||||
set_operation(log_ck, operation::partition_delete, res);
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, 0, res);
|
||||
set_operation(log_ck, ts, operation::partition_delete, res);
|
||||
} else if (!p.row_tombstones().empty()) {
|
||||
// range deletion
|
||||
int batch_no = 0;
|
||||
@@ -485,24 +458,24 @@ public:
|
||||
}
|
||||
auto cdef = _log_schema->get_column_definition(to_bytes("_" + column.name()));
|
||||
auto value = atomic_cell::make_live(*column.type,
|
||||
_time.timestamp(),
|
||||
ts,
|
||||
bytes_view(exploded[pos]));
|
||||
res.set_cell(log_ck, *cdef, std::move(value));
|
||||
++pos;
|
||||
}
|
||||
};
|
||||
{
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no, res);
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
|
||||
set_bound(log_ck, rt.start);
|
||||
// TODO: separate inclusive/exclusive range
|
||||
set_operation(log_ck, operation::range_delete_start, res);
|
||||
set_operation(log_ck, ts, operation::range_delete_start, res);
|
||||
++batch_no;
|
||||
}
|
||||
{
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no, res);
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
|
||||
set_bound(log_ck, rt.end);
|
||||
// TODO: separate inclusive/exclusive range
|
||||
set_operation(log_ck, operation::range_delete_end, res);
|
||||
set_operation(log_ck, ts, operation::range_delete_end, res);
|
||||
++batch_no;
|
||||
}
|
||||
}
|
||||
@@ -527,8 +500,8 @@ public:
|
||||
}
|
||||
}
|
||||
if (match) {
|
||||
pikey = set_pk_columns(m.key(), batch_no, res);
|
||||
set_operation(*pikey, operation::pre_image, res);
|
||||
pikey = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
|
||||
set_operation(*pikey, ts, operation::pre_image, res);
|
||||
pirow = &utr;
|
||||
++batch_no;
|
||||
break;
|
||||
@@ -536,17 +509,17 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
auto log_ck = set_pk_columns(m.key(), batch_no, res);
|
||||
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
|
||||
|
||||
size_t pos = 0;
|
||||
for (const auto& column : _schema->clustering_key_columns()) {
|
||||
assert (pos < ck_value.size());
|
||||
auto cdef = _log_schema->get_column_definition(to_bytes("_" + column.name()));
|
||||
res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos])));
|
||||
res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, ts, bytes_view(ck_value[pos])));
|
||||
|
||||
if (pirow) {
|
||||
assert(pirow->has(column.name_as_text()));
|
||||
res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos])));
|
||||
res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, ts, bytes_view(ck_value[pos])));
|
||||
}
|
||||
|
||||
++pos;
|
||||
@@ -558,7 +531,7 @@ public:
|
||||
r.for_each_cell([&](column_id id, const atomic_cell_or_collection& cell) {
|
||||
auto& cdef = _schema->column_at(ckind, id);
|
||||
auto* dst = _log_schema->get_column_definition(to_bytes("_" + cdef.name()));
|
||||
// todo: collections.
|
||||
// TODO: collections.
|
||||
if (cdef.is_atomic()) {
|
||||
column_op op;
|
||||
|
||||
@@ -575,7 +548,7 @@ public:
|
||||
}
|
||||
|
||||
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(op)));
|
||||
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values)));
|
||||
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values)));
|
||||
|
||||
if (pirow && pirow->has(cdef.name_as_text())) {
|
||||
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(column_op::set)));
|
||||
@@ -584,7 +557,7 @@ public:
|
||||
|
||||
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
|
||||
assert(pikey->explode() != log_ck.explode());
|
||||
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values)));
|
||||
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, tuple_type_impl::build_value(values)));
|
||||
}
|
||||
} else {
|
||||
cdc_log.warn("Non-atomic cell ignored {}.{}:{}", _schema->ks_name(), _schema->cf_name(), cdef.name_as_text());
|
||||
@@ -595,7 +568,7 @@ public:
|
||||
process_cells(r.row().cells(), column_kind::regular_column);
|
||||
process_cells(p.static_row().get(), column_kind::static_column);
|
||||
|
||||
set_operation(log_ck, operation::update, res);
|
||||
set_operation(log_ck, ts, operation::update, res);
|
||||
++batch_no;
|
||||
}
|
||||
}
|
||||
@@ -668,103 +641,12 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// This class is used to build a mapping from <node ip, shard id> to stream_id
|
||||
// It is used as a consumer for rows returned by the query to CDC Description Table
|
||||
class streams_builder {
|
||||
const schema& _schema;
|
||||
transformer::streams_type _streams;
|
||||
net::inet_address _node_ip = net::inet_address();
|
||||
unsigned int _shard_id = 0;
|
||||
api::timestamp_type _latest_row_timestamp = api::min_timestamp;
|
||||
utils::UUID _latest_row_stream_id = utils::UUID();
|
||||
public:
|
||||
streams_builder(const schema& s) : _schema(s) {}
|
||||
|
||||
void accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
auto exploded = key.explode(_schema);
|
||||
_node_ip = value_cast<net::inet_address>(inet_addr_type->deserialize(exploded[0]));
|
||||
_shard_id = static_cast<unsigned int>(value_cast<int>(int32_type->deserialize(exploded[1])));
|
||||
_latest_row_timestamp = api::min_timestamp;
|
||||
_latest_row_stream_id = utils::UUID();
|
||||
}
|
||||
|
||||
void accept_new_partition(uint32_t row_count) {
|
||||
assert(false);
|
||||
}
|
||||
|
||||
void accept_new_row(
|
||||
const clustering_key& key,
|
||||
const query::result_row_view& static_row,
|
||||
const query::result_row_view& row) {
|
||||
auto row_iterator = row.iterator();
|
||||
api::timestamp_type timestamp = value_cast<db_clock::time_point>(
|
||||
timestamp_type->deserialize(key.explode(_schema)[0])).time_since_epoch().count();
|
||||
if (timestamp <= _latest_row_timestamp) {
|
||||
return;
|
||||
}
|
||||
_latest_row_timestamp = timestamp;
|
||||
for (auto&& cdef : _schema.regular_columns()) {
|
||||
if (cdef.name_as_text() != "stream_id") {
|
||||
row_iterator.skip(cdef);
|
||||
continue;
|
||||
}
|
||||
auto val_opt = row_iterator.next_atomic_cell();
|
||||
assert(val_opt);
|
||||
val_opt->value().with_linearized([&] (bytes_view bv) {
|
||||
_latest_row_stream_id = value_cast<utils::UUID>(uuid_type->deserialize(bv));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
assert(false);
|
||||
}
|
||||
|
||||
void accept_partition_end(const query::result_row_view& static_row) {
|
||||
_streams.emplace(std::make_pair(_node_ip, _shard_id), _latest_row_stream_id);
|
||||
}
|
||||
|
||||
transformer::streams_type build() {
|
||||
return std::move(_streams);
|
||||
}
|
||||
};
|
||||
|
||||
static future<::shared_ptr<transformer::streams_type>> get_streams(
|
||||
db_context ctx,
|
||||
const sstring& ks_name,
|
||||
const sstring& cf_name,
|
||||
lowres_clock::time_point timeout,
|
||||
service::query_state& qs) {
|
||||
auto s =
|
||||
ctx._proxy.get_db().local().find_schema(ks_name, desc_name(cf_name));
|
||||
query::read_command cmd(
|
||||
s->id(),
|
||||
s->version(),
|
||||
partition_slice_builder(*s).with_no_static_columns().build());
|
||||
return ctx._proxy.query(
|
||||
s,
|
||||
make_lw_shared(std::move(cmd)),
|
||||
{dht::partition_range::make_open_ended_both_sides()},
|
||||
db::consistency_level::QUORUM,
|
||||
{timeout, qs.get_permit(), qs.get_client_state()}).then([s = std::move(s)] (auto qr) mutable {
|
||||
return query::result_view::do_with(*qr.query_result,
|
||||
[s = std::move(s)] (query::result_view v) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_no_static_columns()
|
||||
.build();
|
||||
streams_builder builder{ *s };
|
||||
v.consume(slice, builder);
|
||||
return ::make_shared<transformer::streams_type>(builder.build());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
future<std::vector<mutation>>
|
||||
transform_mutations(std::vector<mutation>& muts, decltype(muts.size()) batch_size, Func&& f) {
|
||||
return parallel_for_each(
|
||||
boost::irange(static_cast<decltype(muts.size())>(0), muts.size(), batch_size),
|
||||
std::move(f))
|
||||
std::forward<Func>(f))
|
||||
.then([&muts] () mutable { return std::move(muts); });
|
||||
}
|
||||
|
||||
@@ -784,7 +666,8 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
|
||||
|
||||
mutations.reserve(2 * mutations.size());
|
||||
|
||||
return do_with(std::move(mutations), service::query_state(service::client_state::for_internal_calls(), empty_service_permit()), [this, timeout, i](std::vector<mutation>& mutations, service::query_state& qs) {
|
||||
return do_with(std::move(mutations), service::query_state(service::client_state::for_internal_calls(), empty_service_permit()),
|
||||
[this, timeout, i] (std::vector<mutation>& mutations, service::query_state& qs) {
|
||||
return transform_mutations(mutations, 1, [this, &mutations, timeout, &qs] (int idx) {
|
||||
auto& m = mutations[idx];
|
||||
auto s = m.schema();
|
||||
@@ -792,24 +675,20 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
|
||||
if (!s->cdc_options().enabled()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// for batches/multiple mutations this is super inefficient. either partition the mutation set by schema
|
||||
// and re-use streams, or probably better: add a cache so this lookup is a noop on second mutation
|
||||
return get_streams(_ctxt, s->ks_name(), s->cf_name(), timeout, qs).then([this, s = std::move(s), &qs, &mutations, idx](::shared_ptr<transformer::streams_type> streams) mutable {
|
||||
auto& m = mutations[idx]; // should not really need because of reserve, but lets be conservative
|
||||
transformer trans(_ctxt, s, streams);
|
||||
|
||||
if (!s->cdc_options().preimage()) {
|
||||
mutations.emplace_back(trans.transform(m));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
transformer trans(_ctxt, s);
|
||||
|
||||
// Note: further improvement here would be to coalesce the pre-image selects into one
|
||||
// iff a batch contains several modifications to the same table. Otoh, batch is rare(?)
|
||||
// so this is premature.
|
||||
auto f = trans.pre_image_select(qs.get_client_state(), db::consistency_level::LOCAL_QUORUM, m);
|
||||
return f.then([trans = std::move(trans), &mutations, idx] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
|
||||
mutations.push_back(trans.transform(mutations[idx], rs.get()));
|
||||
});
|
||||
if (!s->cdc_options().preimage()) {
|
||||
mutations.emplace_back(trans.transform(m));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Note: further improvement here would be to coalesce the pre-image selects into one
|
||||
// iff a batch contains several modifications to the same table. Otoh, batch is rare(?)
|
||||
// so this is premature.
|
||||
auto f = trans.pre_image_select(qs.get_client_state(), db::consistency_level::LOCAL_QUORUM, m);
|
||||
return f.then([trans = std::move(trans), &mutations, idx] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
|
||||
mutations.push_back(trans.transform(mutations[idx], rs.get()));
|
||||
});
|
||||
}).then([](std::vector<mutation> mutations) {
|
||||
return make_ready_future<std::tuple<std::vector<mutation>, cdc::result_callback>>(std::make_tuple(std::move(mutations), result_callback{}));
|
||||
|
||||
@@ -66,6 +66,7 @@ class partition_key;
|
||||
namespace cdc {
|
||||
|
||||
class db_context;
|
||||
class metadata;
|
||||
|
||||
// Callback to be invoked on mutation finish to fix
|
||||
// the whole bit about post-image.
|
||||
@@ -101,6 +102,7 @@ struct db_context final {
|
||||
service::storage_proxy& _proxy;
|
||||
service::migration_notifier& _migration_notifier;
|
||||
locator::token_metadata& _token_metadata;
|
||||
cdc::metadata& _cdc_metadata;
|
||||
locator::snitch_ptr& _snitch;
|
||||
dht::i_partitioner& _partitioner;
|
||||
|
||||
@@ -108,6 +110,7 @@ struct db_context final {
|
||||
service::storage_proxy& _proxy;
|
||||
std::optional<std::reference_wrapper<service::migration_notifier>> _migration_notifier;
|
||||
std::optional<std::reference_wrapper<locator::token_metadata>> _token_metadata;
|
||||
std::optional<std::reference_wrapper<cdc::metadata>> _cdc_metadata;
|
||||
std::optional<std::reference_wrapper<locator::snitch_ptr>> _snitch;
|
||||
std::optional<std::reference_wrapper<dht::i_partitioner>> _partitioner;
|
||||
public:
|
||||
@@ -117,6 +120,7 @@ struct db_context final {
|
||||
builder& with_token_metadata(locator::token_metadata& token_metadata);
|
||||
builder& with_snitch(locator::snitch_ptr& snitch);
|
||||
builder& with_partitioner(dht::i_partitioner& partitioner);
|
||||
builder& with_cdc_metadata(cdc::metadata&);
|
||||
|
||||
db_context build();
|
||||
};
|
||||
@@ -137,6 +141,4 @@ enum class column_op : int8_t {
|
||||
|
||||
seastar::sstring log_name(const seastar::sstring& table_name);
|
||||
|
||||
seastar::sstring desc_name(const seastar::sstring& table_name);
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -209,6 +209,10 @@ public:
|
||||
return _token_metadata;
|
||||
}
|
||||
|
||||
cdc::metadata& get_cdc_metadata() {
|
||||
return _cdc_metadata;
|
||||
}
|
||||
|
||||
const service::migration_notifier& get_migration_notifier() const {
|
||||
return _mnotifier.local();
|
||||
}
|
||||
|
||||
@@ -45,19 +45,8 @@ SEASTAR_THREAD_TEST_CASE(test_with_cdc_parameter) {
|
||||
e.local_db().find_schema("ks", "tbl")->cdc_options().enabled());
|
||||
if (exp.enabled) {
|
||||
e.require_table_exists("ks", cdc::log_name("tbl")).get();
|
||||
e.require_table_exists("ks", cdc::desc_name("tbl")).get();
|
||||
auto msg = e.execute_cql(format("select node_ip, shard_id from ks.{};", cdc::desc_name("tbl"))).get0();
|
||||
std::vector<std::vector<bytes_opt>> expected_rows;
|
||||
expected_rows.reserve(smp::count);
|
||||
auto ip = inet_addr_type->decompose(
|
||||
utils::fb_utilities::get_broadcast_address().addr());
|
||||
for (int i = 0; i < static_cast<int>(smp::count); ++i) {
|
||||
expected_rows.push_back({ip, int32_type->decompose(i)});
|
||||
}
|
||||
assert_that(msg).is_rows().with_rows_ignore_order(std::move(expected_rows));
|
||||
} else {
|
||||
e.require_table_does_not_exist("ks", cdc::log_name("tbl")).get();
|
||||
e.require_table_does_not_exist("ks", cdc::desc_name("tbl")).get();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(exp.preimage,
|
||||
e.local_db().find_schema("ks", "tbl")->cdc_options().preimage());
|
||||
@@ -81,7 +70,6 @@ SEASTAR_THREAD_TEST_CASE(test_with_cdc_parameter) {
|
||||
assert_cdc(alter2_expected);
|
||||
e.execute_cql("DROP TABLE ks.tbl").get();
|
||||
e.require_table_does_not_exist("ks", cdc::log_name("tbl")).get();
|
||||
e.require_table_does_not_exist("ks", cdc::desc_name("tbl")).get();
|
||||
};
|
||||
|
||||
test("", "{'enabled':'true'}", "{'enabled':'false'}", {false}, {true}, {false});
|
||||
|
||||
Reference in New Issue
Block a user