mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
tools/scylla-sstable: introduce filter command
Filter the content of sstable(s), including or excluding the specified partitions. Partitions can be provided on the command line via `--partition`, or in a file via `--partitions-file`. Produces one output sstable per input sstable -- if the filter selects at least one partition in the respective input sstable. Output sstables are placed in the path provided via `--oputput-dir`. Use `--merge` to filter all input sstables combined, producing one output sstable.
This commit is contained in:
@@ -878,6 +878,25 @@ But even an altered schema which changed only the table options can lead to data
|
||||
|
||||
The mapping of input SSTables to output SSTables is printed to ``stdout``.
|
||||
|
||||
filter
|
||||
^^^^^^
|
||||
|
||||
Filter the SSTable(s), including/excluding specified partitions.
|
||||
|
||||
Similar to ``scylla sstable dump-data --partition|--partition-file``, with some notable differences:
|
||||
|
||||
* Instead of dumping the content to stdout, the filtered content is written back to SSTable(s) on disk.
|
||||
* Also supports negative filters (keep all partitions except the those specified).
|
||||
|
||||
The partition list can be provided either via the ``--partition`` command line argument, or via a file path passed to the the ``--partitions-file`` argument. The file should contain one partition key per line.
|
||||
Partition keys should be provided in the hex format, as produced by `scylla types serialize </operating-scylla/admin-tools/scylla-types/>`_.
|
||||
|
||||
With ``--include``, only the specified partitions are kept from the input SSTable(s). With ``--exclude``, the specified partitions are discarded and won't be written to the output SSTable(s).
|
||||
It is possible that certain input SSTable(s) won't have any content left after the filtering. These input SSTable(s) will not have a matching output SSTable.
|
||||
|
||||
By default, each input sstable is filtered individually. Use ``--merge`` to filter the combined content of all input sstables, producing a single output SSTable.
|
||||
|
||||
Output sstables use the latest supported sstable format (can be changed with ``--sstable-version``).
|
||||
|
||||
Examples
|
||||
--------
|
||||
|
||||
@@ -2094,3 +2094,50 @@ def test_scylla_sstable_dump_schema(cql, test_keyspace, scylla_path, scylla_data
|
||||
assert new_res == expected_results[schema_table]
|
||||
finally:
|
||||
cql.execute(f"DROP TABLE {test_keyspace}.{table_name}")
|
||||
|
||||
|
||||
def test_scylla_sstable_filter(cql, test_keyspace, scylla_path, scylla_data_dir):
|
||||
with scylla_sstable(simple_no_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
|
||||
pks = [r.pk for r in cql.execute(f"SELECT pk FROM {test_keyspace}.{table}")]
|
||||
|
||||
print(f"Generated primary keys: {pks}")
|
||||
|
||||
assert len(pks) > 2
|
||||
|
||||
serialized_pks = [_serialize_value(scylla_path, pk) for pk in pks]
|
||||
|
||||
def filter(flag):
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
cmd = [scylla_path, "sstable", "filter", "--schema-file", schema_file, "--output-dir", tmp_dir , flag]
|
||||
for pk in serialized_pks[:2]:
|
||||
cmd += ["--partition", pk]
|
||||
cmd += sstables
|
||||
|
||||
out = subprocess.check_output(cmd, text=True)
|
||||
|
||||
print(f"Filter: {' '.join(cmd)}\n{out}")
|
||||
|
||||
out_lines = out.strip().split('\n')
|
||||
|
||||
filtered_sstables = []
|
||||
for line in out_lines:
|
||||
m = re.match(r"^Filtering.*\.\.\. output written to (.*)$", line)
|
||||
if m:
|
||||
filtered_sstables.append(m.group(1))
|
||||
|
||||
assert len(filtered_sstables) >= 1
|
||||
|
||||
query_cmd = [scylla_path, "sstable", "query", "--output-format", "json", "--schema-file", schema_file] + filtered_sstables
|
||||
query_res = json.loads(subprocess.check_output(query_cmd, text=True))
|
||||
|
||||
print(f"Query: {' '.join(query_cmd)}\n{query_res}")
|
||||
|
||||
return {row['pk'] for row in query_res}
|
||||
|
||||
if filter("--include") != set(pks[:2]):
|
||||
shutil.copy(schema_file, "/home/bdenes/out")
|
||||
for sst in sstables:
|
||||
shutil.copy(sst, "/home/bdenes/out/")
|
||||
|
||||
assert filter("--include") == set(pks[:2])
|
||||
assert filter("--exclude") == set(pks[2:])
|
||||
|
||||
@@ -31,7 +31,9 @@
|
||||
#include "gms/feature_service.hh"
|
||||
#include "reader_concurrency_semaphore.hh"
|
||||
#include "readers/combined.hh"
|
||||
#include "readers/filtering.hh"
|
||||
#include "readers/generating.hh"
|
||||
#include "readers/multi_range.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "schema/compression_initializer.hh"
|
||||
#include "sstables/index_reader.hh"
|
||||
@@ -2241,6 +2243,81 @@ void dump_schema_operation(schema_ptr schema, reader_permit permit, const std::v
|
||||
fmt::print(std::cout, "{}\n", schema_desc.create_statement.value().linearize());
|
||||
}
|
||||
|
||||
void filter_operation(schema_ptr schema, reader_permit permit, const std::vector<sstables::shared_sstable>& sstables,
|
||||
sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) {
|
||||
if (sstables.empty()) {
|
||||
throw std::invalid_argument("no sstables specified on the command line");
|
||||
}
|
||||
|
||||
if (vm.count("include") && vm.count("exclude")) {
|
||||
throw std::invalid_argument("cannot provide both --include and --exclude");
|
||||
}
|
||||
const auto include = !vm.count("exclude");
|
||||
|
||||
const auto output_dir = vm["output-dir"].as<std::string>();
|
||||
validate_output_dir(output_dir);
|
||||
|
||||
const auto local = data_dictionary::make_local_options(output_dir);
|
||||
|
||||
const auto new_format = sstables::sstable_format_types::big;
|
||||
const auto new_version = vm.contains("sstable-version")
|
||||
? sstables::version_from_string(vm["sstable-version"].as<std::string>())
|
||||
: sst_man.get_preferred_sstable_version();
|
||||
|
||||
struct named_mutation_source {
|
||||
sstring name;
|
||||
mutation_source source;
|
||||
};
|
||||
|
||||
auto sources = sstables
|
||||
| std::views::transform([] (const sstables::shared_sstable& sst) { return named_mutation_source{fmt::to_string(sst->get_filename()), sst->as_mutation_source()}; })
|
||||
| std::ranges::to<std::vector<named_mutation_source>>();
|
||||
|
||||
if (vm.count("merge")) {
|
||||
auto named_sources = std::exchange(sources, {});
|
||||
auto sources_to_merge = named_sources
|
||||
| std::views::transform([] (const named_mutation_source& ms) { return ms.source; })
|
||||
| std::ranges::to<std::vector<mutation_source>>();
|
||||
sources.push_back(named_mutation_source{"<combined>", make_combined_mutation_source(std::move(sources_to_merge))});
|
||||
}
|
||||
|
||||
const auto partitions = get_partitions(schema, vm);
|
||||
|
||||
auto make_reader = [&] (const mutation_source& source) {
|
||||
// FIXME: for the include case I want to use a much more efficient
|
||||
// multi-range reader here, but cannot due to
|
||||
// https://github.com/scylladb/scylladb/issues/28317
|
||||
return make_filtering_reader(source.make_mutation_reader(schema, permit), [&] (const dht::decorated_key& dk) { return (include == partitions.contains(dk)); });
|
||||
};
|
||||
|
||||
for (const auto& [name, source] : sources) {
|
||||
fmt::print(std::cout, "Filtering {}... ", name);
|
||||
|
||||
auto reader = make_reader(source);
|
||||
|
||||
// Peek the reader to see if it has any content after filtering.
|
||||
if (!reader.peek().get()) {
|
||||
fmt::print(std::cout, "no output\n");
|
||||
reader.close().get();
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto new_generation = sstables::generation_type(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
auto writer_cfg = sst_man.configure_writer("scylla-sstable");
|
||||
auto new_sst = sst_man.make_sstable(schema, local, new_generation, sstables::sstable_state::normal, new_version, new_format);
|
||||
|
||||
new_sst->write_components(
|
||||
std::move(reader),
|
||||
include ? partitions.size() : 1 /* cannot estimate overlap, so just use 1, the bloom filter will be regenerated at seal time */,
|
||||
schema,
|
||||
writer_cfg,
|
||||
encoding_stats{}).get();
|
||||
|
||||
fmt::print(std::cout, "output written to {}\n", new_sst->get_filename());
|
||||
}
|
||||
}
|
||||
|
||||
const std::vector<operation_option> global_options {
|
||||
typed_option<sstring>("schema-file", "schema.cql", "use the file containing the schema description as the schema source"),
|
||||
typed_option<sstring>("keyspace", "keyspace name"),
|
||||
@@ -2267,7 +2344,7 @@ Dump the content of the data component. This component contains the data-proper
|
||||
of the sstable. This might produce a huge amount of output. In general the
|
||||
human-readable output will be larger than the binary file.
|
||||
|
||||
It is possible to filter the data to print via the --partitions or
|
||||
It is possible to filter the data to print via the --partition or
|
||||
--partitions-file options. Both expect partition key values in the hexdump
|
||||
format.
|
||||
|
||||
@@ -2574,6 +2651,39 @@ For more information, see: {}
|
||||
{
|
||||
}},
|
||||
dump_schema_operation},
|
||||
/* filter */
|
||||
{{"filter",
|
||||
"Filter the sstable(s), including/excluding specified partitions",
|
||||
fmt::format(R"(
|
||||
The partition list can be provided either via the --partition command line
|
||||
argument, or via a file path passed to the the --partitions-file argument.
|
||||
The file should contain one partition key per line.
|
||||
Partition keys should be provided in the hex format.
|
||||
|
||||
With --include, only the specified partitions are kept from the input
|
||||
sstable(s). With --exclude, the specified partitions are discarded and
|
||||
won't be written to the output sstable(s).
|
||||
It is possible that certain input sstable(s) won't have any content left after
|
||||
the filtering. These input sstable(s) will not have a matching output sstable.
|
||||
|
||||
By default, each input sstable is filtered individually. Use --merge to filter
|
||||
the combined content of all input sstables, producing a single output sstable.
|
||||
|
||||
Output sstables use the latest supported sstable format (can be changed with
|
||||
--sstable-version).
|
||||
|
||||
For more information, see: {}
|
||||
)", doc_link("operating-scylla/admin-tools/scylla-sstable#filter")),
|
||||
{
|
||||
typed_option<std::string>("output-dir", ".", "directory to place the output sstable(s) to"),
|
||||
typed_option<std::string>("sstable-version", "sstable version to use, defaults to the same version as ScyllaDB would"),
|
||||
typed_option<>("include", "include only the specified partition(s) in the output, discard the rest"),
|
||||
typed_option<>("exclude", "exclude the specified partition(s) from the output, keep the rest"),
|
||||
typed_option<>("merge", "combine all sstable(s) into a single output sstable"),
|
||||
typed_option<std::vector<sstring>>("partition", "partition(s) to filter for, partitions are expected to be in the hex format"),
|
||||
typed_option<sstring>("partitions-file", "file containing partition(s) to filter for, partitions are expected to be in the hex format"),
|
||||
}},
|
||||
filter_operation},
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
Reference in New Issue
Block a user