storage_proxy: make range_slice_read_executor go through digest matching state

Currently, scanning reads go to the reconciliation stage directly, which
requires asking for mutation data from all peers. This patch makes
them try matching digests first, like a single-partition read does.

The change requires internode protocol changes, since it is currently not
possible to ask for multi-partition data/digests over RPC. This means the
capability has to be guarded by a new gossip feature flag, which this
patch also adds.
This commit is contained in:
Gleb Natapov
2017-08-03 10:53:22 +03:00
parent 3b7d8c8767
commit d2a2a6d471
3 changed files with 16 additions and 5 deletions

View File

@@ -2749,13 +2749,15 @@ public:
}
};
class range_slice_read_executor : public abstract_read_executor {
class range_slice_read_executor : public never_speculating_read_executor {
public:
range_slice_read_executor(schema_ptr s, lw_shared_ptr<column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets, tracing::trace_state_ptr trace_state) :
abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets), std::move(trace_state)) {}
using never_speculating_read_executor::never_speculating_read_executor;
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) override {
reconcile(_cl, timeout);
return _result_promise.get_future();
if (!service::get_local_storage_service().cluster_supports_digest_multipartition_reads()) {
reconcile(_cl, timeout);
return _result_promise.get_future();
}
return never_speculating_read_executor::execute(timeout);
}
};

View File

@@ -85,6 +85,7 @@ static const sstring LARGE_PARTITIONS_FEATURE = "LARGE_PARTITIONS";
static const sstring MATERIALIZED_VIEWS_FEATURE = "MATERIALIZED_VIEWS";
static const sstring COUNTERS_FEATURE = "COUNTERS";
static const sstring INDEXES_FEATURE = "INDEXES";
static const sstring DIGEST_MULTIPARTITION_READ_FEATURE = "DIGEST_MULTIPARTITION_READ";
distributed<storage_service> _the_storage_service;
@@ -125,6 +126,7 @@ sstring storage_service::get_config_supported_features() {
RANGE_TOMBSTONES_FEATURE,
LARGE_PARTITIONS_FEATURE,
COUNTERS_FEATURE,
DIGEST_MULTIPARTITION_READ_FEATURE,
};
if (service::get_local_storage_service()._db.local().get_config().experimental()) {
features.push_back(MATERIALIZED_VIEWS_FEATURE);
@@ -1349,6 +1351,7 @@ future<> storage_service::init_server(int delay) {
ss._range_tombstones_feature = gms::feature(RANGE_TOMBSTONES_FEATURE);
ss._large_partitions_feature = gms::feature(LARGE_PARTITIONS_FEATURE);
ss._counters_feature = gms::feature(COUNTERS_FEATURE);
ss._digest_multipartition_read_feature = gms::feature(DIGEST_MULTIPARTITION_READ_FEATURE);
if (ss._db.local().get_config().experimental()) {
ss._materialized_views_feature = gms::feature(MATERIALIZED_VIEWS_FEATURE);

View File

@@ -263,6 +263,7 @@ private:
gms::feature _materialized_views_feature;
gms::feature _counters_feature;
gms::feature _indexes_feature;
gms::feature _digest_multipartition_read_feature;
public:
void enable_all_features() {
@@ -271,6 +272,7 @@ public:
_materialized_views_feature.enable();
_counters_feature.enable();
_indexes_feature.enable();
_digest_multipartition_read_feature.enable();
}
void finish_bootstrapping() {
@@ -2236,6 +2238,10 @@ public:
bool cluster_supports_indexes() const {
return bool(_indexes_feature);
}
bool cluster_supports_digest_multipartition_reads() const {
return bool(_digest_multipartition_read_feature);
}
};
inline future<> init_storage_service(distributed<database>& db) {