From 96737d140f078183fa541d4ad4be4a661f755511 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 9 Jul 2018 20:27:26 +0300 Subject: [PATCH 1/4] utils: add observer/observable templates An observable is used to decouple an information producer from a consumer (in the same way as a callback), while allowing multiple consumers (called observers) to coexist and to manage their lifetime separately. Two classes are introduced: observable: a producer class; when an observable is invoked all observers receive the information observer: a consumer class; receives information from a observable Modelled after boost::signals2, with the following changes - all signals return void; information is passed from the producer to the consumer but not back - thread-unsafe - modern C++ without preprocessor hacks - connection lifetime is always managed rather than leaked by default - renamed to avoid the funky "slot" name Message-Id: <20180709172726.5079-1-avi@scylladb.com> --- configure.py | 2 + tests/observable_test.cc | 71 +++++++++++++++++++++ utils/observable.hh | 132 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 205 insertions(+) create mode 100644 tests/observable_test.cc create mode 100644 utils/observable.hh diff --git a/configure.py b/configure.py index c39756b2d2..aa9b5d63f0 100755 --- a/configure.py +++ b/configure.py @@ -304,6 +304,7 @@ scylla_tests = [ 'tests/partition_data_test', 'tests/reusable_buffer_test', 'tests/multishard_writer_test', + 'tests/observable_test', ] perf_tests = [ @@ -742,6 +743,7 @@ pure_boost_tests = set([ 'tests/imr_test', 'tests/partition_data_test', 'tests/reusable_buffer_test', + 'tests/observable_test', ]) tests_not_using_seastar_test_framework = set([ diff --git a/tests/observable_test.cc b/tests/observable_test.cc new file mode 100644 index 0000000000..ae99caedc3 --- /dev/null +++ b/tests/observable_test.cc @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#define BOOST_TEST_MODULE observable_test + +#include + + +#include "../utils/observable.hh" + +using namespace utils; + +BOOST_AUTO_TEST_CASE(test_basic_functionality) { + observable pub; + int v1 = 0, v2 = 0; + observer sub1 = pub.observe([&] (int x) { v1 = x; }); + observer sub2 = pub.observe([&] (int x) { v2 = x; }); + pub(7); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 7); + sub1.disconnect(); + pub(3); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 3); + sub1 = std::move(sub2); + pub(4); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 4); + pub = observable(); + pub(5); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 4); +} + +BOOST_AUTO_TEST_CASE(test_exceptions) { + observable<> pub; + bool saw1 = false; + observer<> sub1 = pub.observe([&] { saw1 = true; }); + observer<> sub2 = pub.observe([&] { throw 2; }); + bool saw3 = false; + observer<> sub3 = pub.observe([&] { saw3 = true; }); + observer<> sub4 = pub.observe([&] { throw 4; }); + bool caught = false; + try { + pub(); + } catch (int v) { + BOOST_REQUIRE(saw1); + BOOST_REQUIRE(saw3); + BOOST_REQUIRE(v == 2 || v == 4); + caught = true; + } + BOOST_REQUIRE(caught); +} diff --git a/utils/observable.hh b/utils/observable.hh new file mode 100644 index 0000000000..cab1b41702 --- /dev/null +++ b/utils/observable.hh @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include +#include +#include + +namespace utils { + +template +class observable { +public: + class observer; +private: + std::vector _observers; +public: + class observer { + friend class observable; + observable* _observable; + std::function _callback; + private: + void moved(observer* from) { + if (_observable) { + _observable->moved(from, this); + } + } + public: + observer(observable* o, std::function callback) noexcept + : _observable(o), _callback(std::move(callback)) { + } + observer(observer&& o) noexcept + : _observable(std::exchange(o._observable, nullptr)) + , _callback(std::move(o._callback)) { + moved(&o); + } + observer& operator=(observer&& o) noexcept { + if (this != &o) { + disconnect(); + _observable = std::exchange(o._observable, nullptr); + _callback = std::move(o._callback); + moved(&o); + } + return *this; + } + ~observer() { + disconnect(); + } + void disconnect() { + if (_observable) { + _observable->destroyed(this); + } + } + }; + friend class observer; +private: + void destroyed(observer* dead) { + _observers.erase(boost::remove(_observers, dead), _observers.end()); + } + void moved(observer* from, observer* to) { + boost::replace(_observers, from, to); + } + void update_observers(observable* ob) { + for (auto&& c : _observers) { + c->_observable = ob; + } + } +public: + observable() = default; + observable(observable&& o) noexcept + : _observers(std::move(o._observers)) { + update_observers(this); + } + observable& operator=(observable&& o) noexcept { + if (this != &o) { + update_observers(nullptr); + _observers = std::move(o._observers); + update_observers(this); + } + return *this; + } + ~observable() { + update_observers(nullptr); + } + // Send args to all connected observers + void operator()(Args... args) const { + std::exception_ptr e; + for (auto&& ob : _observers) { + try { + ob->_callback(args...); + } catch (...) { + if (!e) { + e = std::current_exception(); + } + } + } + if (e) { + std::rethrow_exception(std::move(e)); + } + } + // Adds an observer to an observable + observer observe(std::function callback) { + observer ob(this, std::move(callback)); + _observers.push_back(&ob); + return ob; + } +}; + +template +using observer = typename observable::observer; + +} From f64901fdacf6e5c852dda151f11e684707956b67 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Tue, 10 Jul 2018 10:03:14 +0200 Subject: [PATCH 2/4] test_uncompressed_compound_ck_read: fix comment Signed-off-by: Piotr Jastrzebski --- tests/sstable_3_x_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index a72fbccff9..2de9cc9528 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -1281,7 +1281,7 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_simple_read) { .produces_end_of_stream(); } -// Following tests run on files in tests/sstables/3.x/uncompressed/simple +// Following tests run on files in tests/sstables/3.x/uncompressed/compound_ck // They were created using following CQL statements: // // CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; From 54fc6dde3547f2d4e1016966a25626e1bd62c502 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 25 Jun 2018 20:11:08 +0200 Subject: [PATCH 3/4] sstables: Support deleted cells in reading SST3 Signed-off-by: Piotr Jastrzebski --- sstables/mp_row_consumer.hh | 19 +++++++++++-------- sstables/row.hh | 6 ++++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh index b996c9e1a7..882d675723 100644 --- a/sstables/mp_row_consumer.hh +++ b/sstables/mp_row_consumer.hh @@ -950,7 +950,8 @@ public: bytes_view value, api::timestamp_type timestamp, gc_clock::duration ttl, - gc_clock::time_point local_deletion_time) override { + gc_clock::time_point local_deletion_time, + bool is_deleted) override { if (!column_id) { return proceed::yes; } @@ -960,15 +961,17 @@ public: } if (column_def.is_multi_cell()) { auto ctype = static_pointer_cast(column_def.type); - auto ac = make_atomic_cell(*ctype->value_comparator(), - timestamp, - value, - ttl, - local_deletion_time, - atomic_cell::collection_member::yes); + auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time) + : make_atomic_cell(*ctype->value_comparator(), + timestamp, + value, + ttl, + local_deletion_time, + atomic_cell::collection_member::yes); _cm.cells.emplace_back(to_bytes(cell_path), std::move(ac)); } else { - auto ac = make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time, + auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time) + : make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time, atomic_cell::collection_member::no); _cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))}); } diff --git a/sstables/row.hh b/sstables/row.hh index c72c3a4c9f..3f847c2a67 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -164,7 +164,8 @@ public: bytes_view value, api::timestamp_type timestamp, gc_clock::duration ttl, - gc_clock::time_point local_deletion_time) = 0; + gc_clock::time_point local_deletion_time, + bool is_deleted) = 0; virtual proceed consume_complex_column_start(stdx::optional column_id, tombstone tomb) = 0; @@ -1025,7 +1026,8 @@ public: to_bytes_view(_column_value), _column_timestamp, _column_ttl, - _column_local_deletion_time) == consumer_m::proceed::no) { + _column_local_deletion_time, + _column_flags.is_deleted()) == consumer_m::proceed::no) { return consumer_m::proceed::no; } } From 0abdd919c8d27491c2366ba61c491051a7ac8a12 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 25 Jun 2018 20:11:29 +0200 Subject: [PATCH 4/4] sstables: Test reading deleted cells from SST3 Signed-off-by: Piotr Jastrzebski --- tests/sstable_3_x_test.cc | 76 +++++++++++++++++- .../deleted_cells/mc-1-big-CRC.db | Bin 0 -> 8 bytes .../deleted_cells/mc-1-big-Data.db | Bin 0 -> 103 bytes .../deleted_cells/mc-1-big-Digest.crc32 | 1 + .../deleted_cells/mc-1-big-Filter.db | Bin 0 -> 16 bytes .../deleted_cells/mc-1-big-Index.db | Bin 0 -> 8 bytes .../deleted_cells/mc-1-big-Statistics.db | Bin 0 -> 4679 bytes .../deleted_cells/mc-1-big-Summary.db | Bin 0 -> 56 bytes .../deleted_cells/mc-1-big-TOC.txt | 8 ++ 9 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-CRC.db create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Data.db create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Digest.crc32 create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Filter.db create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Index.db create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Statistics.db create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Summary.db create mode 100644 tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-TOC.txt diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index 2de9cc9528..26e553feda 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -1172,7 +1172,81 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_large_subset_of_columns_dense_read) { .produces_end_of_stream(); } -// Following tests run on files in tests/sstables/3.x/uncompressed/simple +// Following tests run on files in tests/sstables/3.x/uncompressed/deleted_cells +// They were created using following CQL statements: +// +// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +// +// CREATE TABLE test_ks.test_table ( pk INT, ck INT, val INT, PRIMARY KEY(pk, ck)) +// WITH compression = { 'enabled' : false }; +// +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 101, 1001); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 102, 1002); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 103, 1003); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 104, 1004); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 105, 1005); +// DELETE val FROM test_ks.test_table WHERE pk = 1 AND ck = 102; +// DELETE val FROM test_ks.test_table WHERE pk = 1 AND ck = 104; + +static thread_local const sstring UNCOMPRESSED_DELETED_CELLS_PATH = "tests/sstables/3.x/uncompressed/deleted_cells"; +static thread_local const schema_ptr UNCOMPRESSED_DELETED_CELLS_SCHEMA = + schema_builder("test_ks", "test_table") + .with_column("pk", int32_type, column_kind::partition_key) + .with_column("ck", int32_type, column_kind::clustering_key) + .with_column("val", int32_type) + .build(); + +SEASTAR_THREAD_TEST_CASE(test_uncompressed_deleted_cells_read) { + sstable_assertions sst(UNCOMPRESSED_DELETED_CELLS_SCHEMA, UNCOMPRESSED_DELETED_CELLS_PATH); + sst.load(); + auto to_key = [] (int key) { + auto bytes = int32_type->decompose(int32_t(key)); + auto pk = partition_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, bytes); + return dht::global_partitioner().decorate_key(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, pk); + }; + + auto int_cdef = + UNCOMPRESSED_DELETED_CELLS_SCHEMA->get_column_definition(to_bytes("val")); + BOOST_REQUIRE(int_cdef); + + auto generate = [&] (uint64_t timestamp, uint64_t deletion_time) { + std::vector assertions; + + assertions.push_back([timestamp, deletion_time] (const column_definition& def, + const atomic_cell_or_collection* cell) { + auto c = cell->as_atomic_cell(def); + BOOST_REQUIRE(!c.is_live()); + BOOST_REQUIRE_EQUAL(timestamp, c.timestamp()); + BOOST_REQUIRE_EQUAL(deletion_time, c.deletion_time().time_since_epoch().count()); + }); + + return assertions; + }; + + std::vector ids{int_cdef->id}; + + assert_that(sst.read_rows_flat()) + .produces_partition_start(to_key(1)) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(101)), + {{int_cdef, int32_type->decompose(1001)}}) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(102)), + ids, generate(1529586065552460, 1529586065)) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(103)), + {{int_cdef, int32_type->decompose(1003)}}) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(104)), + ids, generate(1529586067568210, 1529586067)) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(105)), + {{int_cdef, int32_type->decompose(1005)}}) + .produces_partition_end() + .produces_end_of_stream(); +} + +// Following tests run on files in tests/sstables/3.x/uncompressed/compound_ck // They were created using following CQL statements: // // CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-CRC.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-CRC.db new file mode 100644 index 0000000000000000000000000000000000000000..dd2991b9432150a9ab2f79ec26aa1b9d65c47124 GIT binary patch literal 8 PcmZQzWMJ4<+|2_31!4ih literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Data.db new file mode 100644 index 0000000000000000000000000000000000000000..1f6cddd46a1a574d58c82773abe7cf0461b0756d GIT binary patch literal 103 zcmZQzVPIfjtpET2e*=&K0xBRPm0gH|gMoqhC774S&G*1hemU#o?>}N8^66Yc52`bs jf|R@lE6Ly%e6Y%so%M;>u4E>#NG4Fp-QE=-C2tu4pCcID literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Digest.crc32 b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Digest.crc32 new file mode 100644 index 0000000000..f506768c05 --- /dev/null +++ b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Digest.crc32 @@ -0,0 +1 @@ +3061025548 \ No newline at end of file diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Filter.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Filter.db new file mode 100644 index 0000000000000000000000000000000000000000..f9c2d6ed9c0f2cf0fdcdc9cfce9f6b88e1c0e800 GIT binary patch literal 16 XcmZQzU|?lnU|?iWP;hWyU=RQR1WEwc literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Index.db new file mode 100644 index 0000000000000000000000000000000000000000..b077026fd87b3cdd609bc60c41ad771d3edb460d GIT binary patch literal 8 PcmZQzVPIfjWMBXQ04D$j literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Statistics.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Statistics.db new file mode 100644 index 0000000000000000000000000000000000000000..fe199194f5c413e9a3ea269985e7d40baf4d78d9 GIT binary patch literal 4679 zcmeI$eM}Q)90%}w?pg|1PFwH=7I7lzgryog#>`X%YoNEU9QTTb)sc!_eIUr@!yxO#E|+eUi&P zpZ=a}pL?F??tVQ%5DdadNmO@C-BzfZsBR_dCJBP&n2=KLDY7XQN@20vR;YNrN}0=} z*j&Xv+iPn*rE5L*9L3`+@s*U9xjpGMRljE)iCM26Yz_v4Ls7Ae9oRk?p}JAsw4(h8 zExffKr$y}q6uraxheo|Jx)0-Q;vCL7lCzn!%-O;@hVvxOlQ~c0oXmL+=Xsp7Ilnn- zeH@Fp_y*29IUnWRgJ#@(uot6V#p01 zGZYP$UAP`Cj_VMQX`29^bR`ZvxhEGq?L5wR@&L}az5X<`pWk~G{7MG)Z$+FR;;a9# zf;TT|0q?r^61eqi9G}arFrF-O++Et9rDtseZ%Q}_{{GBtYH`MKxW9@Y%NJ=}imQb6 zA+^1u-TNODLuJH*2T&`J~u7mbgToE_BB zX-Tl2C(8Feq48%9HQb<{`a<>>)H8lsK971dpULPzS30c!lYvh zsn`7z)kj_TO>-9Yp}buQ)IT)V?4a)ds(UdpdoHS=jk<8rB6|I^=A7g7`e(t9WP1IJ zZvT}NZoT(Hu-9$7t2ZW5N7`O%b5aOsp9b#D{E}dGM|0U()TOe?8%!7Oj@U`dGhxE E0^B%@EdT%j literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Summary.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Summary.db new file mode 100644 index 0000000000000000000000000000000000000000..4547a9451a6c383b420c1e8b235d7444d1e8c901 GIT binary patch literal 56 kcmZQzU}#`qU|