From 891763d7fdce6a0c00a9e3aa4c48208eb1f5b5c2 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Wed, 25 Mar 2015 11:20:58 +0200 Subject: [PATCH] sstables: implement low-level data file random-access reading This patch implements sstable::data_read(pos, len) to do random-access read of a specific byte range from the data file. Later we'll determine the byte range needed to read a specific row, using the summary and index files. This function works for either a compressed or uncompressed data file. To support the compressed data file, we need to determine when opening the sstable also the data file's size, and then make a compression_metadata object using the data we read from the compression file and the data file's size. Signed-off-by: Nadav Har'El --- sstables/sstables.cc | 27 +++++++++++++++++++++++++++ sstables/sstables.hh | 16 ++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index aab4726644..b06d1d7ed1 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -52,6 +52,9 @@ public: } }; +// FIXME: We don't use this API, and it can be removed. The compressed reader +// is only needed for the data file, and for that we have the nicer +// data_stream_at() API below. class compressed_file_random_access_reader : public random_access_reader { lw_shared_ptr _file; lw_shared_ptr _cm; @@ -581,6 +584,9 @@ future<> sstable::open_data() { engine().open_file_dma(filename(component_type::Data), open_flags::ro)).then([this] (auto files) { _index_file = make_lw_shared(std::move(std::get(std::get<0>(files).get()))); _data_file = make_lw_shared(std::move(std::get(std::get<1>(files).get()))); + return _data_file->size().then([this] (auto size) { + _data_file_size = size; + }); }); } @@ -595,6 +601,13 @@ future<> sstable::load() { return read_summary(); }).then([this] { return open_data(); + }).then([this] { + // After we have _compression and _data_file_size, we can build + // _compression_metatadata (and invalidate _compression). + if (has_component(sstable::component_type::CompressionInfo)) { + _compression_metadata = make_lw_shared( + std::move(_compression), _data_file_size); + } }); } @@ -619,4 +632,18 @@ sstable::version_types sstable::version_from_sstring(sstring &s) { sstable::format_types sstable::format_from_sstring(sstring &s) { return reverse_map(s, _format_string); } + +input_stream sstable::data_stream_at(uint64_t pos) { + if (_compression_metadata) { + return make_compressed_file_input_stream( + _data_file, _compression_metadata, pos); + } else { + return make_file_input_stream(_data_file, pos); + } +} + +future> sstable::data_read(uint64_t pos, size_t len) { + return data_stream_at(pos).read_exactly(len); +} + } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 4dd18f85c4..787955dbed 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -15,6 +15,7 @@ #include #include "types.hh" #include "core/enum.hh" +#include "compress.hh" namespace sstables { @@ -53,11 +54,13 @@ private: std::unordered_set> _components; compression _compression; + lw_shared_ptr _compression_metadata; filter _filter; summary _summary; statistics _statistics; lw_shared_ptr _index_file; lw_shared_ptr _data_file; + size_t _data_file_size; sstring _dir; unsigned long _generation = 0; @@ -84,6 +87,15 @@ private: future read_indexes(uint64_t position, uint64_t quantity); + input_stream data_stream_at(uint64_t pos); + // Read exactly the specific byte range from the data file (after + // uncompression, if the file is compressed). This can be used to read + // a specific row from the data file (its position and length can be + // determined using the index file). + // This function is intended (and optimized for) random access, not + // for iteration through all the rows. + future> data_read(uint64_t pos, size_t len); + public: sstable(sstring dir, unsigned long generation, version_types v, format_types f) : _dir(dir), _generation(generation), _version(v), _format(f) {} sstable& operator=(const sstable&) = delete; @@ -103,5 +115,9 @@ public: future<> load(); future read_summary_entry(size_t i); + + // Allow the test cases from sstable_test.cc to test private methods + friend class uncompressed_random_access_read; + friend class compressed_random_access_read; }; }