The central idea of incremental repair is to allow repair participants
to select and repair only a portion of the dataset to speed up the
repair process. All repair participants must utilize an identical
selection method to repair and synchronize the same selected dataset.
There are two primary selection methods: time-based and file-based. The
time-based method selects data within a specified time frame. It is
versatile but it is less efficient because it requires reading all of
the dataset and omitting data beyond the time frame. The file-based
method selects data from unrepaired SSTables and is more efficient
because it allows the entire SSTable to be omitted. This patch
implements the file-based selection method.
Incremental repair will only be supported for tablet tables; it will not
be supported for vnode tables. On one hand, the legacy vnode is less
important to support. On the other hand, the incremental repair for
vnode is much harder to implement. With vnodes, an SSTable could contain
data for multiple vnode ranges. When a given vnode range is repaired,
only a portion of the SSTable is repaired. This complicates the
manipulation of SSTables significantly during both repair and
compaction. With tablets, an entire tablet is repaired at once, so an
SSTable is either fully repaired or not repaired at all, which is a huge
simplification.
This patch uses the repaired_at field from the sstables::statistics
component to mark an SSTable as repaired. It uses a virtual clock as the repair
timestamp, i.e., using a monotonically increasing number for the
repaired_at field of a SSTable and sstables_repaired_at column in
system.tablets table. Notice that when an SSTable is not repaired, its
repaired_at field keeps the default value of 0. The
being_repaired in memory field of a SSTable is used to explicitly mark
that a SSTable is being selected. The following variables are used for
incremental repair:
- The repaired_at on-disk field of an SSTable:
  a 64-bit number that increases sequentially.
- The sstables_repaired_at column, added to the system.tablets table:
  repaired_at <= sstables_repaired_at means the SSTable is repaired.
- The being_repaired in-memory field of an SSTable:
  a repair UUID that tells which SSTables have participated in the repair.
Initial test results:
1) Medium dataset results
Node amount: 3
Instance type: i4i.2xlarge
Disk usage per node: ~500GB
Cluster pre-populated with ~500GB of data before starting repairs job.
Results for Repair Timings:
The regular repair run took 210 mins.
Incremental repair 1st run took 183 mins, 2nd and 3rd runs took around 48s.
The speedup is: 183 mins / 48s = 228X
2) Small dataset results
Node amount: 3
Instance type: i4i.2xlarge
Disk usage per node: ~167GB
Cluster pre-populated with ~167GB of data before starting the repairs job.
Regular repair 1st run took 110s, 2nd and 3rd runs took 110s.
Incremental repair 1st run took 110 seconds, 2nd and 3rd run took 1.5 seconds.
The speedup is: 110s / 1.5s = 73X
3) Large dataset results
Node amount: 6
Instance type: i4i.2xlarge, 3 racks
50% of base load, 50% read/write
Dataset == Sum of data on each node
Dataset Non-incremental repair (minutes)
1.3 TiB 31:07
3.5 TiB 25:10
5.0 TiB 19:03
6.3 TiB 31:42
Dataset Incremental repair (minutes)
1.3 TiB 24:32
3.0 TiB 13:06
4.0 TiB 5:23
4.8 TiB 7:14
5.6 TiB 3:58
6.3 TiB 7:33
7.0 TiB 6:55
Fixes #22472
165 lines
4.6 KiB
C++
165 lines
4.6 KiB
C++
#pragma once
|
|
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include "schema/schema_fwd.hh"
|
|
#include "reader_permit.hh"
|
|
#include "service/topology_guard.hh"
|
|
#include "streaming/stream_reason.hh"
|
|
#include "repair/decorated_key_with_hash.hh"
|
|
#include "readers/upgrading_consumer.hh"
|
|
#include "./sstables/shared_sstable.hh"
|
|
|
|
using namespace seastar;
|
|
|
|
namespace db {
|
|
class system_distributed_keyspace;
|
|
namespace view {
|
|
class view_update_generator;
|
|
}
|
|
}
|
|
|
|
class mutation_fragment_queue {
|
|
public:
|
|
class impl {
|
|
std::vector<mutation_fragment_v2> _pending;
|
|
public:
|
|
virtual future<> push(mutation_fragment_v2 mf) = 0;
|
|
virtual void abort(std::exception_ptr ep) = 0;
|
|
virtual void push_end_of_stream() = 0;
|
|
virtual ~impl() {}
|
|
future<> flush() {
|
|
for (auto&& mf : _pending) {
|
|
co_await push(std::move(mf));
|
|
}
|
|
_pending.clear();
|
|
}
|
|
|
|
std::vector<mutation_fragment_v2>& pending() {
|
|
return _pending;
|
|
}
|
|
};
|
|
|
|
private:
|
|
|
|
class consumer {
|
|
std::vector<mutation_fragment_v2>& _fragments;
|
|
public:
|
|
explicit consumer(std::vector<mutation_fragment_v2>& fragments)
|
|
: _fragments(fragments)
|
|
{}
|
|
|
|
void operator()(mutation_fragment_v2 mf) {
|
|
_fragments.push_back(std::move(mf));
|
|
}
|
|
};
|
|
seastar::shared_ptr<impl> _impl;
|
|
upgrading_consumer<consumer> _consumer;
|
|
|
|
public:
|
|
mutation_fragment_queue(schema_ptr s, reader_permit permit, seastar::shared_ptr<impl> impl)
|
|
: _impl(std::move(impl))
|
|
, _consumer(*s, std::move(permit), consumer(_impl->pending()))
|
|
{}
|
|
|
|
future<> push(mutation_fragment mf) {
|
|
_consumer.consume(std::move(mf));
|
|
return _impl->flush();
|
|
}
|
|
|
|
void abort(std::exception_ptr ep) {
|
|
_impl->abort(ep);
|
|
}
|
|
|
|
void push_end_of_stream() {
|
|
_impl->push_end_of_stream();
|
|
}
|
|
};
|
|
|
|
// Writes mutation fragments received from repair peers into local
// SSTables. The concrete sink (the actual sstable writer) is supplied
// through the nested `impl` interface.
class repair_writer : public enable_lw_shared_from_this<repair_writer> {
    schema_ptr _schema;
    reader_permit _permit;
    // Current partition written to disk
    lw_shared_ptr<const decorated_key_with_hash> _current_dk_written_to_sstable;
    // Is current partition still open. A partition is opened when a
    // partition_start is written and is closed when a partition_end is
    // written.
    // Fixed: was uninitialized (reading it before the first write would
    // be UB); no partition is open before any fragment is written.
    bool _partition_opened = false;
    named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
    bool _created_writer = false;
    uint64_t _estimated_partitions = 0;
    // Holds the sstables produced by repair
    lw_shared_ptr<sstables::sstable_list> _sstables;
public:
    // Backend interface owning the actual writer machinery.
    class impl {
    public:
        // Queue through which fragments reach the writer.
        virtual mutation_fragment_queue& queue() = 0;
        // Wait until the writer has finished and sealed its output.
        virtual future<> wait_for_writer_done() = 0;
        // Create the underlying writer; receives `writer` to keep it
        // alive for the writer's lifetime.
        virtual void create_writer(lw_shared_ptr<repair_writer> writer) = 0;
        virtual ~impl() = default;
    };
private:
    std::unique_ptr<impl> _impl;
    // Cached pointer to _impl->queue(); set once in the constructor.
    mutation_fragment_queue* _mq;
public:
    repair_writer(
        schema_ptr schema,
        reader_permit permit,
        std::unique_ptr<impl> impl)
        : _schema(std::move(schema))
        , _permit(std::move(permit))
        , _sstables(make_lw_shared<sstables::sstable_list>())
        , _impl(std::move(impl))
        , _mq(&_impl->queue())
    {}

    void set_estimated_partitions(uint64_t estimated_partitions) {
        _estimated_partitions = estimated_partitions;
    }

    uint64_t get_estimated_partitions() const noexcept {
        return _estimated_partitions;
    }

    // Ask the backend to create the actual writer and remember that it
    // was created. Idempotence is the caller's responsibility.
    void create_writer() {
        _impl->create_writer(shared_from_this());
        _created_writer = true;
    }

    // Write `mf` belonging to partition `dk` (defined out of line).
    future<> do_write(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf);

    // Wait for the backend writer to finish (defined out of line).
    future<> wait_for_writer_done();

    // Serializes writers; initialized with a single unit.
    named_semaphore& sem() {
        return _sem;
    }

    schema_ptr schema() const noexcept {
        return _schema;
    }

    mutation_fragment_queue& queue() {
        return _impl->queue();
    }

    // SSTables produced by this repair, exposed so that incremental
    // repair can mark them as repaired afterwards.
    lw_shared_ptr<sstables::sstable_list>& get_sstable_list_to_mark_as_repaired() {
        return _sstables;
    }

private:
    future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf);
    future<> write_partition_end();
    future<> write_end_of_stream();
};
|
|
|
|
// Factory for a repair_writer whose impl writes into `db`.
// NOTE(review): the implementation is not visible here — presumably
// `reason` tags the resulting streams, `view_update_generator` is used to
// propagate materialized-view updates, and `topo_guard` fences the
// operation against topology changes; confirm against the .cc file.
lw_shared_ptr<repair_writer> make_repair_writer(
    schema_ptr schema,
    reader_permit permit,
    streaming::stream_reason reason,
    sharded<replica::database>& db,
    sharded<db::system_distributed_keyspace>& sys_dist_ks,
    sharded<db::view::view_update_generator>& view_update_generator,
    service::frozen_topology_guard topo_guard);
|
|
|