/*
 * Copyright (C) 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */
#include "mutation_writer/timestamp_based_splitting_writer.hh"

#include <optional>
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/when_all.hh>

#include "mutation_writer/feed_writers.hh"
namespace mutation_writer {

namespace {

// TODO: should probably move to utils and make it a proper container.
// A tiny map implemented as a flat vector of key/value pairs with linear
// lookup. Intended for very small element counts; the first `Size` entries
// are presumably stored inline by utils::small_vector (no heap allocation)
// — TODO confirm against utils/small_vector.hh.
template <typename Key, typename Value, size_t Size>
class small_flat_map {
public:
    // The underlying flat container requires value_type to be move
    // assignable, so we can't have a const key.
    using key_type = Key;
    using mapped_type = Value;
    using value_type = std::pair<Key, Value>;
    using container = utils::small_vector<value_type, Size>;
    using iterator = typename container::iterator;

private:
    container _values;

public:
    // Element access
    mapped_type& at(const key_type& k); // throws std::out_of_range when `k` is absent
    mapped_type& operator[](const key_type& k); // default-constructs the value when `k` is absent

    // Iterators
    iterator begin() { return _values.begin(); }
    iterator end() { return _values.end(); }

    // Modifiers
    template <typename... Args>
    std::pair<iterator, bool> emplace(Args&&... args);

    // Lookup
    iterator find(const key_type& k); // returns end() when `k` is absent
};
template <typename Key, typename Value, size_t Size>
|
|
typename small_flat_map<Key, Value, Size>::mapped_type&
|
|
small_flat_map<Key, Value, Size>::at(const key_type& k) {
|
|
if (auto it = find(k); it != end()) {
|
|
return it->second;
|
|
}
|
|
throw std::out_of_range("small_flat_map: did not find key");
|
|
}
|
|
|
|
template <typename Key, typename Value, size_t Size>
|
|
typename small_flat_map<Key, Value, Size>::mapped_type&
|
|
small_flat_map<Key, Value, Size>::operator[](const key_type& k) {
|
|
if (auto it = find(k); it != end()) {
|
|
return it->second;
|
|
}
|
|
_values.emplace_back(k, mapped_type{});
|
|
return _values.back().second;
|
|
}
|
|
|
|
template <typename Key, typename Value, size_t Size>
|
|
template <typename... Args>
|
|
std::pair<typename small_flat_map<Key, Value, Size>::iterator, bool>
|
|
small_flat_map<Key, Value, Size>::emplace(Args&&... args) {
|
|
value_type elem(std::forward<Args>(args)...);
|
|
if (auto it = find(elem.first); it != end()) {
|
|
return std::pair(it, false);
|
|
}
|
|
_values.emplace_back(std::move(elem));
|
|
return std::pair(end() - 1, true);
|
|
}
|
|
|
|
template <typename Key, typename Value, size_t Size>
|
|
typename small_flat_map<Key, Value, Size>::iterator
|
|
small_flat_map<Key, Value, Size>::find(const key_type& k) {
|
|
auto key_matches = [&k] (const value_type& v) { return v.first == k; };
|
|
if (auto it = std::find_if(begin(), end(), key_matches); it != _values.end()) {
|
|
return it;
|
|
}
|
|
return end();
|
|
}
|
|
|
|
} // anonymous namespace
// Splits a stream of mutation fragments into per-bucket output streams,
// choosing each fragment's bucket by feeding its timestamp(s) to the
// user-provided classifier. Fragments whose components (cells, marker,
// tombstone) classify into different buckets are split apart by the
// consume() overloads before being written out.
class timestamp_based_splitting_mutation_writer {
    using bucket_id = int64_t;

    // A bucket_writer that additionally remembers whether this bucket has
    // already received a partition-start for the current partition.
    class timestamp_bucket_writer : public bucket_writer {
        bool _has_current_partition = false;

    public:
        timestamp_bucket_writer(schema_ptr schema, reader_permit permit, mutation_reader_consumer& consumer)
            : bucket_writer(schema, std::move(permit), consumer) {
        }
        void set_has_current_partition() {
            _has_current_partition = true;
        }
        void clear_has_current_partition() {
            _has_current_partition = false;
        }
        bool has_current_partition() const {
            return _has_current_partition;
        }
    };

private:
    schema_ptr _schema;
    reader_permit _permit;
    classify_by_timestamp _classifier;
    mutation_reader_consumer _consumer;
    // The start fragment of the partition currently being consumed; its
    // partition tombstone is cleared by consume(partition_start&&) after
    // the tombstone is routed to its own bucket. Used by write_to_bucket()
    // to lazily emit a partition-start to buckets on first use.
    partition_start _current_partition_start;
    std::unordered_map<bucket_id, timestamp_bucket_writer> _buckets;
    // Buckets that received a partition-start for the current partition;
    // each gets a partition-end in consume(partition_end&&).
    std::vector<bucket_id> _buckets_used_for_current_partition;
    // Bucket of the most recent range-tombstone-change, so that a bucket
    // switch can first close the still-open range in the previous bucket.
    std::optional<bucket_id> _last_rtc_bucket;

private:
    // Writes `mf` to `bucket`, creating the bucket's writer on demand and
    // emitting a partition-start first when needed.
    future<> write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf);

    // The examine_* helpers classify an element into a single bucket;
    // they return std::nullopt when its parts span multiple buckets.
    std::optional<bucket_id> examine_column(const atomic_cell_or_collection& c, const column_definition& cdef);
    std::optional<bucket_id> examine_row(const row& r, column_kind kind);
    std::optional<bucket_id> examine_static_row(const static_row& sr);
    std::optional<bucket_id> examine_clustering_row(const clustering_row& cr);
    // The split_* helpers break an element into per-bucket pieces.
    small_flat_map<bucket_id, atomic_cell_or_collection, 4> split_collection(atomic_cell_or_collection&& collection, const column_definition& cdef);
    small_flat_map<bucket_id, row, 4> split_row(column_kind kind, row&& r);
    small_flat_map<bucket_id, static_row, 4> split_static_row(static_row&& sr);
    small_flat_map<bucket_id, clustering_row, 4> split_clustering_row(clustering_row&& cr);
    // Writes the marker and/or tombstone of a cell-less clustering row.
    future<> write_marker_and_tombstone(const clustering_row& cr);

public:
    timestamp_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_timestamp classifier, mutation_reader_consumer consumer)
        : _schema(std::move(schema))
        , _permit(std::move(permit))
        , _classifier(std::move(classifier))
        , _consumer(std::move(consumer))
        // Placeholder key; overwritten by the first consume(partition_start&&).
        , _current_partition_start(dht::decorated_key(dht::token{}, partition_key::make_empty()), tombstone{}) {
    }

    future<> consume(partition_start&& ps);
    future<> consume(static_row&& sr);
    future<> consume(clustering_row&& cr);
    future<> consume(range_tombstone_change&& rt);
    future<> consume(partition_end&& pe);

    // Propagates end-of-stream to every bucket writer.
    void consume_end_of_stream() {
        for (auto& b : _buckets) {
            b.second.consume_end_of_stream();
        }
    }
    // Propagates the error to every bucket writer.
    void abort(std::exception_ptr ep) {
        for (auto&& b : _buckets) {
            b.second.abort(ep);
        }
    }
    // Closes all bucket writers in parallel.
    future<> close() noexcept {
        return parallel_for_each(_buckets, [] (std::pair<const bucket_id, timestamp_bucket_writer>& b) {
            return b.second.close();
        });
    }
};
// Routes `mf` to the writer of `bucket`, creating the bucket's writer on
// first use. If this bucket has not yet seen the current partition, a
// partition-start fragment is emitted first so the bucket's stream remains
// well-formed.
future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf) {
    auto it = _buckets.try_emplace(bucket, _schema, _permit, _consumer).first;

    // NOTE: the reference stays valid across the continuations below —
    // std::unordered_map does not invalidate references on rehash.
    auto& writer = it->second;

    if (writer.has_current_partition()) {
        return writer.consume(std::move(mf));
    }

    // We can explicitly write a partition-start fragment when the partition has
    // a partition tombstone.
    if (mf.is_partition_start()) {
        return writer.consume(std::move(mf)).then([this, bucket, &writer] {
            writer.set_has_current_partition();
            _buckets_used_for_current_partition.push_back(bucket);
        });
    }

    // Otherwise synthesize a partition-start from _current_partition_start
    // (whose partition tombstone was cleared in consume(partition_start&&)),
    // then write the fragment itself.
    return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_start(_current_partition_start))).then([this, bucket, &writer, mf = std::move(mf)] () mutable {
        writer.set_has_current_partition();
        _buckets_used_for_current_partition.push_back(bucket);
        return writer.consume(std::move(mf));
    });
}
// Classifies a cell into a bucket based on its timestamp(s). For a
// collection/UDT cell, every element cell and the collection tombstone must
// classify into the same bucket; otherwise std::nullopt is returned,
// signaling that the cell has to be split. Throws for column types that
// cannot be classified.
std::optional<timestamp_based_splitting_mutation_writer::bucket_id> timestamp_based_splitting_mutation_writer::examine_column(
        const atomic_cell_or_collection& cell, const column_definition& cdef) {
    if (cdef.is_atomic()) {
        return _classifier(cell.as_atomic_cell(cdef).timestamp());
    }
    if (cdef.type->is_collection() || cdef.type->is_user_type()) {
        return cell.as_collection_mutation().with_deserialized(*cdef.type, [this] (collection_mutation_view_description mv) {
            std::optional<bucket_id> bucket;
            if (mv.tomb) {
                bucket = _classifier(mv.tomb.timestamp);
            }
            for (auto&& c : mv.cells) {
                const auto this_bucket = _classifier(c.second.timestamp());
                // Parts of the collection fall into different buckets:
                // it cannot be classified as a whole.
                if (bucket && *bucket != this_bucket) {
                    return std::optional<bucket_id>{};
                }
                bucket = this_bucket;
            }
            return bucket;
        });
    }
    throw std::runtime_error(fmt::format("Cannot classify timestamp of cell (column {} of unknown type {})", cdef.name_as_text(), cdef.type->name()));
}
std::optional<timestamp_based_splitting_mutation_writer::bucket_id> timestamp_based_splitting_mutation_writer::examine_row(const row& r,
|
|
column_kind kind) {
|
|
std::optional<bucket_id> bucket;
|
|
r.for_each_cell_until([this, &bucket, kind] (column_id id, const atomic_cell_or_collection& cell) {
|
|
const auto this_bucket = examine_column(cell, _schema->column_at(kind, id));
|
|
if (!this_bucket || (bucket && *bucket != *this_bucket)) {
|
|
bucket.reset();
|
|
return stop_iteration::yes;
|
|
}
|
|
bucket = this_bucket;
|
|
return stop_iteration::no;
|
|
});
|
|
return bucket;
|
|
}
|
|
|
|
// Classifies a static row into a single bucket, or std::nullopt when its
// cells span multiple buckets (in which case the row must be split).
std::optional<timestamp_based_splitting_mutation_writer::bucket_id> timestamp_based_splitting_mutation_writer::examine_static_row(
        const static_row& sr) {
    return examine_row(sr.cells(), column_kind::static_column);
}
std::optional<timestamp_based_splitting_mutation_writer::bucket_id> timestamp_based_splitting_mutation_writer::examine_clustering_row(
|
|
const clustering_row& cr) {
|
|
std::optional<bucket_id> bucket_id;
|
|
|
|
if (!cr.marker().is_missing()) {
|
|
auto marker_bucket_id = _classifier(cr.marker().timestamp());
|
|
if (bucket_id) {
|
|
if (*bucket_id != marker_bucket_id) {
|
|
return {};
|
|
}
|
|
} else {
|
|
bucket_id = marker_bucket_id;
|
|
}
|
|
}
|
|
if (cr.tomb() != row_tombstone{}) {
|
|
auto tomb_bucket_id = _classifier(cr.tomb().tomb().timestamp);
|
|
if (bucket_id) {
|
|
if (*bucket_id != tomb_bucket_id) {
|
|
return {};
|
|
}
|
|
} else {
|
|
bucket_id = tomb_bucket_id;
|
|
}
|
|
}
|
|
|
|
const auto cells_bucket_id = examine_row(cr.cells(), column_kind::regular_column);
|
|
if (!cells_bucket_id) {
|
|
return {};
|
|
}
|
|
if (bucket_id) {
|
|
if (*bucket_id != *cells_bucket_id) {
|
|
return {};
|
|
}
|
|
} else {
|
|
bucket_id = cells_bucket_id;
|
|
}
|
|
return bucket_id;
|
|
}
|
|
|
|
// Splits a collection (or UDT) cell into per-bucket pieces: each element
// cell and the collection tombstone are classified individually, then each
// bucket's group is re-serialized into a cell of its own.
small_flat_map<timestamp_based_splitting_mutation_writer::bucket_id, atomic_cell_or_collection, 4>
timestamp_based_splitting_mutation_writer::split_collection(atomic_cell_or_collection&& collection, const column_definition& cdef) {
    small_flat_map<bucket_id, atomic_cell_or_collection, 4> pieces_by_bucket;

    collection.as_collection_mutation().with_deserialized(*cdef.type, [&, this] (collection_mutation_view_description original_mv) {
        // Group the element views by bucket first. NOTE(review): these are
        // views which presumably reference the deserialized data, so the
        // serialization below must happen inside this callback — confirm
        // against collection_mutation_view_description.
        small_flat_map<bucket_id, collection_mutation_view_description, 4> mutations_by_bucket;
        for (auto&& c : original_mv.cells) {
            mutations_by_bucket[_classifier(c.second.timestamp())].cells.push_back(c);
        }
        if (original_mv.tomb) {
            mutations_by_bucket[_classifier(original_mv.tomb.timestamp)].tomb = original_mv.tomb;
        }

        // Serialize each per-bucket group back into a standalone cell.
        for (auto&& [bucket, bucket_mv] : mutations_by_bucket) {
            pieces_by_bucket.emplace(bucket, bucket_mv.serialize(*cdef.type));
        }
    });

    return pieces_by_bucket;
}
// Splits a row's cells into per-bucket rows. Atomic cells are classified by
// their timestamp; collection/UDT cells are split further with
// split_collection(). Throws for cells of types that cannot be classified.
small_flat_map<timestamp_based_splitting_mutation_writer::bucket_id, row, 4>
timestamp_based_splitting_mutation_writer::split_row(column_kind kind, row&& r) {
    small_flat_map<bucket_id, row, 4> rows_by_bucket;

    r.for_each_cell([&, this, kind] (column_id id, atomic_cell_or_collection& cell) {
        const auto& cdef = _schema->column_at(kind, id);
        if (cdef.type->is_atomic()) {
            rows_by_bucket[_classifier(cell.as_atomic_cell(cdef).timestamp())].append_cell(id, std::move(cell));
        } else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
            // Each piece of the collection goes to its own bucket's row,
            // under the same column id.
            for (auto&& [bucket, cell_piece] : split_collection(std::move(cell), cdef)) {
                rows_by_bucket[bucket].append_cell(id, std::move(cell_piece));
            }
        } else {
            throw std::runtime_error(fmt::format("Cannot classify cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name()));
        }
    });

    return rows_by_bucket;
}
// Splits a static row into per-bucket static rows: delegate the cell split
// to split_row(), then wrap each per-bucket row back up as a static_row.
small_flat_map<timestamp_based_splitting_mutation_writer::bucket_id, static_row, 4>
timestamp_based_splitting_mutation_writer::split_static_row(static_row&& sr) {
    small_flat_map<bucket_id, static_row, 4> result;

    auto cells_by_bucket = split_row(column_kind::static_column, std::move(sr.cells()));
    for (auto&& [bucket, cells] : cells_by_bucket) {
        result.emplace(bucket, static_row(std::move(cells)));
    }

    return result;
}
// Splits a clustering row into per-bucket rows: the cells are split with
// split_row() first, then the row marker and row tombstone are attached to
// the row of their own bucket, creating a new (cell-less) row for a bucket
// that received no cells.
small_flat_map<timestamp_based_splitting_mutation_writer::bucket_id, clustering_row, 4>
timestamp_based_splitting_mutation_writer::split_clustering_row(clustering_row&& cr) {
    small_flat_map<bucket_id, clustering_row, 4> clustering_rows_by_bucket;

    for (auto&& [bucket_id, row] : split_row(column_kind::regular_column, std::move(cr.cells()))) {
        clustering_rows_by_bucket.emplace(bucket_id, clustering_row(cr.key(), {}, {}, std::move(row)));
    }

    if (!cr.marker().is_missing()) {
        const auto marker_bucket_id = _classifier(cr.marker().timestamp());
        if (auto it = clustering_rows_by_bucket.find(marker_bucket_id); it != clustering_rows_by_bucket.end()) {
            it->second.apply(cr.marker());
        } else {
            clustering_rows_by_bucket.emplace(marker_bucket_id, clustering_row(cr.key(), {}, cr.marker(), {}));
        }
    }

    if (cr.tomb() != row_tombstone{}) {
        const auto tomb_bucket_id = _classifier(cr.tomb().tomb().timestamp);
        if (auto it = clustering_rows_by_bucket.find(tomb_bucket_id); it != clustering_rows_by_bucket.end()) {
            // Apply the regular and shadowable parts separately to rebuild
            // the full row_tombstone on the existing row.
            it->second.apply(cr.tomb().regular());
            it->second.apply(cr.tomb().shadowable());
        } else {
            clustering_rows_by_bucket.emplace(tomb_bucket_id, clustering_row(cr.key(), cr.tomb(), {}, {}));
        }
    }

    return clustering_rows_by_bucket;
}
// Writes out the row marker and row tombstone of a (cell-less) clustering
// row, each to its own bucket — or both in one fragment when they classify
// into the same bucket. No-op when the row carries neither.
future<> timestamp_based_splitting_mutation_writer::write_marker_and_tombstone(const clustering_row& cr) {
    auto marker_bucket_id = cr.marker().is_missing() ? std::optional<int64_t>{} : std::optional<int64_t>{_classifier(cr.marker().timestamp())};
    auto tomb_bucket_id = cr.tomb() == row_tombstone{} ? std::optional<int64_t>{} : std::optional<int64_t>{_classifier(cr.tomb().tomb().timestamp)};
    if (!marker_bucket_id && !tomb_bucket_id) {
        return make_ready_future<>();
    }

    // Equality here implies both optionals are engaged: the both-empty case
    // returned above, and an engaged optional never equals nullopt.
    if (marker_bucket_id == tomb_bucket_id) {
        return write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), cr.marker(), {})));
    }

    // Different buckets: emit (up to) two separate row fragments in parallel.
    auto write_marker_fut = make_ready_future<>();
    if (marker_bucket_id) {
        write_marker_fut = write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), {}, cr.marker(), {})));
    }

    auto write_tomb_fut = make_ready_future<>();
    if (tomb_bucket_id) {
        write_tomb_fut = write_to_bucket(*tomb_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), {}, {})));
    }
    return when_all_succeed(std::move(write_marker_fut), std::move(write_tomb_fut)).discard_result();
}
// Begins a new partition. The partition-start is recorded for lazy,
// per-bucket emission; when it carries a partition tombstone, the tombstone
// is routed eagerly to its own bucket (as a partition-start fragment) and
// cleared from the recorded copy so other buckets don't duplicate it.
future<> timestamp_based_splitting_mutation_writer::consume(partition_start&& ps) {
    _current_partition_start = std::move(ps);
    _last_rtc_bucket.reset();
    if (auto& tomb = _current_partition_start.partition_tombstone()) {
        const auto bucket = _classifier(tomb.timestamp);
        // Copy while the tombstone is still set, then strip it from the
        // recorded partition-start used for the other buckets.
        auto tombstone_ps = partition_start(_current_partition_start);
        tomb = {};
        return write_to_bucket(bucket, mutation_fragment_v2(*_schema, _permit, std::move(tombstone_ps)));
    }
    return make_ready_future<>();
}
// Consumes a static row: written whole when all cells classify into one
// bucket, otherwise split into per-bucket pieces written in parallel.
future<> timestamp_based_splitting_mutation_writer::consume(static_row&& sr) {
    if (sr.cells().empty()) {
        return make_ready_future<>();
    }

    // Fast path: every cell agrees on the bucket.
    if (const auto bucket = examine_static_row(sr)) {
        return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(sr)));
    }

    return parallel_for_each(split_static_row(std::move(sr)), [this] (std::pair<bucket_id, static_row>& sr_piece) {
        return write_to_bucket(sr_piece.first, mutation_fragment_v2(*_schema, _permit, static_row(std::move(sr_piece.second))));
    });
}
// Consumes a clustering row. A row without cells may still carry a marker
// and/or a row tombstone, handled by write_marker_and_tombstone().
// Otherwise, the row is written whole to one bucket when possible, or
// split into per-bucket pieces written in parallel.
future<> timestamp_based_splitting_mutation_writer::consume(clustering_row&& cr) {
    if (cr.cells().empty()) {
        return write_marker_and_tombstone(cr);
    }

    // Fast path: marker, tombstone and all cells agree on the bucket.
    if (const auto bucket = examine_clustering_row(cr)) {
        return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(cr)));
    }

    return parallel_for_each(split_clustering_row(std::move(cr)), [this] (std::pair<bucket_id, clustering_row>& cr_piece) {
        return write_to_bucket(cr_piece.first, mutation_fragment_v2(*_schema, _permit, std::move(cr_piece.second)));
    });
}
// Consumes a range tombstone change. When the active range tombstone moves
// to a different bucket than the previous one, the previous bucket first
// receives a null-tombstone change at the same position, closing the range
// that is still open in its stream.
future<> timestamp_based_splitting_mutation_writer::consume(range_tombstone_change&& rtc) {
    auto id = _classifier(rtc.tombstone().timestamp);
    if (_last_rtc_bucket && *_last_rtc_bucket != id) {
        co_await write_to_bucket(*_last_rtc_bucket, mutation_fragment_v2(*_schema, _permit, range_tombstone_change(rtc.position(), {})));
    }
    _last_rtc_bucket = id;
    co_await write_to_bucket(id, mutation_fragment_v2(*_schema, _permit, std::move(rtc)));
}
// Closes the current partition: every bucket that received a
// partition-start gets a matching partition-end (in parallel), then the
// per-partition bookkeeping is reset.
future<> timestamp_based_splitting_mutation_writer::consume(partition_end&& pe) {
    return parallel_for_each(_buckets_used_for_current_partition, [this, pe = std::move(pe)] (bucket_id bucket) {
        auto& writer = _buckets.at(bucket);
        return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_end(pe))).then([&writer] {
            writer.clear_has_current_partition();
        });
    }).then([this] {
        _buckets_used_for_current_partition.clear();
    });
}
// Feeds `producer`'s fragment stream through a timestamp-based splitting
// writer, forwarding each timestamp-bucket's stream to `consumer`.
future<> segregate_by_timestamp(mutation_reader producer, classify_by_timestamp classifier, mutation_reader_consumer consumer) {
    //FIXME: make this into a consume() variant?
    // Pull schema and permit out before `producer` is moved from — argument
    // evaluation order within a single call would not guarantee that.
    auto writer_schema = producer.schema();
    auto writer_permit = producer.permit();
    auto writer = timestamp_based_splitting_mutation_writer(std::move(writer_schema), std::move(writer_permit), std::move(classifier), std::move(consumer));
    return feed_writer(std::move(producer), std::move(writer));
}
} // namespace mutation_writer