diff --git a/docs/operating-scylla/admin-tools/scylla-sstable.rst b/docs/operating-scylla/admin-tools/scylla-sstable.rst index 5c92d34033..51b2682961 100644 --- a/docs/operating-scylla/admin-tools/scylla-sstable.rst +++ b/docs/operating-scylla/admin-tools/scylla-sstable.rst @@ -878,6 +878,25 @@ But even an altered schema which changed only the table options can lead to data The mapping of input SSTables to output SSTables is printed to ``stdout``. +filter +^^^^^^ + +Filter the SSTable(s), including/excluding specified partitions. + +Similar to ``scylla sstable dump-data --partition|--partition-file``, with some notable differences: + +* Instead of dumping the content to stdout, the filtered content is written back to SSTable(s) on disk. +* Also supports negative filters (keep all partitions except the those specified). + +The partition list can be provided either via the ``--partition`` command line argument, or via a file path passed to the the ``--partitions-file`` argument. The file should contain one partition key per line. +Partition keys should be provided in the hex format, as produced by `scylla types serialize `_. + +With ``--include``, only the specified partitions are kept from the input SSTable(s). With ``--exclude``, the specified partitions are discarded and won't be written to the output SSTable(s). +It is possible that certain input SSTable(s) won't have any content left after the filtering. These input SSTable(s) will not have a matching output SSTable. + +By default, each input sstable is filtered individually. Use ``--merge`` to filter the combined content of all input sstables, producing a single output SSTable. + +Output sstables use the latest supported sstable format (can be changed with ``--sstable-version``). Examples -------- diff --git a/test/cqlpy/test_tools.py b/test/cqlpy/test_tools.py index 9c558f9c0b..2c51b90e2d 100644 --- a/test/cqlpy/test_tools.py +++ b/test/cqlpy/test_tools.py @@ -2094,3 +2094,50 @@ def test_scylla_sstable_dump_schema(cql, test_keyspace, scylla_path, scylla_data assert new_res == expected_results[schema_table] finally: cql.execute(f"DROP TABLE {test_keyspace}.{table_name}") + + +def test_scylla_sstable_filter(cql, test_keyspace, scylla_path, scylla_data_dir): + with scylla_sstable(simple_no_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables): + pks = [r.pk for r in cql.execute(f"SELECT pk FROM {test_keyspace}.{table}")] + + print(f"Generated primary keys: {pks}") + + assert len(pks) > 2 + + serialized_pks = [_serialize_value(scylla_path, pk) for pk in pks] + + def filter(flag): + with tempfile.TemporaryDirectory() as tmp_dir: + cmd = [scylla_path, "sstable", "filter", "--schema-file", schema_file, "--output-dir", tmp_dir , flag] + for pk in serialized_pks[:2]: + cmd += ["--partition", pk] + cmd += sstables + + out = subprocess.check_output(cmd, text=True) + + print(f"Filter: {' '.join(cmd)}\n{out}") + + out_lines = out.strip().split('\n') + + filtered_sstables = [] + for line in out_lines: + m = re.match(r"^Filtering.*\.\.\. output written to (.*)$", line) + if m: + filtered_sstables.append(m.group(1)) + + assert len(filtered_sstables) >= 1 + + query_cmd = [scylla_path, "sstable", "query", "--output-format", "json", "--schema-file", schema_file] + filtered_sstables + query_res = json.loads(subprocess.check_output(query_cmd, text=True)) + + print(f"Query: {' '.join(query_cmd)}\n{query_res}") + + return {row['pk'] for row in query_res} + + if filter("--include") != set(pks[:2]): + shutil.copy(schema_file, "/home/bdenes/out") + for sst in sstables: + shutil.copy(sst, "/home/bdenes/out/") + + assert filter("--include") == set(pks[:2]) + assert filter("--exclude") == set(pks[2:]) diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index 214241ecb9..b25aa099d6 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -31,7 +31,9 @@ #include "gms/feature_service.hh" #include "reader_concurrency_semaphore.hh" #include "readers/combined.hh" +#include "readers/filtering.hh" #include "readers/generating.hh" +#include "readers/multi_range.hh" #include "schema/schema_builder.hh" #include "schema/compression_initializer.hh" #include "sstables/index_reader.hh" @@ -2241,6 +2243,81 @@ void dump_schema_operation(schema_ptr schema, reader_permit permit, const std::v fmt::print(std::cout, "{}\n", schema_desc.create_statement.value().linearize()); } +void filter_operation(schema_ptr schema, reader_permit permit, const std::vector& sstables, + sstables::sstables_manager& sst_man, const db::config&, const bpo::variables_map& vm) { + if (sstables.empty()) { + throw std::invalid_argument("no sstables specified on the command line"); + } + + if (vm.count("include") && vm.count("exclude")) { + throw std::invalid_argument("cannot provide both --include and --exclude"); + } + const auto include = !vm.count("exclude"); + + const auto output_dir = vm["output-dir"].as(); + validate_output_dir(output_dir); + + const auto local = data_dictionary::make_local_options(output_dir); + + const auto new_format = sstables::sstable_format_types::big; + const auto new_version = vm.contains("sstable-version") + ? sstables::version_from_string(vm["sstable-version"].as()) + : sst_man.get_preferred_sstable_version(); + + struct named_mutation_source { + sstring name; + mutation_source source; + }; + + auto sources = sstables + | std::views::transform([] (const sstables::shared_sstable& sst) { return named_mutation_source{fmt::to_string(sst->get_filename()), sst->as_mutation_source()}; }) + | std::ranges::to>(); + + if (vm.count("merge")) { + auto named_sources = std::exchange(sources, {}); + auto sources_to_merge = named_sources + | std::views::transform([] (const named_mutation_source& ms) { return ms.source; }) + | std::ranges::to>(); + sources.push_back(named_mutation_source{"", make_combined_mutation_source(std::move(sources_to_merge))}); + } + + const auto partitions = get_partitions(schema, vm); + + auto make_reader = [&] (const mutation_source& source) { + // FIXME: for the include case I want to use a much more efficient + // multi-range reader here, but cannot due to + // https://github.com/scylladb/scylladb/issues/28317 + return make_filtering_reader(source.make_mutation_reader(schema, permit), [&] (const dht::decorated_key& dk) { return (include == partitions.contains(dk)); }); + }; + + for (const auto& [name, source] : sources) { + fmt::print(std::cout, "Filtering {}... ", name); + + auto reader = make_reader(source); + + // Peek the reader to see if it has any content after filtering. + if (!reader.peek().get()) { + fmt::print(std::cout, "no output\n"); + reader.close().get(); + continue; + } + + const auto new_generation = sstables::generation_type(utils::UUID_gen::get_time_UUID()); + + auto writer_cfg = sst_man.configure_writer("scylla-sstable"); + auto new_sst = sst_man.make_sstable(schema, local, new_generation, sstables::sstable_state::normal, new_version, new_format); + + new_sst->write_components( + std::move(reader), + include ? partitions.size() : 1 /* cannot estimate overlap, so just use 1, the bloom filter will be regenerated at seal time */, + schema, + writer_cfg, + encoding_stats{}).get(); + + fmt::print(std::cout, "output written to {}\n", new_sst->get_filename()); + } +} + const std::vector global_options { typed_option("schema-file", "schema.cql", "use the file containing the schema description as the schema source"), typed_option("keyspace", "keyspace name"), @@ -2267,7 +2344,7 @@ Dump the content of the data component. This component contains the data-proper of the sstable. This might produce a huge amount of output. In general the human-readable output will be larger than the binary file. -It is possible to filter the data to print via the --partitions or +It is possible to filter the data to print via the --partition or --partitions-file options. Both expect partition key values in the hexdump format. @@ -2574,6 +2651,39 @@ For more information, see: {} { }}, dump_schema_operation}, +/* filter */ + {{"filter", + "Filter the sstable(s), including/excluding specified partitions", +fmt::format(R"( +The partition list can be provided either via the --partition command line +argument, or via a file path passed to the the --partitions-file argument. +The file should contain one partition key per line. +Partition keys should be provided in the hex format. + +With --include, only the specified partitions are kept from the input +sstable(s). With --exclude, the specified partitions are discarded and +won't be written to the output sstable(s). +It is possible that certain input sstable(s) won't have any content left after +the filtering. These input sstable(s) will not have a matching output sstable. + +By default, each input sstable is filtered individually. Use --merge to filter +the combined content of all input sstables, producing a single output sstable. + +Output sstables use the latest supported sstable format (can be changed with +--sstable-version). + +For more information, see: {} +)", doc_link("operating-scylla/admin-tools/scylla-sstable#filter")), + { + typed_option("output-dir", ".", "directory to place the output sstable(s) to"), + typed_option("sstable-version", "sstable version to use, defaults to the same version as ScyllaDB would"), + typed_option<>("include", "include only the specified partition(s) in the output, discard the rest"), + typed_option<>("exclude", "exclude the specified partition(s) from the output, keep the rest"), + typed_option<>("merge", "combine all sstable(s) into a single output sstable"), + typed_option>("partition", "partition(s) to filter for, partitions are expected to be in the hex format"), + typed_option("partitions-file", "file containing partition(s) to filter for, partitions are expected to be in the hex format"), + }}, + filter_operation}, }; } // anonymous namespace