replica: wrap column families related maps into tables_metadata

As a preparation for ensuring access safety for column families
related maps, add tables_metadata, access to members of which
would be protected by rwlock.
This commit is contained in:
Aleksandra Martyniuk
2023-07-18 13:06:24 +02:00
parent 395ce87eff
commit 52afd9d42d
21 changed files with 74 additions and 73 deletions

View File

@@ -135,7 +135,7 @@ static future<json::json_return_type> get_cf_histogram(http_context& ctx, const
static future<json::json_return_type> get_cf_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) {
std::function<utils::ihistogram(const replica::database&)> fun = [f] (const replica::database& db) {
utils::ihistogram res;
for (auto i : db.get_column_families()) {
for (auto i : db.get_tables_metadata()._column_families) {
res += (i.second->get_stats().*f).hist;
}
return res;
@@ -162,7 +162,7 @@ static future<json::json_return_type> get_cf_rate_and_histogram(http_context& c
static future<json::json_return_type> get_cf_rate_and_histogram(http_context& ctx, utils::timed_rate_moving_average_summary_and_histogram replica::column_family_stats::*f) {
std::function<utils::rate_moving_average_and_histogram(const replica::database&)> fun = [f] (const replica::database& db) {
utils::rate_moving_average_and_histogram res;
for (auto i : db.get_column_families()) {
for (auto i : db.get_tables_metadata()._column_families) {
res += (i.second->get_stats().*f).rate();
}
return res;
@@ -306,7 +306,7 @@ ratio_holder filter_recent_false_positive_as_ratio_holder(const sstables::shared
void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace>& sys_ks) {
cf::get_column_family_name.set(r, [&ctx] (const_req req){
std::vector<sstring> res;
for (auto i: ctx.db.local().get_column_families_mapping()) {
for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) {
res.push_back(i.first.first + ":" + i.first.second);
}
return res;
@@ -314,7 +314,7 @@ void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace
cf::get_column_family.set(r, [&ctx] (std::unique_ptr<http::request> req){
std::list<cf::column_family_info> res;
for (auto i: ctx.db.local().get_column_families_mapping()) {
for (auto i: ctx.db.local().get_tables_metadata()._ks_cf_to_uuid) {
cf::column_family_info info;
info.ks = i.first.first;
info.cf = i.first.second;

View File

@@ -68,7 +68,7 @@ struct map_reduce_column_families_locally {
std::function<std::unique_ptr<std::any>(std::unique_ptr<std::any>, std::unique_ptr<std::any>)> reducer;
future<std::unique_ptr<std::any>> operator()(replica::database& db) const {
auto res = seastar::make_lw_shared<std::unique_ptr<std::any>>(std::make_unique<std::any>(init));
return do_for_each(db.get_column_families(), [res, this](const std::pair<table_id, seastar::lw_shared_ptr<replica::table>>& i) {
return do_for_each(db.get_tables_metadata()._column_families, [res, this](const std::pair<table_id, seastar::lw_shared_ptr<replica::table>>& i) {
*res = reducer(std::move(*res), mapper(*i.second.get()));
}).then([res] {
return std::move(*res);

View File

@@ -68,7 +68,7 @@ void set_compaction_manager(http_context& ctx, routes& r) {
cm::get_pending_tasks_by_table.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return ctx.db.map_reduce0([](replica::database& db) {
return do_with(std::unordered_map<std::pair<sstring, sstring>, uint64_t, utils::tuple_hash>(), [&db](std::unordered_map<std::pair<sstring, sstring>, uint64_t, utils::tuple_hash>& tasks) {
return do_for_each(db.get_column_families(), [&tasks](const std::pair<table_id, seastar::lw_shared_ptr<replica::table>>& i) -> future<> {
return do_for_each(db.get_tables_metadata()._column_families, [&tasks](const std::pair<table_id, seastar::lw_shared_ptr<replica::table>>& i) -> future<> {
replica::table& cf = *i.second.get();
tasks[std::make_pair(cf.schema()->ks_name(), cf.schema()->cf_name())] = cf.estimate_pending_compactions();
return make_ready_future<>();

View File

@@ -980,7 +980,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ks.set_incremental_backups(value);
}
for (auto& pair: db.get_column_families()) {
for (auto& pair: db.get_tables_metadata()._column_families) {
auto cf_ptr = pair.second;
cf_ptr->set_incremental_backups(value);
}
@@ -1258,7 +1258,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
auto& ext = db.get_config().extensions();
for (auto& t : db.get_column_families() | boost::adaptors::map_values) {
for (auto& t : db.get_tables_metadata()._column_families | boost::adaptors::map_values) {
auto& schema = t->schema();
if ((ks.empty() || ks == schema->ks_name()) && (cf.empty() || cf == schema->cf_name())) {
// at most Nsstables long

View File

@@ -641,7 +641,7 @@ future<> generation_service::maybe_rewrite_streams_descriptions() {
// For each CDC log table get the TTL setting (from CDC options) and the table's creation time
std::vector<time_and_ttl> times_and_ttls;
for (auto& [_, cf] : _db.get_column_families()) {
for (auto& [_, cf] : _db.get_tables_metadata()._column_families) {
auto& s = *cf->schema();
auto base = cdc::get_base_table(_db, s.ks_name(), s.cf_name());
if (!base) {

View File

@@ -126,7 +126,7 @@ future<> db::commitlog_replayer::impl::init() {
}
}, [this](replica::database& db) {
return do_with(shard_rpm_map{}, [this, &db](shard_rpm_map& map) {
return parallel_for_each(db.get_column_families(), [this, &map](auto& cfp) {
return parallel_for_each(db.get_tables_metadata()._column_families, [this, &map](auto& cfp) {
auto uuid = cfp.first;
// We do this on each cpu, for each CF, which technically is a little wasteful, but the values are
// cached, this is only startup, and it makes the code easier.
@@ -156,7 +156,7 @@ future<> db::commitlog_replayer::impl::init() {
// existing sstables-per-shard.
// So, go through all CF:s and check, if a shard mapping does not
// have data for it, assume we must set global pos to zero.
for (auto&p : _db.local().get_column_families()) {
for (auto&p : _db.local().get_tables_metadata()._column_families) {
for (auto&p1 : _rpm) { // for each shard
if (!p1.second.contains(p.first)) {
_min_pos[p1.first] = replay_position();

View File

@@ -265,7 +265,7 @@ void view_update_generator::setup_metrics() {
}
void view_update_generator::discover_staging_sstables() {
for (auto& x : _db.get_column_families()) {
for (auto& x : _db.get_tables_metadata()._column_families) {
auto t = x.second->shared_from_this();
for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) {
if (sst->requires_view_building()) {

View File

@@ -283,7 +283,7 @@ public:
const auto snapshots_by_tables = co_await _db.map_reduce(snapshot_reducer(), [ks_name_ = ks_data.name] (replica::database& db) mutable -> future<snapshots_by_tables_map> {
auto ks_name = std::move(ks_name_);
snapshots_by_tables_map snapshots_by_tables;
for (auto& [_, table] : db.get_column_families()) {
for (auto& [_, table] : db.get_tables_metadata()._column_families) {
if (table->schema()->ks_name() != ks_name) {
continue;
}
@@ -433,7 +433,7 @@ private:
};
co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) {
T val = {};
for (auto& [_, table] : db.get_column_families()) {
for (auto& [_, table] : db.get_tables_metadata()._column_families) {
val = reduce(val, map(*table));
}
return val;
@@ -560,7 +560,7 @@ public:
res.total = occupancy.total_space();
res.free = occupancy.free_space();
res.entries = db.row_cache_tracker().partitions();
for (const auto& [_, t] : db.get_column_families()) {
for (const auto& [_, t] : db.get_tables_metadata()._column_families) {
auto& cache_stats = t->get_row_cache().stats();
res.hits += cache_stats.hits.count();
res.misses += cache_stats.misses.count();

View File

@@ -1346,7 +1346,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// Needs to happen before replaying the schema commitlog, which interprets
// replay position in the truncation record.
// Needs to happen before system_keyspace::setup(), which reads truncation records.
for (auto&& e : db.local().get_column_families()) {
for (auto&& e : db.local().get_tables_metadata()._column_families) {
auto table_ptr = e.second;
if (table_ptr->schema()->ks_name() == db::schema_tables::NAME) {
if (table_ptr->get_truncation_record() != db_clock::time_point::min()) {
@@ -1405,7 +1405,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}
db.invoke_on_all([] (replica::database& db) {
for (auto& x : db.get_column_families()) {
for (auto& x : db.get_tables_metadata()._column_families) {
replica::table& t = *(x.second);
t.enable_auto_compaction();
}
@@ -1423,7 +1423,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// streaming
db.invoke_on_all([] (replica::database& db) {
for (auto& x : db.get_column_families()) {
for (auto& x : db.get_tables_metadata()._column_families) {
replica::column_family& cf = *(x.second);
cf.trigger_compaction();
}

View File

@@ -127,7 +127,7 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo
}
static size_t get_nr_tables(const replica::database& db, const sstring& keyspace) {
auto& m = db.get_column_families_mapping();
auto& m = db.get_tables_metadata()._ks_cf_to_uuid;
return std::count_if(m.begin(), m.end(), [&keyspace] (auto& e) {
return e.first.first == keyspace;
});
@@ -135,7 +135,7 @@ static size_t get_nr_tables(const replica::database& db, const sstring& keyspace
static std::vector<sstring> list_column_families(const replica::database& db, const sstring& keyspace) {
std::vector<sstring> ret;
for (auto &&e : db.get_column_families_mapping()) {
for (auto &&e : db.get_tables_metadata()._ks_cf_to_uuid) {
if (e.first.first == keyspace) {
ret.push_back(e.first.second);
}

View File

@@ -3050,7 +3050,7 @@ future<> repair_service::cleanup_history(tasks::task_id repair_id) {
}
future<> repair_service::load_history() {
auto tables = get_db().local().get_column_families();
auto tables = get_db().local().get_tables_metadata()._column_families;
for (const auto& x : tables) {
auto& table_uuid = x.first;
auto& table = x.second;

View File

@@ -66,7 +66,7 @@ public:
}
virtual std::vector<data_dictionary::table> get_tables(data_dictionary::database db) const override {
std::vector<data_dictionary::table> ret;
auto&& tables = unwrap(db).get_column_families();
auto&& tables = unwrap(db).get_tables_metadata()._column_families;
ret.reserve(tables.size());
for (auto&& [uuid, cf] : tables) {
ret.push_back(wrap(*cf));

View File

@@ -272,7 +272,7 @@ void database::setup_scylla_memory_diagnostics_producer() {
for (const auto& [name, op_count_getter] : phased_barriers) {
writeln(" {} (top 10):\n", name);
auto total = 0;
for (const auto& [count, table_list] : phased_barrier_top_10_counts(_column_families, op_count_getter)) {
for (const auto& [count, table_list] : phased_barrier_top_10_counts(_tables_metadata._column_families, op_count_getter)) {
total += count;
writeln(" {}", count);
if (table_list.empty()) {
@@ -863,13 +863,13 @@ database::init_commitlog() {
return db::commitlog::create_commitlog(db::commitlog::config::from_db_config(_cfg, _dbcfg.commitlog_scheduling_group, _dbcfg.available_memory)).then([this](db::commitlog&& log) {
_commitlog = std::make_unique<db::commitlog>(std::move(log));
_commitlog->add_flush_handler([this](db::cf_id_type id, db::replay_position pos) {
if (!_column_families.contains(id)) {
if (!_tables_metadata._column_families.contains(id)) {
// the CF has been removed.
_commitlog->discard_completed_segments(id);
return;
}
// Initiate a background flush. Waited upon in `stop()`.
(void)_column_families[id]->flush(pos);
(void)_tables_metadata._column_families[id]->flush(pos);
}).release(); // we have longer life time than CL. Ignore reg anchor
});
}
@@ -959,13 +959,13 @@ void database::maybe_init_schema_commitlog() {
_schema_commitlog = std::make_unique<db::commitlog>(db::commitlog::create_commitlog(std::move(c)).get0());
_schema_commitlog->add_flush_handler([this] (db::cf_id_type id, db::replay_position pos) {
if (!_column_families.contains(id)) {
if (!_tables_metadata._column_families.contains(id)) {
// the CF has been removed.
_schema_commitlog->discard_completed_segments(id);
return;
}
// Initiate a background flush. Waited upon in `stop()`.
(void)_column_families[id]->flush(pos);
(void)_tables_metadata._column_families[id]->flush(pos);
}).release();
}
@@ -1017,18 +1017,18 @@ future<> database::add_column_family(keyspace& ks, schema_ptr schema, column_fam
cf->set_durable_writes(ks.metadata()->durable_writes());
auto uuid = schema->id();
if (_column_families.contains(uuid)) {
if (_tables_metadata._column_families.contains(uuid)) {
throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped");
}
auto kscf = std::make_pair(schema->ks_name(), schema->cf_name());
if (_ks_cf_to_uuid.contains(kscf)) {
if (_tables_metadata._ks_cf_to_uuid.contains(kscf)) {
throw std::invalid_argument("Column family " + schema->cf_name() + " exists");
}
ks.add_or_update_column_family(schema);
cf->start();
schema->registry_entry()->set_table(cf->weak_from_this());
_column_families.emplace(uuid, std::move(cf));
_ks_cf_to_uuid.emplace(std::move(kscf), uuid);
_tables_metadata._column_families.emplace(uuid, std::move(cf));
_tables_metadata._ks_cf_to_uuid.emplace(std::move(kscf), uuid);
if (schema->is_view()) {
find_column_family(schema->view_info()->base_id()).add_or_update_view(view_ptr(schema));
}
@@ -1065,9 +1065,9 @@ future<> database::remove(table& cf) noexcept {
auto s = cf.schema();
auto& ks = find_keyspace(s->ks_name());
cf.deregister_metrics();
_column_families.erase(s->id());
_tables_metadata._column_families.erase(s->id());
ks.metadata()->remove_column_family(s);
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
_tables_metadata._ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
if (s->is_view()) {
try {
find_column_family(s->view_info()->base_id()).remove_view(view_ptr(s));
@@ -1149,7 +1149,7 @@ future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstri
const table_id& database::find_uuid(std::string_view ks, std::string_view cf) const {
try {
return _ks_cf_to_uuid.at(std::make_pair(ks, cf));
return _tables_metadata._ks_cf_to_uuid.at(std::make_pair(ks, cf));
} catch (std::out_of_range&) {
throw no_such_column_family(ks, cf);
}
@@ -1245,7 +1245,7 @@ std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> databa
std::vector<lw_shared_ptr<column_family>> database::get_non_system_column_families() const {
return boost::copy_range<std::vector<lw_shared_ptr<column_family>>>(
get_column_families()
get_tables_metadata()._column_families
| boost::adaptors::map_values
| boost::adaptors::filtered([](const lw_shared_ptr<column_family>& cf) {
return !is_system_keyspace(cf->schema()->ks_name());
@@ -1272,7 +1272,7 @@ const column_family& database::find_column_family(std::string_view ks_name, std:
column_family& database::find_column_family(const table_id& uuid) {
try {
return *_column_families.at(uuid);
return *_tables_metadata._column_families.at(uuid);
} catch (...) {
throw no_such_column_family(uuid);
}
@@ -1280,14 +1280,14 @@ column_family& database::find_column_family(const table_id& uuid) {
const column_family& database::find_column_family(const table_id& uuid) const {
try {
return *_column_families.at(uuid);
return *_tables_metadata._column_families.at(uuid);
} catch (...) {
throw no_such_column_family(uuid);
}
}
bool database::column_family_exists(const table_id& uuid) const {
return _column_families.contains(uuid);
return _tables_metadata._column_families.contains(uuid);
}
future<>
@@ -1411,7 +1411,7 @@ schema_ptr database::find_schema(const table_id& uuid) const {
}
bool database::has_schema(std::string_view ks_name, std::string_view cf_name) const {
return _ks_cf_to_uuid.contains(std::make_pair(ks_name, cf_name));
return _tables_metadata._ks_cf_to_uuid.contains(std::make_pair(ks_name, cf_name));
}
std::vector<view_ptr> database::get_views() const {
@@ -1456,7 +1456,7 @@ future<> database::create_keyspace_on_all_shards(sharded<database>& sharded_db,
future<>
database::drop_caches() const {
std::unordered_map<table_id, lw_shared_ptr<column_family>> tables = get_column_families();
std::unordered_map<table_id, lw_shared_ptr<column_family>> tables = get_tables_metadata()._column_families;
for (auto&& e : tables) {
table& t = *e.second;
co_await t.get_row_cache().invalidate(row_cache::external_updater([] {}));
@@ -1806,7 +1806,7 @@ std::ostream& operator<<(std::ostream& out, const column_family& cf) {
std::ostream& operator<<(std::ostream& out, const database& db) {
out << "{\n";
for (auto&& e : db._column_families) {
for (auto&& e : db._tables_metadata._column_families) {
auto&& cf = *e.second;
out << "(" << e.first.to_sstring() << ", " << cf.schema()->cf_name() << ", " << cf.schema()->ks_name() << "): " << cf << "\n";
}
@@ -2314,7 +2314,7 @@ schema_ptr database::find_indexed_table(const sstring& ks_name, const sstring& i
future<> database::close_tables(table_kind kind_to_close) {
auto b = defer([this] { _stop_barrier.abort(); });
co_await coroutine::parallel_for_each(_column_families, [this, kind_to_close](auto& val_pair) -> future<> {
co_await coroutine::parallel_for_each(_tables_metadata._column_families, [this, kind_to_close](auto& val_pair) -> future<> {
auto& s = val_pair.second->schema();
table_kind k = is_system_table(*s) || _cfg.extensions().is_extension_internal_keyspace(s->ks_name()) ? table_kind::system : table_kind::user;
if (k == kind_to_close) {
@@ -2403,7 +2403,7 @@ future<> database::stop() {
}
future<> database::flush_all_memtables() {
return parallel_for_each(_column_families, [] (auto& cfp) {
return parallel_for_each(_tables_metadata._column_families, [] (auto& cfp) {
return cfp.second->flush();
});
}
@@ -2790,8 +2790,8 @@ future<> database::clear_snapshot(sstring tag, std::vector<sstring> keyspace_nam
// and has no remaining snapshots
if (!has_snapshots) {
auto [cf_name, cf_uuid] = extract_cf_name_and_uuid(table_ent->name);
const auto& it = _ks_cf_to_uuid.find(std::make_pair(ks_name, cf_name));
auto dropped = (it == _ks_cf_to_uuid.cend()) || (cf_uuid != it->second);
const auto& it = _tables_metadata._ks_cf_to_uuid.find(std::make_pair(ks_name, cf_name));
auto dropped = (it == _tables_metadata._ks_cf_to_uuid.cend()) || (cf_uuid != it->second);
if (dropped) {
dblog.info("Removing dropped table dir {}", table_dir);
sstables::remove_table_directory_if_has_no_snapshots(table_dir).get();
@@ -2804,7 +2804,7 @@ future<> database::clear_snapshot(sstring tag, std::vector<sstring> keyspace_nam
}
future<> database::flush_non_system_column_families() {
auto non_system_cfs = get_column_families() | boost::adaptors::filtered([this] (auto& uuid_and_cf) {
auto non_system_cfs = get_tables_metadata()._column_families | boost::adaptors::filtered([this] (auto& uuid_and_cf) {
auto cf = uuid_and_cf.second;
auto& ks = cf->schema()->ks_name();
return !is_system_keyspace(ks) && !_cfg.extensions().is_extension_internal_keyspace(ks);
@@ -2826,7 +2826,7 @@ future<> database::flush_non_system_column_families() {
}
future<> database::flush_system_column_families() {
auto system_cfs = get_column_families() | boost::adaptors::filtered([this] (auto& uuid_and_cf) {
auto system_cfs = get_tables_metadata()._column_families | boost::adaptors::filtered([this] (auto& uuid_and_cf) {
auto cf = uuid_and_cf.second;
auto& ks = cf->schema()->ks_name();
return is_system_keyspace(ks) || _cfg.extensions().is_extension_internal_keyspace(ks);

View File

@@ -1300,6 +1300,15 @@ public:
}
};
using ks_cf_t = std::pair<sstring, sstring>;
using ks_cf_to_uuid_t =
flat_hash_map<ks_cf_t, table_id, utils::tuple_hash, string_pair_eq>;
class tables_metadata {
rwlock _cf_lock;
public: // FIXME: change member access to private.
std::unordered_map<table_id, lw_shared_ptr<column_family>> _column_families;
ks_cf_to_uuid_t _ks_cf_to_uuid;
};
private:
replica::cf_stats _cf_stats;
static constexpr size_t max_count_concurrent_reads{100};
@@ -1366,10 +1375,7 @@ private:
db::per_partition_rate_limit::info> _apply_stage;
flat_hash_map<sstring, keyspace> _keyspaces;
std::unordered_map<table_id, lw_shared_ptr<column_family>> _column_families;
using ks_cf_to_uuid_t =
flat_hash_map<std::pair<sstring, sstring>, table_id, utils::tuple_hash, string_pair_eq>;
ks_cf_to_uuid_t _ks_cf_to_uuid;
tables_metadata _tables_metadata;
std::unique_ptr<db::commitlog> _commitlog;
std::unique_ptr<db::commitlog> _schema_commitlog;
utils::updateable_value_source<table_schema_version> _version;
@@ -1646,23 +1652,18 @@ public:
return _keyspaces;
}
const std::unordered_map<table_id, lw_shared_ptr<column_family>>& get_column_families() const {
return _column_families;
const tables_metadata& get_tables_metadata() const {
return _tables_metadata;
}
std::unordered_map<table_id, lw_shared_ptr<column_family>>& get_column_families() {
return _column_families;
tables_metadata& get_tables_metadata() {
return _tables_metadata;
}
std::vector<lw_shared_ptr<column_family>> get_non_system_column_families() const;
std::vector<view_ptr> get_views() const;
const ks_cf_to_uuid_t&
get_column_families_mapping() const {
return _ks_cf_to_uuid;
}
const db::config& get_config() const {
return _cfg;
}

View File

@@ -472,7 +472,7 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
dblog.info("Populating Keyspace {}", ks_name);
auto& ks = i->second;
auto& column_families = db.local().get_column_families();
auto& column_families = db.local().get_tables_metadata()._column_families;
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values, [&] (schema_ptr s) -> future<> {
auto uuid = s->id();

View File

@@ -1209,7 +1209,7 @@ def find_dbs():
def for_each_table(db=None):
if not db:
db = find_db()
cfs = db['_column_families']
cfs = db['_tables_metadata']['_column_families']
for (key, value) in unordered_map(cfs):
yield value['_p'].reinterpret_cast(lookup_type(['replica::table', 'column_family'])[1].pointer()).dereference() # it's a lw_shared_ptr
@@ -1511,7 +1511,7 @@ class scylla_tables(gdb.Command):
for shard in shards:
db = find_db(shard)
cfs = db['_column_families']
cfs = db['_tables_metadata']['_column_families']
for (key, value) in unordered_map(cfs):
value = seastar_lw_shared_ptr(value).get().dereference()
schema = schema_ptr(value['_schema'])
@@ -1533,7 +1533,7 @@ class scylla_table(gdb.Command):
def _find_table(self, ks, cf):
db = find_db()
cfs = db['_column_families']
cfs = db['_tables_metadata']['_column_families']
for (key, value) in unordered_map(cfs):
value = seastar_lw_shared_ptr(value).get().dereference()
schema = schema_ptr(value['_schema'])
@@ -1900,7 +1900,7 @@ class seastar_lw_shared_ptr():
def all_tables(db):
"""Returns pointers to table objects which exist on current shard"""
for (key, value) in unordered_map(db['_column_families']):
for (key, value) in unordered_map(db['_tables_metadata']['_column_families']):
yield seastar_lw_shared_ptr(value).get()

View File

@@ -78,7 +78,7 @@ void load_broadcaster::start_broadcasting() {
llogger.debug("Disseminating load info ...");
_done = _db.map_reduce0([](replica::database& db) {
int64_t res = 0;
for (auto i : db.get_column_families()) {
for (auto i : db.get_tables_metadata()._column_families) {
res += i.second->get_stats().live_disk_space_used;
}
return res;
@@ -137,7 +137,7 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
};
auto cf_to_cache_hit_stats = [non_system_filter] (replica::database& db) {
return boost::copy_range<std::unordered_map<table_id, stat>>(db.get_column_families() | boost::adaptors::filtered(non_system_filter) |
return boost::copy_range<std::unordered_map<table_id, stat>>(db.get_tables_metadata()._column_families | boost::adaptors::filtered(non_system_filter) |
boost::adaptors::transformed([] (const std::pair<table_id, lw_shared_ptr<replica::column_family>>& cf) {
auto& stats = cf.second->get_row_cache().stats();
return std::make_pair(cf.first, stat{float(stats.reads_with_no_misses.rate().rates[0]), float(stats.reads_with_misses.rate().rates[0])});
@@ -159,8 +159,8 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
// set calculated rates on all shards
return _db.invoke_on_all([this, cpuid = this_shard_id()] (replica::database& db) {
return do_for_each(_rates, [this, cpuid, &db] (auto&& r) mutable {
auto it = db.get_column_families().find(r.first);
if (it == db.get_column_families().end()) { // a table may be added before map/reduce completes and this code runs
auto it = db.get_tables_metadata()._column_families.find(r.first);
if (it == db.get_tables_metadata()._column_families.end()) { // a table may be added before map/reduce completes and this code runs
return;
}
auto& cf = *it;

View File

@@ -3093,7 +3093,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
co_await container().invoke_on_all([&] (storage_service& ss) {
auto& db = ss._db.local();
auto tmptr = pending_token_metadata_ptr[this_shard_id()];
for (auto&& [id, cf] : db.get_column_families()) { // Safe because we iterate without preemption
for (auto&& [id, cf] : db.get_tables_metadata()._column_families) { // Safe because we iterate without preemption
auto rs = db.find_keyspace(cf->schema()->keypace_name()).get_replication_strategy_ptr();
locator::effective_replication_map_ptr erm;
if (auto pt_rs = rs->maybe_as_per_table()) {

View File

@@ -462,7 +462,7 @@ std::vector<replica::column_family*> stream_session::get_column_family_stores(co
std::vector<replica::column_family*> stores;
auto& db = manager().db();
if (column_families.empty()) {
for (auto& x : db.get_column_families()) {
for (auto& x : db.get_tables_metadata()._column_families) {
replica::column_family& cf = *(x.second);
auto cf_name = cf.schema()->cf_name();
auto ks_name = cf.schema()->ks_name();

View File

@@ -116,7 +116,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
// and the old sstable is deleted.
flush(e);
e.db().invoke_on_all([] (replica::database& dbi) {
return parallel_for_each(dbi.get_column_families(), [&dbi] (auto& table) {
return parallel_for_each(dbi.get_tables_metadata()._column_families, [&dbi] (auto& table) {
return dbi.get_compaction_manager().perform_major_compaction((table.second)->as_table_state());
});
}).get();

View File

@@ -860,7 +860,7 @@ public:
replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get();
db.invoke_on_all([] (replica::database& db) {
for (auto& x : db.get_column_families()) {
for (auto& x : db.get_tables_metadata()._column_families) {
replica::table& t = *(x.second);
t.enable_auto_compaction();
}