Compare commits
32 Commits
next
...
scylla-1.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c56fc99b7f | ||
|
|
85d33e2ee4 | ||
|
|
ffeef2f072 | ||
|
|
d3ffa00eb2 | ||
|
|
ad50d83302 | ||
|
|
c6a9844dfe | ||
|
|
dececbc0b9 | ||
|
|
f2031bf3db | ||
|
|
da77b8885f | ||
|
|
86434378d1 | ||
|
|
e5d24d5940 | ||
|
|
0a2d4204bd | ||
|
|
74b8f63e8f | ||
|
|
9b764b726b | ||
|
|
07ba03ce7b | ||
|
|
de690a6997 | ||
|
|
7b53e969d2 | ||
|
|
c384b23112 | ||
|
|
3688542323 | ||
|
|
7916182cfa | ||
|
|
ec1fd3945f | ||
|
|
653e250d04 | ||
|
|
6255076c20 | ||
|
|
420ebe28fd | ||
|
|
a6179476c5 | ||
|
|
342726a23c | ||
|
|
e9946032f4 | ||
|
|
5e0b113732 | ||
|
|
c70faa4f23 | ||
|
|
15ad4c9033 | ||
|
|
d094329b6e | ||
|
|
dcab915f21 |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.2.1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -51,6 +51,9 @@ public:
|
||||
// Return a list of sstables to be compacted after applying the strategy.
|
||||
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
|
||||
|
||||
// Return if parallel compaction is allowed by strategy.
|
||||
bool parallel_compaction() const;
|
||||
|
||||
static sstring name(compaction_strategy_type type) {
|
||||
switch (type) {
|
||||
case compaction_strategy_type::null:
|
||||
|
||||
154
database.cc
154
database.cc
@@ -484,12 +484,75 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
|
||||
return (s1 <= me) && (me <= s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
|
||||
auto key_shard = [&s] (const partition_key& pk) {
|
||||
auto token = dht::global_partitioner().get_token(s, pk);
|
||||
return dht::shard_of(token);
|
||||
};
|
||||
auto s1 = key_shard(first);
|
||||
auto s2 = key_shard(last);
|
||||
auto me = engine().cpu_id();
|
||||
return (s1 != me) || (me != s2);
|
||||
}
|
||||
|
||||
static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
|
||||
assert(r.start());
|
||||
assert(r.end());
|
||||
return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
|
||||
}
|
||||
|
||||
future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
|
||||
return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, r)) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
|
||||
sst->mark_for_deletion();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
|
||||
return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
|
||||
if (in_other_shard) {
|
||||
// If we're here, this sstable is shared by this and other
|
||||
// shard(s). Shared sstables cannot be deleted until all
|
||||
// shards compacted them, so to reduce disk space usage we
|
||||
// want to start splitting them now.
|
||||
// However, we need to delay this compaction until we read all
|
||||
// the sstables belonging to this CF, because we need all of
|
||||
// them to know which tombstones we can drop, and what
|
||||
// generation number is free.
|
||||
_sstables_need_rewrite.push_back(sst);
|
||||
}
|
||||
if (reset_level) {
|
||||
// When loading a migrated sstable, set level to 0 because
|
||||
// it may overlap with existing tables in levels > 0.
|
||||
// This step is optional, because even if we didn't do this
|
||||
// scylla would detect the overlap, and bring back some of
|
||||
// the sstables to level 0.
|
||||
sst->set_sstable_level(0);
|
||||
}
|
||||
add_sstable(sst);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// load_sstable() wants to start rewriting sstables which are shared between
|
||||
// several shards, but we can't start any compaction before all the sstables
|
||||
// of this CF were loaded. So call this function to start rewrites, if any.
|
||||
void column_family::start_rewrite() {
|
||||
for (auto sst : _sstables_need_rewrite) {
|
||||
dblog.info("Splitting {} for shard", sst->get_filename());
|
||||
_compaction_manager.submit_sstable_rewrite(this, sst);
|
||||
}
|
||||
_sstables_need_rewrite.clear();
|
||||
}
|
||||
|
||||
future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
|
||||
|
||||
using namespace sstables;
|
||||
@@ -514,24 +577,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
|
||||
}
|
||||
}
|
||||
|
||||
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
auto fut = sst->get_sstable_key_range(*_schema);
|
||||
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
|
||||
// Checks whether or not sstable belongs to current shard.
|
||||
if (!belongs_to_current_shard(*_schema, std::move(r))) {
|
||||
dblog.debug("sstable {} not relevant for this shard, ignoring",
|
||||
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
|
||||
sstables::sstable::component_type::Data));
|
||||
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto fut = sst->load();
|
||||
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
|
||||
add_sstable(std::move(*sst));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then_wrapped([fname, comps] (future<> f) {
|
||||
return load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
|
||||
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (malformed_sstable_exception& e) {
|
||||
@@ -1033,29 +1081,14 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
|
||||
future<>
|
||||
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
|
||||
return parallel_for_each(new_tables, [this] (auto comps) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
|
||||
return sst->load().then([this, sst] {
|
||||
// This sets in-memory level of sstable to 0.
|
||||
// When loading a migrated sstable, it's important to set it to level 0 because
|
||||
// leveled compaction relies on a level > 0 having no overlapping sstables.
|
||||
// If Scylla reboots before migrated sstable gets compacted, leveled strategy
|
||||
// is smart enough to detect a sstable that overlaps and set its in-memory
|
||||
// level to 0.
|
||||
return sst->set_sstable_level(0);
|
||||
}).then([this, sst] {
|
||||
auto first = sst->get_first_partition_key(*_schema);
|
||||
auto last = sst->get_last_partition_key(*_schema);
|
||||
if (belongs_to_current_shard(*_schema, first, last)) {
|
||||
this->add_sstable(sst);
|
||||
} else {
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return this->load_sstable(sstables::sstable(
|
||||
_schema->ks_name(), _schema->cf_name(), _config.datadir,
|
||||
comps.generation, comps.version, comps.format), true);
|
||||
}).then([this] {
|
||||
start_rewrite();
|
||||
// Drop entire cache for this column family because it may be populated
|
||||
// with stale data.
|
||||
get_row_cache().clear();
|
||||
return get_row_cache().clear();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1889,6 +1922,7 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
|
||||
auto add_partition = [&qs] (uint32_t live_rows, mutation&& m) {
|
||||
auto pb = qs.builder.add_partition(*qs.schema, m.key());
|
||||
m.partition().query_compacted(pb, *qs.schema, live_rows);
|
||||
qs.limit -= live_rows;
|
||||
};
|
||||
return do_with(querying_reader(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.cmd.timestamp, add_partition),
|
||||
[] (auto&& rd) { return rd.read(); });
|
||||
@@ -1896,10 +1930,10 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
make_lw_shared<query::result>(qs.builder.build()));
|
||||
}).finally([lc, this]() mutable {
|
||||
_stats.reads.mark(lc);
|
||||
if (lc.is_start()) {
|
||||
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
|
||||
}
|
||||
_stats.reads.mark(lc);
|
||||
if (lc.is_start()) {
|
||||
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -2257,7 +2291,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
f = cf.flush();
|
||||
} else {
|
||||
cf.clear();
|
||||
f = cf.clear();
|
||||
}
|
||||
|
||||
return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable {
|
||||
@@ -2633,21 +2667,29 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
|
||||
// temporary counter measure.
|
||||
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
|
||||
return _streaming_memtables->seal_active_memtable().finally([this, ranges = std::move(ranges)] {
|
||||
if (_config.enable_cache) {
|
||||
for (auto& range : ranges) {
|
||||
_cache.invalidate(range);
|
||||
}
|
||||
if (!_config.enable_cache) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_with(std::move(ranges), [this] (auto& ranges) {
|
||||
return parallel_for_each(ranges, [this](auto&& range) {
|
||||
return _cache.invalidate(range);
|
||||
});
|
||||
});
|
||||
return do_with(std::move(ranges), [this] (auto& ranges) {
|
||||
return parallel_for_each(ranges, [this](auto&& range) {
|
||||
return _cache.invalidate(range);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void column_family::clear() {
|
||||
_cache.clear();
|
||||
future<> column_family::clear() {
|
||||
_memtables->clear();
|
||||
_memtables->add_memtable();
|
||||
_streaming_memtables->clear();
|
||||
_streaming_memtables->add_memtable();
|
||||
return _cache.clear();
|
||||
}
|
||||
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
@@ -2673,13 +2715,13 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
|
||||
_sstables = std::move(pruned);
|
||||
dblog.debug("cleaning out row cache");
|
||||
_cache.clear();
|
||||
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -310,6 +310,11 @@ private:
|
||||
// have not been deleted yet, so must not GC any tombstones in other sstables
|
||||
// that may delete data in these sstables:
|
||||
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
|
||||
// sstables that are shared between several shards so we want to rewrite
|
||||
// them (split the data belonging to this shard to a separate sstable),
|
||||
// but for correct compaction we need to start the compaction only after
|
||||
// reading all sstables.
|
||||
std::vector<sstables::shared_sstable> _sstables_need_rewrite;
|
||||
// Control background fibers waiting for sstables to be deleted
|
||||
seastar::gate _sstable_deletion_gate;
|
||||
// There are situations in which we need to stop writing sstables. Flushers will take
|
||||
@@ -338,6 +343,7 @@ private:
|
||||
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
|
||||
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
lw_shared_ptr<memtable> new_streaming_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
@@ -463,7 +469,7 @@ public:
|
||||
future<> flush();
|
||||
future<> flush(const db::replay_position&);
|
||||
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
|
||||
void clear(); // discards memtable(s) without flushing them to disk.
|
||||
future<> clear(); // discards memtable(s) without flushing them to disk.
|
||||
future<db::replay_position> discard_sstables(db_clock::time_point);
|
||||
|
||||
// Important warning: disabling writes will only have an effect in the current shard.
|
||||
@@ -634,6 +640,7 @@ private:
|
||||
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
|
||||
void check_valid_rp(const db::replay_position&) const;
|
||||
public:
|
||||
void start_rewrite();
|
||||
// Iterate over all partitions. Protocol is the same as std::all_of(),
|
||||
// so that iteration can be stopped by returning false.
|
||||
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
|
||||
|
||||
@@ -36,6 +36,8 @@ extern thread_local disk_error_signal_type sstable_read_error;
|
||||
extern thread_local disk_error_signal_type sstable_write_error;
|
||||
extern thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
bool should_stop_on_system_error(const std::system_error& e);
|
||||
|
||||
template<typename Func, typename... Args>
|
||||
std::enable_if_t<!is_future<std::result_of_t<Func(Args&&...)>>::value,
|
||||
std::result_of_t<Func(Args&&...)>>
|
||||
@@ -44,7 +46,7 @@ do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
// calling function
|
||||
return func(std::forward<Args>(args)...);
|
||||
} catch (std::system_error& e) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(e)) {
|
||||
signal();
|
||||
throw storage_io_error(e);
|
||||
}
|
||||
@@ -62,7 +64,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (std::system_error& sys_err) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(sys_err)) {
|
||||
signal();
|
||||
throw storage_io_error(sys_err);
|
||||
}
|
||||
@@ -70,7 +72,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
|
||||
return futurize<std::result_of_t<Func(Args&&...)>>::make_exception_future(ep);
|
||||
});
|
||||
} catch (std::system_error& e) {
|
||||
if (is_system_error_errno(EIO)) {
|
||||
if (should_stop_on_system_error(e)) {
|
||||
signal();
|
||||
throw storage_io_error(e);
|
||||
}
|
||||
|
||||
2
dist/ami/files/scylla-ami
vendored
2
dist/ami/files/scylla-ami
vendored
Submodule dist/ami/files/scylla-ami updated: 72ae2580c1...863cc4598a
4
dist/common/scripts/scylla_io_setup
vendored
4
dist/common/scripts/scylla_io_setup
vendored
@@ -44,8 +44,8 @@ output_to_user()
|
||||
}
|
||||
|
||||
if [ `is_developer_mode` -eq 0 ]; then
|
||||
SMP=`echo $SCYLLA_ARGS|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $SCYLLA_ARGS|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
if [ $AMI_OPT -eq 1 ]; then
|
||||
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
|
||||
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`
|
||||
|
||||
5
dist/common/systemd/scylla-server.service.in
vendored
5
dist/common/systemd/scylla-server.service.in
vendored
@@ -2,6 +2,7 @@
|
||||
Description=Scylla Server
|
||||
|
||||
[Service]
|
||||
PermissionsStartOnly=true
|
||||
Type=notify
|
||||
LimitMEMLOCK=infinity
|
||||
LimitNOFILE=200000
|
||||
@@ -10,9 +11,9 @@ LimitNPROC=8096
|
||||
EnvironmentFile=@@SYSCONFDIR@@/scylla-server
|
||||
EnvironmentFile=/etc/scylla.d/*.conf
|
||||
WorkingDirectory=$SCYLLA_HOME
|
||||
ExecStartPre=/usr/bin/sudo /usr/lib/scylla/scylla_prepare
|
||||
ExecStartPre=/usr/lib/scylla/scylla_prepare
|
||||
ExecStart=/usr/bin/scylla $SCYLLA_ARGS $SEASTAR_IO $DEV_MODE $CPUSET
|
||||
ExecStopPost=/usr/bin/sudo /usr/lib/scylla/scylla_stop
|
||||
ExecStopPost=/usr/lib/scylla/scylla_stop
|
||||
TimeoutStartSec=900
|
||||
KillMode=process
|
||||
Restart=on-abnormal
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -2,7 +2,7 @@ FROM centos:7
|
||||
|
||||
MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
|
||||
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.2.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN yum -y install epel-release
|
||||
RUN yum -y clean expire-cache
|
||||
RUN yum -y update
|
||||
|
||||
9
dist/redhat/scylla.spec.in
vendored
9
dist/redhat/scylla.spec.in
vendored
@@ -104,11 +104,6 @@ cp -P dist/common/sbin/* $RPM_BUILD_ROOT%{_sbindir}/
|
||||
%pre server
|
||||
/usr/sbin/groupadd scylla 2> /dev/null || :
|
||||
/usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
|
||||
%if 0%{?rhel}
|
||||
sed -e "s/Defaults requiretty/#Defaults requiretty/" /etc/sudoers > /tmp/sudoers
|
||||
cp /tmp/sudoers /etc/sudoers
|
||||
rm /tmp/sudoers
|
||||
%endif
|
||||
|
||||
%post server
|
||||
# Upgrade coredump settings
|
||||
@@ -214,7 +209,9 @@ This package contains Linux kernel configuration changes for the Scylla database
|
||||
if Scylla is the main application on your server and you wish to optimize its latency and throughput.
|
||||
|
||||
%post kernel-conf
|
||||
%sysctl_apply 99-scylla-sched.conf
|
||||
# We cannot use the sysctl_apply rpm macro because it is not present in 7.0
|
||||
# following is a "manual" expansion
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
|
||||
|
||||
%files kernel-conf
|
||||
%defattr(-,root,root)
|
||||
|
||||
22
main.cc
22
main.cc
@@ -277,6 +277,7 @@ verify_seastar_io_scheduler(bool has_max_io_requests, bool developer_mode) {
|
||||
}
|
||||
|
||||
int main(int ac, char** av) {
|
||||
try {
|
||||
// early check to avoid triggering
|
||||
if (!cpu_sanity()) {
|
||||
_exit(71);
|
||||
@@ -516,6 +517,18 @@ int main(int ac, char** av) {
|
||||
}
|
||||
return db.load_sstables(proxy);
|
||||
}).get();
|
||||
// If the same sstable is shared by several shards, it cannot be
|
||||
// deleted until all shards decide to compact it. So we want to
|
||||
// start these compactions now. Note we start compacting only after
|
||||
// all sstables in this CF were loaded on all shards - otherwise
|
||||
// we will have races between the compaction and loading processes
|
||||
db.invoke_on_all([&proxy] (database& db) {
|
||||
for (auto& x : db.get_column_families()) {
|
||||
column_family& cf = *(x.second);
|
||||
// We start the rewrite, but do not wait for it.
|
||||
cf.start_rewrite();
|
||||
}
|
||||
}).get();
|
||||
supervisor_notify("setting up system keyspace");
|
||||
db::system_keyspace::setup(db, qp).get();
|
||||
supervisor_notify("starting commit log");
|
||||
@@ -595,10 +608,10 @@ int main(int ac, char** av) {
|
||||
supervisor_notify("serving");
|
||||
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
|
||||
engine().at_exit([] {
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
});
|
||||
engine().at_exit([] {
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
});
|
||||
engine().at_exit([&db] {
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
@@ -607,6 +620,11 @@ int main(int ac, char** av) {
|
||||
});
|
||||
}).or_terminate();
|
||||
});
|
||||
} catch (...) {
|
||||
// reactor may not have been initialized, so can't use logger
|
||||
fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception());
|
||||
return 7; // 1 has a special meaning for upstart
|
||||
}
|
||||
}
|
||||
|
||||
namespace debug {
|
||||
|
||||
@@ -397,7 +397,7 @@ static future<> sync_range(seastar::sharded<database>& db,
|
||||
return sp_in.execute().discard_result().then([&sp_out] {
|
||||
return sp_out.execute().discard_result();
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.error("repair's stream failed: {}", ep);
|
||||
logger.warn("repair's stream failed: {}", ep);
|
||||
return make_exception_future(ep);
|
||||
});
|
||||
});
|
||||
|
||||
54
row_cache.cc
54
row_cache.cc
@@ -443,7 +443,16 @@ row_cache::make_reader(schema_ptr s,
|
||||
}
|
||||
|
||||
row_cache::~row_cache() {
|
||||
clear();
|
||||
clear_now();
|
||||
}
|
||||
|
||||
void row_cache::clear_now() noexcept {
|
||||
with_allocator(_tracker.allocator(), [this] {
|
||||
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
|
||||
_tracker.on_erase();
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::populate(const mutation& m) {
|
||||
@@ -467,16 +476,8 @@ void row_cache::populate(const mutation& m) {
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::clear() {
|
||||
with_allocator(_tracker.allocator(), [this] {
|
||||
// We depend on clear_and_dispose() below not looking up any keys.
|
||||
// Using with_linearized_managed_bytes() does not help, because we don't
|
||||
// want to propagate an exception from here.
|
||||
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
|
||||
_tracker.on_erase();
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
future<> row_cache::clear() {
|
||||
return invalidate(query::full_partition_range);
|
||||
}
|
||||
|
||||
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
|
||||
@@ -502,8 +503,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
|
||||
});
|
||||
if (blow_cache) {
|
||||
// We failed to invalidate the key, presumably due to with_linearized_managed_bytes()
|
||||
// running out of memory. Recover using clear(), which doesn't throw.
|
||||
clear();
|
||||
// running out of memory. Recover using clear_now(), which doesn't throw.
|
||||
clear_now();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -577,7 +578,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
future<> row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
return _populate_phaser.advance_and_await().then([this, &dk] {
|
||||
_read_section(_tracker.region(), [&] {
|
||||
with_allocator(_tracker.allocator(), [this, &dk] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
@@ -585,17 +587,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) {
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate(const query::partition_range& range) {
|
||||
with_linearized_managed_bytes([&] {
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
|
||||
auto unwrapped = range.unwrap();
|
||||
invalidate(unwrapped.first);
|
||||
invalidate(unwrapped.second);
|
||||
return;
|
||||
}
|
||||
future<> row_cache::invalidate(const query::partition_range& range) {
|
||||
return _populate_phaser.advance_and_await().then([this, &range] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
|
||||
auto unwrapped = range.unwrap();
|
||||
invalidate_unwrapped(unwrapped.first);
|
||||
invalidate_unwrapped(unwrapped.second);
|
||||
} else {
|
||||
invalidate_unwrapped(range);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void row_cache::invalidate_unwrapped(const query::partition_range& range) {
|
||||
logalloc::reclaim_lock _(_tracker.region());
|
||||
|
||||
auto cmp = cache_entry::compare(_schema);
|
||||
@@ -621,7 +630,6 @@ void row_cache::invalidate(const query::partition_range& range) {
|
||||
deleter(p);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,
|
||||
|
||||
24
row_cache.hh
24
row_cache.hh
@@ -184,13 +184,13 @@ private:
|
||||
mutation_source _underlying;
|
||||
key_source _underlying_keys;
|
||||
|
||||
// Synchronizes populating reads with update() to ensure that cache
|
||||
// Synchronizes populating reads with updates of underlying data source to ensure that cache
|
||||
// remains consistent across flushes with the underlying data source.
|
||||
// Readers obtained from the underlying data source in earlier than
|
||||
// current phases must not be used to populate the cache, unless they hold
|
||||
// phaser::operation created in the reader's phase of origin. Readers
|
||||
// should hold to a phase only briefly because this inhibits progress of
|
||||
// update(). Phase changes occur only in update(), which can be assumed to
|
||||
// updates. Phase changes occur in update()/clear(), which can be assumed to
|
||||
// be asynchronous wrt invoking of the underlying data source.
|
||||
utils::phased_barrier _populate_phaser;
|
||||
|
||||
@@ -204,6 +204,8 @@ private:
|
||||
void on_miss();
|
||||
void upgrade_entry(cache_entry&);
|
||||
void invalidate_locked(const dht::decorated_key&);
|
||||
void invalidate_unwrapped(const query::partition_range&);
|
||||
void clear_now() noexcept;
|
||||
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
|
||||
public:
|
||||
~row_cache();
|
||||
@@ -228,7 +230,9 @@ public:
|
||||
void populate(const mutation& m);
|
||||
|
||||
// Clears the cache.
|
||||
void clear();
|
||||
// Guarantees that cache will not be populated using readers created
|
||||
// before this method was invoked.
|
||||
future<> clear();
|
||||
|
||||
// Synchronizes cache with the underlying data source from a memtable which
|
||||
// has just been flushed to the underlying data source.
|
||||
@@ -240,11 +244,21 @@ public:
|
||||
void touch(const dht::decorated_key&);
|
||||
|
||||
// Removes given partition from cache.
|
||||
void invalidate(const dht::decorated_key&);
|
||||
//
|
||||
// Guarantees that cache will not be populated with given key
|
||||
// using readers created before this method was invoked.
|
||||
//
|
||||
// The key must be kept alive until method resolves.
|
||||
future<> invalidate(const dht::decorated_key& key);
|
||||
|
||||
// Removes given range of partitions from cache.
|
||||
// The range can be a wrap around.
|
||||
void invalidate(const query::partition_range&);
|
||||
//
|
||||
// Guarantees that cache will not be populated with partitions from that range
|
||||
// using readers created before this method was invoked.
|
||||
//
|
||||
// The range must be kept alive until method resolves.
|
||||
future<> invalidate(const query::partition_range&);
|
||||
|
||||
auto num_entries() const {
|
||||
return _partitions.size();
|
||||
|
||||
@@ -2058,7 +2058,6 @@ public:
|
||||
auto write_timeout = exec->_proxy->_db.local().get_config().write_request_timeout_in_ms() * 1000;
|
||||
auto delta = __int128_t(digest_resolver->last_modified()) - __int128_t(exec->_cmd->read_timestamp);
|
||||
if (std::abs(delta) <= write_timeout) {
|
||||
print("HERE %d\n", int64_t(delta));
|
||||
exec->_proxy->_stats.global_read_repairs_canceled_due_to_concurrent_write++;
|
||||
// if CL is local and non-matching data is modified less than write_timeout ms ago, do only local repair
|
||||
auto i = boost::range::remove_if(exec->_targets, std::not1(std::cref(db::is_local)));
|
||||
|
||||
@@ -972,6 +972,28 @@ void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subsc
|
||||
|
||||
static stdx::optional<future<>> drain_in_progress;
|
||||
|
||||
future<> storage_service::stop_transport() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
return seastar::async([&ss] {
|
||||
logger.info("Stop transport: starts");
|
||||
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
logger.info("Stop transport: stop_gossiping done");
|
||||
|
||||
ss.shutdown_client_servers().get();
|
||||
logger.info("Stop transport: shutdown rpc and cql server done");
|
||||
|
||||
ss.do_stop_ms().get();
|
||||
logger.info("Stop transport: shutdown messaging_service done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
logger.info("Stop transport: auth shutdown");
|
||||
|
||||
logger.info("Stop transport: done");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::drain_on_shutdown() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
if (drain_in_progress) {
|
||||
@@ -980,17 +1002,8 @@ future<> storage_service::drain_on_shutdown() {
|
||||
return seastar::async([&ss] {
|
||||
logger.info("Drain on shutdown: starts");
|
||||
|
||||
gms::get_local_gossiper().stop_gossiping().get();
|
||||
logger.info("Drain on shutdown: stop_gossiping done");
|
||||
|
||||
ss.shutdown_client_servers().get();
|
||||
logger.info("Drain on shutdown: shutdown rpc and cql server done");
|
||||
|
||||
ss.do_stop_ms().get();
|
||||
logger.info("Drain on shutdown: shutdown messaging_service done");
|
||||
|
||||
auth::auth::shutdown().get();
|
||||
logger.info("Drain on shutdown: auth shutdown");
|
||||
ss.stop_transport().get();
|
||||
logger.info("Drain on shutdown: stop_transport done");
|
||||
|
||||
ss.flush_column_families();
|
||||
logger.info("Drain on shutdown: flush column_families done");
|
||||
@@ -3007,7 +3020,7 @@ void storage_service::do_isolate_on_error(disk_error type)
|
||||
if (must_isolate && !isolated.exchange(true)) {
|
||||
logger.warn("Shutting down communications due to I/O errors until operator intervention");
|
||||
// isolated protects us against multiple stops
|
||||
service::get_storage_service().invoke_on_all([] (service::storage_service& s) { s.stop_native_transport(); });
|
||||
service::get_local_storage_service().stop_transport();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -382,6 +382,8 @@ public:
|
||||
|
||||
future<> drain_on_shutdown();
|
||||
|
||||
future<> stop_transport();
|
||||
|
||||
void flush_column_families();
|
||||
#if 0
|
||||
/**
|
||||
|
||||
@@ -83,13 +83,17 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
|
||||
return weight;
|
||||
}
|
||||
|
||||
bool compaction_manager::try_to_register_weight(column_family* cf, int weight) {
|
||||
bool compaction_manager::try_to_register_weight(column_family* cf, int weight, bool parallel_compaction) {
|
||||
auto it = _weight_tracker.find(cf);
|
||||
if (it == _weight_tracker.end()) {
|
||||
_weight_tracker.insert({cf, {weight}});
|
||||
return true;
|
||||
}
|
||||
std::unordered_set<int>& s = it->second;
|
||||
// Only one weight is allowed if parallel compaction is disabled.
|
||||
if (!parallel_compaction && !s.empty()) {
|
||||
return false;
|
||||
}
|
||||
// TODO: Maybe allow only *smaller* compactions to start? That can be done
|
||||
// by returning true only if weight is not in the set and is lower than any
|
||||
// entry in the set.
|
||||
@@ -164,8 +168,7 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
|
||||
sstables::compaction_strategy cs = cf.get_compaction_strategy();
|
||||
descriptor = cs.get_sstables_for_compaction(cf, std::move(candidates));
|
||||
weight = trim_to_compact(&cf, descriptor);
|
||||
if (!try_to_register_weight(&cf, weight)) {
|
||||
// Refusing compaction job because of an ongoing compaction with same weight.
|
||||
if (!try_to_register_weight(&cf, weight, cs.parallel_compaction())) {
|
||||
task->stopping = true;
|
||||
_stats.pending_tasks--;
|
||||
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
|
||||
@@ -248,6 +251,51 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
|
||||
return task;
|
||||
}
|
||||
|
||||
// submit_sstable_rewrite() starts a compaction task, much like submit(),
|
||||
// But rather than asking a compaction policy what to compact, this function
|
||||
// compacts just a single sstable, and writes one new sstable. This operation
|
||||
// is useful to split an sstable containing data belonging to multiple shards
|
||||
// into a separate sstable on each shard.
|
||||
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
|
||||
// The semaphore ensures that the sstable rewrite operations submitted by
|
||||
// submit_sstable_rewrite are run in sequence, and not all of them in
|
||||
// parallel. Note that unlike general compaction which currently allows
|
||||
// different cfs to compact in parallel, here we don't have a semaphore
|
||||
// per cf, so we only get one rewrite at a time on each shard.
|
||||
static thread_local semaphore sem(1);
|
||||
// We cannot, and don't need to, compact an sstable which is already
|
||||
// being compacted anyway.
|
||||
if (_stopped || _compacting_sstables.count(sst)) {
|
||||
return;
|
||||
}
|
||||
// Conversely, we don't want another compaction job to compact the
|
||||
// sstable we are planning to work on:
|
||||
_compacting_sstables.insert(sst);
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
_tasks.push_back(task);
|
||||
_stats.active_tasks++;
|
||||
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
|
||||
return cf->compact_sstables(sstables::compaction_descriptor(
|
||||
std::vector<sstables::shared_sstable>{sst},
|
||||
sst->get_sstable_level(),
|
||||
std::numeric_limits<uint64_t>::max()), false);
|
||||
}).then_wrapped([this, sst, task] (future<> f) {
|
||||
_compacting_sstables.erase(sst);
|
||||
_stats.active_tasks--;
|
||||
_tasks.remove(task);
|
||||
try {
|
||||
f.get();
|
||||
_stats.completed_tasks++;
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
cmlog.info("compaction info: {}", e.what());
|
||||
_stats.errors++;
|
||||
} catch (...) {
|
||||
cmlog.error("compaction failed: {}", std::current_exception());
|
||||
_stats.errors++;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
|
||||
task->stopping = true;
|
||||
return task->compaction_gate.close().then([task] {
|
||||
|
||||
@@ -81,9 +81,9 @@ private:
|
||||
// It will not accept new requests in case the manager was stopped.
|
||||
bool can_submit();
|
||||
|
||||
// If weight is not taken for the column family, weight is registered and
|
||||
// true is returned. Return false otherwise.
|
||||
bool try_to_register_weight(column_family* cf, int weight);
|
||||
// Return true if weight is not registered. If parallel_compaction is not
|
||||
// true, only one weight is allowed to be registered.
|
||||
bool try_to_register_weight(column_family* cf, int weight, bool parallel_compaction);
|
||||
// Deregister weight for a column family.
|
||||
void deregister_weight(column_family* cf, int weight);
|
||||
|
||||
@@ -109,6 +109,13 @@ public:
|
||||
// Submit a column family to be cleaned up and wait for its termination.
|
||||
future<> perform_cleanup(column_family* cf);
|
||||
|
||||
// Submit a specific sstable to be rewritten, while dropping data which
|
||||
// does not belong to this shard. Meant to be used on startup when an
|
||||
// sstable is shared by multiple shards, and we want to split it to a
|
||||
// separate sstable for each shard.
|
||||
void submit_sstable_rewrite(column_family* cf,
|
||||
sstables::shared_sstable s);
|
||||
|
||||
// Remove a column family from the compaction manager.
|
||||
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
|
||||
future<> remove(column_family* cf);
|
||||
|
||||
@@ -56,6 +56,9 @@ public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
// Tells whether this strategy allows parallel compaction. The default
// answer is yes; a concrete strategy may override it.
virtual bool parallel_compaction() const {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
//
|
||||
@@ -402,6 +405,10 @@ public:
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
// Overrides the base-class default: this strategy reports that parallel
// compaction is not allowed.
virtual bool parallel_compaction() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Identifies this strategy as compaction_strategy_type::leveled.
// Marked `override`, like the sibling virtuals of this class, so that the
// compiler rejects any accidental signature drift from the base declaration.
virtual compaction_strategy_type type() const override {
    return compaction_strategy_type::leveled;
}
|
||||
@@ -439,6 +446,9 @@ compaction_strategy_type compaction_strategy::type() const {
|
||||
// Forwards to the strategy implementation, handing over ownership of the
// candidate list, and returns the descriptor the implementation produces.
compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) {
|
||||
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
|
||||
}
|
||||
// Returns whether the underlying strategy implementation allows parallel
// compaction.
bool compaction_strategy::parallel_compaction() const {
|
||||
return _compaction_strategy_impl->parallel_compaction();
|
||||
}
|
||||
|
||||
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
|
||||
::shared_ptr<compaction_strategy_impl> impl;
|
||||
|
||||
@@ -175,10 +175,8 @@ public:
|
||||
|
||||
if (previous != nullptr && current_first.tri_compare(s, previous->get_last_decorated_key(s)) <= 0) {
|
||||
|
||||
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 " \
|
||||
"or due to the fact that you have dropped sstables from another node into the data directory. " \
|
||||
"Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also " \
|
||||
"have rows out-of-order within an sstable",
|
||||
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by the fact that you have dropped " \
|
||||
"sstables from another node into the data directory. Sending back to L0.",
|
||||
level, previous->get_filename(), previous->get_first_partition_key(s), previous->get_last_partition_key(s),
|
||||
current->get_filename(), current->get_first_partition_key(s), current->get_last_partition_key(s));
|
||||
|
||||
|
||||
@@ -228,7 +228,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}
|
||||
_stream_result->handle_session_prepared(this->shared_from_this());
|
||||
} catch (...) {
|
||||
sslog.error("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
|
||||
sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
return make_ready_future<>();
|
||||
@@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
return ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
|
||||
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
|
||||
}).handle_exception([id, plan_id] (auto ep) {
|
||||
sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
}).then([this] {
|
||||
@@ -248,7 +248,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}
|
||||
|
||||
void stream_session::on_error() {
|
||||
sslog.error("[Stream #{}] Streaming error occurred", plan_id());
|
||||
sslog.warn("[Stream #{}] Streaming error occurred", plan_id());
|
||||
// fail session
|
||||
close_session(stream_session_state::FAILED);
|
||||
}
|
||||
@@ -270,7 +270,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
db.find_column_family(ks, cf);
|
||||
} catch (no_such_column_family) {
|
||||
auto err = sprint("[Stream #{}] prepare requested ks={} cf={} does not exist", ks, cf);
|
||||
sslog.error(err.c_str());
|
||||
sslog.warn(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
}
|
||||
@@ -284,7 +284,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
|
||||
db.find_column_family(cf_id);
|
||||
} catch (no_such_column_family) {
|
||||
// sprint() takes printf-style directives; "{}" is not a placeholder here, so
// both arguments must be formatted with %s.
auto err = sprint("[Stream #%s] prepare cf_id=%s does not exist", plan_id, cf_id);
|
||||
sslog.error(err.c_str());
|
||||
sslog.warn(err.c_str());
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
prepare_receiving(summary);
|
||||
|
||||
@@ -85,41 +85,41 @@ struct send_info {
|
||||
};
|
||||
|
||||
future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
}).handle_exception([si] (auto ep) {
|
||||
// There might be larger number of STREAM_MUTATION inflight.
|
||||
// Log one error per column_family per range
|
||||
if (!si->error_logged) {
|
||||
si->error_logged = true;
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
return stop_iteration::no;
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
}).handle_exception([si] (auto ep) {
|
||||
// There might be larger number of STREAM_MUTATION inflight.
|
||||
// Log one error per column_family per range
|
||||
if (!si->error_logged) {
|
||||
si->error_logged = true;
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
auto& priority = service::get_local_streaming_read_priority();
|
||||
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return repeat([si, &reader] {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
});
|
||||
}).then([si] {
|
||||
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
|
||||
auto cf_id = this->cf_id;
|
||||
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
|
||||
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
|
||||
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
|
||||
auto cf_id = this->cf_id;
|
||||
@@ -153,7 +153,7 @@ void stream_transfer_task::start() {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
|
||||
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,
|
||||
cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) {
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
}).then([this, id, plan_id, cf_id] {
|
||||
@@ -161,7 +161,7 @@ void stream_transfer_task::start() {
|
||||
session->start_keep_alive_timer();
|
||||
session->transfer_task_completed(cf_id);
|
||||
}).handle_exception([this, plan_id, id] (auto ep){
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
this->session->on_error();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -546,27 +546,33 @@ static std::vector<mutation> updated_ring(std::vector<mutation>& mutations) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// Builds a mutation_source backed by `memtables`. The vector is captured by
// reference and walked each time the source is invoked, so it must outlive
// the returned source; each invocation combines one reader per memtable
// present in the vector at that moment.
static mutation_source make_mutation_source(std::vector<lw_shared_ptr<memtable>>& memtables) {
    auto read_all = [&memtables] (schema_ptr s, const query::partition_range& pr) {
        std::vector<mutation_reader> per_memtable;
        for (auto&& mt : memtables) {
            per_memtable.emplace_back(mt->make_reader(s, pr));
        }
        return make_combined_reader(std::move(per_memtable));
    };
    return mutation_source(std::move(read_all));
}
|
||||
|
||||
// Builds a key_source backed by `memtables`. The schema is captured by value
// and the vector by reference, so the vector must outlive the returned
// source; each invocation combines one key reader per memtable present in
// the vector at that moment.
static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtable>>& memtables) {
    auto read_all_keys = [s, &memtables] (const query::partition_range& pr) {
        std::vector<key_reader> per_memtable;
        for (auto&& mt : memtables) {
            per_memtable.emplace_back(mt->as_key_source()(pr));
        }
        return make_combined_reader(s, std::move(per_memtable));
    };
    return key_source(std::move(read_all_keys));
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
std::vector<lw_shared_ptr<memtable>> memtables;
|
||||
auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) {
|
||||
std::vector<mutation_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, pr));
|
||||
}
|
||||
return make_combined_reader(std::move(readers));
|
||||
});
|
||||
auto memtables_key_source = key_source([&] (const query::partition_range& pr) {
|
||||
std::vector<key_reader> readers;
|
||||
for (auto&& mt : memtables) {
|
||||
readers.emplace_back(mt->as_key_source()(pr));
|
||||
}
|
||||
return make_combined_reader(s, std::move(readers));
|
||||
});
|
||||
throttled_mutation_source cache_source(memtables_data_source);
|
||||
throttled_mutation_source cache_source(make_mutation_source(memtables));
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, cache_source, memtables_key_source, tracker);
|
||||
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
|
||||
|
||||
auto mt1 = make_lw_shared<memtable>(s);
|
||||
memtables.push_back(mt1);
|
||||
@@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
auto some_element = keys_in_cache.begin() + 547;
|
||||
std::vector<dht::decorated_key> keys_not_in_cache;
|
||||
keys_not_in_cache.push_back(*some_element);
|
||||
cache.invalidate(*some_element);
|
||||
cache.invalidate(*some_element).get();
|
||||
keys_in_cache.erase(some_element);
|
||||
|
||||
for (auto&& key : keys_in_cache) {
|
||||
@@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
{ *some_range_begin, true }, { *some_range_end, false }
|
||||
);
|
||||
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
|
||||
cache.invalidate(range);
|
||||
cache.invalidate(range).get();
|
||||
keys_in_cache.erase(some_range_begin, some_range_end);
|
||||
|
||||
for (auto&& key : keys_in_cache) {
|
||||
@@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) {
|
||||
});
|
||||
}
|
||||
|
||||
// Exercises cache.clear() racing with a cache read that is still populating
// from the (throttled) underlying source.
SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
    return seastar::async([] {
        auto s = make_schema();
        std::vector<lw_shared_ptr<memtable>> memtables;
        throttled_mutation_source cache_source(make_mutation_source(memtables));
        cache_tracker tracker;
        row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);

        // Initial data set: three partitions served from the first memtable.
        auto old_mt = make_lw_shared<memtable>(s);
        memtables.push_back(old_mt);
        auto old_ring = make_ring(s, 3);
        for (auto&& m : old_ring) {
            old_mt->apply(m);
        }

        // Updated data set, kept aside until the underlying source is swapped.
        auto new_mt = make_lw_shared<memtable>(s);
        auto new_ring = updated_ring(old_ring);
        for (auto&& m : new_ring) {
            new_mt->apply(m);
        }

        // Hold back the underlying source, then start a read through the cache.
        cache_source.block();

        auto early_reader = cache.make_reader(s);
        auto first_read = early_reader();

        sleep(10ms).get();

        // Swap the underlying data to the updated memtable only.
        memtables.clear();
        memtables.push_back(new_mt);

        // Start clearing the cache while the early read is still pending.
        auto clear_done = cache.clear();

        auto late_reader = cache.make_reader(s);

        // The in-progress early read must not prevent clear() from completing.
        cache_source.unblock();
        clear_done.get();

        // The read started before the swap is expected to yield the old value
        // for its first partition; otherwise the test did not hit the race it
        // is meant to cover. Its later reads yield the updated partitions.
        assert_that(first_read.get0()).has_mutation().is_equal_to(old_ring[0]);
        assert_that(early_reader().get0()).has_mutation().is_equal_to(new_ring[1]);
        assert_that(early_reader().get0()).has_mutation().is_equal_to(new_ring[2]);
        assert_that(early_reader().get0()).has_no_mutation();

        // A reader created after clear() was started, but before the earlier
        // population finished, must already see the updated data.
        assert_that(std::move(late_reader))
            .produces(new_ring[0])
            .produces(new_ring[1])
            .produces(new_ring[2])
            .produces_end_of_stream();

        // A reader created after clear() completed sees the updated data too.
        assert_that(cache.make_reader(s))
            .produces(new_ring[0])
            .produces(new_ring[1])
            .produces(new_ring[2])
            .produces_end_of_stream();
    });
}
|
||||
|
||||
|
||||
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
@@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
}
|
||||
|
||||
// wrap-around
|
||||
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()}));
|
||||
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get();
|
||||
|
||||
verify_does_not_have(cache, ring[0].decorated_key());
|
||||
verify_does_not_have(cache, ring[1].decorated_key());
|
||||
@@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
|
||||
verify_does_not_have(cache, ring[7].decorated_key());
|
||||
|
||||
// not wrap-around
|
||||
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()}));
|
||||
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get();
|
||||
|
||||
verify_does_not_have(cache, ring[0].decorated_key());
|
||||
verify_does_not_have(cache, ring[1].decorated_key());
|
||||
|
||||
@@ -49,3 +49,17 @@ bool is_system_error_errno(int err_no)
|
||||
code.category() == std::system_category();
|
||||
});
|
||||
}
|
||||
|
||||
// Decides whether a std::system_error should stop the server.
// Returns false only for errors in std::system_category() whose value is
// EEXIST or ENOENT; every other error, including any error from another
// category, returns true.
bool should_stop_on_system_error(const std::system_error& e) {
    const std::error_code& code = e.code();
    if (code.category() != std::system_category()) {
        return true;
    }
    // Whitelist of errors that don't require us to stop the server.
    const int err = code.value();
    const bool whitelisted = (err == EEXIST) || (err == ENOENT);
    return !whitelisted;
}
|
||||
|
||||
@@ -196,7 +196,7 @@ inline ihistogram operator +(ihistogram a, const ihistogram& b) {
|
||||
struct rate_moving_average {
|
||||
uint64_t count = 0;
|
||||
double rates[3] = {0};
|
||||
double mean_rate;
|
||||
double mean_rate = 0;
|
||||
rate_moving_average& operator +=(const rate_moving_average& o) {
|
||||
count += o.count;
|
||||
mean_rate += o.mean_rate;
|
||||
|
||||
Reference in New Issue
Block a user