Compare commits
3 Commits
scylla-3.2
...
next-3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d112a230c0 | ||
|
|
4371cb41d0 | ||
|
|
a8b9f94dcb |
@@ -254,6 +254,9 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
if (column_family.empty()) {
|
||||
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
}
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
|
||||
@@ -177,6 +177,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
|
||||
return handle_end_of_stream();
|
||||
}
|
||||
});
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
for (auto& q : _queue_reader_handles) {
|
||||
if (q) {
|
||||
q->abort(ep);
|
||||
}
|
||||
}
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -444,7 +444,7 @@ class repair_writer {
|
||||
uint64_t _estimated_partitions;
|
||||
size_t _nr_peer_nodes;
|
||||
// Needs more than one for repair master
|
||||
std::vector<std::optional<future<uint64_t>>> _writer_done;
|
||||
std::vector<std::optional<future<>>> _writer_done;
|
||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||
// Current partition written to disk
|
||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||
@@ -524,7 +524,15 @@ public:
|
||||
return consumer(std::move(reader));
|
||||
});
|
||||
},
|
||||
t.stream_in_progress());
|
||||
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
_mq[node_idx]->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
future<> write_partition_end(unsigned node_idx) {
|
||||
@@ -551,21 +559,33 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_wait_for_writer_done(unsigned node_idx) {
|
||||
if (_writer_done[node_idx]) {
|
||||
return std::move(*(_writer_done[node_idx]));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> wait_for_writer_done() {
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
||||
rlogger.debug("Managed to write partitions={} to sstable", partitions);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -288,10 +288,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
|
||||
+ column_offset(column_kind::regular_column),
|
||||
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
||||
|
||||
std::sort(_raw._columns.begin(),
|
||||
std::stable_sort(_raw._columns.begin(),
|
||||
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
_raw._columns.begin() + column_offset(column_kind::static_column),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
|
||||
|
||||
@@ -118,6 +118,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto test_random_streams = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
|
||||
auto muts = gen(partition_nr);
|
||||
schema_ptr s = gen.schema();
|
||||
auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);
|
||||
int mf_produced = 0;
|
||||
auto get_next_mutation_fragment = [&source_reader, &mf_produced] () mutable {
|
||||
if (mf_produced++ > 800) {
|
||||
return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
|
||||
} else {
|
||||
return source_reader(db::no_timeout);
|
||||
}
|
||||
};
|
||||
auto& partitioner = dht::global_partitioner();
|
||||
try {
|
||||
distribute_reader_and_consume_on_shards(s, partitioner,
|
||||
make_generating_reader(s, std::move(get_next_mutation_fragment)),
|
||||
[&partitioner, error] (flat_mutation_reader reader) mutable {
|
||||
if (error) {
|
||||
return make_exception_future<>(std::runtime_error("Failed to write"));
|
||||
}
|
||||
return repeat([&partitioner, reader = std::move(reader), error] () mutable {
|
||||
return reader(db::no_timeout).then([&partitioner, error] (mutation_fragment_opt mf_opt) mutable {
|
||||
if (mf_opt) {
|
||||
if (mf_opt->is_partition_start()) {
|
||||
auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
|
||||
BOOST_REQUIRE_EQUAL(shard, this_shard_id());
|
||||
}
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
).get0();
|
||||
} catch (...) {
|
||||
// The distribute_reader_and_consume_on_shards is expected to fail and not block forever
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::no);
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::yes);
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class bucket_writer {
|
||||
|
||||
Reference in New Issue
Block a user