sstables: allow to convert a full range

While trying to write code to populate the system table's data, I found out
that currently, there is no publicly exported way for an entity outside the
sstable to find out which are the keys available in the index.

Without the full list of keys, it is very hard to make the kind of
"read-everything" queries we will need for that to work.

Although I haven't checked the internals fully, Origin seems to do that through
token-based range searches, where you can specify start and end tokens and
return everything in the middle (this is how the WHERE clauses are
implemented).

I am proposing in this patchset a subscription interface that will allow us
to extract data from the sstables in this fashion.

Signed-off-by: Glauber Costa <glommer@cloudius-systems.com>
This commit is contained in:
Glauber Costa
2015-05-19 22:35:53 -04:00
parent 6a8049dce1
commit 62327ff25d
2 changed files with 98 additions and 0 deletions

View File

@@ -83,6 +83,7 @@ static inline bytes pop_back(std::vector<bytes>& vec) {
class mp_row_consumer : public row_consumer {
schema_ptr _schema;
key_view _key;
std::function<future<> (mutation&& m)> _mutation_to_subscription;
struct column {
bool is_static;
@@ -231,6 +232,11 @@ public:
: _schema(_schema)
{ }
mp_row_consumer(const schema_ptr _schema, std::function<future<> (mutation&& m)> sub_fn)
: _schema(_schema)
, _mutation_to_subscription(sub_fn)
{ }
void validate_row_marker() {
if (_schema->is_dense()) {
throw malformed_sstable_exception("row marker found in dense table");
@@ -306,6 +312,9 @@ public:
virtual void consume_row_end() override {
if (mut) {
_pending_collection.flush(*_schema, *mut);
if (_mutation_to_subscription) {
_mutation_to_subscription(std::move(*mut));
}
}
}
@@ -410,4 +419,83 @@ sstables::sstable::read_row(schema_ptr schema, const sstables::key& key) {
});
});
}
subscription<mutation>
sstables::sstable::read_range_rows(schema_ptr schema, const dht::token& min_token, const dht::token& max_token,
std::function<future<> (mutation m)> walker) {
auto pstream = make_lw_shared<stream<mutation>>();
auto ret = pstream->listen(std::move(walker));
pstream->started().then([this, pstream, schema, min_token, max_token] {
if (max_token < min_token) {
return make_ready_future<>();
}
auto& summary = _summary;
auto min_idx = adjust_binary_search_index(binary_search(summary.entries, minimum_key(), min_token));
auto max_idx = adjust_binary_search_index(binary_search(summary.entries, maximum_key(), max_token));
if (max_idx < 0) {
return make_ready_future<>();
}
if (min_idx < 0) {
min_idx = 0;
}
auto position = _summary.entries[min_idx].position;
auto ipos_fut = read_indexes(position).then([this, min_token] (auto index_list) {
// Note that we have to adjust the binary search result here as
// well. We will never find the exact element, since we are not
// using real keys.
//
// So what we really want here is to know in which bucket does the
// set of keys that compute the token of interest starts.
auto m = adjust_binary_search_index(this->binary_search(index_list, minimum_key(), min_token));
auto min_index_idx = m >= 0 ? m : 0;
// We will be given an element that is guaranteed to be before the
// minimum token in token order. This can happen in two
// situations:
//
// 1) if both elements compute the same token (differing in the key comparator)
// 2) if the element returned computes a token that precedes the minimum token.
//
// In the former case, we will retain the element. But in the
// latter we want to discard it. Otherwise we would be returning a
// token that is smaller than the minimum requested.
auto candidate = key_view(bytes_view(index_list[min_index_idx]));
auto tcandidate = dht::global_partitioner().get_token(candidate);
if (tcandidate < min_token) {
min_index_idx++;
}
return make_ready_future<size_t>(index_list[min_index_idx].position);
});
auto epos_fut = read_indexes(position).then([this, max_idx, max_token] (auto index_list) {
auto m = adjust_binary_search_index(this->binary_search(index_list, maximum_key(), max_token));
auto max_index_idx = m >= 0 ? m : int(index_list.size());
// For the max case, we don't need to do the index adjustment.
// Since we compare greater than any key that computes max_token,
// they are all guaranteed to be in the final set.
return this->data_end_position(max_idx, max_index_idx, index_list);
});
return when_all(std::move(ipos_fut), std::move(epos_fut)).then([this, schema, pstream] (auto positions) {
auto subscription_producer = [pstream] (mutation &&mut) {
return pstream->produce(std::move(mut));
};
auto ipos = std::get<size_t>(std::get<0>(positions).get());
auto epos = std::get<size_t>(std::get<1>(positions).get());
return do_with(mp_row_consumer(schema, subscription_producer), [this, ipos, epos] (auto& c) {
return this->data_consume_rows(c, ipos, epos);
});
});
}).then([pstream] {
pstream->close();
});
return ret;
}
}

View File

@@ -22,6 +22,7 @@
#include "schema.hh"
#include "mutation.hh"
#include "utils/i_filter.hh"
#include "core/stream.hh"
namespace sstables {
@@ -100,6 +101,15 @@ public:
}
future<mutation_opt> read_row(schema_ptr schema, const key& k);
/**
* @param schema a schema_ptr object describing this table
* @param min the minimum token we want to search for (inclusive)
* @param max the maximum token we want to search for (inclusive)
* @param walker a future-returning function to be called for each mutation found within the specified range
* @return a subscription that will call @param walker for every mutation found.
*/
subscription<mutation>
read_range_rows(schema_ptr schema, const dht::token& min, const dht::token& max, std::function<future<> (mutation m)> walker);
// Write sstable components from a memtable.
future<> write_components(const memtable& mt);