replica/tablets: add get_tablet_metadata_change_hint() and update_tablet_metadata_change_hint()
Extract a hint of what a tablet mutation changed. The hint can be later used to selectively reload only the changed parts from disk. Two variants are added: * get_tablet_metadata_change_hint() - extracts a hint from a list of tablet mutations * update_tablet_metadata_change_hint() - updates an existing hint based on a single mutation, allowing for incremental hint extraction
This commit is contained in:
@@ -954,3 +954,18 @@ auto fmt::formatter<locator::tablet_metadata>::format(const locator::tablet_meta
|
||||
}
|
||||
return fmt::format_to(out, "\n}}");
|
||||
}
|
||||
|
||||
auto fmt::formatter<locator::tablet_metadata_change_hint>::format(const locator::tablet_metadata_change_hint& hint, fmt::format_context& ctx) const
|
||||
-> decltype(ctx.out()) {
|
||||
auto out = ctx.out();
|
||||
out = fmt::format_to(out, "{{");
|
||||
bool first = true;
|
||||
for (auto&& [table_id, table_hint] : hint.tables) {
|
||||
if (!first) {
|
||||
out = fmt::format_to(out, ",");
|
||||
}
|
||||
out = fmt::format_to(out, "\n [{}]: {}", table_id, table_hint.tokens);
|
||||
first = false;
|
||||
}
|
||||
return fmt::format_to(out, "\n}}");
|
||||
}
|
||||
|
||||
@@ -535,6 +535,19 @@ public:
|
||||
std::optional<range_split_result> operator()();
|
||||
};
|
||||
|
||||
struct tablet_metadata_change_hint {
|
||||
struct table_hint {
|
||||
table_id table_id;
|
||||
std::vector<token> tokens;
|
||||
|
||||
bool operator==(const table_hint&) const = default;
|
||||
};
|
||||
std::unordered_map<table_id, table_hint> tables;
|
||||
|
||||
bool operator==(const tablet_metadata_change_hint&) const = default;
|
||||
explicit operator bool() const noexcept { return !tables.empty(); }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
template <>
|
||||
@@ -575,3 +588,8 @@ template <>
|
||||
struct fmt::formatter<locator::tablet_metadata> : fmt::formatter<string_view> {
|
||||
auto format(const locator::tablet_metadata&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<locator::tablet_metadata_change_hint> : fmt::formatter<string_view> {
|
||||
auto format(const locator::tablet_metadata_change_hint&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
@@ -205,6 +205,71 @@ future<> save_tablet_metadata(replica::database& db, const tablet_metadata& tm,
|
||||
co_await db.apply(freeze(muts), db::no_timeout);
|
||||
}
|
||||
|
||||
static table_id to_tablet_metadata_key(const schema& s, const partition_key& key) {
|
||||
const auto elements = key.explode(s);
|
||||
return ::table_id(value_cast<utils::UUID>(uuid_type->deserialize_value(elements.front())));
|
||||
}
|
||||
|
||||
static dht::token to_tablet_metadata_row_key(const schema& s, const clustering_key& key) {
|
||||
const auto elements = key.explode(s);
|
||||
return dht::token::from_int64(value_cast<int64_t>(long_type->deserialize_value(elements[0])));
|
||||
}
|
||||
|
||||
static void do_update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hint, const schema& s, const mutation& m) {
|
||||
const auto table_id = to_tablet_metadata_key(s, m.key());
|
||||
auto it = hint.tables.try_emplace(table_id, locator::tablet_metadata_change_hint::table_hint{table_id, {}}).first;
|
||||
|
||||
const auto& mp = m.partition();
|
||||
auto& tokens = it->second.tokens;
|
||||
|
||||
if (mp.partition_tombstone() || !mp.row_tombstones().empty() || !mp.static_row().empty()) {
|
||||
// If there is a partition tombstone, range tombstone or static row,
|
||||
// update the entire partition. Also clear any row hints that might be
|
||||
// present to force a full read of the partition.
|
||||
tokens.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto& row : mp.clustered_rows()) {
|
||||
// TODO: we do not handle deletions yet, will revisit when tablet count
|
||||
// reduction is worked out.
|
||||
if (row.row().deleted_at()) {
|
||||
tokens.clear();
|
||||
return;
|
||||
}
|
||||
tokens.push_back(to_tablet_metadata_row_key(s, row.key()));
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<locator::tablet_metadata_change_hint> get_tablet_metadata_change_hint(const std::vector<canonical_mutation>& mutations) {
|
||||
tablet_logger.trace("tablet_metadata_change_hint({})", mutations.size());
|
||||
auto s = db::system_keyspace::tablets();
|
||||
|
||||
std::optional<locator::tablet_metadata_change_hint> hint;
|
||||
|
||||
for (const auto& cm : mutations) {
|
||||
tablet_logger.trace("tablet_metadata_change_hint() {} == {}", cm.column_family_id(), s->id());
|
||||
if (cm.column_family_id() != s->id()) {
|
||||
continue;
|
||||
}
|
||||
if (!hint) {
|
||||
hint.emplace();
|
||||
hint->tables.reserve(mutations.size());
|
||||
}
|
||||
do_update_tablet_metadata_change_hint(*hint, *s, cm.to_mutation(s));
|
||||
}
|
||||
|
||||
return hint;
|
||||
}
|
||||
|
||||
void update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hint, const mutation& m) {
|
||||
auto s = db::system_keyspace::tablets();
|
||||
if (m.column_family_id() != s->id()) {
|
||||
return;
|
||||
}
|
||||
do_update_tablet_metadata_change_hint(hint, *s, m);
|
||||
}
|
||||
|
||||
future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
|
||||
tablet_metadata tm;
|
||||
struct active_tablet_map {
|
||||
|
||||
@@ -61,6 +61,16 @@ mutation make_drop_tablet_map_mutation(table_id, api::timestamp_type);
|
||||
/// The timestamp must be greater than api::min_timestamp.
|
||||
future<> save_tablet_metadata(replica::database&, const locator::tablet_metadata&, api::timestamp_type);
|
||||
|
||||
/// Extract a tablet metadata change hint from the tablet mutations.
|
||||
///
|
||||
/// Mutations which don't mutate the tablet table are ignored.
|
||||
std::optional<locator::tablet_metadata_change_hint> get_tablet_metadata_change_hint(const std::vector<canonical_mutation>&);
|
||||
|
||||
/// Update the tablet metadata change hint, with the changes represented by the tablet mutation.
|
||||
///
|
||||
/// If the mutation belongs to another table, no updates are done.
|
||||
void update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint&, const mutation&);
|
||||
|
||||
/// Reads tablet metadata from system.tablets.
|
||||
future<locator::tablet_metadata> read_tablet_metadata(cql3::query_processor&);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user