Compare commits
77 Commits
copilot/fi
...
scylla-1.6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65862c4221 | ||
|
|
7672c59873 | ||
|
|
19212a059f | ||
|
|
bbbb4dffbd | ||
|
|
2f0970e83c | ||
|
|
5ab8601f64 | ||
|
|
993d216f23 | ||
|
|
630425f513 | ||
|
|
1626c9ce5e | ||
|
|
bc6def4a0f | ||
|
|
1e77bcd60f | ||
|
|
f9bcedfbf7 | ||
|
|
c814cb3433 | ||
|
|
62143d26b4 | ||
|
|
7a1ceadec4 | ||
|
|
d41ba3ba78 | ||
|
|
0e11c6218a | ||
|
|
a096a173bf | ||
|
|
13d52a8172 | ||
|
|
14b8a49b3f | ||
|
|
e437edafe6 | ||
|
|
b91456da90 | ||
|
|
ebb404275d | ||
|
|
92e6c62c1c | ||
|
|
3edcee4821 | ||
|
|
9c148d6e4c | ||
|
|
2ea4da28e8 | ||
|
|
a04722aeb2 | ||
|
|
41b8dc3b84 | ||
|
|
b6ebe2e20b | ||
|
|
7e1b245887 | ||
|
|
83fc7de65f | ||
|
|
d4781f2de3 | ||
|
|
2193a83a82 | ||
|
|
78d74bf23a | ||
|
|
642a479c73 | ||
|
|
8a6d0ad2fa | ||
|
|
37f73781ee | ||
|
|
6fd5442fb7 | ||
|
|
e2777e508c | ||
|
|
8cf7bbf208 | ||
|
|
4b5742d3a6 | ||
|
|
cf27d44412 | ||
|
|
7b40e19561 | ||
|
|
210e66b2b8 | ||
|
|
6af51c1b1d | ||
|
|
05f2bf5bd5 | ||
|
|
8002326f80 | ||
|
|
a00a1a1044 | ||
|
|
337d6fb2cf | ||
|
|
1a8211e573 | ||
|
|
a1a6c10964 | ||
|
|
c692824786 | ||
|
|
0ccdbbf1af | ||
|
|
138ad64cbc | ||
|
|
01737a51a9 | ||
|
|
9753a39284 | ||
|
|
42a76567b7 | ||
|
|
725949e8bf | ||
|
|
231cf22c0e | ||
|
|
ef0ffd1cbb | ||
|
|
2b67c65eb6 | ||
|
|
5b0971f82f | ||
|
|
da843239bf | ||
|
|
5c96b04f4d | ||
|
|
a1d463900f | ||
|
|
66598a68d5 | ||
|
|
8ad0e96025 | ||
|
|
1a2a63787a | ||
|
|
52e5706147 | ||
|
|
cec07ea366 | ||
|
|
cbe729415e | ||
|
|
b5190f9971 | ||
|
|
7739456ec2 | ||
|
|
e4da0167d4 | ||
|
|
a3266060e3 | ||
|
|
f6c83f73ef |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.6.4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -243,7 +243,8 @@ future<> auth::auth::setup() {
|
||||
std::map<sstring, sstring> opts;
|
||||
opts["replication_factor"] = "1";
|
||||
auto ksm = keyspace_metadata::new_keyspace(AUTH_KS, "org.apache.cassandra.locator.SimpleStrategy", opts, true);
|
||||
f = service::get_local_migration_manager().announce_new_keyspace(ksm, false);
|
||||
// We use min_timestamp so that the default keyspace metadata will lose to any manual adjustments. See issue #2129.
|
||||
f = service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false);
|
||||
}
|
||||
|
||||
return f.then([] {
|
||||
|
||||
@@ -44,7 +44,7 @@ canonical_mutation::canonical_mutation(const mutation& m)
|
||||
mutation_partition_serializer part_ser(*m.schema(), m.partition());
|
||||
|
||||
bytes_ostream out;
|
||||
ser::writer_of_canonical_mutation wr(out);
|
||||
ser::writer_of_canonical_mutation<bytes_ostream> wr(out);
|
||||
std::move(wr).write_table_id(m.schema()->id())
|
||||
.write_schema_version(m.schema()->version())
|
||||
.write_key(m.key())
|
||||
|
||||
@@ -43,6 +43,7 @@
|
||||
#include "schema_builder.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "boost/range/adaptor/map.hpp"
|
||||
#include "stdx.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -86,14 +87,14 @@ const sstring& alter_type_statement::keyspace() const
|
||||
return _name.get_keyspace();
|
||||
}
|
||||
|
||||
static int32_t get_idx_of_field(user_type type, shared_ptr<column_identifier> field)
|
||||
static stdx::optional<uint32_t> get_idx_of_field(user_type type, shared_ptr<column_identifier> field)
|
||||
{
|
||||
for (uint32_t i = 0; i < type->field_names().size(); ++i) {
|
||||
if (field->name() == type->field_names()[i]) {
|
||||
return i;
|
||||
return {i};
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
return {};
|
||||
}
|
||||
|
||||
void alter_type_statement::do_announce_migration(database& db, ::keyspace& ks, bool is_local_only)
|
||||
@@ -164,7 +165,7 @@ alter_type_statement::add_or_alter::add_or_alter(const ut_name& name, bool is_ad
|
||||
|
||||
user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_update) const
|
||||
{
|
||||
if (get_idx_of_field(to_update, _field_name) >= 0) {
|
||||
if (get_idx_of_field(to_update, _field_name)) {
|
||||
throw exceptions::invalid_request_exception(sprint("Cannot add new field %s to type %s: a field of the same name already exists", _field_name->name(), _name.to_string()));
|
||||
}
|
||||
|
||||
@@ -181,19 +182,19 @@ user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_
|
||||
|
||||
user_type alter_type_statement::add_or_alter::do_alter(database& db, user_type to_update) const
|
||||
{
|
||||
uint32_t idx = get_idx_of_field(to_update, _field_name);
|
||||
if (idx < 0) {
|
||||
stdx::optional<uint32_t> idx = get_idx_of_field(to_update, _field_name);
|
||||
if (!idx) {
|
||||
throw exceptions::invalid_request_exception(sprint("Unknown field %s in type %s", _field_name->name(), _name.to_string()));
|
||||
}
|
||||
|
||||
auto previous = to_update->field_types()[idx];
|
||||
auto previous = to_update->field_types()[*idx];
|
||||
auto new_type = _field_type->prepare(db, keyspace())->get_type();
|
||||
if (!new_type->is_compatible_with(*previous)) {
|
||||
throw exceptions::invalid_request_exception(sprint("Type %s in incompatible with previous type %s of field %s in user type %s", _field_type->to_string(), previous->as_cql3_type()->to_string(), _field_name->name(), _name.to_string()));
|
||||
}
|
||||
|
||||
std::vector<data_type> new_types(to_update->field_types());
|
||||
new_types[idx] = new_type;
|
||||
new_types[*idx] = new_type;
|
||||
return user_type_impl::get_instance(to_update->_keyspace, to_update->_name, to_update->field_names(), std::move(new_types));
|
||||
}
|
||||
|
||||
@@ -217,11 +218,11 @@ user_type alter_type_statement::renames::make_updated_type(database& db, user_ty
|
||||
std::vector<bytes> new_names(to_update->field_names());
|
||||
for (auto&& rename : _renames) {
|
||||
auto&& from = rename.first;
|
||||
int32_t idx = get_idx_of_field(to_update, from);
|
||||
if (idx < 0) {
|
||||
stdx::optional<uint32_t> idx = get_idx_of_field(to_update, from);
|
||||
if (!idx) {
|
||||
throw exceptions::invalid_request_exception(sprint("Unknown field %s in type %s", from->to_string(), _name.to_string()));
|
||||
}
|
||||
new_names[idx] = rename.second->name();
|
||||
new_names[*idx] = rename.second->name();
|
||||
}
|
||||
auto&& updated = user_type_impl::get_instance(to_update->_keyspace, to_update->_name, std::move(new_names), to_update->field_types());
|
||||
create_type_statement::check_for_duplicate_names(updated);
|
||||
|
||||
@@ -307,7 +307,8 @@ select_statement::execute(distributed<service::storage_proxy>& proxy,
|
||||
// doing post-query ordering.
|
||||
if (needs_post_query_ordering() && _limit) {
|
||||
return do_with(std::forward<std::vector<query::partition_range>>(partition_ranges), [this, &proxy, &state, &options, cmd](auto prs) {
|
||||
query::result_merger merger;
|
||||
assert(cmd->partition_limit == query::max_partitions);
|
||||
query::result_merger merger(cmd->row_limit * prs.size(), query::max_partitions);
|
||||
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, &options, cmd] (auto pr) {
|
||||
std::vector<query::partition_range> prange { pr };
|
||||
auto command = ::make_lw_shared<query::read_command>(*cmd);
|
||||
@@ -341,7 +342,8 @@ select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
|
||||
|
||||
if (needs_post_query_ordering() && _limit) {
|
||||
return do_with(std::move(partition_ranges), [this, &proxy, &state, command] (auto prs) {
|
||||
query::result_merger merger;
|
||||
assert(command->partition_limit == query::max_partitions);
|
||||
query::result_merger merger(command->row_limit * prs.size(), query::max_partitions);
|
||||
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, command] (auto pr) {
|
||||
std::vector<query::partition_range> prange { pr };
|
||||
auto cmd = ::make_lw_shared<query::read_command>(*command);
|
||||
@@ -375,8 +377,8 @@ select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> resu
|
||||
if (_is_reversed) {
|
||||
rs->reverse();
|
||||
}
|
||||
rs->trim(cmd->row_limit);
|
||||
}
|
||||
rs->trim(cmd->row_limit);
|
||||
return ::make_shared<transport::messages::result_message::rows>(std::move(rs));
|
||||
}
|
||||
|
||||
|
||||
78
database.cc
78
database.cc
@@ -816,6 +816,12 @@ void column_family::load_sstable(sstables::shared_sstable& sst, bool reset_level
|
||||
// several shards, but we can't start any compaction before all the sstables
|
||||
// of this CF were loaded. So call this function to start rewrites, if any.
|
||||
void column_family::start_rewrite() {
|
||||
// submit shared sstables in generation order to guarantee that all shards
|
||||
// owning an sstable will agree on its deletion at nearly the same time,
|
||||
// thereby reducing disk space requirements.
|
||||
boost::sort(_sstables_need_rewrite, [] (const sstables::shared_sstable& x, const sstables::shared_sstable& y) {
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
for (auto sst : _sstables_need_rewrite) {
|
||||
dblog.info("Splitting {} for shard", sst->get_filename());
|
||||
_compaction_manager.submit_sstable_rewrite(this, sst);
|
||||
@@ -1670,14 +1676,40 @@ database::database(const db::config& cfg)
|
||||
dblog.info("Row: max_vector_size: {}, internal_count: {}", size_t(row::max_vector_size), size_t(row::internal_count));
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
dirty_memory_manager::setup_collectd(sstring namestr) {
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("memory"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "bytes", namestr + "_dirty")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
|
||||
return real_dirty_memory();
|
||||
})));
|
||||
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("memory"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "bytes", namestr +"_virtual_dirty")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
|
||||
return virtual_dirty_memory();
|
||||
})));
|
||||
}
|
||||
|
||||
void
|
||||
database::setup_collectd() {
|
||||
_dirty_memory_manager.setup_collectd("regular");
|
||||
_system_dirty_memory_manager.setup_collectd("system");
|
||||
_streaming_dirty_memory_manager.setup_collectd("streaming");
|
||||
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("memory"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "bytes", "dirty")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
|
||||
return _dirty_memory_manager.real_dirty_memory();
|
||||
return _dirty_memory_manager.real_dirty_memory() +
|
||||
_system_dirty_memory_manager.real_dirty_memory() +
|
||||
_streaming_dirty_memory_manager.real_dirty_memory();
|
||||
})));
|
||||
|
||||
_collectd.push_back(
|
||||
@@ -1685,7 +1717,9 @@ database::setup_collectd() {
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "bytes", "virtual_dirty")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
|
||||
return _dirty_memory_manager.virtual_dirty_memory();
|
||||
return _dirty_memory_manager.virtual_dirty_memory() +
|
||||
_system_dirty_memory_manager.virtual_dirty_memory() +
|
||||
_streaming_dirty_memory_manager.virtual_dirty_memory();
|
||||
})));
|
||||
|
||||
_collectd.push_back(
|
||||
@@ -1956,13 +1990,12 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
|
||||
|
||||
future<>
|
||||
database::init_system_keyspace() {
|
||||
bool durable = _cfg->data_file_directories().size() > 0;
|
||||
db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing());
|
||||
|
||||
// FIXME support multiple directories
|
||||
return io_check(touch_directory, _cfg->data_file_directories()[0] + "/" + db::system_keyspace::NAME).then([this] {
|
||||
return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME).then([this]() {
|
||||
return init_commitlog();
|
||||
return init_commitlog().then([this] {
|
||||
bool durable = _cfg->data_file_directories().size() > 0;
|
||||
db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing());
|
||||
// FIXME support multiple directories
|
||||
return io_check(touch_directory, _cfg->data_file_directories()[0] + "/" + db::system_keyspace::NAME).then([this] {
|
||||
return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME);
|
||||
});
|
||||
}).then([this] {
|
||||
auto& ks = find_keyspace(db::system_keyspace::NAME);
|
||||
@@ -2382,29 +2415,33 @@ struct query_state {
|
||||
std::vector<query::partition_range>::const_iterator current_partition_range;
|
||||
std::vector<query::partition_range>::const_iterator range_end;
|
||||
mutation_reader reader;
|
||||
uint32_t remaining_rows() const {
|
||||
return limit - builder.row_count();
|
||||
}
|
||||
uint32_t remaining_partitions() const {
|
||||
return partition_limit - builder.partition_count();
|
||||
}
|
||||
bool done() const {
|
||||
return !limit || !partition_limit || current_partition_range == range_end || builder.is_short_read();
|
||||
return !remaining_rows() || !remaining_partitions() || current_partition_range == range_end || builder.is_short_read();
|
||||
}
|
||||
};
|
||||
|
||||
future<lw_shared_ptr<query::result>>
|
||||
column_family::query(schema_ptr s, const query::read_command& cmd, query::result_request request,
|
||||
const std::vector<query::partition_range>& partition_ranges,
|
||||
tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter) {
|
||||
tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter,
|
||||
uint64_t max_size) {
|
||||
utils::latency_counter lc;
|
||||
_stats.reads.set_latency(lc);
|
||||
auto f = request == query::result_request::only_digest
|
||||
? make_ready_future<query::result_memory_accounter>() : memory_limiter.new_read();
|
||||
? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size);
|
||||
return f.then([this, lc, s = std::move(s), &cmd, request, &partition_ranges, trace_state = std::move(trace_state)] (query::result_memory_accounter accounter) mutable {
|
||||
auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, request, partition_ranges, std::move(accounter));
|
||||
auto& qs = *qs_ptr;
|
||||
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state)] {
|
||||
auto&& range = *qs.current_partition_range++;
|
||||
return data_query(qs.schema, as_mutation_source(trace_state), range, qs.cmd.slice, qs.limit, qs.partition_limit,
|
||||
qs.cmd.timestamp, qs.builder).then([&qs] (auto&& r) {
|
||||
qs.limit -= r.live_rows;
|
||||
qs.partition_limit -= r.partitions;
|
||||
});
|
||||
return data_query(qs.schema, as_mutation_source(trace_state), range, qs.cmd.slice, qs.remaining_rows(),
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder);
|
||||
}).then([qs_ptr = std::move(qs_ptr), &qs] {
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
make_lw_shared<query::result>(qs.builder.build()));
|
||||
@@ -2428,9 +2465,10 @@ column_family::as_mutation_source(tracing::trace_state_ptr trace_state) const {
|
||||
}
|
||||
|
||||
future<lw_shared_ptr<query::result>>
|
||||
database::query(schema_ptr s, const query::read_command& cmd, query::result_request request, const std::vector<query::partition_range>& ranges, tracing::trace_state_ptr trace_state) {
|
||||
database::query(schema_ptr s, const query::read_command& cmd, query::result_request request, const std::vector<query::partition_range>& ranges, tracing::trace_state_ptr trace_state,
|
||||
uint64_t max_result_size) {
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
return cf.query(std::move(s), cmd, request, ranges, std::move(trace_state), get_result_memory_limiter()).then_wrapped([this, s = _stats] (auto f) {
|
||||
return cf.query(std::move(s), cmd, request, ranges, std::move(trace_state), get_result_memory_limiter(), max_result_size).then_wrapped([this, s = _stats] (auto f) {
|
||||
if (f.failed()) {
|
||||
++s->total_reads_failed;
|
||||
} else {
|
||||
@@ -2676,7 +2714,7 @@ future<> dirty_memory_manager::flush_when_needed() {
|
||||
});
|
||||
}
|
||||
|
||||
void dirty_memory_manager::start_reclaiming() {
|
||||
void dirty_memory_manager::start_reclaiming() noexcept {
|
||||
_should_flush.signal();
|
||||
}
|
||||
|
||||
|
||||
12
database.hh
12
database.hh
@@ -144,12 +144,16 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
|
||||
std::unordered_map<const logalloc::region*, flush_permit> _flush_manager;
|
||||
|
||||
future<> _waiting_flush;
|
||||
virtual void start_reclaiming() override;
|
||||
virtual void start_reclaiming() noexcept override;
|
||||
|
||||
bool has_pressure() const {
|
||||
return over_soft_limit();
|
||||
}
|
||||
|
||||
std::vector<scollectd::registration> _collectd;
|
||||
public:
|
||||
void setup_collectd(sstring namestr);
|
||||
|
||||
future<> shutdown();
|
||||
|
||||
// Limits and pressure conditions:
|
||||
@@ -650,7 +654,8 @@ public:
|
||||
const query::read_command& cmd, query::result_request request,
|
||||
const std::vector<query::partition_range>& ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
query::result_memory_limiter& memory_limiter);
|
||||
query::result_memory_limiter& memory_limiter,
|
||||
uint64_t max_result_size);
|
||||
|
||||
future<> populate(sstring datadir);
|
||||
|
||||
@@ -1162,7 +1167,8 @@ public:
|
||||
unsigned shard_of(const dht::token& t);
|
||||
unsigned shard_of(const mutation& m);
|
||||
unsigned shard_of(const frozen_mutation& m);
|
||||
future<lw_shared_ptr<query::result>> query(schema_ptr, const query::read_command& cmd, query::result_request request, const std::vector<query::partition_range>& ranges, tracing::trace_state_ptr trace_state);
|
||||
future<lw_shared_ptr<query::result>> query(schema_ptr, const query::read_command& cmd, query::result_request request, const std::vector<query::partition_range>& ranges,
|
||||
tracing::trace_state_ptr trace_state, uint64_t max_result_size);
|
||||
future<reconcilable_result> query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range,
|
||||
query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state);
|
||||
// Apply the mutation atomically.
|
||||
|
||||
@@ -1614,7 +1614,7 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
|
||||
bool failed = false;
|
||||
|
||||
work(file f, position_type o = 0)
|
||||
: f(f), fin(make_file_input_stream(f, o, make_file_input_stream_options())), start_off(o) {
|
||||
: f(f), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), start_off(o) {
|
||||
}
|
||||
work(work&&) = default;
|
||||
|
||||
|
||||
@@ -61,13 +61,19 @@
|
||||
|
||||
static logging::logger logger("commitlog_replayer");
|
||||
|
||||
struct column_mappings {
|
||||
std::unordered_map<table_schema_version, column_mapping> map;
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
};
|
||||
|
||||
class db::commitlog_replayer::impl {
|
||||
seastar::sharded<column_mappings> _column_mappings;
|
||||
struct column_mappings {
|
||||
std::unordered_map<table_schema_version, column_mapping> map;
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
};
|
||||
|
||||
// we want the processing methods to be const, since they use
|
||||
// shard-sharing of data -> read only
|
||||
// this one is special since it is thread local.
|
||||
// Should actually make sharded::local a const function (it does
|
||||
// not modify content), but...
|
||||
mutable seastar::sharded<column_mappings> _column_mappings;
|
||||
|
||||
friend class db::commitlog_replayer;
|
||||
public:
|
||||
impl(seastar::sharded<cql3::query_processor>& db);
|
||||
@@ -94,13 +100,35 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<> process(stats*, temporary_buffer<char> buf, replay_position rp);
|
||||
future<stats> recover(sstring file);
|
||||
// Move start/stop of the thread-local bookkeeping to "top level"
|
||||
// and also make sure to assert on it actually being started.
|
||||
future<> start() {
|
||||
return _column_mappings.start();
|
||||
}
|
||||
future<> stop() {
|
||||
return _column_mappings.stop();
|
||||
}
|
||||
|
||||
future<> process(stats*, temporary_buffer<char> buf, replay_position rp) const;
|
||||
future<stats> recover(sstring file) const;
|
||||
|
||||
typedef std::unordered_map<utils::UUID, replay_position> rp_map;
|
||||
typedef std::unordered_map<unsigned, rp_map> shard_rpm_map;
|
||||
typedef std::unordered_map<unsigned, replay_position> shard_rp_map;
|
||||
|
||||
replay_position min_pos(unsigned shard) const {
|
||||
auto i = _min_pos.find(shard);
|
||||
return i != _min_pos.end() ? i->second : replay_position();
|
||||
}
|
||||
replay_position cf_min_pos(const utils::UUID& uuid, unsigned shard) const {
|
||||
auto i = _rpm.find(shard);
|
||||
if (i == _rpm.end()) {
|
||||
return replay_position();
|
||||
}
|
||||
auto j = i->second.find(uuid);
|
||||
return j != i->second.end() ? j->second : replay_position();
|
||||
}
|
||||
|
||||
seastar::sharded<cql3::query_processor>&
|
||||
_qp;
|
||||
shard_rpm_map
|
||||
@@ -175,7 +203,6 @@ future<> db::commitlog_replayer::impl::init() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto&p : _min_pos) {
|
||||
logger.debug("minimum position for shard {}: {}", p.first, p.second);
|
||||
}
|
||||
@@ -188,9 +215,11 @@ future<> db::commitlog_replayer::impl::init() {
|
||||
}
|
||||
|
||||
future<db::commitlog_replayer::impl::stats>
|
||||
db::commitlog_replayer::impl::recover(sstring file) {
|
||||
db::commitlog_replayer::impl::recover(sstring file) const {
|
||||
assert(_column_mappings.local_is_initialized());
|
||||
|
||||
replay_position rp{commitlog::descriptor(file)};
|
||||
auto gp = _min_pos[rp.shard_id()];
|
||||
auto gp = min_pos(rp.shard_id());
|
||||
|
||||
if (rp.id < gp.id) {
|
||||
logger.debug("skipping replay of fully-flushed {}", file);
|
||||
@@ -220,7 +249,7 @@ db::commitlog_replayer::impl::recover(sstring file) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char> buf, replay_position rp) {
|
||||
future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char> buf, replay_position rp) const {
|
||||
try {
|
||||
|
||||
commitlog_entry_reader cer(buf);
|
||||
@@ -238,17 +267,16 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
|
||||
const column_mapping& src_cm = cm_it->second;
|
||||
|
||||
auto shard_id = rp.shard_id();
|
||||
if (rp < _min_pos[shard_id]) {
|
||||
if (rp < min_pos(shard_id)) {
|
||||
logger.trace("entry {} is less than global min position. skipping", rp);
|
||||
s->skipped_mutations++;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto uuid = fm.column_family_id();
|
||||
auto& map = _rpm[shard_id];
|
||||
auto i = map.find(uuid);
|
||||
if (i != map.end() && rp <= i->second) {
|
||||
logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second);
|
||||
auto cf_rp = cf_min_pos(uuid, shard_id);
|
||||
if (rp <= cf_rp) {
|
||||
logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, cf_rp);
|
||||
s->skipped_mutations++;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -323,42 +351,55 @@ future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::
|
||||
}
|
||||
|
||||
future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
|
||||
return _impl->_column_mappings.start().then([this, files = std::move(files)] {
|
||||
typedef std::unordered_multimap<unsigned, sstring> shard_file_map;
|
||||
|
||||
logger.info("Replaying {}", join(", ", files));
|
||||
return map_reduce(files, [this](auto f) {
|
||||
logger.debug("Replaying {}", f);
|
||||
return _impl->recover(f).then([f](impl::stats stats) {
|
||||
if (stats.corrupt_bytes != 0) {
|
||||
logger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
|
||||
}
|
||||
logger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, f
|
||||
, stats.applied_mutations
|
||||
, stats.invalid_mutations
|
||||
, stats.skipped_mutations
|
||||
|
||||
// pre-compute work per shard already.
|
||||
auto map = ::make_lw_shared<shard_file_map>();
|
||||
for (auto& f : files) {
|
||||
commitlog::descriptor d(f);
|
||||
replay_position p = d;
|
||||
map->emplace(p.shard_id() % smp::count, std::move(f));
|
||||
}
|
||||
|
||||
return _impl->start().then([this, map] {
|
||||
return map_reduce(smp::all_cpus(), [this, map](unsigned id) {
|
||||
return smp::submit_to(id, [this, id, map]() {
|
||||
auto total = ::make_lw_shared<impl::stats>();
|
||||
// TODO: or something. For now, we do this serialized per shard,
|
||||
// to reduce mutation congestion. We could probably (says avi)
|
||||
// do 2 segments in parallel or something, but let's use this first.
|
||||
auto range = map->equal_range(id);
|
||||
return do_for_each(range.first, range.second, [this, total](const std::pair<unsigned, sstring>& p) {
|
||||
auto&f = p.second;
|
||||
logger.debug("Replaying {}", f);
|
||||
return _impl->recover(f).then([f, total](impl::stats stats) {
|
||||
if (stats.corrupt_bytes != 0) {
|
||||
logger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
|
||||
}
|
||||
logger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, f
|
||||
, stats.applied_mutations
|
||||
, stats.invalid_mutations
|
||||
, stats.skipped_mutations
|
||||
);
|
||||
*total += stats;
|
||||
});
|
||||
}).then([total] {
|
||||
return make_ready_future<impl::stats>(*total);
|
||||
});
|
||||
});
|
||||
}, impl::stats(), std::plus<impl::stats>()).then([](impl::stats totals) {
|
||||
logger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, totals.applied_mutations
|
||||
, totals.invalid_mutations
|
||||
, totals.skipped_mutations
|
||||
);
|
||||
return make_ready_future<impl::stats>(stats);
|
||||
}).handle_exception([f](auto ep) -> future<impl::stats> {
|
||||
logger.error("Error recovering {}: {}", f, ep);
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (std::invalid_argument&) {
|
||||
logger.error("Scylla cannot process {}. Make sure to fully flush all Cassandra commit log files to sstable before migrating.", f);
|
||||
throw;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}, impl::stats(), std::plus<impl::stats>()).then([](impl::stats totals) {
|
||||
logger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, totals.applied_mutations
|
||||
, totals.invalid_mutations
|
||||
, totals.skipped_mutations
|
||||
);
|
||||
}).finally([this] {
|
||||
return _impl->_column_mappings.stop();
|
||||
return _impl->stop();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> db::commitlog_replayer::recover(sstring f) {
|
||||
|
||||
@@ -736,8 +736,9 @@ public:
|
||||
val(lsa_reclamation_step, size_t, 1, Used, "Minimum number of segments to reclaim in a single step") \
|
||||
val(prometheus_port, uint16_t, 9180, Used, "Prometheus port, set to zero to disable") \
|
||||
val(prometheus_address, sstring, "0.0.0.0", Used, "Prometheus listening address") \
|
||||
val(prometheus_prefix, sstring, "scylla", Used, "Set the prefix of the exported Prometheus metrics. Changing this will break Scylla's dashboard compatibility, do not change unless you know what you are doing.") \
|
||||
val(abort_on_lsa_bad_alloc, bool, false, Used, "Abort when allocation in LSA region fails") \
|
||||
val(murmur3_partitioner_ignore_msb_bits, unsigned, 4, Used, "Number of most siginificant token bits to ignore in murmur3 partitioner; increase for very large clusters") \
|
||||
val(murmur3_partitioner_ignore_msb_bits, unsigned, 0, Used, "Number of most siginificant token bits to ignore in murmur3 partitioner; increase for very large clusters") \
|
||||
/* done! */
|
||||
|
||||
#define _make_value_member(name, type, deflt, status, desc, ...) \
|
||||
|
||||
@@ -77,6 +77,15 @@ namespace schema_tables {
|
||||
|
||||
logging::logger logger("schema_tables");
|
||||
|
||||
struct push_back_and_return {
|
||||
std::vector<mutation> muts;
|
||||
|
||||
std::vector<mutation> operator()(mutation&& m) {
|
||||
muts.emplace_back(std::move(m));
|
||||
return std::move(muts);
|
||||
}
|
||||
};
|
||||
|
||||
struct qualified_name {
|
||||
sstring keyspace_name;
|
||||
sstring table_name;
|
||||
@@ -495,6 +504,14 @@ read_schema_partition_for_table(distributed<service::storage_proxy>& proxy, cons
|
||||
return query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key));
|
||||
}
|
||||
|
||||
future<mutation>
|
||||
read_keyspace_mutation(distributed<service::storage_proxy>& proxy, const sstring& keyspace_name) {
|
||||
schema_ptr s = keyspaces();
|
||||
auto key = partition_key::from_singular(*s, keyspace_name);
|
||||
auto cmd = make_lw_shared<query::read_command>(s->id(), s->version(), query::full_slice);
|
||||
return query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
|
||||
}
|
||||
|
||||
static semaphore the_merge_lock {1};
|
||||
|
||||
future<> merge_lock() {
|
||||
@@ -1115,19 +1132,18 @@ void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp,
|
||||
mutations.emplace_back(std::move(m));
|
||||
}
|
||||
|
||||
std::vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
|
||||
future<std::vector<mutation>> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
|
||||
{
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
|
||||
std::vector<mutation> mutations;
|
||||
add_type_to_schema_mutation(type, timestamp, mutations);
|
||||
return mutations;
|
||||
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
|
||||
}
|
||||
|
||||
std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
|
||||
future<std::vector<mutation>> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
|
||||
{
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
|
||||
|
||||
std::vector<mutation> mutations;
|
||||
schema_ptr s = usertypes();
|
||||
auto pkey = partition_key::from_singular(*s, type->_keyspace);
|
||||
auto ckey = clustering_key::from_singular(*s, type->get_name_as_string());
|
||||
@@ -1135,19 +1151,21 @@ std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata>
|
||||
m.partition().apply_delete(*s, ckey, tombstone(timestamp, gc_clock::now()));
|
||||
mutations.emplace_back(std::move(m));
|
||||
|
||||
return mutations;
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
|
||||
}
|
||||
|
||||
/*
|
||||
* Table metadata serialization/deserialization.
|
||||
*/
|
||||
|
||||
std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
|
||||
future<std::vector<mutation>> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
|
||||
{
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
|
||||
std::vector<mutation> mutations;
|
||||
add_table_to_schema_mutation(table, timestamp, true, mutations);
|
||||
return mutations;
|
||||
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
|
||||
}
|
||||
|
||||
schema_mutations make_table_mutations(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers)
|
||||
@@ -1250,14 +1268,13 @@ void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestam
|
||||
make_table_mutations(table, timestamp, with_columns_and_triggers).copy_to(mutations);
|
||||
}
|
||||
|
||||
std::vector<mutation> make_update_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
|
||||
future<std::vector<mutation>> make_update_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
|
||||
schema_ptr old_table,
|
||||
schema_ptr new_table,
|
||||
api::timestamp_type timestamp,
|
||||
bool from_thrift)
|
||||
{
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
|
||||
std::vector<mutation> mutations;
|
||||
|
||||
add_table_to_schema_mutation(new_table, timestamp, false, mutations);
|
||||
|
||||
@@ -1298,13 +1315,14 @@ std::vector<mutation> make_update_table_mutations(lw_shared_ptr<keyspace_metadat
|
||||
addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
|
||||
|
||||
#endif
|
||||
return mutations;
|
||||
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
|
||||
}
|
||||
|
||||
std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
|
||||
future<std::vector<mutation>> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
|
||||
{
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
auto mutations = make_create_keyspace_mutations(keyspace, timestamp, false);
|
||||
std::vector<mutation> mutations;
|
||||
schema_ptr s = columnfamilies();
|
||||
auto pkey = partition_key::from_singular(*s, table->ks_name());
|
||||
auto ckey = clustering_key::from_singular(*s, table->cf_name());
|
||||
@@ -1324,7 +1342,9 @@ std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata>
|
||||
for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
|
||||
indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
|
||||
#endif
|
||||
return mutations;
|
||||
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
|
||||
}
|
||||
|
||||
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table)
|
||||
|
||||
@@ -79,6 +79,7 @@ future<std::vector<frozen_mutation>> convert_schema_to_mutations(distributed<ser
|
||||
|
||||
future<schema_result_value_type>
|
||||
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
|
||||
future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, const sstring& keyspace_name);
|
||||
|
||||
future<> merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations);
|
||||
|
||||
@@ -94,17 +95,17 @@ std::vector<mutation> make_drop_keyspace_mutations(lw_shared_ptr<keyspace_metada
|
||||
|
||||
lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const schema_result_value_type& partition);
|
||||
|
||||
std::vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
|
||||
future<std::vector<mutation>> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
|
||||
|
||||
std::vector<user_type> create_types_from_schema_partition(const schema_result_value_type& result);
|
||||
|
||||
std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
|
||||
future<std::vector<mutation>> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
|
||||
|
||||
void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp, std::vector<mutation>& mutations);
|
||||
|
||||
std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
|
||||
future<std::vector<mutation>> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
|
||||
|
||||
std::vector<mutation> make_update_table_mutations(
|
||||
future<std::vector<mutation>> make_update_table_mutations(
|
||||
lw_shared_ptr<keyspace_metadata> keyspace,
|
||||
schema_ptr old_table,
|
||||
schema_ptr new_table,
|
||||
@@ -117,7 +118,7 @@ future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distri
|
||||
|
||||
void add_table_to_schema_mutation(schema_ptr table, api::timestamp_type timestamp, bool with_columns_and_triggers, std::vector<mutation>& mutations);
|
||||
|
||||
std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
|
||||
future<std::vector<mutation>> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
|
||||
|
||||
future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& proxy, const sstring& keyspace, const sstring& table);
|
||||
|
||||
|
||||
@@ -276,15 +276,19 @@ ring_position_range_vector_sharder::ring_position_range_vector_sharder(std::vect
|
||||
next_range();
|
||||
}
|
||||
|
||||
stdx::optional<ring_position_range_and_shard>
|
||||
stdx::optional<ring_position_range_and_shard_and_element>
|
||||
ring_position_range_vector_sharder::next(const schema& s) {
|
||||
if (!_current_sharder) {
|
||||
return stdx::nullopt;
|
||||
}
|
||||
auto ret = _current_sharder->next(s);
|
||||
while (!ret && _current_range != _ranges.end()) {
|
||||
auto range_and_shard = _current_sharder->next(s);
|
||||
while (!range_and_shard && _current_range != _ranges.end()) {
|
||||
next_range();
|
||||
ret = _current_sharder->next(s);
|
||||
range_and_shard = _current_sharder->next(s);
|
||||
}
|
||||
auto ret = stdx::optional<ring_position_range_and_shard_and_element>();
|
||||
if (range_and_shard) {
|
||||
ret.emplace(std::move(*range_and_shard), _current_range - _ranges.begin() - 1);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -445,13 +445,20 @@ class ring_position_range_sharder {
|
||||
nonwrapping_range<ring_position> _range;
|
||||
bool _done = false;
|
||||
public:
|
||||
explicit ring_position_range_sharder(range<ring_position> rrp)
|
||||
explicit ring_position_range_sharder(nonwrapping_range<ring_position> rrp)
|
||||
: ring_position_range_sharder(global_partitioner(), std::move(rrp)) {}
|
||||
ring_position_range_sharder(const i_partitioner& partitioner, range<ring_position> rrp)
|
||||
ring_position_range_sharder(const i_partitioner& partitioner, nonwrapping_range<ring_position> rrp)
|
||||
: _partitioner(partitioner), _range(std::move(rrp)) {}
|
||||
stdx::optional<ring_position_range_and_shard> next(const schema& s);
|
||||
};
|
||||
|
||||
struct ring_position_range_and_shard_and_element : ring_position_range_and_shard {
|
||||
ring_position_range_and_shard_and_element(ring_position_range_and_shard&& rpras, unsigned element)
|
||||
: ring_position_range_and_shard(std::move(rpras)), element(element) {
|
||||
}
|
||||
unsigned element;
|
||||
};
|
||||
|
||||
class ring_position_range_vector_sharder {
|
||||
using vec_type = std::vector<nonwrapping_range<ring_position>>;
|
||||
vec_type _ranges;
|
||||
@@ -465,7 +472,8 @@ private:
|
||||
}
|
||||
public:
|
||||
explicit ring_position_range_vector_sharder(std::vector<nonwrapping_range<ring_position>> ranges);
|
||||
stdx::optional<ring_position_range_and_shard> next(const schema& s);
|
||||
// results are returned sorted by index within the vector first, then within each vector item
|
||||
stdx::optional<ring_position_range_and_shard_and_element> next(const schema& s);
|
||||
};
|
||||
|
||||
nonwrapping_range<ring_position> to_partition_range(nonwrapping_range<dht::token>);
|
||||
|
||||
19
dist/common/scripts/scylla_setup
vendored
19
dist/common/scripts/scylla_setup
vendored
@@ -79,8 +79,16 @@ verify_package() {
|
||||
fi
|
||||
}
|
||||
|
||||
list_block_devices() {
|
||||
if lsblk --help | grep -q -e '^\s*-p'; then
|
||||
lsblk -pnr | awk '{ print $1 }'
|
||||
else
|
||||
ls -1 /dev/sd* /dev/hd* /dev/xvd* /dev/nvme* /dev/mapper/* 2>/dev/null|grep -v control
|
||||
fi
|
||||
}
|
||||
|
||||
get_unused_disks() {
|
||||
blkid -c /dev/null|cut -f 1 -d ' '|sed s/://g|grep -v loop|while read dev
|
||||
list_block_devices|grep -v loop|while read dev
|
||||
do
|
||||
count_raw=$(grep $dev /proc/mounts|wc -l)
|
||||
count_pvs=0
|
||||
@@ -266,14 +274,9 @@ if [ $ENABLE_SERVICE -eq 1 ]; then
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ ! -f /etc/scylla.d/housekeeping.uuid ]; then
|
||||
uuidgen > /etc/scylla.d/housekeeping.uuid
|
||||
fi
|
||||
|
||||
UUID=`cat /etc/scylla.d/housekeeping.uuid` || true
|
||||
CUR_VERSION=`scylla --version` || true
|
||||
if [ "$CUR_VERSION" != "" ] && [ "$UUID" != "" ]; then
|
||||
NEW_VERSION=`/usr/lib/scylla/scylla-housekeeping --uuid $UUID version --version $CUR_VERSION --mode i` || true
|
||||
NEW_VERSION=`sudo -u scylla /usr/lib/scylla/scylla-housekeeping --uuid-file /var/lib/scylla-housekeeping/housekeeping.uuid version --version $CUR_VERSION --mode i` || true
|
||||
if [ "$NEW_VERSION" != "" ]; then
|
||||
echo $NEW_VERSION
|
||||
fi
|
||||
@@ -316,7 +319,7 @@ if [ $INTERACTIVE -eq 1 ]; then
|
||||
echo
|
||||
RAID_SETUP=0
|
||||
else
|
||||
echo "Please select disks from the following list: $DEVS"
|
||||
echo "Please select unmounted disks from the following list: $DEVS"
|
||||
fi
|
||||
while [ "$DEVS" != "" ]; do
|
||||
echo "type 'done' to finish selection. selected: $DISKS"
|
||||
|
||||
@@ -6,7 +6,7 @@ After=network.target
|
||||
Type=simple
|
||||
User=scylla
|
||||
Group=scylla
|
||||
ExecStart=/usr/lib/scylla/scylla-housekeeping --uuid-file /etc/scylla.d/housekeeping.uuid -q -c /etc/scylla.d/housekeeping.cfg version --mode d
|
||||
ExecStart=/usr/lib/scylla/scylla-housekeeping --uuid-file /var/lib/scylla-housekeeping/housekeeping.uuid -q -c /etc/scylla.d/housekeeping.cfg version --mode d
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -7,7 +7,7 @@ ENV container docker
|
||||
VOLUME [ "/sys/fs/cgroup" ]
|
||||
|
||||
#install scylla
|
||||
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.6.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN yum -y install epel-release
|
||||
RUN yum -y clean expire-cache
|
||||
RUN yum -y update
|
||||
@@ -38,6 +38,6 @@ ADD commandlineparser.py /commandlineparser.py
|
||||
ADD docker-entrypoint.py /docker-entrypoint.py
|
||||
ENTRYPOINT ["/docker-entrypoint.py"]
|
||||
|
||||
EXPOSE 10000 9042 9160 7000 7001
|
||||
EXPOSE 10000 9042 9160 9180 7000 7001
|
||||
VOLUME [ "/var/lib/scylla" ]
|
||||
RUN chown -R scylla.scylla /var/lib/scylla
|
||||
|
||||
11
dist/redhat/centos_dep/build_dependency.sh
vendored
11
dist/redhat/centos_dep/build_dependency.sh
vendored
@@ -28,10 +28,6 @@ if [ ! -f boost-1.58.0-11.fc23.src.rpm ]; then
|
||||
wget -nv https://kojipkgs.fedoraproject.org//packages/boost/1.58.0/11.fc23/src/boost-1.58.0-11.fc23.src.rpm
|
||||
fi
|
||||
|
||||
if [ ! -f ninja-build-1.6.0-2.fc23.src.rpm ]; then
|
||||
wget -nv https://kojipkgs.fedoraproject.org//packages/ninja-build/1.6.0/2.fc23/src/ninja-build-1.6.0-2.fc23.src.rpm
|
||||
fi
|
||||
|
||||
if [ ! -f ragel-6.8-5.fc23.src.rpm ]; then
|
||||
wget -nv https://kojipkgs.fedoraproject.org//packages/ragel/6.8/5.fc23/src/ragel-6.8-5.fc23.src.rpm
|
||||
fi
|
||||
@@ -94,13 +90,6 @@ if [ ! -f $RPMBUILD/RPMS/x86_64/scylla-boost-1.58.0-11.el7*.x86_64.rpm ]; then
|
||||
fi
|
||||
do_install scylla-boost*
|
||||
|
||||
if [ ! -f $RPMBUILD/RPMS/x86_64/scylla-ninja-build-1.6.0-2.el7*.x86_64.rpm ]; then
|
||||
rpm --define "_topdir $RPMBUILD" -ivh build/srpms/ninja-build-1.6.0-2.fc23.src.rpm
|
||||
patch $RPMBUILD/SPECS/ninja-build.spec < dist/redhat/centos_dep/ninja-build.diff
|
||||
rpmbuild --define "_topdir $RPMBUILD" -ba $RPMBUILD/SPECS/ninja-build.spec
|
||||
fi
|
||||
do_install scylla-ninja-build-1.6.0-2.el7*.x86_64.rpm
|
||||
|
||||
if [ ! -f $RPMBUILD/RPMS/x86_64/scylla-ragel-6.8-5.el7*.x86_64.rpm ]; then
|
||||
rpm --define "_topdir $RPMBUILD" -ivh build/srpms/ragel-6.8-5.fc23.src.rpm
|
||||
patch $RPMBUILD/SPECS/ragel.spec < dist/redhat/centos_dep/ragel.diff
|
||||
|
||||
56
dist/redhat/centos_dep/ninja-build.diff
vendored
56
dist/redhat/centos_dep/ninja-build.diff
vendored
@@ -1,56 +0,0 @@
|
||||
--- ninja-build.spec.orig 2016-01-20 14:41:16.892802134 +0000
|
||||
+++ ninja-build.spec 2016-01-20 14:44:42.453227192 +0000
|
||||
@@ -1,19 +1,18 @@
|
||||
-Name: ninja-build
|
||||
+Name: scylla-ninja-build
|
||||
Version: 1.6.0
|
||||
Release: 2%{?dist}
|
||||
Summary: A small build system with a focus on speed
|
||||
License: ASL 2.0
|
||||
URL: http://martine.github.com/ninja/
|
||||
Source0: https://github.com/martine/ninja/archive/v%{version}.tar.gz#/ninja-%{version}.tar.gz
|
||||
-Source1: ninja.vim
|
||||
# Rename mentions of the executable name to be ninja-build.
|
||||
Patch1000: ninja-1.6.0-binary-rename.patch
|
||||
+Requires: scylla-env
|
||||
BuildRequires: asciidoc
|
||||
BuildRequires: gtest-devel
|
||||
BuildRequires: python2-devel
|
||||
-BuildRequires: re2c >= 0.11.3
|
||||
-Requires: emacs-filesystem
|
||||
-Requires: vim-filesystem
|
||||
+#BuildRequires: scylla-re2c >= 0.11.3
|
||||
+%define _prefix /opt/scylladb
|
||||
|
||||
%description
|
||||
Ninja is a small build system with a focus on speed. It differs from other
|
||||
@@ -32,15 +31,8 @@
|
||||
./ninja -v ninja_test
|
||||
|
||||
%install
|
||||
-# TODO: Install ninja_syntax.py?
|
||||
-mkdir -p %{buildroot}/{%{_bindir},%{_datadir}/bash-completion/completions,%{_datadir}/emacs/site-lisp,%{_datadir}/vim/vimfiles/syntax,%{_datadir}/vim/vimfiles/ftdetect,%{_datadir}/zsh/site-functions}
|
||||
-
|
||||
+mkdir -p %{buildroot}/opt/scylladb/bin
|
||||
install -pm755 ninja %{buildroot}%{_bindir}/ninja-build
|
||||
-install -pm644 misc/bash-completion %{buildroot}%{_datadir}/bash-completion/completions/ninja-bash-completion
|
||||
-install -pm644 misc/ninja-mode.el %{buildroot}%{_datadir}/emacs/site-lisp/ninja-mode.el
|
||||
-install -pm644 misc/ninja.vim %{buildroot}%{_datadir}/vim/vimfiles/syntax/ninja.vim
|
||||
-install -pm644 %{SOURCE1} %{buildroot}%{_datadir}/vim/vimfiles/ftdetect/ninja.vim
|
||||
-install -pm644 misc/zsh-completion %{buildroot}%{_datadir}/zsh/site-functions/_ninja
|
||||
|
||||
%check
|
||||
# workaround possible too low default limits
|
||||
@@ -50,12 +42,6 @@
|
||||
%files
|
||||
%doc COPYING HACKING.md README doc/manual.html
|
||||
%{_bindir}/ninja-build
|
||||
-%{_datadir}/bash-completion/completions/ninja-bash-completion
|
||||
-%{_datadir}/emacs/site-lisp/ninja-mode.el
|
||||
-%{_datadir}/vim/vimfiles/syntax/ninja.vim
|
||||
-%{_datadir}/vim/vimfiles/ftdetect/ninja.vim
|
||||
-# zsh does not have a -filesystem package
|
||||
-%{_datadir}/zsh/
|
||||
|
||||
%changelog
|
||||
* Mon Nov 16 2015 Ben Boeckel <mathstuf@gmail.com> - 1.6.0-2
|
||||
18
dist/redhat/scylla.spec.in
vendored
18
dist/redhat/scylla.spec.in
vendored
@@ -27,10 +27,10 @@ Group: Applications/Databases
|
||||
Summary: The Scylla database server
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel systemtap-sdt-devel
|
||||
%{?fedora:BuildRequires: boost-devel ninja-build ragel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++-static scylla-boost-devel scylla-boost-static scylla-ninja-build scylla-ragel scylla-antlr3-tool scylla-antlr3-C++-devel python34 scylla-gcc-c++ >= 5.1.1, python34-pyparsing}
|
||||
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl util-linux
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel systemtap-sdt-devel ninja-build
|
||||
%{?fedora:BuildRequires: boost-devel ragel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++-static scylla-boost-devel scylla-boost-static scylla-ragel scylla-antlr3-tool scylla-antlr3-C++-devel python34 scylla-gcc-c++ >= 5.1.1, python34-pyparsing}
|
||||
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl util-linux python-setuptools pciutils
|
||||
%{?rhel:Requires: python34 python34-PyYAML}
|
||||
Conflicts: abrt
|
||||
|
||||
@@ -88,7 +88,7 @@ install -m755 dist/common/bin/scyllatop $RPM_BUILD_ROOT%{_bindir}
|
||||
install -m755 scylla-blocktune $RPM_BUILD_ROOT%{_prefix}/lib/scylla/
|
||||
install -m755 scylla-housekeeping $RPM_BUILD_ROOT%{_prefix}/lib/scylla/
|
||||
if @@HOUSEKEEPING_CONF@@; then
|
||||
install -m644 conf/housekeeping.cfg $RPM_BUILD_ROOT%{_sysconfdir}/scylla/
|
||||
install -m644 conf/housekeeping.cfg $RPM_BUILD_ROOT%{_sysconfdir}/scylla.d/
|
||||
fi
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_docdir}/scylla
|
||||
install -m644 README.md $RPM_BUILD_ROOT%{_docdir}/scylla/
|
||||
@@ -101,6 +101,7 @@ install -d -m755 $RPM_BUILD_ROOT%{_sharedstatedir}/scylla/
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_sharedstatedir}/scylla/data
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_sharedstatedir}/scylla/commitlog
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_sharedstatedir}/scylla/coredump
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_sharedstatedir}/scylla-housekeeping
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_prefix}/lib/scylla/swagger-ui
|
||||
cp -r swagger-ui/dist $RPM_BUILD_ROOT%{_prefix}/lib/scylla/swagger-ui
|
||||
install -d -m755 $RPM_BUILD_ROOT%{_prefix}/lib/scylla/api
|
||||
@@ -110,8 +111,8 @@ cp -r scylla-housekeeping $RPM_BUILD_ROOT%{_prefix}/lib/scylla/scylla-housekeepi
|
||||
cp -P dist/common/sbin/* $RPM_BUILD_ROOT%{_sbindir}/
|
||||
|
||||
%pre server
|
||||
/usr/sbin/groupadd scylla 2> /dev/null || :
|
||||
/usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
|
||||
getent group scylla || /usr/sbin/groupadd scylla 2> /dev/null || :
|
||||
getent passwd scylla || /usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
|
||||
|
||||
%post server
|
||||
# Upgrade coredump settings
|
||||
@@ -193,6 +194,7 @@ rm -rf $RPM_BUILD_ROOT
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/data
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/commitlog
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/coredump
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla-housekeeping
|
||||
|
||||
%package conf
|
||||
Group: Applications/Databases
|
||||
@@ -216,7 +218,7 @@ mv /tmp/scylla.yaml /etc/scylla/scylla.yaml
|
||||
%config(noreplace) %{_sysconfdir}/scylla/scylla.yaml
|
||||
%config(noreplace) %{_sysconfdir}/scylla/cassandra-rackdc.properties
|
||||
%if %is_housekeeping_conf
|
||||
%config(noreplace) %{_sysconfdir}/scylla/housekeeping.cfg
|
||||
%config(noreplace) %{_sysconfdir}/scylla.d/housekeeping.cfg
|
||||
%endif
|
||||
|
||||
|
||||
|
||||
3
dist/ubuntu/build_deb.sh
vendored
3
dist/ubuntu/build_deb.sh
vendored
@@ -51,6 +51,9 @@ fi
|
||||
if [ ! -f /usr/bin/wget ]; then
|
||||
sudo apt-get -y install wget
|
||||
fi
|
||||
if [ ! -f /usr/bin/lsb_release ]; then
|
||||
sudo apt-get -y install lsb-release
|
||||
fi
|
||||
|
||||
DISTRIBUTION=`lsb_release -i|awk '{print $3}'`
|
||||
CODENAME=`lsb_release -c|awk '{print $2}'`
|
||||
|
||||
2
dist/ubuntu/control.in
vendored
2
dist/ubuntu/control.in
vendored
@@ -16,7 +16,7 @@ Conflicts: scylla-server (<< 1.1)
|
||||
|
||||
Package: scylla-server
|
||||
Architecture: amd64
|
||||
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, util-linux, realpath, python3-yaml, python3, uuid-runtime, @@DEPENDS@@
|
||||
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, util-linux, realpath, python3-yaml, python3, uuid-runtime, pciutils, @@DEPENDS@@
|
||||
Description: Scylla database server binaries
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
|
||||
1
dist/ubuntu/debian/scylla-server.dirs
vendored
1
dist/ubuntu/debian/scylla-server.dirs
vendored
@@ -4,3 +4,4 @@ var/lib/scylla
|
||||
var/lib/scylla/data
|
||||
var/lib/scylla/commitlog
|
||||
var/lib/scylla/coredump
|
||||
var/lib/scylla-housekeeping
|
||||
|
||||
1
dist/ubuntu/debian/scylla-server.postinst
vendored
1
dist/ubuntu/debian/scylla-server.postinst
vendored
@@ -10,6 +10,7 @@ if [ "$1" = configure ]; then
|
||||
--disabled-password \
|
||||
--group scylla
|
||||
chown -R scylla:scylla /var/lib/scylla
|
||||
chown -R scylla:scylla /var/lib/scylla-housekeeping
|
||||
fi
|
||||
|
||||
ln -sfT /etc/scylla /var/lib/scylla/conf
|
||||
|
||||
@@ -23,13 +23,16 @@ description "A timer job file for running scylla-housekeeping"
|
||||
start on started scylla-server
|
||||
stop on stopping scylla-server
|
||||
|
||||
setuid scylla
|
||||
setgid scylla
|
||||
|
||||
script
|
||||
# make sure scylla is up before checking for the version
|
||||
sleep 5
|
||||
/usr/lib/scylla/scylla-housekeeping --uuid-file /etc/scylla.d/housekeeping.uuid -c /etc/scylla.d/housekeeping.cfg -q version --mode r || true
|
||||
/usr/lib/scylla/scylla-housekeeping --uuid-file /var/lib/scylla-housekeeping/housekeeping.uuid -c /etc/scylla.d/housekeeping.cfg -q version --mode r || true
|
||||
while [ 1 ]
|
||||
do
|
||||
sleep 1d
|
||||
/usr/lib/scylla/scylla-housekeeping --uuid-file /etc/scylla.d/housekeeping.uuid -c /etc/scylla.d/housekeeping.cfg -q version --mode d || true
|
||||
/usr/lib/scylla/scylla-housekeeping --uuid-file /var/lib/scylla-housekeeping/housekeeping.uuid -c /etc/scylla.d/housekeeping.cfg -q version --mode d || true
|
||||
done
|
||||
end script
|
||||
|
||||
1
dist/ubuntu/debian/scylla-server.upstart
vendored
1
dist/ubuntu/debian/scylla-server.upstart
vendored
@@ -41,6 +41,7 @@ script
|
||||
fi
|
||||
. "$i"
|
||||
done
|
||||
export SCYLLA_CONF SCYLLA_HOME
|
||||
exec /usr/bin/scylla $SCYLLA_ARGS $SEASTAR_IO $DEV_MODE $CPUSET
|
||||
end script
|
||||
|
||||
|
||||
7
dist/ubuntu/dep/build_dependency.sh
vendored
7
dist/ubuntu/dep/build_dependency.sh
vendored
@@ -77,10 +77,11 @@ fi
|
||||
|
||||
if [ "$DISTRIBUTION" = "Debian" ] && [ "$VERSION_ID" = "8" ]; then
|
||||
if [ ! -f build/gcc-5_*.deb ]; then
|
||||
sudo cp dist/ubuntu/dep/debian-stretch-source.list /etc/apt/sources.list.d/
|
||||
sudo apt-get update
|
||||
cd build
|
||||
apt-get source gcc-5/stretch=5.4.1-2
|
||||
wget https://launchpad.net/debian/+archive/primary/+files/gcc-5_5.4.1-5.dsc
|
||||
wget https://launchpad.net/debian/+archive/primary/+files/gcc-5_5.4.1.orig.tar.gz
|
||||
wget https://launchpad.net/debian/+archive/primary/+files/gcc-5_5.4.1-5.diff.gz
|
||||
dpkg-source -x gcc-5_5.4.1-5.dsc
|
||||
cd gcc-5-5.4.1
|
||||
# resolve build time dependencies manually, since mk-build-deps doesn't works for gcc package
|
||||
sudo apt-get install -y g++-multilib libc6-dev-i386 lib32gcc1 libc6-dev-x32 libx32gcc1 libc6-dbg m4 libtool autoconf2.64 autogen gawk zlib1g-dev systemtap-sdt-dev gperf bison flex gdb texinfo locales sharutils libantlr-java libffi-dev gnat-4.9 libisl-dev libmpc-dev libmpfr-dev libgmp-dev dejagnu realpath chrpath quilt doxygen graphviz ghostscript texlive-latex-base xsltproc libxml2-utils docbook-xsl-ns
|
||||
|
||||
20
dist/ubuntu/dep/debian-gcc-5-jessie.diff
vendored
20
dist/ubuntu/dep/debian-gcc-5-jessie.diff
vendored
@@ -1,6 +1,5 @@
|
||||
diff -Nur debian/rules.conf /home/syuu/gcc-5-5.4.1/debian/rules.conf
|
||||
--- debian/rules.conf 2016-10-14 04:54:21.000000000 +0000
|
||||
+++ /home/syuu/gcc-5-5.4.1/debian/rules.conf 2016-10-12 17:28:54.138711378 +0000
|
||||
--- debian/rules.conf 2017-02-24 19:02:52.000000000 +0000
|
||||
+++ /home/syuu/gcc-5.5/gcc-5-5.4.1/debian/rules.conf 2017-02-24 18:13:59.000000000 +0000
|
||||
@@ -206,7 +206,7 @@
|
||||
ifneq (,$(filter $(distrelease),vivid))
|
||||
BINUTILSBDV = 2.25-3~
|
||||
@@ -10,14 +9,16 @@ diff -Nur debian/rules.conf /home/syuu/gcc-5-5.4.1/debian/rules.conf
|
||||
else ifneq (,$(filter $(distrelease),sid stretch xenial))
|
||||
BINUTILSBDV = 2.26.1
|
||||
endif
|
||||
@@ -387,9 +387,9 @@
|
||||
@@ -386,10 +386,10 @@
|
||||
MPFR_BUILD_DEP = libmpfr-dev (>= 3.0.0-9~),
|
||||
endif
|
||||
|
||||
ISL_BUILD_DEP = libisl-dev,
|
||||
-ifneq (,$(filter $(distrelease),jessie sid experimental))
|
||||
-ISL_BUILD_DEP = libisl-dev,
|
||||
-ifneq (,$(filter $(distrelease),jessie stretch sid experimental))
|
||||
- ISL_BUILD_DEP = libisl-dev (>= 0.14),
|
||||
-endif
|
||||
+#ifneq (,$(filter $(distrelease),jessie sid experimental))
|
||||
+#ISL_BUILD_DEP = libisl-dev,
|
||||
+#ifneq (,$(filter $(distrelease),jessie stretch sid experimental))
|
||||
+# ISL_BUILD_DEP = libisl-dev (>= 0.14),
|
||||
+#endif
|
||||
|
||||
@@ -37,9 +38,8 @@ diff -Nur debian/rules.conf /home/syuu/gcc-5-5.4.1/debian/rules.conf
|
||||
ifneq ($(DEB_CROSS),yes)
|
||||
# all archs for which to create b-d's
|
||||
any_archs = alpha amd64 armel armhf arm64 i386 mips mipsel mips64 mips64el powerpc ppc64 ppc64el m68k sh4 sparc64 s390x x32
|
||||
diff -Nur debian/rules.defs /home/syuu/gcc-5-5.4.1/debian/rules.defs
|
||||
--- debian/rules.defs 2016-10-14 04:54:21.000000000 +0000
|
||||
+++ /home/syuu/gcc-5-5.4.1/debian/rules.defs 2016-10-13 10:18:51.647631508 +0000
|
||||
--- debian/rules.defs 2017-02-24 19:02:52.000000000 +0000
|
||||
+++ /home/syuu/gcc-5.5/gcc-5-5.4.1/debian/rules.defs 2017-02-24 18:13:59.000000000 +0000
|
||||
@@ -412,7 +412,7 @@
|
||||
# gcc versions (fixincludes, libgcj-common) ...
|
||||
#with_common_pkgs := yes
|
||||
|
||||
2
dist/ubuntu/dep/debian-stretch-source.list
vendored
2
dist/ubuntu/dep/debian-stretch-source.list
vendored
@@ -1,2 +0,0 @@
|
||||
deb-src http://httpredir.debian.org/debian stretch main
|
||||
deb-src http://httpredir.debian.org/debian stretch-updates main
|
||||
@@ -94,7 +94,7 @@ frozen_mutation::frozen_mutation(const mutation& m)
|
||||
{
|
||||
mutation_partition_serializer part_ser(*m.schema(), m.partition());
|
||||
|
||||
ser::writer_of_mutation wom(_bytes);
|
||||
ser::writer_of_mutation<bytes_ostream> wom(_bytes);
|
||||
std::move(wom).write_table_id(m.schema()->id())
|
||||
.write_schema_version(m.schema()->version())
|
||||
.write_key(m.key())
|
||||
@@ -157,7 +157,7 @@ stop_iteration streamed_mutation_freezer::consume(range_tombstone&& rt) {
|
||||
|
||||
frozen_mutation streamed_mutation_freezer::consume_end_of_stream() {
|
||||
bytes_ostream out;
|
||||
ser::writer_of_mutation wom(out);
|
||||
ser::writer_of_mutation<bytes_ostream> wom(out);
|
||||
std::move(wom).write_table_id(_schema.id())
|
||||
.write_schema_version(_schema.version())
|
||||
.write_key(_key)
|
||||
@@ -192,7 +192,7 @@ class fragmenting_mutation_freezer {
|
||||
private:
|
||||
future<> flush() {
|
||||
bytes_ostream out;
|
||||
ser::writer_of_mutation wom(out);
|
||||
ser::writer_of_mutation<bytes_ostream> wom(out);
|
||||
std::move(wom).write_table_id(_schema.id())
|
||||
.write_schema_version(_schema.version())
|
||||
.write_key(_key)
|
||||
|
||||
@@ -34,7 +34,7 @@ frozen_schema::frozen_schema(const schema_ptr& s)
|
||||
: _data([&s] {
|
||||
schema_mutations sm = db::schema_tables::make_table_mutations(s, api::new_timestamp());
|
||||
bytes_ostream out;
|
||||
ser::writer_of_schema wr(out);
|
||||
ser::writer_of_schema<bytes_ostream> wr(out);
|
||||
std::move(wr).write_version(s->version())
|
||||
.write_mutations(sm)
|
||||
.end_schema();
|
||||
|
||||
@@ -50,6 +50,12 @@ public:
|
||||
// for real time waits.
|
||||
};
|
||||
|
||||
// Returns a time point which is earlier from t by d, or minimum time point if it cannot be represented.
|
||||
template<typename Clock, typename Duration, typename Rep, typename Period>
|
||||
inline
|
||||
auto saturating_subtract(std::chrono::time_point<Clock, Duration> t, std::chrono::duration<Rep, Period> d) -> decltype(t) {
|
||||
return std::max(t, decltype(t)::min() + d) - d;
|
||||
}
|
||||
|
||||
using expiry_opt = std::experimental::optional<gc_clock::time_point>;
|
||||
using ttl_opt = std::experimental::optional<gc_clock::duration>;
|
||||
|
||||
@@ -1193,10 +1193,7 @@ void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) {
|
||||
std::experimental::optional<endpoint_state> local_ep_state;
|
||||
if (endpoint_state_map.count(ep) > 0) {
|
||||
local_ep_state = endpoint_state_map.at(ep);
|
||||
}
|
||||
auto eps_old = get_endpoint_state_for_endpoint(ep);
|
||||
if (!is_dead_state(eps) && !_in_shadow_round) {
|
||||
if (endpoint_state_map.count(ep)) {
|
||||
logger.debug("Node {} has restarted, now UP, status = {}", ep, get_gossip_status(eps));
|
||||
@@ -1207,24 +1204,27 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
|
||||
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
|
||||
endpoint_state_map[ep] = eps;
|
||||
|
||||
auto& ep_state = endpoint_state_map.at(ep);
|
||||
|
||||
if (local_ep_state) {
|
||||
if (eps_old) {
|
||||
// the node restarted: it is up to the subscriber to take whatever action is necessary
|
||||
_subscribers.for_each([ep, local_ep_state] (auto& subscriber) {
|
||||
subscriber->on_restart(ep, *local_ep_state);
|
||||
_subscribers.for_each([ep, eps_old] (auto& subscriber) {
|
||||
subscriber->on_restart(ep, *eps_old);
|
||||
});
|
||||
}
|
||||
|
||||
auto& ep_state = endpoint_state_map.at(ep);
|
||||
if (!is_dead_state(ep_state)) {
|
||||
mark_alive(ep, ep_state);
|
||||
} else {
|
||||
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
|
||||
mark_dead(ep, ep_state);
|
||||
}
|
||||
_subscribers.for_each([ep, ep_state] (auto& subscriber) {
|
||||
subscriber->on_join(ep, ep_state);
|
||||
});
|
||||
|
||||
auto eps_new = get_endpoint_state_for_endpoint(ep);
|
||||
if (eps_new) {
|
||||
_subscribers.for_each([ep, eps_new] (auto& subscriber) {
|
||||
subscriber->on_join(ep, *eps_new);
|
||||
});
|
||||
}
|
||||
// check this at the end so nodes will learn about the endpoint
|
||||
if (is_shutdown(ep)) {
|
||||
mark_as_shutdown(ep);
|
||||
@@ -1399,9 +1399,11 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
local_state.add_application_state(entry.first, entry.second);
|
||||
}
|
||||
|
||||
auto generation = local_state.get_heart_beat_state().get_generation();
|
||||
|
||||
//notify snitches that Gossiper is about to start
|
||||
return locator::i_endpoint_snitch::get_local_snitch_ptr()->gossiper_starting().then([this, &local_state] {
|
||||
logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation());
|
||||
return locator::i_endpoint_snitch::get_local_snitch_ptr()->gossiper_starting().then([this, generation] {
|
||||
logger.trace("gossip started with generation {}", generation);
|
||||
_enabled = true;
|
||||
_nr_run = 0;
|
||||
_scheduled_gossip_task.arm(INTERVAL);
|
||||
@@ -1498,16 +1500,19 @@ future<> gossiper::add_local_application_state(application_state state, versione
|
||||
logger.error(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
endpoint_state& ep_state = gossiper.endpoint_state_map.at(ep_addr);
|
||||
endpoint_state ep_state_before = gossiper.endpoint_state_map.at(ep_addr);
|
||||
// Fire "before change" notifications:
|
||||
gossiper.do_before_change_notifications(ep_addr, ep_state, state, value);
|
||||
gossiper.do_before_change_notifications(ep_addr, ep_state_before, state, value);
|
||||
// Notifications may have taken some time, so preventively raise the version
|
||||
// of the new value, otherwise it could be ignored by the remote node
|
||||
// if another value with a newer version was received in the meantime:
|
||||
value = storage_service_value_factory().clone_with_higher_version(value);
|
||||
// Add to local application state and fire "on change" notifications:
|
||||
ep_state.add_application_state(state, value);
|
||||
gossiper.do_on_change_notifications(ep_addr, state, value);
|
||||
if (gossiper.endpoint_state_map.count(ep_addr)) {
|
||||
auto& ep_state = gossiper.endpoint_state_map.at(ep_addr);
|
||||
ep_state.add_application_state(state, value);
|
||||
gossiper.do_on_change_notifications(ep_addr, state, value);
|
||||
}
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to apply application_state: {}", ep);
|
||||
});
|
||||
|
||||
@@ -303,10 +303,11 @@ def handle_visitors_state(info, hout, clases = []):
|
||||
name = "__".join(clases) if clases else cls["name"]
|
||||
frame = "empty_frame" if "final" in cls else "frame"
|
||||
fprintln(hout, Template("""
|
||||
template<typename Output>
|
||||
struct state_of_$name {
|
||||
$frame f;""").substitute({'name': name, 'frame': frame }))
|
||||
$frame<Output> f;""").substitute({'name': name, 'frame': frame }))
|
||||
if clases:
|
||||
local_state = "state_of_" + "__".join(clases[:-1])
|
||||
local_state = "state_of_" + "__".join(clases[:-1]) + '<Output>'
|
||||
fprintln(hout, Template(" $name _parent;").substitute({'name': local_state}))
|
||||
if "final" in cls:
|
||||
fprintln(hout, Template(" state_of_$name($state parent) : _parent(parent) {}").substitute({'name': name, 'state' : local_state}))
|
||||
@@ -327,18 +328,19 @@ def add_vector_node(hout, cls, members, base_state, current_node, ind):
|
||||
current = members[ind]
|
||||
typ = current["type"][1]
|
||||
fprintln(hout, Template("""
|
||||
template<typename Output>
|
||||
struct $node_name {
|
||||
bytes_ostream& _out;
|
||||
state_of_$base_state _state;
|
||||
place_holder _size;
|
||||
Outout& _out;
|
||||
state_of_$base_state<Output> _state;
|
||||
place_holder<Output> _size;
|
||||
size_type _count = 0;
|
||||
$node_name(bytes_ostream& out, state_of_$base_state state)
|
||||
$node_name(Output& out, state_of_$base_state<Output> state)
|
||||
: _out(out)
|
||||
, _state(state)
|
||||
, _size(start_place_holder(out))
|
||||
{
|
||||
}
|
||||
$next_state end_$name() {
|
||||
$next_state<Output> end_$name() {
|
||||
_size.set(_out, _count);
|
||||
}""").substitute({'node_name': '', 'name': current["name"] }))
|
||||
|
||||
@@ -363,7 +365,7 @@ def optional_add_methods(typ):
|
||||
}""")).substitute({'type' : added_type})
|
||||
if is_local_type(typ):
|
||||
res = res + Template(reindent(4, """
|
||||
writer_of_$type write() {
|
||||
writer_of_$type<Output> write() {
|
||||
serialize(_out, true);
|
||||
return {_out};
|
||||
}""")).substitute({'type' : param_type(typ)})
|
||||
@@ -381,7 +383,7 @@ def vector_add_method(current, base_state):
|
||||
}""").substitute({'type': param_type(typ[1][0]), 'name': current["name"]})
|
||||
else:
|
||||
res = res + Template("""
|
||||
writer_of_$type add() {
|
||||
writer_of_$type<Output> add() {
|
||||
_count++;
|
||||
return {_out};
|
||||
}""").substitute({'type': flat_type(typ[1][0]), 'name': current["name"]})
|
||||
@@ -391,7 +393,7 @@ def vector_add_method(current, base_state):
|
||||
_count++;
|
||||
}""").substitute({'type': param_view_type(typ[1][0])})
|
||||
return res + Template("""
|
||||
after_${basestate}__$name end_$name() && {
|
||||
after_${basestate}__$name<Output> end_$name() && {
|
||||
_size.set(_out, _count);
|
||||
return { _out, std::move(_state) };
|
||||
}
|
||||
@@ -418,7 +420,7 @@ def add_param_writer_basic_type(name, base_state, typ, var_type = "", var_index
|
||||
typ = 'const ' + typ + '&'
|
||||
|
||||
return Template(reindent(4, """
|
||||
after_${base_state}__$name write_$name$var_type($typ t) && {
|
||||
after_${base_state}__$name<Output> write_$name$var_type($typ t) && {
|
||||
$set_varient_index
|
||||
serialize(_out, t);
|
||||
$set_command
|
||||
@@ -431,7 +433,7 @@ def add_param_writer_object(name, base_state, typ, var_type = "", var_index = No
|
||||
var_index = "uint32_t(" + str(var_index) +")"
|
||||
set_varient_index = "serialize(_out, " + var_index +");\n" if var_index is not None else ""
|
||||
ret = Template(reindent(4,"""
|
||||
${base_state}__${name}$var_type1 start_${name}$var_type() && {
|
||||
${base_state}__${name}$var_type1<Output> start_${name}$var_type() && {
|
||||
$set_varient_index
|
||||
return { _out, std::move(_state) };
|
||||
}
|
||||
@@ -443,9 +445,9 @@ def add_param_writer_object(name, base_state, typ, var_type = "", var_index = No
|
||||
return_command = "{ _out, std::move(_state._parent) }" if var_type is not "" and not root_node else "{ _out, std::move(_state) }"
|
||||
ret += Template(reindent(4, """
|
||||
template<typename Serializer>
|
||||
after_${base_state}__${name} ${name}$var_type(Serializer&& f) && {
|
||||
after_${base_state}__${name}<Output> ${name}$var_type(Serializer&& f) && {
|
||||
$set_varient_index
|
||||
f(writer_of_$typ(_out));
|
||||
f(writer_of_$typ<Output>(_out));
|
||||
$set_command
|
||||
return $return_command;
|
||||
}""")).substitute(locals())
|
||||
@@ -458,7 +460,7 @@ def add_param_write(current, base_state, vector = False, root_node = False):
|
||||
res = res + add_param_writer_basic_type(current["name"], base_state, typ)
|
||||
elif is_optional(typ):
|
||||
res = res + Template(reindent(4, """
|
||||
after_${basestate}__$name skip_$name() && {
|
||||
after_${basestate}__$name<Output> skip_$name() && {
|
||||
serialize(_out, false);
|
||||
return { _out, std::move(_state) };
|
||||
}""")).substitute({'type': param_type(typ), 'name': current["name"], 'basestate' : base_state})
|
||||
@@ -472,11 +474,11 @@ def add_param_write(current, base_state, vector = False, root_node = False):
|
||||
set_size = "_size.set(_out, 0);" if vector else "serialize(_out, size_type(0));"
|
||||
|
||||
res = res + Template("""
|
||||
${basestate}__$name start_$name() && {
|
||||
${basestate}__$name<Output> start_$name() && {
|
||||
return { _out, std::move(_state) };
|
||||
}
|
||||
|
||||
after_${basestate}__$name skip_$name() && {
|
||||
after_${basestate}__$name<Output> skip_$name() && {
|
||||
$set
|
||||
return { _out, std::move(_state) };
|
||||
}
|
||||
@@ -506,7 +508,7 @@ def get_return_struct(variant_node, clases):
|
||||
|
||||
def add_variant_end_method(base_state, name, clases):
|
||||
|
||||
return_struct = "after_" + base_state
|
||||
return_struct = "after_" + base_state + '<Output>'
|
||||
return Template("""
|
||||
$return_struct end_$name() && {
|
||||
_state.f.end(_out);
|
||||
@@ -520,7 +522,7 @@ def add_end_method(parents, name, variant_node = False, return_value = True):
|
||||
return add_variant_end_method(parents, name, return_value)
|
||||
base_state = parents + "__" + name
|
||||
if return_value:
|
||||
return_struct = "after_" + base_state
|
||||
return_struct = "after_" + base_state + '<Output>'
|
||||
return Template("""
|
||||
$return_struct end_$name() && {
|
||||
_state.f.end(_out);
|
||||
@@ -534,7 +536,7 @@ def add_end_method(parents, name, variant_node = False, return_value = True):
|
||||
""").substitute({'name': name, 'basestate':base_state})
|
||||
|
||||
def add_vector_placeholder():
|
||||
return """ place_holder _size;
|
||||
return """ place_holder<Output> _size;
|
||||
size_type _count = 0;"""
|
||||
|
||||
def add_node(hout, name, member, base_state, prefix, parents, fun, is_type_vector = False, is_type_final = False):
|
||||
@@ -554,21 +556,22 @@ def add_node(hout, name, member, base_state, prefix, parents, fun, is_type_vecto
|
||||
else:
|
||||
state_init = ""
|
||||
if prefix == "writer_of_":
|
||||
constructor = Template("""$name(bytes_ostream& out)
|
||||
constructor = Template("""$name(Output& out)
|
||||
: _out(out)
|
||||
, _state{start_frame(out)}${vector_init}
|
||||
{}""").substitute({'name': struct_name, 'vector_init' : vector_init})
|
||||
elif state_init != "":
|
||||
constructor = Template("""$name(bytes_ostream& out, state_of_$state state)
|
||||
constructor = Template("""$name(Output& out, state_of_$state<Output> state)
|
||||
: _out(out)
|
||||
, $state_init${vector_init}
|
||||
{}""").substitute({'name': struct_name, 'vector_init' : vector_init, 'state' : parents, 'state_init' : state_init})
|
||||
else:
|
||||
constructor = ""
|
||||
fprintln(hout, Template("""
|
||||
template<typename Output>
|
||||
struct $name {
|
||||
bytes_ostream& _out;
|
||||
state_of_$state _state;
|
||||
Output& _out;
|
||||
state_of_$state<Output> _state;
|
||||
${vector_placeholder}
|
||||
${constructor}
|
||||
$fun
|
||||
@@ -588,8 +591,9 @@ def add_optional_node(hout, typ):
|
||||
return
|
||||
optional_nodes.add(full_type)
|
||||
fprintln(hout, Template(reindent(0,"""
|
||||
template<typename Output>
|
||||
struct writer_of_$type {
|
||||
bytes_ostream& _out;
|
||||
Output& _out;
|
||||
$add_method
|
||||
};""")).substitute({'type': full_type, 'add_method': optional_add_methods(typ[1][0])}))
|
||||
|
||||
@@ -604,7 +608,7 @@ def add_variant_nodes(hout, member, param, base_state, parents, classes):
|
||||
new_member = {"type": typ, "name" : "variant"}
|
||||
return_struct = "after_" + par
|
||||
end_method = Template("""
|
||||
$return_struct end_variant() && {
|
||||
$return_struct<Output> end_variant() && {
|
||||
_state.f.end(_out);
|
||||
return { _out, std::move(_state._parent) };
|
||||
}
|
||||
|
||||
@@ -27,5 +27,5 @@ class partition {
|
||||
class reconcilable_result {
|
||||
uint32_t row_count();
|
||||
std::vector<partition> partitions();
|
||||
query::short_read is_short_read() [[version 1.7]] = query::short_read::no;
|
||||
query::short_read is_short_read() [[version 1.6]] = query::short_read::no;
|
||||
};
|
||||
|
||||
@@ -29,7 +29,7 @@ class result {
|
||||
bytes buf();
|
||||
std::experimental::optional<query::result_digest> digest();
|
||||
api::timestamp_type last_modified() [ [version 1.2] ] = api::missing_timestamp;
|
||||
query::short_read is_short_read() [[version 1.7]] = query::short_read::no;
|
||||
query::short_read is_short_read() [[version 1.6]] = query::short_read::no;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
12
main.cc
12
main.cc
@@ -426,6 +426,8 @@ int main(int ac, char** av) {
|
||||
if (opts.count("developer-mode")) {
|
||||
smp::invoke_on_all([] { engine().set_strict_dma(false); }).get();
|
||||
}
|
||||
supervisor_notify("creating tracing");
|
||||
tracing::tracing::create_tracing("trace_keyspace_helper").get();
|
||||
supervisor_notify("creating snitch");
|
||||
i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
|
||||
// #293 - do not stop anything
|
||||
@@ -580,7 +582,12 @@ int main(int ac, char** av) {
|
||||
// we will have races between the compaction and loading processes
|
||||
// We also want to trigger regular compaction on boot.
|
||||
db.invoke_on_all([&proxy] (database& db) {
|
||||
for (auto& x : db.get_column_families()) {
|
||||
// avoid excessive disk usage by making sure all shards reshard
|
||||
// shared sstables in the same order. That's done by choosing
|
||||
// column families in UUID order, and each individual column
|
||||
// family will reshard shared sstables in generation order.
|
||||
auto cfs = boost::copy_range<std::map<utils::UUID, lw_shared_ptr<column_family>>>(db.get_column_families());
|
||||
for (auto& x : cfs) {
|
||||
column_family& cf = *(x.second);
|
||||
// We start the rewrite, but do not wait for it.
|
||||
cf.start_rewrite();
|
||||
@@ -632,7 +639,7 @@ int main(int ac, char** av) {
|
||||
gms::get_local_gossiper().wait_for_gossip_to_settle().get();
|
||||
api::set_server_gossip_settle(ctx).get();
|
||||
supervisor_notify("starting tracing");
|
||||
tracing::tracing::create_tracing("trace_keyspace_helper").get();
|
||||
tracing::tracing::start_tracing().get();
|
||||
supervisor_notify("starting native transport");
|
||||
service::get_local_storage_service().start_native_transport().get();
|
||||
if (start_thrift) {
|
||||
@@ -659,6 +666,7 @@ int main(int ac, char** av) {
|
||||
uint16_t pport = cfg->prometheus_port();
|
||||
if (pport) {
|
||||
pctx.metric_help = "Scylla server statistics";
|
||||
pctx.prefix = cfg->prometheus_prefix();
|
||||
prometheus_server.start().get();
|
||||
prometheus::start(prometheus_server, pctx);
|
||||
prometheus_server.listen(ipv4_addr{prom_addr.addresses[0].in.s_addr, pport}).get();
|
||||
|
||||
69
memtable.cc
69
memtable.cc
@@ -65,17 +65,15 @@ future<> memtable::clear_gently() noexcept {
|
||||
auto t = std::make_unique<seastar::thread>(attr, [this] {
|
||||
auto& alloc = allocator();
|
||||
|
||||
// entries can no longer be moved after unlink_leftmost_without_rebalance()
|
||||
// so need to disable compaction.
|
||||
logalloc::reclaim_lock rl(*this);
|
||||
|
||||
auto p = std::move(partitions);
|
||||
while (!p.empty()) {
|
||||
auto batch_size = std::min<size_t>(p.size(), 32);
|
||||
auto dirty_before = dirty_size();
|
||||
with_allocator(alloc, [&] () noexcept {
|
||||
while (batch_size--) {
|
||||
alloc.destroy(p.unlink_leftmost_without_rebalance());
|
||||
p.erase_and_dispose(p.begin(), [&] (auto e) {
|
||||
alloc.destroy(e);
|
||||
});
|
||||
}
|
||||
});
|
||||
remove_flushed_memory(dirty_before - dirty_size());
|
||||
@@ -205,19 +203,23 @@ protected:
|
||||
, _range(&range)
|
||||
{ }
|
||||
|
||||
memtable_entry* fetch_next_entry() {
|
||||
memtable_entry* fetch_entry() {
|
||||
update_iterators();
|
||||
if (_i == _end) {
|
||||
return nullptr;
|
||||
} else {
|
||||
memtable_entry& e = *_i;
|
||||
++_i;
|
||||
_last = e.key();
|
||||
_memtable->upgrade_entry(e);
|
||||
return &e;
|
||||
}
|
||||
}
|
||||
|
||||
void advance() {
|
||||
memtable_entry& e = *_i;
|
||||
_last = e.key();
|
||||
++_i;
|
||||
}
|
||||
|
||||
logalloc::allocating_section& read_section() {
|
||||
return _memtable->_read_section;
|
||||
}
|
||||
@@ -287,14 +289,18 @@ public:
|
||||
return _delegate();
|
||||
}
|
||||
|
||||
logalloc::reclaim_lock _(region());
|
||||
managed_bytes::linearization_context_guard lcg;
|
||||
memtable_entry* e = fetch_next_entry();
|
||||
if (!e) {
|
||||
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
|
||||
} else {
|
||||
return make_ready_future<streamed_mutation_opt>(e->read(mtbl(), schema(), _slice));
|
||||
}
|
||||
return read_section()(region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
memtable_entry* e = fetch_entry();
|
||||
if (!e) {
|
||||
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
|
||||
} else {
|
||||
auto ret = make_ready_future<streamed_mutation_opt>(e->read(mtbl(), schema(), _slice));
|
||||
advance();
|
||||
return ret;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -391,19 +397,24 @@ public:
|
||||
flush_reader& operator=(const flush_reader&) = delete;
|
||||
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
logalloc::reclaim_lock _(region());
|
||||
managed_bytes::linearization_context_guard lcg;
|
||||
memtable_entry* e = fetch_next_entry();
|
||||
if (!e) {
|
||||
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
|
||||
} else {
|
||||
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), query::full_slice, e->key().key());
|
||||
auto snp = e->partition().read(schema());
|
||||
auto mpsr = make_partition_snapshot_reader<partition_snapshot_accounter>(schema(), e->key(), std::move(cr), snp, region(), read_section(), mtbl(), _flushed_memory);
|
||||
_flushed_memory.account_component(*e);
|
||||
_flushed_memory.account_component(*snp);
|
||||
return make_ready_future<streamed_mutation_opt>(std::move(mpsr));
|
||||
}
|
||||
return read_section()(region(), [&] {
|
||||
return with_linearized_managed_bytes([&] {
|
||||
memtable_entry* e = fetch_entry();
|
||||
if (!e) {
|
||||
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
|
||||
} else {
|
||||
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), query::full_slice, e->key().key());
|
||||
auto snp = e->partition().read(schema());
|
||||
auto mpsr = make_partition_snapshot_reader<partition_snapshot_accounter>(schema(), e->key(), std::move(cr),
|
||||
snp, region(), read_section(), mtbl(), _flushed_memory);
|
||||
_flushed_memory.account_component(*e);
|
||||
_flushed_memory.account_component(*snp);
|
||||
auto ret = make_ready_future<streamed_mutation_opt>(std::move(mpsr));
|
||||
advance();
|
||||
return ret;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -272,7 +272,13 @@ void messaging_service::start_listen() {
|
||||
if (listen_to_bc) {
|
||||
_server_tls[1] = listen(utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
}
|
||||
// Do this on just cpu 0, to avoid duplicate logs.
|
||||
if (engine().cpu_id() == 0) {
|
||||
if (_server_tls[0]) {
|
||||
logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
|
||||
}
|
||||
logger.info("Starting Messaging Service on port {}", _port);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,23 +302,16 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
_rpc->set_logger([] (const sstring& log) {
|
||||
rpc_logger.info("{}", log);
|
||||
});
|
||||
register_handler(this, messaging_verb::CLIENT_ID, [] (rpc::client_info& ci, gms::inet_address broadcast_address, uint32_t src_cpu_id) {
|
||||
register_handler(this, messaging_verb::CLIENT_ID, [] (rpc::client_info& ci, gms::inet_address broadcast_address, uint32_t src_cpu_id, rpc::optional<uint64_t> max_result_size) {
|
||||
ci.attach_auxiliary("baddr", broadcast_address);
|
||||
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
|
||||
ci.attach_auxiliary("max_result_size", max_result_size.value_or(query::result_memory_limiter::maximum_result_size));
|
||||
return rpc::no_wait;
|
||||
});
|
||||
|
||||
if (listen_now) {
|
||||
start_listen();
|
||||
}
|
||||
|
||||
// Do this on just cpu 0, to avoid duplicate logs.
|
||||
if (engine().cpu_id() == 0) {
|
||||
if (_server_tls[0]) {
|
||||
logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
|
||||
}
|
||||
logger.info("Starting Messaging Service on port {}", _port);
|
||||
}
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
@@ -385,6 +384,8 @@ static unsigned get_rpc_client_idx(messaging_verb verb) {
|
||||
verb == messaging_verb::STREAM_MUTATION_DONE ||
|
||||
verb == messaging_verb::COMPLETE_MESSAGE) {
|
||||
idx = 2;
|
||||
} else if (verb == messaging_verb::MUTATION_DONE) {
|
||||
idx = 3;
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
@@ -499,7 +500,8 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
|
||||
it = _clients[idx].emplace(id, shard_info(std::move(client))).first;
|
||||
uint32_t src_cpu_id = engine().cpu_id();
|
||||
_rpc->make_client<rpc::no_wait_type(gms::inet_address, uint32_t)>(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id);
|
||||
_rpc->make_client<rpc::no_wait_type(gms::inet_address, uint32_t, uint64_t)>(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id,
|
||||
query::result_memory_limiter::maximum_result_size);
|
||||
return it->second.rpc_client;
|
||||
}
|
||||
|
||||
|
||||
@@ -196,7 +196,7 @@ private:
|
||||
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server;
|
||||
::shared_ptr<seastar::tls::server_credentials> _credentials;
|
||||
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server_tls;
|
||||
std::array<clients_map, 3> _clients;
|
||||
std::array<clients_map, 4> _clients;
|
||||
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
|
||||
bool _stopping = false;
|
||||
public:
|
||||
|
||||
@@ -123,7 +123,7 @@ public:
|
||||
uint32_t partition_limit, CompactedMutationsConsumer consumer)
|
||||
: _schema(s)
|
||||
, _query_time(query_time)
|
||||
, _gc_before(query_time - s.gc_grace_seconds())
|
||||
, _gc_before(saturating_subtract(query_time, s.gc_grace_seconds()))
|
||||
, _can_gc(always_gc)
|
||||
, _slice(slice)
|
||||
, _row_limit(limit)
|
||||
@@ -139,7 +139,7 @@ public:
|
||||
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable)
|
||||
: _schema(s)
|
||||
, _query_time(compaction_time)
|
||||
, _gc_before(_query_time - s.gc_grace_seconds())
|
||||
, _gc_before(saturating_subtract(_query_time, s.gc_grace_seconds()))
|
||||
, _get_max_purgeable(std::move(get_max_purgeable))
|
||||
, _can_gc([this] (tombstone t) { return can_gc(t); })
|
||||
, _slice(query::full_slice)
|
||||
|
||||
@@ -256,9 +256,7 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
|
||||
try {
|
||||
for(auto&& r : ck_ranges) {
|
||||
for (const rows_entry& e : x.range(schema, r)) {
|
||||
std::unique_ptr<rows_entry> copy(current_allocator().construct<rows_entry>(e));
|
||||
_rows.insert(_rows.end(), *copy);
|
||||
copy.release();
|
||||
_rows.push_back(*current_allocator().construct<rows_entry>(e));
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
@@ -578,7 +576,7 @@ void mutation_partition::for_each_row(const schema& schema, const query::cluster
|
||||
template<typename RowWriter>
|
||||
void write_cell(RowWriter& w, const query::partition_slice& slice, ::atomic_cell_view c) {
|
||||
assert(c.is_live());
|
||||
ser::writer_of_qr_cell wr = w.add().write();
|
||||
auto wr = w.add().write();
|
||||
auto after_timestamp = [&, wr = std::move(wr)] () mutable {
|
||||
if (slice.options.contains<query::partition_slice::option::send_timestamp>()) {
|
||||
return std::move(wr).write_timestamp(c.timestamp());
|
||||
@@ -769,13 +767,14 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|
||||
// If ck:s exist, and we do a restriction on them, we either have maching
|
||||
// rows, or return nothing, since cql does not allow "is null".
|
||||
if (row_count == 0
|
||||
&& (has_ck_selector(pw.ranges())
|
||||
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
|
||||
pw.retract();
|
||||
} else {
|
||||
pw.row_count() += row_count ? : 1;
|
||||
&& (has_ck_selector(pw.ranges())
|
||||
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
|
||||
pw.retract();
|
||||
} else {
|
||||
pw.row_count() += row_count ? : 1;
|
||||
pw.partition_count() += 1;
|
||||
std::move(rows_wr).end_rows().end_qr_partition();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
@@ -1177,7 +1176,7 @@ uint32_t mutation_partition::do_compact(const schema& s,
|
||||
{
|
||||
assert(row_limit > 0);
|
||||
|
||||
auto gc_before = query_time - s.gc_grace_seconds();
|
||||
auto gc_before = saturating_subtract(query_time, s.gc_grace_seconds());
|
||||
|
||||
auto should_purge_tombstone = [&] (const tombstone& t) {
|
||||
return t.deletion_time < gc_before && can_gc(t);
|
||||
@@ -1670,10 +1669,11 @@ class mutation_querier {
|
||||
const schema& _schema;
|
||||
query::result_memory_accounter& _memory_accounter;
|
||||
query::result::partition_writer& _pw;
|
||||
ser::qr_partition__static_row__cells _static_cells_wr;
|
||||
ser::qr_partition__static_row__cells<bytes_ostream> _static_cells_wr;
|
||||
bool _live_data_in_static_row{};
|
||||
uint32_t _live_clustering_rows = 0;
|
||||
stdx::optional<ser::qr_partition__rows> _rows_wr;
|
||||
stdx::optional<ser::qr_partition__rows<bytes_ostream>> _rows_wr;
|
||||
bool _short_reads_allowed;
|
||||
private:
|
||||
void query_static_row(const row& r, tombstone current_tombstone);
|
||||
void prepare_writers();
|
||||
@@ -1695,6 +1695,7 @@ mutation_querier::mutation_querier(const schema& s, query::result::partition_wri
|
||||
, _memory_accounter(memory_accounter)
|
||||
, _pw(pw)
|
||||
, _static_cells_wr(pw.start().start_static_row().start_cells())
|
||||
, _short_reads_allowed(pw.slice().options.contains<query::partition_slice::option::allow_short_read>())
|
||||
{
|
||||
}
|
||||
|
||||
@@ -1706,7 +1707,13 @@ void mutation_querier::query_static_row(const row& r, tombstone current_tombston
|
||||
auto start = _static_cells_wr._out.size();
|
||||
get_compacted_row_slice(_schema, slice, column_kind::static_column,
|
||||
r, slice.static_columns, _static_cells_wr);
|
||||
_memory_accounter.update_and_check(_static_cells_wr._out.size() - start);
|
||||
_memory_accounter.update(_static_cells_wr._out.size() - start);
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__static_row__cells<seastar::measuring_output_stream> out(stream, { });
|
||||
get_compacted_row_slice(_schema, slice, column_kind::static_column,
|
||||
r, slice.static_columns, _static_cells_wr);
|
||||
_memory_accounter.update(stream.size());
|
||||
}
|
||||
if (_pw.requested_digest()) {
|
||||
::feed_hash(_pw.digest(), current_tombstone);
|
||||
@@ -1744,23 +1751,32 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, tombstone current_
|
||||
_pw.last_modified() = std::max({_pw.last_modified(), current_tombstone.timestamp, t});
|
||||
}
|
||||
|
||||
auto stop = stop_iteration::no;
|
||||
if (_pw.requested_result()) {
|
||||
auto start = _rows_wr->_out.size();
|
||||
auto write_row = [&] (auto& rows_writer) {
|
||||
auto cells_wr = [&] {
|
||||
if (slice.options.contains(query::partition_slice::option::send_clustering_key)) {
|
||||
return _rows_wr->add().write_key(cr.key()).start_cells().start_cells();
|
||||
return rows_writer.add().write_key(cr.key()).start_cells().start_cells();
|
||||
} else {
|
||||
return _rows_wr->add().skip_key().start_cells().start_cells();
|
||||
return rows_writer.add().skip_key().start_cells().start_cells();
|
||||
}
|
||||
}();
|
||||
get_compacted_row_slice(_schema, slice, column_kind::regular_column, cr.cells(), slice.regular_columns, cells_wr);
|
||||
std::move(cells_wr).end_cells().end_cells().end_qr_clustered_row();
|
||||
};
|
||||
|
||||
auto stop = stop_iteration::no;
|
||||
if (_pw.requested_result()) {
|
||||
auto start = _rows_wr->_out.size();
|
||||
write_row(*_rows_wr);
|
||||
stop = _memory_accounter.update_and_check(_rows_wr->_out.size() - start);
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__rows<seastar::measuring_output_stream> out(stream, { });
|
||||
write_row(out);
|
||||
stop = _memory_accounter.update_and_check(stream.size());
|
||||
}
|
||||
|
||||
_live_clustering_rows++;
|
||||
return stop;
|
||||
return stop && stop_iteration(_short_reads_allowed);
|
||||
}
|
||||
|
||||
uint32_t mutation_querier::consume_end_of_stream() {
|
||||
@@ -1776,15 +1792,16 @@ uint32_t mutation_querier::consume_end_of_stream() {
|
||||
_pw.retract();
|
||||
return 0;
|
||||
} else {
|
||||
auto live_rows = std::max(_live_clustering_rows, uint32_t(1));
|
||||
_pw.row_count() += live_rows;
|
||||
_pw.partition_count() += 1;
|
||||
std::move(*_rows_wr).end_rows().end_qr_partition();
|
||||
return std::max(_live_clustering_rows, uint32_t(1));
|
||||
return live_rows;
|
||||
}
|
||||
}
|
||||
|
||||
class query_result_builder {
|
||||
const schema& _schema;
|
||||
uint32_t _live_rows = 0;
|
||||
uint32_t _partitions = 0;
|
||||
query::result::builder& _rb;
|
||||
stdx::optional<query::result::partition_writer> _pw;
|
||||
stdx::optional<mutation_querier> _mutation_consumer;
|
||||
@@ -1819,9 +1836,7 @@ public:
|
||||
|
||||
stop_iteration consume_end_of_partition() {
|
||||
auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream();
|
||||
_live_rows += live_rows_in_partition;
|
||||
_partitions += live_rows_in_partition > 0;
|
||||
if (live_rows_in_partition && !_stop) {
|
||||
if (_short_read_allowed && live_rows_in_partition > 0 && !_stop) {
|
||||
_stop = _rb.memory_accounter().check();
|
||||
}
|
||||
if (_stop) {
|
||||
@@ -1830,17 +1845,17 @@ public:
|
||||
return _stop;
|
||||
}
|
||||
|
||||
data_query_result consume_end_of_stream() {
|
||||
return {_live_rows, _partitions};
|
||||
void consume_end_of_stream() {
|
||||
}
|
||||
};
|
||||
|
||||
future<data_query_result> data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit,
|
||||
gc_clock::time_point query_time, query::result::builder& builder)
|
||||
future<> data_query(
|
||||
schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit,
|
||||
gc_clock::time_point query_time, query::result::builder& builder)
|
||||
{
|
||||
if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) {
|
||||
return make_ready_future<data_query_result>();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
|
||||
@@ -1918,7 +1933,7 @@ public:
|
||||
// well. Next page fetch will ask for the next partition and if we
|
||||
// don't do that we could end up with an unbounded number of
|
||||
// partitions with only a static row.
|
||||
_stop = _stop || _memory_accounter.check();
|
||||
_stop = _stop || (_memory_accounter.check() && stop_iteration(_short_read_allowed));
|
||||
}
|
||||
_total_live_rows += _live_rows;
|
||||
_result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() });
|
||||
|
||||
@@ -189,17 +189,17 @@ mutation_partition_serializer::mutation_partition_serializer(const schema& schem
|
||||
|
||||
void
|
||||
mutation_partition_serializer::write(bytes_ostream& out) const {
|
||||
write(ser::writer_of_mutation_partition(out));
|
||||
write(ser::writer_of_mutation_partition<bytes_ostream>(out));
|
||||
}
|
||||
|
||||
void mutation_partition_serializer::write(ser::writer_of_mutation_partition&& wr) const
|
||||
void mutation_partition_serializer::write(ser::writer_of_mutation_partition<bytes_ostream>&& wr) const
|
||||
{
|
||||
write_serialized(std::move(wr), _schema, _p);
|
||||
}
|
||||
|
||||
void serialize_mutation_fragments(const schema& s, tombstone partition_tombstone,
|
||||
stdx::optional<static_row> sr, range_tombstone_list rts,
|
||||
std::deque<clustering_row> crs, ser::writer_of_mutation_partition&& wr)
|
||||
std::deque<clustering_row> crs, ser::writer_of_mutation_partition<bytes_ostream>&& wr)
|
||||
{
|
||||
auto srow_writer = std::move(wr).write_tomb(partition_tombstone).start_static_row();
|
||||
auto row_tombstones = [&] {
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "streamed_mutation.hh"
|
||||
|
||||
namespace ser {
|
||||
template<typename Output>
|
||||
class writer_of_mutation_partition;
|
||||
}
|
||||
|
||||
@@ -47,9 +48,9 @@ public:
|
||||
mutation_partition_serializer(const schema&, const mutation_partition&);
|
||||
public:
|
||||
void write(bytes_ostream&) const;
|
||||
void write(ser::writer_of_mutation_partition&&) const;
|
||||
void write(ser::writer_of_mutation_partition<bytes_ostream>&&) const;
|
||||
};
|
||||
|
||||
void serialize_mutation_fragments(const schema& s, tombstone partition_tombstone,
|
||||
stdx::optional<static_row> sr, range_tombstone_list range_tombstones,
|
||||
std::deque<clustering_row> clustering_rows, ser::writer_of_mutation_partition&&);
|
||||
std::deque<clustering_row> clustering_rows, ser::writer_of_mutation_partition<bytes_ostream>&&);
|
||||
|
||||
@@ -57,13 +57,14 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const {
|
||||
}
|
||||
|
||||
query::result
|
||||
to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_partitions) {
|
||||
to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_rows, uint32_t max_partitions) {
|
||||
query::result::builder builder(slice, query::result_request::only_result, { });
|
||||
for (const partition& p : r.partitions()) {
|
||||
if (!max_partitions--) {
|
||||
if (builder.row_count() >= max_rows || builder.partition_count() >= max_partitions) {
|
||||
break;
|
||||
}
|
||||
p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), query::max_rows);
|
||||
// Also enforces the per-partition limit.
|
||||
p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), max_rows - builder.row_count());
|
||||
}
|
||||
if (r.is_short_read()) {
|
||||
builder.mark_as_short_read();
|
||||
|
||||
@@ -105,7 +105,7 @@ public:
|
||||
printer pretty_printer(schema_ptr) const;
|
||||
};
|
||||
|
||||
query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t partition_limit = query::max_partitions);
|
||||
query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t row_limit, uint32_t partition_limit);
|
||||
|
||||
// Performs a query on given data source returning data in reconcilable form.
|
||||
//
|
||||
@@ -128,11 +128,7 @@ future<reconcilable_result> mutation_query(
|
||||
gc_clock::time_point query_time,
|
||||
query::result_memory_accounter&& accounter = { });
|
||||
|
||||
struct data_query_result {
|
||||
uint32_t live_rows{0};
|
||||
uint32_t partitions{0};
|
||||
};
|
||||
|
||||
future<data_query_result> data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit,
|
||||
gc_clock::time_point query_time, query::result::builder& builder);
|
||||
future<> data_query(
|
||||
schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit,
|
||||
gc_clock::time_point query_time, query::result::builder& builder);
|
||||
|
||||
@@ -40,29 +40,31 @@ namespace query {
|
||||
|
||||
class result::partition_writer {
|
||||
result_request _request;
|
||||
ser::after_qr_partition__key _w;
|
||||
ser::after_qr_partition__key<bytes_ostream> _w;
|
||||
const partition_slice& _slice;
|
||||
// We are tasked with keeping track of the range
|
||||
// as well, since we are the primary "context"
|
||||
// when iterating "inside" a partition
|
||||
const clustering_row_ranges& _ranges;
|
||||
ser::query_result__partitions& _pw;
|
||||
ser::query_result__partitions<bytes_ostream>& _pw;
|
||||
ser::vector_position _pos;
|
||||
bool _static_row_added = false;
|
||||
md5_hasher& _digest;
|
||||
md5_hasher _digest_pos;
|
||||
uint32_t& _row_count;
|
||||
uint32_t& _partition_count;
|
||||
api::timestamp_type& _last_modified;
|
||||
public:
|
||||
partition_writer(
|
||||
result_request request,
|
||||
const partition_slice& slice,
|
||||
const clustering_row_ranges& ranges,
|
||||
ser::query_result__partitions& pw,
|
||||
ser::query_result__partitions<bytes_ostream>& pw,
|
||||
ser::vector_position pos,
|
||||
ser::after_qr_partition__key w,
|
||||
ser::after_qr_partition__key<bytes_ostream> w,
|
||||
md5_hasher& digest,
|
||||
uint32_t& row_count,
|
||||
uint32_t& partition_count,
|
||||
api::timestamp_type& last_modified)
|
||||
: _request(request)
|
||||
, _w(std::move(w))
|
||||
@@ -73,6 +75,7 @@ public:
|
||||
, _digest(digest)
|
||||
, _digest_pos(digest)
|
||||
, _row_count(row_count)
|
||||
, _partition_count(partition_count)
|
||||
, _last_modified(last_modified)
|
||||
{ }
|
||||
|
||||
@@ -84,7 +87,7 @@ public:
|
||||
return _request != result_request::only_digest;
|
||||
}
|
||||
|
||||
ser::after_qr_partition__key start() {
|
||||
ser::after_qr_partition__key<bytes_ostream> start() {
|
||||
return std::move(_w);
|
||||
}
|
||||
|
||||
@@ -108,6 +111,9 @@ public:
|
||||
uint32_t& row_count() {
|
||||
return _row_count;
|
||||
}
|
||||
// Returns a mutable reference to the partition counter that was handed to
// this writer at construction (stored in _partition_count).
uint32_t& partition_count() {
|
||||
return _partition_count;
|
||||
}
|
||||
api::timestamp_type& last_modified() {
|
||||
return _last_modified;
|
||||
}
|
||||
@@ -118,16 +124,17 @@ class result::builder {
|
||||
bytes_ostream _out;
|
||||
md5_hasher _digest;
|
||||
const partition_slice& _slice;
|
||||
ser::query_result__partitions _w;
|
||||
ser::query_result__partitions<bytes_ostream> _w;
|
||||
result_request _request;
|
||||
uint32_t _row_count = 0;
|
||||
uint32_t _partition_count = 0;
|
||||
api::timestamp_type _last_modified = api::missing_timestamp;
|
||||
short_read _short_read;
|
||||
result_memory_accounter _memory_accounter;
|
||||
public:
|
||||
builder(const partition_slice& slice, result_request request, result_memory_accounter memory_accounter)
|
||||
: _slice(slice)
|
||||
, _w(ser::writer_of_query_result(_out).start_partitions())
|
||||
, _w(ser::writer_of_query_result<bytes_ostream>(_out).start_partitions())
|
||||
, _request(request)
|
||||
, _memory_accounter(std::move(memory_accounter))
|
||||
{ }
|
||||
@@ -140,6 +147,14 @@ public:
|
||||
|
||||
const partition_slice& slice() const { return _slice; }
|
||||
|
||||
// Returns the current value of the builder's row counter (_row_count).
uint32_t row_count() const {
|
||||
return _row_count;
|
||||
}
|
||||
|
||||
// Returns the current value of the builder's partition counter
// (_partition_count).
uint32_t partition_count() const {
|
||||
return _partition_count;
|
||||
}
|
||||
|
||||
// Starts new partition and returns a builder for its contents.
|
||||
// Invalidates all previously obtained builders
|
||||
partition_writer add_partition(const schema& s, const partition_key& key) {
|
||||
@@ -156,22 +171,23 @@ public:
|
||||
if (_request != result_request::only_result) {
|
||||
key.feed_hash(_digest, s);
|
||||
}
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count, _last_modified);
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count,
|
||||
_partition_count, _last_modified);
|
||||
}
|
||||
|
||||
result build() {
|
||||
std::move(_w).end_partitions().end_query_result();
|
||||
switch (_request) {
|
||||
case result_request::only_result:
|
||||
return result(std::move(_out), _short_read, _row_count, std::move(_memory_accounter).done());
|
||||
return result(std::move(_out), _short_read, _row_count, _partition_count, std::move(_memory_accounter).done());
|
||||
case result_request::only_digest: {
|
||||
bytes_ostream buf;
|
||||
ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
|
||||
ser::writer_of_query_result<bytes_ostream>(buf).start_partitions().end_partitions().end_query_result();
|
||||
return result(std::move(buf), result_digest(_digest.finalize_array()), _last_modified, _short_read);
|
||||
}
|
||||
case result_request::result_and_digest:
|
||||
return result(std::move(_out), result_digest(_digest.finalize_array()),
|
||||
_last_modified, _short_read, _row_count, std::move(_memory_accounter).done());
|
||||
_last_modified, _short_read, _row_count, _partition_count, std::move(_memory_accounter).done());
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
105
query-result.hh
105
query-result.hh
@@ -67,8 +67,21 @@ public:
|
||||
return _maximum_total_result_memory - _memory_limiter.available_units();
|
||||
}
|
||||
|
||||
// Reserves minimum_result_size and creates new memory accounter.
|
||||
future<result_memory_accounter> new_read();
|
||||
// Reserves minimum_result_size and creates new memory accounter for
|
||||
// mutation query. Uses the specified maximum result size and may be
|
||||
// stopped before reaching it due to memory pressure on shard.
|
||||
future<result_memory_accounter> new_mutation_read(size_t max_result_size);
|
||||
|
||||
// Reserves minimum_result_size and creates new memory accounter for
|
||||
// data query. Uses the specified maximum result size, result will *not*
|
||||
// be stopped due to on shard memory pressure in order to avoid digest
|
||||
// mismatches.
|
||||
future<result_memory_accounter> new_data_read(size_t max_result_size);
|
||||
|
||||
// Creates a memory accounter for digest reads. Such accounter doesn't
|
||||
// contribute to the shard memory usage, but still stops producing the
|
||||
// result after individual limit has been reached.
|
||||
future<result_memory_accounter> new_digest_read(size_t max_result_size);
|
||||
|
||||
// Checks whether the result can grow any more, takes into account only
|
||||
// the per shard limit.
|
||||
@@ -108,12 +121,50 @@ class result_memory_accounter {
|
||||
size_t _blocked_bytes = 0;
|
||||
size_t _used_memory = 0;
|
||||
size_t _total_used_memory = 0;
|
||||
size_t _maximum_result_size = 0;
|
||||
stop_iteration _stop_on_global_limit;
|
||||
private:
|
||||
explicit result_memory_accounter(result_memory_limiter& limiter) noexcept
|
||||
// Mutation query accounter. Uses provided individual result size limit and
|
||||
// will stop when shard memory pressure grows too high.
|
||||
struct mutation_query_tag { };
|
||||
explicit result_memory_accounter(mutation_query_tag, result_memory_limiter& limiter, size_t max_size) noexcept
|
||||
: _limiter(&limiter)
|
||||
, _blocked_bytes(result_memory_limiter::minimum_result_size)
|
||||
, _maximum_result_size(max_size)
|
||||
, _stop_on_global_limit(true)
|
||||
{ }
|
||||
|
||||
// Data query accounter. Uses provided individual result size limit and
|
||||
// will *not* stop even though shard memory pressure grows too high.
|
||||
struct data_query_tag { };
|
||||
explicit result_memory_accounter(data_query_tag, result_memory_limiter& limiter, size_t max_size) noexcept
|
||||
: _limiter(&limiter)
|
||||
, _blocked_bytes(result_memory_limiter::minimum_result_size)
|
||||
, _maximum_result_size(max_size)
|
||||
{ }
|
||||
|
||||
// Digest query accounter. Uses provided individual result size limit and
|
||||
// will *not* stop even though shard memory pressure grows too high. This
|
||||
// accounter does not contribute to the shard memory limits.
|
||||
struct digest_query_tag { };
|
||||
explicit result_memory_accounter(digest_query_tag, result_memory_limiter&, size_t max_size) noexcept
|
||||
: _blocked_bytes(0)
|
||||
, _maximum_result_size(max_size)
|
||||
{ }
|
||||
|
||||
friend class result_memory_limiter;
|
||||
public:
|
||||
// State of an accounter on another shard. Used to pass information about
|
||||
// the size of the result so far in range queries.
// Carries two values: the memory used so far and the maximum result size.
|
||||
class foreign_state {
|
||||
size_t _used_memory;
|
||||
size_t _max_result_size;
|
||||
public:
|
||||
foreign_state(size_t used_mem, size_t max_result_size)
|
||||
: _used_memory(used_mem), _max_result_size(max_result_size) { }
|
||||
// Memory used value captured at construction.
size_t used_memory() const { return _used_memory; }
|
||||
// Maximum result size captured at construction.
size_t max_result_size() const { return _max_result_size; }
|
||||
};
|
||||
public:
|
||||
result_memory_accounter() = default;
|
||||
|
||||
@@ -123,9 +174,10 @@ public:
|
||||
// accounter will learn how big the total result already is and limit the
|
||||
// part produced on this shard so that after merging the final result
|
||||
// does not exceed the individual limit.
|
||||
result_memory_accounter(result_memory_limiter& limiter, const result_memory_accounter& foreign_accounter) noexcept
|
||||
result_memory_accounter(result_memory_limiter& limiter, foreign_state fstate) noexcept
|
||||
: _limiter(&limiter)
|
||||
, _total_used_memory(foreign_accounter.used_memory())
|
||||
, _total_used_memory(fstate.used_memory())
|
||||
, _maximum_result_size(fstate.max_result_size())
|
||||
{ }
|
||||
|
||||
result_memory_accounter(result_memory_accounter&& other) noexcept
|
||||
@@ -133,6 +185,8 @@ public:
|
||||
, _blocked_bytes(other._blocked_bytes)
|
||||
, _used_memory(other._used_memory)
|
||||
, _total_used_memory(other._total_used_memory)
|
||||
, _maximum_result_size(other._maximum_result_size)
|
||||
, _stop_on_global_limit(other._stop_on_global_limit)
|
||||
{ }
|
||||
|
||||
result_memory_accounter& operator=(result_memory_accounter&& other) noexcept {
|
||||
@@ -151,17 +205,21 @@ public:
|
||||
|
||||
size_t used_memory() const { return _used_memory; }
|
||||
|
||||
// Packages this accounter's _used_memory and _maximum_result_size into a
// foreign_state value.
foreign_state state_for_another_shard() {
|
||||
return foreign_state(_used_memory, _maximum_result_size);
|
||||
}
|
||||
|
||||
// Consume n more bytes for the result. Returns stop_iteration::yes if
|
||||
// the result cannot grow any more (taking into account both individual
|
||||
// and per-shard limits).
|
||||
stop_iteration update_and_check(size_t n) {
|
||||
_used_memory += n;
|
||||
_total_used_memory += n;
|
||||
auto stop = stop_iteration(_total_used_memory > result_memory_limiter::maximum_result_size);
|
||||
auto stop = stop_iteration(_total_used_memory > _maximum_result_size);
|
||||
if (_limiter && _used_memory > _blocked_bytes) {
|
||||
auto to_block = std::min(_used_memory - _blocked_bytes, n);
|
||||
_blocked_bytes += to_block;
|
||||
stop = _limiter->update_and_check(to_block) || stop;
|
||||
stop = (_limiter->update_and_check(to_block) && _stop_on_global_limit) || stop;
|
||||
}
|
||||
return stop;
|
||||
}
|
||||
@@ -170,7 +228,7 @@ public:
|
||||
stop_iteration check() const {
|
||||
stop_iteration stop { _total_used_memory > result_memory_limiter::maximum_result_size };
|
||||
if (!stop && _used_memory >= _blocked_bytes && _limiter) {
|
||||
return _limiter->check();
|
||||
return _limiter->check() && _stop_on_global_limit;
|
||||
}
|
||||
return stop;
|
||||
}
|
||||
@@ -189,12 +247,22 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_read() {
|
||||
return _memory_limiter.wait(minimum_result_size).then([this] {
|
||||
return result_memory_accounter(*this);
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_mutation_read(size_t max_size) {
|
||||
return _memory_limiter.wait(minimum_result_size).then([this, max_size] {
|
||||
return result_memory_accounter(result_memory_accounter::mutation_query_tag(), *this, max_size);
|
||||
});
|
||||
}
|
||||
|
||||
// Waits on _memory_limiter for minimum_result_size units, then returns an
// accounter constructed with data_query_tag, this limiter and max_size as
// its maximum result size.
inline future<result_memory_accounter> result_memory_limiter::new_data_read(size_t max_size) {
|
||||
return _memory_limiter.wait(minimum_result_size).then([this, max_size] {
|
||||
return result_memory_accounter(result_memory_accounter::data_query_tag(), *this, max_size);
|
||||
});
|
||||
}
|
||||
|
||||
// Returns an already-resolved future holding an accounter constructed with
// digest_query_tag and max_size. Unlike new_data_read(), this does not wait
// on _memory_limiter.
inline future<result_memory_accounter> result_memory_limiter::new_digest_read(size_t max_size) {
|
||||
return make_ready_future<result_memory_accounter>(result_memory_accounter(result_memory_accounter::digest_query_tag(), *this, max_size));
|
||||
}
|
||||
|
||||
enum class result_request {
|
||||
only_result,
|
||||
only_digest,
|
||||
@@ -265,29 +333,32 @@ class result {
|
||||
api::timestamp_type _last_modified = api::missing_timestamp;
|
||||
short_read _short_read;
|
||||
query::result_memory_tracker _memory_tracker;
|
||||
stdx::optional<uint32_t> _partition_count;
|
||||
public:
|
||||
class builder;
|
||||
class partition_writer;
|
||||
friend class result_merger;
|
||||
|
||||
result();
|
||||
result(bytes_ostream&& w, short_read sr, stdx::optional<uint32_t> c = { },
|
||||
result(bytes_ostream&& w, short_read sr, stdx::optional<uint32_t> c = { }, stdx::optional<uint32_t> pc = { },
|
||||
result_memory_tracker memory_tracker = { })
|
||||
: _w(std::move(w))
|
||||
, _row_count(c)
|
||||
, _short_read(sr)
|
||||
, _memory_tracker(std::move(_memory_tracker))
|
||||
, _memory_tracker(std::move(memory_tracker))
|
||||
, _partition_count(pc)
|
||||
{
|
||||
w.reduce_chunk_count();
|
||||
}
|
||||
result(bytes_ostream&& w, stdx::optional<result_digest> d, api::timestamp_type last_modified,
|
||||
short_read sr, stdx::optional<uint32_t> c = { }, result_memory_tracker memory_tracker = { })
|
||||
short_read sr, stdx::optional<uint32_t> c = { }, stdx::optional<uint32_t> pc = { }, result_memory_tracker memory_tracker = { })
|
||||
: _w(std::move(w))
|
||||
, _digest(d)
|
||||
, _row_count(c)
|
||||
, _last_modified(last_modified)
|
||||
, _short_read(sr)
|
||||
, _memory_tracker(std::move(memory_tracker))
|
||||
, _partition_count(pc)
|
||||
{
|
||||
w.reduce_chunk_count();
|
||||
}
|
||||
@@ -316,7 +387,11 @@ public:
|
||||
return _short_read;
|
||||
}
|
||||
|
||||
uint32_t calculate_row_count(const query::partition_slice&);
|
||||
// Returns the stored partition count; the optional is empty when no count
// has been set.
const stdx::optional<uint32_t>& partition_count() const {
|
||||
return _partition_count;
|
||||
}
|
||||
|
||||
void calculate_counts(const query::partition_slice&);
|
||||
|
||||
struct printer {
|
||||
schema_ptr s;
|
||||
|
||||
67
query.cc
67
query.cc
@@ -32,6 +32,9 @@
|
||||
|
||||
namespace query {
|
||||
|
||||
constexpr size_t result_memory_limiter::minimum_result_size;
|
||||
constexpr size_t result_memory_limiter::maximum_result_size;
|
||||
|
||||
thread_local semaphore result_memory_tracker::_dummy { 0 };
|
||||
|
||||
const partition_range full_partition_range = partition_range::make_open_ended_both_sides();
|
||||
@@ -161,16 +164,18 @@ std::ostream& operator<<(std::ostream& os, const query::result::printer& p) {
|
||||
return os;
|
||||
}
|
||||
|
||||
uint32_t result::calculate_row_count(const query::partition_slice& slice) {
|
||||
void result::calculate_counts(const query::partition_slice& slice) {
|
||||
struct {
|
||||
uint32_t total_count = 0;
|
||||
uint32_t current_partition_count = 0;
|
||||
uint32_t live_partitions = 0;
|
||||
void accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
accept_new_partition(row_count);
|
||||
}
|
||||
void accept_new_partition(uint32_t row_count) {
|
||||
total_count += row_count;
|
||||
current_partition_count = row_count;
|
||||
live_partitions += 1;
|
||||
}
|
||||
void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row) {}
|
||||
void accept_new_row(const result_row_view& static_row, const result_row_view& row) {}
|
||||
@@ -182,44 +187,78 @@ uint32_t result::calculate_row_count(const query::partition_slice& slice) {
|
||||
} counter;
|
||||
|
||||
result_view::consume(*this, slice, counter);
|
||||
return counter.total_count;
|
||||
_row_count = counter.total_count;
|
||||
_partition_count = counter.live_partitions;
|
||||
}
|
||||
|
||||
result::result()
|
||||
: result([] {
|
||||
bytes_ostream out;
|
||||
ser::writer_of_query_result(out).skip_partitions().end_query_result();
|
||||
ser::writer_of_query_result<bytes_ostream>(out).skip_partitions().end_query_result();
|
||||
return out;
|
||||
}(), short_read::no)
|
||||
}(), short_read::no, 0, 0)
|
||||
{ }
|
||||
|
||||
// Writes a truncated copy of partition view `pv` through writer `pw`.
// The key is written if `pv` has one and skipped otherwise, every static-row
// cell is copied, and at most `rows_to_include` rows are emitted before the
// partition is closed.
static void write_partial_partition(ser::writer_of_qr_partition<bytes_ostream>&& pw, const ser::qr_partition_view& pv, uint32_t rows_to_include) {
|
||||
auto key = pv.key();
|
||||
auto static_cells_wr = (key ? std::move(pw).write_key(*key) : std::move(pw).skip_key())
|
||||
.start_static_row()
|
||||
.start_cells();
|
||||
for (auto&& cell : pv.static_row().cells()) {
|
||||
static_cells_wr.add(cell);
|
||||
}
|
||||
auto rows_wr = std::move(static_cells_wr)
|
||||
.end_cells()
|
||||
.end_static_row()
|
||||
.start_rows();
|
||||
auto rows = pv.rows();
|
||||
// rows.size() can be 0 if there's a single static row
|
||||
auto it = rows.begin();
|
||||
// Copy only the first min(rows.size(), rows_to_include) rows.
for (uint32_t i = 0; i < std::min(rows.size(), uint64_t{rows_to_include}); ++i) {
|
||||
rows_wr.add(*it++);
|
||||
}
|
||||
std::move(rows_wr).end_rows().end_qr_partition();
|
||||
}
|
||||
|
||||
foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
if (_partial.size() == 1) {
|
||||
return std::move(_partial[0]);
|
||||
}
|
||||
|
||||
bytes_ostream w;
|
||||
auto partitions = ser::writer_of_query_result(w).start_partitions();
|
||||
std::experimental::optional<uint32_t> row_count = 0;
|
||||
auto partitions = ser::writer_of_query_result<bytes_ostream>(w).start_partitions();
|
||||
uint32_t row_count = 0;
|
||||
short_read is_short_read;
|
||||
uint32_t partition_count = 0;
|
||||
|
||||
for (auto&& r : _partial) {
|
||||
if (row_count) {
|
||||
if (r->row_count()) {
|
||||
row_count = row_count.value() + r->row_count().value();
|
||||
} else {
|
||||
row_count = std::experimental::nullopt;
|
||||
}
|
||||
}
|
||||
result_view::do_with(*r, [&] (result_view rv) {
|
||||
for (auto&& pv : rv._v.partitions()) {
|
||||
partitions.add(pv);
|
||||
auto rows = pv.rows();
|
||||
// If rows.empty(), then there's a static row, or there wouldn't be a partition
|
||||
const uint32_t rows_in_partition = rows.size() ? : 1;
|
||||
const uint32_t rows_to_include = std::min(_max_rows - row_count, rows_in_partition);
|
||||
row_count += rows_to_include;
|
||||
if (rows_to_include >= rows_in_partition) {
|
||||
partitions.add(pv);
|
||||
if (++partition_count >= _max_partitions) {
|
||||
return;
|
||||
}
|
||||
} else if (rows_to_include > 0) {
|
||||
write_partial_partition(partitions.add(), pv, rows_to_include);
|
||||
return;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
if (r->is_short_read()) {
|
||||
is_short_read = short_read::yes;
|
||||
break;
|
||||
}
|
||||
if (row_count >= _max_rows || partition_count >= _max_partitions) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
std::move(partitions).end_partitions().end_query_result();
|
||||
|
||||
@@ -30,7 +30,14 @@ namespace query {
|
||||
// Implements @Reducer concept from distributed.hh
|
||||
class result_merger {
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _partial;
|
||||
const uint32_t _max_rows;
|
||||
const uint32_t _max_partitions;
|
||||
public:
|
||||
explicit result_merger(uint32_t max_rows, uint32_t max_partitions)
|
||||
: _max_rows(max_rows)
|
||||
, _max_partitions(max_partitions)
|
||||
{ }
|
||||
|
||||
void reserve(size_t size) {
|
||||
_partial.reserve(size);
|
||||
}
|
||||
|
||||
@@ -490,6 +490,13 @@ static void split_and_add(std::vector<::nonwrapping_range<dht::token>>& ranges,
|
||||
auto midpoint = dht::global_partitioner().midpoint(
|
||||
range.start() ? range.start()->value() : dht::minimum_token(),
|
||||
range.end() ? range.end()->value() : dht::minimum_token());
|
||||
// This shouldn't happen, but if the range included just one token, we
|
||||
// can't split further (split() may actually fail with assertion failure)
|
||||
if ((range.start() && midpoint == range.start()->value()) ||
|
||||
(range.end() && midpoint == range.end()->value())) {
|
||||
ranges.push_back(range);
|
||||
return;
|
||||
}
|
||||
auto halves = range.split(midpoint, dht::token_comparator());
|
||||
ranges.push_back(halves.first);
|
||||
ranges.push_back(halves.second);
|
||||
|
||||
@@ -30,6 +30,7 @@ import ConfigParser
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import uuid
|
||||
from pkg_resources import parse_version
|
||||
|
||||
VERSION = "1.0"
|
||||
@@ -64,6 +65,10 @@ def get_api(path):
|
||||
def version_compare(a, b):
|
||||
return parse_version(a) < parse_version(b)
|
||||
|
||||
# Writes a newly generated uuid.uuid1() value, followed by a newline, to the
# UUID file (opened in 'w' mode).
# NOTE(review): the `fl` parameter is not used; the path is read from the
# global `args.uuid_file` instead -- confirm whether `fl` was intended.
def create_uuid_file(fl):
|
||||
with open(args.uuid_file, 'w') as myfile:
|
||||
myfile.write(str(uuid.uuid1()) + "\n")
|
||||
|
||||
def check_version(ar):
|
||||
if config and (not config.has_option("housekeeping", "check-version") or not config.getboolean("housekeeping", "check-version")):
|
||||
return
|
||||
@@ -80,8 +85,8 @@ def check_version(ar):
|
||||
# mode would accept any string.
|
||||
# use i for install, c (default) for running from the command line
|
||||
params = params + "&sts=" + ar.mode
|
||||
if uuid:
|
||||
params = params + "&uu=" + uuid
|
||||
if uid:
|
||||
params = params + "&uu=" + uid
|
||||
latest_version = get_json_from_url(version_url + params)["version"]
|
||||
except:
|
||||
traceln("Unable to retrieve version information")
|
||||
@@ -112,10 +117,12 @@ if args.config != "":
|
||||
sys.exit(0)
|
||||
config = ConfigParser.SafeConfigParser()
|
||||
config.read(args.config)
|
||||
uuid = None
|
||||
uid = None
|
||||
if args.uuid != "":
|
||||
uuid = args.uuid
|
||||
if args.uuid_file != "" and os.path.exists(args.uuid_file):
|
||||
uid = args.uuid
|
||||
if args.uuid_file != "":
|
||||
if not os.path.exists(args.uuid_file):
|
||||
create_uuid_file(args.uuid_file)
|
||||
with open(args.uuid_file, 'r') as myfile:
|
||||
uuid = myfile.read().replace('\n', '')
|
||||
uid = myfile.read().replace('\n', '')
|
||||
args.func(args)
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 0b98024073...8e54a9b674
@@ -27,8 +27,14 @@ namespace ser {
|
||||
|
||||
// frame represents a place holder for object size which will be known later
|
||||
|
||||
template<typename Output>
|
||||
struct place_holder { };
|
||||
|
||||
struct place_holder {
|
||||
template<typename Output>
|
||||
struct frame { };
|
||||
|
||||
template<>
|
||||
struct place_holder<bytes_ostream> {
|
||||
bytes_ostream::place_holder<size_type> ph;
|
||||
|
||||
place_holder(bytes_ostream::place_holder<size_type> ph) : ph(ph) { }
|
||||
@@ -39,7 +45,8 @@ struct place_holder {
|
||||
}
|
||||
};
|
||||
|
||||
struct frame : public place_holder {
|
||||
template<>
|
||||
struct frame<bytes_ostream> : public place_holder<bytes_ostream> {
|
||||
bytes_ostream::size_type offset;
|
||||
|
||||
frame(bytes_ostream::place_holder<size_type> ph, bytes_ostream::size_type offset)
|
||||
@@ -56,25 +63,26 @@ struct vector_position {
|
||||
};
|
||||
|
||||
// Empty frame: behaves like a place holder, but is used when no place holder is needed.
|
||||
template<typename Output>
|
||||
struct empty_frame {
|
||||
void end(bytes_ostream&) {}
|
||||
void end(Output&) {}
|
||||
empty_frame() = default;
|
||||
empty_frame(const frame&){}
|
||||
empty_frame(const frame<Output>&){}
|
||||
};
|
||||
|
||||
inline place_holder start_place_holder(bytes_ostream& out) {
|
||||
inline place_holder<bytes_ostream> start_place_holder(bytes_ostream& out) {
|
||||
auto size_ph = out.write_place_holder<size_type>();
|
||||
return { size_ph};
|
||||
}
|
||||
|
||||
inline frame start_frame(bytes_ostream& out) {
|
||||
inline frame<bytes_ostream> start_frame(bytes_ostream& out) {
|
||||
auto offset = out.size();
|
||||
auto size_ph = out.write_place_holder<size_type>();
|
||||
{
|
||||
auto out = size_ph.get_stream();
|
||||
serialize(out, (size_type)0);
|
||||
}
|
||||
return frame { size_ph, offset };
|
||||
return frame<bytes_ostream> { size_ph, offset };
|
||||
}
|
||||
|
||||
template<typename Input>
|
||||
@@ -86,4 +94,25 @@ size_type read_frame_size(Input& in) {
|
||||
return sz - sizeof(size_type);
|
||||
}
|
||||
|
||||
|
||||
// place_holder specialization for seastar::measuring_output_stream.
// It holds no state and set() does nothing.
// NOTE(review): presumably a measuring stream has nothing to patch later --
// confirm.
template<>
|
||||
struct place_holder<seastar::measuring_output_stream> {
|
||||
void set(seastar::measuring_output_stream&, size_type) { }
|
||||
};
|
||||
|
||||
// frame specialization for seastar::measuring_output_stream.
// end() does nothing.
template<>
|
||||
struct frame<seastar::measuring_output_stream> : public place_holder<seastar::measuring_output_stream> {
|
||||
void end(seastar::measuring_output_stream& out) { }
|
||||
};
|
||||
|
||||
// Measuring-stream overload: serializes a value-initialized size_type into
// `out`, then returns an empty place holder.
inline place_holder<seastar::measuring_output_stream> start_place_holder(seastar::measuring_output_stream& out) {
|
||||
serialize(out, size_type());
|
||||
return { };
|
||||
}
|
||||
|
||||
// Measuring-stream overload: serializes a value-initialized size_type into
// `out`, then returns an empty frame.
inline frame<seastar::measuring_output_stream> start_frame(seastar::measuring_output_stream& out) {
|
||||
serialize(out, size_type());
|
||||
return { };
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -439,8 +439,10 @@ future<> migration_manager::announce_new_column_family(schema_ptr cfm, bool anno
|
||||
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
|
||||
}
|
||||
logger.info("Create new ColumnFamily: {}", cfm);
|
||||
auto mutations = db::schema_tables::make_create_table_mutations(keyspace.metadata(), cfm, api::new_timestamp());
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
return db::schema_tables::make_create_table_mutations(keyspace.metadata(), cfm, api::new_timestamp())
|
||||
.then([announce_locally, this] (auto&& mutations) {
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
});
|
||||
} catch (const no_such_keyspace& e) {
|
||||
throw exceptions::configuration_exception(sprint("Cannot add table '%s' to non existing keyspace '%s'.", cfm->cf_name(), cfm->ks_name()));
|
||||
}
|
||||
@@ -459,8 +461,10 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
|
||||
#endif
|
||||
logger.info("Update table '{}.{}' From {} To {}", cfm->ks_name(), cfm->cf_name(), *old_schema, *cfm);
|
||||
auto&& keyspace = db.find_keyspace(cfm->ks_name());
|
||||
auto mutations = db::schema_tables::make_update_table_mutations(keyspace.metadata(), old_schema, cfm, api::new_timestamp(), from_thrift);
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
return db::schema_tables::make_update_table_mutations(keyspace.metadata(), old_schema, cfm, api::new_timestamp(), from_thrift)
|
||||
.then([announce_locally] (auto&& mutations) {
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
});
|
||||
} catch (const no_such_column_family& e) {
|
||||
throw exceptions::configuration_exception(sprint("Cannot update non existing table '%s' in keyspace '%s'.",
|
||||
cfm->cf_name(), cfm->ks_name()));
|
||||
@@ -470,8 +474,10 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
|
||||
static future<> do_announce_new_type(user_type new_type, bool announce_locally) {
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
auto&& keyspace = db.find_keyspace(new_type->_keyspace);
|
||||
auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp());
|
||||
return migration_manager::announce(std::move(mutations), announce_locally);
|
||||
return db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp())
|
||||
.then([announce_locally] (auto&& mutations) {
|
||||
return migration_manager::announce(std::move(mutations), announce_locally);
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_new_type(user_type new_type, bool announce_locally) {
|
||||
@@ -558,8 +564,10 @@ future<> migration_manager::announce_column_family_drop(const sstring& ks_name,
|
||||
auto&& old_cfm = db.find_schema(ks_name, cf_name);
|
||||
auto&& keyspace = db.find_keyspace(ks_name);
|
||||
logger.info("Drop table '{}.{}'", old_cfm->ks_name(), old_cfm->cf_name());
|
||||
auto mutations = db::schema_tables::make_drop_table_mutations(keyspace.metadata(), old_cfm, api::new_timestamp());
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
return db::schema_tables::make_drop_table_mutations(keyspace.metadata(), old_cfm, api::new_timestamp())
|
||||
.then([announce_locally] (auto&& mutations) {
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
});
|
||||
} catch (const no_such_column_family& e) {
|
||||
throw exceptions::configuration_exception(sprint("Cannot drop non existing table '%s' in keyspace '%s'.", cf_name, ks_name));
|
||||
}
|
||||
@@ -570,8 +578,10 @@ future<> migration_manager::announce_type_drop(user_type dropped_type, bool anno
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
auto&& keyspace = db.find_keyspace(dropped_type->_keyspace);
|
||||
logger.info("Drop User Type: {}", dropped_type->get_name_as_string());
|
||||
auto mutations = db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, api::new_timestamp());
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
return db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, api::new_timestamp())
|
||||
.then([announce_locally] (auto&& mutations) {
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
});
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
@@ -64,7 +64,7 @@ public:
|
||||
, _ranges(std::move(ranges))
|
||||
{}
|
||||
|
||||
private:
|
||||
private:
|
||||
static bool has_clustering_keys(const schema& s, const query::read_command& cmd) {
|
||||
return s.clustering_key_size() > 0
|
||||
&& !cmd.slice.options.contains<query::partition_slice::option::distinct>();
|
||||
@@ -229,94 +229,42 @@ private:
|
||||
|
||||
class myvisitor : public cql3::selection::result_set_builder::visitor {
|
||||
public:
|
||||
impl& _impl;
|
||||
uint32_t page_size;
|
||||
uint32_t part_rows = 0;
|
||||
|
||||
uint32_t included_rows = 0;
|
||||
uint32_t total_rows = 0;
|
||||
std::experimental::optional<partition_key> last_pkey;
|
||||
std::experimental::optional<clustering_key> last_ckey;
|
||||
|
||||
// just for verbosity
|
||||
uint32_t part_ignored = 0;
|
||||
|
||||
clustering_key::less_compare _less;
|
||||
|
||||
bool include_row() {
|
||||
++total_rows;
|
||||
++part_rows;
|
||||
if (included_rows >= page_size) {
|
||||
++part_ignored;
|
||||
return false;
|
||||
}
|
||||
++included_rows;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool include_row(const clustering_key& key) {
|
||||
if (!include_row()) {
|
||||
return false;
|
||||
}
|
||||
last_ckey = key;
|
||||
return true;
|
||||
}
|
||||
myvisitor(impl& i, uint32_t ps,
|
||||
cql3::selection::result_set_builder& builder,
|
||||
myvisitor(cql3::selection::result_set_builder& builder,
|
||||
const schema& s,
|
||||
const cql3::selection::selection& selection)
|
||||
: visitor(builder, s, selection), _impl(i), page_size(ps), _less(*_impl._schema) {
|
||||
: visitor(builder, s, selection) {
|
||||
}
|
||||
|
||||
void accept_new_partition(uint32_t) {
|
||||
throw std::logic_error("Should not reach!");
|
||||
}
|
||||
void accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
logger.trace("Begin partition: {} ({})", key, row_count);
|
||||
part_rows = 0;
|
||||
part_ignored = 0;
|
||||
if (included_rows < page_size) {
|
||||
last_pkey = key;
|
||||
last_ckey = { };
|
||||
}
|
||||
logger.trace("Accepting partition: {} ({})", key, row_count);
|
||||
total_rows += row_count;
|
||||
last_pkey = key;
|
||||
last_ckey = { };
|
||||
visitor::accept_new_partition(key, row_count);
|
||||
}
|
||||
void accept_new_row(const clustering_key& key,
|
||||
const query::result_row_view& static_row,
|
||||
const query::result_row_view& row) {
|
||||
// TODO: should we use exception/long jump or introduce
|
||||
// a "stop" condition to the calling result_view and
|
||||
// avoid processing unneeded rows?
|
||||
auto ok = include_row(key);
|
||||
if (ok) {
|
||||
visitor::accept_new_row(key, static_row, row);
|
||||
}
|
||||
last_ckey = key;
|
||||
visitor::accept_new_row(key, static_row, row);
|
||||
}
|
||||
void accept_new_row(const query::result_row_view& static_row,
|
||||
const query::result_row_view& row) {
|
||||
auto ok = include_row();
|
||||
if (ok) {
|
||||
visitor::accept_new_row(static_row, row);
|
||||
}
|
||||
visitor::accept_new_row(static_row, row);
|
||||
}
|
||||
void accept_partition_end(const query::result_row_view& static_row) {
|
||||
// accept_partition_end with row_count == 0
|
||||
// means we had an empty partition but live
|
||||
// static columns, and since the fix,
|
||||
// no CK restrictions.
|
||||
// I.e. _row_count == 0 -> add a partially empty row
|
||||
// So, treat this case as an accept_row variant
|
||||
if (_row_count > 0 || include_row()) {
|
||||
visitor::accept_partition_end(static_row);
|
||||
}
|
||||
logger.trace(
|
||||
"End partition, included={}, ignored={}",
|
||||
part_rows - part_ignored,
|
||||
part_ignored);
|
||||
visitor::accept_partition_end(static_row);
|
||||
}
|
||||
};
|
||||
|
||||
myvisitor v(*this, std::min(page_size, _max), builder, *_schema, *_selection);
|
||||
myvisitor v(builder, *_schema, *_selection);
|
||||
query::result_view::consume(*results, _cmd->slice, v);
|
||||
|
||||
if (_last_pkey) {
|
||||
@@ -328,13 +276,12 @@ private:
|
||||
_cmd->slice.clear_range(*_schema, *_last_pkey);
|
||||
}
|
||||
|
||||
_max = _max - v.included_rows;
|
||||
_exhausted = (v.included_rows < page_size && !results->is_short_read()) || _max == 0;
|
||||
_max = _max - v.total_rows;
|
||||
_exhausted = (v.total_rows < page_size && !results->is_short_read()) || _max == 0;
|
||||
_last_pkey = v.last_pkey;
|
||||
_last_ckey = v.last_ckey;
|
||||
|
||||
logger.debug("Fetched {}/{} rows, max_remain={} {}", v.included_rows, v.total_rows,
|
||||
_max, _exhausted ? "(exh)" : "");
|
||||
logger.debug("Fetched {} rows, max_remain={} {}", v.total_rows, _max, _exhausted ? "(exh)" : "");
|
||||
|
||||
if (_last_pkey) {
|
||||
logger.debug("Last partition key: {}", *_last_pkey);
|
||||
@@ -363,7 +310,6 @@ private:
|
||||
// remember if we use clustering. if not, each partition == one row
|
||||
const bool _has_clustering_keys;
|
||||
bool _exhausted = false;
|
||||
uint32_t _rem = 0;
|
||||
uint32_t _max;
|
||||
|
||||
std::experimental::optional<partition_key> _last_pkey;
|
||||
|
||||
@@ -472,7 +472,6 @@ inline uint64_t& storage_proxy::split_stats::get_ep_stat(gms::inet_address ep) {
|
||||
storage_proxy::~storage_proxy() {}
|
||||
storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
_metrics.add_group(COORDINATOR_STATS_CATEGORY, {
|
||||
sm::make_queue_length("foreground_writes", [this] { return _stats.writes - _stats.background_writes; },
|
||||
sm::description("number of currently pending forground write requests")),
|
||||
@@ -480,7 +479,7 @@ storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
|
||||
sm::make_queue_length("background_writes", [this] { return _stats.background_writes; },
|
||||
sm::description("number of currently pending background write requests")),
|
||||
|
||||
sm::make_queue_length("throttled_writes", [this] { return _throttled_writes.size(); },
|
||||
sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); },
|
||||
sm::description("number of currently throttled write requests")),
|
||||
|
||||
sm::make_total_operations("throttled_writes", [this] { return _stats.throttled_writes; },
|
||||
@@ -2074,7 +2073,7 @@ public:
|
||||
versions.reserve(_data_results.front().result->partitions().size());
|
||||
|
||||
for (auto& r : _data_results) {
|
||||
_is_short_read = r.result->is_short_read();
|
||||
_is_short_read = _is_short_read || r.result->is_short_read();
|
||||
r.reached_end = !r.result->is_short_read() && r.result->row_count() < cmd.row_limit
|
||||
&& (cmd.partition_limit == query::max_partitions
|
||||
|| boost::range::count_if(r.result->partitions(), [] (const partition& p) {
|
||||
@@ -2346,7 +2345,8 @@ protected:
|
||||
if (rr_opt && (can_send_short_read || data_resolver->all_reached_end() || rr_opt->row_count() >= original_row_limit()
|
||||
|| data_resolver->live_partition_count() >= original_partition_limit())
|
||||
&& !data_resolver->any_partition_short_read()) {
|
||||
auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice)));
|
||||
auto result = ::make_foreign(::make_lw_shared(
|
||||
to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->row_limit, cmd->partition_limit)));
|
||||
// wait for write to complete before returning result to prevent multiple concurrent read requests to
|
||||
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
|
||||
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
|
||||
@@ -2387,6 +2387,13 @@ protected:
|
||||
_retry_cmd->row_limit = x(cmd->row_limit, data_resolver->total_live_count());
|
||||
}
|
||||
}
|
||||
|
||||
// We may be unable to send a single live row because of replicas bailing out too early.
|
||||
// If that is the case disallow short reads so that we can make progress.
|
||||
if (!data_resolver->total_live_count()) {
|
||||
_retry_cmd->slice.options.remove<query::partition_slice::option::allow_short_read>();
|
||||
}
|
||||
|
||||
logger.trace("Retrying query with command {} (previous is {})", *_retry_cmd, *cmd);
|
||||
reconcile(cl, timeout, _retry_cmd);
|
||||
}
|
||||
@@ -2626,17 +2633,17 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
|
||||
}
|
||||
|
||||
future<query::result_digest, api::timestamp_type>
|
||||
storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) {
|
||||
return query_singular_local(std::move(s), std::move(cmd), pr, query::result_request::only_digest, std::move(trace_state)).then([] (foreign_ptr<lw_shared_ptr<query::result>> result) {
|
||||
storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state, uint64_t max_size) {
|
||||
return query_singular_local(std::move(s), std::move(cmd), pr, query::result_request::only_digest, std::move(trace_state), max_size).then([] (foreign_ptr<lw_shared_ptr<query::result>> result) {
|
||||
return make_ready_future<query::result_digest, api::timestamp_type>(*result->digest(), result->last_modified());
|
||||
});
|
||||
}
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>>
|
||||
storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, query::result_request request, tracing::trace_state_ptr trace_state) {
|
||||
storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, query::result_request request, tracing::trace_state_ptr trace_state, uint64_t max_size) {
|
||||
unsigned shard = _db.local().shard_of(pr.start()->value().token());
|
||||
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
return db.query(gs, *cmd, request, prv, gt).then([](auto&& f) {
|
||||
return _db.invoke_on(shard, [max_size, gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
return db.query(gs, *cmd, request, prv, gt, max_size).then([](auto&& f) {
|
||||
return make_foreign(std::move(f));
|
||||
});
|
||||
});
|
||||
@@ -2670,7 +2677,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
|
||||
exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state));
|
||||
}
|
||||
|
||||
query::result_merger merger;
|
||||
query::result_merger merger(cmd->row_limit, cmd->partition_limit);
|
||||
merger.reserve(exec.size());
|
||||
|
||||
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout] (::shared_ptr<abstract_read_executor>& rex) {
|
||||
@@ -2687,7 +2694,8 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
|
||||
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
|
||||
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t total_row_count) {
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
|
||||
uint32_t remaining_row_count, uint32_t remaining_partition_count) {
|
||||
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
|
||||
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
|
||||
std::vector<::shared_ptr<abstract_read_executor>> exec;
|
||||
@@ -2752,22 +2760,30 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
|
||||
exec.push_back(::make_shared<range_slice_read_executor>(schema, p, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state));
|
||||
}
|
||||
|
||||
query::result_merger merger;
|
||||
query::result_merger merger(cmd->row_limit, cmd->partition_limit);
|
||||
merger.reserve(exec.size());
|
||||
|
||||
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout] (::shared_ptr<abstract_read_executor>& rex) {
|
||||
return rex->execute(timeout);
|
||||
}, std::move(merger));
|
||||
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count, trace_state = std::move(trace_state)]
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges),
|
||||
cl, cmd, concurrency_factor, timeout, remaining_row_count, remaining_partition_count, trace_state = std::move(trace_state)]
|
||||
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
|
||||
total_row_count += result->row_count() ? result->row_count().value() :
|
||||
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
|
||||
if (!result->row_count() || !result->partition_count()) {
|
||||
logger.error("no row count in query result, should not happen here");
|
||||
result->calculate_counts(cmd->slice);
|
||||
}
|
||||
remaining_row_count -= result->row_count().value();
|
||||
remaining_partition_count -= result->partition_count().value();
|
||||
results.emplace_back(std::move(result));
|
||||
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
|
||||
if (i == ranges.end() || !remaining_row_count || !remaining_partition_count) {
|
||||
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
|
||||
} else {
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, std::move(trace_state), total_row_count);
|
||||
cmd->row_limit = remaining_row_count;
|
||||
cmd->partition_limit = remaining_partition_count;
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i),
|
||||
std::move(ranges), concurrency_factor, std::move(trace_state), remaining_row_count, remaining_partition_count);
|
||||
}
|
||||
}).handle_exception([p] (std::exception_ptr eptr) {
|
||||
p->handle_read_error(eptr, true);
|
||||
@@ -2812,9 +2828,10 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
|
||||
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
|
||||
|
||||
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor, std::move(trace_state))
|
||||
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
query::result_merger merger;
|
||||
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor,
|
||||
std::move(trace_state), cmd->row_limit, cmd->partition_limit)
|
||||
.then([row_limit = cmd->row_limit, partition_limit = cmd->partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
query::result_merger merger(row_limit, partition_limit);
|
||||
merger.reserve(results.size());
|
||||
|
||||
for (auto&& r: results) {
|
||||
@@ -2838,7 +2855,8 @@ storage_proxy::query(schema_ptr s,
|
||||
logger.trace("query {}.{} cmd={}, ranges={}, id={}", s->ks_name(), s->cf_name(), *cmd, partition_ranges, query_id);
|
||||
return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state)).then([query_id, cmd, s] (foreign_ptr<lw_shared_ptr<query::result>>&& res) {
|
||||
if (res->buf().is_linearized()) {
|
||||
logger.trace("query_result id={}, size={}, rows={}", query_id, res->buf().size(), res->calculate_row_count(cmd->slice));
|
||||
res->calculate_counts(cmd->slice);
|
||||
logger.trace("query_result id={}, size={}, rows={}, partitions={}", query_id, res->buf().size(), *res->row_count(), *res->partition_count());
|
||||
} else {
|
||||
logger.trace("query_result id={}, size={}", query_id, res->buf().size());
|
||||
}
|
||||
@@ -3447,9 +3465,10 @@ void storage_proxy::init_messaging_service() {
|
||||
tracing::trace(trace_state_ptr, "read_data: message received from /{}", src_addr.addr);
|
||||
}
|
||||
auto da = oda.value_or(query::digest_algorithm::MD5);
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto max_size = cinfo.retrieve_auxiliary<uint64_t>("max_result_size");
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, max_size] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto src_ip = src_addr.addr;
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, da, &pr, &p, &trace_state_ptr] (schema_ptr s) {
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, da, &pr, &p, &trace_state_ptr, max_size] (schema_ptr s) {
|
||||
auto pr2 = compat::unwrap(std::move(pr), *s);
|
||||
if (pr2.second) {
|
||||
// this function assumes singular queries but doesn't validate
|
||||
@@ -3464,7 +3483,7 @@ void storage_proxy::init_messaging_service() {
|
||||
qrr = query::result_request::result_and_digest;
|
||||
break;
|
||||
}
|
||||
return p->query_singular_local(std::move(s), cmd, std::move(pr2.first), qrr, trace_state_ptr);
|
||||
return p->query_singular_local(std::move(s), cmd, std::move(pr2.first), qrr, trace_state_ptr, max_size);
|
||||
}).finally([&trace_state_ptr, src_ip] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
@@ -3478,10 +3497,20 @@ void storage_proxy::init_messaging_service() {
|
||||
tracing::begin(trace_state_ptr);
|
||||
tracing::trace(trace_state_ptr, "read_mutation_data: message received from /{}", src_addr.addr);
|
||||
}
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto max_size = cinfo.retrieve_auxiliary<uint64_t>("max_result_size");
|
||||
return do_with(std::move(pr),
|
||||
get_local_shared_storage_proxy(),
|
||||
std::move(trace_state_ptr),
|
||||
compat::one_or_two_partition_ranges({}),
|
||||
[&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), max_size] (
|
||||
compat::wrapping_partition_range& pr,
|
||||
shared_ptr<storage_proxy>& p,
|
||||
tracing::trace_state_ptr& trace_state_ptr,
|
||||
compat::one_or_two_partition_ranges& unwrapped) mutable {
|
||||
auto src_ip = src_addr.addr;
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) mutable {
|
||||
return p->query_mutations_locally(std::move(s), cmd, compat::unwrap(std::move(pr), *s), trace_state_ptr);
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr, max_size, &unwrapped] (schema_ptr s) mutable {
|
||||
unwrapped = compat::unwrap(std::move(pr), *s);
|
||||
return p->query_mutations_locally(std::move(s), std::move(cmd), unwrapped, trace_state_ptr, max_size);
|
||||
}).finally([&trace_state_ptr, src_ip] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_mutation_data handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
@@ -3495,15 +3524,16 @@ void storage_proxy::init_messaging_service() {
|
||||
tracing::begin(trace_state_ptr);
|
||||
tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr);
|
||||
}
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto max_size = cinfo.retrieve_auxiliary<uint64_t>("max_result_size");
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), max_size] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto src_ip = src_addr.addr;
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) {
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr, max_size] (schema_ptr s) {
|
||||
auto pr2 = compat::unwrap(std::move(pr), *s);
|
||||
if (pr2.second) {
|
||||
// this function assumes singular queries but doesn't validate
|
||||
throw std::runtime_error("READ_DIGEST called with wrapping range");
|
||||
}
|
||||
return p->query_singular_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr);
|
||||
return p->query_singular_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, max_size);
|
||||
}).finally([&trace_state_ptr, src_ip] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
@@ -3539,41 +3569,166 @@ void storage_proxy::uninit_messaging_service() {
|
||||
// Merges reconcilable_result:s from different shards into one
|
||||
// Drops partitions which exceed the limit.
|
||||
class mutation_result_merger {
|
||||
schema_ptr _schema;
|
||||
lw_shared_ptr<const query::read_command> _cmd;
|
||||
unsigned _row_count = 0;
|
||||
unsigned _partition_count = 0;
|
||||
bool _short_read_allowed;
|
||||
query::short_read _short_read;
|
||||
std::vector<partition> _partitions;
|
||||
// we get a batch of partitions each time, each with a key
|
||||
// partition batches should be maintained in key order
|
||||
// batches that share a key should be merged and sorted in decorated_key
|
||||
// order
|
||||
struct partitions_batch {
|
||||
std::vector<partition> partitions;
|
||||
query::short_read short_read;
|
||||
};
|
||||
std::multimap<unsigned, partitions_batch> _partitions;
|
||||
query::result_memory_accounter _memory_accounter;
|
||||
stdx::optional<unsigned> _stop_after_key;
|
||||
public:
|
||||
explicit mutation_result_merger(const query::read_command& cmd)
|
||||
: _short_read_allowed(cmd.slice.options.contains(query::partition_slice::option::allow_short_read))
|
||||
{ }
|
||||
|
||||
explicit mutation_result_merger(schema_ptr schema, lw_shared_ptr<const query::read_command> cmd)
|
||||
: _schema(std::move(schema))
|
||||
, _cmd(std::move(cmd))
|
||||
, _short_read_allowed(_cmd->slice.options.contains(query::partition_slice::option::allow_short_read)) {
|
||||
}
|
||||
query::result_memory_accounter& memory() {
|
||||
return _memory_accounter;
|
||||
}
|
||||
const query::result_memory_accounter& memory() const {
|
||||
return _memory_accounter;
|
||||
}
|
||||
void add_result(foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
|
||||
void add_result(unsigned key, foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
|
||||
if (_stop_after_key && key > *_stop_after_key) {
|
||||
// A short result was added that goes before this one.
|
||||
return;
|
||||
}
|
||||
std::vector<partition> partitions;
|
||||
partitions.reserve(partial_result->partitions().size());
|
||||
// Following three lines to simplify patch; can remove later
|
||||
for (const partition& p : partial_result->partitions()) {
|
||||
_partitions.push_back(p);
|
||||
partitions.push_back(p);
|
||||
_row_count += p._row_count;
|
||||
_partition_count += p._row_count > 0;
|
||||
}
|
||||
_short_read = partial_result->is_short_read();
|
||||
if (_memory_accounter.update_and_check(partial_result->memory_usage()) && _short_read_allowed) {
|
||||
_short_read = query::short_read::yes;
|
||||
_memory_accounter.update(partial_result->memory_usage());
|
||||
if (partial_result->is_short_read()) {
|
||||
_stop_after_key = key;
|
||||
}
|
||||
_partitions.emplace(key, partitions_batch { std::move(partitions), partial_result->is_short_read() });
|
||||
}
|
||||
reconcilable_result get() && {
|
||||
return reconcilable_result(_row_count, std::move(_partitions), _short_read,
|
||||
std::move(_memory_accounter).done());
|
||||
auto unsorted = std::unordered_set<unsigned>();
|
||||
struct partitions_and_last_key {
|
||||
std::vector<partition> partitions;
|
||||
stdx::optional<dht::decorated_key> last; // set if we had a short read
|
||||
};
|
||||
auto merged = std::map<unsigned, partitions_and_last_key>();
|
||||
auto short_read = query::short_read(this->short_read());
|
||||
// merge batches with equal keys, and note if we need to sort afterwards
|
||||
for (auto&& key_value : _partitions) {
|
||||
auto&& key = key_value.first;
|
||||
if (_stop_after_key && key > *_stop_after_key) {
|
||||
break;
|
||||
}
|
||||
auto&& batch = key_value.second;
|
||||
auto&& dest = merged[key];
|
||||
if (dest.partitions.empty()) {
|
||||
dest.partitions = std::move(batch.partitions);
|
||||
} else {
|
||||
unsorted.insert(key);
|
||||
std::move(batch.partitions.begin(), batch.partitions.end(), std::back_inserter(dest.partitions));
|
||||
}
|
||||
// In case of a short read we need to remove all partitions from the
|
||||
// batch that come after the last partition of the short read
|
||||
// result.
|
||||
if (batch.short_read) {
|
||||
// Nobody sends a short read with no data.
|
||||
const auto& last = dest.partitions.back().mut().decorated_key(*_schema);
|
||||
if (!dest.last || last.less_compare(*_schema, *dest.last)) {
|
||||
dest.last = last;
|
||||
}
|
||||
short_read = query::short_read::yes;
|
||||
}
|
||||
}
|
||||
|
||||
// Sort batches that arrived with the same keys
|
||||
for (auto key : unsorted) {
|
||||
struct comparator {
|
||||
const schema& s;
|
||||
dht::decorated_key::less_comparator dkcmp;
|
||||
|
||||
bool operator()(const partition& a, const partition& b) const {
|
||||
return dkcmp(a.mut().decorated_key(s), b.mut().decorated_key(s));
|
||||
}
|
||||
bool operator()(const dht::decorated_key& a, const partition& b) const {
|
||||
return dkcmp(a, b.mut().decorated_key(s));
|
||||
}
|
||||
bool operator()(const partition& a, const dht::decorated_key& b) const {
|
||||
return dkcmp(a.mut().decorated_key(s), b);
|
||||
}
|
||||
};
|
||||
auto cmp = comparator { *_schema, dht::decorated_key::less_comparator(_schema) };
|
||||
|
||||
auto&& batch = merged[key];
|
||||
boost::sort(batch.partitions, cmp);
|
||||
if (batch.last) {
|
||||
// This batch was built from a result that was a short read.
|
||||
// We need to remove all partitions that are after that short
|
||||
// read.
|
||||
auto it = boost::range::upper_bound(batch.partitions, std::move(*batch.last), cmp);
|
||||
batch.partitions.erase(it, batch.partitions.end());
|
||||
}
|
||||
}
|
||||
|
||||
auto final = std::vector<partition>();
|
||||
final.reserve(_partition_count);
|
||||
for (auto&& batch : merged | boost::adaptors::map_values) {
|
||||
std::move(batch.partitions.begin(), batch.partitions.end(), std::back_inserter(final));
|
||||
}
|
||||
|
||||
if (short_read) {
|
||||
// Short read row and partition counts may be incorrect, recalculate.
|
||||
_row_count = 0;
|
||||
_partition_count = 0;
|
||||
for (const auto& p : final) {
|
||||
_row_count += p.row_count();
|
||||
_partition_count += p.row_count() > 0;
|
||||
}
|
||||
|
||||
if (_row_count >= _cmd->row_limit || _partition_count > _cmd->partition_limit) {
|
||||
// Even though there was a short read contributing to the final
|
||||
// result we got limited by total row limit or partition limit.
|
||||
// Note that we cannot with trivial check make unset short read flag
|
||||
// in case _partition_count == _cmd->partition_limit since the short
|
||||
// read may have caused the last partition to contain less rows
|
||||
// than asked for.
|
||||
short_read = query::short_read::no;
|
||||
}
|
||||
}
|
||||
|
||||
// Trim back partition count and row count in case we overshot.
|
||||
// Should be rare for dense tables.
|
||||
while ((_partition_count > _cmd->partition_limit)
|
||||
|| (_partition_count && (_row_count - final.back().row_count() >= _cmd->row_limit))) {
|
||||
_row_count -= final.back().row_count();
|
||||
_partition_count -= final.back().row_count() > 0;
|
||||
final.pop_back();
|
||||
}
|
||||
if (_row_count > _cmd->row_limit) {
|
||||
auto mut = final.back().mut().unfreeze(_schema);
|
||||
static const auto all = std::vector<query::clustering_range>({query::clustering_range::make_open_ended_both_sides()});
|
||||
auto is_reversed = _cmd->slice.options.contains(query::partition_slice::option::reversed);
|
||||
auto final_rows = _cmd->row_limit - (_row_count - final.back().row_count());
|
||||
_row_count -= final.back().row_count();
|
||||
auto rc = mut.partition().compact_for_query(*_schema, _cmd->timestamp, all, is_reversed, final_rows);
|
||||
final.back() = partition(rc, freeze(mut));
|
||||
_row_count += rc;
|
||||
}
|
||||
|
||||
return reconcilable_result(_row_count, std::move(final), short_read, std::move(_memory_accounter).done());
|
||||
}
|
||||
// Reports whether the merger considers its result a short read.
// NOTE(review): this span comes from a rendered diff without +/- markers. The two
// return statements look like two versions of the same line: one reads the
// _short_read member, the other derives the flag from _stop_after_key, or from
// _short_read_allowed with a non-zero _row_count and _memory_accounter.check().
// Which version is current cannot be determined from here; as written, only the
// first return is reachable.
bool short_read() const {
|
||||
return bool(_short_read);
|
||||
return bool(_stop_after_key) || (_short_read_allowed && _row_count > 0 && _memory_accounter.check());
|
||||
}
|
||||
unsigned partition_count() const {
|
||||
return _partition_count;
|
||||
@@ -3584,65 +3739,159 @@ public:
|
||||
};
|
||||
|
||||
// Runs a mutation query for one partition range against the local database.
//  - Singular range: the query is dispatched to the shard returned by shard_of()
//    for the start key's token, and the reconcilable_result is wrapped in a
//    foreign_ptr before being returned.
//  - Any other range: delegated to query_nonsingular_mutations_locally() with a
//    one-element range list.
// NOTE(review): this span is a rendered diff without +/- markers. Lines that appear
// twice, once without and once with max_size, look like the pre- and post-change
// versions of the same statement; confirm against the real file.
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) {
|
||||
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr,
|
||||
tracing::trace_state_ptr trace_state, uint64_t max_size) {
|
||||
if (pr.is_singular()) {
|
||||
unsigned shard = _db.local().shard_of(pr.start()->value().token());
|
||||
// pr is captured by reference in the lambda passed to invoke_on(); presumably the
// caller keeps it alive until the returned future resolves (TODO confirm).
return _db.invoke_on(shard, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
return db.get_result_memory_limiter().new_read().then([&] (query::result_memory_accounter ma) {
|
||||
return _db.invoke_on(shard, [max_size, cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
// The variant with max_size obtains its memory accounter via new_mutation_read(max_size).
return db.get_result_memory_limiter().new_mutation_read(max_size).then([&] (query::result_memory_accounter ma) {
|
||||
return db.query_mutations(gs, *cmd, pr, std::move(ma), gt).then([] (reconcilable_result&& result) {
|
||||
return make_foreign(make_lw_shared(std::move(result)));
|
||||
});
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return query_nonsingular_mutations_locally(std::move(s), std::move(cmd), {pr}, std::move(trace_state));
|
||||
return query_nonsingular_mutations_locally(std::move(s), std::move(cmd), {pr}, std::move(trace_state), max_size);
|
||||
}
|
||||
}
|
||||
|
||||
// Overload taking a compat::one_or_two_partition_ranges.
// When pr.second is not set, only pr.first is queried via the single-range
// query_mutations_locally() overload; otherwise pr is passed on to
// query_nonsingular_mutations_locally().
// NOTE(review): this span is a rendered diff without +/- markers. Lines that appear
// twice, once without and once with max_size, look like the pre- and post-change
// versions of the same statement; confirm against the real file.
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const compat::one_or_two_partition_ranges& pr, tracing::trace_state_ptr trace_state) {
|
||||
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const compat::one_or_two_partition_ranges& pr,
|
||||
tracing::trace_state_ptr trace_state, uint64_t max_size) {
|
||||
if (!pr.second) {
|
||||
return query_mutations_locally(std::move(s), std::move(cmd), pr.first, std::move(trace_state));
|
||||
return query_mutations_locally(std::move(s), std::move(cmd), pr.first, std::move(trace_state), max_size);
|
||||
} else {
|
||||
return query_nonsingular_mutations_locally(std::move(s), std::move(cmd), pr, std::move(trace_state));
|
||||
return query_nonsingular_mutations_locally(std::move(s), std::move(cmd), pr, std::move(trace_state), max_size);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// Identifies one (partition-range element, shard) pair.
// query_nonsingular_mutations_locally() uses it as the key of the unordered_map
// that collects the ranges to query in one iteration.
struct element_and_shard {
|
||||
unsigned element; // element in a partition range vector
|
||||
// Shard paired with that element.
unsigned shard;
|
||||
};
|
||||
|
||||
// Equality for element_and_shard: true only when both the element index and the
// shard are equal.
bool operator==(element_and_shard a, element_and_shard b) {
|
||||
return a.element == b.element && a.shard == b.shard;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
// std::hash specialization for element_and_shard, so it can serve as the key of
// std::unordered_map.
template <>
|
||||
struct hash<element_and_shard> {
|
||||
// Combines both fields as element * 31 + shard. The arithmetic is done in
// unsigned before the result is converted to size_t.
size_t operator()(element_and_shard es) const {
|
||||
return es.element * 31 + es.shard;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
namespace service {
|
||||
|
||||
// Value stored per element_and_shard key: the partition range to query, plus the
// ordering index assigned when the entry was created.
struct partition_range_and_sort_key {
|
||||
// Range to query; query_nonsingular_mutations_locally() may enlarge it when the
// same key is seen again.
query::partition_range pr;
|
||||
unsigned sort_key_shard_order; // for the same source partition range, we sort in shard order
|
||||
};
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||
storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& prs, tracing::trace_state_ptr trace_state) {
|
||||
storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& prs,
|
||||
tracing::trace_state_ptr trace_state, uint64_t max_size) {
|
||||
// no one permitted us to modify *cmd, so make a copy
|
||||
auto shard_cmd = make_lw_shared<query::read_command>(*cmd);
|
||||
return do_with(cmd,
|
||||
shard_cmd,
|
||||
mutation_result_merger{*cmd},
|
||||
1u,
|
||||
0u,
|
||||
false,
|
||||
static_cast<unsigned>(prs.size()),
|
||||
std::unordered_map<element_and_shard, partition_range_and_sort_key>{},
|
||||
mutation_result_merger{s, cmd},
|
||||
dht::ring_position_range_vector_sharder{prs},
|
||||
global_schema_ptr(s),
|
||||
tracing::global_trace_state_ptr(std::move(trace_state)),
|
||||
[this, s] (lw_shared_ptr<query::read_command>& cmd,
|
||||
[this, s, max_size] (lw_shared_ptr<query::read_command>& cmd,
|
||||
lw_shared_ptr<query::read_command>& shard_cmd,
|
||||
unsigned& shards_in_parallel,
|
||||
unsigned& mutation_result_merger_key,
|
||||
bool& no_more_ranges,
|
||||
unsigned& partition_range_count,
|
||||
std::unordered_map<element_and_shard, partition_range_and_sort_key>& shards_for_this_iteration,
|
||||
mutation_result_merger& mrm,
|
||||
dht::ring_position_range_vector_sharder& rprs,
|
||||
global_schema_ptr& gs,
|
||||
tracing::global_trace_state_ptr& gt) {
|
||||
return _db.local().get_result_memory_limiter().new_read().then([&, s] (query::result_memory_accounter ma) {
|
||||
return _db.local().get_result_memory_limiter().new_mutation_read(max_size).then([&, s] (query::result_memory_accounter ma) {
|
||||
mrm.memory() = std::move(ma);
|
||||
return repeat_until_value([&, s] () -> future<stdx::optional<reconcilable_result>> {
|
||||
auto now = rprs.next(*s);
|
||||
if (!now) {
|
||||
return make_ready_future<stdx::optional<reconcilable_result>>(std::move(mrm).get());
|
||||
// We don't want to query a sparsely populated table sequentially, because the latency
|
||||
// will go through the roof. We don't want to query a densely populated table in parallel,
|
||||
// because we'll throw away most of the results. So we'll exponentially increase
|
||||
// concurrency starting at 1, so we won't waste on dense tables and at most
|
||||
// `log(nr_shards) + ignore_msb_bits` latency multiplier for near-empty tables.
|
||||
shards_for_this_iteration.clear();
|
||||
// If we're reading from less than smp::count shards, then we can just append
|
||||
// each shard in order without sorting. If we're reading from more, then
|
||||
// we'll read from some shards at least twice, so the partitions within will be
|
||||
// out-of-order wrt. other shards
|
||||
auto retain_shard_order = true;
|
||||
for (auto i = 0u; i < shards_in_parallel; ++i) {
|
||||
auto now = rprs.next(*s);
|
||||
if (!now) {
|
||||
no_more_ranges = true;
|
||||
break;
|
||||
}
|
||||
// Let's see if this is a new shard, or if we can expand an existing range
|
||||
auto&& rng_ok = shards_for_this_iteration.emplace(element_and_shard{now->element, now->shard}, partition_range_and_sort_key{now->ring_range, i});
|
||||
if (!rng_ok.second) {
|
||||
// We saw this shard already, enlarge the range (we know now->ring_range came from the same partition range;
|
||||
// otherwise it would have had a unique now->element).
|
||||
auto& rng = rng_ok.first->second.pr;
|
||||
rng = nonwrapping_range<dht::ring_position>(std::move(rng.start()), std::move(now->ring_range.end()));
|
||||
// This range is no longer ordered with respect to the others, so:
|
||||
retain_shard_order = false;
|
||||
}
|
||||
}
|
||||
auto key_base = mutation_result_merger_key;
|
||||
|
||||
// prepare for next iteration
|
||||
// Each iteration uses a merger key that is either i in the loop above (so in the range [0, shards_in_parallel),
|
||||
// or, the element index in prs (so in the range [0, partition_range_count). Make room for sufficient keys.
|
||||
mutation_result_merger_key += std::max(shards_in_parallel, partition_range_count);
|
||||
shards_in_parallel *= 2;
|
||||
|
||||
shard_cmd->partition_limit = cmd->partition_limit - mrm.partition_count();
|
||||
shard_cmd->row_limit = cmd->row_limit - mrm.row_count();
|
||||
return _db.invoke_on(now->shard, [&, now = std::move(*now), gt] (database& db) {
|
||||
query::result_memory_accounter accounter(db.get_result_memory_limiter(), mrm.memory());
|
||||
return db.query_mutations(gs, *shard_cmd, now.ring_range, std::move(accounter), std::move(gt)).then([] (reconcilable_result&& rr) {
|
||||
return make_foreign(make_lw_shared(std::move(rr)));
|
||||
|
||||
return parallel_for_each(shards_for_this_iteration, [&, key_base, retain_shard_order] (const std::pair<const element_and_shard, partition_range_and_sort_key>& elem_shard_range) {
|
||||
auto&& elem = elem_shard_range.first.element;
|
||||
auto&& shard = elem_shard_range.first.shard;
|
||||
auto&& range = elem_shard_range.second.pr;
|
||||
auto sort_key_shard_order = elem_shard_range.second.sort_key_shard_order;
|
||||
return _db.invoke_on(shard, [&, range, gt, fstate = mrm.memory().state_for_another_shard()] (database& db) {
|
||||
query::result_memory_accounter accounter(db.get_result_memory_limiter(), std::move(fstate));
|
||||
return db.query_mutations(gs, *shard_cmd, range, std::move(accounter), std::move(gt)).then([] (reconcilable_result&& rr) {
|
||||
return make_foreign(make_lw_shared(std::move(rr)));
|
||||
});
|
||||
}).then([&, key_base, retain_shard_order, elem, sort_key_shard_order] (foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
|
||||
// Each outer (sequential) iteration is in result order, so we pick increasing keys.
|
||||
// Within the inner (parallel) iteration, the results can be in order (if retain_shard_order), or not (if !retain_shard_order).
|
||||
// If the results are unordered, we still have to order them according to which element of prs they originated from.
|
||||
auto key = key_base; // for outer loop
|
||||
if (retain_shard_order) {
|
||||
key += sort_key_shard_order; // inner loop is ordered
|
||||
} else {
|
||||
key += elem; // inner loop ordered only by position within prs
|
||||
}
|
||||
mrm.add_result(key, std::move(partial_result));
|
||||
});
|
||||
}).then([&] (foreign_ptr<lw_shared_ptr<reconcilable_result>> rr) -> stdx::optional<reconcilable_result> {
|
||||
mrm.add_result(std::move(rr));
|
||||
if (mrm.short_read() || mrm.partition_count() >= cmd->partition_limit || mrm.row_count() >= cmd->row_limit) {
|
||||
return std::move(mrm).get();
|
||||
}).then([&] () -> stdx::optional<reconcilable_result> {
|
||||
if (mrm.short_read() || mrm.partition_count() >= cmd->partition_limit || mrm.row_count() >= cmd->row_limit || no_more_ranges) {
|
||||
return stdx::make_optional(std::move(mrm).get());
|
||||
}
|
||||
return stdx::nullopt;
|
||||
});
|
||||
|
||||
@@ -235,15 +235,18 @@ private:
|
||||
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state);
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr,
|
||||
query::result_request request,
|
||||
tracing::trace_state_ptr trace_state);
|
||||
future<query::result_digest, api::timestamp_type> query_singular_local_digest(schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state);
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> query_partition_key_range(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range> partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state);
|
||||
tracing::trace_state_ptr trace_state,
|
||||
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
|
||||
future<query::result_digest, api::timestamp_type> query_singular_local_digest(schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state,
|
||||
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> query_partition_key_range(lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range> partition_ranges, db::consistency_level cl, tracing::trace_state_ptr trace_state);
|
||||
std::vector<query::partition_range> get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range);
|
||||
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
|
||||
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t total_row_count = 0);
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
|
||||
uint32_t remaining_row_count, uint32_t remaining_partition_count);
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
@@ -262,7 +265,7 @@ private:
|
||||
template<typename Range>
|
||||
future<> mutate_internal(Range mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state);
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_nonsingular_mutations_locally(
|
||||
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& pr, tracing::trace_state_ptr trace_state);
|
||||
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& pr, tracing::trace_state_ptr trace_state, uint64_t max_size);
|
||||
|
||||
public:
|
||||
storage_proxy(distributed<database>& db);
|
||||
@@ -337,16 +340,19 @@ public:
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations_locally(
|
||||
schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range&,
|
||||
tracing::trace_state_ptr trace_state = nullptr);
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
|
||||
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations_locally(
|
||||
schema_ptr, lw_shared_ptr<query::read_command> cmd, const compat::one_or_two_partition_ranges&,
|
||||
tracing::trace_state_ptr trace_state = nullptr);
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations_locally(
|
||||
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& pr,
|
||||
tracing::trace_state_ptr trace_state = nullptr);
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
|
||||
|
||||
|
||||
future<> stop();
|
||||
|
||||
@@ -191,7 +191,8 @@ class partitioned_sstable_set : public sstable_set_impl {
|
||||
using map_iterator = interval_map_type::const_iterator;
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
interval_map_type _sstables;
|
||||
std::vector<shared_sstable> _unleveled_sstables;
|
||||
interval_map_type _leveled_sstables;
|
||||
private:
|
||||
static interval_type make_interval(const schema& s, const query::partition_range& range) {
|
||||
return interval_type::closed(
|
||||
@@ -207,16 +208,16 @@ private:
|
||||
}
|
||||
// Returns the [begin, end) iterator pair over the interval map entries selected by
// `range`, with one branch per combination of present/absent start and end bounds:
//  - both bounds:  equal_range() on the interval built from the range
//  - start only:   from lower_bound(start) to the end of the map
//  - end only:     from the beginning of the map to upper_bound(end)
//  - no bounds:    the whole map
// NOTE(review): this span is a rendered diff without +/- markers. Each return appears
// twice, once on _sstables and once on _leveled_sstables; these look like the pre- and
// post-change versions of the same statement. Confirm against the real file.
std::pair<map_iterator, map_iterator> query(const query::partition_range& range) const {
|
||||
if (range.start() && range.end()) {
|
||||
return _sstables.equal_range(make_interval(range));
|
||||
return _leveled_sstables.equal_range(make_interval(range));
|
||||
}
|
||||
else if (range.start() && !range.end()) {
|
||||
auto start = singular(range.start()->value());
|
||||
return { _sstables.lower_bound(start), _sstables.end() };
|
||||
return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() };
|
||||
} else if (!range.start() && range.end()) {
|
||||
auto end = singular(range.end()->value());
|
||||
return { _sstables.begin(), _sstables.upper_bound(end) };
|
||||
return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) };
|
||||
} else {
|
||||
return { _sstables.begin(), _sstables.end() };
|
||||
return { _leveled_sstables.begin(), _leveled_sstables.end() };
|
||||
}
|
||||
}
|
||||
public:
|
||||
@@ -234,29 +235,39 @@ public:
|
||||
while (b != e) {
|
||||
boost::copy(b++->second, std::inserter(result, result.end()));
|
||||
}
|
||||
return std::vector<shared_sstable>(result.begin(), result.end());
|
||||
auto r = _unleveled_sstables;
|
||||
r.insert(r.end(), result.begin(), result.end());
|
||||
return r;
|
||||
}
|
||||
// Adds an sstable to the set.
// NOTE(review): this span is a rendered diff without +/- markers and contains two
// versions of the body back to back; confirm against the real file.
//  - The first version always adds the sstable to _sstables under the interval
//    spanning its first and last tokens.
//  - The second version stores level-0 sstables in the _unleveled_sstables vector and
//    adds all others to _leveled_sstables under the same kind of interval.
virtual void insert(shared_sstable sst) override {
|
||||
auto first = sst->get_first_decorated_key().token();
|
||||
auto last = sst->get_last_decorated_key().token();
|
||||
using bound = query::partition_range::bound;
|
||||
_sstables.add({
|
||||
make_interval(
|
||||
query::partition_range(
|
||||
bound(dht::ring_position::starting_at(first)),
|
||||
bound(dht::ring_position::ending_at(last)))),
|
||||
value_set({sst})});
|
||||
if (sst->get_sstable_level() == 0) {
|
||||
_unleveled_sstables.push_back(std::move(sst));
|
||||
} else {
|
||||
// Interval runs from the start of the first key's token to the end of the last key's token.
auto first = sst->get_first_decorated_key().token();
|
||||
auto last = sst->get_last_decorated_key().token();
|
||||
using bound = query::partition_range::bound;
|
||||
_leveled_sstables.add({
|
||||
make_interval(
|
||||
query::partition_range(
|
||||
bound(dht::ring_position::starting_at(first)),
|
||||
bound(dht::ring_position::ending_at(last)))),
|
||||
value_set({sst})});
|
||||
}
|
||||
}
|
||||
// Removes an sstable from the set; mirrors insert().
// NOTE(review): this span is a rendered diff without +/- markers and contains two
// versions of the body back to back; confirm against the real file.
//  - The first version subtracts the sstable from _sstables over the interval
//    spanning its first and last tokens.
//  - The second version removes level-0 sstables from the _unleveled_sstables vector
//    and subtracts all others from _leveled_sstables over the same kind of interval.
virtual void erase(shared_sstable sst) override {
|
||||
auto first = sst->get_first_decorated_key().token();
|
||||
auto last = sst->get_last_decorated_key().token();
|
||||
using bound = query::partition_range::bound;
|
||||
_sstables.subtract({
|
||||
make_interval(
|
||||
query::partition_range(
|
||||
bound(dht::ring_position::starting_at(first)),
|
||||
bound(dht::ring_position::ending_at(last)))),
|
||||
value_set({sst})});
|
||||
if (sst->get_sstable_level() == 0) {
|
||||
// Remove every vector element that compares equal to sst (std::remove followed by erase).
_unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end());
|
||||
} else {
|
||||
auto first = sst->get_first_decorated_key().token();
|
||||
auto last = sst->get_last_decorated_key().token();
|
||||
using bound = query::partition_range::bound;
|
||||
_leveled_sstables.subtract({
|
||||
make_interval(
|
||||
query::partition_range(
|
||||
bound(dht::ring_position::starting_at(first)),
|
||||
bound(dht::ring_position::ending_at(last)))),
|
||||
value_set({sst})});
|
||||
}
|
||||
}
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
@@ -264,6 +275,7 @@ public:
|
||||
|
||||
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
schema_ptr _schema;
|
||||
const std::vector<shared_sstable>& _unleveled_sstables;
|
||||
map_iterator _it;
|
||||
const map_iterator _end;
|
||||
private:
|
||||
@@ -272,32 +284,35 @@ private:
|
||||
{i.upper().token(), boost::icl::is_right_closed(i.bounds())});
|
||||
}
|
||||
public:
|
||||
incremental_selector(schema_ptr schema, const interval_map_type& sstables)
|
||||
incremental_selector(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables)
|
||||
: _schema(std::move(schema))
|
||||
, _it(sstables.begin())
|
||||
, _end(sstables.end()) {
|
||||
, _unleveled_sstables(unleveled_sstables)
|
||||
, _it(leveled_sstables.begin())
|
||||
, _end(leveled_sstables.end()) {
|
||||
}
|
||||
// Returns a token range containing `token` together with the sstables selected for it.
// Advances the stored iterator _it until it reaches an interval that contains the
// token's interval, or one that starts after it:
//  - containing interval: returns that interval's token range and its sstables
//  - interval starts after the token: returns the gap [token, interval lower bound)
//  - iterator exhausted: returns a range open on both sides
// _it only moves forward and is kept between calls.
// NOTE(review): this span is a rendered diff without +/- markers. Returns that appear
// twice are two versions of the same statement: one returns only the interval's
// sstables (or an empty vector), the other returns `ssts`, which starts as a copy of
// _unleveled_sstables. Confirm against the real file.
virtual std::pair<nonwrapping_range<dht::token>, std::vector<shared_sstable>> select(const dht::token& token) override {
|
||||
auto pr = query::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token));
|
||||
auto interval = make_interval(*_schema, std::move(pr));
|
||||
auto ssts = _unleveled_sstables;
|
||||
|
||||
while (_it != _end) {
|
||||
if (boost::icl::contains(_it->first, interval)) {
|
||||
return std::make_pair(to_token_range(_it->first), std::vector<shared_sstable>(_it->second.begin(), _it->second.end()));
|
||||
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
|
||||
return std::make_pair(to_token_range(_it->first), std::move(ssts));
|
||||
}
|
||||
// we don't want to skip current interval if token lies before it.
|
||||
if (boost::icl::lower_less(interval, _it->first)) {
|
||||
return std::make_pair(nonwrapping_range<dht::token>::make({token, true}, {_it->first.lower().token(), false}),
|
||||
std::vector<shared_sstable>());
|
||||
std::move(ssts));
|
||||
}
|
||||
_it++;
|
||||
}
|
||||
return std::make_pair(nonwrapping_range<dht::token>::make_open_ended_both_sides(), std::vector<shared_sstable>());
|
||||
return std::make_pair(nonwrapping_range<dht::token>::make_open_ended_both_sides(), std::move(ssts));
|
||||
}
|
||||
};
|
||||
|
||||
// Creates an incremental_selector over this set's sstables.
// NOTE(review): this span is a rendered diff without +/- markers. The two returns are
// two versions of the same statement: one passes _sstables, the other passes
// _unleveled_sstables and _leveled_sstables. Confirm against the real file.
// The selector's constructor takes its container arguments by const reference, so
// presumably the selector must not outlive this set (TODO confirm).
std::unique_ptr<incremental_selector_impl> partitioned_sstable_set::make_incremental_selector() const {
|
||||
return std::make_unique<incremental_selector>(_schema, _sstables);
|
||||
return std::make_unique<incremental_selector>(_schema, _unleveled_sstables, _leveled_sstables);
|
||||
}
|
||||
|
||||
class compaction_strategy_impl {
|
||||
|
||||
@@ -71,6 +71,12 @@ void compression::set_compressor(compressor c) {
|
||||
}
|
||||
}
|
||||
|
||||
// locate() takes a byte position in the uncompressed stream, and finds the
|
||||
// location of the compressed chunk on disk which contains it, and the
|
||||
// offset in this chunk.
|
||||
// locate() may only be used for offsets of actual bytes, and in particular
|
||||
// the end-of-file position (one past the last byte) MUST not be used. If the
|
||||
// caller wants to read from the end of file, it should simply read nothing.
|
||||
compression::chunk_and_offset
|
||||
compression::locate(uint64_t position) const {
|
||||
auto ucl = uncompressed_chunk_length();
|
||||
@@ -310,6 +316,9 @@ public:
|
||||
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
|
||||
_pos += n;
|
||||
assert(_pos <= _end_pos);
|
||||
if (_pos == _end_pos) {
|
||||
return make_ready_future<temporary_buffer<char>>();
|
||||
}
|
||||
auto addr = _compression_metadata->locate(_pos);
|
||||
auto underlying_n = addr.chunk_start - _underlying_pos;
|
||||
_underlying_pos = addr.chunk_start;
|
||||
|
||||
@@ -903,7 +903,7 @@ sstables::sstable::find_disk_ranges(
|
||||
uint64_t range_start_pos;
|
||||
auto& range_end = ck_ranges.begin()->end();
|
||||
|
||||
auto cmp = clustering_key_prefix::tri_compare(*schema);
|
||||
auto cmp = clustering_key_prefix::prefix_equal_tri_compare(*schema);
|
||||
while (num_blocks--) {
|
||||
if (data.size() < 2) {
|
||||
// When we break out of this loop, we give up on
|
||||
|
||||
@@ -1703,7 +1703,7 @@ file_writer components_writer::index_file_writer(sstable& sst, const io_priority
|
||||
options.buffer_size = sst.sstable_buffer_size;
|
||||
options.io_priority_class = pc;
|
||||
options.write_behind = 10;
|
||||
return file_writer(sst._index_file, std::move(options));
|
||||
return file_writer(std::move(sst._index_file), std::move(options));
|
||||
}
|
||||
|
||||
// Get the currently loaded configuration, or the default configuration in
|
||||
@@ -1855,7 +1855,6 @@ void components_writer::consume_end_of_stream() {
|
||||
seal_summary(_sst._summary, std::move(_first_key), std::move(_last_key)); // what if there is only one partition? what if it is empty?
|
||||
|
||||
_index.close().get();
|
||||
_sst._index_file = file(); // index->close() closed _index_file
|
||||
|
||||
if (_sst.has_component(sstable::component_type::CompressionInfo)) {
|
||||
_sst._collector.add_compression_ratio(_sst._compression.compressed_file_length(), _sst._compression.uncompressed_file_length());
|
||||
@@ -1905,20 +1904,20 @@ void sstable_writer::prepare_file_writer()
|
||||
options.write_behind = 10;
|
||||
|
||||
if (!_compression_enabled) {
|
||||
_writer = make_shared<checksummed_file_writer>(_sst._data_file, std::move(options), true);
|
||||
_writer = std::make_unique<checksummed_file_writer>(std::move(_sst._data_file), std::move(options), true);
|
||||
} else {
|
||||
prepare_compression(_sst._compression, _schema);
|
||||
_writer = make_shared<file_writer>(make_compressed_file_output_stream(_sst._data_file, std::move(options), &_sst._compression));
|
||||
_writer = std::make_unique<file_writer>(make_compressed_file_output_stream(std::move(_sst._data_file), std::move(options), &_sst._compression));
|
||||
}
|
||||
}
|
||||
|
||||
void sstable_writer::finish_file_writer()
|
||||
{
|
||||
_writer->close().get();
|
||||
_sst._data_file = file(); // w->close() closed _data_file
|
||||
auto writer = std::move(_writer);
|
||||
writer->close().get();
|
||||
|
||||
if (!_compression_enabled) {
|
||||
auto chksum_wr = static_pointer_cast<checksummed_file_writer>(_writer);
|
||||
auto chksum_wr = static_cast<checksummed_file_writer*>(writer.get());
|
||||
write_digest(_sst._write_error_handler, _sst.filename(sstable::component_type::Digest), chksum_wr->full_checksum());
|
||||
write_crc(_sst._write_error_handler, _sst.filename(sstable::component_type::CRC), chksum_wr->finalize_checksum());
|
||||
} else {
|
||||
@@ -1926,6 +1925,16 @@ void sstable_writer::finish_file_writer()
|
||||
}
|
||||
}
|
||||
|
||||
// Destructor: if _writer is still set, closes it and waits for the close to finish.
// Any exception thrown by the close is caught and logged through sstlog, so nothing
// propagates out of the destructor.
sstable_writer::~sstable_writer() {
|
||||
if (_writer) {
|
||||
try {
|
||||
_writer->close().get();
|
||||
} catch (...) {
|
||||
sstlog.error("sstable_writer failed to close file: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
|
||||
uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc)
|
||||
: _sst(sst)
|
||||
|
||||
@@ -781,7 +781,7 @@ class sstable_writer {
|
||||
bool _backup;
|
||||
bool _leave_unsealed;
|
||||
bool _compression_enabled;
|
||||
shared_ptr<file_writer> _writer;
|
||||
std::unique_ptr<file_writer> _writer;
|
||||
stdx::optional<components_writer> _components_writer;
|
||||
private:
|
||||
void prepare_file_writer();
|
||||
@@ -789,6 +789,10 @@ private:
|
||||
public:
|
||||
sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
|
||||
uint64_t max_sstable_size, bool backup, bool leave_unsealed, const io_priority_class& pc);
|
||||
~sstable_writer();
|
||||
sstable_writer(sstable_writer&& o) : _sst(o._sst), _schema(o._schema), _pc(o._pc), _backup(o._backup),
|
||||
_leave_unsealed(o._leave_unsealed), _compression_enabled(o._compression_enabled), _writer(std::move(o._writer)),
|
||||
_components_writer(std::move(o._components_writer)) {}
|
||||
void consume_new_partition(const dht::decorated_key& dk) { return _components_writer->consume_new_partition(dk); }
|
||||
void consume(tombstone t) { _components_writer->consume(t); }
|
||||
stop_iteration consume(static_row&& sr) { return _components_writer->consume(std::move(sr)); }
|
||||
|
||||
@@ -153,7 +153,12 @@ struct summary_ka {
|
||||
* Similar to origin off heap size
|
||||
*/
|
||||
// Estimates the number of bytes used by this summary.
// NOTE(review): this span is a rendered diff without +/- markers. The single-line
// return and the `sz` computation are two versions of the same body. The single-line
// version counts only the entry and position arrays plus sizeof(*this). The `sz`
// version also adds the sizes of first_key, last_key and every entry's key. Confirm
// against the real file.
uint64_t memory_footprint() const {
|
||||
return sizeof(summary_entry) * entries.size() + sizeof(uint32_t) * positions.size() + sizeof(*this);
|
||||
auto sz = sizeof(summary_entry) * entries.size() + sizeof(uint32_t) * positions.size() + sizeof(*this);
|
||||
sz += first_key.value.size() + last_key.value.size();
|
||||
for (auto& e : entries) {
|
||||
sz += e.key.size();
|
||||
}
|
||||
return sz;
|
||||
}
|
||||
|
||||
explicit operator bool() const {
|
||||
|
||||
@@ -56,23 +56,24 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
pranges.emplace_back(query::partition_range::make_singular(dht::global_partitioner().decorate_key(*s, std::move(pkey))));
|
||||
}
|
||||
|
||||
auto max_size = std::numeric_limits<size_t>::max();
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), 3);
|
||||
auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr).get0();
|
||||
auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
|
||||
}
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
|
||||
query::max_rows, gc_clock::now(), std::experimental::nullopt, 5);
|
||||
auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr).get0();
|
||||
auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(5);
|
||||
}
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(),
|
||||
query::max_rows, gc_clock::now(), std::experimental::nullopt, 3);
|
||||
auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr).get0();
|
||||
auto result = db.query(s, cmd, query::result_request::only_result, pranges, nullptr, max_size).get0();
|
||||
assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(3);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -131,7 +131,7 @@ BOOST_AUTO_TEST_CASE(test_simple_compound)
|
||||
BOOST_REQUIRE_EQUAL(buf1.size(), 12);
|
||||
|
||||
bytes_ostream buf2;
|
||||
ser::writer_of_writable_simple_compound wowsc(buf2);
|
||||
ser::writer_of_writable_simple_compound<bytes_ostream> wowsc(buf2);
|
||||
std::move(wowsc).write_foo(sc.foo).write_bar(sc.bar).end_writable_simple_compound();
|
||||
BOOST_REQUIRE_EQUAL(buf1.linearize(), buf2.linearize());
|
||||
|
||||
@@ -170,7 +170,7 @@ BOOST_AUTO_TEST_CASE(test_vector)
|
||||
BOOST_REQUIRE_EQUAL(buf1.size(), 136);
|
||||
|
||||
bytes_ostream buf2;
|
||||
ser::writer_of_writable_vectors_of_compounds wowvoc(buf2);
|
||||
ser::writer_of_writable_vectors_of_compounds<bytes_ostream> wowvoc(buf2);
|
||||
auto first_writer = std::move(wowvoc).start_first();
|
||||
for (auto& c : vec1) {
|
||||
first_writer.add().write_foo(c.foo).write_bar(c.bar).end_writable_simple_compound();
|
||||
@@ -221,7 +221,7 @@ BOOST_AUTO_TEST_CASE(test_variant)
|
||||
simple_compound sc2 = { 0x12344321, 0x56788765 };
|
||||
|
||||
bytes_ostream buf;
|
||||
ser::writer_of_writable_variants wowv(buf);
|
||||
ser::writer_of_writable_variants<bytes_ostream> wowv(buf);
|
||||
auto second_writer = std::move(wowv).write_id(17).write_first_simple_compound(sc).start_second_writable_vector().start_vector();
|
||||
for (auto&& v : vec) {
|
||||
second_writer.add_vector(v);
|
||||
|
||||
@@ -29,7 +29,9 @@
|
||||
#include <seastar/core/timer.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/tests/test-utils.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <deque>
|
||||
#include "utils/phased_barrier.hh"
|
||||
|
||||
#include "utils/logalloc.hh"
|
||||
#include "utils/managed_ref.hh"
|
||||
@@ -529,11 +531,7 @@ inline void quiesce(FutureType&& fut) {
|
||||
// a request may be broken into many continuations. While we could just yield many times, the
|
||||
// exact amount needed to guarantee execution would be dependent on the internals of the
|
||||
// implementation, and we want to avoid that.
|
||||
timer<> tmr;
|
||||
tmr.set_callback([] { BOOST_FAIL("The future we were waiting for took too long to get ready"); });
|
||||
tmr.arm(2s);
|
||||
fut.get();
|
||||
tmr.cancel();
|
||||
with_timeout(lowres_clock::now() + 2s, std::move(fut)).get();
|
||||
}
|
||||
|
||||
// Simple RAII structure that wraps around a region_group
|
||||
@@ -859,15 +857,22 @@ class test_reclaimer: public region_group_reclaimer {
|
||||
region_group _rg;
|
||||
std::vector<size_t> _reclaim_sizes;
|
||||
bool _shutdown = false;
|
||||
shared_promise<> _unleash_reclaimer;
|
||||
seastar::gate _reclaimers_done;
|
||||
public:
|
||||
virtual void start_reclaiming() override {
|
||||
while (this->under_pressure()) {
|
||||
size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
|
||||
_result_accumulator->_reclaim_sizes.push_back(reclaimed);
|
||||
}
|
||||
virtual void start_reclaiming() noexcept override {
|
||||
with_gate(_reclaimers_done, [this] {
|
||||
return _unleash_reclaimer.get_shared_future().then([this] {
|
||||
while (this->under_pressure()) {
|
||||
size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
|
||||
_result_accumulator->_reclaim_sizes.push_back(reclaimed);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
~test_reclaimer() {
|
||||
_reclaimers_done.close().get();
|
||||
_rg.shutdown().get();
|
||||
}
|
||||
|
||||
@@ -881,6 +886,10 @@ public:
|
||||
|
||||
test_reclaimer(size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(this), _rg(*this) {}
|
||||
test_reclaimer(test_reclaimer& parent, size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(&parent), _rg(&parent._rg, *this) {}
|
||||
|
||||
void unleash() {
|
||||
_unleash_reclaimer.set_value();
|
||||
}
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
|
||||
@@ -888,6 +897,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
|
||||
// allocate a single region to exhaustion, and make sure active reclaim is activated.
|
||||
test_reclaimer simple(logalloc::segment_size);
|
||||
test_async_reclaim_region simple_region(simple.rg(), logalloc::segment_size);
|
||||
simple.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed something
|
||||
auto fut = simple.rg().run_when_memory_available([] {});
|
||||
@@ -912,6 +922,7 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_worst_offen
|
||||
test_async_reclaim_region small_region(simple.rg(), logalloc::segment_size);
|
||||
test_async_reclaim_region medium_region(simple.rg(), 2 * logalloc::segment_size);
|
||||
test_async_reclaim_region big_region(simple.rg(), 3 * logalloc::segment_size);
|
||||
simple.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed
|
||||
auto fut = simple.rg().run_when_memory_available([&simple] {
|
||||
@@ -941,6 +952,9 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_leaf_offend
|
||||
test_async_reclaim_region small_region(small_leaf.rg(), logalloc::segment_size);
|
||||
test_async_reclaim_region medium_region(root.rg(), 2 * logalloc::segment_size);
|
||||
test_async_reclaim_region big_region(large_leaf.rg(), 3 * logalloc::segment_size);
|
||||
root.unleash();
|
||||
large_leaf.unleash();
|
||||
small_leaf.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed. Try at the root, and we'll make sure
|
||||
// that the leaves are forced correctly.
|
||||
@@ -967,6 +981,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_ancestor_bl
|
||||
test_reclaimer leaf(root, logalloc::segment_size);
|
||||
|
||||
test_async_reclaim_region root_region(root.rg(), logalloc::segment_size);
|
||||
root.unleash();
|
||||
leaf.unleash();
|
||||
|
||||
// Can't run this function until we have reclaimed. Try at the leaf, and we'll make sure
|
||||
// that the root reclaims
|
||||
@@ -992,6 +1008,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_big_region_
|
||||
test_async_reclaim_region root_region(root.rg(), 4 * logalloc::segment_size);
|
||||
test_async_reclaim_region big_leaf_region(leaf.rg(), 3 * logalloc::segment_size);
|
||||
test_async_reclaim_region small_leaf_region(leaf.rg(), 2 * logalloc::segment_size);
|
||||
root.unleash();
|
||||
leaf.unleash();
|
||||
|
||||
auto fut = root.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
|
||||
@@ -1018,6 +1036,8 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_r
|
||||
test_reclaimer leaf(root, logalloc::segment_size);
|
||||
|
||||
test_async_reclaim_region leaf_region(leaf.rg(), logalloc::segment_size);
|
||||
root.unleash();
|
||||
leaf.unleash();
|
||||
|
||||
auto fut_root = root.rg().run_when_memory_available([&root] {
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
|
||||
@@ -1037,3 +1057,117 @@ SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_r
|
||||
BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
|
||||
});
|
||||
}
|
||||
|
||||
// Reproduces issue #2021
|
||||
SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_region_group_size) {
|
||||
return seastar::async([] {
|
||||
#ifndef DEFAULT_ALLOCATOR // Because we need memory::stats().free_memory();
|
||||
logging::logger_registry().set_logger_level("lsa", seastar::log_level::debug);
|
||||
|
||||
auto free_space = memory::stats().free_memory();
|
||||
size_t threshold = size_t(0.75 * free_space);
|
||||
region_group_reclaimer recl(threshold, threshold);
|
||||
region_group gr(recl);
|
||||
auto close_gr = defer([&gr] { gr.shutdown().get(); });
|
||||
region r(gr);
|
||||
|
||||
with_allocator(r.allocator(), [&] {
|
||||
std::vector<managed_bytes> objs;
|
||||
|
||||
r.make_evictable([&] {
|
||||
if (objs.empty()) {
|
||||
return memory::reclaiming_result::reclaimed_nothing;
|
||||
}
|
||||
with_allocator(r.allocator(), [&] {
|
||||
objs.pop_back();
|
||||
});
|
||||
return memory::reclaiming_result::reclaimed_something;
|
||||
});
|
||||
|
||||
auto fill_to_pressure = [&] {
|
||||
while (!recl.under_pressure()) {
|
||||
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), 1024));
|
||||
}
|
||||
};
|
||||
|
||||
utils::phased_barrier request_barrier;
|
||||
auto wait_for_requests = defer([&] { request_barrier.advance_and_await().get(); });
|
||||
|
||||
for (int i = 0; i < 1000000; ++i) {
|
||||
fill_to_pressure();
|
||||
future<> f = gr.run_when_memory_available([&, op = request_barrier.start()] {
|
||||
// Trigger group size change (Refs issue #2021)
|
||||
gr.update(-10);
|
||||
gr.update(+10);
|
||||
});
|
||||
BOOST_REQUIRE(!f.available());
|
||||
}
|
||||
|
||||
// Release
|
||||
while (recl.under_pressure()) {
|
||||
objs.pop_back();
|
||||
}
|
||||
});
|
||||
#endif
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_reclaiming_runs_as_long_as_there_is_soft_pressure) {
|
||||
return seastar::async([] {
|
||||
size_t hard_threshold = logalloc::segment_size * 8;
|
||||
size_t soft_threshold = hard_threshold / 2;
|
||||
|
||||
class reclaimer : public region_group_reclaimer {
|
||||
bool _reclaim = false;
|
||||
protected:
|
||||
void start_reclaiming() noexcept override {
|
||||
_reclaim = true;
|
||||
}
|
||||
|
||||
void stop_reclaiming() noexcept override {
|
||||
_reclaim = false;
|
||||
}
|
||||
public:
|
||||
reclaimer(size_t hard_threshold, size_t soft_threshold)
|
||||
: region_group_reclaimer(hard_threshold, soft_threshold)
|
||||
{ }
|
||||
bool reclaiming() const { return _reclaim; };
|
||||
};
|
||||
|
||||
reclaimer recl(hard_threshold, soft_threshold);
|
||||
region_group gr(recl);
|
||||
auto close_gr = defer([&gr] { gr.shutdown().get(); });
|
||||
region r(gr);
|
||||
|
||||
with_allocator(r.allocator(), [&] {
|
||||
std::vector<managed_bytes> objs;
|
||||
|
||||
BOOST_REQUIRE(!recl.reclaiming());
|
||||
|
||||
while (!recl.over_soft_limit()) {
|
||||
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(recl.reclaiming());
|
||||
|
||||
while (!recl.under_pressure()) {
|
||||
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(recl.reclaiming());
|
||||
|
||||
while (recl.under_pressure()) {
|
||||
objs.pop_back();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(recl.over_soft_limit());
|
||||
BOOST_REQUIRE(recl.reclaiming());
|
||||
|
||||
while (recl.over_soft_limit()) {
|
||||
objs.pop_back();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(!recl.reclaiming());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -76,8 +76,10 @@ static query::partition_slice make_full_slice(const schema& s) {
|
||||
return partition_slice_builder(s).build();
|
||||
}
|
||||
|
||||
static auto inf32 = std::numeric_limits<unsigned>::max();
|
||||
|
||||
query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) {
|
||||
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice));
|
||||
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
@@ -460,25 +462,24 @@ SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
|
||||
auto src = make_source({m1});
|
||||
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(partition_key::from_single_value(*s, "key2"), s);
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1702,9 +1702,9 @@ static lw_shared_ptr<sstable> add_sstable_for_overlapping_test(lw_shared_ptr<col
|
||||
column_family_test(cf).add_sstable(sst);
|
||||
return sst;
|
||||
}
|
||||
static lw_shared_ptr<sstable> sstable_for_overlapping_test(const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key) {
|
||||
static lw_shared_ptr<sstable> sstable_for_overlapping_test(const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) {
|
||||
auto sst = make_lw_shared<sstable>(schema, "", gen, la, big);
|
||||
sstables::test(sst).set_values(std::move(first_key), std::move(last_key), {});
|
||||
sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key));
|
||||
return sst;
|
||||
}
|
||||
|
||||
@@ -3100,29 +3100,52 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
|
||||
auto key_and_token_pair = token_generation_for_current_shard(8);
|
||||
|
||||
sstable_set set = cs.make_sstable_set(s);
|
||||
set.insert(sstable_for_overlapping_test(s, 1, key_and_token_pair[0].first, key_and_token_pair[1].first));
|
||||
set.insert(sstable_for_overlapping_test(s, 2, key_and_token_pair[0].first, key_and_token_pair[1].first));
|
||||
set.insert(sstable_for_overlapping_test(s, 3, key_and_token_pair[3].first, key_and_token_pair[4].first));
|
||||
set.insert(sstable_for_overlapping_test(s, 4, key_and_token_pair[4].first, key_and_token_pair[4].first));
|
||||
set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first));
|
||||
|
||||
sstable_set::incremental_selector selector = set.make_incremental_selector();
|
||||
auto check = [&selector] (const dht::token& token, std::unordered_set<int64_t> expected_gens) {
|
||||
auto check = [] (sstable_set::incremental_selector& selector, const dht::token& token, std::unordered_set<int64_t> expected_gens) {
|
||||
auto sstables = selector.select(token);
|
||||
BOOST_REQUIRE(sstables.size() == expected_gens.size());
|
||||
for (auto& sst : sstables) {
|
||||
BOOST_REQUIRE(expected_gens.count(sst->generation()) == 1);
|
||||
}
|
||||
};
|
||||
check(key_and_token_pair[0].second, {1, 2});
|
||||
check(key_and_token_pair[1].second, {1, 2});
|
||||
check(key_and_token_pair[2].second, {});
|
||||
check(key_and_token_pair[3].second, {3});
|
||||
check(key_and_token_pair[4].second, {3, 4, 5});
|
||||
check(key_and_token_pair[5].second, {5});
|
||||
check(key_and_token_pair[6].second, {});
|
||||
check(key_and_token_pair[7].second, {});
|
||||
|
||||
{
|
||||
sstable_set set = cs.make_sstable_set(s);
|
||||
set.insert(sstable_for_overlapping_test(s, 1, key_and_token_pair[0].first, key_and_token_pair[1].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 2, key_and_token_pair[0].first, key_and_token_pair[1].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 3, key_and_token_pair[3].first, key_and_token_pair[4].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 4, key_and_token_pair[4].first, key_and_token_pair[4].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first, 1));
|
||||
|
||||
sstable_set::incremental_selector sel = set.make_incremental_selector();
|
||||
check(sel, key_and_token_pair[0].second, {1, 2});
|
||||
check(sel, key_and_token_pair[1].second, {1, 2});
|
||||
check(sel, key_and_token_pair[2].second, {});
|
||||
check(sel, key_and_token_pair[3].second, {3});
|
||||
check(sel, key_and_token_pair[4].second, {3, 4, 5});
|
||||
check(sel, key_and_token_pair[5].second, {5});
|
||||
check(sel, key_and_token_pair[6].second, {});
|
||||
check(sel, key_and_token_pair[7].second, {});
|
||||
}
|
||||
|
||||
{
|
||||
sstable_set set = cs.make_sstable_set(s);
|
||||
set.insert(sstable_for_overlapping_test(s, 0, key_and_token_pair[0].first, key_and_token_pair[1].first, 0));
|
||||
set.insert(sstable_for_overlapping_test(s, 1, key_and_token_pair[0].first, key_and_token_pair[1].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 2, key_and_token_pair[0].first, key_and_token_pair[1].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 3, key_and_token_pair[3].first, key_and_token_pair[4].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 4, key_and_token_pair[4].first, key_and_token_pair[4].first, 1));
|
||||
set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first, 1));
|
||||
|
||||
sstable_set::incremental_selector sel = set.make_incremental_selector();
|
||||
check(sel, key_and_token_pair[0].second, {0, 1, 2});
|
||||
check(sel, key_and_token_pair[1].second, {0, 1, 2});
|
||||
check(sel, key_and_token_pair[2].second, {0});
|
||||
check(sel, key_and_token_pair[3].second, {0, 3});
|
||||
check(sel, key_and_token_pair[4].second, {0, 3, 4, 5});
|
||||
check(sel, key_and_token_pair[5].second, {0, 5});
|
||||
check(sel, key_and_token_pair[6].second, {0});
|
||||
check(sel, key_and_token_pair[7].second, {0});
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -118,11 +118,27 @@ future<> trace_keyspace_helper::setup_table(const sstring& name, const sstring&
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::statements::raw::cf_statement> parsed = static_pointer_cast<
|
||||
cql3::statements::raw::cf_statement>(cql3::query_processor::parse_statement(cql));
|
||||
parsed->prepare_keyspace(KEYSPACE_NAME);
|
||||
::shared_ptr<cql3::statements::create_table_statement> statement =
|
||||
static_pointer_cast<cql3::statements::create_table_statement>(
|
||||
parsed->prepare(db, qp.get_cql_stats())->statement);
|
||||
auto schema = statement->get_cf_meta_data();
|
||||
|
||||
// Generate the CF UUID based on its KS and CF names. This is needed to ensure that
|
||||
// all Nodes that create it would create it with the same UUID and we don't
|
||||
// hit the #420 issue.
|
||||
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
|
||||
|
||||
schema_builder b(schema);
|
||||
b.set_uuid(uuid);
|
||||
|
||||
// We don't really care if it fails - this may happen due to concurrent
|
||||
// "CREATE TABLE" invocation on different Nodes.
|
||||
// The important thing is that it will converge eventually (some traces may
|
||||
// be lost in the process but that's ok).
|
||||
return qp.process(cql, db::consistency_level::ONE).discard_result().handle_exception([this] (auto ep) {});
|
||||
return service::get_local_migration_manager().announce_new_column_family(b.build(), false).discard_result().handle_exception([this] (auto ep) {});;
|
||||
}
|
||||
|
||||
bool trace_keyspace_helper::cache_sessions_table_handles(const schema_ptr& schema) {
|
||||
@@ -218,7 +234,8 @@ future<> trace_keyspace_helper::start() {
|
||||
std::map<sstring, sstring> opts;
|
||||
opts["replication_factor"] = "2";
|
||||
auto ksm = keyspace_metadata::new_keyspace(KEYSPACE_NAME, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
|
||||
service::get_local_migration_manager().announce_new_keyspace(ksm, false).get();
|
||||
// We use min_timestamp so that default keyspace metadata will lose to any manual adjustments. See issue #2129.
|
||||
service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false).get();
|
||||
}
|
||||
|
||||
// Create tables
|
||||
|
||||
@@ -55,9 +55,10 @@ std::vector<sstring> trace_type_names = {
|
||||
"REPAIR"
|
||||
};
|
||||
|
||||
tracing::tracing(const sstring& tracing_backend_helper_class_name)
|
||||
tracing::tracing(sstring tracing_backend_helper_class_name)
|
||||
: _write_timer([this] { write_timer_callback(); })
|
||||
, _thread_name(seastar::format("shard {:d}", engine().cpu_id()))
|
||||
, _tracing_backend_helper_class_name(std::move(tracing_backend_helper_class_name))
|
||||
, _registrations{
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
@@ -93,27 +94,23 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name)
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, _flushing_records))}
|
||||
, _gen(std::random_device()())
|
||||
, _slow_query_duration_threshold(default_slow_query_duraion_threshold)
|
||||
, _slow_query_record_ttl(default_slow_query_record_ttl) {
|
||||
try {
|
||||
_tracing_backend_helper_ptr = create_object<i_tracing_backend_helper>(tracing_backend_helper_class_name, *this);
|
||||
} catch (no_such_class& e) {
|
||||
tracing_logger.error("Can't create tracing backend helper {}: not supported", tracing_backend_helper_class_name);
|
||||
throw;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
, _slow_query_record_ttl(default_slow_query_record_ttl) {}
|
||||
|
||||
future<> tracing::create_tracing(sstring tracing_backend_class_name) {
|
||||
return tracing_instance().start(std::move(tracing_backend_class_name));
|
||||
}
|
||||
|
||||
future<> tracing::create_tracing(const sstring& tracing_backend_class_name) {
|
||||
return tracing_instance().start(tracing_backend_class_name).then([] {
|
||||
return tracing_instance().invoke_on_all([] (tracing& local_tracing) {
|
||||
return local_tracing.start();
|
||||
});
|
||||
future<> tracing::start_tracing() {
|
||||
return tracing_instance().invoke_on_all([] (tracing& local_tracing) {
|
||||
return local_tracing.start();
|
||||
});
|
||||
}
|
||||
|
||||
trace_state_ptr tracing::create_session(trace_type type, trace_state_props_set props) {
|
||||
trace_state_ptr tstate;
|
||||
if (!started()) {
|
||||
return trace_state_ptr();
|
||||
}
|
||||
|
||||
try {
|
||||
// Don't create a session if its records are likely to be dropped
|
||||
if (!may_create_new_session()) {
|
||||
@@ -129,6 +126,10 @@ trace_state_ptr tracing::create_session(trace_type type, trace_state_props_set p
|
||||
}
|
||||
|
||||
trace_state_ptr tracing::create_session(const trace_info& secondary_session_info) {
|
||||
if (!started()) {
|
||||
return trace_state_ptr();
|
||||
}
|
||||
|
||||
try {
|
||||
// Don't create a session if its records are likely to be dropped
|
||||
if (!may_create_new_session(secondary_session_info.session_id)) {
|
||||
@@ -144,7 +145,17 @@ trace_state_ptr tracing::create_session(const trace_info& secondary_session_info
|
||||
}
|
||||
|
||||
future<> tracing::start() {
|
||||
try {
|
||||
_tracing_backend_helper_ptr = create_object<i_tracing_backend_helper>(_tracing_backend_helper_class_name, *this);
|
||||
} catch (no_such_class& e) {
|
||||
tracing_logger.error("Can't create tracing backend helper {}: not supported", _tracing_backend_helper_class_name);
|
||||
throw;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
|
||||
return _tracing_backend_helper_ptr->start().then([this] {
|
||||
_down = false;
|
||||
_write_timer.arm(write_period);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@
|
||||
#include <vector>
|
||||
#include <atomic>
|
||||
#include <random>
|
||||
#include <seastar/core/scollectd.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "gc_clock.hh"
|
||||
@@ -345,10 +346,15 @@ private:
|
||||
|
||||
records_bulk _pending_for_write_records_bulk;
|
||||
timer<lowres_clock> _write_timer;
|
||||
bool _down = false;
|
||||
// _down becomes FALSE after the local service is fully initialized and
|
||||
// tracing records are allowed to be created and collected. It becomes TRUE
|
||||
// after the shutdown() call and prevents further write attempts to I/O
|
||||
// backend.
|
||||
bool _down = true;
|
||||
bool _slow_query_logging_enabled = false;
|
||||
std::unique_ptr<i_tracing_backend_helper> _tracing_backend_helper_ptr;
|
||||
sstring _thread_name;
|
||||
sstring _tracing_backend_helper_class_name;
|
||||
scollectd::registrations _registrations;
|
||||
double _trace_probability = 0.0; // keep this one for querying purposes
|
||||
uint64_t _normalized_trace_probability = 0;
|
||||
@@ -376,8 +382,13 @@ public:
|
||||
return tracing_instance().local();
|
||||
}
|
||||
|
||||
static future<> create_tracing(const sstring& tracing_backend_helper_class_name);
|
||||
tracing(const sstring& tracing_backend_helper_class_name);
|
||||
bool started() const {
|
||||
return !_down;
|
||||
}
|
||||
|
||||
static future<> create_tracing(sstring tracing_backend_helper_class_name);
|
||||
static future<> start_tracing();
|
||||
tracing(sstring tracing_backend_helper_class_name);
|
||||
|
||||
// Initialize a tracing backend (e.g. tracing_keyspace or logstash)
|
||||
future<> start();
|
||||
|
||||
@@ -662,9 +662,9 @@ future<> cql_server::connection::process_request() {
|
||||
auto bv = bytes_view{reinterpret_cast<const int8_t*>(buf.begin()), buf.size()};
|
||||
auto cpu = pick_request_cpu();
|
||||
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested] () mutable {
|
||||
return this->process_request_one(bv, op, stream, std::move(client_state), tracing_requested).then([](auto&& response) {
|
||||
return this->process_request_one(bv, op, stream, std::move(client_state), tracing_requested).then([tracing_requested] (auto&& response) {
|
||||
auto& tracing_session_id_ptr = response.second.tracing_session_id_ptr();
|
||||
if (tracing_session_id_ptr) {
|
||||
if (tracing_requested == tracing_request_type::write_on_close && tracing_session_id_ptr) {
|
||||
response.first->set_tracing_id(*tracing_session_id_ptr);
|
||||
}
|
||||
return std::make_pair(make_foreign(response.first), response.second);
|
||||
|
||||
@@ -2076,56 +2076,6 @@ uint64_t region_group::top_region_evictable_space() const {
|
||||
return _regions.empty() ? 0 : _regions.top()->evictable_occupancy().total_space();
|
||||
}
|
||||
|
||||
void region_group::release_requests() noexcept {
|
||||
// The later() statement is here to avoid executing the function in update() context. But
|
||||
// it also guarantees that we won't dominate the CPU if we have many requests to release.
|
||||
//
|
||||
// However, both with_gate() and later() can ultimately call to schedule() and consequently
|
||||
// allocate memory, which (if that allocation triggers a compaction - that frees memory) would
|
||||
// defeat the very purpose of not executing this on update() context. Allocations should be rare
|
||||
// on those but can happen, so we need to at least make sure they will not reclaim.
|
||||
//
|
||||
// Whatever comes after later() is already in a safe context, so we don't need to keep the lock
|
||||
// alive until we are done with the whole execution - only until later is successfully executed.
|
||||
tracker_reclaimer_lock rl;
|
||||
|
||||
_reclaimer.notify_relief();
|
||||
if (_descendant_blocked_requests) {
|
||||
_descendant_blocked_requests->set_value();
|
||||
}
|
||||
_descendant_blocked_requests = {};
|
||||
|
||||
if (_blocked_requests.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
with_gate(_asynchronous_gate, [this, rl = std::move(rl)] () mutable {
|
||||
return later().then([this] {
|
||||
// Check again, we may have executed release_requests() in the meantime from another entry
|
||||
// point (for instance, a descendant notification)
|
||||
if (_blocked_requests.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto blocked_at = do_for_each_parent(this, [] (auto rg) {
|
||||
return rg->execution_permitted() ? stop_iteration::no : stop_iteration::yes;
|
||||
});
|
||||
|
||||
if (!blocked_at) {
|
||||
auto req = std::move(_blocked_requests.front());
|
||||
_blocked_requests.pop_front();
|
||||
req->allocate();
|
||||
release_requests();
|
||||
} else {
|
||||
// If someone blocked us in the meantime then we can't execute. We need to make
|
||||
// sure that we are listening to notifications, though. It could be that we used to
|
||||
// be blocked on ourselves and now we are blocking on an ancestor
|
||||
subscribe_for_ancestor_available_memory_notification(blocked_at);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
region* region_group::get_largest_region() {
|
||||
if (!_maximal_rg || _maximal_rg->_regions.empty()) {
|
||||
return nullptr;
|
||||
@@ -2159,6 +2109,88 @@ region_group::del(region_impl* child) {
|
||||
update(-child->occupancy().total_space());
|
||||
}
|
||||
|
||||
bool
|
||||
region_group::execution_permitted() noexcept {
|
||||
return do_for_each_parent(this, [] (auto rg) {
|
||||
return rg->under_pressure() ? stop_iteration::yes : stop_iteration::no;
|
||||
}) == nullptr;
|
||||
}
|
||||
|
||||
future<>
|
||||
region_group::start_releaser() {
|
||||
return later().then([this] {
|
||||
return repeat([this] () noexcept {
|
||||
if (_shutdown_requested) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
|
||||
if (!_blocked_requests.empty() && execution_permitted()) {
|
||||
auto req = std::move(_blocked_requests.front());
|
||||
_blocked_requests.pop_front();
|
||||
req->allocate();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else {
|
||||
// Block reclaiming to prevent signal() from being called by reclaimer inside wait()
|
||||
// FIXME: handle allocation failures (not very likely) like allocating_section does
|
||||
tracker_reclaimer_lock rl;
|
||||
return _relief.wait().then([] {
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
region_group::region_group(region_group *parent, region_group_reclaimer& reclaimer)
|
||||
: _parent(parent)
|
||||
, _reclaimer(reclaimer)
|
||||
, _releaser(reclaimer_can_block() ? start_releaser() : make_ready_future<>())
|
||||
{
|
||||
if (_parent) {
|
||||
_parent->add(this);
|
||||
}
|
||||
}
|
||||
|
||||
bool region_group::reclaimer_can_block() const {
|
||||
return _reclaimer.throttle_threshold() != std::numeric_limits<size_t>::max();
|
||||
}
|
||||
|
||||
void region_group::notify_relief() {
|
||||
_relief.signal();
|
||||
for (region_group* child : _subgroups) {
|
||||
child->notify_relief();
|
||||
}
|
||||
}
|
||||
|
||||
void region_group::update(ssize_t delta) {
|
||||
// Most-enclosing group which was relieved.
|
||||
region_group* top_relief = nullptr;
|
||||
|
||||
do_for_each_parent(this, [&top_relief, delta] (region_group* rg) mutable {
|
||||
rg->update_maximal_rg();
|
||||
rg->_total_memory += delta;
|
||||
|
||||
if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_pressure();
|
||||
} else {
|
||||
rg->_reclaimer.notify_soft_relief();
|
||||
}
|
||||
|
||||
if (rg->_total_memory > rg->_reclaimer.throttle_threshold()) {
|
||||
rg->_reclaimer.notify_pressure();
|
||||
} else if (rg->_reclaimer.under_pressure()) {
|
||||
rg->_reclaimer.notify_relief();
|
||||
top_relief = rg;
|
||||
}
|
||||
|
||||
return stop_iteration::no;
|
||||
});
|
||||
|
||||
if (top_relief) {
|
||||
top_relief->notify_relief();
|
||||
}
|
||||
}
|
||||
|
||||
allocating_section::guard::guard()
|
||||
: _prev(shard_segment_pool.emergency_reserve_max())
|
||||
{ }
|
||||
|
||||
@@ -65,8 +65,20 @@ protected:
|
||||
size_t _soft_limit;
|
||||
bool _under_pressure = false;
|
||||
bool _under_soft_pressure = false;
|
||||
virtual void start_reclaiming() {}
|
||||
virtual void stop_reclaiming() {}
|
||||
// The following restrictions apply to implementations of start_reclaiming() and stop_reclaiming():
|
||||
//
|
||||
// - must not use any region or region_group objects, because they're invoked synchronously
|
||||
// with operations on those.
|
||||
//
|
||||
// - must be noexcept, because they're called on the free path.
|
||||
//
|
||||
// - the implementation may be called synchronously with any operation
|
||||
// which allocates memory, because these are called by memory reclaimer.
|
||||
// In particular, the implementation should not depend on memory allocation
|
||||
// because that may fail when in reclaiming context.
|
||||
//
|
||||
virtual void start_reclaiming() noexcept {}
|
||||
virtual void stop_reclaiming() noexcept {}
|
||||
public:
|
||||
bool under_pressure() const {
|
||||
return _under_pressure;
|
||||
@@ -76,32 +88,26 @@ public:
|
||||
return _under_soft_pressure;
|
||||
}
|
||||
|
||||
void notify_soft_pressure() {
|
||||
void notify_soft_pressure() noexcept {
|
||||
if (!_under_soft_pressure) {
|
||||
_under_soft_pressure = true;
|
||||
start_reclaiming();
|
||||
}
|
||||
}
|
||||
|
||||
void notify_soft_relief() {
|
||||
void notify_soft_relief() noexcept {
|
||||
if (_under_soft_pressure) {
|
||||
_under_soft_pressure = false;
|
||||
stop_reclaiming();
|
||||
}
|
||||
}
|
||||
|
||||
void notify_pressure() {
|
||||
if (!_under_pressure) {
|
||||
_under_pressure = true;
|
||||
start_reclaiming();
|
||||
}
|
||||
void notify_pressure() noexcept {
|
||||
_under_pressure = true;
|
||||
}
|
||||
|
||||
void notify_relief() {
|
||||
if (_under_pressure) {
|
||||
_under_pressure = false;
|
||||
stop_reclaiming();
|
||||
}
|
||||
void notify_relief() noexcept {
|
||||
_under_pressure = false;
|
||||
}
|
||||
|
||||
region_group_reclaimer()
|
||||
@@ -109,7 +115,9 @@ public:
|
||||
region_group_reclaimer(size_t threshold)
|
||||
: _threshold(threshold), _soft_limit(threshold) {}
|
||||
region_group_reclaimer(size_t threshold, size_t soft)
|
||||
: _threshold(threshold), _soft_limit(soft) {}
|
||||
: _threshold(threshold), _soft_limit(soft) {
|
||||
assert(_soft_limit <= _threshold);
|
||||
}
|
||||
|
||||
virtual ~region_group_reclaimer() {}
|
||||
|
||||
@@ -230,9 +238,13 @@ class region_group {
|
||||
// a different ancestor)
|
||||
std::experimental::optional<shared_promise<>> _descendant_blocked_requests = {};
|
||||
|
||||
region_group* _waiting_on_ancestor = nullptr;
|
||||
seastar::gate _asynchronous_gate;
|
||||
condition_variable _relief;
|
||||
future<> _releaser;
|
||||
bool _shutdown_requested = false;
|
||||
|
||||
bool reclaimer_can_block() const;
|
||||
future<> start_releaser();
|
||||
void notify_relief();
|
||||
public:
|
||||
// When creating a region_group, one can specify an optional throttle_threshold parameter. This
|
||||
// parameter won't affect normal allocations, but an API is provided, through the region_group's
|
||||
@@ -240,17 +252,13 @@ public:
|
||||
// the total memory for the region group (and all of its parents) is lower or equal to the
|
||||
// region_group's throttle_threshold (and respectively for its parents).
|
||||
region_group(region_group_reclaimer& reclaimer = no_reclaimer) : region_group(nullptr, reclaimer) {}
|
||||
region_group(region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer) : _parent(parent), _reclaimer(reclaimer) {
|
||||
if (_parent) {
|
||||
_parent->add(this);
|
||||
}
|
||||
}
|
||||
region_group(region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer);
|
||||
region_group(region_group&& o) = delete;
|
||||
region_group(const region_group&) = delete;
|
||||
~region_group() {
|
||||
// If we set a throttle threshold, we'd be postponing many operations. So shutdown must be
|
||||
// called.
|
||||
if (_reclaimer.throttle_threshold() != std::numeric_limits<size_t>::max()) {
|
||||
if (reclaimer_can_block()) {
|
||||
assert(_shutdown_requested);
|
||||
}
|
||||
if (_parent) {
|
||||
@@ -262,24 +270,7 @@ public:
|
||||
size_t memory_used() const {
|
||||
return _total_memory;
|
||||
}
|
||||
void update(ssize_t delta) {
|
||||
do_for_each_parent(this, [delta] (auto rg) mutable {
|
||||
rg->update_maximal_rg();
|
||||
rg->_total_memory += delta;
|
||||
// It is okay to call release_requests for a region_group that can't allow execution.
|
||||
// But that can generate various spurious messages to groups waiting on us that will be
|
||||
// then woken up just so they can go to wait again. So let's filter that.
|
||||
if (rg->execution_permitted()) {
|
||||
rg->release_requests();
|
||||
}
|
||||
if (rg->_total_memory >= rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_pressure();
|
||||
} else if (rg->_total_memory < rg->_reclaimer.soft_limit_threshold()) {
|
||||
rg->_reclaimer.notify_soft_relief();
|
||||
}
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
void update(ssize_t delta);
|
||||
|
||||
// It would be easier to call update, but it is unfortunately broken in boost versions up to at
|
||||
// least 1.59.
|
||||
@@ -325,36 +316,18 @@ public:
|
||||
using futurator = futurize<std::result_of_t<Func()>>;
|
||||
|
||||
auto blocked_at = do_for_each_parent(this, [] (auto rg) {
|
||||
return (rg->_blocked_requests.empty() && rg->execution_permitted()) ? stop_iteration::no : stop_iteration::yes;
|
||||
return (rg->_blocked_requests.empty() && !rg->under_pressure()) ? stop_iteration::no : stop_iteration::yes;
|
||||
});
|
||||
|
||||
if (!blocked_at) {
|
||||
return futurator::apply(func);
|
||||
}
|
||||
subscribe_for_ancestor_available_memory_notification(blocked_at);
|
||||
|
||||
auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));
|
||||
auto fut = fn->get_future();
|
||||
_blocked_requests.push_back(std::move(fn), timeout);
|
||||
++_blocked_requests_counter;
|
||||
|
||||
// This is called here, and not at update(), for two reasons: the first, is that things that
|
||||
// are done during the free() path should be done carefully, in the sense that they can
|
||||
// trigger another update call and put us in a loop. Not to mention we would like to keep
|
||||
// those from having exceptions. We solve that for release_requests by using later(), but in
|
||||
// here we can do away with that need altogether.
|
||||
//
|
||||
// Second and most important, until we actually block a request, the pressure condition may
|
||||
// very well be transient. There are opportunities for compactions, the condition can go
|
||||
// away on its own, etc.
|
||||
//
|
||||
// The reason we check execution permitted(), is that we'll still block requests if we have
|
||||
// free memory but existing requests in the queue. That is so we can keep our FIFO ordering
|
||||
// guarantee. So we need to distinguish here the case in which we're blocking merely to
|
||||
// serialize requests, so that the caller does not evict more than it should.
|
||||
if (!blocked_at->execution_permitted()) {
|
||||
blocked_at->_reclaimer.notify_pressure();
|
||||
}
|
||||
return fut;
|
||||
}
|
||||
|
||||
@@ -364,9 +337,11 @@ public:
|
||||
region* get_largest_region();
|
||||
|
||||
// Shutdown is mandatory for every user who has set a threshold
|
||||
// Can be called at most once.
|
||||
future<> shutdown() {
|
||||
_shutdown_requested = true;
|
||||
return _asynchronous_gate.close();
|
||||
_relief.signal();
|
||||
return std::move(_releaser);
|
||||
}
|
||||
|
||||
size_t blocked_requests() {
|
||||
@@ -377,43 +352,9 @@ public:
|
||||
return _blocked_requests_counter;
|
||||
}
|
||||
private:
|
||||
// Make sure we get a notification and can call release_requests when one of our ancestors that
|
||||
// used to block us is no longer under memory pressure.
|
||||
void subscribe_for_ancestor_available_memory_notification(region_group *ancestor) {
|
||||
if ((this == ancestor) || (_waiting_on_ancestor)) {
|
||||
return; // already subscribed, or no need to
|
||||
}
|
||||
|
||||
_waiting_on_ancestor = ancestor;
|
||||
|
||||
with_gate(_asynchronous_gate, [this] {
|
||||
// We reevaluate _waiting_on_ancestor here so we make sure there is no deferring point
|
||||
// between determining the ancestor and registering with it for a notification. We start
|
||||
// with _waiting_on_ancestor set to the initial value, and after we are notified, we
|
||||
// will set _waiting_on_ancestor to nullptr to force this lambda to reevaluate it.
|
||||
auto evaluate_ancestor_and_stop = [this] {
|
||||
if (!_waiting_on_ancestor) {
|
||||
auto new_blocking_point = do_for_each_parent(this, [] (auto rg) {
|
||||
return (rg->execution_permitted()) ? stop_iteration::no : stop_iteration::yes;
|
||||
});
|
||||
if (!new_blocking_point) {
|
||||
release_requests();
|
||||
}
|
||||
_waiting_on_ancestor = (new_blocking_point == this) ? nullptr : new_blocking_point;
|
||||
}
|
||||
return _waiting_on_ancestor == nullptr;
|
||||
};
|
||||
|
||||
return do_until(evaluate_ancestor_and_stop, [this] {
|
||||
if (!_waiting_on_ancestor->_descendant_blocked_requests) {
|
||||
_waiting_on_ancestor->_descendant_blocked_requests = shared_promise<>();
|
||||
}
|
||||
return _waiting_on_ancestor->_descendant_blocked_requests->get_shared_future().then([this] {
|
||||
_waiting_on_ancestor = nullptr;
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
// Returns true if and only if constraints of this group are not violated.
|
||||
// That's taking into account any constraints imposed by enclosing (parent) groups.
|
||||
bool execution_permitted() noexcept;
|
||||
|
||||
// Executes the function func for each region_group upwards in the hierarchy, starting with the
|
||||
// parameter node. The function func may return stop_iteration::no, in which case it proceeds to
|
||||
@@ -433,11 +374,10 @@ private:
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
inline bool execution_permitted() const {
|
||||
return _total_memory <= _reclaimer.throttle_threshold();
|
||||
}
|
||||
|
||||
void release_requests() noexcept;
|
||||
inline bool under_pressure() const {
|
||||
return _reclaimer.under_pressure();
|
||||
}
|
||||
|
||||
uint64_t top_region_evictable_space() const;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user