Merge "Support reading deleted cells" from Piotr

"
Implement and test support for reading deleted cells in SSTables 3.
"

* 'haaawk/sstables3/read-deleted-cells-v2' of ssh://github.com/scylladb/seastar-dev:
  sstables: Test reading deleted cells from SST3
  sstables: Support deleted cells in reading SST3
  test_uncompressed_compound_ck_read: fix comment
  utils: add observer/observable templates
This commit is contained in:
Avi Kivity
2018-07-10 11:21:00 +03:00
14 changed files with 305 additions and 12 deletions

View File

@@ -304,6 +304,7 @@ scylla_tests = [
'tests/partition_data_test',
'tests/reusable_buffer_test',
'tests/multishard_writer_test',
'tests/observable_test',
]
perf_tests = [
@@ -742,6 +743,7 @@ pure_boost_tests = set([
'tests/imr_test',
'tests/partition_data_test',
'tests/reusable_buffer_test',
'tests/observable_test',
])
tests_not_using_seastar_test_framework = set([

View File

@@ -950,7 +950,8 @@ public:
bytes_view value,
api::timestamp_type timestamp,
gc_clock::duration ttl,
gc_clock::time_point local_deletion_time) override {
gc_clock::time_point local_deletion_time,
bool is_deleted) override {
if (!column_id) {
return proceed::yes;
}
@@ -960,15 +961,17 @@ public:
}
if (column_def.is_multi_cell()) {
auto ctype = static_pointer_cast<const collection_type_impl>(column_def.type);
auto ac = make_atomic_cell(*ctype->value_comparator(),
timestamp,
value,
ttl,
local_deletion_time,
atomic_cell::collection_member::yes);
auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time)
: make_atomic_cell(*ctype->value_comparator(),
timestamp,
value,
ttl,
local_deletion_time,
atomic_cell::collection_member::yes);
_cm.cells.emplace_back(to_bytes(cell_path), std::move(ac));
} else {
auto ac = make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time,
auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time)
: make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time,
atomic_cell::collection_member::no);
_cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))});
}

View File

@@ -164,7 +164,8 @@ public:
bytes_view value,
api::timestamp_type timestamp,
gc_clock::duration ttl,
gc_clock::time_point local_deletion_time) = 0;
gc_clock::time_point local_deletion_time,
bool is_deleted) = 0;
virtual proceed consume_complex_column_start(stdx::optional<column_id> column_id,
tombstone tomb) = 0;
@@ -1025,7 +1026,8 @@ public:
to_bytes_view(_column_value),
_column_timestamp,
_column_ttl,
_column_local_deletion_time) == consumer_m::proceed::no) {
_column_local_deletion_time,
_column_flags.is_deleted()) == consumer_m::proceed::no) {
return consumer_m::proceed::no;
}
}

71
tests/observable_test.cc Normal file
View File

@@ -0,0 +1,71 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#define BOOST_TEST_MODULE observable_test
#include <boost/test/unit_test.hpp>
#include "../utils/observable.hh"
using namespace utils;
BOOST_AUTO_TEST_CASE(test_basic_functionality) {
observable<int> pub;
int v1 = 0, v2 = 0;
observer<int> sub1 = pub.observe([&] (int x) { v1 = x; });
observer<int> sub2 = pub.observe([&] (int x) { v2 = x; });
pub(7);
BOOST_REQUIRE_EQUAL(v1, 7);
BOOST_REQUIRE_EQUAL(v2, 7);
sub1.disconnect();
pub(3);
BOOST_REQUIRE_EQUAL(v1, 7);
BOOST_REQUIRE_EQUAL(v2, 3);
sub1 = std::move(sub2);
pub(4);
BOOST_REQUIRE_EQUAL(v1, 7);
BOOST_REQUIRE_EQUAL(v2, 4);
pub = observable<int>();
pub(5);
BOOST_REQUIRE_EQUAL(v1, 7);
BOOST_REQUIRE_EQUAL(v2, 4);
}
BOOST_AUTO_TEST_CASE(test_exceptions) {
observable<> pub;
bool saw1 = false;
observer<> sub1 = pub.observe([&] { saw1 = true; });
observer<> sub2 = pub.observe([&] { throw 2; });
bool saw3 = false;
observer<> sub3 = pub.observe([&] { saw3 = true; });
observer<> sub4 = pub.observe([&] { throw 4; });
bool caught = false;
try {
pub();
} catch (int v) {
BOOST_REQUIRE(saw1);
BOOST_REQUIRE(saw3);
BOOST_REQUIRE(v == 2 || v == 4);
caught = true;
}
BOOST_REQUIRE(caught);
}

View File

@@ -1172,7 +1172,81 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_large_subset_of_columns_dense_read) {
.produces_end_of_stream();
}
// Following tests run on files in tests/sstables/3.x/uncompressed/simple
// Following tests run on files in tests/sstables/3.x/uncompressed/deleted_cells
// They were created using following CQL statements:
//
// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
//
// CREATE TABLE test_ks.test_table ( pk INT, ck INT, val INT, PRIMARY KEY(pk, ck))
// WITH compression = { 'enabled' : false };
//
// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 101, 1001);
// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 102, 1002);
// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 103, 1003);
// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 104, 1004);
// INSERT INTO test_ks.test_table(pk, ck, val) VALUES(1, 105, 1005);
// DELETE val FROM test_ks.test_table WHERE pk = 1 AND ck = 102;
// DELETE val FROM test_ks.test_table WHERE pk = 1 AND ck = 104;
static thread_local const sstring UNCOMPRESSED_DELETED_CELLS_PATH = "tests/sstables/3.x/uncompressed/deleted_cells";
static thread_local const schema_ptr UNCOMPRESSED_DELETED_CELLS_SCHEMA =
schema_builder("test_ks", "test_table")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("val", int32_type)
.build();
SEASTAR_THREAD_TEST_CASE(test_uncompressed_deleted_cells_read) {
sstable_assertions sst(UNCOMPRESSED_DELETED_CELLS_SCHEMA, UNCOMPRESSED_DELETED_CELLS_PATH);
sst.load();
auto to_key = [] (int key) {
auto bytes = int32_type->decompose(int32_t(key));
auto pk = partition_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, bytes);
return dht::global_partitioner().decorate_key(*UNCOMPRESSED_DELETED_CELLS_SCHEMA, pk);
};
auto int_cdef =
UNCOMPRESSED_DELETED_CELLS_SCHEMA->get_column_definition(to_bytes("val"));
BOOST_REQUIRE(int_cdef);
auto generate = [&] (uint64_t timestamp, uint64_t deletion_time) {
std::vector<flat_reader_assertions::assert_function> assertions;
assertions.push_back([timestamp, deletion_time] (const column_definition& def,
const atomic_cell_or_collection* cell) {
auto c = cell->as_atomic_cell(def);
BOOST_REQUIRE(!c.is_live());
BOOST_REQUIRE_EQUAL(timestamp, c.timestamp());
BOOST_REQUIRE_EQUAL(deletion_time, c.deletion_time().time_since_epoch().count());
});
return assertions;
};
std::vector<column_id> ids{int_cdef->id};
assert_that(sst.read_rows_flat())
.produces_partition_start(to_key(1))
.produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA,
int32_type->decompose(101)),
{{int_cdef, int32_type->decompose(1001)}})
.produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA,
int32_type->decompose(102)),
ids, generate(1529586065552460, 1529586065))
.produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA,
int32_type->decompose(103)),
{{int_cdef, int32_type->decompose(1003)}})
.produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA,
int32_type->decompose(104)),
ids, generate(1529586067568210, 1529586067))
.produces_row(clustering_key::from_single_value(*UNCOMPRESSED_DELETED_CELLS_SCHEMA,
int32_type->decompose(105)),
{{int_cdef, int32_type->decompose(1005)}})
.produces_partition_end()
.produces_end_of_stream();
}
// Following tests run on files in tests/sstables/3.x/uncompressed/compound_ck
// They were created using following CQL statements:
//
// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
@@ -1281,7 +1355,7 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_simple_read) {
.produces_end_of_stream();
}
// Following tests run on files in tests/sstables/3.x/uncompressed/simple
// Following tests run on files in tests/sstables/3.x/uncompressed/compound_ck
// They were created using following CQL statements:
//
// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

View File

@@ -0,0 +1 @@
3061025548

View File

@@ -0,0 +1,8 @@
Index.db
TOC.txt
Statistics.db
Digest.crc32
CRC.db
Data.db
Summary.db
Filter.db

132
utils/observable.hh Normal file
View File

@@ -0,0 +1,132 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <functional>
#include <vector>
#include <boost/range/algorithm/replace.hpp>
#include <boost/range/algorithm/remove.hpp>
namespace utils {
template <typename... Args>
class observable {
public:
class observer;
private:
std::vector<observer*> _observers;
public:
class observer {
friend class observable;
observable* _observable;
std::function<void (Args...)> _callback;
private:
void moved(observer* from) {
if (_observable) {
_observable->moved(from, this);
}
}
public:
observer(observable* o, std::function<void (Args...)> callback) noexcept
: _observable(o), _callback(std::move(callback)) {
}
observer(observer&& o) noexcept
: _observable(std::exchange(o._observable, nullptr))
, _callback(std::move(o._callback)) {
moved(&o);
}
observer& operator=(observer&& o) noexcept {
if (this != &o) {
disconnect();
_observable = std::exchange(o._observable, nullptr);
_callback = std::move(o._callback);
moved(&o);
}
return *this;
}
~observer() {
disconnect();
}
void disconnect() {
if (_observable) {
_observable->destroyed(this);
}
}
};
friend class observer;
private:
void destroyed(observer* dead) {
_observers.erase(boost::remove(_observers, dead), _observers.end());
}
void moved(observer* from, observer* to) {
boost::replace(_observers, from, to);
}
void update_observers(observable* ob) {
for (auto&& c : _observers) {
c->_observable = ob;
}
}
public:
observable() = default;
observable(observable&& o) noexcept
: _observers(std::move(o._observers)) {
update_observers(this);
}
observable& operator=(observable&& o) noexcept {
if (this != &o) {
update_observers(nullptr);
_observers = std::move(o._observers);
update_observers(this);
}
return *this;
}
~observable() {
update_observers(nullptr);
}
// Send args to all connected observers
void operator()(Args... args) const {
std::exception_ptr e;
for (auto&& ob : _observers) {
try {
ob->_callback(args...);
} catch (...) {
if (!e) {
e = std::current_exception();
}
}
}
if (e) {
std::rethrow_exception(std::move(e));
}
}
// Adds an observer to an observable
observer observe(std::function<void (Args...)> callback) {
observer ob(this, std::move(callback));
_observers.push_back(&ob);
return ob;
}
};
template <typename... Args>
using observer = typename observable<Args...>::observer;
}