service: migration_manager: clear old entries from group 0 history when announcing

When performing a change through group 0 (which right now only covers
schema changes), clear entries from group 0 history table which are older
than one week.

This is done by including an appropriate range tombstone in the group 0
history table mutation.
This commit is contained in:
Kamil Braun
2022-01-18 15:36:59 +01:00
parent eb42213db4
commit e9083433a8
4 changed files with 30 additions and 3 deletions

View File

@@ -3047,7 +3047,8 @@ future<bool> system_keyspace::group0_history_contains(utils::UUID state_id) {
co_return !rs->empty();
}
mutation system_keyspace::make_group0_history_state_id_mutation(utils::UUID state_id, std::string_view description) {
mutation system_keyspace::make_group0_history_state_id_mutation(
utils::UUID state_id, std::optional<gc_clock::duration> gc_older_than, std::string_view description) {
auto s = group0_history();
mutation m(s, partition_key::from_singular(*s, GROUP0_HISTORY_KEY));
auto& row = m.partition().clustered_row(*s, clustering_key::from_singular(*s, state_id));
@@ -3058,6 +3059,23 @@ mutation system_keyspace::make_group0_history_state_id_mutation(utils::UUID stat
assert(cdef);
row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, cdef->type->decompose(description)));
}
if (gc_older_than) {
using namespace std::chrono;
assert(*gc_older_than >= gc_clock::duration{0});
auto ts_millis = duration_cast<milliseconds>(microseconds{ts});
auto gc_older_than_millis = duration_cast<milliseconds>(*gc_older_than);
assert(gc_older_than_millis < ts_millis);
auto tomb_upper_bound = utils::UUID_gen::min_time_UUID(ts_millis - gc_older_than_millis);
// We want to delete all entries with IDs smaller than `tomb_upper_bound`
// but the deleted range is of the form (x, +inf) since the schema is reversed.
auto range = query::clustering_range::make_starting_with({
clustering_key_prefix::from_single_value(*s, timeuuid_type->decompose(tomb_upper_bound)), false});
auto bv = bound_view::from_range(range);
m.partition().apply_delete(*s, range_tombstone{bv.first, bv.second, tombstone{ts, gc_clock::now()}});
}
return m;
}

View File

@@ -421,8 +421,14 @@ public:
static future<bool> group0_history_contains(utils::UUID state_id);
// The mutation appends the given state ID to the group 0 history table, with the given description if non-empty.
//
// If `gc_older_than` is provided, the mutation will also contain a tombstone that clears all entries whose
// timestamps (contained in the state IDs) are older than `timestamp(state_id) - gc_older_than`.
// The duration must be non-negative and smaller than `timestamp(state_id)`.
//
// The mutation's timestamp is extracted from the state ID.
static mutation make_group0_history_state_id_mutation(utils::UUID state_id, std::string_view description);
static mutation make_group0_history_state_id_mutation(
utils::UUID state_id, std::optional<gc_clock::duration> gc_older_than, std::string_view description);
// Obtain the contents of the group 0 history table in mutation form.
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.

View File

@@ -58,6 +58,7 @@ migration_manager::migration_manager(migration_notifier& notifier, gms::feature_
_notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _raft_gr(raft_gr)
, _schema_push([this] { return passive_announce(); })
, _group0_read_apply_mutex{1}, _group0_operation_mutex{1}
, _group0_history_gc_duration{std::chrono::duration_cast<gc_clock::duration>(std::chrono::weeks{1})}
{
}
@@ -993,7 +994,7 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
}},
.history_append{db::system_keyspace::make_group0_history_state_id_mutation(
guard.new_group0_state_id(), description)},
guard.new_group0_state_id(), _group0_history_gc_duration, description)},
.prev_state_id{guard.observed_group0_state_id()},
.new_state_id{guard.new_group0_state_id()},

View File

@@ -102,6 +102,8 @@ private:
// See `group0_guard::impl` for explanation of the purpose of these locks.
semaphore _group0_read_apply_mutex;
semaphore _group0_operation_mutex;
gc_clock::duration _group0_history_gc_duration;
public:
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group_registry& raft_gr);