Files
scylladb/ent/encryption/replicated_key_provider.cc
Botond Dénes 19efd7f6f9 Merge 'The system_replicated_keys should be mark as a system keyspace' from Amnon Heiman
This PR marks system_replicated_keys as a system keyspace.
It was missing when the keyspace was added.

A side effect of that is that metrics that are not supposed to be reported are.
Fixes #27903

Closes scylladb/scylladb#27954

* github.com:scylladb/scylladb:
  distributed_loader: system_replicated_keys as system keyspace
  replicated_key_provider: make KSNAME public
2026-01-19 09:37:41 +02:00

485 lines
18 KiB
C++

/*
* Copyright (C) 2015 ScyllaDB
*
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <unordered_map>
#include <stdexcept>
#include <regex>
#include <tuple>
#include <openssl/evp.h>
#include <openssl/rand.h>
#include <seastar/core/semaphore.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/coroutine.hh>
#include <fmt/ranges.h>
#include <fmt/std.h>
#include "utils/to_string.hh"
#include "replicated_key_provider.hh"
#include "encryption.hh"
#include "encryption_exceptions.hh"
#include "local_file_provider.hh"
#include "symmetric_key.hh"
#include "replica/database.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/hash.hh"
#include "service/storage_service.hh"
#include "service/migration_manager.hh"
#include "compaction/compaction_manager.hh"
#include "replica/distributed_loader.hh"
#include "schema/schema_builder.hh"
#include "db/system_keyspace.hh"
#include "db/extensions.hh"
#include "locator/everywhere_replication_strategy.hh"
#include "release.hh"
namespace encryption {
static auto constexpr TABLENAME = "encrypted_keys";
static logger log("replicated_key_provider");
using utils::UUID;
class replicated_key_provider : public key_provider {
public:
static constexpr int8_t version = 0;
/**
* Header:
* 1 byte version
* 16 bytes UUID of key
* 16 bytes MD5 of UUID
*/
static const size_t header_size = 33;
struct key_id {
key_info info;
opt_bytes id;
key_id(key_info k, opt_bytes b = {})
: info(std::move(k))
, id(std::move(b))
{}
bool operator==(const key_id& v) const {
return info == v.info && id == v.id;
}
};
struct key_id_hash {
size_t operator()(const key_id& id) const {
return utils::tuple_hash()(std::tie(id.info.alg, id.info.len, id.id));
}
};
replicated_key_provider(encryption_context& ctxt, shared_ptr<system_key> system_key, shared_ptr<key_provider> local_provider)
: _ctxt(ctxt)
, _system_key(std::move(system_key))
, _local_provider(std::move(local_provider))
{}
future<std::tuple<key_ptr, opt_bytes>> key(const key_info&, opt_bytes = {}) override;
future<> validate() const override;
future<> maybe_initialize_tables();
static future<> do_initialize_tables(::replica::database& db, service::migration_manager&);
bool should_delay_read(const opt_bytes& id) const override {
if (!id || _initialized) {
return false;
}
if (!_initialized) {
return true;
}
auto& qp = _ctxt.get_query_processor();
// This check should be ok, and even somewhat redundant. "Initialized" above
// will only be set once we've generated/queried a key not passing through here
// (i.e. a key for write _or_ commit log (should we allow this)). This can only be
// done if:
// a.) Encryption was already set up, thus table existed and we waited
// for distributed_tables in "ensure_populated"
// b.) Encryption was added. In which case we are way past bootstrap
// and can receive user commands.
// c.) System table/commit log write, with either first use of this provider,
// in which case we're creating the table (here at least) - thus fine,
// or again, we've waited through "ensure_populated", so keys are
// readable. At worst, we create a few extra keys.
// Note: currently c.) is not relevant, as we don't support system/commitlog
// encryption using repl_prov.
return !qp.local_is_initialized();
}
void print(std::ostream& os) const override {
os << "system_key=" << _system_key->name() << ", local=" << *_local_provider;
}
private:
void store_key(const key_id&, const UUID&, key_ptr);
static opt_bytes decode_id(const opt_bytes&);
static bytes encode_id(const UUID&);
future<std::tuple<UUID, key_ptr>> get_key(const key_info&, opt_bytes = {});
future<key_ptr> load_or_create(const key_info&);
future<key_ptr> load_or_create_local(const key_info&);
future<> read_key_file();
future<> write_key_file();
template<typename... Args>
future<::shared_ptr<cql3::untyped_result_set>> query(sstring, Args&& ...);
future<> force_blocking_flush();
encryption_context& _ctxt;
shared_ptr<system_key> _system_key;
shared_ptr<key_provider> _local_provider;
std::unordered_map<key_id, std::pair<UUID, key_ptr>, key_id_hash> _keys;
bool _initialized = false;
bool _use_cache = true;
friend class replicated_key_provider_factory;
static const utils::UUID local_fallback_uuid;
static const bytes local_fallback_id;
static const bytes_view local_fallback_bytes;
};
using namespace std::chrono_literals;
static const timeout_config rkp_db_timeout_config {
5s, 5s, 5s, 5s, 5s, 5s, 5s,
};
static service::query_state& rkp_db_query_state() {
static thread_local service::client_state cs(service::client_state::internal_tag{}, rkp_db_timeout_config);
static thread_local service::query_state qs(cs, empty_service_permit());
return qs;
}
template<typename... Args>
future<::shared_ptr<cql3::untyped_result_set>> replicated_key_provider::query(sstring q, Args&& ...params) {
auto mode = co_await _ctxt.get_storage_service().local().get_operation_mode();
if (mode != service::storage_service::mode::STARTING) {
co_return co_await _ctxt.get_query_processor().local().execute_internal(q, { std::forward<Args>(params)...}, cql3::query_processor::cache_internal::no);
}
co_return co_await _ctxt.get_query_processor().local().execute_internal(q, db::consistency_level::ONE, rkp_db_query_state(), { std::forward<Args>(params)...}, cql3::query_processor::cache_internal::no);
}
future<> replicated_key_provider::force_blocking_flush() {
return _ctxt.get_database().invoke_on_all([](replica::database& db) {
// if (!Boolean.getBoolean("cassandra.unsafesystem"))
replica::column_family& cf = db.find_column_family(replicated_key_provider_factory::KSNAME, TABLENAME);
return cf.flush();
});
}
void replicated_key_provider::store_key(const key_id& id, const UUID& uuid, key_ptr k) {
if (!_use_cache) {
return;
}
_keys[id] = std::make_pair(uuid, k);
if (!id.id) {
_keys[key_id(id.info, uuid.serialize())] = std::make_pair(uuid, k);
}
}
opt_bytes replicated_key_provider::decode_id(const opt_bytes& b) {
if (b) {
auto i = b->begin();
auto v = *i++;
if (v == version && b->size() == 33) {
bytes id(i + 1, i + 1 + 16);
bytes md(i + 1 + 16, b->end());
if (calculate_md5(id) == md) {
return id;
}
}
}
return std::nullopt;
}
bytes replicated_key_provider::encode_id(const UUID& uuid) {
bytes b{bytes::initialized_later(), header_size};
auto i = b.begin();
*i++ = version;
uuid.serialize(i);
auto md = calculate_md5(b, 1, 16);
std::copy(md.begin(), md.end(), i);
return b;
}
const utils::UUID replicated_key_provider::local_fallback_uuid(0u, 0u); // not valid!
const bytes replicated_key_provider::local_fallback_id = encode_id(local_fallback_uuid);
const bytes_view replicated_key_provider::local_fallback_bytes(local_fallback_id.data() + 1, 16);
future<std::tuple<key_ptr, opt_bytes>> replicated_key_provider::key(const key_info& info, opt_bytes input) {
opt_bytes id;
if (input) { //reading header?
auto v = *input;
if (v[0] == version) {
bytes bid(v.begin() + 1, v.begin() + 1 + 16);
bytes md(v.begin() + 1 + 16, v.begin() + 1 + 32);
if (calculate_md5(bid) == md) {
id = bid;
}
}
}
bool try_local = id == local_fallback_bytes;
// if the id indicates the key came from local fallback, don't even
// try keyspace lookup.
if (!try_local) {
try {
auto [uuid, k] = co_await get_key(info, std::move(id));
co_return std::make_tuple(k, encode_id(uuid));
} catch (std::invalid_argument& e) {
std::throw_with_nested(configuration_error(e.what()));
} catch (...) {
auto ep = std::current_exception();
log.warn("Exception looking up key {}: {}", info, ep);
if (_local_provider) {
try {
std::rethrow_exception(ep);
} catch (replica::no_such_keyspace&) {
} catch (exceptions::invalid_request_exception&) {
} catch (exceptions::read_failure_exception&) {
} catch (...) {
std::throw_with_nested(service_error(fmt::format("key: {}", std::current_exception())));
}
if (!id) {
try_local = true;
}
}
if (!try_local) {
std::throw_with_nested(service_error(fmt::format("key: {}", std::current_exception())));
}
}
}
log.warn("Falling back to local key {}", info);
auto [k, nid] = co_await _local_provider->key(info, id);
if (nid && nid != id) {
// local provider does not give ids.
throw malformed_response_error("Expected null id back from local provider");
}
co_return std::make_tuple(k, local_fallback_id);
}
future<std::tuple<UUID, key_ptr>> replicated_key_provider::get_key(const key_info& info, opt_bytes opt_id) {
if (!_initialized) {
co_await maybe_initialize_tables();
}
key_id id(info, std::move(opt_id));
auto i = _keys.find(id);
if (i != _keys.end()) {
co_return std::tuple(i->second.first, i->second.second);
}
// TODO: origin does non-cql acquire of all available keys from
// replicas in the "host_ids" table iff we get here during boot.
// For now, ignore this and assume that if we have a sstable with
// key X, we should have a local replica of X as well, given
// the "everywhere strategy of the keys table.
auto cipher = info.alg.substr(0, info.alg.find('/')); // e.g. "AES"
UUID uuid;
shared_ptr<cql3::untyped_result_set> res;
if (id.id) {
uuid = utils::UUID_gen::get_UUID(*id.id);
log.debug("Finding key {} ({})", uuid, info);
auto s = fmt::format("SELECT * FROM {}.{} WHERE key_file=? AND cipher=? AND strength=? AND key_id=?;", replicated_key_provider_factory::KSNAME, TABLENAME);
res = co_await query(std::move(s), _system_key->name(), cipher, int32_t(id.info.len), uuid);
// if we find nothing, and we actually queried a specific key (by uuid), we've failed.
if (res->empty()) {
log.debug("Could not find key {}", id.id);
throw std::runtime_error(fmt::format("Unable to find key for cipher={} strength={} id={}", cipher, id.info.len, uuid));
}
} else {
log.debug("Finding key ({})", info);
auto s = fmt::format("SELECT * FROM {}.{} WHERE key_file=? AND cipher=? AND strength=? LIMIT 1;", replicated_key_provider_factory::KSNAME, TABLENAME);
res = co_await query(std::move(s), _system_key->name(), cipher, int32_t(id.info.len));
}
// otoh, if we don't need a specific key, we can just create a new one (writing a sstable)
if (res->empty()) {
uuid = utils::UUID_gen::get_time_UUID();
log.debug("No key found. Generating {}", uuid);
auto k = make_shared<symmetric_key>(id.info);
store_key(id, uuid, k);
auto b = co_await _system_key->encrypt(k->key());
auto ks = base64_encode(b);
log.trace("Inserting generated key {}", uuid);
co_await query(fmt::format("INSERT INTO {}.{} (key_file, cipher, strength, key_id, key) VALUES (?, ?, ?, ?, ?)",
replicated_key_provider_factory::KSNAME, TABLENAME), _system_key->name(), cipher, int32_t(id.info.len), uuid, ks
);
log.trace("Flushing key table");
co_await force_blocking_flush();
co_return std::tuple(uuid, k);
}
// found it
auto& row = res->one();
uuid = row.get_as<UUID>("key_id");
auto ks = row.get_as<sstring>("key");
auto kb = base64_decode(ks);
auto b = co_await _system_key->decrypt(kb);
auto k = make_shared<symmetric_key>(id.info, b);
store_key(id, uuid, k);
co_return std::tuple(uuid, k);
}
future<> replicated_key_provider::validate() const {
try {
co_await _system_key->validate();
} catch (...) {
std::throw_with_nested(std::invalid_argument(fmt::format("Could not validate system key: {}", _system_key->name())));
}
if (_local_provider){
co_await _local_provider->validate();
}
}
schema_ptr encrypted_keys_table() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(replicated_key_provider_factory::KSNAME, TABLENAME);
return schema_builder(replicated_key_provider_factory::KSNAME, TABLENAME, std::make_optional(id))
.with_column("key_file", utf8_type, column_kind::partition_key)
.with_column("cipher", utf8_type, column_kind::partition_key)
.with_column("strength", int32_type, column_kind::clustering_key)
.with_column("key_id", timeuuid_type, column_kind::clustering_key)
.with_column("key", utf8_type)
.with_hash_version()
.build();
}();
return schema;
}
future<> replicated_key_provider::maybe_initialize_tables() {
if (!_initialized) {
co_await do_initialize_tables(_ctxt.get_database().local(), _ctxt.get_migration_manager().local());
_initialized = true;
}
}
future<> replicated_key_provider::do_initialize_tables(::replica::database& db, service::migration_manager& mm) {
if (db.has_schema(replicated_key_provider_factory::KSNAME, TABLENAME)) {
co_return;
}
log.debug("Creating keyspace and table");
if (!db.has_keyspace(replicated_key_provider_factory::KSNAME)) {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
try {
auto ksm = keyspace_metadata::new_keyspace(
replicated_key_provider_factory::KSNAME,
"org.apache.cassandra.locator.EverywhereStrategy",
{},
std::nullopt,
std::nullopt,
true);
co_await mm.announce(service::prepare_new_keyspace_announcement(db, ksm, ts), std::move(group0_guard), fmt::format("encryption at rest: create keyspace {}", replicated_key_provider_factory::KSNAME));
} catch (exceptions::already_exists_exception&) {
}
}
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
try {
co_await mm.announce(co_await service::prepare_new_column_family_announcement(mm.get_storage_proxy(), encrypted_keys_table(), ts), std::move(group0_guard),
fmt::format("encryption at rest: create table {}.{}", replicated_key_provider_factory::KSNAME, TABLENAME));
} catch (exceptions::already_exists_exception&) {
}
auto& ks = db.find_keyspace(replicated_key_provider_factory::KSNAME);
auto& rs = ks.get_replication_strategy();
// should perhaps check name also..
if (rs.get_type() != locator::replication_strategy_type::everywhere_topology) {
// TODO: reset to everywhere + repair.
}
}
const size_t replicated_key_provider::header_size;
replicated_key_provider_factory::replicated_key_provider_factory()
{}
replicated_key_provider_factory::~replicated_key_provider_factory()
{}
namespace bfs = std::filesystem;
shared_ptr<key_provider> replicated_key_provider_factory::get_provider(encryption_context& ctxt, const options& map) {
static bool did_warn = false;
if (!std::exchange(did_warn, true)) {
log.warn("ReplicatedKeyProviderFactory is deprecated and will be removed in a future release.");
log.warn("Please consult the Scylla documentation on how to migrate to a supported key provider.");
log.warn("For more info see: {}", doc_link("operating-scylla/security/encryption-at-rest.html"));
}
opt_wrapper opts(map);
auto system_key_name = opts(SYSTEM_KEY_FILE).value_or("system_key");
if (system_key_name.find('/') != sstring::npos) {
throw std::invalid_argument("system_key cannot contain '/'");
}
auto system_key = ctxt.get_system_key(system_key_name);
auto local_key_file = bfs::absolute(bfs::path(opts(SECRET_KEY_FILE).value_or(default_key_file_path)));
if (system_key->is_local() && bfs::absolute(bfs::path(system_key->name())) == local_key_file) {
throw std::invalid_argument("system key and local key cannot be the same");
}
auto name = system_key->name() + ":" + local_key_file.string();
auto debug = opts("DEBUG");
if (debug) {
name = name + ":" + *debug;
}
auto p = ctxt.get_cached_provider(name);
if (!p) {
auto rp = seastar::make_shared<replicated_key_provider>(ctxt, std::move(system_key), local_file_provider_factory::find(ctxt, local_key_file.string()));
ctxt.cache_provider(name, rp);
if (debug && debug->find("nocache") != sstring::npos) {
log.debug("Turn off cache");
rp->_use_cache = false;
}
p = std::move(rp);
}
return p;
}
void replicated_key_provider_factory::init(db::extensions& exts) {
exts.add_extension_internal_keyspace(KSNAME);
}
future<> replicated_key_provider_factory::on_started(::replica::database& db, service::migration_manager& mm) {
return replicated_key_provider::do_initialize_tables(db, mm);
}
}