backlog: keep track of maximum timestamp in write monitor

For sealed SSTables we can get the maximum timestamp from the statistics
component.  But for partially written SSTables, the metadata is not yet
available.

One way to solve this would be to make the SSTable statistics available
earlier. But we would end up with a maximum timestamp that potentially
changes all the time as we write more cells.

A better approach is to take note of what's the maximum timestamp in a
memtable before we start to flush, and when time comes for us to flush
we will use the progress manager to inform the consumers about the
maximum timestamp.

For SSTables being compacted, we can't know for sure what is the maximum
timestamp as some entries could be TTLd already. But the maximum of all
SSTables present in the compaction is a good enough estimation for this
purposes.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2018-05-18 14:11:50 -04:00
parent 68d1c64e7a
commit b573a2ff61
3 changed files with 30 additions and 7 deletions

View File

@@ -125,12 +125,15 @@ class database_sstable_write_monitor : public permit_monitor, public backlog_wri
sstables::compaction_strategy& _compaction_strategy;
const sstables::writer_offset_tracker* _tracker = nullptr;
uint64_t _progress_seen = 0;
api::timestamp_type _maximum_timestamp;
public:
database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager, sstables::compaction_strategy& strategy)
database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager,
sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp)
: permit_monitor(std::move(permit))
, _sst(std::move(sst))
, _compaction_manager(manager)
, _compaction_strategy(strategy)
, _maximum_timestamp(max_timestamp)
{}
virtual void on_write_started(const sstables::writer_offset_tracker& t) override {
@@ -154,6 +157,10 @@ public:
}
return _progress_seen;
}
api::timestamp_type maximum_timestamp() const override {
return _maximum_timestamp;
}
};
static const std::unordered_set<sstring> system_keyspaces = {
@@ -896,7 +903,7 @@ column_family::seal_active_streaming_memtable_immediate(flush_permit&& permit) {
// Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the
// memtable list, since this memtable was not available for reading up until this point.
auto fp = permit.release_sstable_write_permit();
database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy);
database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable {
auto&& priority = service::get_local_streaming_write_priority();
return write_memtable_to_sstable(*old, newtab, monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, false).then([this, newtab, old] {
@@ -956,7 +963,7 @@ future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_bi
newtab->set_unshared();
auto fp = permit.release_sstable_write_permit();
auto monitor = std::make_unique<database_sstable_write_monitor>(std::move(fp), newtab, _compaction_manager, _compaction_strategy);
auto monitor = std::make_unique<database_sstable_write_monitor>(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
auto&& priority = service::get_local_streaming_write_priority();
auto fut = write_memtable_to_sstable(*old, newtab, *monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, true);
return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable {
@@ -1051,7 +1058,7 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstabl
//
// The code as is guarantees that we'll never partially backup a
// single sstable, so that is enough of a guarantee.
database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy);
database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) {
auto&& priority = service::get_local_memtable_flush_priority();
auto f = write_memtable_to_sstable(*old, newtab, monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, false);

View File

@@ -220,10 +220,13 @@ class compaction_write_monitor final : public sstables::write_monitor, public ba
column_family& _cf;
const sstables::writer_offset_tracker* _tracker = nullptr;
uint64_t _progress_seen = 0;
api::timestamp_type _maximum_timestamp;
public:
compaction_write_monitor(sstables::shared_sstable sst, column_family& cf)
compaction_write_monitor(sstables::shared_sstable sst, column_family& cf, api::timestamp_type max_timestamp)
: _sst(sst)
, _cf(cf) {}
, _cf(cf)
, _maximum_timestamp(max_timestamp)
{}
~compaction_write_monitor() {
if (_sst) {
@@ -255,6 +258,10 @@ public:
_sst = {};
}
api::timestamp_type maximum_timestamp() const override {
return _maximum_timestamp;
}
virtual void on_write_completed() override { }
virtual void on_flush_completed() override { }
};
@@ -327,6 +334,13 @@ protected:
sst->open_data().get0();
_info->end_size += sst->bytes_on_disk();
}
api::timestamp_type maximum_timestamp() const {
auto m = std::max_element(_sstables.begin(), _sstables.end(), [] (const shared_sstable& sst1, const shared_sstable& sst2) {
return sst1->get_stats_metadata().max_timestamp < sst2->get_stats_metadata().max_timestamp;
});
return (*m)->get_stats_metadata().max_timestamp;
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
@@ -543,7 +557,7 @@ public:
_sst = _creator();
setup_new_sstable(_sst);
_active_write_monitors.emplace_back(_sst, _cf);
_active_write_monitors.emplace_back(_sst, _cf, maximum_timestamp());
auto&& priority = service::get_local_compaction_priority();
sstable_writer_config cfg;
cfg.max_sstable_size = _max_sstable_size;

View File

@@ -26,6 +26,7 @@
#include <seastar/core/shared_ptr.hh>
#include "shared_sstable.hh"
#include "sstables/progress_monitor.hh"
#include "timestamp.hh"
class compaction_backlog_manager;
class compaction_controller;
@@ -55,6 +56,7 @@ class compaction_controller;
// will certainly be gone by then.
struct backlog_write_progress_manager {
virtual uint64_t written() const = 0;
virtual api::timestamp_type maximum_timestamp() const = 0;
virtual ~backlog_write_progress_manager() {}
};