sstables: introduce sstables_manager

The goal of the sstables manager is to track and manage the life cycle of sstables.
There is one sstables manager instance per database, and it is passed to each column family
(and test environment) on construction.
All sstables that are created, loaded, or deleted pass through the sstables manager.

The manager will make sure consumers of sstables are in sync so that sstables
will not be deleted while in use.

Refs #4149

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2019-02-12 07:17:49 +02:00
parent b50c041aa2
commit eebc3701a5
14 changed files with 135 additions and 29 deletions

View File

@@ -494,6 +494,7 @@ scylla_core = (['database.cc',
'compress.cc',
'sstables/mp_row_consumer.cc',
'sstables/sstables.cc',
'sstables/sstables_manager.cc',
'sstables/mc/writer.cc',
'sstables/sstable_version.cc',
'sstables/compress.cc',

View File

@@ -40,6 +40,7 @@
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/compaction.hh"
#include "sstables/remove.hh"
#include <boost/range/adaptor/transformed.hpp>
@@ -224,6 +225,7 @@ database::database(const db::config& cfg, database_config dbcfg)
_cfg->compaction_large_row_warning_threshold_mb()*1024*1024,
_cfg->compaction_large_cell_warning_threshold_mb()*1024*1024))
, _nop_large_data_handler(std::make_unique<db::nop_large_data_handler>())
, _sstables_manager(std::make_unique<sstables::sstables_manager>())
, _result_memory_limiter(dbcfg.available_memory / 10)
, _data_listeners(std::make_unique<db::data_listeners>(*this))
{
@@ -907,6 +909,7 @@ keyspace::make_column_family_config(const schema& s, const database& db) const {
cfg.large_data_handler = db.get_large_data_handler();
}
cfg.sstables_manager = &db.get_sstables_manager();
cfg.view_update_concurrency_semaphore = _config.view_update_concurrency_semaphore;
cfg.view_update_concurrency_semaphore_limit = _config.view_update_concurrency_semaphore_limit;
cfg.data_listeners = &db.data_listeners();

View File

@@ -113,6 +113,7 @@ class sstable;
class entry_descriptor;
class compaction_descriptor;
class foreign_sstable_open_info;
class sstables_manager;
}
@@ -336,6 +337,7 @@ public:
seastar::scheduling_group streaming_scheduling_group;
bool enable_metrics_reporting = false;
db::large_data_handler* large_data_handler;
sstables::sstables_manager* sstables_manager;
db::timeout_semaphore* view_update_concurrency_semaphore;
size_t view_update_concurrency_semaphore_limit;
db::data_listeners* data_listeners = nullptr;
@@ -919,6 +921,11 @@ public:
return _config.large_data_handler;
}
// Returns the sstables manager referenced by this object's config.
// The config stores it as a raw pointer; a null pointer is treated as a
// programming error and trips the assertion.
sstables::sstables_manager& get_sstables_manager() const {
    auto* const mgr = _config.sstables_manager;
    assert(mgr != nullptr);
    return *mgr;
}
future<> populate_views(
std::vector<view_ptr>,
dht::token base_token,
@@ -1286,6 +1293,8 @@ private:
std::unique_ptr<db::large_data_handler> _large_data_handler;
std::unique_ptr<db::large_data_handler> _nop_large_data_handler;
std::unique_ptr<sstables::sstables_manager> _sstables_manager;
query::result_memory_limiter _result_memory_limiter;
friend db::data_listeners;
@@ -1443,6 +1452,11 @@ public:
return _nop_large_data_handler.get();
}
// Returns the sstables manager held in _sstables_manager.
// Asserts that the owning pointer is non-null before dereferencing it.
sstables::sstables_manager& get_sstables_manager() const {
    auto* const mgr = _sstables_manager.get();
    assert(mgr != nullptr);
    return *mgr;
}
future<> flush_all_memtables();
// See #937. Truncation now requires a callback to get a time stamp

View File

@@ -173,12 +173,6 @@ future<> await_background_jobs_on_all_shards() {
});
}
shared_sstable
make_sstable(schema_ptr schema, sstring dir, int64_t generation, sstable_version_types v, sstable_format_types f, gc_clock::time_point now,
io_error_handler_gen error_handler_gen, size_t buffer_size) {
return make_lw_shared<sstable>(std::move(schema), std::move(dir), generation, v, f, now, std::move(error_handler_gen), buffer_size);
}
std::unordered_map<sstable::version_types, sstring, enum_hash<sstable::version_types>> sstable::_version_string = {
{ sstable::version_types::ka , "ka" },
{ sstable::version_types::la , "la" },

View File

@@ -124,13 +124,6 @@ struct sstable_writer_config {
utils::UUID run_identifier = utils::make_random_uuid();
};
static constexpr inline size_t default_sstable_buffer_size() {
return 128 * 1024;
}
shared_sstable make_sstable(schema_ptr schema, sstring dir, int64_t generation, sstable_version_types v, sstable_format_types f, gc_clock::time_point now = gc_clock::now(),
io_error_handler_gen error_handler_gen = default_io_error_handler_gen(), size_t buffer_size = default_sstable_buffer_size());
class sstable : public enable_lw_shared_from_this<sstable> {
friend ::sstable_assertions;
public:

View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "log.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstables.hh"
namespace sstables {
logging::logger smlogger("sstables_manager");
// Constructs a shared sstable object.
// All arguments are forwarded unchanged to the sstable constructor via
// make_lw_shared<sstable>; schema, dir and error_handler_gen are moved in.
shared_sstable sstables_manager::make_sstable(
        schema_ptr schema,
        sstring dir,
        int64_t generation,
        sstable_version_types v,
        sstable_format_types f,
        gc_clock::time_point now,
        io_error_handler_gen error_handler_gen,
        size_t buffer_size) {
    auto sst = make_lw_shared<sstable>(
            std::move(schema),
            std::move(dir),
            generation,
            v,
            f,
            now,
            std::move(error_handler_gen),
            buffer_size);
    return sst;
}
} // namespace sstables

View File

@@ -0,0 +1,59 @@
/*
* Copyright (C) 2019 ScyllaDB
*
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sharded.hh>
#include "disk-error-handler.hh"
#include "gc_clock.hh"
#include "sstables/shareable_components.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/version.hh"
#include "sstables/component_type.hh"
namespace sstables {
using schema_ptr = lw_shared_ptr<const schema>;
using shareable_components_ptr = lw_shared_ptr<shareable_components>;
// Default value for the `buffer_size` argument used when creating sstables:
// 128 * 1024 (presumably a byte count -- confirm against the sstable code).
static constexpr inline size_t default_sstable_buffer_size() {
    constexpr size_t k = 1024;
    return 128 * k;
}
// Factory through which shared sstable objects are created.
class sstables_manager {
public:
    sstables_manager() = default;

    // Constructs a shared sstable.
    //
    // When omitted, `now` defaults to gc_clock::now(), `error_handler_gen`
    // to default_io_error_handler_gen() and `buffer_size` to
    // default_sstable_buffer_size().
    shared_sstable make_sstable(
            schema_ptr schema,
            sstring dir,
            int64_t generation,
            sstable_version_types v,
            sstable_format_types f,
            gc_clock::time_point now = gc_clock::now(),
            io_error_handler_gen error_handler_gen = default_io_error_handler_gen(),
            size_t buffer_size = default_sstable_buffer_size());
};
} // namespace sstables

View File

@@ -21,6 +21,7 @@
#include "database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "service/priority_manager.hh"
#include "db/view/view_updating_consumer.hh"
#include "cell_locking.hh"
@@ -598,12 +599,12 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
io_error_handler_gen error_handler_gen) {
return sstables::make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen);
return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen);
}
sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation,
sstables::sstable_version_types v, sstables::sstable_format_types f) {
return sstables::make_sstable(_schema, dir, generation, v, f);
return get_sstables_manager().make_sstable(_schema, dir, generation, v, f);
}
sstables::shared_sstable table::make_sstable(sstring dir) {
@@ -1542,7 +1543,8 @@ table::make_streaming_memtable_big_list(streaming_memtable_big& smb) {
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.streaming_dirty_memory_manager, _config.streaming_scheduling_group);
}
table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker)
table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager,
cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker)
: _schema(std::move(schema))
, _config(std::move(config))
, _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()))

View File

@@ -327,12 +327,11 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
{{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
auto cf_stats = make_lw_shared<::cf_stats>();
column_family::config cfg;
column_family::config cfg = column_family_test_config();
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
cfg.large_data_handler = &nop_lp_handler;
with_column_family(s, cfg, [s] (column_family& cf) {
const column_definition& r1_col = *s->get_column_definition("r1");
@@ -379,13 +378,12 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
auto cf_stats = make_lw_shared<::cf_stats>();
column_family::config cfg;
column_family::config cfg = column_family_test_config();
cfg.enable_disk_reads = true;
cfg.enable_disk_writes = true;
cfg.enable_cache = true;
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
cfg.large_data_handler = &nop_lp_handler;
return with_column_family(s, cfg, [s](column_family& cf) {
return seastar::async([s, &cf] {
@@ -459,12 +457,12 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
auto cf_stats = make_lw_shared<::cf_stats>();
column_family::config cfg;
column_family::config cfg = column_family_test_config();
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
cfg.large_data_handler = &nop_lp_handler;
with_column_family(s, cfg, [s] (auto& cf) mutable {
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;

View File

@@ -69,9 +69,7 @@ void run_sstable_resharding_test() {
auto s = get_schema();
auto cm = make_lw_shared<compaction_manager>();
auto cl_stats = make_lw_shared<cell_locker_stats>();
column_family::config cfg;
cfg.large_data_handler = &nop_lp_handler;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, tracker);
auto cf = make_lw_shared<column_family>(s, column_family_test_config(), column_family::no_commitlog(), *cm, *cl_stats, tracker);
cf->mark_ready_for_writes();
std::unordered_map<shard_id, std::vector<mutation>> muts;
static constexpr auto keys_per_shard = 1000u;

View File

@@ -893,11 +893,10 @@ SEASTAR_TEST_CASE(reshuffle) {
cm->start();
auto tracker = make_lw_shared<cache_tracker>();
column_family::config cfg;
column_family::config cfg = column_family_test_config();
cfg.datadir = generation_dir;
cfg.enable_commitlog = false;
cfg.enable_incremental_backups = false;
cfg.large_data_handler = &nop_lp_handler;
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto cf = make_lw_shared<column_family>(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
cf->start();

View File

@@ -29,13 +29,14 @@
namespace sstables {
class test_env {
sstables_manager _mgr;
public:
test_env() = default;
shared_sstable make_sstable(schema_ptr schema, sstring dir, unsigned long generation,
sstable::version_types v, sstable::format_types f = sstable::format_types::big,
size_t buffer_size = default_sstable_buffer_size(), gc_clock::time_point now = gc_clock::now()) {
return sstables::make_sstable(std::move(schema), dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
return _mgr.make_sstable(std::move(schema), dir, generation, v, f, now, default_io_error_handler_gen(), buffer_size);
}
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, unsigned long generation,

View File

@@ -43,10 +43,12 @@ static const sstring some_keyspace("ks");
static const sstring some_column_family("cf");
db::nop_large_data_handler nop_lp_handler;
thread_local sstables::sstables_manager test_sstables_manager;
// Builds the baseline column_family::config used by tests: a
// default-constructed config whose large_data_handler points at
// nop_lp_handler and whose sstables_manager points at the thread-local
// test_sstables_manager. Callers adjust the remaining fields as needed.
column_family::config column_family_test_config() {
    column_family::config test_cfg;
    test_cfg.sstables_manager = &test_sstables_manager;
    test_cfg.large_data_handler = &nop_lp_handler;
    return test_cfg;
}
@@ -62,9 +64,9 @@ column_family_for_tests::column_family_for_tests(schema_ptr s)
: _data(make_lw_shared<data>())
{
_data->s = s;
_data->cfg = column_family_test_config();
_data->cfg.enable_disk_writes = false;
_data->cfg.enable_commitlog = false;
_data->cfg.large_data_handler = &nop_lp_handler;
_data->cf = make_lw_shared<column_family>(_data->s, _data->cfg, column_family::no_commitlog(), _data->cm, _data->cl_stats, _data->tracker);
_data->cf->mark_ready_for_writes();
}

View File

@@ -32,6 +32,7 @@
#include "cell_locking.hh"
#include "sstables/compaction_manager.hh"
#include "db/large_data_handler.hh"
#include "sstables/sstables_manager.hh"
// Includes: database, auth, storage_service
class storage_service_for_tests {