Merge "Fix cross shard cf usage" from Piotr

"
Lambda passed to distribute_reader_and_consume_on_shards shouldn't
capture shard local variables.

Fixes #4108

Tests:
unit(release),
dtest(update_cluster_layout_tests.TestLargeScaleCluster.add_50_nodes_test)
"

* 'haaawk/4108/v2' of github.com:scylladb/seastar-dev:
  Fix cross shard cf usage in repair
  Fix cross shard cf usage in streaming
This commit is contained in:
Avi Kivity
2019-01-24 19:40:44 +02:00
2 changed files with 6 additions and 3 deletions

View File

@@ -380,10 +380,12 @@ public:
auto get_next_mutation_fragment = [this, node_idx] () mutable {
return _mq[node_idx]->pop_eventually();
};
table& t = service::get_local_storage_service().db().local().find_column_family(_schema->id());
auto& db = service::get_local_storage_service().db();
table& t = db.local().find_column_family(_schema->id());
_writer_done[node_idx] = distribute_reader_and_consume_on_shards(_schema, dht::global_partitioner(),
make_generating_reader(_schema, std::move(get_next_mutation_fragment)),
[&t, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
[&db, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, streaming::stream_reason::repair).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write();
schema_ptr s = reader.schema();

View File

@@ -183,7 +183,8 @@ void stream_session::init_messaging_service_handler() {
};
distribute_reader_and_consume_on_shards(s, dht::global_partitioner(),
make_generating_reader(s, std::move(get_next_mutation_fragment)),
[&cf, plan_id, estimated_partitions, reason] (flat_mutation_reader reader) {
[plan_id, estimated_partitions, reason] (flat_mutation_reader reader) {
auto& cf = get_local_db().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), cf, reason).then([cf = cf.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
sstables::shared_sstable sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
schema_ptr s = reader.schema();