thrift: Implement get_multi_slice verb
The get_multi_slice verb is used to perform multiple slices on a single row key in one operation. It takes a set of column_slices, which we normalize to not contain any overlapping ranges. Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
@@ -512,9 +512,65 @@ public:
|
||||
}
|
||||
|
||||
void get_multi_slice(tcxx::function<void(std::vector<ColumnOrSuperColumn> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const MultiSliceRequest& request) {
|
||||
std::vector<ColumnOrSuperColumn> _return;
|
||||
// FIXME: implement
|
||||
return pass_unimplemented(exn_cob);
|
||||
with_cob(std::move(cob), std::move(exn_cob), [&] {
|
||||
if (!request.__isset.key) {
|
||||
throw make_exception<InvalidRequestException>("Key may not be empty");
|
||||
}
|
||||
if (!request.__isset.column_parent || request.column_parent.column_family.empty()) {
|
||||
throw make_exception<InvalidRequestException>("non-empty table is required");
|
||||
}
|
||||
if (!request.column_parent.super_column.empty()) {
|
||||
throw make_exception<InvalidRequestException>("get_multi_slice does not support super columns");
|
||||
}
|
||||
auto schema = lookup_schema(_db.local(), current_keyspace(), request.column_parent.column_family);
|
||||
auto& s = *schema;
|
||||
auto pk = key_from_thrift(s, to_bytes(request.key));
|
||||
auto dk = dht::global_partitioner().decorate_key(s, pk);
|
||||
std::vector<column_id> regular_columns;
|
||||
std::vector<query::clustering_range> clustering_ranges;
|
||||
auto opts = query_opts(s);
|
||||
uint32_t row_limit;
|
||||
if (s.thrift().is_dynamic()) {
|
||||
row_limit = request.count;
|
||||
clustering_ranges = make_non_overlapping_ranges<clustering_key_prefix>(std::move(request.column_slices), [&s](auto&& cslice) {
|
||||
return make_clustering_range(s, cslice.start, cslice.finish);
|
||||
}, clustering_key_prefix::prefix_equal_tri_compare(s), request.reversed);
|
||||
regular_columns.emplace_back(s.regular_begin()->id);
|
||||
if (request.reversed) {
|
||||
opts.set(query::partition_slice::option::reversed);
|
||||
}
|
||||
} else {
|
||||
row_limit = query::max_rows;
|
||||
clustering_ranges.emplace_back(query::clustering_range::make_open_ended_both_sides());
|
||||
auto ranges = make_non_overlapping_ranges<bytes>(std::move(request.column_slices), [](auto&& cslice) {
|
||||
return make_range(cslice.start, cslice.finish);
|
||||
}, [&s](auto&& s1, auto&& s2) { return s.regular_column_name_type()->compare(s1, s2); }, request.reversed);
|
||||
auto on_range = [&](auto&& range) {
|
||||
auto start = range.start() ? s.regular_lower_bound(range.start()->value()) : s.regular_begin();
|
||||
auto end = range.end() ? s.regular_upper_bound(range.end()->value()) : s.regular_end();
|
||||
regular_columns = add_columns(start, end, request.reversed);
|
||||
};
|
||||
if (request.reversed) {
|
||||
std::for_each(ranges.rbegin(), ranges.rend(), on_range);
|
||||
} else {
|
||||
std::for_each(ranges.begin(), ranges.end(), on_range);
|
||||
}
|
||||
}
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts, nullptr);
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), row_limit);
|
||||
return service::get_local_storage_proxy().query(
|
||||
schema,
|
||||
cmd,
|
||||
{query::partition_range::make_singular(dk)},
|
||||
cl_from_thrift(request.consistency_level)).then([schema, cmd, column_limit = request.count](auto result) {
|
||||
return query::result_view::do_with(*result, [schema, cmd, column_limit](query::result_view v) {
|
||||
column_aggregator aggregator(*schema, cmd->slice, column_limit);
|
||||
v.consume(cmd->slice, aggregator);
|
||||
auto cols = aggregator.release();
|
||||
return !cols.empty() ? std::move(cols.begin()->second) : std::vector<ColumnOrSuperColumn>();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void describe_schema_versions(tcxx::function<void(std::map<std::string, std::vector<std::string> > const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
|
||||
@@ -1176,6 +1232,18 @@ private:
|
||||
}
|
||||
return range;
|
||||
}
|
||||
static range<bytes> make_range(const std::string& start, const std::string& end) {
|
||||
using bound = range<bytes>::bound;
|
||||
stdx::optional<bound> start_bound;
|
||||
if (!start.empty()) {
|
||||
start_bound = bound(to_bytes(start));
|
||||
}
|
||||
stdx::optional<bound> end_bound;
|
||||
if (!end.empty()) {
|
||||
end_bound = bound(to_bytes(end));
|
||||
}
|
||||
return { std::move(start_bound), std::move(end_bound) };
|
||||
}
|
||||
static std::pair<schema::const_iterator, schema::const_iterator> make_column_range(const schema& s, const std::string& start, const std::string& end) {
|
||||
auto start_it = start.empty() ? s.regular_begin() : s.regular_lower_bound(to_bytes(start));
|
||||
auto end_it = end.empty() ? s.regular_end() : s.regular_upper_bound(to_bytes(end));
|
||||
@@ -1441,6 +1509,26 @@ private:
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
template<typename RangeType, typename Comparator>
|
||||
static std::vector<range<RangeType>> make_non_overlapping_ranges(
|
||||
std::vector<ColumnSlice> column_slices,
|
||||
const std::function<range<RangeType>(ColumnSlice&&)> mapper,
|
||||
const Comparator&& cmp,
|
||||
bool reversed) {
|
||||
std::vector<range<RangeType>> ranges;
|
||||
std::transform(column_slices.begin(), column_slices.end(), std::back_inserter(ranges), [&](auto&& cslice) {
|
||||
auto range = mapper(std::move(cslice));
|
||||
if (!reversed && range.is_wrap_around(cmp)) {
|
||||
throw make_exception<InvalidRequestException>("Column slice had start %s greater than finish %s", cslice.start, cslice.finish);
|
||||
} else if (reversed && !range.is_wrap_around(cmp)) {
|
||||
throw make_exception<InvalidRequestException>("Reversed column slice had start %s less than finish %s", cslice.start, cslice.finish);
|
||||
} else if (reversed) {
|
||||
range.reverse();
|
||||
}
|
||||
return range;
|
||||
});
|
||||
return range<RangeType>::deoverlap(std::move(ranges), cmp);
|
||||
}
|
||||
};
|
||||
|
||||
class handler_factory : public CassandraCobSvIfFactory {
|
||||
|
||||
Reference in New Issue
Block a user