From 37bd7230bcceba9deb7af10641c3c6001159c17f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:33:29 +0100 Subject: [PATCH 01/17] streamed_mutation: add mutation fragment visitor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- streamed_mutation.cc | 48 ++++++++++++++++---------------------------- streamed_mutation.hh | 26 ++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/streamed_mutation.cc b/streamed_mutation.cc index 35f1fdb931..a0f1715c38 100644 --- a/streamed_mutation.cc +++ b/streamed_mutation.cc @@ -68,33 +68,29 @@ void mutation_fragment::destroy_data() noexcept } } +// C++ does not allow local classes that have template member. The rightful +// place of this class is inside mutation_fragment::key(). +struct get_mutation_fragment_key_visitor { + template + const clustering_key_prefix& operator()(const T& mf) { return mf.key(); } + const clustering_key_prefix& operator()(const static_row& sr) { abort(); } +}; + const clustering_key_prefix& mutation_fragment::key() const { assert(has_key()); - switch (_kind) { - case kind::clustering_row: - return as_clustering_row().key(); - case kind::range_tombstone_begin: - return as_range_tombstone_begin().key(); - case kind::range_tombstone_end: - return as_range_tombstone_end().key(); - default: - abort(); - } + return visit(get_mutation_fragment_key_visitor()); } int mutation_fragment::bound_kind_weight() const { assert(has_key()); - switch (_kind) { - case kind::clustering_row: - return 0; - case kind::range_tombstone_begin: - return weight(as_range_tombstone_begin().bound().kind); - case kind::range_tombstone_end: - return weight(as_range_tombstone_end().bound().kind); - default: - abort(); - } + struct get_bound_kind_weight { + int operator()(const clustering_row&) { return 0; } + int operator()(const range_tombstone_begin& rtb) { return weight(rtb.kind()); } + 
int operator()(const range_tombstone_end& rte) { return weight(rte.kind()); } + int operator()(...) { abort(); } + }; + return visit(get_bound_kind_weight()); } void mutation_fragment::apply(const schema& s, mutation_fragment&& mf) @@ -122,17 +118,7 @@ void mutation_fragment::apply(const schema& s, mutation_fragment&& mf) position_in_partition mutation_fragment::position() const { - switch (_kind) { - case kind::static_row: - return _data->_static_row.position(); - case kind::clustering_row: - return _data->_clustering_row.position(); - case kind::range_tombstone_begin: - return _data->_range_tombstone_begin.position(); - case kind::range_tombstone_end: - return _data->_range_tombstone_end.position(); - } - abort(); + return visit([] (auto& mf) { return mf.position(); }); } std::ostream& operator<<(std::ostream& os, const streamed_mutation& sm) { diff --git a/streamed_mutation.hh b/streamed_mutation.hh index 1c67cb23f1..64158715db 100644 --- a/streamed_mutation.hh +++ b/streamed_mutation.hh @@ -258,6 +258,32 @@ public: } abort(); } + + /* + template + concept bool MutationFragmentVisitor() { + return requires(T t, const static_row& sr, const clustering_row& cr, const range_tombstone_begin& rtb, const range_tombstone_end& rte) { + { t(sr) } -> ReturnType; + { t(cr) } -> ReturnType; + { t(rtb) } -> ReturnType; + { t(rte) } -> ReturnType; + }; + } + */ + template + decltype(auto) visit(Visitor&& visitor) const { + switch (_kind) { + case kind::static_row: + return visitor(as_static_row()); + case kind::clustering_row: + return visitor(as_clustering_row()); + case kind::range_tombstone_begin: + return visitor(as_range_tombstone_begin()); + case kind::range_tombstone_end: + return visitor(as_range_tombstone_end()); + } + abort(); + } }; class position_in_partition { From a289816b310d943eb41b91f00dc88d7a5213199f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:33:56 +0100 Subject: [PATCH 02/17] streamed_mutation: fix 
mutation_fragment::consume() return type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- streamed_mutation.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamed_mutation.hh b/streamed_mutation.hh index 64158715db..a8811fd0eb 100644 --- a/streamed_mutation.hh +++ b/streamed_mutation.hh @@ -245,7 +245,7 @@ public: } */ template - auto consume(Consumer& consumer) && { + decltype(auto) consume(Consumer& consumer) && { switch (_kind) { case kind::static_row: return consumer.consume(std::move(_data->_static_row)); From 703509a1c7aee7986e2a2935033830645aa6e489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:41:27 +0100 Subject: [PATCH 03/17] utils/managed_bytes: add memory_usage() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- utils/managed_bytes.hh | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/utils/managed_bytes.hh b/utils/managed_bytes.hh index cce0bf3964..3b1b63786a 100644 --- a/utils/managed_bytes.hh +++ b/utils/managed_bytes.hh @@ -369,6 +369,20 @@ public: return read_linearize(); } + // Returns the amount of external memory used. 
+ size_t memory_usage() const { + if (external()) { + size_t mem = 0; + blob_storage* blob = _u.ptr; + while (blob) { + mem += blob->frag_size + sizeof(blob_storage); + blob = blob->next; + } + return mem; + } + return 0; + } + template friend std::result_of_t with_linearized_managed_bytes(Func&& func); }; From cfa581b42695decf94cfa614ba295959884350f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:41:43 +0100 Subject: [PATCH 04/17] utils/managed_vector: add memory_usage() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- utils/managed_vector.hh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/utils/managed_vector.hh b/utils/managed_vector.hh index 8d61436ec6..5d6f1e66c9 100644 --- a/utils/managed_vector.hh +++ b/utils/managed_vector.hh @@ -232,4 +232,12 @@ public: push_back(value); } } + + // Returns the amount of external memory used. + size_t memory_usage() const { + if (is_external()) { + return sizeof(external) + _capacity * sizeof(T); + } + return 0; + } }; From d0ee750ceccf30b730c9aa15d066d41842e1b1bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:42:04 +0100 Subject: [PATCH 05/17] keys: add memory_usage() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- keys.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/keys.hh b/keys.hh index 26edd310e8..e6ac630697 100644 --- a/keys.hh +++ b/keys.hh @@ -322,6 +322,10 @@ public: size_t size(const schema& s) const { return std::distance(begin(s), end(s)); } + + size_t memory_usage() const { + return _bytes.memory_usage(); + } }; template From 1d54327afdec909175976281721413d2d9d35164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:42:17 +0100 Subject: [PATCH 06/17] atomic_cell_or_collection: add memory_usage() MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- atomic_cell_or_collection.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/atomic_cell_or_collection.hh b/atomic_cell_or_collection.hh index 69c0deb997..78454ac29f 100644 --- a/atomic_cell_or_collection.hh +++ b/atomic_cell_or_collection.hh @@ -63,5 +63,8 @@ public: ::feed_hash(as_collection_mutation(), h, def.type); } } + size_t memory_usage() const { + return _data.memory_usage(); + } friend std::ostream& operator<<(std::ostream&, const atomic_cell_or_collection&); }; From 23d0bfd0655e193635bdb817c1dfa3d459242399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:42:43 +0100 Subject: [PATCH 07/17] mutation_partition: add row::memory_usage() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- mutation_partition.cc | 15 +++++++++++++++ mutation_partition.hh | 2 ++ 2 files changed, 17 insertions(+) diff --git a/mutation_partition.cc b/mutation_partition.cc index 564e7f3dcd..8c7b73c9b8 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1078,6 +1078,21 @@ row::find_cell(column_id id) const { } } +size_t row::memory_usage() const { + size_t mem = 0; + if (_type == storage_type::vector) { + mem += _storage.vector.v.memory_usage(); + for (auto&& ac_o_c : _storage.vector.v) { + mem += ac_o_c.memory_usage(); + } + } else { + for (auto&& ce : _storage.set) { + mem += sizeof(cell_entry) + ce.cell().memory_usage(); + } + } + return mem; +} + template void mutation_partition::trim_rows(const schema& s, const std::vector& row_ranges, diff --git a/mutation_partition.hh b/mutation_partition.hh index 8ff716051f..aadc01c9ea 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -265,6 +265,8 @@ public: bool equal(column_kind kind, const schema& this_schema, const row& other, const schema& other_schema) const; + size_t memory_usage() 
const; + friend std::ostream& operator<<(std::ostream& os, const row& r); }; From 820bd6c9bcaaf01daf354719a159d261f3bc19ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 28 Jun 2016 11:43:00 +0100 Subject: [PATCH 08/17] streamed_mutation: add mutation_fragment::memory_usage() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- streamed_mutation.hh | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/streamed_mutation.hh b/streamed_mutation.hh index a8811fd0eb..51defa9533 100644 --- a/streamed_mutation.hh +++ b/streamed_mutation.hh @@ -89,6 +89,10 @@ public: } position_in_partition position() const; + + size_t memory_usage() const { + return _ck.memory_usage() + _cells.memory_usage(); + } }; class static_row { @@ -115,6 +119,10 @@ public: } position_in_partition position() const; + + size_t memory_usage() const { + return _cells.memory_usage(); + } }; class range_tombstone_begin { @@ -137,6 +145,10 @@ public: } position_in_partition position() const; + + size_t memory_usage() const { + return _ck.memory_usage(); + } }; class range_tombstone_end { @@ -153,6 +165,10 @@ public: bound_view bound() const { return bound_view(_ck, _kind); } position_in_partition position() const; + + size_t memory_usage() const { + return _ck.memory_usage(); + } }; class mutation_fragment { @@ -284,6 +300,10 @@ public: } abort(); } + + size_t memory_usage() const { + return sizeof(data) + visit([] (auto& mf) { return mf.memory_usage(); }); + } }; class position_in_partition { From 19629e95e2879cf49f094b1f74c8f74d53cf5c07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 1 Jun 2016 19:30:55 +0100 Subject: [PATCH 09/17] frozen_mutation: add fragment_and_freeze() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fragment_and_freeze() produces a stream of frozen mutations from a single streamed_mutation. 
Signed-off-by: Paweł Dziepak --- frozen_mutation.cc | 103 +++++++++++++++++++++++++++++++++++++++++++++ frozen_mutation.hh | 3 ++ 2 files changed, 106 insertions(+) diff --git a/frozen_mutation.cc b/frozen_mutation.cc index 67e7838186..e5f2ee7d61 100644 --- a/frozen_mutation.cc +++ b/frozen_mutation.cc @@ -188,3 +188,106 @@ future freeze(streamed_mutation sm) { return consume(sm, streamed_mutation_freezer(*sm.schema(), sm.key())); }); } + +class fragmenting_mutation_freezer { + const schema& _schema; + partition_key _key; + + tombstone _partition_tombstone; + stdx::optional _sr; + std::deque _crs; + range_tombstone_list _rts; + stdx::optional _range_tombstone_begin; + + frozen_mutation_consumer_fn _consumer; + + bool _fragmented = false; + size_t _dirty_size = 0; + size_t _fragment_size; +private: + future<> flush() { + bytes_ostream out; + ser::writer_of_mutation wom(out); + std::move(wom).write_table_id(_schema.id()) + .write_schema_version(_schema.version()) + .write_key(_key) + .partition([&] (auto wr) { + serialize_mutation_fragments(_schema, _partition_tombstone, + std::move(_sr), std::move(_rts), + std::move(_crs), std::move(wr)); + }).end_mutation(); + + _sr = { }; + _rts.clear(); + _crs.clear(); + _dirty_size = 0; + return _consumer(frozen_mutation(out.linearize(), _key), _fragmented); + } + + future maybe_flush() { + if (_dirty_size >= _fragment_size) { + _fragmented = true; + return flush().then([] { return stop_iteration::no; }); + } + return make_ready_future(stop_iteration::no); + } +public: + fragmenting_mutation_freezer(const schema& s, const partition_key& key, frozen_mutation_consumer_fn c, size_t fragment_size) + : _schema(s), _key(key), _rts(s), _consumer(c), _fragment_size(fragment_size) { } + + void consume(tombstone pt) { + _dirty_size += sizeof(tombstone); + _partition_tombstone = pt; + } + + future consume(static_row&& sr) { + _sr = std::move(sr); + _dirty_size += _sr->memory_usage() + sizeof(sr); + return maybe_flush(); + } + + future 
consume(clustering_row&& cr) { + _dirty_size += cr.memory_usage() + sizeof(cr); + _crs.emplace_back(std::move(cr)); + return maybe_flush(); + } + + future consume(range_tombstone_begin&& rtb) { + assert(!_range_tombstone_begin); + _range_tombstone_begin = std::move(rtb); + return make_ready_future(stop_iteration::no); + } + + future consume(range_tombstone_end&& rte) { + assert(_range_tombstone_begin); + _dirty_size += _range_tombstone_begin->memory_usage() + rte.memory_usage() + sizeof(range_tombstone); + _rts.apply(_schema, std::move(_range_tombstone_begin->key()), _range_tombstone_begin->kind(), + std::move(rte.key()), rte.kind(), _range_tombstone_begin->tomb()); + _range_tombstone_begin = { }; + return maybe_flush(); + } + + future consume_end_of_stream() { + assert(!_range_tombstone_begin); + if (_dirty_size) { + return flush().then([] { return stop_iteration::yes; }); + } + return make_ready_future(stop_iteration::yes); + } +}; + +future<> fragment_and_freeze(streamed_mutation sm, frozen_mutation_consumer_fn c, size_t fragment_size) +{ + fragmenting_mutation_freezer freezer(*sm.schema(), sm.key(), c, fragment_size); + return do_with(std::move(sm), std::move(freezer), [] (auto& sm, auto& freezer) { + freezer.consume(sm.partition_tombstone()); + return repeat([&] { + return sm().then([&] (auto mfopt) { + if (!mfopt) { + return freezer.consume_end_of_stream(); + } + return std::move(*mfopt).consume(freezer); + }); + }); + }); +} diff --git a/frozen_mutation.hh b/frozen_mutation.hh index 6a24cf73a6..e05dfab030 100644 --- a/frozen_mutation.hh +++ b/frozen_mutation.hh @@ -101,4 +101,7 @@ public: future freeze(streamed_mutation sm); +using frozen_mutation_consumer_fn = std::function(frozen_mutation, bool)>; +future<> fragment_and_freeze(streamed_mutation sm, frozen_mutation_consumer_fn c, + size_t fragment_size = 128 * 1024); From 4e34bd4e8a3caff68cd20aeac353133a18f23647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 1 Jun 2016 19:31:11 
+0100 Subject: [PATCH 10/17] tests/streamed_mutation: test fragment_and_freeze() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- tests/streamed_mutation_test.cc | 46 +++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/streamed_mutation_test.cc b/tests/streamed_mutation_test.cc index 18cfaa0a2e..1fa3f70994 100644 --- a/tests/streamed_mutation_test.cc +++ b/tests/streamed_mutation_test.cc @@ -105,3 +105,49 @@ SEASTAR_TEST_CASE(test_freezing_streamed_mutations) { }); } +SEASTAR_TEST_CASE(test_fragmenting_and_freezing_streamed_mutations) { + return seastar::async([] { + storage_service_for_tests ssft; + + for_each_mutation([&] (const mutation& m) { + std::vector fms; + + fragment_and_freeze(streamed_mutation_from_mutation(mutation(m)), [&] (auto fm, bool frag) { + BOOST_REQUIRE(!frag); + fms.emplace_back(std::move(fm)); + return make_ready_future<>(); + }, std::numeric_limits::max()).get0(); + + BOOST_REQUIRE_EQUAL(fms.size(), 1); + + auto m1 = fms.back().unfreeze(m.schema()); + BOOST_REQUIRE_EQUAL(m, m1); + + fms.clear(); + + stdx::optional fragmented; + fragment_and_freeze(streamed_mutation_from_mutation(mutation(m)), [&] (auto fm, bool frag) { + BOOST_REQUIRE(!fragmented || *fragmented == frag); + *fragmented = frag; + fms.emplace_back(std::move(fm)); + return make_ready_future<>(); + }, 1).get0(); + + auto expected_fragments = m.partition().clustered_rows().size() + + m.partition().row_tombstones().size() + + !m.partition().static_row().empty(); + BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1))); + BOOST_REQUIRE(expected_fragments < 2 || *fragmented); + + auto m2 = fms.back().unfreeze(m.schema()); + fms.pop_back(); + while (!fms.empty()) { + m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema()); + fms.pop_back(); + } + BOOST_REQUIRE_EQUAL(m, m2); + }); + }); +} + + From a7b6c1110f4230a7db82c9ed8f9f4169868fb99e Mon Sep 
17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 1 Jul 2016 10:29:35 +0100 Subject: [PATCH 11/17] sstables: do not require seal_sstable() to be run in thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- sstables/sstables.cc | 31 +++++++++++++++++-------------- sstables/sstables.hh | 2 +- tests/sstable_test.hh | 2 +- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index bce0e7e650..11ee2b4adb 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -817,23 +817,26 @@ void sstable::write_toc(const io_priority_class& pc) { }); } -void sstable::seal_sstable() { +future<> sstable::seal_sstable() { // SSTable sealing is about renaming temporary TOC file after guaranteeing // that each component reached the disk safely. - - file dir_f = open_checked_directory(sstable_write_error, _dir).get0(); - - sstable_write_io_check([&] { + return open_checked_directory(sstable_write_error, _dir).then([this] (file dir_f) { // Guarantee that every component of this sstable reached the disk. - dir_f.flush().get(); - // Rename TOC because it's no longer temporary. - engine().rename_file(filename(sstable::component_type::TemporaryTOC), filename(sstable::component_type::TOC)).get(); - // Guarantee that the changes above reached the disk. - dir_f.flush().get(); - dir_f.close().get(); + return sstable_write_io_check([&] { return dir_f.flush(); }).then([this] { + // Rename TOC because it's no longer temporary. + return sstable_write_io_check([&] { + return engine().rename_file(filename(sstable::component_type::TemporaryTOC), filename(sstable::component_type::TOC)); + }); + }).then([this, dir_f] () mutable { + // Guarantee that the changes above reached the disk. 
+ return sstable_write_io_check([&] { return dir_f.flush(); }); + }).then([this, dir_f] () mutable { + return sstable_write_io_check([&] { return dir_f.close(); }); + }).then([this, dir_f] { + // If this point was reached, sstable should be safe in disk. + sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _ks, _cf); + }); }); - // If this point was reached, sstable should be safe in disk. - sstlog.debug("SSTable with generation {} of {}.{} was sealed successfully.", _generation, _ks, _cf); } void write_crc(const sstring file_path, checksum& c) { @@ -1593,7 +1596,7 @@ void sstable_writer::consume_end_of_stream() _sst.write_statistics(_pc); // NOTE: write_compression means maybe_write_compression. _sst.write_compression(_pc); - _sst.seal_sstable(); + _sst.seal_sstable().get(); if (_backup) { auto dir = _sst.get_dir() + "/backups/"; diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 7cef991c0e..9c64a5eeaf 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -393,7 +393,7 @@ private: void generate_toc(compressor c, double filter_fp_chance); void write_toc(const io_priority_class& pc); - void seal_sstable(); + future<> seal_sstable(); future<> read_compression(const io_priority_class& pc); void write_compression(const io_priority_class& pc); diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index faa70259bf..7b10e3b30f 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -116,7 +116,7 @@ public: sst->write_compression(default_priority_class()); sst->write_filter(default_priority_class()); sst->write_summary(default_priority_class()); - sst->seal_sstable(); + sst->seal_sstable().get(); }); } From 5bc51821fe55249b30a7653d1a69fc968b86a222 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 1 Jul 2016 10:31:07 +0100 Subject: [PATCH 12/17] sstables: allow writing unsealed sstables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
The purpose of this patch is to split the actions of writing sstable and sealing it. As long as the sstable is unsealed it is considered incomplete and is going to be removed on reboot. Such functionality is needed in order to defer visibility of sstables created during streaming until the streaming is complete. Signed-off-by: Paweł Dziepak --- sstables/sstables.cc | 39 +++++++++++++++++++++++++-------------- sstables/sstables.hh | 12 ++++++++---- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 11ee2b4adb..769d5ac1af 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1537,10 +1537,10 @@ void components_writer::consume_end_of_stream() { seal_statistics(_sst._statistics, _sst._collector, dht::global_partitioner().name(), _schema.bloom_filter_fp_chance()); } -future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc) { +future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc, bool leave_unsealed) { _collector.set_replay_position(mt.replay_position()); return write_components(mt.make_reader(mt.schema()), - mt.partition_count(), mt.schema(), std::numeric_limits::max(), backup, pc); + mt.partition_count(), mt.schema(), std::numeric_limits::max(), backup, pc, leave_unsealed); } void sstable_writer::prepare_file_writer() @@ -1572,11 +1572,12 @@ void sstable_writer::finish_file_writer() } sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions, - uint64_t max_sstable_size, bool backup, const io_priority_class& pc) + uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc) : _sst(sst) , _schema(s) , _pc(pc) , _backup(backup) + , _leave_unsealed(leave_unsealed) { _sst.generate_toc(_schema.get_compressor_params().get_compressor(), _schema.bloom_filter_fp_chance()); _sst.write_toc(_pc); @@ -1596,25 +1597,35 @@ void sstable_writer::consume_end_of_stream() 
_sst.write_statistics(_pc); // NOTE: write_compression means maybe_write_compression. _sst.write_compression(_pc); - _sst.seal_sstable().get(); - if (_backup) { - auto dir = _sst.get_dir() + "/backups/"; - sstable_write_io_check(touch_directory, dir).get(); - _sst.create_links(dir).get(); + if (!_leave_unsealed) { + _sst.seal_sstable(_backup).get(); } } -sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size, - bool backup, const io_priority_class& pc) +future<> sstable::seal_sstable(bool backup) { - return sstable_writer(*this, s, estimated_partitions, max_sstable_size, backup, pc); + return seal_sstable().then([this, backup] { + if (backup) { + auto dir = get_dir() + "/backups/"; + return sstable_write_io_check(touch_directory, dir).then([this, dir] { + return create_links(dir); + }); + } + return make_ready_future<>(); + }); +} + +sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size, + bool backup, const io_priority_class& pc, bool leave_unsealed) +{ + return sstable_writer(*this, s, estimated_partitions, max_sstable_size, backup, leave_unsealed, pc); } future<> sstable::write_components(::mutation_reader mr, - uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup, const io_priority_class& pc) { - return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size, backup, &pc] () mutable { - auto wr = get_writer(*schema, estimated_partitions, max_sstable_size, backup, pc); + uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup, const io_priority_class& pc, bool leave_unsealed) { + return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), max_sstable_size, backup, &pc, leave_unsealed] () mutable { + auto wr = get_writer(*schema, estimated_partitions, max_sstable_size, backup, pc, leave_unsealed); 
consume_flattened_in_thread(mr, wr); }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 9c64a5eeaf..d92aeac248 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -233,14 +233,17 @@ public: // Write sstable components from a memtable. future<> write_components(memtable& mt, bool backup = false, - const io_priority_class& pc = default_priority_class()); + const io_priority_class& pc = default_priority_class(), bool leave_unsealed = false); future<> write_components(::mutation_reader mr, uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size, bool backup = false, - const io_priority_class& pc = default_priority_class()); + const io_priority_class& pc = default_priority_class(), bool leave_unsealed = false); sstable_writer get_writer(const schema& s, uint64_t estimated_partitions, uint64_t max_sstable_size, - bool backup = false, const io_priority_class& pc = default_priority_class()); + bool backup = false, const io_priority_class& pc = default_priority_class(), + bool leave_unsealed = false); + + future<> seal_sstable(bool backup); uint64_t get_estimated_key_count() const { return ((uint64_t)_summary.header.size_at_full_sampling + 1) * @@ -684,6 +687,7 @@ class sstable_writer { const schema& _schema; const io_priority_class& _pc; bool _backup; + bool _leave_unsealed; bool _compression_enabled; shared_ptr _writer; stdx::optional _components_writer; @@ -692,7 +696,7 @@ private: void finish_file_writer(); public: sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions, - uint64_t max_sstable_size, bool backup, const io_priority_class& pc); + uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc); void consume_new_partition(const dht::decorated_key& dk) { return _components_writer->consume_new_partition(dk); } void consume(tombstone t) { _components_writer->consume(t); } stop_iteration consume(static_row&& sr) { return _components_writer->consume(std::move(sr)); } 
From 51ec7a7285be06bd4f84901c706ed37942ed4a99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 5 Jul 2016 21:39:37 +0100 Subject: [PATCH 13/17] db: wait for ongoing flushes at end of streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When flush_streaming_mutations() is called at the end of streaming it is supposed to flush all data and then invalidate cache ranges. However, if there are already some memtable flushes in progress it won't wait for them. Signed-off-by: Paweł Dziepak --- database.cc | 8 ++++++-- database.hh | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/database.cc b/database.cc index 93b1581457..5712ea7e4e 100644 --- a/database.cc +++ b/database.cc @@ -687,6 +687,8 @@ column_family::seal_active_streaming_memtable_immediate() { } _streaming_memtables->add_memtable(); _streaming_memtables->erase(old); + + auto guard = _streaming_flush_phaser.start(); return with_gate(_streaming_flush_gate, [this, old] { _delayed_streaming_flush.cancel(); @@ -735,7 +737,7 @@ column_family::seal_active_streaming_memtable_immediate() { }); return f; - }); + }).finally([guard = std::move(guard)] { }); } future<> @@ -2734,7 +2736,9 @@ future<> column_family::flush_streaming_mutations(std::vectorseal_active_memtable(memtable_list::flush_behavior::delayed).finally([this, ranges = std::move(ranges)] { + return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).finally([this] { + return _streaming_flush_phaser.advance_and_await(); + }).finally([this, ranges = std::move(ranges)] { if (!_config.enable_cache) { return make_ready_future<>(); } diff --git a/database.hh b/database.hh index 793d15a716..3d8b6ca98b 100644 --- a/database.hh +++ b/database.hh @@ -371,6 +371,7 @@ private: // memory throttling mechanism, guaranteeing we will not overload the // server. 
lw_shared_ptr _streaming_memtables; + utils::phased_barrier _streaming_flush_phaser; friend class memtable_dirty_memory_manager; friend class streaming_dirty_memory_manager; From 4031c0ed8f92d6b04ce71c1906e5a7727b90ade0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 6 Jul 2016 01:30:07 +0100 Subject: [PATCH 14/17] streaming: pass plan_id to column family for apply and flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit plan_id is needed to keep track of the origin of mutations so that if they are fragmented all fragments are made visible at the same time, when that particular streaming plan_id completes. Basically, each streaming plan that sends big (fragmented) mutations is going to have its own memtables and a list of sstables which will get flushed and made visible when that plan completes (or dropped if it fails). Signed-off-by: Paweł Dziepak --- database.cc | 10 +++++----- database.hh | 6 +++--- service/storage_proxy.cc | 6 +++--- service/storage_proxy.hh | 2 +- streaming/stream_session.cc | 4 ++-- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/database.cc b/database.cc index 5712ea7e4e..1b8c1e6b2c 100644 --- a/database.cc +++ b/database.cc @@ -2107,7 +2107,7 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const } } -void column_family::apply_streaming_mutation(schema_ptr m_schema, const frozen_mutation& m) { +void column_family::apply_streaming_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) { _streaming_memtables->active_memtable().apply(m, m_schema); _streaming_memtables->seal_on_overflow(); } @@ -2217,15 +2217,15 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m) { }); } -future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation& m) { +future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, const frozen_mutation& m) { if (!s->is_synced()) { throw 
std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] { + return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, s = std::move(s)] { auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); - cf.apply_streaming_mutation(s, std::move(m)); + cf.apply_streaming_mutation(s, plan_id, std::move(m)); }); } @@ -2730,7 +2730,7 @@ future<> column_family::flush(const db::replay_position& pos) { // so we always flush. When we can differentiate those streams, we should not // be indiscriminately touching the cache during repair. We will just have to // invalidate the entries that are relevant to things we already have in the cache. -future<> column_family::flush_streaming_mutations(std::vector ranges) { +future<> column_family::flush_streaming_mutations(utils::UUID plan_id, std::vector ranges) { // This will effectively take the gate twice for this call. The proper way to fix that would // be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we // need this code to go away as soon as we can (see FIXME above). So the double gate is a better diff --git a/database.hh b/database.hh index 3d8b6ca98b..b2e56dd99a 100644 --- a/database.hh +++ b/database.hh @@ -531,7 +531,7 @@ public: // The mutation is always upgraded to current schema. 
void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position()); void apply(const mutation& m, const db::replay_position& = db::replay_position()); - void apply_streaming_mutation(schema_ptr, const frozen_mutation&); + void apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&); // Returns at most "cmd.limit" rows future> query(schema_ptr, @@ -544,7 +544,7 @@ public: future<> stop(); future<> flush(); future<> flush(const db::replay_position&); - future<> flush_streaming_mutations(std::vector ranges = std::vector{}); + future<> flush_streaming_mutations(utils::UUID plan_id, std::vector ranges = std::vector{}); future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); @@ -1028,7 +1028,7 @@ public: future> query(schema_ptr, const query::read_command& cmd, query::result_request request, const std::vector& ranges); future query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range); future<> apply(schema_ptr, const frozen_mutation&); - future<> apply_streaming_mutation(schema_ptr, const frozen_mutation&); + future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&); keyspace::config make_keyspace_config(const keyspace_metadata& ksm); const sstring& get_snitch_name() const; future<> clear_snapshot(sstring tag, std::vector keyspace_names); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9fd6773aaf..f70d53e72c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -964,10 +964,10 @@ storage_proxy::mutate_locally(std::vector mutations) { } future<> -storage_proxy::mutate_streaming_mutation(const schema_ptr& s, const frozen_mutation& m) { +storage_proxy::mutate_streaming_mutation(const schema_ptr& s, utils::UUID plan_id, const frozen_mutation& m) { auto shard = _db.local().shard_of(m); - return _db.invoke_on(shard, [&m, gs = 
global_schema_ptr(s)] (database& db) mutable -> future<> { - return db.apply_streaming_mutation(gs, m); + return _db.invoke_on(shard, [&m, plan_id, gs = global_schema_ptr(s)] (database& db) mutable -> future<> { + return db.apply_streaming_mutation(gs, plan_id, m); }); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index fcc12fd52e..33862f96aa 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -254,7 +254,7 @@ public: future<> mutate_locally(const schema_ptr&, const frozen_mutation& m); future<> mutate_locally(std::vector mutations); - future<> mutate_streaming_mutation(const schema_ptr&, const frozen_mutation& m); + future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m); /** * Use this method to have these Mutations applied diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 72d05b016c..ed04a47d73 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -129,7 +129,7 @@ void stream_session::init_messaging_service_handler() { plan_id, from.addr, cf_id); return make_ready_future<>(); } - return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), fm).then_wrapped([plan_id, cf_id, from] (auto&& f) { + return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), plan_id, fm).then_wrapped([plan_id, cf_id, from] (auto&& f) { try { f.get(); return make_ready_future<>(); @@ -162,7 +162,7 @@ void stream_session::init_messaging_service_handler() { for (auto& range : ranges) { query_ranges.push_back(query::to_partition_range(range)); } - return cf.flush_streaming_mutations(std::move(query_ranges)); + return cf.flush_streaming_mutations(plan_id, std::move(query_ranges)); } catch (no_such_column_family) { sslog.warn("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped", plan_id, from, cf_id); From f2ae31711e51655d0a27b5a90c5a790d17f8d29c Mon Sep 17 00:00:00 
2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 6 Jul 2016 01:43:25 +0100 Subject: [PATCH 15/17] streaming: inform CF when streaming fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Dziepak --- database.hh | 1 + streaming/stream_session.cc | 13 +++++++++++++ streaming/stream_session.hh | 1 + 3 files changed, 15 insertions(+) diff --git a/database.hh b/database.hh index b2e56dd99a..1175839da2 100644 --- a/database.hh +++ b/database.hh @@ -545,6 +545,7 @@ public: future<> flush(); future<> flush(const db::replay_position&); future<> flush_streaming_mutations(utils::UUID plan_id, std::vector ranges = std::vector{}); + future<> fail_streaming_mutations(utils::UUID plan_id) { return make_ready_future<>(); } future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index ed04a47d73..543ac1af5d 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -424,6 +424,18 @@ void stream_session::add_transfer_ranges(sstring keyspace, std::vector stream_session::receiving_failed(UUID cf_id) +{ + return get_db().invoke_on_all([cf_id, plan_id = plan_id()] (database& db) { + try { + auto& cf = db.find_column_family(cf_id); + return cf.fail_streaming_mutations(plan_id); + } catch (no_such_column_family) { + return make_ready_future<>(); + } + }); +} + void stream_session::close_session(stream_session_state final_state) { sslog.debug("[Stream #{}] close_session session={}, state={}, is_aborted={}", plan_id(), this, final_state, _is_aborted); if (!_is_aborted) { @@ -439,6 +451,7 @@ void stream_session::close_session(stream_session_state final_state) { for (auto& x : _receivers) { stream_receive_task& task = x.second; sslog.debug("[Stream #{}] close_session session={}, state={}, abort stream_receive_task cf_id={}", plan_id(), this, final_state, task.cf_id); + 
receiving_failed(x.first); task.abort(); } } diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 94e7541cc2..7f96c06681 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -338,6 +338,7 @@ private: bool maybe_completed(); void prepare_receiving(stream_summary& summary); void start_streaming_files(); + future<> receiving_failed(UUID cf_id); }; } // namespace streaming From 32a5de7a1fd0afdec657576c803e5202e51ae5e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 6 Jul 2016 02:13:16 +0100 Subject: [PATCH 16/17] db: handle receiving fragmented mutations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If mutations are fragmented during streaming, special care must be taken so that isolation guarantees are not broken. Mutations received with flag "fragmented" set are applied to a memtable that is used only by that particular streaming task, and the sstables created by flushing such memtables are not made visible until the task is complete. Also, in case the streaming fails all data is dropped. This means that fragmented mutations cannot benefit from coalescing of writes from multiple streaming plans, hence the separate way of handling them so that there is no loss of performance for small partitions.
Signed-off-by: Paweł Dziepak --- database.cc | 114 ++++++++++++++++++++++++++++-- database.hh | 25 ++++++- message/messaging_service.cc | 6 +- message/messaging_service.hh | 4 +- service/storage_proxy.cc | 6 +- service/storage_proxy.hh | 2 +- streaming/stream_session.cc | 13 ++-- streaming/stream_transfer_task.cc | 2 +- 8 files changed, 147 insertions(+), 25 deletions(-) diff --git a/database.cc b/database.cc index 1b8c1e6b2c..62198e6b9a 100644 --- a/database.cc +++ b/database.cc @@ -113,6 +113,13 @@ column_family::make_streaming_memtable_list() { return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager); } +lw_shared_ptr +column_family::make_streaming_memtable_big_list(streaming_memtable_big& smb) { + auto seal = [this, &smb] (memtable_list::flush_behavior) { return seal_active_streaming_memtable_big(smb); }; + auto get_schema = [this] { return schema(); }; + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager); +} + column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) : _schema(std::move(schema)) , _config(std::move(config)) @@ -166,6 +173,11 @@ logalloc::occupancy_stats column_family::occupancy() const { for (auto m : *_streaming_memtables) { res += m->region().occupancy(); } + for (auto smb : _streaming_memtables_big) { + for (auto m : *smb.second->memtables) { + res += m->region().occupancy(); + } + } return res; } @@ -740,6 +752,35 @@ column_family::seal_active_streaming_memtable_immediate() { }).finally([guard = std::move(guard)] { }); } +future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_big& smb) { + auto old = smb.memtables->back(); + if (old->empty()) { + return make_ready_future<>(); + } + smb.memtables->add_memtable(); + smb.memtables->erase(old); + return with_gate(_streaming_flush_gate, 
[this, old, &smb] { + return with_gate(smb.flush_in_progress, [this, old, &smb] { + return with_lock(_sstables_lock.for_read(), [this, old, &smb] { + auto newtab = make_lw_shared(_schema->ks_name(), _schema->cf_name(), + _config.datadir, calculate_generation_for_new_table(), + sstables::sstable::version_types::ka, + sstables::sstable::format_types::big); + + newtab->set_unshared(); + + auto&& priority = service::get_local_streaming_write_priority(); + return newtab->write_components(*old, incremental_backups_enabled(), priority, true).then([this, newtab, old, &smb] { + smb.sstables.emplace_back(newtab); + }).handle_exception([] (auto ep) { + dblog.error("failed to write streamed sstable: {}", ep); + return make_exception_future<>(ep); + }); + }); + }); + }); +} + future<> column_family::seal_active_memtable(memtable_list::flush_behavior ignored) { auto old = _memtables->back(); @@ -2107,11 +2148,26 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const } } -void column_family::apply_streaming_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) { +void column_family::apply_streaming_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) { + if (fragmented) { + apply_streaming_big_mutation(std::move(m_schema), plan_id, m); + return; + } _streaming_memtables->active_memtable().apply(m, m_schema); _streaming_memtables->seal_on_overflow(); } +void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) { + auto it = _streaming_memtables_big.find(plan_id); + if (it == _streaming_memtables_big.end()) { + it = _streaming_memtables_big.emplace(plan_id, make_lw_shared()).first; + it->second->memtables = _config.enable_disk_writes ? 
make_streaming_memtable_big_list(*it->second) : make_memory_only_memtable_list(); + } + auto entry = it->second; + entry->memtables->active_memtable().apply(m, m_schema); + entry->memtables->seal_on_overflow(); +} + void column_family::check_valid_rp(const db::replay_position& rp) const { if (rp < _highest_flushed_rp) { @@ -2217,15 +2273,15 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m) { }); } -future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, const frozen_mutation& m) { +future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) { if (!s->is_synced()) { throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, s = std::move(s)] { + return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, fragmented, s = std::move(s)] { auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); - cf.apply_streaming_mutation(s, plan_id, std::move(m)); + cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented); }); } @@ -2735,8 +2791,10 @@ future<> column_family::flush_streaming_mutations(utils::UUID plan_id, std::vect // be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we // need this code to go away as soon as we can (see FIXME above). So the double gate is a better // temporary counter measure. 
- return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] { - return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).finally([this] { + return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] { + return flush_streaming_big_mutations(plan_id).then([this] { + return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed); + }).finally([this] { return _streaming_flush_phaser.advance_and_await(); }).finally([this, ranges = std::move(ranges)] { if (!_config.enable_cache) { @@ -2751,11 +2809,49 @@ future<> column_family::flush_streaming_mutations(utils::UUID plan_id, std::vect }); } +future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) { + auto it = _streaming_memtables_big.find(plan_id); + if (it == _streaming_memtables_big.end()) { + return make_ready_future<>(); + } + auto entry = it->second; + _streaming_memtables_big.erase(it); + return entry->memtables->seal_active_memtable(memtable_list::flush_behavior::immediate).then([entry] { + return entry->flush_in_progress.close(); + }).then([this, entry] { + return parallel_for_each(entry->sstables, [this] (auto& sst) { + return sst->seal_sstable(this->incremental_backups_enabled()).then([sst] { + return sst->open_data(); + }); + }).then([this, entry] { + for (auto&& sst : entry->sstables) { + add_sstable(sst); + } + trigger_compaction(); + }); + }); +} + +future<> column_family::fail_streaming_mutations(utils::UUID plan_id) { + auto it = _streaming_memtables_big.find(plan_id); + if (it == _streaming_memtables_big.end()) { + return make_ready_future<>(); + } + auto entry = it->second; + _streaming_memtables_big.erase(it); + return entry->flush_in_progress.close().then([this, entry] { + for (auto&& sst : entry->sstables) { + sst->mark_for_deletion(); + } + }); +} + future<> column_family::clear() { _memtables->clear(); _memtables->add_memtable(); _streaming_memtables->clear(); 
_streaming_memtables->add_memtable(); + _streaming_memtables_big.clear(); return _cache.clear(); } @@ -2838,6 +2934,12 @@ void column_family::set_schema(schema_ptr s) { m->set_schema(s); } + for (auto smb : _streaming_memtables_big) { + for (auto m : *smb.second->memtables) { + m->set_schema(s); + } + } + _cache.set_schema(s); _schema = std::move(s); diff --git a/database.hh b/database.hh index 1175839da2..3793172c73 100644 --- a/database.hh +++ b/database.hh @@ -376,9 +376,28 @@ private: friend class memtable_dirty_memory_manager; friend class streaming_dirty_memory_manager; + // If mutations are fragmented during streaming the sstables cannot be made + // visible immediately after memtable flush, because that could cause + // readers to see only a part of a partition thus violating isolation + // guarantees. + // Mutations that are sent in fragments are kept separately in per-streaming + // plan memtables and the resulting sstables are not made visible until + // the streaming is complete. + struct streaming_memtable_big { + lw_shared_ptr memtables; + std::vector sstables; + seastar::gate flush_in_progress; + }; + std::unordered_map> _streaming_memtables_big; + + future<> flush_streaming_big_mutations(utils::UUID plan_id); + void apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m); + future<> seal_active_streaming_memtable_big(streaming_memtable_big& smb); + lw_shared_ptr make_memory_only_memtable_list(); lw_shared_ptr make_memtable_list(); lw_shared_ptr make_streaming_memtable_list(); + lw_shared_ptr make_streaming_memtable_big_list(streaming_memtable_big& smb); sstables::compaction_strategy _compaction_strategy; // generation -> sstable. Ordered by key so we can easily get the most recent. @@ -531,7 +550,7 @@ public: // The mutation is always upgraded to current schema. 
void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position()); void apply(const mutation& m, const db::replay_position& = db::replay_position()); - void apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&); + void apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented); // Returns at most "cmd.limit" rows future> query(schema_ptr, @@ -545,7 +564,7 @@ public: future<> flush(); future<> flush(const db::replay_position&); future<> flush_streaming_mutations(utils::UUID plan_id, std::vector ranges = std::vector{}); - future<> fail_streaming_mutations(utils::UUID plan_id) { return make_ready_future<>(); } + future<> fail_streaming_mutations(utils::UUID plan_id); future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); @@ -1029,7 +1048,7 @@ public: future> query(schema_ptr, const query::read_command& cmd, query::result_request request, const std::vector& ranges); future query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range); future<> apply(schema_ptr, const frozen_mutation&); - future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&); + future<> apply_streaming_mutation(schema_ptr, utils::UUID plan_id, const frozen_mutation&, bool fragmented); keyspace::config make_keyspace_config(const keyspace_metadata& ksm); const sstring& get_snitch_name() const; future<> clear_snapshot(sstring tag, std::vector keyspace_names); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index c97c2ba03b..441d68d189 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -644,13 +644,13 @@ future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id, } // STREAM_MUTATION -void messaging_service::register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID 
plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func) { +void messaging_service::register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional fragmented)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION, std::move(func)); } -future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) { +future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented) { return send_message_timeout_and_retry(this, messaging_verb::STREAM_MUTATION, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, - plan_id, std::move(fm), dst_cpu_id); + plan_id, std::move(fm), dst_cpu_id, fragmented); } // STREAM_MUTATION_DONE diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5b19286bc1..78d6f243aa 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -216,8 +216,8 @@ public: future<> send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id); // Wrapper for STREAM_MUTATION verb - void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func); - future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id); + void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional)>&& func); + future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented); void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id); diff --git 
a/service/storage_proxy.cc b/service/storage_proxy.cc index f70d53e72c..df2a41e1bb 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -964,10 +964,10 @@ storage_proxy::mutate_locally(std::vector mutations) { } future<> -storage_proxy::mutate_streaming_mutation(const schema_ptr& s, utils::UUID plan_id, const frozen_mutation& m) { +storage_proxy::mutate_streaming_mutation(const schema_ptr& s, utils::UUID plan_id, const frozen_mutation& m, bool fragmented) { auto shard = _db.local().shard_of(m); - return _db.invoke_on(shard, [&m, plan_id, gs = global_schema_ptr(s)] (database& db) mutable -> future<> { - return db.apply_streaming_mutation(gs, plan_id, m); + return _db.invoke_on(shard, [&m, plan_id, fragmented, gs = global_schema_ptr(s)] (database& db) mutable -> future<> { + return db.apply_streaming_mutation(gs, plan_id, m, fragmented); }); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 33862f96aa..b286e2acd4 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -254,7 +254,7 @@ public: future<> mutate_locally(const schema_ptr&, const frozen_mutation& m); future<> mutate_locally(std::vector mutations); - future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m); + future<> mutate_streaming_mutation(const schema_ptr&, utils::UUID plan_id, const frozen_mutation& m, bool fragmented); /** * Use this method to have these Mutations applied diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 543ac1af5d..c7af346f91 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -114,28 +114,29 @@ void stream_session::init_messaging_service_handler() { return make_ready_future<>(); }); }); - ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) { + ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, 
rpc::optional fragmented_opt) { auto from = net::messaging_service::get_source(cinfo); - return do_with(std::move(fm), [plan_id, from] (const auto& fm) { + auto fragmented = fragmented_opt && *fragmented_opt; + return do_with(std::move(fm), [plan_id, from, fragmented] (const auto& fm) { auto fm_size = fm.representation().size(); get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, fm_size); - return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm] (schema_ptr s) { + return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm, fragmented] (schema_ptr s) { auto cf_id = fm.column_family_id(); sslog.debug("[Stream #{}] GOT STREAM_MUTATION from {}: cf_id={}", plan_id, from.addr, cf_id); auto& db = service::get_local_storage_proxy().get_db().local(); if (!db.column_family_exists(cf_id)) { sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped", - plan_id, from.addr, cf_id); + plan_id, from.addr, cf_id); return make_ready_future<>(); } - return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), plan_id, fm).then_wrapped([plan_id, cf_id, from] (auto&& f) { + return service::get_storage_proxy().local().mutate_streaming_mutation(std::move(s), plan_id, fm, fragmented).then_wrapped([plan_id, cf_id, from] (auto&& f) { try { f.get(); return make_ready_future<>(); } catch (no_such_column_family) { sslog.warn("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped", - plan_id, from.addr, cf_id); + plan_id, from.addr, cf_id); return make_ready_future<>(); } catch (...) 
{ throw; diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 9abdef2cd3..1e702c1134 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -87,7 +87,7 @@ struct send_info { future do_send_mutations(auto si, auto fm) { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); - net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] { + net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, false).then([si, fm_size] { sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); si->mutations_done.signal(); From d9eb4d8028aa0205c1d99a30cf6f1512ed55ff23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 1 Jun 2016 19:36:30 +0100 Subject: [PATCH 17/17] streaming: use fragment_and_freeze() to send mutations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Commit 206955e4 "streaming: Reduce memory usage when sending mutations" moved the streaming mutation limiter from do_send_mutations() to send_mutations(). The reason for that was that send_mutations() did full mutation copies. That's no longer the case and the streaming limiter should be moved back to do_send_mutations() in order to provide back pressure to fragment_and_freeze().
Signed-off-by: Paweł Dziepak --- streaming/stream_transfer_task.cc | 58 +++++++++++++++---------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 1e702c1134..9e36d34a3f 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -84,44 +84,42 @@ struct send_info { } }; -future do_send_mutations(auto si, auto fm) { - sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); - auto fm_size = fm.representation().size(); - net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, false).then([si, fm_size] { - sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); - get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); - si->mutations_done.signal(); - }).handle_exception([si] (auto ep) { - // There might be larger number of STREAM_MUTATION inflight. 
- // Log one error per column_family per range - if (!si->error_logged) { - si->error_logged = true; - sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep); - } - si->mutations_done.broken(); +future<> do_send_mutations(auto si, auto fm, bool fragmented) { + return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable { + sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); + auto fm_size = fm.representation().size(); + net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented).then([si, fm_size] { + sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); + get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); + si->mutations_done.signal(); + }).handle_exception([si] (auto ep) { + // There might be larger number of STREAM_MUTATION inflight. 
+ // Log one error per column_family per range + if (!si->error_logged) { + si->error_logged = true; + sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep); + } + si->mutations_done.broken(); + }).finally([] { + get_local_stream_manager().mutation_send_limiter().signal(); + }); }); - return make_ready_future(stop_iteration::no); } future<> send_mutations(auto si) { auto& cf = si->db.find_column_family(si->cf_id); auto& priority = service::get_local_streaming_read_priority(); return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) { - return repeat([si, &reader] { - return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] { - return reader().then([] (auto sm) { - return mutation_from_streamed_mutation(std::move(sm)); - }).then([si] (auto mopt) { - if (mopt && si->db.column_family_exists(si->cf_id)) { + return repeat([si, &reader] () { + return reader().then([si] (auto smopt) { + if (smopt && si->db.column_family_exists(si->cf_id)) { + return fragment_and_freeze(std::move(*smopt), [si] (auto fm, bool fragmented) { si->mutations_nr++; - auto fm = frozen_mutation(*mopt); - return do_send_mutations(si, std::move(fm)); - } else { - return make_ready_future(stop_iteration::yes); - } - }); - }).finally([] { - get_local_stream_manager().mutation_send_limiter().signal(); + return do_send_mutations(si, std::move(fm), fragmented); + }).then([] { return stop_iteration::no; }); + } else { + return make_ready_future(stop_iteration::yes); + } }); }); }).then([si] {