Compare commits
28 Commits
scylla-2.3
...
next-2.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b252bba4a2 | ||
|
|
a0b9fcc041 | ||
|
|
35c9b675c1 | ||
|
|
d71836fef7 | ||
|
|
f8e150e97c | ||
|
|
10c300f894 | ||
|
|
de1d3e5c6b | ||
|
|
69810c13ca | ||
|
|
9b025a5742 | ||
|
|
74eebc4cab | ||
|
|
9b2ca4ee44 | ||
|
|
773bf45774 | ||
|
|
c6705b4335 | ||
|
|
3997871b4d | ||
|
|
4ff1d731bd | ||
|
|
0e0f9143c9 | ||
|
|
9d809d6ea4 | ||
|
|
630d599c34 | ||
|
|
0933c1a00a | ||
|
|
7a7099fcfb | ||
|
|
50235aacb4 | ||
|
|
e888009f12 | ||
|
|
a19615ee9b | ||
|
|
357ca67fda | ||
|
|
7818c63eb1 | ||
|
|
da10eae18c | ||
|
|
d5292cd3ec | ||
|
|
9cb35361d9 |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=2.3.3
|
||||
VERSION=2.3.6
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -364,7 +364,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
|
||||
}
|
||||
});
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}, timeout);
|
||||
}
|
||||
|
||||
inline
|
||||
|
||||
@@ -516,6 +516,7 @@ scylla_core = (['database.cc',
|
||||
'db/consistency_level.cc',
|
||||
'db/system_keyspace.cc',
|
||||
'db/system_distributed_keyspace.cc',
|
||||
'db/size_estimates_virtual_reader.cc',
|
||||
'db/schema_tables.cc',
|
||||
'db/cql_type_parser.cc',
|
||||
'db/legacy_schema_migrator.cc',
|
||||
|
||||
@@ -405,7 +405,7 @@ public:
|
||||
in_marker(int32_t bind_index, ::shared_ptr<column_specification> receiver)
|
||||
: abstract_marker(bind_index, std::move(receiver))
|
||||
{
|
||||
assert(dynamic_pointer_cast<const list_type_impl>(receiver->type));
|
||||
assert(dynamic_pointer_cast<const list_type_impl>(_receiver->type));
|
||||
}
|
||||
|
||||
virtual shared_ptr<terminal> bind(const query_options& options) override {
|
||||
|
||||
@@ -1480,7 +1480,10 @@ future<> table::cleanup_sstables(sstables::compaction_descriptor descriptor) {
|
||||
static thread_local semaphore sem(1);
|
||||
|
||||
return with_semaphore(sem, 1, [this, &sst] {
|
||||
return this->compact_sstables(sstables::compaction_descriptor({ sst }, sst->get_sstable_level()), true);
|
||||
// release reference to sstables cleaned up, otherwise space usage from their data and index
|
||||
// components cannot be reclaimed until all of them are cleaned.
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
return this->compact_sstables(sstables::compaction_descriptor({ std::move(sst) }, sstable_level), true);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -686,33 +686,7 @@ read_keyspace_mutation(distributed<service::storage_proxy>& proxy, const sstring
|
||||
static semaphore the_merge_lock {1};
|
||||
|
||||
future<> merge_lock() {
|
||||
// ref: #1088
|
||||
// to avoid deadlocks, we don't want long-standing calls to the shard 0
|
||||
// as they can cause a deadlock:
|
||||
//
|
||||
// fiber1 fiber2
|
||||
// merge_lock() (succeeds)
|
||||
// merge_lock() (waits)
|
||||
// invoke_on_all() (waits on merge_lock to relinquish smp::submit_to slot)
|
||||
//
|
||||
// so we issue the lock calls with a timeout; the slot will be relinquished, and invoke_on_all()
|
||||
// can complete
|
||||
return repeat([] () mutable {
|
||||
return smp::submit_to(0, [] {
|
||||
return the_merge_lock.try_wait();
|
||||
}).then([] (bool result) {
|
||||
if (result) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
} else {
|
||||
static thread_local auto rand_engine = std::default_random_engine();
|
||||
auto dist = std::uniform_int_distribution<int>(0, 100);
|
||||
auto to = std::chrono::microseconds(dist(rand_engine));
|
||||
return sleep(to).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
return smp::submit_to(0, [] { return the_merge_lock.wait(); });
|
||||
}
|
||||
|
||||
future<> merge_unlock() {
|
||||
|
||||
329
db/size_estimates_virtual_reader.cc
Normal file
329
db/size_estimates_virtual_reader.cc
Normal file
@@ -0,0 +1,329 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <boost/range/adaptor/indirected.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/algorithm/find_if.hpp>
|
||||
|
||||
#include "clustering_bounds_comparator.hh"
|
||||
#include "database.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "partition_range_compat.hh"
|
||||
#include "range.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "stdx.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "database.hh"
|
||||
|
||||
#include "db/size_estimates_virtual_reader.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
namespace size_estimates {
|
||||
|
||||
struct virtual_row {
|
||||
const bytes& cf_name;
|
||||
const token_range& tokens;
|
||||
clustering_key_prefix as_key() const {
|
||||
return clustering_key_prefix::from_exploded(std::vector<bytes_view>{cf_name, tokens.start, tokens.end});
|
||||
}
|
||||
};
|
||||
|
||||
struct virtual_row_comparator {
|
||||
schema_ptr _schema;
|
||||
virtual_row_comparator(schema_ptr schema) : _schema(schema) { }
|
||||
bool operator()(const clustering_key_prefix& key1, const clustering_key_prefix& key2) {
|
||||
return clustering_key_prefix::prefix_equality_less_compare(*_schema)(key1, key2);
|
||||
}
|
||||
bool operator()(const virtual_row& row, const clustering_key_prefix& key) {
|
||||
return operator()(row.as_key(), key);
|
||||
}
|
||||
bool operator()(const clustering_key_prefix& key, const virtual_row& row) {
|
||||
return operator()(key, row.as_key());
|
||||
}
|
||||
};
|
||||
|
||||
// Iterating over the cartesian product of cf_names and token_ranges.
|
||||
class virtual_row_iterator : public std::iterator<std::input_iterator_tag, const virtual_row> {
|
||||
std::reference_wrapper<const std::vector<bytes>> _cf_names;
|
||||
std::reference_wrapper<const std::vector<token_range>> _ranges;
|
||||
size_t _cf_names_idx = 0;
|
||||
size_t _ranges_idx = 0;
|
||||
public:
|
||||
struct end_iterator_tag {};
|
||||
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges)
|
||||
: _cf_names(std::ref(cf_names))
|
||||
, _ranges(std::ref(ranges))
|
||||
{ }
|
||||
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges, end_iterator_tag)
|
||||
: _cf_names(std::ref(cf_names))
|
||||
, _ranges(std::ref(ranges))
|
||||
, _cf_names_idx(cf_names.size())
|
||||
, _ranges_idx(ranges.size())
|
||||
{
|
||||
if (cf_names.empty() || ranges.empty()) {
|
||||
// The product of an empty range with any range is an empty range.
|
||||
// In this case we want the end iterator to be equal to the begin iterator,
|
||||
// which has_ranges_idx = _cf_names_idx = 0.
|
||||
_ranges_idx = _cf_names_idx = 0;
|
||||
}
|
||||
}
|
||||
virtual_row_iterator& operator++() {
|
||||
if (++_ranges_idx == _ranges.get().size() && ++_cf_names_idx < _cf_names.get().size()) {
|
||||
_ranges_idx = 0;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
virtual_row_iterator operator++(int) {
|
||||
virtual_row_iterator i(*this);
|
||||
++(*this);
|
||||
return i;
|
||||
}
|
||||
const value_type operator*() const {
|
||||
return { _cf_names.get()[_cf_names_idx], _ranges.get()[_ranges_idx] };
|
||||
}
|
||||
bool operator==(const virtual_row_iterator& i) const {
|
||||
return _cf_names_idx == i._cf_names_idx
|
||||
&& _ranges_idx == i._ranges_idx;
|
||||
}
|
||||
bool operator!=(const virtual_row_iterator& i) const {
|
||||
return !(*this == i);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns the keyspaces, ordered by name, as selected by the partition_range.
|
||||
*/
|
||||
static std::vector<sstring> get_keyspaces(const schema& s, const database& db, dht::partition_range range) {
|
||||
struct keyspace_less_comparator {
|
||||
const schema& _s;
|
||||
keyspace_less_comparator(const schema& s) : _s(s) { }
|
||||
dht::ring_position as_ring_position(const sstring& ks) {
|
||||
auto pkey = partition_key::from_single_value(_s, utf8_type->decompose(ks));
|
||||
return dht::global_partitioner().decorate_key(_s, std::move(pkey));
|
||||
}
|
||||
bool operator()(const sstring& ks1, const sstring& ks2) {
|
||||
return as_ring_position(ks1).less_compare(_s, as_ring_position(ks2));
|
||||
}
|
||||
bool operator()(const sstring& ks, const dht::ring_position& rp) {
|
||||
return as_ring_position(ks).less_compare(_s, rp);
|
||||
}
|
||||
bool operator()(const dht::ring_position& rp, const sstring& ks) {
|
||||
return rp.less_compare(_s, as_ring_position(ks));
|
||||
}
|
||||
};
|
||||
auto keyspaces = db.get_non_system_keyspaces();
|
||||
auto cmp = keyspace_less_comparator(s);
|
||||
boost::sort(keyspaces, cmp);
|
||||
return boost::copy_range<std::vector<sstring>>(
|
||||
range.slice(keyspaces, std::move(cmp)) | boost::adaptors::filtered([&s] (const auto& ks) {
|
||||
// If this is a range query, results are divided between shards by the partition key (keyspace_name).
|
||||
return shard_of(dht::global_partitioner().get_token(s,
|
||||
partition_key::from_single_value(s, utf8_type->decompose(ks))))
|
||||
== engine().cpu_id();
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
|
||||
*/
|
||||
static dht::partition_range as_ring_position_range(dht::token_range& r) {
|
||||
stdx::optional<range<dht::ring_position>::bound> start_bound, end_bound;
|
||||
if (r.start()) {
|
||||
start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
|
||||
}
|
||||
if (r.end()) {
|
||||
end_bound = {{ dht::ring_position(r.end()->value(), dht::ring_position::token_bound::end), r.end()->is_inclusive() }};
|
||||
}
|
||||
return dht::partition_range(std::move(start_bound), std::move(end_bound), r.is_singular());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
|
||||
*/
|
||||
static system_keyspace::range_estimates estimate(const column_family& cf, const token_range& r) {
|
||||
int64_t count{0};
|
||||
utils::estimated_histogram hist{0};
|
||||
auto from_bytes = [] (auto& b) {
|
||||
return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
|
||||
};
|
||||
dht::token_range_vector ranges;
|
||||
compat::unwrap_into(
|
||||
wrapping_range<dht::token>({{ from_bytes(r.start) }}, {{ from_bytes(r.end) }}),
|
||||
dht::token_comparator(),
|
||||
[&] (auto&& rng) { ranges.push_back(std::move(rng)); });
|
||||
for (auto&& r : ranges) {
|
||||
auto rp_range = as_ring_position_range(r);
|
||||
for (auto&& sstable : cf.select_sstables(rp_range)) {
|
||||
count += sstable->estimated_keys_for_range(r);
|
||||
hist.merge(sstable->get_stats_metadata().estimated_row_size);
|
||||
}
|
||||
}
|
||||
return {cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
|
||||
}
|
||||
|
||||
future<std::vector<token_range>> get_local_ranges() {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
return ss.get_local_tokens().then([&ss] (auto&& tokens) {
|
||||
auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens));
|
||||
std::vector<token_range> local_ranges;
|
||||
auto to_bytes = [](const stdx::optional<dht::token_range::bound>& b) {
|
||||
assert(b);
|
||||
return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
|
||||
};
|
||||
// We merge the ranges to be compatible with how Cassandra shows it's size estimates table.
|
||||
// All queries will be on that table, where all entries are text and there's no notion of
|
||||
// token ranges form the CQL point of view.
|
||||
auto left_inf = boost::find_if(ranges, [] (auto&& r) {
|
||||
return !r.start() || r.start()->value() == dht::minimum_token();
|
||||
});
|
||||
auto right_inf = boost::find_if(ranges, [] (auto&& r) {
|
||||
return !r.end() || r.start()->value() == dht::maximum_token();
|
||||
});
|
||||
if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
|
||||
local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});
|
||||
ranges.erase(left_inf);
|
||||
ranges.erase(right_inf);
|
||||
}
|
||||
for (auto&& r : ranges) {
|
||||
local_ranges.push_back(token_range{to_bytes(r.start()), to_bytes(r.end())});
|
||||
}
|
||||
boost::sort(local_ranges, [] (auto&& tr1, auto&& tr2) {
|
||||
return utf8_type->less(tr1.start, tr2.start);
|
||||
});
|
||||
return local_ranges;
|
||||
});
|
||||
}
|
||||
|
||||
size_estimates_mutation_reader::size_estimates_mutation_reader(schema_ptr schema, const dht::partition_range& prange, const query::partition_slice& slice, streamed_mutation::forwarding fwd)
|
||||
: impl(schema)
|
||||
, _schema(std::move(schema))
|
||||
, _prange(&prange)
|
||||
, _slice(slice)
|
||||
, _fwd(fwd)
|
||||
{ }
|
||||
|
||||
future<> size_estimates_mutation_reader::get_next_partition() {
|
||||
auto& db = service::get_local_storage_proxy().get_db().local();
|
||||
if (!_keyspaces) {
|
||||
_keyspaces = get_keyspaces(*_schema, db, *_prange);
|
||||
_current_partition = _keyspaces->begin();
|
||||
}
|
||||
if (_current_partition == _keyspaces->end()) {
|
||||
_end_of_stream = true;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return get_local_ranges().then([&db, this] (auto&& ranges) {
|
||||
auto estimates = this->estimates_for_current_keyspace(db, std::move(ranges));
|
||||
auto mutations = db::system_keyspace::make_size_estimates_mutation(*_current_partition, std::move(estimates));
|
||||
++_current_partition;
|
||||
std::vector<mutation> ms;
|
||||
ms.emplace_back(std::move(mutations));
|
||||
_partition_reader = flat_mutation_reader_from_mutations(std::move(ms), _fwd);
|
||||
});
|
||||
}
|
||||
|
||||
future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
|
||||
if (!_partition_reader) {
|
||||
return get_next_partition();
|
||||
}
|
||||
return _partition_reader->consume_pausable([this] (mutation_fragment mf) {
|
||||
push_mutation_fragment(std::move(mf));
|
||||
return stop_iteration(is_buffer_full());
|
||||
}, timeout).then([this] {
|
||||
if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
|
||||
_partition_reader = stdx::nullopt;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void size_estimates_mutation_reader::next_partition() {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_partition_reader = stdx::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
clear_buffer();
|
||||
_prange = ≺
|
||||
_keyspaces = stdx::nullopt;
|
||||
_partition_reader = stdx::nullopt;
|
||||
_end_of_stream = false;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
|
||||
forward_buffer_to(pr.start());
|
||||
_end_of_stream = false;
|
||||
if (_partition_reader) {
|
||||
return _partition_reader->fast_forward_to(std::move(pr), timeout);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
size_t size_estimates_mutation_reader::buffer_size() const {
|
||||
if (_partition_reader) {
|
||||
return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size();
|
||||
}
|
||||
return flat_mutation_reader::impl::buffer_size();
|
||||
}
|
||||
|
||||
std::vector<db::system_keyspace::range_estimates>
|
||||
size_estimates_mutation_reader::estimates_for_current_keyspace(const database& db, std::vector<token_range> local_ranges) const {
|
||||
// For each specified range, estimate (crudely) mean partition size and partitions count.
|
||||
auto pkey = partition_key::from_single_value(*_schema, utf8_type->decompose(*_current_partition));
|
||||
auto cfs = db.find_keyspace(*_current_partition).metadata()->cf_meta_data();
|
||||
auto cf_names = boost::copy_range<std::vector<bytes>>(cfs | boost::adaptors::transformed([] (auto&& cf) {
|
||||
return utf8_type->decompose(cf.first);
|
||||
}));
|
||||
boost::sort(cf_names, [] (auto&& n1, auto&& n2) {
|
||||
return utf8_type->less(n1, n2);
|
||||
});
|
||||
std::vector<db::system_keyspace::range_estimates> estimates;
|
||||
for (auto& range : _slice.row_ranges(*_schema, pkey)) {
|
||||
auto rows = boost::make_iterator_range(
|
||||
virtual_row_iterator(cf_names, local_ranges),
|
||||
virtual_row_iterator(cf_names, local_ranges, virtual_row_iterator::end_iterator_tag()));
|
||||
auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema));
|
||||
for (auto&& r : rows_to_estimate) {
|
||||
auto& cf = db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
|
||||
estimates.push_back(estimate(cf, r.tokens));
|
||||
if (estimates.size() >= _slice.partition_row_limit()) {
|
||||
return estimates;
|
||||
}
|
||||
}
|
||||
}
|
||||
return estimates;
|
||||
}
|
||||
|
||||
} // namespace size_estimates
|
||||
|
||||
} // namespace db
|
||||
@@ -21,33 +21,19 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <boost/range/adaptor/indirected.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/algorithm/find_if.hpp>
|
||||
|
||||
#include "clustering_bounds_comparator.hh"
|
||||
#include "database.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "mutation_reader.hh"
|
||||
#include "partition_range_compat.hh"
|
||||
#include "range.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "stdx.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
namespace size_estimates {
|
||||
|
||||
struct token_range {
|
||||
bytes start;
|
||||
bytes end;
|
||||
};
|
||||
|
||||
class size_estimates_mutation_reader final : public flat_mutation_reader::impl {
|
||||
struct token_range {
|
||||
bytes start;
|
||||
bytes end;
|
||||
};
|
||||
schema_ptr _schema;
|
||||
const dht::partition_range* _prange;
|
||||
const query::partition_slice& _slice;
|
||||
@@ -57,267 +43,18 @@ class size_estimates_mutation_reader final : public flat_mutation_reader::impl {
|
||||
streamed_mutation::forwarding _fwd;
|
||||
flat_mutation_reader_opt _partition_reader;
|
||||
public:
|
||||
size_estimates_mutation_reader(schema_ptr schema, const dht::partition_range& prange, const query::partition_slice& slice, streamed_mutation::forwarding fwd)
|
||||
: impl(schema)
|
||||
, _schema(std::move(schema))
|
||||
, _prange(&prange)
|
||||
, _slice(slice)
|
||||
, _fwd(fwd)
|
||||
{ }
|
||||
size_estimates_mutation_reader(schema_ptr, const dht::partition_range&, const query::partition_slice&, streamed_mutation::forwarding);
|
||||
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point) override;
|
||||
virtual void next_partition() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override;
|
||||
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override;
|
||||
virtual size_t buffer_size() const override;
|
||||
private:
|
||||
future<> get_next_partition() {
|
||||
// For each specified range, estimate (crudely) mean partition size and partitions count.
|
||||
auto& db = service::get_local_storage_proxy().get_db().local();
|
||||
if (!_keyspaces) {
|
||||
_keyspaces = get_keyspaces(*_schema, db, *_prange);
|
||||
_current_partition = _keyspaces->begin();
|
||||
}
|
||||
if (_current_partition == _keyspaces->end()) {
|
||||
_end_of_stream = true;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return get_local_ranges().then([&db, this] (auto&& ranges) {
|
||||
auto estimates = this->estimates_for_current_keyspace(db, std::move(ranges));
|
||||
auto mutations = db::system_keyspace::make_size_estimates_mutation(*_current_partition, std::move(estimates));
|
||||
++_current_partition;
|
||||
std::vector<mutation> ms;
|
||||
ms.emplace_back(std::move(mutations));
|
||||
_partition_reader = flat_mutation_reader_from_mutations(std::move(ms), _fwd);
|
||||
});
|
||||
}
|
||||
public:
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
|
||||
if (!_partition_reader) {
|
||||
return get_next_partition();
|
||||
}
|
||||
return _partition_reader->consume_pausable([this] (mutation_fragment mf) {
|
||||
push_mutation_fragment(std::move(mf));
|
||||
return stop_iteration(is_buffer_full());
|
||||
}, timeout).then([this] {
|
||||
if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
|
||||
_partition_reader = stdx::nullopt;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
virtual void next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_partition_reader = stdx::nullopt;
|
||||
}
|
||||
}
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
clear_buffer();
|
||||
_prange = ≺
|
||||
_keyspaces = stdx::nullopt;
|
||||
_partition_reader = stdx::nullopt;
|
||||
_end_of_stream = false;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
forward_buffer_to(pr.start());
|
||||
_end_of_stream = false;
|
||||
if (_partition_reader) {
|
||||
return _partition_reader->fast_forward_to(std::move(pr), timeout);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual size_t buffer_size() const override {
|
||||
if (_partition_reader) {
|
||||
return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size();
|
||||
}
|
||||
return flat_mutation_reader::impl::buffer_size();
|
||||
}
|
||||
/**
|
||||
* Returns the primary ranges for the local node.
|
||||
* Used for testing as well.
|
||||
*/
|
||||
static future<std::vector<token_range>> get_local_ranges() {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
return ss.get_local_tokens().then([&ss] (auto&& tokens) {
|
||||
auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens));
|
||||
std::vector<token_range> local_ranges;
|
||||
auto to_bytes = [](const stdx::optional<dht::token_range::bound>& b) {
|
||||
assert(b);
|
||||
return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
|
||||
};
|
||||
// We merge the ranges to be compatible with how Cassandra shows it's size estimates table.
|
||||
// All queries will be on that table, where all entries are text and there's no notion of
|
||||
// token ranges form the CQL point of view.
|
||||
auto left_inf = boost::find_if(ranges, [] (auto&& r) {
|
||||
return !r.start() || r.start()->value() == dht::minimum_token();
|
||||
});
|
||||
auto right_inf = boost::find_if(ranges, [] (auto&& r) {
|
||||
return !r.end() || r.start()->value() == dht::maximum_token();
|
||||
});
|
||||
if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
|
||||
local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});
|
||||
ranges.erase(left_inf);
|
||||
ranges.erase(right_inf);
|
||||
}
|
||||
for (auto&& r : ranges) {
|
||||
local_ranges.push_back(token_range{to_bytes(r.start()), to_bytes(r.end())});
|
||||
}
|
||||
boost::sort(local_ranges, [] (auto&& tr1, auto&& tr2) {
|
||||
return utf8_type->less(tr1.start, tr2.start);
|
||||
});
|
||||
return local_ranges;
|
||||
});
|
||||
}
|
||||
private:
|
||||
struct virtual_row {
|
||||
const bytes& cf_name;
|
||||
const token_range& tokens;
|
||||
clustering_key_prefix as_key() const {
|
||||
return clustering_key_prefix::from_exploded(std::vector<bytes_view>{cf_name, tokens.start, tokens.end});
|
||||
}
|
||||
};
|
||||
struct virtual_row_comparator {
|
||||
schema_ptr _schema;
|
||||
virtual_row_comparator(schema_ptr schema) : _schema(schema) { }
|
||||
bool operator()(const clustering_key_prefix& key1, const clustering_key_prefix& key2) {
|
||||
return clustering_key_prefix::prefix_equality_less_compare(*_schema)(key1, key2);
|
||||
}
|
||||
bool operator()(const virtual_row& row, const clustering_key_prefix& key) {
|
||||
return operator()(row.as_key(), key);
|
||||
}
|
||||
bool operator()(const clustering_key_prefix& key, const virtual_row& row) {
|
||||
return operator()(key, row.as_key());
|
||||
}
|
||||
};
|
||||
class virtual_row_iterator : public std::iterator<std::input_iterator_tag, const virtual_row> {
|
||||
std::reference_wrapper<const std::vector<bytes>> _cf_names;
|
||||
std::reference_wrapper<const std::vector<token_range>> _ranges;
|
||||
size_t _cf_names_idx = 0;
|
||||
size_t _ranges_idx = 0;
|
||||
public:
|
||||
struct end_iterator_tag {};
|
||||
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges)
|
||||
: _cf_names(std::ref(cf_names))
|
||||
, _ranges(std::ref(ranges))
|
||||
{ }
|
||||
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges, end_iterator_tag)
|
||||
: _cf_names(std::ref(cf_names))
|
||||
, _ranges(std::ref(ranges))
|
||||
, _cf_names_idx(cf_names.size())
|
||||
, _ranges_idx(ranges.size())
|
||||
{ }
|
||||
virtual_row_iterator& operator++() {
|
||||
if (++_ranges_idx == _ranges.get().size() && ++_cf_names_idx < _cf_names.get().size()) {
|
||||
_ranges_idx = 0;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
virtual_row_iterator operator++(int) {
|
||||
virtual_row_iterator i(*this);
|
||||
++(*this);
|
||||
return i;
|
||||
}
|
||||
const value_type operator*() const {
|
||||
return { _cf_names.get()[_cf_names_idx], _ranges.get()[_ranges_idx] };
|
||||
}
|
||||
bool operator==(const virtual_row_iterator& i) const {
|
||||
return _cf_names_idx == i._cf_names_idx
|
||||
&& _ranges_idx == i._ranges_idx;
|
||||
}
|
||||
bool operator!=(const virtual_row_iterator& i) const {
|
||||
return !(*this == i);
|
||||
}
|
||||
};
|
||||
future<> get_next_partition();
|
||||
|
||||
std::vector<db::system_keyspace::range_estimates>
|
||||
estimates_for_current_keyspace(const database& db, std::vector<token_range> local_ranges) const {
|
||||
auto pkey = partition_key::from_single_value(*_schema, utf8_type->decompose(*_current_partition));
|
||||
auto cfs = db.find_keyspace(*_current_partition).metadata()->cf_meta_data();
|
||||
auto cf_names = boost::copy_range<std::vector<bytes>>(cfs | boost::adaptors::transformed([] (auto&& cf) {
|
||||
return utf8_type->decompose(cf.first);
|
||||
}));
|
||||
boost::sort(cf_names, [] (auto&& n1, auto&& n2) {
|
||||
return utf8_type->less(n1, n2);
|
||||
});
|
||||
std::vector<db::system_keyspace::range_estimates> estimates;
|
||||
for (auto& range : _slice.row_ranges(*_schema, pkey)) {
|
||||
auto rows = boost::make_iterator_range(
|
||||
virtual_row_iterator(cf_names, local_ranges),
|
||||
virtual_row_iterator(cf_names, local_ranges, virtual_row_iterator::end_iterator_tag()));
|
||||
auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema));
|
||||
for (auto&& r : rows_to_estimate) {
|
||||
auto& cf = db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
|
||||
estimates.push_back(estimate(cf, r.tokens));
|
||||
if (estimates.size() >= _slice.partition_row_limit()) {
|
||||
return estimates;
|
||||
}
|
||||
}
|
||||
}
|
||||
return estimates;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the keyspaces, ordered by name, as selected by the partition_range.
|
||||
*/
|
||||
static ks_range get_keyspaces(const schema& s, const database& db, dht::partition_range range) {
|
||||
struct keyspace_less_comparator {
|
||||
const schema& _s;
|
||||
keyspace_less_comparator(const schema& s) : _s(s) { }
|
||||
dht::ring_position as_ring_position(const sstring& ks) {
|
||||
auto pkey = partition_key::from_single_value(_s, utf8_type->decompose(ks));
|
||||
return dht::global_partitioner().decorate_key(_s, std::move(pkey));
|
||||
}
|
||||
bool operator()(const sstring& ks1, const sstring& ks2) {
|
||||
return as_ring_position(ks1).less_compare(_s, as_ring_position(ks2));
|
||||
}
|
||||
bool operator()(const sstring& ks, const dht::ring_position& rp) {
|
||||
return as_ring_position(ks).less_compare(_s, rp);
|
||||
}
|
||||
bool operator()(const dht::ring_position& rp, const sstring& ks) {
|
||||
return rp.less_compare(_s, as_ring_position(ks));
|
||||
}
|
||||
};
|
||||
auto keyspaces = db.get_non_system_keyspaces();
|
||||
auto cmp = keyspace_less_comparator(s);
|
||||
boost::sort(keyspaces, cmp);
|
||||
return boost::copy_range<ks_range>(range.slice(keyspaces, std::move(cmp)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
|
||||
*/
|
||||
static dht::partition_range as_ring_position_range(dht::token_range& r) {
|
||||
stdx::optional<range<dht::ring_position>::bound> start_bound, end_bound;
|
||||
if (r.start()) {
|
||||
start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
|
||||
}
|
||||
if (r.end()) {
|
||||
end_bound = {{ dht::ring_position(r.end()->value(), dht::ring_position::token_bound::end), r.end()->is_inclusive() }};
|
||||
}
|
||||
return dht::partition_range(std::move(start_bound), std::move(end_bound), r.is_singular());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
|
||||
*/
|
||||
static system_keyspace::range_estimates estimate(const column_family& cf, const token_range& r) {
|
||||
int64_t count{0};
|
||||
utils::estimated_histogram hist{0};
|
||||
auto from_bytes = [] (auto& b) {
|
||||
return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
|
||||
};
|
||||
dht::token_range_vector ranges;
|
||||
compat::unwrap_into(
|
||||
wrapping_range<dht::token>({{ from_bytes(r.start) }}, {{ from_bytes(r.end) }}),
|
||||
dht::token_comparator(),
|
||||
[&] (auto&& rng) { ranges.push_back(std::move(rng)); });
|
||||
for (auto&& r : ranges) {
|
||||
auto rp_range = as_ring_position_range(r);
|
||||
for (auto&& sstable : cf.select_sstables(rp_range)) {
|
||||
count += sstable->estimated_keys_for_range(r);
|
||||
hist.merge(sstable->get_stats_metadata().estimated_row_size);
|
||||
}
|
||||
}
|
||||
return {cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
|
||||
}
|
||||
estimates_for_current_keyspace(const database&, std::vector<token_range> local_ranges) const;
|
||||
};
|
||||
|
||||
struct virtual_reader {
|
||||
@@ -332,6 +69,12 @@ struct virtual_reader {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns the primary ranges for the local node.
|
||||
* Used for testing as well.
|
||||
*/
|
||||
future<std::vector<token_range>> get_local_ranges();
|
||||
|
||||
} // namespace size_estimates
|
||||
|
||||
} // namespace db
|
||||
|
||||
2
dist/ami/scylla.json
vendored
2
dist/ami/scylla.json
vendored
@@ -68,7 +68,7 @@
|
||||
"type": "shell",
|
||||
"inline": [
|
||||
"sudo yum install -y epel-release",
|
||||
"sudo yum install -y python34",
|
||||
"sudo yum install -y python36",
|
||||
"sudo /home/{{user `ssh_username`}}/scylla-ami/scylla_install_ami {{ user `install_args` }}"
|
||||
]
|
||||
}
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -33,7 +33,7 @@ RUN curl http://downloads.scylladb.com/rpm/centos/scylla-2.3.repo -o /etc/yum.re
|
||||
yum -y remove boost-thread boost-system && \
|
||||
yum -y install scylla hostname supervisor && \
|
||||
yum clean all && \
|
||||
yum -y install python34 python34-PyYAML && \
|
||||
yum -y install python36 python36-PyYAML && \
|
||||
cat /scylla_bashrc >> /etc/bashrc && \
|
||||
mkdir -p /etc/supervisor.conf.d && \
|
||||
mkdir -p /var/log/scylla && \
|
||||
|
||||
6
dist/redhat/scylla.spec.mustache
vendored
6
dist/redhat/scylla.spec.mustache
vendored
@@ -56,9 +56,9 @@ License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler systemtap-sdt-devel ninja-build cmake python ragel grep kernel-headers
|
||||
%{?fedora:BuildRequires: boost-devel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum python2-pystache}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++73-static scylla-boost163-devel scylla-boost163-static scylla-antlr35-tool scylla-antlr35-C++-devel python34 scylla-gcc73-c++, scylla-python34-pyparsing20 yaml-cpp-static pystache python-setuptools}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++73-static scylla-boost163-devel scylla-boost163-static scylla-antlr35-tool scylla-antlr35-C++-devel python36 scylla-gcc73-c++, scylla-python36-pyparsing20 yaml-cpp-static pystache python-setuptools}
|
||||
Requires: scylla-conf systemd-libs hwloc PyYAML python-urwid pciutils pyparsing python-requests curl util-linux python-setuptools pciutils python3-pyudev mdadm xfsprogs
|
||||
%{?rhel:Requires: python34 python34-PyYAML kernel >= 3.10.0-514}
|
||||
%{?rhel:Requires: python36 python36-PyYAML kernel >= 3.10.0-514}
|
||||
%{?fedora:Requires: python3 python3-PyYAML}
|
||||
Conflicts: abrt
|
||||
%ifarch x86_64
|
||||
@@ -97,7 +97,7 @@ cflags="--cflags=${defines[*]}"
|
||||
%endif
|
||||
%if 0%{?rhel}
|
||||
. /etc/profile.d/scylla.sh
|
||||
python3.4 ./configure.py %{?configure_opt} --with=scylla --with=iotune --mode=release "$cflags" --static-boost --static-yaml-cpp --compiler=/opt/scylladb/bin/g++-7.3 --python python3.4 --ldflag=-Wl,-rpath=/opt/scylladb/lib64
|
||||
python3.6 ./configure.py %{?configure_opt} --with=scylla --with=iotune --mode=release "$cflags" --static-boost --static-yaml-cpp --compiler=/opt/scylladb/bin/g++-7.3 --python python3.6 --ldflag=-Wl,-rpath=/opt/scylladb/lib64
|
||||
%endif
|
||||
ninja-build %{?_smp_mflags} build/release/scylla build/release/iotune
|
||||
|
||||
|
||||
@@ -449,9 +449,13 @@ GCC6_CONCEPT(requires requires(StopCondition stop, ConsumeMutationFragment consu
|
||||
{ consume_mf(std::move(mf)) } -> void;
|
||||
{ consume_eos() } -> future<>;
|
||||
})
|
||||
future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition&& stop,
|
||||
ConsumeMutationFragment&& consume_mf, ConsumeEndOfStream&& consume_eos) {
|
||||
return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos] {
|
||||
future<> consume_mutation_fragments_until(
|
||||
flat_mutation_reader& r,
|
||||
StopCondition&& stop,
|
||||
ConsumeMutationFragment&& consume_mf,
|
||||
ConsumeEndOfStream&& consume_eos,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos, timeout] {
|
||||
while (!r.is_buffer_empty()) {
|
||||
consume_mf(r.pop_mutation_fragment());
|
||||
if (stop()) {
|
||||
@@ -461,7 +465,7 @@ future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition
|
||||
if (r.is_end_of_stream()) {
|
||||
return consume_eos();
|
||||
}
|
||||
return r.fill_buffer();
|
||||
return r.fill_buffer(timeout);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -42,5 +42,5 @@ elif [ "$ID" = "fedora" ]; then
|
||||
yum install -y yaml-cpp-devel thrift-devel antlr3-tool antlr3-C++-devel jsoncpp-devel snappy-devel
|
||||
elif [ "$ID" = "centos" ]; then
|
||||
yum install -y yaml-cpp-devel thrift-devel scylla-antlr35-tool scylla-antlr35-C++-devel jsoncpp-devel snappy-devel scylla-boost163-static scylla-python34-pyparsing20 systemd-devel
|
||||
echo -e "Configure example:\n\tpython3.4 ./configure.py --enable-dpdk --mode=release --static-boost --compiler=/opt/scylladb/bin/g++-7.3 --python python3.4 --ldflag=-Wl,-rpath=/opt/scylladb/lib64 --cflags=-I/opt/scylladb/include --with-antlr3=/opt/scylladb/bin/antlr3"
|
||||
echo -e "Configure example:\n\tpython3.6 ./configure.py --enable-dpdk --mode=release --static-boost --compiler=/opt/scylladb/bin/g++-7.3 --python python3.6 --ldflag=-Wl,-rpath=/opt/scylladb/lib64 --cflags=-I/opt/scylladb/include --with-antlr3=/opt/scylladb/bin/antlr3"
|
||||
fi
|
||||
|
||||
7
main.cc
7
main.cc
@@ -763,8 +763,11 @@ int main(int ac, char** av) {
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
});
|
||||
|
||||
engine().at_exit([] {
|
||||
return view_builder.stop();
|
||||
engine().at_exit([cfg] {
|
||||
if (cfg->view_building()) {
|
||||
return view_builder.stop();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
engine().at_exit([&db] {
|
||||
|
||||
@@ -144,7 +144,14 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
|
||||
, _static_row(s, column_kind::static_column, x._static_row)
|
||||
, _static_row_continuous(x._static_row_continuous)
|
||||
, _rows()
|
||||
, _row_tombstones(x._row_tombstones) {
|
||||
, _row_tombstones(x._row_tombstones)
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s.version())
|
||||
#endif
|
||||
{
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(x._schema_version == _schema_version);
|
||||
#endif
|
||||
auto cloner = [&s] (const auto& x) {
|
||||
return current_allocator().construct<rows_entry>(s, x);
|
||||
};
|
||||
@@ -157,7 +164,14 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
|
||||
, _static_row(schema, column_kind::static_column, x._static_row)
|
||||
, _static_row_continuous(x._static_row_continuous)
|
||||
, _rows()
|
||||
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only()) {
|
||||
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only())
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(schema.version())
|
||||
#endif
|
||||
{
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(x._schema_version == _schema_version);
|
||||
#endif
|
||||
try {
|
||||
for(auto&& r : ck_ranges) {
|
||||
for (const rows_entry& e : x.range(schema, r)) {
|
||||
@@ -180,7 +194,13 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
|
||||
, _static_row_continuous(x._static_row_continuous)
|
||||
, _rows(std::move(x._rows))
|
||||
, _row_tombstones(std::move(x._row_tombstones))
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(schema.version())
|
||||
#endif
|
||||
{
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(x._schema_version == _schema_version);
|
||||
#endif
|
||||
{
|
||||
auto deleter = current_deleter<rows_entry>();
|
||||
auto it = _rows.begin();
|
||||
@@ -220,6 +240,7 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
|
||||
}
|
||||
|
||||
void mutation_partition::ensure_last_dummy(const schema& s) {
|
||||
check_schema(s);
|
||||
if (_rows.empty() || !_rows.rbegin()->is_last_dummy()) {
|
||||
_rows.insert_before(_rows.end(),
|
||||
*current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::yes));
|
||||
@@ -276,11 +297,16 @@ void deletable_row::apply(const schema& s, clustering_row cr) {
|
||||
|
||||
void
|
||||
mutation_partition::apply(const schema& s, const mutation_fragment& mf) {
|
||||
check_schema(s);
|
||||
mutation_fragment_applier applier{s, *this};
|
||||
mf.visit(applier);
|
||||
}
|
||||
|
||||
void mutation_partition::apply_monotonically(const schema& s, mutation_partition&& p, cache_tracker* tracker) {
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(s.version() == _schema_version);
|
||||
assert(p._schema_version == _schema_version);
|
||||
#endif
|
||||
_tombstone.apply(p._tombstone);
|
||||
_row_tombstones.apply_monotonically(s, std::move(p._row_tombstones));
|
||||
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
|
||||
@@ -356,6 +382,7 @@ void mutation_partition::apply_weak(const schema& s, mutation_partition&& p) {
|
||||
|
||||
tombstone
|
||||
mutation_partition::range_tombstone_for_row(const schema& schema, const clustering_key& key) const {
|
||||
check_schema(schema);
|
||||
tombstone t = _tombstone;
|
||||
if (!_row_tombstones.empty()) {
|
||||
auto found = _row_tombstones.search_tombstone_covering(schema, key);
|
||||
@@ -366,6 +393,7 @@ mutation_partition::range_tombstone_for_row(const schema& schema, const clusteri
|
||||
|
||||
row_tombstone
|
||||
mutation_partition::tombstone_for_row(const schema& schema, const clustering_key& key) const {
|
||||
check_schema(schema);
|
||||
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
|
||||
|
||||
auto j = _rows.find(key, rows_entry::compare(schema));
|
||||
@@ -378,6 +406,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
|
||||
|
||||
row_tombstone
|
||||
mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e) const {
|
||||
check_schema(schema);
|
||||
row_tombstone t = e.row().deleted_at();
|
||||
t.apply(range_tombstone_for_row(schema, e.key()));
|
||||
return t;
|
||||
@@ -385,6 +414,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e)
|
||||
|
||||
void
|
||||
mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t) {
|
||||
check_schema(schema);
|
||||
assert(!prefix.is_full(schema));
|
||||
auto start = prefix;
|
||||
_row_tombstones.apply(schema, {std::move(start), std::move(prefix), std::move(t)});
|
||||
@@ -392,11 +422,13 @@ mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_pre
|
||||
|
||||
void
|
||||
mutation_partition::apply_row_tombstone(const schema& schema, range_tombstone rt) {
|
||||
check_schema(schema);
|
||||
_row_tombstones.apply(schema, std::move(rt));
|
||||
}
|
||||
|
||||
void
|
||||
mutation_partition::apply_delete(const schema& schema, const clustering_key_prefix& prefix, tombstone t) {
|
||||
check_schema(schema);
|
||||
if (prefix.is_empty(schema)) {
|
||||
apply(t);
|
||||
} else if (prefix.is_full(schema)) {
|
||||
@@ -408,6 +440,7 @@ mutation_partition::apply_delete(const schema& schema, const clustering_key_pref
|
||||
|
||||
void
|
||||
mutation_partition::apply_delete(const schema& schema, range_tombstone rt) {
|
||||
check_schema(schema);
|
||||
if (range_tombstone::is_single_clustering_row_tombstone(schema, rt.start, rt.start_kind, rt.end, rt.end_kind)) {
|
||||
apply_delete(schema, std::move(rt.start), std::move(rt.tomb));
|
||||
return;
|
||||
@@ -417,6 +450,7 @@ mutation_partition::apply_delete(const schema& schema, range_tombstone rt) {
|
||||
|
||||
void
|
||||
mutation_partition::apply_delete(const schema& schema, clustering_key&& prefix, tombstone t) {
|
||||
check_schema(schema);
|
||||
if (prefix.is_empty(schema)) {
|
||||
apply(t);
|
||||
} else if (prefix.is_full(schema)) {
|
||||
@@ -428,6 +462,7 @@ mutation_partition::apply_delete(const schema& schema, clustering_key&& prefix,
|
||||
|
||||
void
|
||||
mutation_partition::apply_delete(const schema& schema, clustering_key_prefix_view prefix, tombstone t) {
|
||||
check_schema(schema);
|
||||
if (prefix.is_empty(schema)) {
|
||||
apply(t);
|
||||
} else if (prefix.is_full(schema)) {
|
||||
@@ -451,12 +486,14 @@ void mutation_partition::insert_row(const schema& s, const clustering_key& key,
|
||||
}
|
||||
|
||||
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
|
||||
check_schema(s);
|
||||
auto e = current_allocator().construct<rows_entry>(s, key, row);
|
||||
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
|
||||
}
|
||||
|
||||
const row*
|
||||
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
if (i == _rows.end()) {
|
||||
return nullptr;
|
||||
@@ -466,6 +503,7 @@ mutation_partition::find_row(const schema& s, const clustering_key& key) const {
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = current_allocator().construct<rows_entry>(std::move(key));
|
||||
@@ -477,6 +515,7 @@ mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = current_allocator().construct<rows_entry>(key);
|
||||
@@ -488,6 +527,7 @@ mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = current_allocator().construct<rows_entry>(key);
|
||||
@@ -499,6 +539,7 @@ mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(pos, rows_entry::compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = current_allocator().construct<rows_entry>(s, pos, dummy, continuous);
|
||||
@@ -510,6 +551,7 @@ mutation_partition::clustered_row(const schema& s, position_in_partition_view po
|
||||
|
||||
mutation_partition::rows_type::const_iterator
|
||||
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
|
||||
check_schema(schema);
|
||||
if (!r.start()) {
|
||||
return std::cbegin(_rows);
|
||||
}
|
||||
@@ -518,6 +560,7 @@ mutation_partition::lower_bound(const schema& schema, const query::clustering_ra
|
||||
|
||||
mutation_partition::rows_type::const_iterator
|
||||
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) const {
|
||||
check_schema(schema);
|
||||
if (!r.end()) {
|
||||
return std::cend(_rows);
|
||||
}
|
||||
@@ -526,6 +569,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
|
||||
|
||||
boost::iterator_range<mutation_partition::rows_type::const_iterator>
|
||||
mutation_partition::range(const schema& schema, const query::clustering_range& r) const {
|
||||
check_schema(schema);
|
||||
return boost::make_iterator_range(lower_bound(schema, r), upper_bound(schema, r));
|
||||
}
|
||||
|
||||
@@ -562,6 +606,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
|
||||
template<typename Func>
|
||||
void mutation_partition::for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const
|
||||
{
|
||||
check_schema(schema);
|
||||
auto r = range(schema, row_range);
|
||||
if (!reversed) {
|
||||
for (const auto& e : r) {
|
||||
@@ -778,6 +823,7 @@ bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tomb
|
||||
|
||||
void
|
||||
mutation_partition::query_compacted(query::result::partition_writer& pw, const schema& s, uint32_t limit) const {
|
||||
check_schema(s);
|
||||
const query::partition_slice& slice = pw.slice();
|
||||
max_timestamp max_ts{pw.last_modified()};
|
||||
|
||||
@@ -996,6 +1042,10 @@ bool mutation_partition::equal(const schema& s, const mutation_partition& p) con
|
||||
}
|
||||
|
||||
bool mutation_partition::equal(const schema& this_schema, const mutation_partition& p, const schema& p_schema) const {
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(_schema_version == this_schema.version());
|
||||
assert(p._schema_version == p_schema.version());
|
||||
#endif
|
||||
if (_tombstone != p._tombstone) {
|
||||
return false;
|
||||
}
|
||||
@@ -1124,6 +1174,7 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
|
||||
void
|
||||
row::append_cell(column_id id, atomic_cell_or_collection value) {
|
||||
if (_type == storage_type::vector && id < max_vector_size) {
|
||||
assert(_storage.vector.v.size() <= id);
|
||||
_storage.vector.v.resize(id);
|
||||
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
|
||||
_storage.vector.present.set(id);
|
||||
@@ -1188,6 +1239,7 @@ size_t rows_entry::memory_usage(const schema& s) const {
|
||||
}
|
||||
|
||||
size_t mutation_partition::external_memory_usage(const schema& s) const {
|
||||
check_schema(s);
|
||||
size_t sum = 0;
|
||||
sum += static_row().external_memory_usage(s, column_kind::static_column);
|
||||
for (auto& clr : clustered_rows()) {
|
||||
@@ -1206,6 +1258,7 @@ void mutation_partition::trim_rows(const schema& s,
|
||||
const std::vector<query::clustering_range>& row_ranges,
|
||||
Func&& func)
|
||||
{
|
||||
check_schema(s);
|
||||
static_assert(std::is_same<stop_iteration, std::result_of_t<Func(rows_entry&)>>::value, "Bad func signature");
|
||||
|
||||
stop_iteration stop = stop_iteration::no;
|
||||
@@ -1250,6 +1303,7 @@ uint32_t mutation_partition::do_compact(const schema& s,
|
||||
uint32_t row_limit,
|
||||
can_gc_fn& can_gc)
|
||||
{
|
||||
check_schema(s);
|
||||
assert(row_limit > 0);
|
||||
|
||||
auto gc_before = saturating_subtract(query_time, s.gc_grace_seconds());
|
||||
@@ -1315,12 +1369,14 @@ mutation_partition::compact_for_query(
|
||||
bool reverse,
|
||||
uint32_t row_limit)
|
||||
{
|
||||
check_schema(s);
|
||||
return do_compact(s, query_time, row_ranges, reverse, row_limit, always_gc);
|
||||
}
|
||||
|
||||
void mutation_partition::compact_for_compaction(const schema& s,
|
||||
can_gc_fn& can_gc, gc_clock::time_point compaction_time)
|
||||
{
|
||||
check_schema(s);
|
||||
static const std::vector<query::clustering_range> all_rows = {
|
||||
query::clustering_range::make_open_ended_both_sides()
|
||||
};
|
||||
@@ -1354,11 +1410,13 @@ row::is_live(const schema& s, column_kind kind, tombstone base_tombstone, gc_clo
|
||||
|
||||
bool
|
||||
mutation_partition::is_static_row_live(const schema& s, gc_clock::time_point query_time) const {
|
||||
check_schema(s);
|
||||
return has_any_live_data(s, column_kind::static_column, static_row(), _tombstone, query_time);
|
||||
}
|
||||
|
||||
size_t
|
||||
mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_time) const {
|
||||
check_schema(s);
|
||||
size_t count = 0;
|
||||
|
||||
for (const rows_entry& e : non_dummy_rows()) {
|
||||
@@ -1704,6 +1762,7 @@ row row::difference(const schema& s, column_kind kind, const row& other) const
|
||||
|
||||
mutation_partition mutation_partition::difference(schema_ptr s, const mutation_partition& other) const
|
||||
{
|
||||
check_schema(*s);
|
||||
mutation_partition mp(s);
|
||||
if (_tombstone > other._tombstone) {
|
||||
mp.apply(_tombstone);
|
||||
@@ -1734,6 +1793,7 @@ mutation_partition mutation_partition::difference(schema_ptr s, const mutation_p
|
||||
}
|
||||
|
||||
void mutation_partition::accept(const schema& s, mutation_partition_visitor& v) const {
|
||||
check_schema(s);
|
||||
v.accept_partition_tombstone(_tombstone);
|
||||
_static_row.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
|
||||
const column_definition& def = s.static_column_at(id);
|
||||
@@ -2167,6 +2227,9 @@ mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const
|
||||
, _static_row_continuous(!s.has_static_columns())
|
||||
, _rows()
|
||||
, _row_tombstones(s)
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s.version())
|
||||
#endif
|
||||
{
|
||||
_rows.insert_before(_rows.end(),
|
||||
*current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
|
||||
@@ -2198,6 +2261,7 @@ void mutation_partition::make_fully_continuous() {
|
||||
}
|
||||
|
||||
clustering_interval_set mutation_partition::get_continuity(const schema& s, is_continuous cont) const {
|
||||
check_schema(s);
|
||||
clustering_interval_set result;
|
||||
auto i = _rows.begin();
|
||||
auto prev_pos = position_in_partition::before_all_clustered_rows();
|
||||
@@ -2247,6 +2311,7 @@ stop_iteration mutation_partition::clear_gently(cache_tracker* tracker) noexcept
|
||||
|
||||
bool
|
||||
mutation_partition::check_continuity(const schema& s, const position_range& r, is_continuous cont) const {
|
||||
check_schema(s);
|
||||
auto less = rows_entry::compare(s);
|
||||
auto i = _rows.lower_bound(r.start(), less);
|
||||
auto end = _rows.lower_bound(r.end(), less);
|
||||
|
||||
@@ -905,6 +905,9 @@ private:
|
||||
// Contains only strict prefixes so that we don't have to lookup full keys
|
||||
// in both _row_tombstones and _rows.
|
||||
range_tombstone_list _row_tombstones;
|
||||
#ifdef SEASTAR_DEBUG
|
||||
table_schema_version _schema_version;
|
||||
#endif
|
||||
|
||||
friend class mutation_partition_applier;
|
||||
friend class converting_mutation_partition_applier;
|
||||
@@ -919,10 +922,16 @@ public:
|
||||
mutation_partition(schema_ptr s)
|
||||
: _rows()
|
||||
, _row_tombstones(*s)
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s->version())
|
||||
#endif
|
||||
{ }
|
||||
mutation_partition(mutation_partition& other, copy_comparators_only)
|
||||
: _rows()
|
||||
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(other._schema_version)
|
||||
#endif
|
||||
{ }
|
||||
mutation_partition(mutation_partition&&) = default;
|
||||
mutation_partition(const schema& s, const mutation_partition&);
|
||||
@@ -1122,6 +1131,12 @@ private:
|
||||
template<typename Func>
|
||||
void for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const;
|
||||
friend class counter_write_query_result_builder;
|
||||
|
||||
void check_schema(const schema& s) const {
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(s.version() == _schema_version);
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
inline
|
||||
|
||||
@@ -312,7 +312,7 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
|
||||
|
||||
void partition_entry::apply(const schema& s, const mutation_partition& mp, const schema& mp_schema)
|
||||
{
|
||||
apply(s, mutation_partition(s, mp), mp_schema);
|
||||
apply(s, mutation_partition(mp_schema, mp), mp_schema);
|
||||
}
|
||||
|
||||
void partition_entry::apply(const schema& s, mutation_partition&& mp, const schema& mp_schema)
|
||||
|
||||
@@ -38,7 +38,7 @@ class autoupdating_underlying_reader final {
|
||||
row_cache& _cache;
|
||||
read_context& _read_context;
|
||||
stdx::optional<flat_mutation_reader> _reader;
|
||||
utils::phased_barrier::phase_type _reader_creation_phase;
|
||||
utils::phased_barrier::phase_type _reader_creation_phase = 0;
|
||||
dht::partition_range _range = { };
|
||||
stdx::optional<dht::decorated_key> _last_key;
|
||||
stdx::optional<dht::decorated_key> _new_last_key;
|
||||
@@ -105,7 +105,6 @@ public:
|
||||
return make_ready_future<>();
|
||||
}
|
||||
utils::phased_barrier::phase_type creation_phase() const {
|
||||
assert(_reader);
|
||||
return _reader_creation_phase;
|
||||
}
|
||||
const dht::partition_range& range() const {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 10ac122dd6...efda4281c2
@@ -179,6 +179,8 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
|
||||
void remove_sstable(bool is_tracking) {
|
||||
if (is_tracking) {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
|
||||
} else if (_sst) {
|
||||
_cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
|
||||
}
|
||||
_sst = {};
|
||||
}
|
||||
@@ -303,6 +305,7 @@ public:
|
||||
class compaction {
|
||||
protected:
|
||||
column_family& _cf;
|
||||
schema_ptr _schema;
|
||||
std::vector<shared_sstable> _sstables;
|
||||
uint64_t _max_sstable_size;
|
||||
uint32_t _sstable_level;
|
||||
@@ -313,6 +316,7 @@ protected:
|
||||
protected:
|
||||
compaction(column_family& cf, std::vector<shared_sstable> sstables, uint64_t max_sstable_size, uint32_t sstable_level)
|
||||
: _cf(cf)
|
||||
, _schema(cf.schema())
|
||||
, _sstables(std::move(sstables))
|
||||
, _max_sstable_size(max_sstable_size)
|
||||
, _sstable_level(sstable_level)
|
||||
@@ -361,10 +365,9 @@ private:
|
||||
virtual flat_mutation_reader make_sstable_reader(lw_shared_ptr<sstables::sstable_set> ssts) const = 0;
|
||||
|
||||
flat_mutation_reader setup() {
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(_cf.get_compaction_strategy().make_sstable_set(_cf.schema()));
|
||||
auto schema = _cf.schema();
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(_cf.get_compaction_strategy().make_sstable_set(_schema));
|
||||
sstring formatted_msg = "[";
|
||||
auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - schema->gc_grace_seconds());
|
||||
auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds());
|
||||
|
||||
for (auto& sst : _sstables) {
|
||||
// Compacted sstable keeps track of its ancestors.
|
||||
@@ -396,8 +399,8 @@ private:
|
||||
}
|
||||
formatted_msg += "]";
|
||||
_info->sstables = _sstables.size();
|
||||
_info->ks = schema->ks_name();
|
||||
_info->cf = schema->cf_name();
|
||||
_info->ks = _schema->ks_name();
|
||||
_info->cf = _schema->cf_name();
|
||||
report_start(formatted_msg);
|
||||
|
||||
return make_sstable_reader(std::move(ssts));
|
||||
@@ -462,7 +465,7 @@ private:
|
||||
}
|
||||
|
||||
const schema_ptr& schema() const {
|
||||
return _cf.schema();
|
||||
return _schema;
|
||||
}
|
||||
public:
|
||||
static future<compaction_info> run(std::unique_ptr<compaction> c);
|
||||
@@ -518,10 +521,10 @@ public:
|
||||
}
|
||||
|
||||
flat_mutation_reader make_sstable_reader(lw_shared_ptr<sstables::sstable_set> ssts) const override {
|
||||
return ::make_local_shard_sstable_reader(_cf.schema(),
|
||||
return ::make_local_shard_sstable_reader(_schema,
|
||||
std::move(ssts),
|
||||
query::full_partition_range,
|
||||
_cf.schema()->full_slice(),
|
||||
_schema->full_slice(),
|
||||
service::get_local_compaction_priority(),
|
||||
no_resource_tracking(),
|
||||
nullptr,
|
||||
@@ -570,7 +573,7 @@ public:
|
||||
cfg.monitor = &_active_write_monitors.back();
|
||||
cfg.large_partition_handler = _cf.get_large_partition_handler();
|
||||
// TODO: calculate encoding_stats based on statistics of compacted sstables
|
||||
_writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, encoding_stats{}, priority));
|
||||
_writer.emplace(_sst->get_writer(*_schema, partitions_per_sstable(), cfg, encoding_stats{}, priority));
|
||||
}
|
||||
return &*_writer;
|
||||
}
|
||||
@@ -610,7 +613,7 @@ public:
|
||||
}
|
||||
|
||||
std::function<bool(const dht::decorated_key&)> filter_func() const override {
|
||||
dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_cf.schema()->ks_name());
|
||||
dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
|
||||
|
||||
return [this, owned_ranges = std::move(owned_ranges)] (const dht::decorated_key& dk) {
|
||||
if (dht::shard_of(dk.token()) != engine().cpu_id()) {
|
||||
@@ -684,10 +687,10 @@ public:
|
||||
|
||||
// Use reader that makes sure no non-local mutation will not be filtered out.
|
||||
flat_mutation_reader make_sstable_reader(lw_shared_ptr<sstables::sstable_set> ssts) const override {
|
||||
return ::make_range_sstable_reader(_cf.schema(),
|
||||
return ::make_range_sstable_reader(_schema,
|
||||
std::move(ssts),
|
||||
query::full_partition_range,
|
||||
_cf.schema()->full_slice(),
|
||||
_schema->full_slice(),
|
||||
service::get_local_compaction_priority(),
|
||||
no_resource_tracking(),
|
||||
nullptr,
|
||||
@@ -719,7 +722,7 @@ public:
|
||||
cfg.large_partition_handler = _cf.get_large_partition_handler();
|
||||
auto&& priority = service::get_local_compaction_priority();
|
||||
// TODO: calculate encoding_stats based on statistics of compacted sstables
|
||||
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, encoding_stats{}, priority, _shard));
|
||||
writer.emplace(sst->get_writer(*_schema, partitions_per_sstable(_shard), cfg, encoding_stats{}, priority, _shard));
|
||||
}
|
||||
return &*writer;
|
||||
}
|
||||
|
||||
@@ -66,6 +66,14 @@ public:
|
||||
_cm->deregister_compacting_sstables(_compacting);
|
||||
}
|
||||
}
|
||||
|
||||
// Explicitly release compacting sstables
|
||||
void release_compacting(const std::vector<sstables::shared_sstable>& sstables) {
|
||||
_cm->deregister_compacting_sstables(sstables);
|
||||
for (auto& sst : sstables) {
|
||||
_compacting.erase(boost::remove(_compacting, sst), _compacting.end());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
compaction_weight_registration::compaction_weight_registration(compaction_manager* cm, int weight)
|
||||
@@ -564,17 +572,23 @@ future<> compaction_manager::perform_cleanup(column_family* cf) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
column_family& cf = *task->compacting_cf;
|
||||
sstables::compaction_descriptor descriptor = sstables::compaction_descriptor(get_candidates(cf));
|
||||
auto compacting = compacting_sstable_registration(this, descriptor.sstables);
|
||||
auto sstables = get_candidates(cf);
|
||||
auto compacting = make_lw_shared<compacting_sstable_registration>(this, sstables);
|
||||
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
|
||||
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
|
||||
return cf.cleanup_sstables(std::move(descriptor));
|
||||
return do_with(std::move(user_initiated), std::move(sstables), [this, &cf, compacting] (compaction_backlog_tracker& bt,
|
||||
std::vector<sstables::shared_sstable>& sstables) mutable {
|
||||
return with_scheduling_group(_scheduling_group, [this, &cf, &sstables, compacting] () mutable {
|
||||
return do_for_each(sstables, [this, &cf, compacting] (auto& sst) {
|
||||
return cf.cleanup_sstables(sstables::compaction_descriptor({sst})).then([&sst, compacting] {
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
compacting->release_compacting({std::move(sst)});
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
|
||||
}).then_wrapped([this, task, compacting] (future<> f) mutable {
|
||||
_stats.active_tasks--;
|
||||
if (!can_proceed(task)) {
|
||||
maybe_stop_on_error(std::move(f));
|
||||
|
||||
@@ -404,11 +404,6 @@ public:
|
||||
auto itw = writes_per_window.find(bound);
|
||||
if (itw != writes_per_window.end()) {
|
||||
ow_this_window = &itw->second;
|
||||
// We will erase here so we can keep track of which
|
||||
// writes belong to existing windows. Writes that don't belong to any window
|
||||
// are writes in progress to new windows and will be accounted in the final
|
||||
// loop before we return
|
||||
writes_per_window.erase(itw);
|
||||
}
|
||||
auto* oc_this_window = &no_oc;
|
||||
auto itc = compactions_per_window.find(bound);
|
||||
@@ -416,6 +411,13 @@ public:
|
||||
oc_this_window = &itc->second;
|
||||
}
|
||||
b += windows.second.backlog(*ow_this_window, *oc_this_window);
|
||||
if (itw != writes_per_window.end()) {
|
||||
// We will erase here so we can keep track of which
|
||||
// writes belong to existing windows. Writes that don't belong to any window
|
||||
// are writes in progress to new windows and will be accounted in the final
|
||||
// loop before we return
|
||||
writes_per_window.erase(itw);
|
||||
}
|
||||
}
|
||||
|
||||
// Partial writes that don't belong to any window are accounted here.
|
||||
|
||||
@@ -695,9 +695,12 @@ public:
|
||||
// Sets streamed_mutation::_end_of_range when there are no more fragments for the query range.
|
||||
// Returns information whether the parser should continue to parse more
|
||||
// input and produce more fragments or we have collected enough and should yield.
|
||||
// Returns proceed:yes only when all pending fragments have been pushed.
|
||||
proceed push_ready_fragments() {
|
||||
if (_ready) {
|
||||
return push_ready_fragments_with_ready_set();
|
||||
if (push_ready_fragments_with_ready_set() == proceed::no) {
|
||||
return proceed::no;
|
||||
}
|
||||
}
|
||||
|
||||
if (_out_of_range) {
|
||||
|
||||
@@ -292,7 +292,7 @@ void stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state)
|
||||
}
|
||||
|
||||
void stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
|
||||
if (has_peer(endpoint) && ep_state.is_shutdown()) {
|
||||
if (has_peer(endpoint)) {
|
||||
sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint);
|
||||
get_stream_manager().invoke_on_all([endpoint] (auto& sm) {
|
||||
sm.fail_sessions(endpoint);
|
||||
|
||||
@@ -509,8 +509,7 @@ void stream_session::close_session(stream_session_state final_state) {
|
||||
_stream_result->handle_session_complete(shared_from_this());
|
||||
}
|
||||
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}, cancel keep_alive timer", plan_id(), this, final_state);
|
||||
_keep_alive.cancel();
|
||||
sslog.debug("[Stream #{}] close_session session={}, state={}", plan_id(), this, final_state);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -537,41 +536,6 @@ bool stream_session::is_initialized() const {
|
||||
|
||||
void stream_session::init(shared_ptr<stream_result_future> stream_result_) {
|
||||
_stream_result = stream_result_;
|
||||
_keep_alive.set_callback([this] {
|
||||
auto plan_id = this->plan_id();
|
||||
auto peer = this->peer;
|
||||
get_local_stream_manager().get_progress_on_all_shards(plan_id, peer).then([this, peer, plan_id] (stream_bytes sbytes) {
|
||||
if (this->_is_aborted) {
|
||||
sslog.info("[Stream #{}] The session {} is closed, keep alive timer will do nothing", plan_id, this);
|
||||
return;
|
||||
}
|
||||
auto now = lowres_clock::now();
|
||||
sslog.debug("[Stream #{}] keep alive timer callback sbytes old: tx={}, rx={} new: tx={} rx={}",
|
||||
plan_id, this->_last_stream_bytes.bytes_sent, this->_last_stream_bytes.bytes_received,
|
||||
sbytes.bytes_sent, sbytes.bytes_received);
|
||||
if (sbytes.bytes_sent > this->_last_stream_bytes.bytes_sent ||
|
||||
sbytes.bytes_received > this->_last_stream_bytes.bytes_received) {
|
||||
sslog.debug("[Stream #{}] The session {} made progress with peer {}", plan_id, this, peer);
|
||||
// Progress has been made
|
||||
this->_last_stream_bytes = sbytes;
|
||||
this->_last_stream_progress = now;
|
||||
this->start_keep_alive_timer();
|
||||
} else if (now - this->_last_stream_progress >= this->_keep_alive_timeout) {
|
||||
// Timeout
|
||||
sslog.info("[Stream #{}] The session {} is idle for {} seconds, the peer {} is probably gone, close it",
|
||||
plan_id, this, this->_keep_alive_timeout.count(), peer);
|
||||
this->on_error();
|
||||
} else {
|
||||
// Start the timer to check again
|
||||
sslog.info("[Stream #{}] The session {} made no progress with peer {}", plan_id, this, peer);
|
||||
this->start_keep_alive_timer();
|
||||
}
|
||||
}).handle_exception([plan_id, peer, session = this->shared_from_this()] (auto ep) {
|
||||
sslog.info("[Stream #{}] keep alive timer callback fails with peer {}: {}", plan_id, peer, ep);
|
||||
});
|
||||
});
|
||||
_last_stream_progress = lowres_clock::now();
|
||||
start_keep_alive_timer();
|
||||
}
|
||||
|
||||
utils::UUID stream_session::plan_id() {
|
||||
|
||||
@@ -175,20 +175,8 @@ private:
|
||||
bool _complete_sent = false;
|
||||
bool _received_failed_complete_message = false;
|
||||
|
||||
// If the session is idle for 10 minutes, close the session
|
||||
std::chrono::seconds _keep_alive_timeout{60 * 10};
|
||||
// Check every 1 minutes
|
||||
std::chrono::seconds _keep_alive_interval{60};
|
||||
timer<lowres_clock> _keep_alive;
|
||||
stream_bytes _last_stream_bytes;
|
||||
lowres_clock::time_point _last_stream_progress;
|
||||
|
||||
session_info _session_info;
|
||||
public:
|
||||
void start_keep_alive_timer() {
|
||||
_keep_alive.rearm(lowres_clock::now() + _keep_alive_interval);
|
||||
}
|
||||
|
||||
void add_bytes_sent(int64_t bytes) {
|
||||
_bytes_sent += bytes;
|
||||
}
|
||||
|
||||
@@ -161,7 +161,6 @@ future<> stream_transfer_task::execute() {
|
||||
});
|
||||
}).then([this, id, plan_id, cf_id] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
|
||||
session->start_keep_alive_timer();
|
||||
}).handle_exception([this, plan_id, id] (auto ep){
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
|
||||
@@ -1171,7 +1171,7 @@ static mutation_sets generate_mutation_sets() {
|
||||
auto tomb = new_tombstone();
|
||||
m1.partition().apply_delete(*s1, ck2, tomb);
|
||||
result.unequal.emplace_back(mutations{m1, m2});
|
||||
m2.partition().apply_delete(*s1, ck2, tomb);
|
||||
m2.partition().apply_delete(*s2, ck2, tomb);
|
||||
result.equal.emplace_back(mutations{m1, m2});
|
||||
}
|
||||
|
||||
@@ -1180,7 +1180,7 @@ static mutation_sets generate_mutation_sets() {
|
||||
auto key = clustering_key_prefix::from_deeply_exploded(*s1, {data_value(bytes("ck2_0"))});
|
||||
m1.partition().apply_row_tombstone(*s1, key, tomb);
|
||||
result.unequal.emplace_back(mutations{m1, m2});
|
||||
m2.partition().apply_row_tombstone(*s1, key, tomb);
|
||||
m2.partition().apply_row_tombstone(*s2, key, tomb);
|
||||
result.equal.emplace_back(mutations{m1, m2});
|
||||
}
|
||||
|
||||
@@ -1204,7 +1204,7 @@ static mutation_sets generate_mutation_sets() {
|
||||
auto ts = new_timestamp();
|
||||
m1.partition().apply_insert(*s1, ck2, ts);
|
||||
result.unequal.emplace_back(mutations{m1, m2});
|
||||
m2.partition().apply_insert(*s1, ck2, ts);
|
||||
m2.partition().apply_insert(*s2, ck2, ts);
|
||||
result.equal.emplace_back(mutations{m1, m2});
|
||||
}
|
||||
|
||||
|
||||
@@ -620,6 +620,66 @@ SEASTAR_TEST_CASE(test_single_key_queries_after_population_in_reverse_order) {
|
||||
});
|
||||
}
|
||||
|
||||
// Reproducer for https://github.com/scylladb/scylla/issues/4236
|
||||
SEASTAR_TEST_CASE(test_partition_range_population_with_concurrent_memtable_flushes) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
|
||||
std::vector<mutation> mutations = make_ring(s, 3);
|
||||
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
for (auto&& m : mutations) {
|
||||
mt->apply(m);
|
||||
}
|
||||
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, snapshot_source_from_snapshot(mt->as_data_source()), tracker);
|
||||
|
||||
bool cancel_updater = false;
|
||||
auto updater = repeat([&] {
|
||||
if (cancel_updater) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return later().then([&] {
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
return cache.update([]{}, *mt).then([mt] {
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
{
|
||||
auto pr = dht::partition_range::make_singular(query::ring_position(mutations[1].decorated_key()));
|
||||
assert_that(cache.make_reader(s, pr))
|
||||
.produces(mutations[1])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
{
|
||||
auto pr = dht::partition_range::make_ending_with(
|
||||
{query::ring_position(mutations[2].decorated_key()), true});
|
||||
assert_that(cache.make_reader(s, pr))
|
||||
.produces(mutations[0])
|
||||
.produces(mutations[1])
|
||||
.produces(mutations[2])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
cache.invalidate([]{}).get();
|
||||
|
||||
{
|
||||
assert_that(cache.make_reader(s, query::full_partition_range))
|
||||
.produces(mutations[0])
|
||||
.produces(mutations[1])
|
||||
.produces(mutations[2])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
cancel_updater = true;
|
||||
updater.get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) {
|
||||
return seastar::async([] {
|
||||
cache_tracker tracker;
|
||||
|
||||
@@ -4649,3 +4649,74 @@ SEASTAR_TEST_CASE(sstable_timestamp_metadata_correcness_with_negative) {
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().max_timestamp == 5);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
|
||||
return async([] {
|
||||
storage_service_for_tests ssft;
|
||||
cell_locker_stats cl_stats;
|
||||
|
||||
auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type);
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
auto sst = make_sstable(s, tmp->path, (*gen)++, la, big);
|
||||
sst->set_unshared();
|
||||
return sst;
|
||||
};
|
||||
|
||||
column_family_for_tests cf(s);
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled);
|
||||
|
||||
{
|
||||
auto tokens = token_generation_for_current_shard(4);
|
||||
auto make_insert = [&] (auto p) {
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(p.first)});
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 1 /* ts */);
|
||||
BOOST_REQUIRE(m.decorated_key().token() == p.second);
|
||||
return m;
|
||||
};
|
||||
auto mut1 = make_insert(tokens[0]);
|
||||
auto mut2 = make_insert(tokens[1]);
|
||||
auto mut3 = make_insert(tokens[2]);
|
||||
auto mut4 = make_insert(tokens[3]);
|
||||
std::vector<shared_sstable> ssts = {
|
||||
make_sstable_containing(sst_gen, {mut1, mut2}),
|
||||
make_sstable_containing(sst_gen, {mut3, mut4})
|
||||
};
|
||||
|
||||
for (auto& sst : ssts) {
|
||||
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
|
||||
}
|
||||
|
||||
// Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog.
|
||||
// That's done to assert backlog will work for compaction that is finished and was stopped tracking.
|
||||
|
||||
auto fut = sstables::compact_sstables(sstables::compaction_descriptor(ssts), *cf, sst_gen);
|
||||
|
||||
bool stopped_tracking = false;
|
||||
for (auto& info : cf._data->cm.get_compactions()) {
|
||||
if (info->cf == cf->schema()->cf_name()) {
|
||||
info->stop_tracking();
|
||||
stopped_tracking = true;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE(stopped_tracking);
|
||||
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
for (auto& sst : ssts) {
|
||||
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
|
||||
}
|
||||
|
||||
auto ret = fut.get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
BOOST_REQUIRE(ret.tracking == false);
|
||||
}
|
||||
// triggers code that iterates through registered compactions.
|
||||
cf._data->cm.backlog();
|
||||
cf->get_compaction_strategy().get_backlog_tracker().backlog();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -45,180 +45,143 @@
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) {
|
||||
return do_with_cql_env([] (auto& e) {
|
||||
auto ranges = db::size_estimates::size_estimates_mutation_reader::get_local_ranges().get0();
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto ranges = db::size_estimates::get_local_ranges().get0();
|
||||
auto start_token1 = utf8_type->to_string(ranges[3].start);
|
||||
auto start_token2 = utf8_type->to_string(ranges[5].start);
|
||||
auto end_token1 = utf8_type->to_string(ranges[3].end);
|
||||
auto end_token2 = utf8_type->to_string(ranges[55].end);
|
||||
auto &qp = e.local_qp();
|
||||
return e.execute_cql("create table cf1(pk text PRIMARY KEY, v int);").discard_result().then([&e] {
|
||||
return e.execute_cql("create table cf2(pk text PRIMARY KEY, v int);").discard_result();
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 512);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' limit 100;").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 100);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 256);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name > 'cf1';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 256);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name >= 'cf1';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 512);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name < 'cf2';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 256);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name <= 'cf2';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 512);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name in ('cf1', 'cf2');").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 512);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name >= 'cf1' and table_name <= 'cf1';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 256);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name >= 'cf1' and table_name <= 'cf2';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 512);
|
||||
});
|
||||
}).then([&qp] {
|
||||
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name > 'cf1' and table_name < 'cf2';").then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 0);
|
||||
});
|
||||
}).then([&qp, start_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s';", start_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s';", start_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 253);
|
||||
});
|
||||
}).then([&qp, start_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start > '%s';", start_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 252);
|
||||
});
|
||||
}).then([&qp, start_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start <= '%s';", start_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 4);
|
||||
});
|
||||
}).then([&qp, start_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start < '%s';", start_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 3);
|
||||
});
|
||||
}).then([&qp, start_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1, start_token2] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token2)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 3);
|
||||
});
|
||||
}).then([&qp, start_token1, start_token2] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start > '%s' and range_start < '%s';", start_token1, start_token2)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1, start_token2] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start in ('%s', '%s');", start_token1, start_token2)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 2);
|
||||
});
|
||||
}).then([&qp, start_token1, start_token2] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start > '%s' and range_start <= '%s';", start_token1, start_token2)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 2);
|
||||
});
|
||||
}).then([&qp, start_token1, start_token2] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s' and range_start < '%s';", start_token1, start_token2)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 2);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end = '%s';", start_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s';", start_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s';", start_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 0);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end <= '%s';", start_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end < '%s';", start_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 0);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s' and range_end <= '%s';", start_token1, end_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s' and range_end < '%s';", start_token1, end_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 0);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start, range_end) = ('cf1', '%s', '%s');", start_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 1);
|
||||
});
|
||||
}).then([&qp, start_token1, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') and (table_name) <= ('cf2');", start_token1, end_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 509);
|
||||
});
|
||||
}).then([&qp, start_token1, start_token2, end_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') "
|
||||
"and (table_name, range_start) <= ('cf2', '%s');", start_token1, end_token1, start_token2)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 259);
|
||||
});
|
||||
}).then([&qp, start_token1] {
|
||||
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start) < ('cf2', '%s');", start_token1)).then([](auto rs) {
|
||||
BOOST_REQUIRE_EQUAL(rs->size(), 259);
|
||||
});
|
||||
}).discard_result();
|
||||
|
||||
// Should not timeout.
|
||||
e.execute_cql("select * from system.size_estimates;").discard_result().get();
|
||||
|
||||
auto rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks';").get0();
|
||||
assert_that(rs).is_rows().with_size(0);
|
||||
|
||||
e.execute_cql("create table cf1(pk text PRIMARY KEY, v int);").discard_result().get();
|
||||
e.execute_cql("create table cf2(pk text PRIMARY KEY, v int);").discard_result().get();
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks';").get0();
|
||||
assert_that(rs).is_rows().with_size(512);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' limit 100;").get0();
|
||||
assert_that(rs).is_rows().with_size(100);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name = 'cf1';").get0();
|
||||
assert_that(rs).is_rows().with_size(256);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name > 'cf1';").get0();
|
||||
assert_that(rs).is_rows().with_size(256);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1';").get0();
|
||||
assert_that(rs).is_rows().with_size(512);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name < 'cf2';").get0();
|
||||
assert_that(rs).is_rows().with_size(256);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name <= 'cf2';").get0();
|
||||
assert_that(rs).is_rows().with_size(512);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name in ('cf1', 'cf2');").get0();
|
||||
assert_that(rs).is_rows().with_size(512);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1' and table_name <= 'cf1';").get0();
|
||||
assert_that(rs).is_rows().with_size(256);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1' and table_name <= 'cf2';").get0();
|
||||
assert_that(rs).is_rows().with_size(512);
|
||||
|
||||
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name > 'cf1' and table_name < 'cf2';").get0();
|
||||
assert_that(rs).is_rows().with_size(0);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s';", start_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s';", start_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(253);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start > '%s';", start_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(252);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start <= '%s';", start_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(4);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start < '%s';", start_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(3);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token2)).get0();
|
||||
assert_that(rs).is_rows().with_size(3);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start > '%s' and range_start < '%s';", start_token1, start_token2)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start in ('%s', '%s');", start_token1, start_token2)).get0();
|
||||
assert_that(rs).is_rows().with_size(2);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start > '%s' and range_start <= '%s';", start_token1, start_token2)).get0();
|
||||
assert_that(rs).is_rows().with_size(2);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start >= '%s' and range_start < '%s';", start_token1, start_token2)).get0();
|
||||
assert_that(rs).is_rows().with_size(2);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end = '%s';", start_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s';", start_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s';", start_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(0);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end <= '%s';", start_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end < '%s';", start_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(0);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s' and range_end <= '%s';", start_token1, end_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s' and range_end < '%s';", start_token1, end_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(0);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start, range_end) = ('cf1', '%s', '%s');", start_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') and (table_name) <= ('cf2');", start_token1, end_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(509);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') "
|
||||
"and (table_name, range_start) <= ('cf2', '%s');", start_token1, end_token1, start_token2)).get0();
|
||||
assert_that(rs).is_rows().with_size(259);
|
||||
|
||||
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
|
||||
"and (table_name, range_start) < ('cf2', '%s');", start_token1)).get0();
|
||||
assert_that(rs).is_rows().with_size(259);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
2
types.cc
2
types.cc
@@ -1471,7 +1471,7 @@ public:
|
||||
}
|
||||
b.push_back(v);
|
||||
}
|
||||
std::copy(b.crbegin(), b.crend(), out);
|
||||
out = std::copy(b.crbegin(), b.crend(), out);
|
||||
}
|
||||
virtual size_t serialized_size(const void* value) const override {
|
||||
if (!value) {
|
||||
|
||||
@@ -667,6 +667,7 @@ segment* segment_pool::allocate_segment(size_t reserve, size_t reclamation_step)
|
||||
return seg;
|
||||
}
|
||||
if (can_allocate_more_memory(segment::size)) {
|
||||
memory::disable_abort_on_alloc_failure_temporarily dfg;
|
||||
auto p = aligned_alloc(segment::size, segment::size);
|
||||
if (!p) {
|
||||
continue;
|
||||
@@ -677,10 +678,6 @@ segment* segment_pool::allocate_segment(size_t reserve, size_t reclamation_step)
|
||||
return seg;
|
||||
}
|
||||
} while (shard_tracker().get_impl().compact_and_evict(reclamation_step * segment::size));
|
||||
if (shard_tracker().should_abort_on_bad_alloc()) {
|
||||
llogger.error("Aborting due to segment allocation failure");
|
||||
abort();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
@@ -2261,6 +2258,7 @@ allocating_section::guard::~guard() {
|
||||
#ifndef SEASTAR_DEFAULT_ALLOCATOR
|
||||
|
||||
void allocating_section::reserve() {
|
||||
try {
|
||||
shard_segment_pool.set_emergency_reserve_max(std::max(_lsa_reserve, _minimum_lsa_emergency_reserve));
|
||||
shard_segment_pool.refill_emergency_reserve();
|
||||
|
||||
@@ -2275,6 +2273,13 @@ void allocating_section::reserve() {
|
||||
}
|
||||
|
||||
shard_segment_pool.clear_allocation_failure_flag();
|
||||
} catch (const std::bad_alloc&) {
|
||||
if (shard_tracker().should_abort_on_bad_alloc()) {
|
||||
llogger.error("Aborting due to allocation failure");
|
||||
abort();
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void allocating_section::on_alloc_failure(logalloc::region& r) {
|
||||
|
||||
@@ -710,6 +710,7 @@ public:
|
||||
while (true) {
|
||||
try {
|
||||
logalloc::reclaim_lock _(r);
|
||||
memory::disable_abort_on_alloc_failure_temporarily dfg;
|
||||
return fn();
|
||||
} catch (const std::bad_alloc&) {
|
||||
on_alloc_failure(r);
|
||||
|
||||
Reference in New Issue
Block a user