Compare commits


32 Commits

Author SHA1 Message Date
Avi Kivity
c56fc99b7f main: handle exceptions during startup
If we don't, std::terminate() causes a core dump, even though an
exception is sort-of-expected here and can be handled.

Add an exception handler to fix.

Fixes #1379.
Message-Id: <1466595221-20358-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 5af22f6cb1)
2016-06-23 10:03:30 +03:00
Pekka Enberg
85d33e2ee4 release: prepare for 1.2.1 2016-06-21 16:22:17 +03:00
Duarte Nunes
ffeef2f072 database: Actually decrease query_state limit
query_state expects the current row limit to be updated so it
can be enforced across partition ranges. A regression introduced
in e4e8acc946 prevented that from
happening by passing a copy of the limit to querying_reader.

This patch fixes the issue by having column_family::query update
the limit as it processes partitions from the querying_reader.

Fixes #1338

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1465804012-30535-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit c896309383)
2016-06-21 10:03:22 +03:00
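
The bug class described above can be sketched in a few lines (illustrative names, not Scylla's actual query code): if the reader receives its own copy of the limit, decrements made while processing one partition range are lost before the next range is queried.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Buggy variant: the reader decrements a private copy of the limit, so
// the caller's cross-range budget is never reduced.
inline uint32_t read_range_buggy(uint32_t limit,
                                 const std::vector<uint32_t>& rows_per_partition) {
    for (auto live_rows : rows_per_partition) {
        if (limit == 0) break;
        limit -= std::min(limit, live_rows);
    }
    return limit; // caller typically ignores this and reuses its stale limit
}

// Fixed variant: the reader updates the caller's limit through a
// reference, so the remaining budget is enforced across partition ranges.
inline void read_range_fixed(uint32_t& limit,
                             const std::vector<uint32_t>& rows_per_partition) {
    for (auto live_rows : rows_per_partition) {
        if (limit == 0) break;
        limit -= std::min(limit, live_rows);
    }
}
```

Here read_range_fixed mirrors the fix: the caller's limit is updated in place as partitions are processed.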
Pekka Enberg
d3ffa00eb2 systemd: Use PermissionsStartOnly instead of running sudo
Use the PermissionsStartOnly systemd option to apply the permission
related configurations only to the start command. This allows us to stop
using "sudo" for ExecStartPre and ExecStopPost hooks and drop the
"requiretty" /etc/sudoers hack from Scylla's RPM.

Tested-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1466407587-31734-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 1d5f7be447)
2016-06-21 08:49:17 +03:00
Nadav Har'El
ad50d83302 Rewriting shared sstables only after all shards loaded sstables
After commit faa4581, each shard only starts splitting its shared sstables
after opening all sstables. This was important because compaction needs to
be aware of all sstables.

However, another bug remained: If one shard finishes loading its sstables
and starts the splitting compactions, and in parallel a different shard is
still opening sstables - the second shard might find a half-written sstable
being written by the first shard, and abort on a malformed sstable.

So in this patch we start the shared sstable rewrites - on all shards -
only after all shards finished loading their sstables. Doing this is easy,
because main.cc already contains a list of sequential steps where each
uses invoke_on_all() to make sure the step completes on all shards before
continuing to the next step.

Fixes #1371

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1466426641-3972-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit 3372052d48)
2016-06-20 18:20:01 +03:00
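
The ordering invariant (no shard begins rewriting until every shard has finished loading) can be modeled with plain threads. This is a generic sketch, not Seastar's invoke_on_all():

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Each "shard" announces load completion, then waits until all shards
// have loaded before starting its rewrite step. A rewriter therefore can
// never observe a half-written sstable from a still-loading shard.
inline bool rewrites_waited_for_all_loads(int nshards) {
    std::mutex m;
    std::condition_variable cv;
    int loads_done = 0;
    std::atomic<bool> saw_incomplete_load{false};

    std::vector<std::thread> shards;
    for (int i = 0; i < nshards; ++i) {
        shards.emplace_back([&] {
            {   // step 1: "load sstables", then announce completion
                std::lock_guard<std::mutex> g(m);
                ++loads_done;
                cv.notify_all();
            }
            {   // step 2: wait until *all* shards finished loading
                std::unique_lock<std::mutex> g(m);
                cv.wait(g, [&] { return loads_done == nshards; });
                // step 3: rewrite; no load can still be in progress here
                if (loads_done != nshards) saw_incomplete_load = true;
            }
        });
    }
    for (auto& t : shards) t.join();
    return !saw_incomplete_load;
}
```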
Avi Kivity
c6a9844dfe dist: fix scylla-kernel-conf postinstall scriptlet failure
Because we build on CentOS 7, which does not have the %sysctl_apply macro,
the macro is not expanded, and is therefore executed incorrectly even on 7.2,
which does have it.

Fix by expanding the macro manually.

Fixes #1360.
Message-Id: <1466250006-19476-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 07045ffd7c)
2016-06-20 09:37:06 +03:00
Nadav Har'El
dececbc0b9 Rewrite shared sstables only after entire CF is read
Starting in commit 721f7d1d4f, we start "rewriting" a shared sstable (i.e.,
splitting it into individual shards) as soon as it is loaded in each shard.

However, as discovered in issue #1366, this is too soon: our compaction
process relies in several places on compaction only being done after all
the sstables of the same CF have been loaded. One example is that we
need to know the content of the other sstables to decide which tombstones
we can expire (this is issue #1366). Another example is that we use the
last generation number we are aware of to decide the number of the next
compaction output - and this is wrong before we saw all sstables.

So with this patch, while loading sstables we only make a list of shared
sstables which need to be rewritten - and the actual rewrite is only started
when we finish reading all the sstables for this CF. We need to do this in
two cases: reboot (when we load all the existing sstables we find on disk),
and nodetool refresh (when we import a set of new sstables).

Fixes #1366.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1466344078-31290-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit faa45812b2)
2016-06-19 17:11:14 +03:00
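
The pattern the patch adopts can be condensed as follows (hypothetical class, loosely following the names used in the patch): loading only records shared sstables, and the rewrites are submitted in one batch once the whole CF has been read.

```cpp
#include <string>
#include <vector>

struct column_family_sketch {
    std::vector<std::string> sstables_need_rewrite;
    std::vector<std::string> rewrites_submitted;

    void load_sstable(const std::string& name, bool shared) {
        if (shared) {
            // Too early to compact: tombstone GC and generation numbering
            // need the full sstable set, so just remember it for later.
            sstables_need_rewrite.push_back(name);
        }
    }

    // Called after reboot load or nodetool refresh has read everything.
    void start_rewrite() {
        for (auto& sst : sstables_need_rewrite) {
            rewrites_submitted.push_back(sst);
        }
        sstables_need_rewrite.clear();
    }
};
```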
Asias He
f2031bf3db repair: Switch log level to warn instead of error
dtest treats error-level log messages as serious errors. It is not a
serious error for streaming to fail to send a verb and fail a streaming
session, which triggers a repair failure, for example when the peer node
is gone or stopped. Switch to log level warn instead of error.

Fixes repair_additional_test.py:RepairAdditionalTest.repair_kill_3_test

Fixes: #1335
Message-Id: <406fb0c4a45b81bd9c0aea2a898d7ca0787b23e9.1465979288.git.asias@scylladb.com>
(cherry picked from commit de0fd98349)
2016-06-18 11:42:21 +03:00
Asias He
da77b8885f streaming: Switch log level to warn instead of error
dtest treats error-level log messages as serious errors. It is not a
serious error for streaming to fail to send a verb and fail a streaming
session, for example when the peer node is gone or stopped. Switch to
log level warn instead of error.

Fixes repair_additional_test.py:RepairAdditionalTest.repair_kill_3_test

Fixes: #1335
Message-Id: <0149d30044e6e4d80732f1a20cd20593de489fc8.1465979288.git.asias@scylladb.com>
(cherry picked from commit 94c9211b0e)
2016-06-18 11:42:10 +03:00
Asias He
86434378d1 streaming: Fix indentation in do_send_mutations
Message-Id: <bc8cfa7c7b29f08e70c0af6d2fb835124d0831ac.1464857352.git.asias@scylladb.com>
(cherry picked from commit 96463cc17c)
2016-06-18 11:41:51 +03:00
Pekka Enberg
e5d24d5940 service/storage_service: Make do_isolate_on_error() more robust
Currently, we only stop the CQL transport server. Extract a
stop_transport() function from drain_on_shutdown() and call it from
do_isolate_on_error() to also shut down the inter-node RPC transport,
Thrift, and other communications services.

Fixes #1353

(cherry picked from commit d72c608868)

Conflicts:
	service/storage_service.cc

(cherry picked from commit 7e052a4e91)
2016-06-16 14:01:33 +03:00
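
The shape of the refactoring can be sketched as follows (booleans stand in for the real transport objects; this is not Scylla's actual storage_service code):

```cpp
// Shutting down all transports lives in one extracted helper that both
// the orderly drain path and the error-isolation path call.
struct storage_service_sketch {
    bool cql_stopped = false;
    bool rpc_stopped = false;
    bool thrift_stopped = false;

    void stop_transport() {    // extracted from drain_on_shutdown()
        cql_stopped = true;    // previously the only thing stopped
        rpc_stopped = true;    // inter-node RPC, previously left running
        thrift_stopped = true; // Thrift, previously left running
    }
    void drain_on_shutdown() { stop_transport(); /* then flush, drain... */ }
    void do_isolate_on_error() { stop_transport(); }
};
```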
Nadav Har'El
0a2d4204bd Rewrite shared sstables soon after startup
Several shards may share the same sstable - e.g., when re-starting scylla
with a different number of shards, or when importing sstables from an
external source. Sharing an sstable is fine, but it can result in excessive
disk space use because the shared sstable cannot be deleted until all
the shards using it have finished compacting it. Normally, we have no idea
when the shards will decide to compact these sstables - e.g., with size-
tiered-compaction a large sstable will take a long time until we decide
to compact it. So what this patch does is to initiate compaction of the
shared sstables - on each shard using it - so that as soon as possible after
the restart, the original sstable is split into separate
sstables per shard and can be deleted. If several
sstables are shared, we serialize this compaction process so that each
shard only rewrites one sstable at a time. Regular compactions may happen
in parallel, but they will not be able to choose any of the shared
sstables because those are already marked as being compacted.

Commit 3f2286d0 increased the need for this patch, because since that
commit, if we don't delete the shared sstable, we also cannot delete
additional sstables which the different shards compacted with it. For one
scylla user, this resulted in so much excessive disk space use, that it
literally filled the whole disk.

After this patch, commit 3f2286d0 (and the discussion in issue #1318 on how
to improve it) is no longer necessary, because we will never compact a shared
sstable together with any other sstable - as explained above, the shared
sstables are marked as "being compacted" so the regular compactions will
avoid them.

Fixes #1314.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1465406235-15378-1-git-send-email-nyh@scylladb.com>
Reviewed-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 721f7d1d4f)
2016-06-16 14:01:33 +03:00
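
The "marked as being compacted" exclusion can be sketched as follows (hypothetical manager, not Scylla's actual compaction_manager API):

```cpp
#include <set>
#include <string>
#include <vector>

// Shared sstables are registered as compacting up front, so the regular
// compaction picker never selects them for another job.
struct compaction_manager_sketch {
    std::set<std::string> compacting;

    void submit_shared_rewrite(const std::string& sst) {
        compacting.insert(sst); // reserved until the per-shard split finishes
    }

    // Regular compaction only considers candidates not already reserved.
    std::vector<std::string> pick_candidates(const std::vector<std::string>& all) const {
        std::vector<std::string> out;
        for (auto& s : all) {
            if (!compacting.count(s)) out.push_back(s);
        }
        return out;
    }
};
```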
Tomasz Grabiec
74b8f63e8f row_cache: Make stronger guarantees in clear/invalidate
Correctness of current uses of clear() and invalidate() relies on fact
that cache is not populated using readers created before
invalidation. Sstables are first modified and then cache is
invalidated. This is not guaranteed by current implementation
though. As pointed out by Avi, a populating read may race with the
call to clear(). If that read started before clear() and completed
after it, the cache may be populated with data which does not
correspond to the new sstable set.

To provide such guarantee, invalidate() variants were adjusted to
synchronize using _populate_phaser, similarly like row_cache::update()
does.

(cherry picked from commit 170a214628)

Conflicts:
	database.cc
2016-06-16 14:01:33 +03:00
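
The phaser-style synchronization can be approximated with a mutex and condition variable. This is a minimal analogue of the guarantee, not the actual Seastar phaser API: clear()/invalidate() wait out any populating read that was already in flight, so such a read cannot repopulate the cache with stale data after the invalidation completes.

```cpp
#include <condition_variable>
#include <mutex>

class populate_phase_sketch {
    std::mutex m;
    std::condition_variable cv;
    int active_populators = 0;
public:
    void enter() {                      // a populating read begins
        std::lock_guard<std::mutex> g(m);
        ++active_populators;
    }
    void leave() {                      // a populating read finishes
        std::lock_guard<std::mutex> g(m);
        if (--active_populators == 0) cv.notify_all();
    }
    void advance_and_await() {          // what clear()/invalidate() would do
        std::unique_lock<std::mutex> g(m);
        cv.wait(g, [&] { return active_populators == 0; });
    }
    int active() {
        std::lock_guard<std::mutex> g(m);
        return active_populators;
    }
};
```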
Tomasz Grabiec
9b764b726b row_cache: Implement clear() using invalidate()
Reduces code duplication.

(cherry picked from commit 2ab18dcd2d)
2016-06-16 14:01:33 +03:00
Pekka Enberg
07ba03ce7b utils/exceptions: Whitelist EEXIST and ENOENT in should_stop_on_system_error()
There are various call-sites that explicitly check for EEXIST and
ENOENT:

  $ git grep "std::error_code(E"
  database.cc:                            if (e.code() != std::error_code(EEXIST, std::system_category())) {
  database.cc:            if (e.code() != std::error_code(ENOENT, std::system_category())) {
  database.cc:        if (e.code() != std::error_code(ENOENT, std::system_category())) {
  database.cc:                            if (e.code() != std::error_code(ENOENT, std::system_category())) {
  sstables/sstables.cc:            if (e.code() == std::error_code(ENOENT, std::system_category())) {
  sstables/sstables.cc:            if (e.code() == std::error_code(ENOENT, std::system_category())) {

Commit 961e80a ("Be more conservative when deciding when to shut down
due to disk errors") turned these errors into a storage_io_exception
that is not expected by the callers, which causes 'nodetool snapshot'
functionality to break, for example.

Whitelist the two error codes to revert back to the old behavior of
io_check().
Message-Id: <1465454446-17954-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 8df5aa7b0c)
2016-06-16 14:01:33 +03:00
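
A sketch of the whitelisted check (the real function lives in utils/exceptions; this is a reconstruction from the description above, not the exact source):

```cpp
#include <cerrno>
#include <system_error>

// EEXIST and ENOENT are explicitly handled by several call-sites, so
// only other system errors should trigger the shutdown path.
inline bool should_stop_on_system_error_sketch(const std::system_error& e) {
    if (e.code() == std::error_code(EEXIST, std::system_category()) ||
        e.code() == std::error_code(ENOENT, std::system_category())) {
        return false; // expected by callers; not fatal
    }
    return true; // e.g. EIO, ENOSPC, EPERM: isolate the node
}
```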
Avi Kivity
de690a6997 Be more conservative when deciding when to shut down due to disk errors
Currently we only shut down on EIO.  Expand this to shut down on any
system_error.

This may cause us to shut down prematurely due to a transient error,
but this is better than not shutting down due to a permanent error
(such as ENOSPC or EPERM).  We may whitelist certain errors in the future
to improve the behavior.

Fixes #1311.
Message-Id: <1465136956-1352-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 961e80ab74)
2016-06-16 14:01:33 +03:00
Pekka Enberg
7b53e969d2 dist/docker: Use Scylla 1.2 RPM repository 2016-06-15 19:50:02 +03:00
Pekka Enberg
c384b23112 release: prepare for 1.2.0 2016-06-13 15:18:13 +03:00
Shlomi Livne
3688542323 dist/common: Update scylla_io_setup to use settings done in cpuset.conf
scylla_io_setup searches for the --smp and --cpuset settings in
SCYLLA_ARGS. We have moved these settings into
/etc/scylla.d/cpuset.conf, where they are set by scylla_cpuset_setup into
CPUSET.

Fixes: #1327

Signed-off-by: Shlomi Livne <shlomi@scylladb.com>
Message-Id: <2735e3abdd63d245ec96cfa1e65f766b1c12132e.1465508701.git.shlomi@scylladb.com>
(cherry picked from commit ac6f2b5c13)
2016-06-10 09:38:17 +03:00
Pekka Enberg
7916182cfa Revert "Be more conservative when deciding when to shut down due to disk errors"
This reverts commit a6179476c5.

The change breaks 'nodetool snapshot', for example.
2016-06-09 10:11:29 +03:00
Tomasz Grabiec
ec1fd3945f Revert "config: adjust boost::program_options validator to work with db::string_map"
This reverts commit 653e250d04.

Compilation is broken with this patch:

[155/264] CXX build/release/db/config.o
FAILED: g++ -MMD -MT build/release/db/config.o -MF build/release/db/config.o.d -std=gnu++1y -g  -Wall -Werror -fvisibility=hidden -pthread -I/home/shlomi/scylla/seastar -I/home/shlomi/scylla/seastar/build/release/gen  -march=nehalem -Wno-overloaded-virtual -DHAVE_HWLOC -DHAVE_NUMA  -O2 -I/usr/include/jsoncpp/  -Wno-maybe-uninitialized -DHAVE_LIBSYSTEMD=1 -I. -I build/release/gen -I seastar -I seastar/build/release/gen -c -o build/release/db/config.o db/config.cc
db/config.cc:57:13: error: ‘void db::validate(boost::any&, const std::vector<std::__cxx11::basic_string<char> >&, db::string_map*, int)’ defined but not used [-Werror=unused-function]
 static void validate(boost::any& out, const std::vector<std::string>& in,
             ^
cc1plus: all warnings being treated as errors

This branch doesn't have commits which introduce the problem which
this patch fixes, so let's just revert it.
2016-06-08 11:05:47 +02:00
Gleb Natapov
653e250d04 config: adjust boost::program_options validator to work with db::string_map
Fixes #1320

Message-Id: <20160607064511.GX9939@scylladb.com>
(cherry picked from commit 9635e67a84)
2016-06-07 10:43:30 +03:00
Amnon Heiman
6255076c20 rate_moving_average: mean_rate is not initialized
The rate_moving_average is used by timed_rate_moving_average to return
its internal values.

If there are no timed events, the mean_rate is not properly initialized.
To solve that, the mean_rate is now initialized to 0 in the structure
definition.

Refs #1306

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1465231006-7081-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit 2cf882c365)
2016-06-07 09:44:26 +03:00
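
The fix amounts to a default member initializer; field names here are illustrative, not the actual rate_moving_average layout:

```cpp
// With the in-class initializer, a rate_moving_average that never saw a
// timed event reports 0 instead of whatever happened to be in memory.
struct rate_moving_average_sketch {
    double rates[3] = {0, 0, 0};
    double mean_rate = 0; // previously left uninitialized
    long count = 0;
};
```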
Pekka Enberg
420ebe28fd release: prepare for 1.2.rc2 2016-06-06 16:17:26 +03:00
Avi Kivity
a6179476c5 Be more conservative when deciding when to shut down due to disk errors
Currently we only shut down on EIO.  Expand this to shut down on any
system_error.

This may cause us to shut down prematurely due to a transient error,
but this is better than not shutting down due to a permanent error
(such as ENOSPC or EPERM).  We may whitelist certain errors in the future
to improve the behavior.

Fixes #1311.
Message-Id: <1465136956-1352-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 961e80ab74)
2016-06-06 16:15:25 +03:00
Raphael S. Carvalho
342726a23c compaction: leveled: improve log message for overlapping table
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <2dcbe3c8131f1d88a3536daa0b6cdd25c6e41d76.1464883077.git.raphaelsc@scylladb.com>
(cherry picked from commit 17b56eb459)
2016-06-06 16:13:40 +03:00
Raphael S. Carvalho
e9946032f4 compaction: disable parallel compaction for leveled strategy
It was discussed that the leveled strategy may not benefit from the parallel
compaction feature because almost all compaction jobs will have a similar
size. It was also found that the leveled strategy wasn't working correctly
with it, because two overlapping sstables (targeting the same level)
could be created in parallel by two ongoing compactions.

Fixes #1293.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <60fe165d611c0283ca203c6d3aa2662ab091e363.1464883077.git.raphaelsc@scylladb.com>
(cherry picked from commit 588ce915d6)
2016-06-06 16:13:36 +03:00
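
The interface change amounts to a per-strategy flag; a reduced sketch (free function instead of the real member function):

```cpp
// Each strategy advertises whether its jobs may run in parallel, and
// leveled opts out so that two jobs cannot emit overlapping tables
// targeting the same level.
enum class compaction_strategy_type { null, size_tiered, leveled };

inline bool parallel_compaction(compaction_strategy_type t) {
    return t != compaction_strategy_type::leveled;
}
```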
Pekka Enberg
5e0b113732 Update scylla-ami submodule
* dist/ami/files/scylla-ami 72ae258...863cc45 (3):
  > Move --cpuset/--smp parameter settings from scylla_sysconfig_setup to scylla_ami_setup
  > convert scylla_install_ami to bash script
  > 'sh -x -e' is not valid since all scripts converted to bash script, so remove them
2016-06-06 13:38:53 +03:00
Asias He
c70faa4f23 streaming: Reduce memory usage when sending mutations
Limit disk bandwidth to 5MB/s to emulate a slow disk:
echo "8:0 5000000" > /cgroup/blkio/limit/blkio.throttle.write_bps_device
echo "8:0 5000000" > /cgroup/blkio/limit/blkio.throttle.read_bps_device

Start scylla node 1 with low memory:
scylla -c 1 -m 128M --auto-bootstrap false

Run c-s:
taskset -c 7 cassandra-stress write duration=5m cl=ONE -schema
'replication(factor=1)' -pop seq=1..100000  -rate threads=20
limit=2000/s -node 127.0.0.1

Start scylla node 2 with low memory:
scylla -c 1 -m 128M --auto-bootstrap true

Without this patch, I saw std::bad_alloc during streaming

ERROR 2016-06-01 14:31:00,196 [shard 0] storage_proxy - exception during
mutation write to 127.0.0.1: std::bad_alloc (std::bad_alloc)
...
ERROR 2016-06-01 14:31:10,172 [shard 0] database - failed to move
memtable to cache: std::bad_alloc (std::bad_alloc)
...

To fix:

1. Apply the streaming mutation limiter before we read the mutation into
memory to avoid wasting memory holding the mutation which we can not
send.

2. Reduce the parallelism of sending streaming mutations. Before, we sent each
range in parallel; after, we send each range one by one.

   before: nr_vnode * nr_shard * (send_info + cf.make_reader memory usage)

   after: nr_shard * (send_info + cf.make_reader memory usage)

We can at least save memory usage by a factor of nr_vnode, 256 by
default.

In my setup, fix 1) alone is not enough; with both fix 1) and 2), I saw
no std::bad_alloc. Also, I did not see streaming bandwidth drop due
to 2).

In addition, I tested grow_cluster_test.py:GrowClusterTest.test_grow_3_to_4,
as described:

https://github.com/scylladb/scylla/issues/1270#issuecomment-222585375

With this patch, I saw no std::bad_alloc any more.

Fixes: #1270

Message-Id: <7703cf7a9db40e53a87f0f7b5acbb03fff2daf43.1464785542.git.asias@scylladb.com>
(cherry picked from commit 206955e47c)
2016-06-02 11:18:59 +03:00
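
The memory estimate above is simple arithmetic; a tiny per-shard model (per_range_cost standing in for send_info plus the reader's memory usage):

```cpp
#include <cstddef>

// With parallel sends, every vnode range holds its send_info and reader
// memory at once; with serialized sends, only one range's worth is alive
// at a time.
inline std::size_t peak_streaming_memory(std::size_t nr_vnodes,
                                         std::size_t per_range_cost,
                                         bool parallel) {
    return parallel ? nr_vnodes * per_range_cost : per_range_cost;
}
```

With the default of 256 vnode ranges, serializing the sends divides the per-shard peak by 256.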
Gleb Natapov
15ad4c9033 storage_proxy: drop debug output
Message-Id: <20160601132641.GK2381@scylladb.com>
(cherry picked from commit 26b50eb8f4)
2016-06-01 17:14:32 +03:00
Pekka Enberg
d094329b6e Revert "Revert "main: change order between storage service and drain execution during exit""
This reverts commit b3ed55be1d.

The issue is in the failing dtest, not this commit. Gleb writes:

  "The bug is in the test, not the patch. Test waits for repair session
   to end one way or the other when node is killed, but for nodetool to
   know if repair is completed it needs to poll for it.  If node dies
   before nodetool managed to see repair completion it will stuck
   forever since jmx is alive, but does not provide answers any more.
   The patch changes timing, repair is completed much close to exit now,
   so problem appears, but it may happen even without the patch.

   The fix is for dtest to kill jmx as part of killing a node
   operation."

Now that Lucas fixed the problem in scylla-ccm, revert the revert.

(cherry picked from commit 0255318bf3)
2016-06-01 08:51:51 +03:00
Pekka Enberg
dcab915f21 release: prepare for 1.2.rc1 2016-05-30 13:14:38 +03:00
26 changed files with 439 additions and 184 deletions

View File

@@ -1,6 +1,6 @@
#!/bin/sh
VERSION=666.development
VERSION=1.2.1
if test -f version
then

View File

@@ -51,6 +51,9 @@ public:
// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
// Return if parallel compaction is allowed by strategy.
bool parallel_compaction() const;
static sstring name(compaction_strategy_type type) {
switch (type) {
case compaction_strategy_type::null:

View File

@@ -484,12 +484,75 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
return (s1 <= me) && (me <= s2);
}
static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
auto key_shard = [&s] (const partition_key& pk) {
auto token = dht::global_partitioner().get_token(s, pk);
return dht::shard_of(token);
};
auto s1 = key_shard(first);
auto s2 = key_shard(last);
auto me = engine().cpu_id();
return (s1 != me) || (me != s2);
}
static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
assert(r.start());
assert(r.end());
return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
}
static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
assert(r.start());
assert(r.end());
return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
}
future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, r)) {
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
sst->mark_for_deletion();
return make_ready_future<>();
}
bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
if (in_other_shard) {
// If we're here, this sstable is shared by this and other
// shard(s). Shared sstables cannot be deleted until all
// shards compacted them, so to reduce disk space usage we
// want to start splitting them now.
// However, we need to delay this compaction until we read all
// the sstables belonging to this CF, because we need all of
// them to know which tombstones we can drop, and what
// generation number is free.
_sstables_need_rewrite.push_back(sst);
}
if (reset_level) {
// When loading a migrated sstable, set level to 0 because
// it may overlap with existing tables in levels > 0.
// This step is optional, because even if we didn't do this
// scylla would detect the overlap, and bring back some of
// the sstables to level 0.
sst->set_sstable_level(0);
}
add_sstable(sst);
});
});
}
// load_sstable() wants to start rewriting sstables which are shared between
// several shards, but we can't start any compaction before all the sstables
// of this CF were loaded. So call this function to start rewrites, if any.
void column_family::start_rewrite() {
for (auto sst : _sstables_need_rewrite) {
dblog.info("Splitting {} for shard", sst->get_filename());
_compaction_manager.submit_sstable_rewrite(this, sst);
}
_sstables_need_rewrite.clear();
}
future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
using namespace sstables;
@@ -514,24 +577,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
}
}
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
auto fut = sst->get_sstable_key_range(*_schema);
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, std::move(r))) {
dblog.debug("sstable {} not relevant for this shard, ignoring",
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
sstables::sstable::component_type::Data));
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
return make_ready_future<>();
}
auto fut = sst->load();
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
add_sstable(std::move(*sst));
return make_ready_future<>();
});
}).then_wrapped([fname, comps] (future<> f) {
return load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
try {
f.get();
} catch (malformed_sstable_exception& e) {
@@ -1033,29 +1081,14 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
future<>
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
return parallel_for_each(new_tables, [this] (auto comps) {
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
return sst->load().then([this, sst] {
// This sets in-memory level of sstable to 0.
// When loading a migrated sstable, it's important to set it to level 0 because
// leveled compaction relies on a level > 0 having no overlapping sstables.
// If Scylla reboots before migrated sstable gets compacted, leveled strategy
// is smart enough to detect a sstable that overlaps and set its in-memory
// level to 0.
return sst->set_sstable_level(0);
}).then([this, sst] {
auto first = sst->get_first_partition_key(*_schema);
auto last = sst->get_last_partition_key(*_schema);
if (belongs_to_current_shard(*_schema, first, last)) {
this->add_sstable(sst);
} else {
sst->mark_for_deletion();
}
return make_ready_future<>();
});
return this->load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), _config.datadir,
comps.generation, comps.version, comps.format), true);
}).then([this] {
start_rewrite();
// Drop entire cache for this column family because it may be populated
// with stale data.
get_row_cache().clear();
return get_row_cache().clear();
});
}
@@ -1889,6 +1922,7 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
auto add_partition = [&qs] (uint32_t live_rows, mutation&& m) {
auto pb = qs.builder.add_partition(*qs.schema, m.key());
m.partition().query_compacted(pb, *qs.schema, live_rows);
qs.limit -= live_rows;
};
return do_with(querying_reader(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.cmd.timestamp, add_partition),
[] (auto&& rd) { return rd.read(); });
@@ -1896,10 +1930,10 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
return make_ready_future<lw_shared_ptr<query::result>>(
make_lw_shared<query::result>(qs.builder.build()));
}).finally([lc, this]() mutable {
_stats.reads.mark(lc);
if (lc.is_start()) {
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
}
_stats.reads.mark(lc);
if (lc.is_start()) {
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
}
});
}
}
@@ -2257,7 +2291,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
// gotten all things to disk. Again, need queue-ish or something.
f = cf.flush();
} else {
cf.clear();
f = cf.clear();
}
return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable {
@@ -2633,21 +2667,29 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
// temporary counter measure.
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
return _streaming_memtables->seal_active_memtable().finally([this, ranges = std::move(ranges)] {
if (_config.enable_cache) {
for (auto& range : ranges) {
_cache.invalidate(range);
}
if (!_config.enable_cache) {
return make_ready_future<>();
}
return do_with(std::move(ranges), [this] (auto& ranges) {
return parallel_for_each(ranges, [this](auto&& range) {
return _cache.invalidate(range);
});
});
return do_with(std::move(ranges), [this] (auto& ranges) {
return parallel_for_each(ranges, [this](auto&& range) {
return _cache.invalidate(range);
});
});
});
});
}
void column_family::clear() {
_cache.clear();
future<> column_family::clear() {
_memtables->clear();
_memtables->add_memtable();
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
return _cache.clear();
}
// NOTE: does not need to be futurized, but might eventually, depending on
@@ -2673,13 +2715,13 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
_sstables = std::move(pruned);
dblog.debug("cleaning out row cache");
_cache.clear();
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
});
});
}

View File

@@ -310,6 +310,11 @@ private:
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
// sstables that are shared between several shards so we want to rewrite
// them (split the data belonging to this shard to a separate sstable),
// but for correct compaction we need to start the compaction only after
// reading all sstables.
std::vector<sstables::shared_sstable> _sstables_need_rewrite;
// Control background fibers waiting for sstables to be deleted
seastar::gate _sstable_deletion_gate;
// There are situations in which we need to stop writing sstables. Flushers will take
@@ -338,6 +343,7 @@ private:
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
void add_sstable(sstables::sstable&& sstable);
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
@@ -463,7 +469,7 @@ public:
future<> flush();
future<> flush(const db::replay_position&);
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
void clear(); // discards memtable(s) without flushing them to disk.
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
// Important warning: disabling writes will only have an effect in the current shard.
@@ -634,6 +640,7 @@ private:
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
void check_valid_rp(const db::replay_position&) const;
public:
void start_rewrite();
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;

View File

@@ -36,6 +36,8 @@ extern thread_local disk_error_signal_type sstable_read_error;
extern thread_local disk_error_signal_type sstable_write_error;
extern thread_local disk_error_signal_type general_disk_error;
bool should_stop_on_system_error(const std::system_error& e);
template<typename Func, typename... Args>
std::enable_if_t<!is_future<std::result_of_t<Func(Args&&...)>>::value,
std::result_of_t<Func(Args&&...)>>
@@ -44,7 +46,7 @@ do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
// calling function
return func(std::forward<Args>(args)...);
} catch (std::system_error& e) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(e)) {
signal();
throw storage_io_error(e);
}
@@ -62,7 +64,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
try {
std::rethrow_exception(ep);
} catch (std::system_error& sys_err) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(sys_err)) {
signal();
throw storage_io_error(sys_err);
}
@@ -70,7 +72,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
return futurize<std::result_of_t<Func(Args&&...)>>::make_exception_future(ep);
});
} catch (std::system_error& e) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(e)) {
signal();
throw storage_io_error(e);
}

View File

@@ -44,8 +44,8 @@ output_to_user()
}
if [ `is_developer_mode` -eq 0 ]; then
SMP=`echo $SCYLLA_ARGS|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
CPUSET=`echo $SCYLLA_ARGS|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
if [ $AMI_OPT -eq 1 ]; then
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`


@@ -2,6 +2,7 @@
Description=Scylla Server
[Service]
PermissionsStartOnly=true
Type=notify
LimitMEMLOCK=infinity
LimitNOFILE=200000
@@ -10,9 +11,9 @@ LimitNPROC=8096
EnvironmentFile=@@SYSCONFDIR@@/scylla-server
EnvironmentFile=/etc/scylla.d/*.conf
WorkingDirectory=$SCYLLA_HOME
ExecStartPre=/usr/bin/sudo /usr/lib/scylla/scylla_prepare
ExecStartPre=/usr/lib/scylla/scylla_prepare
ExecStart=/usr/bin/scylla $SCYLLA_ARGS $SEASTAR_IO $DEV_MODE $CPUSET
ExecStopPost=/usr/bin/sudo /usr/lib/scylla/scylla_stop
ExecStopPost=/usr/lib/scylla/scylla_stop
TimeoutStartSec=900
KillMode=process
Restart=on-abnormal


@@ -2,7 +2,7 @@ FROM centos:7
MAINTAINER Avi Kivity <avi@cloudius-systems.com>
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.2.repo -o /etc/yum.repos.d/scylla.repo
RUN yum -y install epel-release
RUN yum -y clean expire-cache
RUN yum -y update


@@ -104,11 +104,6 @@ cp -P dist/common/sbin/* $RPM_BUILD_ROOT%{_sbindir}/
%pre server
/usr/sbin/groupadd scylla 2> /dev/null || :
/usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
%if 0%{?rhel}
sed -e "s/Defaults requiretty/#Defaults requiretty/" /etc/sudoers > /tmp/sudoers
cp /tmp/sudoers /etc/sudoers
rm /tmp/sudoers
%endif
%post server
# Upgrade coredump settings
@@ -214,7 +209,9 @@ This package contains Linux kernel configuration changes for the Scylla database
if Scylla is the main application on your server and you wish to optimize its latency and throughput.
%post kernel-conf
%sysctl_apply 99-scylla-sched.conf
# We cannot use the sysctl_apply rpm macro because it is not present in 7.0
# following is a "manual" expansion
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
%files kernel-conf
%defattr(-,root,root)

main.cc

@@ -277,6 +277,7 @@ verify_seastar_io_scheduler(bool has_max_io_requests, bool developer_mode) {
}
int main(int ac, char** av) {
try {
// early check to avoid triggering
if (!cpu_sanity()) {
_exit(71);
@@ -516,6 +517,18 @@ int main(int ac, char** av) {
}
return db.load_sstables(proxy);
}).get();
// If the same sstable is shared by several shards, it cannot be
// deleted until all shards decide to compact it. So we want to
// start these compactions now. Note that we start compacting only after
// all sstables in this CF were loaded on all shards - otherwise
// we will have races between the compaction and loading processes
db.invoke_on_all([&proxy] (database& db) {
for (auto& x : db.get_column_families()) {
column_family& cf = *(x.second);
// We start the rewrite, but do not wait for it.
cf.start_rewrite();
}
}).get();
supervisor_notify("setting up system keyspace");
db::system_keyspace::setup(db, qp).get();
supervisor_notify("starting commit log");
@@ -595,10 +608,10 @@ int main(int ac, char** av) {
supervisor_notify("serving");
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
engine().at_exit([] {
return service::get_local_storage_service().drain_on_shutdown();
return repair_shutdown(service::get_local_storage_service().db());
});
engine().at_exit([] {
return repair_shutdown(service::get_local_storage_service().db());
return service::get_local_storage_service().drain_on_shutdown();
});
engine().at_exit([&db] {
return db.invoke_on_all([](auto& db) {
@@ -607,6 +620,11 @@ int main(int ac, char** av) {
});
}).or_terminate();
});
} catch (...) {
// reactor may not have been initialized, so can't use logger
fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception());
return 7; // 1 has a special meaning for upstart
}
}
namespace debug {


@@ -397,7 +397,7 @@ static future<> sync_range(seastar::sharded<database>& db,
return sp_in.execute().discard_result().then([&sp_out] {
return sp_out.execute().discard_result();
}).handle_exception([] (auto ep) {
logger.error("repair's stream failed: {}", ep);
logger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
});
});


@@ -443,7 +443,16 @@ row_cache::make_reader(schema_ptr s,
}
row_cache::~row_cache() {
clear();
clear_now();
}
void row_cache::clear_now() noexcept {
with_allocator(_tracker.allocator(), [this] {
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
}
void row_cache::populate(const mutation& m) {
@@ -467,16 +476,8 @@ void row_cache::populate(const mutation& m) {
});
}
void row_cache::clear() {
with_allocator(_tracker.allocator(), [this] {
// We depend on clear_and_dispose() below not looking up any keys.
// Using with_linearized_managed_bytes() is no help, because we don't
// want to propagate an exception from here.
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
future<> row_cache::clear() {
return invalidate(query::full_partition_range);
}
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
@@ -502,8 +503,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
});
if (blow_cache) {
// We failed to invalidate the key, presumably due to with_linearized_managed_bytes()
// running out of memory. Recover using clear(), which doesn't throw.
clear();
// running out of memory. Recover using clear_now(), which doesn't throw.
clear_now();
}
});
});
@@ -577,7 +578,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
});
}
void row_cache::invalidate(const dht::decorated_key& dk) {
future<> row_cache::invalidate(const dht::decorated_key& dk) {
return _populate_phaser.advance_and_await().then([this, &dk] {
_read_section(_tracker.region(), [&] {
with_allocator(_tracker.allocator(), [this, &dk] {
with_linearized_managed_bytes([&] {
@@ -585,17 +587,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) {
});
});
});
});
}
void row_cache::invalidate(const query::partition_range& range) {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate(unwrapped.first);
invalidate(unwrapped.second);
return;
}
future<> row_cache::invalidate(const query::partition_range& range) {
return _populate_phaser.advance_and_await().then([this, &range] {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate_unwrapped(unwrapped.first);
invalidate_unwrapped(unwrapped.second);
} else {
invalidate_unwrapped(range);
}
});
});
}
void row_cache::invalidate_unwrapped(const query::partition_range& range) {
logalloc::reclaim_lock _(_tracker.region());
auto cmp = cache_entry::compare(_schema);
@@ -621,7 +630,6 @@ void row_cache::invalidate(const query::partition_range& range) {
deleter(p);
});
});
});
}
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,


@@ -184,13 +184,13 @@ private:
mutation_source _underlying;
key_source _underlying_keys;
// Synchronizes populating reads with update() to ensure that cache
// Synchronizes populating reads with updates of underlying data source to ensure that cache
// remains consistent across flushes with the underlying data source.
// Readers obtained from the underlying data source in phases earlier than
// the current one must not be used to populate the cache, unless they hold a
// phaser::operation created in the reader's phase of origin. Readers
// should hold to a phase only briefly because this inhibits progress of
// update(). Phase changes occur only in update(), which can be assumed to
// updates. Phase changes occur in update()/clear(), which can be assumed to
// be asynchronous wrt invoking of the underlying data source.
utils::phased_barrier _populate_phaser;
@@ -204,6 +204,8 @@ private:
void on_miss();
void upgrade_entry(cache_entry&);
void invalidate_locked(const dht::decorated_key&);
void invalidate_unwrapped(const query::partition_range&);
void clear_now() noexcept;
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
public:
~row_cache();
@@ -228,7 +230,9 @@ public:
void populate(const mutation& m);
// Clears the cache.
void clear();
// Guarantees that cache will not be populated using readers created
// before this method was invoked.
future<> clear();
// Synchronizes cache with the underlying data source from a memtable which
// has just been flushed to the underlying data source.
@@ -240,11 +244,21 @@ public:
void touch(const dht::decorated_key&);
// Removes given partition from cache.
void invalidate(const dht::decorated_key&);
//
// Guarantees that cache will not be populated with given key
// using readers created before this method was invoked.
//
// The key must be kept alive until method resolves.
future<> invalidate(const dht::decorated_key& key);
// Removes given range of partitions from cache.
// The range can be a wrap around.
void invalidate(const query::partition_range&);
//
// Guarantees that cache will not be populated with partitions from that range
// using readers created before this method was invoked.
//
// The range must be kept alive until method resolves.
future<> invalidate(const query::partition_range&);
auto num_entries() const {
return _partitions.size();


@@ -2058,7 +2058,6 @@ public:
auto write_timeout = exec->_proxy->_db.local().get_config().write_request_timeout_in_ms() * 1000;
auto delta = __int128_t(digest_resolver->last_modified()) - __int128_t(exec->_cmd->read_timestamp);
if (std::abs(delta) <= write_timeout) {
print("HERE %d\n", int64_t(delta));
exec->_proxy->_stats.global_read_repairs_canceled_due_to_concurrent_write++;
// if CL is local and non-matching data was modified less than write_timeout ms ago, do only local repair
auto i = boost::range::remove_if(exec->_targets, std::not1(std::cref(db::is_local)));


@@ -972,6 +972,28 @@ void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subsc
static stdx::optional<future<>> drain_in_progress;
future<> storage_service::stop_transport() {
return run_with_no_api_lock([] (storage_service& ss) {
return seastar::async([&ss] {
logger.info("Stop transport: starts");
gms::get_local_gossiper().stop_gossiping().get();
logger.info("Stop transport: stop_gossiping done");
ss.shutdown_client_servers().get();
logger.info("Stop transport: shutdown rpc and cql server done");
ss.do_stop_ms().get();
logger.info("Stop transport: shutdown messaging_service done");
auth::auth::shutdown().get();
logger.info("Stop transport: auth shutdown");
logger.info("Stop transport: done");
});
});
}
future<> storage_service::drain_on_shutdown() {
return run_with_no_api_lock([] (storage_service& ss) {
if (drain_in_progress) {
@@ -980,17 +1002,8 @@ future<> storage_service::drain_on_shutdown() {
return seastar::async([&ss] {
logger.info("Drain on shutdown: starts");
gms::get_local_gossiper().stop_gossiping().get();
logger.info("Drain on shutdown: stop_gossiping done");
ss.shutdown_client_servers().get();
logger.info("Drain on shutdown: shutdown rpc and cql server done");
ss.do_stop_ms().get();
logger.info("Drain on shutdown: shutdown messaging_service done");
auth::auth::shutdown().get();
logger.info("Drain on shutdown: auth shutdown");
ss.stop_transport().get();
logger.info("Drain on shutdown: stop_transport done");
ss.flush_column_families();
logger.info("Drain on shutdown: flush column_families done");
@@ -3007,7 +3020,7 @@ void storage_service::do_isolate_on_error(disk_error type)
if (must_isolate && !isolated.exchange(true)) {
logger.warn("Shutting down communications due to I/O errors until operator intervention");
// isolated protect us against multiple stops
service::get_storage_service().invoke_on_all([] (service::storage_service& s) { s.stop_native_transport(); });
service::get_local_storage_service().stop_transport();
}
}


@@ -382,6 +382,8 @@ public:
future<> drain_on_shutdown();
future<> stop_transport();
void flush_column_families();
#if 0
/**


@@ -83,13 +83,17 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
return weight;
}
bool compaction_manager::try_to_register_weight(column_family* cf, int weight) {
bool compaction_manager::try_to_register_weight(column_family* cf, int weight, bool parallel_compaction) {
auto it = _weight_tracker.find(cf);
if (it == _weight_tracker.end()) {
_weight_tracker.insert({cf, {weight}});
return true;
}
std::unordered_set<int>& s = it->second;
// Only one weight is allowed if parallel compaction is disabled.
if (!parallel_compaction && !s.empty()) {
return false;
}
// TODO: Maybe allow only *smaller* compactions to start? That can be done
// by returning true only if weight is not in the set and is lower than any
// entry in the set.
@@ -164,8 +168,7 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
sstables::compaction_strategy cs = cf.get_compaction_strategy();
descriptor = cs.get_sstables_for_compaction(cf, std::move(candidates));
weight = trim_to_compact(&cf, descriptor);
if (!try_to_register_weight(&cf, weight)) {
// Refusing compaction job because of an ongoing compaction with same weight.
if (!try_to_register_weight(&cf, weight, cs.parallel_compaction())) {
task->stopping = true;
_stats.pending_tasks--;
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
@@ -248,6 +251,51 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
return task;
}
// submit_sstable_rewrite() starts a compaction task, much like submit(),
// but rather than asking a compaction policy what to compact, this function
// compacts just a single sstable, and writes one new sstable. This operation
// is useful to split an sstable containing data belonging to multiple shards
// into a separate sstable on each shard.
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
// The semaphore ensures that the sstable rewrite operations submitted by
// submit_sstable_rewrite are run in sequence, and not all of them in
// parallel. Note that unlike general compaction which currently allows
// different cfs to compact in parallel, here we don't have a semaphore
// per cf, so we only get one rewrite at a time on each shard.
static thread_local semaphore sem(1);
// We cannot, and don't need to, compact an sstable which is already
// being compacted anyway.
if (_stopped || _compacting_sstables.count(sst)) {
return;
}
// Conversely, we don't want another compaction job to compact the
// sstable we are planning to work on:
_compacting_sstables.insert(sst);
auto task = make_lw_shared<compaction_manager::task>();
_tasks.push_back(task);
_stats.active_tasks++;
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
return cf->compact_sstables(sstables::compaction_descriptor(
std::vector<sstables::shared_sstable>{sst},
sst->get_sstable_level(),
std::numeric_limits<uint64_t>::max()), false);
}).then_wrapped([this, sst, task] (future<> f) {
_compacting_sstables.erase(sst);
_stats.active_tasks--;
_tasks.remove(task);
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stop_exception& e) {
cmlog.info("compaction info: {}", e.what());
_stats.errors++;
} catch (...) {
cmlog.error("compaction failed: {}", std::current_exception());
_stats.errors++;
}
});
}
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
task->stopping = true;
return task->compaction_gate.close().then([task] {


@@ -81,9 +81,9 @@ private:
// It will not accept new requests in case the manager was stopped.
bool can_submit();
// If weight is not taken for the column family, weight is registered and
// true is returned. Return false otherwise.
bool try_to_register_weight(column_family* cf, int weight);
// Return true if weight is not registered. If parallel_compaction is not
// true, only one weight is allowed to be registered.
bool try_to_register_weight(column_family* cf, int weight, bool parallel_compaction);
// Deregister weight for a column family.
void deregister_weight(column_family* cf, int weight);
@@ -109,6 +109,13 @@ public:
// Submit a column family to be cleaned up and wait for its termination.
future<> perform_cleanup(column_family* cf);
// Submit a specific sstable to be rewritten, while dropping data which
// does not belong to this shard. Meant to be used on startup when an
// sstable is shared by multiple shards, and we want to split it to a
// separate sstable for each shard.
void submit_sstable_rewrite(column_family* cf,
sstables::shared_sstable s);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);


@@ -56,6 +56,9 @@ public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
return true;
}
};
//
@@ -402,6 +405,10 @@ public:
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
virtual bool parallel_compaction() const override {
return false;
}
virtual compaction_strategy_type type() const {
return compaction_strategy_type::leveled;
}
@@ -439,6 +446,9 @@ compaction_strategy_type compaction_strategy::type() const {
compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) {
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
}
bool compaction_strategy::parallel_compaction() const {
return _compaction_strategy_impl->parallel_compaction();
}
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
::shared_ptr<compaction_strategy_impl> impl;


@@ -175,10 +175,8 @@ public:
if (previous != nullptr && current_first.tri_compare(s, previous->get_last_decorated_key(s)) <= 0) {
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 " \
"or due to the fact that you have dropped sstables from another node into the data directory. " \
"Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also " \
"have rows out-of-order within an sstable",
logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by the fact that you have dropped " \
"sstables from another node into the data directory. Sending back to L0.",
level, previous->get_filename(), previous->get_first_partition_key(s), previous->get_last_partition_key(s),
current->get_filename(), current->get_first_partition_key(s), current->get_last_partition_key(s));


@@ -228,7 +228,7 @@ future<> stream_session::on_initialization_complete() {
}
_stream_result->handle_session_prepared(this->shared_from_this());
} catch (...) {
sslog.error("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
throw;
}
return make_ready_future<>();
@@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() {
return ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
}).handle_exception([id, plan_id] (auto ep) {
sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
std::rethrow_exception(ep);
});
}).then([this] {
@@ -248,7 +248,7 @@ future<> stream_session::on_initialization_complete() {
}
void stream_session::on_error() {
sslog.error("[Stream #{}] Streaming error occurred", plan_id());
sslog.warn("[Stream #{}] Streaming error occurred", plan_id());
// fail session
close_session(stream_session_state::FAILED);
}
@@ -270,7 +270,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
db.find_column_family(ks, cf);
} catch (no_such_column_family) {
auto err = sprint("[Stream #{}] prepare requested ks={} cf={} does not exist", ks, cf);
sslog.error(err.c_str());
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
}
@@ -284,7 +284,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
db.find_column_family(cf_id);
} catch (no_such_column_family) {
auto err = sprint("[Stream #{}] prepare cf_id=%s does not exist", plan_id, cf_id);
sslog.error(err.c_str());
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
prepare_receiving(summary);


@@ -85,41 +85,41 @@ struct send_info {
};
future<stop_iteration> do_send_mutations(auto si, auto fm) {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
// There might be a larger number of STREAM_MUTATION messages in flight.
// Log one error per column_family per range
if (!si->error_logged) {
si->error_logged = true;
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
return stop_iteration::no;
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
// There might be a larger number of STREAM_MUTATION messages in flight.
// Log one error per column_family per range
if (!si->error_logged) {
si->error_logged = true;
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
});
return make_ready_future<stop_iteration>(stop_iteration::no);
}
future<> send_mutations(auto si) {
auto& cf = si->db.find_column_family(si->cf_id);
auto& priority = service::get_local_streaming_read_priority();
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
return repeat([si, &reader] () {
return reader().then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return repeat([si, &reader] {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
return reader().then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
});
}).then([si] {
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
auto cf_id = this->cf_id;
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
auto cf_id = this->cf_id;
@@ -153,7 +153,7 @@ void stream_transfer_task::start() {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,
cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) {
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
std::rethrow_exception(ep);
});
}).then([this, id, plan_id, cf_id] {
@@ -161,7 +161,7 @@ void stream_transfer_task::start() {
session->start_keep_alive_timer();
session->transfer_task_completed(cf_id);
}).handle_exception([this, plan_id, id] (auto ep){
sslog.error("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
this->session->on_error();
});
}


@@ -546,27 +546,33 @@ static std::vector<mutation> updated_ring(std::vector<mutation>& mutations) {
return result;
}
static mutation_source make_mutation_source(std::vector<lw_shared_ptr<memtable>>& memtables) {
return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) {
std::vector<mutation_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->make_reader(s, pr));
}
return make_combined_reader(std::move(readers));
});
}
static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtable>>& memtables) {
return key_source([s, &memtables] (const query::partition_range& pr) {
std::vector<key_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->as_key_source()(pr));
}
return make_combined_reader(s, std::move(readers));
});
}
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) {
std::vector<mutation_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->make_reader(s, pr));
}
return make_combined_reader(std::move(readers));
});
auto memtables_key_source = key_source([&] (const query::partition_range& pr) {
std::vector<key_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->as_key_source()(pr));
}
return make_combined_reader(s, std::move(readers));
});
throttled_mutation_source cache_source(memtables_data_source);
throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
row_cache cache(s, cache_source, memtables_key_source, tracker);
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
@@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
auto some_element = keys_in_cache.begin() + 547;
std::vector<dht::decorated_key> keys_not_in_cache;
keys_not_in_cache.push_back(*some_element);
cache.invalidate(*some_element);
cache.invalidate(*some_element).get();
keys_in_cache.erase(some_element);
for (auto&& key : keys_in_cache) {
@@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
{ *some_range_begin, true }, { *some_range_end, false }
);
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
cache.invalidate(range);
cache.invalidate(range).get();
keys_in_cache.erase(some_range_begin, some_range_end);
for (auto&& key : keys_in_cache) {
@@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) {
});
}
SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
auto ring = make_ring(s, 3);
for (auto&& m : ring) {
mt1->apply(m);
}
auto mt2 = make_lw_shared<memtable>(s);
auto ring2 = updated_ring(ring);
for (auto&& m : ring2) {
mt2->apply(m);
}
cache_source.block();
auto rd1 = cache.make_reader(s);
auto rd1_result = rd1();
sleep(10ms).get();
memtables.clear();
memtables.push_back(mt2);
// This update should miss on all partitions
auto cache_cleared = cache.clear();
auto rd2 = cache.make_reader(s);
// rd1, which is in progress, should not prevent forward progress of clear()
cache_source.unblock();
cache_cleared.get();
// Reads started before memtable flush should return previous value, otherwise this test
// doesn't trigger the conditions it is supposed to protect against.
assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]);
assert_that(rd1().get0()).has_no_mutation();
// Reads started after clear but before previous populations completed
// should already see the new data
assert_that(std::move(rd2))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
// Reads started after clear should see new data
assert_that(cache.make_reader(s))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
});
}
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
return seastar::async([] {
auto s = make_schema();
@@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
}
// wrap-around
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()}));
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());
@@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
verify_does_not_have(cache, ring[7].decorated_key());
// not wrap-around
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()}));
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());


@@ -49,3 +49,17 @@ bool is_system_error_errno(int err_no)
code.category() == std::system_category();
});
}
bool should_stop_on_system_error(const std::system_error& e) {
if (e.code().category() == std::system_category()) {
// Whitelist of errors that don't require us to stop the server:
switch (e.code().value()) {
case EEXIST:
case ENOENT:
return false;
default:
break;
}
}
return true;
}


@@ -196,7 +196,7 @@ inline ihistogram operator +(ihistogram a, const ihistogram& b) {
struct rate_moving_average {
uint64_t count = 0;
double rates[3] = {0};
double mean_rate;
double mean_rate = 0;
rate_moving_average& operator +=(const rate_moving_average& o) {
count += o.count;
mean_rate += o.mean_rate;