This reverts commit866c96f536, reversing changes made to367633270a. This change caused all longevities to fail, with a crash in parsing scylla-metadata. The investigation is still ongoing, with no quick fix in sight yet. Fixes: #27496 Closes scylladb/scylladb#27518
1122 lines
46 KiB
C++
1122 lines
46 KiB
C++
/*
|
|
* Copyright (C) 2015 ScyllaDB
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
#include <map>
|
|
#include <unordered_map>
|
|
#include <tuple>
|
|
#include <stdexcept>
|
|
#include <regex>
|
|
#include <algorithm>
|
|
#include <ios>
|
|
|
|
#include <string.h>
|
|
|
|
#include <openssl/evp.h>
|
|
#include <openssl/rand.h>
|
|
#include <openssl/md5.h>
|
|
#include <openssl/sha.h>
|
|
#include <openssl/hmac.h>
|
|
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/fstream.hh>
|
|
#include <seastar/core/reactor.hh>
|
|
|
|
#include <fmt/ranges.h>
|
|
#include <fmt/ostream.h>
|
|
#include <fmt/std.h>
|
|
#include "utils/to_string.hh"
|
|
|
|
#include "encryption.hh"
|
|
#include "symmetric_key.hh"
|
|
#include "local_file_provider.hh"
|
|
#include "replicated_key_provider.hh"
|
|
#include "kmip_key_provider.hh"
|
|
#include "kmip_host.hh"
|
|
#include "kms_key_provider.hh"
|
|
#include "kms_host.hh"
|
|
#include "gcp_key_provider.hh"
|
|
#include "gcp_host.hh"
|
|
#include "azure_key_provider.hh"
|
|
#include "azure_host.hh"
|
|
#include "bytes.hh"
|
|
#include "utils/class_registrator.hh"
|
|
#include "cql3/query_processor.hh"
|
|
#include "db/extensions.hh"
|
|
#include "db/system_keyspace.hh"
|
|
#include "serializer.hh"
|
|
#include "serializer_impl.hh"
|
|
#include "schema/schema.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "service/storage_service.hh"
|
|
#include "service/migration_manager.hh"
|
|
#include "db/commitlog/commitlog_extensions.hh"
|
|
#include "encrypted_file_impl.hh"
|
|
#include "encryption_config.hh"
|
|
#include "utils/UUID_gen.hh"
|
|
#include "init.hh"
|
|
|
|
static seastar::logger logg{"encryption"};
|
|
|
|
namespace encryption {
|
|
|
|
static constexpr auto REPLICATED_KEY_PROVIDER_FACTORY = "ReplicatedKeyProviderFactory";
|
|
static constexpr auto LOCAL_FILE_SYSTEM_KEY_PROVIDER_FACTORY = "LocalFileSystemKeyProviderFactory";
|
|
static constexpr auto KMIP_KEY_PROVIDER_FACTORY = "KmipKeyProviderFactory";
|
|
static constexpr auto KMS_KEY_PROVIDER_FACTORY = "KmsKeyProviderFactory";
|
|
static constexpr auto GCP_KEY_PROVIDER_FACTORY = "GcpKeyProviderFactory";
|
|
static constexpr auto AZURE_KEY_PROVIDER_FACTORY = "AzureKeyProviderFactory";
|
|
|
|
bytes base64_decode(const sstring& s, size_t off, size_t len) {
|
|
if (off >= s.size()) {
|
|
throw std::out_of_range("Invalid offset");
|
|
}
|
|
len = std::min(len, s.size() - off);
|
|
auto n = (len / 4) * 3;
|
|
bytes b{bytes::initialized_later(), n};
|
|
|
|
// EVP_DecodeBlock does not handle padding well (i.e. it returns
|
|
// data with actual padding. This is not what we want, since
|
|
// we need to allow zeros in data.
|
|
// Must thus do decoding the hard way...
|
|
|
|
std::unique_ptr<EVP_ENCODE_CTX, void (*)(EVP_ENCODE_CTX*)> ctxt(EVP_ENCODE_CTX_new(), &EVP_ENCODE_CTX_free);
|
|
|
|
::EVP_DecodeInit(ctxt.get());
|
|
|
|
int outl = 0;
|
|
auto r = ::EVP_DecodeUpdate(ctxt.get(), reinterpret_cast<uint8_t*>(b.data()), &outl, reinterpret_cast<const uint8_t *>(s.data() + off),
|
|
int(len));
|
|
if (r < 0) {
|
|
throw std::invalid_argument("Could not decode: " + s);
|
|
}
|
|
|
|
int outl2 = 0;
|
|
r = ::EVP_DecodeFinal(ctxt.get(), reinterpret_cast<uint8_t*>(b.data() + outl), &outl2);
|
|
if (r < 0) {
|
|
throw std::invalid_argument("Could not decode: " + s);
|
|
}
|
|
b.resize(outl + outl2);
|
|
return b;
|
|
}
|
|
|
|
sstring base64_encode(const bytes& b, size_t off, size_t len) {
|
|
if (off >= b.size()) {
|
|
throw std::out_of_range("Invalid offset");
|
|
}
|
|
len = std::min(len, b.size() - off);
|
|
auto n = ((len + 2) / 3) * 4;
|
|
sstring s{sstring::initialized_later(), n};
|
|
auto r = EVP_EncodeBlock(reinterpret_cast<uint8_t *>(s.data()),
|
|
reinterpret_cast<const uint8_t*>(b.data() + off), int(len));
|
|
if (r < 0) {
|
|
throw std::invalid_argument("Could not encode");
|
|
}
|
|
s.resize(r);
|
|
return s;
|
|
}
|
|
|
|
bytes calculate_md5(const bytes& b, size_t off, size_t len) {
|
|
if (off >= b.size()) {
|
|
throw std::out_of_range("Invalid offset");
|
|
}
|
|
len = std::min(len, b.size() - off);
|
|
bytes res{bytes::initialized_later(), MD5_DIGEST_LENGTH};
|
|
#if OPENSSL_VERSION_NUMBER >= (3<<28)
|
|
EVP_MD_CTX *md5 = EVP_MD_CTX_new();
|
|
EVP_DigestInit_ex(md5, EVP_md5(), nullptr);
|
|
EVP_DigestUpdate(md5, b.data() + off, len);
|
|
EVP_DigestFinal_ex(md5, reinterpret_cast<uint8_t *>(res.data()), nullptr);
|
|
EVP_MD_CTX_free(md5);
|
|
#else
|
|
MD5(reinterpret_cast<const uint8_t*>(b.data() + off), len, reinterpret_cast<uint8_t *>(res.data()));
|
|
#endif
|
|
return res;
|
|
}
|
|
|
|
bytes calculate_sha256(bytes_view b) {
|
|
bytes res{bytes::initialized_later(), SHA256_DIGEST_LENGTH};
|
|
SHA256(reinterpret_cast<const uint8_t*>(b.data()), b.size(), reinterpret_cast<uint8_t *>(res.data()));
|
|
return res;
|
|
}
|
|
|
|
bytes calculate_sha256(const bytes& b, size_t off, size_t len) {
|
|
if (off >= b.size()) {
|
|
throw std::out_of_range("Invalid offset");
|
|
}
|
|
len = std::min(len, b.size() - off);
|
|
return calculate_sha256(bytes_view(b.data() + off, len));
|
|
}
|
|
|
|
bytes hmac_sha256(bytes_view msg, bytes_view key) {
|
|
bytes res{bytes::initialized_later(), SHA256_DIGEST_LENGTH};
|
|
|
|
unsigned length;
|
|
HMAC(EVP_sha256(),
|
|
key.data(), key.size(),
|
|
reinterpret_cast<const uint8_t*>(msg.data()), msg.size(),
|
|
reinterpret_cast<uint8_t*>(res.data()), &length);
|
|
return res;
|
|
}
|
|
|
|
future<temporary_buffer<char>> read_text_file_fully(const std::string& filename) {
|
|
return open_file_dma(filename, open_flags::ro).then([](file f) {
|
|
return f.size().then([f](size_t s) {
|
|
return do_with(make_file_input_stream(f), [s](input_stream<char>& in) {
|
|
return in.read_exactly(s).then([](temporary_buffer<char> buf) {
|
|
return make_ready_future<temporary_buffer<char>>(std::move(buf));
|
|
}).finally([&in] {
|
|
return in.close();
|
|
});
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
future<> write_text_file_fully(const std::string& filename, temporary_buffer<char> buf) {
|
|
return open_file_dma(filename, open_flags::wo|open_flags::create).then([buf = std::move(buf)](file f) mutable {
|
|
return make_file_output_stream(f).then([buf = std::move(buf)] (output_stream<char> out) mutable {
|
|
return do_with(std::move(out), [buf = std::move(buf)](output_stream<char>& out) mutable {
|
|
auto p = buf.get();
|
|
auto s = buf.size();
|
|
return out.write(p, s).finally([&out, buf = std::move(buf)] {
|
|
return out.close();
|
|
});
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
future<> write_text_file_fully(const std::string& filename, const std::string& s) {
|
|
return write_text_file_fully(filename, temporary_buffer<char>(s.data(), s.size()));
|
|
}
|
|
|
|
std::optional<std::chrono::milliseconds> parse_expiry(std::optional<std::string> in) {
|
|
if (!in) {
|
|
return std::nullopt;
|
|
}
|
|
size_t idx = 0;
|
|
auto n = std::stoll(*in, &idx); // we assume seconds
|
|
|
|
if (idx != 0) {
|
|
auto unit = in->substr(idx);
|
|
if (unit == "ms") {
|
|
return std::chrono::milliseconds(n);
|
|
} else if (unit == "h") {
|
|
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::hours(n));
|
|
} else if (unit == "d") {
|
|
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::days(n));
|
|
} else if (unit == "s") {
|
|
// ok
|
|
} else if (unit != "") {
|
|
throw std::invalid_argument("Unsupported time unit: " + unit);
|
|
}
|
|
}
|
|
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::seconds(n));
|
|
}
|
|
|
|
|
|
static const sstring namespace_prefix = "com.datastax.bdp.cassandra.crypto.";
|
|
static const sstring encryption_attribute = "scylla_encryption_options";
|
|
|
|
static inline const sstring key_id_attribute = "scylla_key_id";
|
|
static inline const sstring encrypted_components_attribute = "encrypted_components";
|
|
|
|
static inline const sstables::disk_string<uint32_t> encryption_attribute_ds{
|
|
bytes{encryption_attribute.begin(), encryption_attribute.end()}
|
|
};
|
|
static inline const sstables::disk_string<uint32_t> key_id_attribute_ds{
|
|
bytes{key_id_attribute.begin(), key_id_attribute.end()}
|
|
};
|
|
static inline const sstables::disk_string<uint32_t> encrypted_components_attribute_ds{
|
|
bytes{encrypted_components_attribute.begin(), encrypted_components_attribute.end()}
|
|
};
|
|
|
|
key_info get_key_info(const options& map) {
|
|
opt_wrapper opts(map);
|
|
|
|
auto cipher_name = opts(CIPHER_ALGORITHM).value_or("AES/CBC/PKCS5Padding");
|
|
auto key_strength = std::stoul(opts(SECRET_KEY_STRENGTH).value_or("128"));
|
|
// todo: static constexpr auto KMIP_KEY_PROVIDER_FACTORY = "KmipKeyProviderFactory";
|
|
return key_info{ std::move(cipher_name), unsigned(key_strength) };
|
|
}
|
|
|
|
std::ostream& operator<<(std::ostream& os, const key_provider& p) {
|
|
p.print(os);
|
|
return os;
|
|
}
|
|
|
|
sstring encryption_context::maybe_decrypt_config_value(const sstring& s) const {
|
|
shared_ptr<symmetric_key> k = get_config_encryption_key();
|
|
if (!s.empty() && k != nullptr) {
|
|
auto b = base64_decode(s);
|
|
auto iv = calculate_sha256(k->key());
|
|
iv.resize(k->block_size(), 0);
|
|
bytes dst(bytes::initialized_later(), b.size());
|
|
auto len = k->decrypt(b.data(), b.size(), dst.data(), dst.size(), iv.data());
|
|
return sstring(dst.begin(), dst.begin() + len);
|
|
}
|
|
return s;
|
|
}
|
|
|
|
class encryption_schema_extension;
|
|
|
|
class encryption_context_impl : public encryption_context {
|
|
// poor mans per-thread instance variable. We need a lookup map
|
|
// per shard, so preallocate it, much like a "sharded" thing would,
|
|
// but without all the fancy start/stop stuff.
|
|
// Allows this object to be effectively stateless, except for the
|
|
// objects in the maps.
|
|
std::vector<std::unordered_map<sstring, shared_ptr<key_provider>>> _per_thread_provider_cache;
|
|
std::vector<std::unordered_map<sstring, shared_ptr<system_key>>> _per_thread_system_key_cache;
|
|
std::vector<std::unordered_map<sstring, shared_ptr<kmip_host>>> _per_thread_kmip_host_cache;
|
|
std::vector<std::unordered_map<sstring, shared_ptr<kms_host>>> _per_thread_kms_host_cache;
|
|
std::vector<std::unordered_map<sstring, shared_ptr<gcp_host>>> _per_thread_gcp_host_cache;
|
|
std::vector<std::unordered_map<sstring, shared_ptr<azure_host>>> _per_thread_azure_host_cache;
|
|
std::vector<shared_ptr<encryption_schema_extension>> _per_thread_global_user_extension;
|
|
std::unique_ptr<encryption_config> _cfg;
|
|
sharded<cql3::query_processor>* _qp;;
|
|
sharded<service::migration_manager>* _mm;
|
|
sharded<replica::database>* _db;
|
|
sharded<service::storage_service>* _ss;
|
|
shared_ptr<symmetric_key> _cfg_encryption_key;
|
|
bool _allow_per_table_encryption;
|
|
public:
|
|
encryption_context_impl(std::unique_ptr<encryption_config> cfg, const service_set& services)
|
|
: _per_thread_provider_cache(smp::count)
|
|
, _per_thread_system_key_cache(smp::count)
|
|
, _per_thread_kmip_host_cache(smp::count)
|
|
, _per_thread_kms_host_cache(smp::count)
|
|
, _per_thread_gcp_host_cache(smp::count)
|
|
, _per_thread_azure_host_cache(smp::count)
|
|
, _per_thread_global_user_extension(smp::count)
|
|
, _cfg(std::move(cfg))
|
|
, _qp(find_or_null<cql3::query_processor>(services))
|
|
, _mm(find_or_null<service::migration_manager>(services))
|
|
, _db(find_or_null<replica::database>(services))
|
|
, _ss(find_or_null<service::storage_service>(services))
|
|
, _allow_per_table_encryption(_cfg->allow_per_table_encryption())
|
|
{}
|
|
|
|
template<typename T>
|
|
static sharded<T>* find_or_null(const service_set& services) {
|
|
try {
|
|
return std::addressof(services.find<T>());
|
|
} catch (std::out_of_range&) {
|
|
// TODO: would be great if we could verify we are in tool mode here.
|
|
return nullptr;
|
|
}
|
|
}
|
|
|
|
shared_ptr<key_provider> get_provider(const options& map) override {
|
|
opt_wrapper opts(map);
|
|
|
|
auto provider_class = opts(KEY_PROVIDER);
|
|
if (!provider_class) {
|
|
provider_class = opts(SECRET_KEY_PROVIDER_FACTORY_CLASS).value_or(LOCAL_FILE_SYSTEM_KEY_PROVIDER_FACTORY);
|
|
}
|
|
if (provider_class->empty() || ::strcasecmp(provider_class->c_str(), "none") == 0) {
|
|
return {};
|
|
}
|
|
static const std::unordered_map<sstring, std::unique_ptr<key_provider_factory>> providers = [] {
|
|
std::unordered_map<sstring, std::unique_ptr<key_provider_factory>> map;
|
|
|
|
map[REPLICATED_KEY_PROVIDER_FACTORY] = std::make_unique<replicated_key_provider_factory>();
|
|
map[LOCAL_FILE_SYSTEM_KEY_PROVIDER_FACTORY] = std::make_unique<local_file_provider_factory>();
|
|
map[KMIP_KEY_PROVIDER_FACTORY] = std::make_unique<kmip_key_provider_factory>();
|
|
map[KMS_KEY_PROVIDER_FACTORY] = std::make_unique<kms_key_provider_factory>();
|
|
map[GCP_KEY_PROVIDER_FACTORY] = std::make_unique<gcp_key_provider_factory>();
|
|
map[AZURE_KEY_PROVIDER_FACTORY] = std::make_unique<azure_key_provider_factory>();
|
|
|
|
return map;
|
|
}();
|
|
|
|
unqualified_name qn(namespace_prefix, *provider_class);
|
|
|
|
try {
|
|
return providers.at(qn)->get_provider(*this, map);
|
|
} catch (std::out_of_range&) {
|
|
throw std::invalid_argument("Unknown provider: " + *provider_class);
|
|
}
|
|
}
|
|
shared_ptr<key_provider> get_cached_provider(const sstring& id) const override {
|
|
auto& cache = _per_thread_provider_cache[this_shard_id()];
|
|
auto i = cache.find(id);
|
|
if (i != cache.end()) {
|
|
return i->second;
|
|
}
|
|
return {};
|
|
}
|
|
void cache_provider(const sstring& id, shared_ptr<key_provider> p) override {
|
|
_per_thread_provider_cache[this_shard_id()][id] = std::move(p);
|
|
}
|
|
|
|
shared_ptr<system_key> get_system_key(const sstring& name) override {
|
|
auto& cache = _per_thread_system_key_cache[this_shard_id()];
|
|
auto i = cache.find(name);
|
|
if (i != cache.end()) {
|
|
return i->second;
|
|
}
|
|
|
|
shared_ptr<encryption::system_key> k;
|
|
|
|
if (kmip_system_key::is_kmip_path(name)) {
|
|
k = make_shared<kmip_system_key>(*this, name);
|
|
} else {
|
|
k = make_shared<local_system_key>(*this, name);
|
|
}
|
|
|
|
if (k != nullptr) {
|
|
cache[name] = k;
|
|
}
|
|
|
|
return k;
|
|
}
|
|
|
|
template<typename HostType, typename CacheType, typename ConfigType>
|
|
shared_ptr<HostType> get_host(const sstring& host, CacheType& cache, const ConfigType& config_map) {
|
|
auto& host_cache = cache[this_shard_id()];
|
|
auto it = host_cache.find(host);
|
|
if (it != host_cache.end()) {
|
|
return it->second;
|
|
}
|
|
|
|
auto config_it = config_map.find(host);
|
|
if (config_it != config_map.end()) {
|
|
auto result = ::make_shared<HostType>(*this, host, config_it->second);
|
|
host_cache.emplace(host, result);
|
|
return result;
|
|
}
|
|
|
|
throw std::invalid_argument("No such host: " + host);
|
|
}
|
|
|
|
shared_ptr<kmip_host> get_kmip_host(const sstring& host) override {
|
|
return get_host<kmip_host>(host, _per_thread_kmip_host_cache, _cfg->kmip_hosts());
|
|
}
|
|
|
|
shared_ptr<kms_host> get_kms_host(const sstring& host) override {
|
|
return get_host<kms_host>(host, _per_thread_kms_host_cache, _cfg->kms_hosts());
|
|
}
|
|
|
|
shared_ptr<gcp_host> get_gcp_host(const sstring& host) override {
|
|
return get_host<gcp_host>(host, _per_thread_gcp_host_cache, _cfg->gcp_hosts());
|
|
}
|
|
|
|
shared_ptr<azure_host> get_azure_host(const sstring& host) override {
|
|
return get_host<azure_host>(host, _per_thread_azure_host_cache, _cfg->azure_hosts());
|
|
}
|
|
|
|
|
|
const encryption_config& config() const override {
|
|
return *_cfg;
|
|
}
|
|
shared_ptr<symmetric_key> get_config_encryption_key() const override {
|
|
return _cfg_encryption_key;
|
|
}
|
|
future<> load_config_encryption_key(const sstring & name) {
|
|
return get_system_key(name)->get_key().then([this](auto k) {
|
|
_cfg_encryption_key = std::move(k);
|
|
});
|
|
}
|
|
/**
|
|
* This looks like checking too late, but since these are only used by
|
|
* replicated provider, they will be checked very early anyway, unless
|
|
* running tool mode, in which case they don't exist.
|
|
*/
|
|
template<typename T>
|
|
T& check_service_object(T* t) const {
|
|
if (t == nullptr) {
|
|
throw std::runtime_error(fmt::format("Service {} not registered", typeid(T).name()));
|
|
}
|
|
return *t;
|
|
}
|
|
sharded<cql3::query_processor>& get_query_processor() const override {
|
|
return check_service_object(_qp);
|
|
}
|
|
sharded<service::storage_service>& get_storage_service() const override {
|
|
return check_service_object(_ss);
|
|
}
|
|
sharded<replica::database>& get_database() const override {
|
|
return check_service_object(_db);
|
|
}
|
|
sharded<service::migration_manager>& get_migration_manager() const override {
|
|
return check_service_object(_mm);
|
|
}
|
|
|
|
future<> start() override {
|
|
if (_qp && _ss && _db && _mm) {
|
|
co_await replicated_key_provider_factory::on_started(get_database().local(), get_migration_manager().local());
|
|
}
|
|
}
|
|
future<> stop() override {
|
|
return smp::invoke_on_all([this]() -> future<> {
|
|
for (auto&& [id, h] : _per_thread_kmip_host_cache[this_shard_id()]) {
|
|
co_await h->disconnect();
|
|
}
|
|
static auto stop_all = [](auto&& cache) -> future<> {
|
|
for (auto& [k, host] : cache) {
|
|
co_await host->stop();
|
|
}
|
|
};
|
|
co_await stop_all(_per_thread_kms_host_cache[this_shard_id()]);
|
|
co_await stop_all(_per_thread_gcp_host_cache[this_shard_id()]);
|
|
|
|
_per_thread_provider_cache[this_shard_id()].clear();
|
|
_per_thread_system_key_cache[this_shard_id()].clear();
|
|
_per_thread_kmip_host_cache[this_shard_id()].clear();
|
|
_per_thread_kms_host_cache[this_shard_id()].clear();
|
|
_per_thread_gcp_host_cache[this_shard_id()].clear();
|
|
_per_thread_azure_host_cache[this_shard_id()].clear();
|
|
_per_thread_global_user_extension[this_shard_id()] = {};
|
|
});
|
|
}
|
|
|
|
void add_global_user_encryption(shared_ptr<encryption_schema_extension> ext) {
|
|
_per_thread_global_user_extension[this_shard_id()] = std::move(ext);
|
|
}
|
|
|
|
shared_ptr<encryption_schema_extension> get_global_user_encryption() const {
|
|
return _per_thread_global_user_extension[this_shard_id()];
|
|
}
|
|
bool allow_per_table_encryption() const {
|
|
return _allow_per_table_encryption;
|
|
}
|
|
};
|
|
|
|
class encryption_schema_extension;
|
|
|
|
std::ostream& operator<<(std::ostream& os, const encryption_schema_extension& ext);
|
|
|
|
}
|
|
|
|
template <> struct fmt::formatter<encryption::encryption_schema_extension> : fmt::ostream_formatter {};
|
|
|
|
namespace encryption {
|
|
|
|
class encryption_schema_extension : public schema_extension {
|
|
key_info _info;
|
|
shared_ptr<key_provider> _provider;
|
|
std::map<sstring, sstring> _options;
|
|
std::optional<size_t> _key_block_size;
|
|
|
|
friend std::ostream& operator<<(std::ostream&, const encryption_schema_extension&);
|
|
public:
|
|
encryption_schema_extension(key_info, shared_ptr<key_provider>, std::map<sstring, sstring>);
|
|
|
|
using extension_ptr = ::shared_ptr<encryption_schema_extension>;
|
|
|
|
static extension_ptr create(encryption_context_impl&, std::map<sstring, sstring>);
|
|
static extension_ptr create(encryption_context_impl&, const bytes&);
|
|
|
|
static extension_ptr parse(encryption_context_impl& ctxt, db::extensions::schema_ext_config cfg) {
|
|
struct {
|
|
encryption_context_impl& _ctxt;
|
|
|
|
extension_ptr operator()(const sstring&) const {
|
|
throw std::invalid_argument("Malformed extension");
|
|
}
|
|
extension_ptr operator()(const std::map<sstring, sstring>& opts) const {
|
|
return create(_ctxt, opts);
|
|
}
|
|
extension_ptr operator()(const bytes& v) const {
|
|
return create(_ctxt, v);
|
|
}
|
|
} v{ctxt};
|
|
|
|
auto res = std::visit(v, cfg);
|
|
// Note: We always allow _disbling_ per-table encryption, i.e. if user encryption is active, we fall back to node-local
|
|
if (res && !ctxt.allow_per_table_encryption() && ctxt.get_global_user_encryption()) {
|
|
throw std::invalid_argument(fmt::format("Node global user encryption is active and per-table encryption attributes have been prohibited ({})", *res));
|
|
}
|
|
return res;
|
|
}
|
|
|
|
static options parse_options(const bytes& v) {
|
|
return ser::deserialize_from_buffer(v, std::type_identity<options>(), 0);
|
|
}
|
|
|
|
future<::shared_ptr<symmetric_key>> key_for_read(opt_bytes id) const {
|
|
return _provider->key(_info, std::move(id)).then([](std::tuple<key_ptr, opt_bytes> k_id) {
|
|
return std::get<0>(std::move(k_id));
|
|
});
|
|
}
|
|
future<std::tuple<::shared_ptr<symmetric_key>, opt_bytes>> key_for_write(opt_bytes id = {}) const {
|
|
return _provider->key(_info, std::move(id));
|
|
}
|
|
|
|
bytes serialize() const override {
|
|
return ser::serialize_to_buffer<bytes>(_options, 0);
|
|
}
|
|
future<> validate(const schema& s) const override {
|
|
try {
|
|
co_await _provider->validate();
|
|
auto k = co_await key_for_write();
|
|
logg.info("Added encryption extension to {}.{}", s.ks_name(), s.cf_name());
|
|
logg.info(" Options: {}", _options);
|
|
logg.info(" Key Algorithm: {}", _info);
|
|
logg.info(" Provider: {}", *_provider);
|
|
|
|
auto problems = std::get<0>(k)->validate_exact_info_result();
|
|
if (!problems.empty()) {
|
|
logg.warn("{}", problems);
|
|
}
|
|
} catch (...) {
|
|
std::throw_with_nested(exceptions::configuration_exception((std::stringstream{} << "Validation failed:" << std::current_exception()).str()));
|
|
}
|
|
}
|
|
|
|
bool should_delay_read(const opt_bytes& id) {
|
|
return _provider->should_delay_read(id);
|
|
}
|
|
size_t key_block_size() {
|
|
if (!_key_block_size) {
|
|
_key_block_size = symmetric_key(_info).block_size();
|
|
}
|
|
return *_key_block_size;
|
|
}
|
|
};
|
|
|
|
std::ostream& operator<<(std::ostream& os, const encryption_schema_extension& ext) {
|
|
fmt::print(os, "{}, alg={}, provider={}", ext._options, ext._info, *ext._provider);
|
|
return os;
|
|
}
|
|
|
|
// encryption_schema_extension was written before schema_extension was deprecated, so support it
|
|
// without warnings
|
|
#pragma clang diagnostic push
|
|
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
|
|
encryption_schema_extension::encryption_schema_extension(key_info info, shared_ptr<key_provider> provider, std::map<sstring, sstring> options)
|
|
: _info(std::move(info))
|
|
, _provider(std::move(provider))
|
|
, _options(std::move(options))
|
|
{}
|
|
|
|
#pragma clang diagnostic pop
|
|
|
|
::shared_ptr<encryption_schema_extension> encryption_schema_extension::create(encryption_context_impl& ctxt, const bytes& v) {
|
|
auto map = parse_options(v);
|
|
return create(ctxt, map);
|
|
}
|
|
|
|
::shared_ptr<encryption_schema_extension> encryption_schema_extension::create(encryption_context_impl& ctxt, std::map<sstring, sstring> map) {
|
|
key_info info = get_key_info(map);
|
|
auto provider = ctxt.get_provider(map);
|
|
if (!provider) {
|
|
return {};
|
|
}
|
|
return ::make_shared<encryption_schema_extension>(std::move(info), std::move(provider), std::move(map));
|
|
}
|
|
|
|
class encryption_file_io_extension : public sstables::file_io_extension {
|
|
::shared_ptr<encryption_context_impl> _ctxt;
|
|
public:
|
|
encryption_file_io_extension(::shared_ptr<encryption_context_impl> ctxt)
|
|
: _ctxt(std::move(ctxt))
|
|
{}
|
|
|
|
attr_value_map get_attributes(const sstables::sstable& sst) const override {
|
|
auto& sc = sst.get_shared_components();
|
|
if (!sc.scylla_metadata) {
|
|
return {};
|
|
}
|
|
auto* exta = sc.scylla_metadata->get_extension_attributes();
|
|
if (!exta) {
|
|
return {};
|
|
}
|
|
|
|
auto i = exta->map.find(encryption_attribute_ds);
|
|
if (i == exta->map.end()) {
|
|
return {};
|
|
}
|
|
auto opts = encryption_schema_extension::parse_options(i->second.value);
|
|
|
|
if (exta->map.count(key_id_attribute_ds)) {
|
|
auto id = exta->map.at(key_id_attribute_ds).value;
|
|
auto id_str = id.size() == utils::UUID::serialized_size()
|
|
? sstring(fmt::format("{}", utils::UUID_gen::get_UUID(id)))
|
|
: to_hex(id)
|
|
;
|
|
opts["key_id"] = std::move(id_str);
|
|
}
|
|
|
|
if (exta->map.count(encrypted_components_attribute_ds)) {
|
|
std::vector<sstables::component_type> ccs;
|
|
ccs.reserve(11);
|
|
auto mask = ser::deserialize_from_buffer(exta->map.at(encrypted_components_attribute_ds).value, std::type_identity<uint32_t>{}, 0);
|
|
for (auto c : { sstables::component_type::Index,
|
|
sstables::component_type::CompressionInfo,
|
|
sstables::component_type::Data,
|
|
sstables::component_type::Summary,
|
|
sstables::component_type::Digest,
|
|
sstables::component_type::CRC,
|
|
sstables::component_type::Filter,
|
|
sstables::component_type::Statistics,
|
|
sstables::component_type::TemporaryStatistics,
|
|
sstables::component_type::Partitions,
|
|
sstables::component_type::Rows,
|
|
sstables::component_type::TemporaryHashes,
|
|
}) {
|
|
if (mask & (1 << int(c))) {
|
|
ccs.emplace_back(c);
|
|
}
|
|
}
|
|
opts["components"] = fmt::to_string(fmt::join(ccs, ", "));
|
|
} else {
|
|
opts["components"] = "Data";
|
|
}
|
|
attr_value_map res;
|
|
res["encryption_info"] = std::move(opts);
|
|
return res;
|
|
}
|
|
|
|
std::tuple<opt_bytes, shared_ptr<encryption_schema_extension>> get_encryption_schema_extension(const sstables::sstable& sst,
|
|
sstables::component_type type) const {
|
|
const auto& sc = sst.get_shared_components();
|
|
if (!sc.scylla_metadata) {
|
|
return {};
|
|
}
|
|
const auto* ext_attr = sc.scylla_metadata->get_extension_attributes();
|
|
if (!ext_attr) {
|
|
return {};
|
|
}
|
|
|
|
bool ok = ext_attr->map.contains(encryption_attribute_ds);
|
|
if (ok && type != sstables::component_type::Data) {
|
|
ok = (ser::deserialize_from_buffer(ext_attr->map.at(encrypted_components_attribute_ds).value, std::type_identity<uint32_t>{}, 0) & (1 << static_cast<int>(type))) > 0;
|
|
}
|
|
|
|
if (!ok) {
|
|
return {};
|
|
}
|
|
auto esx = encryption_schema_extension::create(*_ctxt, ext_attr->map.at(encryption_attribute_ds).value);
|
|
opt_bytes id;
|
|
if (ext_attr->map.contains(key_id_attribute_ds)) {
|
|
id = ext_attr->map.at(key_id_attribute_ds).value;
|
|
}
|
|
return {std::move(id), std::move(esx)};
|
|
}
|
|
|
|
future<file> wrap_file(const sstables::sstable& sst, sstables::component_type type, file f, open_flags flags) override {
|
|
switch (type) {
|
|
case sstables::component_type::Scylla:
|
|
case sstables::component_type::TemporaryTOC:
|
|
case sstables::component_type::TOC:
|
|
co_return file{};
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if (flags == open_flags::ro) {
|
|
// open existing. check read opts.
|
|
auto [id, esx] = get_encryption_schema_extension(sst, type);
|
|
if (esx) {
|
|
if (esx->should_delay_read(id)) {
|
|
logg.debug("Encrypted sstable component {} using delayed opening {} (id: {})", sst.component_basename(type), *esx, id);
|
|
|
|
co_return make_delayed_encrypted_file(f, esx->key_block_size(), [esx, comp = sst.component_basename(type), id = std::move(id)] {
|
|
logg.trace("Delayed component {} using {} (id: {}) resolve", comp, *esx, id);
|
|
return esx->key_for_read(id);
|
|
});
|
|
}
|
|
|
|
logg.debug("Open encrypted sstable component {} using {} (id: {})", sst.component_basename(type), *esx, id);
|
|
|
|
auto k = co_await esx->key_for_read(std::move(id));
|
|
co_return make_encrypted_file(f, std::move(k));
|
|
}
|
|
} else {
|
|
if (co_await wrap_writeonly(sst, type, [&f](shared_ptr<symmetric_key> k) { f = make_encrypted_file(std::move(f), std::move(k)); })) {
|
|
co_return f;
|
|
}
|
|
}
|
|
|
|
co_return file{};
|
|
}
|
|
|
|
future<bool> wrap_writeonly(const sstables::sstable& sst, sstables::component_type type, std::function<void(shared_ptr<symmetric_key>)> apply) {
|
|
auto s = sst.get_schema();
|
|
shared_ptr<encryption_schema_extension> esx;
|
|
auto e = s->extensions().find(encryption_attribute);
|
|
// #4844 - don't allow schema encryption to be used for writing
|
|
// iff it is disallowed by config -> placeholder here
|
|
// (See schema_tables.cc::prepare_builder_from_table_row - if an extension
|
|
// is unavailable/non-creatable at load time a dummy object is inserted
|
|
// )
|
|
if (e != s->extensions().end() && !e->second->is_placeholder()) {
|
|
esx = static_pointer_cast<encryption_schema_extension>(e->second);
|
|
} else if (!is_system_keyspace(s->ks_name())) {
|
|
esx = _ctxt->get_global_user_encryption();
|
|
}
|
|
if (esx) {
|
|
auto& sc = sst.get_shared_components();
|
|
if (!sc.scylla_metadata) {
|
|
sc.scylla_metadata.emplace();
|
|
}
|
|
auto& ext = sc.scylla_metadata->get_or_create_extension_attributes();
|
|
opt_bytes id;
|
|
|
|
// We are writing more than one component. If we used a named key before
|
|
// we need to make sure we use the exact same one for all components,
|
|
// even if something like KMIP key invalidation replaced it.
|
|
// This will also speed up key lookup in some cases, as both repl
|
|
// and kmip cache id bound keys.
|
|
if (ext.map.count(key_id_attribute_ds)) {
|
|
id = ext.map.at(key_id_attribute_ds).value;
|
|
}
|
|
|
|
logg.debug("Write encrypted sstable component {} using {} (id: {})", sst.component_basename(type), *esx, id);
|
|
|
|
/**
|
|
* #3954 We can be (and are) called with two components simultaneously (hello index, data).
|
|
* If this case we could block on the below "key" call and iff provider has certain cache behaviour (hello replicated)
|
|
* or caches expire, we could end up with different keys for respective components, leading to one
|
|
* of the components ending up unreadable.
|
|
*/
|
|
for (;;) {
|
|
auto [k, k_id] = co_await esx->key_for_write(std::move(id));
|
|
|
|
if (k_id && ext.map.count(key_id_attribute_ds)) {
|
|
id = ext.map.at(key_id_attribute_ds).value;
|
|
if (k_id != id) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
id = std::move(k_id);
|
|
|
|
if (!ext.map.count(encryption_attribute_ds)) {
|
|
ext.map.emplace(encryption_attribute_ds, sstables::disk_string<uint32_t>{esx->serialize()});
|
|
}
|
|
if (id) {
|
|
ext.map.emplace(key_id_attribute_ds, sstables::disk_string<uint32_t>{*id});
|
|
}
|
|
if (type != sstables::component_type::Data) {
|
|
uint32_t mask = 0;
|
|
if (ext.map.count(encrypted_components_attribute_ds)) {
|
|
mask = ser::deserialize_from_buffer(ext.map.at(encrypted_components_attribute_ds).value, std::type_identity<uint32_t>{}, 0);
|
|
}
|
|
mask |= (1 << int(type));
|
|
// just a marker. see above
|
|
ext.map[encrypted_components_attribute_ds] = sstables::disk_string<uint32_t>{ser::serialize_to_buffer<bytes>(mask, 0)};
|
|
}
|
|
apply(std::move(k));
|
|
co_return true;
|
|
}
|
|
}
|
|
co_return false;
|
|
}
|
|
|
|
future<data_sink> wrap_sink(const sstables::sstable& sst, sstables::component_type type, data_sink sink) override {
|
|
switch (type) {
|
|
case sstables::component_type::Scylla:
|
|
case sstables::component_type::TemporaryTOC:
|
|
case sstables::component_type::TOC:
|
|
co_return sink;
|
|
case sstables::component_type::Data:
|
|
case sstables::component_type::Index:
|
|
case sstables::component_type::CompressionInfo:
|
|
case sstables::component_type::Summary:
|
|
case sstables::component_type::Digest:
|
|
case sstables::component_type::CRC:
|
|
case sstables::component_type::Filter:
|
|
case sstables::component_type::Statistics:
|
|
case sstables::component_type::TemporaryStatistics:
|
|
case sstables::component_type::Partitions:
|
|
case sstables::component_type::Rows:
|
|
case sstables::component_type::TemporaryHashes:
|
|
case sstables::component_type::Unknown:
|
|
break;
|
|
}
|
|
co_await wrap_writeonly(sst, type, [&sink](shared_ptr<symmetric_key> k) {
|
|
sink = data_sink(make_encrypted_sink(std::move(sink), std::move(k)));
|
|
});
|
|
co_return sink;
|
|
}
|
|
|
|
future<data_source> wrap_source(const sstables::sstable& sst,
|
|
sstables::component_type type,
|
|
data_source src) override {
|
|
switch (type) {
|
|
case sstables::component_type::Scylla:
|
|
case sstables::component_type::TemporaryTOC:
|
|
case sstables::component_type::TOC:
|
|
co_return src;
|
|
case sstables::component_type::CompressionInfo:
|
|
case sstables::component_type::CRC:
|
|
case sstables::component_type::Data:
|
|
case sstables::component_type::Digest:
|
|
case sstables::component_type::Filter:
|
|
case sstables::component_type::Index:
|
|
case sstables::component_type::Statistics:
|
|
case sstables::component_type::Summary:
|
|
case sstables::component_type::TemporaryStatistics:
|
|
case sstables::component_type::Rows:
|
|
case sstables::component_type::Partitions:
|
|
case sstables::component_type::TemporaryHashes:
|
|
case sstables::component_type::Unknown:
|
|
auto [id, esx] = get_encryption_schema_extension(sst, type);
|
|
if (esx) {
|
|
auto key = co_await esx->key_for_read(std::move(id));
|
|
co_return data_source(make_encrypted_source(std::move(src), std::move(key)));
|
|
}
|
|
co_return src;
|
|
}
|
|
}
|
|
};
|
|
|
|
std::string encryption_provider(const sstables::sstable& sst) {
|
|
auto& sc = sst.get_shared_components();
|
|
if (!sc.scylla_metadata) {
|
|
return {};
|
|
}
|
|
auto* exta = sc.scylla_metadata->get_extension_attributes();
|
|
if (!exta) {
|
|
return {};
|
|
}
|
|
|
|
auto i = exta->map.find(encryption_attribute_ds);
|
|
if (i == exta->map.end()) {
|
|
return {};
|
|
}
|
|
auto options = encryption_schema_extension::parse_options(i->second.value);
|
|
opt_wrapper opts(options);
|
|
|
|
return opts(KEY_PROVIDER).value_or(std::string{});
|
|
}
|
|
|
|
namespace bfs = std::filesystem;
|
|
|
|
class encryption_commitlog_file_extension : public db::commitlog_file_extension {
|
|
const ::shared_ptr<encryption_context> _ctxt;
|
|
const options _opts;
|
|
|
|
static const inline std::regex prop_expr = std::regex("^([^=]+)=(\\S+)$");
|
|
static const inline sstring id_key = "key_id";
|
|
static const inline std::string end_of_file_mark = "#-- end of file";
|
|
|
|
public:
|
|
encryption_commitlog_file_extension(::shared_ptr<encryption_context> ctxt, options opts)
|
|
: _ctxt(ctxt)
|
|
, _opts(std::move(opts))
|
|
{}
|
|
sstring config_name(const sstring& filename) const {
|
|
bfs::path p(filename);
|
|
auto dir = p.parent_path();
|
|
auto file = p.filename();
|
|
return (dir / bfs::path("." + file.string())).string();
|
|
}
|
|
future<file> wrap_file(const sstring& filename, file f, open_flags flags) override {
|
|
auto cfg_file = config_name(filename);
|
|
|
|
if (flags == open_flags::ro) {
|
|
return file_exists(cfg_file).then([=, this](bool exists) {
|
|
if (!exists) {
|
|
// #1681 if file system errors caused the options file to simply not exist,
|
|
// we can at least hope that the file itself is not very encrypted either.
|
|
// But who knows. Will probably cause data corruption.
|
|
logg.info("Commitlog segment {} has no encryption info. Opening unencrypted.", filename);
|
|
return make_ready_future<file>(std::move(f));
|
|
}
|
|
return read_text_file_fully(cfg_file).then([f, this, filename](temporary_buffer<char> buf) {
|
|
std::istringstream ss(std::string(buf.begin(), buf.end()));
|
|
options opts;
|
|
std::string line;
|
|
bool has_eof = false;
|
|
while (std::getline(ss, line)) {
|
|
std::smatch m;
|
|
if (std::regex_match(line, m, prop_expr)) {
|
|
auto k = m[1].str();
|
|
auto v = m[2].str();
|
|
opts[k] = v;
|
|
} else if (line == end_of_file_mark) {
|
|
has_eof = true;
|
|
}
|
|
}
|
|
|
|
// #1682 - if we crashed while writing the options file,
|
|
// it is quite possible that we are eventually trying to
|
|
// open + replay an (empty) CL file, but cannot read the
|
|
// properties now, since _our_ metadata is empty/truncated
|
|
if (!has_eof) {
|
|
// just return the unwrapped file.
|
|
logg.info("Commitlog segment {} has incomplete encryption info. Opening unencrypted.", filename);
|
|
return make_ready_future<file>(std::move(f));
|
|
}
|
|
opt_bytes id;
|
|
if (opts.count(id_key)) {
|
|
id = base64_decode(opts[id_key]);
|
|
}
|
|
|
|
auto provider = _ctxt->get_provider(opts);
|
|
|
|
logg.debug("Open commitlog segment {} using {} (id: {})", filename, *provider, id);
|
|
auto info = make_shared<key_info>(get_key_info(opts));
|
|
return provider->key(*info, id).then([f, info](std::tuple<shared_ptr<symmetric_key>, opt_bytes> k) {
|
|
return make_ready_future<file>(make_encrypted_file(f, std::get<0>(k)));
|
|
});
|
|
});
|
|
});
|
|
} else {
|
|
auto provider = _ctxt->get_provider(_opts);
|
|
auto info = make_shared<key_info>(get_key_info(_opts));
|
|
return provider->key(*info).then([f, this, info, cfg_file, filename, &provider = *provider](std::tuple<shared_ptr<symmetric_key>, opt_bytes> k_id) {
|
|
auto&& k = std::get<0>(k_id);
|
|
auto&& id = std::get<1>(k_id);
|
|
std::ostringstream ss;
|
|
for (auto&p : _opts) {
|
|
ss << p.first << "=" << p.second << std::endl;
|
|
}
|
|
if (id) {
|
|
ss << id_key << "=" << base64_encode(*id) << std::endl;
|
|
}
|
|
ss << end_of_file_mark << std::endl;
|
|
|
|
logg.debug("Creating commitlog segment {} using {} (id: {})", filename, provider, id);
|
|
|
|
return write_text_file_fully(cfg_file, ss.str()).then([f, k] {
|
|
return make_ready_future<file>(make_encrypted_file(f, k));
|
|
});
|
|
});
|
|
}
|
|
}
|
|
future<> before_delete(const sstring& filename) override {
|
|
auto cfg_file = config_name(filename);
|
|
return file_exists(cfg_file).then([cfg_file](bool b) {
|
|
return b ? remove_file(cfg_file) : make_ready_future();
|
|
});
|
|
}
|
|
};
|
|
|
|
future<seastar::shared_ptr<encryption_context>> register_extensions(const db::config&, std::unique_ptr<encryption_config> cfg_in, db::extensions& exts, const ::service_set& services) {
|
|
auto& cfg = *cfg_in;
|
|
auto ctxt = ::make_shared<encryption_context_impl>(std::move(cfg_in), services);
|
|
// Note: extensions are immutable and shared across shards.
|
|
// Object in them must be stateless. We anchor the context in the
|
|
// extension objects, and while it is not as such 100% stateless,
|
|
// it is close enough.
|
|
exts.add_schema_extension(encryption_attribute, [ctxt](auto v) {
|
|
return encryption_schema_extension::parse(*ctxt, std::move(v));
|
|
});
|
|
exts.add_sstable_file_io_extension(encryption_attribute, std::make_unique<encryption_file_io_extension>(ctxt));
|
|
std::exception_ptr p;
|
|
try {
|
|
auto maybe_get_options = [&](const utils::config_file::string_map& map, const sstring& what) -> std::optional<options> {
|
|
options opts(map.begin(), map.end());
|
|
opt_wrapper get_opt(opts);
|
|
if (!::strcasecmp(get_opt("enabled").value_or("false").c_str(), "false")) {
|
|
return std::nullopt;
|
|
}
|
|
// commitlog/system table encryption/global user encryption should not use replicated keys,
|
|
// We default to local keys, but KMIP/KMS is ok as well (better in fact).
|
|
opts[KEY_PROVIDER] = get_opt(KEY_PROVIDER).value_or(LOCAL_FILE_SYSTEM_KEY_PROVIDER_FACTORY);
|
|
if (opts[KEY_PROVIDER] == LOCAL_FILE_SYSTEM_KEY_PROVIDER_FACTORY && !get_opt(SECRET_KEY_FILE)) {
|
|
// system encryption uses different key folder than user tables.
|
|
// explicitly set the key file path
|
|
opts[SECRET_KEY_FILE] = (bfs::path(cfg.system_key_directory()) / bfs::path("system") / bfs::path(get_opt("key_name").value_or("system_table_keytab"))).string();
|
|
}
|
|
// forbid replicated. we cannot guarantee being able to open sstables on populate
|
|
if (opts[KEY_PROVIDER] == REPLICATED_KEY_PROVIDER_FACTORY) {
|
|
throw std::invalid_argument("Replicated provider is not allowed for " + what);
|
|
}
|
|
return opts;
|
|
};
|
|
|
|
auto opts = maybe_get_options(cfg.system_info_encryption(), "system table encryption");
|
|
|
|
if (opts) {
|
|
logg.info("Adding system info encryption using {}", *opts);
|
|
|
|
exts.add_commitlog_file_extension(encryption_attribute, std::make_unique<encryption_commitlog_file_extension>(ctxt, *opts));
|
|
|
|
// modify schemas for tables holding sensitive data to use encryption w. key described
|
|
// by the opts.
|
|
// since schemas are duplicated across shards, we must call to each shard and augment
|
|
// them all.
|
|
// Since we are in pre-init phase, this should be safe.
|
|
co_await smp::invoke_on_all([&opts, &exts] () mutable {
|
|
auto& f = exts.schema_extensions().at(encryption_attribute);
|
|
for (auto& s : { db::system_keyspace::paxos(), db::system_keyspace::batchlog(), db::system_keyspace::dicts() }) {
|
|
exts.add_extension_to_schema(s, encryption_attribute, f(*opts));
|
|
}
|
|
});
|
|
}
|
|
|
|
if (cfg.config_encryption_active()) {
|
|
co_await ctxt->load_config_encryption_key(cfg.config_encryption_key_name());
|
|
}
|
|
|
|
|
|
if (!cfg.kmip_hosts().empty()) {
|
|
// only pre-create on shard 0.
|
|
co_await parallel_for_each(cfg.kmip_hosts(), [ctxt](auto& p) {
|
|
auto host = ctxt->get_kmip_host(p.first);
|
|
return host->connect();
|
|
});
|
|
}
|
|
|
|
if (!cfg.kms_hosts().empty()) {
|
|
// only pre-create on shard 0.
|
|
co_await parallel_for_each(cfg.kms_hosts(), [ctxt](auto& p) {
|
|
auto host = ctxt->get_kms_host(p.first);
|
|
return host->init();
|
|
});
|
|
}
|
|
|
|
if (!cfg.gcp_hosts().empty()) {
|
|
// only pre-create on shard 0.
|
|
co_await parallel_for_each(cfg.gcp_hosts(), [ctxt](auto& p) {
|
|
auto host = ctxt->get_gcp_host(p.first);
|
|
return host->init();
|
|
});
|
|
}
|
|
|
|
if (!cfg.azure_hosts().empty()) {
|
|
// only pre-create on shard 0.
|
|
co_await parallel_for_each(cfg.azure_hosts(), [ctxt](auto& p) {
|
|
auto host = ctxt->get_azure_host(p.first);
|
|
return host->init();
|
|
});
|
|
}
|
|
|
|
replicated_key_provider_factory::init(exts);
|
|
|
|
auto user_opts = maybe_get_options(cfg.user_info_encryption(), "user table encryption");
|
|
|
|
if (user_opts) {
|
|
logg.info("Adding user info encryption using {}", *user_opts);
|
|
co_await smp::invoke_on_all([&user_opts, ctxt]() {
|
|
auto ext = encryption_schema_extension::create(*ctxt, *user_opts);
|
|
ctxt->add_global_user_encryption(std::move(ext));
|
|
});
|
|
}
|
|
|
|
co_return ctxt;
|
|
} catch (...) {
|
|
p = std::current_exception();
|
|
}
|
|
|
|
/**
|
|
* This only really affects tests, but in the case where we
|
|
* have a bad config/env vars (hint minio), we could fail even
|
|
* setting up the context. In a "normal" run, this is ok. We will
|
|
* report the exception, and the do a exit(1).
|
|
* In tests however, we don't and active context will instead be
|
|
* freed quite proper, in which case we need to call stop to ensure
|
|
* we don't crash on shared pointer destruction on wrong shard.
|
|
* Doing so will hide the real issue from whomever runs the test.
|
|
*/
|
|
assert(p);
|
|
co_await ctxt->stop();
|
|
std::rethrow_exception(p);
|
|
}
|
|
|
|
}
|
|
|