diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index 0f97db0e24..4742d1cd07 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -5,6 +5,7 @@
 #include "log.hh"
 #include
 #include
+#include
 #include "core/future.hh"
 #include "core/future-util.hh"
 #include "core/sstring.hh"
@@ -298,6 +299,10 @@ future<> parse(file_input_stream& in, compaction_metadata& m) {
     return parse(in, m.ancestors, m.cardinality);
 }
 
+future<> parse(file_input_stream& in, index_entry& ie) {
+    return parse(in, ie.key, ie.position, ie.promoted_index);
+}
+
 template
 future<> parse(file_input_stream& in, std::unique_ptr& p) {
     p.reset(new Child);
@@ -404,6 +409,67 @@ future<> sstable::read_toc() {
 
 }
 
+// Reads index entries from _index_file, starting at `position`, until
+// `quantity` entries have been parsed or the stream reaches eof.
+//
+// Resolves to the entries that were parsed completely. If parsing fails with
+// bufsize_mismatch_exception while the stream is at eof, the read stops early
+// and succeeds; if the stream is not at eof, the exception is rethrown.
+future sstable::read_indexes(uint64_t position, uint64_t quantity) {
+    struct reader {
+        uint64_t count = 0;
+        std::vector indexes;
+        file_input_stream stream;
+        reader(lw_shared_ptr f, uint64_t quantity) : stream(f) { indexes.reserve(quantity); }
+    };
+
+    auto r = make_lw_shared(_index_file, quantity);
+
+    r->stream.seek(position);
+
+    auto end = [r, quantity] { return r->count >= quantity; };
+
+    return do_until(end, [this, r] {
+        r->indexes.emplace_back();
+        auto fut = parse(r->stream, r->indexes.back());
+        return std::move(fut).then_wrapped([this, r] (future<> f) mutable {
+            try {
+                f.get();
+                r->count++;
+            } catch (const bufsize_mismatch_exception&) {
+                // We have optimistically emplaced back one element of the
+                // vector. If we have failed to parse, we should remove it
+                // so size() gives us the right picture.
+                r->indexes.pop_back();
+
+                // FIXME: If the file ends at an index boundary, there is
+                // no problem. Essentially, we can't know how many indexes
+                // are in a sampling group, so there isn't really any way
+                // to know, other than reading.
+                //
+                // If, however, we end in the middle of an index, this is a
+                // corrupted file. This code is not perfect because we only
+                // know that an exception happened, and it happened due to
+                // eof. We don't really know if eof happened at the index
+                // boundary. To know that, we would have to keep track of
+                // the real position of the stream (including what's
+                // already in the buffer) before we start to read the
+                // index, and after. We won't go through such complexity at
+                // the moment.
+                if (r->stream.eof()) {
+                    r->count = std::numeric_limitscount)>::type>::max();
+                } else {
+                    // Rethrow the in-flight exception itself, not a copy of it.
+                    throw;
+                }
+            }
+            return make_ready_future<>();
+        });
+    }).then([r] {
+        return make_ready_future(std::move(r->indexes));
+    });
+}
+
 template
 future<> sstable::read_simple() {
 
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
index 3f1df8f749..126e6f6a35 100644
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -10,7 +10,8 @@
 #include "core/future.hh"
 #include "core/sstring.hh"
 #include "core/enum.hh"
-#include
+#include "core/shared_ptr.hh"
+#include
 #include
 #include "types.hh"
 #include "core/enum.hh"
@@ -26,6 +27,8 @@ public:
     }
 };
 
+using index_list = std::vector;
+
 class sstable {
 public:
     enum class component_type {
@@ -79,12 +82,25 @@ private:
     future<> read_statistics();
     future<> open_data();
 
+    // Reads up to `quantity` index entries starting at `position`.
+    future read_indexes(uint64_t position, uint64_t quantity);
+
 public:
     sstable(sstring dir, unsigned long epoch, version_types v, format_types f) : _dir(dir), _epoch(epoch), _version(v), _format(f) {}
     sstable& operator=(const sstable&) = delete;
     sstable(const sstable&) = delete;
     sstable(sstable&&) = default;
 
+    // Reads index entries starting at `position`, using the summary header's
+    // sampling_level as the number of entries to read.
+    future read_indexes(uint64_t position) {
+        return read_indexes(position, _summary.header.sampling_level);
+    }
+
+    // Forwards to the private overload so that tests can pick `quantity`.
+    future read_indexes_for_testing(uint64_t position, uint64_t quantity) {
+        return read_indexes(position, quantity);
+    }
     future<> load();
 };
 }
diff --git a/sstables/types.hh b/sstables/types.hh
index 6de7d523c9..3cbec3929e 100644
--- a/sstables/types.hh
+++ b/sstables/types.hh
@@ -47,6 +47,14 @@ struct filter {
     disk_array buckets;
 };
 
+// One entry of the index file. parse() reads the fields in declaration
+// order: key, position, promoted_index.
+struct index_entry {
+    disk_string key;
+    uint64_t position;
+    disk_string promoted_index;
+};
+
 // FIXME: Not yet, can't know what an index entry is without a schema.
 struct summary_entry {
     int notyet;