sstables: introduce sstables_manager
The goal of the sstables manager is to track and manage sstables life-cycle. There is a sstable manager instance per database and it is passed to each column-family (and test environment) on construction. All sstables created, loaded, and deleted pass through the sstables manager. The manager will make sure consumers of sstables are in sync so that sstables will not be deleted while in use. Refs #4149 Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -494,6 +494,7 @@ scylla_core = (['database.cc',
|
||||
'compress.cc',
|
||||
'sstables/mp_row_consumer.cc',
|
||||
'sstables/sstables.cc',
|
||||
'sstables/sstables_manager.cc',
|
||||
'sstables/mc/writer.cc',
|
||||
'sstables/sstable_version.cc',
|
||||
'sstables/compress.cc',
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "sstables/compaction.hh"
|
||||
#include "sstables/remove.hh"
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
@@ -224,6 +225,7 @@ database::database(const db::config& cfg, database_config dbcfg)
|
||||
_cfg->compaction_large_row_warning_threshold_mb()*1024*1024,
|
||||
_cfg->compaction_large_cell_warning_threshold_mb()*1024*1024))
|
||||
, _nop_large_data_handler(std::make_unique<db::nop_large_data_handler>())
|
||||
, _sstables_manager(std::make_unique<sstables::sstables_manager>())
|
||||
, _result_memory_limiter(dbcfg.available_memory / 10)
|
||||
, _data_listeners(std::make_unique<db::data_listeners>(*this))
|
||||
{
|
||||
@@ -907,6 +909,7 @@ keyspace::make_column_family_config(const schema& s, const database& db) const {
|
||||
cfg.large_data_handler = db.get_large_data_handler();
|
||||
}
|
||||
|
||||
cfg.sstables_manager = &db.get_sstables_manager();
|
||||
cfg.view_update_concurrency_semaphore = _config.view_update_concurrency_semaphore;
|
||||
cfg.view_update_concurrency_semaphore_limit = _config.view_update_concurrency_semaphore_limit;
|
||||
cfg.data_listeners = &db.data_listeners();
|
||||
|
||||
14
database.hh
14
database.hh
@@ -113,6 +113,7 @@ class sstable;
|
||||
class entry_descriptor;
|
||||
class compaction_descriptor;
|
||||
class foreign_sstable_open_info;
|
||||
class sstables_manager;
|
||||
|
||||
}
|
||||
|
||||
@@ -336,6 +337,7 @@ public:
|
||||
seastar::scheduling_group streaming_scheduling_group;
|
||||
bool enable_metrics_reporting = false;
|
||||
db::large_data_handler* large_data_handler;
|
||||
sstables::sstables_manager* sstables_manager;
|
||||
db::timeout_semaphore* view_update_concurrency_semaphore;
|
||||
size_t view_update_concurrency_semaphore_limit;
|
||||
db::data_listeners* data_listeners = nullptr;
|
||||
@@ -919,6 +921,11 @@ public:
|
||||
return _config.large_data_handler;
|
||||
}
|
||||
|
||||
sstables::sstables_manager& get_sstables_manager() const {
|
||||
assert(_config.sstables_manager);
|
||||
return *_config.sstables_manager;
|
||||
}
|
||||
|
||||
future<> populate_views(
|
||||
std::vector<view_ptr>,
|
||||
dht::token base_token,
|
||||
@@ -1286,6 +1293,8 @@ private:
|
||||
std::unique_ptr<db::large_data_handler> _large_data_handler;
|
||||
std::unique_ptr<db::large_data_handler> _nop_large_data_handler;
|
||||
|
||||
std::unique_ptr<sstables::sstables_manager> _sstables_manager;
|
||||
|
||||
query::result_memory_limiter _result_memory_limiter;
|
||||
|
||||
friend db::data_listeners;
|
||||
@@ -1443,6 +1452,11 @@ public:
|
||||
return _nop_large_data_handler.get();
|
||||
}
|
||||
|
||||
sstables::sstables_manager& get_sstables_manager() const {
|
||||
assert(_sstables_manager);
|
||||
return *_sstables_manager;
|
||||
}
|
||||
|
||||
future<> flush_all_memtables();
|
||||
|
||||
// See #937. Truncation now requires a callback to get a time stamp
|
||||
|
||||
@@ -173,12 +173,6 @@ future<> await_background_jobs_on_all_shards() {
|
||||
});
|
||||
}
|
||||
|
||||
shared_sstable
|
||||
make_sstable(schema_ptr schema, sstring dir, int64_t generation, sstable_version_types v, sstable_format_types f, gc_clock::time_point now,
|
||||
io_error_handler_gen error_handler_gen, size_t buffer_size) {
|
||||
return make_lw_shared<sstable>(std::move(schema), std::move(dir), generation, v, f, now, std::move(error_handler_gen), buffer_size);
|
||||
}
|
||||
|
||||
std::unordered_map<sstable::version_types, sstring, enum_hash<sstable::version_types>> sstable::_version_string = {
|
||||
{ sstable::version_types::ka , "ka" },
|
||||
{ sstable::version_types::la , "la" },
|
||||
|
||||
@@ -124,13 +124,6 @@ struct sstable_writer_config {
|
||||
utils::UUID run_identifier = utils::make_random_uuid();
|
||||
};
|
||||
|
||||
static constexpr inline size_t default_sstable_buffer_size() {
|
||||
return 128 * 1024;
|
||||
}
|
||||
|
||||
shared_sstable make_sstable(schema_ptr schema, sstring dir, int64_t generation, sstable_version_types v, sstable_format_types f, gc_clock::time_point now = gc_clock::now(),
|
||||
io_error_handler_gen error_handler_gen = default_io_error_handler_gen(), size_t buffer_size = default_sstable_buffer_size());
|
||||
|
||||
class sstable : public enable_lw_shared_from_this<sstable> {
|
||||
friend ::sstable_assertions;
|
||||
public:
|
||||
|
||||
41
sstables/sstables_manager.cc
Normal file
41
sstables/sstables_manager.cc
Normal file
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "log.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
logging::logger smlogger("sstables_manager");
|
||||
|
||||
shared_sstable sstables_manager::make_sstable(schema_ptr schema,
|
||||
sstring dir,
|
||||
int64_t generation,
|
||||
sstable_version_types v,
|
||||
sstable_format_types f,
|
||||
gc_clock::time_point now,
|
||||
io_error_handler_gen error_handler_gen,
|
||||
size_t buffer_size) {
|
||||
return make_lw_shared<sstable>(std::move(schema), std::move(dir), generation, v, f, now, std::move(error_handler_gen), buffer_size);
|
||||
}
|
||||
|
||||
} // namespace sstables
|
||||
59
sstables/sstables_manager.hh
Normal file
59
sstables/sstables_manager.hh
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "sstables/shareable_components.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "sstables/version.hh"
|
||||
#include "sstables/component_type.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
using schema_ptr = lw_shared_ptr<const schema>;
|
||||
using shareable_components_ptr = lw_shared_ptr<shareable_components>;
|
||||
|
||||
static constexpr inline size_t default_sstable_buffer_size() {
|
||||
return 128 * 1024;
|
||||
}
|
||||
|
||||
class sstables_manager {
|
||||
public:
|
||||
sstables_manager() = default;
|
||||
|
||||
// Constructs a shared sstable
|
||||
shared_sstable make_sstable(schema_ptr schema,
|
||||
sstring dir,
|
||||
int64_t generation,
|
||||
sstable_version_types v,
|
||||
sstable_format_types f,
|
||||
gc_clock::time_point now = gc_clock::now(),
|
||||
io_error_handler_gen error_handler_gen = default_io_error_handler_gen(),
|
||||
size_t buffer_size = default_sstable_buffer_size());
|
||||
};
|
||||
|
||||
} // namespace sstables
|
||||
8
table.cc
8
table.cc
@@ -21,6 +21,7 @@
|
||||
|
||||
#include "database.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "db/view/view_updating_consumer.hh"
|
||||
#include "cell_locking.hh"
|
||||
@@ -598,12 +599,12 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
|
||||
sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
|
||||
io_error_handler_gen error_handler_gen) {
|
||||
return sstables::make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen);
|
||||
return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen);
|
||||
}
|
||||
|
||||
sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation,
|
||||
sstables::sstable_version_types v, sstables::sstable_format_types f) {
|
||||
return sstables::make_sstable(_schema, dir, generation, v, f);
|
||||
return get_sstables_manager().make_sstable(_schema, dir, generation, v, f);
|
||||
}
|
||||
|
||||
sstables::shared_sstable table::make_sstable(sstring dir) {
|
||||
@@ -1542,7 +1543,8 @@ table::make_streaming_memtable_big_list(streaming_memtable_big& smb) {
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager, _config.streaming_scheduling_group);
|
||||
}
|
||||
|
||||
table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker)
|
||||
table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager,
|
||||
cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker)
|
||||
: _schema(std::move(schema))
|
||||
, _config(std::move(config))
|
||||
, _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()))
|
||||
|
||||
@@ -327,12 +327,11 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
{{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
|
||||
|
||||
auto cf_stats = make_lw_shared<::cf_stats>();
|
||||
column_family::config cfg;
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.enable_disk_reads = false;
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
cfg.large_data_handler = &nop_lp_handler;
|
||||
|
||||
with_column_family(s, cfg, [s] (column_family& cf) {
|
||||
const column_definition& r1_col = *s->get_column_definition("r1");
|
||||
@@ -379,13 +378,12 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
|
||||
auto cf_stats = make_lw_shared<::cf_stats>();
|
||||
|
||||
column_family::config cfg;
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.enable_disk_reads = true;
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_cache = true;
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
cfg.large_data_handler = &nop_lp_handler;
|
||||
|
||||
return with_column_family(s, cfg, [s](column_family& cf) {
|
||||
return seastar::async([s, &cf] {
|
||||
@@ -459,12 +457,12 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
|
||||
|
||||
auto cf_stats = make_lw_shared<::cf_stats>();
|
||||
|
||||
column_family::config cfg;
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.enable_disk_reads = false;
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
cfg.large_data_handler = &nop_lp_handler;
|
||||
|
||||
with_column_family(s, cfg, [s] (auto& cf) mutable {
|
||||
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;
|
||||
|
||||
|
||||
@@ -69,9 +69,7 @@ void run_sstable_resharding_test() {
|
||||
auto s = get_schema();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
column_family::config cfg;
|
||||
cfg.large_data_handler = &nop_lp_handler;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, tracker);
|
||||
auto cf = make_lw_shared<column_family>(s, column_family_test_config(), column_family::no_commitlog(), *cm, *cl_stats, tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
std::unordered_map<shard_id, std::vector<mutation>> muts;
|
||||
static constexpr auto keys_per_shard = 1000u;
|
||||
|
||||
@@ -893,11 +893,10 @@ SEASTAR_TEST_CASE(reshuffle) {
|
||||
cm->start();
|
||||
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
column_family::config cfg;
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.datadir = generation_dir;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.large_data_handler = &nop_lp_handler;
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto cf = make_lw_shared<column_family>(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
|
||||
cf->start();
|
||||
|
||||
@@ -29,13 +29,14 @@
|
||||
namespace sstables {
|
||||
|
||||
class test_env {
|
||||
sstables_manager _mgr;
|
||||
public:
|
||||
test_env() = default;
|
||||
|
||||
shared_sstable make_sstable(schema_ptr schema, sstring dir, unsigned long generation,
|
||||
sstable::version_types v, sstable::format_types f = sstable::format_types::big,
|
||||
size_t buffer_size = default_sstable_buffer_size(), gc_clock::time_point now = gc_clock::now()) {
|
||||
return sstables::make_sstable(std::move(schema), dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
|
||||
return _mgr.make_sstable(std::move(schema), dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
|
||||
}
|
||||
|
||||
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, unsigned long generation,
|
||||
|
||||
@@ -43,10 +43,12 @@ static const sstring some_keyspace("ks");
|
||||
static const sstring some_column_family("cf");
|
||||
|
||||
db::nop_large_data_handler nop_lp_handler;
|
||||
thread_local sstables::sstables_manager test_sstables_manager;
|
||||
|
||||
column_family::config column_family_test_config() {
|
||||
column_family::config cfg;
|
||||
cfg.large_data_handler = &nop_lp_handler;
|
||||
cfg.sstables_manager = &test_sstables_manager;
|
||||
return cfg;
|
||||
}
|
||||
|
||||
@@ -62,9 +64,9 @@ column_family_for_tests::column_family_for_tests(schema_ptr s)
|
||||
: _data(make_lw_shared<data>())
|
||||
{
|
||||
_data->s = s;
|
||||
_data->cfg = column_family_test_config();
|
||||
_data->cfg.enable_disk_writes = false;
|
||||
_data->cfg.enable_commitlog = false;
|
||||
_data->cfg.large_data_handler = &nop_lp_handler;
|
||||
_data->cf = make_lw_shared<column_family>(_data->s, _data->cfg, column_family::no_commitlog(), _data->cm, _data->cl_stats, _data->tracker);
|
||||
_data->cf->mark_ready_for_writes();
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
#include "cell_locking.hh"
|
||||
#include "sstables/compaction_manager.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
|
||||
// Includes: database, auth, storage_service
|
||||
class storage_service_for_tests {
|
||||
|
||||
Reference in New Issue
Block a user