sstables: index_reader: Share index lists among other index readers
Direct motivation for this is to be able to use two index readers from a single mutation reader, one for lower bound of the range and one for the upper bound of the range, without sacrificing optimization of avoiding index reads when forwarding to partition ranges which are close by. After the change, all index readers of given sstable will share index buffers, so lower bound reader can reuse the page read by the upper bound reader. The reason for using two readers will be so that we are able to skip inside the partition range, not only outside of it. This is not possible if we use the same index reader to locate the upper bound of the range, because we may only advance the cursor.
This commit is contained in:
@@ -23,6 +23,7 @@
|
||||
#include "sstables.hh"
|
||||
#include "consumer.hh"
|
||||
#include "downsampling.hh"
|
||||
#include "sstables/shared_index_lists.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -176,13 +177,12 @@ public:
|
||||
// If eof() then the cursor is positioned past all partitions in the sstable.
|
||||
class index_reader {
|
||||
shared_sstable _sstable;
|
||||
shared_index_lists::list_ptr _current_list;
|
||||
const io_priority_class& _pc;
|
||||
|
||||
struct reader {
|
||||
index_consumer _consumer;
|
||||
index_consume_entry_context<index_consumer> _context;
|
||||
uint64_t _current_summary_idx;
|
||||
uint64_t _current_index_idx;
|
||||
|
||||
static auto create_file_input_stream(shared_sstable sst, const io_priority_class& pc, uint64_t begin, uint64_t end) {
|
||||
file_input_stream_options options;
|
||||
@@ -200,12 +200,14 @@ class index_reader {
|
||||
|
||||
stdx::optional<reader> _reader;
|
||||
|
||||
index_list _previous_bucket;
|
||||
uint64_t _previous_summary_idx = 0;
|
||||
uint64_t _current_summary_idx = 0;
|
||||
uint64_t _current_index_idx = 0;
|
||||
uint64_t _data_file_position = 0;
|
||||
private:
|
||||
future<> advance_to_end() {
|
||||
_data_file_position = data_file_end();
|
||||
_current_list = {};
|
||||
return close_reader().finally([this] {
|
||||
_reader = stdx::nullopt;
|
||||
});
|
||||
@@ -213,8 +215,8 @@ private:
|
||||
|
||||
// Must be called for non-decreasing summary_idx.
|
||||
future<> advance_to_page(uint64_t summary_idx) {
|
||||
assert(!_reader || _reader->_current_summary_idx <= summary_idx);
|
||||
if (_reader && _reader->_current_summary_idx == summary_idx) {
|
||||
assert(!_current_list || _current_summary_idx <= summary_idx);
|
||||
if (_current_list && _current_summary_idx == summary_idx) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -223,28 +225,39 @@ private:
|
||||
return advance_to_end();
|
||||
}
|
||||
|
||||
uint64_t position = summary.entries[summary_idx].position;
|
||||
uint64_t quantity = downsampling::get_effective_index_interval_after_index(summary_idx, summary.header.sampling_level,
|
||||
summary.header.min_index_interval);
|
||||
auto loader = [this] (uint64_t summary_idx) -> future<index_list> {
|
||||
auto& summary = _sstable->get_summary();
|
||||
uint64_t position = summary.entries[summary_idx].position;
|
||||
uint64_t quantity = downsampling::get_effective_index_interval_after_index(summary_idx, summary.header.sampling_level,
|
||||
summary.header.min_index_interval);
|
||||
|
||||
uint64_t end;
|
||||
if (summary_idx + 1 >= summary.header.size) {
|
||||
end = _sstable->index_size();
|
||||
} else {
|
||||
end = summary.entries[summary_idx + 1].position;
|
||||
}
|
||||
|
||||
return close_reader().then_wrapped([this, position, end, quantity, summary_idx] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
_reader.emplace(_sstable, _pc, position, end, quantity);
|
||||
} catch (...) {
|
||||
_reader = stdx::nullopt;
|
||||
throw;
|
||||
uint64_t end;
|
||||
if (summary_idx + 1 >= summary.header.size) {
|
||||
end = _sstable->index_size();
|
||||
} else {
|
||||
end = summary.entries[summary_idx + 1].position;
|
||||
}
|
||||
_reader->_current_summary_idx = summary_idx;
|
||||
_reader->_current_index_idx = 0;
|
||||
return _reader->_context.consume_input(_reader->_context);
|
||||
|
||||
return close_reader().then_wrapped([this, position, end, quantity, summary_idx] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
_reader.emplace(_sstable, _pc, position, end, quantity);
|
||||
} catch (...) {
|
||||
_reader = stdx::nullopt;
|
||||
throw;
|
||||
}
|
||||
return _reader->_context.consume_input(_reader->_context).then([this] {
|
||||
return std::move(_reader->_consumer.indexes);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
return _sstable->_index_lists.get_or_load(summary_idx, loader).then([this, summary_idx] (shared_index_lists::list_ptr ref) {
|
||||
_current_list = std::move(ref);
|
||||
_current_summary_idx = summary_idx;
|
||||
_current_index_idx = 0;
|
||||
assert(!_current_list->empty());
|
||||
_data_file_position = (*_current_list)[0].position();
|
||||
});
|
||||
}
|
||||
public:
|
||||
@@ -269,9 +282,11 @@ public:
|
||||
, _pc(pc)
|
||||
{ }
|
||||
|
||||
// Cannot be used twice on the same summary_idx and together with advance_to().
|
||||
// @deprecated
|
||||
future<index_list> get_index_entries(uint64_t summary_idx) {
|
||||
return advance_to_page(summary_idx).then([this] {
|
||||
return _reader ? std::move(_reader->_consumer.indexes) : index_list();
|
||||
return _current_list ? _current_list.release() : index_list();
|
||||
});
|
||||
}
|
||||
public:
|
||||
@@ -304,17 +319,17 @@ public:
|
||||
// The solution is this condition above. If our lookup requires reading
|
||||
// the previous bucket we assume that the entry doesn't exist and return
|
||||
// the position of the first one in the current index bucket.
|
||||
if (_reader && summary_idx + 1 == _reader->_current_summary_idx) {
|
||||
if (summary_idx + 1 == _current_summary_idx) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return advance_to_page(summary_idx).then([this, pos, summary_idx] {
|
||||
auto& il = _reader->_consumer.indexes;
|
||||
auto i = std::lower_bound(il.begin() + _reader->_current_index_idx, il.end(), pos, index_comparator(*_sstable->_schema));
|
||||
index_list& il = *_current_list;
|
||||
auto i = std::lower_bound(il.begin() + _current_index_idx, il.end(), pos, index_comparator(*_sstable->_schema));
|
||||
if (i == il.end()) {
|
||||
return advance_to_page(summary_idx + 1);
|
||||
}
|
||||
_reader->_current_index_idx = std::distance(il.begin(), i);
|
||||
_current_index_idx = std::distance(il.begin(), i);
|
||||
_data_file_position = i->position();
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
130
sstables/shared_index_lists.hh
Normal file
130
sstables/shared_index_lists.hh
Normal file
@@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
using index_list = std::vector<index_entry>;
|
||||
|
||||
// Associative cache of summary index -> index_list
|
||||
// Entries stay around as long as there is any live external reference (list_ptr) to them.
|
||||
// Supports asynchronous insertion, ensures that only one entry will be loaded.
|
||||
class shared_index_lists {
|
||||
public:
|
||||
using key_type = uint64_t;
|
||||
struct stats {
|
||||
uint64_t hits = 0; // Number of times entry was found ready
|
||||
uint64_t misses = 0; // Number of times entry was not found
|
||||
uint64_t blocks = 0; // Number of times entry was not ready (>= misses)
|
||||
};
|
||||
private:
|
||||
class entry : public enable_lw_shared_from_this<entry> {
|
||||
public:
|
||||
key_type key;
|
||||
index_list list;
|
||||
shared_promise<> loaded;
|
||||
shared_index_lists& parent;
|
||||
|
||||
entry(shared_index_lists& parent, key_type key)
|
||||
: key(key), parent(parent)
|
||||
{ }
|
||||
~entry() {
|
||||
parent._lists.erase(key);
|
||||
}
|
||||
bool operator==(const entry& e) const { return key == e.key; }
|
||||
bool operator!=(const entry& e) const { return key != e.key; }
|
||||
};
|
||||
std::unordered_map<key_type, entry*> _lists;
|
||||
static thread_local stats _shard_stats;
|
||||
public:
|
||||
// Pointer to index_list
|
||||
class list_ptr {
|
||||
lw_shared_ptr<entry> _e;
|
||||
public:
|
||||
using element_type = index_list;
|
||||
list_ptr() = default;
|
||||
explicit list_ptr(lw_shared_ptr<entry> e) : _e(std::move(e)) {}
|
||||
explicit operator bool() const { return static_cast<bool>(_e); }
|
||||
index_list& operator*() { return _e->list; }
|
||||
const index_list& operator*() const { return _e->list; }
|
||||
index_list* operator->() { return &_e->list; }
|
||||
const index_list* operator->() const { return &_e->list; }
|
||||
|
||||
index_list release() {
|
||||
auto res = _e.owned() ? index_list(std::move(_e->list)) : index_list(_e->list);
|
||||
_e = {};
|
||||
return std::move(res);
|
||||
}
|
||||
};
|
||||
|
||||
shared_index_lists() = default;
|
||||
shared_index_lists(shared_index_lists&&) = delete;
|
||||
shared_index_lists(const shared_index_lists&) = delete;
|
||||
|
||||
// Returns a future which resolves with a shared pointer to index_list for given key.
|
||||
// Always returns a valid pointer if succeeds. The pointer is never invalidated externally.
|
||||
//
|
||||
// If entry is missing, the loader is invoked. If list is already loading, this invocation
|
||||
// will wait for prior loading to complete and use its result when it's done.
|
||||
//
|
||||
// The loader object does not survive deferring, so the caller must deal with its liveness.
|
||||
template<typename Loader>
|
||||
future<list_ptr> get_or_load(key_type key, Loader&& loader) {
|
||||
auto i = _lists.find(key);
|
||||
lw_shared_ptr<entry> e;
|
||||
if (i != _lists.end()) {
|
||||
e = i->second->shared_from_this();
|
||||
} else {
|
||||
++_shard_stats.misses;
|
||||
e = make_lw_shared<entry>(*this, key);
|
||||
auto res = _lists.emplace(key, e.get());
|
||||
assert(res.second);
|
||||
loader(key).then_wrapped([e](future<index_list>&& f) mutable {
|
||||
if (f.failed()) {
|
||||
e->loaded.set_exception(f.get_exception());
|
||||
} else {
|
||||
e->list = f.get0();
|
||||
e->loaded.set_value();
|
||||
}
|
||||
});
|
||||
}
|
||||
future<> f = e->loaded.get_shared_future();
|
||||
if (!f.available()) {
|
||||
++_shard_stats.blocks;
|
||||
return f.then([e]() mutable {
|
||||
return list_ptr(std::move(e));
|
||||
});
|
||||
} else {
|
||||
++_shard_stats.hits;
|
||||
return make_ready_future<list_ptr>(list_ptr(std::move(e)));
|
||||
}
|
||||
}
|
||||
|
||||
static const stats& shard_stats() { return _shard_stats; }
|
||||
};
|
||||
|
||||
}
|
||||
@@ -2729,4 +2729,6 @@ atomic_deletion_cancelled::what() const noexcept {
|
||||
return _msg.c_str();
|
||||
}
|
||||
|
||||
thread_local shared_index_lists::stats shared_index_lists::_shard_stats;
|
||||
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@
|
||||
#include "compound_compat.hh"
|
||||
#include "disk-error-handler.hh"
|
||||
#include "atomic_deletion.hh"
|
||||
#include "sstables/shared_index_lists.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -117,7 +118,6 @@ class sstable_writer;
|
||||
struct foreign_sstable_open_info;
|
||||
struct sstable_open_info;
|
||||
|
||||
using index_list = std::vector<index_entry>;
|
||||
class index_reader;
|
||||
|
||||
struct sstable_writer_config {
|
||||
@@ -437,6 +437,7 @@ private:
|
||||
std::vector<sstring> _unrecognized_components;
|
||||
|
||||
foreign_ptr<lw_shared_ptr<shareable_components>> _components = make_foreign(make_lw_shared<shareable_components>());
|
||||
shared_index_lists _index_lists;
|
||||
bool _shared = true; // across shards; safe default
|
||||
// NOTE: _collector and _c_stats are used to generation of statistics file
|
||||
// when writing a new sstable.
|
||||
|
||||
Reference in New Issue
Block a user