Compare commits
15 Commits
next
...
scylla-3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9dd714ae64 | ||
|
|
3980570520 | ||
|
|
9889e553e6 | ||
|
|
3e0b09faa1 | ||
|
|
bc4106ff45 | ||
|
|
df3563c1ae | ||
|
|
1c89961c4f | ||
|
|
85b1a45252 | ||
|
|
6a847e2242 | ||
|
|
10cf0e0d91 | ||
|
|
8c1474c039 | ||
|
|
bb5e9527bb | ||
|
|
4dae72b2cd | ||
|
|
1e444a3dd5 | ||
|
|
76906d6134 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=3.2.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1241,6 +1241,34 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
|
||||
}
|
||||
}
|
||||
|
||||
/// \brief Helper for ensuring a file is closed if an exception is thrown.
|
||||
///
|
||||
/// The file provided by the file_fut future is passed to func.
|
||||
/// * If func throws an exception E, the file is closed and we return
|
||||
/// a failed future with E.
|
||||
/// * If func returns a value V, the file is not closed and we return
|
||||
/// a future with V.
|
||||
/// Note that when an exception is not thrown, it is the
|
||||
/// responsibility of func to make sure the file will be closed. It
|
||||
/// can close the file itself, return it, or store it somewhere.
|
||||
///
|
||||
/// \tparam Func The type of function this wraps
|
||||
/// \param file_fut A future that produces a file
|
||||
/// \param func A function that uses a file
|
||||
/// \return A future that passes the file produced by file_fut to func
|
||||
/// and closes it if func fails
|
||||
template <typename Func>
|
||||
static auto close_on_failure(future<file> file_fut, Func func) {
|
||||
return file_fut.then([func = std::move(func)](file f) {
|
||||
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
|
||||
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
|
||||
using futurator = futurize<std::result_of_t<Func(file)>>;
|
||||
return futurator::make_exception_future(e);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(const descriptor& d, sstring filename, open_flags flags, bool active) {
|
||||
file_open_options opt;
|
||||
opt.extent_allocation_size_hint = max_size;
|
||||
@@ -1258,7 +1286,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
return fut;
|
||||
});
|
||||
|
||||
return fut.then([this, d, active, filename, flags](file f) {
|
||||
return close_on_failure(std::move(fut), [this, d, active, filename, flags] (file f) {
|
||||
f = make_checked_file(commit_error_handler, f);
|
||||
// xfs doesn't like files extended beyond eof, so enlarge the file
|
||||
auto fut = make_ready_future<>();
|
||||
|
||||
@@ -276,7 +276,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
}
|
||||
|
||||
auto shard = _db.local().shard_of(fm);
|
||||
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) -> future<> {
|
||||
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) mutable -> future<> {
|
||||
auto& fm = cer.mutation();
|
||||
// TODO: might need better verification that the deserialized mutation
|
||||
// is schema compatible. My guess is that just applying the mutation
|
||||
@@ -306,7 +306,9 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
} else {
|
||||
return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout);
|
||||
return do_with(std::move(cer).mutation(), [&](const frozen_mutation& m) {
|
||||
return db.apply_in_memory(m, cf.schema(), db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
}
|
||||
}).then_wrapped([s] (future<> f) {
|
||||
try {
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -5,7 +5,7 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-3.2/latest/scylla.repo
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
2
dist/redhat/scylla.spec.mustache
vendored
2
dist/redhat/scylla.spec.mustache
vendored
@@ -15,6 +15,8 @@ Obsoletes: scylla-server < 1.1
|
||||
%global __brp_python_bytecompile %{nil}
|
||||
%global __brp_mangle_shebangs %{nil}
|
||||
|
||||
%undefine _find_debuginfo_dwz_opts
|
||||
|
||||
%description
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
|
||||
@@ -1876,7 +1876,7 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now,
|
||||
_timestamp = api::missing_timestamp;
|
||||
return false;
|
||||
}
|
||||
if (_ttl > no_ttl && _expiry < now) {
|
||||
if (_ttl > no_ttl && _expiry <= now) {
|
||||
_expiry -= _ttl;
|
||||
_ttl = dead;
|
||||
}
|
||||
|
||||
@@ -679,7 +679,7 @@ public:
|
||||
if (is_missing() || _ttl == dead) {
|
||||
return false;
|
||||
}
|
||||
if (_ttl != no_ttl && _expiry < now) {
|
||||
if (_ttl != no_ttl && _expiry <= now) {
|
||||
return false;
|
||||
}
|
||||
return _timestamp > t.timestamp;
|
||||
@@ -689,7 +689,7 @@ public:
|
||||
if (_ttl == dead) {
|
||||
return true;
|
||||
}
|
||||
return _ttl != no_ttl && _expiry < now;
|
||||
return _ttl != no_ttl && _expiry <= now;
|
||||
}
|
||||
// Can be called only when is_live().
|
||||
bool is_expiring() const {
|
||||
|
||||
@@ -1352,7 +1352,9 @@ public:
|
||||
auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source);
|
||||
auto sink_op = get_full_row_hashes_sink_op(sink);
|
||||
return when_all_succeed(std::move(source_op), std::move(sink_op));
|
||||
}).then([current_hashes] () mutable {
|
||||
}).then([this, current_hashes] () mutable {
|
||||
stats().rx_hashes_nr += current_hashes->size();
|
||||
_metrics.rx_hashes_nr += current_hashes->size();
|
||||
return std::move(*current_hashes);
|
||||
});
|
||||
}
|
||||
@@ -1763,6 +1765,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
|
||||
}
|
||||
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
|
||||
_metrics.rx_hashes_nr += current_set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(current_set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
@@ -2067,6 +2070,7 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
_metrics.rx_hashes_nr += set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
|
||||
@@ -931,7 +931,6 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
});
|
||||
|
||||
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
|
||||
coroutine update;
|
||||
size_t size_entry;
|
||||
// In case updater fails, we must bring the cache to consistency without deferring.
|
||||
auto cleanup = defer([&m, this] {
|
||||
@@ -939,6 +938,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
_prev_snapshot_pos = {};
|
||||
_prev_snapshot = {};
|
||||
});
|
||||
coroutine update; // Destroy before cleanup to release snapshots before invalidating.
|
||||
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
|
||||
while (!m.partitions.empty()) {
|
||||
with_allocator(_tracker.allocator(), [&] () {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 6f0ef32514...8837a3fdf1
@@ -2699,7 +2699,7 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
|
||||
static std::regex la_mc("(la|mc)-(\\d+)-(\\w+)-(.*)");
|
||||
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
|
||||
|
||||
static std::regex dir(".*/([^/]*)/(\\w+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
|
||||
static std::regex dir(".*/([^/]*)/([^/]+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
|
||||
|
||||
std::smatch match;
|
||||
|
||||
|
||||
6
table.cc
6
table.cc
@@ -292,7 +292,7 @@ create_single_key_sstable_reader(column_family* cf,
|
||||
filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice)
|
||||
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
|
||||
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
|
||||
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, std::move(trace_state), fwd);
|
||||
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, trace_state, fwd);
|
||||
})
|
||||
);
|
||||
if (readers.empty()) {
|
||||
@@ -315,7 +315,7 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
{
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
@@ -587,7 +587,7 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
flat_mutation_reader reader = sst->read_range_rows_flat(s, pr, slice, pc,
|
||||
resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
|
||||
resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
if (sst->is_shared()) {
|
||||
using sig = bool (&)(const dht::decorated_key&);
|
||||
reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard));
|
||||
|
||||
8
test.py
8
test.py
@@ -265,7 +265,7 @@ if __name__ == "__main__":
|
||||
env['UBSAN_OPTIONS'] = 'print_stacktrace=1'
|
||||
env['BOOST_TEST_CATCH_SYSTEM_ERRORS'] = 'no'
|
||||
|
||||
def run_test(path, type, exec_args):
|
||||
def run_test(path, repeat, type, exec_args):
|
||||
boost_args = []
|
||||
# avoid modifying in-place, it will change test_to_run
|
||||
exec_args = exec_args + '--collectd 0'.split()
|
||||
@@ -274,7 +274,7 @@ if __name__ == "__main__":
|
||||
mode = 'release'
|
||||
if path.startswith(os.path.join('build', 'debug')):
|
||||
mode = 'debug'
|
||||
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + ".boost.xml")
|
||||
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + "." + str(repeat) + ".boost.xml")
|
||||
boost_args += ['--report_level=no', '--logger=HRF,test_suite:XML,test_suite,' + xmlout]
|
||||
if type == 'boost':
|
||||
boost_args += ['--']
|
||||
@@ -312,8 +312,8 @@ if __name__ == "__main__":
|
||||
path = test[0]
|
||||
test_type = test[1]
|
||||
exec_args = test[2] if len(test) >= 3 else []
|
||||
for _ in range(args.repeat):
|
||||
futures.append(executor.submit(run_test, path, test_type, exec_args))
|
||||
for repeat in range(args.repeat):
|
||||
futures.append(executor.submit(run_test, path, repeat, test_type, exec_args))
|
||||
|
||||
results = []
|
||||
cookie = len(futures)
|
||||
|
||||
@@ -1320,6 +1320,104 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
|
||||
assert_that(m).is_equal_to(m2);
|
||||
}
|
||||
|
||||
// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
|
||||
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
|
||||
// to TTL.
|
||||
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
|
||||
can_gc_fn never_gc = [] (tombstone) { return false; };
|
||||
|
||||
auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_alive({}, {})", mark, t));
|
||||
BOOST_REQUIRE(mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
|
||||
BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_dead({}, {})", mark, t));
|
||||
BOOST_REQUIRE(!mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
|
||||
BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
const auto timestamp = api::timestamp_type(1);
|
||||
const auto t0 = gc_clock::now();
|
||||
const auto t1 = t0 + 1s;
|
||||
const auto t2 = t0 + 2s;
|
||||
const auto t3 = t0 + 3s;
|
||||
|
||||
// Without timestamp the marker is missing (doesn't exist)
|
||||
const row_marker m1;
|
||||
must_be_dead(m1, t0);
|
||||
must_be_dead(m1, t1);
|
||||
must_be_dead(m1, t2);
|
||||
must_be_dead(m1, t3);
|
||||
|
||||
// With timestamp and without ttl, a row_marker is always alive
|
||||
const row_marker m2(timestamp);
|
||||
must_be_alive(m2, t0);
|
||||
must_be_alive(m2, t1);
|
||||
must_be_alive(m2, t2);
|
||||
must_be_alive(m2, t3);
|
||||
|
||||
// A row_marker becomes dead exactly at the moment of expiry
|
||||
// Reproduces #4263, #5290
|
||||
const auto ttl = 1s;
|
||||
const row_marker m3(timestamp, ttl, t2);
|
||||
must_be_alive(m3, t0);
|
||||
must_be_alive(m3, t1);
|
||||
must_be_dead(m3, t2);
|
||||
must_be_dead(m3, t3);
|
||||
}
|
||||
|
||||
// Builds one partition with three clustering rows ("A", "B", "C") whose row markers expire at
// t1, t2 and t3 respectively, then queries it at t0..t3 and checks that a row is no longer
// returned from the moment of its expiry onwards.
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
    auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .build();

    auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
    auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
    auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
    auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));

    auto ttl = 1s;
    auto t0 = gc_clock::now();
    auto t1 = t0 + 1s;
    auto t2 = t0 + 2s;
    auto t3 = t0 + 3s;

    // Runs a query over the mutation as of time t and converts the raw result into a result_set.
    auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
        auto slice = partition_slice_builder(*s)
                .without_partition_key_columns()
                .build();
        auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
        auto raw_result = m.query(slice, opts, t);
        return query::result_set::from_raw_result(s, slice, raw_result);
    };

    // One row per clustering key, each with the same ttl but a later expiry than the previous one.
    mutation m(s, pk);
    auto& partition = m.partition();
    const auto& mutation_schema = *m.schema();
    partition.clustered_row(mutation_schema, ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
    partition.clustered_row(mutation_schema, ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
    partition.clustered_row(mutation_schema, ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));

    // t0: nothing has expired yet.
    const auto rows_at_t0 = results_at_time(m, t0);
    assert_that(rows_at_t0)
            .has_size(3)
            .has(a_row().with_column("ck", data_value(bytes("A"))))
            .has(a_row().with_column("ck", data_value(bytes("B"))))
            .has(a_row().with_column("ck", data_value(bytes("C"))));

    // t1: row "A" expires exactly now and must already be gone.
    const auto rows_at_t1 = results_at_time(m, t1);
    assert_that(rows_at_t1)
            .has_size(2)
            .has(a_row().with_column("ck", data_value(bytes("B"))))
            .has(a_row().with_column("ck", data_value(bytes("C"))));

    // t2: only row "C" is left.
    const auto rows_at_t2 = results_at_time(m, t2);
    assert_that(rows_at_t2)
            .has_size(1)
            .has(a_row().with_column("ck", data_value(bytes("C"))));

    // t3: every row has expired.
    const auto rows_at_t3 = results_at_time(m, t3);
    assert_that(rows_at_t3).is_empty();
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_querying_expired_cells) {
|
||||
return seastar::async([] {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
|
||||
Reference in New Issue
Block a user