Add a components_digests struct with optional digest fields for storing the CRC32 digests of individual SSTable components in the Scylla metadata. These components include: Data, Compression, Filter, Statistics, Summary, Index, TOC, Partitions and Rows.
2888 lines
127 KiB
C++
2888 lines
127 KiB
C++
/*
|
|
* Copyright (C) 2021-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <boost/algorithm/string.hpp>
|
|
#include <filesystem>
|
|
#include <set>
|
|
#include <fmt/chrono.h>
|
|
#include <fmt/ostream.h>
|
|
#include <fmt/ranges.h>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/units.hh>
|
|
#include <seastar/http/short_streams.hh>
|
|
#include <seastar/util/closeable.hh>
|
|
|
|
#include "init.hh"
|
|
#include "compaction/compaction.hh"
|
|
#include "compaction/compaction_strategy.hh"
|
|
#include "compaction/compaction_strategy_state.hh"
|
|
#include "cql3/statements/raw/parsed_statement.hh"
|
|
#include "cql3/statements/modification_statement.hh"
|
|
#include "cql3/statements/select_statement.hh"
|
|
#include "db/config.hh"
|
|
#include "db/extensions.hh"
|
|
#include "db/large_data_handler.hh"
|
|
#include "db/corrupt_data_handler.hh"
|
|
#include "db/object_storage_endpoint_param.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "reader_concurrency_semaphore.hh"
|
|
#include "readers/combined.hh"
|
|
#include "readers/generating.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include "sstables/index_reader.hh"
|
|
#include "sstables/sstables_manager.hh"
|
|
#include "sstables/sstable_directory.hh"
|
|
#include "sstables/open_info.hh"
|
|
#include "release.hh"
|
|
#include "replica/schema_describe_helper.hh"
|
|
#include "test/lib/cql_test_env.hh"
|
|
#include "tools/json_writer.hh"
|
|
#include "tools/json_mutation_stream_parser.hh"
|
|
#include "tools/load_system_tablets.hh"
|
|
#include "tools/lua_sstable_consumer.hh"
|
|
#include "tools/schema_loader.hh"
|
|
#include "tools/sstable_consumer.hh"
|
|
#include "tools/utils.hh"
|
|
#include "types/json_utils.hh"
|
|
#include "locator/host_id.hh"
|
|
|
|
using namespace seastar;
|
|
using namespace sstables;
|
|
|
|
using json_writer = mutation_json::json_writer;
|
|
|
|
namespace bpo = boost::program_options;
|
|
|
|
using namespace tools::utils;
|
|
|
|
// Function-pointer type of an operation: it receives the schema, a reader
// permit, the sstables, the sstables manager, the db configuration and the
// parsed command-line options.
using operation_func = void(*)(
        schema_ptr,
        reader_permit,
        const std::vector<sstables::shared_sstable>&,
        sstables::sstables_manager&,
        const db::config&,
        const bpo::variables_map&);
|
|
|
|
namespace std {
|
|
// required by boost::lexical_cast<std::string>(vector<string>), which is in turn used
|
|
// by boost::program_option for printing out the default value of an option
|
|
std::ostream& operator<<(std::ostream& os, const std::vector<sstring>& v) {
|
|
return os << fmt::format("{}", v);
|
|
}
|
|
}
|
|
|
|
namespace {
|
|
|
|
const auto app_name = "sstable";
|
|
|
|
logging::logger sst_log(format("scylla-{}", app_name));
|
|
|
|
struct decorated_key_hash {
|
|
std::size_t operator()(const dht::decorated_key& dk) const {
|
|
return dht::token::to_int64(dk.token());
|
|
}
|
|
};
|
|
|
|
struct decorated_key_equal {
|
|
const schema& _s;
|
|
explicit decorated_key_equal(const schema& s) : _s(s) {
|
|
}
|
|
bool operator()(const dht::decorated_key& a, const dht::decorated_key& b) const {
|
|
return a.equal(_s, b);
|
|
}
|
|
};
|
|
|
|
using partition_set = std::unordered_set<dht::decorated_key, decorated_key_hash, decorated_key_equal>;
|
|
|
|
template <typename T>
|
|
using partition_map = std::unordered_map<dht::decorated_key, T, decorated_key_hash, decorated_key_equal>;
|
|
|
|
// Builds the set of partitions to filter for.
//
// Keys are taken from two sources, both holding hex-encoded partition keys:
// * the "partition" option (a list of keys);
// * the "partitions-file" option (a file with whitespace-separated keys).
//
// The returned set is empty if neither option yields a key.
partition_set get_partitions(schema_ptr schema, const bpo::variables_map& app_config) {
    partition_set partitions(app_config.count("partition"), {}, decorated_key_equal(*schema));
    auto pk_type = schema->partition_key_type();

    // Converts a hex-encoded partition key into a decorated key.
    auto dk_from_hex = [&] (std::string_view hex) {
        auto pk = partition_key::from_exploded(pk_type->components(managed_bytes_view(from_hex(hex))));
        return dht::decorate_key(*schema, std::move(pk));
    };
    // Decodes the hex key held in `hex_buf` and adds it to the result.
    auto add_key_from = [&] (const temporary_buffer<char>& hex_buf) {
        partitions.emplace(dk_from_hex(std::string_view(hex_buf.begin(), hex_buf.size())));
    };

    if (app_config.count("partition")) {
        const auto& hex_keys = app_config["partition"].as<std::vector<sstring>>();
        for (const auto& pk_hex : hex_keys) {
            partitions.emplace(dk_from_hex(pk_hex));
        }
    }

    if (app_config.count("partitions-file")) {
        auto file = open_file_dma(app_config["partitions-file"].as<sstring>(), open_flags::ro).get();
        auto input = make_file_input_stream(file);

        // Holds the key currently being assembled; a key can span several
        // chunks returned by the input stream.
        temporary_buffer<char> pending;
        while (auto chunk = input.read().get()) {
            do {
                const auto ws = std::find_if(chunk.begin(), chunk.end(), [] (char c) { return std::isspace(c); });
                const auto key_len = ws - chunk.begin();
                if (!key_len && !pending) {
                    chunk.trim_front(1); // discard extra leading whitespace
                    continue;
                }
                if (!pending) {
                    pending = chunk.share(0, key_len);
                } else {
                    // Append this chunk's part of the key to what we already have.
                    auto joined = temporary_buffer<char>(pending.size() + key_len);
                    auto out = joined.get_write();
                    out = std::copy_n(pending.begin(), pending.size(), out);
                    std::copy_n(chunk.begin(), key_len, out);
                    pending = std::move(joined);
                }
                chunk.trim_front(key_len);
                if (ws != chunk.end()) {
                    // A separator was found, so the key is complete.
                    add_key_from(pending);
                    pending = {};
                    chunk.trim_front(1); // remove the newline
                }
                thread::maybe_yield();
            } while (chunk);
        }
        if (!pending.empty()) { // last line might not have EOL
            add_key_from(pending);
        }
    }

    if (!partitions.empty()) {
        sst_log.info("filtering enabled, {} partition(s) to filter for", partitions.size());
    }

    return partitions;
}
|
|
|
|
struct sstable_path_info {
|
|
std::filesystem::path sstable_path;
|
|
std::filesystem::path data_dir_path;
|
|
sstring keyspace;
|
|
sstring table;
|
|
};
|
|
|
|
// Derives the keyspace name, table name and data directory from the path of
// the first sstable given on the command line.
//
// Throws std::invalid_argument if no sstables were provided, or if the
// sstable path cannot be parsed.
sstable_path_info extract_from_sstable_path(const bpo::variables_map& app_config) {
    if (!app_config.count("sstables")) {
        throw std::invalid_argument("cannot extract information from sstable path, no sstable arguments");
    }

    auto sst_path = std::filesystem::path(app_config["sstables"].as<std::vector<sstring>>().front());
    sstring keyspace;
    sstring table;
    try {
        auto [_, ks, tbl] = sstables::parse_path(sst_path);
        keyspace = std::move(ks);
        table = std::move(tbl);
    } catch (const sstables::malformed_sstable_exception&) {
        throw std::invalid_argument(fmt::format("cannot extract information from sstable path, sstable has invalid path: {}", sst_path));
    }

    const auto sst_dir_path = std::filesystem::path(sst_path).remove_filename();
    // Detect whether sstable is in root table directory, or in a sub-directory
    // The last component is "" due to the trailing "/" left by "remove_filename()" above.
    // So we need to go back 2 more, to find the supposed keyspace component.
    const bool in_table_root_dir = keyspace == std::prev(sst_dir_path.end(), 3)->native();
    std::filesystem::path data_dir_path = in_table_root_dir
            ? sst_dir_path / ".." / ".."
            : sst_dir_path / ".." / ".." / "..";

    return sstable_path_info{std::move(sst_path), std::move(data_dir_path), std::move(keyspace), std::move(table)};
}
|
|
|
|
// Determines the keyspace and table name to load the schema for.
//
// If both the "keyspace" and "table" options are present, their values are
// returned. Otherwise the names are extracted from the path of the first
// sstable argument.
//
// Returns a (keyspace, table) pair.
// Throws std::invalid_argument if the names can be obtained from neither source.
std::pair<sstring, sstring> get_keyspace_and_table_options(const bpo::variables_map& app_config) {
    const auto k_it = app_config.find("keyspace");
    const auto t_it = app_config.find("table");
    // Both iterators are dereferenced below, so both options must be present.
    // With only one of them given, fall through to the sstable path instead of
    // dereferencing the end iterator.
    if (k_it != app_config.end() && t_it != app_config.end()) {
        return std::pair(k_it->second.as<sstring>(), t_it->second.as<sstring>());
    }

    try {
        auto info = extract_from_sstable_path(app_config);
        return std::pair(info.keyspace, info.table);
    } catch (...) {
        throw std::invalid_argument("don't know which schema to load: no --keyspace and --table provided, failed to extract keyspace/table from sstable paths");
    }
}
|
|
|
|
struct path_with_source {
|
|
fs::path path;
|
|
sstring source;
|
|
};
|
|
|
|
// Determines the data directory and describes where it was obtained from.
//
// Sources are tried in this order: the "scylla-data-dir" option, the
// "scylla-yaml-file" option, the SCYLLA_CONF and SCYLLA_HOME environment
// variables (the latter three all use the first configured data file
// directory), and finally the path of the first sstable argument.
path_with_source obtain_data_dir(const bpo::variables_map& app_config, db::config& cfg) {
    if (app_config.contains("scylla-data-dir")) {
        return {.path = fs::path(app_config["scylla-data-dir"].as<sstring>()), .source = "--scylla-data-dir parameter"};
    }

    // Uses the first data file directory of the configuration.
    const auto from_config = [&cfg] (const char* source) -> path_with_source {
        return {.path = fs::path(cfg.data_file_directories()[0]), .source = source};
    };

    if (app_config.contains("scylla-yaml-file")) {
        return from_config("--scylla-yaml-file parameter");
    }
    if (std::getenv("SCYLLA_CONF")) {
        return from_config("SCYLLA_CONF environment variable");
    }
    if (std::getenv("SCYLLA_HOME")) {
        return from_config("SCYLLA_HOME environment variable");
    }

    const auto info = extract_from_sstable_path(app_config);
    return {.path = info.data_dir_path, .source = seastar::format("autodetected from sstable path ({})", info.sstable_path.native())};
}
|
|
|
|
struct schema_with_source {
|
|
schema_ptr schema;
|
|
sstring source;
|
|
std::optional<fs::path> path;
|
|
sstring obtained_from;
|
|
};
|
|
|
|
// Loads the schema from the source explicitly selected on the command line.
//
// The sources are checked in this order: a non-defaulted "schema-file",
// "system-schema", "schema-tables" and "sstable-schema". All but the first
// need the keyspace and table name, see get_keyspace_and_table_options().
//
// Returns the loaded schema and its source description, or a disengaged
// optional if loading failed or no known source was selected. In both failure
// cases an error message is printed to stderr; exceptions are not propagated.
std::optional<schema_with_source> try_load_schema_from_user_provided_source(const bpo::variables_map& app_config, db::config& cfg) {
    // Name of the source being processed; reported in the error message below,
    // so it is set before doing anything that can throw for that source.
    sstring schema_source_opt;
    try {
        if (!app_config["schema-file"].defaulted()) {
            schema_source_opt = "schema-file";
            const auto schema_file_path = std::filesystem::path(app_config["schema-file"].as<sstring>());
            return schema_with_source{.schema = tools::load_one_schema_from_file(cfg, schema_file_path).get(),
                .source = schema_source_opt,
                .path = schema_file_path,
                .obtained_from = "--schema-file parameter"};
        }
        // All the below schema sources require this.
        const auto [keyspace_name, table_name] = get_keyspace_and_table_options(app_config);
        if (app_config.contains("system-schema")) {
            schema_source_opt = "system-schema";
            return schema_with_source{.schema = tools::load_system_schema(cfg, keyspace_name, table_name),
                .source = schema_source_opt,
                .obtained_from = "--system-schema parameter"};
        }
        if (app_config.contains("schema-tables")) {
            schema_source_opt = "schema-tables";
            const auto data_dir = obtain_data_dir(app_config, cfg);
            return schema_with_source{.schema = tools::load_schema_from_schema_tables(cfg, data_dir.path, keyspace_name, table_name).get(),
                .source = schema_source_opt,
                .path = data_dir.path,
                .obtained_from = format("--schema-tables parameter (data-dir path obtained via {})", data_dir.source)};
        }
        if (app_config.contains("sstable-schema")) {
            // Without this the source name would be empty, both in the
            // returned value and in the error message.
            schema_source_opt = "sstable-schema";
            const auto sst_path = fs::path(app_config["sstables"].as<std::vector<sstring>>().front());
            return schema_with_source{.schema = tools::load_schema_from_sstable(cfg, sst_path, keyspace_name, table_name).get(),
                .source = schema_source_opt,
                .path = sst_path,
                .obtained_from = "--sstable-schema parameter"};
        }
    } catch (...) {
        fmt::print(std::cerr, "error processing arguments: could not load schema via {}: {}\n", schema_source_opt, std::current_exception());
        return {};
    }
    // Should not happen, but if it does (we all know it will), let's at least have a message printed.
    fmt::print(std::cerr, "error processing arguments: could not load schema from known schema sources: unknown error\n");
    return {};
}
|
|
|
|
// Tries to load the schema without the user having selected a schema source.
//
// The following sources are attempted in order, the first one that succeeds
// wins:
// 1. the file named by the "schema-file" option;
// 2. the schema tables in the data directory derived from the sstable path;
// 3. the schema tables in the first configured data file directory;
// 4. the sstable itself.
//
// A failed attempt is only logged at debug level. If all attempts fail, a
// message is printed to stderr and a disengaged optional is returned.
std::optional<schema_with_source> try_load_schema_autodetect(const bpo::variables_map& app_config, db::config& cfg) {
    // Attempt 1: the schema file.
    try {
        const auto schema_file_path = std::filesystem::path(app_config["schema-file"].as<sstring>());
        auto loaded_schema = tools::load_one_schema_from_file(cfg, schema_file_path).get();
        return schema_with_source{.schema = std::move(loaded_schema),
            .source = "schema-file",
            .path = schema_file_path,
            .obtained_from = "--schema-file parameters (default value)"};
    } catch (...) {
        sst_log.debug("Trying to read schema file from default location failed: {}", std::current_exception());
    }

    // Attempt 2: schema tables, with the data directory derived from the sstable path.
    if (!app_config.count("sstables")) {
        sst_log.debug("Trying to find scylla data dir based on sstable path failed: no sstable argument provided");
    } else {
        try {
            auto info = extract_from_sstable_path(app_config);
            auto loaded_schema = tools::load_schema_from_schema_tables(cfg, info.data_dir_path, info.keyspace, info.table).get();
            return schema_with_source{.schema = std::move(loaded_schema),
                .source = "schema-tables",
                .path = info.data_dir_path,
                .obtained_from = format("sstable path ({})", info.sstable_path)};
        } catch (...) {
            sst_log.debug("Trying to find scylla data dir based on the sstable path failed: {}", std::current_exception());
        }
    }

    // Attempt 3: schema tables, in the first configured data file directory.
    try {
        const auto [keyspace_name, table_name] = get_keyspace_and_table_options(app_config);
        const auto data_dir_path = std::filesystem::path(cfg.data_file_directories().at(0));
        auto loaded_schema = tools::load_schema_from_schema_tables(cfg, data_dir_path, keyspace_name, table_name).get();
        return schema_with_source{.schema = std::move(loaded_schema),
            .source = "schema-tables",
            .path = data_dir_path,
            .obtained_from = "data dir"};
    } catch (...) {
        sst_log.debug("Trying to locate data dir failed: {}", std::current_exception());
    }

    // Attempt 4: the sstable itself.
    try {
        sstring keyspace_name;
        sstring table_name;
        try {
            std::tie(keyspace_name, table_name) = get_keyspace_and_table_options(app_config);
        } catch (std::invalid_argument&) {
            // The names are not known, use placeholders.
            keyspace_name = "my_keyspace";
            table_name = "my_table";
        }
        if (!app_config.count("sstables")) {
            throw std::runtime_error("no sstables provided on the command-line");
        }
        const auto sst_path = app_config["sstables"].as<std::vector<sstring>>().front();
        auto loaded_schema = tools::load_schema_from_sstable(cfg, fs::path(sst_path), keyspace_name, table_name).get();
        return schema_with_source{.schema = std::move(loaded_schema),
            .source = "sstable's serialization header",
            .obtained_from = sst_path};
    } catch (...) {
        sst_log.debug("Trying to load schema from the sstable itself failed: {}", std::current_exception());
    }

    fmt::print(std::cerr, "Failed to autodetect and load schema, try again with --logger-log-level scylla-sstable=debug to learn more or provide the schema source manually\n");
    return {};
}
|
|
|
|
// Loads the sstables named by `sstable_names`, in parallel.
//
// A name which is an object storage FQN is opened with the first endpoint
// configured for the respective storage type; any other name is treated as a
// local path, which must point to a regular file.
//
// Returns the loaded sstables, in the order of `sstable_names`.
// Throws std::invalid_argument for names that cannot be used and
// std::runtime_error (with the cause nested) if loading an sstable fails.
// Each load failure is also printed to stderr.
const std::vector<sstables::shared_sstable> load_sstables(schema_ptr schema, sstables::sstables_manager& sst_man, sstables::storage_manager& sstm, const std::vector<sstring>& sstable_names) {
    std::vector<sstables::shared_sstable> sstables;
    sstables.resize(sstable_names.size());

    parallel_for_each(sstable_names, [schema, &sst_man, &sstm, &sstable_names, &sstables] (const sstring& sst_name) -> future<> {
        // parallel_for_each() invokes the callback with a reference to the
        // element of sstable_names, so the position of the element is the
        // output slot. Looking the name up by value instead would map all
        // duplicates of a name to the first slot, leaving the others empty.
        const auto i = std::distance(sstable_names.data(), &sst_name);
        auto sst_path = std::filesystem::path(sst_name);

        // The type is only known for paths which exist in the local
        // filesystem; for anything else the check is skipped.
        if (const auto ftype_opt = co_await file_type(sst_path.c_str(), follow_symlink::yes)) {
            if (*ftype_opt != directory_entry_type::regular) {
                throw std::invalid_argument(fmt::format("file pointed to by provided sstable path {} is not a regular file", sst_path.c_str()));
            }
        }

        data_dictionary::storage_options options;
        auto ed = sstables::parse_path(sst_path, schema->ks_name(), schema->cf_name());

        using osp = db::object_storage_endpoint_param;
        static const auto os_types = { osp::s3_type, osp::gs_type };
        auto is_fqn = os_types | std::views::filter(std::bind_front(&data_dictionary::is_object_storage_fqn, sst_path));

        if (!is_fqn.empty()) {
            auto type = is_fqn.front();
            auto endpoints = sstm.endpoints(type);
            if (endpoints.empty()) {
                throw std::invalid_argument(fmt::format(
                    "Unable to open SSTable in {}: AWS object storage configuration missing. Please provide a --scylla-yaml-file with "
                    "valid AWS object storage configuration."
                    , type
                ));
            }
            auto endpoint = endpoints.front();
            options = data_dictionary::make_object_storage_options(endpoint, sst_path);
        } else {
            sst_path = std::filesystem::canonical(std::filesystem::path(sst_name));
            const auto dir_path = sst_path.parent_path();
            options = data_dictionary::make_local_options(dir_path);
        }
        auto sst = sst_man.make_sstable(schema, options, ed.generation, sstables::sstable_state::normal, ed.version, ed.format);

        try {
            auto open_cfg = sstables::sstable_open_config{
                .load_first_and_last_position_metadata = false,
                .keep_sharding_metadata = true,
            };
            co_await sst->load(schema->get_sharder(), open_cfg);
        } catch (...) {
            // Print each individual error here since parallel_for_each
            // will propagate only one of them up the stack.
            auto msg = fmt::format("Could not load SSTable: {}", sst->get_filename());
            fmt::print(std::cerr, "{}: {}\n", msg, std::current_exception());
            throw_with_nested(std::runtime_error(msg));
        }

        sstables[i] = std::move(sst);
    }).get();

    return sstables;
}
|
|
|
|
class consumer_wrapper {
|
|
public:
|
|
using filter_type = std::function<bool(const dht::decorated_key&)>;
|
|
private:
|
|
mutation_fragment_v2::kind _last_kind = mutation_fragment_v2::kind::partition_end;
|
|
sstable_consumer& _consumer;
|
|
filter_type _filter;
|
|
public:
|
|
consumer_wrapper(sstable_consumer& consumer, filter_type filter)
|
|
: _consumer(consumer), _filter(std::move(filter)) {
|
|
}
|
|
mutation_fragment_v2::kind last_kind() const {
|
|
return _last_kind;
|
|
}
|
|
future<stop_iteration> operator()(mutation_fragment_v2&& mf) {
|
|
sst_log.trace("consume {}", mf.mutation_fragment_kind());
|
|
if (mf.is_partition_start() && _filter && !_filter(mf.as_partition_start().key())) {
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
}
|
|
_last_kind = mf.mutation_fragment_kind();
|
|
return std::move(mf).consume(_consumer);
|
|
}
|
|
};
|
|
|
|
// Formats selectable with the "output-format" option.
enum class output_format {
    text,
    json,
};
|
|
|
|
// Maps the value of the "output-format" option to an output_format.
//
// Returns `default_format` if the option is not present.
// Throws std::invalid_argument if the value is neither "text" nor "json".
output_format get_output_format_from_options(const bpo::variables_map& opts, output_format default_format) {
    const auto it = opts.find("output-format");
    if (it == opts.end()) {
        return default_format;
    }

    const auto& value = it->second.as<std::string>();
    if (value == "text") {
        return output_format::text;
    }
    if (value == "json") {
        return output_format::json;
    }
    throw std::invalid_argument(fmt::format("invalid value for dump option output-format: {}", value));
}
|
|
|
|
// Formats selectable with the "input-format" option.
enum class input_format {
    cql,
    json,
};
|
|
|
|
// Maps the value of the "input-format" option to an input_format.
//
// Returns `default_format` if the option is not present.
// Throws std::invalid_argument if the value is neither "cql" nor "json".
input_format get_input_format_from_options(const bpo::variables_map& opts, input_format default_format) {
    const auto it = opts.find("input-format");
    if (it == opts.end()) {
        return default_format;
    }

    const auto& value = it->second.as<std::string>();
    if (value == "cql") {
        return input_format::cql;
    }
    if (value == "json") {
        return input_format::json;
    }
    throw std::invalid_argument(fmt::format("invalid value for option input-format: {}", value));
}
|
|
|
|
} // anonymous namespace
|
|
|
|
namespace tools {
|
|
|
|
void mutation_fragment_stream_json_writer::write(const clustering_row& cr) {
|
|
writer().StartObject();
|
|
writer().Key("type");
|
|
writer().String("clustering-row");
|
|
writer().Key("key");
|
|
writer().DataKey(_writer.schema(), cr.key());
|
|
if (cr.tomb()) {
|
|
writer().Key("tombstone");
|
|
_writer.write(cr.tomb().regular());
|
|
writer().Key("shadowable_tombstone");
|
|
_writer.write(cr.tomb().shadowable().tomb());
|
|
}
|
|
if (!cr.marker().is_missing()) {
|
|
writer().Key("marker");
|
|
_writer.write(cr.marker());
|
|
}
|
|
writer().Key("columns");
|
|
_writer.write(cr.cells(), column_kind::regular_column);
|
|
writer().EndObject();
|
|
}
|
|
|
|
void mutation_fragment_stream_json_writer::write(const range_tombstone_change& rtc) {
|
|
writer().StartObject();
|
|
writer().Key("type");
|
|
writer().String("range-tombstone-change");
|
|
const auto pos = rtc.position();
|
|
if (pos.has_key()) {
|
|
writer().Key("key");
|
|
writer().DataKey(_writer.schema(), pos.key());
|
|
}
|
|
writer().Key("weight");
|
|
writer().Int(static_cast<int>(pos.get_bound_weight()));
|
|
writer().Key("tombstone");
|
|
_writer.write(rtc.tombstone());
|
|
writer().EndObject();
|
|
}
|
|
|
|
// Begins the output stream on the underlying JSON writer.
void mutation_fragment_stream_json_writer::start_stream() {
    auto&& out = writer();
    out.StartStream();
}
|
|
|
|
// Begins the array holding the content of an sstable. The array is keyed by
// the sstable's filename, or by "anonymous" if `sst` is null.
void mutation_fragment_stream_json_writer::start_sstable(const sstables::sstable* const sst) {
    if (sst == nullptr) {
        writer().Key("anonymous");
    } else {
        writer().Key(fmt::to_string(sst->get_filename()));
    }
    writer().StartArray();
}
|
|
|
|
void mutation_fragment_stream_json_writer::start_partition(const partition_start& ps) {
|
|
const auto& dk = ps.key();
|
|
_clustering_array_created = false;
|
|
|
|
writer().StartObject();
|
|
|
|
writer().Key("key");
|
|
writer().DataKey(_writer.schema(), dk.key(), dk.token());
|
|
|
|
if (ps.partition_tombstone()) {
|
|
writer().Key("tombstone");
|
|
_writer.write(ps.partition_tombstone());
|
|
}
|
|
}
|
|
|
|
// Writes the cells of the static row under the "static_row" key.
void mutation_fragment_stream_json_writer::partition_element(const static_row& sr) {
    auto&& out = writer();
    out.Key("static_row");
    _writer.write(sr.cells(), column_kind::static_column);
}
|
|
|
|
void mutation_fragment_stream_json_writer::partition_element(const clustering_row& cr) {
|
|
if (!_clustering_array_created) {
|
|
writer().Key("clustering_elements");
|
|
writer().StartArray();
|
|
_clustering_array_created = true;
|
|
}
|
|
write(cr);
|
|
}
|
|
|
|
void mutation_fragment_stream_json_writer::partition_element(const range_tombstone_change& rtc) {
|
|
if (!_clustering_array_created) {
|
|
writer().Key("clustering_elements");
|
|
writer().StartArray();
|
|
_clustering_array_created = true;
|
|
}
|
|
write(rtc);
|
|
}
|
|
|
|
void mutation_fragment_stream_json_writer::end_partition() {
|
|
if (_clustering_array_created) {
|
|
writer().EndArray();
|
|
}
|
|
writer().EndObject();
|
|
}
|
|
|
|
// Closes the array started by start_sstable().
void mutation_fragment_stream_json_writer::end_sstable() {
    auto&& out = writer();
    out.EndArray();
}
|
|
|
|
// Ends the output stream on the underlying JSON writer.
void mutation_fragment_stream_json_writer::end_stream() {
    auto&& out = writer();
    out.EndStream();
}
|
|
|
|
} // namespace tools
|
|
|
|
namespace {
|
|
|
|
class dumping_consumer : public sstable_consumer {
|
|
class text_dumper : public sstable_consumer {
|
|
const schema& _schema;
|
|
public:
|
|
explicit text_dumper(const schema& s) : _schema(s) { }
|
|
virtual future<> consume_stream_start() override {
|
|
fmt::print("{{stream_start}}\n");
|
|
return make_ready_future<>();
|
|
}
|
|
virtual future<stop_iteration> consume_sstable_start(const sstables::sstable* const sst) override {
|
|
fmt::print("{{sstable_start{}}}\n", sst ? fmt::format(": filename {}", sst->get_filename()) : "");
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(partition_start&& ps) override {
|
|
fmt::print("{}\n", ps);
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(static_row&& sr) override {
|
|
fmt::print("{}\n", static_row::printer(_schema, sr));
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(clustering_row&& cr) override {
|
|
fmt::print("{}\n", clustering_row::printer(_schema, cr));
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(range_tombstone_change&& rtc) override {
|
|
fmt::print("{}\n", rtc);
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(partition_end&& pe) override {
|
|
fmt::print("{{partition_end}}\n");
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume_sstable_end() override {
|
|
fmt::print("{{sstable_end}}\n");
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<> consume_stream_end() override {
|
|
fmt::print("{{stream_end}}\n");
|
|
return make_ready_future<>();
|
|
}
|
|
};
|
|
class json_dumper : public sstable_consumer {
|
|
tools::mutation_fragment_stream_json_writer _writer;
|
|
public:
|
|
explicit json_dumper(const schema& s) : _writer(s) {}
|
|
virtual future<> consume_stream_start() override {
|
|
_writer.start_stream();
|
|
return make_ready_future<>();
|
|
}
|
|
virtual future<stop_iteration> consume_sstable_start(const sstables::sstable* const sst) override {
|
|
_writer.start_sstable(sst);
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(partition_start&& ps) override {
|
|
_writer.start_partition(ps);
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(static_row&& sr) override {
|
|
_writer.partition_element(sr);
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(clustering_row&& cr) override {
|
|
_writer.partition_element(cr);
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(range_tombstone_change&& rtc) override {
|
|
_writer.partition_element(rtc);
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume(partition_end&& pe) override {
|
|
_writer.end_partition();
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<stop_iteration> consume_sstable_end() override {
|
|
_writer.end_sstable();
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
}
|
|
virtual future<> consume_stream_end() override {
|
|
_writer.end_stream();
|
|
return make_ready_future<>();
|
|
}
|
|
};
|
|
|
|
private:
|
|
schema_ptr _schema;
|
|
std::unique_ptr<sstable_consumer> _consumer;
|
|
|
|
public:
|
|
explicit dumping_consumer(schema_ptr s, reader_permit, const bpo::variables_map& opts) : _schema(std::move(s)) {
|
|
_consumer = std::make_unique<text_dumper>(*_schema);
|
|
switch (get_output_format_from_options(opts, output_format::text)) {
|
|
case output_format::text:
|
|
_consumer = std::make_unique<text_dumper>(*_schema);
|
|
break;
|
|
case output_format::json:
|
|
_consumer = std::make_unique<json_dumper>(*_schema);
|
|
break;
|
|
}
|
|
}
|
|
virtual future<> consume_stream_start() override { return _consumer->consume_stream_start(); }
|
|
virtual future<stop_iteration> consume_sstable_start(const sstables::sstable* const sst) override { return _consumer->consume_sstable_start(sst); }
|
|
virtual future<stop_iteration> consume(partition_start&& ps) override { return _consumer->consume(std::move(ps)); }
|
|
virtual future<stop_iteration> consume(static_row&& sr) override { return _consumer->consume(std::move(sr)); }
|
|
virtual future<stop_iteration> consume(clustering_row&& cr) override { return _consumer->consume(std::move(cr)); }
|
|
virtual future<stop_iteration> consume(range_tombstone_change&& rtc) override { return _consumer->consume(std::move(rtc)); }
|
|
virtual future<stop_iteration> consume(partition_end&& pe) override { return _consumer->consume(std::move(pe)); }
|
|
virtual future<stop_iteration> consume_sstable_end() override { return _consumer->consume_sstable_end(); }
|
|
virtual future<> consume_stream_end() override { return _consumer->consume_stream_end(); }
|
|
};
|
|
|
|
// Feeds the content of `rd` to `consumer`, between a consume_sstable_start()
// and a consume_sstable_end() call.
//
// If `partitions` is not empty, only the partitions contained in it are
// passed to the consumer. A partition whose consumption is stopped midway by
// the consumer is terminated with a synthetic partition end; the rest of it
// is skipped. With `no_skips` set, skipping is done by reading and dropping
// fragments, otherwise by next_partition().
//
// The reader is closed on return.
// Returns what the consumer's consume_sstable_end() returned.
stop_iteration consume_reader(mutation_reader rd, sstable_consumer& consumer, sstables::sstable* sst, const partition_set& partitions, bool no_skips) {
    auto close_rd = deferred_close(rd);
    if (consumer.consume_sstable_start(sst).get() == stop_iteration::yes) {
        return consumer.consume_sstable_end().get();
    }

    // Set by the filter when it rejects a partition, and below when the
    // remainder of a partition has to be skipped.
    bool skip_partition = false;
    consumer_wrapper::filter_type filter;
    if (!partitions.empty()) {
        filter = [&] (const dht::decorated_key& key) {
            const auto pass = partitions.contains(key);
            sst_log.trace("filter({})={}", key, pass);
            skip_partition = !pass;
            return pass;
        };
    }
    auto wrapper = consumer_wrapper(consumer, filter);

    while (!rd.is_end_of_stream()) {
        skip_partition = false;
        rd.consume_pausable(std::ref(wrapper)).get();
        sst_log.trace("consumer paused, skip_partition={}", skip_partition);

        // Not at the end and not paused by the filter: the consumer itself
        // asked to stop.
        const bool stopped_by_consumer = !rd.is_end_of_stream() && !skip_partition;
        if (stopped_by_consumer) {
            if (wrapper.last_kind() == mutation_fragment_v2::kind::partition_end) {
                sst_log.trace("consumer returned stop_iteration::yes for partition end, stopping");
                break;
            }
            // Stopped in the middle of a partition, close it for the consumer.
            auto synthetic_end = mutation_fragment_v2(*rd.schema(), rd.permit(), partition_end{});
            if (wrapper(std::move(synthetic_end)).get() == stop_iteration::yes) {
                sst_log.trace("consumer returned stop_iteration::yes for synthetic partition end, stopping");
                break;
            }
            skip_partition = true;
        }

        if (!skip_partition) {
            continue;
        }
        if (no_skips) {
            // Drop fragments until the end of the partition (or of the stream).
            mutation_fragment_v2_opt mfopt;
            while ((mfopt = rd().get()) && !mfopt->is_end_of_partition()) {
            }
        } else {
            rd.next_partition().get();
        }
    }
    return consumer.consume_sstable_end().get();
}
|
|
|
|
// Creates readers for `sstables` and passes them to `reader_consumer`.
//
// With `merge` set, a single combined reader over all sstables is passed,
// along with a null sstable pointer. Otherwise each sstable gets its own
// reader, passed along with the sstable it reads; the iteration stops early if
// `reader_consumer` returns stop_iteration::yes.
//
// `use_full_scan_reader` selects between make_full_scan_reader() and
// make_reader() over the full partition range and the full slice.
void consume_sstables(schema_ptr schema, reader_permit permit, std::vector<sstables::shared_sstable> sstables, bool merge, bool use_full_scan_reader,
        std::function<stop_iteration(mutation_reader&, sstables::sstable*)> reader_consumer) {
    sst_log.trace("consume_sstables(): {} sstables, merge={}, use_full_scan_reader={}", sstables.size(), merge, use_full_scan_reader);

    // Creates the kind of reader asked for, for a single sstable.
    const auto open_reader = [&] (const sstables::shared_sstable& sst) {
        return use_full_scan_reader
                ? sst->make_full_scan_reader(schema, permit)
                : sst->make_reader(schema, permit, query::full_partition_range, schema->full_slice());
    };

    if (!merge) {
        for (const auto& sst : sstables) {
            auto rd = open_reader(sst);

            if (reader_consumer(rd, sst.get()) == stop_iteration::yes) {
                break;
            }
        }
        return;
    }

    std::vector<mutation_reader> readers;
    readers.reserve(sstables.size());
    for (const auto& sst : sstables) {
        readers.emplace_back(open_reader(sst));
    }
    auto rd = make_combined_reader(schema, permit, std::move(readers));

    reader_consumer(rd, nullptr);
}
|
|
|
|
class scylla_sstable_compaction_group_view : public compaction::compaction_group_view {
|
|
struct dummy_compaction_backlog_tracker : public compaction::compaction_backlog_tracker::impl {
|
|
virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override { }
|
|
virtual double backlog(const compaction::compaction_backlog_tracker::ongoing_writes& ow, const compaction::compaction_backlog_tracker::ongoing_compactions& oc) const override { return 0.0; }
|
|
};
|
|
|
|
private:
|
|
schema_ptr _schema;
|
|
reader_permit _permit;
|
|
sstables::sstables_manager& _sst_man;
|
|
std::string _output_dir;
|
|
sstables::sstable_set _main_set;
|
|
sstables::sstable_set _maintenance_set;
|
|
std::vector<sstables::shared_sstable> _compacted_undeleted_sstables;
|
|
mutable compaction::compaction_strategy _compaction_strategy;
|
|
compaction::compaction_strategy_state _compaction_strategy_state;
|
|
tombstone_gc_state _tombstone_gc_state;
|
|
compaction::compaction_backlog_tracker _backlog_tracker;
|
|
std::string _group_id;
|
|
condition_variable _staging_done_condition;
|
|
mutable sstable_generation_generator _generation_generator;
|
|
|
|
private:
|
|
sstables::shared_sstable do_make_sstable() const {
|
|
const auto format = sstables::sstable_format_types::big;
|
|
const auto version = _sst_man.get_preferred_sstable_version();
|
|
auto generation = _generation_generator();
|
|
auto sst_name = sstables::sstable::filename(_output_dir, _schema->ks_name(), _schema->cf_name(), version, generation, format, component_type::Data);
|
|
if (file_exists(sst_name).get()) {
|
|
throw std::runtime_error(fmt::format("cannot create output sstable {}, file already exists", sst_name));
|
|
}
|
|
auto local = data_dictionary::make_local_options(_output_dir);
|
|
return _sst_man.make_sstable(_schema, local, generation, sstables::sstable_state::normal, version, format);
|
|
}
|
|
sstables::sstable_writer_config do_configure_writer(sstring origin) const {
|
|
return _sst_man.configure_writer(std::move(origin));
|
|
}
|
|
|
|
public:
|
|
scylla_sstable_compaction_group_view(schema_ptr schema, reader_permit permit, sstables::sstables_manager& sst_man, std::string output_dir)
|
|
: _schema(std::move(schema))
|
|
, _permit(std::move(permit))
|
|
, _sst_man(sst_man)
|
|
, _output_dir(std::move(output_dir))
|
|
, _main_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
|
|
, _maintenance_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
|
|
, _compaction_strategy(compaction::make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
|
, _compaction_strategy_state(compaction::compaction_strategy_state::make(_compaction_strategy))
|
|
, _tombstone_gc_state(nullptr)
|
|
, _backlog_tracker(std::make_unique<dummy_compaction_backlog_tracker>())
|
|
, _group_id("dummy-group")
|
|
, _generation_generator()
|
|
{ }
|
|
virtual dht::token_range token_range() const noexcept override { return dht::token_range::make(dht::first_token(), dht::last_token()); }
|
|
virtual const schema_ptr& schema() const noexcept override { return _schema; }
|
|
virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); }
|
|
virtual bool compaction_enforce_min_threshold() const noexcept override { return false; }
|
|
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
|
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
|
|
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
|
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
|
|
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
|
|
virtual compaction::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }
|
|
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
|
|
virtual reader_permit make_compaction_reader_permit() const override { return _permit; }
|
|
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
|
|
virtual sstables::shared_sstable make_sstable() const override { return do_make_sstable(); }
|
|
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return do_configure_writer(std::move(origin)); }
|
|
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
|
|
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }
|
|
virtual api::timestamp_type min_memtable_live_row_marker_timestamp() const override { return api::min_timestamp; }
|
|
virtual bool memtable_has_key(const dht::decorated_key& key) const override { return false; }
|
|
virtual future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override { return make_ready_future<>(); }
|
|
virtual bool is_auto_compaction_disabled_by_user() const noexcept override { return false; }
|
|
virtual bool tombstone_gc_enabled() const noexcept override { return false; }
|
|
virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept override { return _tombstone_gc_state; }
|
|
virtual compaction::compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; }
|
|
virtual const std::string get_group_id() const noexcept override { return _group_id; }
|
|
virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; }
|
|
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return dht::token_range(); }
|
|
int64_t get_sstables_repaired_at() const noexcept override { return 0; }
|
|
};
|
|
|
|
void validate_output_dir(std::filesystem::path output_dir, bool accept_nonempty_output_dir) {
|
|
auto fd = open_file_dma(output_dir.native(), open_flags::ro).get();
|
|
unsigned entries = 0;
|
|
fd.list_directory([&entries] (directory_entry) {
|
|
++entries;
|
|
return make_ready_future<>();
|
|
}).done().get();
|
|
if (entries && !accept_nonempty_output_dir) {
|
|
throw std::invalid_argument("output-directory is not empty, pass --unsafe-accept-nonempty-output-dir if you are sure you want to write into this directory");
|
|
}
|
|
}
|
|
|
|
/// Validates every input sstable and prints a JSON document with the results.
///
/// For each sstable the document holds the error count returned by validation
/// and a "valid" flag that is true when that count is zero.
///
/// \throws std::invalid_argument if no sstables were given
void validate_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    abort_source abort;

    // The JSON is buffered in memory and printed only once validation has
    // finished, so it cannot interleave with error messages from validation.
    std::stringstream buffered_json;
    json_writer writer(buffered_json);

    writer.StartStream();
    for (const auto& sst : sstables) {
        const auto error_count = sst->validate(permit, abort, [] (sstring what) { sst_log.info("{}", what); }).get();
        const bool is_valid = (error_count == 0);

        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartObject();
        writer.Key("errors");
        writer.Uint64(error_count);
        writer.Key("valid");
        writer.Bool(is_valid);
        writer.EndObject();
    }
    writer.EndStream();

    fmt::print(std::cout, "{}", buffered_json.view());
}
|
|
|
|
/// Scrubs the input sstables in the mode selected with --scrub-mode.
///
/// Unless the mode is "validate", the output directory is first checked with
/// validate_output_dir(). The scrub itself is run as a compaction whose output
/// sstables are created through a scylla_sstable_compaction_group_view.
///
/// \throws std::invalid_argument if no sstables were given, --scrub-mode is
///         missing, or its value is not one of the known mode names
void scrub_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) {
    using scrub_mode_type = compaction::compaction_type_options::scrub::mode;
    using named_scrub_mode = std::pair<std::string, scrub_mode_type>;

    // Command-line names of the supported scrub modes.
    static const std::vector<named_scrub_mode> scrub_modes{
        {"abort", scrub_mode_type::abort},
        {"skip", scrub_mode_type::skip},
        {"segregate", scrub_mode_type::segregate},
        {"validate", scrub_mode_type::validate},
    };

    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    // Resolve the mode in one expression so that it is never observable uninitialized.
    const scrub_mode_type scrub_mode = [&vm] {
        if (!vm.count("scrub-mode")) {
            throw std::invalid_argument("missing mandatory command-line argument --scrub-mode");
        }
        const auto mode_name = vm["scrub-mode"].as<std::string>();
        const auto mode_it = std::ranges::find(scrub_modes, mode_name, &named_scrub_mode::first);
        if (mode_it == scrub_modes.end()) {
            throw std::invalid_argument(fmt::format("invalid scrub-mode: {}", mode_name));
        }
        return mode_it->second;
    }();

    auto output_dir = vm["output-dir"].as<std::string>();
    // The output directory is only checked when the mode is not "validate".
    if (scrub_mode != scrub_mode_type::validate) {
        validate_output_dir(output_dir, vm.count("unsafe-accept-nonempty-output-dir"));
    }

    scylla_sstable_compaction_group_view group_view(schema, permit, sst_man, output_dir);

    // `sstables` is a const reference, so it can only be copied; the previous
    // std::move() around it was a no-op that suggested an ownership transfer.
    auto descriptor = compaction::compaction_descriptor(sstables);
    descriptor.options = compaction::compaction_type_options::make_scrub(scrub_mode, compaction::compaction_type_options::scrub::quarantine_invalid_sstables::no);
    descriptor.creator = [&group_view] (shard_id) { return group_view.make_sstable(); };
    descriptor.replacer = [] (compaction::compaction_completion_desc) { };

    auto compaction_data = compaction::compaction_data{};
    compaction::compaction_progress_monitor progress_monitor;

    compaction::compact_sstables(std::move(descriptor), compaction_data, group_view, progress_monitor).get();
}
|
|
|
|
/// Dumps the index of every input sstable as JSON.
///
/// The output maps each sstable's file name to an array with one object per
/// index entry, holding the partition key (when the reader provides one) and
/// the start position in the data file.
///
/// \throws std::invalid_argument if no sstables were given
void dump_index_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map&) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    json_writer writer;
    writer.StartStream();
    for (auto& sst : sstables) {
        auto index = sst->make_index_reader(permit);
        // Make sure the reader is closed even if dumping throws.
        auto close_index = deferred_close(*index);

        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartArray();

        // Walk the index one partition at a time until the reader hits the end.
        for (; !index->eof(); index->advance_to_next_partition().get()) {
            index->read_partition_data().get();
            auto data_pos = index->data_file_positions().start;
            auto maybe_key = index->get_partition_key();

            writer.StartObject();
            if (maybe_key) {
                writer.Key("key");
                writer.DataKey(*schema, *maybe_key);
            }
            writer.Key("pos");
            writer.Uint64(data_pos);
            writer.EndObject();
        }
        writer.EndArray();
    }
    writer.EndStream();
}
|
|
|
|
template <typename Integer>
|
|
sstring disk_string_to_string(const sstables::disk_string<Integer>& ds) {
|
|
return sstring(ds.value.begin(), ds.value.end());
|
|
}
|
|
|
|
/// Dumps the compression info of every input sstable as JSON.
///
/// For each sstable the output holds the compressor name, its options, the
/// chunk length, the data length and the list of chunk offsets.
///
/// \throws std::invalid_argument if no sstables were given
void dump_compression_info_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map&) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    json_writer writer;
    writer.StartStream();

    for (auto& sst : sstables) {
        const auto& info = sst->get_compression();

        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartObject();

        writer.Key("name");
        writer.String(disk_string_to_string(info.name));

        // Options are written as a flat key -> value object.
        writer.Key("options");
        writer.StartObject();
        for (const auto& option : info.options.elements) {
            writer.Key(disk_string_to_string(option.key));
            writer.String(disk_string_to_string(option.value));
        }
        writer.EndObject();

        writer.Key("chunk_len");
        writer.Uint(info.chunk_len);

        writer.Key("data_len");
        writer.Uint64(info.data_len);

        writer.Key("offsets");
        writer.StartArray();
        for (const auto& chunk_offset : info.offsets) {
            writer.Uint64(chunk_offset);
        }
        writer.EndArray();

        writer.EndObject();
    }

    writer.EndStream();
}
|
|
|
|
/// Dumps the summary of every input sstable as JSON.
///
/// For each sstable the output holds the summary header fields, the positions
/// array, the summary entries (key, token and position) and the first and last
/// keys.
///
/// \throws std::invalid_argument if no sstables were given
void dump_summary_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map&) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    json_writer writer;

    // Writes one unsigned 64-bit header field under the given key.
    auto write_header_field = [&writer] (std::string_view name, const auto& value) {
        writer.Key(name);
        writer.Uint64(value);
    };

    // Decorates a serialized boundary key of the summary and writes it under the given key.
    auto write_boundary_key = [&writer, &schema] (std::string_view name, const auto& serialized_key) {
        auto decorated = dht::decorate_key(*schema, sstables::key_view(serialized_key.value).to_partition_key(*schema));
        writer.Key(name);
        writer.DataKey(*schema, decorated);
    };

    writer.StartStream();

    for (auto& sst : sstables) {
        auto& summary = sst->get_summary();

        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartObject();

        writer.Key("header");
        writer.StartObject();
        write_header_field("min_index_interval", summary.header.min_index_interval);
        write_header_field("size", summary.header.size);
        write_header_field("memory_size", summary.header.memory_size);
        write_header_field("sampling_level", summary.header.sampling_level);
        write_header_field("size_at_full_sampling", summary.header.size_at_full_sampling);
        writer.EndObject();

        writer.Key("positions");
        writer.StartArray();
        for (const auto& position : summary.positions) {
            writer.Uint64(position);
        }
        writer.EndArray();

        writer.Key("entries");
        writer.StartArray();
        for (const auto& entry : summary.entries) {
            writer.StartObject();

            auto entry_key = entry.get_key().to_partition_key(*schema);
            writer.Key("key");
            writer.DataKey(*schema, entry_key, entry.get_token());
            writer.Key("position");
            writer.Uint64(entry.position);

            writer.EndObject();
        }
        writer.EndArray();

        write_boundary_key("first_key", summary.first_key);
        write_boundary_key("last_key", summary.last_key);

        writer.EndObject();
    }

    writer.EndStream();
}
|
|
|
|
class json_dumper {
|
|
json_writer& _writer;
|
|
sstables::sstable_version_types _version;
|
|
std::function<std::string_view(const void* const)> _name_resolver;
|
|
std::function<sstring(const void* const, bytes_view)> _disk_string_converter;
|
|
|
|
public:
|
|
static sstring default_disk_string_converter(const void* const, bytes_view value) {
|
|
return sstring(value.begin(), value.end());
|
|
}
|
|
|
|
private:
|
|
void visit(const void* const field, int8_t val) { _writer.Int(val); }
|
|
void visit(const void* const field, uint8_t val) { _writer.Uint(val); }
|
|
void visit(const void* const field, int val) { _writer.Int(val); }
|
|
void visit(const void* const field, unsigned val) { _writer.Uint(val); }
|
|
void visit(const void* const field, int64_t val) { _writer.Int64(val); }
|
|
void visit(const void* const field, uint64_t val) { _writer.Uint64(val); }
|
|
void visit(const void* const field, double val) {
|
|
if (std::isnan(val)) {
|
|
_writer.String("NaN");
|
|
} else {
|
|
_writer.Double(val);
|
|
}
|
|
}
|
|
|
|
template <typename Integer>
|
|
void visit(const void* const field, const sstables::disk_string<Integer>& val) {
|
|
_writer.String(_disk_string_converter(field, val.value));
|
|
}
|
|
|
|
template <typename Contents>
|
|
void visit(const void* const field, const std::optional<Contents>& val) {
|
|
if (bool(val)) {
|
|
visit(field, *val);
|
|
} else {
|
|
_writer.Null();
|
|
}
|
|
}
|
|
|
|
template <typename Integer, typename T>
|
|
void visit(const void* const field, const sstables::disk_array<Integer, T>& val) {
|
|
_writer.StartArray();
|
|
for (const auto& elem : val.elements) {
|
|
visit(field, elem);
|
|
}
|
|
_writer.EndArray();
|
|
}
|
|
|
|
void visit(const void* const field, const sstables::disk_string_vint_size& val) {
|
|
_writer.String(_disk_string_converter(field, val.value));
|
|
}
|
|
|
|
template <typename T>
|
|
void visit(const void* const field, const sstables::disk_array_vint_size<T>& val) {
|
|
_writer.StartArray();
|
|
for (const auto& elem : val.elements) {
|
|
visit(field, elem);
|
|
}
|
|
_writer.EndArray();
|
|
}
|
|
|
|
void visit(const void* const field, const utils::estimated_histogram& val) {
|
|
_writer.StartArray();
|
|
for (size_t i = 0; i < val.buckets.size(); i++) {
|
|
_writer.StartObject();
|
|
_writer.Key("offset");
|
|
_writer.Int64(val.bucket_offsets[i == 0 ? 0 : i - 1]);
|
|
_writer.Key("value");
|
|
_writer.Int64(val.buckets[i]);
|
|
_writer.EndObject();
|
|
}
|
|
_writer.EndArray();
|
|
}
|
|
|
|
void visit(const void* const field, const utils::streaming_histogram& val) {
|
|
_writer.StartObject();
|
|
for (const auto& [k, v] : val.bin) {
|
|
_writer.Key(format("{}", k));
|
|
_writer.Uint64(v);
|
|
}
|
|
_writer.EndObject();
|
|
}
|
|
|
|
void visit(const void* const field, const db::replay_position& val) {
|
|
_writer.StartObject();
|
|
_writer.Key("id");
|
|
_writer.Uint64(val.id);
|
|
_writer.Key("pos");
|
|
_writer.Uint(val.pos);
|
|
_writer.EndObject();
|
|
}
|
|
|
|
void visit(const void* const field, const sstables::commitlog_interval& val) {
|
|
_writer.StartObject();
|
|
_writer.Key("start");
|
|
visit(field, val.start);
|
|
_writer.Key("end");
|
|
visit(field, val.end);
|
|
_writer.EndObject();
|
|
}
|
|
|
|
void visit(const void* const field, const utils::UUID& uuid) {
|
|
_writer.String(fmt::to_string(uuid));
|
|
}
|
|
|
|
template <typename Tag>
|
|
void visit(const void* const field, const utils::tagged_uuid<Tag>& id) {
|
|
visit(field, id.uuid());
|
|
}
|
|
|
|
template <typename Integer>
|
|
void visit(const void* const field, const sstables::vint<Integer>& val) {
|
|
visit(field, val.value);
|
|
}
|
|
|
|
void visit(const void* const field, const sstables::serialization_header::column_desc& val) {
|
|
auto prev_name_resolver = std::exchange(_name_resolver, [&val] (const void* const field) {
|
|
if (field == &val.name) { return "name"; }
|
|
else if (field == &val.type_name) { return "type_name"; }
|
|
else { throw std::invalid_argument("invalid field offset"); }
|
|
});
|
|
|
|
_writer.StartObject();
|
|
const_cast<sstables::serialization_header::column_desc&>(val).describe_type(_version, std::ref(*this));
|
|
_writer.EndObject();
|
|
|
|
_name_resolver = std::move(prev_name_resolver);
|
|
}
|
|
|
|
json_dumper(json_writer& writer, sstables::sstable_version_types version, std::function<std::string_view(const void* const)> name_resolver,
|
|
std::function<sstring(const void* const, bytes_view string)> disk_string_converter)
|
|
: _writer(writer), _version(version), _name_resolver(std::move(name_resolver)), _disk_string_converter(std::move(disk_string_converter)) {
|
|
}
|
|
|
|
public:
|
|
template <typename Arg1>
|
|
void operator()(Arg1& arg1) {
|
|
_writer.Key(_name_resolver(&arg1));
|
|
visit(&arg1, arg1);
|
|
}
|
|
|
|
template <typename Arg1, typename... Arg>
|
|
void operator()(Arg1& arg1, Arg&... arg) {
|
|
_writer.Key(_name_resolver(&arg1));
|
|
visit(&arg1, arg1);
|
|
(*this)(arg...);
|
|
}
|
|
|
|
template <typename T>
|
|
static void dump(
|
|
json_writer& writer,
|
|
sstables::sstable_version_types version,
|
|
const T& obj,
|
|
std::string_view name,
|
|
std::function<std::string_view(const void* const)> name_resolver,
|
|
std::function<sstring(const void* const, bytes_view string)> disk_string_converter = &json_dumper::default_disk_string_converter) {
|
|
json_dumper dumper(writer, version, std::move(name_resolver), std::move(disk_string_converter));
|
|
writer.Key(name);
|
|
writer.StartObject();
|
|
const_cast<T&>(obj).describe_type(version, std::ref(dumper));
|
|
writer.EndObject();
|
|
}
|
|
};
|
|
|
|
/// Dumps the validation metadata as a JSON object under the "validation" key.
void dump_validation_metadata(json_writer& writer, sstables::sstable_version_types version, const sstables::validation_metadata& metadata) {
    // Address of each known field paired with the key it is written under.
    const std::pair<const void*, const char*> field_names[] = {
        {&metadata.partitioner, "partitioner"},
        {&metadata.filter_chance, "filter_chance"},
    };

    json_dumper::dump(writer, version, metadata, "validation", [&field_names] (const void* const field) -> const char* {
        for (const auto& [address, name] : field_names) {
            if (field == address) {
                return name;
            }
        }
        throw std::invalid_argument("invalid field offset");
    });
}
|
|
|
|
/// Dumps the compaction metadata as a JSON object under the "compaction" key.
void dump_compaction_metadata(json_writer& writer, sstables::sstable_version_types version, const sstables::compaction_metadata& metadata) {
    // Address of each known field paired with the key it is written under.
    const std::pair<const void*, const char*> field_names[] = {
        {&metadata.ancestors, "ancestors"},
        {&metadata.cardinality, "cardinality"},
    };

    json_dumper::dump(writer, version, metadata, "compaction", [&field_names] (const void* const field) -> const char* {
        for (const auto& [address, name] : field_names) {
            if (field == address) {
                return name;
            }
        }
        throw std::invalid_argument("invalid field offset");
    });
}
|
|
|
|
/// Dumps the stats metadata as a JSON object under the "stats" key.
///
/// Disk strings belonging to min_column_names and max_column_names are written
/// hex-encoded; all other disk strings use the dumper's default conversion.
void dump_stats_metadata(json_writer& writer, sstables::sstable_version_types version, const sstables::stats_metadata& metadata) {
    // Address of each known field paired with the key it is written under.
    const std::pair<const void*, const char*> field_names[] = {
        {&metadata.estimated_partition_size, "estimated_partition_size"},
        {&metadata.estimated_cells_count, "estimated_cells_count"},
        {&metadata.position, "position"},
        {&metadata.min_timestamp, "min_timestamp"},
        {&metadata.max_timestamp, "max_timestamp"},
        {&metadata.min_local_deletion_time, "min_local_deletion_time"},
        {&metadata.max_local_deletion_time, "max_local_deletion_time"},
        {&metadata.min_ttl, "min_ttl"},
        {&metadata.max_ttl, "max_ttl"},
        {&metadata.compression_ratio, "compression_ratio"},
        {&metadata.estimated_tombstone_drop_time, "estimated_tombstone_drop_time"},
        {&metadata.sstable_level, "sstable_level"},
        {&metadata.repaired_at, "repaired_at"},
        {&metadata.min_column_names, "min_column_names"},
        {&metadata.max_column_names, "max_column_names"},
        {&metadata.has_legacy_counter_shards, "has_legacy_counter_shards"},
        {&metadata.columns_count, "columns_count"},
        {&metadata.rows_count, "rows_count"},
        {&metadata.commitlog_lower_bound, "commitlog_lower_bound"},
        {&metadata.commitlog_intervals, "commitlog_intervals"},
        {&metadata.originating_host_id, "originating_host_id"},
    };

    auto resolve_name = [&field_names] (const void* const field) -> const char* {
        for (const auto& [address, name] : field_names) {
            if (field == address) {
                return name;
            }
        }
        throw std::invalid_argument("invalid field offset");
    };

    auto convert_disk_string = [&metadata] (const void* const field, bytes_view value) {
        const bool is_column_name_bound = field == &metadata.min_column_names || field == &metadata.max_column_names;
        return is_column_name_bound ? to_hex(value) : json_dumper::default_disk_string_converter(field, value);
    };

    json_dumper::dump(writer, version, metadata, "stats", resolve_name, convert_disk_string);
}
|
|
|
|
/// Dumps the serialization header as a JSON object under the "serialization_header" key.
void dump_serialization_header(json_writer& writer, sstables::sstable_version_types version, const sstables::serialization_header& metadata) {
    // Address of each known field paired with the key it is written under.
    const std::pair<const void*, const char*> field_names[] = {
        {&metadata.min_timestamp_base, "min_timestamp_base"},
        {&metadata.min_local_deletion_time_base, "min_local_deletion_time_base"},
        {&metadata.min_ttl_base, "min_ttl_base"},
        {&metadata.pk_type_name, "pk_type_name"},
        {&metadata.clustering_key_types_names, "clustering_key_types_names"},
        {&metadata.static_columns, "static_columns"},
        {&metadata.regular_columns, "regular_columns"},
    };

    json_dumper::dump(writer, version, metadata, "serialization_header", [&field_names] (const void* const field) -> const char* {
        for (const auto& [address, name] : field_names) {
            if (field == address) {
                return name;
            }
        }
        throw std::invalid_argument("invalid field offset");
    });
}
|
|
|
|
/// Dumps the statistics component of every input sstable as JSON.
///
/// For each sstable the output holds an "offsets" object (metadata type name ->
/// offset) followed by one object per metadata type listed in the offsets.
///
/// \throws std::invalid_argument if no sstables were given
/// \throws std::out_of_range (from `contents.at()`) if a type listed in the
///         offsets has no contents entry
/// \throws std::bad_cast if a contents entry does not have the dynamic type
///         its metadata type tag calls for
void dump_statistics_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map&) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    // Key used for a metadata type in the "offsets" object. Aborts on a value
    // that is not one of the listed enumerators.
    auto metadata_type_name = [] (sstables::metadata_type t) -> const char* {
        switch (t) {
        case sstables::metadata_type::Validation: return "validation";
        case sstables::metadata_type::Compaction: return "compaction";
        case sstables::metadata_type::Stats: return "stats";
        case sstables::metadata_type::Serialization: return "serialization";
        }
        std::abort();
    };

    json_writer writer;
    writer.StartStream();
    for (auto& sst : sstables) {
        auto& statistics = sst->get_statistics();

        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartObject();

        writer.Key("offsets");
        writer.StartObject();
        for (const auto& [type, offset] : statistics.offsets.elements) {
            writer.Key(metadata_type_name(type));
            writer.Uint(offset);
        }
        writer.EndObject();

        const auto version = sst->get_version();
        for (const auto& [type, _] : statistics.offsets.elements) {
            const auto& metadata_ptr = statistics.contents.at(type);
            // The reference form of dynamic_cast throws std::bad_cast on a type
            // mismatch, whereas the pointer form would yield a null pointer that
            // is then dereferenced.
            switch (type) {
            case sstables::metadata_type::Validation:
                dump_validation_metadata(writer, version, dynamic_cast<const sstables::validation_metadata&>(*metadata_ptr));
                break;
            case sstables::metadata_type::Compaction:
                dump_compaction_metadata(writer, version, dynamic_cast<const sstables::compaction_metadata&>(*metadata_ptr));
                break;
            case sstables::metadata_type::Stats:
                dump_stats_metadata(writer, version, dynamic_cast<const sstables::stats_metadata&>(*metadata_ptr));
                break;
            case sstables::metadata_type::Serialization:
                dump_serialization_header(writer, version, dynamic_cast<const sstables::serialization_header&>(*metadata_ptr));
                break;
            }
        }

        writer.EndObject();
    }
    writer.EndStream();
}
|
|
|
|
/// Returns the JSON key used for a scylla metadata type.
/// Aborts the process if `t` is not one of the enumerators listed below.
const char* to_string(sstables::scylla_metadata_type t) {
    using type = sstables::scylla_metadata_type;

    const char* name = nullptr;
    switch (t) {
    case type::Sharding: name = "sharding"; break;
    case type::Features: name = "features"; break;
    case type::ExtensionAttributes: name = "extension_attributes"; break;
    case type::RunIdentifier: name = "run_identifier"; break;
    case type::LargeDataStats: name = "large_data_stats"; break;
    case type::SSTableOrigin: name = "sstable_origin"; break;
    case type::ScyllaVersion: name = "scylla_version"; break;
    case type::ScyllaBuildId: name = "scylla_build_id"; break;
    case type::ExtTimestampStats: name = "ext_timestamp_stats"; break;
    case type::SSTableIdentifier: name = "sstable_identifier"; break;
    case type::Schema: name = "schema"; break;
    case type::ComponentsDigests: name = "components_digests"; break;
    }
    if (name == nullptr) {
        std::abort();
    }
    return name;
}
|
|
|
|
/// Returns the JSON key used for a large data type.
/// Aborts the process if `t` is not one of the enumerators listed below.
const char* to_string(sstables::large_data_type t) {
    using type = sstables::large_data_type;

    const char* name = nullptr;
    switch (t) {
    case type::partition_size: name = "partition_size"; break;
    case type::row_size: name = "row_size"; break;
    case type::cell_size: name = "cell_size"; break;
    case type::rows_in_partition: name = "rows_in_partition"; break;
    case type::elements_in_collection: name = "elements_in_collection"; break;
    }
    if (name == nullptr) {
        std::abort();
    }
    return name;
}
|
|
|
|
/// Returns the JSON key used for an extended timestamp statistic.
/// Aborts the process if `t` is not one of the enumerators listed below.
const char* to_string(sstables::ext_timestamp_stats_type t) {
    using type = sstables::ext_timestamp_stats_type;

    const char* name = nullptr;
    switch (t) {
    case type::min_live_timestamp: name = "min_live_timestamp"; break;
    case type::min_live_row_marker_timestamp: name = "min_live_row_marker_timestamp"; break;
    }
    if (name == nullptr) {
        std::abort();
    }
    return name;
}
|
|
|
|
class scylla_metadata_visitor {
|
|
json_writer& _writer;
|
|
|
|
dht::token as_token(const sstables::disk_string<uint16_t>& ds) const {
|
|
return dht::token(dht::token::kind::key, bytes_view(ds));
|
|
}
|
|
public:
|
|
scylla_metadata_visitor(json_writer& writer) : _writer(writer) { }
|
|
|
|
void operator()(const sstables::sharding_metadata& val) const {
|
|
_writer.StartArray();
|
|
for (const auto& e : val.token_ranges.elements) {
|
|
_writer.StartObject();
|
|
|
|
_writer.Key("left");
|
|
_writer.StartObject();
|
|
_writer.Key("exclusive");
|
|
_writer.Bool(e.left.exclusive);
|
|
_writer.Key("token");
|
|
_writer.AsString(as_token(e.left.token));
|
|
_writer.EndObject();
|
|
|
|
_writer.Key("right");
|
|
_writer.StartObject();
|
|
_writer.Key("exclusive");
|
|
_writer.Bool(e.right.exclusive);
|
|
_writer.Key("token");
|
|
_writer.AsString(as_token(e.right.token));
|
|
_writer.EndObject();
|
|
|
|
_writer.EndObject();
|
|
}
|
|
_writer.EndArray();
|
|
}
|
|
void operator()(const sstables::components_digests& val) const {
|
|
_writer.StartObject();
|
|
|
|
auto write_digest = [&](std::string_view key, const std::optional<uint32_t>& digest) {
|
|
if (digest) {
|
|
_writer.Key(key);
|
|
_writer.Uint(*digest);
|
|
}
|
|
};
|
|
|
|
write_digest("data_digest", val.data_digest);
|
|
write_digest("compression_digest", val.compression_digest);
|
|
write_digest("filter_digest",val.filter_digest);
|
|
write_digest("statistics_digest", val.statistics_digest);
|
|
write_digest("summary_digest", val.summary_digest);
|
|
write_digest("index_digest", val.index_digest);
|
|
write_digest("toc_digest", val.toc_digest);
|
|
write_digest("partitions_digest", val.partitions_digest);
|
|
write_digest("rows_digest", val.rows_digest);
|
|
|
|
_writer.EndObject();
|
|
}
|
|
void operator()(const sstables::sstable_enabled_features& val) const {
|
|
std::pair<sstables::sstable_feature, const char*> all_features[] = {
|
|
{sstables::sstable_feature::NonCompoundPIEntries, "NonCompoundPIEntries"},
|
|
{sstables::sstable_feature::NonCompoundRangeTombstones, "NonCompoundRangeTombstones"},
|
|
{sstables::sstable_feature::ShadowableTombstones, "ShadowableTombstones"},
|
|
{sstables::sstable_feature::CorrectStaticCompact, "CorrectStaticCompact"},
|
|
{sstables::sstable_feature::CorrectEmptyCounters, "CorrectEmptyCounters"},
|
|
{sstables::sstable_feature::CorrectUDTsInCollections, "CorrectUDTsInCollections"},
|
|
{sstables::sstable_feature::CorrectLastPiBlockWidth, "CorrectLastPiBlockWidth"},
|
|
};
|
|
_writer.StartObject();
|
|
_writer.Key("mask");
|
|
_writer.Uint64(val.enabled_features);
|
|
_writer.Key("features");
|
|
_writer.StartArray();
|
|
for (const auto& [mask, name] : all_features) {
|
|
if (mask & val.enabled_features) {
|
|
_writer.String(name);
|
|
}
|
|
}
|
|
_writer.EndArray();
|
|
_writer.EndObject();
|
|
}
|
|
void operator()(const sstables::scylla_metadata::extension_attributes& val) const {
|
|
_writer.StartObject();
|
|
for (const auto& [k, v] : val.map) {
|
|
_writer.Key(disk_string_to_string(k));
|
|
_writer.String(disk_string_to_string(v));
|
|
}
|
|
_writer.EndObject();
|
|
}
|
|
void operator()(const sstables::run_identifier& val) const {
|
|
_writer.AsString(val.id.uuid());
|
|
}
|
|
void operator()(const sstables::scylla_metadata::large_data_stats& val) const {
|
|
_writer.StartObject();
|
|
for (const auto& [k, v] : val.map) {
|
|
_writer.Key(to_string(k));
|
|
_writer.StartObject();
|
|
_writer.Key("max_value");
|
|
_writer.Uint64(v.max_value);
|
|
_writer.Key("threshold");
|
|
_writer.Uint64(v.threshold);
|
|
_writer.Key("above_threshold");
|
|
_writer.Uint(v.above_threshold);
|
|
_writer.EndObject();
|
|
}
|
|
_writer.EndObject();
|
|
}
|
|
void operator()(const sstables::scylla_metadata::ext_timestamp_stats& val) const {
|
|
_writer.StartObject();
|
|
for (const auto& [k, v] : val.map) {
|
|
_writer.Key(to_string(k));
|
|
_writer.Int64(v);
|
|
}
|
|
_writer.EndObject();
|
|
}
|
|
template <typename Size>
|
|
void operator()(const sstables::disk_string<Size>& val) const {
|
|
_writer.String(disk_string_to_string(val));
|
|
}
|
|
|
|
template <sstables::scylla_metadata_type E, typename T>
|
|
void operator()(const sstables::disk_tagged_union_member<sstables::scylla_metadata_type, E, T>& m) const {
|
|
_writer.Key(to_string(E));
|
|
(*this)(m.value);
|
|
}
|
|
|
|
template <typename Size, typename Members>
|
|
void operator()(const sstables::disk_array<Size, Members>& a) const {
|
|
_writer.StartArray();
|
|
for (const auto& element : a.elements) {
|
|
(*this)(element);
|
|
}
|
|
_writer.EndArray();
|
|
}
|
|
|
|
// Writes the sstable identifier as a single JSON string.
void operator()(const sstables::scylla_metadata::sstable_identifier& identifier) const {
    const auto& id_value = identifier.value;
    _writer.AsString(id_value);
}
|
|
|
|
void operator()(const sstables::sstable_column_description& cd) const {
|
|
_writer.StartObject();
|
|
|
|
_writer.Key("kind");
|
|
_writer.Int64(static_cast<uint32_t>(cd.kind));
|
|
|
|
_writer.Key("name");
|
|
_writer.String(disk_string_to_string(cd.name));
|
|
|
|
_writer.Key("type");
|
|
_writer.String(disk_string_to_string(cd.type));
|
|
|
|
_writer.EndObject();
|
|
}
|
|
|
|
// Writes the schema stored in the sstable as a JSON object with the members
// "id", "version", "keyspace_name", "table_name" and "columns". The columns
// are emitted through the visitor overload matching their container type.
void operator()(const sstables::scylla_metadata::sstable_schema& stored_schema) const {
    _writer.StartObject();

    // Identity of the schema.
    _writer.Key("id");
    _writer.String(fmt::to_string(stored_schema.id));
    _writer.Key("version");
    _writer.String(fmt::to_string(stored_schema.version));

    // Names.
    _writer.Key("keyspace_name");
    _writer.String(disk_string_to_string(stored_schema.keyspace_name));
    _writer.Key("table_name");
    _writer.String(disk_string_to_string(stored_schema.table_name));

    // Column descriptions.
    _writer.Key("columns");
    const auto& self = *this;
    self(stored_schema.columns);

    _writer.EndObject();
}
|
|
};
|
|
|
|
// Dumps the scylla-metadata component of every input sstable as JSON.
//
// The output stream holds one object per sstable, keyed by the sstable's
// file name. An sstable without scylla metadata yields an empty object;
// otherwise every metadata entry is rendered by scylla_metadata_visitor.
//
// Throws std::invalid_argument when no sstables were given.
void dump_scylla_metadata_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map&) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    json_writer writer;
    writer.StartStream();
    for (auto& sst : sstables) {
        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartObject();
        const auto& metadata = sst->get_scylla_metadata();
        if (metadata) {
            for (const auto& [k, v] : metadata->data.data) {
                std::visit(scylla_metadata_visitor(writer), v);
            }
        }
        writer.EndObject();
    }
    writer.EndStream();
}
|
|
|
|
// Validates the checksums of every input sstable and reports the outcome as
// JSON, one object per sstable keyed by its file name:
//   "has_checksums" - false only when validation reported no_checksum
//   "has_digest"    - taken from the validation result
//   "valid"         - present only when there are checksums
//
// Throws std::invalid_argument when no sstables were given.
void validate_checksums_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map&) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    // The JSON is buffered and printed in one go at the very end, so that it
    // does not interleave with error messages emitted during validation.
    std::stringstream json_output_stream;
    json_writer writer(json_output_stream);
    writer.StartStream();
    for (auto& sst : sstables) {
        const auto res = sstables::validate_checksums(sst, permit).get();

        // Decode the status once into the two facts the report needs.
        bool has_checksums = false;
        bool checksums_valid = false;
        switch (res.status) {
        case validate_checksums_status::valid:
            has_checksums = true;
            checksums_valid = true;
            break;
        case validate_checksums_status::invalid:
            has_checksums = true;
            break;
        case validate_checksums_status::no_checksum:
            break;
        }

        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartObject();
        writer.Key("has_checksums");
        writer.Bool(has_checksums);
        writer.Key("has_digest");
        writer.Bool(res.has_digest);
        if (has_checksums) {
            writer.Key("valid");
            writer.Bool(checksums_valid);
        }
        writer.EndObject();
    }
    writer.EndStream();
    fmt::print(std::cout, "{}", json_output_stream.view());
}
|
|
|
|
// Writes the data of every compressed input sstable into a sibling file
// named "<sstable file name>.decompressed". Sstables for which
// get_compression() is empty are skipped with an info-level log message.
//
// Throws std::invalid_argument when no sstables were given.
void decompress_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    for (const auto& sst : sstables) {
        if (!sst->get_compression()) {
            sst_log.info("Sstable {} is not compressed, nothing to do", sst->get_filename());
            continue;
        }

        const auto output_filename = fmt::format("{}.decompressed", sst->get_filename());

        // Destination stream; closed by the deferred action on scope exit.
        auto out_file = open_file_dma(output_filename, open_flags::wo | open_flags::create).get();
        file_output_stream_options stream_opts;
        stream_opts.buffer_size = 4096;
        auto out = make_file_output_stream(std::move(out_file), stream_opts).get();
        auto close_out = defer([&out] { out.close().get(); });

        // Source stream over the whole data component; likewise closed on scope exit.
        auto in = sst->data_stream(0, sst->data_size(), permit, nullptr, nullptr).get();
        auto close_in = defer([&in] { in.close().get(); });

        // Forward every chunk read from the source to the destination and
        // keep consuming until the source is exhausted.
        auto copy_chunk = [&out] (temporary_buffer<char> chunk) {
            return out.write(chunk.get(), chunk.size()).then([] {
                return consumption_result<char>(continue_consuming{});
            });
        };
        in.consume(std::move(copy_chunk)).get();
        out.flush().get();

        sst_log.info("Sstable {} decompressed into {}", sst->get_filename(), output_filename);
    }
}
|
|
|
|
// Creates, inside the given cql_test_env, a table mirroring the columns of
// `sstable_schema` and returns a reference to it.
//
// The table is not created from the original schema verbatim. A fresh schema
// is built which:
//   * lives in the "scylla_sstable" keyspace (created here, LocalStrategy),
//   * gets a newly generated ID,
//   * carries none of the original properties.
// This avoids conflicts when querying sstables of system tables and lets
// cql_test_env work with a simple config (no EAR setup).
future<replica::table&> create_table_in_cql_env(cql_test_env& env, schema_ptr sstable_schema) {
    const auto keyspace_name = "scylla_sstable";
    auto& db = env.local_db();

    co_await env.execute_cql(seastar::format("CREATE KEYSPACE {} WITH replication = {{'class': 'LocalStrategy'}}", keyspace_name));
    auto& keyspace = db.find_keyspace(keyspace_name);

    // Copy the columns of every kind into the replacement schema.
    auto builder = schema_builder(keyspace_name, sstable_schema->cf_name());
    for (const auto kind : {column_kind::partition_key, column_kind::clustering_key, column_kind::static_column, column_kind::regular_column}) {
        for (const auto& column : sstable_schema->columns(kind)) {
            builder.with_column(column.name(), column.type, kind, column.view_virtual());

            // User types have to be registered with the keyspace up front,
            // so they are known by the time the table is created.
            if (!column.type->is_user_type()) {
                continue;
            }
            keyspace.add_user_type(dynamic_pointer_cast<const user_type_impl>(column.type));
        }
    }
    auto schema = builder.build();

    const auto table_name = schema->cf_name();

    schema_describe_helper describe_helper = replica::make_schema_describe_helper(schema, db.as_data_dictionary());

    // Describe both schemas; the original one is only used for the debug log below.
    const auto original_schema_description = sstable_schema->describe(describe_helper, cql3::describe_option::STMTS_AND_INTERNALS);
    const auto schema_description = schema->describe(describe_helper, cql3::describe_option::STMTS_AND_INTERNALS);

    const sstring original_create_statement = original_schema_description.create_statement.value().linearize();
    const sstring schema_create_statement = schema_description.create_statement.value().linearize();

    sst_log.debug("\noriginal schema:\n{}\nreplacement schema:\n{}\n\nNote: original keyspace name of {} was replaced with {}, original id of {} was replaced with {} and all properties were dropped!\n",
            original_create_statement,
            schema_create_statement,
            sstable_schema->ks_name(),
            keyspace_name,
            sstable_schema->id(),
            schema->id());

    co_await env.execute_cql(schema_create_statement);

    auto& created_table = db.find_column_family(keyspace_name, table_name);
    co_return std::ref(created_table);
}
|
|
|
|
// Parses `query`, checks that it is a single statement targeting
// scylla_sstable.<table_name>, and prepares it against `db`.
//
// Parameters:
//   query      - the CQL text to validate
//   table_name - the only table the statement may reference
//   db         - database used to prepare the statement
//   query_type - human-readable description of the expected statement kind,
//                including its article (callers pass e.g. "a select"); it is
//                only used in error messages
//
// Returns the prepared statement.
//
// Throws std::invalid_argument when the query fails to parse, contains a
// number of statements other than one, is not a table-level (cf) statement,
// lacks a keyspace, targets another keyspace or table, or fails to prepare.
shared_ptr<cql3::cql_statement>
validate_and_prepare_query(std::string_view query, std::string_view table_name, data_dictionary::database db, std::string_view query_type) {
    std::vector<std::unique_ptr<cql3::statements::raw::parsed_statement>> raw_statements;
    try {
        raw_statements = cql3::query_processor::parse_statements(query, cql3::dialect{});
    } catch (...) {
        throw std::invalid_argument(seastar::format("failed to parse query: {}", std::current_exception()));
    }
    if (raw_statements.size() != 1) {
        throw std::invalid_argument(seastar::format("expected exactly 1 query, got {}", raw_statements.size()));
    }

    const auto raw_statement = raw_statements.front().get();

    const auto cf_statement = dynamic_cast<cql3::statements::raw::cf_statement*>(raw_statement);
    if (!cf_statement) {
        // query_type already carries its own article ("a select",
        // "an insert, update or delete"), so the message must not add one.
        throw std::invalid_argument(fmt::format("query must be {} query", query_type));
    }
    if (!cf_statement->has_keyspace()) {
        throw std::invalid_argument("query must have keyspace and the keyspace has to be scylla_sstable");
    }
    if (cf_statement->keyspace() != "scylla_sstable") {
        throw std::invalid_argument(seastar::format("query must be against scylla_sstable keyspace, got {} instead", std::string_view(cf_statement->keyspace())));
    }
    if (cf_statement->column_family() != table_name) {
        throw std::invalid_argument(seastar::format("query must be against {} table, got {} instead", table_name, std::string_view(cf_statement->column_family())));
    }

    cql3::cql_stats cql_stats;

    try {
        auto prepared_statement = raw_statement->prepare(db, cql_stats);
        return std::move(prepared_statement->statement);
    } catch (...) {
        throw std::invalid_argument(seastar::format("failed to prepare query: {}", std::current_exception()));
    }
}
|
|
|
|
template <typename statement_type>
|
|
void validate_query(std::string_view query, std::string_view table_name, data_dictionary::database db, std::string_view query_type) {
|
|
auto statement = validate_and_prepare_query(query, table_name, db, query_type);
|
|
|
|
if (dynamic_cast<statement_type*>(statement.get()) == nullptr) {
|
|
throw std::invalid_argument(fmt::format("query must be {} query", query_type));
|
|
}
|
|
}
|
|
|
|
// Reads `is` to the end, splits its content on ';' and passes every
// non-empty piece (with leading whitespace stripped) to `consumer`, one at a
// time and in order. Data left after the last ';' is treated as a final
// query. The stream is closed before returning, also when reading or the
// consumer fails; in that case the original exception is rethrown after the
// close.
//
// NOTE: the split is purely textual - a ';' inside a string literal also
// terminates a query.
future<> consume_queries(input_stream<char>&& is, std::function<future<>(std::string_view)> consumer) {
    temporary_buffer<char> buf;

    // View over the first `query_size` bytes of `buf`, without leading whitespace.
    auto trimmed = [&buf] (size_t query_size) {
        if (query_size == 0) {
            return std::string_view{};
        }
        auto query = std::string_view(buf.begin(), query_size);
        // std::isspace() has undefined behaviour for negative values other
        // than EOF, so go through unsigned char (matters for non-ASCII bytes).
        while (!query.empty() && std::isspace(static_cast<unsigned char>(query.front()))) {
            query.remove_prefix(1);
        }
        return query;
    };

    // co_await is not allowed inside a catch handler, so a failure is
    // remembered here and rethrown once the stream has been closed.
    std::exception_ptr failure;
    try {
        while (true) {
            // Attempt to extract a query from existing data.
            if (const auto separator_pos = std::ranges::find(buf, ';'); separator_pos != buf.end()) {
                const auto query_size = separator_pos - buf.begin();
                if (auto query = trimmed(query_size); !query.empty()) {
                    co_await consumer(query);
                }
                buf.trim_front(query_size + 1); // include the separator too
            // Attempt to read more data.
            } else if (auto read_buf = co_await is.read(); read_buf) {
                auto new_buf = temporary_buffer<char>(buf.size() + read_buf.size());
                std::ranges::copy(buf, new_buf.get_write());
                std::ranges::copy(read_buf, new_buf.get_write() + buf.size());
                buf = std::move(new_buf);
            // Reached EOF
            } else {
                if (auto query = trimmed(buf.size()); !query.empty()) {
                    co_await consumer(query);
                }
                break;
            }
        }
    } catch (...) {
        failure = std::current_exception();
    }

    co_await is.close();

    if (failure) {
        std::rethrow_exception(std::move(failure));
    }
}
|
|
|
|
// Writes one or more sstables from the content of --input-file.
//
// With JSON input the file is parsed into a mutation stream which is written
// into a single sstable. Otherwise the file is treated as a sequence of CQL
// modification statements: they are executed in a cql_test_env whose table
// has a virtual writer collecting the mutations in a memtable; the memtable
// is written out as an sstable whenever its occupancy reaches --memory-limit
// and once more at the end if it is not empty.
//
// The generation of every sstable written is printed to stdout.
//
// Throws std::invalid_argument when input sstables were given, when
// --input-file is missing, when --validation-level is unknown, or when the
// output sstable's data file already exists.
void write_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& manager, const db::config&, const bpo::variables_map& vm) {
    static const std::vector<std::pair<std::string, mutation_fragment_stream_validation_level>> valid_validation_levels{
        {"none", mutation_fragment_stream_validation_level::none},
        {"partition_region", mutation_fragment_stream_validation_level::partition_region},
        {"token", mutation_fragment_stream_validation_level::token},
        {"partition_key", mutation_fragment_stream_validation_level::partition_key},
        {"clustering_key", mutation_fragment_stream_validation_level::clustering_key},
    };
    if (!sstables.empty()) {
        throw std::invalid_argument("write operation does not operate on input sstables");
    }
    if (!vm.count("input-file")) {
        throw std::invalid_argument("missing required option '--input-file'");
    }

    // Translate the textual validation level into its enum value.
    const auto validation_level = [&vm] {
        const auto vl_name = vm["validation-level"].as<std::string>();
        for (const auto& [name, level] : valid_validation_levels) {
            if (name == vl_name) {
                return level;
            }
        }
        throw std::invalid_argument(fmt::format("invalid validation-level {}", vl_name));
    }();

    auto input_file = vm["input-file"].as<std::string>();
    auto in_format = get_input_format_from_options(vm, input_format::cql);
    auto memory_limit = vm["memory-limit"].as<size_t>();
    auto output_dir = vm["output-dir"].as<std::string>();
    auto format = sstables::sstable_format_types::big;
    auto version = vm.contains("sstable-version")
            ? version_from_string(vm["sstable-version"].as<std::string>())
            : manager.get_preferred_sstable_version();

    // Writes the content of `reader` into a new sstable with a fresh,
    // time-UUID based generation and prints that generation.
    auto write_sstable = [&] (mutation_reader reader, size_t partition_count_estimate) -> future<> {
        auto generation = sstables::generation_type(utils::UUID_gen::get_time_UUID());

        // Refuse to write over an already existing data file.
        {
            auto sst_name = sstables::sstable::filename(output_dir, schema->ks_name(), schema->cf_name(), version, generation, format, component_type::Data);
            if (co_await file_exists(sst_name)) {
                throw std::invalid_argument(fmt::format("cannot create output sstable {}, file already exists", sst_name));
            }
        }

        auto writer_cfg = manager.configure_writer("scylla-sstable");
        writer_cfg.validation_level = validation_level;
        auto local = data_dictionary::make_local_options(output_dir);
        auto sst = manager.make_sstable(schema, local, generation, sstables::sstable_state::normal, version, format);

        co_await sst->write_components(std::move(reader), partition_count_estimate, schema, writer_cfg, encoding_stats{});

        fmt::print(std::cout, "{}\n", generation);
    };

    auto ifile = open_file_dma(input_file, open_flags::ro).get();
    auto istream = make_file_input_stream(std::move(ifile));

    if (in_format == input_format::json) {
        // JSON input: stream the parsed mutations straight into one sstable.
        auto parser = tools::json_mutation_stream_parser{schema, permit, std::move(istream), sst_log};
        auto reader = make_generating_reader(schema, permit, std::move(parser));
        write_sstable(std::move(reader), 1).get();
        return;
    }

    // CQL input: collect the mutations produced by the statements in a
    // memtable and write it out as needed.
    lw_shared_ptr<replica::memtable> mt = make_lw_shared<replica::memtable>(schema);
    auto flush_memtable = [&write_sstable, &schema, &permit] (replica::memtable& mt) -> future<> {
        co_await write_sstable(mt.make_flush_reader(schema, permit), mt.partition_count());
        co_await mt.clear_gently();
    };
    do_with_cql_env_noreentrant_in_thread([&] (cql_test_env& env) mutable -> future<> {
        auto& db = env.local_db();
        auto& table = co_await create_table_in_cql_env(env, schema);
        auto table_schema = table.schema();

        // Disable commitlog.
        table.set_durable_writes(false);

        // The written sstables must not be registered with the table object,
        // to avoid any attempt to compact/split/merge/rewrite them; they are
        // also created with a foreign sstables manager (not part of
        // cql_test_env). The virtual writer facility keeps the writes
        // isolated from cql_test_env.
        table.set_virtual_writer([&] (const frozen_mutation& fm) -> future<> {
            mt->apply(fm, schema);
            sst_log.trace("applied mutation of size {}", fm.representation().size());
            if (mt->occupancy().total_space() >= memory_limit) {
                sst_log.debug("cycling memtable with occupancy {}", mt->occupancy().total_space());
                co_await flush_memtable(*mt);
                mt = make_lw_shared<replica::memtable>(schema);
            }
        });

        co_await consume_queries(std::move(istream), [&] (std::string_view query) -> future<> {
            sst_log.debug("write_operation(): processing query {}", query);

            validate_query<cql3::statements::modification_statement>(query, table_schema->cf_name(), db.as_data_dictionary(), "an insert, update or delete");

            const auto result = co_await env.execute_cql(query);
            result->throw_if_exception();
        });
    });
    if (!mt->empty()) {
        flush_memtable(*mt).get();
    }
}
|
|
|
|
// Runs the Lua script given with --script-file over the input sstables.
//
// The script consumer is built from the script file and the --script-arg
// parameters; the sstables are then consumed (merged into one stream when
// --merge is given) between the consumer's stream start/end notifications.
//
// Throws std::invalid_argument when no sstables were given or when
// --script-file is missing.
void script_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& manager, const db::config&, const bpo::variables_map& vm) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }
    if (!vm.count("script-file")) {
        throw std::invalid_argument("missing required option '--script-file'");
    }

    const auto merge = vm.count("merge");
    // No partition filtering here: the partition set is left empty.
    const auto partitions = partition_set(0, {}, decorated_key_equal(*schema));
    const auto script_file = vm["script-file"].as<std::string>();
    auto script_params = vm["script-arg"].as<program_options::string_map>();

    auto consumer = make_lua_sstable_consumer(schema, permit, script_file, std::move(script_params)).get();

    consumer->consume_stream_start().get();
    consume_sstables(schema, permit, sstables, merge, false, [&, &lua_consumer = *consumer] (mutation_reader& rd, sstables::sstable* sst) {
        return consume_reader(std::move(rd), lua_consumer, sst, partitions, false);
    });
    consumer->consume_stream_end().get();
}
|
|
|
|
template <typename SstableConsumer>
|
|
void sstable_consumer_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
|
|
sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) {
|
|
if (sstables.empty()) {
|
|
throw std::invalid_argument("no sstables specified on the command line");
|
|
}
|
|
const auto merge = vm.count("merge");
|
|
const auto no_skips = vm.count("no-skips");
|
|
const auto partitions = get_partitions(schema, vm);
|
|
const auto use_full_scan_reader = no_skips || partitions.empty();
|
|
auto consumer = std::make_unique<SstableConsumer>(schema, permit, vm);
|
|
consumer->consume_stream_start().get();
|
|
consume_sstables(schema, permit, sstables, merge, use_full_scan_reader, [&, &consumer = *consumer] (mutation_reader& rd, sstables::sstable* sst) {
|
|
return consume_reader(std::move(rd), consumer, sst, partitions, no_skips);
|
|
});
|
|
consumer->consume_stream_end().get();
|
|
}
|
|
|
|
// Prints, as JSON, the shards owning each input sstable under a vnode-based
// sharder configured from --shards and --ignore-msb-bits. The output stream
// has one array of shard ids per sstable, keyed by the sstable's file name.
//
// Throws std::invalid_argument when --shards is missing.
void shard_of_with_vnodes(const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sstable_manager,
        const bpo::variables_map& vm) {
    if (!vm.count("shards")) {
        throw std::invalid_argument("missing required option '--shards'");
    }
    const unsigned shard_count = vm["shards"].as<unsigned>();
    const unsigned ignore_msb_bits = vm["ignore-msb-bits"].as<unsigned>();

    json_writer writer;
    writer.StartStream();
    for (auto& sst : sstables) {
        // The sstable was loaded with smp::count as its shard count, which is
        // not necessarily the "shards" value given on the command line.
        // Re-create it with a schema whose sharder uses the requested
        // shard_count and ignore_msb_bits, and load the owner shards anew.
        auto resharded_schema = schema_builder(sst->get_schema()).with_sharder(
                shard_count, ignore_msb_bits).build();
        auto storage_options = data_dictionary::make_local_options(fs::path(sst->get_storage().prefix()));
        auto new_sst = sstable_manager.make_sstable(
                resharded_schema,
                storage_options,
                sst->generation(),
                sstable_state::normal,
                sst->get_version());
        new_sst->load_owner_shards(resharded_schema->get_sharder()).get();

        writer.Key(fmt::to_string(sst->get_filename()));
        writer.StartArray();
        for (unsigned shard_id : new_sst->get_shards_for_this_sstable()) {
            writer.Uint(shard_id);
        }
        writer.EndArray();
    }
    writer.EndStream();
}
|
|
|
|
// Prints, as JSON, the tablet replicas owning each input sstable.
//
// The tablet map of the table is loaded from the system tablets table found
// under `data_dir_path`. The output stream has one array per sstable, keyed
// by the sstable's file name; each element is an object with the members
// "host" and "shard". The array is empty when no tablet covers the sstable.
//
// Diagnostics go to stderr: one when the sstable's token range covers more
// than one tablet (the replicas of the first tablet are still printed), and
// one when no tablet is found for the sstable.
void shard_of_with_tablets(schema_ptr schema,
        const std::vector<sstables::shared_sstable>& sstables,
        std::filesystem::path data_dir_path,
        const db::config& dbcfg,
        reader_permit permit) {
    auto tablets = tools::load_system_tablets(dbcfg, data_dir_path,
            schema->ks_name(), schema->cf_name(),
            permit).get();
    json_writer writer;
    writer.StartStream();
    for (auto& sst : sstables) {
        auto key = fmt::to_string(sst->get_filename());
        writer.Key(key);
        writer.StartArray();

        // token ranges are distributed across tablets, so we just check for
        // the token range of each sstable
        auto first_token = sst->get_first_decorated_key().token();
        auto last_token = sst->get_last_decorated_key().token();
        // each tablet holds a range of (last_token(i-1), last_token(i)], where
        // "last_token" is the value of the column with the same name in
        // the "system.tablets" table, and "i" is the index of current tablet.
        // The owner of a token is therefore the first tablet whose last_token
        // is >= the token, i.e. lower_bound() for both ends of the sstable's
        // range (upper_bound() would skip the owner when the token is exactly
        // a tablet's last_token).
        auto tablet = tablets.lower_bound(first_token);
        if (auto last_tablet = tablets.lower_bound(last_token); tablet != last_tablet) {
            auto first_id = std::distance(tablets.begin(), tablet);
            auto last_id = std::distance(tablets.begin(), last_tablet);
            fmt::print(std::cerr, "sstable {} spans across multiple tablets: [{},{}]\n", key, first_id, last_id);
        }
        if (tablet == tablets.end()) {
            fmt::print(std::cerr, "unable to find replica set for sstable: {}\n", sst->get_filename());
            // Close the array opened above, otherwise the JSON output is malformed.
            writer.EndArray();
            continue;
        }
        auto& [token, replica_set] = *tablet;
        for (auto& replica : replica_set) {
            writer.StartObject();
            writer.Key("host");
            writer.String(fmt::to_string(replica.host));
            writer.Key("shard");
            writer.Uint(replica.shard);
            writer.EndObject();
        }

        writer.EndArray();
    }
    writer.EndStream();
}
|
|
|
|
// Entry point of the shard-of operation: dispatches to the vnode or the
// tablet based implementation.
//
// Exactly one of --tablets and --vnodes has to be given; there is no
// default. Throws std::invalid_argument when both or neither are present.
void shard_of_operation(schema_ptr schema, reader_permit permit,
        const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sstable_manager,
        const db::config& dbcfg,
        const bpo::variables_map& vm) {
    const bool with_tablets = vm.count("tablets") != 0;
    const bool with_vnodes = vm.count("vnodes") != 0;

    if (with_tablets && with_vnodes) {
        throw std::invalid_argument("--tablets and --vnodes are mutually exclusive");
    }
    if (!with_tablets && !with_vnodes) {
        throw std::invalid_argument("Please specify '--tablets' or '--vnodes'");
    }

    if (with_vnodes) {
        shard_of_with_vnodes(sstables, sstable_manager, vm);
        return;
    }

    // Tablets: the data directory is derived from the sstable path.
    auto info = extract_from_sstable_path(vm);
    shard_of_with_tablets(schema, sstables, info.data_dir_path, dbcfg, permit);
}
|
|
|
|
// Prints a query result to stdout as a text table.
//
// The first line holds the column names (left-aligned), followed by a
// separator line and one line per row (values right-aligned). Every column
// is as wide as its widest cell, the header included. Null cells are printed
// as empty strings.
void print_query_results_text(const cql3::result& result) {
    const auto& metadata = result.get_metadata();
    const auto& column_metadata = metadata.get_names();

    // All cells of one column (header first), plus the width of the widest
    // cell and the format strings derived from it.
    struct column_values {
        size_t max_size{0};
        sstring header_format;
        sstring row_format;
        std::vector<sstring> values;

        void add(sstring value) {
            max_size = std::max(max_size, value.size());
            values.push_back(std::move(value));
        }
    };

    std::vector<column_values> columns;
    columns.resize(column_metadata.size());

    // Cell 0 of every column is its name.
    for (size_t i = 0; i < column_metadata.size(); ++i) {
        columns[i].add(column_metadata[i]->name->text());
    }

    // The remaining cells are the stringified row values.
    for (const auto& row : result.result_set().rows()) {
        for (size_t i = 0; i < row.size(); ++i) {
            if (!row[i]) {
                columns[i].add("");
                continue;
            }
            columns[i].add(column_metadata[i]->type->to_string(linearized(managed_bytes_view(*row[i]))));
        }
    }

    // With the widths known, derive the per-column formats and separators.
    std::vector<sstring> separators;
    separators.reserve(columns.size());
    for (auto& column : columns) {
        column.header_format = seastar::format(" {{:<{}}} ", column.max_size);
        column.row_format = seastar::format(" {{:>{}}} ", column.max_size);

        sstring dashes;
        for (size_t c = 0; c < column.max_size; ++c) {
            dashes += "-";
        }
        separators.push_back(std::move(dashes));
    }

    // Line 0 is the header, lines 1..N are the rows.
    const size_t line_count = result.result_set().rows().size() + 1;
    for (size_t r = 0; r < line_count; ++r) {
        const bool is_header = r == 0;

        std::vector<sstring> cells;
        cells.reserve(columns.size());
        for (const auto& column : columns) {
            const auto& format = is_header ? column.header_format : column.row_format;
            cells.push_back(fmt::format(fmt::runtime(std::string_view(format)), column.values[r]));
        }
        fmt::print("{}\n", fmt::join(cells, "|"));
        if (is_header) {
            fmt::print("-{}-\n", fmt::join(separators, "-+-"));
        }
    }
}
|
|
|
|
// Prints a query result to stdout as a JSON array with one object per row,
// keyed by column name. Cells that are unset or empty are written as null;
// all other cells are written as raw JSON values produced by
// to_json_string()/to_json_type() for the column's type.
void print_query_results_json(const cql3::result& result) {
    const auto& metadata = result.get_metadata();
    const auto& column_metadata = metadata.get_names();

    rjson::streaming_writer writer(std::cout);

    writer.StartArray();
    for (const auto& row : result.result_set().rows()) {
        writer.StartObject();
        for (size_t i = 0; i < row.size(); ++i) {
            const auto& column = *column_metadata[i];
            writer.Key(column.name->text());

            const bool has_value = row[i] && !row[i]->empty();
            if (has_value) {
                const auto value = to_json_string(*column.type, *row[i]);
                const auto type = to_json_type(*column.type, *row[i]);
                writer.RawValue(value, type);
            } else {
                writer.Null();
            }
        }
        writer.EndObject();
    }
    writer.EndArray();
}
|
|
|
|
// Result-message visitor used by the query operation.
//
// Only `rows` messages are expected: they are printed in the output format
// given at construction. Every other message kind makes the visitor throw
// std::runtime_error naming the kind that was received.
class query_operation_result_visitor : public cql_transport::messages::result_message::visitor {
public:
    query_operation_result_visitor(output_format of) : _output_format(of) { }

    // Prints the rows in the configured output format.
    virtual void visit(const cql_transport::messages::result_message::rows& rows) override {
        const auto& result = rows.rs();
        switch (_output_format) {
        case output_format::text:
            print_query_results_text(result);
            break;
        case output_format::json:
            print_query_results_json(result);
            break;
        }
    }

    // None of the message kinds below are valid results of the query operation.
    virtual void visit(const cql_transport::messages::result_message::void_message&) override {
        throw_on_unexpected_message("void_message");
    }
    virtual void visit(const cql_transport::messages::result_message::set_keyspace&) override {
        throw_on_unexpected_message("set_keyspace");
    }
    virtual void visit(const cql_transport::messages::result_message::prepared::cql&) override {
        throw_on_unexpected_message("prepared::cql");
    }
    virtual void visit(const cql_transport::messages::result_message::schema_change&) override {
        throw_on_unexpected_message("schema_change");
    }
    virtual void visit(const cql_transport::messages::result_message::bounce_to_shard&) override {
        throw_on_unexpected_message("bounce_to_shard");
    }
    virtual void visit(const cql_transport::messages::result_message::exception&) override {
        throw_on_unexpected_message("exception");
    }

private:
    output_format _output_format;

    [[noreturn]] void throw_on_unexpected_message(const char* message_kind) {
        throw std::runtime_error(std::format("unexpected result message, expected rows, got {}", message_kind));
    }
};
|
|
|
|
// Runs a CQL select over the input sstables and prints the result.
//
// The query comes from --query, from --query-file, or defaults to selecting
// everything from the table. It is executed in a cql_test_env against a
// table created by create_table_in_cql_env(), whose reads are served by a
// virtual reader combining the readers of all input sstables. The result is
// printed in the format selected by the output-format option (text when not
// specified).
//
// Throws std::invalid_argument when both --query and --query-file are given,
// and std::runtime_error when running with more than one shard.
void query_operation(schema_ptr sstable_schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sstable_manager, const db::config&, const bpo::variables_map& vm) {
    if (vm.contains("query") && vm.contains("query-file")) {
        throw std::invalid_argument("cannot provide both -q|--query and --query-file");
    }

    if (smp::count > 1) {
        // Assuming smp==1 allows simplifying the code below.
        throw std::runtime_error("query operation cannot run with --smp > 1");
    }

    const auto format = get_output_format_from_options(vm, output_format::text);

    do_with_cql_env_noreentrant_in_thread([&] (cql_test_env& env) mutable -> future<> {
        auto& db = env.local_db();

        auto& table = co_await create_table_in_cql_env(env, sstable_schema);
        auto table_schema = table.schema();

        // The sstables must not be registered with the table object, to
        // avoid any attempt to compact/split/merge/rewrite them; they were
        // also created with a foreign sstables manager (not part of
        // cql_test_env). The virtual reader facility keeps them isolated
        // from cql_test_env.
        table.set_virtual_reader(mutation_source([&] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr tr,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) -> mutation_reader {
            // One reader per sstable, combined into a single stream.
            std::vector<mutation_reader> readers;
            readers.reserve(sstables.size());
            for (const sstables::shared_sstable& sst : sstables) {
                readers.push_back(sst->make_reader(schema, permit, range, slice, tr, fwd_sm, fwd_mr));
            }
            return make_combined_reader(schema, permit, std::move(readers), fwd_sm, fwd_mr);
        }));

        // Pick the query: command line, file, or the select-all default.
        sstring query;
        if (vm.contains("query")) {
            query = sstring(vm["query"].as<std::string>());
        } else if (vm.contains("query-file")) {
            auto file = co_await open_file_dma(vm["query-file"].as<std::string>(), open_flags::ro);
            auto fstream = make_file_input_stream(file);
            query = co_await util::read_entire_stream_contiguous(fstream);
        } else {
            query = seastar::format("SELECT * FROM {}.{} ", table_schema->ks_name(), table_schema->cf_name());
        }

        validate_query<cql3::statements::select_statement>(query, table_schema->cf_name(), db.as_data_dictionary(), "a select");

        sst_log.debug("query_operation(): running query {}", query);

        const auto result = co_await env.execute_cql(query);
        result->throw_if_exception();

        query_operation_result_visitor visitor{format};
        result->accept(visitor);
    }, {});
}
|
|
|
|
// Rewrites the input sstables into --output-dir using the sstable version
// given with --sstable-version, or the manager's preferred version when the
// option is absent. Sstables already at the target version are skipped
// unless --all is given. Every rewritten sstable gets a fresh, time-UUID
// based generation. Progress is reported on stdout.
//
// Throws std::invalid_argument when no sstables were given.
void upgrade_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) {
    if (sstables.empty()) {
        throw std::invalid_argument("no sstables specified on the command line");
    }

    const auto all = vm.count("all");

    const auto output_dir = vm["output-dir"].as<std::string>();
    validate_output_dir(output_dir, vm.count("unsafe-accept-nonempty-output-dir"));

    const auto local = data_dictionary::make_local_options(output_dir);

    const auto new_format = sstables::sstable_format_types::big;
    const auto new_version = vm.contains("sstable-version")
            ? sstables::version_from_string(vm["sstable-version"].as<std::string>())
            : sst_man.get_preferred_sstable_version();

    for (const auto& sst : sstables) {
        const bool already_at_target = sst->get_version() == new_version;
        if (already_at_target && !all) {
            fmt::print(std::cout, "Nothing to do for sstable {}, skipping (use --all to force upgrade all sstables).\n", sst->get_filename());
            continue;
        }

        const auto new_generation = sstables::generation_type(utils::UUID_gen::get_time_UUID());

        auto writer_cfg = sst_man.configure_writer("scylla-sstable");
        auto new_sst = sst_man.make_sstable(schema, local, new_generation, sstables::sstable_state::normal, new_version, new_format);

        // Copy the whole content of the old sstable into the new one.
        auto rewrite = new_sst->write_components(
                sst->make_full_scan_reader(schema, permit),
                sst->get_estimated_key_count(),
                schema,
                writer_cfg,
                sst->get_encoding_stats_for_compaction());
        rewrite.get();

        fmt::print(std::cout, "Upgraded sstable {} to {}.\n", sst->get_filename(), new_sst->get_filename());
    }
}
|
|
|
|
// Prints the CREATE statement describing `schema` (described as a table,
// statements only) to stdout, followed by a newline.
void dump_schema_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
        sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) {
    const auto description = schema->describe({.type = schema_describe_helper::type::table}, cql3::describe_option::STMTS);
    const auto& create_statement = description.create_statement.value();
    fmt::print(std::cout, "{}\n", create_statement.linearize());
}
|
|
|
|
// Options accepted by every operation. They select where the schema of the
// processed table is obtained from.
const std::vector<operation_option> global_options {
    typed_option<sstring>("schema-file", "schema.cql", "use the file containing the schema description as the schema source"),
    typed_option<sstring>("keyspace", "keyspace name"),
    typed_option<sstring>("table", "table name"),
    typed_option<>("system-schema", "the table designated by --keyspace and --table is a system table, use the hard-coded in-memory schema as the schema source"),
    typed_option<>("schema-tables", "use the schema-tables as the schema source (see --scylla-yaml-file and --scylla-data-dir)"
            ", the name of the table can be provided with --keyspace and --table or left to auto-detect if possible"),
    typed_option<>("sstable-schema", "use the schema stored in the sstable itself as the schema source"),
    typed_option<sstring>("scylla-yaml-file", "path to the scylla.yaml config file, to obtain the data directory path from,"
            " this can be also provided directly with --scylla-data-dir"),
    typed_option<sstring>("scylla-data-dir", "path to the scylla data dir (usually /var/lib/scylla/data), to read the schema tables from"),
};
|
|
|
|
// Positional options accepted by every operation: the list of input sstables.
// NOTE(review): the trailing -1 is presumably the positional max-count, meaning
// "any number of values" -- confirm against the typed_option constructor.
const std::vector<operation_option> global_positional_options{
    typed_option<std::vector<sstring>>(
            "sstables",
            "sstable(s) to process for operations that have sstable inputs, "
            "can also be provided as positional arguments",
            -1),
};
|
|
|
|
const std::map<operation, operation_func> operations_with_func{
|
|
/* dump-data */
|
|
{{"dump-data",
|
|
"Dump content of sstable(s)",
|
|
fmt::format(R"(
|
|
Dump the content of the data component. This component contains the data-proper
|
|
of the sstable. This might produce a huge amount of output. In general the
|
|
human-readable output will be larger than the binary file.
|
|
|
|
It is possible to filter the data to print via the --partitions or
|
|
--partitions-file options. Both expect partition key values in the hexdump
|
|
format.
|
|
|
|
Supports both a text and JSON output. The text output uses the built-in scylla
|
|
printers, which are also used when logging mutation-related data structures.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#dump-data")),
|
|
{
|
|
typed_option<std::vector<sstring>>("partition", "partition(s) to filter for, partitions are expected to be in the hex format"),
|
|
typed_option<sstring>("partitions-file", "file containing partition(s) to filter for, partitions are expected to be in the hex format"),
|
|
typed_option<>("merge", "merge all sstables into a single mutation fragment stream (use a combining reader over all sstable readers)"),
|
|
typed_option<>("no-skips", "don't use skips to skip to next partition when the partition filter rejects one, this is slower but works with corrupt index"),
|
|
typed_option<std::string>("output-format", "json", "the output-format, one of (text, json)"),
|
|
}},
|
|
sstable_consumer_operation<dumping_consumer>},
|
|
/* dump-index */
|
|
{{"dump-index",
|
|
"Dump content of sstable index(es)",
|
|
fmt::format(R"(
|
|
Dump the content of the index component. Contains the partition-index of the data
|
|
component. This is effectively a list of all the partitions in the sstable, with
|
|
their starting position in the data component and optionally a promoted index,
|
|
which contains a sampled index of the clustering rows in the partition.
|
|
Positions (both that of partition and that of rows) is valid for uncompressed
|
|
data.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#dump-index"))},
|
|
dump_index_operation},
|
|
/* dump-compression-info */
|
|
{{"dump-compression-info",
|
|
"Dump content of sstable compression info(s)",
|
|
fmt::format(R"(
|
|
Dump the content of the compression-info component. Contains compression
|
|
parameters and maps positions into the uncompressed data to that into compressed
|
|
data. Note that compression happens over chunks with configurable size, so to
|
|
get data at a position in the middle of a compressed chunk, the entire chunk has
|
|
to be decompressed.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#dump-compression-info"))},
|
|
dump_compression_info_operation},
|
|
/* dump-summary */
|
|
{{"dump-summary",
|
|
"Dump content of sstable summary(es)",
|
|
fmt::format(R"(
|
|
Dump the content of the summary component. The summary is a sampled index of the
|
|
content of the index-component. An index of the index. Sampling rate is chosen
|
|
such that this file is small enough to be kept in memory even for very large
|
|
sstables.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#dump-summary"))},
|
|
dump_summary_operation},
|
|
/* dump-statistics */
|
|
{{"dump-statistics",
|
|
"Dump content of sstable statistics(s)",
|
|
fmt::format(R"(
|
|
Dump the content of the statistics component. Contains various metadata about the
|
|
data component. In the sstable 3 format, this component is critical for parsing
|
|
the data component.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#dump-statistics"))},
|
|
dump_statistics_operation},
|
|
/* dump-scylla-metadata */
|
|
{{"dump-scylla-metadata",
|
|
"Dump content of sstable scylla metadata(s)",
|
|
fmt::format(R"(
|
|
Dump the content of the scylla-metadata component. Contains scylla-specific
|
|
metadata about the data component. This component won't be present in sstables
|
|
produced by Apache Cassandra.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#dump-scylla-metadata"))},
|
|
dump_scylla_metadata_operation},
|
|
/* validate */
|
|
{{"validate",
|
|
"Validate the sstable(s), same as scrub in validate mode",
|
|
fmt::format(R"(
|
|
Validates the content of the sstable on the mutation-fragment level, see
|
|
https://docs.scylladb.com/operating-scylla/admin-tools/scylla-sstable#sstable-content
|
|
for more details.
|
|
Any parsing errors will also be detected, but after successful parsing the
|
|
validation will happen on the fragment level.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#validate"))},
|
|
validate_operation},
|
|
/* scrub */
|
|
{{"scrub",
|
|
"Scrub the sstable(s), in the specified mode",
|
|
fmt::format(R"(
|
|
Read and re-write the sstable, getting rid of or fixing broken parts, depending
|
|
on the selected mode.
|
|
Output sstables are written to the directory specified via `--output-directory`.
|
|
They will be written with the BIG format and the highest supported sstable
|
|
format, with generations chosen by scylla-sstable. Generations are chosen such
|
|
that they are unique between the sstables written by the current scrub.
|
|
The output directory is expected to be empty, if it isn't scylla-sstable will
|
|
abort the scrub. This can be overridden by the
|
|
`--unsafe-accept-nonempty-output-dir` command line flag, but note that scrub will
|
|
be aborted if an sstable cannot be written because its generation clashes with
|
|
pre-existing sstables in the directory.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#scrub")),
|
|
{
|
|
typed_option<std::string>("scrub-mode", "scrub mode to use, one of (abort, skip, segregate, validate)"),
|
|
typed_option<std::string>("output-dir", ".", "directory to place the scrubbed sstables to"),
|
|
typed_option<>("unsafe-accept-nonempty-output-dir", "allow the operation to write into a non-empty output directory, acknowledging the risk that this may result in sstable clash"),
|
|
}},
|
|
scrub_operation},
|
|
/* validate-checksums */
|
|
{{"validate-checksums",
|
|
"Validate the checksums of the sstable(s)",
|
|
fmt::format(R"(
|
|
Validate both the whole-file and the per-chunk checksums of the data component.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#validate-checksums"))},
|
|
validate_checksums_operation},
|
|
/* decompress */
|
|
{{"decompress",
|
|
"Decompress sstable(s)",
|
|
R"(
|
|
Decompress Data.db if compressed. Noop if not compressed. The decompressed data
|
|
is written to Data.db.decompressed. E.g. for an sstable:
|
|
|
|
md-12311-big-Data.db
|
|
|
|
the output will be:
|
|
|
|
md-12311-big-Data.db.decompressed
|
|
)"},
|
|
decompress_operation},
|
|
/* write */
|
|
{{"write",
|
|
"Write an sstable",
|
|
fmt::format(R"(
|
|
Write an sstable based on the input provided via --input-file and
|
|
--input-format.
|
|
|
|
The input formats are supported: CQL and JSON
|
|
|
|
CQL
|
|
---
|
|
|
|
Write output sstable(s) based on the CQL statements provided in the input.
|
|
The following statements are supported: INSERT, UPDATE and DELETE.
|
|
The statements are expected to be separated by semicolons.
|
|
The statements do not need to be ordered in any way.
|
|
|
|
Data is buffered in memory, in a memtable. This provides the required ordering
|
|
to the incoming statements. When the memtable reaches its maximum size, it is
|
|
flushed and a new output sstable is created. Consequently, multiple output
|
|
sstables can be created for large enough inputs.
|
|
The maximum size of the memtable can be controlled via the --memory-limit command
|
|
line argument, which defaults to 1MiB.
|
|
|
|
Reading the statements from the input file happens via a streaming parser, it is
|
|
safe to provide input files of any size.
|
|
|
|
JSON
|
|
----
|
|
|
|
Wite an sstable based on the JSON representation of the content. The JSON
|
|
representation has to have the same schema as that of a single sstable
|
|
from the output of the dump-data operation (corresponding to the $SSTABLE
|
|
symbol).
|
|
|
|
Note that "write" doesn't yet support all the features of the scylladb
|
|
storage engine. The following is not supported:
|
|
* Counters.
|
|
* Non-strictly atomic cells, this includes frozen multi-cell types like
|
|
collections, tuples and UDTs.
|
|
|
|
Parsing uses a streaming json parser, it is safe to pass in input-files
|
|
of any size.
|
|
|
|
Produces a single output sstable.
|
|
|
|
Output SSTables
|
|
---------------
|
|
|
|
The output sstable(s) will use the BIG format, the highest supported sstable
|
|
format (can be changed with --sstable-version) and random UUID generation
|
|
(printed to stdout). By default it is placed in the local directory, can be
|
|
changed with --output-dir. If the output sstable clashes with an existing
|
|
sstable, the write will fail.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#write")),
|
|
{
|
|
typed_option<std::string>("input-file", "the file containing the input"),
|
|
typed_option<std::string>("input-format", "text", "the input-format, one of (cql, json)"),
|
|
typed_option<size_t>("memory-limit", 1 << 20, "the maximum amount of memory in bytes to use for the memtable buffering the query results (default 1MiB)"),
|
|
typed_option<std::string>("output-dir", ".", "directory to place the output sstable(s) to"),
|
|
typed_option<std::string>("validation-level", "clustering_key", "degree of validation on the output, one of (partition_region, token, partition_key, clustering_key)"),
|
|
typed_option<std::string>("sstable-version", "SSTable format version, (e.g. \"me\", \"ms\")"),
|
|
}},
|
|
write_operation},
|
|
/* script */
|
|
{{"script",
|
|
"Run a script on content of an sstable",
|
|
fmt::format(R"(
|
|
Read the sstable(s) and pass the resulting fragment stream to the script
|
|
specified by `--script-file`. Currently only Lua scripts are supported.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#script")),
|
|
{
|
|
typed_option<>("merge", "merge all sstables into a single mutation fragment stream (use a combining reader over all sstable readers)"),
|
|
typed_option<std::string>("script-file", "script file to load and execute"),
|
|
typed_option<program_options::string_map>("script-arg", {}, "parameter(s) for the script"),
|
|
}},
|
|
script_operation},
|
|
/* shard-of */
|
|
{{"shard-of",
|
|
"Print out the shard which 'owns' the sstable",
|
|
"Print out the intersection(s) of the shard-ranges and the partition ranges",
|
|
{
|
|
typed_option<unsigned>("shards", "the number of shards the source scylla instance has"),
|
|
typed_option<unsigned>("ignore-msb-bits", 12u, "'murmur3_partitioner_ignore_msb_bits' set by scylla.yaml"),
|
|
typed_option<>("tablets", "assume that tokens are distributed with tablets"),
|
|
typed_option<>("vnodes", "assume that tokens are distributed with vnodes"),
|
|
}},
|
|
shard_of_operation},
|
|
/* query */
|
|
{{"query",
|
|
"Run a query on the content of the sstable(s)",
|
|
fmt::format(R"(
|
|
The query is run on the combined content of all input sstables.
|
|
|
|
By default, the following query is run: SELECT * FROM $table.
|
|
Custom queries can be provided either via the --query (on the command-line) or
|
|
via --query-file (in a file).
|
|
When writing queries by hand, there are some things to keep in in mind:
|
|
* The keyspace of the table is changed to scylla_sstable. This is to avoid any
|
|
collisions in case the sstables belong to a system keyspace.
|
|
* If the schema is read from the sstable itself, partition key columns will be
|
|
$pk0..$pkN and clustering key columns will be $ck0..$ckN. This is because the
|
|
in-sstable schema doesn't contain key column names.
|
|
If the table-name wasn't provided with --table, the table name will be
|
|
my_table.
|
|
|
|
Chose the output format with --output-format. Text is similar to CQLSH text
|
|
output, while json is similar to SELECT JSON output.
|
|
Default output format is text.
|
|
|
|
This operation needs a temporary directory to write files to -- as it sets up a
|
|
cql_test_env. This temporary directory will have a size of a couple of megabytes.
|
|
By default it will create this in /tmp, this can be changed with the `TEMPDIR`
|
|
environment variable. This temporary directory is removed on exit.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#query")),
|
|
{
|
|
typed_option<std::string>("query,q", "execute the query provided on the command-line"),
|
|
typed_option<std::string>("query-file", "execute the query from the file, the file is expected to contain a single query"),
|
|
typed_option<std::string>("output-format", "text", "the output-format, one of (text, json)"),
|
|
}},
|
|
query_operation},
|
|
/* upgrade */
|
|
{{"upgrade",
|
|
"Upgrade sstable(s) to the highest supported version and apply the latest schema",
|
|
fmt::format(R"(
|
|
This command is an offline version of nodetool upgradesstables.
|
|
Applies the latest sstable version and the latest schema to the sstables.
|
|
|
|
To apply the latest schema, it is advised to use the system schema tables as the
|
|
schema source.
|
|
It is possible to apply an altered schema to the sstable, by providing the altered
|
|
schema via the schema-file option. Do this with care as incompatible changes to
|
|
columns, can cause crashes or data-loss. Also, not all schema options can be
|
|
directly expressed in CQL. Editing schema options (the part after WITH) is safe.
|
|
|
|
The sstable version can be selected manually with the --sstable-version option,
|
|
by default the latest supported version is used. Valid options are sstable
|
|
versions which are supported for writing: mc, md, me, ms.
|
|
|
|
Mapping of input sstables to output sstables is printed to stdout.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#upgrade")),
|
|
{
|
|
typed_option<std::string>("output-dir", ".", "directory to place the output sstable(s) to"),
|
|
typed_option<std::string>("sstable-version", "sstable version to use, defaults to the same version as ScyllaDB would"),
|
|
typed_option<>("all", "upgrade all sstables, even if they are already at the requested version"),
|
|
typed_option<>("unsafe-accept-nonempty-output-dir", "allow the operation to write into a non-empty output directory, acknowledging the risk that this may result in sstable clash"),
|
|
}},
|
|
upgrade_operation},
|
|
/* dump-schema */
|
|
{{"dump-schema",
|
|
"Dump the schema of a table or sstable",
|
|
fmt::format(R"(
|
|
Dump the schema of the table or sstable in CQL describe table format.
|
|
|
|
The schema is obtained via the regular schema sources, that all operations use.
|
|
If the schema is obtained from the system schema tables, or it is a
|
|
system-schema, no sstable argument is required.
|
|
As usual, it is possible to dump the schema from just the sstable itself.
|
|
|
|
Important note: the dumped schema will always be a `CREATE TABLE` statement,
|
|
even if the table is in fact a materialized view or an index. This schema is
|
|
enough to understand and parse the sstable data, but it may not be enough
|
|
to recreate the table or write new sstables for it.
|
|
|
|
For more information, see: {}
|
|
)", doc_link("operating-scylla/admin-tools/scylla-sstable#dump-schema")),
|
|
{
|
|
}},
|
|
dump_schema_operation},
|
|
};
|
|
|
|
} // anonymous namespace
|
|
|
|
namespace tools {
|
|
|
|
int scylla_sstable_main(int argc, char** argv) {
|
|
constexpr auto description_template =
|
|
R"(scylla-{} - a multifunctional command-line tool to examine the content of sstables.
|
|
|
|
Usage: scylla sstable {{operation}} [--option1] [--option2] ... [{{sstable_path1}}] [{{sstable_path2}}] ...
|
|
|
|
Contains various tools (operations) to examine or produce sstables.
|
|
|
|
# Operations
|
|
|
|
The operation to execute is the mandatory, first positional argument.
|
|
Operations write their output to stdout, or file(s). Logs are written to
|
|
stderr, with a logger called {}.
|
|
|
|
The supported operations are:
|
|
{}
|
|
|
|
For more details on an operation, run: scylla sstable {{operation}} --help
|
|
|
|
# Sstables
|
|
|
|
Operations that read sstables, take the sstables to-be-examined
|
|
as positional command line arguments. Sstables will be processed by the
|
|
selected operation one-by-one. Any number of sstables can be passed but
|
|
mind the open file limits and the memory consumption. Always pass the
|
|
path to the data component of the sstables (*-Data.db) even if you want
|
|
to examine another component.
|
|
NOTE: currently you have to prefix dir local paths with `./`.
|
|
|
|
# Schema
|
|
|
|
To be able to interpret the sstables, their schema is required. There
|
|
are multiple ways to obtain the schema:
|
|
* system schema
|
|
* schema file
|
|
* schema tables
|
|
* schema from the sstable itself
|
|
|
|
## System schema
|
|
|
|
If the examined sstables belong to a system table, whose schema is
|
|
hardcoded in ScyllaDB (and thus known), it is enough to provide just
|
|
the name of said table via the --keyspace and --table command line
|
|
parameters. Alternatively, the keyspace and tablename can be deduced from
|
|
the path of the sstable, if the sstable is in its natural directory, in
|
|
ScyllaDB's data dir.
|
|
The table has to be from one of the following system keyspaces:
|
|
* system
|
|
* system_schema
|
|
* system_distributed
|
|
* system_distributed_everywhere
|
|
|
|
## Schema file
|
|
|
|
The schema to read the sstables is read from a schema.cql file. This
|
|
should contain the keyspace and table definitions, any UDTs used and
|
|
dropped columns in the form of relevant CQL statements. The keyspace
|
|
definition is allowed to be missing, in which case one will be
|
|
auto-generated. Dropped columns should be present in the form of insert
|
|
statements into the system_schema.dropped_columns table.
|
|
Example scylla.cql:
|
|
|
|
CREATE KEYSPACE ks WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}};
|
|
|
|
CREATE TYPE ks.type1 (f1 int, f2 text);
|
|
|
|
CREATE TABLE ks.cf (pk int PRIMARY KEY, v frozen<type1>);
|
|
|
|
INSERT
|
|
INTO system_schema.dropped_columns (keyspace_name, table_name, column_name, dropped_time, type)
|
|
VALUES ('ks', 'cf', 'v2', 1631011979170675, 'int');
|
|
|
|
In general you should be able to use the output of `DESCRIBE TABLE` or
|
|
the relevant parts of `DESCRIBE KEYSPACE` of `cqlsh` as well as the
|
|
`schema.cql` produced by snapshots.
|
|
|
|
## schema tables
|
|
|
|
The schema can be read from the schema tables located on disk.
|
|
The path to the data directory can be provided via multiple ways:
|
|
* autodetected from the sstable's path -- if the sstable(s) are located in their
|
|
native table directory.
|
|
* --scylla-data-dir
|
|
* --scylla-yaml-file -- the scylla.yaml contains the data directory path(s).
|
|
* The SCYLLA_HOME and/or SCYLLA_CONF environment variables. These allow locating
|
|
the scylla.yaml file.
|
|
|
|
This is the most complete method, which should always result in a complete schema.
|
|
|
|
## schema from the sstable itself
|
|
|
|
The sstable stores a basic schema in the statistics component, which can be used
|
|
for most operations on the sstable. This schema should be enough for all dump-
|
|
operations.
|
|
|
|
# Examples
|
|
|
|
Dump the content of the sstable:
|
|
$ scylla sstable dump-data /path/to/md-123456-big-Data.db
|
|
|
|
Dump the content of the two sstable(s) as a unified stream:
|
|
$ scylla sstable dump-data --merge /path/to/md-123456-big-Data.db /path/to/md-123457-big-Data.db
|
|
|
|
Validate the specified sstables:
|
|
$ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-Data.db
|
|
|
|
|
|
)";
|
|
|
|
const auto operations = operations_with_func | std::views::keys | std::ranges::to<std::vector>();
|
|
tool_app_template::config app_cfg{
|
|
.name = app_name,
|
|
.description = seastar::format(description_template, app_name, sst_log.name(), fmt::join(operations | std::views::transform([] (const auto& op) {
|
|
return seastar::format("* {}: {}", op.name(), op.summary());
|
|
}), "\n")),
|
|
.logger_name = sst_log.name(),
|
|
.lsa_segment_pool_backend_size_mb = 100,
|
|
.operations = std::move(operations),
|
|
.global_options = &global_options,
|
|
.global_positional_options = &global_positional_options,
|
|
.db_cfg_ext = db_config_and_extensions()
|
|
};
|
|
tool_app_template app(std::move(app_cfg));
|
|
|
|
return app.run_async(argc, argv, [&app] (const operation& operation, const bpo::variables_map& app_config) {
|
|
schema_ptr schema;
|
|
std::optional<schema_with_source> schema_with_source;
|
|
|
|
auto& dbcfg = *app.cfg().db_cfg_ext->db_cfg;
|
|
|
|
sstring scylla_yaml_path;
|
|
sstring scylla_yaml_path_source;
|
|
|
|
if (app_config.count("scylla-yaml-file")) {
|
|
scylla_yaml_path = app_config["scylla-yaml-file"].as<sstring>();
|
|
scylla_yaml_path_source = "user provided";
|
|
} else if (std::getenv("SCYLLA_CONF") || std::getenv("SCYLLA_HOME")) {
|
|
scylla_yaml_path = db::config::get_conf_sub("scylla.yaml").string();
|
|
scylla_yaml_path_source = "environment provided";
|
|
} else {
|
|
scylla_yaml_path = db::config::get_conf_sub("scylla.yaml").string();
|
|
scylla_yaml_path_source = "dev default";
|
|
|
|
// On production machines, the default of ./conf/scylla.yaml will not
|
|
// work, try /etc/scylla/scylla.yaml instead.
|
|
if (!file_exists(scylla_yaml_path).get()) {
|
|
scylla_yaml_path = "/etc/scylla/scylla.yaml";
|
|
scylla_yaml_path_source = "prod default";
|
|
}
|
|
}
|
|
|
|
if (file_exists(scylla_yaml_path).get()) {
|
|
dbcfg.read_from_file(scylla_yaml_path, [] (const sstring& opt, const sstring& msg, std::optional<::utils::config_file::value_status> status) {
|
|
sst_log.debug("error processing configuration item: {} : {}", msg, opt);
|
|
}).get();
|
|
dbcfg.setup_directories();
|
|
sst_log.debug("Successfully read scylla.yaml from {} location of {}", scylla_yaml_path_source, scylla_yaml_path);
|
|
} else {
|
|
dbcfg.experimental_features.set(db::experimental_features_t::all());
|
|
sst_log.debug("Failed to read scylla.yaml from {} location of {}, some functionality may be unavailable", scylla_yaml_path_source, scylla_yaml_path);
|
|
}
|
|
|
|
dbcfg.enable_cache(false);
|
|
dbcfg.volatile_system_keyspace_for_testing(true);
|
|
|
|
// Override whatever value the option has. Setting this to `true` is correct because
|
|
// schema loader doesn't attempt to create any keyspace and doesn't go through any
|
|
// validation code; there is no topology either. Thanks to that, we won't run into
|
|
// any problems due to enforcing RF-rack-valid keyspaces.
|
|
//
|
|
// On the other hand, we gain access to the code hidden behind the option
|
|
dbcfg.rf_rack_valid_keyspaces(true, ::utils::config_file::config_source::CommandLine);
|
|
|
|
{
|
|
unsigned schema_sources = 0;
|
|
schema_sources += !app_config["schema-file"].defaulted();
|
|
schema_sources += app_config.contains("system-schema");
|
|
schema_sources += app_config.contains("schema-tables");
|
|
schema_sources += app_config.contains("sstable-schema");
|
|
|
|
if (!schema_sources) {
|
|
sst_log.debug("No user-provided schema source, attempting to auto-detect it");
|
|
schema_with_source = try_load_schema_autodetect(app_config, dbcfg);
|
|
} else if (schema_sources == 1) {
|
|
sst_log.debug("Single schema source provided");
|
|
schema_with_source = try_load_schema_from_user_provided_source(app_config, dbcfg);
|
|
} else {
|
|
fmt::print(std::cerr, "Multiple schema sources provided, please provide exactly one of: --schema-file, --system-schema, --schema-tables or --sstable-schema (with the accompanying --keyspace and --table if necessary)\n");
|
|
}
|
|
}
|
|
if (schema_with_source) {
|
|
schema = std::move(schema_with_source->schema);
|
|
sst_log.debug("Succesfully loaded schema from {}{}, obtained from {}",
|
|
schema_with_source->source,
|
|
schema_with_source->path ? seastar::format(" ({})", schema_with_source->path->native()) : "",
|
|
schema_with_source->obtained_from);
|
|
sst_log.trace("Loaded schema: {}", schema);
|
|
} else {
|
|
return 1;
|
|
}
|
|
|
|
gms::feature_service feature_service({get_disabled_features_from_db_config(dbcfg)});
|
|
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
|
|
cache_tracker tracker;
|
|
sstables::directory_semaphore dir_sem(1);
|
|
abort_source abort;
|
|
|
|
sstables::sstables_manager::config sm_cfg {
|
|
.available_memory = 1_GiB,
|
|
.enable_sstable_key_validation = dbcfg.enable_sstable_key_validation(),
|
|
.data_file_directories = dbcfg.data_file_directories(),
|
|
.format = dbcfg.sstable_format,
|
|
};
|
|
sstables::storage_manager::config stm_cfg;
|
|
stm_cfg.object_storage_clients_memory = 100_MiB;
|
|
stm_cfg.skip_metrics_registration = true;
|
|
sharded<sstables::storage_manager> sstm;
|
|
sstm.start(std::ref(dbcfg), stm_cfg).get();
|
|
auto stop_sstm = defer([&sstm] { sstm.stop().get(); });
|
|
|
|
db::nop_large_data_handler large_data_handler;
|
|
db::nop_corrupt_data_handler corrupt_data_handler(db::corrupt_data_handler::register_metrics::no);
|
|
|
|
feature_service.ms_sstable.enable();
|
|
sstables::sstables_manager sst_man(
|
|
"scylla_sstable",
|
|
large_data_handler,
|
|
corrupt_data_handler,
|
|
sm_cfg,
|
|
feature_service,
|
|
tracker,
|
|
dir_sem,
|
|
[host_id = locator::host_id::create_random_id()] { return host_id; },
|
|
*scf,
|
|
abort,
|
|
dbcfg.extensions().sstable_file_io_extensions(),
|
|
current_scheduling_group(),
|
|
&sstm.local());
|
|
auto close_sst_man = deferred_close(sst_man);
|
|
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
if (app_config.count("sstables")) {
|
|
const auto sstable_names = app_config["sstables"].as<std::vector<sstring>>();
|
|
if (std::set(sstable_names.begin(), sstable_names.end()).size() != sstable_names.size()) {
|
|
fmt::print(std::cerr, "error processing arguments: duplicate sstable arguments found\n");
|
|
return 1;
|
|
}
|
|
try {
|
|
sstables = load_sstables(schema, sst_man, sstm.local(), sstable_names);
|
|
} catch (...) {
|
|
fmt::print(std::cerr, "error loading sstables: {}\n", std::current_exception());
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name, reader_concurrency_semaphore::register_metrics::no);
|
|
auto stop_semaphore = deferred_stop(rcs_sem);
|
|
|
|
const auto permit = rcs_sem.make_tracking_only_permit(schema, app_name, db::no_timeout, {});
|
|
|
|
try {
|
|
operations_with_func.at(operation)(schema, permit, sstables, sst_man, dbcfg, app_config);
|
|
} catch (std::invalid_argument& e) {
|
|
fmt::print(std::cerr, "error processing arguments: {}\n", e.what());
|
|
return 1;
|
|
} catch (...) {
|
|
fmt::print(std::cerr, "error running operation: {}\n", std::current_exception());
|
|
return 2;
|
|
}
|
|
|
|
return 0;
|
|
});
|
|
}
|
|
|
|
} // namespace tools
|