Merge "Cut database-system_keyspace circular dependency" from Pavel Emelyanov
" There's one via the database's compaction manager and large data handler sub-services. Both need system keyspace to put their info into, but the latter needs database naturally via query_processor->storage_proxy link. The solution is to make c.m. | l.d.h. -> sys.ks. dependency be weak with the help of shared_from_this(), described in details in patch #2 commit message. As a (not-that-)side effect this set removes a bunch of global qctx calls. refs: #11684 (this set seem to increase the chance of stepping on it) " * 'br-sysks-async-users' of https://github.com/xemul/scylla: large_data_handler: Use local system_keyspace to update entries system_keyspace: De-static compaction history update compaction_manager: Relax history paths database: Plug/unplug system_keyspace system_keyspace: Add .shutdown() method
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include <cmath>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
@@ -347,8 +348,15 @@ future<sstables::compaction_result> compaction_manager::task::compact_sstables(s
|
||||
future<> compaction_manager::task::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
|
||||
auto ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(res.stats.ended_at.time_since_epoch());
|
||||
|
||||
co_return co_await t.update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(), ended_at,
|
||||
res.stats.start_size, res.stats.end_size);
|
||||
if (_cm._sys_ks) {
|
||||
// FIXME: add support to merged_rows. merged_rows is a histogram that
|
||||
// shows how many sstables each row is merged from. This information
|
||||
// cannot be accessed until we make combined_reader more generic,
|
||||
// for example, by adding a reducer method.
|
||||
auto sys_ks = _cm._sys_ks; // hold pointer on sys_ks
|
||||
co_await sys_ks->update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(),
|
||||
ended_at.count(), res.stats.start_size, res.stats.end_size, std::unordered_map<int32_t, int64_t>{});
|
||||
}
|
||||
}
|
||||
|
||||
class compaction_manager::major_compaction_task : public compaction_manager::task {
|
||||
@@ -1681,6 +1689,14 @@ compaction::strategy_control& compaction_manager::get_strategy_control() const n
|
||||
return *_strategy_control;
|
||||
}
|
||||
|
||||
void compaction_manager::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
|
||||
_sys_ks = sys_ks.shared_from_this();
|
||||
}
|
||||
|
||||
void compaction_manager::unplug_system_keyspace() noexcept {
|
||||
_sys_ks = nullptr;
|
||||
}
|
||||
|
||||
double compaction_backlog_tracker::backlog() const {
|
||||
return disabled() ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions);
|
||||
}
|
||||
|
||||
@@ -38,6 +38,10 @@
|
||||
#include "sstables/exceptions.hh"
|
||||
#include "tombstone_gc.hh"
|
||||
|
||||
namespace db {
|
||||
class system_keyspace;
|
||||
}
|
||||
|
||||
class compacting_sstable_registration;
|
||||
|
||||
class repair_history_map {
|
||||
@@ -285,6 +289,8 @@ private:
|
||||
// being picked more than once.
|
||||
seastar::named_semaphore _off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}};
|
||||
|
||||
seastar::shared_ptr<db::system_keyspace> _sys_ks;
|
||||
|
||||
std::function<void()> compaction_submission_callback();
|
||||
// all registered tables are reevaluated at a constant interval.
|
||||
// Submission is a NO-OP when there's nothing to do, so it's fine to call it regularly.
|
||||
@@ -482,6 +488,9 @@ public:
|
||||
// Run a function with compaction temporarily disabled for a table T.
|
||||
future<> run_with_compaction_disabled(compaction::table_state& t, std::function<future<> ()> func);
|
||||
|
||||
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
|
||||
void unplug_system_keyspace() noexcept;
|
||||
|
||||
// Adds a table to the compaction manager.
|
||||
// Creates a compaction_state structure that can be used for submitting
|
||||
// compaction jobs of all types.
|
||||
|
||||
@@ -40,7 +40,6 @@ public:
|
||||
virtual sstables::shared_sstable make_sstable() const = 0;
|
||||
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
|
||||
virtual api::timestamp_type min_memtable_timestamp() const = 0;
|
||||
virtual future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) = 0;
|
||||
virtual future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0;
|
||||
virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0;
|
||||
virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0;
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
*/
|
||||
|
||||
#include <seastar/core/print.hh>
|
||||
#include "db/query_context.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
@@ -63,6 +62,14 @@ future<> large_data_handler::stop() {
|
||||
return _sem.wait(max_concurrency);
|
||||
}
|
||||
|
||||
void large_data_handler::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
|
||||
_sys_ks = sys_ks.shared_from_this();
|
||||
}
|
||||
|
||||
void large_data_handler::unplug_system_keyspace() noexcept {
|
||||
_sys_ks = nullptr;
|
||||
}
|
||||
|
||||
template <typename T> static std::string key_to_str(const T& key, const schema& s) {
|
||||
std::ostringstream oss;
|
||||
oss << key.with_schema(s);
|
||||
@@ -129,11 +136,9 @@ cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service&
|
||||
{}
|
||||
|
||||
template <typename... Args>
|
||||
static future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size,
|
||||
std::string_view desc, std::string_view extra_path, const std::vector<sstring> &extra_fields, Args&&... args) {
|
||||
// FIXME This check is for test/cql-test-env that stop qctx (it does so because
|
||||
// it stops query processor and doesn't want us to access its freed instantes)
|
||||
if (!db::qctx) {
|
||||
future<> cql_table_large_data_handler::try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size,
|
||||
std::string_view desc, std::string_view extra_path, const std::vector<sstring> &extra_fields, Args&&... args) const {
|
||||
if (!_sys_ks) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -152,12 +157,13 @@ static future<> try_record(std::string_view large_table, const sstables::sstable
|
||||
std::string pk_str = key_to_str(partition_key.to_partition_key(s), s);
|
||||
auto timestamp = db_clock::now();
|
||||
large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes) to {}", desc, ks_name, cf_name, pk_str, extra_path, size, sstable_name);
|
||||
return db::qctx->execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
|
||||
return _sys_ks->execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
|
||||
.discard_result()
|
||||
.handle_exception([ks_name, cf_name, large_table, sstable_name] (std::exception_ptr ep) {
|
||||
large_data_logger.warn("Failed to add a record to system.large_{}s: ks = {}, table = {}, sst = {} exception = {}",
|
||||
large_table, ks_name, cf_name, sstable_name, ep);
|
||||
});
|
||||
})
|
||||
.finally([ p = _sys_ks ] {});
|
||||
}
|
||||
|
||||
future<> cql_table_large_data_handler::record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows) const {
|
||||
@@ -212,16 +218,18 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable
|
||||
}
|
||||
|
||||
future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const {
|
||||
assert(_sys_ks);
|
||||
const sstring req =
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?",
|
||||
large_table_name);
|
||||
large_data_logger.debug("Dropping entries from {}: ks = {}, table = {}, sst = {}",
|
||||
large_table_name, s.ks_name(), s.cf_name(), sstable_name);
|
||||
return db::qctx->execute_cql(req, s.ks_name(), s.cf_name(), sstable_name)
|
||||
return _sys_ks->execute_cql(req, s.ks_name(), s.cf_name(), sstable_name)
|
||||
.discard_result()
|
||||
.handle_exception([&s, sstable_name, large_table_name] (std::exception_ptr ep) {
|
||||
large_data_logger.warn("Failed to drop entries from {}: ks = {}, table = {}, sst = {} exception = {}",
|
||||
large_table_name, s.ks_name(), s.cf_name(), sstable_name, ep);
|
||||
});
|
||||
})
|
||||
.finally([ p = _sys_ks ] {});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ class key;
|
||||
|
||||
namespace db {
|
||||
|
||||
class system_keyspace;
|
||||
|
||||
class large_data_handler {
|
||||
public:
|
||||
struct stats {
|
||||
@@ -62,6 +64,9 @@ protected:
|
||||
private:
|
||||
mutable large_data_handler::stats _stats;
|
||||
|
||||
protected:
|
||||
seastar::shared_ptr<db::system_keyspace> _sys_ks;
|
||||
|
||||
public:
|
||||
explicit large_data_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes, uint64_t rows_count_threshold, uint64_t collection_elements_count_threshold);
|
||||
virtual ~large_data_handler() {}
|
||||
@@ -125,6 +130,9 @@ public:
|
||||
|
||||
static sstring sst_filename(const sstables::sstable& sst);
|
||||
|
||||
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
|
||||
void unplug_system_keyspace() noexcept;
|
||||
|
||||
protected:
|
||||
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const = 0;
|
||||
@@ -167,6 +175,11 @@ private:
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const;
|
||||
future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const;
|
||||
|
||||
private:
|
||||
template <typename... Args>
|
||||
future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size,
|
||||
std::string_view desc, std::string_view extra_path, const std::vector<sstring> &extra_fields, Args&&... args) const;
|
||||
};
|
||||
|
||||
class nop_large_data_handler : public large_data_handler {
|
||||
|
||||
@@ -2876,7 +2876,7 @@ future<> system_keyspace::update_compaction_history(utils::UUID uuid, sstring ks
|
||||
, COMPACTION_HISTORY);
|
||||
|
||||
db_clock::time_point tp{db_clock::duration{compacted_at}};
|
||||
return qctx->execute_cql(req, uuid, ksname, cfname, tp, bytes_in, bytes_out,
|
||||
return execute_cql(req, uuid, ksname, cfname, tp, bytes_in, bytes_out,
|
||||
make_map_value(map_type, prepare_rows_merged(rows_merged))).discard_result().handle_exception([] (auto ep) {
|
||||
slogger.error("update compaction history failed: {}: ignored", ep);
|
||||
});
|
||||
@@ -3363,6 +3363,8 @@ future<> system_keyspace::start() {
|
||||
qctx = std::make_unique<query_context>(_qp);
|
||||
}
|
||||
|
||||
_db.local().plug_system_keyspace(*this);
|
||||
|
||||
// FIXME
|
||||
// This should be coupled with setup_version()'s part committing these values into
|
||||
// the system.local table. However, cql_test_env needs cached local_dc_rack strings,
|
||||
@@ -3374,6 +3376,11 @@ future<> system_keyspace::start() {
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> system_keyspace::shutdown() {
|
||||
_db.local().unplug_system_keyspace();
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> system_keyspace::stop() {
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ public:
|
||||
virtual bool contains_keyspace(std::string_view) = 0;
|
||||
};
|
||||
|
||||
class system_keyspace : public seastar::peering_sharded_service<system_keyspace> {
|
||||
class system_keyspace : public seastar::peering_sharded_service<system_keyspace>, public seastar::async_sharded_service<system_keyspace> {
|
||||
sharded<cql3::query_processor>& _qp;
|
||||
sharded<replica::database>& _db;
|
||||
std::unique_ptr<local_cache> _cache;
|
||||
@@ -306,7 +306,7 @@ public:
|
||||
std::unordered_map<int32_t, int64_t> rows_merged;
|
||||
};
|
||||
|
||||
static future<> update_compaction_history(utils::UUID uuid, sstring ksname, sstring cfname, int64_t compacted_at, int64_t bytes_in, int64_t bytes_out,
|
||||
future<> update_compaction_history(utils::UUID uuid, sstring ksname, sstring cfname, int64_t compacted_at, int64_t bytes_in, int64_t bytes_out,
|
||||
std::unordered_map<int32_t, int64_t> rows_merged);
|
||||
using compaction_history_consumer = noncopyable_function<future<>(const compaction_history_entry&)>;
|
||||
static future<> get_compaction_history(compaction_history_consumer&& f);
|
||||
@@ -468,10 +468,12 @@ public:
|
||||
~system_keyspace();
|
||||
future<> start();
|
||||
future<> stop();
|
||||
future<> shutdown();
|
||||
|
||||
private:
|
||||
future<::shared_ptr<cql3::untyped_result_set>> execute_cql(const sstring& query_string, const std::initializer_list<data_value>& values);
|
||||
|
||||
public:
|
||||
template <typename... Args>
|
||||
future<::shared_ptr<cql3::untyped_result_set>> execute_cql(sstring req, Args&&... args) {
|
||||
return execute_cql(req, { data_value(std::forward<Args>(args))... });
|
||||
|
||||
@@ -2765,6 +2765,16 @@ database::as_data_dictionary() const {
|
||||
return _impl.wrap(*this);
|
||||
}
|
||||
|
||||
void database::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
|
||||
_compaction_manager.plug_system_keyspace(sys_ks);
|
||||
_large_data_handler->plug_system_keyspace(sys_ks);
|
||||
}
|
||||
|
||||
void database::unplug_system_keyspace() noexcept {
|
||||
_compaction_manager.unplug_system_keyspace();
|
||||
_large_data_handler->unplug_system_keyspace();
|
||||
}
|
||||
|
||||
} // namespace replica
|
||||
|
||||
template <typename T>
|
||||
|
||||
@@ -1376,6 +1376,9 @@ public:
|
||||
|
||||
future<> drain();
|
||||
|
||||
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
|
||||
void unplug_system_keyspace() noexcept;
|
||||
|
||||
private:
|
||||
future<> flush_non_system_column_families();
|
||||
future<> flush_system_column_families();
|
||||
|
||||
@@ -2536,17 +2536,6 @@ public:
|
||||
api::timestamp_type min_memtable_timestamp() const override {
|
||||
return _cg.min_memtable_timestamp();
|
||||
}
|
||||
future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) override {
|
||||
// FIXME: add support to merged_rows. merged_rows is a histogram that
|
||||
// shows how many sstables each row is merged from. This information
|
||||
// cannot be accessed until we make combined_reader more generic,
|
||||
// for example, by adding a reducer method.
|
||||
if (!db::qctx) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return db::system_keyspace::update_compaction_history(compaction_id, ks_name, cf_name, ended_at.count(),
|
||||
bytes_in, bytes_out, std::unordered_map<int32_t, int64_t>{});
|
||||
}
|
||||
future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
|
||||
if (offstrategy) {
|
||||
return _cg.update_sstable_lists_on_off_strategy_completion(std::move(desc));
|
||||
|
||||
@@ -2689,6 +2689,7 @@ future<> storage_service::do_drain() {
|
||||
});
|
||||
|
||||
co_await _db.invoke_on_all(&replica::database::drain);
|
||||
co_await _sys_ks.invoke_on_all(&db::system_keyspace::shutdown);
|
||||
}
|
||||
|
||||
future<> storage_service::rebuild(sstring source_dc) {
|
||||
|
||||
@@ -771,7 +771,10 @@ public:
|
||||
}).get();
|
||||
|
||||
group0_client.init().get();
|
||||
auto stop_system_keyspace = defer([] { db::qctx = {}; });
|
||||
auto stop_system_keyspace = defer([&sys_ks] {
|
||||
db::qctx = {};
|
||||
sys_ks.invoke_on_all(&db::system_keyspace::shutdown).get();
|
||||
});
|
||||
|
||||
auto shutdown_db = defer([&db] {
|
||||
db.invoke_on_all(&replica::database::shutdown).get();
|
||||
|
||||
@@ -108,9 +108,6 @@ public:
|
||||
api::timestamp_type min_memtable_timestamp() const override {
|
||||
return table().min_memtable_timestamp();
|
||||
}
|
||||
future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
|
||||
return table().as_table_state().on_compaction_completion(std::move(desc), offstrategy);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user