From 2f6aa5b52ecdd3879cdbbbfcde576a54caa90f8f Mon Sep 17 00:00:00 2001
From: Pavel Emelyanov
Date: Wed, 12 Apr 2023 18:07:58 +0300
Subject: [PATCH] code: Introduce conf/object_storage.yaml configuration file

In order to access a real S3 bucket, the client should use signed
requests over https. This is partly due to security considerations and
partly unavoidable, because S3 rejects multipart uploads made with
unsigned requests. Also, signed requests over plain http require signing
the payload as well, which is a bit troublesome, so it's better to stick
to secure https and keep the payload unsigned.

To prepare signed requests the code needs to know three things:
- aws key
- aws secret
- aws region name

The latter could be derived from the endpoint URL, but it's simpler to
configure it explicitly, all the more so because S3 URLs without a region
name in them also exist, and we may want to use those some day.

The proposed place to keep the described configuration is the
object_storage.yaml file, with the following format:

  endpoints:
    - name: a.b.c
      port: 443
      aws_key: 12345
      aws_secret: abcdefghijklmnop
      ...

When loaded, the map is stored in db::config and will later be propagated
down to the sstables code (see next patch).

Signed-off-by: Pavel Emelyanov --- db/config.cc | 1 + db/config.hh | 4 +++ main.cc | 61 +++++++++++++++++++++++++++++++++++++++ test/lib/test_services.cc | 16 +++++++++- test/object_store/run | 23 ++++++++++++--- utils/s3/creds.hh | 20 +++++++++++++ 6 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 utils/s3/creds.hh diff --git a/db/config.cc b/db/config.cc index 4f08c5aec6..0d69851699 100644 --- a/db/config.cc +++ b/db/config.cc @@ -931,6 +931,7 @@ db::config::config(std::shared_ptr exts) , wasm_udf_total_fuel(this, "wasm_udf_total_fuel", value_status::Used, 100000000, "Wasmtime fuel a WASM UDF can consume before termination") , wasm_udf_memory_limit(this, "wasm_udf_memory_limit", value_status::Used, 2*1024*1024, "How much memory each WASM UDF can allocate at most") , relabel_config_file(this, "relabel_config_file", value_status::Used, "", "Optionally, read relabel config from file") + , object_storage_config_file(this, "object_storage_config_file", value_status::Used, "", "Optionally, read object-storage endpoints config from file") , minimum_keyspace_rf(this, "minimum_keyspace_rf", liveness::LiveUpdate, value_status::Used, 0, "The minimum allowed replication factor when creating or altering a keyspace.") , default_log_level(this, "default_log_level", value_status::Used) , logger_log_level(this, "logger_log_level", value_status::Used) diff --git a/db/config.hh b/db/config.hh index 38af51803c..ab1f06ea9e 100644 --- a/db/config.hh +++ b/db/config.hh @@ -23,6 +23,8 @@ #include "locator/host_id.hh" #include "gms/inet_address.hh" #include "db/hints/host_filter.hh" +#include "utils/updateable_value.hh" +#include "utils/s3/creds.hh" namespace seastar { class file; @@ -414,6 +416,7 @@ public: named_value wasm_udf_total_fuel; named_value wasm_udf_memory_limit; named_value relabel_config_file; + named_value object_storage_config_file; // wasm_udf_reserved_memory is static because the options in db::config // are parsed using seastar::app_template, while 
this option is used for // configuring the Seastar memory subsystem. @@ -426,6 +429,7 @@ public: const db::extensions& extensions() const; locator::host_id host_id; + utils::updateable_value> object_storage_config; static const sstring default_tls_priority; private: diff --git a/main.cc b/main.cc index ad11f1a473..e91fa2699f 100644 --- a/main.cc +++ b/main.cc @@ -8,6 +8,7 @@ #include #include +#include #include #include "tasks/task_manager.hh" @@ -148,6 +149,64 @@ public: sharded& as_sharded_abort_source() { return _abort_sources; } }; +struct object_storage_endpoint_param { + sstring endpoint; + s3::endpoint_config config; +}; + +namespace YAML { +template<> +struct convert<::object_storage_endpoint_param> { + static bool decode(const Node& node, ::object_storage_endpoint_param& ep) { + ep.endpoint = node["name"].as(); + return true; + } +}; +} + +static future<> read_object_storage_config(db::config& db_cfg) { + sstring cfg_name; + if (!db_cfg.object_storage_config_file().empty()) { + cfg_name = db_cfg.object_storage_config_file(); + } else { + cfg_name = db::config::get_conf_sub("object_storage.yaml").native(); + if (!co_await file_accessible(cfg_name, access_flags::exists)) { + co_return; + } + } + + auto cfg_file = co_await open_file_dma(cfg_name, open_flags::ro); + sstring data; + std::exception_ptr ex; + + try { + auto sz = co_await cfg_file.size(); + data = seastar::to_sstring(co_await cfg_file.dma_read_exactly(0, sz)); + } catch (...) 
{ + ex = std::current_exception(); + } + co_await cfg_file.close(); + if (ex) { + co_await coroutine::return_exception_ptr(ex); + } + + std::unordered_map cfg; + YAML::Node doc = YAML::Load(data.c_str()); + for (auto&& section : doc) { + auto sec_name = section.first.as(); + if (sec_name != "endpoints") { + co_await coroutine::return_exception(std::runtime_error(fmt::format("While parsing object_storage config: section {} currently unsupported.", sec_name))); + } + + auto endpoints = section.second.as>(); + for (auto&& ep : endpoints) { + cfg[ep.endpoint] = std::move(ep.config); + } + } + + db_cfg.object_storage_config = std::move(cfg); +} + static future<> read_config(bpo::variables_map& opts, db::config& cfg) { sstring file; @@ -164,6 +223,8 @@ read_config(bpo::variables_map& opts, db::config& cfg) { level = log_level::error; } startlog.log(level, "{} : {}", msg, opt); + }).then([&cfg] { + return read_object_storage_config(cfg); }); }).handle_exception([file](auto ep) { startlog.error("Could not read configuration file {}: {}", file, ep); diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index fd483b364d..731a307a81 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -17,6 +17,7 @@ #include "gms/feature_service.hh" #include "repair/row_level.hh" #include "replica/compaction_group.hh" +#include "utils/overloaded_functor.hh" #include #include #include @@ -161,12 +162,25 @@ std::unique_ptr make_db_config(sstring temp_dir) { return cfg; } +std::unordered_map make_storage_options_config(const data_dictionary::storage_options& so) { + std::unordered_map cfg; + std::visit(overloaded_functor { + [] (const data_dictionary::storage_options::local& loc) mutable -> void { + }, + [&cfg] (const data_dictionary::storage_options::s3& os) mutable -> void { + cfg[os.endpoint] = make_lw_shared(s3::endpoint_config { + }); + } + }, so.value); + return cfg; +} + test_env::impl::impl(test_env_config cfg) : dir() , 
db_config(make_db_config(dir.path().native())) , dir_sem(1) , feature_service(gms::feature_config_from_db_config(*db_config)) - , mgr(cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config, feature_service, cache_tracker, memory::stats().total_memory(), dir_sem) + , mgr(cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config, feature_service, cache_tracker, memory::stats().total_memory(), dir_sem, make_storage_options_config(cfg.storage)) , semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env") , storage(std::move(cfg.storage)) { } diff --git a/test/object_store/run b/test/object_store/run index 3966ecf1f1..8c18b197be 100755 --- a/test/object_store/run +++ b/test/object_store/run @@ -8,6 +8,7 @@ import atexit import os import requests import shutil +import yaml print('Scylla is: ' + run.find_scylla() + '.') success = True @@ -22,10 +23,26 @@ else: test_tempdir = run.pid_to_dir(os.getpid()) os.mkdir(test_tempdir) +s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST'] +s3_public_bucket = os.environ['S3_PUBLIC_BUCKET_FOR_TEST'] def get_tempdir(pid): return test_tempdir +with open(test_tempdir + '/object_storage.yaml', 'w') as config_file: + yaml.dump({ 'endpoints': [ + { + 'name': s3_server_address, + } + ] + }, config_file) + +def run_scylla_cmd(pid, dir): + global cmd + (c, e) = cmd(pid, dir) + c += ['--object-storage-config-file', test_tempdir + '/object_storage.yaml'] + return (c, e) + def teardown(pid): print('Kill scylla') sys.stdout.flush() @@ -33,13 +50,11 @@ def teardown(pid): shutil.copyfileobj(log, sys.stdout.buffer) print(f'Start scylla (dir={test_tempdir}') -pid = run.run_with_generated_dir(cmd, get_tempdir) +pid = run.run_with_generated_dir(run_scylla_cmd, get_tempdir) atexit.register(lambda: teardown(pid)) ip = run.pid_to_ip(pid) run.wait_for_services(pid, [ lambda: check_cql(ip) ]) -s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST'] -s3_public_bucket 
= os.environ['S3_PUBLIC_BUCKET_FOR_TEST'] print(f'Create keyspace (minio listening at {s3_server_address})') cluster = run.get_cql_cluster(ip) @@ -69,7 +84,7 @@ for row in res: cluster.shutdown() print('Restart scylla') -pid = run.restart_with_dir(pid, cmd, test_tempdir) +pid = run.restart_with_dir(pid, run_scylla_cmd, test_tempdir) ip = run.pid_to_ip(pid) run.wait_for_services(pid, [ lambda: check_cql(ip) ]) diff --git a/utils/s3/creds.hh b/utils/s3/creds.hh new file mode 100644 index 0000000000..6f55d1f0aa --- /dev/null +++ b/utils/s3/creds.hh @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include + +namespace s3 { + +struct endpoint_config { +}; + +using endpoint_config_ptr = seastar::lw_shared_ptr; + +} // s3 namespace