Compare commits
30 Commits
master
...
scylla-1.5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d58af7ded5 | ||
|
|
d9700a2826 | ||
|
|
d2438059a7 | ||
|
|
4098831ebc | ||
|
|
4539b8403a | ||
|
|
558f535fcb | ||
|
|
3d45d0d339 | ||
|
|
affc0d9138 | ||
|
|
3c68504e54 | ||
|
|
e9b26d547d | ||
|
|
8510389188 | ||
|
|
ea61a8b410 | ||
|
|
bd694d845e | ||
|
|
01c01d9ac4 | ||
|
|
ed39e8c235 | ||
|
|
c57835e7b5 | ||
|
|
13baa04056 | ||
|
|
298de37cef | ||
|
|
91e5e50647 | ||
|
|
08b1ff53dd | ||
|
|
0485289741 | ||
|
|
b3504e5482 | ||
|
|
6cdb1256bb | ||
|
|
39b0da51a3 | ||
|
|
0656e66f5f | ||
|
|
185fbb8abc | ||
|
|
4ed3d350cc | ||
|
|
72d4a26c43 | ||
|
|
b582525ad8 | ||
|
|
5ca372e852 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.5.rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -777,7 +777,7 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"path": "/storage_proxy/metrics/read/moving_avrage_histogram",
|
||||
"path": "/storage_proxy/metrics/read/moving_average_histogram",
|
||||
"operations": [
|
||||
{
|
||||
"method": "GET",
|
||||
@@ -792,7 +792,7 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"path": "/storage_proxy/metrics/range/moving_avrage_histogram",
|
||||
"path": "/storage_proxy/metrics/range/moving_average_histogram",
|
||||
"operations": [
|
||||
{
|
||||
"method": "GET",
|
||||
@@ -942,7 +942,7 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"path": "/storage_proxy/metrics/write/moving_avrage_histogram",
|
||||
"path": "/storage_proxy/metrics/write/moving_average_histogram",
|
||||
"operations": [
|
||||
{
|
||||
"method": "GET",
|
||||
|
||||
@@ -194,7 +194,7 @@ void set_cache_service(http_context& ctx, routes& r) {
|
||||
});
|
||||
|
||||
cs::get_row_capacity.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [](const column_family& cf) {
|
||||
return cf.get_row_cache().get_cache_tracker().region().occupancy().used_space();
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
|
||||
@@ -47,11 +47,8 @@
|
||||
const sstring auth::data_resource::ROOT_NAME("data");
|
||||
|
||||
auth::data_resource::data_resource(level l, const sstring& ks, const sstring& cf)
|
||||
: _ks(ks), _cf(cf)
|
||||
: _level(l), _ks(ks), _cf(cf)
|
||||
{
|
||||
if (l != get_level()) {
|
||||
throw std::invalid_argument("level/keyspace/column mismatch");
|
||||
}
|
||||
}
|
||||
|
||||
auth::data_resource::data_resource()
|
||||
@@ -67,14 +64,7 @@ auth::data_resource::data_resource(const sstring& ks, const sstring& cf)
|
||||
{}
|
||||
|
||||
auth::data_resource::level auth::data_resource::get_level() const {
|
||||
if (!_cf.empty()) {
|
||||
assert(!_ks.empty());
|
||||
return level::COLUMN_FAMILY;
|
||||
}
|
||||
if (!_ks.empty()) {
|
||||
return level::KEYSPACE;
|
||||
}
|
||||
return level::ROOT;
|
||||
return _level;
|
||||
}
|
||||
|
||||
auth::data_resource auth::data_resource::from_name(
|
||||
|
||||
@@ -56,6 +56,7 @@ private:
|
||||
|
||||
static const sstring ROOT_NAME;
|
||||
|
||||
level _level;
|
||||
sstring _ks;
|
||||
sstring _cf;
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
*/
|
||||
|
||||
#include <unordered_map>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include "permission.hh"
|
||||
|
||||
const auth::permission_set auth::permissions::ALL_DATA =
|
||||
@@ -75,7 +76,9 @@ const sstring& auth::permissions::to_string(permission p) {
|
||||
}
|
||||
|
||||
auth::permission auth::permissions::from_string(const sstring& s) {
|
||||
return permission_names.at(s);
|
||||
sstring upper(s);
|
||||
boost::to_upper(upper);
|
||||
return permission_names.at(upper);
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> auth::permissions::to_strings(const permission_set& set) {
|
||||
|
||||
@@ -409,29 +409,6 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
|
||||
# the smaller of 1/4 of heap or 512MB.
|
||||
# file_cache_size_in_mb: 512
|
||||
|
||||
# Total permitted memory to use for memtables. Scylla will stop
|
||||
# accepting writes when the limit is exceeded until a flush completes,
|
||||
# and will trigger a flush based on memtable_cleanup_threshold
|
||||
# If omitted, Scylla will set both to 1/4 the size of the heap.
|
||||
# memtable_heap_space_in_mb: 2048
|
||||
# memtable_offheap_space_in_mb: 2048
|
||||
|
||||
# Ratio of occupied non-flushing memtable size to total permitted size
|
||||
# that will trigger a flush of the largest memtable. Lager mct will
|
||||
# mean larger flushes and hence less compaction, but also less concurrent
|
||||
# flush activity which can make it difficult to keep your disks fed
|
||||
# under heavy write load.
|
||||
#
|
||||
# memtable_cleanup_threshold defaults to 1 / (memtable_flush_writers + 1)
|
||||
# memtable_cleanup_threshold: 0.11
|
||||
|
||||
# Specify the way Scylla allocates and manages memtable memory.
|
||||
# Options are:
|
||||
# heap_buffers: on heap nio buffers
|
||||
# offheap_buffers: off heap (direct) nio buffers
|
||||
# offheap_objects: native memory, eliminating nio buffer heap overhead
|
||||
# memtable_allocation_type: heap_buffers
|
||||
|
||||
# Total space to use for commitlogs.
|
||||
#
|
||||
# If space gets above this value (it will round up to the next nearest
|
||||
@@ -443,17 +420,6 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
|
||||
# available for Scylla.
|
||||
commitlog_total_space_in_mb: -1
|
||||
|
||||
# This sets the amount of memtable flush writer threads. These will
|
||||
# be blocked by disk io, and each one will hold a memtable in memory
|
||||
# while blocked.
|
||||
#
|
||||
# memtable_flush_writers defaults to the smaller of (number of disks,
|
||||
# number of cores), with a minimum of 2 and a maximum of 8.
|
||||
#
|
||||
# If your data directories are backed by SSD, you should increase this
|
||||
# to the number of cores.
|
||||
#memtable_flush_writers: 8
|
||||
|
||||
# A fixed memory pool size in MB for for SSTable index summaries. If left
|
||||
# empty, this will default to 5% of the heap size. If the memory usage of
|
||||
# all index summaries exceeds this limit, SSTables with low read rates will
|
||||
|
||||
@@ -221,6 +221,7 @@ scylla_tests = [
|
||||
'tests/database_test',
|
||||
'tests/nonwrapping_range_test',
|
||||
'tests/input_stream_test',
|
||||
'tests/sstable_atomic_deletion_test',
|
||||
]
|
||||
|
||||
apps = [
|
||||
@@ -307,6 +308,7 @@ scylla_core = (['database.cc',
|
||||
'sstables/compaction.cc',
|
||||
'sstables/compaction_strategy.cc',
|
||||
'sstables/compaction_manager.cc',
|
||||
'sstables/atomic_deletion.cc',
|
||||
'transport/event.cc',
|
||||
'transport/event_notifier.cc',
|
||||
'transport/server.cc',
|
||||
|
||||
@@ -232,7 +232,7 @@ uint32_t selection::add_column_for_ordering(const column_definition& c) {
|
||||
raw_selector::to_selectables(raw_selectors, schema), db, schema, defs);
|
||||
|
||||
auto metadata = collect_metadata(schema, raw_selectors, *factories);
|
||||
if (processes_selection(raw_selectors)) {
|
||||
if (processes_selection(raw_selectors) || raw_selectors.size() != defs.size()) {
|
||||
return ::make_shared<selection_with_processing>(schema, std::move(defs), std::move(metadata), std::move(factories));
|
||||
} else {
|
||||
return ::make_shared<simple_selection>(schema, std::move(defs), std::move(metadata), false);
|
||||
|
||||
222
database.cc
222
database.cc
@@ -97,28 +97,28 @@ lw_shared_ptr<memtable_list>
|
||||
column_family::make_memory_only_memtable_list() {
|
||||
auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); };
|
||||
auto get_schema = [this] { return schema(); };
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager);
|
||||
}
|
||||
|
||||
lw_shared_ptr<memtable_list>
|
||||
column_family::make_memtable_list() {
|
||||
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); };
|
||||
auto get_schema = [this] { return schema(); };
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager);
|
||||
}
|
||||
|
||||
lw_shared_ptr<memtable_list>
|
||||
column_family::make_streaming_memtable_list() {
|
||||
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); };
|
||||
auto get_schema = [this] { return schema(); };
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager);
|
||||
}
|
||||
|
||||
lw_shared_ptr<memtable_list>
|
||||
column_family::make_streaming_memtable_big_list(streaming_memtable_big& smb) {
|
||||
auto seal = [this, &smb] (memtable_list::flush_behavior) { return seal_active_streaming_memtable_big(smb); };
|
||||
auto get_schema = [this] { return schema(); };
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager);
|
||||
}
|
||||
|
||||
column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager)
|
||||
@@ -912,10 +912,6 @@ column_family::seal_active_streaming_memtable_delayed() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
if (_streaming_memtables->should_flush()) {
|
||||
return seal_active_streaming_memtable_immediate();
|
||||
}
|
||||
|
||||
if (!_delayed_streaming_flush.armed()) {
|
||||
// We don't want to wait for too long, because the incoming mutations will not be available
|
||||
// until we flush them to SSTables. On top of that, if the sender ran out of messages, it won't
|
||||
@@ -946,8 +942,7 @@ column_family::seal_active_streaming_memtable_immediate() {
|
||||
auto current_waiters = std::exchange(_waiting_streaming_flushes, shared_promise<>());
|
||||
auto f = current_waiters.get_shared_future(); // for this seal
|
||||
|
||||
_config.streaming_dirty_memory_manager->serialize_flush([this, old] {
|
||||
return with_lock(_sstables_lock.for_read(), [this, old] {
|
||||
with_lock(_sstables_lock.for_read(), [this, old] {
|
||||
auto newtab = make_lw_shared<sstables::sstable>(_schema,
|
||||
_config.datadir, calculate_generation_for_new_table(),
|
||||
sstables::sstable::version_types::ka,
|
||||
@@ -980,7 +975,6 @@ column_family::seal_active_streaming_memtable_immediate() {
|
||||
});
|
||||
// We will also not have any retry logic. If we fail here, we'll fail the streaming and let
|
||||
// the upper layers know. They can then apply any logic they want here.
|
||||
});
|
||||
}).then_wrapped([this, current_waiters = std::move(current_waiters)] (future <> f) mutable {
|
||||
if (f.failed()) {
|
||||
current_waiters.set_exception(f.get_exception());
|
||||
@@ -1044,12 +1038,10 @@ column_family::seal_active_memtable(memtable_list::flush_behavior ignored) {
|
||||
_config.cf_stats->pending_memtables_flushes_count++;
|
||||
_config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
|
||||
|
||||
return _config.dirty_memory_manager->serialize_flush([this, old] {
|
||||
return repeat([this, old] {
|
||||
return with_lock(_sstables_lock.for_read(), [this, old] {
|
||||
_flush_queue->check_open_gate();
|
||||
return try_flush_memtable_to_sstable(old);
|
||||
});
|
||||
return repeat([this, old] {
|
||||
return with_lock(_sstables_lock.for_read(), [this, old] {
|
||||
_flush_queue->check_open_gate();
|
||||
return try_flush_memtable_to_sstable(old);
|
||||
});
|
||||
}).then([this, memtable_size] {
|
||||
_config.cf_stats->pending_memtables_flushes_count--;
|
||||
@@ -1131,15 +1123,15 @@ column_family::start() {
|
||||
|
||||
future<>
|
||||
column_family::stop() {
|
||||
_memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
_streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
return _compaction_manager.remove(this).then([this] {
|
||||
// Nest, instead of using when_all, so we don't lose any exceptions.
|
||||
return _flush_queue->close().then([this] {
|
||||
return _streaming_flush_gate.close();
|
||||
return when_all(_memtables->request_flush(), _streaming_memtables->request_flush()).discard_result().finally([this] {
|
||||
return _compaction_manager.remove(this).then([this] {
|
||||
// Nest, instead of using when_all, so we don't lose any exceptions.
|
||||
return _flush_queue->close().then([this] {
|
||||
return _streaming_flush_gate.close();
|
||||
});
|
||||
}).then([this] {
|
||||
return _sstable_deletion_gate.close();
|
||||
});
|
||||
}).then([this] {
|
||||
return _sstable_deletion_gate.close();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1304,7 +1296,17 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
// Second, delete the old sstables. This is done in the background, so we can
|
||||
// consider this compaction completed.
|
||||
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
|
||||
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
|
||||
return sstables::delete_atomically(sstables_to_remove).then_wrapped([this, sstables_to_remove] (future<> f) {
|
||||
std::exception_ptr eptr;
|
||||
try {
|
||||
f.get();
|
||||
} catch(...) {
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
|
||||
// unconditionally remove compacted sstables from _sstables_compacted_but_not_deleted,
|
||||
// or they could stay forever in the set, resulting in deleted files remaining
|
||||
// opened and disk space not being released until shutdown.
|
||||
std::unordered_set<sstables::shared_sstable> s(
|
||||
sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
|
||||
@@ -1312,6 +1314,11 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
});
|
||||
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
|
||||
rebuild_statistics();
|
||||
|
||||
if (eptr) {
|
||||
return make_exception_future<>(eptr);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([] (std::exception_ptr e) {
|
||||
try {
|
||||
std::rethrow_exception(e);
|
||||
@@ -1644,7 +1651,7 @@ database::database(const db::config& cfg)
|
||||
// Note that even if we didn't allow extra memory, we would still want to keep system requests
|
||||
// in a different region group. This is because throttled requests are serviced in FIFO order,
|
||||
// and we don't want system requests to be waiting for a long time behind user requests.
|
||||
, _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20))
|
||||
, _system_dirty_memory_manager(*this, _memtable_total_space / 2 + (10 << 20))
|
||||
// The total space that can be used by memtables is _memtable_total_space, but we will only
|
||||
// allow the region_group to grow to half of that. This is because of virtual_dirty: memtables
|
||||
// can take a long time to flush, and if we are using the maximum amount of memory possible,
|
||||
@@ -2167,8 +2174,6 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
|
||||
cfg.enable_disk_writes = _config.enable_disk_writes;
|
||||
cfg.enable_commitlog = _config.enable_commitlog;
|
||||
cfg.enable_cache = _config.enable_cache;
|
||||
cfg.max_memtable_size = _config.max_memtable_size;
|
||||
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
|
||||
cfg.dirty_memory_manager = _config.dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_config = _config.read_concurrency_config;
|
||||
@@ -2468,7 +2473,6 @@ column_family::apply(const mutation& m, const db::replay_position& rp) {
|
||||
utils::latency_counter lc;
|
||||
_stats.writes.set_latency(lc);
|
||||
_memtables->active_memtable().apply(m, rp);
|
||||
_memtables->seal_on_overflow();
|
||||
_stats.writes.mark(lc);
|
||||
if (lc.is_start()) {
|
||||
_stats.estimated_write.add(lc.latency(), _stats.writes.hist.count);
|
||||
@@ -2481,7 +2485,6 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const
|
||||
_stats.writes.set_latency(lc);
|
||||
check_valid_rp(rp);
|
||||
_memtables->active_memtable().apply(m, m_schema, rp);
|
||||
_memtables->seal_on_overflow();
|
||||
_stats.writes.mark(lc);
|
||||
if (lc.is_start()) {
|
||||
_stats.estimated_write.add(lc.latency(), _stats.writes.hist.count);
|
||||
@@ -2494,7 +2497,6 @@ void column_family::apply_streaming_mutation(schema_ptr m_schema, utils::UUID pl
|
||||
return;
|
||||
}
|
||||
_streaming_memtables->active_memtable().apply(m, m_schema);
|
||||
_streaming_memtables->seal_on_overflow();
|
||||
}
|
||||
|
||||
void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m) {
|
||||
@@ -2505,7 +2507,6 @@ void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUI
|
||||
}
|
||||
auto entry = it->second;
|
||||
entry->memtables->active_memtable().apply(m, m_schema);
|
||||
entry->memtables->seal_on_overflow();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -2517,51 +2518,113 @@ column_family::check_valid_rp(const db::replay_position& rp) const {
|
||||
|
||||
future<> dirty_memory_manager::shutdown() {
|
||||
_db_shutdown_requested = true;
|
||||
return _waiting_flush_gate.close().then([this] {
|
||||
_should_flush.signal();
|
||||
return std::move(_waiting_flush).then([this] {
|
||||
return _region_group.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
void dirty_memory_manager::maybe_do_active_flush() {
|
||||
if (!_db || !under_pressure() || _db_shutdown_requested) {
|
||||
return;
|
||||
future<> memtable_list::request_flush() {
|
||||
if (!_flush_coalescing) {
|
||||
_flush_coalescing = shared_promise<>();
|
||||
return _dirty_memory_manager->get_flush_permit().then([this] (auto permit) {
|
||||
auto current_flush = std::move(*_flush_coalescing);
|
||||
_flush_coalescing = {};
|
||||
return _dirty_memory_manager->flush_one(*this, std::move(permit)).then_wrapped([this, current_flush = std::move(current_flush)] (auto f) mutable {
|
||||
if (f.failed()) {
|
||||
current_flush.set_exception(f.get_exception());
|
||||
} else {
|
||||
current_flush.set_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return _flush_coalescing->get_shared_future();
|
||||
}
|
||||
|
||||
// Flush already ongoing. We don't need to initiate an active flush at this moment.
|
||||
if (_flush_serializer.current() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// There are many criteria that can be used to select what is the best memtable to
|
||||
// flush. Most of the time we want some coordination with the commitlog to allow us to
|
||||
// release commitlog segments as early as we can.
|
||||
//
|
||||
// But during pressure condition, we'll just pick the CF that holds the largest
|
||||
// memtable. The advantage of doing this is that this is objectively the one that will
|
||||
// release the biggest amount of memory and is less likely to be generating tiny
|
||||
// SSTables. The disadvantage is that right now, because we only release memory when the
|
||||
// SSTable is fully written, that may take a bit of time to happen.
|
||||
//
|
||||
// However, since we'll very soon have a mechanism in place to account for the memory
|
||||
// that was already written in one form or another, that disadvantage is mitigated.
|
||||
memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region());
|
||||
auto& biggest_cf = _db->find_column_family(biggest_memtable.schema());
|
||||
memtable_list& mtlist = get_memtable_list(biggest_cf);
|
||||
// Please note that this will eventually take the semaphore and prevent two concurrent flushes.
|
||||
// We don't need any other extra protection.
|
||||
mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
}
|
||||
|
||||
memtable_list& memtable_dirty_memory_manager::get_memtable_list(column_family& cf) {
|
||||
return *(cf._memtables);
|
||||
future<> dirty_memory_manager::flush_one(memtable_list& mtlist, semaphore_units<> permit) {
|
||||
if (mtlist.back()->empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto* region = &(mtlist.back()->region());
|
||||
auto* region_group = mtlist.back()->region_group();
|
||||
auto schema = mtlist.back()->schema();
|
||||
// Because the region groups are hierarchical, when we pick the biggest region creating pressure
|
||||
// (in the memory-driven flush case) we may be picking a memtable that is placed in a region
|
||||
// group below ours. That's totally fine and we can certainly use our semaphore to account for
|
||||
// it, but we need to destroy the semaphore units from the right flush manager.
|
||||
//
|
||||
// If we abandon size-driven flush and go with another flushing scheme that always guarantees
|
||||
// that we're picking from this region_group, we can simplify this.
|
||||
dirty_memory_manager::from_region_group(region_group)._flush_manager.emplace(region, std::move(permit));
|
||||
auto fut = mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
return get_units(_background_work_flush_serializer, 1).then([this, fut = std::move(fut), region, region_group, schema] (auto permit) mutable {
|
||||
return std::move(fut).then_wrapped([this, region, region_group, schema] (auto f) {
|
||||
// There are two cases in which we may still need to remove the permits from here.
|
||||
//
|
||||
// 1) Some exception happenend, and we can't know at which point. It could be that because
|
||||
// of that, the permits are still dangling. We have to remove it.
|
||||
// 2) If we are using a memory-only Column Family. That will never create a memtable
|
||||
// flush object, and we'll never get rid of the permits. So we have to remove it
|
||||
// here.
|
||||
dirty_memory_manager::from_region_group(region_group).remove_from_flush_manager(region);
|
||||
if (f.failed()) {
|
||||
dblog.error("Failed to flush memtable, {}:{}", schema->ks_name(), schema->cf_name());
|
||||
}
|
||||
return std::move(f);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
memtable_list& streaming_dirty_memory_manager::get_memtable_list(column_family& cf) {
|
||||
return *(cf._streaming_memtables);
|
||||
future<> dirty_memory_manager::flush_when_needed() {
|
||||
if (!_db) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// If there are explicit flushes requested, we must wait for them to finish before we stop.
|
||||
return do_until([this] { return _db_shutdown_requested && !_flush_serializer.waiters(); }, [this] {
|
||||
auto has_work = [this] { return _flush_serializer.waiters() || over_soft_limit() || _db_shutdown_requested; };
|
||||
return _should_flush.wait(std::move(has_work)).then([this] {
|
||||
return get_flush_permit().then([this] (auto permit) {
|
||||
// We give priority to explicit flushes. They are mainly user-initiated flushes,
|
||||
// flushes coming from a DROP statement, or commitlog flushes.
|
||||
if (_flush_serializer.waiters()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// condition abated while we waited for the semaphore
|
||||
if (!this->over_soft_limit() || _db_shutdown_requested) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// There are many criteria that can be used to select what is the best memtable to
|
||||
// flush. Most of the time we want some coordination with the commitlog to allow us to
|
||||
// release commitlog segments as early as we can.
|
||||
//
|
||||
// But during pressure condition, we'll just pick the CF that holds the largest
|
||||
// memtable. The advantage of doing this is that this is objectively the one that will
|
||||
// release the biggest amount of memory and is less likely to be generating tiny
|
||||
// SSTables.
|
||||
memtable& biggest_memtable = memtable::from_region(*(this->_region_group.get_largest_region()));
|
||||
auto mtlist = biggest_memtable.get_memtable_list();
|
||||
// Do not wait. The semaphore will protect us against a concurrent flush. But we
|
||||
// want to start a new one as soon as the permits are destroyed and the semaphore is
|
||||
// made ready again, not when we are done with the current one.
|
||||
this->flush_one(*mtlist, std::move(permit));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}).finally([this] {
|
||||
// We'll try to acquire the permit here to make sure we only really stop when there are no
|
||||
// in-flight flushes. Our stop condition checks for the presence of waiters, but it could be
|
||||
// that we have no waiters, but a flush still in flight. We wait for all background work to
|
||||
// stop. When that stops, we know that the foreground work in the _flush_serializer has
|
||||
// stopped as well.
|
||||
return get_units(_background_work_flush_serializer, _max_background_work);
|
||||
});
|
||||
}
|
||||
|
||||
void dirty_memory_manager::start_reclaiming() {
|
||||
maybe_do_active_flush();
|
||||
_should_flush.signal();
|
||||
}
|
||||
|
||||
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) {
|
||||
@@ -2637,10 +2700,6 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
cfg.enable_disk_reads = true; // we allways read from disk
|
||||
cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store();
|
||||
cfg.enable_cache = _cfg->enable_cache();
|
||||
cfg.max_memtable_size = _memtable_total_space * _cfg->memtable_cleanup_threshold();
|
||||
// We should guarantee that at least two memtable are available, otherwise after flush, adding another memtable would
|
||||
// easily take us into throttling until the first one is flushed.
|
||||
cfg.max_streaming_memtable_size = std::min(cfg.max_memtable_size, _streaming_memtable_total_space / 2);
|
||||
|
||||
} else {
|
||||
cfg.datadir = "";
|
||||
@@ -2648,9 +2707,6 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
cfg.enable_disk_reads = false;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = false;
|
||||
cfg.max_memtable_size = std::numeric_limits<size_t>::max();
|
||||
// All writes should go to the main memtable list if we're not durable
|
||||
cfg.max_streaming_memtable_size = 0;
|
||||
}
|
||||
cfg.dirty_memory_manager = &_dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager;
|
||||
@@ -3097,21 +3153,17 @@ future<std::unordered_map<sstring, column_family::snapshot_details>> column_fami
|
||||
future<> column_family::flush() {
|
||||
_stats.pending_flushes++;
|
||||
|
||||
auto fut = _memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
// this rp is either:
|
||||
// a.) Done - no-op
|
||||
// b.) Ours
|
||||
// c.) The last active flush not finished. If our latest memtable is
|
||||
// empty it still makes sense for this api call to wait for this.
|
||||
auto high_rp = _highest_flushed_rp;
|
||||
|
||||
return fut.finally([this, high_rp] {
|
||||
// highest_flushed_rp is only updated when we flush. If the memtable is currently alive, then
|
||||
// the most up2date replay position is the one that's in there now. Otherwise, if the memtable
|
||||
// hasn't received any writes yet, that's the one from the last flush we made.
|
||||
auto desired_rp = _memtables->back()->empty() ? _highest_flushed_rp : _memtables->back()->replay_position();
|
||||
return _memtables->request_flush().finally([this, desired_rp] {
|
||||
_stats.pending_flushes--;
|
||||
// In origin memtable_switch_count is incremented inside
|
||||
// ColumnFamilyMeetrics Flush.run
|
||||
_stats.memtable_switch_count++;
|
||||
// wait for all up until us.
|
||||
return _flush_queue->wait_for_pending(high_rp);
|
||||
return _flush_queue->wait_for_pending(desired_rp);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3128,7 +3180,7 @@ future<> column_family::flush(const db::replay_position& pos) {
|
||||
// We ignore this for now and just say that if we're asked for
|
||||
// a CF and it exists, we pretty much have to have data that needs
|
||||
// flushing. Let's do it.
|
||||
return _memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
return _memtables->request_flush();
|
||||
}
|
||||
|
||||
// FIXME: We can do much better than this in terms of cache management. Right
|
||||
@@ -3169,7 +3221,7 @@ future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
|
||||
}
|
||||
auto entry = it->second;
|
||||
_streaming_memtables_big.erase(it);
|
||||
return entry->memtables->seal_active_memtable(memtable_list::flush_behavior::immediate).then([entry] {
|
||||
return entry->memtables->request_flush().then([entry] {
|
||||
return entry->flush_in_progress.close();
|
||||
}).then([this, entry] {
|
||||
return parallel_for_each(entry->sstables, [this] (auto& sst) {
|
||||
|
||||
100
database.hh
100
database.hh
@@ -119,28 +119,54 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
|
||||
// throttled for a long time. Even when we have virtual dirty, that only provides a rough
|
||||
// estimate, and we can't release requests that early.
|
||||
semaphore _flush_serializer;
|
||||
// We will accept a new flush before another one ends, once it is done with the data write.
|
||||
// That is so we can keep the disk always busy. But there is still some background work that is
|
||||
// left to be done. Mostly, update the caches and seal the auxiliary components of the SSTable.
|
||||
// This semaphore will cap the amount of background work that we have. Note that we're not
|
||||
// overly concerned about memtable memory, because dirty memory will put a limit to that. This
|
||||
// is mostly about dangling continuations. So that doesn't have to be a small number.
|
||||
static constexpr unsigned _max_background_work = 20;
|
||||
semaphore _background_work_flush_serializer = { _max_background_work };
|
||||
condition_variable _should_flush;
|
||||
int64_t _dirty_bytes_released_pre_accounted = 0;
|
||||
|
||||
seastar::gate _waiting_flush_gate;
|
||||
std::vector<shared_memtable> _pending_flushes;
|
||||
void maybe_do_active_flush();
|
||||
future<> flush_when_needed();
|
||||
// We need to start a flush before the current one finishes, otherwise
|
||||
// we'll have a period without significant disk activity when the current
|
||||
// SSTable is being sealed, the caches are being updated, etc. To do that
|
||||
// we need to keep track of who is it that we are flushing this memory from.
|
||||
std::unordered_map<const logalloc::region*, semaphore_units<>> _flush_manager;
|
||||
|
||||
void remove_from_flush_manager(const logalloc::region *region) {
|
||||
// If the flush fails, but the failure happens after we reverted the dirty changes, we
|
||||
// won't find the region here, because it would have been destroyed already. That's
|
||||
// ultimately fine, we just need to check it. If we really want to restrict the new attempt
|
||||
// to run concurrently with a new flush, it can call into the dirty manager to reaquire the
|
||||
// semaphore. Right now we don't bother.
|
||||
auto it = _flush_manager.find(region);
|
||||
if (it != _flush_manager.end()) {
|
||||
_flush_manager.erase(it);
|
||||
}
|
||||
}
|
||||
future<> _waiting_flush;
|
||||
protected:
|
||||
virtual memtable_list& get_memtable_list(column_family& cf) = 0;
|
||||
virtual void start_reclaiming() override;
|
||||
public:
|
||||
future<> shutdown();
|
||||
|
||||
dirty_memory_manager(database* db, size_t threshold)
|
||||
: logalloc::region_group_reclaimer(threshold)
|
||||
: logalloc::region_group_reclaimer(threshold, threshold * 0.4)
|
||||
, _db(db)
|
||||
, _region_group(*this)
|
||||
, _flush_serializer(1) {}
|
||||
, _flush_serializer(1)
|
||||
, _waiting_flush(flush_when_needed()) {}
|
||||
|
||||
dirty_memory_manager(database* db, dirty_memory_manager *parent, size_t threshold)
|
||||
: logalloc::region_group_reclaimer(threshold)
|
||||
: logalloc::region_group_reclaimer(threshold, threshold * 0.4)
|
||||
, _db(db)
|
||||
, _region_group(&parent->_region_group, *this)
|
||||
, _flush_serializer(1) {}
|
||||
, _flush_serializer(1)
|
||||
, _waiting_flush(flush_when_needed()) {}
|
||||
|
||||
static dirty_memory_manager& from_region_group(logalloc::region_group *rg) {
|
||||
return *(boost::intrusive::get_parent_from_member(rg, &dirty_memory_manager::_region_group));
|
||||
@@ -154,12 +180,19 @@ public:
|
||||
return _region_group;
|
||||
}
|
||||
|
||||
void revert_potentially_cleaned_up_memory(int64_t delta) {
|
||||
void revert_potentially_cleaned_up_memory(logalloc::region* from, int64_t delta) {
|
||||
_region_group.update(delta);
|
||||
_dirty_bytes_released_pre_accounted -= delta;
|
||||
// Flushed the current memtable. There is still some work to do, like finish sealing the
|
||||
// SSTable and updating the cache, but we can already allow the next one to start.
|
||||
//
|
||||
// By erasing this memtable from the flush_manager we'll destroy the semaphore_units
|
||||
// associated with this flush and will allow another one to start. We'll signal the
|
||||
// condition variable to let them know we might be ready early.
|
||||
remove_from_flush_manager(from);
|
||||
}
|
||||
|
||||
void account_potentially_cleaned_up_memory(int64_t delta) {
|
||||
void account_potentially_cleaned_up_memory(logalloc::region* from, int64_t delta) {
|
||||
_region_group.update(-delta);
|
||||
_dirty_bytes_released_pre_accounted += delta;
|
||||
}
|
||||
@@ -172,25 +205,18 @@ public:
|
||||
return _region_group.memory_used();
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
future<> serialize_flush(Func&& func) {
|
||||
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
|
||||
return with_semaphore(_flush_serializer, 1, func).finally([this] {
|
||||
maybe_do_active_flush();
|
||||
});
|
||||
});
|
||||
future<> flush_one(memtable_list& cf, semaphore_units<> permit);
|
||||
|
||||
future<semaphore_units<>> get_flush_permit() {
|
||||
return get_units(_flush_serializer, 1);
|
||||
}
|
||||
};
|
||||
|
||||
class streaming_dirty_memory_manager: public dirty_memory_manager {
|
||||
virtual memtable_list& get_memtable_list(column_family& cf) override;
|
||||
public:
|
||||
struct streaming_dirty_memory_manager: public dirty_memory_manager {
|
||||
streaming_dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(&db, parent, threshold) {}
|
||||
};
|
||||
|
||||
class memtable_dirty_memory_manager: public dirty_memory_manager {
|
||||
virtual memtable_list& get_memtable_list(column_family& cf) override;
|
||||
public:
|
||||
struct memtable_dirty_memory_manager: public dirty_memory_manager {
|
||||
memtable_dirty_memory_manager(database& db, dirty_memory_manager* parent, size_t threshold) : dirty_memory_manager(&db, parent, threshold) {}
|
||||
// This constructor will be called for the system tables (no parent). Its flushes are usually drive by us
|
||||
// and not the user, and tend to be small in size. So we'll allow only two slots.
|
||||
@@ -225,14 +251,13 @@ private:
|
||||
std::vector<shared_memtable> _memtables;
|
||||
std::function<future<> (flush_behavior)> _seal_fn;
|
||||
std::function<schema_ptr()> _current_schema;
|
||||
size_t _max_memtable_size;
|
||||
dirty_memory_manager* _dirty_memory_manager;
|
||||
std::experimental::optional<shared_promise<>> _flush_coalescing;
|
||||
public:
|
||||
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, dirty_memory_manager* dirty_memory_manager)
|
||||
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, dirty_memory_manager* dirty_memory_manager)
|
||||
: _memtables({})
|
||||
, _seal_fn(seal_fn)
|
||||
, _current_schema(cs)
|
||||
, _max_memtable_size(max_memtable_size)
|
||||
, _dirty_memory_manager(dirty_memory_manager) {
|
||||
add_memtable();
|
||||
}
|
||||
@@ -281,20 +306,17 @@ public:
|
||||
_memtables.emplace_back(new_memtable());
|
||||
}
|
||||
|
||||
bool should_flush() {
|
||||
return active_memtable().occupancy().total_space() >= _max_memtable_size;
|
||||
}
|
||||
|
||||
void seal_on_overflow() {
|
||||
if (should_flush()) {
|
||||
// FIXME: if sparse, do some in-memory compaction first
|
||||
// FIXME: maybe merge with other in-memory memtables
|
||||
seal_active_memtable(flush_behavior::immediate);
|
||||
}
|
||||
logalloc::region_group& region_group() {
|
||||
return _dirty_memory_manager->region_group();
|
||||
}
|
||||
// This is used for explicit flushes. Will queue the memtable for flushing and proceed when the
|
||||
// dirty_memory_manager allows us to. We will not seal at this time since the flush itself
|
||||
// wouldn't happen anyway. Keeping the memtable in memory will potentially increase the time it
|
||||
// spends in memory allowing for more coalescing opportunities.
|
||||
future<> request_flush();
|
||||
private:
|
||||
lw_shared_ptr<memtable> new_memtable() {
|
||||
return make_lw_shared<memtable>(_current_schema(), &(_dirty_memory_manager->region_group()));
|
||||
return make_lw_shared<memtable>(_current_schema(), this);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -328,8 +350,6 @@ public:
|
||||
bool enable_cache = true;
|
||||
bool enable_commitlog = true;
|
||||
bool enable_incremental_backups = false;
|
||||
size_t max_memtable_size = 5'000'000;
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
@@ -882,8 +902,6 @@ public:
|
||||
bool enable_disk_writes = true;
|
||||
bool enable_cache = true;
|
||||
bool enable_incremental_backups = false;
|
||||
size_t max_memtable_size = 5'000'000;
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
|
||||
// database.hh
|
||||
class database;
|
||||
class memtable_list;
|
||||
|
||||
// mutation.hh
|
||||
class mutation;
|
||||
|
||||
@@ -288,6 +288,7 @@ public:
|
||||
void flush_segments(bool = false);
|
||||
|
||||
private:
|
||||
size_t max_request_controller_units() const;
|
||||
segment_id_type _ids = 0;
|
||||
std::vector<sseg_ptr> _segments;
|
||||
std::deque<sseg_ptr> _reserve_segments;
|
||||
@@ -911,7 +912,7 @@ db::commitlog::segment_manager::segment_manager(config c)
|
||||
// an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger
|
||||
// than default_size at the end of the allocation, that allows for every valid mutation to
|
||||
// always be admitted for processing.
|
||||
, _request_controller(max_mutation_size + db::commitlog::segment::default_size)
|
||||
, _request_controller(max_request_controller_units())
|
||||
{
|
||||
assert(max_size > 0);
|
||||
|
||||
@@ -922,6 +923,10 @@ db::commitlog::segment_manager::segment_manager(config c)
|
||||
_regs = create_counters();
|
||||
}
|
||||
|
||||
size_t db::commitlog::segment_manager::max_request_controller_units() const {
|
||||
return max_mutation_size + db::commitlog::segment::default_size;
|
||||
}
|
||||
|
||||
future<std::vector<db::commitlog::descriptor>>
|
||||
db::commitlog::segment_manager::list_descriptors(sstring dirname) {
|
||||
struct helper {
|
||||
@@ -1233,11 +1238,14 @@ future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
|
||||
|
||||
future<> db::commitlog::segment_manager::shutdown() {
|
||||
if (!_shutdown) {
|
||||
_shutdown = true; // no re-arm, no create new segments.
|
||||
_timer.cancel(); // no more timer calls
|
||||
// Now first wait for periodic task to finish, then sync and close all
|
||||
// segments, flushing out any remaining data.
|
||||
return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true));
|
||||
// Wait for all pending requests to finish
|
||||
return get_units(_request_controller, max_request_controller_units()).then([this] (auto permits) {
|
||||
_timer.cancel(); // no more timer calls
|
||||
_shutdown = true; // no re-arm, no create new segments.
|
||||
// Now first wait for periodic task to finish, then sync and close all
|
||||
// segments, flushing out any remaining data.
|
||||
return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true));
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -334,7 +334,7 @@ public:
|
||||
"\toffheap_buffers Off heap (direct) NIO buffers.\n" \
|
||||
"\toffheap_objects Native memory, eliminating NIO buffer heap overhead." \
|
||||
) \
|
||||
val(memtable_cleanup_threshold, double, .11, Used, \
|
||||
val(memtable_cleanup_threshold, double, .11, Invalid, \
|
||||
"Ratio of occupied non-flushing memtable size to total permitted size for triggering a flush of the largest memtable. Larger values mean larger flushes and less compaction, but also less concurrent flush activity, which can make it difficult to keep your disks saturated under heavy write load." \
|
||||
) \
|
||||
val(file_cache_size_in_mb, uint32_t, 512, Unused, \
|
||||
|
||||
4
main.cc
4
main.cc
@@ -184,8 +184,8 @@ public:
|
||||
throw;
|
||||
}
|
||||
});
|
||||
} catch (std::system_error& e) {
|
||||
startlog.error("Directory '{}' not found. Tried to created it but failed: {}", path, e.what());
|
||||
} catch (...) {
|
||||
startlog.error("Directory '{}' cannot be initialized. Tried to do it but failed with: {}", path, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
|
||||
14
memtable.cc
14
memtable.cc
@@ -26,8 +26,16 @@
|
||||
|
||||
namespace stdx = std::experimental;
|
||||
|
||||
memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group)
|
||||
memtable::memtable(schema_ptr schema, memtable_list* memtable_list)
|
||||
: logalloc::region(memtable_list ? logalloc::region(memtable_list->region_group()) : logalloc::region())
|
||||
, _memtable_list(memtable_list)
|
||||
, _schema(std::move(schema))
|
||||
, partitions(memtable_entry::compare(_schema)) {
|
||||
}
|
||||
|
||||
memtable::memtable(schema_ptr schema, logalloc::region_group *dirty_memory_region_group)
|
||||
: logalloc::region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region())
|
||||
, _memtable_list(nullptr)
|
||||
, _schema(std::move(schema))
|
||||
, partitions(memtable_entry::compare(_schema)) {
|
||||
}
|
||||
@@ -254,7 +262,7 @@ class flush_memory_accounter {
|
||||
public:
|
||||
void update_bytes_read(uint64_t delta) {
|
||||
_bytes_read += delta;
|
||||
dirty_memory_manager::from_region_group(_region.group()).account_potentially_cleaned_up_memory(delta);
|
||||
dirty_memory_manager::from_region_group(_region.group()).account_potentially_cleaned_up_memory(&_region, delta);
|
||||
}
|
||||
|
||||
explicit flush_memory_accounter(logalloc::region& region)
|
||||
@@ -263,7 +271,7 @@ public:
|
||||
|
||||
~flush_memory_accounter() {
|
||||
assert(_bytes_read <= _region.occupancy().used_space());
|
||||
dirty_memory_manager::from_region_group(_region.group()).revert_potentially_cleaned_up_memory(_bytes_read);
|
||||
dirty_memory_manager::from_region_group(_region.group()).revert_potentially_cleaned_up_memory(&_region, _bytes_read);
|
||||
}
|
||||
void account_component(memtable_entry& e) {
|
||||
auto delta = _region.allocator().object_memory_size_in_allocator(&e)
|
||||
|
||||
13
memtable.hh
13
memtable.hh
@@ -101,6 +101,7 @@ public:
|
||||
bi::member_hook<memtable_entry, bi::set_member_hook<>, &memtable_entry::_link>,
|
||||
bi::compare<memtable_entry::compare>>;
|
||||
private:
|
||||
memtable_list *_memtable_list;
|
||||
schema_ptr _schema;
|
||||
logalloc::allocating_section _read_section;
|
||||
logalloc::allocating_section _allocating_section;
|
||||
@@ -116,7 +117,9 @@ private:
|
||||
partition_entry& find_or_create_partition_slow(partition_key_view key);
|
||||
void upgrade_entry(memtable_entry&);
|
||||
public:
|
||||
explicit memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group = nullptr);
|
||||
explicit memtable(schema_ptr schema, memtable_list *memtable_list);
|
||||
// Used for testing that want to control the flush process.
|
||||
explicit memtable(schema_ptr schema, logalloc::region_group *dirty_memrory_region= nullptr);
|
||||
~memtable();
|
||||
schema_ptr schema() const { return _schema; }
|
||||
void set_schema(schema_ptr) noexcept;
|
||||
@@ -134,7 +137,15 @@ public:
|
||||
const logalloc::region& region() const {
|
||||
return *this;
|
||||
}
|
||||
|
||||
logalloc::region_group* region_group() {
|
||||
return group();
|
||||
}
|
||||
public:
|
||||
memtable_list* get_memtable_list() {
|
||||
return _memtable_list;
|
||||
}
|
||||
|
||||
size_t partition_count() const;
|
||||
logalloc::occupancy_stats occupancy() const;
|
||||
|
||||
|
||||
@@ -474,9 +474,9 @@ public:
|
||||
try {
|
||||
_read_section(_lsa_region, [this] {
|
||||
_snapshot->merge_partition_versions();
|
||||
_snapshot = {};
|
||||
});
|
||||
} catch (...) { }
|
||||
_snapshot = {};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
10
row_cache.cc
10
row_cache.cc
@@ -74,8 +74,7 @@ cache_tracker::cache_tracker() {
|
||||
}
|
||||
cache_entry& ce = _lru.back();
|
||||
auto it = row_cache::partitions_type::s_iterator_to(ce);
|
||||
--it;
|
||||
clear_continuity(*it);
|
||||
clear_continuity(*std::next(it));
|
||||
_lru.pop_back_and_dispose(current_deleter<cache_entry>());
|
||||
--_partitions;
|
||||
++_evictions;
|
||||
@@ -365,6 +364,7 @@ public:
|
||||
++_it;
|
||||
_last = ce.key();
|
||||
_cache.upgrade_entry(ce);
|
||||
_cache._tracker.touch(ce);
|
||||
_cache.on_hit();
|
||||
cache_data cd { { }, ce.continuous() };
|
||||
if (ce.wide_partition()) {
|
||||
@@ -546,7 +546,6 @@ private:
|
||||
if (!_first_element) {
|
||||
return false;
|
||||
}
|
||||
_first_element = false;
|
||||
return _pr.start() && _pr.start()->is_inclusive() && _pr.start()->value().equal(*_schema, dk);
|
||||
}
|
||||
|
||||
@@ -554,6 +553,7 @@ private:
|
||||
return _primary_reader().then([this] (just_cache_scanning_reader::cache_data cd) {
|
||||
auto& smopt = cd.mut;
|
||||
if (cd.continuous || (smopt && is_inclusive_start_bound(smopt->decorated_key()))) {
|
||||
_first_element = false;
|
||||
update_last_key(smopt);
|
||||
return make_ready_future<streamed_mutation_opt>(std::move(smopt));
|
||||
} else {
|
||||
@@ -720,7 +720,9 @@ void row_cache::do_find_or_create_entry(const dht::decorated_key& key,
|
||||
return;
|
||||
}
|
||||
|
||||
if ((!previous->_key && i == _partitions.begin()) || (previous->_key && std::prev(i)->key().equal(*_schema, *previous->_key))) {
|
||||
if ((!previous->_key && i == _partitions.begin())
|
||||
|| (previous->_key && i != _partitions.begin()
|
||||
&& std::prev(i)->key().equal(*_schema, *previous->_key))) {
|
||||
i->set_continuous(true);
|
||||
}
|
||||
});
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 69acec1788...25137c2846
@@ -54,7 +54,7 @@ public:
|
||||
const cql3::query_options& options,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
std::vector<query::partition_range> ranges)
|
||||
: _has_clustering_keys(s->clustering_key_size() > 0)
|
||||
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
|
||||
, _max(cmd->row_limit)
|
||||
, _schema(std::move(s))
|
||||
, _selection(selection)
|
||||
@@ -65,6 +65,11 @@ public:
|
||||
{}
|
||||
|
||||
private:
|
||||
static bool has_clustering_keys(const schema& s, const query::read_command& cmd) {
|
||||
return s.clustering_key_size() > 0
|
||||
&& !cmd.slice.options.contains<query::partition_slice::option::distinct>();
|
||||
}
|
||||
|
||||
future<> fetch_page(cql3::selection::result_set_builder& builder, uint32_t page_size, db_clock::time_point now) override {
|
||||
auto state = _options.get_paging_state();
|
||||
|
||||
|
||||
@@ -2643,12 +2643,19 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
}
|
||||
}
|
||||
|
||||
// estimate_result_rows_per_range() is currently broken, and this is not needed
|
||||
// when paging is available in any case
|
||||
#if 0
|
||||
// our estimate of how many result rows there will be per-range
|
||||
float result_rows_per_range = estimate_result_rows_per_range(cmd, ks);
|
||||
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
|
||||
// fetch enough rows in the first round
|
||||
result_rows_per_range -= result_rows_per_range * CONCURRENT_SUBREQUESTS_MARGIN;
|
||||
int concurrency_factor = result_rows_per_range == 0.0 ? 1 : std::max(1, std::min(int(ranges.size()), int(std::ceil(cmd->row_limit / result_rows_per_range))));
|
||||
#else
|
||||
int result_rows_per_range = 0;
|
||||
int concurrency_factor = 1;
|
||||
#endif
|
||||
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
|
||||
results.reserve(ranges.size()/concurrency_factor + 1);
|
||||
|
||||
140
sstables/atomic_deletion.cc
Normal file
140
sstables/atomic_deletion.cc
Normal file
@@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "atomic_deletion.hh"
|
||||
#include "to_string.hh"
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
atomic_deletion_manager::atomic_deletion_manager(unsigned shard_count,
|
||||
std::function<future<> (std::vector<sstring> sstables)> delete_sstables)
|
||||
: _shard_count(shard_count)
|
||||
, _delete_sstables(std::move(delete_sstables)) {
|
||||
}
|
||||
|
||||
future<>
|
||||
atomic_deletion_manager::delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
|
||||
// runs on shard 0 only
|
||||
_deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
|
||||
|
||||
if (_atomic_deletions_cancelled) {
|
||||
_deletion_logger.debug("atomic deletions disabled, erroring out");
|
||||
using boost::adaptors::transformed;
|
||||
throw atomic_deletion_cancelled(atomic_deletion_set
|
||||
| transformed(std::mem_fn(&sstable_to_delete::name)));
|
||||
}
|
||||
|
||||
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
|
||||
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
|
||||
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
|
||||
auto merged_set = make_lw_shared(pending_deletion());
|
||||
for (auto&& sst_to_delete : atomic_deletion_set) {
|
||||
merged_set->names.insert(sst_to_delete.name);
|
||||
if (!sst_to_delete.shared) {
|
||||
for (auto shard : boost::irange<shard_id>(0, _shard_count)) {
|
||||
_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
|
||||
}
|
||||
}
|
||||
new_atomic_deletion_sets.emplace(sst_to_delete.name, merged_set);
|
||||
}
|
||||
auto pr = make_lw_shared<promise<>>();
|
||||
merged_set->completions.insert(pr);
|
||||
auto ret = pr->get_future();
|
||||
for (auto&& sst_to_delete : atomic_deletion_set) {
|
||||
auto i = _atomic_deletion_sets.find(sst_to_delete.name);
|
||||
// merge from old deletion set to new deletion set
|
||||
// i->second can be nullptr, see below why
|
||||
if (i != _atomic_deletion_sets.end() && i->second) {
|
||||
boost::copy(i->second->names, std::inserter(merged_set->names, merged_set->names.end()));
|
||||
boost::copy(i->second->completions, std::inserter(merged_set->completions, merged_set->completions.end()));
|
||||
}
|
||||
}
|
||||
_deletion_logger.debug("new atomic set: {}", merged_set->names);
|
||||
// we need to merge new_atomic_deletion_sets into g_atomic_deletion_sets,
|
||||
// but beware of exceptions. We do that with a first pass that inserts
|
||||
// nullptr as the value, so the second pass only replaces, and does not allocate
|
||||
for (auto&& sst_to_delete : atomic_deletion_set) {
|
||||
_atomic_deletion_sets.emplace(sst_to_delete.name, nullptr);
|
||||
}
|
||||
// now, no allocations are involved, so this commits the operation atomically
|
||||
for (auto&& n : merged_set->names) {
|
||||
auto i = _atomic_deletion_sets.find(n);
|
||||
i->second = merged_set;
|
||||
}
|
||||
|
||||
// Mark each sstable as being deleted from deleting_shard. We have to do
|
||||
// this in a separate pass, so the consideration whether we can delete or not
|
||||
// sees all the data from this pass.
|
||||
for (auto&& sst : atomic_deletion_set) {
|
||||
_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
|
||||
}
|
||||
|
||||
// Figure out if the (possibly merged) set can be deleted
|
||||
for (auto&& sst : merged_set->names) {
|
||||
if (_shards_agreeing_to_delete_sstable[sst].size() != _shard_count) {
|
||||
// Not everyone agrees, leave the set pending
|
||||
_deletion_logger.debug("deferring deletion until all shards agree");
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot recover from a failed deletion
|
||||
for (auto&& name : merged_set->names) {
|
||||
_atomic_deletion_sets.erase(name);
|
||||
_shards_agreeing_to_delete_sstable.erase(name);
|
||||
}
|
||||
|
||||
// Everyone agrees, let's delete
|
||||
auto names = boost::copy_range<std::vector<sstring>>(merged_set->names);
|
||||
_deletion_logger.debug("deleting {}", names);
|
||||
return _delete_sstables(names).then_wrapped([this, merged_set] (future<> result) {
|
||||
_deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
|
||||
shared_future<> sf(std::move(result));
|
||||
for (auto&& comp : merged_set->completions) {
|
||||
sf.get_future().forward_to(std::move(*comp));
|
||||
}
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
atomic_deletion_manager::cancel_atomic_deletions() {
|
||||
_atomic_deletions_cancelled = true;
|
||||
for (auto&& pd : _atomic_deletion_sets) {
|
||||
if (!pd.second) {
|
||||
// Could happen if a delete_atomically() failed
|
||||
continue;
|
||||
}
|
||||
for (auto&& c : pd.second->completions) {
|
||||
c->set_exception(atomic_deletion_cancelled(pd.second->names));
|
||||
}
|
||||
// since sets are shared, make sure we don't hit the same one again
|
||||
pd.second->completions.clear();
|
||||
}
|
||||
_atomic_deletion_sets.clear();
|
||||
_shards_agreeing_to_delete_sstable.clear();
|
||||
}
|
||||
|
||||
}
|
||||
92
sstables/atomic_deletion.hh
Normal file
92
sstables/atomic_deletion.hh
Normal file
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
// The atomic deletion manager solves the problem of orchestrating
|
||||
// the deletion of files that must be deleted as a group, where each
|
||||
// shard has different groups, and all shards delete a file for it to
|
||||
// be deleted. For example,
|
||||
//
|
||||
// shard 0: delete "A"
|
||||
// we can't delete anything because shard 1 hasn't agreed yet.
|
||||
// shard 1: delete "A" and B"
|
||||
// shard 1 agrees to delete "A", but we can't delete it yet,
|
||||
// because shard 1 requires that it be deleted together with "B",
|
||||
// and shard 0 hasn't agreed to delete "B" yet.
|
||||
// shard 0: delete "B" and "C"
|
||||
// shards 0 and 1 now both agree to delete "A" and "B", but shard 0
|
||||
// doesn't allow us to delete "B" without "C".
|
||||
// shard 1: delete "C"
|
||||
// finally, we can delete "A", "B", and "C".
|
||||
|
||||
#include "log.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/reactor.hh> // for shard_id
|
||||
#include <unordered_set>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
struct sstable_to_delete {
|
||||
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
|
||||
sstring name;
|
||||
bool shared = false;
|
||||
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
|
||||
};
|
||||
|
||||
class atomic_deletion_cancelled : public std::exception {
|
||||
std::string _msg;
|
||||
public:
|
||||
explicit atomic_deletion_cancelled(std::vector<sstring> names);
|
||||
template <typename StringRange>
|
||||
explicit atomic_deletion_cancelled(StringRange range)
|
||||
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
|
||||
}
|
||||
const char* what() const noexcept override;
|
||||
};
|
||||
|
||||
class atomic_deletion_manager {
|
||||
logging::logger _deletion_logger{"sstable-deletion"};
|
||||
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
|
||||
using sstables_to_delete_atomically_type = std::set<sstring>;
|
||||
struct pending_deletion {
|
||||
sstables_to_delete_atomically_type names;
|
||||
std::unordered_set<lw_shared_ptr<promise<>>> completions;
|
||||
};
|
||||
bool _atomic_deletions_cancelled = false;
|
||||
// map from sstable name to a set of sstables that must be deleted atomically, including itself
|
||||
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> _atomic_deletion_sets;
|
||||
std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> _shards_agreeing_to_delete_sstable;
|
||||
unsigned _shard_count;
|
||||
std::function<future<> (std::vector<sstring> sstables)> _delete_sstables;
|
||||
public:
|
||||
atomic_deletion_manager(unsigned shard_count,
|
||||
std::function<future<> (std::vector<sstring> sstables)> delete_sstables);
|
||||
future<> delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard);
|
||||
void cancel_atomic_deletions();
|
||||
};
|
||||
|
||||
}
|
||||
@@ -305,7 +305,7 @@ public:
|
||||
_remain = end - _stream_position;
|
||||
|
||||
_prestate = prestate::NONE;
|
||||
state_processor().reset();
|
||||
state_processor().reset(begin);
|
||||
return _input.skip(n);
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ public:
|
||||
bool should_continue() {
|
||||
return indexes.size() < max_quantity;
|
||||
}
|
||||
void consume_entry(index_entry&& ie) {
|
||||
void consume_entry(index_entry&& ie, uint64_t offset) {
|
||||
indexes.push_back(std::move(ie));
|
||||
}
|
||||
void reset() {
|
||||
@@ -49,13 +49,14 @@ public:
|
||||
// IndexConsumer is a concept that implements:
|
||||
//
|
||||
// bool should_continue();
|
||||
// void consume_entry(index_entry&& ie);
|
||||
// void consume_entry(index_entry&& ie, uintt64_t offset);
|
||||
template <class IndexConsumer>
|
||||
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
|
||||
using proceed = data_consumer::proceed;
|
||||
using continuous_data_consumer = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
|
||||
private:
|
||||
IndexConsumer& _consumer;
|
||||
uint64_t _entry_offset;
|
||||
|
||||
enum class state {
|
||||
START,
|
||||
@@ -113,9 +114,12 @@ public:
|
||||
_state = state::CONSUME_ENTRY;
|
||||
break;
|
||||
}
|
||||
case state::CONSUME_ENTRY:
|
||||
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)));
|
||||
case state::CONSUME_ENTRY: {
|
||||
auto len = (_key.size() + _promoted.size() + 14);
|
||||
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)), _entry_offset);
|
||||
_entry_offset += len;
|
||||
_state = state::START;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw malformed_sstable_exception("unknown state");
|
||||
@@ -126,11 +130,12 @@ public:
|
||||
index_consume_entry_context(IndexConsumer& consumer,
|
||||
input_stream<char>&& input, uint64_t start, uint64_t maxlen)
|
||||
: continuous_data_consumer(std::move(input), start, maxlen)
|
||||
, _consumer(consumer)
|
||||
, _consumer(consumer), _entry_offset(start)
|
||||
{}
|
||||
|
||||
void reset() {
|
||||
void reset(uint64_t offset) {
|
||||
_state = state::START;
|
||||
_entry_offset = offset;
|
||||
_consumer.reset();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -374,7 +374,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void reset() {
|
||||
void reset(uint64_t offset) {
|
||||
_state = state::ROW_START;
|
||||
_consumer.reset();
|
||||
}
|
||||
|
||||
@@ -741,10 +741,10 @@ future<> sstable::read_toc() {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
_components.insert(reverse_map(c, _component_map));
|
||||
_components.insert(reverse_map(c, _component_map));
|
||||
} catch (std::out_of_range& oor) {
|
||||
_components.clear(); // so subsequent read_toc will be forced to fail again
|
||||
throw malformed_sstable_exception("Unrecognized TOC component: " + c, file_path);
|
||||
_unrecognized_components.push_back(c);
|
||||
sstlog.info("Unrecognized TOC component was found: {} in sstable {}", c, file_path);
|
||||
}
|
||||
}
|
||||
if (!_components.size()) {
|
||||
@@ -1867,8 +1867,8 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
bool should_continue() {
|
||||
return true;
|
||||
}
|
||||
void consume_entry(index_entry&& ie) {
|
||||
maybe_add_summary_entry(_summary, ie.get_key_bytes(), ie.position());
|
||||
void consume_entry(index_entry&& ie, uint64_t offset) {
|
||||
maybe_add_summary_entry(_summary, ie.get_key_bytes(), offset);
|
||||
if (!first_key) {
|
||||
first_key = key(to_bytes(ie.get_key_bytes()));
|
||||
} else {
|
||||
@@ -1957,6 +1957,28 @@ const sstring sstable::filename(sstring dir, sstring ks, sstring cf, version_typ
|
||||
return dir + "/" + strmap[version](entry_descriptor(ks, cf, version, generation, format, component));
|
||||
}
|
||||
|
||||
const sstring sstable::filename(sstring dir, sstring ks, sstring cf, version_types version, int64_t generation,
|
||||
format_types format, sstring component) {
|
||||
static std::unordered_map<version_types, const char*, enum_hash<version_types>> fmtmap = {
|
||||
{ sstable::version_types::ka, "{0}-{1}-{2}-{3}-{5}" },
|
||||
{ sstable::version_types::la, "{2}-{3}-{4}-{5}" }
|
||||
};
|
||||
|
||||
return dir + "/" + seastar::format(fmtmap[version], ks, cf, _version_string.at(version), to_sstring(generation), _format_string.at(format), component);
|
||||
}
|
||||
|
||||
std::vector<std::pair<sstable::component_type, sstring>> sstable::all_components() const {
|
||||
std::vector<std::pair<component_type, sstring>> all;
|
||||
all.reserve(_components.size() + _unrecognized_components.size());
|
||||
for (auto& c : _components) {
|
||||
all.push_back(std::make_pair(c, _component_map.at(c)));
|
||||
}
|
||||
for (auto& c : _unrecognized_components) {
|
||||
all.push_back(std::make_pair(component_type::Unknown, c));
|
||||
}
|
||||
return all;
|
||||
}
|
||||
|
||||
future<> sstable::create_links(sstring dir, int64_t generation) const {
|
||||
// TemporaryTOC is always first, TOC is always last
|
||||
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, component_type::TemporaryTOC);
|
||||
@@ -1964,12 +1986,13 @@ future<> sstable::create_links(sstring dir, int64_t generation) const {
|
||||
return sstable_write_io_check(sync_directory, dir);
|
||||
}).then([this, dir, generation] {
|
||||
// FIXME: Should clean already-created links if we failed midway.
|
||||
return parallel_for_each(_components, [this, dir, generation] (auto comp) {
|
||||
if (comp == component_type::TOC) {
|
||||
return parallel_for_each(all_components(), [this, dir, generation] (auto p) {
|
||||
if (p.first == component_type::TOC) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, comp);
|
||||
return sstable_write_io_check(::link_file, this->filename(comp), dst);
|
||||
auto src = sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, p.second);
|
||||
auto dst = sstable::filename(dir, _schema->ks_name(), _schema->cf_name(), _version, generation, _format, p.second);
|
||||
return sstable_write_io_check(::link_file, std::move(src), std::move(dst));
|
||||
});
|
||||
}).then([dir] {
|
||||
return sstable_write_io_check(sync_directory, dir);
|
||||
@@ -1989,11 +2012,11 @@ future<> sstable::set_generation(int64_t new_generation) {
|
||||
return remove_file(filename(component_type::TOC)).then([this] {
|
||||
return sstable_write_io_check(sync_directory, _dir);
|
||||
}).then([this] {
|
||||
return parallel_for_each(_components, [this] (auto comp) {
|
||||
if (comp == component_type::TOC) {
|
||||
return parallel_for_each(all_components(), [this] (auto p) {
|
||||
if (p.first == component_type::TOC) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return remove_file(this->filename(comp));
|
||||
return remove_file(sstable::filename(_dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, p.second));
|
||||
});
|
||||
});
|
||||
}).then([this, new_generation] {
|
||||
@@ -2240,8 +2263,11 @@ remove_by_toc_name(sstring sstable_toc_name) {
|
||||
dir = dirname(sstable_toc_name);
|
||||
sstable_write_io_check(rename_file, sstable_toc_name, new_toc_name).get();
|
||||
sstable_write_io_check(fsync_directory, dir).get();
|
||||
} else {
|
||||
} else if (sstable_write_io_check(file_exists, new_toc_name).get0()) {
|
||||
dir = dirname(new_toc_name);
|
||||
} else {
|
||||
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
|
||||
return;
|
||||
}
|
||||
|
||||
auto toc_file = open_checked_file_dma(sstable_read_error, new_toc_name, open_flags::ro).get0();
|
||||
@@ -2427,107 +2453,21 @@ operator<<(std::ostream& os, const sstable_to_delete& std) {
|
||||
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
|
||||
}
|
||||
|
||||
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
|
||||
using sstables_to_delete_atomically_type = std::set<sstring>;
|
||||
struct pending_deletion {
|
||||
sstables_to_delete_atomically_type names;
|
||||
std::vector<lw_shared_ptr<promise<>>> completions;
|
||||
};
|
||||
|
||||
static thread_local bool g_atomic_deletions_cancelled = false;
|
||||
static thread_local std::list<lw_shared_ptr<pending_deletion>> g_atomic_deletion_sets;
|
||||
static thread_local std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> g_shards_agreeing_to_delete_sstable;
|
||||
|
||||
static logging::logger deletion_logger("sstable-deletion");
|
||||
|
||||
static
|
||||
future<>
|
||||
do_delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
|
||||
// runs on shard 0 only
|
||||
deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
|
||||
|
||||
if (g_atomic_deletions_cancelled) {
|
||||
deletion_logger.debug("atomic deletions disabled, erroring out");
|
||||
using boost::adaptors::transformed;
|
||||
throw atomic_deletion_cancelled(atomic_deletion_set
|
||||
| transformed(std::mem_fn(&sstable_to_delete::name)));
|
||||
}
|
||||
|
||||
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
|
||||
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
|
||||
std::list<lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
|
||||
auto merged_set = make_lw_shared(pending_deletion());
|
||||
for (auto&& sst_to_delete : atomic_deletion_set) {
|
||||
merged_set->names.insert(sst_to_delete.name);
|
||||
if (!sst_to_delete.shared) {
|
||||
for (auto shard : boost::irange<shard_id>(0, smp::count)) {
|
||||
g_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
merged_set->completions.push_back(make_lw_shared<promise<>>());
|
||||
auto ret = merged_set->completions.back()->get_future();
|
||||
for (auto&& old_set : g_atomic_deletion_sets) {
|
||||
auto intersection = sstables_to_delete_atomically_type();
|
||||
boost::set_intersection(merged_set->names, old_set->names, std::inserter(intersection, intersection.end()));
|
||||
if (intersection.empty()) {
|
||||
// We copy old_set to avoid corrupting g_atomic_deletion_sets if we fail
|
||||
// further on.
|
||||
new_atomic_deletion_sets.push_back(old_set);
|
||||
} else {
|
||||
deletion_logger.debug("merging with {}", old_set->names);
|
||||
merged_set->names.insert(old_set->names.begin(), old_set->names.end());
|
||||
boost::push_back(merged_set->completions, old_set->completions);
|
||||
}
|
||||
}
|
||||
deletion_logger.debug("new atomic set: {}", merged_set->names);
|
||||
new_atomic_deletion_sets.push_back(merged_set);
|
||||
// can now exception-safely commit:
|
||||
g_atomic_deletion_sets = std::move(new_atomic_deletion_sets);
|
||||
|
||||
// Mark each sstable as being deleted from deleting_shard. We have to do
|
||||
// this in a separate pass, so the consideration whether we can delete or not
|
||||
// sees all the data from this pass.
|
||||
for (auto&& sst : atomic_deletion_set) {
|
||||
g_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
|
||||
}
|
||||
|
||||
// Figure out if the (possibly merged) set can be deleted
|
||||
for (auto&& sst : merged_set->names) {
|
||||
if (g_shards_agreeing_to_delete_sstable[sst].size() != smp::count) {
|
||||
// Not everyone agrees, leave the set pending
|
||||
deletion_logger.debug("deferring deletion until all shards agree");
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot recover from a failed deletion
|
||||
g_atomic_deletion_sets.pop_back();
|
||||
for (auto&& name : merged_set->names) {
|
||||
g_shards_agreeing_to_delete_sstable.erase(name);
|
||||
}
|
||||
|
||||
// Everyone agrees, let's delete
|
||||
delete_sstables(std::vector<sstring> tocs) {
|
||||
// FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
|
||||
parallel_for_each(merged_set->names, [] (sstring name) {
|
||||
deletion_logger.debug("deleting {}", name);
|
||||
return parallel_for_each(tocs, [] (sstring name) {
|
||||
return remove_by_toc_name(name);
|
||||
}).then_wrapped([merged_set] (future<> result) {
|
||||
deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
|
||||
shared_future<> sf(std::move(result));
|
||||
for (auto&& comp : merged_set->completions) {
|
||||
sf.get_future().forward_to(std::move(*comp));
|
||||
}
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static thread_local atomic_deletion_manager g_atomic_deletion_manager(smp::count, delete_sstables);
|
||||
|
||||
future<>
|
||||
delete_atomically(std::vector<sstable_to_delete> ssts) {
|
||||
auto shard = engine().cpu_id();
|
||||
return smp::submit_to(0, [=] {
|
||||
return do_delete_atomically(ssts, shard);
|
||||
return g_atomic_deletion_manager.delete_atomically(ssts, shard);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2540,16 +2480,8 @@ delete_atomically(std::vector<shared_sstable> ssts) {
|
||||
return delete_atomically(std::move(sstables_to_delete_atomically));
|
||||
}
|
||||
|
||||
void
|
||||
cancel_atomic_deletions() {
|
||||
g_atomic_deletions_cancelled = true;
|
||||
for (auto&& pd : g_atomic_deletion_sets) {
|
||||
for (auto&& c : pd->completions) {
|
||||
c->set_exception(atomic_deletion_cancelled(pd->names));
|
||||
}
|
||||
}
|
||||
g_atomic_deletion_sets.clear();
|
||||
g_shards_agreeing_to_delete_sstable.clear();
|
||||
void cancel_atomic_deletions() {
|
||||
g_atomic_deletion_manager.cancel_atomic_deletions();
|
||||
}
|
||||
|
||||
atomic_deletion_cancelled::atomic_deletion_cancelled(std::vector<sstring> names)
|
||||
|
||||
@@ -48,6 +48,7 @@
|
||||
#include "mutation_reader.hh"
|
||||
#include "query-request.hh"
|
||||
#include "compound_compat.hh"
|
||||
#include "atomic_deletion.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -130,6 +131,7 @@ public:
|
||||
Statistics,
|
||||
TemporaryTOC,
|
||||
TemporaryStatistics,
|
||||
Unknown,
|
||||
};
|
||||
enum class version_types { ka, la };
|
||||
enum class format_types { big };
|
||||
@@ -221,6 +223,8 @@ public:
|
||||
static format_types format_from_sstring(sstring& s);
|
||||
static const sstring filename(sstring dir, sstring ks, sstring cf, version_types version, int64_t generation,
|
||||
format_types format, component_type component);
|
||||
static const sstring filename(sstring dir, sstring ks, sstring cf, version_types version, int64_t generation,
|
||||
format_types format, sstring component);
|
||||
// WARNING: it should only be called to remove components of a sstable with
|
||||
// a temporary TOC file.
|
||||
static future<> remove_sstable_with_temp_toc(sstring ks, sstring cf, sstring dir, int64_t generation,
|
||||
@@ -358,6 +362,8 @@ public:
|
||||
return _collector;
|
||||
}
|
||||
|
||||
std::vector<std::pair<component_type, sstring>> all_components() const;
|
||||
|
||||
future<> create_links(sstring dir, int64_t generation) const;
|
||||
|
||||
future<> create_links(sstring dir) const {
|
||||
@@ -394,6 +400,7 @@ private:
|
||||
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
|
||||
|
||||
std::unordered_set<component_type, enum_hash<component_type>> _components;
|
||||
std::vector<sstring> _unrecognized_components;
|
||||
|
||||
bool _shared = true; // across shards; safe default
|
||||
compression _compression;
|
||||
@@ -688,14 +695,6 @@ future<> await_background_jobs();
|
||||
// Invokes await_background_jobs() on all shards
|
||||
future<> await_background_jobs_on_all_shards();
|
||||
|
||||
struct sstable_to_delete {
|
||||
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
|
||||
sstring name;
|
||||
bool shared = false;
|
||||
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
|
||||
};
|
||||
|
||||
|
||||
// When we compact sstables, we have to atomically instantiate the new
|
||||
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
|
||||
// and if A contained some data that was tombstoned by B, and if B was
|
||||
@@ -714,17 +713,6 @@ struct sstable_to_delete {
|
||||
future<> delete_atomically(std::vector<shared_sstable> ssts);
|
||||
future<> delete_atomically(std::vector<sstable_to_delete> ssts);
|
||||
|
||||
class atomic_deletion_cancelled : public std::exception {
|
||||
std::string _msg;
|
||||
public:
|
||||
explicit atomic_deletion_cancelled(std::vector<sstring> names);
|
||||
template <typename StringRange>
|
||||
explicit atomic_deletion_cancelled(StringRange range)
|
||||
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
|
||||
}
|
||||
const char* what() const noexcept override;
|
||||
};
|
||||
|
||||
// Cancel any deletions scheduled by delete_atomically() and make their
|
||||
// futures complete (with an atomic_deletion_cancelled exception).
|
||||
void cancel_atomic_deletions();
|
||||
|
||||
1
test.py
1
test.py
@@ -39,6 +39,7 @@ boost_tests = [
|
||||
'storage_proxy_test',
|
||||
'schema_change_test',
|
||||
'sstable_mutation_test',
|
||||
'sstable_atomic_deletion_test',
|
||||
'commitlog_test',
|
||||
'hash_test',
|
||||
'test-serialization',
|
||||
|
||||
170
tests/sstable_atomic_deletion_test.cc
Normal file
170
tests/sstable_atomic_deletion_test.cc
Normal file
@@ -0,0 +1,170 @@
|
||||
/*
|
||||
* Copyright (C) 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sstables/atomic_deletion.hh"
|
||||
#include <seastar/tests/test-utils.hh>
|
||||
#include <deque>
|
||||
#include <boost/range/numeric.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
|
||||
class atomic_deletion_test_env {
|
||||
public:
|
||||
using event = std::function<future<> (atomic_deletion_test_env& adm)>;
|
||||
private:
|
||||
struct a_hash {
|
||||
size_t operator()(const std::unordered_set<sstring>& s) const {
|
||||
auto h = std::hash<sstring>();
|
||||
return boost::accumulate(s | boost::adaptors::transformed(h), size_t(0)); // sue me
|
||||
}
|
||||
};
|
||||
atomic_deletion_manager _adm;
|
||||
std::deque<event> _events;
|
||||
std::unordered_set<std::unordered_set<sstring>, a_hash> _deletes;
|
||||
semaphore _deletion_counter { 0 };
|
||||
private:
|
||||
future<> delete_sstables(std::vector<sstring> names) {
|
||||
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
|
||||
_deletes.insert(s1);
|
||||
_deletion_counter.signal();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
public:
|
||||
explicit atomic_deletion_test_env(unsigned shard_count, std::vector<event> events)
|
||||
: _adm(shard_count, [this] (std::vector<sstring> names) {
|
||||
return delete_sstables(names);
|
||||
})
|
||||
, _events(events.begin(), events.end()) {
|
||||
}
|
||||
void expect_no_deletion() {
|
||||
BOOST_REQUIRE(_deletes.empty());
|
||||
}
|
||||
future<> schedule_delete(std::vector<sstable_to_delete> names, unsigned shard) {
|
||||
_adm.delete_atomically(names, shard).discard_result();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> expect_deletion(std::vector<sstring> names) {
|
||||
return _deletion_counter.wait().then([this, names] {
|
||||
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
|
||||
auto erased = _deletes.erase(s1);
|
||||
BOOST_REQUIRE_EQUAL(erased, 1);
|
||||
});
|
||||
}
|
||||
future<> test() {
|
||||
// run all _events sequentially
|
||||
return repeat([this] {
|
||||
if (_events.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
auto ev = std::move(_events.front());
|
||||
_events.pop_front();
|
||||
return ev(*this).then([] {
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
future<> test_atomic_deletion_manager(unsigned shards, std::vector<atomic_deletion_test_env::event> events) {
|
||||
auto env = make_lw_shared<atomic_deletion_test_env>(shards, events);
|
||||
return env->test().finally([env] {});
|
||||
}
|
||||
|
||||
atomic_deletion_test_env::event
|
||||
delete_many(std::vector<sstable_to_delete> v, unsigned shard) {
|
||||
return [v, shard] (atomic_deletion_test_env& env) {
|
||||
// verify we didn't have an early delete from previous deletion
|
||||
env.expect_no_deletion();
|
||||
return env.schedule_delete(v, shard);
|
||||
};
|
||||
}
|
||||
|
||||
atomic_deletion_test_env::event
|
||||
delete_one(sstable_to_delete s, unsigned shard) {
|
||||
return delete_many({s}, shard);
|
||||
}
|
||||
|
||||
atomic_deletion_test_env::event
|
||||
expect_many(std::vector<sstring> names) {
|
||||
return [names] (atomic_deletion_test_env& env) {
|
||||
return env.expect_deletion(names);
|
||||
};
|
||||
}
|
||||
|
||||
atomic_deletion_test_env::event
|
||||
expect_one(sstring name) {
|
||||
return expect_many({name});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_single_shard_single_sstable) {
|
||||
return test_atomic_deletion_manager(1, {
|
||||
delete_one({"1", false}, 0),
|
||||
expect_one("1"),
|
||||
delete_one({"2", true}, 0),
|
||||
expect_one("2"),
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multi_shard_single_sstable) {
|
||||
return test_atomic_deletion_manager(3, {
|
||||
delete_one({"1", true}, 0),
|
||||
delete_one({"1", true}, 1),
|
||||
delete_one({"1", true}, 2),
|
||||
expect_one("1"),
|
||||
delete_one({"2", false}, 1),
|
||||
expect_one("2"),
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_nonshared_compaction) {
|
||||
return test_atomic_deletion_manager(5, {
|
||||
delete_many({{"1", false}, {"2", false}, {"3", false}}, 2),
|
||||
expect_many({"1", "2", "3"}),
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_shared_compaction) {
|
||||
return test_atomic_deletion_manager(3, {
|
||||
delete_one({"1", true}, 0),
|
||||
delete_many({{"1", true}, {"2", false}, {"3", false}}, 2),
|
||||
delete_one({"1", true}, 1),
|
||||
expect_many({"1", "2", "3"}),
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_overlapping_compaction) {
|
||||
return test_atomic_deletion_manager(3, {
|
||||
delete_one({"1", true}, 0),
|
||||
delete_one({"3", true}, 0),
|
||||
delete_many({{"1", true}, {"2", false}, {"3", true}}, 2),
|
||||
delete_one({"1", true}, 1),
|
||||
delete_many({{"3", true}, {"4", false}}, 1),
|
||||
expect_many({"1", "2", "3", "4"}),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
@@ -3031,3 +3031,22 @@ SEASTAR_TEST_CASE(test_partition_skipping) {
|
||||
.produces_end_of_stream();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_unknown_component) {
|
||||
return seastar::async([] {
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
auto sstp = reusable_sst(uncompressed_schema(), "tests/sstables/unknown_component", 1).get0();
|
||||
sstp->create_links(tmp->path).get();
|
||||
// check that create_links() moved unknown component to new dir
|
||||
BOOST_REQUIRE(file_exists(tmp->path + "/la-1-big-UNKNOWN.txt").get0());
|
||||
|
||||
sstp = reusable_sst(uncompressed_schema(), tmp->path, 1).get0();
|
||||
sstp->set_generation(2).get();
|
||||
BOOST_REQUIRE(!file_exists(tmp->path + "/la-1-big-UNKNOWN.txt").get0());
|
||||
BOOST_REQUIRE(file_exists(tmp->path + "/la-2-big-UNKNOWN.txt").get0());
|
||||
|
||||
sstables::delete_atomically({sstp}).get();
|
||||
// assure unknown component is deleted
|
||||
BOOST_REQUIRE(!file_exists(tmp->path + "/la-2-big-UNKNOWN.txt").get0());
|
||||
});
|
||||
}
|
||||
|
||||
BIN
tests/sstables/unknown_component/la-1-big-CRC.db
Normal file
BIN
tests/sstables/unknown_component/la-1-big-CRC.db
Normal file
Binary file not shown.
BIN
tests/sstables/unknown_component/la-1-big-Data.db
Normal file
BIN
tests/sstables/unknown_component/la-1-big-Data.db
Normal file
Binary file not shown.
1
tests/sstables/unknown_component/la-1-big-Digest.sha1
Normal file
1
tests/sstables/unknown_component/la-1-big-Digest.sha1
Normal file
@@ -0,0 +1 @@
|
||||
748507322
|
||||
BIN
tests/sstables/unknown_component/la-1-big-Filter.db
Normal file
BIN
tests/sstables/unknown_component/la-1-big-Filter.db
Normal file
Binary file not shown.
BIN
tests/sstables/unknown_component/la-1-big-Index.db
Normal file
BIN
tests/sstables/unknown_component/la-1-big-Index.db
Normal file
Binary file not shown.
BIN
tests/sstables/unknown_component/la-1-big-Statistics.db
Normal file
BIN
tests/sstables/unknown_component/la-1-big-Statistics.db
Normal file
Binary file not shown.
BIN
tests/sstables/unknown_component/la-1-big-Summary.db
Normal file
BIN
tests/sstables/unknown_component/la-1-big-Summary.db
Normal file
Binary file not shown.
9
tests/sstables/unknown_component/la-1-big-TOC.txt
Normal file
9
tests/sstables/unknown_component/la-1-big-TOC.txt
Normal file
@@ -0,0 +1,9 @@
|
||||
Data.db
|
||||
Filter.db
|
||||
CRC.db
|
||||
Statistics.db
|
||||
Summary.db
|
||||
Digest.sha1
|
||||
Index.db
|
||||
TOC.txt
|
||||
UNKNOWN.txt
|
||||
@@ -39,8 +39,8 @@ class moving_average {
|
||||
public:
|
||||
moving_average(latency_counter::duration interval, latency_counter::duration tick_interval) :
|
||||
_tick_interval(tick_interval) {
|
||||
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::nanoseconds>(interval).count()/
|
||||
static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(tick_interval).count()));
|
||||
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::seconds>(tick_interval).count()/
|
||||
static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(interval).count()));
|
||||
}
|
||||
|
||||
void add(uint64_t val = 1) {
|
||||
@@ -48,7 +48,7 @@ public:
|
||||
}
|
||||
|
||||
void update() {
|
||||
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(_tick_interval).count());
|
||||
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(_tick_interval).count());
|
||||
if (_initialized) {
|
||||
_rate += (_alpha * (instant_rate - _rate));
|
||||
} else {
|
||||
@@ -70,7 +70,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class ihistogram {
|
||||
template <typename Unit>
|
||||
class basic_ihistogram {
|
||||
public:
|
||||
// count holds all the events
|
||||
int64_t count;
|
||||
@@ -84,12 +85,13 @@ public:
|
||||
double variance;
|
||||
int64_t sample_mask;
|
||||
boost::circular_buffer<int64_t> sample;
|
||||
ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
|
||||
basic_ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
|
||||
: count(0), total(0), min(0), max(0), sum(0), started(0), mean(0), variance(0),
|
||||
sample_mask(_sample_mask), sample(
|
||||
size) {
|
||||
}
|
||||
void mark(int64_t value) {
|
||||
void mark(int64_t ns_value) {
|
||||
auto value = std::chrono::duration_cast<Unit>(std::chrono::nanoseconds(ns_value)).count();
|
||||
if (total == 0 || value < min) {
|
||||
min = value;
|
||||
}
|
||||
@@ -131,7 +133,7 @@ public:
|
||||
/**
|
||||
* Set the latency according to the sample rate.
|
||||
*/
|
||||
ihistogram& set_latency(latency_counter& lc) {
|
||||
basic_ihistogram& set_latency(latency_counter& lc) {
|
||||
if (should_sample()) {
|
||||
lc.start();
|
||||
}
|
||||
@@ -144,7 +146,7 @@ public:
|
||||
* Increment the total number of events without
|
||||
* sampling the value.
|
||||
*/
|
||||
ihistogram& inc() {
|
||||
basic_ihistogram& inc() {
|
||||
count++;
|
||||
return *this;
|
||||
}
|
||||
@@ -157,7 +159,7 @@ public:
|
||||
return a * a;
|
||||
}
|
||||
|
||||
ihistogram& operator +=(const ihistogram& o) {
|
||||
basic_ihistogram& operator +=(const basic_ihistogram& o) {
|
||||
if (count == 0) {
|
||||
*this = o;
|
||||
} else if (o.count > 0) {
|
||||
@@ -190,14 +192,18 @@ public:
|
||||
return mean * count;
|
||||
}
|
||||
|
||||
friend ihistogram operator +(ihistogram a, const ihistogram& b);
|
||||
template <typename U>
|
||||
friend basic_ihistogram<U> operator +(basic_ihistogram<U> a, const basic_ihistogram<U>& b);
|
||||
};
|
||||
|
||||
inline ihistogram operator +(ihistogram a, const ihistogram& b) {
|
||||
template <typename Unit>
|
||||
inline basic_ihistogram<Unit> operator +(basic_ihistogram<Unit> a, const basic_ihistogram<Unit>& b) {
|
||||
a += b;
|
||||
return a;
|
||||
}
|
||||
|
||||
using ihistogram = basic_ihistogram<std::chrono::microseconds>;
|
||||
|
||||
struct rate_moving_average {
|
||||
uint64_t count = 0;
|
||||
double rates[3] = {0};
|
||||
@@ -222,7 +228,7 @@ class timed_rate_moving_average {
|
||||
static constexpr latency_counter::duration tick_interval() {
|
||||
return std::chrono::seconds(10);
|
||||
}
|
||||
moving_average rates[3] = {{tick_interval(), std::chrono::minutes(1)}, {tick_interval(), std::chrono::minutes(5)}, {tick_interval(), std::chrono::minutes(15)}};
|
||||
moving_average rates[3] = {{std::chrono::minutes(1), tick_interval()}, {std::chrono::minutes(5), tick_interval()}, {std::chrono::minutes(15), tick_interval()}};
|
||||
latency_counter::time_point start_time;
|
||||
timer<> _timer;
|
||||
|
||||
@@ -246,7 +252,7 @@ public:
|
||||
rate_moving_average rate() const {
|
||||
rate_moving_average res;
|
||||
if (_count > 0) {
|
||||
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(latency_counter::now() - start_time).count();
|
||||
double elapsed = std::chrono::duration_cast<std::chrono::seconds>(latency_counter::now() - start_time).count();
|
||||
res.mean_rate = (_count / elapsed);
|
||||
}
|
||||
res.count = _count;
|
||||
|
||||
@@ -61,7 +61,9 @@ using eviction_fn = std::function<memory::reclaiming_result()>;
|
||||
class region_group_reclaimer {
|
||||
protected:
|
||||
size_t _threshold;
|
||||
size_t _soft_limit;
|
||||
bool _under_pressure = false;
|
||||
bool _under_soft_pressure = false;
|
||||
virtual void start_reclaiming() {}
|
||||
virtual void stop_reclaiming() {}
|
||||
public:
|
||||
@@ -69,6 +71,24 @@ public:
|
||||
return _under_pressure;
|
||||
}
|
||||
|
||||
bool over_soft_limit() const {
|
||||
return _under_soft_pressure;
|
||||
}
|
||||
|
||||
void notify_soft_pressure() {
|
||||
if (!_under_soft_pressure) {
|
||||
_under_soft_pressure = true;
|
||||
start_reclaiming();
|
||||
}
|
||||
}
|
||||
|
||||
void notify_soft_relief() {
|
||||
if (_under_soft_pressure) {
|
||||
_under_soft_pressure = false;
|
||||
stop_reclaiming();
|
||||
}
|
||||
}
|
||||
|
||||
void notify_pressure() {
|
||||
if (!_under_pressure) {
|
||||
_under_pressure = true;
|
||||
@@ -83,12 +103,21 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
region_group_reclaimer(size_t threshold = std::numeric_limits<size_t>::max()) : _threshold(threshold) {}
|
||||
region_group_reclaimer()
|
||||
: _threshold(std::numeric_limits<size_t>::max()), _soft_limit(std::numeric_limits<size_t>::max()) {}
|
||||
region_group_reclaimer(size_t threshold)
|
||||
: _threshold(threshold), _soft_limit(threshold) {}
|
||||
region_group_reclaimer(size_t threshold, size_t soft)
|
||||
: _threshold(threshold), _soft_limit(soft) {}
|
||||
|
||||
virtual ~region_group_reclaimer() {}
|
||||
|
||||
size_t throttle_threshold() const {
|
||||
return _threshold;
|
||||
}
|
||||
size_t soft_limit_threshold() const {
|
||||
return _soft_limit;
|
||||
}
|
||||
};
|
||||
|
||||
// Groups regions for the purpose of statistics. Can be nested.
|
||||
@@ -232,6 +261,11 @@ public:
|
||||
if (rg->execution_permitted()) {
|
||||
rg->release_requests();
|
||||
}
|
||||
if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_pressure();
|
||||
} else if (rg->_total_memory < rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_relief();
|
||||
}
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user