diff --git a/database.cc b/database.cc index bb10ab161f..fe4f3eb730 100644 --- a/database.cc +++ b/database.cc @@ -125,12 +125,15 @@ class database_sstable_write_monitor : public permit_monitor, public backlog_wri sstables::compaction_strategy& _compaction_strategy; const sstables::writer_offset_tracker* _tracker = nullptr; uint64_t _progress_seen = 0; + api::timestamp_type _maximum_timestamp; public: - database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager, sstables::compaction_strategy& strategy) + database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager, + sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp) : permit_monitor(std::move(permit)) , _sst(std::move(sst)) , _compaction_manager(manager) , _compaction_strategy(strategy) + , _maximum_timestamp(max_timestamp) {} virtual void on_write_started(const sstables::writer_offset_tracker& t) override { @@ -154,6 +157,10 @@ public: } return _progress_seen; } + + api::timestamp_type maximum_timestamp() const override { + return _maximum_timestamp; + } }; static const std::unordered_set system_keyspaces = { @@ -896,7 +903,7 @@ column_family::seal_active_streaming_memtable_immediate(flush_permit&& permit) { // Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the // memtable list, since this memtable was not available for reading up until this point. auto fp = permit.release_sstable_write_permit(); - database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy); + database_sstable_write_monitor monitor(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); return do_with(std::move(monitor), [this, newtab, old, permit = std::move(permit)] (auto& monitor) mutable { auto&& priority = service::get_local_streaming_write_priority(); return write_memtable_to_sstable(*old, newtab, monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, false).then([this, newtab, old] { @@ -956,7 +963,7 @@ future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_bi newtab->set_unshared(); auto fp = permit.release_sstable_write_permit(); - auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy); + auto monitor = std::make_unique(std::move(fp), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); auto&& priority = service::get_local_streaming_write_priority(); auto fut = write_memtable_to_sstable(*old, newtab, *monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, true); return fut.then_wrapped([this, newtab, old, &smb, permit = std::move(permit), monitor = std::move(monitor)] (future<> f) mutable { @@ -1051,7 +1058,7 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr old, sstabl // // The code as is guarantees that we'll never partially backup a // single sstable, so that is enough of a guarantee. - database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy); + database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) { auto&& priority = service::get_local_memtable_flush_priority(); auto f = write_memtable_to_sstable(*old, newtab, monitor, get_large_partition_handler(), incremental_backups_enabled(), priority, false); diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 6f00e4f2f5..90e36c52b8 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -220,10 +220,13 @@ class compaction_write_monitor final : public sstables::write_monitor, public ba column_family& _cf; const sstables::writer_offset_tracker* _tracker = nullptr; uint64_t _progress_seen = 0; + api::timestamp_type _maximum_timestamp; public: - compaction_write_monitor(sstables::shared_sstable sst, column_family& cf) + compaction_write_monitor(sstables::shared_sstable sst, column_family& cf, api::timestamp_type max_timestamp) : _sst(sst) - , _cf(cf) {} + , _cf(cf) + , _maximum_timestamp(max_timestamp) + {} ~compaction_write_monitor() { if (_sst) { @@ -255,6 +258,10 @@ public: _sst = {}; } + api::timestamp_type maximum_timestamp() const override { + return _maximum_timestamp; + } + virtual void on_write_completed() override { } virtual void on_flush_completed() override { } }; @@ -327,6 +334,13 @@ protected: sst->open_data().get0(); _info->end_size += sst->bytes_on_disk(); } + + api::timestamp_type maximum_timestamp() const { + auto m = std::max_element(_sstables.begin(), _sstables.end(), [] (const shared_sstable& sst1, const shared_sstable& sst2) { + return sst1->get_stats_metadata().max_timestamp < sst2->get_stats_metadata().max_timestamp; + }); + return (*m)->get_stats_metadata().max_timestamp; + } public: compaction& operator=(const compaction&) = delete; compaction(const compaction&) = delete; @@ -543,7 +557,7 @@ public: _sst = _creator(); setup_new_sstable(_sst); - _active_write_monitors.emplace_back(_sst, _cf); + _active_write_monitors.emplace_back(_sst, _cf, maximum_timestamp()); auto&& priority = service::get_local_compaction_priority(); sstable_writer_config cfg; cfg.max_sstable_size = _max_sstable_size; diff --git a/sstables/compaction_backlog_manager.hh b/sstables/compaction_backlog_manager.hh index ff116dfe44..b1b649951f 100644 --- a/sstables/compaction_backlog_manager.hh +++ b/sstables/compaction_backlog_manager.hh @@ -26,6 +26,7 @@ #include #include "shared_sstable.hh" #include "sstables/progress_monitor.hh" +#include "timestamp.hh" class compaction_backlog_manager; class compaction_controller; @@ -55,6 +56,7 @@ class compaction_controller; // will certainly be gone by then. struct backlog_write_progress_manager { virtual uint64_t written() const = 0; + virtual api::timestamp_type maximum_timestamp() const = 0; virtual ~backlog_write_progress_manager() {} };