|
|
|
|
@@ -144,7 +144,14 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
|
|
|
|
|
, _static_row(s, column_kind::static_column, x._static_row)
|
|
|
|
|
, _static_row_continuous(x._static_row_continuous)
|
|
|
|
|
, _rows()
|
|
|
|
|
, _row_tombstones(x._row_tombstones) {
|
|
|
|
|
, _row_tombstones(x._row_tombstones)
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
, _schema_version(s.version())
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
assert(x._schema_version == _schema_version);
|
|
|
|
|
#endif
|
|
|
|
|
auto cloner = [&s] (const auto& x) {
|
|
|
|
|
return current_allocator().construct<rows_entry>(s, x);
|
|
|
|
|
};
|
|
|
|
|
@@ -157,7 +164,14 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
|
|
|
|
|
, _static_row(schema, column_kind::static_column, x._static_row)
|
|
|
|
|
, _static_row_continuous(x._static_row_continuous)
|
|
|
|
|
, _rows()
|
|
|
|
|
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only()) {
|
|
|
|
|
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only())
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
, _schema_version(schema.version())
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
assert(x._schema_version == _schema_version);
|
|
|
|
|
#endif
|
|
|
|
|
try {
|
|
|
|
|
for(auto&& r : ck_ranges) {
|
|
|
|
|
for (const rows_entry& e : x.range(schema, r)) {
|
|
|
|
|
@@ -180,7 +194,13 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
|
|
|
|
|
, _static_row_continuous(x._static_row_continuous)
|
|
|
|
|
, _rows(std::move(x._rows))
|
|
|
|
|
, _row_tombstones(std::move(x._row_tombstones))
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
, _schema_version(schema.version())
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
assert(x._schema_version == _schema_version);
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
auto deleter = current_deleter<rows_entry>();
|
|
|
|
|
auto it = _rows.begin();
|
|
|
|
|
@@ -220,6 +240,7 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mutation_partition::ensure_last_dummy(const schema& s) {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
if (_rows.empty() || !_rows.rbegin()->is_last_dummy()) {
|
|
|
|
|
_rows.insert_before(_rows.end(),
|
|
|
|
|
*current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::yes));
|
|
|
|
|
@@ -276,11 +297,16 @@ void deletable_row::apply(const schema& s, clustering_row cr) {
|
|
|
|
|
|
|
|
|
|
// Applies a single mutation fragment to this partition.
// The fragment is dispatched through mf.visit() to a mutation_fragment_applier
// that is constructed from the schema `s` and this partition.
void
|
|
|
|
|
mutation_partition::apply(const schema& s, const mutation_fragment& mf) {
|
|
|
|
|
// NOTE(review): check_schema() is defined outside this view; presumably it
// asserts (in debug builds) that `s` matches this partition's schema version.
check_schema(s);
|
|
|
|
|
// The applier holds the schema and a reference to *this as the visit target.
mutation_fragment_applier applier{s, *this};
|
|
|
|
|
mf.visit(applier);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mutation_partition::apply_monotonically(const schema& s, mutation_partition&& p, cache_tracker* tracker) {
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
assert(s.version() == _schema_version);
|
|
|
|
|
assert(p._schema_version == _schema_version);
|
|
|
|
|
#endif
|
|
|
|
|
_tombstone.apply(p._tombstone);
|
|
|
|
|
_row_tombstones.apply_monotonically(s, std::move(p._row_tombstones));
|
|
|
|
|
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
|
|
|
|
|
@@ -356,6 +382,7 @@ void mutation_partition::apply_weak(const schema& s, mutation_partition&& p) {
|
|
|
|
|
|
|
|
|
|
tombstone
|
|
|
|
|
mutation_partition::range_tombstone_for_row(const schema& schema, const clustering_key& key) const {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
tombstone t = _tombstone;
|
|
|
|
|
if (!_row_tombstones.empty()) {
|
|
|
|
|
auto found = _row_tombstones.search_tombstone_covering(schema, key);
|
|
|
|
|
@@ -366,6 +393,7 @@ mutation_partition::range_tombstone_for_row(const schema& schema, const clusteri
|
|
|
|
|
|
|
|
|
|
row_tombstone
|
|
|
|
|
mutation_partition::tombstone_for_row(const schema& schema, const clustering_key& key) const {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
|
|
|
|
|
|
|
|
|
|
auto j = _rows.find(key, rows_entry::compare(schema));
|
|
|
|
|
@@ -378,6 +406,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
|
|
|
|
|
|
|
|
|
|
row_tombstone
|
|
|
|
|
mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e) const {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
row_tombstone t = e.row().deleted_at();
|
|
|
|
|
t.apply(range_tombstone_for_row(schema, e.key()));
|
|
|
|
|
return t;
|
|
|
|
|
@@ -385,6 +414,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e)
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t) {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
assert(!prefix.is_full(schema));
|
|
|
|
|
auto start = prefix;
|
|
|
|
|
_row_tombstones.apply(schema, {std::move(start), std::move(prefix), std::move(t)});
|
|
|
|
|
@@ -392,11 +422,13 @@ mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_pre
|
|
|
|
|
|
|
|
|
|
// Applies the range tombstone `rt` to this partition's row tombstone list.
// `rt` is taken by value and moved into _row_tombstones.apply().
void
|
|
|
|
|
mutation_partition::apply_row_tombstone(const schema& schema, range_tombstone rt) {
|
|
|
|
|
// NOTE(review): check_schema() is defined outside this view; presumably it
// validates `schema` against this partition's schema version - confirm.
check_schema(schema);
|
|
|
|
|
_row_tombstones.apply(schema, std::move(rt));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
mutation_partition::apply_delete(const schema& schema, const clustering_key_prefix& prefix, tombstone t) {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
if (prefix.is_empty(schema)) {
|
|
|
|
|
apply(t);
|
|
|
|
|
} else if (prefix.is_full(schema)) {
|
|
|
|
|
@@ -408,6 +440,7 @@ mutation_partition::apply_delete(const schema& schema, const clustering_key_pref
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
mutation_partition::apply_delete(const schema& schema, range_tombstone rt) {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
if (range_tombstone::is_single_clustering_row_tombstone(schema, rt.start, rt.start_kind, rt.end, rt.end_kind)) {
|
|
|
|
|
apply_delete(schema, std::move(rt.start), std::move(rt.tomb));
|
|
|
|
|
return;
|
|
|
|
|
@@ -417,6 +450,7 @@ mutation_partition::apply_delete(const schema& schema, range_tombstone rt) {
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
mutation_partition::apply_delete(const schema& schema, clustering_key&& prefix, tombstone t) {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
if (prefix.is_empty(schema)) {
|
|
|
|
|
apply(t);
|
|
|
|
|
} else if (prefix.is_full(schema)) {
|
|
|
|
|
@@ -428,6 +462,7 @@ mutation_partition::apply_delete(const schema& schema, clustering_key&& prefix,
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
mutation_partition::apply_delete(const schema& schema, clustering_key_prefix_view prefix, tombstone t) {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
if (prefix.is_empty(schema)) {
|
|
|
|
|
apply(t);
|
|
|
|
|
} else if (prefix.is_full(schema)) {
|
|
|
|
|
@@ -451,12 +486,14 @@ void mutation_partition::insert_row(const schema& s, const clustering_key& key,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Inserts a new rows_entry built from (`s`, `key`, `row`) into _rows.
// The entry is allocated through current_allocator() and ordered with
// rows_entry::compare(s).
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
|
|
|
|
|
// NOTE(review): check_schema() is defined outside this view; presumably it
// validates `s` against this partition's schema version - confirm.
check_schema(s);
|
|
|
|
|
// `row` is passed by const reference to the rows_entry constructor.
auto e = current_allocator().construct<rows_entry>(s, key, row);
|
|
|
|
|
// _rows.end() is passed as the first argument - presumably an insertion
// position hint; verify against the rows_type container API.
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const row*
|
|
|
|
|
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
auto i = _rows.find(key, rows_entry::compare(s));
|
|
|
|
|
if (i == _rows.end()) {
|
|
|
|
|
return nullptr;
|
|
|
|
|
@@ -466,6 +503,7 @@ mutation_partition::find_row(const schema& s, const clustering_key& key) const {
|
|
|
|
|
|
|
|
|
|
deletable_row&
|
|
|
|
|
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
auto i = _rows.find(key, rows_entry::compare(s));
|
|
|
|
|
if (i == _rows.end()) {
|
|
|
|
|
auto e = current_allocator().construct<rows_entry>(std::move(key));
|
|
|
|
|
@@ -477,6 +515,7 @@ mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
|
|
|
|
|
|
|
|
|
|
deletable_row&
|
|
|
|
|
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
auto i = _rows.find(key, rows_entry::compare(s));
|
|
|
|
|
if (i == _rows.end()) {
|
|
|
|
|
auto e = current_allocator().construct<rows_entry>(key);
|
|
|
|
|
@@ -488,6 +527,7 @@ mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
|
|
|
|
|
|
|
|
|
|
deletable_row&
|
|
|
|
|
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
auto i = _rows.find(key, rows_entry::compare(s));
|
|
|
|
|
if (i == _rows.end()) {
|
|
|
|
|
auto e = current_allocator().construct<rows_entry>(key);
|
|
|
|
|
@@ -499,6 +539,7 @@ mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
|
|
|
|
|
|
|
|
|
|
deletable_row&
|
|
|
|
|
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
auto i = _rows.find(pos, rows_entry::compare(s));
|
|
|
|
|
if (i == _rows.end()) {
|
|
|
|
|
auto e = current_allocator().construct<rows_entry>(s, pos, dummy, continuous);
|
|
|
|
|
@@ -510,6 +551,7 @@ mutation_partition::clustered_row(const schema& s, position_in_partition_view po
|
|
|
|
|
|
|
|
|
|
mutation_partition::rows_type::const_iterator
|
|
|
|
|
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
if (!r.start()) {
|
|
|
|
|
return std::cbegin(_rows);
|
|
|
|
|
}
|
|
|
|
|
@@ -518,6 +560,7 @@ mutation_partition::lower_bound(const schema& schema, const query::clustering_ra
|
|
|
|
|
|
|
|
|
|
mutation_partition::rows_type::const_iterator
|
|
|
|
|
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) const {
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
if (!r.end()) {
|
|
|
|
|
return std::cend(_rows);
|
|
|
|
|
}
|
|
|
|
|
@@ -526,6 +569,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
|
|
|
|
|
|
|
|
|
|
// Returns the iterator range over _rows delimited by lower_bound(schema, r)
// and upper_bound(schema, r) for the clustering range `r`.
boost::iterator_range<mutation_partition::rows_type::const_iterator>
|
|
|
|
|
mutation_partition::range(const schema& schema, const query::clustering_range& r) const {
|
|
|
|
|
// NOTE(review): check_schema() is defined outside this view; presumably it
// validates `schema` against this partition's schema version - confirm.
check_schema(schema);
|
|
|
|
|
return boost::make_iterator_range(lower_bound(schema, r), upper_bound(schema, r));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -562,6 +606,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
|
|
|
|
|
template<typename Func>
|
|
|
|
|
void mutation_partition::for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const
|
|
|
|
|
{
|
|
|
|
|
check_schema(schema);
|
|
|
|
|
auto r = range(schema, row_range);
|
|
|
|
|
if (!reversed) {
|
|
|
|
|
for (const auto& e : r) {
|
|
|
|
|
@@ -778,6 +823,7 @@ bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tomb
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
mutation_partition::query_compacted(query::result::partition_writer& pw, const schema& s, uint32_t limit) const {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
const query::partition_slice& slice = pw.slice();
|
|
|
|
|
max_timestamp max_ts{pw.last_modified()};
|
|
|
|
|
|
|
|
|
|
@@ -996,6 +1042,10 @@ bool mutation_partition::equal(const schema& s, const mutation_partition& p) con
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool mutation_partition::equal(const schema& this_schema, const mutation_partition& p, const schema& p_schema) const {
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
assert(_schema_version == this_schema.version());
|
|
|
|
|
assert(p._schema_version == p_schema.version());
|
|
|
|
|
#endif
|
|
|
|
|
if (_tombstone != p._tombstone) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
@@ -1189,6 +1239,7 @@ size_t rows_entry::memory_usage(const schema& s) const {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t mutation_partition::external_memory_usage(const schema& s) const {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
size_t sum = 0;
|
|
|
|
|
sum += static_row().external_memory_usage(s, column_kind::static_column);
|
|
|
|
|
for (auto& clr : clustered_rows()) {
|
|
|
|
|
@@ -1207,6 +1258,7 @@ void mutation_partition::trim_rows(const schema& s,
|
|
|
|
|
const std::vector<query::clustering_range>& row_ranges,
|
|
|
|
|
Func&& func)
|
|
|
|
|
{
|
|
|
|
|
check_schema(s);
|
|
|
|
|
static_assert(std::is_same<stop_iteration, std::result_of_t<Func(rows_entry&)>>::value, "Bad func signature");
|
|
|
|
|
|
|
|
|
|
stop_iteration stop = stop_iteration::no;
|
|
|
|
|
@@ -1251,6 +1303,7 @@ uint32_t mutation_partition::do_compact(const schema& s,
|
|
|
|
|
uint32_t row_limit,
|
|
|
|
|
can_gc_fn& can_gc)
|
|
|
|
|
{
|
|
|
|
|
check_schema(s);
|
|
|
|
|
assert(row_limit > 0);
|
|
|
|
|
|
|
|
|
|
auto gc_before = saturating_subtract(query_time, s.gc_grace_seconds());
|
|
|
|
|
@@ -1316,12 +1369,14 @@ mutation_partition::compact_for_query(
|
|
|
|
|
bool reverse,
|
|
|
|
|
uint32_t row_limit)
|
|
|
|
|
{
|
|
|
|
|
check_schema(s);
|
|
|
|
|
return do_compact(s, query_time, row_ranges, reverse, row_limit, always_gc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mutation_partition::compact_for_compaction(const schema& s,
|
|
|
|
|
can_gc_fn& can_gc, gc_clock::time_point compaction_time)
|
|
|
|
|
{
|
|
|
|
|
check_schema(s);
|
|
|
|
|
static const std::vector<query::clustering_range> all_rows = {
|
|
|
|
|
query::clustering_range::make_open_ended_both_sides()
|
|
|
|
|
};
|
|
|
|
|
@@ -1355,11 +1410,13 @@ row::is_live(const schema& s, column_kind kind, tombstone base_tombstone, gc_clo
|
|
|
|
|
|
|
|
|
|
// Reports whether the static row has any live data at `query_time`.
// Delegates to has_any_live_data() with column_kind::static_column, the
// static row's cells, and the partition tombstone (_tombstone).
bool
|
|
|
|
|
mutation_partition::is_static_row_live(const schema& s, gc_clock::time_point query_time) const {
|
|
|
|
|
// NOTE(review): check_schema() is defined outside this view; presumably it
// validates `s` against this partition's schema version - confirm.
check_schema(s);
|
|
|
|
|
return has_any_live_data(s, column_kind::static_column, static_row(), _tombstone, query_time);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t
|
|
|
|
|
mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_time) const {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
size_t count = 0;
|
|
|
|
|
|
|
|
|
|
for (const rows_entry& e : non_dummy_rows()) {
|
|
|
|
|
@@ -1705,6 +1762,7 @@ row row::difference(const schema& s, column_kind kind, const row& other) const
|
|
|
|
|
|
|
|
|
|
mutation_partition mutation_partition::difference(schema_ptr s, const mutation_partition& other) const
|
|
|
|
|
{
|
|
|
|
|
check_schema(*s);
|
|
|
|
|
mutation_partition mp(s);
|
|
|
|
|
if (_tombstone > other._tombstone) {
|
|
|
|
|
mp.apply(_tombstone);
|
|
|
|
|
@@ -1735,6 +1793,7 @@ mutation_partition mutation_partition::difference(schema_ptr s, const mutation_p
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mutation_partition::accept(const schema& s, mutation_partition_visitor& v) const {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
v.accept_partition_tombstone(_tombstone);
|
|
|
|
|
_static_row.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
|
|
|
|
|
const column_definition& def = s.static_column_at(id);
|
|
|
|
|
@@ -2168,6 +2227,9 @@ mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const
|
|
|
|
|
, _static_row_continuous(!s.has_static_columns())
|
|
|
|
|
, _rows()
|
|
|
|
|
, _row_tombstones(s)
|
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
|
|
|
, _schema_version(s.version())
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
_rows.insert_before(_rows.end(),
|
|
|
|
|
*current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
|
|
|
|
|
@@ -2199,6 +2261,7 @@ void mutation_partition::make_fully_continuous() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
clustering_interval_set mutation_partition::get_continuity(const schema& s, is_continuous cont) const {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
clustering_interval_set result;
|
|
|
|
|
auto i = _rows.begin();
|
|
|
|
|
auto prev_pos = position_in_partition::before_all_clustered_rows();
|
|
|
|
|
@@ -2248,6 +2311,7 @@ stop_iteration mutation_partition::clear_gently(cache_tracker* tracker) noexcept
|
|
|
|
|
|
|
|
|
|
bool
|
|
|
|
|
mutation_partition::check_continuity(const schema& s, const position_range& r, is_continuous cont) const {
|
|
|
|
|
check_schema(s);
|
|
|
|
|
auto less = rows_entry::compare(s);
|
|
|
|
|
auto i = _rows.lower_bound(r.start(), less);
|
|
|
|
|
auto end = _rows.lower_bound(r.end(), less);
|
|
|
|
|
|