mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
code: Introduce conf/object_storage.yaml configuration file
In order to access real S3 bucket, the client should use signed requests
over https. Partially this is due to security considerations, partially
this is unavoidable, because multipart-uploading is banned for unsigned
requests on the S3. Also, signed requests over plain http require
signing the payload as well, which is a bit troublesome, so it's better
to stick to secure https and keep payload unsigned.
To prepare signed requests the code needs to know three things:
- aws key
- aws secret
- aws region name
The latter could be derived from the endpoint URL, but it's simpler to
configure it explicitly, all the more so there's an option to use S3
URLs without region name in them we could want to use some time.
To keep the described configuration the proposed place is the
object_storage.yaml file with the format
endpoints:
- name: a.b.c
port: 443
aws_key: 12345
aws_secret: abcdefghijklmnop
...
When loaded, the map gets into db::config and later will be propagated
down to sstables code (see next patch).
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -931,6 +931,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, wasm_udf_total_fuel(this, "wasm_udf_total_fuel", value_status::Used, 100000000, "Wasmtime fuel a WASM UDF can consume before termination")
|
||||
, wasm_udf_memory_limit(this, "wasm_udf_memory_limit", value_status::Used, 2*1024*1024, "How much memory each WASM UDF can allocate at most")
|
||||
, relabel_config_file(this, "relabel_config_file", value_status::Used, "", "Optionally, read relabel config from file")
|
||||
, object_storage_config_file(this, "object_storage_config_file", value_status::Used, "", "Optionally, read object-storage endpoints config from file")
|
||||
, minimum_keyspace_rf(this, "minimum_keyspace_rf", liveness::LiveUpdate, value_status::Used, 0, "The minimum allowed replication factor when creating or altering a keyspace.")
|
||||
, default_log_level(this, "default_log_level", value_status::Used)
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used)
|
||||
|
||||
@@ -23,6 +23,8 @@
|
||||
#include "locator/host_id.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "db/hints/host_filter.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/s3/creds.hh"
|
||||
|
||||
namespace seastar {
|
||||
class file;
|
||||
@@ -414,6 +416,7 @@ public:
|
||||
named_value<uint64_t> wasm_udf_total_fuel;
|
||||
named_value<size_t> wasm_udf_memory_limit;
|
||||
named_value<sstring> relabel_config_file;
|
||||
named_value<sstring> object_storage_config_file;
|
||||
// wasm_udf_reserved_memory is static because the options in db::config
|
||||
// are parsed using seastar::app_template, while this option is used for
|
||||
// configuring the Seastar memory subsystem.
|
||||
@@ -426,6 +429,7 @@ public:
|
||||
const db::extensions& extensions() const;
|
||||
|
||||
locator::host_id host_id;
|
||||
utils::updateable_value<std::unordered_map<sstring, s3::endpoint_config>> object_storage_config;
|
||||
|
||||
static const sstring default_tls_priority;
|
||||
private:
|
||||
|
||||
61
main.cc
61
main.cc
@@ -8,6 +8,7 @@
|
||||
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "tasks/task_manager.hh"
|
||||
@@ -148,6 +149,64 @@ public:
|
||||
sharded<abort_source>& as_sharded_abort_source() { return _abort_sources; }
|
||||
};
|
||||
|
||||
struct object_storage_endpoint_param {
|
||||
sstring endpoint;
|
||||
s3::endpoint_config config;
|
||||
};
|
||||
|
||||
namespace YAML {
|
||||
template<>
|
||||
struct convert<::object_storage_endpoint_param> {
|
||||
static bool decode(const Node& node, ::object_storage_endpoint_param& ep) {
|
||||
ep.endpoint = node["name"].as<std::string>();
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static future<> read_object_storage_config(db::config& db_cfg) {
|
||||
sstring cfg_name;
|
||||
if (!db_cfg.object_storage_config_file().empty()) {
|
||||
cfg_name = db_cfg.object_storage_config_file();
|
||||
} else {
|
||||
cfg_name = db::config::get_conf_sub("object_storage.yaml").native();
|
||||
if (!co_await file_accessible(cfg_name, access_flags::exists)) {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
|
||||
auto cfg_file = co_await open_file_dma(cfg_name, open_flags::ro);
|
||||
sstring data;
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
auto sz = co_await cfg_file.size();
|
||||
data = seastar::to_sstring(co_await cfg_file.dma_read_exactly<char>(0, sz));
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await cfg_file.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(ex);
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, s3::endpoint_config> cfg;
|
||||
YAML::Node doc = YAML::Load(data.c_str());
|
||||
for (auto&& section : doc) {
|
||||
auto sec_name = section.first.as<std::string>();
|
||||
if (sec_name != "endpoints") {
|
||||
co_await coroutine::return_exception(std::runtime_error(fmt::format("While parsing object_storage config: section {} currently unsupported.", sec_name)));
|
||||
}
|
||||
|
||||
auto endpoints = section.second.as<std::vector<object_storage_endpoint_param>>();
|
||||
for (auto&& ep : endpoints) {
|
||||
cfg[ep.endpoint] = std::move(ep.config);
|
||||
}
|
||||
}
|
||||
|
||||
db_cfg.object_storage_config = std::move(cfg);
|
||||
}
|
||||
|
||||
static future<>
|
||||
read_config(bpo::variables_map& opts, db::config& cfg) {
|
||||
sstring file;
|
||||
@@ -164,6 +223,8 @@ read_config(bpo::variables_map& opts, db::config& cfg) {
|
||||
level = log_level::error;
|
||||
}
|
||||
startlog.log(level, "{} : {}", msg, opt);
|
||||
}).then([&cfg] {
|
||||
return read_object_storage_config(cfg);
|
||||
});
|
||||
}).handle_exception([file](auto ep) {
|
||||
startlog.error("Could not read configuration file {}: {}", file, ep);
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
#include "gms/feature_service.hh"
|
||||
#include "repair/row_level.hh"
|
||||
#include "replica/compaction_group.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include <boost/program_options.hpp>
|
||||
#include <iostream>
|
||||
#include <seastar/util/defer.hh>
|
||||
@@ -161,12 +162,25 @@ std::unique_ptr<db::config> make_db_config(sstring temp_dir) {
|
||||
return cfg;
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, s3::endpoint_config_ptr> make_storage_options_config(const data_dictionary::storage_options& so) {
|
||||
std::unordered_map<sstring, s3::endpoint_config_ptr> cfg;
|
||||
std::visit(overloaded_functor {
|
||||
[] (const data_dictionary::storage_options::local& loc) mutable -> void {
|
||||
},
|
||||
[&cfg] (const data_dictionary::storage_options::s3& os) mutable -> void {
|
||||
cfg[os.endpoint] = make_lw_shared<s3::endpoint_config>(s3::endpoint_config {
|
||||
});
|
||||
}
|
||||
}, so.value);
|
||||
return cfg;
|
||||
}
|
||||
|
||||
test_env::impl::impl(test_env_config cfg)
|
||||
: dir()
|
||||
, db_config(make_db_config(dir.path().native()))
|
||||
, dir_sem(1)
|
||||
, feature_service(gms::feature_config_from_db_config(*db_config))
|
||||
, mgr(cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config, feature_service, cache_tracker, memory::stats().total_memory(), dir_sem)
|
||||
, mgr(cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config, feature_service, cache_tracker, memory::stats().total_memory(), dir_sem, make_storage_options_config(cfg.storage))
|
||||
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env")
|
||||
, storage(std::move(cfg.storage))
|
||||
{ }
|
||||
|
||||
@@ -8,6 +8,7 @@ import atexit
|
||||
import os
|
||||
import requests
|
||||
import shutil
|
||||
import yaml
|
||||
|
||||
print('Scylla is: ' + run.find_scylla() + '.')
|
||||
success = True
|
||||
@@ -22,10 +23,26 @@ else:
|
||||
|
||||
test_tempdir = run.pid_to_dir(os.getpid())
|
||||
os.mkdir(test_tempdir)
|
||||
s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST']
|
||||
s3_public_bucket = os.environ['S3_PUBLIC_BUCKET_FOR_TEST']
|
||||
|
||||
def get_tempdir(pid):
|
||||
return test_tempdir
|
||||
|
||||
with open(test_tempdir + '/object_storage.yaml', 'w') as config_file:
|
||||
yaml.dump({ 'endpoints': [
|
||||
{
|
||||
'name': s3_server_address,
|
||||
}
|
||||
]
|
||||
}, config_file)
|
||||
|
||||
def run_scylla_cmd(pid, dir):
|
||||
global cmd
|
||||
(c, e) = cmd(pid, dir)
|
||||
c += ['--object-storage-config-file', test_tempdir + '/object_storage.yaml']
|
||||
return (c, e)
|
||||
|
||||
def teardown(pid):
|
||||
print('Kill scylla')
|
||||
sys.stdout.flush()
|
||||
@@ -33,13 +50,11 @@ def teardown(pid):
|
||||
shutil.copyfileobj(log, sys.stdout.buffer)
|
||||
|
||||
print(f'Start scylla (dir={test_tempdir}')
|
||||
pid = run.run_with_generated_dir(cmd, get_tempdir)
|
||||
pid = run.run_with_generated_dir(run_scylla_cmd, get_tempdir)
|
||||
atexit.register(lambda: teardown(pid))
|
||||
|
||||
ip = run.pid_to_ip(pid)
|
||||
run.wait_for_services(pid, [ lambda: check_cql(ip) ])
|
||||
s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST']
|
||||
s3_public_bucket = os.environ['S3_PUBLIC_BUCKET_FOR_TEST']
|
||||
|
||||
print(f'Create keyspace (minio listening at {s3_server_address})')
|
||||
cluster = run.get_cql_cluster(ip)
|
||||
@@ -69,7 +84,7 @@ for row in res:
|
||||
cluster.shutdown()
|
||||
|
||||
print('Restart scylla')
|
||||
pid = run.restart_with_dir(pid, cmd, test_tempdir)
|
||||
pid = run.restart_with_dir(pid, run_scylla_cmd, test_tempdir)
|
||||
ip = run.pid_to_ip(pid)
|
||||
run.wait_for_services(pid, [ lambda: check_cql(ip) ])
|
||||
|
||||
|
||||
20
utils/s3/creds.hh
Normal file
20
utils/s3/creds.hh
Normal file
@@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
namespace s3 {
|
||||
|
||||
struct endpoint_config {
|
||||
};
|
||||
|
||||
using endpoint_config_ptr = seastar::lw_shared_ptr<endpoint_config>;
|
||||
|
||||
} // s3 namespace
|
||||
Reference in New Issue
Block a user