diff --git a/sstables/partition.cc b/sstables/partition.cc index 8f512e1395..3c954add3f 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -83,6 +83,7 @@ static inline bytes pop_back(std::vector& vec) { class mp_row_consumer : public row_consumer { schema_ptr _schema; key_view _key; + std::function (mutation&& m)> _mutation_to_subscription; struct column { bool is_static; @@ -231,6 +232,11 @@ public: : _schema(_schema) { } + mp_row_consumer(const schema_ptr _schema, std::function (mutation&& m)> sub_fn) + : _schema(_schema) + , _mutation_to_subscription(sub_fn) + { } + void validate_row_marker() { if (_schema->is_dense()) { throw malformed_sstable_exception("row marker found in dense table"); @@ -306,6 +312,9 @@ public: virtual void consume_row_end() override { if (mut) { _pending_collection.flush(*_schema, *mut); + if (_mutation_to_subscription) { + _mutation_to_subscription(std::move(*mut)); + } } } @@ -410,4 +419,83 @@ sstables::sstable::read_row(schema_ptr schema, const sstables::key& key) { }); }); } + +subscription +sstables::sstable::read_range_rows(schema_ptr schema, const dht::token& min_token, const dht::token& max_token, + std::function (mutation m)> walker) { + auto pstream = make_lw_shared>(); + + auto ret = pstream->listen(std::move(walker)); + pstream->started().then([this, pstream, schema, min_token, max_token] { + if (max_token < min_token) { + return make_ready_future<>(); + } + auto& summary = _summary; + + auto min_idx = adjust_binary_search_index(binary_search(summary.entries, minimum_key(), min_token)); + auto max_idx = adjust_binary_search_index(binary_search(summary.entries, maximum_key(), max_token)); + + if (max_idx < 0) { + return make_ready_future<>(); + } + + if (min_idx < 0) { + min_idx = 0; + } + + auto position = _summary.entries[min_idx].position; + auto ipos_fut = read_indexes(position).then([this, min_token] (auto index_list) { + // Note that we have to adjust the binary search result here as + // well. We will never find the exact element, since we are not + // using real keys. + // + // So what we really want here is to know in which bucket does the + // set of keys that compute the token of interest starts. + auto m = adjust_binary_search_index(this->binary_search(index_list, minimum_key(), min_token)); + auto min_index_idx = m >= 0 ? m : 0; + + // We will be given an element that is guaranteed to be before the + // minimum token in token order. This can happen in two + // situations: + // + // 1) if both elements compute the same token (differing in the key comparator) + // 2) if the element returned computes a token that precedes the minimum token. + // + // In the former case, we will retain the element. But in the + // latter we want to discard it. Otherwise we would be returning a + // token that is smaller than the minimum requested. + auto candidate = key_view(bytes_view(index_list[min_index_idx])); + auto tcandidate = dht::global_partitioner().get_token(candidate); + if (tcandidate < min_token) { + min_index_idx++; + } + return make_ready_future(index_list[min_index_idx].position); + }); + + auto epos_fut = read_indexes(position).then([this, max_idx, max_token] (auto index_list) { + auto m = adjust_binary_search_index(this->binary_search(index_list, maximum_key(), max_token)); + auto max_index_idx = m >= 0 ? m : int(index_list.size()); + + // For the max case, we don't need to do the index adjustment. + // Since we compare greater than any key that computes max_token, + // they are all guaranteed to be in the final set. + return this->data_end_position(max_idx, max_index_idx, index_list); + }); + + return when_all(std::move(ipos_fut), std::move(epos_fut)).then([this, schema, pstream] (auto positions) { + auto subscription_producer = [pstream] (mutation &&mut) { + return pstream->produce(std::move(mut)); + }; + + auto ipos = std::get(std::get<0>(positions).get()); + auto epos = std::get(std::get<1>(positions).get()); + return do_with(mp_row_consumer(schema, subscription_producer), [this, ipos, epos] (auto& c) { + return this->data_consume_rows(c, ipos, epos); + }); + }); + }).then([pstream] { + pstream->close(); + }); + return ret; +} } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 3c10e1c72d..a65c5e6f9c 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -22,6 +22,7 @@ #include "schema.hh" #include "mutation.hh" #include "utils/i_filter.hh" +#include "core/stream.hh" namespace sstables { @@ -100,6 +101,15 @@ public: } future read_row(schema_ptr schema, const key& k); + /** + * @param schema a schema_ptr object describing this table + * @param min the minimum token we want to search for (inclusive) + * @param max the maximum token we want to search for (inclusive) + * @param walker a future-returning function to be called for each mutation found within the specified range + * @return a subscription that will call @param walker for every mutation found. + */ + subscription + read_range_rows(schema_ptr schema, const dht::token& min, const dht::token& max, std::function (mutation m)> walker); // Write sstable components from a memtable. future<> write_components(const memtable& mt);