diff --git a/cdc/log.cc b/cdc/log.cc index 65cec1d0d1..fbe2ade987 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -51,6 +51,7 @@ #include "types/listlike_partial_deserializing_iterator.hh" #include "tracing/trace_state.hh" #include "stats.hh" +#include "compaction_strategy.hh" namespace std { @@ -392,7 +393,26 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) { static schema_ptr create_log_schema(const schema& s, std::optional uuid) { schema_builder b(s.ks_name(), log_name(s.cf_name())); b.with_partitioner("com.scylladb.dht.CDCPartitioner"); + b.set_compaction_strategy(sstables::compaction_strategy_type::time_window); b.set_comment(sprint("CDC log for %s.%s", s.ks_name(), s.cf_name())); + auto ttl_seconds = s.cdc_options().ttl(); + if (ttl_seconds > 0) { + b.set_gc_grace_seconds(0); + auto ceil = [] (int dividend, int divisor) { + return dividend / divisor + (dividend % divisor == 0 ? 0 : 1); + }; + auto seconds_to_minutes = [] (int seconds_value) { + using namespace std::chrono; + return std::chrono::ceil(seconds(seconds_value)).count(); + }; + // What's the minimum window that won't create more than 24 sstables. + auto window_seconds = ceil(ttl_seconds, 24); + auto window_minutes = seconds_to_minutes(window_seconds); + b.set_compaction_strategy_options({ + {"compaction_window_unit", "MINUTES"}, + {"compaction_window_size", std::to_string(window_minutes)} + }); + } b.with_column(log_meta_column_name_bytes("stream_id"), bytes_type, column_kind::partition_key); b.with_column(log_meta_column_name_bytes("time"), timeuuid_type, column_kind::clustering_key); b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key); @@ -444,7 +464,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional if (uuid) { b.set_uuid(*uuid); } - + return b.build(); }