Compare commits
15 Commits
next
...
scylla-1.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c384b23112 | ||
|
|
3688542323 | ||
|
|
7916182cfa | ||
|
|
ec1fd3945f | ||
|
|
653e250d04 | ||
|
|
6255076c20 | ||
|
|
420ebe28fd | ||
|
|
a6179476c5 | ||
|
|
342726a23c | ||
|
|
e9946032f4 | ||
|
|
5e0b113732 | ||
|
|
c70faa4f23 | ||
|
|
15ad4c9033 | ||
|
|
d094329b6e | ||
|
|
dcab915f21 |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.2.0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -51,6 +51,9 @@ public:
|
||||
// Return a list of sstables to be compacted after applying the strategy.
|
||||
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
|
||||
|
||||
// Return if parallel compaction is allowed by strategy.
|
||||
bool parallel_compaction() const;
|
||||
|
||||
static sstring name(compaction_strategy_type type) {
|
||||
switch (type) {
|
||||
case compaction_strategy_type::null:
|
||||
|
||||
2
dist/ami/files/scylla-ami
vendored
2
dist/ami/files/scylla-ami
vendored
Submodule dist/ami/files/scylla-ami updated: 72ae2580c1...863cc4598a
4
dist/common/scripts/scylla_io_setup
vendored
4
dist/common/scripts/scylla_io_setup
vendored
@@ -44,8 +44,8 @@ output_to_user()
|
||||
}
|
||||
|
||||
if [ `is_developer_mode` -eq 0 ]; then
|
||||
SMP=`echo $SCYLLA_ARGS|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $SCYLLA_ARGS|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
if [ $AMI_OPT -eq 1 ]; then
|
||||
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
|
||||
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`
|
||||
|
||||
4
main.cc
4
main.cc
@@ -595,10 +595,10 @@ int main(int ac, char** av) {
|
||||
supervisor_notify("serving");
|
||||
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
|
||||
engine().at_exit([] {
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
});
|
||||
engine().at_exit([] {
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
});
|
||||
engine().at_exit([&db] {
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
|
||||
@@ -2058,7 +2058,6 @@ public:
|
||||
auto write_timeout = exec->_proxy->_db.local().get_config().write_request_timeout_in_ms() * 1000;
|
||||
auto delta = __int128_t(digest_resolver->last_modified()) - __int128_t(exec->_cmd->read_timestamp);
|
||||
if (std::abs(delta) <= write_timeout) {
|
||||
print("HERE %d\n", int64_t(delta));
|
||||
exec->_proxy->_stats.global_read_repairs_canceled_due_to_concurrent_write++;
|
||||
// if CL is local and non matching data is modified less then write_timeout ms ago do only local repair
|
||||
auto i = boost::range::remove_if(exec->_targets, std::not1(std::cref(db::is_local)));
|
||||
|
||||
@@ -83,13 +83,17 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
|
||||
return weight;
|
||||
}
|
||||
|
||||
bool compaction_manager::try_to_register_weight(column_family* cf, int weight) {
|
||||
bool compaction_manager::try_to_register_weight(column_family* cf, int weight, bool parallel_compaction) {
|
||||
auto it = _weight_tracker.find(cf);
|
||||
if (it == _weight_tracker.end()) {
|
||||
_weight_tracker.insert({cf, {weight}});
|
||||
return true;
|
||||
}
|
||||
std::unordered_set<int>& s = it->second;
|
||||
// Only one weight is allowed if parallel compaction is disabled.
|
||||
if (!parallel_compaction && !s.empty()) {
|
||||
return false;
|
||||
}
|
||||
// TODO: Maybe allow only *smaller* compactions to start? That can be done
|
||||
// by returning true only if weight is not in the set and is lower than any
|
||||
// entry in the set.
|
||||
@@ -164,8 +168,7 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
|
||||
sstables::compaction_strategy cs = cf.get_compaction_strategy();
|
||||
descriptor = cs.get_sstables_for_compaction(cf, std::move(candidates));
|
||||
weight = trim_to_compact(&cf, descriptor);
|
||||
if (!try_to_register_weight(&cf, weight)) {
|
||||
// Refusing compaction job because of an ongoing compaction with same weight.
|
||||
if (!try_to_register_weight(&cf, weight, cs.parallel_compaction())) {
|
||||
task->stopping = true;
|
||||
_stats.pending_tasks--;
|
||||
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
|
||||
|
||||
@@ -81,9 +81,9 @@ private:
|
||||
// It will not accept new requests in case the manager was stopped.
|
||||
bool can_submit();
|
||||
|
||||
// If weight is not taken for the column family, weight is registered and
|
||||
// true is returned. Return false otherwise.
|
||||
bool try_to_register_weight(column_family* cf, int weight);
|
||||
// Return true if weight is not registered. If parallel_compaction is not
|
||||
// true, only one weight is allowed to be registered.
|
||||
bool try_to_register_weight(column_family* cf, int weight, bool parallel_compaction);
|
||||
// Deregister weight for a column family.
|
||||
void deregister_weight(column_family* cf, int weight);
|
||||
|
||||
|
||||
@@ -56,6 +56,9 @@ public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
virtual bool parallel_compaction() const {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
//
|
||||
@@ -402,6 +405,10 @@ public:
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
virtual bool parallel_compaction() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual compaction_strategy_type type() const {
|
||||
return compaction_strategy_type::leveled;
|
||||
}
|
||||
@@ -439,6 +446,9 @@ compaction_strategy_type compaction_strategy::type() const {
|
||||
compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) {
|
||||
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
|
||||
}
|
||||
bool compaction_strategy::parallel_compaction() const {
|
||||
return _compaction_strategy_impl->parallel_compaction();
|
||||
}
|
||||
|
||||
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
|
||||
::shared_ptr<compaction_strategy_impl> impl;
|
||||
|
||||
@@ -175,10 +175,8 @@ public:
|
||||
|
||||
if (previous != nullptr && current_first.tri_compare(s, previous->get_last_decorated_key(s)) <= 0) {
|
||||
|
||||
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 " \
|
||||
"or due to the fact that you have dropped sstables from another node into the data directory. " \
|
||||
"Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also " \
|
||||
"have rows out-of-order within an sstable",
|
||||
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by the fact that you have dropped " \
|
||||
"sstables from another node into the data directory. Sending back to L0.",
|
||||
level, previous->get_filename(), previous->get_first_partition_key(s), previous->get_last_partition_key(s),
|
||||
current->get_filename(), current->get_first_partition_key(s), current->get_last_partition_key(s));
|
||||
|
||||
|
||||
@@ -85,7 +85,6 @@ struct send_info {
|
||||
};
|
||||
|
||||
future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
@@ -100,26 +99,27 @@ future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
return stop_iteration::no;
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
auto& priority = service::get_local_streaming_read_priority();
|
||||
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return repeat([si, &reader] {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
});
|
||||
}).then([si] {
|
||||
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
|
||||
auto cf_id = this->cf_id;
|
||||
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
|
||||
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
|
||||
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
|
||||
auto cf_id = this->cf_id;
|
||||
|
||||
@@ -196,7 +196,7 @@ inline ihistogram operator +(ihistogram a, const ihistogram& b) {
|
||||
struct rate_moving_average {
|
||||
uint64_t count = 0;
|
||||
double rates[3] = {0};
|
||||
double mean_rate;
|
||||
double mean_rate = 0;
|
||||
rate_moving_average& operator +=(const rate_moving_average& o) {
|
||||
count += o.count;
|
||||
mean_rate += o.mean_rate;
|
||||
|
||||
Reference in New Issue
Block a user