Merge "Cut database-system_keyspace circular dependency" from Pavel Emelyanov

"
There's one, via the database's compaction manager and large data handler
sub-services. Both need the system keyspace to store their info in, while
the system keyspace naturally needs the database via the
query_processor -> storage_proxy link.

The solution is to make the c.m. | l.d.h. -> sys.ks. dependency weak with
the help of shared_from_this(), as described in detail in the commit
message of patch #2.

As a (not-that-)side effect, this set removes a bunch of global qctx
calls.

refs: #11684 (this set seems to increase the chance of stepping on it)
"

* 'br-sysks-async-users' of https://github.com/xemul/scylla:
  large_data_handler: Use local system_keyspace to update entries
  system_keyspace: De-static compaction history update
  compaction_manager: Relax history paths
  database: Plug/unplug system_keyspace
  system_keyspace: Add .shutdown() method
This commit is contained in:
Botond Dénes
2022-10-11 08:52:04 +03:00
13 changed files with 88 additions and 31 deletions

View File

@@ -19,6 +19,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "utils/fb_utilities.hh"
#include "utils/UUID_gen.hh"
#include "db/system_keyspace.hh"
#include <cmath>
#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/algorithm/remove_if.hpp>
@@ -347,8 +348,15 @@ future<sstables::compaction_result> compaction_manager::task::compact_sstables(s
// Records the outcome of a finished compaction in the compaction history.
// ended_at is res.stats.ended_at expressed as milliseconds since the clock's epoch.
future<> compaction_manager::task::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
auto ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(res.stats.ended_at.time_since_epoch());
// NOTE(review): this listing carries no +/- markers; the call through table_state
// just below looks like the pre-patch statement that the _sys_ks branch replaces --
// confirm against the applied patch before relying on either path.
co_return co_await t.update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(), ended_at,
res.stats.start_size, res.stats.end_size);
// Nothing is recorded while no system keyspace is plugged in (_sys_ks is null).
if (_cm._sys_ks) {
// FIXME: add support to merged_rows. merged_rows is a histogram that
// shows how many sstables each row is merged from. This information
// cannot be accessed until we make combined_reader more generic,
// for example, by adding a reducer method.
auto sys_ks = _cm._sys_ks; // local copy keeps sys_ks referenced across the co_await below
// An empty rows_merged map is passed for now (see the FIXME above).
co_await sys_ks->update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(),
ended_at.count(), res.stats.start_size, res.stats.end_size, std::unordered_map<int32_t, int64_t>{});
}
}
class compaction_manager::major_compaction_task : public compaction_manager::task {
@@ -1681,6 +1689,14 @@ compaction::strategy_control& compaction_manager::get_strategy_control() const n
return *_strategy_control;
}
// Stores a shared pointer to the given system keyspace, obtained through its
// shared_from_this(), in _sys_ks. task::update_history() only records
// compaction history while this pointer is set.
void compaction_manager::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
_sys_ks = sys_ks.shared_from_this();
}
// Drops this manager's reference to the system keyspace by resetting _sys_ks
// to null; task::update_history() checks the pointer before using it.
void compaction_manager::unplug_system_keyspace() noexcept {
_sys_ks = nullptr;
}
// Returns the current backlog for this tracker: the
// compaction_controller::disable_backlog value when the tracker is disabled,
// otherwise whatever the implementation computes from the ongoing writes and
// ongoing compactions.
double compaction_backlog_tracker::backlog() const {
    if (disabled()) {
        return compaction_controller::disable_backlog;
    }
    return _impl->backlog(_ongoing_writes, _ongoing_compactions);
}

View File

@@ -38,6 +38,10 @@
#include "sstables/exceptions.hh"
#include "tombstone_gc.hh"
namespace db {
class system_keyspace;
}
class compacting_sstable_registration;
class repair_history_map {
@@ -285,6 +289,8 @@ private:
// being picked more than once.
seastar::named_semaphore _off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}};
seastar::shared_ptr<db::system_keyspace> _sys_ks;
std::function<void()> compaction_submission_callback();
// all registered tables are reevaluated at a constant interval.
// Submission is a NO-OP when there's nothing to do, so it's fine to call it regularly.
@@ -482,6 +488,9 @@ public:
// Run a function with compaction temporarily disabled for a table T.
future<> run_with_compaction_disabled(compaction::table_state& t, std::function<future<> ()> func);
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
void unplug_system_keyspace() noexcept;
// Adds a table to the compaction manager.
// Creates a compaction_state structure that can be used for submitting
// compaction jobs of all types.

View File

@@ -40,7 +40,6 @@ public:
virtual sstables::shared_sstable make_sstable() const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
virtual future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) = 0;
virtual future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0;
virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0;
virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0;

View File

@@ -7,7 +7,6 @@
*/
#include <seastar/core/print.hh>
#include "db/query_context.hh"
#include "db/system_keyspace.hh"
#include "db/large_data_handler.hh"
#include "sstables/sstables.hh"
@@ -63,6 +62,14 @@ future<> large_data_handler::stop() {
return _sem.wait(max_concurrency);
}
// Stores a shared pointer to the given system keyspace, obtained through its
// shared_from_this(), in _sys_ks so that large-data records can be written
// and deleted through it.
void large_data_handler::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
_sys_ks = sys_ks.shared_from_this();
}
// Drops this handler's reference to the system keyspace by resetting _sys_ks
// to null.
void large_data_handler::unplug_system_keyspace() noexcept {
_sys_ks = nullptr;
}
template <typename T> static std::string key_to_str(const T& key, const schema& s) {
std::ostringstream oss;
oss << key.with_schema(s);
@@ -129,11 +136,9 @@ cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&
{}
template <typename... Args>
static future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size,
std::string_view desc, std::string_view extra_path, const std::vector<sstring> &extra_fields, Args&&... args) {
// FIXME This check is for test/cql-test-env that stop qctx (it does so because
// it stops query processor and doesn't want us to access its freed instantes)
if (!db::qctx) {
future<> cql_table_large_data_handler::try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size,
std::string_view desc, std::string_view extra_path, const std::vector<sstring> &extra_fields, Args&&... args) const {
if (!_sys_ks) {
return make_ready_future<>();
}
@@ -152,12 +157,13 @@ static future<> try_record(std::string_view large_table, const sstables::sstable
std::string pk_str = key_to_str(partition_key.to_partition_key(s), s);
auto timestamp = db_clock::now();
large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes) to {}", desc, ks_name, cf_name, pk_str, extra_path, size, sstable_name);
return db::qctx->execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
return _sys_ks->execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
.discard_result()
.handle_exception([ks_name, cf_name, large_table, sstable_name] (std::exception_ptr ep) {
large_data_logger.warn("Failed to add a record to system.large_{}s: ks = {}, table = {}, sst = {} exception = {}",
large_table, ks_name, cf_name, sstable_name, ep);
});
})
.finally([ p = _sys_ks ] {});
}
future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows) const {
@@ -212,16 +218,18 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable
}
// Deletes the rows recorded for the given sstable from system.<large_table_name>,
// matching on keyspace name, table name and sstable name.
// Requires a plugged system keyspace (asserted below). A failing delete is
// handled by logging a warning.
future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const {
assert(_sys_ks);
const sstring req =
format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?",
large_table_name);
large_data_logger.debug("Dropping entries from {}: ks = {}, table = {}, sst = {}",
large_table_name, s.ks_name(), s.cf_name(), sstable_name);
// NOTE(review): this listing carries no +/- markers; the db::qctx call and the
// first "});" below look like pre-patch lines replaced by the _sys_ks call and
// the "})" + ".finally(...)" pair -- confirm against the applied patch.
// NOTE(review): the exception handler captures s by reference and
// large_table_name as a string_view; presumably callers keep both alive until
// the returned future resolves -- confirm.
return db::qctx->execute_cql(req, s.ks_name(), s.cf_name(), sstable_name)
return _sys_ks->execute_cql(req, s.ks_name(), s.cf_name(), sstable_name)
.discard_result()
.handle_exception([&s, sstable_name, large_table_name] (std::exception_ptr ep) {
large_data_logger.warn("Failed to drop entries from {}: ks = {}, table = {}, sst = {} exception = {}",
large_table_name, s.ks_name(), s.cf_name(), sstable_name, ep);
});
})
// The copy of _sys_ks captured here stays referenced until the chain completes.
.finally([ p = _sys_ks ] {});
}
}

View File

@@ -21,6 +21,8 @@ class key;
namespace db {
class system_keyspace;
class large_data_handler {
public:
struct stats {
@@ -62,6 +64,9 @@ protected:
private:
mutable large_data_handler::stats _stats;
protected:
seastar::shared_ptr<db::system_keyspace> _sys_ks;
public:
explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes, uint64_t rows_count_threshold, uint64_t collection_elements_count_threshold);
virtual ~large_data_handler() {}
@@ -125,6 +130,9 @@ public:
static sstring sst_filename(const sstables::sstable& sst);
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
void unplug_system_keyspace() noexcept;
protected:
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const = 0;
@@ -167,6 +175,11 @@ private:
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const;
future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const;
private:
template <typename... Args>
future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size,
std::string_view desc, std::string_view extra_path, const std::vector<sstring> &extra_fields, Args&&... args) const;
};
class nop_large_data_handler : public large_data_handler {

View File

@@ -2876,7 +2876,7 @@ future<> system_keyspace::update_compaction_history(utils::UUID uuid, sstring ks
, COMPACTION_HISTORY);
db_clock::time_point tp{db_clock::duration{compacted_at}};
return qctx->execute_cql(req, uuid, ksname, cfname, tp, bytes_in, bytes_out,
return execute_cql(req, uuid, ksname, cfname, tp, bytes_in, bytes_out,
make_map_value(map_type, prepare_rows_merged(rows_merged))).discard_result().handle_exception([] (auto ep) {
slogger.error("update compaction history failed: {}: ignored", ep);
});
@@ -3363,6 +3363,8 @@ future<> system_keyspace::start() {
qctx = std::make_unique<query_context>(_qp);
}
_db.local().plug_system_keyspace(*this);
// FIXME
// This should be coupled with setup_version()'s part committing these values into
// the system.local table. However, cql_test_env needs cached local_dc_rack strings,
@@ -3374,6 +3376,11 @@ future<> system_keyspace::start() {
co_return;
}
// Unplugs this system keyspace from the shard-local database instance, the
// counterpart of the plug_system_keyspace() call made in start().
future<> system_keyspace::shutdown() {
_db.local().unplug_system_keyspace();
co_return;
}
// Currently has no work to do; completes immediately.
future<> system_keyspace::stop() {
co_return;
}

View File

@@ -86,7 +86,7 @@ public:
virtual bool contains_keyspace(std::string_view) = 0;
};
class system_keyspace : public seastar::peering_sharded_service<system_keyspace> {
class system_keyspace : public seastar::peering_sharded_service<system_keyspace>, public seastar::async_sharded_service<system_keyspace> {
sharded<cql3::query_processor>& _qp;
sharded<replica::database>& _db;
std::unique_ptr<local_cache> _cache;
@@ -306,7 +306,7 @@ public:
std::unordered_map<int32_t, int64_t> rows_merged;
};
static future<> update_compaction_history(utils::UUID uuid, sstring ksname, sstring cfname, int64_t compacted_at, int64_t bytes_in, int64_t bytes_out,
future<> update_compaction_history(utils::UUID uuid, sstring ksname, sstring cfname, int64_t compacted_at, int64_t bytes_in, int64_t bytes_out,
std::unordered_map<int32_t, int64_t> rows_merged);
using compaction_history_consumer = noncopyable_function<future<>(const compaction_history_entry&)>;
static future<> get_compaction_history(compaction_history_consumer&& f);
@@ -468,10 +468,12 @@ public:
~system_keyspace();
future<> start();
future<> stop();
future<> shutdown();
private:
future<::shared_ptr<cql3::untyped_result_set>> execute_cql(const sstring& query_string, const std::initializer_list<data_value>& values);
public:
template <typename... Args>
future<::shared_ptr<cql3::untyped_result_set>> execute_cql(sstring req, Args&&... args) {
return execute_cql(req, { data_value(std::forward<Args>(args))... });

View File

@@ -2765,6 +2765,16 @@ database::as_data_dictionary() const {
return _impl.wrap(*this);
}
// Hands the system keyspace to the two sub-services that write into it:
// the compaction manager and the large data handler.
void database::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
_compaction_manager.plug_system_keyspace(sys_ks);
_large_data_handler->plug_system_keyspace(sys_ks);
}
// Tells the compaction manager and the large data handler to drop their
// references to the system keyspace.
void database::unplug_system_keyspace() noexcept {
_compaction_manager.unplug_system_keyspace();
_large_data_handler->unplug_system_keyspace();
}
} // namespace replica
template <typename T>

View File

@@ -1376,6 +1376,9 @@ public:
future<> drain();
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
void unplug_system_keyspace() noexcept;
private:
future<> flush_non_system_column_families();
future<> flush_system_column_families();

View File

@@ -2536,17 +2536,6 @@ public:
api::timestamp_type min_memtable_timestamp() const override {
return _cg.min_memtable_timestamp();
}
future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) override {
// FIXME: add support to merged_rows. merged_rows is a histogram that
// shows how many sstables each row is merged from. This information
// cannot be accessed until we make combined_reader more generic,
// for example, by adding a reducer method.
if (!db::qctx) {
return make_ready_future<>();
}
return db::system_keyspace::update_compaction_history(compaction_id, ks_name, cf_name, ended_at.count(),
bytes_in, bytes_out, std::unordered_map<int32_t, int64_t>{});
}
future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
if (offstrategy) {
return _cg.update_sstable_lists_on_off_strategy_completion(std::move(desc));

View File

@@ -2689,6 +2689,7 @@ future<> storage_service::do_drain() {
});
co_await _db.invoke_on_all(&replica::database::drain);
co_await _sys_ks.invoke_on_all(&db::system_keyspace::shutdown);
}
future<> storage_service::rebuild(sstring source_dc) {

View File

@@ -771,7 +771,10 @@ public:
}).get();
group0_client.init().get();
auto stop_system_keyspace = defer([] { db::qctx = {}; });
auto stop_system_keyspace = defer([&sys_ks] {
db::qctx = {};
sys_ks.invoke_on_all(&db::system_keyspace::shutdown).get();
});
auto shutdown_db = defer([&db] {
db.invoke_on_all(&replica::database::shutdown).get();

View File

@@ -108,9 +108,6 @@ public:
api::timestamp_type min_memtable_timestamp() const override {
return table().min_memtable_timestamp();
}
future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) override {
return make_ready_future<>();
}
future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
return table().as_table_state().on_compaction_completion(std::move(desc), offstrategy);
}