Files
scylladb/compaction/time_window_compaction_strategy.hh
Lakshmi Narayanan Sreethar 18c071c94b compaction: fix use after free when strategy is altered during compaction
The `compaction_strategy_state` class holds strategy specific state via
a `std::variant` containing different state types. When a compaction
strategy performs compaction, it retrieves a reference to its state from
the `compaction_strategy_state` object. If the table's compaction
strategy is ALTERed while a compaction is in progress, the
`compaction_strategy_state` object gets replaced, destroying the old
state. This leaves the ongoing compaction holding a dangling reference,
resulting in a use after free.

Fix this by using `seastar::shared_ptr` for the state variant
alternatives (`leveled_compaction_strategy_state_ptr` and
`time_window_compaction_strategy_state_ptr`). The compaction strategies
now hold a copy of the shared_ptr, ensuring the state remains valid for
the duration of the compaction even if the strategy is altered.

The `compaction_strategy_state` itself is still passed by reference and
only the variant alternatives use shared_ptrs. This allows ongoing
compactions to retain ownership of the state independently of the
wrapper's lifetime.

Fixes #25913

Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>
2025-10-17 22:57:05 +05:30

172 lines
8.4 KiB
C++

/*
* Copyright (C) 2017-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "compaction_strategy_impl.hh"
#include "size_tiered_compaction_strategy.hh"
#include "mutation/timestamp.hh"
#include "sstables/shared_sstable.hh"
namespace compaction {
// Logger used in this header for internal-error reporting (see to_timestamp_type()).
// It is only declared here; `extern` means the definition lives in another translation unit.
extern logging::logger clogger;
// Brings the chrono literal suffixes into scope; the option defaults below rely on
// them (`86400s`, `600s`).
using namespace std::chrono_literals;
// Forward declaration so the classes below can name the tracker as a friend
// without needing its full definition.
class time_window_backlog_tracker;
// Tunables of the time-window compaction strategy: the size of a time window,
// how often expired sstables are looked for, and the resolution in which
// timestamps are interpreted.
class time_window_compaction_strategy_options {
public:
    // Defaults: a window unit of 86400 seconds (one day) multiplied by a window size of 1.
    static constexpr std::chrono::seconds DEFAULT_COMPACTION_WINDOW_UNIT = 86400s;
    static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
    // Default interval between expired-sstable checks: 600 seconds.
    static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return 600s; }

    // Option names.
    // NOTE(review): presumably these are the keys looked up in the options map
    // handed to the constructor and to validate() -- confirm in the .cc file.
    static constexpr auto TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
    static constexpr auto COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
    static constexpr auto COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
    static constexpr auto EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";

    // Lookup table from a window-unit name to its duration; defined out of line.
    static const std::unordered_map<sstring, std::chrono::seconds> valid_window_units;

    // Resolutions in which an sstable timestamp may be expressed.
    enum class timestamp_resolutions {
        microsecond,
        millisecond,
    };
    // Lookup table from a resolution name to its enumerator; defined out of line.
    static const std::unordered_map<sstring, timestamp_resolutions> valid_timestamp_resolutions;

private:
    // Every field starts out at its default; the declaration order is kept as-is
    // because it determines the initialization order.
    std::chrono::seconds sstable_window_size = DEFAULT_COMPACTION_WINDOW_UNIT * DEFAULT_COMPACTION_WINDOW_SIZE;
    db_clock::duration expired_sstable_check_frequency = DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
    timestamp_resolutions timestamp_resolution = timestamp_resolutions::microsecond;
    bool enable_optimized_twcs_queries = true;

public:
    time_window_compaction_strategy_options(const time_window_compaction_strategy_options&);
    time_window_compaction_strategy_options(time_window_compaction_strategy_options&&);
    // Builds the options from a name -> value map (implemented out of line).
    time_window_compaction_strategy_options(const std::map<sstring, sstring>& options);

    // Validates `options`; `unchecked_options` is taken by non-const reference.
    // NOTE(review): presumably recognised keys are removed from `unchecked_options` -- confirm in the .cc file.
    static void validate(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);

    // Size of one compaction time window, in seconds.
    std::chrono::seconds get_sstable_window_size() const {
        return sstable_window_size;
    }

    // The strategy and its backlog tracker read the private fields directly.
    friend class time_window_compaction_strategy;
    friend class time_window_backlog_tracker;
};
// Mutable state that the time-window compaction strategy keeps between invocations.
struct time_window_compaction_strategy_state {
    // NOTE(review): presumably the time of the most recent expired-sstable check -- confirm against the .cc file.
    db_clock::time_point last_expired_check;
    // api::timestamp_type is a plain int64_t, i.e. a primitive type, so it needs an explicit initializer.
    api::timestamp_type highest_window_seen{0};
    // All recently active windows that still have to be compacted into a single SSTable.
    std::unordered_set<api::timestamp_type> recent_active_windows;
};
// Shared-ownership handle to the TWCS state. The strategy holds its own copy of the
// pointer while compacting, so the state stays valid for the duration of the compaction
// even if the table's compaction strategy is ALTERed (and the owning
// compaction_strategy_state replaced) in the meantime.
using time_window_compaction_strategy_state_ptr = seastar::shared_ptr<time_window_compaction_strategy_state>;
// Time-window compaction strategy (TWCS): sstables are grouped into buckets by the
// time window their timestamps fall into, and each bucket is compacted in one of
// the modes listed in bucket_compaction_mode.
class time_window_compaction_strategy : public compaction_strategy_impl {
    time_window_compaction_strategy_options _options;
    size_tiered_compaction_strategy_options _stcs_options;

public:
    // Upper bound on the number of buckets data is segregated into when writing sstables.
    // The cap prevents an explosion in the number of sstables: co-locating a few
    // windows in the same sstable is better than running out of memory.
    static constexpr uint64_t max_data_segregation_window_count = 100;
    static constexpr float reshape_target_space_overhead = 0.1f;

    // The sstables that belong to one time window.
    using bucket_t = std::vector<sstables::shared_sstable>;
    enum class bucket_compaction_mode { none, size_tiered, major };

    time_window_compaction_strategy(const std::map<sstring, sstring>& options);

    virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
    virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) const override;
    static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);

private:
    // Returns the strategy state for `table_s` as a shared pointer, so the caller
    // co-owns the state for as long as it keeps the returned handle.
    time_window_compaction_strategy_state_ptr get_state(compaction_group_view& table_s) const;

    // Converts a timestamp read from an sstable, expressed in `resolution`, to
    // api::timestamp_type. Microsecond input is passed through unchanged and
    // millisecond input is converted to microseconds. Any other enumerator value
    // is reported through on_internal_error().
    static api::timestamp_type
    to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
        using resolutions = time_window_compaction_strategy_options::timestamp_resolutions;
        switch (resolution) {
        case resolutions::microsecond:
            return api::timestamp_type(timestamp_from_sstable);
        case resolutions::millisecond: {
            const std::chrono::milliseconds as_millis(timestamp_from_sstable);
            return std::chrono::duration_cast<std::chrono::microseconds>(as_millis).count();
        }
        }
        on_internal_error(clogger, std::format("Timestamp resolution invalid for TWCS : {}", static_cast<int>(resolution)));
    }

    // True when the bucket is the last, most active one, i.e. its key is not older than `now`.
    bool is_last_active_bucket(api::timestamp_type bucket_key, api::timestamp_type now) const {
        return !(bucket_key < now);
    }

    // Decides which kind of compaction should be performed on the given window bucket.
    bucket_compaction_mode compaction_mode(const time_window_compaction_strategy_state& state, const bucket_t& bucket,
            api::timestamp_type bucket_key, api::timestamp_type now, size_t min_threshold) const;

    std::vector<sstables::shared_sstable> get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control,
            std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time,
            time_window_compaction_strategy_state& state);

    std::vector<sstables::shared_sstable> get_compaction_candidates(compaction_group_view& table_s, strategy_control& control,
            std::vector<sstables::shared_sstable> candidate_sstables, time_window_compaction_strategy_state& state);

public:
    // Finds the lowest timestamp of the window of the given size that contains `timestamp`.
    static api::timestamp_type get_window_lower_bound(std::chrono::seconds sstable_window_size, api::timestamp_type timestamp);

    // Groups sstables with a similar max timestamp into buckets.
    // @return A pair whose first element is the bucket map (window timestamp -> sstables
    //         of that window) and whose second element is the highest timestamp seen.
    static std::pair<std::map<api::timestamp_type, bucket_t>, api::timestamp_type>
    get_buckets(std::vector<sstables::shared_sstable> files, const time_window_compaction_strategy_options& options);

    std::vector<sstables::shared_sstable> newest_bucket(compaction_group_view& table_s, strategy_control& control,
            std::map<api::timestamp_type, bucket_t> buckets, int min_threshold, int max_threshold, api::timestamp_type now,
            time_window_compaction_strategy_state& state);

    static std::vector<sstables::shared_sstable> trim_to_threshold(bucket_t bucket, int max_threshold);

    // Lower bound of the window that `ts` belongs to, after interpreting `ts` in
    // the timestamp resolution configured in `options`.
    static int64_t
    get_window_for(const time_window_compaction_strategy_options& options, api::timestamp_type ts) {
        const api::timestamp_type normalized_ts = to_timestamp_type(options.timestamp_resolution, ts);
        return get_window_lower_bound(options.sstable_window_size, normalized_ts);
    }

    // The configured window size expressed in microseconds.
    static api::timestamp_type
    get_window_size(const time_window_compaction_strategy_options& options) {
        const auto window_in_micros = std::chrono::duration_cast<std::chrono::microseconds>(options.get_sstable_window_size());
        return api::timestamp_type(window_in_micros.count());
    }

    // Friendship is independent of the access section it is declared in.
    friend class time_window_backlog_tracker;

    virtual future<int64_t> estimated_pending_compactions(compaction_group_view& table_s) const override;

    virtual compaction_strategy_type type() const override {
        return compaction_strategy_type::time_window;
    }

    virtual std::unique_ptr<sstables::sstable_set_impl> make_sstable_set(const compaction_group_view& ts) const override;
    virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const override;
    virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const override;
    virtual mutation_reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, mutation_reader_consumer end_consumer) const override;

    // This strategy always uses an interposer consumer.
    virtual bool use_interposer_consumer() const override {
        return true;
    }

    virtual compaction_descriptor get_reshaping_job(std::vector<sstables::shared_sstable> input, schema_ptr schema, reshape_config cfg) const override;
};
}