thrift: Implement get_multi_slice verb

The get_multi_slice verb is used to perform multiple slices on a
single row key in one operation. It takes a set of column_slices,
which we normalize to not contain any overlapping ranges.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2016-06-13 00:36:22 +02:00
parent 9792a77266
commit 822a315dfa

View File

@@ -512,9 +512,65 @@ public:
}
void get_multi_slice(tcxx::function<void(std::vector<ColumnOrSuperColumn> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const MultiSliceRequest& request) {
std::vector<ColumnOrSuperColumn> _return;
// FIXME: implement
return pass_unimplemented(exn_cob);
with_cob(std::move(cob), std::move(exn_cob), [&] {
if (!request.__isset.key) {
throw make_exception<InvalidRequestException>("Key may not be empty");
}
if (!request.__isset.column_parent || request.column_parent.column_family.empty()) {
throw make_exception<InvalidRequestException>("non-empty table is required");
}
if (!request.column_parent.super_column.empty()) {
throw make_exception<InvalidRequestException>("get_multi_slice does not support super columns");
}
auto schema = lookup_schema(_db.local(), current_keyspace(), request.column_parent.column_family);
auto& s = *schema;
auto pk = key_from_thrift(s, to_bytes(request.key));
auto dk = dht::global_partitioner().decorate_key(s, pk);
std::vector<column_id> regular_columns;
std::vector<query::clustering_range> clustering_ranges;
auto opts = query_opts(s);
uint32_t row_limit;
if (s.thrift().is_dynamic()) {
row_limit = request.count;
clustering_ranges = make_non_overlapping_ranges<clustering_key_prefix>(std::move(request.column_slices), [&s](auto&& cslice) {
return make_clustering_range(s, cslice.start, cslice.finish);
}, clustering_key_prefix::prefix_equal_tri_compare(s), request.reversed);
regular_columns.emplace_back(s.regular_begin()->id);
if (request.reversed) {
opts.set(query::partition_slice::option::reversed);
}
} else {
row_limit = query::max_rows;
clustering_ranges.emplace_back(query::clustering_range::make_open_ended_both_sides());
auto ranges = make_non_overlapping_ranges<bytes>(std::move(request.column_slices), [](auto&& cslice) {
return make_range(cslice.start, cslice.finish);
}, [&s](auto&& s1, auto&& s2) { return s.regular_column_name_type()->compare(s1, s2); }, request.reversed);
auto on_range = [&](auto&& range) {
auto start = range.start() ? s.regular_lower_bound(range.start()->value()) : s.regular_begin();
auto end = range.end() ? s.regular_upper_bound(range.end()->value()) : s.regular_end();
regular_columns = add_columns(start, end, request.reversed);
};
if (request.reversed) {
std::for_each(ranges.rbegin(), ranges.rend(), on_range);
} else {
std::for_each(ranges.begin(), ranges.end(), on_range);
}
}
auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts, nullptr);
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), row_limit);
return service::get_local_storage_proxy().query(
schema,
cmd,
{query::partition_range::make_singular(dk)},
cl_from_thrift(request.consistency_level)).then([schema, cmd, column_limit = request.count](auto result) {
return query::result_view::do_with(*result, [schema, cmd, column_limit](query::result_view v) {
column_aggregator aggregator(*schema, cmd->slice, column_limit);
v.consume(cmd->slice, aggregator);
auto cols = aggregator.release();
return !cols.empty() ? std::move(cols.begin()->second) : std::vector<ColumnOrSuperColumn>();
});
});
});
}
void describe_schema_versions(tcxx::function<void(std::map<std::string, std::vector<std::string> > const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
@@ -1176,6 +1232,18 @@ private:
}
return range;
}
static range<bytes> make_range(const std::string& start, const std::string& end) {
using bound = range<bytes>::bound;
stdx::optional<bound> start_bound;
if (!start.empty()) {
start_bound = bound(to_bytes(start));
}
stdx::optional<bound> end_bound;
if (!end.empty()) {
end_bound = bound(to_bytes(end));
}
return { std::move(start_bound), std::move(end_bound) };
}
static std::pair<schema::const_iterator, schema::const_iterator> make_column_range(const schema& s, const std::string& start, const std::string& end) {
auto start_it = start.empty() ? s.regular_begin() : s.regular_lower_bound(to_bytes(start));
auto end_it = end.empty() ? s.regular_end() : s.regular_upper_bound(to_bytes(end));
@@ -1441,6 +1509,26 @@ private:
});
return ret;
}
template<typename RangeType, typename Comparator>
static std::vector<range<RangeType>> make_non_overlapping_ranges(
std::vector<ColumnSlice> column_slices,
const std::function<range<RangeType>(ColumnSlice&&)> mapper,
const Comparator&& cmp,
bool reversed) {
std::vector<range<RangeType>> ranges;
std::transform(column_slices.begin(), column_slices.end(), std::back_inserter(ranges), [&](auto&& cslice) {
auto range = mapper(std::move(cslice));
if (!reversed && range.is_wrap_around(cmp)) {
throw make_exception<InvalidRequestException>("Column slice had start %s greater than finish %s", cslice.start, cslice.finish);
} else if (reversed && !range.is_wrap_around(cmp)) {
throw make_exception<InvalidRequestException>("Reversed column slice had start %s less than finish %s", cslice.start, cslice.finish);
} else if (reversed) {
range.reverse();
}
return range;
});
return range<RangeType>::deoverlap(std::move(ranges), cmp);
}
};
class handler_factory : public CassandraCobSvIfFactory {