Compare commits
63 Commits
next-3.3
...
branch-1.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c40fe36c6 | ||
|
|
1ea0af8acb | ||
|
|
f32437eb28 | ||
|
|
5216764245 | ||
|
|
05afbb38fb | ||
|
|
e8fcf0a6be | ||
|
|
34a712ddb9 | ||
|
|
71265a3691 | ||
|
|
deeabaea9c | ||
|
|
d2bf1535ea | ||
|
|
077252956f | ||
|
|
3bbd260a3e | ||
|
|
4905f8a49d | ||
|
|
ad2b7d1e8c | ||
|
|
80e4e2da38 | ||
|
|
29da6fa5b4 | ||
|
|
ad1af17aad | ||
|
|
7e052a4e91 | ||
|
|
787d8f88d3 | ||
|
|
d4e1a25858 | ||
|
|
5243c77fcb | ||
|
|
d8768257fe | ||
|
|
c2c4e842fc | ||
|
|
61d4bf3800 | ||
|
|
1df0e435e8 | ||
|
|
4e6f09ad95 | ||
|
|
426316a4b7 | ||
|
|
3289010910 | ||
|
|
52c9723e04 | ||
|
|
3cc91eeb84 | ||
|
|
d67ee37bbc | ||
|
|
fcbe43cc87 | ||
|
|
ef79310b3c | ||
|
|
f7e81c7b7d | ||
|
|
54224dfaa0 | ||
|
|
e30199119c | ||
|
|
07ce4ec032 | ||
|
|
c7c18d9c0c | ||
|
|
2a4582ab9f | ||
|
|
ab5e23f6e7 | ||
|
|
fecea15a25 | ||
|
|
dce549f44f | ||
|
|
26a3302957 | ||
|
|
f796d8081b | ||
|
|
b850cb991c | ||
|
|
734cfa949a | ||
|
|
3606e3ab29 | ||
|
|
014284de00 | ||
|
|
f17764e74a | ||
|
|
8643028d0c | ||
|
|
a35f1d765a | ||
|
|
3116a92b0e | ||
|
|
dad312ce0a | ||
|
|
3cae56f3e3 | ||
|
|
656a10c4b8 | ||
|
|
c04b3de564 | ||
|
|
4964fe4cf0 | ||
|
|
e3ad3cf7d9 | ||
|
|
cef40627a7 | ||
|
|
995820c08a | ||
|
|
b78abd7649 | ||
|
|
d47c62b51c | ||
|
|
bef19e7f9e |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.1.4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
11
auth/auth.cc
11
auth/auth.cc
@@ -97,7 +97,7 @@ namespace std {
|
||||
template <>
|
||||
struct hash<auth::data_resource> {
|
||||
size_t operator()(const auth::data_resource & v) const {
|
||||
return std::hash<sstring>()(v.name());
|
||||
return v.hash_value();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -354,9 +354,12 @@ future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
|
||||
::shared_ptr<cql3::statements::create_table_statement> statement =
|
||||
static_pointer_cast<cql3::statements::create_table_statement>(
|
||||
parsed->prepare(db)->statement);
|
||||
// Origin sets "Legacy Cf Id" for the new table. We have no need to be
|
||||
// pre-2.1 compatible (afaik), so lets skip a whole lotta hoolaballo
|
||||
return statement->announce_migration(qp.proxy(), false).then([statement](bool) {});
|
||||
auto schema = statement->get_cf_meta_data();
|
||||
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
|
||||
|
||||
schema_builder b(schema);
|
||||
b.set_uuid(uuid);
|
||||
return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
|
||||
}
|
||||
|
||||
future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils/hash.hh"
|
||||
#include <iosfwd>
|
||||
#include <set>
|
||||
#include <seastar/core/sstring.hh>
|
||||
@@ -137,6 +138,10 @@ public:
|
||||
|
||||
bool operator==(const data_resource&) const;
|
||||
bool operator<(const data_resource&) const;
|
||||
|
||||
size_t hash_value() const {
|
||||
return utils::tuple_hash()(_ks, _cf);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -162,6 +162,7 @@ modes = {
|
||||
|
||||
scylla_tests = [
|
||||
'tests/mutation_test',
|
||||
'tests/schema_registry_test',
|
||||
'tests/canonical_mutation_test',
|
||||
'tests/range_test',
|
||||
'tests/types_test',
|
||||
|
||||
@@ -432,10 +432,9 @@ void query_processor::migration_subscriber::on_update_keyspace(const sstring& ks
|
||||
|
||||
void query_processor::migration_subscriber::on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed)
|
||||
{
|
||||
if (columns_changed) {
|
||||
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
|
||||
remove_invalid_prepared_statements(ks_name, cf_name);
|
||||
}
|
||||
// #1255: Ignoring columns_changed deliberately.
|
||||
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
|
||||
remove_invalid_prepared_statements(ks_name, cf_name);
|
||||
}
|
||||
|
||||
void query_processor::migration_subscriber::on_update_user_type(const sstring& ks_name, const sstring& type_name)
|
||||
|
||||
@@ -162,7 +162,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
|
||||
}
|
||||
|
||||
std::experimental::optional<sstring> tmp_value = {};
|
||||
if (has_property(KW_MINCOMPACTIONTHRESHOLD)) {
|
||||
if (has_property(KW_COMPACTION)) {
|
||||
if (get_compaction_options().count(KW_MINCOMPACTIONTHRESHOLD)) {
|
||||
tmp_value = get_compaction_options().at(KW_MINCOMPACTIONTHRESHOLD);
|
||||
}
|
||||
@@ -170,7 +170,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
|
||||
int min_compaction_threshold = to_int(KW_MINCOMPACTIONTHRESHOLD, tmp_value, builder.get_min_compaction_threshold());
|
||||
|
||||
tmp_value = {};
|
||||
if (has_property(KW_MAXCOMPACTIONTHRESHOLD)) {
|
||||
if (has_property(KW_COMPACTION)) {
|
||||
if (get_compaction_options().count(KW_MAXCOMPACTIONTHRESHOLD)) {
|
||||
tmp_value = get_compaction_options().at(KW_MAXCOMPACTIONTHRESHOLD);
|
||||
}
|
||||
|
||||
190
database.cc
190
database.cc
@@ -239,15 +239,24 @@ public:
|
||||
|
||||
mutation_reader
|
||||
column_family::make_sstable_reader(schema_ptr s, const query::partition_range& pr, const io_priority_class& pc) const {
|
||||
// restricts a reader's concurrency if the configuration specifies it
|
||||
auto restrict_reader = [&] (mutation_reader&& in) {
|
||||
if (_config.read_concurrency_config.sem) {
|
||||
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
|
||||
} else {
|
||||
return std::move(in);
|
||||
}
|
||||
};
|
||||
|
||||
if (pr.is_singular() && pr.start()->value().has_key()) {
|
||||
const dht::ring_position& pos = pr.start()->value();
|
||||
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
|
||||
return make_empty_reader(); // range doesn't belong to this shard
|
||||
}
|
||||
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), pc);
|
||||
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), pc));
|
||||
} else {
|
||||
// range_sstable_reader is not movable so we need to wrap it
|
||||
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, pc);
|
||||
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, pc));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -475,12 +484,75 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
|
||||
return (s1 <= me) && (me <= s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
|
||||
auto key_shard = [&s] (const partition_key& pk) {
|
||||
auto token = dht::global_partitioner().get_token(s, pk);
|
||||
return dht::shard_of(token);
|
||||
};
|
||||
auto s1 = key_shard(first);
|
||||
auto s2 = key_shard(last);
|
||||
auto me = engine().cpu_id();
|
||||
return (s1 != me) || (me != s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
|
||||
return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, r)) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
|
||||
sst->mark_for_deletion();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
|
||||
return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
|
||||
if (in_other_shard) {
|
||||
// If we're here, this sstable is shared by this and other
|
||||
// shard(s). Shared sstables cannot be deleted until all
|
||||
// shards compacted them, so to reduce disk space usage we
|
||||
// want to start splitting them now.
|
||||
// However, we need to delay this compaction until we read all
|
||||
// the sstables belonging to this CF, because we need all of
|
||||
// them to know which tombstones we can drop, and what
|
||||
// generation number is free.
|
||||
_sstables_need_rewrite.push_back(sst);
|
||||
}
|
||||
if (reset_level) {
|
||||
// When loading a migrated sstable, set level to 0 because
|
||||
// it may overlap with existing tables in levels > 0.
|
||||
// This step is optional, because even if we didn't do this
|
||||
// scylla would detect the overlap, and bring back some of
|
||||
// the sstables to level 0.
|
||||
sst->set_sstable_level(0);
|
||||
}
|
||||
add_sstable(sst);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// load_sstable() wants to start rewriting sstables which are shared between
|
||||
// several shards, but we can't start any compaction before all the sstables
|
||||
// of this CF were loaded. So call this function to start rewrites, if any.
|
||||
void column_family::start_rewrite() {
|
||||
for (auto sst : _sstables_need_rewrite) {
|
||||
dblog.info("Splitting {} for shard", sst->get_filename());
|
||||
_compaction_manager.submit_sstable_rewrite(this, sst);
|
||||
}
|
||||
_sstables_need_rewrite.clear();
|
||||
}
|
||||
|
||||
future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
|
||||
|
||||
using namespace sstables;
|
||||
@@ -505,24 +577,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
|
||||
}
|
||||
}
|
||||
|
||||
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
auto fut = sst->get_sstable_key_range(*_schema);
|
||||
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, std::move(r))) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring",
|
||||
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
|
||||
sstables::sstable::component_type::Data));
|
||||
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto fut = sst->load();
|
||||
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
|
||||
add_sstable(std::move(*sst));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then_wrapped([fname, comps] (future<> f) {
|
||||
return load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
|
||||
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (malformed_sstable_exception& e) {
|
||||
@@ -971,19 +1028,14 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
|
||||
future<>
|
||||
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
|
||||
return parallel_for_each(new_tables, [this] (auto comps) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
|
||||
return sst->load().then([this, sst] {
|
||||
return sst->mutate_sstable_level(0);
|
||||
}).then([this, sst] {
|
||||
auto first = sst->get_first_partition_key(*_schema);
|
||||
auto last = sst->get_last_partition_key(*_schema);
|
||||
if (belongs_to_current_shard(*_schema, first, last)) {
|
||||
this->add_sstable(sst);
|
||||
} else {
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return this->load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), _config.datadir,
|
||||
comps.generation, comps.version, comps.format), true);
|
||||
}).then([this] {
|
||||
start_rewrite();
|
||||
// Drop entire cache for this column family because it may be populated
|
||||
// with stale data.
|
||||
return get_row_cache().clear();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1269,6 +1321,38 @@ database::setup_collectd() {
|
||||
, "total_operations", "total_reads")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats->total_reads)
|
||||
));
|
||||
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "sstable_read_queue_overloads")
|
||||
, scollectd::make_typed(scollectd::data_type::COUNTER, _stats->sstable_read_queue_overloaded)
|
||||
));
|
||||
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "active_reads")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return max_concurrent_reads() - _read_concurrency_sem.current(); })
|
||||
));
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "queued_reads")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _read_concurrency_sem.waiters(); })
|
||||
));
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "active_reads_system_keyspace")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return max_system_concurrent_reads() - _system_read_concurrency_sem.current(); })
|
||||
));
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("database"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "queued_reads_system_keyspace")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _system_read_concurrency_sem.waiters(); })
|
||||
));
|
||||
}
|
||||
|
||||
database::~database() {
|
||||
@@ -1643,6 +1727,7 @@ keyspace::make_column_family_config(const schema& s) const {
|
||||
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
|
||||
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
|
||||
cfg.read_concurrency_config = _config.read_concurrency_config;
|
||||
cfg.cf_stats = _config.cf_stats;
|
||||
cfg.enable_incremental_backups = _config.enable_incremental_backups;
|
||||
|
||||
@@ -2095,6 +2180,14 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
}
|
||||
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
|
||||
cfg.read_concurrency_config.sem = &_read_concurrency_sem;
|
||||
cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms;
|
||||
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
|
||||
cfg.read_concurrency_config.max_queue_length = memory::stats().total_memory() * 0.02 / 1000;
|
||||
cfg.read_concurrency_config.raise_queue_overloaded_exception = [this] {
|
||||
++_stats->sstable_read_queue_overloaded;
|
||||
throw std::runtime_error("sstable inactive read queue overloaded");
|
||||
};
|
||||
cfg.cf_stats = &_cf_stats;
|
||||
cfg.enable_incremental_backups = _enable_incremental_backups;
|
||||
return cfg;
|
||||
@@ -2186,7 +2279,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
f = cf.flush();
|
||||
} else {
|
||||
cf.clear();
|
||||
f = cf.clear();
|
||||
}
|
||||
|
||||
return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable {
|
||||
@@ -2562,21 +2655,24 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
|
||||
// temporary counter measure.
|
||||
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
|
||||
return seal_active_streaming_memtable_delayed().finally([this, ranges = std::move(ranges)] {
|
||||
if (_config.enable_cache) {
|
||||
for (auto& range : ranges) {
|
||||
_cache.invalidate(range);
|
||||
}
|
||||
if (!_config.enable_cache) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_with(std::move(ranges), [this] (auto& ranges) {
|
||||
return parallel_for_each(ranges, [this](auto&& range) {
|
||||
return _cache.invalidate(range);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void column_family::clear() {
|
||||
_cache.clear();
|
||||
future<> column_family::clear() {
|
||||
_memtables->clear();
|
||||
_memtables->add_memtable();
|
||||
_streaming_memtables->clear();
|
||||
_streaming_memtables->add_memtable();
|
||||
return _cache.clear();
|
||||
}
|
||||
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
@@ -2603,13 +2699,13 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
|
||||
_sstables = std::move(pruned);
|
||||
dblog.debug("cleaning out row cache");
|
||||
_cache.clear();
|
||||
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
23
database.hh
23
database.hh
@@ -249,6 +249,7 @@ public:
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
@@ -309,6 +310,11 @@ private:
|
||||
// have not been deleted yet, so must not GC any tombstones in other sstables
|
||||
// that may delete data in these sstables:
|
||||
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
|
||||
// sstables that are shared between several shards so we want to rewrite
|
||||
// them (split the data belonging to this shard to a separate sstable),
|
||||
// but for correct compaction we need to start the compaction only after
|
||||
// reading all sstables.
|
||||
std::vector<sstables::shared_sstable> _sstables_need_rewrite;
|
||||
// Control background fibers waiting for sstables to be deleted
|
||||
seastar::gate _sstable_deletion_gate;
|
||||
// There are situations in which we need to stop writing sstables. Flushers will take
|
||||
@@ -339,6 +345,7 @@ private:
|
||||
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
|
||||
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
lw_shared_ptr<memtable> new_streaming_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
@@ -456,7 +463,7 @@ public:
|
||||
future<> flush();
|
||||
future<> flush(const db::replay_position&);
|
||||
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
|
||||
void clear(); // discards memtable(s) without flushing them to disk.
|
||||
future<> clear(); // discards memtable(s) without flushing them to disk.
|
||||
future<db::replay_position> discard_sstables(db_clock::time_point);
|
||||
|
||||
// Important warning: disabling writes will only have an effect in the current shard.
|
||||
@@ -618,6 +625,7 @@ private:
|
||||
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
|
||||
void check_valid_rp(const db::replay_position&) const;
|
||||
public:
|
||||
void start_rewrite();
|
||||
// Iterate over all partitions. Protocol is the same as std::all_of(),
|
||||
// so that iteration can be stopped by returning false.
|
||||
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
|
||||
@@ -727,6 +735,7 @@ public:
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
private:
|
||||
@@ -792,15 +801,24 @@ public:
|
||||
|
||||
class database {
|
||||
::cf_stats _cf_stats;
|
||||
static constexpr size_t max_concurrent_reads() { return 100; }
|
||||
static constexpr size_t max_system_concurrent_reads() { return 10; }
|
||||
struct db_stats {
|
||||
uint64_t total_writes = 0;
|
||||
uint64_t total_reads = 0;
|
||||
uint64_t sstable_read_queue_overloaded = 0;
|
||||
};
|
||||
|
||||
lw_shared_ptr<db_stats> _stats;
|
||||
|
||||
logalloc::region_group _dirty_memory_region_group;
|
||||
logalloc::region_group _streaming_dirty_memory_region_group;
|
||||
|
||||
semaphore _read_concurrency_sem{max_concurrent_reads()};
|
||||
restricted_mutation_reader_config _read_concurrency_config;
|
||||
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
|
||||
restricted_mutation_reader_config _system_read_concurrency_config;
|
||||
|
||||
std::unordered_map<sstring, keyspace> _keyspaces;
|
||||
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
|
||||
std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid;
|
||||
@@ -948,6 +966,9 @@ public:
|
||||
std::unordered_set<sstring> get_initial_tokens();
|
||||
std::experimental::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
semaphore& system_keyspace_read_concurrency_sem() {
|
||||
return _system_read_concurrency_sem;
|
||||
}
|
||||
};
|
||||
|
||||
// FIXME: stub
|
||||
|
||||
@@ -709,15 +709,21 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
std::map<qualified_name, schema_mutations>&& before,
|
||||
std::map<qualified_name, schema_mutations>&& after)
|
||||
{
|
||||
struct dropped_table {
|
||||
global_schema_ptr schema;
|
||||
utils::joinpoint<db_clock::time_point> jp{[] {
|
||||
return make_ready_future<db_clock::time_point>(db_clock::now());
|
||||
}};
|
||||
};
|
||||
std::vector<global_schema_ptr> created;
|
||||
std::vector<global_schema_ptr> altered;
|
||||
std::vector<global_schema_ptr> dropped;
|
||||
std::vector<dropped_table> dropped;
|
||||
|
||||
auto diff = difference(before, after);
|
||||
for (auto&& key : diff.entries_only_on_left) {
|
||||
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
|
||||
logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
|
||||
dropped.emplace_back(s);
|
||||
dropped.emplace_back(dropped_table{s});
|
||||
}
|
||||
for (auto&& key : diff.entries_only_on_right) {
|
||||
auto s = create_table_from_mutations(after.at(key));
|
||||
@@ -730,9 +736,7 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
altered.emplace_back(s);
|
||||
}
|
||||
|
||||
do_with(utils::make_joinpoint([] { return db_clock::now();})
|
||||
, [&created, &dropped, &altered, &proxy](auto& tsf) {
|
||||
return proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, &tsf] (database& db) {
|
||||
proxy.local().get_db().invoke_on_all([&created, &dropped, &altered] (database& db) {
|
||||
return seastar::async([&] {
|
||||
for (auto&& gs : created) {
|
||||
schema_ptr s = gs.get();
|
||||
@@ -747,14 +751,13 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
for (auto&& gs : altered) {
|
||||
update_column_family(db, gs.get()).get();
|
||||
}
|
||||
parallel_for_each(dropped.begin(), dropped.end(), [&db, &tsf](auto&& gs) {
|
||||
schema_ptr s = gs.get();
|
||||
return db.drop_column_family(s->ks_name(), s->cf_name(), [&tsf] { return tsf.value(); }).then([s] {
|
||||
parallel_for_each(dropped.begin(), dropped.end(), [&db](dropped_table& dt) {
|
||||
schema_ptr s = dt.schema.get();
|
||||
return db.drop_column_family(s->ks_name(), s->cf_name(), [&dt] { return dt.jp.value(); }).then([s] {
|
||||
return service::get_local_migration_manager().notify_drop_column_family(s);
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
|
||||
|
||||
@@ -1022,6 +1022,10 @@ void make(database& db, bool durable, bool volatile_testing_only) {
|
||||
kscfg.enable_disk_writes = !volatile_testing_only;
|
||||
kscfg.enable_commitlog = !volatile_testing_only;
|
||||
kscfg.enable_cache = true;
|
||||
// don't make system keyspace reads wait for user reads
|
||||
kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem();
|
||||
kscfg.read_concurrency_config.timeout = {};
|
||||
kscfg.read_concurrency_config.max_queue_length = std::numeric_limits<size_t>::max();
|
||||
keyspace _ks{ksm, std::move(kscfg)};
|
||||
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
|
||||
_ks.set_replication_strategy(std::move(rs));
|
||||
|
||||
@@ -87,6 +87,10 @@ public:
|
||||
// [0x00, 0x80] == 1/512
|
||||
// [0xff, 0x80] == 1 - 1/512
|
||||
managed_bytes _data;
|
||||
|
||||
token() : _kind(kind::before_all_keys) {
|
||||
}
|
||||
|
||||
token(kind k, managed_bytes d) : _kind(std::move(k)), _data(std::move(d)) {
|
||||
}
|
||||
|
||||
|
||||
@@ -36,6 +36,8 @@ extern thread_local disk_error_signal_type sstable_read_error;
|
||||
extern thread_local disk_error_signal_type sstable_write_error;
|
||||
extern thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
bool should_stop_on_system_error(const std::system_error& e);
|
||||
|
||||
template<typename Func, typename... Args>
|
||||
std::enable_if_t<!is_future<std::result_of_t<Func(Args&&...)>>::value,
|
||||
std::result_of_t<Func(Args&&...)>>
|
||||
@@ -44,7 +46,7 @@ do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
// calling function
|
||||
return func(std::forward<Args>(args)...);
|
||||
} catch (std::system_error& e) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(e)) {
|
||||
signal();
|
||||
throw storage_io_error(e);
|
||||
}
|
||||
@@ -62,7 +64,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (std::system_error& sys_err) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(sys_err)) {
|
||||
signal();
|
||||
throw storage_io_error(sys_err);
|
||||
}
|
||||
@@ -70,7 +72,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
return futurize<std::result_of_t<Func(Args&&...)>>::make_exception_future(ep);
|
||||
});
|
||||
} catch (std::system_error& e) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(e)) {
|
||||
signal();
|
||||
throw storage_io_error(e);
|
||||
}
|
||||
|
||||
2
dist/common/scripts/scylla_sysconfig_setup
vendored
2
dist/common/scripts/scylla_sysconfig_setup
vendored
@@ -58,7 +58,7 @@ while [ $# -gt 0 ]; do
|
||||
shift 2
|
||||
;;
|
||||
"--setup-nic")
|
||||
SET_NIC=yes
|
||||
SETUP_NIC=1
|
||||
shift 1
|
||||
;;
|
||||
"--ami")
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -2,8 +2,8 @@ FROM centos:7
|
||||
|
||||
MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.1.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN yum -y install epel-release
|
||||
ADD scylla.repo /etc/yum.repos.d/
|
||||
RUN yum -y clean expire-cache
|
||||
RUN yum -y update
|
||||
RUN yum -y remove boost-thread boost-system
|
||||
|
||||
23
dist/docker/redhat/scylla.repo
vendored
23
dist/docker/redhat/scylla.repo
vendored
@@ -1,23 +0,0 @@
|
||||
[scylla]
|
||||
name=Scylla for Centos $releasever - $basearch
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/$basearch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
|
||||
[scylla-generic]
|
||||
name=Scylla for centos $releasever
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/noarch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
|
||||
[scylla-3rdparty]
|
||||
name=Scylla 3rdParty for Centos $releasever - $basearch
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/$basearch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
|
||||
[scylla-3rdparty-generic]
|
||||
name=Scylla 3rdParty for Centos $releasever
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/noarch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
@@ -1590,12 +1590,12 @@ bool gossiper::is_alive(inet_address ep) {
|
||||
if (ep == get_broadcast_address()) {
|
||||
return true;
|
||||
}
|
||||
auto eps = get_endpoint_state_for_endpoint(ep);
|
||||
auto it = endpoint_state_map.find(ep);
|
||||
// we could assert not-null, but having isAlive fail screws a node over so badly that
|
||||
// it's worth being defensive here so minor bugs don't cause disproportionate
|
||||
// badness. (See CASSANDRA-1463 for an example).
|
||||
if (eps) {
|
||||
return eps->is_alive();
|
||||
if (it != endpoint_state_map.end()) {
|
||||
return it->second.is_alive();
|
||||
} else {
|
||||
logger.warn("unknown endpoint {}", ep);
|
||||
return false;
|
||||
|
||||
15
init.cc
15
init.cc
@@ -64,18 +64,23 @@ void init_ms_fd_gossiper(sstring listen_address
|
||||
}
|
||||
|
||||
future<> f = make_ready_future<>();
|
||||
::shared_ptr<server_credentials> creds;
|
||||
std::shared_ptr<credentials_builder> creds;
|
||||
|
||||
if (ew != encrypt_what::none) {
|
||||
// note: credentials are immutable after this, and ok to share across shards
|
||||
creds = ::make_shared<server_credentials>(::make_shared<dh_params>(dh_params::level::MEDIUM));
|
||||
creds = std::make_shared<credentials_builder>();
|
||||
creds->set_dh_level(dh_params::level::MEDIUM);
|
||||
|
||||
creds->set_x509_key_file(ms_cert, ms_key, x509_crt_format::PEM).get();
|
||||
ms_trust_store.empty() ? creds->set_system_trust().get() :
|
||||
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
|
||||
if (ms_trust_store.empty()) {
|
||||
creds->set_system_trust().get();
|
||||
} else {
|
||||
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
|
||||
}
|
||||
}
|
||||
|
||||
// Init messaging_service
|
||||
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return net::get_messaging_service().stop(); });
|
||||
// Init failure_detector
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
#include "log.hh"
|
||||
#include <unordered_map>
|
||||
#include <algorithm>
|
||||
#include <boost/icl/interval.hpp>
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
|
||||
namespace locator {
|
||||
|
||||
@@ -339,25 +341,65 @@ range<token> token_metadata::get_primary_range_for(token right) {
|
||||
return get_primary_ranges_for({right}).front();
|
||||
}
|
||||
|
||||
boost::icl::interval<token>::interval_type
|
||||
token_metadata::range_to_interval(range<dht::token> r) {
|
||||
bool start_inclusive = false;
|
||||
bool end_inclusive = false;
|
||||
token start = dht::minimum_token();
|
||||
token end = dht::maximum_token();
|
||||
|
||||
if (r.start()) {
|
||||
start = r.start()->value();
|
||||
start_inclusive = r.start()->is_inclusive();
|
||||
}
|
||||
|
||||
if (r.end()) {
|
||||
end = r.end()->value();
|
||||
end_inclusive = r.end()->is_inclusive();
|
||||
}
|
||||
|
||||
if (start_inclusive == false && end_inclusive == false) {
|
||||
return boost::icl::interval<token>::open(std::move(start), std::move(end));
|
||||
} else if (start_inclusive == false && end_inclusive == true) {
|
||||
return boost::icl::interval<token>::left_open(std::move(start), std::move(end));
|
||||
} else if (start_inclusive == true && end_inclusive == false) {
|
||||
return boost::icl::interval<token>::right_open(std::move(start), std::move(end));
|
||||
} else {
|
||||
return boost::icl::interval<token>::closed(std::move(start), std::move(end));
|
||||
}
|
||||
}
|
||||
|
||||
void token_metadata::set_pending_ranges(const sstring& keyspace_name,
|
||||
std::unordered_multimap<range<token>, inet_address> new_pending_ranges) {
|
||||
if (new_pending_ranges.empty()) {
|
||||
_pending_ranges.erase(keyspace_name);
|
||||
_pending_ranges_map.erase(keyspace_name);
|
||||
_pending_ranges_interval_map.erase(keyspace_name);
|
||||
return;
|
||||
}
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>> map;
|
||||
for (const auto& x : new_pending_ranges) {
|
||||
map[x.first].emplace(x.second);
|
||||
}
|
||||
|
||||
// construct a interval map to speed up the search
|
||||
_pending_ranges_interval_map[keyspace_name] = {};
|
||||
for (const auto& m : map) {
|
||||
_pending_ranges_interval_map[keyspace_name] +=
|
||||
std::make_pair(range_to_interval(m.first), m.second);
|
||||
}
|
||||
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
|
||||
_pending_ranges_map[keyspace_name] = std::move(map);
|
||||
}
|
||||
|
||||
std::unordered_multimap<range<token>, inet_address>&
|
||||
token_metadata::get_pending_ranges_mm(sstring keyspace_name) {
|
||||
return _pending_ranges[keyspace_name];
|
||||
}
|
||||
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>>
|
||||
const std::unordered_map<range<token>, std::unordered_set<inet_address>>&
|
||||
token_metadata::get_pending_ranges(sstring keyspace_name) {
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>> ret;
|
||||
for (auto x : get_pending_ranges_mm(keyspace_name)) {
|
||||
auto& range_token = x.first;
|
||||
auto& ep = x.second;
|
||||
auto it = ret.find(range_token);
|
||||
if (it != ret.end()) {
|
||||
it->second.emplace(ep);
|
||||
} else {
|
||||
ret.emplace(range_token, std::unordered_set<inet_address>{ep});
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return _pending_ranges_map[keyspace_name];
|
||||
}
|
||||
|
||||
std::vector<range<token>>
|
||||
@@ -378,7 +420,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
|
||||
|
||||
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) {
|
||||
logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name);
|
||||
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
|
||||
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -463,7 +505,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
|
||||
all_left_metadata.remove_endpoint(endpoint);
|
||||
}
|
||||
|
||||
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
|
||||
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
|
||||
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
logger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : print_pending_ranges()));
|
||||
@@ -508,14 +550,23 @@ void token_metadata::add_moving_endpoint(token t, inet_address endpoint) {
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address> token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) {
|
||||
// Fast path 0: no pending ranges at all
|
||||
if (_pending_ranges_interval_map.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// Fast path 1: no pending ranges for this keyspace_name
|
||||
if (_pending_ranges_interval_map[keyspace_name].empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// Slow path: lookup pending ranges
|
||||
std::vector<gms::inet_address> endpoints;
|
||||
auto ranges = get_pending_ranges(keyspace_name);
|
||||
for (auto& x : ranges) {
|
||||
if (x.first.contains(token, dht::token_comparator())) {
|
||||
for (auto& addr : x.second) {
|
||||
endpoints.push_back(addr);
|
||||
}
|
||||
}
|
||||
auto interval = range_to_interval(range<dht::token>(token));
|
||||
auto it = _pending_ranges_interval_map[keyspace_name].find(interval);
|
||||
if (it != _pending_ranges_interval_map[keyspace_name].end()) {
|
||||
// interval_map does not work with std::vector, convert to std::vector of ips
|
||||
endpoints = std::vector<gms::inet_address>(it->second.begin(), it->second.end());
|
||||
}
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
@@ -46,6 +46,8 @@
|
||||
#include "utils/UUID.hh"
|
||||
#include <experimental/optional>
|
||||
#include <boost/range/iterator_range.hpp>
|
||||
#include <boost/icl/interval.hpp>
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
#include "query-request.hh"
|
||||
#include "range.hh"
|
||||
|
||||
@@ -144,6 +146,8 @@ private:
|
||||
std::unordered_map<token, inet_address> _moving_endpoints;
|
||||
|
||||
std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> _pending_ranges;
|
||||
std::unordered_map<sstring, std::unordered_map<range<token>, std::unordered_set<inet_address>>> _pending_ranges_map;
|
||||
std::unordered_map<sstring, boost::icl::interval_map<token, std::unordered_set<inet_address>>> _pending_ranges_interval_map;
|
||||
|
||||
std::vector<token> _sorted_tokens;
|
||||
|
||||
@@ -608,13 +612,15 @@ public:
|
||||
std::vector<range<token>> get_primary_ranges_for(std::unordered_set<token> tokens);
|
||||
|
||||
range<token> get_primary_range_for(token right);
|
||||
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
|
||||
|
||||
private:
|
||||
std::unordered_multimap<range<token>, inet_address>& get_pending_ranges_mm(sstring keyspace_name);
|
||||
void set_pending_ranges(const sstring& keyspace_name, std::unordered_multimap<range<token>, inet_address> new_pending_ranges);
|
||||
|
||||
public:
|
||||
/** a mutable map may be returned but caller should not modify it */
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>> get_pending_ranges(sstring keyspace_name);
|
||||
const std::unordered_map<range<token>, std::unordered_set<inet_address>>& get_pending_ranges(sstring keyspace_name);
|
||||
|
||||
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint);
|
||||
/**
|
||||
|
||||
17
main.cc
17
main.cc
@@ -516,6 +516,18 @@ int main(int ac, char** av) {
|
||||
}
|
||||
return db.load_sstables(proxy);
|
||||
}).get();
|
||||
// If the same sstable is shared by several shards, it cannot be
|
||||
// deleted until all shards decide to compact it. So we want to
|
||||
// start thse compactions now. Note we start compacting only after
|
||||
// all sstables in this CF were loaded on all shards - otherwise
|
||||
// we will have races between the compaction and loading processes
|
||||
db.invoke_on_all([&proxy] (database& db) {
|
||||
for (auto& x : db.get_column_families()) {
|
||||
column_family& cf = *(x.second);
|
||||
// We start the rewrite, but do not wait for it.
|
||||
cf.start_rewrite();
|
||||
}
|
||||
}).get();
|
||||
supervisor_notify("setting up system keyspace");
|
||||
db::system_keyspace::setup(db, qp).get();
|
||||
supervisor_notify("starting commit log");
|
||||
@@ -592,6 +604,11 @@ int main(int ac, char** av) {
|
||||
engine().at_exit([] {
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
});
|
||||
engine().at_exit([&db] {
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
});
|
||||
});
|
||||
}).or_terminate();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -225,7 +225,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, uint16_t port
|
||||
, encrypt_what ew
|
||||
, uint16_t ssl_port
|
||||
, ::shared_ptr<seastar::tls::server_credentials> credentials
|
||||
, std::shared_ptr<seastar::tls::credentials_builder> credentials
|
||||
)
|
||||
: _listen_address(ip)
|
||||
, _port(port)
|
||||
@@ -233,7 +233,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, _encrypt_what(ew)
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
|
||||
, _credentials(std::move(credentials))
|
||||
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
|
||||
, _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
|
||||
if (_encrypt_what == encrypt_what::none) {
|
||||
return nullptr;
|
||||
@@ -255,6 +255,13 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
|
||||
return rpc::no_wait;
|
||||
});
|
||||
// Do this on just cpu 0, to avoid duplicate logs.
|
||||
if (engine().cpu_id() == 0) {
|
||||
if (_server_tls) {
|
||||
logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
|
||||
}
|
||||
logger.info("Starting Messaging Service on port {}", _port);
|
||||
}
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
|
||||
@@ -186,7 +186,7 @@ public:
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
|
||||
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
|
||||
uint16_t ssl_port, ::shared_ptr<seastar::tls::server_credentials>);
|
||||
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
|
||||
~messaging_service();
|
||||
public:
|
||||
uint16_t port();
|
||||
|
||||
@@ -706,6 +706,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|
||||
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
|
||||
pw.retract();
|
||||
} else {
|
||||
pw.row_count() += row_count ? : 1;
|
||||
std::move(rows_wr).end_rows().end_qr_partition();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,3 +218,42 @@ public:
|
||||
mutation_reader make_empty_reader() {
|
||||
return make_mutation_reader<empty_reader>();
|
||||
}
|
||||
|
||||
|
||||
class restricting_mutation_reader : public mutation_reader::impl {
|
||||
const restricted_mutation_reader_config& _config;
|
||||
unsigned _weight = 0;
|
||||
bool _waited = false;
|
||||
mutation_reader _base;
|
||||
public:
|
||||
restricting_mutation_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base)
|
||||
: _config(config), _weight(weight), _base(std::move(base)) {
|
||||
if (_config.sem->waiters() >= _config.max_queue_length) {
|
||||
_config.raise_queue_overloaded_exception();
|
||||
}
|
||||
}
|
||||
~restricting_mutation_reader() {
|
||||
if (_waited) {
|
||||
_config.sem->signal(_weight);
|
||||
}
|
||||
}
|
||||
future<mutation_opt> operator()() override {
|
||||
// FIXME: we should defer freeing until the mutation is freed, perhaps,
|
||||
// rather than just returned
|
||||
if (_waited) {
|
||||
return _base();
|
||||
}
|
||||
auto waited = _config.timeout.count() != 0
|
||||
? _config.sem->wait(_config.timeout, _weight)
|
||||
: _config.sem->wait(_weight);
|
||||
return waited.then([this] {
|
||||
_waited = true;
|
||||
return _base();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
mutation_reader
|
||||
make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) {
|
||||
return make_mutation_reader<restricting_mutation_reader>(config, weight, std::move(base));
|
||||
}
|
||||
|
||||
@@ -85,6 +85,23 @@ mutation_reader make_empty_reader();
|
||||
// when creating the reader involves disk I/O or a shard call
|
||||
mutation_reader make_lazy_reader(std::function<mutation_reader ()> make_reader);
|
||||
|
||||
struct restricted_mutation_reader_config {
|
||||
semaphore* sem = nullptr;
|
||||
std::chrono::nanoseconds timeout = {};
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max();
|
||||
std::function<void ()> raise_queue_overloaded_exception = default_raise_queue_overloaded_exception;
|
||||
|
||||
static void default_raise_queue_overloaded_exception() {
|
||||
throw std::runtime_error("restricted mutation reader queue overload");
|
||||
}
|
||||
};
|
||||
|
||||
// Restricts a given `mutation_reader` to a concurrency limited according to settings in
|
||||
// a restricted_mutation_reader_config. These settings include a semaphore for limiting the number
|
||||
// of active concurrent readers, a timeout for inactive readers, and a maximum queue size for
|
||||
// inactive readers.
|
||||
mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base);
|
||||
|
||||
template <typename MutationFilter>
|
||||
class filtering_reader : public mutation_reader::impl {
|
||||
mutation_reader _rd;
|
||||
|
||||
@@ -50,6 +50,7 @@ class result::partition_writer {
|
||||
bool _static_row_added = false;
|
||||
md5_hasher& _digest;
|
||||
md5_hasher _digest_pos;
|
||||
uint32_t& _row_count;
|
||||
public:
|
||||
partition_writer(
|
||||
result_request request,
|
||||
@@ -58,7 +59,8 @@ public:
|
||||
ser::query_result__partitions& pw,
|
||||
ser::vector_position pos,
|
||||
ser::after_qr_partition__key w,
|
||||
md5_hasher& digest)
|
||||
md5_hasher& digest,
|
||||
uint32_t& row_count)
|
||||
: _request(request)
|
||||
, _w(std::move(w))
|
||||
, _slice(slice)
|
||||
@@ -67,6 +69,7 @@ public:
|
||||
, _pos(std::move(pos))
|
||||
, _digest(digest)
|
||||
, _digest_pos(digest)
|
||||
, _row_count(row_count)
|
||||
{ }
|
||||
|
||||
bool requested_digest() const {
|
||||
@@ -98,6 +101,9 @@ public:
|
||||
md5_hasher& digest() {
|
||||
return _digest;
|
||||
}
|
||||
uint32_t& row_count() {
|
||||
return _row_count;
|
||||
}
|
||||
};
|
||||
|
||||
class result::builder {
|
||||
@@ -106,6 +112,7 @@ class result::builder {
|
||||
const partition_slice& _slice;
|
||||
ser::query_result__partitions _w;
|
||||
result_request _request;
|
||||
uint32_t _row_count = 0;
|
||||
public:
|
||||
builder(const partition_slice& slice, result_request request)
|
||||
: _slice(slice)
|
||||
@@ -130,21 +137,21 @@ public:
|
||||
if (_request != result_request::only_result) {
|
||||
key.feed_hash(_digest, s);
|
||||
}
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest);
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count);
|
||||
}
|
||||
|
||||
result build() {
|
||||
std::move(_w).end_partitions().end_query_result();
|
||||
switch (_request) {
|
||||
case result_request::only_result:
|
||||
return result(std::move(_out));
|
||||
return result(std::move(_out), _row_count);
|
||||
case result_request::only_digest: {
|
||||
bytes_ostream buf;
|
||||
ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
|
||||
return result(std::move(buf), result_digest(_digest.finalize_array()));
|
||||
}
|
||||
case result_request::result_and_digest:
|
||||
return result(std::move(_out), result_digest(_digest.finalize_array()));
|
||||
return result(std::move(_out), result_digest(_digest.finalize_array()), _row_count);
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
@@ -96,14 +96,16 @@ public:
|
||||
class result {
|
||||
bytes_ostream _w;
|
||||
stdx::optional<result_digest> _digest;
|
||||
stdx::optional<uint32_t> _row_count;
|
||||
|
||||
public:
|
||||
class builder;
|
||||
class partition_writer;
|
||||
friend class result_merger;
|
||||
|
||||
result();
|
||||
result(bytes_ostream&& w) : _w(std::move(w)) {}
|
||||
result(bytes_ostream&& w, stdx::optional<result_digest> d) : _w(std::move(w)), _digest(d) {}
|
||||
result(bytes_ostream&& w, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _row_count(c) {}
|
||||
result(bytes_ostream&& w, stdx::optional<result_digest> d, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _digest(d), _row_count(c) {}
|
||||
result(result&&) = default;
|
||||
result(const result&) = default;
|
||||
result& operator=(result&&) = default;
|
||||
@@ -117,6 +119,10 @@ public:
|
||||
return _digest;
|
||||
}
|
||||
|
||||
const stdx::optional<uint32_t>& row_count() const {
|
||||
return _row_count;
|
||||
}
|
||||
|
||||
uint32_t calculate_row_count(const query::partition_slice&);
|
||||
|
||||
struct printer {
|
||||
|
||||
10
query.cc
10
query.cc
@@ -213,8 +213,16 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
|
||||
bytes_ostream w;
|
||||
auto partitions = ser::writer_of_query_result(w).start_partitions();
|
||||
std::experimental::optional<uint32_t> row_count = 0;
|
||||
|
||||
for (auto&& r : _partial) {
|
||||
if (row_count) {
|
||||
if (r->row_count()) {
|
||||
row_count = row_count.value() + r->row_count().value();
|
||||
} else {
|
||||
row_count = std::experimental::nullopt;
|
||||
}
|
||||
}
|
||||
result_view::do_with(*r, [&] (result_view rv) {
|
||||
for (auto&& pv : rv._v.partitions()) {
|
||||
partitions.add(pv);
|
||||
@@ -224,7 +232,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
|
||||
std::move(partitions).end_partitions().end_query_result();
|
||||
|
||||
return make_foreign(make_lw_shared<query::result>(std::move(w)));
|
||||
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -393,7 +393,7 @@ static future<> sync_range(seastar::sharded<database>& db,
|
||||
return sp_in.execute().discard_result().then([&sp_out] {
|
||||
return sp_out.execute().discard_result();
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.error("repair's stream failed: {}", ep);
|
||||
logger.warn("repair's stream failed: {}", ep);
|
||||
return make_exception_future(ep);
|
||||
});
|
||||
});
|
||||
|
||||
54
row_cache.cc
54
row_cache.cc
@@ -400,7 +400,16 @@ row_cache::make_reader(schema_ptr s, const query::partition_range& range, const
|
||||
}
|
||||
|
||||
row_cache::~row_cache() {
|
||||
clear();
|
||||
clear_now();
|
||||
}
|
||||
|
||||
void row_cache::clear_now() noexcept {
|
||||
with_allocator(_tracker.allocator(), [this] {
|
||||
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
|
||||
_tracker.on_erase();
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::populate(const mutation& m) {
|
||||
@@ -424,16 +433,8 @@ void row_cache::populate(const mutation& m) {
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::clear() {
|
||||
with_allocator(_tracker.allocator(), [this] {
|
||||
// We depend on clear_and_dispose() below not looking up any keys.
|
||||
// Using with_linearized_managed_bytes() is no helps, because we don't
|
||||
// want to propagate an exception from here.
|
||||
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
|
||||
_tracker.on_erase();
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
future<> row_cache::clear() {
|
||||
return invalidate(query::full_partition_range);
|
||||
}
|
||||
|
||||
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
|
||||
@@ -459,8 +460,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
|
||||
});
|
||||
if (blow_cache) {
|
||||
// We failed to invalidate the key, presumably due to with_linearized_managed_bytes()
|
||||
// running out of memory. Recover using clear(), which doesn't throw.
|
||||
clear();
|
||||
// running out of memory. Recover using clear_now(), which doesn't throw.
|
||||
clear_now();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -534,7 +535,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
future<> row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
return _populate_phaser.advance_and_await().then([this, &dk] {
|
||||
_read_section(_tracker.region(), [&] {
|
||||
with_allocator(_tracker.allocator(), [this, &dk] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
@@ -542,17 +544,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate(const query::partition_range& range) {
|
||||
with_linearized_managed_bytes([&] {
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
|
||||
auto unwrapped = range.unwrap();
|
||||
invalidate(unwrapped.first);
|
||||
invalidate(unwrapped.second);
|
||||
return;
|
||||
}
|
||||
future<> row_cache::invalidate(const query::partition_range& range) {
|
||||
return _populate_phaser.advance_and_await().then([this, &range] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
|
||||
auto unwrapped = range.unwrap();
|
||||
invalidate_unwrapped(unwrapped.first);
|
||||
invalidate_unwrapped(unwrapped.second);
|
||||
} else {
|
||||
invalidate_unwrapped(range);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate_unwrapped(const query::partition_range& range) {
|
||||
logalloc::reclaim_lock _(_tracker.region());
|
||||
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
@@ -578,7 +587,6 @@ void row_cache::invalidate(const query::partition_range& range) {
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,
|
||||
|
||||
24
row_cache.hh
24
row_cache.hh
@@ -182,13 +182,13 @@ private:
|
||||
mutation_source _underlying;
|
||||
key_source _underlying_keys;
|
||||
|
||||
// Synchronizes populating reads with update() to ensure that cache
|
||||
// Synchronizes populating reads with updates of underlying data source to ensure that cache
|
||||
// remains consistent across flushes with the underlying data source.
|
||||
// Readers obtained from the underlying data source in earlier than
|
||||
// current phases must not be used to populate the cache, unless they hold
|
||||
// phaser::operation created in the reader's phase of origin. Readers
|
||||
// should hold to a phase only briefly because this inhibits progress of
|
||||
// update(). Phase changes occur only in update(), which can be assumed to
|
||||
// updates. Phase changes occur in update()/clear(), which can be assumed to
|
||||
// be asynchronous wrt invoking of the underlying data source.
|
||||
utils::phased_barrier _populate_phaser;
|
||||
|
||||
@@ -200,6 +200,8 @@ private:
|
||||
void on_miss();
|
||||
void upgrade_entry(cache_entry&);
|
||||
void invalidate_locked(const dht::decorated_key&);
|
||||
void invalidate_unwrapped(const query::partition_range&);
|
||||
void clear_now() noexcept;
|
||||
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
|
||||
public:
|
||||
~row_cache();
|
||||
@@ -221,7 +223,9 @@ public:
|
||||
void populate(const mutation& m);
|
||||
|
||||
// Clears the cache.
|
||||
void clear();
|
||||
// Guarantees that cache will not be populated using readers created
|
||||
// before this method was invoked.
|
||||
future<> clear();
|
||||
|
||||
// Synchronizes cache with the underlying data source from a memtable which
|
||||
// has just been flushed to the underlying data source.
|
||||
@@ -233,11 +237,21 @@ public:
|
||||
void touch(const dht::decorated_key&);
|
||||
|
||||
// Removes given partition from cache.
|
||||
void invalidate(const dht::decorated_key&);
|
||||
//
|
||||
// Guarantees that cache will not be populated with given key
|
||||
// using readers created before this method was invoked.
|
||||
//
|
||||
// The key must be kept alive until method resolves.
|
||||
future<> invalidate(const dht::decorated_key& key);
|
||||
|
||||
// Removes given range of partitions from cache.
|
||||
// The range can be a wrap around.
|
||||
void invalidate(const query::partition_range&);
|
||||
//
|
||||
// Guarantees that cache will not be populated with partitions from that range
|
||||
// using readers created before this method was invoked.
|
||||
//
|
||||
// The range must be kept alive until method resolves.
|
||||
future<> invalidate(const query::partition_range&);
|
||||
|
||||
auto num_entries() const {
|
||||
return _partitions.size();
|
||||
|
||||
@@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
return make_ready_future<>();
|
||||
case schema_registry_entry::sync_state::SYNCING:
|
||||
return _synced_future;
|
||||
case schema_registry_entry::sync_state::NOT_SYNCED:
|
||||
case schema_registry_entry::sync_state::NOT_SYNCED: {
|
||||
logger.debug("Syncing {}", _version);
|
||||
_synced_promise = {};
|
||||
do_with(std::move(syncer), [] (auto& syncer) {
|
||||
auto f = do_with(std::move(syncer), [] (auto& syncer) {
|
||||
return syncer();
|
||||
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
});
|
||||
_synced_future = _synced_promise.get_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
if (_sync_state != sync_state::SYNCING) {
|
||||
return;
|
||||
}
|
||||
@@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
_synced_promise.set_value();
|
||||
}
|
||||
});
|
||||
_synced_future = _synced_promise.get_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
return _synced_future;
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: dab58e4562...010c27ce8c
@@ -710,20 +710,28 @@ public static class MigrationsSerializer implements IVersionedSerializer<Collect
|
||||
//
|
||||
// The endpoint is the node from which 's' originated.
|
||||
//
|
||||
// FIXME: Avoid the sync if the source was/is synced by schema_tables::merge_schema().
|
||||
static future<> maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) {
|
||||
if (s->is_synced()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Serialize schema sync by always doing it on shard 0.
|
||||
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] {
|
||||
schema_ptr s = gs.get();
|
||||
schema_registry_entry& e = *s->registry_entry();
|
||||
return e.maybe_sync([endpoint, s] {
|
||||
return s->registry_entry()->maybe_sync([s, endpoint] {
|
||||
auto merge = [gs = global_schema_ptr(s), endpoint] {
|
||||
schema_ptr s = gs.get();
|
||||
logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint);
|
||||
return get_local_migration_manager().merge_schema_from(endpoint);
|
||||
});
|
||||
};
|
||||
|
||||
// Serialize schema sync by always doing it on shard 0.
|
||||
if (engine().cpu_id() == 0) {
|
||||
return merge();
|
||||
} else {
|
||||
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] {
|
||||
schema_ptr s = gs.get();
|
||||
schema_registry_entry& e = *s->registry_entry();
|
||||
return e.maybe_sync(merge);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -2260,14 +2260,14 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
|
||||
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
|
||||
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count) {
|
||||
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
|
||||
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
|
||||
std::vector<::shared_ptr<abstract_read_executor>> exec;
|
||||
auto concurrent_fetch_starting_index = i;
|
||||
auto p = shared_from_this();
|
||||
|
||||
while (i != ranges.end() && std::distance(i, concurrent_fetch_starting_index) < concurrency_factor) {
|
||||
while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) {
|
||||
query::partition_range& range = *i;
|
||||
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
|
||||
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints);
|
||||
@@ -2325,13 +2325,15 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
|
||||
return rex->execute(timeout);
|
||||
}, std::move(merger));
|
||||
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout]
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count]
|
||||
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
|
||||
total_row_count += result->row_count() ? result->row_count().value() :
|
||||
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
|
||||
results.emplace_back(std::move(result));
|
||||
if (i == ranges.end()) {
|
||||
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
|
||||
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
|
||||
} else {
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor);
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, total_row_count);
|
||||
}
|
||||
}).handle_exception([p] (std::exception_ptr eptr) {
|
||||
p->handle_read_error(eptr);
|
||||
@@ -2363,6 +2365,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
|
||||
results.reserve(ranges.size()/concurrency_factor + 1);
|
||||
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
|
||||
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
|
||||
|
||||
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor)
|
||||
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
|
||||
@@ -219,7 +219,7 @@ private:
|
||||
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor);
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count = 0);
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
@@ -972,6 +972,28 @@ void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subsc
|
||||
|
||||
static stdx::optional<future<>> drain_in_progress;
|
||||
|
||||
future<> storage_service::stop_transport() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
logger.info("Stop transport: starts");
|
||||
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
logger.info("Stop transport: stop_gossiping done");
|
||||
|
||||
ss.shutdown_client_servers().get();
|
||||
logger.info("Stop transport: shutdown rpc and cql server done");
|
||||
|
||||
ss.do_stop_ms().get();
|
||||
logger.info("Stop transport: shutdown messaging_service done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
logger.info("Stop transport: auth shutdown");
|
||||
|
||||
logger.info("Stop transport: done");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::drain_on_shutdown() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
if (drain_in_progress) {
|
||||
@@ -980,17 +1002,8 @@ future<> storage_service::drain_on_shutdown() {
|
||||
return seastar::async([&ss] {
|
||||
logger.info("Drain on shutdown: starts");
|
||||
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
logger.info("Drain on shutdown: stop_gossiping done");
|
||||
|
||||
ss.shutdown_client_servers().get();
|
||||
logger.info("Drain on shutdown: shutdown rpc and cql server done");
|
||||
|
||||
ss.do_stop_ms().get();
|
||||
logger.info("Drain on shutdown: shutdown messaging_service done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
logger.info("Drain on shutdown: auth shutdown");
|
||||
ss.stop_transport().get();
|
||||
logger.info("Drain on shutdown: stop_transport done");
|
||||
|
||||
ss.flush_column_families();
|
||||
logger.info("Drain on shutdown: flush column_families done");
|
||||
@@ -1797,16 +1810,18 @@ future<> storage_service::start_native_transport() {
|
||||
// return cserver->stop();
|
||||
//});
|
||||
|
||||
::shared_ptr<seastar::tls::server_credentials> cred;
|
||||
std::shared_ptr<seastar::tls::credentials_builder> cred;
|
||||
auto addr = ipv4_addr{ip, port};
|
||||
auto f = make_ready_future();
|
||||
|
||||
// main should have made sure values are clean and neatish
|
||||
if (ceo.at("enabled") == "true") {
|
||||
cred = ::make_shared<seastar::tls::server_credentials>(::make_shared<seastar::tls::dh_params>(seastar::tls::dh_params::level::MEDIUM));
|
||||
cred = std::make_shared<seastar::tls::credentials_builder>();
|
||||
cred->set_dh_level(seastar::tls::dh_params::level::MEDIUM);
|
||||
f = cred->set_x509_key_file(ceo.at("certificate"), ceo.at("keyfile"), seastar::tls::x509_crt_format::PEM);
|
||||
logger.info("Enabling encrypted CQL connections between client and server");
|
||||
}
|
||||
return f.then([cserver, addr, cred, keepalive] {
|
||||
return f.then([cserver, addr, cred = std::move(cred), keepalive] {
|
||||
return cserver->invoke_on_all(&transport::cql_server::listen, addr, cred, keepalive);
|
||||
});
|
||||
});
|
||||
@@ -2987,7 +3002,7 @@ void storage_service::do_isolate_on_error(disk_error type)
|
||||
if (must_isolate && !isolated.exchange(true)) {
|
||||
logger.warn("Shutting down communications due to I/O errors until operator intervention");
|
||||
// isolated protect us against multiple stops
|
||||
service::get_storage_service().invoke_on_all([] (service::storage_service& s) { s.stop_native_transport(); });
|
||||
service::get_local_storage_service().stop_transport();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -382,6 +382,8 @@ public:
|
||||
|
||||
future<> drain_on_shutdown();
|
||||
|
||||
future<> stop_transport();
|
||||
|
||||
void flush_column_families();
|
||||
#if 0
|
||||
/**
|
||||
|
||||
@@ -187,6 +187,49 @@ void compaction_manager::task_start(lw_shared_ptr<compaction_manager::task>& tas
|
||||
});
|
||||
}
|
||||
|
||||
// submit_sstable_rewrite() starts a compaction task, much like submit(),
|
||||
// But rather than asking a compaction policy what to compact, this function
|
||||
// compacts just a single sstable, and writes one new sstable. This operation
|
||||
// is useful to split an sstable containing data belonging to multiple shards
|
||||
// into a separate sstable on each shard.
|
||||
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
|
||||
// The semaphore ensures that the sstable rewrite operations submitted by
|
||||
// submit_sstable_rewrite are run in sequence, and not all of them in
|
||||
// parallel. Note that unlike general compaction which currently allows
|
||||
// different cfs to compact in parallel, here we don't have a semaphore
|
||||
// per cf, so we only get one rewrite at a time on each shard.
|
||||
static thread_local semaphore sem(1);
|
||||
// We cannot, and don't need to, compact an sstable which is already
|
||||
// being compacted anyway.
|
||||
if (_stopped || _compacting_sstables.count(sst)) {
|
||||
return;
|
||||
}
|
||||
// Conversely, we don't want another compaction job to compact the
|
||||
// sstable we are planning to work on:
|
||||
_compacting_sstables.insert(sst);
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
_tasks.push_back(task);
|
||||
_stats.active_tasks++;
|
||||
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
|
||||
return cf->compact_sstables(sstables::compaction_descriptor(
|
||||
std::vector<sstables::shared_sstable>{sst},
|
||||
sst->get_sstable_level(),
|
||||
std::numeric_limits<uint64_t>::max()), false);
|
||||
}).then_wrapped([this, sst, task] (future<> f) {
|
||||
_compacting_sstables.erase(sst);
|
||||
_stats.active_tasks--;
|
||||
_tasks.remove(task);
|
||||
try {
|
||||
f.get();
|
||||
_stats.completed_tasks++;
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
cmlog.info("compaction info: {}", e.what());
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task>& task) {
|
||||
task->stopping = true;
|
||||
return task->compaction_gate.close().then([task] {
|
||||
@@ -228,7 +271,6 @@ void compaction_manager::register_collectd_metrics() {
|
||||
|
||||
void compaction_manager::start(int task_nr) {
|
||||
_stopped = false;
|
||||
_tasks.reserve(task_nr);
|
||||
register_collectd_metrics();
|
||||
for (int i = 0; i < task_nr; i++) {
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
|
||||
@@ -58,7 +58,7 @@ private:
|
||||
};
|
||||
|
||||
// compaction manager may have N fibers to allow parallel compaction per shard.
|
||||
std::vector<lw_shared_ptr<task>> _tasks;
|
||||
std::list<lw_shared_ptr<task>> _tasks;
|
||||
|
||||
// Queue shared among all tasks containing all column families to be compacted.
|
||||
std::deque<column_family*> _cfs_to_compact;
|
||||
@@ -113,6 +113,13 @@ public:
|
||||
// Submit a column family to be cleaned up and wait for its termination.
|
||||
future<> perform_cleanup(column_family* cf);
|
||||
|
||||
// Submit a specific sstable to be rewritten, while dropping data which
|
||||
// does not belong to this shard. Meant to be used on startup when an
|
||||
// sstable is shared by multiple shards, and we want to split it to a
|
||||
// separate sstable for each shard.
|
||||
void submit_sstable_rewrite(column_family* cf,
|
||||
sstables::shared_sstable s);
|
||||
|
||||
// Remove a column family from the compaction manager.
|
||||
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
|
||||
future<> remove(column_family* cf);
|
||||
|
||||
@@ -107,10 +107,12 @@ public:
|
||||
|
||||
// ensure all SSTables are in the manifest
|
||||
for (auto& sstable : sstables) {
|
||||
// unconditionally add a sstable to a list of its level.
|
||||
manifest.add(sstable);
|
||||
}
|
||||
|
||||
for (auto i = 1U; i < manifest._generations.size(); i++) {
|
||||
// send overlapping sstables (with level > 0) to level 0, if any.
|
||||
manifest.repair_overlapping_sstables(i);
|
||||
}
|
||||
|
||||
@@ -123,36 +125,8 @@ public:
|
||||
if (level >= _generations.size()) {
|
||||
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (_generations.size() - 1)));
|
||||
}
|
||||
#if 0
|
||||
logDistribution();
|
||||
#endif
|
||||
if (can_add_sstable(sstable)) {
|
||||
// adding the sstable does not cause overlap in the level
|
||||
|
||||
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
|
||||
|
||||
_generations[level].push_back(sstable);
|
||||
} else {
|
||||
// this can happen if:
|
||||
// * a compaction has promoted an overlapping sstable to the given level, or
|
||||
// was also supposed to add an sstable at the given level.
|
||||
// * we are moving sstables from unrepaired to repaired and the sstable
|
||||
// would cause overlap
|
||||
//
|
||||
// The add(..):ed sstable will be sent to level 0
|
||||
#if 0
|
||||
try
|
||||
{
|
||||
reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
|
||||
reader.reloadSSTableMetadata();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
|
||||
}
|
||||
#endif
|
||||
_generations[0].push_back(sstable);
|
||||
}
|
||||
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
|
||||
_generations[level].push_back(sstable);
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -258,20 +232,8 @@ public:
|
||||
|
||||
void send_back_to_L0(sstables::shared_sstable& sstable) {
|
||||
remove(sstable);
|
||||
#if 0
|
||||
try
|
||||
{
|
||||
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
|
||||
sstable.reloadSSTableMetadata();
|
||||
add(sstable);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException("Could not reload sstable meta data", e);
|
||||
}
|
||||
#else
|
||||
_generations[0].push_back(sstable);
|
||||
#endif
|
||||
sstable->set_sstable_level(0);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
@@ -1794,6 +1794,20 @@ double sstable::get_compression_ratio() const {
|
||||
}
|
||||
}
|
||||
|
||||
void sstable::set_sstable_level(uint32_t new_level) {
|
||||
auto entry = _statistics.contents.find(metadata_type::Stats);
|
||||
if (entry == _statistics.contents.end()) {
|
||||
return;
|
||||
}
|
||||
auto& p = entry->second;
|
||||
if (!p) {
|
||||
throw std::runtime_error("Statistics is malformed");
|
||||
}
|
||||
stats_metadata& s = *static_cast<stats_metadata *>(p.get());
|
||||
sstlog.debug("set level of {} with generation {} from {} to {}", get_filename(), _generation, s.sstable_level, new_level);
|
||||
s.sstable_level = new_level;
|
||||
}
|
||||
|
||||
future<> sstable::mutate_sstable_level(uint32_t new_level) {
|
||||
if (!has_component(component_type::Statistics)) {
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -529,6 +529,9 @@ public:
|
||||
return get_stats_metadata().sstable_level;
|
||||
}
|
||||
|
||||
// This will change sstable level only in memory.
|
||||
void set_sstable_level(uint32_t);
|
||||
|
||||
double get_compression_ratio() const;
|
||||
|
||||
future<> mutate_sstable_level(uint32_t);
|
||||
|
||||
@@ -228,7 +228,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}
|
||||
_stream_result->handle_session_prepared(this->shared_from_this());
|
||||
} catch (...) {
|
||||
sslog.error("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
|
||||
sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
return make_ready_future<>();
|
||||
@@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
return ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
|
||||
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
|
||||
}).handle_exception([id, plan_id] (auto ep) {
|
||||
sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
}).then([this] {
|
||||
@@ -248,7 +248,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}
|
||||
|
||||
void stream_session::on_error() {
|
||||
sslog.error("[Stream #{}] Streaming error occurred", plan_id());
|
||||
sslog.warn("[Stream #{}] Streaming error occurred", plan_id());
|
||||
// fail session
|
||||
close_session(stream_session_state::FAILED);
|
||||
}
|
||||
@@ -270,7 +270,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
db.find_column_family(ks, cf);
|
||||
} catch (no_such_column_family) {
|
||||
auto err = sprint("[Stream #{}] prepare requested ks={} cf={} does not exist", ks, cf);
|
||||
sslog.error(err.c_str());
|
||||
sslog.warn(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
}
|
||||
@@ -284,7 +284,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
db.find_column_family(cf_id);
|
||||
} catch (no_such_column_family) {
|
||||
auto err = sprint("[Stream #{}] prepare cf_id=%s does not exist", plan_id, cf_id);
|
||||
sslog.error(err.c_str());
|
||||
sslog.warn(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
prepare_receiving(summary);
|
||||
|
||||
@@ -85,41 +85,41 @@ struct send_info {
|
||||
};
|
||||
|
||||
future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
}).handle_exception([si] (auto ep) {
|
||||
// There might be larger number of STREAM_MUTATION inflight.
|
||||
// Log one error per column_family per range
|
||||
if (!si->error_logged) {
|
||||
si->error_logged = true;
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
return stop_iteration::no;
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
}).handle_exception([si] (auto ep) {
|
||||
// There might be larger number of STREAM_MUTATION inflight.
|
||||
// Log one error per column_family per range
|
||||
if (!si->error_logged) {
|
||||
si->error_logged = true;
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
auto& priority = service::get_local_streaming_read_priority();
|
||||
return do_with(cf.make_reader(cf.schema(), si->pr, priority), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return repeat([si, &reader] {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
});
|
||||
}).then([si] {
|
||||
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
|
||||
auto cf_id = this->cf_id;
|
||||
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
|
||||
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
|
||||
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
|
||||
auto cf_id = this->cf_id;
|
||||
@@ -153,7 +153,7 @@ void stream_transfer_task::start() {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
|
||||
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,
|
||||
cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) {
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
}).then([this, id, plan_id, cf_id] {
|
||||
@@ -161,7 +161,7 @@ void stream_transfer_task::start() {
|
||||
session->start_keep_alive_timer();
|
||||
session->transfer_task_completed(cf_id);
|
||||
}).handle_exception([this, plan_id, id] (auto ep){
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
this->session->on_error();
|
||||
});
|
||||
}
|
||||
|
||||
1
test.py
1
test.py
@@ -32,6 +32,7 @@ boost_tests = [
|
||||
'types_test',
|
||||
'keys_test',
|
||||
'mutation_test',
|
||||
'schema_registry_test',
|
||||
'range_test',
|
||||
'mutation_reader_test',
|
||||
'cql_query_test',
|
||||
|
||||
@@ -444,3 +444,36 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
auto now = gc_clock::now();
|
||||
auto slice = partition_slice_builder(*s).build();
|
||||
|
||||
mutation m1(partition_key::from_single_value(*s, "key1"), s);
|
||||
|
||||
auto src = make_source({m1});
|
||||
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(partition_key::from_single_value(*s, "key2"), s);
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -23,10 +23,15 @@
|
||||
#define BOOST_TEST_MODULE core
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include "boost/icl/interval.hpp"
|
||||
#include "boost/icl/interval_map.hpp"
|
||||
#include <unordered_set>
|
||||
|
||||
#include "query-request.hh"
|
||||
#include "schema_builder.hh"
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
@@ -447,3 +452,56 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
|
||||
// [3,4) and (4,5]
|
||||
BOOST_REQUIRE(range<unsigned>({3}, {{4, false}}).overlaps(range<unsigned>({{4, false}}, {5}), unsigned_comparator()) == false);
|
||||
}
|
||||
|
||||
auto get_item(std::string left, std::string right, std::string val) {
|
||||
using value_type = std::unordered_set<std::string>;
|
||||
auto l = dht::global_partitioner().from_sstring(left);
|
||||
auto r = dht::global_partitioner().from_sstring(right);
|
||||
auto rg = range<dht::token>({{l, false}}, {r});
|
||||
value_type v{val};
|
||||
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_range_interval_map) {
|
||||
using value_type = std::unordered_set<std::string>;
|
||||
using token = dht::token;
|
||||
boost::icl::interval_map<token, value_type> mymap;
|
||||
|
||||
mymap += get_item("1", "5", "A");
|
||||
mymap += get_item("5", "8", "B");
|
||||
mymap += get_item("1", "3", "C");
|
||||
mymap += get_item("3", "8", "D");
|
||||
|
||||
std::cout << "my map: " << "\n";
|
||||
for (auto x : mymap) {
|
||||
std::cout << x.first << " -> ";
|
||||
for (auto s : x.second) {
|
||||
std::cout << s << ", ";
|
||||
}
|
||||
std::cout << "\n";
|
||||
}
|
||||
|
||||
auto search_item = [&mymap] (std::string val) {
|
||||
auto tok = dht::global_partitioner().from_sstring(val);
|
||||
auto search = range<token>(tok);
|
||||
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
|
||||
if (it != mymap.end()) {
|
||||
std::cout << "Found OK:" << " token = " << tok << " in range: " << it->first << "\n";
|
||||
return true;
|
||||
} else {
|
||||
std::cout << "Found NO:" << " token = " << tok << "\n";
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
BOOST_REQUIRE(search_item("0") == false);
|
||||
BOOST_REQUIRE(search_item("1") == false);
|
||||
BOOST_REQUIRE(search_item("2") == true);
|
||||
BOOST_REQUIRE(search_item("3") == true);
|
||||
BOOST_REQUIRE(search_item("4") == true);
|
||||
BOOST_REQUIRE(search_item("5") == true);
|
||||
BOOST_REQUIRE(search_item("6") == true);
|
||||
BOOST_REQUIRE(search_item("7") == true);
|
||||
BOOST_REQUIRE(search_item("8") == true);
|
||||
BOOST_REQUIRE(search_item("9") == false);
|
||||
}
|
||||
|
||||
@@ -546,27 +546,33 @@ static std::vector<mutation> updated_ring(std::vector<mutation>& mutations) {
|
||||
return result;
|
||||
}
|
||||
|
||||
static mutation_source make_mutation_source(std::vector<lw_shared_ptr<memtable>>& memtables) {
|
||||
return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) {
|
||||
std::vector<mutation_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, pr));
|
||||
}
|
||||
return make_combined_reader(std::move(readers));
|
||||
});
|
||||
}
|
||||
|
||||
static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtable>>& memtables) {
|
||||
return key_source([s, &memtables] (const query::partition_range& pr) {
|
||||
std::vector<key_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->as_key_source()(pr));
|
||||
}
|
||||
return make_combined_reader(s, std::move(readers));
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
std::vector<lw_shared_ptr<memtable>> memtables;
|
||||
auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) {
|
||||
std::vector<mutation_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, pr));
|
||||
}
|
||||
return make_combined_reader(std::move(readers));
|
||||
});
|
||||
auto memtables_key_source = key_source([&] (const query::partition_range& pr) {
|
||||
std::vector<key_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->as_key_source()(pr));
|
||||
}
|
||||
return make_combined_reader(s, std::move(readers));
|
||||
});
|
||||
throttled_mutation_source cache_source(memtables_data_source);
|
||||
throttled_mutation_source cache_source(make_mutation_source(memtables));
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, cache_source, memtables_key_source, tracker);
|
||||
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
|
||||
|
||||
auto mt1 = make_lw_shared<memtable>(s);
|
||||
memtables.push_back(mt1);
|
||||
@@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
auto some_element = keys_in_cache.begin() + 547;
|
||||
std::vector<dht::decorated_key> keys_not_in_cache;
|
||||
keys_not_in_cache.push_back(*some_element);
|
||||
cache.invalidate(*some_element);
|
||||
cache.invalidate(*some_element).get();
|
||||
keys_in_cache.erase(some_element);
|
||||
|
||||
for (auto&& key : keys_in_cache) {
|
||||
@@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
{ *some_range_begin, true }, { *some_range_end, false }
|
||||
);
|
||||
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
|
||||
cache.invalidate(range);
|
||||
cache.invalidate(range).get();
|
||||
keys_in_cache.erase(some_range_begin, some_range_end);
|
||||
|
||||
for (auto&& key : keys_in_cache) {
|
||||
@@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
std::vector<lw_shared_ptr<memtable>> memtables;
|
||||
throttled_mutation_source cache_source(make_mutation_source(memtables));
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
|
||||
|
||||
auto mt1 = make_lw_shared<memtable>(s);
|
||||
memtables.push_back(mt1);
|
||||
auto ring = make_ring(s, 3);
|
||||
for (auto&& m : ring) {
|
||||
mt1->apply(m);
|
||||
}
|
||||
|
||||
auto mt2 = make_lw_shared<memtable>(s);
|
||||
auto ring2 = updated_ring(ring);
|
||||
for (auto&& m : ring2) {
|
||||
mt2->apply(m);
|
||||
}
|
||||
|
||||
cache_source.block();
|
||||
|
||||
auto rd1 = cache.make_reader(s);
|
||||
auto rd1_result = rd1();
|
||||
|
||||
sleep(10ms).get();
|
||||
|
||||
memtables.clear();
|
||||
memtables.push_back(mt2);
|
||||
|
||||
// This update should miss on all partitions
|
||||
auto cache_cleared = cache.clear();
|
||||
|
||||
auto rd2 = cache.make_reader(s);
|
||||
|
||||
// rd1, which is in progress, should not prevent forward progress of clear()
|
||||
cache_source.unblock();
|
||||
cache_cleared.get();
|
||||
|
||||
// Reads started before memtable flush should return previous value, otherwise this test
|
||||
// doesn't trigger the conditions it is supposed to protect against.
|
||||
|
||||
assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]);
|
||||
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]);
|
||||
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]);
|
||||
assert_that(rd1().get0()).has_no_mutation();
|
||||
|
||||
// Reads started after clear but before previous populations completed
|
||||
// should already see the new data
|
||||
assert_that(std::move(rd2))
|
||||
.produces(ring2[0])
|
||||
.produces(ring2[1])
|
||||
.produces(ring2[2])
|
||||
.produces_end_of_stream();
|
||||
|
||||
// Reads started after clear should see new data
|
||||
assert_that(cache.make_reader(s))
|
||||
.produces(ring2[0])
|
||||
.produces(ring2[1])
|
||||
.produces(ring2[2])
|
||||
.produces_end_of_stream();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
@@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
}
|
||||
|
||||
// wrap-around
|
||||
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()}));
|
||||
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get();
|
||||
|
||||
verify_does_not_have(cache, ring[0].decorated_key());
|
||||
verify_does_not_have(cache, ring[1].decorated_key());
|
||||
@@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
verify_does_not_have(cache, ring[7].decorated_key());
|
||||
|
||||
// not wrap-around
|
||||
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()}));
|
||||
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get();
|
||||
|
||||
verify_does_not_have(cache, ring[0].decorated_key());
|
||||
verify_does_not_have(cache, ring[1].decorated_key());
|
||||
|
||||
130
tests/schema_registry_test.cc
Normal file
130
tests/schema_registry_test.cc
Normal file
@@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_DYN_LINK
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
#include "tests/test-utils.hh"
|
||||
#include "schema_registry.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "mutation_source_test.hh"
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
static bytes random_column_name() {
|
||||
return to_bytes(to_hex(make_blob(32)));
|
||||
}
|
||||
|
||||
static schema_ptr random_schema() {
|
||||
return schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column(random_column_name(), bytes_type)
|
||||
.build();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_async_loading) {
|
||||
return seastar::async([] {
|
||||
auto s1 = random_schema();
|
||||
auto s2 = random_schema();
|
||||
|
||||
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
|
||||
return make_ready_future<frozen_schema>(frozen_schema(s1));
|
||||
}).get0();
|
||||
|
||||
BOOST_REQUIRE(s1_loaded);
|
||||
BOOST_REQUIRE(s1_loaded->version() == s1->version());
|
||||
auto s1_later = local_schema_registry().get_or_null(s1->version());
|
||||
BOOST_REQUIRE(s1_later);
|
||||
|
||||
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
|
||||
return later().then([s2] { return frozen_schema(s2); });
|
||||
}).get0();
|
||||
|
||||
BOOST_REQUIRE(s2_loaded);
|
||||
BOOST_REQUIRE(s2_loaded->version() == s2->version());
|
||||
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
|
||||
BOOST_REQUIRE(s2_later);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
s->registry_entry()->maybe_sync([] { return later(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
|
||||
promise<> fail_sync;
|
||||
|
||||
auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable {
|
||||
return fail_sync.get_future().then([] {
|
||||
throw std::runtime_error("sync failed");
|
||||
});
|
||||
});
|
||||
|
||||
// concurrent maybe_sync should attach the the current one
|
||||
auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); });
|
||||
|
||||
fail_sync.set_value();
|
||||
|
||||
try {
|
||||
f1.get();
|
||||
BOOST_FAIL("Should have failed");
|
||||
} catch (...) {
|
||||
// expected
|
||||
}
|
||||
|
||||
try {
|
||||
f2.get();
|
||||
BOOST_FAIL("Should have failed");
|
||||
} catch (...) {
|
||||
// expected
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
|
||||
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
@@ -1948,6 +1948,45 @@ SEASTAR_TEST_CASE(leveled_06) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(leveled_07) {
|
||||
// Check that sstable, with level > 0, that overlaps with another in the same level is sent back to L0.
|
||||
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
|
||||
|
||||
column_family::config cfg;
|
||||
compaction_manager cm;
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(5);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
auto max_key = key_and_token_pair[key_and_token_pair.size()-1].first;
|
||||
|
||||
// Creating two sstables which key range overlap.
|
||||
add_sstable_for_leveled_test(cf, /*gen*/1, /*data_size*/0, /*level*/1, min_key, max_key);
|
||||
BOOST_REQUIRE(cf->get_sstables()->size() == 1);
|
||||
|
||||
add_sstable_for_leveled_test(cf, /*gen*/2, /*data_size*/0, /*level*/1, key_and_token_pair[1].first, max_key);
|
||||
BOOST_REQUIRE(cf->get_sstables()->size() == 2);
|
||||
|
||||
BOOST_REQUIRE(sstable_overlaps(cf, 1, 2) == true);
|
||||
|
||||
auto max_sstable_size_in_mb = 1;
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 1);
|
||||
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
|
||||
|
||||
auto& l0 = manifest.get_level(0);
|
||||
auto& sst = l0.front();
|
||||
BOOST_REQUIRE(sst->generation() == 2);
|
||||
BOOST_REQUIRE(sst->get_sstable_level() == 0);
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
static lw_shared_ptr<key_reader> prepare_key_reader(schema_ptr s,
|
||||
const std::vector<shared_sstable>& ssts, const query::partition_range& range)
|
||||
{
|
||||
|
||||
@@ -275,7 +275,7 @@ future<> cql_server::stop() {
|
||||
}
|
||||
|
||||
future<>
|
||||
cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> creds, bool keepalive) {
|
||||
cql_server::listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
|
||||
_notifier = std::make_unique<event_notifier>(addr.port);
|
||||
service::get_local_migration_manager().register_listener(_notifier.get());
|
||||
service::get_local_storage_service().register_subscriber(_notifier.get());
|
||||
@@ -285,7 +285,7 @@ cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
? seastar::tls::listen(creds, make_ipv4_address(addr), lo)
|
||||
? seastar::tls::listen(creds->build_server_credentials(), make_ipv4_address(addr), lo)
|
||||
: engine().listen(make_ipv4_address(addr), lo);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(sprint("CQLServer error while listening on %s -> %s", make_ipv4_address(addr), std::current_exception()));
|
||||
@@ -325,11 +325,9 @@ cql_server::do_accepts(int which, bool keepalive) {
|
||||
}).then_wrapped([this, which, keepalive] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (const std::bad_alloc&) {
|
||||
logger.debug("accept failed: {}, retrying", std::current_exception());
|
||||
do_accepts(which, keepalive);
|
||||
} catch (...) {
|
||||
logger.debug("accept failed: {}", std::current_exception());
|
||||
logger.warn("acccept failed: {}", std::current_exception());
|
||||
do_accepts(which, keepalive);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ private:
|
||||
cql_load_balance _lb;
|
||||
public:
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
|
||||
future<> listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> = {}, bool keepalive = false);
|
||||
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive);
|
||||
future<> stop();
|
||||
public:
|
||||
|
||||
@@ -49,3 +49,17 @@ bool is_system_error_errno(int err_no)
|
||||
code.category() == std::system_category();
|
||||
});
|
||||
}
|
||||
|
||||
bool should_stop_on_system_error(const std::system_error& e) {
|
||||
if (e.code().category() == std::system_category()) {
|
||||
// Whitelist of errors that don't require us to stop the server:
|
||||
switch (e.code().value()) {
|
||||
case EEXIST:
|
||||
case ENOENT:
|
||||
return false;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user