diff --git a/db/config.cc b/db/config.cc index 4f08c5aec6..0d69851699 100644 --- a/db/config.cc +++ b/db/config.cc @@ -931,6 +931,7 @@ db::config::config(std::shared_ptr exts) , wasm_udf_total_fuel(this, "wasm_udf_total_fuel", value_status::Used, 100000000, "Wasmtime fuel a WASM UDF can consume before termination") , wasm_udf_memory_limit(this, "wasm_udf_memory_limit", value_status::Used, 2*1024*1024, "How much memory each WASM UDF can allocate at most") , relabel_config_file(this, "relabel_config_file", value_status::Used, "", "Optionally, read relabel config from file") + , object_storage_config_file(this, "object_storage_config_file", value_status::Used, "", "Optionally, read object-storage endpoints config from file") , minimum_keyspace_rf(this, "minimum_keyspace_rf", liveness::LiveUpdate, value_status::Used, 0, "The minimum allowed replication factor when creating or altering a keyspace.") , default_log_level(this, "default_log_level", value_status::Used) , logger_log_level(this, "logger_log_level", value_status::Used) diff --git a/db/config.hh b/db/config.hh index 38af51803c..ab1f06ea9e 100644 --- a/db/config.hh +++ b/db/config.hh @@ -23,6 +23,8 @@ #include "locator/host_id.hh" #include "gms/inet_address.hh" #include "db/hints/host_filter.hh" +#include "utils/updateable_value.hh" +#include "utils/s3/creds.hh" namespace seastar { class file; @@ -414,6 +416,7 @@ public: named_value wasm_udf_total_fuel; named_value wasm_udf_memory_limit; named_value relabel_config_file; + named_value object_storage_config_file; // wasm_udf_reserved_memory is static because the options in db::config // are parsed using seastar::app_template, while this option is used for // configuring the Seastar memory subsystem. @@ -426,6 +429,7 @@ public: const db::extensions& extensions() const; locator::host_id host_id; + utils::updateable_value> object_storage_config; static const sstring default_tls_priority; private: diff --git a/main.cc b/main.cc index ad11f1a473..e91fa2699f 100644 --- a/main.cc +++ b/main.cc @@ -8,6 +8,7 @@ #include #include +#include #include #include "tasks/task_manager.hh" @@ -148,6 +149,64 @@ public: sharded& as_sharded_abort_source() { return _abort_sources; } }; +struct object_storage_endpoint_param { + sstring endpoint; + s3::endpoint_config config; +}; + +namespace YAML { +template<> +struct convert<::object_storage_endpoint_param> { + static bool decode(const Node& node, ::object_storage_endpoint_param& ep) { + ep.endpoint = node["name"].as(); + return true; + } +}; +} + +static future<> read_object_storage_config(db::config& db_cfg) { + sstring cfg_name; + if (!db_cfg.object_storage_config_file().empty()) { + cfg_name = db_cfg.object_storage_config_file(); + } else { + cfg_name = db::config::get_conf_sub("object_storage.yaml").native(); + if (!co_await file_accessible(cfg_name, access_flags::exists)) { + co_return; + } + } + + auto cfg_file = co_await open_file_dma(cfg_name, open_flags::ro); + sstring data; + std::exception_ptr ex; + + try { + auto sz = co_await cfg_file.size(); + data = seastar::to_sstring(co_await cfg_file.dma_read_exactly(0, sz)); + } catch (...) { + ex = std::current_exception(); + } + co_await cfg_file.close(); + if (ex) { + co_await coroutine::return_exception_ptr(ex); + } + + std::unordered_map cfg; + YAML::Node doc = YAML::Load(data.c_str()); + for (auto&& section : doc) { + auto sec_name = section.first.as(); + if (sec_name != "endpoints") { + co_await coroutine::return_exception(std::runtime_error(fmt::format("While parsing object_storage config: section {} currently unsupported.", sec_name))); + } + + auto endpoints = section.second.as>(); + for (auto&& ep : endpoints) { + cfg[ep.endpoint] = std::move(ep.config); + } + } + + db_cfg.object_storage_config = std::move(cfg); +} + static future<> read_config(bpo::variables_map& opts, db::config& cfg) { sstring file; @@ -164,6 +223,8 @@ read_config(bpo::variables_map& opts, db::config& cfg) { level = log_level::error; } startlog.log(level, "{} : {}", msg, opt); + }).then([&cfg] { + return read_object_storage_config(cfg); }); }).handle_exception([file](auto ep) { startlog.error("Could not read configuration file {}: {}", file, ep); diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index fd483b364d..731a307a81 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -17,6 +17,7 @@ #include "gms/feature_service.hh" #include "repair/row_level.hh" #include "replica/compaction_group.hh" +#include "utils/overloaded_functor.hh" #include #include #include @@ -161,12 +162,25 @@ std::unique_ptr make_db_config(sstring temp_dir) { return cfg; } +std::unordered_map make_storage_options_config(const data_dictionary::storage_options& so) { + std::unordered_map cfg; + std::visit(overloaded_functor { + [] (const data_dictionary::storage_options::local& loc) mutable -> void { + }, + [&cfg] (const data_dictionary::storage_options::s3& os) mutable -> void { + cfg[os.endpoint] = make_lw_shared(s3::endpoint_config { + }); + } + }, so.value); + return cfg; +} + test_env::impl::impl(test_env_config cfg) : dir() , db_config(make_db_config(dir.path().native())) , dir_sem(1) , feature_service(gms::feature_config_from_db_config(*db_config)) - , mgr(cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config, feature_service, cache_tracker, memory::stats().total_memory(), dir_sem) + , mgr(cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config, feature_service, cache_tracker, memory::stats().total_memory(), dir_sem, make_storage_options_config(cfg.storage)) , semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env") , storage(std::move(cfg.storage)) { } diff --git a/test/object_store/run b/test/object_store/run index 3966ecf1f1..8c18b197be 100755 --- a/test/object_store/run +++ b/test/object_store/run @@ -8,6 +8,7 @@ import atexit import os import requests import shutil +import yaml print('Scylla is: ' + run.find_scylla() + '.') success = True @@ -22,10 +23,26 @@ else: test_tempdir = run.pid_to_dir(os.getpid()) os.mkdir(test_tempdir) +s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST'] +s3_public_bucket = os.environ['S3_PUBLIC_BUCKET_FOR_TEST'] def get_tempdir(pid): return test_tempdir +with open(test_tempdir + '/object_storage.yaml', 'w') as config_file: + yaml.dump({ 'endpoints': [ + { + 'name': s3_server_address, + } + ] + }, config_file) + +def run_scylla_cmd(pid, dir): + global cmd + (c, e) = cmd(pid, dir) + c += ['--object-storage-config-file', test_tempdir + '/object_storage.yaml'] + return (c, e) + def teardown(pid): print('Kill scylla') sys.stdout.flush() @@ -33,13 +50,11 @@ def teardown(pid): shutil.copyfileobj(log, sys.stdout.buffer) print(f'Start scylla (dir={test_tempdir}') -pid = run.run_with_generated_dir(cmd, get_tempdir) +pid = run.run_with_generated_dir(run_scylla_cmd, get_tempdir) atexit.register(lambda: teardown(pid)) ip = run.pid_to_ip(pid) run.wait_for_services(pid, [ lambda: check_cql(ip) ]) -s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST'] -s3_public_bucket = os.environ['S3_PUBLIC_BUCKET_FOR_TEST'] print(f'Create keyspace (minio listening at {s3_server_address})') cluster = run.get_cql_cluster(ip) @@ -69,7 +84,7 @@ for row in res: cluster.shutdown() print('Restart scylla') -pid = run.restart_with_dir(pid, cmd, test_tempdir) +pid = run.restart_with_dir(pid, run_scylla_cmd, test_tempdir) ip = run.pid_to_ip(pid) run.wait_for_services(pid, [ lambda: check_cql(ip) ]) diff --git a/utils/s3/creds.hh b/utils/s3/creds.hh new file mode 100644 index 0000000000..6f55d1f0aa --- /dev/null +++ b/utils/s3/creds.hh @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include + +namespace s3 { + +struct endpoint_config { +}; + +using endpoint_config_ptr = seastar::lw_shared_ptr; + +} // s3 namespace