Merge branch 'dev/pdziepak/streamed-mutations-streaming/v3'

Support for streaming of large partitions from Paweł:

This series converts streaming to streaming_mutations so that there
is no need to store a full mutation in memory in order to send or
receive it.

The first several patches add a way of estimating mutation fragment
memory usage and introduce fragment_and_freeze() which produces
a stream of reasonably sized frozen mutations from a single streamed
mutation.

The second part of this patchset makes sure that streaming mutations
in fragments doesn't break isolation guarantees. This is achieved by
delaying visibility of sstables produced by streaming until the
streaming is completed. However, our current receiving code merges
mutations from all streaming plans together thus making it impossible
to track which data was received from a particular streaming plan.
The solution to that problem is to introduce an additional flag to
STREAM_MUTATION verb which informs the receiver whether the mutation
is fragmented and care must be taken to preserve isolation. Small
mutations are handled as before, with writes from different stream
plans coalesced, while big mutations are handled separately for each
streaming task.
This commit is contained in:
Tomasz Grabiec
2016-07-07 13:23:39 +02:00
23 changed files with 509 additions and 121 deletions

View File

@@ -63,5 +63,8 @@ public:
::feed_hash(as_collection_mutation(), h, def.type);
}
}
// Memory usage of this object, as reported by the underlying _data storage.
size_t memory_usage() const {
return _data.memory_usage();
}
friend std::ostream& operator<<(std::ostream&, const atomic_cell_or_collection&);
};

View File

@@ -113,6 +113,13 @@ column_family::make_streaming_memtable_list() {
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
}
// Creates the memtable list used for one fragmented ("big") streaming plan.
// The list is configured with the same size limit and dirty memory manager
// as the regular streaming memtable list, but sealing is routed to
// seal_active_streaming_memtable_big() for the given per-plan state.
lw_shared_ptr<memtable_list>
column_family::make_streaming_memtable_big_list(streaming_memtable_big& smb) {
    // `smb` is captured by reference: it has to stay alive for as long as
    // the returned list may invoke the seal callback.
    auto seal_fn = [this, &smb] (memtable_list::flush_behavior) {
        return seal_active_streaming_memtable_big(smb);
    };
    auto schema_fn = [this] {
        return schema();
    };
    return make_lw_shared<memtable_list>(std::move(seal_fn), std::move(schema_fn),
            _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
}
column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager)
: _schema(std::move(schema))
, _config(std::move(config))
@@ -166,6 +173,11 @@ logalloc::occupancy_stats column_family::occupancy() const {
for (auto m : *_streaming_memtables) {
res += m->region().occupancy();
}
for (auto smb : _streaming_memtables_big) {
for (auto m : *smb.second->memtables) {
res += m->region().occupancy();
}
}
return res;
}
@@ -687,6 +699,8 @@ column_family::seal_active_streaming_memtable_immediate() {
}
_streaming_memtables->add_memtable();
_streaming_memtables->erase(old);
auto guard = _streaming_flush_phaser.start();
return with_gate(_streaming_flush_gate, [this, old] {
_delayed_streaming_flush.cancel();
@@ -735,6 +749,35 @@ column_family::seal_active_streaming_memtable_immediate() {
});
return f;
}).finally([guard = std::move(guard)] { });
}
// Seals the active memtable of a fragmented ("big") streaming plan and writes
// it out as a new sstable. The sstable is only recorded in smb.sstables; this
// function does not add it to the column family's sstable set.
//
// Returns a ready future when the active memtable is empty. Otherwise the
// returned future resolves once the sstable components are written, or fails
// with the write error after logging it.
future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_big& smb) {
auto old = smb.memtables->back();
if (old->empty()) {
return make_ready_future<>();
}
// Install a fresh active memtable and detach the one being flushed from the list.
smb.memtables->add_memtable();
smb.memtables->erase(old);
// The write runs inside both the column-family-wide streaming flush gate and
// the per-plan flush_in_progress gate, and under the read side of _sstables_lock.
// `smb` is captured by reference throughout.
// NOTE(review): presumably callers keep smb alive until flush_in_progress is closed — confirm.
return with_gate(_streaming_flush_gate, [this, old, &smb] {
return with_gate(smb.flush_in_progress, [this, old, &smb] {
return with_lock(_sstables_lock.for_read(), [this, old, &smb] {
auto newtab = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(),
_config.datadir, calculate_generation_for_new_table(),
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
newtab->set_unshared();
auto&& priority = service::get_local_streaming_write_priority();
// The trailing `true` is the leave_unsealed argument of write_components().
// `old` is captured by the continuation, so the memtable being read stays referenced until it runs.
return newtab->write_components(*old, incremental_backups_enabled(), priority, true).then([this, newtab, old, &smb] {
smb.sstables.emplace_back(newtab);
}).handle_exception([] (auto ep) {
// NOTE(review): on failure newtab is not recorded in smb.sstables, so nothing here
// marks the partially written sstable for deletion — confirm whether cleanup happens elsewhere.
dblog.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
});
});
});
});
}
@@ -2105,11 +2148,26 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const
}
}
void column_family::apply_streaming_mutation(schema_ptr m_schema, const frozen_mutation& m) {
// Applies a mutation received via streaming.
//
// Non-fragmented mutations go to the shared streaming memtable list, which is
// then given a chance to seal on overflow. Fragmented mutations are delegated
// to apply_streaming_big_mutation(), keyed by plan_id.
void column_family::apply_streaming_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) {
    if (!fragmented) {
        _streaming_memtables->active_memtable().apply(m, m_schema);
        _streaming_memtables->seal_on_overflow();
    } else {
        apply_streaming_big_mutation(std::move(m_schema), plan_id, m);
    }
}
// Applies one fragment of a fragmented mutation to the memtables that belong
// to the given streaming plan, creating the per-plan state on first use.
void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) {
    auto pos = _streaming_memtables_big.find(plan_id);
    if (pos == _streaming_memtables_big.end()) {
        // First fragment for this plan: register the state, then attach its
        // memtable list (disk-backed only when disk writes are enabled).
        pos = _streaming_memtables_big.emplace(plan_id, make_lw_shared<streaming_memtable_big>()).first;
        if (_config.enable_disk_writes) {
            pos->second->memtables = make_streaming_memtable_big_list(*pos->second);
        } else {
            pos->second->memtables = make_memory_only_memtable_list();
        }
    }
    // Hold our own reference to the per-plan state while applying.
    auto plan_state = pos->second;
    plan_state->memtables->active_memtable().apply(m, m_schema);
    plan_state->memtables->seal_on_overflow();
}
void
column_family::check_valid_rp(const db::replay_position& rp) const {
if (rp < _highest_flushed_rp) {
@@ -2215,15 +2273,15 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m) {
});
}
future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation& m) {
future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) {
if (!s->is_synced()) {
throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s",
s->ks_name(), s->cf_name(), s->version()));
}
return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] {
return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, fragmented, s = std::move(s)] {
auto uuid = m.column_family_id();
auto& cf = find_column_family(uuid);
cf.apply_streaming_mutation(s, std::move(m));
cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented);
});
}
@@ -2728,13 +2786,17 @@ future<> column_family::flush(const db::replay_position& pos) {
// so we always flush. When we can differentiate those streams, we should not
// be indiscriminately touching the cache during repair. We will just have to
// invalidate the entries that are relevant to things we already have in the cache.
future<> column_family::flush_streaming_mutations(std::vector<query::partition_range> ranges) {
future<> column_family::flush_streaming_mutations(utils::UUID plan_id, std::vector<query::partition_range> ranges) {
// This will effectively take the gate twice for this call. The proper way to fix that would
// be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we
// need this code to go away as soon as we can (see FIXME above). So the double gate is a better
// temporary counter measure.
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).finally([this, ranges = std::move(ranges)] {
return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] {
return flush_streaming_big_mutations(plan_id).then([this] {
return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed);
}).finally([this] {
return _streaming_flush_phaser.advance_and_await();
}).finally([this, ranges = std::move(ranges)] {
if (!_config.enable_cache) {
return make_ready_future<>();
}
@@ -2747,11 +2809,49 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
});
}
// Completes a fragmented streaming plan: flushes what is left in its
// memtables, waits for in-flight flushes, seals and opens every sstable
// produced for the plan and only then adds them to the column family.
// Resolves immediately when nothing is registered under plan_id.
future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
    auto pos = _streaming_memtables_big.find(plan_id);
    if (pos == _streaming_memtables_big.end()) {
        return make_ready_future<>();
    }
    // Unregister the plan; the continuations below keep its state referenced.
    auto plan_state = pos->second;
    _streaming_memtables_big.erase(pos);
    return plan_state->memtables->seal_active_memtable(memtable_list::flush_behavior::immediate).then([plan_state] {
        // Wait for every flush that entered the per-plan gate.
        return plan_state->flush_in_progress.close();
    }).then([this, plan_state] {
        // The sstables were written unsealed; seal and open each of them.
        return parallel_for_each(plan_state->sstables, [this] (auto& sst) {
            return sst->seal_sstable(this->incremental_backups_enabled()).then([sst] {
                return sst->open_data();
            });
        });
    }).then([this, plan_state] {
        // All sstables are ready: add them together.
        for (auto&& sst : plan_state->sstables) {
            add_sstable(sst);
        }
        trigger_compaction();
    });
}
// Abandons a fragmented streaming plan: unregisters its state, waits for
// in-flight flushes and marks every sstable written for the plan for
// deletion. Resolves immediately when nothing is registered under plan_id.
future<> column_family::fail_streaming_mutations(utils::UUID plan_id) {
    auto pos = _streaming_memtables_big.find(plan_id);
    if (pos == _streaming_memtables_big.end()) {
        return make_ready_future<>();
    }
    // Keep the state referenced after it leaves the map.
    auto plan_state = pos->second;
    _streaming_memtables_big.erase(pos);
    return plan_state->flush_in_progress.close().then([plan_state] {
        for (auto&& sst : plan_state->sstables) {
            sst->mark_for_deletion();
        }
    });
}
// Drops the contents of the regular and streaming memtable lists (each gets a
// fresh memtable), forgets all per-plan "big" streaming state and clears the cache.
// The returned future is the one produced by _cache.clear().
future<> column_family::clear() {
_memtables->clear();
_memtables->add_memtable();
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
// NOTE(review): per-plan flushes capture their streaming_memtable_big by reference;
// confirm no such flush can still be in progress when the map is cleared here.
_streaming_memtables_big.clear();
return _cache.clear();
}
@@ -2834,6 +2934,12 @@ void column_family::set_schema(schema_ptr s) {
m->set_schema(s);
}
for (auto smb : _streaming_memtables_big) {
for (auto m : *smb.second->memtables) {
m->set_schema(s);
}
}
_cache.set_schema(s);
_schema = std::move(s);

View File

@@ -371,13 +371,33 @@ private:
// memory throttling mechanism, guaranteeing we will not overload the
// server.
lw_shared_ptr<memtable_list> _streaming_memtables;
utils::phased_barrier _streaming_flush_phaser;
friend class memtable_dirty_memory_manager;
friend class streaming_dirty_memory_manager;
// If mutations are fragmented during streaming the sstables cannot be made
// visible immediately after memtable flush, because that could cause
// readers to see only a part of a partition thus violating isolation
// guarantees.
// Mutations that are sent in fragments are kept separately in per-streaming
// plan memtables and the resulting sstables are not made visible until
// the streaming is complete.
// State kept for one streaming plan that sends fragmented mutations.
struct streaming_memtable_big {
// Memtables receiving the fragments of this plan.
lw_shared_ptr<memtable_list> memtables;
// Sstables written from those memtables.
std::vector<sstables::shared_sstable> sstables;
// Gate held while a flush of this plan's memtables is being written.
seastar::gate flush_in_progress;
};
std::unordered_map<utils::UUID, lw_shared_ptr<streaming_memtable_big>> _streaming_memtables_big;
future<> flush_streaming_big_mutations(utils::UUID plan_id);
void apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m);
future<> seal_active_streaming_memtable_big(streaming_memtable_big& smb);
lw_shared_ptr<memtable_list> make_memory_only_memtable_list();
lw_shared_ptr<memtable_list> make_memtable_list();
lw_shared_ptr<memtable_list> make_streaming_memtable_list();
lw_shared_ptr<memtable_list> make_streaming_memtable_big_list(streaming_memtable_big& smb);
sstables::compaction_strategy _compaction_strategy;
// generation -> sstable. Ordered by key so we can easily get the most recent.
@@ -530,7 +550,7 @@ public:
// The mutation is always upgraded to current schema.
void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position());
void apply(const mutation& m, const db::replay_position& = db::replay_position());
void apply_streaming_mutation(schema_ptr, const frozen_mutation&);
void apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented);
// Returns at most "cmd.limit" rows
future<lw_shared_ptr<query::result>> query(schema_ptr,
@@ -543,7 +563,8 @@ public:
future<> stop();
future<> flush();
future<> flush(const db::replay_position&);
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
future<> flush_streaming_mutations(utils::UUID plan_id, std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
future<> fail_streaming_mutations(utils::UUID plan_id);
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
@@ -1027,7 +1048,7 @@ public:
future<lw_shared_ptr<query::result>> query(schema_ptr, const query::read_command& cmd, query::result_request request, const std::vector<query::partition_range>& ranges);
future<reconcilable_result> query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range);
future<> apply(schema_ptr, const frozen_mutation&);
future<> apply_streaming_mutation(schema_ptr, const frozen_mutation&);
future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);
const sstring& get_snitch_name() const;
future<> clear_snapshot(sstring tag, std::vector<sstring> keyspace_names);

View File

@@ -188,3 +188,106 @@ future<frozen_mutation> freeze(streamed_mutation sm) {
return consume(sm, streamed_mutation_freezer(*sm.schema(), sm.key()));
});
}
// Streamed mutation consumer that turns a single partition into one or more
// frozen mutations. Fragments are buffered until their estimated memory
// footprint reaches the configured fragment size; the buffered part is then
// serialized and handed to the consumer callback together with a flag
// telling whether the partition has been split into multiple parts.
class fragmenting_mutation_freezer {
    const schema& _schema;
    partition_key _key;

    // Buffered content of the frozen mutation currently being built.
    tombstone _partition_tombstone;
    stdx::optional<static_row> _sr;
    std::deque<clustering_row> _crs;
    range_tombstone_list _rts;

    // Start of a range tombstone whose end has not been consumed yet.
    stdx::optional<range_tombstone_begin> _range_tombstone_begin;

    frozen_mutation_consumer_fn _consumer;

    // Set once a flush was triggered by the size limit.
    bool _fragmented = false;

    // Estimated size of the buffered content, and the flush threshold.
    size_t _dirty_size = 0;
    size_t _fragment_size;
private:
    // Serializes the buffered content into a frozen mutation, resets the
    // buffers and passes the result to the consumer.
    future<> flush() {
        bytes_ostream out;
        ser::writer_of_mutation wom(out);
        std::move(wom).write_table_id(_schema.id())
                      .write_schema_version(_schema.version())
                      .write_key(_key)
                      .partition([&] (auto wr) {
                          serialize_mutation_fragments(_schema, _partition_tombstone,
                                                       std::move(_sr), std::move(_rts),
                                                       std::move(_crs), std::move(wr));
                      }).end_mutation();

        // The buffers were moved from above; bring them back to an empty state.
        _sr = { };
        _rts.clear();
        _crs.clear();
        _dirty_size = 0;
        return _consumer(frozen_mutation(out.linearize(), _key), _fragmented);
    }

    // Flushes when the buffered content reached the fragment size. Never
    // requests the stream to stop.
    future<stop_iteration> maybe_flush() {
        if (_dirty_size < _fragment_size) {
            return make_ready_future<stop_iteration>(stop_iteration::no);
        }
        _fragmented = true;
        return flush().then([] { return stop_iteration::no; });
    }
public:
    fragmenting_mutation_freezer(const schema& s, const partition_key& key, frozen_mutation_consumer_fn c, size_t fragment_size)
        : _schema(s), _key(key), _rts(s), _consumer(c), _fragment_size(fragment_size) { }

    // The partition tombstone is kept across flushes and written into every
    // produced frozen mutation.
    void consume(tombstone pt) {
        _dirty_size += sizeof(tombstone);
        _partition_tombstone = pt;
    }

    future<stop_iteration> consume(static_row&& sr) {
        _sr = std::move(sr);
        _dirty_size += _sr->memory_usage() + sizeof(sr);
        return maybe_flush();
    }

    future<stop_iteration> consume(clustering_row&& cr) {
        _dirty_size += cr.memory_usage() + sizeof(cr);
        _crs.emplace_back(std::move(cr));
        return maybe_flush();
    }

    // Only remembers the start; the range tombstone is accounted for and
    // buffered once the matching end arrives.
    future<stop_iteration> consume(range_tombstone_begin&& rtb) {
        assert(!_range_tombstone_begin);
        _range_tombstone_begin = std::move(rtb);
        return make_ready_future<stop_iteration>(stop_iteration::no);
    }

    future<stop_iteration> consume(range_tombstone_end&& rte) {
        assert(_range_tombstone_begin);
        _dirty_size += _range_tombstone_begin->memory_usage() + rte.memory_usage() + sizeof(range_tombstone);
        _rts.apply(_schema, std::move(_range_tombstone_begin->key()), _range_tombstone_begin->kind(),
                   std::move(rte.key()), rte.kind(), _range_tombstone_begin->tomb());
        _range_tombstone_begin = { };
        return maybe_flush();
    }

    // Emits whatever is still buffered and asks the caller to stop.
    future<stop_iteration> consume_end_of_stream() {
        assert(!_range_tombstone_begin);
        if (!_dirty_size) {
            return make_ready_future<stop_iteration>(stop_iteration::yes);
        }
        return flush().then([] { return stop_iteration::yes; });
    }
};
// Consumes the whole streamed mutation and passes it to `c` as a sequence of
// frozen mutations, each built from roughly `fragment_size` bytes of
// fragments (as estimated by their memory_usage()).
future<> fragment_and_freeze(streamed_mutation sm, frozen_mutation_consumer_fn c, size_t fragment_size)
{
    // The freezer has to be built before `sm` is moved away below, because
    // it reads the schema and the key from it.
    fragmenting_mutation_freezer freezer(*sm.schema(), sm.key(), c, fragment_size);
    return do_with(std::move(sm), std::move(freezer), [] (auto& stream, auto& frz) {
        frz.consume(stream.partition_tombstone());
        return repeat([&] {
            return stream().then([&] (auto mfopt) {
                if (mfopt) {
                    return std::move(*mfopt).consume(frz);
                }
                return frz.consume_end_of_stream();
            });
        });
    });
}

View File

@@ -101,4 +101,7 @@ public:
future<frozen_mutation> freeze(streamed_mutation sm);
// Callback receiving each frozen mutation produced by fragment_and_freeze(),
// together with a bool flag.
// NOTE(review): the flag presumably tells whether the mutation was split into fragments — confirm against the definition.
using frozen_mutation_consumer_fn = std::function<future<>(frozen_mutation, bool)>;
// Feeds the streamed mutation to consumer `c` as frozen mutations;
// fragment_size defaults to 128 KiB.
future<> fragment_and_freeze(streamed_mutation sm, frozen_mutation_consumer_fn c,
size_t fragment_size = 128 * 1024);

View File

@@ -322,6 +322,10 @@ public:
size_t size(const schema& s) const {
return std::distance(begin(s), end(s));
}
// Memory usage of this object, as reported by the underlying _bytes storage.
size_t memory_usage() const {
return _bytes.memory_usage();
}
};
template <typename TopLevel, typename PrefixTopLevel>

View File

@@ -644,13 +644,13 @@ future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id,
}
// STREAM_MUTATION
void messaging_service::register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func) {
void messaging_service::register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool> fragmented)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION, std::move(func));
}
future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) {
future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented) {
return send_message_timeout_and_retry<void>(this, messaging_verb::STREAM_MUTATION, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
plan_id, std::move(fm), dst_cpu_id);
plan_id, std::move(fm), dst_cpu_id, fragmented);
}
// STREAM_MUTATION_DONE

View File

@@ -216,8 +216,8 @@ public:
future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id);
// Wrapper for STREAM_MUTATION verb
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id);
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool>)>&& func);
future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented);
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id);

View File

@@ -1078,6 +1078,21 @@ row::find_cell(column_id id) const {
}
}
// Estimates the memory used by the cells of this row. For vector storage
// this is the vector's own usage plus the usage of every element; for set
// storage it is sizeof(cell_entry) plus the cell's usage for every entry.
size_t row::memory_usage() const {
    if (_type != storage_type::vector) {
        size_t total = 0;
        for (auto&& entry : _storage.set) {
            total += sizeof(cell_entry) + entry.cell().memory_usage();
        }
        return total;
    }

    size_t total = _storage.vector.v.memory_usage();
    for (auto&& cell : _storage.vector.v) {
        total += cell.memory_usage();
    }
    return total;
}
template<bool reversed, typename Func>
void mutation_partition::trim_rows(const schema& s,
const std::vector<query::clustering_range>& row_ranges,

View File

@@ -265,6 +265,8 @@ public:
bool equal(column_kind kind, const schema& this_schema, const row& other, const schema& other_schema) const;
size_t memory_usage() const;
friend std::ostream& operator<<(std::ostream& os, const row& r);
};

View File

@@ -964,10 +964,10 @@ storage_proxy::mutate_locally(std::vector<mutation> mutations) {
}
future<>
storage_proxy::mutate_streaming_mutation(const schema_ptr& s, const frozen_mutation& m) {
storage_proxy::mutate_streaming_mutation(const schema_ptr& s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) {
auto shard = _db.local().shard_of(m);
return _db.invoke_on(shard, [&m, gs = global_schema_ptr(s)] (database& db) mutable -> future<> {
return db.apply_streaming_mutation(gs, m);
return _db.invoke_on(shard, [&m, plan_id, fragmented, gs = global_schema_ptr(s)] (database& db) mutable -> future<> {
return db.apply_streaming_mutation(gs, plan_id, m, fragmented);
});
}

View File

@@ -254,7 +254,7 @@ public:
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m);
future<> mutate_locally(std::vector<mutation> mutations);
future<> mutate_streaming_mutation(const schema_ptr&, const frozen_mutation& m);
future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented);
/**
* Use this method to have these Mutations applied

View File

@@ -817,23 +817,26 @@ void sstable::write_toc(const io_priority_class& pc) {
});
}
void sstable::seal_sstable() {
future<> sstable::seal_sstable() {
// SSTable sealing is about renaming temporary TOC file after guaranteeing
// that each component reached the disk safely.
file dir_f = open_checked_directory(sstable_write_error, _dir).get0();
sstable_write_io_check([&] {
return open_checked_directory(sstable_write_error, _dir).then([this] (file dir_f) {
// Guarantee that every component of this sstable reached the disk.
dir_f.flush().get();
// Rename TOC because it's no longer temporary.
engine().rename_file(filename(sstable::component_type::TemporaryTOC), filename(sstable::component_type::TOC)).get();
// Guarantee that the changes above reached the disk.
dir_f.flush().get();
dir_f.close().get();
return sstable_write_io_check([&] { return dir_f.flush(); }).then([this] {
// Rename TOC because it's no longer temporary.
return sstable_write_io_check([&] {
return engine().rename_file(filename(sstable::component_type::TemporaryTOC), filename(sstable::component_type::TOC));
});
}).then([this, dir_f] () mutable {
// Guarantee that the changes above reached the disk.
return sstable_write_io_check([&] { return dir_f.flush(); });
}).then([this, dir_f] () mutable {
return sstable_write_io_check([&] { return dir_f.close(); });
}).then([this, dir_f] {
// If this point was reached, sstable should be safe in disk.
sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _ks, _cf);
});
});
// If this point was reached, sstable should be safe in disk.
sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _ks, _cf);
}
void write_crc(const sstring file_path, checksum& c) {
@@ -1534,10 +1537,10 @@ void components_writer::consume_end_of_stream() {
seal_statistics(_sst._statistics, _sst._collector, dht::global_partitioner().name(), _schema.bloom_filter_fp_chance());
}
future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc) {
future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc, bool leave_unsealed) {
_collector.set_replay_position(mt.replay_position());
return write_components(mt.make_reader(mt.schema()),
mt.partition_count(), mt.schema(), std::numeric_limits<uint64_t>::max(), backup, pc);
mt.partition_count(), mt.schema(), std::numeric_limits<uint64_t>::max(), backup, pc, leave_unsealed);
}
void sstable_writer::prepare_file_writer()
@@ -1569,11 +1572,12 @@ void sstable_writer::finish_file_writer()
}
sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
uint64_t max_sstable_size, bool backup, const io_priority_class& pc)
uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc)
: _sst(sst)
, _schema(s)
, _pc(pc)
, _backup(backup)
, _leave_unsealed(leave_unsealed)
{
_sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance());
_sst.write_toc(_pc);
@@ -1593,25 +1597,35 @@ void sstable_writer::consume_end_of_stream()
_sst.write_statistics(_pc);
// NOTE: write_compression means maybe_write_compression.
_sst.write_compression(_pc);
_sst.seal_sstable();
if (_backup) {
auto dir = _sst.get_dir() + "/backups/";
sstable_write_io_check(touch_directory, dir).get();
_sst.create_links(dir).get();
if (!_leave_unsealed) {
_sst.seal_sstable(_backup).get();
}
}
sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size,
bool backup, const io_priority_class& pc)
future<> sstable::seal_sstable(bool backup)
{
return sstable_writer(*this, s, estimated_partitions, max_sstable_size, backup, pc);
return seal_sstable().then([this, backup] {
if (backup) {
auto dir = get_dir() + "/backups/";
return sstable_write_io_check(touch_directory, dir).then([this, dir] {
return create_links(dir);
});
}
return make_ready_future<>();
});
}
sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size,
bool backup, const io_priority_class& pc, bool leave_unsealed)
{
return sstable_writer(*this, s, estimated_partitions, max_sstable_size, backup, leave_unsealed, pc);
}
future<> sstable::write_components(::mutation_reader mr,
uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup, const io_priority_class& pc) {
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size, backup, &pc] () mutable {
auto wr = get_writer(*schema, estimated_partitions, max_sstable_size, backup, pc);
uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup, const io_priority_class& pc, bool leave_unsealed) {
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size, backup, &pc, leave_unsealed] () mutable {
auto wr = get_writer(*schema, estimated_partitions, max_sstable_size, backup, pc, leave_unsealed);
consume_flattened_in_thread(mr, wr);
});
}

View File

@@ -233,14 +233,17 @@ public:
// Write sstable components from a memtable.
future<> write_components(memtable& mt, bool backup = false,
const io_priority_class& pc = default_priority_class());
const io_priority_class& pc = default_priority_class(), bool leave_unsealed = false);
future<> write_components(::mutation_reader mr,
uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup = false,
const io_priority_class& pc = default_priority_class());
const io_priority_class& pc = default_priority_class(), bool leave_unsealed = false);
sstable_writer get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size,
bool backup = false, const io_priority_class& pc = default_priority_class());
bool backup = false, const io_priority_class& pc = default_priority_class(),
bool leave_unsealed = false);
future<> seal_sstable(bool backup);
uint64_t get_estimated_key_count() const {
return ((uint64_t)_summary.header.size_at_full_sampling + 1) *
@@ -393,7 +396,7 @@ private:
void generate_toc(compressor c, double filter_fp_chance);
void write_toc(const io_priority_class& pc);
void seal_sstable();
future<> seal_sstable();
future<> read_compression(const io_priority_class& pc);
void write_compression(const io_priority_class& pc);
@@ -684,6 +687,7 @@ class sstable_writer {
const schema& _schema;
const io_priority_class& _pc;
bool _backup;
bool _leave_unsealed;
bool _compression_enabled;
shared_ptr<file_writer> _writer;
stdx::optional<components_writer> _components_writer;
@@ -692,7 +696,7 @@ private:
void finish_file_writer();
public:
sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
uint64_t max_sstable_size, bool backup, const io_priority_class& pc);
uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc);
void consume_new_partition(const dht::decorated_key& dk) { return _components_writer->consume_new_partition(dk); }
void consume(tombstone t) { _components_writer->consume(t); }
stop_iteration consume(static_row&& sr) { return _components_writer->consume(std::move(sr)); }

View File

@@ -68,33 +68,29 @@ void mutation_fragment::destroy_data() noexcept
}
}
// C++ does not allow local classes that have template member. The rightful
// place of this class is inside mutation_fragment::key().
struct get_mutation_fragment_key_visitor {
    // Asking a static row for a key aborts.
    const clustering_key_prefix& operator()(const static_row&) { abort(); }

    // Every other fragment type hands back its own key.
    template<typename Fragment>
    const clustering_key_prefix& operator()(const Fragment& fragment) { return fragment.key(); }
};
const clustering_key_prefix& mutation_fragment::key() const
{
assert(has_key());
switch (_kind) {
case kind::clustering_row:
return as_clustering_row().key();
case kind::range_tombstone_begin:
return as_range_tombstone_begin().key();
case kind::range_tombstone_end:
return as_range_tombstone_end().key();
default:
abort();
}
return visit(get_mutation_fragment_key_visitor());
}
int mutation_fragment::bound_kind_weight() const {
assert(has_key());
switch (_kind) {
case kind::clustering_row:
return 0;
case kind::range_tombstone_begin:
return weight(as_range_tombstone_begin().bound().kind);
case kind::range_tombstone_end:
return weight(as_range_tombstone_end().bound().kind);
default:
abort();
}
struct get_bound_kind_weight {
int operator()(const clustering_row&) { return 0; }
int operator()(const range_tombstone_begin& rtb) { return weight(rtb.kind()); }
int operator()(const range_tombstone_end& rte) { return weight(rte.kind()); }
int operator()(...) { abort(); }
};
return visit(get_bound_kind_weight());
}
void mutation_fragment::apply(const schema& s, mutation_fragment&& mf)
@@ -122,17 +118,7 @@ void mutation_fragment::apply(const schema& s, mutation_fragment&& mf)
position_in_partition mutation_fragment::position() const
{
switch (_kind) {
case kind::static_row:
return _data->_static_row.position();
case kind::clustering_row:
return _data->_clustering_row.position();
case kind::range_tombstone_begin:
return _data->_range_tombstone_begin.position();
case kind::range_tombstone_end:
return _data->_range_tombstone_end.position();
}
abort();
return visit([] (auto& mf) { return mf.position(); });
}
std::ostream& operator<<(std::ostream& os, const streamed_mutation& sm) {

View File

@@ -89,6 +89,10 @@ public:
}
position_in_partition position() const;
// Combined memory usage reported by the key (_ck) and the cells (_cells).
size_t memory_usage() const {
return _ck.memory_usage() + _cells.memory_usage();
}
};
class static_row {
@@ -115,6 +119,10 @@ public:
}
position_in_partition position() const;
// Memory usage reported by the cells (_cells).
size_t memory_usage() const {
return _cells.memory_usage();
}
};
class range_tombstone_begin {
@@ -137,6 +145,10 @@ public:
}
position_in_partition position() const;
// Memory usage reported by the key (_ck).
size_t memory_usage() const {
return _ck.memory_usage();
}
};
class range_tombstone_end {
@@ -153,6 +165,10 @@ public:
bound_view bound() const { return bound_view(_ck, _kind); }
position_in_partition position() const;
// Memory usage reported by the key (_ck).
size_t memory_usage() const {
return _ck.memory_usage();
}
};
class mutation_fragment {
@@ -245,7 +261,7 @@ public:
}
*/
template<typename Consumer>
auto consume(Consumer& consumer) && {
decltype(auto) consume(Consumer& consumer) && {
switch (_kind) {
case kind::static_row:
return consumer.consume(std::move(_data->_static_row));
@@ -258,6 +274,36 @@ public:
}
abort();
}
/*
template<typename T, typename ReturnType>
concept bool MutationFragmentVisitor() {
return requires(T t, const static_row& sr, const clustering_row& cr, const range_tombstone_begin& rtb, const range_tombstone_end& rte) {
{ t(sr) } -> ReturnType;
{ t(cr) } -> ReturnType;
{ t(rtb) } -> ReturnType;
{ t(rte) } -> ReturnType;
};
}
*/
// Calls `visitor` with the fragment viewed as its concrete type, selected by
// _kind, and returns whatever the visitor returns. The visitor must be
// callable with each of the four fragment types. Aborts if _kind matches
// none of the handled cases.
template<typename Visitor>
decltype(auto) visit(Visitor&& visitor) const {
switch (_kind) {
case kind::static_row:
return visitor(as_static_row());
case kind::clustering_row:
return visitor(as_clustering_row());
case kind::range_tombstone_begin:
return visitor(as_range_tombstone_begin());
case kind::range_tombstone_end:
return visitor(as_range_tombstone_end());
}
abort();
}
// sizeof(data) plus the memory_usage() of the concrete fragment currently held.
size_t memory_usage() const {
return sizeof(data) + visit([] (auto& mf) { return mf.memory_usage(); });
}
};
class position_in_partition {

View File

@@ -114,28 +114,29 @@ void stream_session::init_messaging_service_handler() {
return make_ready_future<>();
});
});
ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) {
ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool> fragmented_opt) {
auto from = net::messaging_service::get_source(cinfo);
return do_with(std::move(fm), [plan_id, from] (const auto& fm) {
auto fragmented = fragmented_opt && *fragmented_opt;
return do_with(std::move(fm), [plan_id, from, fragmented] (const auto& fm) {
auto fm_size = fm.representation().size();
get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, fm_size);
return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm] (schema_ptr s) {
return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm, fragmented] (schema_ptr s) {
auto cf_id = fm.column_family_id();
sslog.debug("[Stream #{}] GOT STREAM_MUTATION from {}: cf_id={}", plan_id, from.addr, cf_id);
auto& db = service::get_local_storage_proxy().get_db().local();
if (!db.column_family_exists(cf_id)) {
sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped",
plan_id, from.addr, cf_id);
plan_id, from.addr, cf_id);
return make_ready_future<>();
}
return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), fm).then_wrapped([plan_id, cf_id, from] (auto&& f) {
return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), plan_id, fm, fragmented).then_wrapped([plan_id, cf_id, from] (auto&& f) {
try {
f.get();
return make_ready_future<>();
} catch (no_such_column_family) {
sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped",
plan_id, from.addr, cf_id);
plan_id, from.addr, cf_id);
return make_ready_future<>();
} catch (...) {
throw;
@@ -162,7 +163,7 @@ void stream_session::init_messaging_service_handler() {
for (auto& range : ranges) {
query_ranges.push_back(query::to_partition_range(range));
}
return cf.flush_streaming_mutations(std::move(query_ranges));
return cf.flush_streaming_mutations(plan_id, std::move(query_ranges));
} catch (no_such_column_family) {
sslog.warn("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped",
plan_id, from, cf_id);
@@ -424,6 +425,18 @@ void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::ra
}
}
// Tells the column family `cf_id` that receiving data for this session's
// streaming plan has failed.
//
// Through get_db().invoke_on_all() (presumably once per shard -- confirm
// against the sharded<database> API) this looks up the column family and
// calls fail_streaming_mutations() with this session's plan id.
// If the column family cannot be found (no_such_column_family), there is
// nothing to fail and the call resolves immediately.
//
// Returns the future produced by invoke_on_all().
future<> stream_session::receiving_failed(UUID cf_id)
{
    return get_db().invoke_on_all([cf_id, plan_id = plan_id()] (database& db) {
        try {
            auto& cf = db.find_column_family(cf_id);
            return cf.fail_streaming_mutations(plan_id);
        } catch (const no_such_column_family&) {
            // Caught by const reference: no copy of the exception object is made.
            return make_ready_future<>();
        }
    });
}
void stream_session::close_session(stream_session_state final_state) {
sslog.debug("[Stream #{}] close_session session={}, state={}, is_aborted={}", plan_id(), this, final_state, _is_aborted);
if (!_is_aborted) {
@@ -439,6 +452,7 @@ void stream_session::close_session(stream_session_state final_state) {
for (auto& x : _receivers) {
stream_receive_task& task = x.second;
sslog.debug("[Stream #{}] close_session session={}, state={}, abort stream_receive_task cf_id={}", plan_id(), this, final_state, task.cf_id);
receiving_failed(x.first);
task.abort();
}
}

View File

@@ -338,6 +338,7 @@ private:
bool maybe_completed();
void prepare_receiving(stream_summary& summary);
void start_streaming_files();
future<> receiving_failed(UUID cf_id);
};
} // namespace streaming

View File

@@ -84,44 +84,42 @@ struct send_info {
}
};
future<stop_iteration> do_send_mutations(auto si, auto fm) {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
// There might be larger number of STREAM_MUTATION inflight.
// Log one error per column_family per range
if (!si->error_logged) {
si->error_logged = true;
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
future<> do_send_mutations(auto si, auto fm, bool fragmented) {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
// There might be larger number of STREAM_MUTATION inflight.
// Log one error per column_family per range
if (!si->error_logged) {
si->error_logged = true;
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
});
return make_ready_future<stop_iteration>(stop_iteration::no);
}
future<> send_mutations(auto si) {
auto& cf = si->db.find_column_family(si->cf_id);
auto& priority = service::get_local_streaming_read_priority();
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
return repeat([si, &reader] {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
return reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
return repeat([si, &reader] () {
return reader().then([si] (auto smopt) {
if (smopt && si->db.column_family_exists(si->cf_id)) {
return fragment_and_freeze(std::move(*smopt), [si] (auto fm, bool fragmented) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
return do_send_mutations(si, std::move(fm), fragmented);
}).then([] { return stop_iteration::no; });
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).then([si] {

View File

@@ -116,7 +116,7 @@ public:
sst->write_compression(default_priority_class());
sst->write_filter(default_priority_class());
sst->write_summary(default_priority_class());
sst->seal_sstable();
sst->seal_sstable().get();
});
}

View File

@@ -105,3 +105,49 @@ SEASTAR_TEST_CASE(test_freezing_streamed_mutations) {
});
}
// Checks fragment_and_freeze() against every mutation produced by
// for_each_mutation(), in two configurations (the last argument is
// presumably the fragment size limit -- confirm against the declaration):
//
//  1. With the largest possible limit the consumer must be called exactly
//     once, with the "fragmented" flag unset, and unfreezing the result must
//     give back the original mutation.
//  2. With a limit of 1 the number of frozen mutations must equal the number
//     of clustered rows, row tombstones and (if non-empty) the static row,
//     but never be less than one. The "fragmented" flag must be the same for
//     every piece, must be set when there are two or more pieces, and
//     applying all pieces together must reproduce the original mutation.
SEASTAR_TEST_CASE(test_fragmenting_and_freezing_streamed_mutations) {
    return seastar::async([] {
        storage_service_for_tests ssft;

        for_each_mutation([&] (const mutation& m) {
            std::vector<frozen_mutation> fms;

            // Case 1: no fragmentation expected.
            fragment_and_freeze(streamed_mutation_from_mutation(mutation(m)), [&] (auto fm, bool frag) {
                BOOST_REQUIRE(!frag);
                fms.emplace_back(std::move(fm));
                return make_ready_future<>();
            }, std::numeric_limits<size_t>::max()).get0();

            BOOST_REQUIRE_EQUAL(fms.size(), 1);

            auto m1 = fms.back().unfreeze(m.schema());
            BOOST_REQUIRE_EQUAL(m, m1);

            fms.clear();

            // Case 2: smallest limit, so every element becomes its own piece.
            stdx::optional<bool> fragmented;
            fragment_and_freeze(streamed_mutation_from_mutation(mutation(m)), [&] (auto fm, bool frag) {
                // Every piece of the same mutation must carry the same flag.
                BOOST_REQUIRE(!fragmented || *fragmented == frag);
                // Assign to the optional itself so that it becomes engaged.
                // Writing through operator* of an empty optional is undefined
                // behaviour and would leave the check above permanently
                // disabled.
                fragmented = frag;
                fms.emplace_back(std::move(fm));
                return make_ready_future<>();
            }, 1).get0();

            auto expected_fragments = m.partition().clustered_rows().size()
                                      + m.partition().row_tombstones().size()
                                      + !m.partition().static_row().empty();
            BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
            BOOST_REQUIRE(expected_fragments < 2 || (fragmented && *fragmented));

            // Merge all pieces back together and compare with the original.
            auto m2 = fms.back().unfreeze(m.schema());
            fms.pop_back();
            while (!fms.empty()) {
                m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema());
                fms.pop_back();
            }
            BOOST_REQUIRE_EQUAL(m, m2);
        });
    });
}

View File

@@ -369,6 +369,20 @@ public:
return read_linearize();
}
// Returns the amount of external memory used.
size_t memory_usage() const {
if (external()) {
size_t mem = 0;
blob_storage* blob = _u.ptr;
while (blob) {
mem += blob->frag_size + sizeof(blob_storage);
blob = blob->next;
}
return mem;
}
return 0;
}
template <typename Func>
friend std::result_of_t<Func()> with_linearized_managed_bytes(Func&& func);
};

View File

@@ -232,4 +232,12 @@ public:
push_back(value);
}
}
// Returns the amount of external memory used.
// With external storage that is sizeof(external) plus room for `_capacity`
// elements of type T; without it the result is 0.
size_t memory_usage() const {
    return is_external() ? sizeof(external) + _capacity * sizeof(T) : 0;
}
};