diff --git a/configure.py b/configure.py index c39756b2d2..aa9b5d63f0 100755 --- a/configure.py +++ b/configure.py @@ -304,6 +304,7 @@ scylla_tests = [ 'tests/partition_data_test', 'tests/reusable_buffer_test', 'tests/multishard_writer_test', + 'tests/observable_test', ] perf_tests = [ @@ -742,6 +743,7 @@ pure_boost_tests = set([ 'tests/imr_test', 'tests/partition_data_test', 'tests/reusable_buffer_test', + 'tests/observable_test', ]) tests_not_using_seastar_test_framework = set([ diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh index b996c9e1a7..882d675723 100644 --- a/sstables/mp_row_consumer.hh +++ b/sstables/mp_row_consumer.hh @@ -950,7 +950,8 @@ public: bytes_view value, api::timestamp_type timestamp, gc_clock::duration ttl, - gc_clock::time_point local_deletion_time) override { + gc_clock::time_point local_deletion_time, + bool is_deleted) override { if (!column_id) { return proceed::yes; } @@ -960,15 +961,17 @@ public: } if (column_def.is_multi_cell()) { auto ctype = static_pointer_cast(column_def.type); - auto ac = make_atomic_cell(*ctype->value_comparator(), - timestamp, - value, - ttl, - local_deletion_time, - atomic_cell::collection_member::yes); + auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time) + : make_atomic_cell(*ctype->value_comparator(), + timestamp, + value, + ttl, + local_deletion_time, + atomic_cell::collection_member::yes); _cm.cells.emplace_back(to_bytes(cell_path), std::move(ac)); } else { - auto ac = make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time, + auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time) + : make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time, atomic_cell::collection_member::no); _cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))}); } diff --git a/sstables/row.hh b/sstables/row.hh index c72c3a4c9f..3f847c2a67 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -164,7 +164,8 @@ public: bytes_view value, api::timestamp_type timestamp, gc_clock::duration ttl, - gc_clock::time_point local_deletion_time) = 0; + gc_clock::time_point local_deletion_time, + bool is_deleted) = 0; virtual proceed consume_complex_column_start(stdx::optional column_id, tombstone tomb) = 0; @@ -1025,7 +1026,8 @@ public: to_bytes_view(_column_value), _column_timestamp, _column_ttl, - _column_local_deletion_time) == consumer_m::proceed::no) { + _column_local_deletion_time, + _column_flags.is_deleted()) == consumer_m::proceed::no) { return consumer_m::proceed::no; } } diff --git a/tests/observable_test.cc b/tests/observable_test.cc new file mode 100644 index 0000000000..ae99caedc3 --- /dev/null +++ b/tests/observable_test.cc @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#define BOOST_TEST_MODULE observable_test + +#include + + +#include "../utils/observable.hh" + +using namespace utils; + +BOOST_AUTO_TEST_CASE(test_basic_functionality) { + observable pub; + int v1 = 0, v2 = 0; + observer sub1 = pub.observe([&] (int x) { v1 = x; }); + observer sub2 = pub.observe([&] (int x) { v2 = x; }); + pub(7); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 7); + sub1.disconnect(); + pub(3); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 3); + sub1 = std::move(sub2); + pub(4); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 4); + pub = observable(); + pub(5); + BOOST_REQUIRE_EQUAL(v1, 7); + BOOST_REQUIRE_EQUAL(v2, 4); +} + +BOOST_AUTO_TEST_CASE(test_exceptions) { + observable<> pub; + bool saw1 = false; + observer<> sub1 = pub.observe([&] { saw1 = true; }); + observer<> sub2 = pub.observe([&] { throw 2; }); + bool saw3 = false; + observer<> sub3 = pub.observe([&] { saw3 = true; }); + observer<> sub4 = pub.observe([&] { throw 4; }); + bool caught = false; + try { + pub(); + } catch (int v) { + BOOST_REQUIRE(saw1); + BOOST_REQUIRE(saw3); + BOOST_REQUIRE(v == 2 || v == 4); + caught = true; + } + BOOST_REQUIRE(caught); +} diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index a72fbccff9..26e553feda 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -1172,7 +1172,81 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_large_subset_of_columns_dense_read) { .produces_end_of_stream(); } -// Following tests run on files in tests/sstables/3.x/uncompressed/simple +// Following tests run on files in tests/sstables/3.x/uncompressed/deleted_cells +// They were created using following CQL statements: +// +// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +// +// CREATE TABLE test_ks.test_table ( pk INT, ck INT, val INT, PRIMARY KEY(pk, ck)) +// WITH compression = { 'enabled' : false }; +// +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 101, 1001); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 102, 1002); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 103, 1003); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 104, 1004); +// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 105, 1005); +// DELETE val FROM test_ks.test_table WHERE pk = 1 AND ck = 102; +// DELETE val FROM test_ks.test_table WHERE pk = 1 AND ck = 104; + +static thread_local const sstring UNCOMPRESSED_DELETED_CELLS_PATH = "tests/sstables/3.x/uncompressed/deleted_cells"; +static thread_local const schema_ptr UNCOMPRESSED_DELETED_CELLS_SCHEMA = + schema_builder("test_ks", "test_table") + .with_column("pk", int32_type, column_kind::partition_key) + .with_column("ck", int32_type, column_kind::clustering_key) + .with_column("val", int32_type) + .build(); + +SEASTAR_THREAD_TEST_CASE(test_uncompressed_deleted_cells_read) { + sstable_assertions sst(UNCOMPRESSED_DELETED_CELLS_SCHEMA, UNCOMPRESSED_DELETED_CELLS_PATH); + sst.load(); + auto to_key = [] (int key) { + auto bytes = int32_type->decompose(int32_t(key)); + auto pk = partition_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, bytes); + return dht::global_partitioner().decorate_key(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, pk); + }; + + auto int_cdef = + UNCOMPRESSED_DELETED_CELLS_SCHEMA->get_column_definition(to_bytes("val")); + BOOST_REQUIRE(int_cdef); + + auto generate = [&] (uint64_t timestamp, uint64_t deletion_time) { + std::vector assertions; + + assertions.push_back([timestamp, deletion_time] (const column_definition& def, + const atomic_cell_or_collection* cell) { + auto c = cell->as_atomic_cell(def); + BOOST_REQUIRE(!c.is_live()); + BOOST_REQUIRE_EQUAL(timestamp, c.timestamp()); + BOOST_REQUIRE_EQUAL(deletion_time, c.deletion_time().time_since_epoch().count()); + }); + + return assertions; + }; + + std::vector ids{int_cdef->id}; + + assert_that(sst.read_rows_flat()) + .produces_partition_start(to_key(1)) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(101)), + {{int_cdef, int32_type->decompose(1001)}}) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(102)), + ids, generate(1529586065552460, 1529586065)) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(103)), + {{int_cdef, int32_type->decompose(1003)}}) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(104)), + ids, generate(1529586067568210, 1529586067)) + .produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, + int32_type->decompose(105)), + {{int_cdef, int32_type->decompose(1005)}}) + .produces_partition_end() + .produces_end_of_stream(); +} + +// Following tests run on files in tests/sstables/3.x/uncompressed/compound_ck // They were created using following CQL statements: // // CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; @@ -1281,7 +1355,7 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_simple_read) { .produces_end_of_stream(); } -// Following tests run on files in tests/sstables/3.x/uncompressed/simple +// Following tests run on files in tests/sstables/3.x/uncompressed/compound_ck // They were created using following CQL statements: // // CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-CRC.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-CRC.db new file mode 100644 index 0000000000..dd2991b943 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-CRC.db differ diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Data.db new file mode 100644 index 0000000000..1f6cddd46a Binary files /dev/null and b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Data.db differ diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Digest.crc32 b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Digest.crc32 new file mode 100644 index 0000000000..f506768c05 --- /dev/null +++ b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Digest.crc32 @@ -0,0 +1 @@ +3061025548 \ No newline at end of file diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Filter.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Filter.db new file mode 100644 index 0000000000..f9c2d6ed9c Binary files /dev/null and b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Filter.db differ diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Index.db new file mode 100644 index 0000000000..b077026fd8 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Index.db differ diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Statistics.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Statistics.db new file mode 100644 index 0000000000..fe199194f5 Binary files /dev/null and b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Statistics.db differ diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Summary.db b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Summary.db new file mode 100644 index 0000000000..4547a9451a Binary files /dev/null and b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-Summary.db differ diff --git a/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-TOC.txt b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-TOC.txt new file mode 100644 index 0000000000..bac44176b0 --- /dev/null +++ b/tests/sstables/3.x/uncompressed/deleted_cells/mc-1-big-TOC.txt @@ -0,0 +1,8 @@ +Index.db +TOC.txt +Statistics.db +Digest.crc32 +CRC.db +Data.db +Summary.db +Filter.db diff --git a/utils/observable.hh b/utils/observable.hh new file mode 100644 index 0000000000..cab1b41702 --- /dev/null +++ b/utils/observable.hh @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include +#include +#include + +namespace utils { + +template +class observable { +public: + class observer; +private: + std::vector _observers; +public: + class observer { + friend class observable; + observable* _observable; + std::function _callback; + private: + void moved(observer* from) { + if (_observable) { + _observable->moved(from, this); + } + } + public: + observer(observable* o, std::function callback) noexcept + : _observable(o), _callback(std::move(callback)) { + } + observer(observer&& o) noexcept + : _observable(std::exchange(o._observable, nullptr)) + , _callback(std::move(o._callback)) { + moved(&o); + } + observer& operator=(observer&& o) noexcept { + if (this != &o) { + disconnect(); + _observable = std::exchange(o._observable, nullptr); + _callback = std::move(o._callback); + moved(&o); + } + return *this; + } + ~observer() { + disconnect(); + } + void disconnect() { + if (_observable) { + _observable->destroyed(this); + } + } + }; + friend class observer; +private: + void destroyed(observer* dead) { + _observers.erase(boost::remove(_observers, dead), _observers.end()); + } + void moved(observer* from, observer* to) { + boost::replace(_observers, from, to); + } + void update_observers(observable* ob) { + for (auto&& c : _observers) { + c->_observable = ob; + } + } +public: + observable() = default; + observable(observable&& o) noexcept + : _observers(std::move(o._observers)) { + update_observers(this); + } + observable& operator=(observable&& o) noexcept { + if (this != &o) { + update_observers(nullptr); + _observers = std::move(o._observers); + update_observers(this); + } + return *this; + } + ~observable() { + update_observers(nullptr); + } + // Send args to all connected observers + void operator()(Args... args) const { + std::exception_ptr e; + for (auto&& ob : _observers) { + try { + ob->_callback(args...); + } catch (...) { + if (!e) { + e = std::current_exception(); + } + } + } + if (e) { + std::rethrow_exception(std::move(e)); + } + } + // Adds an observer to an observable + observer observe(std::function callback) { + observer ob(this, std::move(callback)); + _observers.push_back(&ob); + return ob; + } +}; + +template +using observer = typename observable::observer; + +}