diff --git a/sstables/index_reader.hh b/sstables/index_reader.hh
index 7f3d1069b7..05f9eed277 100644
--- a/sstables/index_reader.hh
+++ b/sstables/index_reader.hh
@@ -23,6 +23,7 @@
 #include "sstables.hh"
 #include "consumer.hh"
 #include "downsampling.hh"
+#include "sstables/shared_index_lists.hh"
 
 namespace sstables {
 
@@ -176,13 +177,12 @@ public:
 // If eof() then the cursor is positioned past all partitions in the sstable.
 class index_reader {
     shared_sstable _sstable;
+    shared_index_lists::list_ptr _current_list; // index page the cursor is on, obtained from _sstable->_index_lists; null when none is held
     const io_priority_class& _pc;
 
     struct reader {
         index_consumer _consumer;
         index_consume_entry_context _context;
-        uint64_t _current_summary_idx;
-        uint64_t _current_index_idx;
 
         static auto create_file_input_stream(shared_sstable sst, const io_priority_class& pc, uint64_t begin, uint64_t end) {
             file_input_stream_options options;
@@ -200,12 +200,14 @@ class index_reader {
 
     stdx::optional _reader;
 
-    index_list _previous_bucket;
     uint64_t _previous_summary_idx = 0;
+    uint64_t _current_summary_idx = 0; // summary index that _current_list was loaded for
+    uint64_t _current_index_idx = 0; // offset of the cursor within _current_list
     uint64_t _data_file_position = 0;
 private:
     future<> advance_to_end() {
         _data_file_position = data_file_end();
+        _current_list = {}; // drop our reference to the current page
         return close_reader().finally([this] {
             _reader = stdx::nullopt;
         });
@@ -213,8 +215,8 @@ private:
 
     // Must be called for non-decreasing summary_idx.
     future<> advance_to_page(uint64_t summary_idx) {
-        assert(!_reader || _reader->_current_summary_idx <= summary_idx);
-        if (_reader && _reader->_current_summary_idx == summary_idx) {
+        assert(!_current_list || _current_summary_idx <= summary_idx);
+        if (_current_list && _current_summary_idx == summary_idx) { // already on the requested page
             return make_ready_future<>();
         }
 
@@ -223,28 +225,39 @@ private:
             return advance_to_end();
         }
 
-        uint64_t position = summary.entries[summary_idx].position;
-        uint64_t quantity = downsampling::get_effective_index_interval_after_index(summary_idx, summary.header.sampling_level,
-            summary.header.min_index_interval);
+        auto loader = [this] (uint64_t summary_idx) -> future { // reads and parses one index page; handed to get_or_load() below
+            auto& summary = _sstable->get_summary();
+            uint64_t position = summary.entries[summary_idx].position;
+            uint64_t quantity = downsampling::get_effective_index_interval_after_index(summary_idx, summary.header.sampling_level,
+                summary.header.min_index_interval);
 
-        uint64_t end;
-        if (summary_idx + 1 >= summary.header.size) {
-            end = _sstable->index_size();
-        } else {
-            end = summary.entries[summary_idx + 1].position;
-        }
-
-        return close_reader().then_wrapped([this, position, end, quantity, summary_idx] (auto&& f) {
-            try {
-                f.get();
-                _reader.emplace(_sstable, _pc, position, end, quantity);
-            } catch (...) {
-                _reader = stdx::nullopt;
-                throw;
+            uint64_t end;
+            if (summary_idx + 1 >= summary.header.size) {
+                end = _sstable->index_size(); // last page: read up to the end of the index file
+            } else {
+                end = summary.entries[summary_idx + 1].position;
             }
-            _reader->_current_summary_idx = summary_idx;
-            _reader->_current_index_idx = 0;
-            return _reader->_context.consume_input(_reader->_context);
+
+            return close_reader().then_wrapped([this, position, end, quantity, summary_idx] (auto&& f) {
+                try {
+                    f.get();
+                    _reader.emplace(_sstable, _pc, position, end, quantity);
+                } catch (...) {
+                    _reader = stdx::nullopt;
+                    throw;
+                }
+                return _reader->_context.consume_input(_reader->_context).then([this] {
+                    return std::move(_reader->_consumer.indexes); // hand the parsed entries to whoever invoked the loader
+                });
+            });
+        };
+
+        return _sstable->_index_lists.get_or_load(summary_idx, loader).then([this, summary_idx] (shared_index_lists::list_ptr ref) {
+            _current_list = std::move(ref);
+            _current_summary_idx = summary_idx;
+            _current_index_idx = 0;
+            assert(!_current_list->empty());
+            _data_file_position = (*_current_list)[0].position(); // cursor starts at the first entry of the page
         });
     }
 public:
@@ -269,9 +282,11 @@ public:
         , _pc(pc)
     { }
 
+    // Cannot be used twice on the same summary_idx and together with advance_to().
+    // @deprecated
     future get_index_entries(uint64_t summary_idx) {
         return advance_to_page(summary_idx).then([this] {
-            return _reader ? std::move(_reader->_consumer.indexes) : index_list();
+            return _current_list ? _current_list.release() : index_list(); // release() leaves _current_list null
         });
     }
 public:
@@ -304,17 +319,17 @@ public:
         // The solution is this condition above. If our lookup requires reading
         // the previous bucket we assume that the entry doesn't exist and return
         // the position of the first one in the current index bucket.
-        if (_reader && summary_idx + 1 == _reader->_current_summary_idx) {
+        if (summary_idx + 1 == _current_summary_idx) { // NOTE(review): the old check also required a live _reader; confirm dropping that guard is intended
             return make_ready_future<>();
         }
 
         return advance_to_page(summary_idx).then([this, pos, summary_idx] {
-            auto& il = _reader->_consumer.indexes;
-            auto i = std::lower_bound(il.begin() + _reader->_current_index_idx, il.end(), pos, index_comparator(*_sstable->_schema));
+            index_list& il = *_current_list;
+            auto i = std::lower_bound(il.begin() + _current_index_idx, il.end(), pos, index_comparator(*_sstable->_schema)); // search only from the current cursor offset onwards
             if (i == il.end()) {
                 return advance_to_page(summary_idx + 1);
             }
-            _reader->_current_index_idx = std::distance(il.begin(), i);
+            _current_index_idx = std::distance(il.begin(), i);
             _data_file_position = i->position();
             return make_ready_future<>();
         });
diff --git a/sstables/shared_index_lists.hh b/sstables/shared_index_lists.hh
new file mode 100644
index 0000000000..9566d5a43b
--- /dev/null
+++ b/sstables/shared_index_lists.hh
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace sstables {
+
+using index_list = std::vector;
+
+// Associative cache of summary index -> index_list
+// Entries stay around as long as there is any live external reference (list_ptr) to them.
+// Supports asynchronous insertion, ensures that only one entry will be loaded.
+class shared_index_lists {
+public:
+    using key_type = uint64_t;
+    struct stats {
+        uint64_t hits = 0; // Number of times entry was found ready
+        uint64_t misses = 0; // Number of times entry was not found
+        uint64_t blocks = 0; // Number of times entry was not ready (>= misses unless a load completes immediately)
+    };
+private:
+    class entry : public enable_lw_shared_from_this {
+    public:
+        key_type key;
+        index_list list;
+        shared_promise<> loaded; // resolved by get_or_load() once the loader has finished or failed
+        shared_index_lists& parent;
+
+        entry(shared_index_lists& parent, key_type key)
+            : key(key), parent(parent)
+        { }
+        ~entry() {
+            parent._lists.erase(key); // unregister from the owning cache
+        }
+        bool operator==(const entry& e) const { return key == e.key; }
+        bool operator!=(const entry& e) const { return key != e.key; }
+    };
+    std::unordered_map _lists; // filled in get_or_load() with e.get(), i.e. raw entry pointers
+    static thread_local stats _shard_stats;
+public:
+    // Pointer to index_list. A default-constructed list_ptr is null and converts to false.
+    class list_ptr {
+        lw_shared_ptr _e;
+    public:
+        using element_type = index_list;
+        list_ptr() = default;
+        explicit list_ptr(lw_shared_ptr e) : _e(std::move(e)) {}
+        explicit operator bool() const { return static_cast(_e); }
+        index_list& operator*() { return _e->list; }
+        const index_list& operator*() const { return _e->list; }
+        index_list* operator->() { return &_e->list; }
+        const index_list* operator->() const { return &_e->list; }
+
+        index_list release() { // detaches this pointer; the list is moved out if we were the only owner, copied otherwise
+            if (!_e) { return index_list(); } // null pointer: nothing to hand out, and owned() must not be called on it
+            auto e = std::move(_e); // leaves _e null; the reference is dropped when `e` goes out of scope
+            return e.owned() ? index_list(std::move(e->list)) : index_list(e->list);
+        }
+    };
+
+    shared_index_lists() = default;
+    shared_index_lists(shared_index_lists&&) = delete;
+    shared_index_lists(const shared_index_lists&) = delete;
+
+    // Returns a future which resolves with a shared pointer to index_list for given key.
+    // Always returns a valid pointer if succeeds. The pointer is never invalidated externally.
+    // If loading failed, the returned future fails with the loader's exception instead.
+    // If entry is missing, the loader is invoked. If list is already loading, this invocation
+    // will wait for prior loading to complete and use its result when it's done.
+    //
+    // The loader object does not survive deferring, so the caller must deal with its liveness.
+    template
+    future get_or_load(key_type key, Loader&& loader) {
+        auto i = _lists.find(key);
+        lw_shared_ptr e;
+        if (i != _lists.end()) {
+            e = i->second->shared_from_this();
+        } else {
+            ++_shard_stats.misses;
+            e = make_lw_shared(*this, key);
+            auto res = _lists.emplace(key, e.get());
+            assert(res.second);
+            loader(key).then_wrapped([e](future&& f) mutable { // the captured `e` keeps the entry alive until loading settles
+                if (f.failed()) {
+                    e->loaded.set_exception(f.get_exception());
+                } else {
+                    e->list = f.get0();
+                    e->loaded.set_value();
+                }
+            });
+        }
+        future<> f = e->loaded.get_shared_future();
+        if (!f.available()) {
+            ++_shard_stats.blocks;
+        } else if (!f.failed()) {
+            ++_shard_stats.hits;
+        }
+        // Always chain through then(): a failed load is forwarded instead of being returned as a valid pointer.
+        return f.then([e]() mutable {
+            return list_ptr(std::move(e));
+        });
+    }
+
+    static const stats& shard_stats() { return _shard_stats; }
+};
+
+}
diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index b032ee1ed1..6c32a24cf2 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -2729,4 +2729,6 @@ atomic_deletion_cancelled::what() const noexcept {
     return _msg.c_str();
 }
 
+thread_local shared_index_lists::stats shared_index_lists::_shard_stats;
+
 }
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
index ee8b176b7d..7996ef40ae 100644
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -50,6 +50,7 @@
 #include "compound_compat.hh"
 #include "disk-error-handler.hh"
 #include "atomic_deletion.hh"
+#include "sstables/shared_index_lists.hh"
 
 namespace sstables {
 
@@ -117,7 +118,6 @@ class sstable_writer;
 struct foreign_sstable_open_info;
 struct sstable_open_info;
 
-using index_list = std::vector;
 class index_reader;
 
 struct sstable_writer_config {
@@ -437,6 +437,7 @@ private:
     std::vector _unrecognized_components;
 
     foreign_ptr> _components = make_foreign(make_lw_shared());
+    shared_index_lists _index_lists;
     bool _shared = true;  // across shards; safe default
     // NOTE: _collector and _c_stats are used to generation of statistics file
     // when writing a new sstable.