Merge 'Clean up old cluster features' from Piotr Sarna
" This series follows the suggestion from https://github.com/scylladb/scylla/pull/7203#issuecomment-689499773 discussion and deprecates a number of cluster features. The deprecation does not remove any features from the strings sent via gossip to other nodes, but it removes all checks for these features from code, assuming that the checks are always true. This assumption is quite safe for features introduced over 2 years ago, because the official upgrade path only allows upgrading from a previous official release, and these feature bits were introduced many release cycles ago. All deprecated features were picked from a `git blame` output which indicated that they come from 2018: ```gite46537b7d32016-05-31 11:44:17 +0200 RANGE_TOMBSTONES_FEATURE = "RANGE_TOMBSTONES";85c092c56c2016-07-11 10:59:40 +0100 LARGE_PARTITIONS_FEATURE = "LARGE_PARTITIONS";02bc0d2ab32016-12-09 22:09:30 +0100 MATERIALIZED_VIEWS_FEATURE = "MATERIALIZED_VIEWS";67ca6959bd2017-01-30 19:50:13 +0000 COUNTERS_FEATURE = "COUNTERS";815c91a1b82017-04-12 10:14:38 +0300 INDEXES_FEATURE = "INDEXES";d2a2a6d4712017-08-03 10:53:22 +0300 DIGEST_MULTIPARTITION_READ_FEATURE = "DIGEST_MULTIPARTITION_READ";ecd2bf128b2017-09-01 09:55:02 +0100 CORRECT_COUNTER_ORDER_FEATURE = "CORRECT_COUNTER_ORDER";713d75fd512017-09-14 19:15:41 +0200 SCHEMA_TABLES_V3 = "SCHEMA_TABLES_V3";2f513514cc2017-11-29 11:57:09 +0000 CORRECT_NON_COMPOUND_RANGE_TOMBSTONES = "CORRECT_NON_COMPOUND_RANGE_TOMBSTONES";0be3bd383b2017-12-04 13:55:36 +0200 WRITE_FAILURE_REPLY_FEATURE = "WRITE_FAILURE_REPLY";0bab3e59c22017-11-30 00:16:34 +0000 XXHASH_FEATURE = "XXHASH";fbc97626c42018-01-14 21:28:58 -0500 ROLES_FEATURE = "ROLES";802be72ca62018-03-18 06:25:52 +0100 LA_SSTABLE_FEATURE = "LA_SSTABLE_FORMAT";71e22fe9812018-05-25 10:37:54 +0800 STREAM_WITH_RPC_STREAM = "STREAM_WITH_RPC_STREAM"; ``` Tests: unit(dev) manual(verifying with cqlsh that the feature strings are indeed still set) " Closes #7234. * psarna-clean_up_features: gms: add comments for deprecated features gms: remove unused feature bits streaming: drop checks for RPC stream support roles: drop checks for roles schema support service: drop checks for xxhash support service: drop checks for write failure reply support sstables: drop checks for non-compound range tombstones support service: drop checks for v3 schema support repair: drop checks for large partitions support service: drop checks for digest multipartition read support sstables: drop checks for correct counter order support cql3: drop checks for materialized views support cql3: drop checks for counters support cql3: drop checks for indexing support
This commit is contained in:
@@ -272,9 +272,6 @@ void create_index_statement::validate_targets_for_multi_column_index(std::vector
|
||||
|
||||
future<::shared_ptr<cql_transport::event::schema_change>>
|
||||
create_index_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const {
|
||||
if (!proxy.features().cluster_supports_indexes()) {
|
||||
throw exceptions::invalid_request_exception("Index support is not enabled");
|
||||
}
|
||||
auto& db = proxy.get_db().local();
|
||||
auto schema = db.find_schema(keyspace(), column_family());
|
||||
std::vector<::shared_ptr<index_target>> targets;
|
||||
|
||||
@@ -211,9 +211,6 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
|
||||
for (auto&& entry : _definitions) {
|
||||
::shared_ptr<column_identifier> id = entry.first;
|
||||
cql3_type pt = entry.second->prepare(db, keyspace());
|
||||
if (pt.is_counter() && !db.features().cluster_supports_counters()) {
|
||||
throw exceptions::invalid_request_exception("Counter support is not enabled");
|
||||
}
|
||||
if (pt.get_type()->is_multi_cell()) {
|
||||
if (pt.get_type()->is_user_type()) {
|
||||
// check for multi-cell types (non-frozen UDTs or collections) inside a non-frozen UDT
|
||||
|
||||
@@ -93,9 +93,6 @@ future<> create_view_statement::check_access(service::storage_proxy& proxy, cons
|
||||
}
|
||||
|
||||
void create_view_statement::validate(service::storage_proxy& proxy, const service::client_state& state) const {
|
||||
if (!proxy.features().cluster_supports_materialized_views()) {
|
||||
throw exceptions::invalid_request_exception("Can't create materialized views until the whole cluster has been upgraded");
|
||||
}
|
||||
}
|
||||
|
||||
static const column_definition* get_column_definition(const schema& schema, column_identifier::raw& identifier) {
|
||||
|
||||
@@ -88,9 +88,6 @@ void drop_index_statement::validate(service::storage_proxy& proxy, const service
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_index_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
{
|
||||
if (!proxy.features().cluster_supports_indexes()) {
|
||||
throw exceptions::invalid_request_exception("Index support is not enabled");
|
||||
}
|
||||
auto cfm = lookup_indexed_table(proxy);
|
||||
if (!cfm) {
|
||||
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>(nullptr);
|
||||
|
||||
@@ -68,11 +68,7 @@ cql3::statements::permission_altering_statement::permission_altering_statement(
|
||||
, _role_name(rn.to_string()) {
|
||||
}
|
||||
|
||||
void cql3::statements::permission_altering_statement::validate(service::storage_proxy& proxy, const service::client_state& state) const {
|
||||
if (!proxy.features().cluster_supports_roles()) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"You cannot modify access-control information until the cluster has fully upgraded.");
|
||||
}
|
||||
void cql3::statements::permission_altering_statement::validate(service::storage_proxy&, const service::client_state&) const {
|
||||
}
|
||||
|
||||
future<> cql3::statements::permission_altering_statement::check_access(service::storage_proxy& proxy, const service::client_state& state) const {
|
||||
|
||||
@@ -82,11 +82,7 @@ static future<result_message_ptr> void_result_message() {
|
||||
return make_ready_future<result_message_ptr>(nullptr);
|
||||
}
|
||||
|
||||
void validate_cluster_support(service::storage_proxy& proxy) {
|
||||
if (!proxy.features().cluster_supports_roles()) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"You cannot modify access-control information until the cluster has fully upgraded.");
|
||||
}
|
||||
void validate_cluster_support(service::storage_proxy&) {
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
#include "gms/feature_service.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
// Deprecated features - sent to other nodes via gossip, but assumed true in the code
|
||||
constexpr std::string_view features::RANGE_TOMBSTONES = "RANGE_TOMBSTONES";
|
||||
constexpr std::string_view features::LARGE_PARTITIONS = "LARGE_PARTITIONS";
|
||||
constexpr std::string_view features::MATERIALIZED_VIEWS = "MATERIALIZED_VIEWS";
|
||||
@@ -38,10 +40,12 @@ constexpr std::string_view features::SCHEMA_TABLES_V3 = "SCHEMA_TABLES_V3";
|
||||
constexpr std::string_view features::CORRECT_NON_COMPOUND_RANGE_TOMBSTONES = "CORRECT_NON_COMPOUND_RANGE_TOMBSTONES";
|
||||
constexpr std::string_view features::WRITE_FAILURE_REPLY = "WRITE_FAILURE_REPLY";
|
||||
constexpr std::string_view features::XXHASH = "XXHASH";
|
||||
constexpr std::string_view features::UDF = "UDF";
|
||||
constexpr std::string_view features::ROLES = "ROLES";
|
||||
constexpr std::string_view features::LA_SSTABLE = "LA_SSTABLE_FORMAT";
|
||||
constexpr std::string_view features::STREAM_WITH_RPC_STREAM = "STREAM_WITH_RPC_STREAM";
|
||||
|
||||
// Up-to-date features
|
||||
constexpr std::string_view features::UDF = "UDF";
|
||||
constexpr std::string_view features::MC_SSTABLE = "MC_SSTABLE_FORMAT";
|
||||
constexpr std::string_view features::MD_SSTABLE = "MD_SSTABLE_FORMAT";
|
||||
constexpr std::string_view features::ROW_LEVEL_REPAIR = "ROW_LEVEL_REPAIR";
|
||||
@@ -65,20 +69,7 @@ feature_config::feature_config() {
|
||||
}
|
||||
|
||||
feature_service::feature_service(feature_config cfg) : _config(cfg)
|
||||
, _range_tombstones_feature(*this, features::RANGE_TOMBSTONES)
|
||||
, _large_partitions_feature(*this, features::LARGE_PARTITIONS)
|
||||
, _materialized_views_feature(*this, features::MATERIALIZED_VIEWS)
|
||||
, _counters_feature(*this, features::COUNTERS)
|
||||
, _indexes_feature(*this, features::INDEXES)
|
||||
, _digest_multipartition_read_feature(*this, features::DIGEST_MULTIPARTITION_READ)
|
||||
, _correct_counter_order_feature(*this, features::CORRECT_COUNTER_ORDER)
|
||||
, _schema_tables_v3(*this, features::SCHEMA_TABLES_V3)
|
||||
, _correct_non_compound_range_tombstones(*this, features::CORRECT_NON_COMPOUND_RANGE_TOMBSTONES)
|
||||
, _write_failure_reply_feature(*this, features::WRITE_FAILURE_REPLY)
|
||||
, _xxhash_feature(*this, features::XXHASH)
|
||||
, _udf_feature(*this, features::UDF)
|
||||
, _roles_feature(*this, features::ROLES)
|
||||
, _stream_with_rpc_stream_feature(*this, features::STREAM_WITH_RPC_STREAM)
|
||||
, _mc_sstable_feature(*this, features::MC_SSTABLE)
|
||||
, _md_sstable_feature(*this, features::MD_SSTABLE)
|
||||
, _row_level_repair_feature(*this, features::ROW_LEVEL_REPAIR)
|
||||
@@ -162,6 +153,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
|
||||
// introduced in scylla, update it here, e.g.,
|
||||
// return sstring("FEATURE1,FEATURE2")
|
||||
std::set<std::string_view> features = {
|
||||
// Deprecated features - sent to other nodes via gossip, but assumed true in the code
|
||||
gms::features::RANGE_TOMBSTONES,
|
||||
gms::features::LARGE_PARTITIONS,
|
||||
gms::features::COUNTERS,
|
||||
@@ -176,6 +168,8 @@ std::set<std::string_view> feature_service::known_feature_set() {
|
||||
gms::features::STREAM_WITH_RPC_STREAM,
|
||||
gms::features::MATERIALIZED_VIEWS,
|
||||
gms::features::INDEXES,
|
||||
|
||||
// Up-to-date features
|
||||
gms::features::ROW_LEVEL_REPAIR,
|
||||
gms::features::TRUNCATION_TABLE,
|
||||
gms::features::CORRECT_STATIC_COMPACT_IN_MC,
|
||||
@@ -255,20 +249,7 @@ db::schema_features feature_service::cluster_schema_features() const {
|
||||
|
||||
void feature_service::enable(const std::set<std::string_view>& list) {
|
||||
for (gms::feature& f : {
|
||||
std::ref(_range_tombstones_feature),
|
||||
std::ref(_large_partitions_feature),
|
||||
std::ref(_materialized_views_feature),
|
||||
std::ref(_counters_feature),
|
||||
std::ref(_indexes_feature),
|
||||
std::ref(_digest_multipartition_read_feature),
|
||||
std::ref(_correct_counter_order_feature),
|
||||
std::ref(_schema_tables_v3),
|
||||
std::ref(_correct_non_compound_range_tombstones),
|
||||
std::ref(_write_failure_reply_feature),
|
||||
std::ref(_xxhash_feature),
|
||||
std::ref(_udf_feature),
|
||||
std::ref(_roles_feature),
|
||||
std::ref(_stream_with_rpc_stream_feature),
|
||||
std::ref(_mc_sstable_feature),
|
||||
std::ref(_md_sstable_feature),
|
||||
std::ref(_row_level_repair_feature),
|
||||
|
||||
@@ -75,20 +75,7 @@ public:
|
||||
std::set<std::string_view> supported_feature_set();
|
||||
|
||||
private:
|
||||
gms::feature _range_tombstones_feature;
|
||||
gms::feature _large_partitions_feature;
|
||||
gms::feature _materialized_views_feature;
|
||||
gms::feature _counters_feature;
|
||||
gms::feature _indexes_feature;
|
||||
gms::feature _digest_multipartition_read_feature;
|
||||
gms::feature _correct_counter_order_feature;
|
||||
gms::feature _schema_tables_v3;
|
||||
gms::feature _correct_non_compound_range_tombstones;
|
||||
gms::feature _write_failure_reply_feature;
|
||||
gms::feature _xxhash_feature;
|
||||
gms::feature _udf_feature;
|
||||
gms::feature _roles_feature;
|
||||
gms::feature _stream_with_rpc_stream_feature;
|
||||
gms::feature _mc_sstable_feature;
|
||||
gms::feature _md_sstable_feature;
|
||||
gms::feature _row_level_repair_feature;
|
||||
@@ -107,62 +94,10 @@ private:
|
||||
gms::feature _digest_for_null_values_feature;
|
||||
|
||||
public:
|
||||
bool cluster_supports_range_tombstones() const {
|
||||
return bool(_range_tombstones_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_large_partitions() const {
|
||||
return bool(_large_partitions_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_materialized_views() const {
|
||||
return bool(_materialized_views_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_counters() const {
|
||||
return bool(_counters_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_indexes() const {
|
||||
return bool(_indexes_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_digest_multipartition_reads() const {
|
||||
return bool(_digest_multipartition_read_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_correct_counter_order() const {
|
||||
return bool(_correct_counter_order_feature);
|
||||
}
|
||||
|
||||
const gms::feature& cluster_supports_schema_tables_v3() const {
|
||||
return _schema_tables_v3;
|
||||
}
|
||||
|
||||
bool cluster_supports_reading_correctly_serialized_range_tombstones() const {
|
||||
return bool(_correct_non_compound_range_tombstones);
|
||||
}
|
||||
|
||||
bool cluster_supports_write_failure_reply() const {
|
||||
return bool(_write_failure_reply_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_xxhash_digest_algorithm() const {
|
||||
return bool(_xxhash_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_user_defined_functions() const {
|
||||
return bool(_udf_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_roles() const {
|
||||
return bool(_roles_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_stream_with_rpc_stream() const {
|
||||
return bool(_stream_with_rpc_stream_feature);
|
||||
}
|
||||
|
||||
const feature& cluster_supports_mc_sstable() const {
|
||||
return _mc_sstable_feature;
|
||||
}
|
||||
|
||||
@@ -63,14 +63,8 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c)
|
||||
return counter_cell_view::with_linearized(c, [&] (counter_cell_view ccv) {
|
||||
auto shards = std::move(value).start_value_counter_cell_full()
|
||||
.start_shards();
|
||||
if (service::get_local_storage_service().features().cluster_supports_correct_counter_order()) {
|
||||
for (auto csv : ccv.shards()) {
|
||||
shards.add_shards(counter_shard(csv));
|
||||
}
|
||||
} else {
|
||||
for (auto& cs : ccv.shards_compatible_with_1_7_4()) {
|
||||
shards.add_shards(cs);
|
||||
}
|
||||
for (auto csv : ccv.shards()) {
|
||||
shards.add_shards(counter_shard(csv));
|
||||
}
|
||||
return std::move(shards).end_shards().end_counter_cell_full();
|
||||
});
|
||||
|
||||
@@ -931,8 +931,7 @@ static future<> repair_cf_range(repair_info& ri,
|
||||
check_in_shutdown();
|
||||
ri.check_in_abort();
|
||||
return seastar::get_units(parallelism_semaphore, 1).then([&ri, &completion, &success, &neighbors, &cf, range] (auto signal_sem) {
|
||||
auto checksum_type = ri.db.local().features().cluster_supports_large_partitions()
|
||||
? repair_checksum::streamed : repair_checksum::legacy;
|
||||
auto checksum_type = repair_checksum::streamed;
|
||||
|
||||
// Ask this node, and all neighbors, to calculate checksums in
|
||||
// this range. When all are done, compare the results, and if
|
||||
|
||||
@@ -73,9 +73,6 @@ future<> migration_manager::stop()
|
||||
{
|
||||
mlogger.info("stopping migration service");
|
||||
_as.request_abort();
|
||||
if (!_cluster_upgraded) {
|
||||
_wait_cluster_upgraded.broken();
|
||||
}
|
||||
|
||||
return uninit_messaging_service().then([this] {
|
||||
return parallel_for_each(_schema_pulls.begin(), _schema_pulls.end(), [] (auto&& e) {
|
||||
@@ -103,10 +100,6 @@ void migration_manager::init_messaging_service()
|
||||
_feature_listeners.push_back(_feat.cluster_supports_cdc().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(_feat.cluster_supports_per_table_partitioners().when_enabled(update_schema));
|
||||
}
|
||||
_feature_listeners.push_back(_feat.cluster_supports_schema_tables_v3().when_enabled([this] {
|
||||
_cluster_upgraded = true;
|
||||
_wait_cluster_upgraded.broadcast();
|
||||
}));
|
||||
|
||||
_messaging.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
|
||||
auto src = netw::messaging_service::get_source(cinfo);
|
||||
@@ -243,15 +236,6 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Disable pulls during rolling upgrade from 1.7 to 2.0 to avoid
|
||||
// schema version inconsistency. See https://github.com/scylladb/scylla/issues/2802.
|
||||
if (!_cluster_upgraded) {
|
||||
mlogger.debug("Delaying pull with {} until cluster upgrade is complete", endpoint);
|
||||
return _wait_cluster_upgraded.wait().then([this, their_version, endpoint] {
|
||||
return maybe_schedule_schema_pull(their_version, endpoint);
|
||||
}).finally([me = shared_from_this()] {});
|
||||
}
|
||||
|
||||
if (db.get_version() == database::empty_version || runtime::get_uptime() < migration_delay) {
|
||||
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
|
||||
mlogger.debug("Submitting migration task for {}", endpoint);
|
||||
|
||||
@@ -76,8 +76,6 @@ private:
|
||||
gms::feature_service& _feat;
|
||||
netw::messaging_service& _messaging;
|
||||
seastar::abort_source _as;
|
||||
bool _cluster_upgraded = false;
|
||||
seastar::condition_variable _wait_cluster_upgraded;
|
||||
public:
|
||||
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms);
|
||||
|
||||
|
||||
@@ -126,9 +126,7 @@ static inline
|
||||
query::digest_algorithm digest_algorithm(service::storage_proxy& proxy) {
|
||||
return proxy.features().cluster_supports_digest_for_null_values()
|
||||
? query::digest_algorithm::xxHash
|
||||
: proxy.features().cluster_supports_xxhash_digest_algorithm()
|
||||
? query::digest_algorithm::legacy_xxHash_without_null_digest
|
||||
: query::digest_algorithm::MD5;
|
||||
: query::digest_algorithm::legacy_xxHash_without_null_digest;
|
||||
}
|
||||
|
||||
static inline
|
||||
@@ -3775,10 +3773,6 @@ class range_slice_read_executor : public never_speculating_read_executor {
|
||||
public:
|
||||
using never_speculating_read_executor::never_speculating_read_executor;
|
||||
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) override {
|
||||
if (!_proxy->features().cluster_supports_digest_multipartition_reads()) {
|
||||
reconcile(_cl, timeout);
|
||||
return _result_promise.get_future();
|
||||
}
|
||||
return never_speculating_read_executor::execute(timeout);
|
||||
}
|
||||
};
|
||||
@@ -4849,18 +4843,16 @@ void storage_proxy::init_messaging_service() {
|
||||
// ignore results, since we'll be returning them via MUTATION_DONE/MUTATION_FAILURE verbs
|
||||
auto fut = make_ready_future<seastar::rpc::no_wait_type>(netw::messaging_service::no_wait());
|
||||
if (errors) {
|
||||
if (p->features().cluster_supports_write_failure_reply()) {
|
||||
tracing::trace(trace_state_ptr, "Sending mutation_failure with {} failures to /{}", errors, reply_to);
|
||||
fut = p->_messaging.send_mutation_failed(
|
||||
netw::messaging_service::msg_addr{reply_to, shard},
|
||||
shard,
|
||||
response_id,
|
||||
errors,
|
||||
p->get_view_update_backlog()).then_wrapped([] (future<> f) {
|
||||
f.ignore_ready_future();
|
||||
return netw::messaging_service::no_wait();
|
||||
});
|
||||
}
|
||||
tracing::trace(trace_state_ptr, "Sending mutation_failure with {} failures to /{}", errors, reply_to);
|
||||
fut = p->_messaging.send_mutation_failed(
|
||||
netw::messaging_service::msg_addr{reply_to, shard},
|
||||
shard,
|
||||
response_id,
|
||||
errors,
|
||||
p->get_view_update_backlog()).then_wrapped([] (future<> f) {
|
||||
f.ignore_ready_future();
|
||||
return netw::messaging_service::no_wait();
|
||||
});
|
||||
}
|
||||
return fut.finally([trace_state_ptr] {
|
||||
tracing::trace(trace_state_ptr, "Mutation handling is done");
|
||||
|
||||
@@ -53,8 +53,7 @@ sstable_writer_config sstables_manager::configure_writer() const {
|
||||
cfg.validate_keys = _db_config.enable_sstable_key_validation();
|
||||
cfg.summary_byte_cost = summary_byte_cost(_db_config.sstable_summary_ratio());
|
||||
|
||||
cfg.correctly_serialize_non_compound_range_tombstones =
|
||||
_features.cluster_supports_reading_correctly_serialized_range_tombstones();
|
||||
cfg.correctly_serialize_non_compound_range_tombstones = true;
|
||||
cfg.correctly_serialize_static_compact_in_mc =
|
||||
bool(_features.cluster_supports_correct_static_compact_in_mc());
|
||||
|
||||
|
||||
@@ -586,14 +586,8 @@ void write_counter_value(counter_cell_view ccv, W& out, sstable_version_types v,
|
||||
int64_t(uuid.get_least_significant_bits()),
|
||||
int64_t(s.logical_clock()), int64_t(s.value()));
|
||||
};
|
||||
if (service::get_local_storage_service().features().cluster_supports_correct_counter_order()) {
|
||||
for (auto&& s : ccv.shards()) {
|
||||
write_shard(s);
|
||||
}
|
||||
} else {
|
||||
for (auto&& s : ccv.shards_compatible_with_1_7_4()) {
|
||||
write_shard(s);
|
||||
}
|
||||
for (auto&& s : ccv.shards()) {
|
||||
write_shard(s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -228,11 +228,7 @@ future<> stream_transfer_task::execute() {
|
||||
plan_id, cf_id, this_shard_id());
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (si->db.features().cluster_supports_stream_with_rpc_stream()) {
|
||||
return send_mutation_fragments(std::move(si));
|
||||
} else {
|
||||
throw std::runtime_error("cluster does not support STREAM_WITH_RPC_STREAM feature");
|
||||
}
|
||||
return send_mutation_fragments(std::move(si));
|
||||
});
|
||||
}).then([this, plan_id, cf_id, id] {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
|
||||
|
||||
Reference in New Issue
Block a user