mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Revert "database: change find_column_families signature so it returns a lw_shared_ptr"
This reverts commit f3528ede65.
This commit is contained in:
@@ -58,7 +58,7 @@ future<> foreach_column_family(http_context& ctx, const sstring& name, function<
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
|
||||
return ctx.db.invoke_on_all([f, uuid](database& db) {
|
||||
f(*(db.find_column_family(uuid)));
|
||||
f(db.find_column_family(uuid));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -91,8 +91,8 @@ static future<json::json_return_type> get_cf_stats_sum(http_context& ctx, const
|
||||
// so to get an estimation of sum, we multiply the mean
|
||||
// with count. The information is gather in nano second,
|
||||
// but reported in micro
|
||||
auto cf = db.find_column_family(uuid);
|
||||
return ((cf->get_stats().*f).hist.count/1000.0) * (cf->get_stats().*f).hist.mean;
|
||||
column_family& cf = db.find_column_family(uuid);
|
||||
return ((cf.get_stats().*f).hist.count/1000.0) * (cf.get_stats().*f).hist.mean;
|
||||
}, 0.0, std::plus<double>()).then([](double res) {
|
||||
return make_ready_future<json::json_return_type>((int64_t)res);
|
||||
});
|
||||
@@ -110,7 +110,7 @@ static future<json::json_return_type> get_cf_histogram(http_context& ctx, const
|
||||
utils::timed_rate_moving_average_and_histogram column_family::stats::*f) {
|
||||
utils::UUID uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([f, uuid](const database& p) {
|
||||
return (p.find_column_family(uuid)->get_stats().*f).hist;},
|
||||
return (p.find_column_family(uuid).get_stats().*f).hist;},
|
||||
utils::ihistogram(),
|
||||
std::plus<utils::ihistogram>())
|
||||
.then([](const utils::ihistogram& val) {
|
||||
@@ -137,7 +137,7 @@ static future<json::json_return_type> get_cf_rate_and_histogram(http_context& c
|
||||
utils::timed_rate_moving_average_and_histogram column_family::stats::*f) {
|
||||
utils::UUID uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([f, uuid](const database& p) {
|
||||
return (p.find_column_family(uuid)->get_stats().*f).rate();},
|
||||
return (p.find_column_family(uuid).get_stats().*f).rate();},
|
||||
utils::rate_moving_average_and_histogram(),
|
||||
std::plus<utils::rate_moving_average_and_histogram>())
|
||||
.then([](const utils::rate_moving_average_and_histogram& val) {
|
||||
@@ -219,8 +219,8 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([uuid, total](database& db) {
|
||||
std::unordered_map<sstring, uint64_t> m;
|
||||
auto sstables = (total) ? db.find_column_family(uuid)->get_sstables_including_compacted_undeleted() :
|
||||
db.find_column_family(uuid)->get_sstables();
|
||||
auto sstables = (total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
|
||||
db.find_column_family(uuid).get_sstables();
|
||||
for (auto t : *sstables) {
|
||||
m[t->get_filename()] = t->bytes_on_disk();
|
||||
}
|
||||
@@ -723,7 +723,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
|
||||
cf::get_true_snapshots_size.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
auto uuid = get_uuid(req->param["name"], ctx.db.local());
|
||||
return ctx.db.local().find_column_family(uuid)->get_snapshot_details().then([](
|
||||
return ctx.db.local().find_column_family(uuid).get_snapshot_details().then([](
|
||||
const std::unordered_map<sstring, column_family::snapshot_details>& sd) {
|
||||
int64_t res = 0;
|
||||
for (auto i : sd) {
|
||||
@@ -861,8 +861,8 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
auto uuid = get_uuid(req->param["name"], ctx.db.local());
|
||||
|
||||
return ctx.db.map_reduce(sum_ratio<double>(), [uuid](database& db) {
|
||||
auto cf = db.find_column_family(uuid);
|
||||
return make_ready_future<double>(get_compression_ratio(*cf));
|
||||
column_family& cf = db.find_column_family(uuid);
|
||||
return make_ready_future<double>(get_compression_ratio(cf));
|
||||
}).then([] (const double& result) {
|
||||
return make_ready_future<json::json_return_type>(result);
|
||||
});
|
||||
@@ -892,7 +892,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
});
|
||||
|
||||
cf::get_compaction_strategy_class.set(r, [&ctx](const_req req) {
|
||||
return ctx.db.local().find_column_family(get_uuid(req.param["name"], ctx.db.local()))->get_compaction_strategy().name();
|
||||
return ctx.db.local().find_column_family(get_uuid(req.param["name"], ctx.db.local())).get_compaction_strategy().name();
|
||||
});
|
||||
|
||||
cf::set_compression_parameters.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
|
||||
@@ -38,7 +38,7 @@ future<I> map_reduce_cf_raw(http_context& ctx, const sstring& name, I init,
|
||||
Mapper mapper, Reducer reducer) {
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([mapper, uuid](database& db) {
|
||||
return mapper(*(db.find_column_family(uuid)));
|
||||
return mapper(db.find_column_family(uuid));
|
||||
}, init, reducer);
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ future<I> map_reduce_cf_raw(http_context& ctx, const sstring& name, I init,
|
||||
Mapper mapper, Reducer reducer, Result result) {
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([mapper, uuid](database& db) {
|
||||
return mapper(*(db.find_column_family(uuid)));
|
||||
return mapper(db.find_column_family(uuid));
|
||||
}, init, reducer);
|
||||
}
|
||||
|
||||
|
||||
@@ -263,11 +263,11 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
|
||||
}
|
||||
return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
|
||||
std::vector<lw_shared_ptr<column_family>> column_families_vec;
|
||||
std::vector<column_family*> column_families_vec;
|
||||
for (auto cf : column_families) {
|
||||
column_families_vec.push_back(db.find_column_family(keyspace, cf));
|
||||
column_families_vec.push_back(&db.find_column_family(keyspace, cf));
|
||||
}
|
||||
return parallel_for_each(column_families_vec, [] (lw_shared_ptr<column_family> cf) {
|
||||
return parallel_for_each(column_families_vec, [] (column_family* cf) {
|
||||
return cf->compact_all_sstables();
|
||||
});
|
||||
}).then([]{
|
||||
@@ -282,13 +282,13 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
|
||||
}
|
||||
return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
|
||||
std::vector<lw_shared_ptr<column_family>> column_families_vec;
|
||||
std::vector<column_family*> column_families_vec;
|
||||
auto& cm = db.get_compaction_manager();
|
||||
for (auto cf : column_families) {
|
||||
column_families_vec.push_back(db.find_column_family(keyspace, cf));
|
||||
column_families_vec.push_back(&db.find_column_family(keyspace, cf));
|
||||
}
|
||||
return parallel_for_each(column_families_vec, [&cm] (lw_shared_ptr<column_family> cf) {
|
||||
return cm.perform_cleanup(&*cf);
|
||||
return parallel_for_each(column_families_vec, [&cm] (column_family* cf) {
|
||||
return cm.perform_cleanup(cf);
|
||||
});
|
||||
}).then([]{
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
@@ -322,7 +322,7 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
}
|
||||
return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
|
||||
return parallel_for_each(column_families, [&db, keyspace](const sstring& cf) mutable {
|
||||
return db.find_column_family(keyspace, cf)->flush();
|
||||
return db.find_column_family(keyspace, cf).flush();
|
||||
});
|
||||
}).then([]{
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
|
||||
56
database.cc
56
database.cc
@@ -1924,8 +1924,8 @@ database::init_system_keyspace() {
|
||||
auto& ks = find_keyspace(db::system_keyspace::NAME);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) {
|
||||
auto cfm = pair.second;
|
||||
auto cf = this->find_column_family(cfm);
|
||||
cf->mark_ready_for_writes();
|
||||
auto& cf = this->find_column_family(cfm);
|
||||
cf.mark_ready_for_writes();
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
@@ -2089,7 +2089,7 @@ std::vector<lw_shared_ptr<column_family>> database::get_non_system_column_famili
|
||||
}));
|
||||
}
|
||||
|
||||
lw_shared_ptr<column_family> database::find_column_family(const sstring& ks_name, const sstring& cf_name) {
|
||||
column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) {
|
||||
try {
|
||||
return find_column_family(find_uuid(ks_name, cf_name));
|
||||
} catch (...) {
|
||||
@@ -2097,7 +2097,7 @@ lw_shared_ptr<column_family> database::find_column_family(const sstring& ks_name
|
||||
}
|
||||
}
|
||||
|
||||
const lw_shared_ptr<column_family> database::find_column_family(const sstring& ks_name, const sstring& cf_name) const {
|
||||
const column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) const {
|
||||
try {
|
||||
return find_column_family(find_uuid(ks_name, cf_name));
|
||||
} catch (...) {
|
||||
@@ -2105,17 +2105,17 @@ const lw_shared_ptr<column_family> database::find_column_family(const sstring& k
|
||||
}
|
||||
}
|
||||
|
||||
lw_shared_ptr<column_family> database::find_column_family(const utils::UUID& uuid) {
|
||||
column_family& database::find_column_family(const utils::UUID& uuid) {
|
||||
try {
|
||||
return _column_families.at(uuid);
|
||||
return *_column_families.at(uuid);
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_column_family(uuid));
|
||||
}
|
||||
}
|
||||
|
||||
const lw_shared_ptr<column_family> database::find_column_family(const utils::UUID& uuid) const {
|
||||
const column_family& database::find_column_family(const utils::UUID& uuid) const {
|
||||
try {
|
||||
return _column_families.at(uuid);
|
||||
return *_column_families.at(uuid);
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_column_family(uuid));
|
||||
}
|
||||
@@ -2209,11 +2209,11 @@ no_such_column_family::no_such_column_family(const sstring& ks_name, const sstri
|
||||
{
|
||||
}
|
||||
|
||||
lw_shared_ptr<column_family> database::find_column_family(const schema_ptr& schema) {
|
||||
column_family& database::find_column_family(const schema_ptr& schema) {
|
||||
return find_column_family(schema->id());
|
||||
}
|
||||
|
||||
const lw_shared_ptr<column_family> database::find_column_family(const schema_ptr& schema) const {
|
||||
const column_family& database::find_column_family(const schema_ptr& schema) const {
|
||||
return find_column_family(schema->id());
|
||||
}
|
||||
|
||||
@@ -2233,7 +2233,7 @@ schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name)
|
||||
}
|
||||
|
||||
schema_ptr database::find_schema(const utils::UUID& uuid) const {
|
||||
return find_column_family(uuid)->schema();
|
||||
return find_column_family(uuid).schema();
|
||||
}
|
||||
|
||||
bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const {
|
||||
@@ -2382,8 +2382,8 @@ column_family::as_mutation_source(tracing::trace_state_ptr trace_state) const {
|
||||
|
||||
future<lw_shared_ptr<query::result>>
|
||||
database::query(schema_ptr s, const query::read_command& cmd, query::result_request request, const std::vector<query::partition_range>& ranges, tracing::trace_state_ptr trace_state) {
|
||||
auto cf = find_column_family(cmd.cf_id);
|
||||
return cf->query(std::move(s), cmd, request, ranges, std::move(trace_state)).then([this, s = _stats] (auto&& res) {
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
return cf.query(std::move(s), cmd, request, ranges, std::move(trace_state)).then([this, s = _stats] (auto&& res) {
|
||||
++s->total_reads;
|
||||
return std::move(res);
|
||||
});
|
||||
@@ -2391,8 +2391,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_requ
|
||||
|
||||
future<reconcilable_result>
|
||||
database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range, tracing::trace_state_ptr trace_state) {
|
||||
auto cf = find_column_family(cmd.cf_id);
|
||||
return mutation_query(std::move(s), cf->as_mutation_source(std::move(trace_state)), range, cmd.slice, cmd.row_limit, cmd.partition_limit,
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
return mutation_query(std::move(s), cf.as_mutation_source(std::move(trace_state)), range, cmd.slice, cmd.row_limit, cmd.partition_limit,
|
||||
cmd.timestamp).then([this, s = _stats] (auto&& res) {
|
||||
++s->total_reads;
|
||||
return std::move(res);
|
||||
@@ -2543,8 +2543,8 @@ void dirty_memory_manager::maybe_do_active_flush() {
|
||||
// However, since we'll very soon have a mechanism in place to account for the memory
|
||||
// that was already written in one form or another, that disadvantage is mitigated.
|
||||
memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region());
|
||||
auto biggest_cf = _db->find_column_family(biggest_memtable.schema());
|
||||
memtable_list& mtlist = get_memtable_list(*biggest_cf);
|
||||
auto& biggest_cf = _db->find_column_family(biggest_memtable.schema());
|
||||
memtable_list& mtlist = get_memtable_list(biggest_cf);
|
||||
// Please note that this will eventually take the semaphore and prevent two concurrent flushes.
|
||||
// We don't need any other extra protection.
|
||||
mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
@@ -2565,8 +2565,8 @@ void dirty_memory_manager::start_reclaiming() {
|
||||
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) {
|
||||
return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] {
|
||||
try {
|
||||
auto cf = find_column_family(m.column_family_id());
|
||||
cf->apply(m, m_schema, rp);
|
||||
auto& cf = find_column_family(m.column_family_id());
|
||||
cf.apply(m, m_schema, rp);
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
|
||||
}
|
||||
@@ -2578,14 +2578,14 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m) {
|
||||
// is a little in flux and commitlog is created only when db is
|
||||
// initied from datadir.
|
||||
auto uuid = m.column_family_id();
|
||||
auto cf = find_column_family(uuid);
|
||||
auto& cf = find_column_family(uuid);
|
||||
if (!s->is_synced()) {
|
||||
throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s",
|
||||
s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
if (cf->commitlog() != nullptr) {
|
||||
if (cf.commitlog() != nullptr) {
|
||||
commitlog_entry_writer cew(s, m);
|
||||
return cf->commitlog()->add_entry(uuid, cew).then([&m, this, s](auto rp) {
|
||||
return cf.commitlog()->add_entry(uuid, cew).then([&m, this, s](auto rp) {
|
||||
return this->apply_in_memory(m, s, rp).handle_exception([this, s, &m] (auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
@@ -2620,8 +2620,8 @@ future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, c
|
||||
}
|
||||
return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, fragmented, s = std::move(s)] {
|
||||
auto uuid = m.column_family_id();
|
||||
auto cf = find_column_family(uuid);
|
||||
cf->apply_streaming_mutation(s, plan_id, std::move(m), fragmented);
|
||||
auto& cf = find_column_family(uuid);
|
||||
cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2743,8 +2743,8 @@ future<> database::flush_all_memtables() {
|
||||
|
||||
future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf) {
|
||||
auto& ks = find_keyspace(ksname);
|
||||
auto cf = find_column_family(ksname, cfname);
|
||||
return truncate(ks, *cf, std::move(tsf));
|
||||
auto& cf = find_column_family(ksname, cfname);
|
||||
return truncate(ks, cf, std::move(tsf));
|
||||
}
|
||||
|
||||
future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf)
|
||||
@@ -2810,8 +2810,8 @@ future<> database::clear_snapshot(sstring tag, std::vector<sstring> keyspace_nam
|
||||
|
||||
return parallel_for_each(keyspaces, [this, tag] (auto& ks) {
|
||||
return parallel_for_each(ks.get().metadata()->cf_meta_data(), [this, tag] (auto& pair) {
|
||||
auto cf = this->find_column_family(pair.second);
|
||||
return cf->clear_snapshot(tag);
|
||||
auto& cf = this->find_column_family(pair.second);
|
||||
return cf.clear_snapshot(tag);
|
||||
}).then_wrapped([] (future<> f) {
|
||||
dblog.debug("Cleared out snapshot directories");
|
||||
});
|
||||
|
||||
12
database.hh
12
database.hh
@@ -1061,12 +1061,12 @@ public:
|
||||
void drop_keyspace(const sstring& name);
|
||||
const auto& keyspaces() const { return _keyspaces; }
|
||||
std::vector<sstring> get_non_system_keyspaces() const;
|
||||
lw_shared_ptr<column_family> find_column_family(const sstring& ks, const sstring& name);
|
||||
const lw_shared_ptr<column_family> find_column_family(const sstring& ks, const sstring& name) const;
|
||||
lw_shared_ptr<column_family> find_column_family(const utils::UUID&);
|
||||
const lw_shared_ptr<column_family> find_column_family(const utils::UUID&) const;
|
||||
lw_shared_ptr<column_family> find_column_family(const schema_ptr&);
|
||||
const lw_shared_ptr<column_family> find_column_family(const schema_ptr&) const;
|
||||
column_family& find_column_family(const sstring& ks, const sstring& name);
|
||||
const column_family& find_column_family(const sstring& ks, const sstring& name) const;
|
||||
column_family& find_column_family(const utils::UUID&);
|
||||
const column_family& find_column_family(const utils::UUID&) const;
|
||||
column_family& find_column_family(const schema_ptr&);
|
||||
const column_family& find_column_family(const schema_ptr&) const;
|
||||
bool column_family_exists(const utils::UUID& uuid) const;
|
||||
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const;
|
||||
schema_ptr find_schema(const utils::UUID&) const;
|
||||
|
||||
@@ -251,25 +251,25 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
|
||||
// TODO: might need better verification that the deserialized mutation
|
||||
// is schema compatible. My guess is that just applying the mutation
|
||||
// will not do this.
|
||||
auto cf = db.find_column_family(fm.column_family_id());
|
||||
auto& cf = db.find_column_family(fm.column_family_id());
|
||||
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
logger.debug("replaying at {} v={} {}:{} at {}", fm.column_family_id(), fm.schema_version(),
|
||||
cf->schema()->ks_name(), cf->schema()->cf_name(), rp);
|
||||
cf.schema()->ks_name(), cf.schema()->cf_name(), rp);
|
||||
}
|
||||
// Removed forwarding "new" RP. Instead give none/empty.
|
||||
// This is what origin does, and it should be fine.
|
||||
// The end result should be that once sstables are flushed out
|
||||
// their "replay_position" attribute will be empty, which is
|
||||
// lower than anything the new session will produce.
|
||||
if (cf->schema()->version() != fm.schema_version()) {
|
||||
if (cf.schema()->version() != fm.schema_version()) {
|
||||
const column_mapping& cm = cm_it->second;
|
||||
mutation m(fm.decorated_key(*cf->schema()), cf->schema());
|
||||
converting_mutation_partition_applier v(cm, *cf->schema(), m.partition());
|
||||
mutation m(fm.decorated_key(*cf.schema()), cf.schema());
|
||||
converting_mutation_partition_applier v(cm, *cf.schema(), m.partition());
|
||||
fm.partition().accept(cm, v);
|
||||
cf->apply(std::move(m));
|
||||
cf.apply(std::move(m));
|
||||
} else {
|
||||
cf->apply(fm, cf->schema());
|
||||
cf.apply(fm, cf.schema());
|
||||
}
|
||||
s->applied_mutations++;
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -615,8 +615,8 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
|
||||
if (do_flush) {
|
||||
proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) {
|
||||
return parallel_for_each(cfs.begin(), cfs.end(), [&db] (auto& id) {
|
||||
auto cf = db.find_column_family(id);
|
||||
return cf->flush();
|
||||
auto& cf = db.find_column_family(id);
|
||||
return cf.flush();
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
@@ -699,17 +699,17 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
|
||||
}
|
||||
|
||||
static future<> update_column_family(database& db, schema_ptr new_schema) {
|
||||
auto cfm = db.find_column_family(new_schema->id());
|
||||
column_family& cfm = db.find_column_family(new_schema->id());
|
||||
keyspace& ks = db.find_keyspace(new_schema->ks_name());
|
||||
|
||||
bool columns_changed = !cfm->schema()->equal_columns(*new_schema);
|
||||
bool columns_changed = !cfm.schema()->equal_columns(*new_schema);
|
||||
|
||||
auto s = local_schema_registry().learn(new_schema);
|
||||
s->registry_entry()->mark_synced();
|
||||
cfm->set_schema(std::move(s));
|
||||
cfm.set_schema(std::move(s));
|
||||
ks.metadata()->add_or_update_column_family(new_schema);
|
||||
|
||||
return service::get_local_migration_manager().notify_update_column_family(cfm->schema(), columns_changed);
|
||||
return service::get_local_migration_manager().notify_update_column_family(cfm.schema(), columns_changed);
|
||||
}
|
||||
|
||||
// see the comments for merge_keyspaces()
|
||||
@@ -751,8 +751,8 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
auto& ks = db.find_keyspace(s->ks_name());
|
||||
auto cfg = ks.make_column_family_config(*s, db.get_config());
|
||||
db.add_column_family(s, cfg);
|
||||
auto cf = db.find_column_family(s);
|
||||
cf->mark_ready_for_writes();
|
||||
auto& cf = db.find_column_family(s);
|
||||
cf.mark_ready_for_writes();
|
||||
ks.make_directory_for_column_family(s->cf_name(), s->id()).get();
|
||||
service::get_local_migration_manager().notify_create_column_family(s).get();
|
||||
}
|
||||
|
||||
@@ -919,8 +919,8 @@ future<> force_blocking_flush(sstring cfname) {
|
||||
assert(qctx);
|
||||
return qctx->_db.invoke_on_all([cfname = std::move(cfname)](database& db) {
|
||||
// if (!Boolean.getBoolean("cassandra.unsafesystem"))
|
||||
auto cf = db.find_column_family(NAME, cfname);
|
||||
return cf->flush();
|
||||
column_family& cf = db.find_column_family(NAME, cfname);
|
||||
return cf.flush();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -359,9 +359,9 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) {
|
||||
static future<partition_checksum> checksum_range_shard(database &db,
|
||||
const sstring& keyspace_name, const sstring& cf_name,
|
||||
const ::nonwrapping_range<dht::token>& range, repair_checksum hash_version) {
|
||||
auto cf = db.find_column_family(keyspace_name, cf_name);
|
||||
return do_with(dht::to_partition_range(range), [cf, hash_version] (const auto& partition_range) {
|
||||
auto reader = cf->make_streaming_reader(cf->schema(), partition_range);
|
||||
auto& cf = db.find_column_family(keyspace_name, cf_name);
|
||||
return do_with(dht::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) {
|
||||
auto reader = cf.make_streaming_reader(cf.schema(), partition_range);
|
||||
return do_with(std::move(reader), partition_checksum(),
|
||||
[hash_version] (auto& reader, auto& checksum) {
|
||||
return repeat([&reader, &checksum, hash_version] () {
|
||||
@@ -485,7 +485,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
// FIXME: column_family should have a method to estimate the number of
|
||||
// partitions (and of course it should use cardinality estimation bitmaps,
|
||||
// not trivial sum). We shouldn't have this ugly code here...
|
||||
auto sstables = db.local().find_column_family(keyspace, cf)->get_sstables();
|
||||
auto sstables = db.local().find_column_family(keyspace, cf).get_sstables();
|
||||
uint64_t estimated_partitions = 0;
|
||||
for (auto sst : *sstables) {
|
||||
estimated_partitions += sst->get_estimated_key_count();
|
||||
|
||||
@@ -453,7 +453,7 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
|
||||
#endif
|
||||
try {
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name())->schema(); // FIXME: Should we lookup by id?
|
||||
auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id?
|
||||
#if 0
|
||||
oldCfm.validateCompatility(cfm);
|
||||
#endif
|
||||
|
||||
@@ -1684,8 +1684,8 @@ future<> storage_service::do_stop_stream_manager() {
|
||||
future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) {
|
||||
auto& ks = db.find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
|
||||
auto cf = db.find_column_family(pair.second);
|
||||
return cf->snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) {
|
||||
auto& cf = db.find_column_family(pair.second);
|
||||
return cf.snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) {
|
||||
if (exists) {
|
||||
throw std::runtime_error(sprint("Keyspace %s: snapshot %s already exists.", ks_name, name));
|
||||
}
|
||||
@@ -1715,8 +1715,8 @@ future<> storage_service::take_snapshot(sstring tag, std::vector<sstring> keyspa
|
||||
return parallel_for_each(keyspace_names, [&db, tag = std::move(tag)] (auto& ks_name) {
|
||||
auto& ks = db.find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, tag = std::move(tag)] (auto& pair) {
|
||||
auto cf = db.find_column_family(pair.second);
|
||||
return cf->snapshot(tag);
|
||||
auto& cf = db.find_column_family(pair.second);
|
||||
return cf.snapshot(tag);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1747,8 +1747,8 @@ future<> storage_service::take_column_family_snapshot(sstring ks_name, sstring c
|
||||
}).then([this, ks_name = std::move(ks_name), cf_name = std::move(cf_name), tag = std::move(tag)] {
|
||||
return check_snapshot_not_exist(_db.local(), ks_name, tag).then([this, ks_name, cf_name, tag] {
|
||||
return _db.invoke_on_all([ks_name, cf_name, tag] (database &db) {
|
||||
auto cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf->snapshot(tag);
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf.snapshot(tag);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1813,8 +1813,8 @@ storage_service::get_snapshot_details() {
|
||||
std::vector<service::storage_service::snapshot_details> details;
|
||||
|
||||
for (auto&& snap_map: pair.second) {
|
||||
auto cf = _db.local().find_column_family(snap_map.first);
|
||||
details.push_back({ snap_map.second.live, snap_map.second.total, cf->schema()->cf_name(), cf->schema()->ks_name() });
|
||||
auto& cf = _db.local().find_column_family(snap_map.first);
|
||||
details.push_back({ snap_map.second.live, snap_map.second.total, cf.schema()->cf_name(), cf.schema()->ks_name() });
|
||||
}
|
||||
result.emplace(pair.first, std::move(details));
|
||||
}
|
||||
@@ -2652,8 +2652,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
|
||||
// The statement above is valid at least from the Scylla side of things: it is still totally possible
|
||||
// that someones just copies the table over existing ones. There isn't much we can do about it.
|
||||
return _db.map_reduce(max_element(), [ks_name, cf_name] (database& db) {
|
||||
auto cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf->disable_sstable_write();
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf.disable_sstable_write();
|
||||
}).then([this, cf_name, ks_name] (int64_t max_seen_sstable) {
|
||||
// Then, we will reshuffle the tables to make sure that the generation numbers don't go too high.
|
||||
// We will do all of it the same CPU, to make sure that we won't have two parallel shufflers stepping
|
||||
@@ -2674,17 +2674,17 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
|
||||
// We provide to reshuffle_sstables() the generation of all existing sstables, such that it will
|
||||
// easily know which sstables are new.
|
||||
return _db.map_reduce(all_generations(), [ks_name, cf_name] (database& db) {
|
||||
auto cf = db.find_column_family(ks_name, cf_name);
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
std::set<int64_t> generations;
|
||||
for (auto& p : *(cf->get_sstables())) {
|
||||
for (auto& p : *(cf.get_sstables())) {
|
||||
generations.insert(p->generation());
|
||||
}
|
||||
return make_ready_future<std::set<int64_t>>(std::move(generations));
|
||||
}).then([this, max_seen_sstable, ks_name, cf_name] (std::set<int64_t> all_generations) {
|
||||
auto shard = std::hash<sstring>()(cf_name) % smp::count;
|
||||
return _db.invoke_on(shard, [ks_name, cf_name, max_seen_sstable, all_generations = std::move(all_generations)] (database& db) {
|
||||
auto cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf->reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1);
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf.reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1);
|
||||
});
|
||||
});
|
||||
}).then_wrapped([this, ks_name, cf_name] (future<std::vector<sstables::entry_descriptor>> f) {
|
||||
@@ -2708,8 +2708,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
|
||||
|
||||
logger.debug("Now accepting writes for sstables with generation larger or equal than {}", new_gen);
|
||||
return _db.invoke_on_all([ks_name, cf_name, new_gen] (database& db) {
|
||||
auto cf = db.find_column_family(ks_name, cf_name);
|
||||
auto disabled = std::chrono::duration_cast<std::chrono::microseconds>(cf->enable_sstable_write(new_gen)).count();
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
auto disabled = std::chrono::duration_cast<std::chrono::microseconds>(cf.enable_sstable_write(new_gen)).count();
|
||||
logger.info("CF {}.{} at shard {} had SSTables writes disabled for {} usec", ks_name, cf_name, engine().cpu_id(), disabled);
|
||||
return make_ready_future<>();
|
||||
}).then([new_tables = std::move(new_tables), eptr = std::move(eptr)] {
|
||||
@@ -2721,8 +2721,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
|
||||
}).then([this, ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables) {
|
||||
auto shard = std::hash<sstring>()(cf_name) % smp::count;
|
||||
return _db.invoke_on(shard, [ks_name, cf_name] (database& db) {
|
||||
auto cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf->flush_upload_dir();
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf.flush_upload_dir();
|
||||
}).then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables_from_upload) mutable {
|
||||
if (new_tables.empty() && new_tables_from_upload.empty()) {
|
||||
logger.info("No new SSTables were found for {}.{}", ks_name, cf_name);
|
||||
@@ -2733,8 +2733,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
|
||||
});
|
||||
}).then([this, ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables) {
|
||||
return _db.invoke_on_all([ks_name = std::move(ks_name), cf_name = std::move(cf_name), new_tables = std::move(new_tables)] (database& db) {
|
||||
auto cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf->load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] {
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
return cf.load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] {
|
||||
logger.info("Done loading new SSTables for {}.{}", ks_name, cf_name);
|
||||
});
|
||||
});
|
||||
@@ -3212,9 +3212,9 @@ calculate_splits(std::vector<dht::token> tokens, uint32_t split_count, column_fa
|
||||
std::vector<std::pair<nonwrapping_range<dht::token>, uint64_t>>
|
||||
storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, range<dht::token> range, uint32_t keys_per_split) {
|
||||
using range_type = nonwrapping_range<dht::token>;
|
||||
auto cf = _db.local().find_column_family(ks_name, cf_name);
|
||||
auto schema = cf->schema();
|
||||
auto sstables = cf->get_sstables();
|
||||
auto& cf = _db.local().find_column_family(ks_name, cf_name);
|
||||
auto schema = cf.schema();
|
||||
auto sstables = cf.get_sstables();
|
||||
uint64_t total_row_count_estimate = 0;
|
||||
std::vector<dht::token> tokens;
|
||||
std::vector<range_type> unwrapped;
|
||||
@@ -3230,7 +3230,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
|
||||
std::vector<dht::token> range_tokens;
|
||||
for (auto &&sst : *sstables) {
|
||||
total_row_count_estimate += sst->estimated_keys_for_range(r);
|
||||
auto keys = sst->get_key_samples(*cf->schema(), r);
|
||||
auto keys = sst->get_key_samples(*cf.schema(), r);
|
||||
std::transform(keys.begin(), keys.end(), std::back_inserter(range_tokens), [](auto&& k) { return std::move(k.token()); });
|
||||
}
|
||||
std::sort(range_tokens.begin(), range_tokens.end());
|
||||
@@ -3243,7 +3243,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
|
||||
uint64_t max_split_count = tokens.size() / min_samples_per_split + 1;
|
||||
uint32_t split_count = std::max(uint32_t(1), static_cast<uint32_t>(std::min(max_split_count, total_row_count_estimate / keys_per_split)));
|
||||
|
||||
return calculate_splits(std::move(tokens), split_count, *cf);
|
||||
return calculate_splits(std::move(tokens), split_count, cf);
|
||||
};
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -158,12 +158,12 @@ void stream_session::init_messaging_service_handler() {
|
||||
}
|
||||
std::vector<query::partition_range> query_ranges;
|
||||
try {
|
||||
auto cf = db.find_column_family(cf_id);
|
||||
auto& cf = db.find_column_family(cf_id);
|
||||
query_ranges.reserve(ranges.size());
|
||||
for (auto& range : ranges) {
|
||||
query_ranges.push_back(dht::to_partition_range(range));
|
||||
}
|
||||
return cf->flush_streaming_mutations(plan_id, std::move(query_ranges));
|
||||
return cf.flush_streaming_mutations(plan_id, std::move(query_ranges));
|
||||
} catch (no_such_column_family) {
|
||||
sslog.warn("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped",
|
||||
plan_id, from, cf_id);
|
||||
@@ -386,26 +386,26 @@ void stream_session::start_streaming_files() {
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<lw_shared_ptr<column_family>> stream_session::get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families) {
|
||||
std::vector<column_family*> stream_session::get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families) {
|
||||
// if columnfamilies are not specified, we add all cf under the keyspace
|
||||
std::vector<lw_shared_ptr<column_family>> stores;
|
||||
std::vector<column_family*> stores;
|
||||
auto& db = get_local_db();
|
||||
if (column_families.empty()) {
|
||||
for (auto& x : db.get_column_families()) {
|
||||
auto cf = x.second;
|
||||
auto cf_name = cf->schema()->cf_name();
|
||||
auto ks_name = cf->schema()->ks_name();
|
||||
column_family& cf = *(x.second);
|
||||
auto cf_name = cf.schema()->cf_name();
|
||||
auto ks_name = cf.schema()->ks_name();
|
||||
if (ks_name == keyspace) {
|
||||
sslog.debug("Find ks={} cf={}", ks_name, cf_name);
|
||||
stores.push_back(cf);
|
||||
stores.push_back(&cf);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: We can move this to database class and use shared_ptr<column_family> instead
|
||||
for (auto& cf_name : column_families) {
|
||||
try {
|
||||
auto x = db.find_column_family(keyspace, cf_name);
|
||||
stores.push_back(x);
|
||||
auto& x = db.find_column_family(keyspace, cf_name);
|
||||
stores.push_back(&x);
|
||||
} catch (no_such_column_family) {
|
||||
sslog.warn("stream_session: {}.{} does not exist: {}\n", keyspace, cf_name, std::current_exception());
|
||||
continue;
|
||||
@@ -434,8 +434,8 @@ future<> stream_session::receiving_failed(UUID cf_id)
|
||||
{
|
||||
return get_db().invoke_on_all([cf_id, plan_id = plan_id()] (database& db) {
|
||||
try {
|
||||
auto cf = db.find_column_family(cf_id);
|
||||
return cf->fail_streaming_mutations(plan_id);
|
||||
auto& cf = db.find_column_family(cf_id);
|
||||
return cf.fail_streaming_mutations(plan_id);
|
||||
} catch (no_such_column_family) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -255,7 +255,7 @@ public:
|
||||
*/
|
||||
void add_transfer_ranges(sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families);
|
||||
|
||||
std::vector<lw_shared_ptr<column_family>> get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families);
|
||||
std::vector<column_family*> get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families);
|
||||
|
||||
void close_session(stream_session_state final_state);
|
||||
|
||||
|
||||
@@ -108,8 +108,8 @@ future<> do_send_mutations(auto si, auto fm, bool fragmented) {
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto cf = si->db.find_column_family(si->cf_id);
|
||||
return do_with(cf->make_streaming_reader(cf->schema(), si->pr), [si] (auto& reader) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
return do_with(cf.make_streaming_reader(cf.schema(), si->pr), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto smopt) {
|
||||
if (smopt && si->db.column_family_exists(si->cf_id)) {
|
||||
|
||||
@@ -176,8 +176,8 @@ public:
|
||||
const sstring& column_name,
|
||||
data_value expected) override {
|
||||
auto& db = _db->local();
|
||||
auto cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf->schema();
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf.schema();
|
||||
auto pkey = partition_key::from_deeply_exploded(*schema, pk);
|
||||
auto ckey = clustering_key::from_deeply_exploded(*schema, ck);
|
||||
auto exp = expected.type()->decompose(expected);
|
||||
@@ -189,9 +189,9 @@ public:
|
||||
column_name = std::move(column_name),
|
||||
exp = std::move(exp),
|
||||
table_name = std::move(table_name)] (database& db) mutable {
|
||||
auto cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf->schema();
|
||||
return cf->find_partition_slow(schema, pkey)
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf.schema();
|
||||
return cf.find_partition_slow(schema, pkey)
|
||||
.then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
|
||||
assert(p != nullptr);
|
||||
auto row = p->find_row(ckey);
|
||||
|
||||
Reference in New Issue
Block a user