The central idea of incremental repair is to allow repair participants
to select and repair only a portion of the dataset to speed up the
repair process. All repair participants must utilize an identical
selection method to repair and synchronize the same selected dataset.
There are two primary selection methods: time-based and file-based. The
time-based method selects data within a specified time frame. It is
versatile but it is less efficient because it requires reading all of
the dataset and omitting data beyond the time frame. The file-based
method selects data from unrepaired SSTables and is more efficient
because it allows the entire SSTable to be omitted. This patch
implements the file-based selection method.
Incremental repair will only be supported for tablet tables; it will not
be supported for vnode tables. On one hand, the legacy vnode is less
important to support. On the other hand, the incremental repair for
vnode is much harder to implement. With vnodes, a SSTable could contain
data for multiple vnode ranges. When a given vnode range is repaired,
only a portion of the SSTable is repaired. This complicates the
manipulation of SSTables significantly during both repair and
compaction. With tablets, an entire tablet is repaired so that a
sstable is either fully repaired or not repaired which is a huge
simplification.
This patch uses the repaired_at from sstables::statistics component to
mark a sstable as repaired. It uses a virtual clock as the repair
timestamp, i.e., using a monotonically increasing number for the
repaired_at field of a SSTable and sstables_repaired_at column in
system.tablets table. Notice that when a sstable is not repaired, the
repaired_at field is left at its default value of 0. The
in-memory being_repaired field of a SSTable is used to explicitly mark
that a SSTable is being selected. The following variables are used for
incremental repair:
The on-disk repaired_at field of a SSTable is used.
- A 64-bit number that increases sequentially
The sstables_repaired_at is added to the system.tablets table.
- repaired_at <= sstables_repaired_at means the sstable is repaired
The in-memory being_repaired field of a SSTable is added.
- A repair UUID tells which sstable has participated in the repair
Initial test results:
1) Medium dataset results
Node amount: 3
Instance type: i4i.2xlarge
Disk usage per node: ~500GB
Cluster pre-populated with ~500GB of data before starting repairs job.
Results for Repair Timings:
The regular repair run took 210 mins.
Incremental repair 1st run took 183 mins, 2nd and 3rd runs took around 48s
The speedup is: 183 mins / 48s = 228X
2) Small dataset results
Node amount: 3
Instance type: i4i.2xlarge
Disk usage per node: ~167GB
Cluster pre-populated with ~167GB of data before starting the repairs job.
Regular repair 1st run took 110s, 2nd and 3rd runs took 110s.
Incremental repair 1st run took 110 seconds, 2nd and 3rd run took 1.5 seconds.
The speedup is: 110s / 1.5s = 73X
3) Large dataset results
Node amount: 6
Instance type: i4i.2xlarge, 3 racks
50% of base load, 50% read/write
Dataset == Sum of data on each node
Dataset Non-incremental repair (minutes)
1.3 TiB 31:07
3.5 TiB 25:10
5.0 TiB 19:03
6.3 TiB 31:42
Dataset Incremental repair (minutes)
1.3 TiB 24:32
3.0 TiB 13:06
4.0 TiB 5:23
4.8 TiB 7:14
5.6 TiB 3:58
6.3 TiB 7:33
7.0 TiB 6:55
Fixes #22472
106 lines
3.1 KiB
C++
106 lines
3.1 KiB
C++
#pragma once
|
|
|
|
#include "repair/decorated_key_with_hash.hh"
|
|
#include "readers/evictable.hh"
|
|
#include "dht/sharder.hh"
|
|
#include "reader_permit.hh"
|
|
#include "utils/phased_barrier.hh"
|
|
#include "readers/mutation_fragment_v1_stream.hh"
|
|
#include <fmt/core.h>
|
|
|
|
struct incremental_repair_meta;
|
|
|
|
class repair_reader {
|
|
public:
|
|
enum class read_strategy {
|
|
local,
|
|
multishard_split,
|
|
multishard_filter,
|
|
incremental_repair
|
|
};
|
|
|
|
private:
|
|
schema_ptr _schema;
|
|
reader_permit _permit;
|
|
dht::partition_range _range;
|
|
// Used to find the range that repair master will work on
|
|
dht::selective_token_range_sharder _sharder;
|
|
// Seed for the repair row hashing
|
|
uint64_t _seed;
|
|
// Pin the table while the reader is alive.
|
|
// Only needed for local readers, the multishard reader takes care
|
|
// of pinning tables on used shards.
|
|
std::optional<utils::phased_barrier::operation> _local_read_op;
|
|
std::optional<evictable_reader_handle> _reader_handle;
|
|
// Fragment stream of either local or multishard reader for the range
|
|
mutation_fragment_v1_stream _reader;
|
|
// Current partition read from disk
|
|
lw_shared_ptr<const decorated_key_with_hash> _current_dk;
|
|
uint64_t _reads_issued = 0;
|
|
uint64_t _reads_finished = 0;
|
|
|
|
mutation_reader make_reader(
|
|
seastar::sharded<replica::database>& db,
|
|
replica::column_family& cf,
|
|
read_strategy strategy,
|
|
const dht::sharder& remote_sharder,
|
|
unsigned remote_shard,
|
|
gc_clock::time_point compaction_time,
|
|
incremental_repair_meta inc);
|
|
|
|
public:
|
|
repair_reader(
|
|
seastar::sharded<replica::database>& db,
|
|
replica::column_family& cf,
|
|
schema_ptr s,
|
|
reader_permit permit,
|
|
dht::token_range range,
|
|
const dht::static_sharder& remote_sharder,
|
|
unsigned remote_shard,
|
|
uint64_t seed,
|
|
read_strategy strategy,
|
|
gc_clock::time_point compaction_time,
|
|
incremental_repair_meta inc);
|
|
|
|
future<mutation_fragment_opt>
|
|
read_mutation_fragment();
|
|
|
|
future<> on_end_of_stream() noexcept;
|
|
|
|
future<> close() noexcept;
|
|
|
|
lw_shared_ptr<const decorated_key_with_hash>& get_current_dk() {
|
|
return _current_dk;
|
|
}
|
|
|
|
void set_current_dk(const dht::decorated_key& key);
|
|
|
|
void clear_current_dk();
|
|
|
|
void check_current_dk();
|
|
|
|
void pause();
|
|
};
|
|
|
|
// Formats a repair_reader::read_strategy as its enumerator name
// ("unknown" for any out-of-range value).
template <> struct fmt::formatter<repair_reader::read_strategy> : fmt::formatter<string_view> {
    auto format(repair_reader::read_strategy s, fmt::format_context& ctx) const {
        using enum repair_reader::read_strategy;
        auto strategy_name = [](repair_reader::read_strategy strategy) -> std::string_view {
            switch (strategy) {
            case local:
                return "local";
            case multishard_split:
                return "multishard_split";
            case multishard_filter:
                return "multishard_filter";
            case incremental_repair:
                return "incremental_repair";
            }
            return "unknown";
        };
        return formatter<string_view>::format(strategy_name(s), ctx);
    }
};
|