From b573a2ff614db828d5ba3f5ff8f1a2011dac17f3 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Fri, 18 May 2018 14:11:50 -0400 Subject: [PATCH] backlog: keep track of maximum timestamp in write monitor For sealed SSTables we can get the maximum timestamp from the statistics component. But for partially written SSTables, the metadata is not yet available. One way to solve this would be to make the SSTable statistics available earlier. But we would end up with a maximum timestamp that potentially changes all the time as we write more cells. A better approach is to take note of the maximum timestamp in a memtable before we start to flush, and when the time comes for us to flush we will use the progress manager to inform the consumers about the maximum timestamp. For SSTables being compacted, we can't know for sure what the maximum timestamp is, as some entries could be TTL'd already. But the maximum of all SSTables present in the compaction is a good enough estimation for this purpose. 
Signed-off-by: Glauber Costa --- database.cc | 15 +++++++++++---- sstables/compaction.cc | 20 +++++++++++++++++--- sstables/compaction_backlog_manager.hh | 2 ++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/database.cc b/database.cc index bb10ab161f..fe4f3eb730 100644 --- a/database.cc +++ b/database.cc @@ -125,12 +125,15 @@ class database_sstable_write_monitor : public permit_monitor, public backlog_wri sstables::compaction_strategy& _compaction_strategy; const sstables::writer_offset_tracker* _tracker = nullptr; uint64_t _progress_seen = 0; + api::timestamp_type _maximum_timestamp; public: - database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager, sstables::compaction_strategy& strategy) + database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager, + sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp) : permit_monitor(std::move(permit)) , _sst(std::move(sst)) , _compaction_manager(manager) , _compaction_strategy(strategy) + , _maximum_timestamp(max_timestamp) {} virtual void on_write_started(const sstables::writer_offset_tracker& t) override { @@ -154,6 +157,10 @@ public: } return _progress_seen; } + + api::timestamp_type maximum_timestamp() const override { + return _maximum_timestamp; + } }; static const std::unordered_set system_keyspaces = { @@ -896,7 +903,7 @@ column_family::seal_active_streaming_memtable_immediate(flush_permit&& permit) { // Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the // memtable list, since this memtable was not available for reading up until this point. 
auto fp = permit.release_sstable_write_permit(); - database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy); + database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable { auto&& priority = service::get_local_streaming_write_priority(); return write_memtable_to_sstable(*old, newtab, monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, false).then([this, newtab, old] { @@ -956,7 +963,7 @@ future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_bi newtab->set_unshared(); auto fp = permit.release_sstable_write_permit(); - auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy); + auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); auto&& priority = service::get_local_streaming_write_priority(); auto fut = write_memtable_to_sstable(*old, newtab, *monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, true); return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable { @@ -1051,7 +1058,7 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr old, sstabl // // The code as is guarantees that we'll never partially backup a // single sstable, so that is enough of a guarantee. 
- database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy); + database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) { auto&& priority = service::get_local_memtable_flush_priority(); auto f = write_memtable_to_sstable(*old, newtab, monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, false); diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 6f00e4f2f5..90e36c52b8 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -220,10 +220,13 @@ class compaction_write_monitor final : public sstables::write_monitor, public ba column_family& _cf; const sstables::writer_offset_tracker* _tracker = nullptr; uint64_t _progress_seen = 0; + api::timestamp_type _maximum_timestamp; public: - compaction_write_monitor(sstables::shared_sstable sst, column_family& cf) + compaction_write_monitor(sstables::shared_sstable sst, column_family& cf, api::timestamp_type max_timestamp) : _sst(sst) - , _cf(cf) {} + , _cf(cf) + , _maximum_timestamp(max_timestamp) + {} ~compaction_write_monitor() { if (_sst) { @@ -255,6 +258,10 @@ public: _sst = {}; } + api::timestamp_type maximum_timestamp() const override { + return _maximum_timestamp; + } + virtual void on_write_completed() override { } virtual void on_flush_completed() override { } }; @@ -327,6 +334,13 @@ protected: sst->open_data().get0(); _info->end_size += sst->bytes_on_disk(); } + + api::timestamp_type maximum_timestamp() const { + auto m = std::max_element(_sstables.begin(), _sstables.end(), [] (const shared_sstable& sst1, const shared_sstable& sst2) { + return sst1->get_stats_metadata().max_timestamp < sst2->get_stats_metadata().max_timestamp; + }); + return (*m)->get_stats_metadata().max_timestamp; + } public: compaction& operator=(const compaction&) = delete; compaction(const 
compaction&) = delete; @@ -543,7 +557,7 @@ public: _sst = _creator(); setup_new_sstable(_sst); - _active_write_monitors.emplace_back(_sst, _cf); + _active_write_monitors.emplace_back(_sst, _cf, maximum_timestamp()); auto&& priority = service::get_local_compaction_priority(); sstable_writer_config cfg; cfg.max_sstable_size = _max_sstable_size; diff --git a/sstables/compaction_backlog_manager.hh b/sstables/compaction_backlog_manager.hh index ff116dfe44..b1b649951f 100644 --- a/sstables/compaction_backlog_manager.hh +++ b/sstables/compaction_backlog_manager.hh @@ -26,6 +26,7 @@ #include #include "shared_sstable.hh" #include "sstables/progress_monitor.hh" +#include "timestamp.hh" class compaction_backlog_manager; class compaction_controller; @@ -55,6 +56,7 @@ class compaction_controller; // will certainly be gone by then. struct backlog_write_progress_manager { virtual uint64_t written() const = 0; + virtual api::timestamp_type maximum_timestamp() const = 0; virtual ~backlog_write_progress_manager() {} };