diff --git a/atomic_cell_or_collection.hh b/atomic_cell_or_collection.hh index 69c0deb997..78454ac29f 100644 --- a/atomic_cell_or_collection.hh +++ b/atomic_cell_or_collection.hh @@ -63,5 +63,8 @@ public: ::feed_hash(as_collection_mutation(), h, def.type); } } + size_t memory_usage() const { + return _data.memory_usage(); + } friend std::ostream& operator<<(std::ostream&, const atomic_cell_or_collection&); }; diff --git a/database.cc b/database.cc index 93b1581457..62198e6b9a 100644 --- a/database.cc +++ b/database.cc @@ -113,6 +113,13 @@ column_family::make_streaming_memtable_list() { return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager); } +lw_shared_ptr +column_family::make_streaming_memtable_big_list(streaming_memtable_big& smb) { + auto seal = [this, &smb] (memtable_list::flush_behavior) { return seal_active_streaming_memtable_big(smb); }; + auto get_schema = [this] { return schema(); }; + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager); +} + column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) : _schema(std::move(schema)) , _config(std::move(config)) @@ -166,6 +173,11 @@ logalloc::occupancy_stats column_family::occupancy() const { for (auto m : *_streaming_memtables) { res += m->region().occupancy(); } + for (auto smb : _streaming_memtables_big) { + for (auto m : *smb.second->memtables) { + res += m->region().occupancy(); + } + } return res; } @@ -687,6 +699,8 @@ column_family::seal_active_streaming_memtable_immediate() { } _streaming_memtables->add_memtable(); _streaming_memtables->erase(old); + + auto guard = _streaming_flush_phaser.start(); return with_gate(_streaming_flush_gate, [this, old] { _delayed_streaming_flush.cancel(); @@ -735,6 +749,35 @@ column_family::seal_active_streaming_memtable_immediate() { }); return f; + }).finally([guard = std::move(guard)] { }); +} + +future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_big& smb) { + auto old = smb.memtables->back(); + if (old->empty()) { + return make_ready_future<>(); + } + smb.memtables->add_memtable(); + smb.memtables->erase(old); + return with_gate(_streaming_flush_gate, [this, old, &smb] { + return with_gate(smb.flush_in_progress, [this, old, &smb] { + return with_lock(_sstables_lock.for_read(), [this, old, &smb] { + auto newtab = make_lw_shared(_schema->ks_name(), _schema->cf_name(), + _config.datadir, calculate_generation_for_new_table(), + sstables::sstable::version_types::ka, + sstables::sstable::format_types::big); + + newtab->set_unshared(); + + auto&& priority = service::get_local_streaming_write_priority(); + return newtab->write_components(*old, incremental_backups_enabled(), priority, true).then([this, newtab, old, &smb] { + smb.sstables.emplace_back(newtab); + }).handle_exception([] (auto ep) { + dblog.error("failed to write streamed sstable: {}", ep); + return make_exception_future<>(ep); + }); + }); + }); }); } @@ -2105,11 +2148,26 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const } } -void column_family::apply_streaming_mutation(schema_ptr m_schema, const frozen_mutation& m) { +void column_family::apply_streaming_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) { + if (fragmented) { + apply_streaming_big_mutation(std::move(m_schema), plan_id, m); + return; + } _streaming_memtables->active_memtable().apply(m, m_schema); _streaming_memtables->seal_on_overflow(); } +void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) { + auto it = _streaming_memtables_big.find(plan_id); + if (it == _streaming_memtables_big.end()) { + it = _streaming_memtables_big.emplace(plan_id, make_lw_shared()).first; + it->second->memtables = _config.enable_disk_writes ? make_streaming_memtable_big_list(*it->second) : make_memory_only_memtable_list(); + } + auto entry = it->second; + entry->memtables->active_memtable().apply(m, m_schema); + entry->memtables->seal_on_overflow(); +} + void column_family::check_valid_rp(const db::replay_position& rp) const { if (rp < _highest_flushed_rp) { @@ -2215,15 +2273,15 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m) { }); } -future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation& m) { +future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) { if (!s->is_synced()) { throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] { + return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, fragmented, s = std::move(s)] { auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); - cf.apply_streaming_mutation(s, std::move(m)); + cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented); }); } @@ -2728,13 +2786,17 @@ future<> column_family::flush(const db::replay_position& pos) { // so we always flush. When we can differentiate those streams, we should not // be indiscriminately touching the cache during repair. We will just have to // invalidate the entries that are relevant to things we already have in the cache. -future<> column_family::flush_streaming_mutations(std::vector ranges) { +future<> column_family::flush_streaming_mutations(utils::UUID plan_id, std::vector ranges) { // This will effectively take the gate twice for this call. The proper way to fix that would // be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we // need this code to go away as soon as we can (see FIXME above). So the double gate is a better // temporary counter measure. - return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] { - return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).finally([this, ranges = std::move(ranges)] { + return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] { + return flush_streaming_big_mutations(plan_id).then([this] { + return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed); + }).finally([this] { + return _streaming_flush_phaser.advance_and_await(); + }).finally([this, ranges = std::move(ranges)] { if (!_config.enable_cache) { return make_ready_future<>(); } @@ -2747,11 +2809,49 @@ future<> column_family::flush_streaming_mutations(std::vector column_family::flush_streaming_big_mutations(utils::UUID plan_id) { + auto it = _streaming_memtables_big.find(plan_id); + if (it == _streaming_memtables_big.end()) { + return make_ready_future<>(); + } + auto entry = it->second; + _streaming_memtables_big.erase(it); + return entry->memtables->seal_active_memtable(memtable_list::flush_behavior::immediate).then([entry] { + return entry->flush_in_progress.close(); + }).then([this, entry] { + return parallel_for_each(entry->sstables, [this] (auto& sst) { + return sst->seal_sstable(this->incremental_backups_enabled()).then([sst] { + return sst->open_data(); + }); + }).then([this, entry] { + for (auto&& sst : entry->sstables) { + add_sstable(sst); + } + trigger_compaction(); + }); + }); +} + +future<> column_family::fail_streaming_mutations(utils::UUID plan_id) { + auto it = _streaming_memtables_big.find(plan_id); + if (it == _streaming_memtables_big.end()) { + return make_ready_future<>(); + } + auto entry = it->second; + _streaming_memtables_big.erase(it); + return entry->flush_in_progress.close().then([this, entry] { + for (auto&& sst : entry->sstables) { + sst->mark_for_deletion(); + } + }); +} + future<> column_family::clear() { _memtables->clear(); _memtables->add_memtable(); _streaming_memtables->clear(); _streaming_memtables->add_memtable(); + _streaming_memtables_big.clear(); return _cache.clear(); } @@ -2834,6 +2934,12 @@ void column_family::set_schema(schema_ptr s) { m->set_schema(s); } + for (auto smb : _streaming_memtables_big) { + for (auto m : *smb.second->memtables) { + m->set_schema(s); + } + } + _cache.set_schema(s); _schema = std::move(s); diff --git a/database.hh b/database.hh index 793d15a716..3793172c73 100644 --- a/database.hh +++ b/database.hh @@ -371,13 +371,33 @@ private: // memory throttling mechanism, guaranteeing we will not overload the // server. lw_shared_ptr _streaming_memtables; + utils::phased_barrier _streaming_flush_phaser; friend class memtable_dirty_memory_manager; friend class streaming_dirty_memory_manager; + // If mutations are fragmented during streaming the sstables cannot be made + // visible immediately after memtable flush, because that could cause + // readers to see only a part of a partition thus violating isolation + // guarantees. + // Mutations that are sent in fragments are kept separately in per-streaming + // plan memtables and the resulting sstables are not made visible until + // the streaming is complete. + struct streaming_memtable_big { + lw_shared_ptr memtables; + std::vector sstables; + seastar::gate flush_in_progress; + }; + std::unordered_map> _streaming_memtables_big; + + future<> flush_streaming_big_mutations(utils::UUID plan_id); + void apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m); + future<> seal_active_streaming_memtable_big(streaming_memtable_big& smb); + lw_shared_ptr make_memory_only_memtable_list(); lw_shared_ptr make_memtable_list(); lw_shared_ptr make_streaming_memtable_list(); + lw_shared_ptr make_streaming_memtable_big_list(streaming_memtable_big& smb); sstables::compaction_strategy _compaction_strategy; // generation -> sstable. Ordered by key so we can easily get the most recent. @@ -530,7 +550,7 @@ public: // The mutation is always upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position()); void apply(const mutation& m, const db::replay_position& = db::replay_position()); - void apply_streaming_mutation(schema_ptr, const frozen_mutation&); + void apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented); // Returns at most "cmd.limit" rows future> query(schema_ptr, @@ -543,7 +563,8 @@ public: future<> stop(); future<> flush(); future<> flush(const db::replay_position&); - future<> flush_streaming_mutations(std::vector ranges = std::vector{}); + future<> flush_streaming_mutations(utils::UUID plan_id, std::vector ranges = std::vector{}); + future<> fail_streaming_mutations(utils::UUID plan_id); future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); @@ -1027,7 +1048,7 @@ public: future> query(schema_ptr, const query::read_command& cmd, query::result_request request, const std::vector& ranges); future query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range); future<> apply(schema_ptr, const frozen_mutation&); - future<> apply_streaming_mutation(schema_ptr, const frozen_mutation&); + future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented); keyspace::config make_keyspace_config(const keyspace_metadata& ksm); const sstring& get_snitch_name() const; future<> clear_snapshot(sstring tag, std::vector keyspace_names); diff --git a/frozen_mutation.cc b/frozen_mutation.cc index 67e7838186..e5f2ee7d61 100644 --- a/frozen_mutation.cc +++ b/frozen_mutation.cc @@ -188,3 +188,106 @@ future freeze(streamed_mutation sm) { return consume(sm, streamed_mutation_freezer(*sm.schema(), sm.key())); }); } + +class fragmenting_mutation_freezer { + const schema& _schema; + partition_key _key; + + tombstone _partition_tombstone; + stdx::optional _sr; + std::deque _crs; + range_tombstone_list _rts; + stdx::optional _range_tombstone_begin; + + frozen_mutation_consumer_fn _consumer; + + bool _fragmented = false; + size_t _dirty_size = 0; + size_t _fragment_size; +private: + future<> flush() { + bytes_ostream out; + ser::writer_of_mutation wom(out); + std::move(wom).write_table_id(_schema.id()) + .write_schema_version(_schema.version()) + .write_key(_key) + .partition([&] (auto wr) { + serialize_mutation_fragments(_schema, _partition_tombstone, + std::move(_sr), std::move(_rts), + std::move(_crs), std::move(wr)); + }).end_mutation(); + + _sr = { }; + _rts.clear(); + _crs.clear(); + _dirty_size = 0; + return _consumer(frozen_mutation(out.linearize(), _key), _fragmented); + } + + future maybe_flush() { + if (_dirty_size >= _fragment_size) { + _fragmented = true; + return flush().then([] { return stop_iteration::no; }); + } + return make_ready_future(stop_iteration::no); + } +public: + fragmenting_mutation_freezer(const schema& s, const partition_key& key, frozen_mutation_consumer_fn c, size_t fragment_size) + : _schema(s), _key(key), _rts(s), _consumer(c), _fragment_size(fragment_size) { } + + void consume(tombstone pt) { + _dirty_size += sizeof(tombstone); + _partition_tombstone = pt; + } + + future consume(static_row&& sr) { + _sr = std::move(sr); + _dirty_size += _sr->memory_usage() + sizeof(sr); + return maybe_flush(); + } + + future consume(clustering_row&& cr) { + _dirty_size += cr.memory_usage() + sizeof(cr); + _crs.emplace_back(std::move(cr)); + return maybe_flush(); + } + + future consume(range_tombstone_begin&& rtb) { + assert(!_range_tombstone_begin); + _range_tombstone_begin = std::move(rtb); + return make_ready_future(stop_iteration::no); + } + + future consume(range_tombstone_end&& rte) { + assert(_range_tombstone_begin); + _dirty_size += _range_tombstone_begin->memory_usage() + rte.memory_usage() + sizeof(range_tombstone); + _rts.apply(_schema, std::move(_range_tombstone_begin->key()), _range_tombstone_begin->kind(), + std::move(rte.key()), rte.kind(), _range_tombstone_begin->tomb()); + _range_tombstone_begin = { }; + return maybe_flush(); + } + + future consume_end_of_stream() { + assert(!_range_tombstone_begin); + if (_dirty_size) { + return flush().then([] { return stop_iteration::yes; }); + } + return make_ready_future(stop_iteration::yes); + } +}; + +future<> fragment_and_freeze(streamed_mutation sm, frozen_mutation_consumer_fn c, size_t fragment_size) +{ + fragmenting_mutation_freezer freezer(*sm.schema(), sm.key(), c, fragment_size); + return do_with(std::move(sm), std::move(freezer), [] (auto& sm, auto& freezer) { + freezer.consume(sm.partition_tombstone()); + return repeat([&] { + return sm().then([&] (auto mfopt) { + if (!mfopt) { + return freezer.consume_end_of_stream(); + } + return std::move(*mfopt).consume(freezer); + }); + }); + }); +} diff --git a/frozen_mutation.hh b/frozen_mutation.hh index 6a24cf73a6..e05dfab030 100644 --- a/frozen_mutation.hh +++ b/frozen_mutation.hh @@ -101,4 +101,7 @@ public: future freeze(streamed_mutation sm); +using frozen_mutation_consumer_fn = std::function(frozen_mutation, bool)>; +future<> fragment_and_freeze(streamed_mutation sm, frozen_mutation_consumer_fn c, + size_t fragment_size = 128 * 1024); diff --git a/keys.hh b/keys.hh index 26edd310e8..e6ac630697 100644 --- a/keys.hh +++ b/keys.hh @@ -322,6 +322,10 @@ public: size_t size(const schema& s) const { return std::distance(begin(s), end(s)); } + + size_t memory_usage() const { + return _bytes.memory_usage(); + } }; template diff --git a/message/messaging_service.cc b/message/messaging_service.cc index c97c2ba03b..441d68d189 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -644,13 +644,13 @@ future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id, } // STREAM_MUTATION -void messaging_service::register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func) { +void messaging_service::register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional fragmented)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION, std::move(func)); } -future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) { +future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented) { return send_message_timeout_and_retry(this, messaging_verb::STREAM_MUTATION, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, - plan_id, std::move(fm), dst_cpu_id); + plan_id, std::move(fm), dst_cpu_id, fragmented); } // STREAM_MUTATION_DONE diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5b19286bc1..78d6f243aa 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -216,8 +216,8 @@ public: future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id); // Wrapper for STREAM_MUTATION verb - void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func); - future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id); + void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional)>&& func); + future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented); void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id); diff --git a/mutation_partition.cc b/mutation_partition.cc index 564e7f3dcd..8c7b73c9b8 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1078,6 +1078,21 @@ row::find_cell(column_id id) const { } } +size_t row::memory_usage() const { + size_t mem = 0; + if (_type == storage_type::vector) { + mem += _storage.vector.v.memory_usage(); + for (auto&& ac_o_c : _storage.vector.v) { + mem += ac_o_c.memory_usage(); + } + } else { + for (auto&& ce : _storage.set) { + mem += sizeof(cell_entry) + ce.cell().memory_usage(); + } + } + return mem; +} + template void mutation_partition::trim_rows(const schema& s, const std::vector& row_ranges, diff --git a/mutation_partition.hh b/mutation_partition.hh index 8ff716051f..aadc01c9ea 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -265,6 +265,8 @@ public: bool equal(column_kind kind, const schema& this_schema, const row& other, const schema& other_schema) const; + size_t memory_usage() const; + friend std::ostream& operator<<(std::ostream& os, const row& r); }; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9fd6773aaf..df2a41e1bb 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -964,10 +964,10 @@ storage_proxy::mutate_locally(std::vector mutations) { } future<> -storage_proxy::mutate_streaming_mutation(const schema_ptr& s, const frozen_mutation& m) { +storage_proxy::mutate_streaming_mutation(const schema_ptr& s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) { auto shard = _db.local().shard_of(m); - return _db.invoke_on(shard, [&m, gs = global_schema_ptr(s)] (database& db) mutable -> future<> { - return db.apply_streaming_mutation(gs, m); + return _db.invoke_on(shard, [&m, plan_id, fragmented, gs = global_schema_ptr(s)] (database& db) mutable -> future<> { + return db.apply_streaming_mutation(gs, plan_id, m, fragmented); }); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index fcc12fd52e..b286e2acd4 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -254,7 +254,7 @@ public: future<> mutate_locally(const schema_ptr&, const frozen_mutation& m); future<> mutate_locally(std::vector mutations); - future<> mutate_streaming_mutation(const schema_ptr&, const frozen_mutation& m); + future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented); /** * Use this method to have these Mutations applied diff --git a/sstables/sstables.cc b/sstables/sstables.cc index bce0e7e650..769d5ac1af 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -817,23 +817,26 @@ void sstable::write_toc(const io_priority_class& pc) { }); } -void sstable::seal_sstable() { +future<> sstable::seal_sstable() { // SSTable sealing is about renaming temporary TOC file after guaranteeing // that each component reached the disk safely. - - file dir_f = open_checked_directory(sstable_write_error, _dir).get0(); - - sstable_write_io_check([&] { + return open_checked_directory(sstable_write_error, _dir).then([this] (file dir_f) { // Guarantee that every component of this sstable reached the disk. - dir_f.flush().get(); - // Rename TOC because it's no longer temporary. - engine().rename_file(filename(sstable::component_type::TemporaryTOC), filename(sstable::component_type::TOC)).get(); - // Guarantee that the changes above reached the disk. - dir_f.flush().get(); - dir_f.close().get(); + return sstable_write_io_check([&] { return dir_f.flush(); }).then([this] { + // Rename TOC because it's no longer temporary. + return sstable_write_io_check([&] { + return engine().rename_file(filename(sstable::component_type::TemporaryTOC), filename(sstable::component_type::TOC)); + }); + }).then([this, dir_f] () mutable { + // Guarantee that the changes above reached the disk. + return sstable_write_io_check([&] { return dir_f.flush(); }); + }).then([this, dir_f] () mutable { + return sstable_write_io_check([&] { return dir_f.close(); }); + }).then([this, dir_f] { + // If this point was reached, sstable should be safe in disk. + sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _ks, _cf); + }); }); - // If this point was reached, sstable should be safe in disk. - sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _ks, _cf); } void write_crc(const sstring file_path, checksum& c) { @@ -1534,10 +1537,10 @@ void components_writer::consume_end_of_stream() { seal_statistics(_sst._statistics, _sst._collector, dht::global_partitioner().name(), _schema.bloom_filter_fp_chance()); } -future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc) { +future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc, bool leave_unsealed) { _collector.set_replay_position(mt.replay_position()); return write_components(mt.make_reader(mt.schema()), - mt.partition_count(), mt.schema(), std::numeric_limits::max(), backup, pc); + mt.partition_count(), mt.schema(), std::numeric_limits::max(), backup, pc, leave_unsealed); } void sstable_writer::prepare_file_writer() @@ -1569,11 +1572,12 @@ void sstable_writer::finish_file_writer() } sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions, - uint64_t max_sstable_size, bool backup, const io_priority_class& pc) + uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc) : _sst(sst) , _schema(s) , _pc(pc) , _backup(backup) + , _leave_unsealed(leave_unsealed) { _sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance()); _sst.write_toc(_pc); @@ -1593,25 +1597,35 @@ void sstable_writer::consume_end_of_stream() _sst.write_statistics(_pc); // NOTE: write_compression means maybe_write_compression. _sst.write_compression(_pc); - _sst.seal_sstable(); - if (_backup) { - auto dir = _sst.get_dir() + "/backups/"; - sstable_write_io_check(touch_directory, dir).get(); - _sst.create_links(dir).get(); + if (!_leave_unsealed) { + _sst.seal_sstable(_backup).get(); } } -sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size, - bool backup, const io_priority_class& pc) +future<> sstable::seal_sstable(bool backup) { - return sstable_writer(*this, s, estimated_partitions, max_sstable_size, backup, pc); + return seal_sstable().then([this, backup] { + if (backup) { + auto dir = get_dir() + "/backups/"; + return sstable_write_io_check(touch_directory, dir).then([this, dir] { + return create_links(dir); + }); + } + return make_ready_future<>(); + }); +} + +sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size, + bool backup, const io_priority_class& pc, bool leave_unsealed) +{ + return sstable_writer(*this, s, estimated_partitions, max_sstable_size, backup, leave_unsealed, pc); } future<> sstable::write_components(::mutation_reader mr, - uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup, const io_priority_class& pc) { - return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size, backup, &pc] () mutable { - auto wr = get_writer(*schema, estimated_partitions, max_sstable_size, backup, pc); + uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup, const io_priority_class& pc, bool leave_unsealed) { + return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size, backup, &pc, leave_unsealed] () mutable { + auto wr = get_writer(*schema, estimated_partitions, max_sstable_size, backup, pc, leave_unsealed); consume_flattened_in_thread(mr, wr); }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 7cef991c0e..d92aeac248 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -233,14 +233,17 @@ public: // Write sstable components from a memtable. future<> write_components(memtable& mt, bool backup = false, - const io_priority_class& pc = default_priority_class()); + const io_priority_class& pc = default_priority_class(), bool leave_unsealed = false); future<> write_components(::mutation_reader mr, uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup = false, - const io_priority_class& pc = default_priority_class()); + const io_priority_class& pc = default_priority_class(), bool leave_unsealed = false); sstable_writer get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size, - bool backup = false, const io_priority_class& pc = default_priority_class()); + bool backup = false, const io_priority_class& pc = default_priority_class(), + bool leave_unsealed = false); + + future<> seal_sstable(bool backup); uint64_t get_estimated_key_count() const { return ((uint64_t)_summary.header.size_at_full_sampling + 1) * @@ -393,7 +396,7 @@ private: void generate_toc(compressor c, double filter_fp_chance); void write_toc(const io_priority_class& pc); - void seal_sstable(); + future<> seal_sstable(); future<> read_compression(const io_priority_class& pc); void write_compression(const io_priority_class& pc); @@ -684,6 +687,7 @@ class sstable_writer { const schema& _schema; const io_priority_class& _pc; bool _backup; + bool _leave_unsealed; bool _compression_enabled; shared_ptr _writer; stdx::optional _components_writer; @@ -692,7 +696,7 @@ private: void finish_file_writer(); public: sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions, - uint64_t max_sstable_size, bool backup, const io_priority_class& pc); + uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc); void consume_new_partition(const dht::decorated_key& dk) { return _components_writer->consume_new_partition(dk); } void consume(tombstone t) { _components_writer->consume(t); } stop_iteration consume(static_row&& sr) { return _components_writer->consume(std::move(sr)); } diff --git a/streamed_mutation.cc b/streamed_mutation.cc index 35f1fdb931..a0f1715c38 100644 --- a/streamed_mutation.cc +++ b/streamed_mutation.cc @@ -68,33 +68,29 @@ void mutation_fragment::destroy_data() noexcept } } +// C++ does not allow local classes that have template member. The rightful +// place of this class is inside mutation_fragment::key(). +struct get_mutation_fragment_key_visitor { + template + const clustering_key_prefix& operator()(const T& mf) { return mf.key(); } + const clustering_key_prefix& operator()(const static_row& sr) { abort(); } +}; + const clustering_key_prefix& mutation_fragment::key() const { assert(has_key()); - switch (_kind) { - case kind::clustering_row: - return as_clustering_row().key(); - case kind::range_tombstone_begin: - return as_range_tombstone_begin().key(); - case kind::range_tombstone_end: - return as_range_tombstone_end().key(); - default: - abort(); - } + return visit(get_mutation_fragment_key_visitor()); } int mutation_fragment::bound_kind_weight() const { assert(has_key()); - switch (_kind) { - case kind::clustering_row: - return 0; - case kind::range_tombstone_begin: - return weight(as_range_tombstone_begin().bound().kind); - case kind::range_tombstone_end: - return weight(as_range_tombstone_end().bound().kind); - default: - abort(); - } + struct get_bound_kind_weight { + int operator()(const clustering_row&) { return 0; } + int operator()(const range_tombstone_begin& rtb) { return weight(rtb.kind()); } + int operator()(const range_tombstone_end& rte) { return weight(rte.kind()); } + int operator()(...) { abort(); } + }; + return visit(get_bound_kind_weight()); } void mutation_fragment::apply(const schema& s, mutation_fragment&& mf) @@ -122,17 +118,7 @@ void mutation_fragment::apply(const schema& s, mutation_fragment&& mf) position_in_partition mutation_fragment::position() const { - switch (_kind) { - case kind::static_row: - return _data->_static_row.position(); - case kind::clustering_row: - return _data->_clustering_row.position(); - case kind::range_tombstone_begin: - return _data->_range_tombstone_begin.position(); - case kind::range_tombstone_end: - return _data->_range_tombstone_end.position(); - } - abort(); + return visit([] (auto& mf) { return mf.position(); }); } std::ostream& operator<<(std::ostream& os, const streamed_mutation& sm) { diff --git a/streamed_mutation.hh b/streamed_mutation.hh index 1c67cb23f1..51defa9533 100644 --- a/streamed_mutation.hh +++ b/streamed_mutation.hh @@ -89,6 +89,10 @@ public: } position_in_partition position() const; + + size_t memory_usage() const { + return _ck.memory_usage() + _cells.memory_usage(); + } }; class static_row { @@ -115,6 +119,10 @@ public: } position_in_partition position() const; + + size_t memory_usage() const { + return _cells.memory_usage(); + } }; class range_tombstone_begin { @@ -137,6 +145,10 @@ public: } position_in_partition position() const; + + size_t memory_usage() const { + return _ck.memory_usage(); + } }; class range_tombstone_end { @@ -153,6 +165,10 @@ public: bound_view bound() const { return bound_view(_ck, _kind); } position_in_partition position() const; + + size_t memory_usage() const { + return _ck.memory_usage(); + } }; class mutation_fragment { @@ -245,7 +261,7 @@ public: } */ template - auto consume(Consumer& consumer) && { + decltype(auto) consume(Consumer& consumer) && { switch (_kind) { case kind::static_row: return consumer.consume(std::move(_data->_static_row)); @@ -258,6 +274,36 @@ public: } abort(); } + + /* + template + concept bool MutationFragmentVisitor() { + return requires(T t, const static_row& sr, const clustering_row& cr, const range_tombstone_begin& rtb, const range_tombstone_end& rte) { + { t(sr) } -> ReturnType; + { t(cr) } -> ReturnType; + { t(rtb) } -> ReturnType; + { t(rte) } -> ReturnType; + }; + } + */ + template + decltype(auto) visit(Visitor&& visitor) const { + switch (_kind) { + case kind::static_row: + return visitor(as_static_row()); + case kind::clustering_row: + return visitor(as_clustering_row()); + case kind::range_tombstone_begin: + return visitor(as_range_tombstone_begin()); + case kind::range_tombstone_end: + return visitor(as_range_tombstone_end()); + } + abort(); + } + + size_t memory_usage() const { + return sizeof(data) + visit([] (auto& mf) { return mf.memory_usage(); }); + } }; class position_in_partition { diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 72d05b016c..c7af346f91 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -114,28 +114,29 @@ void stream_session::init_messaging_service_handler() { return make_ready_future<>(); }); }); - ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) { + ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional fragmented_opt) { auto from = net::messaging_service::get_source(cinfo); - return do_with(std::move(fm), [plan_id, from] (const auto& fm) { + auto fragmented = fragmented_opt && *fragmented_opt; + return do_with(std::move(fm), [plan_id, from, fragmented] (const auto& fm) { auto fm_size = fm.representation().size(); get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, fm_size); - return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm] (schema_ptr s) { + return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm, fragmented] (schema_ptr s) { auto cf_id = fm.column_family_id(); sslog.debug("[Stream #{}] GOT STREAM_MUTATION from {}: cf_id={}", plan_id, from.addr, cf_id); auto& db = service::get_local_storage_proxy().get_db().local(); if (!db.column_family_exists(cf_id)) { sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped", - plan_id, from.addr, cf_id); + plan_id, from.addr, cf_id); return make_ready_future<>(); } - return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), fm).then_wrapped([plan_id, cf_id, from] (auto&& f) { + return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), plan_id, fm, fragmented).then_wrapped([plan_id, cf_id, from] (auto&& f) { try { f.get(); return make_ready_future<>(); } catch (no_such_column_family) { sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped", - plan_id, from.addr, cf_id); + plan_id, from.addr, cf_id); return make_ready_future<>(); } catch (...) { throw; @@ -162,7 +163,7 @@ void stream_session::init_messaging_service_handler() { for (auto& range : ranges) { query_ranges.push_back(query::to_partition_range(range)); } - return cf.flush_streaming_mutations(std::move(query_ranges)); + return cf.flush_streaming_mutations(plan_id, std::move(query_ranges)); } catch (no_such_column_family) { sslog.warn("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped", plan_id, from, cf_id); @@ -424,6 +425,18 @@ void stream_session::add_transfer_ranges(sstring keyspace, std::vector stream_session::receiving_failed(UUID cf_id) +{ + return get_db().invoke_on_all([cf_id, plan_id = plan_id()] (database& db) { + try { + auto& cf = db.find_column_family(cf_id); + return cf.fail_streaming_mutations(plan_id); + } catch (no_such_column_family) { + return make_ready_future<>(); + } + }); +} + void stream_session::close_session(stream_session_state final_state) { sslog.debug("[Stream #{}] close_session session={}, state={}, is_aborted={}", plan_id(), this, final_state, _is_aborted); if (!_is_aborted) { @@ -439,6 +452,7 @@ void stream_session::close_session(stream_session_state final_state) { for (auto& x : _receivers) { stream_receive_task& task = x.second; sslog.debug("[Stream #{}] close_session session={}, state={}, abort stream_receive_task cf_id={}", plan_id(), this, final_state, task.cf_id); + receiving_failed(x.first); task.abort(); } } diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 94e7541cc2..7f96c06681 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -338,6 +338,7 @@ private: bool maybe_completed(); void prepare_receiving(stream_summary& summary); void start_streaming_files(); + future<> receiving_failed(UUID cf_id); }; } // namespace streaming diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 9abdef2cd3..9e36d34a3f 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -84,44 +84,42 @@ struct send_info { } }; -future do_send_mutations(auto si, auto fm) { - sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); - auto fm_size = fm.representation().size(); - net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] { - sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); - get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); - si->mutations_done.signal(); - }).handle_exception([si] (auto ep) { - // There might be larger number of STREAM_MUTATION inflight. - // Log one error per column_family per range - if (!si->error_logged) { - si->error_logged = true; - sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep); - } - si->mutations_done.broken(); +future<> do_send_mutations(auto si, auto fm, bool fragmented) { + return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable { + sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); + auto fm_size = fm.representation().size(); + net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([si, fm_size] { + sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); + get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); + si->mutations_done.signal(); + }).handle_exception([si] (auto ep) { + // There might be larger number of STREAM_MUTATION inflight. + // Log one error per column_family per range + if (!si->error_logged) { + si->error_logged = true; + sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep); + } + si->mutations_done.broken(); + }).finally([] { + get_local_stream_manager().mutation_send_limiter().signal(); + }); }); - return make_ready_future(stop_iteration::no); } future<> send_mutations(auto si) { auto& cf = si->db.find_column_family(si->cf_id); auto& priority = service::get_local_streaming_read_priority(); return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) { - return repeat([si, &reader] { - return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] { - return reader().then([] (auto sm) { - return mutation_from_streamed_mutation(std::move(sm)); - }).then([si] (auto mopt) { - if (mopt && si->db.column_family_exists(si->cf_id)) { + return repeat([si, &reader] () { + return reader().then([si] (auto smopt) { + if (smopt && si->db.column_family_exists(si->cf_id)) { + return fragment_and_freeze(std::move(*smopt), [si] (auto fm, bool fragmented) { si->mutations_nr++; - auto fm = frozen_mutation(*mopt); - return do_send_mutations(si, std::move(fm)); - } else { - return make_ready_future(stop_iteration::yes); - } - }); - }).finally([] { - get_local_stream_manager().mutation_send_limiter().signal(); + return do_send_mutations(si, std::move(fm), fragmented); + }).then([] { return stop_iteration::no; }); + } else { + return make_ready_future(stop_iteration::yes); + } }); }); }).then([si] { diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index faa70259bf..7b10e3b30f 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -116,7 +116,7 @@ public: sst->write_compression(default_priority_class()); sst->write_filter(default_priority_class()); sst->write_summary(default_priority_class()); - sst->seal_sstable(); + sst->seal_sstable().get(); }); } diff --git a/tests/streamed_mutation_test.cc b/tests/streamed_mutation_test.cc index 18cfaa0a2e..1fa3f70994 100644 --- a/tests/streamed_mutation_test.cc +++ b/tests/streamed_mutation_test.cc @@ -105,3 +105,49 @@ SEASTAR_TEST_CASE(test_freezing_streamed_mutations) { }); } +SEASTAR_TEST_CASE(test_fragmenting_and_freezing_streamed_mutations) { + return seastar::async([] { + storage_service_for_tests ssft; + + for_each_mutation([&] (const mutation& m) { + std::vector fms; + + fragment_and_freeze(streamed_mutation_from_mutation(mutation(m)), [&] (auto fm, bool frag) { + BOOST_REQUIRE(!frag); + fms.emplace_back(std::move(fm)); + return make_ready_future<>(); + }, std::numeric_limits::max()).get0(); + + BOOST_REQUIRE_EQUAL(fms.size(), 1); + + auto m1 = fms.back().unfreeze(m.schema()); + BOOST_REQUIRE_EQUAL(m, m1); + + fms.clear(); + + stdx::optional fragmented; + fragment_and_freeze(streamed_mutation_from_mutation(mutation(m)), [&] (auto fm, bool frag) { + BOOST_REQUIRE(!fragmented || *fragmented == frag); + *fragmented = frag; + fms.emplace_back(std::move(fm)); + return make_ready_future<>(); + }, 1).get0(); + + auto expected_fragments = m.partition().clustered_rows().size() + + m.partition().row_tombstones().size() + + !m.partition().static_row().empty(); + BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1))); + BOOST_REQUIRE(expected_fragments < 2 || *fragmented); + + auto m2 = fms.back().unfreeze(m.schema()); + fms.pop_back(); + while (!fms.empty()) { + m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema()); + fms.pop_back(); + } + BOOST_REQUIRE_EQUAL(m, m2); + }); + }); +} + + diff --git a/utils/managed_bytes.hh b/utils/managed_bytes.hh index cce0bf3964..3b1b63786a 100644 --- a/utils/managed_bytes.hh +++ b/utils/managed_bytes.hh @@ -369,6 +369,20 @@ public: return read_linearize(); } + // Returns the amount of external memory used. + size_t memory_usage() const { + if (external()) { + size_t mem = 0; + blob_storage* blob = _u.ptr; + while (blob) { + mem += blob->frag_size + sizeof(blob_storage); + blob = blob->next; + } + return mem; + } + return 0; + } + template friend std::result_of_t with_linearized_managed_bytes(Func&& func); }; diff --git a/utils/managed_vector.hh b/utils/managed_vector.hh index 8d61436ec6..5d6f1e66c9 100644 --- a/utils/managed_vector.hh +++ b/utils/managed_vector.hh @@ -232,4 +232,12 @@ public: push_back(value); } } + + // Returns the amount of external memory used. + size_t memory_usage() const { + if (is_external()) { + return sizeof(external) + _capacity * sizeof(T); + } + return 0; + } };