Compare commits

...

6 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
ba6e3f87e4 Update iterator methods and utility methods (range, bounds, tombstone_for_row, row_count)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:38:26 +00:00
copilot-swe-agent[bot]
03005ab209 Update critical row access methods (clustered_row, insert_row, find_row, empty)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:36:36 +00:00
copilot-swe-agent[bot]
1ee1e9acd3 Update more mutation_partition methods for variant storage
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:34:07 +00:00
copilot-swe-agent[bot]
501bb812c1 Update mutation_partition constructors for variant storage (WIP)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:31:35 +00:00
copilot-swe-agent[bot]
11d66bb9fb Add variant infrastructure to mutation_partition (WIP - compilation will fail)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:29:08 +00:00
copilot-swe-agent[bot]
5829b4d102 Initial plan 2025-12-24 21:22:11 +00:00
2 changed files with 356 additions and 86 deletions

View File

@@ -45,7 +45,9 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
: _tombstone(x._tombstone)
, _static_row(s, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _rows(use_single_row_storage(s) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _row_tombstones(x._row_tombstones)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
@@ -54,10 +56,30 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
return current_allocator().construct<rows_entry>(s, *x);
};
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
if (use_single_row_storage(s)) {
// Copy single row if it exists
if (x.uses_single_row_storage()) {
const auto& x_row = x.get_single_row_storage();
if (x_row) {
get_single_row_storage() = deletable_row(s, *x_row);
}
} else if (!x.get_rows_storage().empty()) {
// Converting from multi-row to single-row - take the first row
// This shouldn't normally happen as schema doesn't change this way
on_internal_error(mplog, "mutation_partition: cannot convert multi-row partition to single-row");
}
} else {
// Multi-row storage
if (x.uses_single_row_storage()) {
// Converting from single-row to multi-row - this shouldn't normally happen
on_internal_error(mplog, "mutation_partition: cannot convert single-row partition to multi-row");
} else {
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
return current_allocator().construct<rows_entry>(s, *x);
};
get_rows_storage().clone_from(x.get_rows_storage(), cloner, current_deleter<rows_entry>());
}
}
}
mutation_partition::mutation_partition(const mutation_partition& x, const schema& schema,
@@ -65,7 +87,9 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
: _tombstone(x._tombstone)
, _static_row(schema, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _rows(use_single_row_storage(schema) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(schema.version())
@@ -74,19 +98,37 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
_rows.insert_before_hint(_rows.end(), std::move(ce), rows_entry::tri_compare(schema));
if (use_single_row_storage(schema)) {
// Single-row storage: just copy the row if it exists
if (x.uses_single_row_storage()) {
const auto& x_row = x.get_single_row_storage();
if (x_row) {
get_single_row_storage() = deletable_row(schema, *x_row);
}
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
_row_tombstones.apply(schema, rt.tombstone());
} else {
// Filtering from multi-row - shouldn't happen with consistent schema
on_internal_error(mplog, "mutation_partition: filtering from multi-row to single-row storage");
}
} else {
// Multi-row storage with filtering
if (x.uses_single_row_storage()) {
on_internal_error(mplog, "mutation_partition: filtering from single-row to multi-row storage");
} else {
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(ce), rows_entry::tri_compare(schema));
}
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
_row_tombstones.apply(schema, rt.tombstone());
}
}
} catch (...) {
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
throw;
}
}
} catch (...) {
_rows.clear_and_dispose(current_deleter<rows_entry>());
throw;
}
}
@@ -104,14 +146,20 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
{
auto deleter = current_deleter<rows_entry>();
auto it = _rows.begin();
for (auto&& range : ck_ranges.ranges()) {
_rows.erase_and_dispose(it, lower_bound(schema, range), deleter);
it = upper_bound(schema, range);
if (use_single_row_storage(schema)) {
// Single-row storage: no filtering needed, row either exists or doesn't
// The move constructor has already moved the row if it exists
} else {
// Multi-row storage: filter the rows
if (!uses_single_row_storage()) {
auto deleter = current_deleter<rows_entry>();
auto it = get_rows_storage().begin();
for (auto&& range : ck_ranges.ranges()) {
get_rows_storage().erase_and_dispose(it, lower_bound(schema, range), deleter);
it = upper_bound(schema, range);
}
get_rows_storage().erase_and_dispose(it, get_rows_storage().end(), deleter);
}
_rows.erase_and_dispose(it, _rows.end(), deleter);
}
{
for (auto&& range : ck_ranges.ranges()) {
@@ -127,7 +175,11 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
}
mutation_partition::~mutation_partition() {
_rows.clear_and_dispose(current_deleter<rows_entry>());
if (uses_single_row_storage()) {
// Single-row storage: optional destructor handles cleanup
} else {
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
}
}
mutation_partition&
@@ -141,10 +193,14 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
void mutation_partition::ensure_last_dummy(const schema& s) {
check_schema(s);
if (_rows.empty() || !_rows.rbegin()->is_last_dummy()) {
if (uses_single_row_storage()) {
// Single-row storage doesn't use dummy entries
return;
}
if (get_rows_storage().empty() || !get_rows_storage().rbegin()->is_last_dummy()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::yes));
_rows.insert_before(_rows.end(), std::move(e));
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
}
}
@@ -419,9 +475,18 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
check_schema(schema);
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
auto j = _rows.find(key, rows_entry::tri_compare(schema));
if (j != _rows.end()) {
t.apply(j->row().deleted_at(), j->row().marker());
if (use_single_row_storage(schema)) {
// Single-row storage: check if the single row exists and has tombstone
const auto& row_opt = get_single_row_storage();
if (row_opt) {
t.apply(row_opt->deleted_at(), row_opt->marker());
}
} else {
// Multi-row storage: search in B-tree
auto j = get_rows_storage().find(key, rows_entry::tri_compare(schema));
if (j != get_rows_storage().end()) {
t.apply(j->row().deleted_at(), j->row().marker());
}
}
return t;
@@ -504,97 +569,178 @@ void mutation_partition::apply_insert(const schema& s, clustering_key_view key,
clustered_row(s, key).apply(row_marker(created_at, ttl, expiry));
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key, std::move(row)));
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
if (use_single_row_storage(s)) {
// Single-row storage: just set the row
get_single_row_storage() = std::move(row);
} else {
// Multi-row storage: insert into B-tree
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key, std::move(row)));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
}
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
check_schema(s);
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, key, row));
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
if (use_single_row_storage(s)) {
// Single-row storage: just copy the row
get_single_row_storage() = row;
} else {
// Multi-row storage: insert into B-tree
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, key, row));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
}
}
const row*
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
check_schema(s);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
if (use_single_row_storage(s)) {
// Single-row storage: return the single row's cells if it exists
const auto& row_opt = get_single_row_storage();
if (row_opt) {
return &row_opt->cells();
}
return nullptr;
} else {
// Multi-row storage: search in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
return nullptr;
}
return &i->row().cells();
}
return &i->row().cells();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(std::move(key)));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(std::move(key)));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
return i->row();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
return i->row();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
return i->row();
}
rows_entry&
mutation_partition::clustered_rows_entry(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
check_row_key(s, pos, dummy);
auto i = _rows.find(pos, rows_entry::tri_compare(s));
if (i == _rows.end()) {
if (use_single_row_storage(s)) {
// Single-row storage doesn't use rows_entry - this shouldn't be called
on_internal_error(mplog, "mutation_partition::clustered_rows_entry() called with single-row storage");
}
auto i = get_rows_storage().find(pos, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return *i;
}
deletable_row&
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
return clustered_rows_entry(s, pos, dummy, continuous).row();
if (use_single_row_storage(s)) {
// Single-row storage: ignore dummy/continuous flags, just get/create the row
check_row_key(s, pos, dummy);
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
return clustered_rows_entry(s, pos, dummy, continuous).row();
}
}
deletable_row&
mutation_partition::append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
check_row_key(s, pos, dummy);
if (use_single_row_storage(s)) {
// Single-row storage: just create/get the row
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
}
const auto cmp = rows_entry::tri_compare(s);
auto i = _rows.end();
if (!_rows.empty() && (cmp(*std::prev(i), pos) >= 0)) {
auto i = get_rows_storage().end();
if (!get_rows_storage().empty() && (cmp(*std::prev(i), pos) >= 0)) {
on_internal_error(mplog, format("mutation_partition::append_clustered_row(): cannot append clustering row with key {} to the partition"
", last clustering row is equal or greater: {}", pos, std::prev(i)->position()));
}
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = _rows.insert_before_hint(i, std::move(e), cmp).first;
i = get_rows_storage().insert_before_hint(i, std::move(e), cmp).first;
return i->row();
}
@@ -602,19 +748,33 @@ mutation_partition::append_clustered_row(const schema& s, position_in_partition_
mutation_partition::rows_type::const_iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (!r.start()) {
return std::cbegin(_rows);
if (use_single_row_storage(schema)) {
// Single-row storage: always return end iterator (empty range)
static const rows_type empty_rows;
return empty_rows.end();
}
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
if (!r.start()) {
return std::cbegin(get_rows_storage());
}
return get_rows_storage().lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
}
mutation_partition::rows_type::const_iterator
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (!r.end()) {
return std::cend(_rows);
if (use_single_row_storage(schema)) {
// Single-row storage: always return end iterator (empty range)
static const rows_type empty_rows;
return empty_rows.end();
}
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
if (!r.end()) {
return std::cend(get_rows_storage());
}
return get_rows_storage().lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
}
std::ranges::subrange<mutation_partition::rows_type::const_iterator>
@@ -625,17 +785,32 @@ mutation_partition::range(const schema& schema, const query::clustering_range& r
std::ranges::subrange<mutation_partition::rows_type::iterator>
mutation_partition::range(const schema& schema, const query::clustering_range& r) {
return unconst(_rows, static_cast<const mutation_partition*>(this)->range(schema, r));
if (use_single_row_storage(schema)) {
// Single-row storage: return empty range (rows_entry iteration not applicable)
static rows_type empty_rows;
return std::ranges::subrange(empty_rows.begin(), empty_rows.end());
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->range(schema, r));
}
mutation_partition::rows_type::iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) {
return unconst(_rows, static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
if (use_single_row_storage(schema)) {
// Single-row storage: return end iterator (empty range)
static rows_type empty_rows;
return empty_rows.end();
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
}
mutation_partition::rows_type::iterator
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) {
return unconst(_rows, static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
if (use_single_row_storage(schema)) {
// Single-row storage: return end iterator (empty range)
static rows_type empty_rows;
return empty_rows.end();
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
}
template<typename Func>
@@ -1377,7 +1552,15 @@ bool mutation_partition::empty() const
if (_tombstone.timestamp != api::missing_timestamp) {
return false;
}
return !_static_row.size() && _rows.empty() && _row_tombstones.empty();
if (_static_row.size() || !_row_tombstones.empty()) {
return false;
}
if (uses_single_row_storage()) {
return !get_single_row_storage().has_value();
} else {
return get_rows_storage().empty();
}
}
bool
@@ -1422,7 +1605,11 @@ mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_t
uint64_t
mutation_partition::row_count() const {
return _rows.calculate_size();
if (uses_single_row_storage()) {
return get_single_row_storage().has_value() ? 1 : 0;
} else {
return get_rows_storage().calculate_size();
}
}
rows_entry::rows_entry(rows_entry&& o) noexcept
@@ -2219,15 +2406,22 @@ public:
mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const schema& s, tombstone t)
: _tombstone(t)
, _static_row_continuous(!s.has_static_columns())
, _rows()
, _rows(use_single_row_storage(s) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
_rows.insert_before(_rows.end(), std::move(e));
if (use_single_row_storage(s)) {
// Single-row storage: no dummy entries needed, leave row as empty optional
} else {
// Multi-row storage: add last dummy entry for discontinuous partition
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
}
}
bool mutation_partition::is_fully_continuous() const {

View File

@@ -9,6 +9,7 @@
#pragma once
#include <iosfwd>
#include <variant>
#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/util/optimized_optional.hh>
@@ -1188,6 +1189,12 @@ inline void check_row_key(const schema& s, position_in_partition_view pos, is_du
}
}
// Returns true if the schema has no clustering keys, meaning partitions can have at most one row.
// When true, mutation_partition uses std::optional<deletable_row> instead of full rows_type container.
inline bool use_single_row_storage(const schema& s) {
return s.clustering_key_size() == 0;
}
// Represents a set of writes made to a single partition.
//
// The object is schema-dependent. Each instance is governed by some
@@ -1228,20 +1235,45 @@ inline void check_row_key(const schema& s, position_in_partition_view pos, is_du
class mutation_partition final {
public:
using rows_type = rows_entry::container_type;
using rows_storage_type = std::variant<rows_type, std::optional<deletable_row>>;
friend class size_calculator;
private:
tombstone _tombstone;
lazy_row _static_row;
bool _static_row_continuous = true;
rows_type _rows;
rows_storage_type _rows;
// Contains only strict prefixes so that we don't have to lookup full keys
// in both _row_tombstones and _rows.
// Note: empty when using single-row storage (std::optional<deletable_row> variant)
range_tombstone_list _row_tombstones;
#ifdef SEASTAR_DEBUG
table_schema_version _schema_version;
#endif
friend class converting_mutation_partition_applier;
// Returns true if this partition uses single-row storage
bool uses_single_row_storage() const {
return std::holds_alternative<std::optional<deletable_row>>(_rows);
}
// Get reference to rows container (multi-row storage)
rows_type& get_rows_storage() {
return std::get<rows_type>(_rows);
}
const rows_type& get_rows_storage() const {
return std::get<rows_type>(_rows);
}
// Get reference to single row storage
std::optional<deletable_row>& get_single_row_storage() {
return std::get<std::optional<deletable_row>>(_rows);
}
const std::optional<deletable_row>& get_single_row_storage() const {
return std::get<std::optional<deletable_row>>(_rows);
}
public:
struct copy_comparators_only {};
struct incomplete_tag {};
@@ -1251,14 +1283,14 @@ public:
return mutation_partition(incomplete_tag(), s, t);
}
mutation_partition(const schema& s)
: _rows()
: _rows(use_single_row_storage(s) ? rows_storage_type(std::optional<deletable_row>{}) : rows_storage_type(rows_type{}))
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{ }
mutation_partition(mutation_partition& other, copy_comparators_only)
: _rows()
: _rows(other._rows.index() == 0 ? rows_storage_type(rows_type{}) : rows_storage_type(std::optional<deletable_row>{}))
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(other._schema_version)
@@ -1269,6 +1301,8 @@ public:
mutation_partition(const mutation_partition&, const schema&, query::clustering_key_filter_ranges);
mutation_partition(mutation_partition&&, const schema&, query::clustering_key_filter_ranges);
~mutation_partition();
// Returns the mutation_partition containing the given rows_type.
// Can only be used when the mutation_partition uses multi-row storage.
static mutation_partition& container_of(rows_type&);
mutation_partition& operator=(mutation_partition&& x) noexcept;
bool equal(const schema&, const mutation_partition&) const;
@@ -1462,9 +1496,31 @@ public:
const lazy_row& static_row() const { return _static_row; }
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
const rows_type& clustered_rows() const noexcept { return _rows; }
utils::immutable_collection<rows_type> clustered_rows() noexcept { return _rows; }
rows_type& mutable_clustered_rows() noexcept { return _rows; }
// For single-row storage (clustering_key_size() == 0), returns an empty container.
// Callers should check uses_single_row_storage() and use get_single_row() for single-row case.
const rows_type& clustered_rows() const noexcept {
if (uses_single_row_storage()) {
static const rows_type empty_rows;
return empty_rows;
}
return get_rows_storage();
}
utils::immutable_collection<rows_type> clustered_rows() noexcept {
return const_cast<const mutation_partition*>(this)->clustered_rows();
}
rows_type& mutable_clustered_rows() noexcept {
// Should only be called when NOT using single-row storage
return get_rows_storage();
}
// Access the single row when using single-row storage (clustering_key_size() == 0)
const std::optional<deletable_row>& get_single_row() const {
return get_single_row_storage();
}
std::optional<deletable_row>& get_single_row() {
return get_single_row_storage();
}
const range_tombstone_list& row_tombstones() const noexcept { return _row_tombstones; }
utils::immutable_collection<range_tombstone_list> row_tombstones() noexcept { return _row_tombstones; }
@@ -1482,8 +1538,14 @@ public:
rows_type::iterator upper_bound(const schema& schema, const query::clustering_range& r);
std::ranges::subrange<rows_type::iterator> range(const schema& schema, const query::clustering_range& r);
// Returns an iterator range of rows_entry, with only non-dummy entries.
// For single-row storage, returns an empty range.
auto non_dummy_rows() const {
return std::ranges::subrange(_rows.begin(), _rows.end())
if (uses_single_row_storage()) {
static const rows_type empty_rows;
return std::ranges::subrange(empty_rows.begin(), empty_rows.end())
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
}
return std::ranges::subrange(get_rows_storage().begin(), get_rows_storage().end())
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
}
void accept(const schema&, mutation_partition_visitor&) const;
@@ -1517,7 +1579,21 @@ private:
inline
mutation_partition& mutation_partition::container_of(rows_type& rows) {
return *boost::intrusive::get_parent_from_member(&rows, &mutation_partition::_rows);
// This method can only be called when using multi-row storage (rows_type variant alternative).
// With std::variant, when rows_type is the active alternative (index 0), it's stored at the beginning of the variant.
// We can use pointer arithmetic to get back to the mutation_partition.
// Calculate offset from rows_type to the containing variant
// The rows reference should be the active rows_type inside the variant
static_assert(std::is_same_v<std::variant_alternative_t<0, rows_storage_type>, rows_type>,
"rows_type must be the first alternative in rows_storage_type");
// Get address of the variant containing this rows_type
// When rows_type is active (index 0), it's at offset 0 in the variant's storage
rows_storage_type* variant_ptr = reinterpret_cast<rows_storage_type*>(&rows);
// Now get the mutation_partition from the variant
return *boost::intrusive::get_parent_from_member(variant_ptr, &mutation_partition::_rows);
}
bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tombstone tomb = tombstone(),