Compare commits

...

52 Commits

Author SHA1 Message Date
Nadav Har'El
3bbd260a3e Rewriting shared sstables only after all shards loaded sstables
After commit faa4581, each shard only starts splitting its shared sstables
after opening all sstables. This was important because compaction needs to
be aware of all sstables.

However, another bug remained: If one shard finishes loading its sstables
and starts the splitting compactions, and in parallel a different shard is
still opening sstables - the second shard might find a half-written sstable
being written by the first shard, and abort on a malformed sstable.

So in this patch we start the shared sstable rewrites - on all shards -
only after all shards finished loading their sstables. Doing this is easy,
because main.cc already contains a list of sequential steps where each
uses invoke_on_all() to make sure the step completes on all shards before
continuing to the next step.
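The sequencing described above (each step completes on all shards before the next begins) can be sketched outside Seastar as a barrier between a load phase and a rewrite phase; the shard threads, the latch, and `run_shards` below are illustrative stand-ins, not Scylla's actual API:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal latch: every shard arrives after loading its sstables; nobody
// proceeds to the rewrite phase until all shards have arrived.
// (std::latch is C++20; this is a portable C++11 version.)
class latch {
    std::mutex m;
    std::condition_variable cv;
    unsigned remaining;
public:
    explicit latch(unsigned n) : remaining(n) {}
    void arrive_and_wait() {
        std::unique_lock<std::mutex> lk(m);
        if (--remaining == 0) { cv.notify_all(); return; }
        cv.wait(lk, [this] { return remaining == 0; });
    }
};

// Returns true iff no shard entered the rewrite phase before every shard
// had finished its load phase.
bool run_shards(unsigned nr_shards) {
    std::atomic<unsigned> loaded{0};
    std::atomic<bool> ok{true};
    latch all_loaded(nr_shards);
    std::vector<std::thread> shards;
    for (unsigned i = 0; i < nr_shards; ++i) {
        shards.emplace_back([&] {
            ++loaded;                      // step 1: open this shard's sstables
            all_loaded.arrive_and_wait();  // barrier across all shards
            if (loaded.load() != nr_shards) ok = false; // step 2: safe to rewrite
        });
    }
    for (auto& t : shards) t.join();
    return ok.load();
}
```

In Scylla itself no explicit latch is needed: invoke_on_all()'s future resolves only when the step has completed on every shard.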

Fixes #1371

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1466426641-3972-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit 3372052d48)
2016-06-20 18:20:23 +03:00
Nadav Har'El
4905f8a49d Rewrite shared sstables only after entire CF is read
Starting in commit 721f7d1d4f, we start "rewriting" a shared sstable (i.e.,
splitting it into individual shards) as soon as it is loaded in each shard.

However, as discovered in issue #1366, this is too soon: our compaction
process relies in several places on the fact that compaction is only done
after all the sstables of the same CF have been loaded. One example is that we
need to know the content of the other sstables to decide which tombstones
we can expire (this is issue #1366). Another example is that we use the
last generation number we are aware of to decide the number of the next
compaction output - and this is wrong before we saw all sstables.

So with this patch, while loading sstables we only make a list of shared
sstables which need to be rewritten - and the actual rewrite is only started
when we finish reading all the sstables for this CF. We need to do this in
two cases: reboot (when we load all the existing sstables we find on disk),
and nodetool refresh (when we import a set of new sstables).
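A minimal, single-threaded sketch of this bookkeeping — record shared sstables while loading, start the rewrites only once the whole CF has been read; the `sstable` struct, `column_family` members, and method names stand in for the real database.cc types:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for Scylla's sstable type.
struct sstable { std::string name; bool shared; };

class column_family {
    std::vector<sstable> _sstables;
    std::vector<sstable> _sstables_need_rewrite;
    std::vector<std::string> _rewrites_started;
public:
    // While loading, only *record* shared sstables; do not rewrite yet.
    void load_sstable(sstable sst) {
        if (sst.shared) {
            _sstables_need_rewrite.push_back(sst);
        }
        _sstables.push_back(std::move(sst));
    }
    // Called once, after every sstable of this CF has been loaded
    // (on reboot, or after a nodetool refresh batch).
    void start_rewrite() {
        for (auto& sst : _sstables_need_rewrite) {
            _rewrites_started.push_back(sst.name); // submit rewrite here
        }
        _sstables_need_rewrite.clear();
    }
    size_t pending() const { return _sstables_need_rewrite.size(); }
    size_t started() const { return _rewrites_started.size(); }
};
```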

Fixes #1366.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1466344078-31290-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit faa45812b2)
2016-06-19 17:19:45 +03:00
Asias He
ad2b7d1e8c repair: Switch log level to warn instead of error
dtest treats error-level log messages as serious errors. It is not a serious
error for streaming to fail to send a verb and fail a streaming session,
which triggers a repair failure - for example, when the peer node is gone
or stopped. Switch to log level warn instead of error.

Fixes repair_additional_test.py:RepairAdditionalTest.repair_kill_3_test

Fixes: #1335
Message-Id: <406fb0c4a45b81bd9c0aea2a898d7ca0787b23e9.1465979288.git.asias@scylladb.com>
(cherry picked from commit de0fd98349)
2016-06-18 11:43:39 +03:00
Asias He
80e4e2da38 streaming: Switch log level to warn instead of error
dtest treats error-level log messages as serious errors. It is not a serious
error for streaming to fail to send a verb and fail a streaming session, for
example when the peer node is gone or stopped. Switch to log level warn
instead of error.

Fixes repair_additional_test.py:RepairAdditionalTest.repair_kill_3_test

Fixes: #1335
Message-Id: <0149d30044e6e4d80732f1a20cd20593de489fc8.1465979288.git.asias@scylladb.com>
(cherry picked from commit 94c9211b0e)
2016-06-18 11:43:28 +03:00
Asias He
29da6fa5b4 streaming: Fix indention in do_send_mutations
Message-Id: <bc8cfa7c7b29f08e70c0af6d2fb835124d0831ac.1464857352.git.asias@scylladb.com>
(cherry picked from commit 96463cc17c)
2016-06-18 11:43:11 +03:00
Pekka Enberg
ad1af17aad release: prepare for 1.1.3 2016-06-16 13:39:45 +03:00
Pekka Enberg
7e052a4e91 service/storage_service: Make do_isolate_on_error() more robust
Currently, we only stop the CQL transport server. Extract a
stop_transport() function from drain_on_shutdown() and call it from
do_isolate_on_error() to also shut down the inter-node RPC transport,
Thrift, and other communications services.

Fixes #1353

(cherry picked from commit d72c608868)

Conflicts:
	service/storage_service.cc
2016-06-16 13:37:06 +03:00
Pekka Enberg
787d8f88d3 release: prepare for 1.1.3.rc1 2016-06-15 10:19:21 +03:00
Pekka Enberg
d4e1a25858 Merge "Rebase "Rewrite shared sstables soon after startup" to 1.1" from Glauber
"This is a rebase of Nadav's "Rewrite shared sstables soon after startup" to 1.1.
Aside from minor issues, there are two major conflicts with 1.1:

 1) Parallel compactions within the same CF,
 2) Cache invalidation fixes.

Because 2) fixes an actual bug that would bite us in 1.1 as well, I have
decided, exercising my discretion, to backport it as well. It has the nice
side-effect of making the database.cc side of the conflicts disappear and
apply cleanly. The patches are also fairly simple.

On the other hand, 1) has no place in 1.1, and I have manually rebased it.

I have executed the following dtests in the updated tree, which are all passing:

compaction_test.py:TestCompaction_with_LeveledCompactionStrategy.compaction_delete_test
compaction_test.py:TestCompaction_with_SizeTieredCompactionStrategy.compaction_delete_test
compaction_test.py:TestCompaction_with_LeveledCompactionStrategy.compaction_delete_2_test
compaction_test.py:TestCompaction_with_SizeTieredCompactionStrategy.compaction_delete_2_test
compaction_additional_test.py"
2016-06-15 09:44:28 +03:00
Nadav Har'El
5243c77fcb Rewrite shared sstables soon after startup
Several shards may share the same sstable - e.g., when re-starting scylla
with a different number of shards, or when importing sstables from an
external source. Sharing an sstable is fine, but it can result in excessive
disk space use because the shared sstable cannot be deleted until all
the shards using it have finished compacting it. Normally, we have no idea
when the shards will decide to compact these sstables - e.g., with size-
tiered-compaction a large sstable will take a long time until we decide
to compact it. So what this patch does is initiate compaction of the
shared sstables - on each shard using them - so that as soon as possible
after the restart, the original sstable is split into separate sstables
per shard, and the original sstable can be deleted. If several
sstables are shared, we serialize this compaction process so that each
shard only rewrites one sstable at a time. Regular compactions may happen
in parallel, but they will not be able to choose any of the shared
sstables because those are already marked as being compacted.

Commit 3f2286d0 increased the need for this patch, because since that
commit, if we don't delete the shared sstable, we also cannot delete
additional sstables which the different shards compacted with it. For one
scylla user, this resulted in so much excessive disk space use, that it
literally filled the whole disk.

After this patch, commit 3f2286d0 (and the discussion in issue #1318 on how
to improve it) is no longer necessary, because we will never compact a shared
sstable together with any other sstable - as explained above, the shared
sstables are marked as "being compacted", so regular compactions will
avoid them.
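The "marked as being compacted" exclusion can be sketched as a simple filter over compaction candidates; `sstable_id` and `choose_candidates` are illustrative names, not the real compaction_manager API:

```cpp
#include <cassert>
#include <set>
#include <vector>

using sstable_id = int;

// Regular strategy code only ever sees sstables that are not already part
// of an in-progress compaction (such as a shared-sstable rewrite).
std::vector<sstable_id> choose_candidates(const std::vector<sstable_id>& all,
                                          const std::set<sstable_id>& compacting) {
    std::vector<sstable_id> out;
    for (auto id : all) {
        if (!compacting.count(id)) {
            out.push_back(id);
        }
    }
    return out;
}
```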

Fixes #1314.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1465406235-15378-1-git-send-email-nyh@scylladb.com>
Reviewed-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

1.1 Rebase: Glauber Costa <glauber@scylladb.com>

Conflicts:
	sstables/compaction_manager.cc

Rebase notes: this patch conflicts heavily with a previous series that allows
parallel compactions on the same CF. That machinery is not 1.1 material, so I
had to work around it. Most relevant changes:
 - changed tasks vector to a list (for consistency with parallel compaction)
 - each shard will compact task_nr + 1 tasks, the extra one being the rewrite
   task.
2016-06-14 13:02:42 -04:00
Tomasz Grabiec
d8768257fe row_cache: Make stronger guarantees in clear/invalidate
Correctness of the current uses of clear() and invalidate() relies on the
fact that the cache is not populated using readers created before
invalidation. Sstables are modified first and then the cache is
invalidated. This is not guaranteed by the current implementation,
though. As pointed out by Avi, a populating read may race with the
call to clear(). If that read started before clear() and completed
after it, the cache may be populated with data which does not
correspond to the new sstable set.

To provide such a guarantee, the invalidate() variants were adjusted to
synchronize using _populate_phaser, similarly to what row_cache::update()
does.
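A plain-threads analogue of this synchronization: invalidate() waits until every populating read that started before it has finished, so stale data cannot be inserted after the cache was cleared. `populate_gate` and its method names are illustrative; Scylla uses a seastar phaser:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

class populate_gate {
    std::mutex m;
    std::condition_variable cv;
    int in_flight = 0;
public:
    // A populating read brackets itself with enter()/leave().
    void enter() { std::lock_guard<std::mutex> lk(m); ++in_flight; }
    void leave() {
        std::lock_guard<std::mutex> lk(m);
        if (--in_flight == 0) { cv.notify_all(); }
    }
    // invalidate() calls this before clearing: block until all reads that
    // entered before us have left.
    void wait_for_quiescence() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return in_flight == 0; });
    }
    int pending() { std::lock_guard<std::mutex> lk(m); return in_flight; }
};
```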

Conflicts:
    database.cc

1.1 Rebase: Glauber Costa <glauber@scylladb.com>

Rebase notes: the conflicts are due to unrelated changes in how we
invoke the flush functions for streaming memtables. Those changes
are definitely not needed by 1.1.
2016-06-14 12:03:00 -04:00
Tomasz Grabiec
c2c4e842fc row_cache: Implement clear() using invalidate()
Reduces code duplication.

1.1 Rebase: Glauber Costa <glauber@scylladb.com> - no changes needed.
2016-06-14 12:03:00 -04:00
Raphael S. Carvalho
61d4bf3800 db: fix read consistency after refresh
If sstable loaded by refresh covers a row that is cached by the
column family, read query may fail to return consistent data.
What we should do is to clear cache for the column family being
loaded with new sstables.

Fixes #1212.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <a08c9885a5ceb0b2991e40337acf5b7679580a66.1464072720.git.raphaelsc@scylladb.com>

1.1 Rebase: Glauber Costa <glauber@scylladb.com> - no changes needed
2016-06-14 12:02:27 -04:00
Pekka Enberg
1df0e435e8 utils/exceptions: Whitelist EEXIST and ENOENT in should_stop_on_system_error()
There are various call-sites that explicitly check for EEXIST and
ENOENT:

  $ git grep "std::error_code(E"
  database.cc:                            if (e.code() != std::error_code(EEXIST, std::system_category())) {
  database.cc:            if (e.code() != std::error_code(ENOENT, std::system_category())) {
  database.cc:        if (e.code() != std::error_code(ENOENT, std::system_category())) {
  database.cc:                            if (e.code() != std::error_code(ENOENT, std::system_category())) {
  sstables/sstables.cc:            if (e.code() == std::error_code(ENOENT, std::system_category())) {
  sstables/sstables.cc:            if (e.code() == std::error_code(ENOENT, std::system_category())) {

Commit 961e80a ("Be more conservative when deciding when to shut down
due to disk errors") turned these errors into a storage_io_exception
that is not expected by the callers, which causes 'nodetool snapshot'
functionality to break, for example.

Whitelist the two error codes to revert back to the old behavior of
io_check().
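A sketch of the whitelisting, assuming the check boils down to comparing std::error_code values the same way the call-sites above do:

```cpp
#include <cassert>
#include <cerrno>
#include <system_error>

// EEXIST and ENOENT are expected by callers (e.g. the snapshot code paths
// shown above) and must not trigger a shutdown; anything else still does.
bool should_stop_on_system_error(const std::error_code& ec) {
    if (ec == std::error_code(EEXIST, std::system_category()) ||
        ec == std::error_code(ENOENT, std::system_category())) {
        return false;
    }
    return true;
}
```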
Message-Id: <1465454446-17954-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 8df5aa7b0c)
2016-06-14 15:27:22 +03:00
Avi Kivity
4e6f09ad95 Be more conservative when deciding when to shut down due to disk errors
Currently we only shut down on EIO.  Expand this to shut down on any
system_error.

This may cause us to shut down prematurely due to a transient error,
but this is better than not shutting down due to a permanent error
(such as ENOSPC or EPERM).  We may whitelist certain errors in the future
to improve the behavior.

Fixes #1311.
Message-Id: <1465136956-1352-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 961e80ab74)
2016-06-14 15:27:18 +03:00
Pekka Enberg
426316a4b7 release: prepare for 1.1.2 2016-06-06 14:47:32 +03:00
Pekka Enberg
3289010910 Revert "dist/common/scripts: update SET_NIC when --setup-nic passed to scylla_sysconfig_setup"
This reverts commit 73fa36b416.

Fixes #1301.
2016-06-06 14:46:32 +03:00
Asias He
52c9723e04 streaming: Reduce memory usage when sending mutations
Limit disk bandwidth to 5MB/s to emulate a slow disk:
echo "8:0 5000000" > /cgroup/blkio/limit/blkio.throttle.write_bps_device
echo "8:0 5000000" > /cgroup/blkio/limit/blkio.throttle.read_bps_device

Start scylla node 1 with low memory:
scylla -c 1 -m 128M --auto-bootstrap false

Run c-s:
taskset -c 7 cassandra-stress write duration=5m cl=ONE -schema
'replication(factor=1)' -pop seq=1..100000  -rate threads=20
limit=2000/s -node 127.0.0.1

Start scylla node 2 with low memory:
scylla -c 1 -m 128M --auto-bootstrap true

Without this patch, I saw std::bad_alloc during streaming

ERROR 2016-06-01 14:31:00,196 [shard 0] storage_proxy - exception during
mutation write to 127.0.0.1: std::bad_alloc (std::bad_alloc)
...
ERROR 2016-06-01 14:31:10,172 [shard 0] database - failed to move
memtable to cache: std::bad_alloc (std::bad_alloc)
...

To fix:

1. Apply the streaming mutation limiter before we read the mutation into
memory, to avoid wasting memory holding a mutation which we cannot yet
send.

2. Reduce the parallelism of sending streaming mutations. Before, we sent
each range in parallel; after, we send the ranges one by one.

   before: nr_vnode * nr_shard * (send_info + cf.make_reader memory usage)

   after: nr_shard * (send_info + cf.make_reader memory usage)

We save memory usage by at least a factor of nr_vnode, 256 by
default.

In my setup, fix 1) alone is not enough; with both fixes 1) and 2), I saw
no std::bad_alloc. Also, I did not see streaming bandwidth drop due
to 2).
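The before/after bounds above can be made concrete with a back-of-envelope calculation; `per_range_bytes` is an illustrative stand-in for the (send_info + cf.make_reader memory usage) term, whose real value varies:

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint64_t per_range_bytes = 1 << 20; // illustrative 1 MiB

// Fix 2), before: every vnode range on every shard has a reader in flight.
std::uint64_t peak_before(std::uint64_t nr_vnode, std::uint64_t nr_shard) {
    return nr_vnode * nr_shard * per_range_bytes;
}

// Fix 2), after: one range at a time per shard.
std::uint64_t peak_after(std::uint64_t nr_shard) {
    return nr_shard * per_range_bytes;
}
```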

In addition, I tested grow_cluster_test.py:GrowClusterTest.test_grow_3_to_4,
as described:

https://github.com/scylladb/scylla/issues/1270#issuecomment-222585375

With this patch, I saw no std::bad_alloc any more.

Fixes: #1270

Message-Id: <7703cf7a9db40e53a87f0f7b5acbb03fff2daf43.1464785542.git.asias@scylladb.com>
(cherry picked from commit 206955e47c)

Conflicts:
	streaming/stream_transfer_task.cc
2016-06-02 11:08:19 +03:00
Pekka Enberg
3cc91eeb84 release: prepare for 1.1.1 2016-05-26 12:58:39 +03:00
Pekka Enberg
d67ee37bbc Update seastar submodule
* seastar 85bdfb7...b80564d (1):
  > reactor: advertise the logging_failures metric as a DERIVE counter
2016-05-26 12:57:54 +03:00
Raphael S. Carvalho
fcbe43cc87 sstables: optimize leveled compaction strategy
The leveled compaction strategy does a lot of work whenever it's asked for
a list of sstables to be compacted: it checks twice whether a sstable
overlaps with another sstable in the same level. First, when adding a
sstable to the list of sstables at the same level. Second, after adding all
sstables to their respective lists.

It's enough to check whether a sstable creates an overlap in its level only
once. So I am changing the code to unconditionally insert a sstable into its
respective list, and after that, call repair_overlapping_sstables(), which
sends any sstable that creates an overlap in its level to the L0 list.

By the way, the optimization isn't in the compaction itself, but in the
strategy code that picks the set of sstables to be compacted.
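The single overlap pass can be sketched as follows, with integer keys standing in for partition keys; `sst` and `repair_overlapping_sstables` are illustrative, not the real strategy code:

```cpp
#include <algorithm>
#include <cassert>
#include <limits>
#include <vector>

struct sst { int first, last, level; };

// Within one level, sstables sorted by first key must not overlap; any
// sstable that overlaps the previous *kept* one is demoted to level 0.
void repair_overlapping_sstables(std::vector<sst>& level_list) {
    std::sort(level_list.begin(), level_list.end(),
              [](const sst& a, const sst& b) { return a.first < b.first; });
    int prev_last = std::numeric_limits<int>::min();
    for (auto& s : level_list) {
        if (s.first <= prev_last) {
            s.level = 0;        // overlap: send back to L0
        } else {
            prev_last = s.last; // kept in its level
        }
    }
}
```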

Reviewed-by: Nadav Har'El <nyh@scylladb.com>
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <8c8526737277cb47987a3a5dbd5ff3bb81a6d038.1461965074.git.raphaelsc@scylladb.com>
(cherry picked from commit ae95ce1bd7)
2016-05-24 15:56:10 +03:00
Pekka Enberg
ef79310b3c dist/docker: Use Scylla 1.1 RPM repository 2016-05-23 10:16:13 +03:00
Pekka Enberg
f7e81c7b7d dist/docker: Fetch RPM repository from Scylla web site
Fix the hard-coded Scylla RPM repository by downloading it from Scylla
web site. This makes it easier to switch between different versions.

Message-Id: <1463981271-25231-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 8a7197e390)
2016-05-23 10:15:17 +03:00
Raphael S. Carvalho
54224dfaa0 tests: check that overlapping sstable has its level changed to 0
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit cbc2e96a58)
2016-05-20 13:42:32 +03:00
Raphael S. Carvalho
e30199119c db: fix migration of sstables with level greater than 0
Refresh will rewrite the statistics of any migrated sstable with level
> 0. However, this operation is currently broken because the O_EXCL
flag is used, meaning that the create will fail.

It turns out that we don't actually need to change the on-disk level of
a sstable by overwriting its statistics file.
We can simply set the in-memory level of the sstable to 0. If Scylla reboots
before all migrated sstables are compacted, the leveled strategy is smart
enough to detect sstables that overlap, and set their in-memory level
to 0.

Fixes #1124.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit ee0f66eef6)
2016-05-20 13:42:27 +03:00
Raphael S. Carvalho
07ce4ec032 main: stop compaction manager earlier
Avi says:
"During shutdown, we prevent new compactions, but perhaps too late.
Memtables are flushed and these can trigger compaction."

To solve that, let's stop the compaction manager at a very early step
of shutdown. We will still try to stop the compaction manager in
database::stop(), because the user may ask for a shutdown before scylla
has fully started. It's fine to stop the compaction manager twice:
only the first call actually stops it.
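A sketch of the idempotent-stop guard (illustrative; the real compaction_manager::stop() also waits for ongoing compactions to finish):

```cpp
#include <cassert>

class compaction_manager {
    bool _stopped = false;
    int _stop_count = 0;
public:
    void stop() {
        if (_stopped) {
            return;        // second call is a no-op
        }
        _stopped = true;
        ++_stop_count;     // actual tear-down work would happen here
    }
    int times_stopped() const { return _stop_count; }
};
```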

Fixes #1238.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <c64ab11f3c91129c424259d317e48abc5bde6ff3.1462496694.git.raphaelsc@scylladb.com>
(cherry picked from commit bf18025937)
2016-05-20 13:41:11 +03:00
Asias He
c7c18d9c0c gms: Optimize gossiper::is_alive
In perf-flame, I saw in

service::storage_proxy::create_write_response_handler (2.66% cpu)

  gossiper::is_alive takes 0.72% cpu
  locator::token_metadata::pending_endpoints_for takes 1.2% cpu

After this patch:

service::storage_proxy::create_write_response_handler (2.17% cpu)

  gossiper::is_alive does not show up at all
  locator::token_metadata::pending_endpoints_for takes 1.3% cpu

There is no need to copy the endpoint_state from the endpoint_state_map
to check if a node is alive. Optimize it since gossiper::is_alive is
called in the fast path.
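The no-copy lookup can be sketched like this; the `endpoint_state` fields and the map key type are illustrative, not the real gossiper types:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

struct endpoint_state {
    bool alive = false;
    // heartbeat state, application states, ... (large in the real type)
};

// Look the endpoint up in place instead of copying a whole endpoint_state
// value just to read one flag - this is the fast-path saving.
bool is_alive(const std::unordered_map<std::string, endpoint_state>& state_map,
              const std::string& endpoint) {
    auto it = state_map.find(endpoint);  // no endpoint_state copy
    return it != state_map.end() && it->second.alive;
}
```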

Message-Id: <2144310aef8d170cab34a2c96cb67cabca761ca8.1463540290.git.asias@scylladb.com>
(cherry picked from commit eb9ac9ab91)
2016-05-20 13:03:44 +03:00
Asias He
2a4582ab9f token_metadata: Speed up pending_endpoints_for
pending_endpoints_for is called frequently by
storage_proxy::create_write_response_handler when doing cql query.

Before this patch, each call to pending_endpoints_for involves
converting a multimap (std::unordered_multimap<range<token>,
inet_address>>) to map (std::unordered_map<range<token>,
std::unordered_set<inet_address>>).

To speed up the token-to-pending-endpoint lookup, an interval map
is introduced. It is faster than searching the map linearly and avoids
having to cache the token/pending endpoint mapping.
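The interval-map idea can be sketched with a std::map keyed by range start: find the last range starting at or before the token in O(log n) instead of scanning linearly. The real patch uses boost::icl::interval_map; the integer tokens, string endpoints, and disjoint-ranges assumption here are illustrative:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct pending_ranges {
    // start token -> (end token, pending endpoints); ranges assumed disjoint,
    // with a token t belonging to a range when start <= t <= end.
    std::map<int, std::pair<int, std::vector<std::string>>> ranges;

    std::vector<std::string> pending_endpoints_for(int token) const {
        auto it = ranges.upper_bound(token);
        if (it == ranges.begin()) {
            return {};
        }
        --it;                             // last range with start <= token
        if (token <= it->second.first) {
            return it->second.second;
        }
        return {};
    }
};
```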

With this patch, the drop in operations per second while adding a node
is much smaller.

Before:
45K to 10K

After:
45K to 38K

(The number is measured with the streaming code skipping to send data to
rule out the streaming factor.)

Refs: #1223
(cherry picked from commit 089734474b)
2016-05-20 13:03:31 +03:00
Asias He
ab5e23f6e7 dht: Add default constructor for token
It is needed to put tokens into a boost interval_map in the following
patch.

(cherry picked from commit ee0585cee9)
2016-05-20 13:03:24 +03:00
Calle Wilund
fecea15a25 cql3::statements::cf_prop_defs: Fix compaction min/max not handled
Property parsing code was looking at the wrong property level
for the initial guard statement.

Fixes #1257

Message-Id: <1462967584-2875-1-git-send-email-calle@scylladb.com>
(cherry picked from commit 5604fb8aa3)
2016-05-18 14:19:27 +03:00
Tomasz Grabiec
dce549f44f tests: Add unit tests for schema_registry
(cherry picked from commit 90c31701e3)
2016-05-16 10:52:02 +03:00
Tomasz Grabiec
26a3302957 schema_registry: Fix possible hang in maybe_sync() if syncer doesn't defer
Spotted during code review.

If it doesn't defer, we may execute then_wrapped() body before we
change the state. Fix by moving then_wrapped() body after state changes.

(cherry picked from commit 443e5aef5a)
2016-05-16 10:51:54 +03:00
Tomasz Grabiec
f796d8081b migration_manager: Fix schema syncing with older version
The problem was that "s" would not be marked as synced-with if it came from
shard != 0.

As a result, mutation using that schema would fail to apply with an exception:

  "attempted to mutate using not synced schema of ..."

The problem could surface when altering schema without changing
columns and restarting one of the nodes so that it forgets past
versions.

Fixes #1258.

Will be covered by dtest:

  SchemaManagementTest.test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns

(cherry picked from commit 8703136a4f)
2016-05-16 10:51:48 +03:00
Pekka Enberg
b850cb991c release: prepare for 1.1.0 2016-05-16 09:33:26 +03:00
Tomasz Grabiec
734cfa949a migration_manager: Invalidate prepared statements on every schema change
Currently we only do that when the column set changes. When prepared
statements are executed, parameters like read repair chance are read
from the schema version stored in the statement. Not invalidating prepared
statements on changes to such parameters makes it appear as if the alter
took no effect.

Fixes #1255.
Message-Id: <1462985495-9767-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 13d8cd0ae9)
2016-05-12 09:18:00 +03:00
Calle Wilund
3606e3ab29 transport::server: Do not treat accept exception as fatal
1.) It most likely is not, i.e. either a tcp or, more likely, an ssl
    negotiation failure. In any case, we can still try the next
    connection.
2.) Not retrying will cause us to "leak" the accept, and then hang
    on shutdown.

Also, promote logging message on accept exception to "warn", since
dtest(s?) depend on seeing log output.

Message-Id: <1462283265-27051-4-git-send-email-calle@scylladb.com>
(cherry picked from commit 917bf850fa)
2016-05-10 19:26:29 +03:00
Calle Wilund
014284de00 cql_server: Use credentials_builder to init tls
Slightly cleaner, and shard-safe tls init.

Message-Id: <1462283265-27051-3-git-send-email-calle@scylladb.com>
(cherry picked from commit 437ebe7128)
2016-05-10 19:26:23 +03:00
Calle Wilund
f17764e74a messaging_service: Change tls init to use credentials_builder
To simplify the init of the messaging service, use credentials_builder
to encapsulate the tls options, so the actual credentials can be
more easily created on each shard.

Message-Id: <1462283265-27051-2-git-send-email-calle@scylladb.com>
(cherry picked from commit 58f7edb04f)
2016-05-10 19:26:18 +03:00
Avi Kivity
8643028d0c Update seastar submodule
* seastar 73d5583...85bdfb7 (4):
  > tests/mkcert.gmk: Fix makefile bug in snakeoil cert generator
  > tls_test: Add case to do a little checking of credentials_builder
  > tls: Add credentials_builder - copyable credentials "factory"
  > tls_test: Add test for large-ish buffer send/receive
2016-05-10 19:24:49 +03:00
Avi Kivity
a35f1d765a Backport seastar iotune fixes
* seastar dab58e4...73d5583 (2):
  > iotune: don't coredump when directory fails to be created
  > iotune: improve recommendation in case we timeout

Fixes #1243.
2016-05-09 10:49:15 +03:00
Avi Kivity
3116a92b0e Point seastar submodule at scylla-seastar repository
Allows us to backport seastar fixes to branch-1.1.
2016-05-08 14:49:02 +03:00
Gleb Natapov
dad312ce0a tests: test for result row counting
Message-Id: <1462377579-2419-2-git-send-email-gleb@scylladb.com>
(cherry picked from commit f1cd52ff3f)
2016-05-06 13:32:36 +03:00
Gleb Natapov
3cae56f3e3 query: fix result row counting for results with multiple partitions
Message-Id: <1462377579-2419-1-git-send-email-gleb@scylladb.com>
(cherry picked from commit b75475de80)
2016-05-06 13:32:29 +03:00
Calle Wilund
656a10c4b8 storage_service: Add logging to match origin
Pointing out whether the CQL server is listening in SSL mode.
Message-Id: <1462368016-32394-2-git-send-email-calle@scylladb.com>

(cherry picked from commit 709dd82d59)
2016-05-06 13:30:29 +03:00
Calle Wilund
c04b3de564 messaging_service: Add logging to match origin
To announce rpc port + ssl if on.

Message-Id: <1462368016-32394-1-git-send-email-calle@scylladb.com>
(cherry picked from commit d8ea85cd90)
2016-05-06 13:26:01 +03:00
Gleb Natapov
4964fe4cf0 storage_proxy: stop range query with limit after the limit is reached
(cherry picked from commit 3039e4c7de)
2016-05-06 12:50:51 +03:00
Gleb Natapov
e3ad3cf7d9 query: put live row count into query::result
The patch calculates the row count during result building and while merging.
If one of the results being merged does not have a row count, the
merged result will not have one either.

(cherry picked from commit db322d8f74)
2016-05-06 12:50:47 +03:00
Gleb Natapov
cef40627a7 storage_proxy: fix calculation of concurrency of queried ranges
(cherry picked from commit 41c586313a)
2016-05-06 12:50:37 +03:00
Gleb Natapov
995820c08a storage_proxy: add logging for range query row count estimation
(cherry picked from commit c364ab9121)
2016-05-06 12:50:32 +03:00
Calle Wilund
b78abd7649 auth: Make auth.* schemas use deterministic UUIDs
In the initial implementation I figured this was not required, but
we get issues communicating across nodes if system tables
don't have the same UUID, since their creation is forcefully local, yet
shared.

Just manually re-create the schema with a name-based UUID, and
use the migration manager directly.
Message-Id: <1462194588-11964-1-git-send-email-calle@scylladb.com>

(cherry picked from commit 6d2caedafd)
2016-05-03 10:49:16 +03:00
Calle Wilund
d47c62b51c messaging_service: Change init to use per-shard tls credentials
Fixes: #1220

While the server_credentials object is technically immutable
(especially with the last change in seastar), the ::shared_ptr holding it
is not safe to share across shards.

Pre-create one credentials object per cpu, and then hand them out by move
during service start-up instead.
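The pre-create-then-hand-out pattern can be sketched like this; `server_credentials` and the function name are illustrative stand-ins for the seastar tls types:

```cpp
#include <cassert>
#include <string>
#include <vector>

struct server_credentials { std::string cert; };

// Build one credentials object per shard up front; each shard then takes
// its own element (by move, in the real start-up code), so no shared_ptr
// is ever shared across shards.
std::vector<server_credentials> make_per_shard_credentials(unsigned smp_count,
                                                           const std::string& cert) {
    std::vector<server_credentials> creds;
    creds.reserve(smp_count);
    for (unsigned i = 0; i < smp_count; ++i) {
        creds.push_back(server_credentials{cert}); // independent copy per shard
    }
    return creds;
}
```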

Fixes assertion error in debug builds. And just maybe real memory
corruption in release.

Requires seastar tls change:
"Change server_credentials to copy dh_params input"

Message-Id: <1462187704-2056-1-git-send-email-calle@scylladb.com>
(cherry picked from commit 751ba2f0bf)
2016-05-02 15:37:43 +03:00
Pekka Enberg
bef19e7f9e release: prepare for 1.1.rc1 2016-04-29 08:49:10 +03:00
50 changed files with 870 additions and 297 deletions

.gitmodules vendored (2 changes)
View File

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 	path = seastar
-	url = ../seastar
+	url = ../scylla-seastar
 	ignore = dirty
 [submodule "swagger-ui"]
 	path = swagger-ui

View File

@@ -1,6 +1,6 @@
 #!/bin/sh
-VERSION=666.development
+VERSION=1.1.3
 if test -f version
 then

View File

@@ -354,9 +354,12 @@ future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
     ::shared_ptr<cql3::statements::create_table_statement> statement =
                     static_pointer_cast<cql3::statements::create_table_statement>(
                                     parsed->prepare(db)->statement);
-    // Origin sets "Legacy Cf Id" for the new table. We have no need to be
-    // pre-2.1 compatible (afaik), so lets skip a whole lotta hoolaballo
-    return statement->announce_migration(qp.proxy(), false).then([statement](bool) {});
+    auto schema = statement->get_cf_meta_data();
+    auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
+    schema_builder b(schema);
+    b.set_uuid(uuid);
+    return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
 }
 
 future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {

View File

@@ -162,6 +162,7 @@ modes = {
 scylla_tests = [
     'tests/mutation_test',
+    'tests/schema_registry_test',
     'tests/canonical_mutation_test',
     'tests/range_test',
     'tests/types_test',

View File

@@ -432,10 +432,9 @@ void query_processor::migration_subscriber::on_update_keyspace(const sstring& ks
 void query_processor::migration_subscriber::on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed)
 {
-    if (columns_changed) {
-        log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
-        remove_invalid_prepared_statements(ks_name, cf_name);
-    }
+    // #1255: Ignoring columns_changed deliberately.
+    log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
+    remove_invalid_prepared_statements(ks_name, cf_name);
 }
 
 void query_processor::migration_subscriber::on_update_user_type(const sstring& ks_name, const sstring& type_name)

View File

@@ -162,7 +162,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
     }
 
     std::experimental::optional<sstring> tmp_value = {};
-    if (has_property(KW_MINCOMPACTIONTHRESHOLD)) {
+    if (has_property(KW_COMPACTION)) {
         if (get_compaction_options().count(KW_MINCOMPACTIONTHRESHOLD)) {
             tmp_value = get_compaction_options().at(KW_MINCOMPACTIONTHRESHOLD);
         }
@@ -170,7 +170,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
     int min_compaction_threshold = to_int(KW_MINCOMPACTIONTHRESHOLD, tmp_value, builder.get_min_compaction_threshold());
 
     tmp_value = {};
-    if (has_property(KW_MAXCOMPACTIONTHRESHOLD)) {
+    if (has_property(KW_COMPACTION)) {
         if (get_compaction_options().count(KW_MAXCOMPACTIONTHRESHOLD)) {
             tmp_value = get_compaction_options().at(KW_MAXCOMPACTIONTHRESHOLD);
         }

View File

@@ -475,12 +475,75 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
     return (s1 <= me) && (me <= s2);
 }
 
+static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
+    auto key_shard = [&s] (const partition_key& pk) {
+        auto token = dht::global_partitioner().get_token(s, pk);
+        return dht::shard_of(token);
+    };
+    auto s1 = key_shard(first);
+    auto s2 = key_shard(last);
+    auto me = engine().cpu_id();
+    return (s1 != me) || (me != s2);
+}
+
+static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
+    assert(r.start());
+    assert(r.end());
+    return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
+}
+
+static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
+    assert(r.start());
+    assert(r.end());
+    return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
+}
+
+future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
+    auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
+    return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
+        // Checks whether or not sstable belongs to current shard.
+        if (!belongs_to_current_shard(*_schema, r)) {
+            dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
+            sst->mark_for_deletion();
+            return make_ready_future<>();
+        }
+        bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
+        return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
+            if (in_other_shard) {
+                // If we're here, this sstable is shared by this and other
+                // shard(s). Shared sstables cannot be deleted until all
+                // shards compacted them, so to reduce disk space usage we
+                // want to start splitting them now.
+                // However, we need to delay this compaction until we read all
+                // the sstables belonging to this CF, because we need all of
+                // them to know which tombstones we can drop, and what
+                // generation number is free.
+                _sstables_need_rewrite.push_back(sst);
+            }
+            if (reset_level) {
+                // When loading a migrated sstable, set level to 0 because
+                // it may overlap with existing tables in levels > 0.
+                // This step is optional, because even if we didn't do this
+                // scylla would detect the overlap, and bring back some of
+                // the sstables to level 0.
+                sst->set_sstable_level(0);
+            }
+            add_sstable(sst);
+        });
+    });
+}
+
+// load_sstable() wants to start rewriting sstables which are shared between
+// several shards, but we can't start any compaction before all the sstables
+// of this CF were loaded. So call this function to start rewrites, if any.
+void column_family::start_rewrite() {
+    for (auto sst : _sstables_need_rewrite) {
+        dblog.info("Splitting {} for shard", sst->get_filename());
+        _compaction_manager.submit_sstable_rewrite(this, sst);
+    }
+    _sstables_need_rewrite.clear();
+}
+
 future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
     using namespace sstables;
@@ -505,24 +568,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
}
}
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
auto fut = sst->get_sstable_key_range(*_schema);
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, std::move(r))) {
dblog.debug("sstable {} not relevant for this shard, ignoring",
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
sstables::sstable::component_type::Data));
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
return make_ready_future<>();
}
auto fut = sst->load();
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
add_sstable(std::move(*sst));
return make_ready_future<>();
});
}).then_wrapped([fname, comps] (future<> f) {
return load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
try {
f.get();
} catch (malformed_sstable_exception& e) {
@@ -971,19 +1019,14 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
future<>
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
return parallel_for_each(new_tables, [this] (auto comps) {
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
return sst->load().then([this, sst] {
return sst->mutate_sstable_level(0);
}).then([this, sst] {
auto first = sst->get_first_partition_key(*_schema);
auto last = sst->get_last_partition_key(*_schema);
if (belongs_to_current_shard(*_schema, first, last)) {
this->add_sstable(sst);
} else {
sst->mark_for_deletion();
}
return make_ready_future<>();
});
return this->load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), _config.datadir,
comps.generation, comps.version, comps.format), true);
}).then([this] {
start_rewrite();
// Drop entire cache for this column family because it may be populated
// with stale data.
return get_row_cache().clear();
});
}
@@ -2186,7 +2229,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
// gotten all things to disk. Again, need queue-ish or something.
f = cf.flush();
} else {
cf.clear();
f = cf.clear();
}
return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable {
@@ -2562,21 +2605,24 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
// temporary countermeasure.
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
return seal_active_streaming_memtable_delayed().finally([this, ranges = std::move(ranges)] {
if (_config.enable_cache) {
for (auto& range : ranges) {
_cache.invalidate(range);
}
if (!_config.enable_cache) {
return make_ready_future<>();
}
return do_with(std::move(ranges), [this] (auto& ranges) {
return parallel_for_each(ranges, [this](auto&& range) {
return _cache.invalidate(range);
});
});
});
});
}
void column_family::clear() {
_cache.clear();
future<> column_family::clear() {
_memtables->clear();
_memtables->add_memtable();
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
return _cache.clear();
}
// NOTE: does not need to be futurized, but might eventually, depending on
@@ -2603,13 +2649,13 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
_sstables = std::move(pruned);
dblog.debug("cleaning out row cache");
_cache.clear();
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
});
});
}


@@ -309,6 +309,11 @@ private:
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
// sstables that are shared between several shards so we want to rewrite
// them (split the data belonging to this shard to a separate sstable),
// but for correct compaction we need to start the compaction only after
// reading all sstables.
std::vector<sstables::shared_sstable> _sstables_need_rewrite;
// Control background fibers waiting for sstables to be deleted
seastar::gate _sstable_deletion_gate;
// There are situations in which we need to stop writing sstables. Flushers will take
@@ -339,6 +344,7 @@ private:
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
void add_sstable(sstables::sstable&& sstable);
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
@@ -456,7 +462,7 @@ public:
future<> flush();
future<> flush(const db::replay_position&);
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
void clear(); // discards memtable(s) without flushing them to disk.
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
// Important warning: disabling writes will only have an effect in the current shard.
@@ -618,6 +624,7 @@ private:
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
void check_valid_rp(const db::replay_position&) const;
public:
void start_rewrite();
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;


@@ -87,6 +87,10 @@ public:
// [0x00, 0x80] == 1/512
// [0xff, 0x80] == 1 - 1/512
managed_bytes _data;
token() : _kind(kind::before_all_keys) {
}
token(kind k, managed_bytes d) : _kind(std::move(k)), _data(std::move(d)) {
}


@@ -36,6 +36,8 @@ extern thread_local disk_error_signal_type sstable_read_error;
extern thread_local disk_error_signal_type sstable_write_error;
extern thread_local disk_error_signal_type general_disk_error;
bool should_stop_on_system_error(const std::system_error& e);
template<typename Func, typename... Args>
std::enable_if_t<!is_future<std::result_of_t<Func(Args&&...)>>::value,
std::result_of_t<Func(Args&&...)>>
@@ -44,7 +46,7 @@ do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
// calling function
return func(std::forward<Args>(args)...);
} catch (std::system_error& e) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(e)) {
signal();
throw storage_io_error(e);
}
@@ -62,7 +64,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
try {
std::rethrow_exception(ep);
} catch (std::system_error& sys_err) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(sys_err)) {
signal();
throw storage_io_error(sys_err);
}
@@ -70,7 +72,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
return futurize<std::result_of_t<Func(Args&&...)>>::make_exception_future(ep);
});
} catch (std::system_error& e) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(e)) {
signal();
throw storage_io_error(e);
}


@@ -58,7 +58,7 @@ while [ $# -gt 0 ]; do
shift 2
;;
"--setup-nic")
SET_NIC=yes
SETUP_NIC=1
shift 1
;;
"--ami")


@@ -2,8 +2,8 @@ FROM centos:7
MAINTAINER Avi Kivity <avi@cloudius-systems.com>
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.1.repo -o /etc/yum.repos.d/scylla.repo
RUN yum -y install epel-release
ADD scylla.repo /etc/yum.repos.d/
RUN yum -y clean expire-cache
RUN yum -y update
RUN yum -y remove boost-thread boost-system


@@ -1,23 +0,0 @@
[scylla]
name=Scylla for Centos $releasever - $basearch
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/$basearch/
enabled=1
gpgcheck=0
[scylla-generic]
name=Scylla for centos $releasever
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/noarch/
enabled=1
gpgcheck=0
[scylla-3rdparty]
name=Scylla 3rdParty for Centos $releasever - $basearch
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/$basearch/
enabled=1
gpgcheck=0
[scylla-3rdparty-generic]
name=Scylla 3rdParty for Centos $releasever
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/noarch/
enabled=1
gpgcheck=0


@@ -1590,12 +1590,12 @@ bool gossiper::is_alive(inet_address ep) {
if (ep == get_broadcast_address()) {
return true;
}
auto eps = get_endpoint_state_for_endpoint(ep);
auto it = endpoint_state_map.find(ep);
// we could assert not-null, but having isAlive fail screws a node over so badly that
// it's worth being defensive here so minor bugs don't cause disproportionate
// badness. (See CASSANDRA-1463 for an example).
if (eps) {
return eps->is_alive();
if (it != endpoint_state_map.end()) {
return it->second.is_alive();
} else {
logger.warn("unknown endpoint {}", ep);
return false;

init.cc

@@ -64,18 +64,23 @@ void init_ms_fd_gossiper(sstring listen_address
}
future<> f = make_ready_future<>();
::shared_ptr<server_credentials> creds;
std::shared_ptr<credentials_builder> creds;
if (ew != encrypt_what::none) {
// note: credentials are immutable after this, and ok to share across shards
creds = ::make_shared<server_credentials>(::make_shared<dh_params>(dh_params::level::MEDIUM));
creds = std::make_shared<credentials_builder>();
creds->set_dh_level(dh_params::level::MEDIUM);
creds->set_x509_key_file(ms_cert, ms_key, x509_crt_format::PEM).get();
ms_trust_store.empty() ? creds->set_system_trust().get() :
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
if (ms_trust_store.empty()) {
creds->set_system_trust().get();
} else {
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
}
}
// Init messaging_service
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
// #293 - do not stop anything
//engine().at_exit([] { return net::get_messaging_service().stop(); });
// Init failure_detector


@@ -27,6 +27,8 @@
#include "log.hh"
#include <unordered_map>
#include <algorithm>
#include <boost/icl/interval.hpp>
#include <boost/icl/interval_map.hpp>
namespace locator {
@@ -339,25 +341,65 @@ range<token> token_metadata::get_primary_range_for(token right) {
return get_primary_ranges_for({right}).front();
}
boost::icl::interval<token>::interval_type
token_metadata::range_to_interval(range<dht::token> r) {
bool start_inclusive = false;
bool end_inclusive = false;
token start = dht::minimum_token();
token end = dht::maximum_token();
if (r.start()) {
start = r.start()->value();
start_inclusive = r.start()->is_inclusive();
}
if (r.end()) {
end = r.end()->value();
end_inclusive = r.end()->is_inclusive();
}
if (start_inclusive == false && end_inclusive == false) {
return boost::icl::interval<token>::open(std::move(start), std::move(end));
} else if (start_inclusive == false && end_inclusive == true) {
return boost::icl::interval<token>::left_open(std::move(start), std::move(end));
} else if (start_inclusive == true && end_inclusive == false) {
return boost::icl::interval<token>::right_open(std::move(start), std::move(end));
} else {
return boost::icl::interval<token>::closed(std::move(start), std::move(end));
}
}
void token_metadata::set_pending_ranges(const sstring& keyspace_name,
std::unordered_multimap<range<token>, inet_address> new_pending_ranges) {
if (new_pending_ranges.empty()) {
_pending_ranges.erase(keyspace_name);
_pending_ranges_map.erase(keyspace_name);
_pending_ranges_interval_map.erase(keyspace_name);
return;
}
std::unordered_map<range<token>, std::unordered_set<inet_address>> map;
for (const auto& x : new_pending_ranges) {
map[x.first].emplace(x.second);
}
// construct an interval map to speed up the search
_pending_ranges_interval_map[keyspace_name] = {};
for (const auto& m : map) {
_pending_ranges_interval_map[keyspace_name] +=
std::make_pair(range_to_interval(m.first), m.second);
}
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
_pending_ranges_map[keyspace_name] = std::move(map);
}
std::unordered_multimap<range<token>, inet_address>&
token_metadata::get_pending_ranges_mm(sstring keyspace_name) {
return _pending_ranges[keyspace_name];
}
std::unordered_map<range<token>, std::unordered_set<inet_address>>
const std::unordered_map<range<token>, std::unordered_set<inet_address>>&
token_metadata::get_pending_ranges(sstring keyspace_name) {
std::unordered_map<range<token>, std::unordered_set<inet_address>> ret;
for (auto x : get_pending_ranges_mm(keyspace_name)) {
auto& range_token = x.first;
auto& ep = x.second;
auto it = ret.find(range_token);
if (it != ret.end()) {
it->second.emplace(ep);
} else {
ret.emplace(range_token, std::unordered_set<inet_address>{ep});
}
}
return ret;
return _pending_ranges_map[keyspace_name];
}
std::vector<range<token>>
@@ -378,7 +420,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) {
logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name);
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
return;
}
@@ -463,7 +505,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
all_left_metadata.remove_endpoint(endpoint);
}
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : print_pending_ranges()));
@@ -508,14 +550,23 @@ void token_metadata::add_moving_endpoint(token t, inet_address endpoint) {
}
std::vector<gms::inet_address> token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) {
// Fast path 0: no pending ranges at all
if (_pending_ranges_interval_map.empty()) {
return {};
}
// Fast path 1: no pending ranges for this keyspace_name
if (_pending_ranges_interval_map[keyspace_name].empty()) {
return {};
}
// Slow path: lookup pending ranges
std::vector<gms::inet_address> endpoints;
auto ranges = get_pending_ranges(keyspace_name);
for (auto& x : ranges) {
if (x.first.contains(token, dht::token_comparator())) {
for (auto& addr : x.second) {
endpoints.push_back(addr);
}
}
auto interval = range_to_interval(range<dht::token>(token));
auto it = _pending_ranges_interval_map[keyspace_name].find(interval);
if (it != _pending_ranges_interval_map[keyspace_name].end()) {
// interval_map does not work with std::vector, convert to std::vector of ips
endpoints = std::vector<gms::inet_address>(it->second.begin(), it->second.end());
}
return endpoints;
}


@@ -46,6 +46,8 @@
#include "utils/UUID.hh"
#include <experimental/optional>
#include <boost/range/iterator_range.hpp>
#include <boost/icl/interval.hpp>
#include <boost/icl/interval_map.hpp>
#include "query-request.hh"
#include "range.hh"
@@ -144,6 +146,8 @@ private:
std::unordered_map<token, inet_address> _moving_endpoints;
std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> _pending_ranges;
std::unordered_map<sstring, std::unordered_map<range<token>, std::unordered_set<inet_address>>> _pending_ranges_map;
std::unordered_map<sstring, boost::icl::interval_map<token, std::unordered_set<inet_address>>> _pending_ranges_interval_map;
std::vector<token> _sorted_tokens;
@@ -608,13 +612,15 @@ public:
std::vector<range<token>> get_primary_ranges_for(std::unordered_set<token> tokens);
range<token> get_primary_range_for(token right);
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
private:
std::unordered_multimap<range<token>, inet_address>& get_pending_ranges_mm(sstring keyspace_name);
void set_pending_ranges(const sstring& keyspace_name, std::unordered_multimap<range<token>, inet_address> new_pending_ranges);
public:
/** a mutable map may be returned but caller should not modify it */
std::unordered_map<range<token>, std::unordered_set<inet_address>> get_pending_ranges(sstring keyspace_name);
const std::unordered_map<range<token>, std::unordered_set<inet_address>>& get_pending_ranges(sstring keyspace_name);
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint);
/**

main.cc

@@ -516,6 +516,18 @@ int main(int ac, char** av) {
}
return db.load_sstables(proxy);
}).get();
// If the same sstable is shared by several shards, it cannot be
// deleted until all shards decide to compact it. So we want to
// start these compactions now. Note we start compacting only after
// all sstables in this CF were loaded on all shards - otherwise
// we will have races between the compaction and loading processes
db.invoke_on_all([&proxy] (database& db) {
for (auto& x : db.get_column_families()) {
column_family& cf = *(x.second);
// We start the rewrite, but do not wait for it.
cf.start_rewrite();
}
}).get();
supervisor_notify("setting up system keyspace");
db::system_keyspace::setup(db, qp).get();
supervisor_notify("starting commit log");
@@ -592,6 +604,11 @@ int main(int ac, char** av) {
engine().at_exit([] {
return repair_shutdown(service::get_local_storage_service().db());
});
engine().at_exit([&db] {
return db.invoke_on_all([](auto& db) {
return db.get_compaction_manager().stop();
});
});
}).or_terminate();
});
}


@@ -225,7 +225,7 @@ messaging_service::messaging_service(gms::inet_address ip
, uint16_t port
, encrypt_what ew
, uint16_t ssl_port
, ::shared_ptr<seastar::tls::server_credentials> credentials
, std::shared_ptr<seastar::tls::credentials_builder> credentials
)
: _listen_address(ip)
, _port(port)
@@ -233,7 +233,7 @@ messaging_service::messaging_service(gms::inet_address ip
, _encrypt_what(ew)
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
, _credentials(std::move(credentials))
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
, _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
if (_encrypt_what == encrypt_what::none) {
return nullptr;
@@ -255,6 +255,13 @@ messaging_service::messaging_service(gms::inet_address ip
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
return rpc::no_wait;
});
// Do this on just cpu 0, to avoid duplicate logs.
if (engine().cpu_id() == 0) {
if (_server_tls) {
logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
}
logger.info("Starting Messaging Service on port {}", _port);
}
}
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {


@@ -186,7 +186,7 @@ public:
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
uint16_t ssl_port, ::shared_ptr<seastar::tls::server_credentials>);
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
~messaging_service();
public:
uint16_t port();


@@ -706,6 +706,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
pw.retract();
} else {
pw.row_count() += row_count ? : 1;
std::move(rows_wr).end_rows().end_qr_partition();
}
}


@@ -50,6 +50,7 @@ class result::partition_writer {
bool _static_row_added = false;
md5_hasher& _digest;
md5_hasher _digest_pos;
uint32_t& _row_count;
public:
partition_writer(
result_request request,
@@ -58,7 +59,8 @@ public:
ser::query_result__partitions& pw,
ser::vector_position pos,
ser::after_qr_partition__key w,
md5_hasher& digest)
md5_hasher& digest,
uint32_t& row_count)
: _request(request)
, _w(std::move(w))
, _slice(slice)
@@ -67,6 +69,7 @@ public:
, _pos(std::move(pos))
, _digest(digest)
, _digest_pos(digest)
, _row_count(row_count)
{ }
bool requested_digest() const {
@@ -98,6 +101,9 @@ public:
md5_hasher& digest() {
return _digest;
}
uint32_t& row_count() {
return _row_count;
}
};
class result::builder {
@@ -106,6 +112,7 @@ class result::builder {
const partition_slice& _slice;
ser::query_result__partitions _w;
result_request _request;
uint32_t _row_count = 0;
public:
builder(const partition_slice& slice, result_request request)
: _slice(slice)
@@ -130,21 +137,21 @@ public:
if (_request != result_request::only_result) {
key.feed_hash(_digest, s);
}
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest);
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count);
}
result build() {
std::move(_w).end_partitions().end_query_result();
switch (_request) {
case result_request::only_result:
return result(std::move(_out));
return result(std::move(_out), _row_count);
case result_request::only_digest: {
bytes_ostream buf;
ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
return result(std::move(buf), result_digest(_digest.finalize_array()));
}
case result_request::result_and_digest:
return result(std::move(_out), result_digest(_digest.finalize_array()));
return result(std::move(_out), result_digest(_digest.finalize_array()), _row_count);
}
abort();
}


@@ -96,14 +96,16 @@ public:
class result {
bytes_ostream _w;
stdx::optional<result_digest> _digest;
stdx::optional<uint32_t> _row_count;
public:
class builder;
class partition_writer;
friend class result_merger;
result();
result(bytes_ostream&& w) : _w(std::move(w)) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d) : _w(std::move(w)), _digest(d) {}
result(bytes_ostream&& w, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _row_count(c) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _digest(d), _row_count(c) {}
result(result&&) = default;
result(const result&) = default;
result& operator=(result&&) = default;
@@ -117,6 +119,10 @@ public:
return _digest;
}
const stdx::optional<uint32_t>& row_count() const {
return _row_count;
}
uint32_t calculate_row_count(const query::partition_slice&);
struct printer {


@@ -213,8 +213,16 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
bytes_ostream w;
auto partitions = ser::writer_of_query_result(w).start_partitions();
std::experimental::optional<uint32_t> row_count = 0;
for (auto&& r : _partial) {
if (row_count) {
if (r->row_count()) {
row_count = row_count.value() + r->row_count().value();
} else {
row_count = std::experimental::nullopt;
}
}
result_view::do_with(*r, [&] (result_view rv) {
for (auto&& pv : rv._v.partitions()) {
partitions.add(pv);
@@ -224,7 +232,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
std::move(partitions).end_partitions().end_query_result();
return make_foreign(make_lw_shared<query::result>(std::move(w)));
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
}
}


@@ -393,7 +393,7 @@ static future<> sync_range(seastar::sharded<database>& db,
return sp_in.execute().discard_result().then([&sp_out] {
return sp_out.execute().discard_result();
}).handle_exception([] (auto ep) {
logger.error("repair's stream failed: {}", ep);
logger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
});
});


@@ -400,7 +400,16 @@ row_cache::make_reader(schema_ptr s, const query::partition_range& range, const
}
row_cache::~row_cache() {
clear();
clear_now();
}
void row_cache::clear_now() noexcept {
with_allocator(_tracker.allocator(), [this] {
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
}
void row_cache::populate(const mutation& m) {
@@ -424,16 +433,8 @@ void row_cache::populate(const mutation& m) {
});
}
void row_cache::clear() {
with_allocator(_tracker.allocator(), [this] {
// We depend on clear_and_dispose() below not looking up any keys.
// Using with_linearized_managed_bytes() is no help, because we don't
// want to propagate an exception from here.
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
future<> row_cache::clear() {
return invalidate(query::full_partition_range);
}
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
@@ -459,8 +460,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
});
if (blow_cache) {
// We failed to invalidate the key, presumably due to with_linearized_managed_bytes()
// running out of memory. Recover using clear(), which doesn't throw.
clear();
// running out of memory. Recover using clear_now(), which doesn't throw.
clear_now();
}
});
});
@@ -534,7 +535,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
});
}
void row_cache::invalidate(const dht::decorated_key& dk) {
future<> row_cache::invalidate(const dht::decorated_key& dk) {
return _populate_phaser.advance_and_await().then([this, &dk] {
_read_section(_tracker.region(), [&] {
with_allocator(_tracker.allocator(), [this, &dk] {
with_linearized_managed_bytes([&] {
@@ -542,17 +544,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) {
});
});
});
});
}
void row_cache::invalidate(const query::partition_range& range) {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate(unwrapped.first);
invalidate(unwrapped.second);
return;
}
future<> row_cache::invalidate(const query::partition_range& range) {
return _populate_phaser.advance_and_await().then([this, &range] {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate_unwrapped(unwrapped.first);
invalidate_unwrapped(unwrapped.second);
} else {
invalidate_unwrapped(range);
}
});
});
}
void row_cache::invalidate_unwrapped(const query::partition_range& range) {
logalloc::reclaim_lock _(_tracker.region());
auto cmp = cache_entry::compare(_schema);
@@ -578,7 +587,6 @@ void row_cache::invalidate(const query::partition_range& range) {
deleter(p);
});
});
});
}
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,


@@ -182,13 +182,13 @@ private:
mutation_source _underlying;
key_source _underlying_keys;
// Synchronizes populating reads with update() to ensure that cache
// Synchronizes populating reads with updates of underlying data source to ensure that cache
// remains consistent across flushes with the underlying data source.
// Readers obtained from the underlying data source in earlier than
// current phases must not be used to populate the cache, unless they hold
// phaser::operation created in the reader's phase of origin. Readers
// should hold to a phase only briefly because this inhibits progress of
// update(). Phase changes occur only in update(), which can be assumed to
// updates. Phase changes occur in update()/clear(), which can be assumed to
// be asynchronous wrt invoking of the underlying data source.
utils::phased_barrier _populate_phaser;
@@ -200,6 +200,8 @@ private:
void on_miss();
void upgrade_entry(cache_entry&);
void invalidate_locked(const dht::decorated_key&);
void invalidate_unwrapped(const query::partition_range&);
void clear_now() noexcept;
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
public:
~row_cache();
@@ -221,7 +223,9 @@ public:
void populate(const mutation& m);
// Clears the cache.
void clear();
// Guarantees that cache will not be populated using readers created
// before this method was invoked.
future<> clear();
// Synchronizes cache with the underlying data source from a memtable which
// has just been flushed to the underlying data source.
@@ -233,11 +237,21 @@ public:
void touch(const dht::decorated_key&);
// Removes given partition from cache.
void invalidate(const dht::decorated_key&);
//
// Guarantees that cache will not be populated with given key
// using readers created before this method was invoked.
//
// The key must be kept alive until method resolves.
future<> invalidate(const dht::decorated_key& key);
// Removes given range of partitions from cache.
// The range can be a wrap around.
void invalidate(const query::partition_range&);
//
// Guarantees that cache will not be populated with partitions from that range
// using readers created before this method was invoked.
//
// The range must be kept alive until method resolves.
future<> invalidate(const query::partition_range&);
auto num_entries() const {
return _partitions.size();


@@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
return make_ready_future<>();
case schema_registry_entry::sync_state::SYNCING:
return _synced_future;
case schema_registry_entry::sync_state::NOT_SYNCED:
case schema_registry_entry::sync_state::NOT_SYNCED: {
logger.debug("Syncing {}", _version);
_synced_promise = {};
do_with(std::move(syncer), [] (auto& syncer) {
auto f = do_with(std::move(syncer), [] (auto& syncer) {
return syncer();
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
if (_sync_state != sync_state::SYNCING) {
return;
}
@@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
_synced_promise.set_value();
}
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
return _synced_future;
}
default:
assert(0);
}

Submodule seastar updated: dab58e4562...b80564dff1


@@ -710,20 +710,28 @@ public static class MigrationsSerializer implements IVersionedSerializer<Collect
//
// The endpoint is the node from which 's' originated.
//
// FIXME: Avoid the sync if the source was/is synced by schema_tables::merge_schema().
static future<> maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) {
if (s->is_synced()) {
return make_ready_future<>();
}
// Serialize schema sync by always doing it on shard 0.
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync([endpoint, s] {
return s->registry_entry()->maybe_sync([s, endpoint] {
auto merge = [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint);
return get_local_migration_manager().merge_schema_from(endpoint);
});
};
// Serialize schema sync by always doing it on shard 0.
if (engine().cpu_id() == 0) {
return merge();
} else {
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync(merge);
});
}
});
}
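The change above adds a fast path: the merge runs directly when we are already on shard 0 and only pays for a cross-shard submit otherwise. A sketch of that dispatch decision, with `current_cpu` and `submit_to_shard0()` as illustrative stand-ins for `engine().cpu_id()` and `smp::submit_to(0, ...)`:

```cpp
#include <cassert>
#include <functional>

int current_cpu = 0;      // stand-in for engine().cpu_id()
int cross_shard_hops = 0; // counts simulated smp::submit_to(0, ...) calls

void submit_to_shard0(const std::function<void()>& fn) {
    ++cross_shard_hops;  // models the cost of the cross-shard hop
    fn();
}

// Serialize schema sync on shard 0, but skip the hop when already there.
void sync_schema(const std::function<void()>& merge) {
    if (current_cpu == 0) {
        merge();                  // already on shard 0: run directly
    } else {
        submit_to_shard0(merge);  // otherwise go through shard 0
    }
}
```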


@@ -2260,14 +2260,14 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count) {
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<::shared_ptr<abstract_read_executor>> exec;
auto concurrent_fetch_starting_index = i;
auto p = shared_from_this();
while (i != ranges.end() && std::distance(i, concurrent_fetch_starting_index) < concurrency_factor) {
while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) {
query::partition_range& range = *i;
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints);
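The corrected loop condition matters because `std::distance(first, last)` counts how far `first` must advance to reach `last`; with the arguments swapped, the random-access result is negative, so the `< concurrency_factor` bound was vacuously true. A quick demonstration:

```cpp
#include <cassert>
#include <iterator>
#include <vector>

// std::distance(first, last): steps needed to advance 'first' to 'last'.
// The swapped form is negative for random-access iterators, which made the
// original batching bound always hold.
long ranges_consumed() {
    std::vector<int> ranges(10);
    auto start = ranges.begin();
    auto i = start + 3;                     // three ranges fetched so far
    assert(std::distance(i, start) == -3);  // swapped: always < factor
    return std::distance(start, i);         // correct: 3
}
```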
@@ -2325,13 +2325,15 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
return rex->execute(timeout);
}, std::move(merger));
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout]
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count]
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
total_row_count += result->row_count() ? result->row_count().value() :
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
results.emplace_back(std::move(result));
if (i == ranges.end()) {
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
} else {
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor);
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, total_row_count);
}
}).handle_exception([p] (std::exception_ptr eptr) {
p->handle_read_error(eptr);
@@ -2363,6 +2365,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
results.reserve(ranges.size()/concurrency_factor + 1);
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor)
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {


@@ -219,7 +219,7 @@ private:
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor);
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count = 0);
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
lw_shared_ptr<query::read_command> cmd,


@@ -972,6 +972,28 @@ void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subsc
static stdx::optional<future<>> drain_in_progress;
future<> storage_service::stop_transport() {
return run_with_no_api_lock([] (storage_service& ss) {
return seastar::async([&ss] {
logger.info("Stop transport: starts");
gms::get_local_gossiper().stop_gossiping().get();
logger.info("Stop transport: stop_gossiping done");
ss.shutdown_client_servers().get();
logger.info("Stop transport: shutdown rpc and cql server done");
ss.do_stop_ms().get();
logger.info("Stop transport: shutdown messaging_service done");
auth::auth::shutdown().get();
logger.info("Stop transport: auth shutdown");
logger.info("Stop transport: done");
});
});
}
future<> storage_service::drain_on_shutdown() {
return run_with_no_api_lock([] (storage_service& ss) {
if (drain_in_progress) {
@@ -980,17 +1002,8 @@ future<> storage_service::drain_on_shutdown() {
return seastar::async([&ss] {
logger.info("Drain on shutdown: starts");
gms::get_local_gossiper().stop_gossiping().get();
logger.info("Drain on shutdown: stop_gossiping done");
ss.shutdown_client_servers().get();
logger.info("Drain on shutdown: shutdown rpc and cql server done");
ss.do_stop_ms().get();
logger.info("Drain on shutdown: shutdown messaging_service done");
auth::auth::shutdown().get();
logger.info("Drain on shutdown: auth shutdown");
ss.stop_transport().get();
logger.info("Drain on shutdown: stop_transport done");
ss.flush_column_families();
logger.info("Drain on shutdown: flush column_families done");
@@ -1797,16 +1810,18 @@ future<> storage_service::start_native_transport() {
// return cserver->stop();
//});
::shared_ptr<seastar::tls::server_credentials> cred;
std::shared_ptr<seastar::tls::credentials_builder> cred;
auto addr = ipv4_addr{ip, port};
auto f = make_ready_future();
// main should have made sure values are clean and neatish
if (ceo.at("enabled") == "true") {
cred = ::make_shared<seastar::tls::server_credentials>(::make_shared<seastar::tls::dh_params>(seastar::tls::dh_params::level::MEDIUM));
cred = std::make_shared<seastar::tls::credentials_builder>();
cred->set_dh_level(seastar::tls::dh_params::level::MEDIUM);
f = cred->set_x509_key_file(ceo.at("certificate"), ceo.at("keyfile"), seastar::tls::x509_crt_format::PEM);
logger.info("Enabling encrypted CQL connections between client and server");
}
return f.then([cserver, addr, cred, keepalive] {
return f.then([cserver, addr, cred = std::move(cred), keepalive] {
return cserver->invoke_on_all(&transport::cql_server::listen, addr, cred, keepalive);
});
});
@@ -2987,7 +3002,7 @@ void storage_service::do_isolate_on_error(disk_error type)
if (must_isolate && !isolated.exchange(true)) {
logger.warn("Shutting down communications due to I/O errors until operator intervention");
// isolated protect us against multiple stops
service::get_storage_service().invoke_on_all([] (service::storage_service& s) { s.stop_native_transport(); });
service::get_local_storage_service().stop_transport();
}
}


@@ -382,6 +382,8 @@ public:
future<> drain_on_shutdown();
future<> stop_transport();
void flush_column_families();
#if 0
/**


@@ -187,6 +187,49 @@ void compaction_manager::task_start(lw_shared_ptr<compaction_manager::task>& tas
});
}
// submit_sstable_rewrite() starts a compaction task, much like submit(),
// but rather than asking a compaction policy what to compact, this function
// compacts just a single sstable, and writes one new sstable. This operation
// is useful to split an sstable containing data belonging to multiple shards
// into a separate sstable on each shard.
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
// The semaphore ensures that the sstable rewrite operations submitted by
// submit_sstable_rewrite are run in sequence, and not all of them in
// parallel. Note that unlike general compaction which currently allows
// different cfs to compact in parallel, here we don't have a semaphore
// per cf, so we only get one rewrite at a time on each shard.
static thread_local semaphore sem(1);
// We cannot, and don't need to, compact an sstable which is already
// being compacted anyway.
if (_stopped || _compacting_sstables.count(sst)) {
return;
}
// Conversely, we don't want another compaction job to compact the
// sstable we are planning to work on:
_compacting_sstables.insert(sst);
auto task = make_lw_shared<compaction_manager::task>();
_tasks.push_back(task);
_stats.active_tasks++;
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
return cf->compact_sstables(sstables::compaction_descriptor(
std::vector<sstables::shared_sstable>{sst},
sst->get_sstable_level(),
std::numeric_limits<uint64_t>::max()), false);
}).then_wrapped([this, sst, task] (future<> f) {
_compacting_sstables.erase(sst);
_stats.active_tasks--;
_tasks.remove(task);
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stop_exception& e) {
cmlog.info("compaction info: {}", e.what());
} catch (...) {
cmlog.error("compaction failed: {}", std::current_exception());
}
});
}
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task>& task) {
task->stopping = true;
return task->compaction_gate.close().then([task] {
@@ -228,7 +271,6 @@ void compaction_manager::register_collectd_metrics() {
void compaction_manager::start(int task_nr) {
_stopped = false;
_tasks.reserve(task_nr);
register_collectd_metrics();
for (int i = 0; i < task_nr; i++) {
auto task = make_lw_shared<compaction_manager::task>();
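The thread-local `semaphore sem(1)` in `submit_sstable_rewrite()` above admits one rewrite at a time per shard: a second submission cannot take the unit until the first completes. Seastar's semaphore is asynchronous, but the unit-accounting idea can be sketched with a toy counting model:

```cpp
#include <cassert>

// Minimal counting-semaphore model: with one unit, try_acquire() succeeds
// only when no rewrite currently holds it, which is what serializes the
// rewrites submitted by submit_sstable_rewrite().
struct semaphore {
    int units;
    explicit semaphore(int n) : units(n) {}
    bool try_acquire() { return units > 0 ? (--units, true) : false; }
    void release() { ++units; }
};

bool second_rewrite_must_wait() {
    semaphore sem(1);
    bool first = sem.try_acquire();    // first rewrite starts
    bool second = sem.try_acquire();   // second rewrite: no unit, must wait
    sem.release();                     // first rewrite completes
    bool retried = sem.try_acquire();  // now the second one can run
    sem.release();
    return first && !second && retried;
}
```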


@@ -58,7 +58,7 @@ private:
};
// compaction manager may have N fibers to allow parallel compaction per shard.
std::vector<lw_shared_ptr<task>> _tasks;
std::list<lw_shared_ptr<task>> _tasks;
// Queue shared among all tasks containing all column families to be compacted.
std::deque<column_family*> _cfs_to_compact;
@@ -113,6 +113,13 @@ public:
// Submit a column family to be cleaned up and wait for its termination.
future<> perform_cleanup(column_family* cf);
// Submit a specific sstable to be rewritten, while dropping data which
// does not belong to this shard. Meant to be used on startup when an
// sstable is shared by multiple shards, and we want to split it to a
// separate sstable for each shard.
void submit_sstable_rewrite(column_family* cf,
sstables::shared_sstable s);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);
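`_tasks` moves from `std::vector` to `std::list` because rewrite tasks now finish and remove themselves in arbitrary order (`_tasks.remove(task)` in the earlier hunk); `std::list` removes by value without invalidating iterators or shifting the surviving elements. A small illustration:

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>

struct task { int generation; };
using task_ptr = std::shared_ptr<task>;

// A completing task erases exactly itself from the shared task list,
// mirroring _tasks.remove(task) in the compaction manager.
size_t tasks_left_after_completion() {
    std::list<task_ptr> tasks;
    auto rewrite = std::make_shared<task>(task{1});
    auto regular = std::make_shared<task>(task{2});
    tasks.push_back(rewrite);
    tasks.push_back(regular);
    tasks.remove(rewrite);  // value-based removal; other iterators stay valid
    return tasks.size();
}
```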


@@ -107,10 +107,12 @@ public:
// ensure all SSTables are in the manifest
for (auto& sstable : sstables) {
// unconditionally add a sstable to a list of its level.
manifest.add(sstable);
}
for (auto i = 1U; i < manifest._generations.size(); i++) {
// send overlapping sstables (with level > 0) to level 0, if any.
manifest.repair_overlapping_sstables(i);
}
@@ -123,36 +125,8 @@ public:
if (level >= _generations.size()) {
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (_generations.size() - 1)));
}
#if 0
logDistribution();
#endif
if (can_add_sstable(sstable)) {
// adding the sstable does not cause overlap in the level
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
_generations[level].push_back(sstable);
} else {
// this can happen if:
// * a compaction has promoted an overlapping sstable to the given level, or
// was also supposed to add an sstable at the given level.
// * we are moving sstables from unrepaired to repaired and the sstable
// would cause overlap
//
// The add(..):ed sstable will be sent to level 0
#if 0
try
{
reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
reader.reloadSSTableMetadata();
}
catch (IOException e)
{
logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
}
#endif
_generations[0].push_back(sstable);
}
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
_generations[level].push_back(sstable);
}
#if 0
@@ -258,20 +232,8 @@ public:
void send_back_to_L0(sstables::shared_sstable& sstable) {
remove(sstable);
#if 0
try
{
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
sstable.reloadSSTableMetadata();
add(sstable);
}
catch (IOException e)
{
throw new RuntimeException("Could not reload sstable meta data", e);
}
#else
_generations[0].push_back(sstable);
#endif
sstable->set_sstable_level(0);
}
#if 0
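In leveled compaction, levels above L0 must hold sstables with disjoint key ranges; an sstable that would overlap an existing one is sent back to L0, where overlap is allowed. The `can_add_sstable()` check referenced above amounts to an interval-overlap test, sketched here with an illustrative `key_span` in place of real token ranges:

```cpp
#include <cassert>
#include <algorithm>
#include <vector>

struct key_span { int first; int last; };  // inclusive, illustrative stand-in

// True when the candidate overlaps nothing already in the level, i.e. it may
// be added there; otherwise it must go to L0.
bool can_add_to_level(const std::vector<key_span>& level, key_span candidate) {
    return std::none_of(level.begin(), level.end(), [&](const key_span& s) {
        return candidate.first <= s.last && s.first <= candidate.last;
    });
}
```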


@@ -1794,6 +1794,20 @@ double sstable::get_compression_ratio() const {
}
}
void sstable::set_sstable_level(uint32_t new_level) {
auto entry = _statistics.contents.find(metadata_type::Stats);
if (entry == _statistics.contents.end()) {
return;
}
auto& p = entry->second;
if (!p) {
throw std::runtime_error("Statistics is malformed");
}
stats_metadata& s = *static_cast<stats_metadata *>(p.get());
sstlog.debug("set level of {} with generation {} from {} to {}", get_filename(), _generation, s.sstable_level, new_level);
s.sstable_level = new_level;
}
future<> sstable::mutate_sstable_level(uint32_t new_level) {
if (!has_component(component_type::Statistics)) {
return make_ready_future<>();


@@ -529,6 +529,9 @@ public:
return get_stats_metadata().sstable_level;
}
// This will change sstable level only in memory.
void set_sstable_level(uint32_t);
double get_compression_ratio() const;
future<> mutate_sstable_level(uint32_t);


@@ -228,7 +228,7 @@ future<> stream_session::on_initialization_complete() {
}
_stream_result->handle_session_prepared(this->shared_from_this());
} catch (...) {
sslog.error("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
throw;
}
return make_ready_future<>();
@@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() {
return ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
}).handle_exception([id, plan_id] (auto ep) {
sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
std::rethrow_exception(ep);
});
}).then([this] {
@@ -248,7 +248,7 @@ future<> stream_session::on_initialization_complete() {
}
void stream_session::on_error() {
sslog.error("[Stream #{}] Streaming error occurred", plan_id());
sslog.warn("[Stream #{}] Streaming error occurred", plan_id());
// fail session
close_session(stream_session_state::FAILED);
}
@@ -270,7 +270,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
db.find_column_family(ks, cf);
} catch (no_such_column_family) {
auto err = sprint("[Stream #{}] prepare requested ks={} cf={} does not exist", ks, cf);
sslog.error(err.c_str());
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
}
@@ -284,7 +284,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
db.find_column_family(cf_id);
} catch (no_such_column_family) {
auto err = sprint("[Stream #{}] prepare cf_id=%s does not exist", plan_id, cf_id);
sslog.error(err.c_str());
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
prepare_receiving(summary);


@@ -85,41 +85,41 @@ struct send_info {
};
future<stop_iteration> do_send_mutations(auto si, auto fm) {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
// There might be a large number of STREAM_MUTATION messages in flight.
// Log one error per column_family per range
if (!si->error_logged) {
si->error_logged = true;
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
return stop_iteration::no;
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
// There might be a large number of STREAM_MUTATION messages in flight.
// Log one error per column_family per range
if (!si->error_logged) {
si->error_logged = true;
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
});
return make_ready_future<stop_iteration>(stop_iteration::no);
}
future<> send_mutations(auto si) {
auto& cf = si->db.find_column_family(si->cf_id);
auto& priority = service::get_local_streaming_read_priority();
return do_with(cf.make_reader(cf.schema(), si->pr, priority), [si] (auto& reader) {
return repeat([si, &reader] () {
return reader().then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return repeat([si, &reader] {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
return reader().then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
});
}).then([si] {
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
auto cf_id = this->cf_id;
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
auto cf_id = this->cf_id;
@@ -153,7 +153,7 @@ void stream_transfer_task::start() {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,
cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) {
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
std::rethrow_exception(ep);
});
}).then([this, id, plan_id, cf_id] {
@@ -161,7 +161,7 @@ void stream_transfer_task::start() {
session->start_keep_alive_timer();
session->transfer_task_completed(cf_id);
}).handle_exception([this, plan_id, id] (auto ep){
sslog.error("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
this->session->on_error();
});
}
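The restructured loop takes a unit from `mutation_send_limiter()` before reading each mutation and returns it in `finally()`, so the unit comes back even when the send fails. That pairing is essentially RAII, sketched here in plain C++ with illustrative names:

```cpp
#include <cassert>

// Toy limiter: wait() takes a unit, signal() returns it. In Seastar, wait()
// would suspend when no units remain instead of just decrementing.
struct limiter {
    int units;
    void wait() { --units; }
    void signal() { ++units; }
};

// Mirrors wait().then(...).finally(signal): the unit is returned no matter
// how the send completes.
struct limiter_guard {
    limiter& l;
    explicit limiter_guard(limiter& lim) : l(lim) { l.wait(); }
    ~limiter_guard() { l.signal(); }
};

int units_after_failed_send(limiter& l) {
    try {
        limiter_guard guard(l);
        throw 42;  // simulated STREAM_MUTATION failure
    } catch (...) {
        // error logged; the guard already returned the unit
    }
    return l.units;
}
```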


@@ -32,6 +32,7 @@ boost_tests = [
'types_test',
'keys_test',
'mutation_test',
'schema_registry_test',
'range_test',
'mutation_reader_test',
'cql_query_test',


@@ -444,3 +444,36 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
});
}
SEASTAR_TEST_CASE(test_result_row_count) {
return seastar::async([] {
auto s = make_schema();
auto now = gc_clock::now();
auto slice = partition_slice_builder(*s).build();
mutation m1(partition_key::from_single_value(*s, "key1"), s);
auto src = make_source({m1});
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
mutation m2(partition_key::from_single_value(*s, "key2"), s);
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
});
}
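The assertions above pin down the row-counting rule the new `total_row_count` logic depends on: a partition contributes one counted row per clustering row, except that a partition holding only a static row still counts as one. A direct restatement of that rule:

```cpp
#include <cassert>
#include <vector>

// One entry per partition in the result.
struct partition_result { bool has_static_row; int clustering_rows; };

// row_count per the test above: clustering rows dominate; a static-only
// partition still counts as a single row.
int result_row_count(const std::vector<partition_result>& partitions) {
    int total = 0;
    for (const auto& p : partitions) {
        if (p.clustering_rows > 0) {
            total += p.clustering_rows;
        } else if (p.has_static_row) {
            total += 1;
        }
    }
    return total;
}
```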


@@ -23,10 +23,15 @@
#define BOOST_TEST_MODULE core
#include <boost/test/unit_test.hpp>
#include "boost/icl/interval.hpp"
#include "boost/icl/interval_map.hpp"
#include <unordered_set>
#include "query-request.hh"
#include "schema_builder.hh"
#include "disk-error-handler.hh"
#include "locator/token_metadata.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
@@ -447,3 +452,56 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
// [3,4) and (4,5]
BOOST_REQUIRE(range<unsigned>({3}, {{4, false}}).overlaps(range<unsigned>({{4, false}}, {5}), unsigned_comparator()) == false);
}
auto get_item(std::string left, std::string right, std::string val) {
using value_type = std::unordered_set<std::string>;
auto l = dht::global_partitioner().from_sstring(left);
auto r = dht::global_partitioner().from_sstring(right);
auto rg = range<dht::token>({{l, false}}, {r});
value_type v{val};
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
}
BOOST_AUTO_TEST_CASE(test_range_interval_map) {
using value_type = std::unordered_set<std::string>;
using token = dht::token;
boost::icl::interval_map<token, value_type> mymap;
mymap += get_item("1", "5", "A");
mymap += get_item("5", "8", "B");
mymap += get_item("1", "3", "C");
mymap += get_item("3", "8", "D");
std::cout << "my map: " << "\n";
for (auto x : mymap) {
std::cout << x.first << " -> ";
for (auto s : x.second) {
std::cout << s << ", ";
}
std::cout << "\n";
}
auto search_item = [&mymap] (std::string val) {
auto tok = dht::global_partitioner().from_sstring(val);
auto search = range<token>(tok);
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
if (it != mymap.end()) {
std::cout << "Found OK:" << " token = " << tok << " in range: " << it->first << "\n";
return true;
} else {
std::cout << "Found NO:" << " token = " << tok << "\n";
return false;
}
};
BOOST_REQUIRE(search_item("0") == false);
BOOST_REQUIRE(search_item("1") == false);
BOOST_REQUIRE(search_item("2") == true);
BOOST_REQUIRE(search_item("3") == true);
BOOST_REQUIRE(search_item("4") == true);
BOOST_REQUIRE(search_item("5") == true);
BOOST_REQUIRE(search_item("6") == true);
BOOST_REQUIRE(search_item("7") == true);
BOOST_REQUIRE(search_item("8") == true);
BOOST_REQUIRE(search_item("9") == false);
}
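`get_item()` builds ranges with an excluded start and included end — `({l, false}, {r})` is (l, r] — which is exactly what the search assertions verify: the start token "1" is not found, while the end tokens "5" and "8" are. The containment rule in isolation (tokens shown as ints for illustration):

```cpp
#include <cassert>

// (start, end]: start excluded, end included, matching the ranges that
// get_item() above feeds into the interval map.
bool in_left_open_range(int start, int end, int token) {
    return token > start && token <= end;
}
```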


@@ -546,27 +546,33 @@ static std::vector<mutation> updated_ring(std::vector<mutation>& mutations) {
return result;
}
static mutation_source make_mutation_source(std::vector<lw_shared_ptr<memtable>>& memtables) {
return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) {
std::vector<mutation_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->make_reader(s, pr));
}
return make_combined_reader(std::move(readers));
});
}
static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtable>>& memtables) {
return key_source([s, &memtables] (const query::partition_range& pr) {
std::vector<key_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->as_key_source()(pr));
}
return make_combined_reader(s, std::move(readers));
});
}
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) {
std::vector<mutation_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->make_reader(s, pr));
}
return make_combined_reader(std::move(readers));
});
auto memtables_key_source = key_source([&] (const query::partition_range& pr) {
std::vector<key_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->as_key_source()(pr));
}
return make_combined_reader(s, std::move(readers));
});
throttled_mutation_source cache_source(memtables_data_source);
throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
row_cache cache(s, cache_source, memtables_key_source, tracker);
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
@@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
auto some_element = keys_in_cache.begin() + 547;
std::vector<dht::decorated_key> keys_not_in_cache;
keys_not_in_cache.push_back(*some_element);
cache.invalidate(*some_element);
cache.invalidate(*some_element).get();
keys_in_cache.erase(some_element);
for (auto&& key : keys_in_cache) {
@@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
{ *some_range_begin, true }, { *some_range_end, false }
);
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
cache.invalidate(range);
cache.invalidate(range).get();
keys_in_cache.erase(some_range_begin, some_range_end);
for (auto&& key : keys_in_cache) {
@@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) {
});
}
SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
auto ring = make_ring(s, 3);
for (auto&& m : ring) {
mt1->apply(m);
}
auto mt2 = make_lw_shared<memtable>(s);
auto ring2 = updated_ring(ring);
for (auto&& m : ring2) {
mt2->apply(m);
}
cache_source.block();
auto rd1 = cache.make_reader(s);
auto rd1_result = rd1();
sleep(10ms).get();
memtables.clear();
memtables.push_back(mt2);
// This update should miss on all partitions
auto cache_cleared = cache.clear();
auto rd2 = cache.make_reader(s);
// rd1, which is in progress, should not prevent forward progress of clear()
cache_source.unblock();
cache_cleared.get();
// Reads started before memtable flush should return previous value, otherwise this test
// doesn't trigger the conditions it is supposed to protect against.
assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]);
assert_that(rd1().get0()).has_no_mutation();
// Reads started after clear but before previous populations completed
// should already see the new data
assert_that(std::move(rd2))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
// Reads started after clear should see new data
assert_that(cache.make_reader(s))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
});
}
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
return seastar::async([] {
auto s = make_schema();
@@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
}
// wrap-around
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()}));
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());
@@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
verify_does_not_have(cache, ring[7].decorated_key());
// not wrap-around
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()}));
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());


@@ -0,0 +1,130 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#define BOOST_TEST_DYN_LINK
#include <seastar/core/thread.hh>
#include "tests/test-utils.hh"
#include "schema_registry.hh"
#include "schema_builder.hh"
#include "mutation_source_test.hh"
#include "disk-error-handler.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
static bytes random_column_name() {
return to_bytes(to_hex(make_blob(32)));
}
static schema_ptr random_schema() {
return schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column(random_column_name(), bytes_type)
.build();
}
SEASTAR_TEST_CASE(test_async_loading) {
return seastar::async([] {
auto s1 = random_schema();
auto s2 = random_schema();
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
return make_ready_future<frozen_schema>(frozen_schema(s1));
}).get0();
BOOST_REQUIRE(s1_loaded);
BOOST_REQUIRE(s1_loaded->version() == s1->version());
auto s1_later = local_schema_registry().get_or_null(s1->version());
BOOST_REQUIRE(s1_later);
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
return later().then([s2] { return frozen_schema(s2); });
}).get0();
BOOST_REQUIRE(s2_loaded);
BOOST_REQUIRE(s2_loaded->version() == s2->version());
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
BOOST_REQUIRE(s2_later);
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return later(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
promise<> fail_sync;
auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable {
return fail_sync.get_future().then([] {
throw std::runtime_error("sync failed");
});
});
// a concurrent maybe_sync should attach to the in-flight one
auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); });
fail_sync.set_value();
try {
f1.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
try {
f2.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}

@@ -1948,6 +1948,45 @@ SEASTAR_TEST_CASE(leveled_06) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(leveled_07) {
// Check that an sstable with level > 0 that overlaps another in the same level is sent back to L0.
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
column_family::config cfg;
compaction_manager cm;
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(5);
auto min_key = key_and_token_pair[0].first;
auto max_key = key_and_token_pair[key_and_token_pair.size()-1].first;
// Create two sstables whose key ranges overlap.
add_sstable_for_leveled_test(cf, /*gen*/1, /*data_size*/0, /*level*/1, min_key, max_key);
BOOST_REQUIRE(cf->get_sstables()->size() == 1);
add_sstable_for_leveled_test(cf, /*gen*/2, /*data_size*/0, /*level*/1, key_and_token_pair[1].first, max_key);
BOOST_REQUIRE(cf->get_sstables()->size() == 2);
BOOST_REQUIRE(sstable_overlaps(cf, 1, 2) == true);
auto max_sstable_size_in_mb = 1;
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 1);
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
auto& l0 = manifest.get_level(0);
auto& sst = l0.front();
BOOST_REQUIRE(sst->generation() == 2);
BOOST_REQUIRE(sst->get_sstable_level() == 0);
return make_ready_future<>();
}
static lw_shared_ptr<key_reader> prepare_key_reader(schema_ptr s,
const std::vector<shared_sstable>& ssts, const query::partition_range& range)
{

@@ -275,7 +275,7 @@ future<> cql_server::stop() {
}
future<>
cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> creds, bool keepalive) {
cql_server::listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
_notifier = std::make_unique<event_notifier>(addr.port);
service::get_local_migration_manager().register_listener(_notifier.get());
service::get_local_storage_service().register_subscriber(_notifier.get());
@@ -285,7 +285,7 @@ cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials
server_socket ss;
try {
ss = creds
? seastar::tls::listen(creds, make_ipv4_address(addr), lo)
? seastar::tls::listen(creds->build_server_credentials(), make_ipv4_address(addr), lo)
: engine().listen(make_ipv4_address(addr), lo);
} catch (...) {
throw std::runtime_error(sprint("CQLServer error while listening on %s -> %s", make_ipv4_address(addr), std::current_exception()));
@@ -325,11 +325,9 @@ cql_server::do_accepts(int which, bool keepalive) {
}).then_wrapped([this, which, keepalive] (future<> f) {
try {
f.get();
} catch (const std::bad_alloc&) {
logger.debug("accept failed: {}, retrying", std::current_exception());
do_accepts(which, keepalive);
} catch (...) {
logger.debug("accept failed: {}", std::current_exception());
logger.warn("accept failed: {}", std::current_exception());
do_accepts(which, keepalive);
}
});
}

@@ -107,7 +107,7 @@ private:
cql_load_balance _lb;
public:
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
future<> listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> = {}, bool keepalive = false);
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
future<> do_accepts(int which, bool keepalive);
future<> stop();
public:

@@ -49,3 +49,17 @@ bool is_system_error_errno(int err_no)
code.category() == std::system_category();
});
}
bool should_stop_on_system_error(const std::system_error& e) {
if (e.code().category() == std::system_category()) {
// Whitelist of errors that don't require us to stop the server:
switch (e.code().value()) {
case EEXIST:
case ENOENT:
return false;
default:
break;
}
}
return true;
}