mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-13 03:12:13 +00:00
Abort if audit storage fails to start rather than silently running with an unaudited maintenance socket. Also assert that storage is already stopped when the audit service is destroyed, documenting the defer-stack ordering requirement. Refs SCYLLADB-1615 Refs SCYLLADB-1695
376 lines
16 KiB
C++
376 lines
16 KiB
C++
/*
|
|
* Copyright (C) 2017 ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
|
*/
|
|
|
|
#include <seastar/core/future-util.hh>
|
|
#include "audit/audit.hh"
|
|
#include "db/config.hh"
|
|
#include "cql3/cql_statement.hh"
|
|
#include "cql3/statements/batch_statement.hh"
|
|
#include "cql3/statements/modification_statement.hh"
|
|
#include "storage_helper.hh"
|
|
#include "audit_cf_storage_helper.hh"
|
|
#include "audit_syslog_storage_helper.hh"
|
|
#include "audit_composite_storage_helper.hh"
|
|
#include "audit.hh"
|
|
#include "../db/config.hh"
|
|
|
|
#include <boost/algorithm/string/split.hpp>
|
|
#include <boost/algorithm/string/trim.hpp>
|
|
#include <boost/algorithm/string/classification.hpp>
|
|
|
|
|
|
namespace audit {
|
|
|
|
logging::logger logger("audit");
|
|
|
|
static std::set<sstring> parse_audit_modes(const sstring& data) {
|
|
std::set<sstring> result;
|
|
if (!data.empty()) {
|
|
std::vector<sstring> audit_modes;
|
|
boost::split(audit_modes, data, boost::is_any_of(","));
|
|
if (audit_modes.empty()) {
|
|
return {};
|
|
}
|
|
for (sstring& audit_mode : audit_modes) {
|
|
boost::trim(audit_mode);
|
|
if (audit_mode == "none") {
|
|
return {};
|
|
}
|
|
if (audit_mode != "table" && audit_mode != "syslog") {
|
|
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", audit_mode));
|
|
}
|
|
result.insert(std::move(audit_mode));
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
static std::unique_ptr<storage_helper> create_storage_helper(const std::set<sstring>& audit_modes, cql3::query_processor& qp, service::migration_manager& mm) {
|
|
SCYLLA_ASSERT(!audit_modes.empty() && !audit_modes.contains("none"));
|
|
|
|
std::vector<std::unique_ptr<storage_helper>> helpers;
|
|
for (const sstring& audit_mode : audit_modes) {
|
|
if (audit_mode == "table") {
|
|
helpers.emplace_back(std::make_unique<audit_cf_storage_helper>(qp, mm));
|
|
} else if (audit_mode == "syslog") {
|
|
helpers.emplace_back(std::make_unique<audit_syslog_storage_helper>(qp, mm));
|
|
}
|
|
}
|
|
|
|
SCYLLA_ASSERT(!helpers.empty());
|
|
if (helpers.size() == 1) {
|
|
return std::move(helpers.front());
|
|
}
|
|
return std::make_unique<audit_composite_storage_helper>(std::move(helpers));
|
|
}
|
|
|
|
// Returns the configuration-file spelling of a statement category.
// A value not covered by the switch yields an empty string.
static sstring category_to_string(statement_category category) {
    const char* label = "";
    switch (category) {
    case statement_category::QUERY:
        label = "QUERY";
        break;
    case statement_category::DML:
        label = "DML";
        break;
    case statement_category::DDL:
        label = "DDL";
        break;
    case statement_category::DCL:
        label = "DCL";
        break;
    case statement_category::AUTH:
        label = "AUTH";
        break;
    case statement_category::ADMIN:
        label = "ADMIN";
        break;
    }
    return label;
}
|
|
|
|
// Textual name of this entry's statement category (see category_to_string()).
sstring audit_info::category_string() const {
    return category_to_string(_category);
}
|
|
|
|
// Parses the comma-separated value of the 'audit_categories' option.
//
// Each token is whitespace-trimmed and must be one of QUERY, DML, DDL, DCL,
// AUTH or ADMIN. An empty input yields an empty category set.
//
// Throws audit_exception (quoting the whole input) on any other token.
static category_set parse_audit_categories(const sstring& data) {
    category_set parsed;
    if (data.empty()) {
        return parsed;
    }

    // Maps a single trimmed token to its category, rejecting unknown names.
    const auto to_category = [&data] (const sstring& name) -> statement_category {
        if (name == "QUERY") {
            return statement_category::QUERY;
        }
        if (name == "DML") {
            return statement_category::DML;
        }
        if (name == "DDL") {
            return statement_category::DDL;
        }
        if (name == "DCL") {
            return statement_category::DCL;
        }
        if (name == "AUTH") {
            return statement_category::AUTH;
        }
        if (name == "ADMIN") {
            return statement_category::ADMIN;
        }
        throw audit_exception(fmt::format("Bad configuration: invalid 'audit_categories': {}", data));
    };

    std::vector<sstring> names;
    boost::split(names, data, boost::is_any_of(","));
    for (sstring& name : names) {
        boost::trim(name);
        parsed.set(to_category(name));
    }
    return parsed;
}
|
|
|
|
// Parses the comma-separated value of the 'audit_tables' option.
//
// Every entry must have the form "keyspace.table"; both halves are
// whitespace-trimmed. The result maps each keyspace to its audited tables.
//
// Throws audit_exception (quoting the whole input) when an entry does not
// split into exactly two dot-separated parts.
static audit::audited_tables_t parse_audit_tables(const sstring& data) {
    audit::audited_tables_t tables_by_keyspace;
    if (data.empty()) {
        return tables_by_keyspace;
    }

    std::vector<sstring> entries;
    boost::split(entries, data, boost::is_any_of(","));
    for (sstring& entry : entries) {
        std::vector<sstring> parts;
        boost::split(parts, entry, boost::is_any_of("."));
        if (parts.size() != 2) {
            throw audit_exception(fmt::format("Bad configuration: invalid 'audit_tables': {}", data));
        }

        sstring& keyspace = parts[0];
        sstring& table = parts[1];
        boost::trim(keyspace);
        boost::trim(table);

        // An Alternator table T actually lives in keyspace "alternator_T",
        // while the configuration refers to it as "alternator.T". Rewrite
        // the keyspace so lookups use the real name.
        if (keyspace == "alternator") {
            keyspace = "alternator_" + table;
        }
        tables_by_keyspace[keyspace].insert(std::move(table));
    }
    return tables_by_keyspace;
}
|
|
|
|
// Parses the comma-separated value of the 'audit_keyspaces' option.
//
// Each token is whitespace-trimmed. Tokens that are empty after trimming
// (e.g. produced by a trailing or doubled comma) are skipped, so the result
// never contains an empty keyspace name. An empty input yields an empty set.
static audit::audited_keyspaces_t parse_audit_keyspaces(const sstring& data) {
    audit::audited_keyspaces_t result;
    if (data.empty()) {
        return result;
    }

    std::vector<sstring> tokens;
    boost::split(tokens, data, boost::is_any_of(","));
    for (sstring& token : tokens) {
        boost::trim(token);
        if (token.empty()) {
            // Not a keyspace name; would only pollute the set and the logs.
            continue;
        }
        result.insert(std::move(token));
    }
    return result;
}
|
|
|
|
// Constructs the audit service instance.
//
// Takes ownership of the already-parsed keyspace/table/category filters,
// registers observers on the three corresponding config options so that the
// filters are re-parsed through update_config() when an option changes, and
// creates the storage helper for the requested audit modes.
audit::audit(locator::shared_token_metadata& token_metadata,
        cql3::query_processor& qp,
        service::migration_manager& mm,
        std::set<sstring>&& audit_modes,
        audited_keyspaces_t&& audited_keyspaces,
        audited_tables_t&& audited_tables,
        category_set&& audited_categories,
        const db::config& cfg)
    : _token_metadata(token_metadata)
    , _audited_keyspaces(std::move(audited_keyspaces))
    , _audited_tables(std::move(audited_tables))
    , _audited_categories(std::move(audited_categories))
    , _cfg(cfg)
    , _cfg_keyspaces_observer(cfg.audit_keyspaces.observe([this] (const sstring& updated) {
        update_config<audited_keyspaces_t>(updated, parse_audit_keyspaces, _audited_keyspaces);
    }))
    , _cfg_tables_observer(cfg.audit_tables.observe([this] (const sstring& updated) {
        update_config<audited_tables_t>(updated, parse_audit_tables, _audited_tables);
    }))
    , _cfg_categories_observer(cfg.audit_categories.observe([this] (const sstring& updated) {
        update_config<category_set>(updated, parse_audit_categories, _audited_categories);
    }))
{
    // create_storage_helper() only reads the mode set.
    _storage_helper_ptr = create_storage_helper(audit_modes, qp, mm);
}
|
|
|
|
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably kept here so member types only need to be complete in this file — confirm against audit.hh.
audit::~audit() = default;
|
|
|
|
// Starts the sharded audit service according to the configuration.
//
// When the 'audit' option resolves to no mode, logs that auditing is
// disabled and returns without starting anything. Otherwise parses the
// category/table/keyspace filters, logs the effective settings and starts
// the sharded instance with them.
//
// Configuration parse errors are thrown synchronously as audit_exception.
future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
    auto modes = parse_audit_modes(cfg.audit());
    if (modes.empty()) {
        logger.info("Audit is disabled");
        return make_ready_future<>();
    }

    auto categories = parse_audit_categories(cfg.audit_categories());
    auto tables = parse_audit_tables(cfg.audit_tables());
    auto keyspaces = parse_audit_keyspaces(cfg.audit_keyspaces());

    logger.info("Audit is enabled. Auditing to: \"{}\", with the following categories: \"{}\", keyspaces: \"{}\", and tables: \"{}\"",
                cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables());

    return audit_instance().start(
            std::ref(stm),
            std::ref(qp),
            std::ref(mm),
            std::move(modes),
            std::move(keyspaces),
            std::move(tables),
            std::move(categories),
            std::cref(cfg));
}
|
|
|
|
// Starts the storage helper on every shard and marks each shard's storage
// as running once its helper has started. Does nothing when the audit
// service was never started on this shard.
future<> audit::start_storage(const db::config& cfg) {
    if (!audit_instance().local_is_initialized()) {
        return make_ready_future<>();
    }

    const auto start_on_shard = [&cfg] (audit& shard_audit) {
        return shard_audit._storage_helper_ptr->start(cfg).then([&shard_audit] {
            shard_audit._storage_running = true;
        });
    };
    return audit_instance().invoke_on_all(start_on_shard);
}
|
|
|
|
// Stops the storage helper on every shard. Each shard is flagged as not
// running before its helper is asked to stop. Does nothing when the audit
// service was never started on this shard.
future<> audit::stop_storage() {
    if (!audit_instance().local_is_initialized()) {
        return make_ready_future<>();
    }

    const auto stop_on_shard = [] (audit& shard_audit) {
        shard_audit._storage_running = false;
        return shard_audit._storage_helper_ptr->stop();
    };
    return audit_instance().invoke_on_all(stop_on_shard);
}
|
|
|
|
// Shuts down and stops the sharded audit service.
//
// Storage must already have been stopped (see stop_storage()); this is
// asserted on every shard before the instance is shut down. Does nothing
// when the audit service was never started on this shard.
future<> audit::stop_audit() {
    if (!audit_instance().local_is_initialized()) {
        return make_ready_future<>();
    }

    return audit_instance().invoke_on_all([] (audit& shard_audit) {
        SCYLLA_ASSERT(!shard_audit._storage_running);
        return shard_audit.shutdown();
    }).then([] {
        return audit_instance().stop();
    });
}
|
|
|
|
// Creates the audit record descriptor for a statement, or returns nullptr
// when the audit service is not initialized on this shard.
audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch) {
    if (audit_instance().local_is_initialized()) {
        return std::make_unique<audit_info>(cat, keyspace, table, batch);
    }
    return nullptr;
}
|
|
|
|
// Per-shard shutdown hook invoked by stop_audit(). There is currently
// nothing to do, so it completes immediately.
future<> audit::shutdown() {
    return make_ready_future<>();
}
|
|
|
|
// Writes one audit record through the storage helper.
//
// The user name is taken from the client state: "undefined" when there is
// no user, "anonymous" when the user has no name. If storage is not running
// the record is dropped and reported through on_internal_error_noexcept().
// Failures of the storage write are logged and swallowed, so the returned
// future does not carry them.
future<> audit::log(const audit_info& audit_info, const service::client_state& client_state, std::optional<db::consistency_level> cl, bool error) {
    thread_local static sstring no_username("undefined");
    static const sstring anonymous_username("anonymous");
    const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
    socket_address client_ip = client_state.get_client_address().addr();
    socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();

    if (_storage_running) {
        if (logger.is_enabled(logging::log_level::debug)) {
            logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
                    node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
                    audit_info.query(), client_ip, audit_info.table(), username);
        }
        return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, &audit_info, node_ip, client_ip, cl, username, error)
            .handle_exception([audit_info, node_ip, client_ip, cl, username, error] (auto eptr) {
                logger.error("Unexpected exception when writing log with: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {} exception {}",
                        node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
                        audit_info.query(), client_ip, audit_info.table(), username, eptr);
            });
    }

    // Storage is not available: report the dropped record instead of writing it.
    on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
            node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
            audit_info.query(), client_ip, audit_info.table(), username));
    return make_ready_future<>();
}
|
|
|
|
// Logs the record only if the audit service is initialized on this shard
// and its filters accept the record; otherwise completes immediately.
static future<> maybe_log(const audit_info& audit_info, const service::client_state& client_state, std::optional<db::consistency_level> cl, bool error) {
    const bool wanted = audit::audit_instance().local_is_initialized()
            && audit::local_audit_instance().should_log(audit_info);
    if (!wanted) {
        return make_ready_future<>();
    }
    return audit::local_audit_instance().log(audit_info, client_state, cl, error);
}
|
|
|
|
// Audits a single (non-batch) CQL statement, taking the client state from
// the query state and the consistency level from the query options.
static future<> inspect(const audit_info& audit_info, const service::query_state& query_state, const cql3::query_options& options, bool error) {
    const service::client_state& client_state = query_state.get_client_state();
    return maybe_log(audit_info, client_state, options.get_consistency(), error);
}
|
|
|
|
// Audits a CQL statement.
//
// Statements without audit info are ignored. A statement flagged as a batch
// is treated as a cql3::statements::batch_statement and each of its
// sub-statements is audited in turn; any other statement is audited directly.
future<> inspect(shared_ptr<cql3::cql_statement> statement, const service::query_state& query_state, const cql3::query_options& options, bool error) {
    const auto info = statement->get_audit_info();
    if (info == nullptr) {
        return make_ready_future<>();
    }
    if (!info->batch()) {
        return inspect(*info, query_state, options, error);
    }

    auto* batch = static_cast<cql3::statements::batch_statement*>(statement.get());
    return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& member) {
        return inspect(member.statement, query_state, options, error);
    });
}
|
|
|
|
// Audits an Alternator request, using the consistency level carried by the
// Alternator audit info itself.
future<> inspect(const audit_info_alternator& ai, const service::client_state& client_state, bool error) {
    const audit_info& base_info = static_cast<const audit_info&>(ai);
    return maybe_log(base_info, client_state, ai.get_cl(), error);
}
|
|
|
|
// Writes one login audit record through the storage helper.
//
// If storage is not running the record is dropped and reported through
// on_internal_error_noexcept(). Failures of the storage write are logged
// and swallowed, so the returned future does not carry them.
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
    socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
    const char* const error_text = error ? "true" : "false";

    if (_storage_running) {
        if (logger.is_enabled(logging::log_level::debug)) {
            logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
                    node_ip, client_ip, username, error_text);
        }
        return futurize_invoke(std::mem_fn(&storage_helper::write_login), _storage_helper_ptr, username, node_ip, client_ip, error)
            .handle_exception([username, node_ip, client_ip, error] (auto eptr) {
                logger.error("Unexpected exception when writing login log with: node_ip {} client_ip {} username {} error {} exception {}",
                        node_ip, client_ip, username, error, eptr);
            });
    }

    // Storage is not available: report the dropped record instead of writing it.
    on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}",
            node_ip, client_ip, username, error_text));
    return make_ready_future<>();
}
|
|
|
|
// Audits a login attempt if the audit service is initialized on this shard
// and login auditing is enabled; otherwise completes immediately.
future<> inspect_login(const sstring& username, socket_address client_ip, bool error) {
    const bool wanted = audit::audit_instance().local_is_initialized()
            && audit::local_audit_instance().should_log_login();
    if (wanted) {
        return audit::local_audit_instance().log_login(username, client_ip, error);
    }
    return make_ready_future<>();
}
|
|
|
|
bool audit::should_log_table(std::string_view keyspace, std::string_view name) const {
|
|
auto keyspace_it = _audited_tables.find(keyspace);
|
|
return keyspace_it != _audited_tables.cend() && keyspace_it->second.find(name) != keyspace_it->second.cend();
|
|
}
|
|
|
|
bool audit::should_log(const audit_info& audit_info) const {
|
|
return will_log(audit_info.category(), audit_info.keyspace(), audit_info.table());
|
|
}
|
|
|
|
// Decides whether an operation passes the audit filters.
//
// The category must be audited. In addition, at least one of the following
// must hold: the keyspace is empty, the keyspace is audited, the table is
// audited, or the category is AUTH, ADMIN or DCL.
bool audit::will_log(statement_category cat, std::string_view keyspace, std::string_view table) const {
    if (!_audited_categories.contains(cat)) {
        return false;
    }
    // With an empty keyspace (e.g., ListTables, or batch operations spanning
    // multiple tables) there is nothing to filter by keyspace/table, so a
    // matching category is sufficient.
    if (keyspace.empty()) {
        return true;
    }
    if (_audited_keyspaces.find(keyspace) != _audited_keyspaces.cend()) {
        return true;
    }
    if (should_log_table(keyspace, table)) {
        return true;
    }
    return cat == statement_category::AUTH
        || cat == statement_category::ADMIN
        || cat == statement_category::DCL;
}
|
|
|
|
template<class T>
|
|
void audit::update_config(const sstring & new_value, std::function<T(const sstring&)> parse_func, T& cfg_parameter)
|
|
{
|
|
try {
|
|
cfg_parameter = parse_func(new_value);
|
|
} catch (...) {
|
|
logger.error("Audit configuration update failed because cannot parse value=\"{}\".", new_value);
|
|
return;
|
|
}
|
|
|
|
// If update_config is called with an invalid new_value, this line is not reached.
|
|
// But logging the invalid value must be avoided later, when a different configuration parameter is changed to a correct value.
|
|
// That's why values from _audited_{categories, keyspaces, tables} are logged instead of _cfg.audit_{categories, keyspaces, tables}
|
|
|
|
// Each table as "keyspace.table_name" like in the configuration file
|
|
auto table_entries = _audited_tables | std::views::transform([](const auto& pair) {
|
|
return pair.second | std::views::transform([&](const std::string& table_name) {
|
|
return fmt::format("{}.{}", pair.first, table_name);
|
|
});
|
|
}) | std::views::join;
|
|
|
|
logger.info(
|
|
"Audit configuration is updated. Auditing to: \"{}\", with the following categories: \"{}\", keyspaces: \"{}\", and tables: \"{}\".",
|
|
_cfg.audit(),
|
|
fmt::join(std::views::transform(_audited_categories, category_to_string), ","),
|
|
fmt::join(_audited_keyspaces, ","),
|
|
fmt::join(table_entries, ","));
|
|
}
|
|
|
|
}
|