diff --git a/api/column_family.cc b/api/column_family.cc
index 12e10056e3..e8754ea143 100644
--- a/api/column_family.cc
+++ b/api/column_family.cc
@@ -58,7 +58,7 @@ future<> foreach_column_family(http_context& ctx, const sstring& name, function<
     auto uuid = get_uuid(name, ctx.db.local());
     return ctx.db.invoke_on_all([f, uuid](database& db) {
-        f(*(db.find_column_family(uuid)));
+        f(db.find_column_family(uuid));
     });
 }
 
@@ -91,8 +91,8 @@ static future get_cf_stats_sum(http_context& ctx, const
         // so to get an estimation of sum, we multiply the mean
         // with count. The information is gather in nano second,
         // but reported in micro
-        auto cf = db.find_column_family(uuid);
-        return ((cf->get_stats().*f).hist.count/1000.0) * (cf->get_stats().*f).hist.mean;
+        column_family& cf = db.find_column_family(uuid);
+        return ((cf.get_stats().*f).hist.count/1000.0) * (cf.get_stats().*f).hist.mean;
     }, 0.0, std::plus()).then([](double res) {
         return make_ready_future((int64_t)res);
     });
@@ -110,7 +110,7 @@ static future get_cf_histogram(http_context& ctx, const
         utils::timed_rate_moving_average_and_histogram column_family::stats::*f) {
     utils::UUID uuid = get_uuid(name, ctx.db.local());
     return ctx.db.map_reduce0([f, uuid](const database& p) {
-        return (p.find_column_family(uuid)->get_stats().*f).hist;},
+        return (p.find_column_family(uuid).get_stats().*f).hist;},
         utils::ihistogram(),
         std::plus())
     .then([](const utils::ihistogram& val) {
@@ -137,7 +137,7 @@ static future get_cf_rate_and_histogram(http_context& c
         utils::timed_rate_moving_average_and_histogram column_family::stats::*f) {
     utils::UUID uuid = get_uuid(name, ctx.db.local());
     return ctx.db.map_reduce0([f, uuid](const database& p) {
-        return (p.find_column_family(uuid)->get_stats().*f).rate();},
+        return (p.find_column_family(uuid).get_stats().*f).rate();},
         utils::rate_moving_average_and_histogram(),
         std::plus())
     .then([](const utils::rate_moving_average_and_histogram& val) {
@@ -219,8 +219,8 @@ static future sum_sstable(http_context& ctx, const sstr
     auto uuid = get_uuid(name, ctx.db.local());
     return ctx.db.map_reduce0([uuid, total](database& db) {
         std::unordered_map m;
-        auto sstables = (total) ? db.find_column_family(uuid)->get_sstables_including_compacted_undeleted() :
-                db.find_column_family(uuid)->get_sstables();
+        auto sstables = (total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
+                db.find_column_family(uuid).get_sstables();
         for (auto t : *sstables) {
             m[t->get_filename()] = t->bytes_on_disk();
         }
@@ -723,7 +723,7 @@ void set_column_family(http_context& ctx, routes& r) {
     cf::get_true_snapshots_size.set(r, [&ctx] (std::unique_ptr req) {
         auto uuid = get_uuid(req->param["name"], ctx.db.local());
-        return ctx.db.local().find_column_family(uuid)->get_snapshot_details().then([](
+        return ctx.db.local().find_column_family(uuid).get_snapshot_details().then([](
                 const std::unordered_map& sd) {
             int64_t res = 0;
             for (auto i : sd) {
@@ -861,8 +861,8 @@ void set_column_family(http_context& ctx, routes& r) {
         auto uuid = get_uuid(req->param["name"], ctx.db.local());
         return ctx.db.map_reduce(sum_ratio(), [uuid](database& db) {
-            auto cf = db.find_column_family(uuid);
-            return make_ready_future(get_compression_ratio(*cf));
+            column_family& cf = db.find_column_family(uuid);
+            return make_ready_future(get_compression_ratio(cf));
         }).then([] (const double& result) {
             return make_ready_future(result);
         });
@@ -892,7 +892,7 @@ void set_column_family(http_context& ctx, routes& r) {
     });
 
     cf::get_compaction_strategy_class.set(r, [&ctx](const_req req) {
-        return ctx.db.local().find_column_family(get_uuid(req.param["name"], ctx.db.local()))->get_compaction_strategy().name();
+        return ctx.db.local().find_column_family(get_uuid(req.param["name"], ctx.db.local())).get_compaction_strategy().name();
     });
 
     cf::set_compression_parameters.set(r, [&ctx](std::unique_ptr req) {
diff --git a/api/column_family.hh b/api/column_family.hh
index 803d12e1db..00d173e94a 100644
--- a/api/column_family.hh
+++ b/api/column_family.hh
@@ -38,7 +38,7 @@ future map_reduce_cf_raw(http_context& ctx, const sstring& name,
         I init, Mapper mapper, Reducer reducer) {
     auto uuid = get_uuid(name, ctx.db.local());
     return ctx.db.map_reduce0([mapper, uuid](database& db) {
-        return mapper(*(db.find_column_family(uuid)));
+        return mapper(db.find_column_family(uuid));
     }, init, reducer);
 }
 
@@ -56,7 +56,7 @@ future map_reduce_cf_raw(http_context& ctx, const sstring& name,
         I init, Mapper mapper, Reducer reducer, Result result) {
     auto uuid = get_uuid(name, ctx.db.local());
     return ctx.db.map_reduce0([mapper, uuid](database& db) {
-        return mapper(*(db.find_column_family(uuid)));
+        return mapper(db.find_column_family(uuid));
     }, init, reducer);
 }
 
diff --git a/api/storage_service.cc b/api/storage_service.cc
index 0cd92a8475..37b80a7cff 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -263,11 +263,11 @@ void set_storage_service(http_context& ctx, routes& r) {
             column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
         }
         return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
-            std::vector<lw_shared_ptr<column_family>> column_families_vec;
+            std::vector<column_family*> column_families_vec;
             for (auto cf : column_families) {
-                column_families_vec.push_back(db.find_column_family(keyspace, cf));
+                column_families_vec.push_back(&db.find_column_family(keyspace, cf));
             }
-            return parallel_for_each(column_families_vec, [] (lw_shared_ptr<column_family> cf) {
+            return parallel_for_each(column_families_vec, [] (column_family* cf) {
                 return cf->compact_all_sstables();
             });
         }).then([]{
@@ -282,13 +282,13 @@ void set_storage_service(http_context& ctx, routes& r) {
             column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
         }
         return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
-            std::vector<lw_shared_ptr<column_family>> column_families_vec;
+            std::vector<column_family*> column_families_vec;
             auto& cm = db.get_compaction_manager();
             for (auto cf : column_families) {
-                column_families_vec.push_back(db.find_column_family(keyspace, cf));
+                column_families_vec.push_back(&db.find_column_family(keyspace, cf));
             }
-            return parallel_for_each(column_families_vec, [&cm] (lw_shared_ptr<column_family> cf) {
-                return cm.perform_cleanup(&*cf);
+            return parallel_for_each(column_families_vec, [&cm] (column_family* cf) {
+                return cm.perform_cleanup(cf);
             });
         }).then([]{
             return make_ready_future(0);
@@ -322,7 +322,7 @@ void set_storage_service(http_context& ctx, routes& r) {
         }
         return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
             return parallel_for_each(column_families, [&db, keyspace](const sstring& cf) mutable {
-                return db.find_column_family(keyspace, cf)->flush();
+                return db.find_column_family(keyspace, cf).flush();
             });
         }).then([]{
             return make_ready_future(json_void());
diff --git a/database.cc b/database.cc
index 41460dcee9..9dd008c463 100644
--- a/database.cc
+++ b/database.cc
@@ -1924,8 +1924,8 @@ database::init_system_keyspace() {
         auto& ks = find_keyspace(db::system_keyspace::NAME);
         return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) {
             auto cfm = pair.second;
-            auto cf = this->find_column_family(cfm);
-            cf->mark_ready_for_writes();
+            auto& cf = this->find_column_family(cfm);
+            cf.mark_ready_for_writes();
             return make_ready_future<>();
         });
     });
@@ -2089,7 +2089,7 @@ std::vector> database::get_non_system_column_famili
     }));
 }
 
-lw_shared_ptr<column_family> database::find_column_family(const sstring& ks_name, const sstring& cf_name) {
+column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) {
     try {
         return find_column_family(find_uuid(ks_name, cf_name));
     } catch (...) {
@@ -2097,7 +2097,7 @@ lw_shared_ptr database::find_column_family(const sstring& ks_name
     }
 }
 
-const lw_shared_ptr<column_family> database::find_column_family(const sstring& ks_name, const sstring& cf_name) const {
+const column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) const {
     try {
         return find_column_family(find_uuid(ks_name, cf_name));
     } catch (...) {
@@ -2105,17 +2105,17 @@ const lw_shared_ptr database::find_column_family(const sstring& k
     }
 }
 
-lw_shared_ptr<column_family> database::find_column_family(const utils::UUID& uuid) {
+column_family& database::find_column_family(const utils::UUID& uuid) {
     try {
-        return _column_families.at(uuid);
+        return *_column_families.at(uuid);
     } catch (...) {
         std::throw_with_nested(no_such_column_family(uuid));
     }
 }
 
-const lw_shared_ptr<column_family> database::find_column_family(const utils::UUID& uuid) const {
+const column_family& database::find_column_family(const utils::UUID& uuid) const {
     try {
-        return _column_families.at(uuid);
+        return *_column_families.at(uuid);
     } catch (...) {
         std::throw_with_nested(no_such_column_family(uuid));
     }
@@ -2209,11 +2209,11 @@ no_such_column_family::no_such_column_family(const sstring& ks_name, const sstri
 {
 }
 
-lw_shared_ptr<column_family> database::find_column_family(const schema_ptr& schema) {
+column_family& database::find_column_family(const schema_ptr& schema) {
     return find_column_family(schema->id());
 }
 
-const lw_shared_ptr<column_family> database::find_column_family(const schema_ptr& schema) const {
+const column_family& database::find_column_family(const schema_ptr& schema) const {
     return find_column_family(schema->id());
 }
 
@@ -2233,7 +2233,7 @@ schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name)
 }
 
 schema_ptr database::find_schema(const utils::UUID& uuid) const {
-    return find_column_family(uuid)->schema();
+    return find_column_family(uuid).schema();
 }
 
 bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const {
@@ -2382,8 +2382,8 @@ column_family::as_mutation_source(tracing::trace_state_ptr trace_state) const {
 future> database::query(schema_ptr s, const query::read_command& cmd, query::result_request request, const std::vector& ranges,
         tracing::trace_state_ptr trace_state) {
-    auto cf = find_column_family(cmd.cf_id);
-    return cf->query(std::move(s), cmd, request, ranges, std::move(trace_state)).then([this, s = _stats] (auto&& res) {
+    column_family& cf = find_column_family(cmd.cf_id);
+    return cf.query(std::move(s), cmd, request, ranges, std::move(trace_state)).then([this, s = _stats] (auto&& res) {
         ++s->total_reads;
         return std::move(res);
     });
@@ -2391,8 +2391,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_requ
 future database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range,
         tracing::trace_state_ptr trace_state) {
-    auto cf = find_column_family(cmd.cf_id);
-    return mutation_query(std::move(s), cf->as_mutation_source(std::move(trace_state)), range, cmd.slice, cmd.row_limit, cmd.partition_limit,
+    column_family& cf = find_column_family(cmd.cf_id);
+    return mutation_query(std::move(s), cf.as_mutation_source(std::move(trace_state)), range, cmd.slice, cmd.row_limit, cmd.partition_limit,
             cmd.timestamp).then([this, s = _stats] (auto&& res) {
         ++s->total_reads;
         return std::move(res);
@@ -2543,8 +2543,8 @@ void dirty_memory_manager::maybe_do_active_flush() {
     // However, since we'll very soon have a mechanism in place to account for the memory
     // that was already written in one form or another, that disadvantage is mitigated.
     memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region());
-    auto biggest_cf = _db->find_column_family(biggest_memtable.schema());
-    memtable_list& mtlist = get_memtable_list(*biggest_cf);
+    auto& biggest_cf = _db->find_column_family(biggest_memtable.schema());
+    memtable_list& mtlist = get_memtable_list(biggest_cf);
     // Please note that this will eventually take the semaphore and prevent two concurrent flushes.
     // We don't need any other extra protection.
     mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate);
@@ -2565,8 +2565,8 @@ void dirty_memory_manager::start_reclaiming() {
 future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) {
     return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] {
         try {
-            auto cf = find_column_family(m.column_family_id());
-            cf->apply(m, m_schema, rp);
+            auto& cf = find_column_family(m.column_family_id());
+            cf.apply(m, m_schema, rp);
         } catch (no_such_column_family&) {
             dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
         }
@@ -2578,14 +2578,14 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m) {
     // is a little in flux and commitlog is created only when db is
     // initied from datadir.
     auto uuid = m.column_family_id();
-    auto cf = find_column_family(uuid);
+    auto& cf = find_column_family(uuid);
     if (!s->is_synced()) {
         throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version()));
     }
-    if (cf->commitlog() != nullptr) {
+    if (cf.commitlog() != nullptr) {
         commitlog_entry_writer cew(s, m);
-        return cf->commitlog()->add_entry(uuid, cew).then([&m, this, s](auto rp) {
+        return cf.commitlog()->add_entry(uuid, cew).then([&m, this, s](auto rp) {
             return this->apply_in_memory(m, s, rp).handle_exception([this, s, &m] (auto ep) {
                 try {
                     std::rethrow_exception(ep);
@@ -2620,8 +2620,8 @@ future<> database::apply_streaming_mutation(schema_ptr s, utils::UUID plan_id, c
     }
     return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, plan_id, fragmented, s = std::move(s)] {
         auto uuid = m.column_family_id();
-        auto cf = find_column_family(uuid);
-        cf->apply_streaming_mutation(s, plan_id, std::move(m), fragmented);
+        auto& cf = find_column_family(uuid);
+        cf.apply_streaming_mutation(s, plan_id, std::move(m), fragmented);
     });
 }
 
@@ -2743,8 +2743,8 @@ future<> database::flush_all_memtables() {
 
 future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf) {
     auto& ks = find_keyspace(ksname);
-    auto cf = find_column_family(ksname, cfname);
-    return truncate(ks, *cf, std::move(tsf));
+    auto& cf = find_column_family(ksname, cfname);
+    return truncate(ks, cf, std::move(tsf));
 }
 
 future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf)
@@ -2810,8 +2810,8 @@ future<> database::clear_snapshot(sstring tag, std::vector keyspace_nam
 
     return parallel_for_each(keyspaces, [this, tag] (auto& ks) {
         return parallel_for_each(ks.get().metadata()->cf_meta_data(), [this, tag] (auto& pair) {
-            auto cf = this->find_column_family(pair.second);
-            return cf->clear_snapshot(tag);
+            auto& cf = this->find_column_family(pair.second);
+            return cf.clear_snapshot(tag);
         }).then_wrapped([] (future<> f) {
             dblog.debug("Cleared out snapshot directories");
         });
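The database.cc hunks above and the database.hh declarations that follow are the heart of the patch: every overload of database::find_column_family() now returns column_family& (or const column_family&) instead of lw_shared_ptr<column_family>, while still throwing no_such_column_family for an unknown UUID or keyspace/table name. A minimal sketch of what a call site looks like before and after -- illustrative only, not part of the patch; the helper name mark_table_ready is made up:

    // Assumes the Scylla headers that declare database, column_family,
    // utils::UUID and no_such_column_family.
    #include "database.hh"

    void mark_table_ready(database& db, const utils::UUID& id) {
        // Before the patch: an lw_shared_ptr<column_family> was returned and dereferenced:
        //     auto cf = db.find_column_family(id);
        //     cf->mark_ready_for_writes();
        // After the patch: a plain reference is returned; the database keeps ownership.
        column_family& cf = db.find_column_family(id); // throws no_such_column_family if absent
        cf.mark_ready_for_writes();
    }
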
diff --git a/database.hh b/database.hh
index a547648a51..0e728098ee 100644
--- a/database.hh
+++ b/database.hh
@@ -1061,12 +1061,12 @@ public:
     void drop_keyspace(const sstring& name);
     const auto& keyspaces() const { return _keyspaces; }
     std::vector get_non_system_keyspaces() const;
-    lw_shared_ptr<column_family> find_column_family(const sstring& ks, const sstring& name);
-    const lw_shared_ptr<column_family> find_column_family(const sstring& ks, const sstring& name) const;
-    lw_shared_ptr<column_family> find_column_family(const utils::UUID&);
-    const lw_shared_ptr<column_family> find_column_family(const utils::UUID&) const;
-    lw_shared_ptr<column_family> find_column_family(const schema_ptr&);
-    const lw_shared_ptr<column_family> find_column_family(const schema_ptr&) const;
+    column_family& find_column_family(const sstring& ks, const sstring& name);
+    const column_family& find_column_family(const sstring& ks, const sstring& name) const;
+    column_family& find_column_family(const utils::UUID&);
+    const column_family& find_column_family(const utils::UUID&) const;
+    column_family& find_column_family(const schema_ptr&);
+    const column_family& find_column_family(const schema_ptr&) const;
     bool column_family_exists(const utils::UUID& uuid) const;
     schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const;
     schema_ptr find_schema(const utils::UUID&) const;
diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc
index ea4864013e..3751a3c04c 100644
--- a/db/commitlog/commitlog_replayer.cc
+++ b/db/commitlog/commitlog_replayer.cc
@@ -251,25 +251,25 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer
     // TODO: might need better verification that the deserialized mutation
     // is schema compatible. My guess is that just applying the mutation
     // will not do this.
-    auto cf = db.find_column_family(fm.column_family_id());
+    auto& cf = db.find_column_family(fm.column_family_id());
     if (logger.is_enabled(logging::log_level::debug)) {
         logger.debug("replaying at {} v={} {}:{} at {}", fm.column_family_id(), fm.schema_version(),
-            cf->schema()->ks_name(), cf->schema()->cf_name(), rp);
+            cf.schema()->ks_name(), cf.schema()->cf_name(), rp);
     }
     // Removed forwarding "new" RP. Instead give none/empty.
     // This is what origin does, and it should be fine.
     // The end result should be that once sstables are flushed out
     // their "replay_position" attribute will be empty, which is
     // lower than anything the new session will produce.
-    if (cf->schema()->version() != fm.schema_version()) {
+    if (cf.schema()->version() != fm.schema_version()) {
         const column_mapping& cm = cm_it->second;
-        mutation m(fm.decorated_key(*cf->schema()), cf->schema());
-        converting_mutation_partition_applier v(cm, *cf->schema(), m.partition());
+        mutation m(fm.decorated_key(*cf.schema()), cf.schema());
+        converting_mutation_partition_applier v(cm, *cf.schema(), m.partition());
         fm.partition().accept(cm, v);
-        cf->apply(std::move(m));
+        cf.apply(std::move(m));
     } else {
-        cf->apply(fm, cf->schema());
+        cf.apply(fm, cf.schema());
     }
     s->applied_mutations++;
     return make_ready_future<>();
diff --git a/db/schema_tables.cc b/db/schema_tables.cc
index 296ed21bdd..1b814a38b5 100644
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -615,8 +615,8 @@ future<> do_merge_schema(distributed& proxy, std::vector
     if (do_flush) {
         proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) {
             return parallel_for_each(cfs.begin(), cfs.end(), [&db] (auto& id) {
-                auto cf = db.find_column_family(id);
-                return cf->flush();
+                auto& cf = db.find_column_family(id);
+                return cf.flush();
             });
         }).get();
     }
@@ -699,17 +699,17 @@ future> merge_keyspaces(distributed& p
 static future<> update_column_family(database& db, schema_ptr new_schema) {
-    auto cfm = db.find_column_family(new_schema->id());
+    column_family& cfm = db.find_column_family(new_schema->id());
     keyspace& ks = db.find_keyspace(new_schema->ks_name());
-    bool columns_changed = !cfm->schema()->equal_columns(*new_schema);
+    bool columns_changed = !cfm.schema()->equal_columns(*new_schema);
     auto s = local_schema_registry().learn(new_schema);
     s->registry_entry()->mark_synced();
-    cfm->set_schema(std::move(s));
+    cfm.set_schema(std::move(s));
     ks.metadata()->add_or_update_column_family(new_schema);
-    return service::get_local_migration_manager().notify_update_column_family(cfm->schema(), columns_changed);
+    return service::get_local_migration_manager().notify_update_column_family(cfm.schema(), columns_changed);
 }
 
 // see the comments for merge_keyspaces()
@@ -751,8 +751,8 @@ static void merge_tables(distributed& proxy,
             auto& ks = db.find_keyspace(s->ks_name());
             auto cfg = ks.make_column_family_config(*s, db.get_config());
             db.add_column_family(s, cfg);
-            auto cf = db.find_column_family(s);
-            cf->mark_ready_for_writes();
+            auto& cf = db.find_column_family(s);
+            cf.mark_ready_for_writes();
             ks.make_directory_for_column_family(s->cf_name(), s->id()).get();
             service::get_local_migration_manager().notify_create_column_family(s).get();
         }
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index 9f1cc34911..5cdf4a6053 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -919,8 +919,8 @@ future<> force_blocking_flush(sstring cfname) {
     assert(qctx);
     return qctx->_db.invoke_on_all([cfname = std::move(cfname)](database& db) {
         // if (!Boolean.getBoolean("cassandra.unsafesystem"))
-        auto cf = db.find_column_family(NAME, cfname);
-        return cf->flush();
+        column_family& cf = db.find_column_family(NAME, cfname);
+        return cf.flush();
     });
 }
 
diff --git a/repair/repair.cc b/repair/repair.cc
index 4e15b1b7a3..67c477db9c 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -359,9 +359,9 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) {
 static future checksum_range_shard(database &db, const sstring& keyspace_name,
         const sstring& cf_name, const ::nonwrapping_range& range, repair_checksum hash_version) {
-    auto cf = db.find_column_family(keyspace_name, cf_name);
-    return do_with(dht::to_partition_range(range), [cf, hash_version] (const auto& partition_range) {
-        auto reader = cf->make_streaming_reader(cf->schema(), partition_range);
+    auto& cf = db.find_column_family(keyspace_name, cf_name);
+    return do_with(dht::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) {
+        auto reader = cf.make_streaming_reader(cf.schema(), partition_range);
         return do_with(std::move(reader), partition_checksum(), [hash_version] (auto& reader, auto& checksum) {
             return repeat([&reader, &checksum, hash_version] () {
@@ -485,7 +485,7 @@ static future<> repair_cf_range(seastar::sharded& db,
     // FIXME: column_family should have a method to estimate the number of
     // partitions (and of course it should use cardinality estimation bitmaps,
     // not trivial sum). We shouldn't have this ugly code here...
-    auto sstables = db.local().find_column_family(keyspace, cf)->get_sstables();
+    auto sstables = db.local().find_column_family(keyspace, cf).get_sstables();
     uint64_t estimated_partitions = 0;
     for (auto sst : *sstables) {
         estimated_partitions += sst->get_estimated_key_count();
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index ccdd0dec4a..65b8563ad1 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -453,7 +453,7 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
 #endif
     try {
         auto& db = get_local_storage_proxy().get_db().local();
-        auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name())->schema(); // FIXME: Should we lookup by id?
+        auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id?
 #if 0
         oldCfm.validateCompatility(cfm);
 #endif
diff --git a/service/storage_service.cc b/service/storage_service.cc
index d7ff428c45..31274eb025 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1684,8 +1684,8 @@ future<> storage_service::do_stop_stream_manager() {
 future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) {
     auto& ks = db.find_keyspace(ks_name);
     return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
-        auto cf = db.find_column_family(pair.second);
-        return cf->snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) {
+        auto& cf = db.find_column_family(pair.second);
+        return cf.snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) {
             if (exists) {
                 throw std::runtime_error(sprint("Keyspace %s: snapshot %s already exists.", ks_name, name));
             }
@@ -1715,8 +1715,8 @@ future<> storage_service::take_snapshot(sstring tag, std::vector keyspa
         return parallel_for_each(keyspace_names, [&db, tag = std::move(tag)] (auto& ks_name) {
             auto& ks = db.find_keyspace(ks_name);
             return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, tag = std::move(tag)] (auto& pair) {
-                auto cf = db.find_column_family(pair.second);
-                return cf->snapshot(tag);
+                auto& cf = db.find_column_family(pair.second);
+                return cf.snapshot(tag);
             });
         });
     });
@@ -1747,8 +1747,8 @@ future<> storage_service::take_column_family_snapshot(sstring ks_name, sstring c
     }).then([this, ks_name = std::move(ks_name), cf_name = std::move(cf_name), tag = std::move(tag)] {
         return check_snapshot_not_exist(_db.local(), ks_name, tag).then([this, ks_name, cf_name, tag] {
             return _db.invoke_on_all([ks_name, cf_name, tag] (database &db) {
-                auto cf = db.find_column_family(ks_name, cf_name);
-                return cf->snapshot(tag);
+                auto& cf = db.find_column_family(ks_name, cf_name);
+                return cf.snapshot(tag);
             });
         });
     });
@@ -1813,8 +1813,8 @@ storage_service::get_snapshot_details() {
         std::vector details;
         for (auto&& snap_map: pair.second) {
-            auto cf = _db.local().find_column_family(snap_map.first);
-            details.push_back({ snap_map.second.live, snap_map.second.total, cf->schema()->cf_name(), cf->schema()->ks_name() });
+            auto& cf = _db.local().find_column_family(snap_map.first);
+            details.push_back({ snap_map.second.live, snap_map.second.total, cf.schema()->cf_name(), cf.schema()->ks_name() });
         }
         result.emplace(pair.first, std::move(details));
     }
@@ -2652,8 +2652,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
     // The statement above is valid at least from the Scylla side of things: it is still totally possible
     // that someones just copies the table over existing ones. There isn't much we can do about it.
     return _db.map_reduce(max_element(), [ks_name, cf_name] (database& db) {
-        auto cf = db.find_column_family(ks_name, cf_name);
-        return cf->disable_sstable_write();
+        auto& cf = db.find_column_family(ks_name, cf_name);
+        return cf.disable_sstable_write();
     }).then([this, cf_name, ks_name] (int64_t max_seen_sstable) {
         // Then, we will reshuffle the tables to make sure that the generation numbers don't go too high.
         // We will do all of it the same CPU, to make sure that we won't have two parallel shufflers stepping
@@ -2674,17 +2674,17 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
         // We provide to reshuffle_sstables() the generation of all existing sstables, such that it will
         // easily know which sstables are new.
         return _db.map_reduce(all_generations(), [ks_name, cf_name] (database& db) {
-            auto cf = db.find_column_family(ks_name, cf_name);
+            auto& cf = db.find_column_family(ks_name, cf_name);
             std::set generations;
-            for (auto& p : *(cf->get_sstables())) {
+            for (auto& p : *(cf.get_sstables())) {
                 generations.insert(p->generation());
             }
             return make_ready_future>(std::move(generations));
         }).then([this, max_seen_sstable, ks_name, cf_name] (std::set all_generations) {
             auto shard = std::hash()(cf_name) % smp::count;
             return _db.invoke_on(shard, [ks_name, cf_name, max_seen_sstable, all_generations = std::move(all_generations)] (database& db) {
-                auto cf = db.find_column_family(ks_name, cf_name);
-                return cf->reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1);
+                auto& cf = db.find_column_family(ks_name, cf_name);
+                return cf.reshuffle_sstables(std::move(all_generations), max_seen_sstable + 1);
             });
         });
     }).then_wrapped([this, ks_name, cf_name] (future> f) {
@@ -2708,8 +2708,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
         logger.debug("Now accepting writes for sstables with generation larger or equal than {}", new_gen);
         return _db.invoke_on_all([ks_name, cf_name, new_gen] (database& db) {
-            auto cf = db.find_column_family(ks_name, cf_name);
-            auto disabled = std::chrono::duration_cast(cf->enable_sstable_write(new_gen)).count();
+            auto& cf = db.find_column_family(ks_name, cf_name);
+            auto disabled = std::chrono::duration_cast(cf.enable_sstable_write(new_gen)).count();
             logger.info("CF {}.{} at shard {} had SSTables writes disabled for {} usec", ks_name, cf_name, engine().cpu_id(), disabled);
             return make_ready_future<>();
         }).then([new_tables = std::move(new_tables), eptr = std::move(eptr)] {
@@ -2721,8 +2721,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
     }).then([this, ks_name, cf_name] (std::vector new_tables) {
         auto shard = std::hash()(cf_name) % smp::count;
         return _db.invoke_on(shard, [ks_name, cf_name] (database& db) {
-            auto cf = db.find_column_family(ks_name, cf_name);
-            return cf->flush_upload_dir();
+            auto& cf = db.find_column_family(ks_name, cf_name);
+            return cf.flush_upload_dir();
         }).then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector new_tables_from_upload) mutable {
             if (new_tables.empty() && new_tables_from_upload.empty()) {
                 logger.info("No new SSTables were found for {}.{}", ks_name, cf_name);
@@ -2733,8 +2733,8 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
         });
     }).then([this, ks_name, cf_name] (std::vector new_tables) {
         return _db.invoke_on_all([ks_name = std::move(ks_name), cf_name = std::move(cf_name), new_tables = std::move(new_tables)] (database& db) {
-            auto cf = db.find_column_family(ks_name, cf_name);
-            return cf->load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] {
+            auto& cf = db.find_column_family(ks_name, cf_name);
+            return cf.load_new_sstables(new_tables).then([ks_name = std::move(ks_name), cf_name = std::move(cf_name)] {
                 logger.info("Done loading new SSTables for {}.{}", ks_name, cf_name);
             });
         });
@@ -3212,9 +3212,9 @@ calculate_splits(std::vector tokens, uint32_t split_count, column_fa
 std::vector, uint64_t>>
 storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, range range, uint32_t keys_per_split) {
     using range_type = nonwrapping_range;
-    auto cf = _db.local().find_column_family(ks_name, cf_name);
-    auto schema = cf->schema();
-    auto sstables = cf->get_sstables();
+    auto& cf = _db.local().find_column_family(ks_name, cf_name);
+    auto schema = cf.schema();
+    auto sstables = cf.get_sstables();
     uint64_t total_row_count_estimate = 0;
     std::vector tokens;
     std::vector unwrapped;
@@ -3230,7 +3230,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
         std::vector range_tokens;
         for (auto &&sst : *sstables) {
             total_row_count_estimate += sst->estimated_keys_for_range(r);
-            auto keys = sst->get_key_samples(*cf->schema(), r);
+            auto keys = sst->get_key_samples(*cf.schema(), r);
             std::transform(keys.begin(), keys.end(), std::back_inserter(range_tokens), [](auto&& k) { return std::move(k.token()); });
         }
         std::sort(range_tokens.begin(), range_tokens.end());
@@ -3243,7 +3243,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
     uint64_t max_split_count = tokens.size() / min_samples_per_split + 1;
     uint32_t split_count = std::max(uint32_t(1), static_cast(std::min(max_split_count, total_row_count_estimate / keys_per_split)));
-    return calculate_splits(std::move(tokens), split_count, *cf);
+    return calculate_splits(std::move(tokens), split_count, cf);
 }
 
 } // namespace service
diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc
index 25b981d12d..991ea760be 100644
--- a/streaming/stream_session.cc
+++ b/streaming/stream_session.cc
@@ -158,12 +158,12 @@ void stream_session::init_messaging_service_handler() {
         }
         std::vector query_ranges;
         try {
-            auto cf = db.find_column_family(cf_id);
+            auto& cf = db.find_column_family(cf_id);
             query_ranges.reserve(ranges.size());
             for (auto& range : ranges) {
                 query_ranges.push_back(dht::to_partition_range(range));
             }
-            return cf->flush_streaming_mutations(plan_id, std::move(query_ranges));
+            return cf.flush_streaming_mutations(plan_id, std::move(query_ranges));
         } catch (no_such_column_family) {
sslog.warn("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped", plan_id, from, cf_id); @@ -386,26 +386,26 @@ void stream_session::start_streaming_files() { } } -std::vector> stream_session::get_column_family_stores(const sstring& keyspace, const std::vector& column_families) { +std::vector stream_session::get_column_family_stores(const sstring& keyspace, const std::vector& column_families) { // if columnfamilies are not specified, we add all cf under the keyspace - std::vector> stores; + std::vector stores; auto& db = get_local_db(); if (column_families.empty()) { for (auto& x : db.get_column_families()) { - auto cf = x.second; - auto cf_name = cf->schema()->cf_name(); - auto ks_name = cf->schema()->ks_name(); + column_family& cf = *(x.second); + auto cf_name = cf.schema()->cf_name(); + auto ks_name = cf.schema()->ks_name(); if (ks_name == keyspace) { sslog.debug("Find ks={} cf={}", ks_name, cf_name); - stores.push_back(cf); + stores.push_back(&cf); } } } else { // TODO: We can move this to database class and use shared_ptr instead for (auto& cf_name : column_families) { try { - auto x = db.find_column_family(keyspace, cf_name); - stores.push_back(x); + auto& x = db.find_column_family(keyspace, cf_name); + stores.push_back(&x); } catch (no_such_column_family) { sslog.warn("stream_session: {}.{} does not exist: {}\n", keyspace, cf_name, std::current_exception()); continue; @@ -434,8 +434,8 @@ future<> stream_session::receiving_failed(UUID cf_id) { return get_db().invoke_on_all([cf_id, plan_id = plan_id()] (database& db) { try { - auto cf = db.find_column_family(cf_id); - return cf->fail_streaming_mutations(plan_id); + auto& cf = db.find_column_family(cf_id); + return cf.fail_streaming_mutations(plan_id); } catch (no_such_column_family) { return make_ready_future<>(); } diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 21be3ad00d..fc253d58ac 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -255,7 +255,7 @@ public: */ void add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families); - std::vector> get_column_family_stores(const sstring& keyspace, const std::vector& column_families); + std::vector get_column_family_stores(const sstring& keyspace, const std::vector& column_families); void close_session(stream_session_state final_state); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index bb3c550ab4..bdbfc6eb2a 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -108,8 +108,8 @@ future<> do_send_mutations(auto si, auto fm, bool fragmented) { } future<> send_mutations(auto si) { - auto cf = si->db.find_column_family(si->cf_id); - return do_with(cf->make_streaming_reader(cf->schema(), si->pr), [si] (auto& reader) { + auto& cf = si->db.find_column_family(si->cf_id); + return do_with(cf.make_streaming_reader(cf.schema(), si->pr), [si] (auto& reader) { return repeat([si, &reader] () { return reader().then([si] (auto smopt) { if (smopt && si->db.column_family_exists(si->cf_id)) { diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 67f4e9ffcd..34c3f9fa51 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -176,8 +176,8 @@ public: const sstring& column_name, data_value expected) override { auto& db = _db->local(); - auto cf = db.find_column_family(ks_name, table_name); - auto schema = cf->schema(); + auto& cf = db.find_column_family(ks_name, table_name); + auto schema = 
         auto pkey = partition_key::from_deeply_exploded(*schema, pk);
         auto ckey = clustering_key::from_deeply_exploded(*schema, ck);
         auto exp = expected.type()->decompose(expected);
@@ -189,9 +189,9 @@ public:
                 column_name = std::move(column_name),
                 exp = std::move(exp), table_name = std::move(table_name)] (database& db) mutable {
-            auto cf = db.find_column_family(ks_name, table_name);
-            auto schema = cf->schema();
-            return cf->find_partition_slow(schema, pkey)
+            auto& cf = db.find_column_family(ks_name, table_name);
+            auto schema = cf.schema();
+            return cf.find_partition_slow(schema, pkey)
                 .then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
                     assert(p != nullptr);
                     auto row = p->find_row(ckey);
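As the api/storage_service.cc and streaming/stream_session.cc hunks show, call sites that used to collect lw_shared_ptr<column_family> into a container now store plain column_family* and take the address of the returned reference, since references cannot be kept in a std::vector. A hedged sketch of that pattern -- the function name flush_tables and the include path are assumptions, not part of the patch; parallel_for_each is Seastar's, and column_family::flush() appears in the hunks above:

    #include "database.hh"

    future<> flush_tables(database& db, const sstring& ks, const std::vector<sstring>& tables) {
        // Non-owning pointers: the database owns the column_family objects,
        // exactly as in the compaction/cleanup handlers above.
        std::vector<column_family*> cfs;
        for (auto& name : tables) {
            cfs.push_back(&db.find_column_family(ks, name));
        }
        return parallel_for_each(cfs, [] (column_family* cf) {
            return cf->flush();
        });
    }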