Compare commits
46 Commits
next
...
scylla-0.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
38470ca6e8 | ||
|
|
5bb25954b4 | ||
|
|
1d9ca3ef1f | ||
|
|
cb97e5dfe8 | ||
|
|
831b5af999 | ||
|
|
7f1048efb4 | ||
|
|
510b1a3afc | ||
|
|
dd831f6463 | ||
|
|
8bf59afb42 | ||
|
|
f29bc8918b | ||
|
|
4c6d655e99 | ||
|
|
fafe166d2c | ||
|
|
3380340750 | ||
|
|
4d3dac7f98 | ||
|
|
7f6891341e | ||
|
|
ece77cce90 | ||
|
|
d4a10a0a3c | ||
|
|
e885eacbe4 | ||
|
|
3f67277804 | ||
|
|
05aea2b65a | ||
|
|
a2751a9592 | ||
|
|
eda8732b8e | ||
|
|
b24f5ece1f | ||
|
|
1322ec6d6b | ||
|
|
efbf51c00b | ||
|
|
5d901b19c4 | ||
|
|
7085fc95d1 | ||
|
|
776908fbf6 | ||
|
|
5f7f276ef6 | ||
|
|
5a38f3cbfd | ||
|
|
2d4309a926 | ||
|
|
988d6cd153 | ||
|
|
bf71575fd7 | ||
|
|
cd75075214 | ||
|
|
e85f11566b | ||
|
|
8f682f018e | ||
|
|
dba2b617e7 | ||
|
|
f4e11007cf | ||
|
|
fdfa1df395 | ||
|
|
116055cc6f | ||
|
|
04c19344de | ||
|
|
df19e546f9 | ||
|
|
b532919c55 | ||
|
|
6ae6dcc2fc | ||
|
|
5716140a14 | ||
|
|
91cb9bae2e |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=0.18.2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -42,6 +42,14 @@ private:
|
||||
struct chunk {
|
||||
// FIXME: group fragment pointers to reduce pointer chasing when packetizing
|
||||
std::unique_ptr<chunk> next;
|
||||
~chunk() {
|
||||
auto p = std::move(next);
|
||||
while (p) {
|
||||
// Avoid recursion when freeing chunks
|
||||
auto p_next = std::move(p->next);
|
||||
p = std::move(p_next);
|
||||
}
|
||||
}
|
||||
size_type offset; // Also means "size" after chunk is closed
|
||||
size_type size;
|
||||
value_type data[0];
|
||||
|
||||
@@ -416,7 +416,6 @@ scylla_core = (['database.cc',
|
||||
'service/client_state.cc',
|
||||
'service/migration_task.cc',
|
||||
'service/storage_service.cc',
|
||||
'service/pending_range_calculator_service.cc',
|
||||
'service/load_broadcaster.cc',
|
||||
'service/pager/paging_state.cc',
|
||||
'service/pager/query_pagers.cc',
|
||||
|
||||
@@ -259,7 +259,10 @@ lists::setter_by_index::execute(mutation& m, const exploded_clustering_prefix& p
|
||||
// we should not get here for frozen lists
|
||||
assert(column.type->is_multi_cell()); // "Attempted to set an individual element on a frozen list";
|
||||
|
||||
auto row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
|
||||
std::experimental::optional<clustering_key> row_key;
|
||||
if (!column.is_static()) {
|
||||
row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
|
||||
}
|
||||
|
||||
auto index = _idx->bind_and_get(params._options);
|
||||
auto value = _t->bind_and_get(params._options);
|
||||
@@ -269,8 +272,7 @@ lists::setter_by_index::execute(mutation& m, const exploded_clustering_prefix& p
|
||||
}
|
||||
|
||||
auto idx = net::ntoh(int32_t(*unaligned_cast<int32_t>(index->begin())));
|
||||
|
||||
auto existing_list_opt = params.get_prefetched_list(m.key(), row_key, column);
|
||||
auto&& existing_list_opt = params.get_prefetched_list(m.key(), std::move(row_key), column);
|
||||
if (!existing_list_opt) {
|
||||
throw exceptions::invalid_request_exception("Attempted to set an element on a list which is null");
|
||||
}
|
||||
@@ -383,8 +385,13 @@ lists::discarder::requires_read() {
|
||||
void
|
||||
lists::discarder::execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) {
|
||||
assert(column.type->is_multi_cell()); // "Attempted to delete from a frozen list";
|
||||
auto&& row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
|
||||
auto&& existing_list = params.get_prefetched_list(m.key(), row_key, column);
|
||||
|
||||
std::experimental::optional<clustering_key> row_key;
|
||||
if (!column.is_static()) {
|
||||
row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
|
||||
}
|
||||
|
||||
auto&& existing_list = params.get_prefetched_list(m.key(), std::move(row_key), column);
|
||||
// We want to call bind before possibly returning to reject queries where the value provided is not a list.
|
||||
auto&& value = _t->bind(params._options);
|
||||
|
||||
@@ -444,8 +451,11 @@ lists::discarder_by_index::execute(mutation& m, const exploded_clustering_prefix
|
||||
auto cvalue = dynamic_pointer_cast<constants::value>(index);
|
||||
assert(cvalue);
|
||||
|
||||
auto row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
|
||||
auto&& existing_list = params.get_prefetched_list(m.key(), row_key, column);
|
||||
std::experimental::optional<clustering_key> row_key;
|
||||
if (!column.is_static()) {
|
||||
row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
|
||||
}
|
||||
auto&& existing_list = params.get_prefetched_list(m.key(), std::move(row_key), column);
|
||||
int32_t idx = read_simple_exactly<int32_t>(*cvalue->_bytes);
|
||||
if (!existing_list) {
|
||||
throw exceptions::invalid_request_exception("Attempted to delete an element from a list which is null");
|
||||
|
||||
@@ -186,11 +186,23 @@ modification_statement::make_update_parameters(
|
||||
class prefetch_data_builder {
|
||||
update_parameters::prefetch_data& _data;
|
||||
const query::partition_slice& _ps;
|
||||
schema_ptr _schema;
|
||||
std::experimental::optional<partition_key> _pkey;
|
||||
private:
|
||||
void add_cell(update_parameters::prefetch_data::row& cells, const column_definition& def, const std::experimental::optional<collection_mutation_view>& cell) {
|
||||
if (cell) {
|
||||
auto ctype = static_pointer_cast<const collection_type_impl>(def.type);
|
||||
if (!ctype->is_multi_cell()) {
|
||||
throw std::logic_error(sprint("cannot prefetch frozen collection: %s", def.name_as_text()));
|
||||
}
|
||||
cells.emplace(def.id, collection_mutation{*cell});
|
||||
}
|
||||
};
|
||||
public:
|
||||
prefetch_data_builder(update_parameters::prefetch_data& data, const query::partition_slice& ps)
|
||||
prefetch_data_builder(schema_ptr s, update_parameters::prefetch_data& data, const query::partition_slice& ps)
|
||||
: _data(data)
|
||||
, _ps(ps)
|
||||
, _schema(std::move(s))
|
||||
{ }
|
||||
|
||||
void accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
@@ -205,20 +217,9 @@ public:
|
||||
const query::result_row_view& row) {
|
||||
update_parameters::prefetch_data::row cells;
|
||||
|
||||
auto add_cell = [&cells] (column_id id, std::experimental::optional<collection_mutation_view>&& cell) {
|
||||
if (cell) {
|
||||
cells.emplace(id, collection_mutation{to_bytes(cell->data)});
|
||||
}
|
||||
};
|
||||
|
||||
auto static_row_iterator = static_row.iterator();
|
||||
for (auto&& id : _ps.static_columns) {
|
||||
add_cell(id, static_row_iterator.next_collection_cell());
|
||||
}
|
||||
|
||||
auto row_iterator = row.iterator();
|
||||
for (auto&& id : _ps.regular_columns) {
|
||||
add_cell(id, row_iterator.next_collection_cell());
|
||||
add_cell(cells, _schema->regular_column_at(id), row_iterator.next_collection_cell());
|
||||
}
|
||||
|
||||
_data.rows.emplace(std::make_pair(*_pkey, key), std::move(cells));
|
||||
@@ -228,7 +229,16 @@ public:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
void accept_partition_end(const query::result_row_view& static_row) {}
|
||||
void accept_partition_end(const query::result_row_view& static_row) {
|
||||
update_parameters::prefetch_data::row cells;
|
||||
|
||||
auto static_row_iterator = static_row.iterator();
|
||||
for (auto&& id : _ps.static_columns) {
|
||||
add_cell(cells, _schema->static_column_at(id), static_row_iterator.next_collection_cell());
|
||||
}
|
||||
|
||||
_data.rows.emplace(std::make_pair(*_pkey, std::experimental::nullopt), std::move(cells));
|
||||
}
|
||||
};
|
||||
|
||||
future<update_parameters::prefetched_rows_type>
|
||||
@@ -278,7 +288,7 @@ modification_statement::read_required_rows(
|
||||
bytes_ostream buf(result->buf());
|
||||
query::result_view v(buf.linearize());
|
||||
auto prefetched_rows = update_parameters::prefetched_rows_type({update_parameters::prefetch_data(s)});
|
||||
v.consume(ps, prefetch_data_builder(prefetched_rows.value(), ps));
|
||||
v.consume(ps, prefetch_data_builder(s, prefetched_rows.value(), ps));
|
||||
return prefetched_rows;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -45,15 +45,15 @@ namespace cql3 {
|
||||
|
||||
std::experimental::optional<collection_mutation_view>
|
||||
update_parameters::get_prefetched_list(
|
||||
const partition_key& pkey,
|
||||
const clustering_key& row_key,
|
||||
partition_key pkey,
|
||||
std::experimental::optional<clustering_key> ckey,
|
||||
const column_definition& column) const
|
||||
{
|
||||
if (!_prefetched) {
|
||||
return {};
|
||||
}
|
||||
|
||||
auto i = _prefetched->rows.find(std::make_pair(pkey, row_key));
|
||||
auto i = _prefetched->rows.find(std::make_pair(std::move(pkey), std::move(ckey)));
|
||||
if (i == _prefetched->rows.end()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -58,8 +58,9 @@ namespace cql3 {
|
||||
*/
|
||||
class update_parameters final {
|
||||
public:
|
||||
// Holder for data needed by CQL list updates which depend on current state of the list.
|
||||
struct prefetch_data {
|
||||
using key = std::pair<partition_key, clustering_key>;
|
||||
using key = std::pair<partition_key, std::experimental::optional<clustering_key>>;
|
||||
struct key_hashing {
|
||||
partition_key::hashing pk_hash;
|
||||
clustering_key::hashing ck_hash;
|
||||
@@ -70,7 +71,7 @@ public:
|
||||
{ }
|
||||
|
||||
size_t operator()(const key& k) const {
|
||||
return pk_hash(k.first) ^ ck_hash(k.second);
|
||||
return pk_hash(k.first) ^ (k.second ? ck_hash(*k.second) : 0);
|
||||
}
|
||||
};
|
||||
struct key_equality {
|
||||
@@ -83,7 +84,8 @@ public:
|
||||
{ }
|
||||
|
||||
bool operator()(const key& k1, const key& k2) const {
|
||||
return pk_eq(k1.first, k2.first) && ck_eq(k1.second, k2.second);
|
||||
return pk_eq(k1.first, k2.first)
|
||||
&& bool(k1.second) == bool(k2.second) && (!k1.second || ck_eq(*k1.second, *k2.second));
|
||||
}
|
||||
};
|
||||
using row = std::unordered_map<column_id, collection_mutation>;
|
||||
@@ -183,8 +185,11 @@ public:
|
||||
return _timestamp;
|
||||
}
|
||||
|
||||
std::experimental::optional<collection_mutation_view> get_prefetched_list(
|
||||
const partition_key& pkey, const clustering_key& row_key, const column_definition& column) const;
|
||||
std::experimental::optional<collection_mutation_view>
|
||||
get_prefetched_list(
|
||||
partition_key pkey,
|
||||
std::experimental::optional<clustering_key> ckey,
|
||||
const column_definition& column) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
41
database.cc
41
database.cc
@@ -589,9 +589,7 @@ column_family::seal_active_memtable() {
|
||||
|
||||
future<stop_iteration>
|
||||
column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
|
||||
// FIXME: better way of ensuring we don't attempt to
|
||||
// overwrite an existing table.
|
||||
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
|
||||
auto gen = calculate_generation_for_new_table();
|
||||
|
||||
auto newtab = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(),
|
||||
_config.datadir, gen,
|
||||
@@ -859,14 +857,25 @@ void column_family::start_compaction() {
|
||||
|
||||
void column_family::trigger_compaction() {
|
||||
// Submitting compaction job to compaction manager.
|
||||
// #934 - always inc the pending counter, to help
|
||||
// indicate the want for compaction.
|
||||
_stats.pending_compactions++;
|
||||
do_trigger_compaction(); // see below
|
||||
}
|
||||
|
||||
void column_family::do_trigger_compaction() {
|
||||
// But only submit if we're not locked out
|
||||
if (!_compaction_disabled) {
|
||||
_stats.pending_compactions++;
|
||||
_compaction_manager.submit(this);
|
||||
}
|
||||
}
|
||||
|
||||
future<> column_family::run_compaction(sstables::compaction_descriptor descriptor) {
|
||||
assert(_stats.pending_compactions > 0);
|
||||
return compact_sstables(std::move(descriptor)).then([this] {
|
||||
// only do this on success. (no exceptions)
|
||||
// in that case, we rely on it being still set
|
||||
// for reqeueuing
|
||||
_stats.pending_compactions--;
|
||||
});
|
||||
}
|
||||
@@ -1005,6 +1014,9 @@ future<> column_family::populate(sstring sstdir) {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}).then([this] {
|
||||
// Make sure this is called even if CF is empty
|
||||
mark_ready_for_writes();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1082,7 +1094,13 @@ future<> database::populate_keyspace(sstring datadir, sstring ks_name) {
|
||||
sstring uuidst = comps[1];
|
||||
|
||||
try {
|
||||
auto&& uuid = find_uuid(ks_name, cfname);
|
||||
auto&& uuid = [&] {
|
||||
try {
|
||||
return find_uuid(ks_name, cfname);
|
||||
} catch (const std::out_of_range& e) {
|
||||
std::throw_with_nested(no_such_column_family(ks_name, cfname));
|
||||
}
|
||||
}();
|
||||
auto& cf = find_column_family(uuid);
|
||||
|
||||
// #870: Check that the directory name matches
|
||||
@@ -1183,6 +1201,14 @@ database::init_system_keyspace() {
|
||||
return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME).then([this]() {
|
||||
return init_commitlog();
|
||||
});
|
||||
}).then([this] {
|
||||
auto& ks = find_keyspace(db::system_keyspace::NAME);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) {
|
||||
auto cfm = pair.second;
|
||||
auto& cf = this->find_column_family(cfm);
|
||||
cf.mark_ready_for_writes();
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1698,11 +1724,9 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const
|
||||
|
||||
void
|
||||
column_family::seal_on_overflow() {
|
||||
++_mutation_count;
|
||||
if (active_memtable().occupancy().total_space() >= _config.max_memtable_size) {
|
||||
// FIXME: if sparse, do some in-memory compaction first
|
||||
// FIXME: maybe merge with other in-memory memtables
|
||||
_mutation_count = 0;
|
||||
seal_active_memtable();
|
||||
}
|
||||
}
|
||||
@@ -2272,7 +2296,8 @@ void column_family::clear() {
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
// if we implement notifications, whatnot.
|
||||
future<db::replay_position> column_family::discard_sstables(db_clock::time_point truncated_at) {
|
||||
assert(_stats.pending_compactions == 0);
|
||||
assert(_compaction_disabled > 0);
|
||||
assert(!compaction_manager_queued());
|
||||
|
||||
return with_lock(_sstables_lock.for_read(), [this, truncated_at] {
|
||||
db::replay_position rp;
|
||||
|
||||
49
database.hh
49
database.hh
@@ -159,8 +159,8 @@ private:
|
||||
// the read lock, and the ones that wish to stop that process will take the write lock.
|
||||
rwlock _sstables_lock;
|
||||
mutable row_cache _cache; // Cache covers only sstables.
|
||||
int64_t _sstable_generation = 1;
|
||||
unsigned _mutation_count = 0;
|
||||
std::experimental::optional<int64_t> _sstable_generation = {};
|
||||
|
||||
db::replay_position _highest_flushed_rp;
|
||||
// Provided by the database that owns this commitlog
|
||||
db::commitlog* _commitlog;
|
||||
@@ -185,11 +185,17 @@ private:
|
||||
|
||||
// update the sstable generation, making sure that new new sstables don't overwrite this one.
|
||||
void update_sstables_known_generation(unsigned generation) {
|
||||
_sstable_generation = std::max<uint64_t>(_sstable_generation, generation / smp::count + 1);
|
||||
if (!_sstable_generation) {
|
||||
_sstable_generation = 1;
|
||||
}
|
||||
_sstable_generation = std::max<uint64_t>(*_sstable_generation, generation / smp::count + 1);
|
||||
}
|
||||
|
||||
uint64_t calculate_generation_for_new_table() {
|
||||
return _sstable_generation++ * smp::count + engine().cpu_id();
|
||||
assert(_sstable_generation);
|
||||
// FIXME: better way of ensuring we don't attempt to
|
||||
// overwrite an existing table.
|
||||
return (*_sstable_generation)++ * smp::count + engine().cpu_id();
|
||||
}
|
||||
|
||||
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
|
||||
@@ -206,7 +212,29 @@ private:
|
||||
key_source sstables_as_key_source() const;
|
||||
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstable_list> old_sstables);
|
||||
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
|
||||
void do_trigger_compaction();
|
||||
public:
|
||||
|
||||
// This function should be called when this column family is ready for writes, IOW,
|
||||
// to produce SSTables. Extensive details about why this is important can be found
|
||||
// in Scylla's Github Issue #1014
|
||||
//
|
||||
// Nothing should be writing to SSTables before we have the chance to populate the
|
||||
// existing SSTables and calculate what should the next generation number be.
|
||||
//
|
||||
// However, if that happens, we want to protect against it in a way that does not
|
||||
// involve overwriting existing tables. This is one of the ways to do it: every
|
||||
// column family starts in an unwriteable state, and when it can finally be written
|
||||
// to, we mark it as writeable.
|
||||
//
|
||||
// Note that this *cannot* be a part of add_column_family. That adds a column family
|
||||
// to a db in memory only, and if anybody is about to write to a CF, that was most
|
||||
// likely already called. We need to call this explicitly when we are sure we're ready
|
||||
// to issue disk operations safely.
|
||||
void mark_ready_for_writes() {
|
||||
update_sstables_known_generation(0);
|
||||
}
|
||||
|
||||
// Creates a mutation reader which covers all data sources for this column family.
|
||||
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
||||
// Note: for data queries use query() instead.
|
||||
@@ -361,8 +389,12 @@ public:
|
||||
Result run_with_compaction_disabled(Func && func) {
|
||||
++_compaction_disabled;
|
||||
return _compaction_manager.remove(this).then(std::forward<Func>(func)).finally([this] {
|
||||
if (--_compaction_disabled == 0) {
|
||||
trigger_compaction();
|
||||
// #934. The pending counter is actually a great indicator into whether we
|
||||
// actually need to trigger a compaction again.
|
||||
if (--_compaction_disabled == 0 && _stats.pending_compactions > 0) {
|
||||
// we're turning if on again, use function that does not increment
|
||||
// the counter further.
|
||||
do_trigger_compaction();
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -379,16 +411,11 @@ private:
|
||||
// But it is possible to synchronously wait for the seal to complete by
|
||||
// waiting on this future. This is useful in situations where we want to
|
||||
// synchronously flush data to disk.
|
||||
//
|
||||
// FIXME: A better interface would guarantee that all writes before this
|
||||
// one are also complete
|
||||
future<> seal_active_memtable();
|
||||
|
||||
// filter manifest.json files out
|
||||
static bool manifest_json_filter(const sstring& fname);
|
||||
|
||||
seastar::gate _in_flight_seals;
|
||||
|
||||
// Iterate over all partitions. Protocol is the same as std::all_of(),
|
||||
// so that iteration can be stopped by returning false.
|
||||
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)
|
||||
|
||||
@@ -703,6 +703,8 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
auto& ks = db.find_keyspace(s->ks_name());
|
||||
auto cfg = ks.make_column_family_config(*s);
|
||||
db.add_column_family(s, cfg);
|
||||
auto& cf = db.find_column_family(s);
|
||||
cf.mark_ready_for_writes();
|
||||
ks.make_directory_for_column_family(s->cf_name(), s->id()).get();
|
||||
service::get_local_migration_manager().notify_create_column_family(s);
|
||||
}
|
||||
|
||||
2
dist/ami/scylla.json
vendored
2
dist/ami/scylla.json
vendored
@@ -8,7 +8,7 @@
|
||||
"security_group_id": "{{user `security_group_id`}}",
|
||||
"region": "{{user `region`}}",
|
||||
"associate_public_ip_address": "{{user `associate_public_ip_address`}}",
|
||||
"source_ami": "ami-8ef1d6e4",
|
||||
"source_ami": "ami-f3102499",
|
||||
"user_data_file": "user_data.txt",
|
||||
"instance_type": "{{user `instance_type`}}",
|
||||
"ssh_username": "centos",
|
||||
|
||||
3
dist/ubuntu/rules.in
vendored
3
dist/ubuntu/rules.in
vendored
@@ -40,7 +40,8 @@ override_dh_auto_install:
|
||||
cp -r $(CURDIR)/licenses $(DOC)
|
||||
|
||||
mkdir -p $(SCRIPTS) && \
|
||||
cp $(CURDIR)/seastar/dpdk/tools/dpdk_nic_bind.py $(SCRIPTS)
|
||||
cp $(CURDIR)/seastar/scripts/dpdk_nic_bind.py $(SCRIPTS)
|
||||
cp $(CURDIR)/seastar/scripts/posix_net_conf.sh $(SCRIPTS)
|
||||
cp $(CURDIR)/dist/common/scripts/* $(SCRIPTS)
|
||||
cp $(CURDIR)/dist/ubuntu/scripts/* $(SCRIPTS)
|
||||
|
||||
|
||||
@@ -409,8 +409,14 @@ future<> gossiper::apply_state_locally(const std::map<inet_address, endpoint_sta
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::remove_endpoint(inet_address endpoint) {
|
||||
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
|
||||
_subscribers.for_each([endpoint] (auto& subscriber) {
|
||||
subscriber->on_remove(endpoint);
|
||||
// We can not run on_remove callbacks here becasue on_remove in
|
||||
// storage_service might take the gossiper::timer_callback_lock
|
||||
seastar::async([this, endpoint] {
|
||||
_subscribers.for_each([endpoint] (auto& subscriber) {
|
||||
subscriber->on_remove(endpoint);
|
||||
});
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to call on_remove callback: {}", ep);
|
||||
});
|
||||
|
||||
if(_seeds.count(endpoint)) {
|
||||
|
||||
10
init.cc
10
init.cc
@@ -24,7 +24,6 @@
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "to_string.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
|
||||
@@ -34,14 +33,9 @@
|
||||
// until proper shutdown is done.
|
||||
|
||||
future<> init_storage_service(distributed<database>& db) {
|
||||
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
|
||||
return service::init_storage_service(db).then([] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
|
||||
}).then([&db] {
|
||||
return service::init_storage_service(db).then([] {
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -168,12 +168,12 @@ abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
|
||||
if (wrap) {
|
||||
auto split_ranges = r.unwrap();
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(ep, std::move(split_ranges.first));
|
||||
ret.emplace(ep, std::move(split_ranges.second));
|
||||
ret.emplace(ep, split_ranges.first);
|
||||
ret.emplace(ep, split_ranges.second);
|
||||
}
|
||||
} else {
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(ep, std::move(r));
|
||||
ret.emplace(ep, r);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -190,12 +190,12 @@ abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
|
||||
if (wrap) {
|
||||
auto split_ranges = r.unwrap();
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(std::move(split_ranges.first), ep);
|
||||
ret.emplace(std::move(split_ranges.second), ep);
|
||||
ret.emplace(split_ranges.first, ep);
|
||||
ret.emplace(split_ranges.second, ep);
|
||||
}
|
||||
} else {
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(std::move(r), ep);
|
||||
ret.emplace(r, ep);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +398,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
|
||||
// all leaving nodes are gone.
|
||||
auto metadata = clone_only_token_map(); // don't do this in the loop! #7758
|
||||
for (const auto& r : affected_ranges) {
|
||||
auto t = r.end()->value();
|
||||
auto t = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
|
||||
auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata);
|
||||
std::vector<inet_address> diff;
|
||||
|
||||
2
log.cc
2
log.cc
@@ -246,6 +246,8 @@ std::ostream& operator<<(std::ostream& out, const std::exception_ptr& eptr) {
|
||||
out << " (error " << e.code() << ", " << e.code().message() << ")";
|
||||
} catch(const std::exception& e) {
|
||||
out << " (" << e.what() << ")";
|
||||
} catch(...) {
|
||||
// no extra info
|
||||
}
|
||||
}
|
||||
return out;
|
||||
|
||||
408
main.cc
408
main.cc
@@ -221,7 +221,19 @@ verify_rlimit(bool developer_mode) {
|
||||
}
|
||||
}
|
||||
|
||||
static bool cpu_sanity() {
|
||||
if (!__builtin_cpu_supports("sse4.2")) {
|
||||
std::cerr << "Scylla requires a processor with SSE 4.2 support\n";
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
int main(int ac, char** av) {
|
||||
// early check to avoid triggering
|
||||
if (!cpu_sanity()) {
|
||||
_exit(71);
|
||||
}
|
||||
runtime::init_uptime();
|
||||
std::setvbuf(stdout, nullptr, _IOLBF, 1000);
|
||||
app_template app;
|
||||
@@ -264,7 +276,8 @@ int main(int ac, char** av) {
|
||||
engine().set_strict_dma(false);
|
||||
}
|
||||
|
||||
return read_config(opts, *cfg).then([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs]() {
|
||||
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs] {
|
||||
read_config(opts, *cfg).get();
|
||||
apply_logger_settings(cfg->default_log_level(), cfg->logger_log_level(),
|
||||
cfg->log_to_stdout(), cfg->log_to_syslog());
|
||||
verify_rlimit(cfg->developer_mode());
|
||||
@@ -320,230 +333,203 @@ int main(int ac, char** av) {
|
||||
using namespace locator;
|
||||
// Re-apply strict-dma after we've read the config file, this time
|
||||
// to all reactors
|
||||
return parallel_for_each(boost::irange(0u, smp::count), [devmode = opts.count("developer-mode")] (unsigned cpu) {
|
||||
smp::invoke_on_all([devmode = opts.count("developer-mode")] {
|
||||
if (devmode) {
|
||||
engine().set_strict_dma(false);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).then([cfg] {
|
||||
supervisor_notify("creating snitch");
|
||||
return i_endpoint_snitch::create_snitch(cfg->endpoint_snitch());
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
|
||||
}).then([api_address] {
|
||||
supervisor_notify("determining DNS name");
|
||||
return dns::gethostbyname(api_address);
|
||||
}).then([&db, api_address, api_port, &ctx] (dns::hostent e){
|
||||
supervisor_notify("starting API server");
|
||||
auto ip = e.addresses[0].in.s_addr;
|
||||
return ctx.http_server.start().then([api_address, api_port, ip, &ctx] {
|
||||
return api::set_server_init(ctx);
|
||||
}).then([api_address, api_port, ip, &ctx] {
|
||||
return ctx.http_server.listen(ipv4_addr{ip, api_port});
|
||||
}).then([api_address, api_port] {
|
||||
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
|
||||
}).get();
|
||||
supervisor_notify("creating snitch");
|
||||
i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
|
||||
supervisor_notify("determining DNS name");
|
||||
dns::hostent e = dns::gethostbyname(api_address).get0();
|
||||
supervisor_notify("starting API server");
|
||||
auto ip = e.addresses[0].in.s_addr;
|
||||
ctx.http_server.start().get();
|
||||
api::set_server_init(ctx).get();
|
||||
ctx.http_server.listen(ipv4_addr{ip, api_port}).get();
|
||||
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
|
||||
supervisor_notify("initializing storage service");
|
||||
init_storage_service(db).get();
|
||||
api::set_server_storage_service(ctx).get();
|
||||
supervisor_notify("starting per-shard database core");
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
db.start(std::ref(*cfg)).get();
|
||||
engine().at_exit([&db] {
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
//return db.stop();
|
||||
// call stop on each db instance, but leave the shareded<database> pointers alive.
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.stop();
|
||||
}).then([] {
|
||||
return sstables::await_background_jobs_on_all_shards();
|
||||
}).then([] {
|
||||
::_exit(0);
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("initializing storage service");
|
||||
return init_storage_service(db);
|
||||
}).then([&ctx] {
|
||||
return api::set_server_storage_service(ctx);
|
||||
}).then([&db, cfg] {
|
||||
});
|
||||
supervisor_notify("creating data directories");
|
||||
dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
|
||||
supervisor_notify("creating commitlog directory");
|
||||
dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
|
||||
supervisor_notify("verifying data and commitlog directories");
|
||||
std::unordered_set<sstring> directories;
|
||||
directories.insert(db.local().get_config().data_file_directories().cbegin(),
|
||||
db.local().get_config().data_file_directories().cend());
|
||||
directories.insert(db.local().get_config().commitlog_directory());
|
||||
parallel_for_each(directories, [&db] (sstring pathname) {
|
||||
return disk_sanity(pathname, db.local().get_config().developer_mode());
|
||||
}).get();
|
||||
|
||||
supervisor_notify("starting per-shard database core");
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
return db.start(std::ref(*cfg)).then([&db] {
|
||||
engine().at_exit([&db] {
|
||||
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
//return db.stop();
|
||||
// call stop on each db instance, but leave the shareded<database> pointers alive.
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.stop();
|
||||
}).then([] {
|
||||
return sstables::await_background_jobs_on_all_shards();
|
||||
}).then([] {
|
||||
::_exit(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([cfg, listen_address] {
|
||||
supervisor_notify("starting gossip");
|
||||
// Moved local parameters here, esp since with the
|
||||
// ssl stuff it gets to be a lot.
|
||||
uint16_t storage_port = cfg->storage_port();
|
||||
uint16_t ssl_storage_port = cfg->ssl_storage_port();
|
||||
double phi = cfg->phi_convict_threshold();
|
||||
auto seed_provider= cfg->seed_provider();
|
||||
sstring cluster_name = cfg->cluster_name();
|
||||
|
||||
const auto& ssl_opts = cfg->server_encryption_options();
|
||||
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
|
||||
auto trust_store = get_or_default(ssl_opts, "truststore");
|
||||
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
|
||||
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
|
||||
|
||||
return init_ms_fd_gossiper(listen_address
|
||||
, storage_port
|
||||
, ssl_storage_port
|
||||
, encrypt_what
|
||||
, trust_store
|
||||
, cert
|
||||
, key
|
||||
, seed_provider
|
||||
, cluster_name
|
||||
, phi);
|
||||
}).then([&ctx] {
|
||||
return api::set_server_gossip(ctx);
|
||||
}).then([&db] {
|
||||
supervisor_notify("starting streaming service");
|
||||
return streaming::stream_session::init_streaming_service(db);
|
||||
}).then([&ctx] {
|
||||
return api::set_server_stream_manager(ctx);
|
||||
}).then([&db] {
|
||||
supervisor_notify("starting messaging service");
|
||||
// Start handling REPAIR_CHECKSUM_RANGE messages
|
||||
return net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
|
||||
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
|
||||
return do_with(std::move(keyspace), std::move(cf), std::move(range),
|
||||
[&db] (auto& keyspace, auto& cf, auto& range) {
|
||||
return checksum_range(db, keyspace, cf, range);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([&ctx](){
|
||||
return api::set_server_messaging_service(ctx);
|
||||
}).then([&proxy, &db] {
|
||||
supervisor_notify("starting storage proxy");
|
||||
return proxy.start(std::ref(db)).then([&proxy] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&proxy] { return proxy.stop(); });
|
||||
});
|
||||
}).then([&ctx]() {
|
||||
return api::set_server_storage_proxy(ctx);
|
||||
}).then([&mm] {
|
||||
supervisor_notify("starting migration manager");
|
||||
return mm.start().then([&mm] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&mm] { return mm.stop(); });
|
||||
});
|
||||
}).then([&db, &proxy, &qp] {
|
||||
supervisor_notify("starting query processor");
|
||||
return qp.start(std::ref(proxy), std::ref(db)).then([&qp] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
});
|
||||
}).then([&qp] {
|
||||
supervisor_notify("initializing batchlog manager");
|
||||
return db::get_batchlog_manager().start(std::ref(qp)).then([] {
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
|
||||
});
|
||||
}).then([&db, &dirs] {
|
||||
supervisor_notify("creating data directories");
|
||||
return dirs.touch_and_lock(db.local().get_config().data_file_directories());
|
||||
}).then([&db, &dirs] {
|
||||
supervisor_notify("creating commitlog directory");
|
||||
return dirs.touch_and_lock(db.local().get_config().commitlog_directory());
|
||||
}).then([&db] {
|
||||
supervisor_notify("verifying data and commitlog directories");
|
||||
std::unordered_set<sstring> directories;
|
||||
directories.insert(db.local().get_config().data_file_directories().cbegin(),
|
||||
db.local().get_config().data_file_directories().cend());
|
||||
directories.insert(db.local().get_config().commitlog_directory());
|
||||
return do_with(std::move(directories), [&db] (auto& directories) {
|
||||
return parallel_for_each(directories, [&db] (sstring pathname) {
|
||||
return disk_sanity(pathname, db.local().get_config().developer_mode());
|
||||
});
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("loading sstables");
|
||||
return db.invoke_on_all([] (database& db) {
|
||||
return db.init_system_keyspace();
|
||||
}).then([&db] {
|
||||
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
|
||||
auto cfm = pair.second;
|
||||
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
|
||||
});
|
||||
});
|
||||
}).then([&db, &proxy] {
|
||||
supervisor_notify("loading sstables");
|
||||
return db.invoke_on_all([&proxy] (database& db) {
|
||||
return db.load_sstables(proxy);
|
||||
});
|
||||
}).then([&ctx] {
|
||||
return api::set_server_load_sstable(ctx);
|
||||
}).then([&db, &qp] {
|
||||
supervisor_notify("setting up system keyspace");
|
||||
return db::system_keyspace::setup(db, qp);
|
||||
}).then([&db, &qp] {
|
||||
supervisor_notify("starting commit log");
|
||||
auto cl = db.local().commitlog();
|
||||
if (cl == nullptr) {
|
||||
// Deletion of previous stale, temporary SSTables is done by Shard0. Therefore,
|
||||
// let's run Shard0 first. Technically, we could just have all shards agree on
|
||||
// the deletion and just delete it later, but that is prone to races.
|
||||
//
|
||||
// Those races are not supposed to happen during normal operation, but if we have
|
||||
// bugs, they can. Scylla's Github Issue #1014 is an example of a situation where
|
||||
// that can happen, making existing problems worse. So running a single shard first
|
||||
// and getting making sure that all temporary tables are deleted provides extra
|
||||
// protection against such situations.
|
||||
db.invoke_on(0, [] (database& db) { return db.init_system_keyspace(); }).get();
|
||||
db.invoke_on_all([] (database& db) {
|
||||
if (engine().cpu_id() == 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return cl->list_existing_segments().then([&db, &qp](auto paths) {
|
||||
if (paths.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return db.init_system_keyspace();
|
||||
}).get();
|
||||
supervisor_notify("starting gossip");
|
||||
// Moved local parameters here, esp since with the
|
||||
// ssl stuff it gets to be a lot.
|
||||
uint16_t storage_port = cfg->storage_port();
|
||||
uint16_t ssl_storage_port = cfg->ssl_storage_port();
|
||||
double phi = cfg->phi_convict_threshold();
|
||||
auto seed_provider= cfg->seed_provider();
|
||||
sstring cluster_name = cfg->cluster_name();
|
||||
|
||||
const auto& ssl_opts = cfg->server_encryption_options();
|
||||
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
|
||||
auto trust_store = get_or_default(ssl_opts, "truststore");
|
||||
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
|
||||
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
|
||||
|
||||
init_ms_fd_gossiper(listen_address
|
||||
, storage_port
|
||||
, ssl_storage_port
|
||||
, encrypt_what
|
||||
, trust_store
|
||||
, cert
|
||||
, key
|
||||
, seed_provider
|
||||
, cluster_name
|
||||
, phi).get();
|
||||
api::set_server_gossip(ctx).get();
|
||||
supervisor_notify("starting messaging service");
|
||||
api::set_server_messaging_service(ctx).get();
|
||||
supervisor_notify("starting storage proxy");
|
||||
proxy.start(std::ref(db)).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&proxy] { return proxy.stop(); });
|
||||
api::set_server_storage_proxy(ctx).get();
|
||||
supervisor_notify("starting migration manager");
|
||||
mm.start().get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&mm] { return mm.stop(); });
|
||||
supervisor_notify("starting query processor");
|
||||
qp.start(std::ref(proxy), std::ref(db)).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
supervisor_notify("initializing batchlog manager");
|
||||
db::get_batchlog_manager().start(std::ref(qp)).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
|
||||
supervisor_notify("loading sstables");
|
||||
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
|
||||
parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
|
||||
auto cfm = pair.second;
|
||||
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
|
||||
}).get();
|
||||
supervisor_notify("loading sstables");
|
||||
// See comment on top of our call to init_system_keyspace as per why we invoke
|
||||
// on Shard0 first. Scylla's Github Issue #1014 for details
|
||||
db.invoke_on(0, [&proxy] (database& db) { return db.load_sstables(proxy); }).get();
|
||||
db.invoke_on_all([&proxy] (database& db) {
|
||||
if (engine().cpu_id() == 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return db.load_sstables(proxy);
|
||||
}).get();
|
||||
api::set_server_load_sstable(ctx).get();
|
||||
supervisor_notify("setting up system keyspace");
|
||||
db::system_keyspace::setup(db, qp).get();
|
||||
supervisor_notify("starting commit log");
|
||||
auto cl = db.local().commitlog();
|
||||
if (cl != nullptr) {
|
||||
auto paths = cl->list_existing_segments().get0();
|
||||
if (!paths.empty()) {
|
||||
supervisor_notify("replaying commit log");
|
||||
return db::commitlog_replayer::create_replayer(qp).then([paths](auto rp) {
|
||||
return do_with(std::move(rp), [paths = std::move(paths)](auto& rp) {
|
||||
return rp.recover(paths);
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("replaying commit log - flushing memtables");
|
||||
return db.invoke_on_all([] (database& db) {
|
||||
return db.flush_all_memtables();
|
||||
});
|
||||
}).then([paths] {
|
||||
supervisor_notify("replaying commit log - removing old commitlog segments");
|
||||
for (auto& path : paths) {
|
||||
::unlink(path.c_str());
|
||||
}
|
||||
auto rp = db::commitlog_replayer::create_replayer(qp).get0();
|
||||
rp.recover(paths).get();
|
||||
supervisor_notify("replaying commit log - flushing memtables");
|
||||
db.invoke_on_all([] (database& db) {
|
||||
return db.flush_all_memtables();
|
||||
}).get();
|
||||
supervisor_notify("replaying commit log - removing old commitlog segments");
|
||||
for (auto& path : paths) {
|
||||
::unlink(path.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
supervisor_notify("initializing migration manager RPC verbs");
|
||||
service::get_migration_manager().invoke_on_all([] (auto& mm) {
|
||||
mm.init_messaging_service();
|
||||
}).get();
|
||||
supervisor_notify("initializing storage proxy RPC verbs");
|
||||
proxy.invoke_on_all([] (service::storage_proxy& p) {
|
||||
p.init_messaging_service();
|
||||
}).get();
|
||||
supervisor_notify("starting streaming service");
|
||||
streaming::stream_session::init_streaming_service(db).get();
|
||||
api::set_server_stream_manager(ctx).get();
|
||||
// Start handling REPAIR_CHECKSUM_RANGE messages
|
||||
net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
|
||||
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
|
||||
return do_with(std::move(keyspace), std::move(cf), std::move(range),
|
||||
[&db] (auto& keyspace, auto& cf, auto& range) {
|
||||
return checksum_range(db, keyspace, cf, range);
|
||||
});
|
||||
});
|
||||
}).then([] {
|
||||
supervisor_notify("starting storage service");
|
||||
auto& ss = service::get_local_storage_service();
|
||||
return ss.init_server();
|
||||
}).then([&ctx] {
|
||||
return api::set_server_storage_service(ctx);
|
||||
}).then([] {
|
||||
supervisor_notify("starting batchlog manager");
|
||||
return db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
|
||||
return b.start();
|
||||
});
|
||||
}).then([&db] {
|
||||
supervisor_notify("starting load broadcaster");
|
||||
// should be unique_ptr, but then lambda passed to at_exit will be non copieable and
|
||||
// casting to std::function<> will fail to compile
|
||||
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
|
||||
lb->start_broadcasting();
|
||||
service::get_local_storage_service().set_load_broadcaster(lb);
|
||||
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
|
||||
}).then([] {
|
||||
return gms::get_local_gossiper().wait_for_gossip_to_settle();
|
||||
}).then([&ctx] {
|
||||
return api::set_server_gossip_settle(ctx);
|
||||
}).then([start_thrift] () {
|
||||
supervisor_notify("starting native transport");
|
||||
return service::get_local_storage_service().start_native_transport().then([start_thrift] () {
|
||||
if (start_thrift) {
|
||||
return service::get_local_storage_service().start_rpc_server();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([&ctx] {
|
||||
return api::set_server_done(ctx);
|
||||
});
|
||||
}).then([] {
|
||||
supervisor_notify("serving", true);
|
||||
}).get();
|
||||
supervisor_notify("starting storage service", true);
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.init_server().get();
|
||||
api::set_server_storage_service(ctx).get();
|
||||
supervisor_notify("starting batchlog manager");
|
||||
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
|
||||
return b.start();
|
||||
}).get();
|
||||
supervisor_notify("starting load broadcaster");
|
||||
// should be unique_ptr, but then lambda passed to at_exit will be non copieable and
|
||||
// casting to std::function<> will fail to compile
|
||||
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
|
||||
lb->start_broadcasting();
|
||||
service::get_local_storage_service().set_load_broadcaster(lb);
|
||||
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
|
||||
gms::get_local_gossiper().wait_for_gossip_to_settle().get();
|
||||
api::set_server_gossip_settle(ctx).get();
|
||||
supervisor_notify("starting native transport");
|
||||
service::get_local_storage_service().start_native_transport().get();
|
||||
if (start_thrift) {
|
||||
service::get_local_storage_service().start_rpc_server().get();
|
||||
}
|
||||
api::set_server_done(ctx).get();
|
||||
supervisor_notify("serving");
|
||||
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
|
||||
engine().at_exit([] {
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
});
|
||||
engine().at_exit([] {
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
});
|
||||
}).or_terminate();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -112,11 +112,11 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
|
||||
class messaging_service::rpc_protocol_client_wrapper {
|
||||
std::unique_ptr<rpc_protocol::client> _p;
|
||||
public:
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local = ipv4_addr())
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, addr, local)) {
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr())
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
|
||||
}
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, addr, seastar::tls::connect(c, addr, local)))
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, seastar::tls::connect(c, addr, local)))
|
||||
{}
|
||||
auto get_stats() const { return _p->get_stats(); }
|
||||
future<> stop() { return _p->stop(); }
|
||||
@@ -390,10 +390,14 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
auto remote_addr = ipv4_addr(get_preferred_ip(id.addr).raw_addr(), must_encrypt ? _ssl_port : _port);
|
||||
auto local_addr = ipv4_addr{_listen_address.raw_addr(), 0};
|
||||
|
||||
rpc::client_options opts;
|
||||
// send keepalive messages each minute if connection is idle, drop connection after 10 failures
|
||||
opts.keepalive = std::experimental::optional<net::tcp_keepalive_params>({60s, 60s, 10});
|
||||
|
||||
auto client = must_encrypt ?
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
|
||||
remote_addr, local_addr, _credentials) :
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
|
||||
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
|
||||
remote_addr, local_addr);
|
||||
|
||||
it = _clients[idx].emplace(id, shard_info(std::move(client))).first;
|
||||
|
||||
@@ -203,9 +203,14 @@ private:
|
||||
// Successfully-finished repairs are those with id < _next_repair_command
|
||||
// but aren't listed as running or failed the status map.
|
||||
std::unordered_map<int, repair_status> _status;
|
||||
// Variables used to allow shutting down all repair in progress
|
||||
bool _in_shutdown = false;
|
||||
promise<> _shutdown_done;
|
||||
int _repairs_running = 0;
|
||||
public:
|
||||
void start(int id) {
|
||||
_status[id] = repair_status::RUNNING;
|
||||
++_repairs_running;
|
||||
}
|
||||
void done(int id, bool succeeded) {
|
||||
if (succeeded) {
|
||||
@@ -213,6 +218,9 @@ public:
|
||||
} else {
|
||||
_status[id] = repair_status::FAILED;
|
||||
}
|
||||
if (--_repairs_running == 0 && _in_shutdown) {
|
||||
_shutdown_done.set_value();
|
||||
}
|
||||
}
|
||||
repair_status get(int id) {
|
||||
if (id >= _next_repair_command) {
|
||||
@@ -228,8 +236,28 @@ public:
|
||||
int next_repair_command() {
|
||||
return _next_repair_command++;
|
||||
}
|
||||
future<> shutdown() {
|
||||
assert(!_in_shutdown);
|
||||
_in_shutdown = true;
|
||||
if (_repairs_running == 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _shutdown_done.get_future();
|
||||
}
|
||||
bool in_shutdown() {
|
||||
return _in_shutdown;
|
||||
}
|
||||
} repair_tracker;
|
||||
|
||||
static void check_in_shutdown() {
|
||||
// Only call this from the single CPU managing the repair - the only CPU
|
||||
// which is allowed to use repair_tracker.
|
||||
assert(engine().cpu_id() == 0);
|
||||
if (repair_tracker.in_shutdown()) {
|
||||
throw repair_stopped_exception();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
partition_checksum::partition_checksum(const mutation& m) {
|
||||
auto frozen = freeze(m);
|
||||
@@ -445,6 +473,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
[&db, &neighbors, parallelism] (auto& sem, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) {
|
||||
return do_for_each(ranges, [&sem, &success, &db, &neighbors, &keyspace, &cf]
|
||||
(const auto& range) {
|
||||
check_in_shutdown();
|
||||
return sem.wait(1).then([&sem, &success, &db, &neighbors, &keyspace, &cf, &range] {
|
||||
// Ask this node, and all neighbors, to calculate checksums in
|
||||
// this range. When all are done, compare the results, and if
|
||||
@@ -713,6 +742,7 @@ static future<> repair_ranges(seastar::sharded<database>& db, sstring keyspace,
|
||||
// repair all the ranges in sequence
|
||||
return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) {
|
||||
#endif
|
||||
check_in_shutdown();
|
||||
return repair_range(db, keyspace, range, cfs, data_centers, hosts);
|
||||
}).then([id] {
|
||||
logger.info("repair {} completed sucessfully", id);
|
||||
@@ -731,6 +761,7 @@ static future<> repair_ranges(seastar::sharded<database>& db, sstring keyspace,
|
||||
// itself does very little (mainly tell other nodes and CPUs what to do).
|
||||
static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
|
||||
std::unordered_map<sstring, sstring> options_map) {
|
||||
check_in_shutdown();
|
||||
|
||||
repair_options options(options_map);
|
||||
|
||||
@@ -806,3 +837,12 @@ future<repair_status> repair_get_status(seastar::sharded<database>& db, int id)
|
||||
return repair_tracker.get(id);
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_shutdown(seastar::sharded<database>& db) {
|
||||
logger.info("Starting shutdown of repair");
|
||||
return db.invoke_on(0, [] (database& localdb) {
|
||||
return repair_tracker.shutdown().then([] {
|
||||
logger.info("Completed shutdown of repair");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -40,6 +40,11 @@ public:
|
||||
virtual const char* what() const noexcept override { return _what.c_str(); }
|
||||
};
|
||||
|
||||
class repair_stopped_exception : public repair_exception {
|
||||
public:
|
||||
repair_stopped_exception() : repair_exception("Repair stopped") { }
|
||||
};
|
||||
|
||||
// NOTE: repair_start() can be run on any node, but starts a node-global
|
||||
// operation.
|
||||
// repair_start() starts the requested repair on this node. It returns an
|
||||
@@ -58,6 +63,13 @@ enum class repair_status { RUNNING, SUCCESSFUL, FAILED };
|
||||
// different CPU (cpu 0) and that might be a deferring operation.
|
||||
future<repair_status> repair_get_status(seastar::sharded<database>& db, int id);
|
||||
|
||||
// repair_shutdown() stops all ongoing repairs started on this node (and
|
||||
// prevents any further repairs from being started). It returns a future
|
||||
// saying when all repairs have stopped, and attempts to stop them as
|
||||
// quickly as possible (we do not wait for repairs to finish but rather
|
||||
// stop them abruptly).
|
||||
future<> repair_shutdown(seastar::sharded<database>& db);
|
||||
|
||||
// The class partition_checksum calculates a 256-bit cryptographically-secure
|
||||
// checksum of a set of partitions fed to it. The checksum of a partition set
|
||||
// is calculated by calculating a strong hash function (SHA-256) of each
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 353b1a1481...e039c46e8b
@@ -66,9 +66,43 @@ migration_manager::migration_manager()
|
||||
|
||||
future<> migration_manager::stop()
|
||||
{
|
||||
if (ms_inited) {
|
||||
uninit_messaging_service();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void migration_manager::init_messaging_service()
|
||||
{
|
||||
ms_inited = true;
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
|
||||
auto src = net::messaging_service::get_source(cinfo);
|
||||
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
}).then_wrapped([src] (auto&& f) {
|
||||
if (f.failed()) {
|
||||
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
|
||||
} else {
|
||||
logger.debug("Applied definitions update from {}.", src);
|
||||
}
|
||||
});
|
||||
return net::messaging_service::no_wait();
|
||||
});
|
||||
ms.register_migration_request([this] () {
|
||||
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
|
||||
// keep local proxy alive
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void migration_manager::uninit_messaging_service()
|
||||
{
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.unregister_migration_request();
|
||||
ms.unregister_definitions_update();
|
||||
}
|
||||
|
||||
void migration_manager::register_listener(migration_listener* listener)
|
||||
{
|
||||
_listeners.emplace_back(listener);
|
||||
|
||||
@@ -52,10 +52,12 @@
|
||||
|
||||
namespace service {
|
||||
|
||||
class migration_manager {
|
||||
class migration_manager : public seastar::async_sharded_service<migration_manager> {
|
||||
std::vector<migration_listener*> _listeners;
|
||||
|
||||
static const std::chrono::milliseconds migration_delay;
|
||||
|
||||
bool ms_inited = false;
|
||||
public:
|
||||
migration_manager();
|
||||
|
||||
@@ -118,6 +120,10 @@ public:
|
||||
future<> stop();
|
||||
|
||||
bool is_ready_for_bootstrap();
|
||||
|
||||
void init_messaging_service();
|
||||
private:
|
||||
void uninit_messaging_service();
|
||||
};
|
||||
|
||||
extern distributed<migration_manager> _the_migration_manager;
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "database.hh"
|
||||
#include <seastar/core/sleep.hh>
|
||||
|
||||
namespace service {
|
||||
|
||||
distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
|
||||
|
||||
void pending_range_calculator_service::run() {
|
||||
// long start = System.currentTimeMillis();
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
calculate_pending_ranges(ks.get_replication_strategy(), keyspace_name);
|
||||
}
|
||||
_update_jobs--;
|
||||
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
|
||||
}
|
||||
|
||||
void pending_range_calculator_service::calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name) {
|
||||
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
|
||||
}
|
||||
|
||||
future<> pending_range_calculator_service::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> pending_range_calculator_service::update() {
|
||||
return smp::submit_to(0, [] {
|
||||
get_local_pending_range_calculator_service().do_update();
|
||||
});
|
||||
}
|
||||
|
||||
void pending_range_calculator_service::do_update() {
|
||||
assert(engine().cpu_id() == 0);
|
||||
get_local_pending_range_calculator_service()._update_jobs++;
|
||||
get_local_pending_range_calculator_service().run();
|
||||
}
|
||||
|
||||
future<> pending_range_calculator_service::block_until_finished() {
|
||||
// We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
|
||||
return smp::submit_to(0, [] {
|
||||
return do_until(
|
||||
[] { return !(get_local_pending_range_calculator_service()._update_jobs > 0); },
|
||||
[] { return sleep(std::chrono::milliseconds(100)); });
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,70 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "database.hh"
|
||||
#include <seastar/core/sharded.hh>
|
||||
|
||||
namespace service {
|
||||
|
||||
class pending_range_calculator_service {
|
||||
private:
|
||||
int _update_jobs{0};
|
||||
distributed<database>& _db;
|
||||
void calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name);
|
||||
void run();
|
||||
public:
|
||||
pending_range_calculator_service(distributed<database>& db) : _db(db) {}
|
||||
void do_update();
|
||||
future<> update();
|
||||
future<> block_until_finished();
|
||||
future<> stop();
|
||||
};
|
||||
|
||||
extern distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
|
||||
|
||||
inline distributed<pending_range_calculator_service>& get_pending_range_calculator_service() {
|
||||
return _the_pending_range_calculator_service;
|
||||
}
|
||||
|
||||
inline pending_range_calculator_service& get_local_pending_range_calculator_service() {
|
||||
return _the_pending_range_calculator_service.local();
|
||||
}
|
||||
|
||||
} // service
|
||||
@@ -206,12 +206,20 @@ class datacenter_write_response_handler : public abstract_write_response_handler
|
||||
}
|
||||
}
|
||||
public:
|
||||
using abstract_write_response_handler::abstract_write_response_handler;
|
||||
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
|
||||
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
|
||||
std::move(targets), boost::range::count_if(pending_endpoints, db::is_local), std::move(dead_endpoints)) {}
|
||||
};
|
||||
|
||||
class write_response_handler : public abstract_write_response_handler {
|
||||
public:
|
||||
using abstract_write_response_handler::abstract_write_response_handler;
|
||||
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
|
||||
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
|
||||
std::move(targets), pending_endpoints.size(), std::move(dead_endpoints)) {}
|
||||
};
|
||||
|
||||
class datacenter_sync_write_response_handler : public abstract_write_response_handler {
|
||||
@@ -229,16 +237,20 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
|
||||
public:
|
||||
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s,
|
||||
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, size_t pending_endpoints,
|
||||
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address> pending_endpoints,
|
||||
std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, pending_endpoints, dead_endpoints) {
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, 0, dead_endpoints) {
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
|
||||
for (auto& target : targets) {
|
||||
auto dc = snitch_ptr->get_datacenter(target);
|
||||
|
||||
if (_dc_responses.find(dc) == _dc_responses.end()) {
|
||||
_dc_responses.emplace(dc, db::local_quorum_for(ks, dc));
|
||||
auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (gms::inet_address& ep){
|
||||
return snitch_ptr->get_datacenter(ep) == dc;
|
||||
});
|
||||
_dc_responses.emplace(dc, db::local_quorum_for(ks, dc) + pending_for_dc).first;
|
||||
_pending_endpoints += pending_for_dc;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -317,24 +329,21 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(sch
|
||||
{
|
||||
std::unique_ptr<abstract_write_response_handler> h;
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
size_t pending_count = pending_endpoints.size();
|
||||
|
||||
auto m = make_lw_shared<const frozen_mutation>(std::move(mutation));
|
||||
|
||||
if (db::is_datacenter_local(cl)) {
|
||||
pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local);
|
||||
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
|
||||
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
|
||||
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
|
||||
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
} else {
|
||||
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
|
||||
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
}
|
||||
return register_response_handler(std::move(h));
|
||||
}
|
||||
|
||||
storage_proxy::~storage_proxy() {}
|
||||
storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
|
||||
init_messaging_service();
|
||||
_collectd_registrations = std::make_unique<scollectd::registrations>(scollectd::registrations({
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("storage_proxy"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
@@ -1390,11 +1399,6 @@ future<> storage_proxy::schedule_repair(std::unordered_map<gms::inet_address, st
|
||||
}).finally([p = shared_from_this()] {});
|
||||
}
|
||||
|
||||
class digest_mismatch_exception : public std::runtime_error {
|
||||
public:
|
||||
digest_mismatch_exception() : std::runtime_error("Digest mismatch") {}
|
||||
};
|
||||
|
||||
class abstract_read_resolver {
|
||||
protected:
|
||||
db::consistency_level _cl;
|
||||
@@ -1442,37 +1446,31 @@ public:
|
||||
class digest_read_resolver : public abstract_read_resolver {
|
||||
size_t _block_for;
|
||||
size_t _cl_responses = 0;
|
||||
promise<> _cl_promise; // cl is reached
|
||||
promise<foreign_ptr<lw_shared_ptr<query::result>>, bool> _cl_promise; // cl is reached
|
||||
bool _cl_reported = false;
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _data_results;
|
||||
foreign_ptr<lw_shared_ptr<query::result>> _data_result;
|
||||
std::vector<query::result_digest> _digest_results;
|
||||
|
||||
virtual void on_timeout() override {
|
||||
if (_cl_responses < _block_for) {
|
||||
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_results.size() != 0));
|
||||
if (!_cl_reported) {
|
||||
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_result));
|
||||
}
|
||||
// we will not need them any more
|
||||
_data_results.clear();
|
||||
_data_result = foreign_ptr<lw_shared_ptr<query::result>>();
|
||||
_digest_results.clear();
|
||||
}
|
||||
virtual size_t response_count() const override {
|
||||
return _digest_results.size();
|
||||
}
|
||||
bool digests_match() const {
|
||||
assert(response_count());
|
||||
if (response_count() == 1) {
|
||||
return true;
|
||||
}
|
||||
auto& first = *_digest_results.begin();
|
||||
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
|
||||
}
|
||||
public:
|
||||
digest_read_resolver(db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(cl, 0, timeout), _block_for(block_for) {}
|
||||
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
|
||||
if (!_timedout) {
|
||||
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
|
||||
_digest_results.emplace_back(_targets_count == 1 ? query::result_digest() : result->digest());
|
||||
_data_results.emplace_back(std::move(result));
|
||||
if (!_data_result) {
|
||||
_data_result = std::move(result);
|
||||
}
|
||||
got_response(from);
|
||||
}
|
||||
}
|
||||
@@ -1482,12 +1480,13 @@ public:
|
||||
got_response(from);
|
||||
}
|
||||
}
|
||||
foreign_ptr<lw_shared_ptr<query::result>> resolve() {
|
||||
assert(_data_results.size());
|
||||
if (!digests_match()) {
|
||||
throw digest_mismatch_exception();
|
||||
bool digests_match() const {
|
||||
assert(response_count());
|
||||
if (response_count() == 1) {
|
||||
return true;
|
||||
}
|
||||
return std::move(*_data_results.begin());
|
||||
auto& first = *_digest_results.begin();
|
||||
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
|
||||
}
|
||||
bool waiting_for(gms::inet_address ep) {
|
||||
return db::is_datacenter_local(_cl) ? is_me(ep) || db::is_local(ep) : true;
|
||||
@@ -1497,9 +1496,9 @@ public:
|
||||
if (waiting_for(ep)) {
|
||||
_cl_responses++;
|
||||
}
|
||||
if (_cl_responses >= _block_for && _data_results.size()) {
|
||||
if (_cl_responses >= _block_for && _data_result) {
|
||||
_cl_reported = true;
|
||||
_cl_promise.set_value();
|
||||
_cl_promise.set_value(std::move(_data_result), digests_match());
|
||||
}
|
||||
}
|
||||
if (is_completed()) {
|
||||
@@ -1507,11 +1506,11 @@ public:
|
||||
_done_promise.set_value();
|
||||
}
|
||||
}
|
||||
future<> has_cl() {
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>, bool> has_cl() {
|
||||
return _cl_promise.get_future();
|
||||
}
|
||||
bool has_data() {
|
||||
return _data_results.size() != 0;
|
||||
return _data_result;
|
||||
}
|
||||
void add_wait_targets(size_t targets_count) {
|
||||
_targets_count += targets_count;
|
||||
@@ -1803,37 +1802,41 @@ public:
|
||||
// hold on to executor until all queries are complete
|
||||
});
|
||||
|
||||
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<> f) {
|
||||
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<foreign_ptr<lw_shared_ptr<query::result>>, bool> f) {
|
||||
try {
|
||||
exec->got_cl();
|
||||
f.get();
|
||||
exec->_result_promise.set_value(digest_resolver->resolve()); // can throw digest missmatch exception
|
||||
auto done = digest_resolver->done();
|
||||
if (exec->_block_for < exec->_targets.size()) { // if there are more targets then needed for cl, check digest in background
|
||||
exec->_proxy->_stats.background_reads++;
|
||||
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
|
||||
try {
|
||||
f.get();
|
||||
digest_resolver->resolve();
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
} catch(digest_mismatch_exception& ex) {
|
||||
exec->_proxy->_stats.read_repair_repaired_background++;
|
||||
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_result_promise.get_future().then_wrapped([exec] (auto f) {
|
||||
f.ignore_ready_future(); // ignore any failures during background repair
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
});
|
||||
} catch(...) {
|
||||
// ignore all exception besides digest mismatch during background check
|
||||
}
|
||||
});
|
||||
} else {
|
||||
done.discard_result(); // no need for background check, discard done future explicitly
|
||||
|
||||
foreign_ptr<lw_shared_ptr<query::result>> result;
|
||||
bool digests_match;
|
||||
std::tie(result, digests_match) = f.get(); // can throw
|
||||
|
||||
if (digests_match) {
|
||||
exec->_result_promise.set_value(std::move(result));
|
||||
auto done = digest_resolver->done();
|
||||
if (exec->_block_for < exec->_targets.size()) { // if there are more targets then needed for cl, check digest in background
|
||||
exec->_proxy->_stats.background_reads++;
|
||||
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
|
||||
if (f.failed()) {
|
||||
f.ignore_ready_future(); // ignore all exception besides digest mismatch during background check
|
||||
} else {
|
||||
if (!digest_resolver->digests_match()) {
|
||||
exec->_proxy->_stats.read_repair_repaired_background++;
|
||||
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_result_promise.get_future().then_wrapped([exec] (future<foreign_ptr<lw_shared_ptr<query::result>>> f) {
|
||||
f.ignore_ready_future(); // ignore any failures during background repair
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
});
|
||||
} else {
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} else { // digest missmatch
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_proxy->_stats.read_repair_repaired_blocking++;
|
||||
}
|
||||
} catch (digest_mismatch_exception& ex) {
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_proxy->_stats.read_repair_repaired_blocking++;
|
||||
} catch (read_timeout_exception& ex) {
|
||||
exec->_result_promise.set_exception(ex);
|
||||
}
|
||||
@@ -2677,24 +2680,6 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
|
||||
|
||||
void storage_proxy::init_messaging_service() {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.register_definitions_update([] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
|
||||
auto src = net::messaging_service::get_source(cinfo);
|
||||
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
}).then_wrapped([src] (auto&& f) {
|
||||
if (f.failed()) {
|
||||
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
|
||||
} else {
|
||||
logger.debug("Applied definitions update from {}.", src);
|
||||
}
|
||||
});
|
||||
return net::messaging_service::no_wait();
|
||||
});
|
||||
ms.register_migration_request([] () {
|
||||
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
|
||||
// keep local proxy alive
|
||||
});
|
||||
});
|
||||
ms.register_mutation([] (const rpc::client_info& cinfo, frozen_mutation in, std::vector<gms::inet_address> forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id) {
|
||||
return do_with(std::move(in), get_local_shared_storage_proxy(), [&cinfo, forward = std::move(forward), reply_to, shard, response_id] (const frozen_mutation& m, shared_ptr<storage_proxy>& p) {
|
||||
return when_all(
|
||||
@@ -2777,8 +2762,6 @@ void storage_proxy::init_messaging_service() {
|
||||
|
||||
void storage_proxy::uninit_messaging_service() {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
ms.unregister_definitions_update();
|
||||
ms.unregister_migration_request();
|
||||
ms.unregister_mutation();
|
||||
ms.unregister_mutation_done();
|
||||
ms.unregister_read_data();
|
||||
|
||||
@@ -121,7 +121,6 @@ private:
|
||||
std::uniform_real_distribution<> _read_repair_chance = std::uniform_real_distribution<>(0,1);
|
||||
std::unique_ptr<scollectd::registrations> _collectd_registrations;
|
||||
private:
|
||||
void init_messaging_service();
|
||||
void uninit_messaging_service();
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges, db::consistency_level cl);
|
||||
response_id_type register_response_handler(std::unique_ptr<abstract_write_response_handler>&& h);
|
||||
@@ -173,6 +172,8 @@ public:
|
||||
return _db;
|
||||
}
|
||||
|
||||
void init_messaging_service();
|
||||
|
||||
future<> mutate_locally(const mutation& m);
|
||||
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m);
|
||||
future<> mutate_locally(std::vector<mutation> mutations);
|
||||
|
||||
@@ -54,7 +54,6 @@
|
||||
#include "locator/local_strategy.hh"
|
||||
#include "version.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "streaming/stream_plan.hh"
|
||||
#include "streaming/stream_state.hh"
|
||||
#include "dht/range_streamer.hh"
|
||||
@@ -265,7 +264,7 @@ void storage_service::join_token_ring(int delay) {
|
||||
}
|
||||
set_mode(mode::JOINING, "schema complete, ready to bootstrap", true);
|
||||
set_mode(mode::JOINING, "waiting for pending range calculation", true);
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
block_until_update_pending_ranges_finished().get();
|
||||
set_mode(mode::JOINING, "calculation complete, ready to bootstrap", true);
|
||||
logger.debug("... got ring + schema info");
|
||||
|
||||
@@ -292,7 +291,7 @@ void storage_service::join_token_ring(int delay) {
|
||||
set_mode(mode::JOINING, "waiting for schema information to complete", true);
|
||||
sleep(std::chrono::seconds(1)).get();
|
||||
}
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
block_until_update_pending_ranges_finished().get();
|
||||
}
|
||||
logger.info("Checking bootstrapping/leaving/moving nodes: ok");
|
||||
|
||||
@@ -463,7 +462,7 @@ void storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
}
|
||||
|
||||
_token_metadata.add_bootstrap_tokens(tokens, endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
if (gossiper.uses_host_id(endpoint)) {
|
||||
@@ -561,7 +560,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
|
||||
// a race where natural endpoint was updated to contain node A, but A was
|
||||
// not yet removed from pending endpoints
|
||||
_token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint);
|
||||
get_local_pending_range_calculator_service().do_update();
|
||||
do_update_pending_ranges();
|
||||
|
||||
for (auto ep : endpoints_to_remove) {
|
||||
remove_endpoint(ep);
|
||||
@@ -608,7 +607,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
|
||||
}).get();
|
||||
}
|
||||
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
auto ver = _token_metadata.get_ring_version();
|
||||
for (auto& x : _token_metadata.get_token_to_endpoint()) {
|
||||
@@ -643,7 +642,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) {
|
||||
// at this point the endpoint is certainly a member with this token, so let's proceed
|
||||
// normally
|
||||
_token_metadata.add_leaving_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
@@ -660,7 +659,7 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector<sst
|
||||
auto token = dht::global_partitioner().from_sstring(pieces[1]);
|
||||
logger.debug("Node {} state moving, new token {}", endpoint, token);
|
||||
_token_metadata.add_moving_endpoint(token, endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
@@ -687,7 +686,7 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector<s
|
||||
logger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
|
||||
// Note that the endpoint is being removed
|
||||
_token_metadata.add_leaving_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
// find the endpoint coordinating this removal that we need to notify when we're done
|
||||
auto state = gossiper.get_endpoint_state_for_endpoint(endpoint);
|
||||
assert(state);
|
||||
@@ -790,7 +789,7 @@ void storage_service::on_change(inet_address endpoint, application_state state,
|
||||
void storage_service::on_remove(gms::inet_address endpoint) {
|
||||
logger.debug("endpoint={} on_remove", endpoint);
|
||||
_token_metadata.remove_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
@@ -1665,7 +1664,7 @@ future<> storage_service::decommission() {
|
||||
throw std::runtime_error(sprint("Node in %s state; wait for status to become normal or restart", ss._operation_mode));
|
||||
}
|
||||
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
ss.update_pending_ranges().get();
|
||||
|
||||
auto non_system_keyspaces = db.get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
@@ -1764,7 +1763,7 @@ future<> storage_service::remove_node(sstring host_id_string) {
|
||||
}
|
||||
ss._removing_node = endpoint;
|
||||
tm.add_leaving_endpoint(endpoint);
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
ss.update_pending_ranges().get();
|
||||
|
||||
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
|
||||
// we add our own token so other nodes to let us know when they're done
|
||||
@@ -1968,7 +1967,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
|
||||
auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758
|
||||
for (auto& r : ranges) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), metadata);
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, metadata);
|
||||
current_replica_endpoints.emplace(r, std::move(eps));
|
||||
}
|
||||
|
||||
@@ -1989,7 +1989,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
|
||||
// range.
|
||||
for (auto& r : ranges) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), temp);
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp);
|
||||
|
||||
auto rg = current_replica_endpoints.equal_range(r);
|
||||
for (auto it = rg.first; it != rg.second; it++) {
|
||||
@@ -2133,7 +2134,7 @@ void storage_service::excise(std::unordered_set<token> tokens, inet_address endp
|
||||
}
|
||||
}).get();
|
||||
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
}
|
||||
|
||||
void storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, int64_t expire_time) {
|
||||
@@ -2182,7 +2183,7 @@ future<> storage_service::confirm_replication(inet_address node) {
|
||||
void storage_service::leave_ring() {
|
||||
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get();
|
||||
_token_metadata.remove_endpoint(get_broadcast_address());
|
||||
get_local_pending_range_calculator_service().update().get();
|
||||
update_pending_ranges().get();
|
||||
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
auto expire_time = gossiper.compute_expire_time().time_since_epoch().count();
|
||||
@@ -2281,7 +2282,7 @@ future<> storage_service::start_leaving() {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens())).then([this] {
|
||||
_token_metadata.add_leaving_endpoint(get_broadcast_address());
|
||||
return get_local_pending_range_calculator_service().update();
|
||||
return update_pending_ranges();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2503,8 +2504,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
|
||||
if (r.contains(to_fetch, dht::token_comparator())) {
|
||||
std::vector<inet_address> endpoints;
|
||||
if (dht::range_streamer::use_strict_consistency()) {
|
||||
auto end_token = to_fetch.end() ? to_fetch.end()->value() : dht::maximum_token();
|
||||
std::vector<inet_address> old_endpoints = eps;
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_fetch.end()->value(), token_meta_clone_all_settled);
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
|
||||
|
||||
//Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
|
||||
//So we need to be careful to only be strict when endpoints == RF
|
||||
@@ -2563,8 +2565,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
|
||||
std::unordered_multimap<inet_address, range<token>> endpoint_ranges;
|
||||
std::unordered_map<inet_address, std::vector<range<token>>> endpoint_ranges_map;
|
||||
for (range<token> to_stream : ranges_per_keyspace.first) {
|
||||
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone);
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone_all_settled);
|
||||
auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token();
|
||||
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone);
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
|
||||
logger.debug("Range: {} Current endpoints: {} New endpoints: {}", to_stream, current_endpoints, new_endpoints);
|
||||
std::sort(current_endpoints.begin(), current_endpoints.end());
|
||||
std::sort(new_endpoints.begin(), new_endpoints.end());
|
||||
@@ -2631,7 +2634,7 @@ future<> storage_service::move(token new_token) {
|
||||
|
||||
auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces();
|
||||
|
||||
get_local_pending_range_calculator_service().block_until_finished().get();
|
||||
ss.block_until_update_pending_ranges_finished().get();
|
||||
|
||||
// checking if data is moving to this node
|
||||
for (auto keyspace_name : keyspaces_to_process) {
|
||||
@@ -2676,5 +2679,39 @@ std::chrono::milliseconds storage_service::get_ring_delay() {
|
||||
return std::chrono::milliseconds(ring_delay);
|
||||
}
|
||||
|
||||
void storage_service::do_update_pending_ranges() {
|
||||
if (engine().cpu_id() != 0) {
|
||||
throw std::runtime_error("do_update_pending_ranges should be called on cpu zero");
|
||||
}
|
||||
// long start = System.currentTimeMillis();
|
||||
auto keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto& strategy = ks.get_replication_strategy();
|
||||
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
|
||||
}
|
||||
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
|
||||
}
|
||||
|
||||
future<> storage_service::update_pending_ranges() {
|
||||
return get_storage_service().invoke_on(0, [] (auto& ss){
|
||||
ss._update_jobs++;
|
||||
ss.do_update_pending_ranges();
|
||||
// calculate_pending_ranges will modify token_metadata, we need to repliate to other cores
|
||||
return ss.replicate_to_all_cores().finally([&ss, ss0 = ss.shared_from_this()] {
|
||||
ss._update_jobs--;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::block_until_update_pending_ranges_finished() {
|
||||
// We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
|
||||
return smp::submit_to(0, [] {
|
||||
return do_until(
|
||||
[] { return !(get_local_storage_service()._update_jobs > 0); },
|
||||
[] { return sleep(std::chrono::milliseconds(100)); });
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace service
|
||||
|
||||
|
||||
@@ -108,6 +108,7 @@ private:
|
||||
private final AtomicLong notificationSerialNumber = new AtomicLong();
|
||||
#endif
|
||||
distributed<database>& _db;
|
||||
int _update_jobs{0};
|
||||
// Note that this is obviously only valid for the current shard. Users of
|
||||
// this facility should elect a shard to be the coordinator based on any
|
||||
// given objective criteria
|
||||
@@ -127,6 +128,10 @@ public:
|
||||
// Needed by distributed<>
|
||||
future<> stop();
|
||||
|
||||
void do_update_pending_ranges();
|
||||
future<> update_pending_ranges();
|
||||
future<> block_until_update_pending_ranges_finished();
|
||||
|
||||
const locator::token_metadata& get_token_metadata() const {
|
||||
return _token_metadata;
|
||||
}
|
||||
|
||||
@@ -52,7 +52,14 @@ namespace sstables {
|
||||
|
||||
logging::logger sstlog("sstable");
|
||||
|
||||
thread_local std::unordered_map<sstring, unsigned> sstable::_shards_agreeing_to_remove_sstable;
|
||||
future<file> new_sstable_component_file(sstring name, open_flags flags) {
|
||||
return open_file_dma(name, flags).handle_exception([name] (auto ep) {
|
||||
sstlog.error("Could not create SSTable component {}. Found exception: {}", name, ep);
|
||||
return make_exception_future<file>(ep);
|
||||
});
|
||||
}
|
||||
|
||||
thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> sstable::_shards_agreeing_to_remove_sstable;
|
||||
|
||||
static utils::phased_barrier& background_jobs() {
|
||||
static thread_local utils::phased_barrier gate;
|
||||
@@ -749,7 +756,19 @@ void sstable::write_toc(const io_priority_class& pc) {
|
||||
sstlog.debug("Writing TOC file {} ", file_path);
|
||||
|
||||
// Writing TOC content to temporary file.
|
||||
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
// If creation of temporary TOC failed, it implies that that boot failed to
|
||||
// delete a sstable with temporary for this column family, or there is a
|
||||
// sstable being created in parallel with the same generation.
|
||||
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
|
||||
|
||||
bool toc_exists = file_exists(filename(sstable::component_type::TOC)).get0();
|
||||
if (toc_exists) {
|
||||
// TOC will exist at this point if write_components() was called with
|
||||
// the generation of a sstable that exists.
|
||||
f.close().get();
|
||||
remove_file(file_path).get();
|
||||
throw std::runtime_error(sprint("SSTable write failed due to existence of TOC file for generation %ld of %s.%s", _generation, _ks, _cf));
|
||||
}
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
@@ -792,7 +811,7 @@ void write_crc(const sstring file_path, checksum& c) {
|
||||
sstlog.debug("Writing CRC file {} ", file_path);
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
file f = open_file_dma(file_path, oflags).get0();
|
||||
file f = new_sstable_component_file(file_path, oflags).get0();
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
@@ -806,7 +825,7 @@ void write_digest(const sstring file_path, uint32_t full_checksum) {
|
||||
sstlog.debug("Writing Digest file {} ", file_path);
|
||||
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
auto f = open_file_dma(file_path, oflags).get0();
|
||||
auto f = new_sstable_component_file(file_path, oflags).get0();
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
@@ -877,7 +896,7 @@ template <sstable::component_type Type, typename T>
|
||||
void sstable::write_simple(T& component, const io_priority_class& pc) {
|
||||
auto file_path = filename(Type);
|
||||
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
|
||||
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
|
||||
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
@@ -938,8 +957,8 @@ future<> sstable::open_data() {
|
||||
|
||||
future<> sstable::create_data() {
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
return when_all(open_file_dma(filename(component_type::Index), oflags),
|
||||
open_file_dma(filename(component_type::Data), oflags)).then([this] (auto files) {
|
||||
return when_all(new_sstable_component_file(filename(component_type::Index), oflags),
|
||||
new_sstable_component_file(filename(component_type::Data), oflags)).then([this] (auto files) {
|
||||
// FIXME: If both files could not be created, the first get below will
|
||||
// throw an exception, and second get() will not be attempted, and
|
||||
// we'll get a warning about the second future being destructed
|
||||
@@ -1715,9 +1734,11 @@ sstable::shared_remove_by_toc_name(sstring toc_name, bool shared) {
|
||||
return remove_by_toc_name(toc_name);
|
||||
} else {
|
||||
auto shard = std::hash<sstring>()(toc_name) % smp::count;
|
||||
return smp::submit_to(shard, [toc_name] {
|
||||
auto& counter = _shards_agreeing_to_remove_sstable[toc_name];
|
||||
if (++counter == smp::count) {
|
||||
return smp::submit_to(shard, [toc_name, src_shard = engine().cpu_id()] {
|
||||
auto& remove_set = _shards_agreeing_to_remove_sstable[toc_name];
|
||||
remove_set.insert(src_shard);
|
||||
auto counter = remove_set.size();
|
||||
if (counter == smp::count) {
|
||||
_shards_agreeing_to_remove_sstable.erase(toc_name);
|
||||
return remove_by_toc_name(toc_name);
|
||||
} else {
|
||||
|
||||
@@ -343,7 +343,7 @@ private:
|
||||
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
|
||||
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
|
||||
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
|
||||
static thread_local std::unordered_map<sstring, unsigned> _shards_agreeing_to_remove_sstable;
|
||||
static thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> _shards_agreeing_to_remove_sstable;
|
||||
|
||||
std::unordered_set<component_type, enum_hash<component_type>> _components;
|
||||
|
||||
|
||||
@@ -41,19 +41,14 @@
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
|
||||
// TODO : remove once shutdown is ok.
|
||||
// Broke these test when doing horror patch for #293
|
||||
// Simpler to copy the code from init.cc than trying to do clever parameterization
|
||||
// and whatnot.
|
||||
static future<> tst_init_storage_service(distributed<database>& db) {
|
||||
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
|
||||
engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
|
||||
}).then([&db] {
|
||||
return service::init_storage_service(db).then([] {
|
||||
engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
return service::init_storage_service(db).then([] {
|
||||
engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -340,7 +335,6 @@ public:
|
||||
_db->stop().get();
|
||||
|
||||
service::get_storage_service().stop().get();
|
||||
service::get_pending_range_calculator_service().stop().get();
|
||||
|
||||
locator::i_endpoint_snitch::stop_snitch().get();
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "core/reactor.hh"
|
||||
#include "service/pending_range_calculator_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "database.hh"
|
||||
@@ -39,7 +38,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
distributed<database> db;
|
||||
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
|
||||
service::get_pending_range_calculator_service().start(std::ref(db));
|
||||
service::get_storage_service().start(std::ref(db)).get();
|
||||
db.start().get();
|
||||
net::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
|
||||
@@ -51,7 +49,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
net::get_messaging_service().stop().get();
|
||||
db.stop().get();
|
||||
service::get_storage_service().stop().get();
|
||||
service::get_pending_range_calculator_service().stop().get();
|
||||
locator::i_endpoint_snitch::stop_snitch().get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -68,6 +68,7 @@ future<>
|
||||
with_column_family(schema_ptr s, column_family::config cfg, Func func) {
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
return func(*cf).then([cf, cm] {
|
||||
return cf->stop();
|
||||
}).finally([cf, cm] {});
|
||||
@@ -404,9 +405,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
cfg.cf_stats = &*cf_stats;
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
return do_with(make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm), [s, cm] (auto& cf_ptr) mutable {
|
||||
column_family& cf = *cf_ptr;
|
||||
return with_column_family(s, cfg, [s] (auto& cf) mutable {
|
||||
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;
|
||||
|
||||
const column_definition& r1_col = *s->get_column_definition("r1");
|
||||
|
||||
@@ -987,6 +987,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
|
||||
cfg.enable_incremental_backups = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
|
||||
cf->start();
|
||||
cf->mark_ready_for_writes();
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
|
||||
auto generations = make_lw_shared<std::vector<unsigned long>>({1, 2, 3, 4});
|
||||
@@ -1019,7 +1020,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
|
||||
// were compacted.
|
||||
|
||||
BOOST_REQUIRE(cf->sstables_count() == generations->size());
|
||||
cm->submit(&*cf);
|
||||
cf->trigger_compaction();
|
||||
BOOST_REQUIRE(cm->get_stats().pending_tasks == 1);
|
||||
|
||||
// wait for submitted job to finish.
|
||||
@@ -1063,6 +1064,7 @@ SEASTAR_TEST_CASE(compact) {
|
||||
auto s = builder.build();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
return open_sstables("tests/sstables/compaction", {1,2,3}).then([s = std::move(s), cf, cm, generation] (auto sstables) {
|
||||
return test_setup::do_with_test_directory([sstables, s, generation, cf, cm] {
|
||||
@@ -1161,6 +1163,7 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
|
||||
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type));
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto generations = make_lw_shared<std::vector<unsigned long>>(std::move(generations_to_compact));
|
||||
auto sstables = make_lw_shared<std::vector<sstables::shared_sstable>>();
|
||||
@@ -1670,6 +1673,7 @@ SEASTAR_TEST_CASE(leveled_01) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -1714,6 +1718,7 @@ SEASTAR_TEST_CASE(leveled_02) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -1768,6 +1773,7 @@ SEASTAR_TEST_CASE(leveled_03) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -1826,6 +1832,7 @@ SEASTAR_TEST_CASE(leveled_04) {
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(50);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
@@ -2159,6 +2166,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
|
||||
}).then([s, tmp, sstables] {
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
auto create = [tmp] {
|
||||
return make_lw_shared<sstable>("ks", "cf", tmp->path, 3, la, big);
|
||||
};
|
||||
@@ -2259,6 +2267,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
|
||||
};
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
|
||||
cf->mark_ready_for_writes();
|
||||
std::vector<shared_sstable> sstables;
|
||||
sstables.push_back(std::move(sstp));
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include "database.hh"
|
||||
#include <memory>
|
||||
#include "sstable_test.hh"
|
||||
#include "tmpdir.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
@@ -169,10 +170,11 @@ SEASTAR_TEST_CASE(big_summary_query_32) {
|
||||
return summary_query<32, 0xc4000, 182>("tests/sstables/bigsummary", 76);
|
||||
}
|
||||
|
||||
static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
|
||||
auto sst = make_lw_shared<sstable>("ks", "cf", dir, generation, la, big);
|
||||
return sst->load().then([sst, generation] {
|
||||
static future<sstable_ptr> do_write_sst(sstring load_dir, sstring write_dir, unsigned long generation) {
|
||||
auto sst = make_lw_shared<sstable>("ks", "cf", load_dir, generation, la, big);
|
||||
return sst->load().then([sst, write_dir, generation] {
|
||||
sstables::test(sst).change_generation_number(generation + 1);
|
||||
sstables::test(sst).change_dir(write_dir);
|
||||
auto fut = sstables::test(sst).store();
|
||||
return std::move(fut).then([sst = std::move(sst)] {
|
||||
return make_ready_future<sstable_ptr>(std::move(sst));
|
||||
@@ -180,8 +182,8 @@ static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
|
||||
});
|
||||
}
|
||||
|
||||
static future<> write_sst_info(sstring dir, unsigned long generation) {
|
||||
return do_write_sst(dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
|
||||
static future<> write_sst_info(sstring load_dir, sstring write_dir, unsigned long generation) {
|
||||
return do_write_sst(load_dir, write_dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
|
||||
}
|
||||
|
||||
using bufptr_t = std::unique_ptr<char [], free_deleter>;
|
||||
@@ -223,11 +225,12 @@ static future<> compare_files(sstdesc file1, sstdesc file2, sstable::component_t
|
||||
}
|
||||
|
||||
static future<> check_component_integrity(sstable::component_type component) {
|
||||
return write_sst_info("tests/sstables/compressed", 1).then([component] {
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return write_sst_info("tests/sstables/compressed", tmp->path, 1).then([component, tmp] {
|
||||
return compare_files(sstdesc{"tests/sstables/compressed", 1 },
|
||||
sstdesc{"tests/sstables/compressed", 2 },
|
||||
sstdesc{tmp->path, 2 },
|
||||
component);
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_compressed_info_func) {
|
||||
@@ -235,8 +238,9 @@ SEASTAR_TEST_CASE(check_compressed_info_func) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_summary_func) {
|
||||
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
|
||||
return sstables::test(sst2).read_summary().then([sst1, sst2] {
|
||||
summary& sst1_s = sstables::test(sst1).get_summary();
|
||||
summary& sst2_s = sstables::test(sst2).get_summary();
|
||||
@@ -247,7 +251,7 @@ SEASTAR_TEST_CASE(check_summary_func) {
|
||||
BOOST_REQUIRE(sst1_s.first_key.value == sst2_s.first_key.value);
|
||||
BOOST_REQUIRE(sst1_s.last_key.value == sst2_s.last_key.value);
|
||||
});
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_filter_func) {
|
||||
@@ -255,8 +259,9 @@ SEASTAR_TEST_CASE(check_filter_func) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_statistics_func) {
|
||||
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
|
||||
return sstables::test(sst2).read_statistics().then([sst1, sst2] {
|
||||
statistics& sst1_s = sstables::test(sst1).get_statistics();
|
||||
statistics& sst2_s = sstables::test(sst2).get_statistics();
|
||||
@@ -271,19 +276,20 @@ SEASTAR_TEST_CASE(check_statistics_func) {
|
||||
});
|
||||
// TODO: compare the field contents from both sstables.
|
||||
});
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(check_toc_func) {
|
||||
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
|
||||
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
|
||||
return sstables::test(sst2).read_toc().then([sst1, sst2] {
|
||||
auto& sst1_c = sstables::test(sst1).get_components();
|
||||
auto& sst2_c = sstables::test(sst2).get_components();
|
||||
|
||||
BOOST_REQUIRE(sst1_c == sst2_c);
|
||||
});
|
||||
});
|
||||
}).then([tmp] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(uncompressed_random_access_read) {
|
||||
@@ -857,6 +863,7 @@ SEASTAR_TEST_CASE(reshuffle) {
|
||||
cfg.enable_incremental_backups = false;
|
||||
auto cf = make_lw_shared<column_family>(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm);
|
||||
cf->start();
|
||||
cf->mark_ready_for_writes();
|
||||
return cf->reshuffle_sstables(3).then([cm, cf] (std::vector<sstables::entry_descriptor> reshuffled) {
|
||||
BOOST_REQUIRE(reshuffled.size() == 2);
|
||||
BOOST_REQUIRE(reshuffled[0].generation == 3);
|
||||
|
||||
@@ -100,6 +100,10 @@ public:
|
||||
_sst->_generation = generation;
|
||||
}
|
||||
|
||||
void change_dir(sstring dir) {
|
||||
_sst->_dir = dir;
|
||||
}
|
||||
|
||||
future<> store() {
|
||||
_sst->_components.erase(sstable::component_type::Index);
|
||||
_sst->_components.erase(sstable::component_type::Data);
|
||||
|
||||
@@ -135,6 +135,8 @@ void test_timestamp_like_string_conversions(data_type timestamp_type) {
|
||||
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-03T12:30:00+1230"), timestamp_type->decompose(tp)));
|
||||
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-02T23:00-0100"), timestamp_type->decompose(tp)));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00");
|
||||
|
||||
auto now = time(nullptr);
|
||||
auto local_now = *localtime(&now);
|
||||
char buf[100];
|
||||
|
||||
23
types.cc
23
types.cc
@@ -40,6 +40,14 @@
|
||||
#include <boost/multiprecision/cpp_int.hpp>
|
||||
#include "utils/big_decimal.hh"
|
||||
|
||||
template<typename T>
|
||||
sstring time_point_to_string(const T& tp)
|
||||
{
|
||||
auto timestamp = tp.time_since_epoch().count();
|
||||
auto time = boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(timestamp);
|
||||
return boost::posix_time::to_iso_extended_string(time);
|
||||
}
|
||||
|
||||
static const char* int32_type_name = "org.apache.cassandra.db.marshal.Int32Type";
|
||||
static const char* long_type_name = "org.apache.cassandra.db.marshal.LongType";
|
||||
static const char* ascii_type_name = "org.apache.cassandra.db.marshal.AsciiType";
|
||||
@@ -421,7 +429,11 @@ public:
|
||||
}
|
||||
virtual bytes from_string(sstring_view s) const override;
|
||||
virtual sstring to_string(const bytes& b) const override {
|
||||
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
|
||||
auto v = deserialize(b);
|
||||
if (v.is_null()) {
|
||||
return "";
|
||||
}
|
||||
return time_point_to_string(from_value(v).get());
|
||||
}
|
||||
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
|
||||
return cql3::cql3_type::timestamp;
|
||||
@@ -684,7 +696,11 @@ public:
|
||||
return b;
|
||||
}
|
||||
virtual sstring to_string(const bytes& b) const override {
|
||||
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
|
||||
auto v = deserialize(b);
|
||||
if (v.is_null()) {
|
||||
return "";
|
||||
}
|
||||
return time_point_to_string(from_value(v).get());
|
||||
}
|
||||
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
|
||||
return cql3::cql3_type::timestamp;
|
||||
@@ -1059,7 +1075,8 @@ public:
|
||||
if (!num) {
|
||||
return 1;
|
||||
}
|
||||
return boost::multiprecision::cpp_int::canonical_value(num).size() * sizeof(boost::multiprecision::limb_type) + 1;
|
||||
auto pnum = abs(num);
|
||||
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
|
||||
}
|
||||
virtual int32_t compare(bytes_view v1, bytes_view v2) const override {
|
||||
if (v1.empty()) {
|
||||
|
||||
@@ -1214,6 +1214,7 @@ public:
|
||||
if (_active) {
|
||||
assert(_active->is_empty());
|
||||
free_segment(_active);
|
||||
_active = nullptr;
|
||||
}
|
||||
if (_group) {
|
||||
_group->del(this);
|
||||
|
||||
@@ -50,12 +50,14 @@ namespace validation {
|
||||
*/
|
||||
void
|
||||
validate_cql_key(schema_ptr schema, const partition_key& key) {
|
||||
bytes_view b(key);
|
||||
if (b.empty()) {
|
||||
// C* validates here that the thrift key is not empty.
|
||||
// It can only be empty if it is not composite and its only component in CQL form is empty.
|
||||
if (schema->partition_key_size() == 1 && key.begin(*schema)->empty()) {
|
||||
throw exceptions::invalid_request_exception("Key may not be empty");
|
||||
}
|
||||
|
||||
// check that key can be handled by FBUtilities.writeShortByteArray
|
||||
auto b = key.representation();
|
||||
if (b.size() > max_key_size) {
|
||||
throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", b.size(), max_key_size));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user