From c0ad2a8e0e54656b2ec3504e9c2ddaa81dbedc29 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Fri, 27 Feb 2015 13:43:40 -0500 Subject: [PATCH] sstables: parse the index file We usually don't read the whole file into memory, so the probing interface will also allow for the specification of boundaries that we should be use for reading. The sstable needs to be informed - usually by the schema - of how many columns the partition key is composed of - 1 for simple keys, more than one, for composites. Signed-off-by: Glauber Costa --- sstables/sstables.cc | 59 ++++++++++++++++++++++++++++++++++++++++++++ sstables/sstables.hh | 14 ++++++++++- sstables/types.hh | 6 +++++ 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 0f97db0e24..4742d1cd07 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -5,6 +5,7 @@ #include "log.hh" #include #include +#include #include "core/future.hh" #include "core/future-util.hh" #include "core/sstring.hh" @@ -298,6 +299,10 @@ future<> parse(file_input_stream& in, compaction_metadata& m) { return parse(in, m.ancestors, m.cardinality); } +future<> parse(file_input_stream& in, index_entry& ie) { + return parse(in, ie.key, ie.position, ie.promoted_index); +} + template future<> parse(file_input_stream& in, std::unique_ptr& p) { p.reset(new Child); @@ -404,6 +409,60 @@ future<> sstable::read_toc() { } +future sstable::read_indexes(uint64_t position, uint64_t quantity) { + struct reader { + uint64_t count = 0; + std::vector indexes; + file_input_stream stream; + reader(lw_shared_ptr f, uint64_t quantity) : stream(f) { indexes.reserve(quantity); } + }; + + auto r = make_lw_shared(_index_file, quantity); + + r->stream.seek(position); + + auto end = [r, quantity] { return r->count >= quantity; }; + + return do_until(end, [this, r] { + r->indexes.emplace_back(); + auto fut = parse(r->stream, r->indexes.back()); + return std::move(fut).then_wrapped([this, r] (future<> f) mutable { + try { + f.get(); + r->count++; + } catch (bufsize_mismatch_exception &e) { + // We have optimistically emplaced back one element of the + // vector. If we have failed to parse, we should remove it + // so size() gives us the right picture. + r->indexes.pop_back(); + + // FIXME: If the file ends at an index boundary, there is + // no problem. Essentially, we can't know how many indexes + // are in a sampling group, so there isn't really any way + // to know, other than reading. + // + // If, however, we end in the middle of an index, this is a + // corrupted file. This code is not perfect because we only + // know that an exception happened, and it happened due to + // eof. We don't really know if eof happened at the index + // boundary. To know that, we would have to keep track of + // the real position of the stream (including what's + // already in the buffer) before we start to read the + // index, and after. We won't go through such complexity at + // the moment. + if (r->stream.eof()) { + r->count = std::numeric_limitscount)>::type>::max(); + } else { + throw e; + } + } + return make_ready_future<>(); + }); + }).then([r] { + return make_ready_future(std::move(r->indexes)); + }); +} + template future<> sstable::read_simple() { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 3f1df8f749..126e6f6a35 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -10,7 +10,8 @@ #include "core/future.hh" #include "core/sstring.hh" #include "core/enum.hh" -#include +#include "core/shared_ptr.hh" +#include #include #include "types.hh" #include "core/enum.hh" @@ -26,6 +27,8 @@ public: } }; +using index_list = std::vector; + class sstable { public: enum class component_type { @@ -79,12 +82,21 @@ private: future<> read_statistics(); future<> open_data(); + future read_indexes(uint64_t position, uint64_t quantity); + public: sstable(sstring dir, unsigned long epoch, version_types v, format_types f) : _dir(dir), _epoch(epoch), _version(v), _format(f) {} sstable& operator=(const sstable&) = delete; sstable(const sstable&) = delete; sstable(sstable&&) = default; + future read_indexes(uint64_t position) { + return read_indexes(position, _summary.header.sampling_level); + } + + future read_indexes_for_testing(uint64_t position, uint64_t quantity) { + return read_indexes(position, quantity); + } future<> load(); }; } diff --git a/sstables/types.hh b/sstables/types.hh index 6de7d523c9..3cbec3929e 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -47,6 +47,12 @@ struct filter { disk_array buckets; }; +struct index_entry { + disk_string key; + uint64_t position; + disk_string promoted_index; +}; + // FIXME: Not yet, can't know what an index entry is without a schema. struct summary_entry { int notyet;