Compare commits

...

84 Commits

Author SHA1 Message Date
Takuya ASADA
9d9d0258a2 dist/redhat: don't try to adduser when user already exists
Currently we get "failed adding user 'scylla'" on .rpm installation when the user already exists; we can skip the step to prevent the error.

Fixes #1958

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1482550075-27939-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit f3e45bc9ef)
2016-12-27 09:48:22 +02:00
Tomasz Grabiec
05b6b459a0 tests: Remove unintentional enablement of trace-level logging
Sneaked in by mistake.

(cherry picked from commit c9344826e9)
2016-12-21 15:39:28 +01:00
Tomasz Grabiec
49b43131f4 Revert "Revert "commitlog: use read ahead for replay requests""
This reverts commit 13eb7a6213.
2016-12-20 20:09:10 +01:00
Tomasz Grabiec
faba7b2ad4 Revert "Revert "commitlog: use commitlog priority for replay""
This reverts commit 53c3219e56.
2016-12-20 20:09:10 +01:00
Tomasz Grabiec
c5d8c61652 Revert "Revert "commitlog: close file after read, and not at stop""
This reverts commit 55dc520405.
2016-12-20 20:09:10 +01:00
Tomasz Grabiec
6b58100900 Revert "Revert "commitlog: close replay file""
This reverts commit a68721f67b.
2016-12-20 20:09:10 +01:00
Tomasz Grabiec
48b7e7a368 tests: commitlog: Fix assumption about write visibility
The test assumed that mutations added to the commitlog are visible to
reads as soon as a new segment is opened. That's not true because
buffers are written back in the background, and a new segment may be
active while the previous one is still being written or not yet
synced.

Fix the test so that it expects the number of mutations read this
way to be <= the number of mutations written, and that after all
segments are synced, the number of mutations read is equal.
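The corrected expectation can be sketched with a toy model (illustrative names only, not the real commitlog API): mutations become readable only once their segment is synced.

```cpp
#include <cassert>
#include <cstddef>

// Toy model of the fixed test invariant: reads see only synced mutations,
// so read count <= written count until everything is synced.
struct toy_commitlog {
    std::size_t written = 0;   // mutations added so far
    std::size_t synced  = 0;   // mutations whose segment has been synced
    void add() { ++written; }
    void sync_all() { synced = written; }
    std::size_t readable() const { return synced; }
};
```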
2016-12-20 20:09:10 +01:00
Tomasz Grabiec
81f8b8d47b Update seastar submodule
* seastar 05b4438...7907bae (2):
  > Revert "Revert "circular_buffer: fix reserve() for non-power-of-twos""
  > Revert "Revert "bitops: extract log2ceil() and log2floor() from memory.cc""
2016-12-20 20:09:09 +01:00
Duarte Nunes
b10dcfc5f6 lz4: Conditionally use LZ4_compress_default()
Since not all distributions have a version of LZ4 with
LZ4_compress_default(), we use it conditionally.

This is especially important beginning with version 1.7.3 of LZ4,
which deprecates the LZ4_compress() function in favour of
LZ4_compress_default() and thus prevents Scylla from compiling
due to the deprecation warning.
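The version-gating pattern can be sketched as follows. LZ4_VERSION_NUMBER is the real lz4 macro (10700 corresponds to v1.7.0); a FAKE_ stand-in is used here so the sketch builds without lz4 installed, and the exact guard in the commit may differ.

```cpp
#include <cassert>
#include <cstring>

// Pretend lz4 1.7.3 is installed; the real code would test LZ4_VERSION_NUMBER.
#define FAKE_LZ4_VERSION_NUMBER 10703

// Select the non-deprecated entry point when the library provides it,
// and fall back to the legacy call on older distributions.
inline const char* chosen_compress_api() {
#if FAKE_LZ4_VERSION_NUMBER >= 10700
    return "LZ4_compress_default";  // available since lz4 1.7.0
#else
    return "LZ4_compress";          // deprecated, but present everywhere
#endif
}
```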

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <20161124092339.23017-1-duarte@scylladb.com>
(cherry picked from commit cc3f26c993)
2016-12-15 19:59:07 +02:00
Pekka Enberg
fbf029acb3 Update seastar submodule
* seastar c50a301...05b4438 (5):
  > Revert "circular_buffer: fix reserve() for non-power-of-twos"
  > Revert "bitops: extract log2ceil() and log2floor() from memory.cc"
  > rpc: Conditionally use LZ4_compress_default()
  > bitops: extract log2ceil() and log2floor() from memory.cc
  > circular_buffer: fix reserve() for non-power-of-twos
2016-12-14 20:21:52 +02:00
Pekka Enberg
a68721f67b Revert "commitlog: close replay file"
This reverts commit 7647acd201.
2016-12-13 09:47:38 +02:00
Pekka Enberg
55dc520405 Revert "commitlog: close file after read, and not at stop"
This reverts commit be9e419a32.
2016-12-13 09:47:32 +02:00
Pekka Enberg
53c3219e56 Revert "commitlog: use commitlog priority for replay"
This reverts commit 58504bda5b.
2016-12-13 09:47:27 +02:00
Pekka Enberg
13eb7a6213 Revert "commitlog: use read ahead for replay requests"
This reverts commit 38e78bb8a2.
2016-12-13 09:47:22 +02:00
Pekka Enberg
3f94dead9e Revert "Update seastar submodule"
This reverts commit 4513d12311.
2016-12-13 09:47:11 +02:00
Tomasz Grabiec
4513d12311 Update seastar submodule
* seastar c50a301...cf62d43 (2):
  > bitops: extract log2ceil() and log2floor() from memory.cc
  > circular_buffer: fix reserve() for non-power-of-twos

Fixes failure in commitlog_test after the following backport:

  commit 38e78bb8a2
  Author: Glauber Costa <glauber@scylladb.com>
  Date:   Thu Nov 17 14:09:17 2016 -0500

    commitlog: use read ahead for replay requests
2016-12-12 11:33:48 +01:00
Pekka Enberg
c427193428 Revert "sstables: fix probe with Unknown component"
This reverts commit 4f6beb96f3 because it fails to compile.
2016-12-10 09:01:03 +02:00
Pekka Enberg
55b5149c17 release: prepare for 1.4.3 2016-12-09 17:31:55 +02:00
Avi Kivity
4f6beb96f3 sstables: fix probe with Unknown component
Commit 53b7b7def3 ("sstables: handle unrecognized sstable component")
ignores unrecognized components, but misses one code path during probe_file().

Ignore unrecognized components there too.

Fixes #1922.
Message-Id: <20161208131027.28939-1-avi@scylladb.com>

(cherry picked from commit 872b5ef5f0)
2016-12-08 17:23:42 +02:00
Amos Kong
102792ee4b systemd: reset housekeeping timer at each start
Currently the housekeeping timer isn't reset when we restart scylla-server.
We expect the service to run at each start, which is consistent with the
upstart script in Ubuntu 14.04.

When we restart scylla-server, the housekeeping timer will also be restarted,
so let's replace "OnBootSec" with "OnActiveSec".

Fixes: #1601

Signed-off-by: Amos Kong <amos@scylladb.com>
Message-Id: <a22943cc11a3de23db266c52fd476c08014098c4.1480607401.git.amos@scylladb.com>
2016-12-06 18:34:09 +02:00
Takuya ASADA
125c39d8d1 dist/common/systemd/scylla-housekeeping.timer: workaround to avoid crash of systemd on RHEL 7.3
RHEL 7.3's systemd contains a known bug in timer.c:
https://github.com/systemd/systemd/issues/2632

This is a workaround to avoid hitting the bug.

Fixes #1846

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1480452194-11683-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 8464903021)
2016-12-06 10:49:12 +02:00
Glauber Costa
38e78bb8a2 commitlog: use read ahead for replay requests
Aside from putting the requests in the commitlog class, read ahead
will help us going through the file faster.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 59a41cf7f1)
2016-12-01 16:15:49 +01:00
Glauber Costa
58504bda5b commitlog: use commitlog priority for replay
Right now replay is being issued with the standard seastar priority.
The rationale at the time was that replay is an early event that
doesn't really share the disk with anybody.

That is largely untrue now that we start compactions on boot.
Compactions may fight for bandwidth with the commitlog, and with such
low priority the commitlog is guaranteed to lose.

Fixes #1856

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit aa375cd33d)
2016-12-01 16:15:44 +01:00
Glauber Costa
be9e419a32 commitlog: close file after read, and not at stop
There are other code paths that may interrupt the read in the middle
and bypass stop. It's safer this way.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <8c32ca2777ce2f44462d141fd582848ac7cf832d.1479477360.git.glauber@scylladb.com>
(cherry picked from commit 60b7d35f15)
2016-12-01 16:15:39 +01:00
Glauber Costa
7647acd201 commitlog: close replay file
The replay file is opened, so it should be closed. We're not seeing any
problems arising from this yet, but they may happen. Enabling read ahead in
this stream makes them happen immediately. Fix it.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 4d3d774757)
2016-12-01 16:15:35 +01:00
Takuya ASADA
f35c088363 dist/common/scripts/scylla_kernel_check: fix incorrect document URL
Fixes #1871

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1480327243-18177-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 1042e40188)
2016-11-29 11:12:37 +02:00
Avi Kivity
f19fca6b8b Update seastar submodule
* seastar 28aeb47...c50a301 (1):
  > Collectd get_value_map safe scan the map

Fixes #1835.
2016-11-27 18:21:07 +02:00
Pekka Enberg
0d6223580c release: prepare for 1.4.2 2016-11-24 10:58:28 +02:00
Duarte Nunes
6152f091d4 thrift: Don't apply cell limit across rows
In Thrift, SliceRange defines a count that limits the number of cells
to return from that row (in CQL3 terms, it limits the number of rows
in that partition). While this limit is honored in the engine, the
Thrift layer also applies the same limit, which, while redundant in
most cases, is used to support the get_paged_slice verb.

Currently, the limit is not being reset per Thrift row (CQL3
partition), so in practice, instead of limiting the cells in a row,
we're also limiting the rows we return. This patch fixes that by
ensuring the limit applies only within a row/partition.
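The fix can be illustrated with a toy sketch (plain ints stand in for cells; the real Thrift layer differs): the count must reset for each row rather than being shared across rows.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Apply the cell limit within each row, resetting per row. The buggy
// behavior was equivalent to one shared countdown across all rows,
// which silently dropped later rows.
std::vector<std::vector<int>> limit_cells_per_row(
        const std::vector<std::vector<int>>& rows, std::size_t limit) {
    std::vector<std::vector<int>> out;
    for (const auto& row : rows) {
        auto n = std::min(limit, row.size());  // limit resets here, per row
        out.emplace_back(row.begin(), row.begin() + n);
    }
    return out;
}
```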

Fixes #1882

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <20161123220001.15496-1-duarte@scylladb.com>
(cherry picked from commit a527ba285f)
2016-11-24 10:39:08 +02:00
Avi Kivity
9bc54bcf5e tests: fix tests with boost 1.60
In boost 1.60, the executable's command-line arguments are expected to
be separated from the boost command-line arguments by '--'.  Detect
this requirement and comply with it.
Message-Id: <1477212424-3831-1-git-send-email-avi@scylladb.com>

(cherry picked from commit fc8210a875)
2016-11-24 10:11:11 +02:00
Avi Kivity
d6c9abd9ae storage_proxy: don't query concurrently needlessly during range queries
storage_proxy has an optimization where it tries to query multiple token
ranges concurrently to satisfy very large requests (an optimization which is
likely meaningless when paging is enabled, as it always should be).  However,
the rows-per-range code severely underestimates the number of rows per range,
resulting in a large number of "read-ahead" internal queries being performed,
the results of most of which are discarded.

Fix by disabling this code. We should likely remove it completely, but let's
start with a band-aid that can be backported.

Fixes #1863.

Message-Id: <20161120165741.2488-1-avi@scylladb.com>
(cherry picked from commit 6bdb8ba31d)
2016-11-21 18:28:14 +02:00
Raphael S. Carvalho
b862c30bc0 db: do not leak deleted sstable when deletion triggers an exception
The leakage results in deleted sstables being opened until shutdown, and disk
space isn't released. That's because column_family::rebuild_sstable_list()
will not remove reference to deleted sstables if an exception was triggered in
sstables::delete_atomically(). A sstable only has its files closed when its
object is destructed.

The exception happens when a major compaction is issued in parallel to a
regular one, and one of them will be unable to delete a sstable already deleted
by the other. That results in remove_by_toc_name() triggering boost::filesystem
::filesystem_error because TOC and temporary TOC don't exist.

We wouldn't have seen this problem if major compaction were going through
compaction manager, but remove_by_toc_name() and rebuild_sstable_list() should
be made resilient.

Fixes #1840.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <d43b2e78f9658e2c3c5bbb7f813756f18874bf92.1479390842.git.raphaelsc@scylladb.com>
(cherry picked from commit 3dc9294023)
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <fd3035b14f4e10f6bfd36cfd644388a95e60e6a8.1479431741.git.raphaelsc@scylladb.com>
2016-11-18 13:11:14 +02:00
Raphael S. Carvalho
87fedbbb9d sstables: Fix "fix ad-hoc summary creation" backport
Commit 8bc1c87cfd ("sstables: fix ad-hoc summary creation ") didn't take
into account that there were some places calling
index_consume_entry_context() with wrong arguments.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <98ca5f7bf517c4e55dacc3a23ad294d3bcde8907.1479431741.git.raphaelsc@scylladb.com>
2016-11-18 13:03:53 +02:00
Gleb Natapov
8bc1c87cfd sstables: fix ad-hoc summary creation
If the sstable Summary is not present, Scylla does not refuse to boot but
instead creates the summary information on the fly. There is a bug in this
code though. The Summary file is a map between keys and offsets into the Index
file, but the code creates a map between keys and Data file offsets
instead. Fix it by keeping the offset of an index entry in the index_entry
structure and using it during Summary file creation.
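A minimal sketch of the distinction (field and struct names here are illustrative, not Scylla's actual types): each index entry must remember its own position in the Index file, separate from the Data-file position it points at, and the summary must sample the former.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Each entry carries two distinct offsets; confusing them was the bug.
struct index_entry {
    std::string key;
    uint64_t data_offset;    // where the row lives in the Data file
    uint64_t index_offset;   // where this entry lives in the Index file
};

struct summary_entry {
    std::string key;
    uint64_t index_offset;   // Summary must point into the Index file
};

inline summary_entry sample(const index_entry& e) {
    return {e.key, e.index_offset};  // the buggy code used e.data_offset here
}
```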

Fixes #1857.

Reviewed-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20161116165421.GA22296@scylladb.com>
(cherry picked from commit ae0a2935b4)
2016-11-17 11:46:18 +02:00
Raphael S. Carvalho
44448b3ab9 main: fix exception handling when initializing data or commitlog dirs
Exception handling was broken because, after the I/O checker was introduced,
system error exceptions are wrapped in storage_io_error. Also, the message
printed when handling the exception wasn't precise enough for all cases, for
example lack of permission to write to an existing data directory.

Fixes #883.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <b2dc75010a06f16ab1b676ce905ae12e930a700a.1478542388.git.raphaelsc@scylladb.com>
(cherry picked from commit 9a9f0d3a0f)
2016-11-16 15:13:14 +02:00
Amnon Heiman
419045548d API: cache_capacity should use uint for summing
Using int as the type for the map_reduce accumulator causes an integer overflow.
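The pitfall can be shown with std::accumulate, which has the same initial-value trap as the map_reduce call in question (the values here are made up): per-shard cache capacities can each exceed what a 32-bit int holds.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Passing a plain 0 would make the accumulator an int and overflow on
// multi-gigabyte capacities; uint64_t(0) keeps the whole sum 64-bit,
// mirroring the uint64_t(0) fix in the patch.
inline uint64_t total_capacity(const std::vector<uint64_t>& per_shard) {
    return std::accumulate(per_shard.begin(), per_shard.end(), uint64_t(0));
}
```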

Fixes #1801

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1479299425-782-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit a4be7afbb0)
2016-11-16 15:04:07 +02:00
Paweł Dziepak
25830dd6ac partition_version: make sure that snapshot is destroyed under LSA
The snapshot destructor may free some objects managed by the LSA. That's why
the partition_snapshot_reader destructor explicitly destroys the snapshot it
uses. However, it was possible that an exception thrown by _read_section
prevented that from happening, making the snapshot destroyed implicitly
without the current allocator set to LSA.

Refs #1831.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1478778570-2795-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit f16d6f9c40)
2016-11-16 12:47:38 +00:00
Paweł Dziepak
7a6dd3c56d query_pagers: distinct queries do not have clustering keys
Query pager needs to handle results that contain partitions with
possibly multiple clustering rows quite differently than results with
just one row per partition (for example a page may end in a middle of
partition). However, the logic dealing with partitions with clustering
rows doesn't work correctly for SELECT DISTINCT queries, which are
much more similar to the ones for schemas without clustering key.

The solution is to set _has_clustering_keys to false in case of SELECT
DISTINCT queries regardless of the schema, which will make the pager
correctly expect each partition to return at most one row.

Fixes #1822.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1478612486-13421-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 055d78ee4c)
2016-11-16 10:17:55 +01:00
Paweł Dziepak
739bc54246 row_cache: touch entries read during range queries
Fixes #1847.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1479230809-27547-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 999dafbe57)
2016-11-15 20:39:05 +00:00
Avi Kivity
092b214a2e Merge "Fixes for histogram and moving average calculations" from Glauber
"JMX metrics were found to be either not showing, or showing absurd
values.  Turns out there were multiple things wrong with them. The
patches were sent separately but conflict with one another. This series
is a collection of the patches needed to fix the issues we saw.

Fixes #1832, #1836, #1837"

(cherry picked from commit bf20aa722b)
2016-11-13 11:43:09 +02:00
Amnon Heiman
5cca752ebb API: fix a typo in storage_proxy
This patch fixes a typo in the URL definition, which caused the metric
lookup from the JMX side to fail.

Fixes #1821

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1478563869-20504-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit c8082ccadb)
2016-11-13 09:24:00 +02:00
Paweł Dziepak
5990044158 Merge "Remove quadratic behavior from atomic sstable deletion" from Avi
"The atomic sstable deletion provides exception safety at the cost of
quadratic behavior in the number of sstables awaiting deletion.  This
causes high cpu utilization during startup.

Change the code to avoid quadratic complexity, and add some unit tests.

See #1812."

(cherry picked from commit 985d2f6d4a)
2016-11-13 00:14:19 +02:00
Glauber Costa
e8c804f4f9 histogram: moving averages: fix inverted parameters
moving_averages constructor is defined like this:

    moving_average(latency_counter::duration interval, latency_counter::duration tick_interval)

But when it is time to initialize them, we do this:

	... {tick_interval(), std::chrono::minutes(1)} ...

As can be seen, the interval and tick interval are inverted. This
leads to the metrics being assigned bogus values.
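The hazard is easy to reproduce in miniature (the struct below mirrors the message but is not the real class): because both parameters share one duration type, swapping them at a call site compiles silently.

```cpp
#include <cassert>
#include <chrono>

// Both parameters are plain durations, so the compiler cannot catch an
// inverted call site; the values are just silently assigned crosswise.
struct toy_moving_average {
    std::chrono::seconds interval;
    std::chrono::seconds tick_interval;
    toy_moving_average(std::chrono::seconds interval,
                       std::chrono::seconds tick_interval)
        : interval(interval), tick_interval(tick_interval) {}
};
```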

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <d83f09eed20ea2ea007d120544a003b2e0099732.1478798595.git.glauber@scylladb.com>
(cherry picked from commit d3f11fbabf)
2016-11-11 10:15:55 +02:00
Pekka Enberg
ce9468a95d abstract_replication_strategy: Fix exception type if class not found
Change abstract_replication_strategy::create_replication_strategy() to
throw exceptions::configuration_error if the replication strategy class
lookup fails, to make sure the error is converted to the correct CQL response.

Fixes #1755

Message-Id: <1476361262-28723-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 3b4e6cdc5e)
2016-11-07 15:11:06 +02:00
Calle Wilund
a7616b9116 auth::password_authenticator: Ensure exceptions are processed in continuation
Fixes #1718 (even more)
Message-Id: <1475497389-27016-1-git-send-email-calle@scylladb.com>

(cherry picked from commit 5b815b81b4)
2016-11-07 09:23:38 +02:00
Calle Wilund
2f8a846b46 auth::password_authenticator: "authenticate" should not throw undeclared excpt
Fixes #1718

Message-Id: <1475487331-25927-1-git-send-email-calle@scylladb.com>
(cherry picked from commit d24d0f8f90)
2016-11-07 09:23:33 +02:00
Pekka Enberg
5ef32112ae release: prepare for 1.4.1 2016-11-07 09:17:46 +02:00
Pekka Enberg
36ad8a8fd8 cql3: Fix selecting same column multiple times
Under the hood, the selectable::add_and_get_index() function
deliberately filters out duplicate columns. This causes
simple_selector::get_output_row() to return a row with all duplicate
columns filtered out, which triggers an assertion because of a row
mismatch with the metadata (which contains the duplicate columns).

The fix is rather simple: just make selection::from_selectors() use
selection_with_processing if the number of selectors and column
definitions doesn't match -- like Apache Cassandra does.

Fixes #1367
Message-Id: <1477989740-6485-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit e1e8ca2788)
2016-11-01 09:34:22 +00:00
Pekka Enberg
ef012856a5 release: prepare for 1.4.0 2016-10-31 14:04:54 +02:00
Avi Kivity
e87bed5816 Update seastar submodule
* seastar b7be36a...28aeb47 (1):
  > rpc: Avoid using zero-copy interface of output_stream (Fixes #1786)
2016-10-28 14:15:02 +03:00
Pekka Enberg
577ffc5851 auth: Fix resource level handling
We use the `data_resource` class in the CQL parser, which lets users refer
to a table resource without specifying a keyspace. This asserts out in
get_level() for no good reason, as we already know the intended level
based on the constructor. Therefore, change `data_resource` to track the
level like upstream Cassandra does and use that.

Fixes #1790

Message-Id: <1477599169-2945-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit b54870764f)
2016-10-27 23:37:57 +03:00
Glauber Costa
a4fffc9c5d auth: always convert string to upper case before comparing
We store all auth perm strings in upper case, but the user might very
well pass them in lower case.

We could use a standard key comparator / hash here, but since the
strings tend to be small, the new sstring will likely be allocated on
the stack here, and this approach yields significantly less code.
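The normalization can be sketched as below (stdlib only; Scylla's sstring and actual permission names are not used here): uppercase the incoming string once, then compare against the stored upper-case form.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>

// Normalize user input to upper case so comparisons against the stored
// upper-case permission strings are case-insensitive.
inline std::string to_upper(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return static_cast<char>(std::toupper(c)); });
    return s;
}

inline bool perm_equal(const std::string& stored_upper,
                       const std::string& user_input) {
    return stored_upper == to_upper(user_input);
}
```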

Fixes #1791.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <51df92451e6e0a6325a005c19c95eaa55270da61.1477594199.git.glauber@scylladb.com>
(cherry picked from commit ef3c7ab38e)
2016-10-27 22:09:42 +03:00
Pekka Enberg
10ba47674a release: prepare for 1.4.rc3 2016-10-26 12:20:13 +03:00
Tomasz Grabiec
1c278d9abf Update seastar submodule
* seastar 810ef2b...b7be36a (2):
  > rpc: Fix crash during connection teardown
  > rpc: Move _connected flag to protocol::connection
2016-10-26 10:03:58 +02:00
Tomasz Grabiec
be0b5ad962 Merge seastar upstream
* seastar 742eb00...810ef2b (1):
  > rpc: Do not close client connection on error response for a timed out request

Refs #1778
2016-10-25 13:55:19 +02:00
Vlad Zolotarov
707b59100c service::storage_proxy: use global_trace_state_ptr when using invoke_on
When trace_state may migrate to a different shard a global_trace_state_ptr
has to be used.

This patch completes the patch below:

commit 7e180c7bd3
Author: Vlad Zolotarov <vladz@cloudius-systems.com>
Date:   Tue Sep 20 19:09:27 2016 +0300

    tracing: introduce the tracing::global_trace_state_ptr class

Fixes #1770

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1476993537-27388-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit f75a350a8f)
2016-10-25 11:36:16 +03:00
Takuya ASADA
dc8fa5090d dist/ami: fix incorrect /etc/fstab entry on CentOS7 base image
There was an incorrect rootfs entry in /etc/fstab:
 /dev/sda1 / xfs defaults,noatime 1 1
This causes a boot error when updating to a new kernel.
(see:
https://github.com/scylladb/scylla/issues/1597#issuecomment-250243187)

So the entry was replaced with
 UUID=<uuid>  / xfs defaults,noatime 1 1
All recent security updates were also applied.

Fixes #1597
Fixes #1707

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475094957-9464-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 80e3d8286c)
2016-10-20 11:53:04 +03:00
Avi Kivity
ea9a8e7f65 Update seastar submodule
* seastar c960804...742eb00 (1):
  > rpc: Add missing adjustment of snd_buf::size

Fixes #1767.
2016-10-19 19:45:16 +03:00
Tomasz Grabiec
8d91b8652f partition_version: Fix corruption of partition_version list
The move constructor of partition_version was not invoking move
constructor of anchorless_list_base_hook. As a result, when
partition_version objects were moved, e.g. during LSA compaction, they
were unlinked from their lists.

This can make readers return invalid data, because not all versions
will be reachable.

It also causes leaks of the versions which are not directly attached
to the memtable entry. This will trigger an assertion failure in the LSA
region destructor. This assertion triggers with the row cache disabled. With
cache enabled (the default) all segments are merged into the cache region,
which currently is not destroyed on shutdown, so this problem would go
unnoticed. With cache disabled, the memtable region is destroyed after
the memtable is flushed and after all readers stop using that memtable.

Fixes #1753.
Message-Id: <1476778472-5711-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit fe387f8ba0)
2016-10-18 10:58:47 +02:00
Pekka Enberg
830df18df5 release: prepare for 1.4.0 2016-10-14 14:37:13 +03:00
Amnon Heiman
ced171c28b scylla_setup: Reorder questions and actions
The expected behaviour in the scylla_setup script is that a question
will be followed by the answer.

For example, after asking if the scylla should be run as a service the
relevant actions will be taken before the following question.

This patch addresses two such mis-orderings:
1. scylla-housekeeping depends on scylla-server, so the
setup should first set up the scylla-server service and only then ask
about (and install if needed) scylla-housekeeping.
2. The node_exporter setup should be placed after the io_setup is done.

Fixes #1739

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1476370098-25617-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit 7829da13b4)
2016-10-13 18:29:52 +03:00
Avi Kivity
34bf40b552 Merge "node_exporter service on ubuntu 16" from Amnon
"This series addresses two issues that interfere with running the node_exporter as a service on Ubuntu 16.
1. The service file should be packed in the deb file
2. When setting up the node_exporter as a service it doesn't need to run as the scylla user"

* 'amnon/node_exporter_ubuntu_v2' of github.com:cloudius-systems/seastar-dev:
  node-exporter service: No need to run as scylla user
  debian package: Include the node_exporter service file

(cherry picked from commit 1506b06617)
2016-10-13 15:54:41 +03:00
Avi Kivity
a68d829644 Update seastar submodule
* seastar f9f4746...c960804 (1):
  > Merge "prometheus API with grafana uses labels" from Amnon
2016-10-13 15:53:51 +03:00
Takuya ASADA
551c4ff965 dist/common/script/scylla_io_setup: handle comma correctly when parsing cpuset
The script mistakenly split the value at "," when the cpuset list is separated
by commas. Instead of matching all possible patterns of the argument, let's
pass all characters until reaching a space delimiter or end of line.
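The intended parse can be sketched like this (a C++ stand-in for the shell script's pattern match; the flag name is taken from the message): take everything after "--cpuset " up to the next space or end of line, so comma-separated lists stay intact.

```cpp
#include <cassert>
#include <string>

// Extract the --cpuset value as one token: stop only at a space delimiter
// or end of line, never at a comma inside the list.
inline std::string extract_cpuset(const std::string& args) {
    const std::string flag = "--cpuset ";
    auto pos = args.find(flag);
    if (pos == std::string::npos) return "";
    pos += flag.size();
    auto end = args.find(' ', pos);  // space delimiter or end of line
    return args.substr(pos, end == std::string::npos ? std::string::npos
                                                     : end - pos);
}
```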

Fixes #1716

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1476171037-32373-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit ccad720bb1)
2016-10-11 10:43:17 +03:00
Pekka Enberg
19b35e812b release: prepare for 1.4.rc2 2016-10-10 16:09:16 +03:00
Takuya ASADA
d9ac058bff dist/ubuntu: add realpath to dependency, requires for scylla_setup
We need a dependency on realpath, since scylla_setup uses it.

Fixes #1740.

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475788340-22939-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 8452045b85)
2016-10-10 15:59:10 +03:00
Pekka Enberg
766367a6c5 dist/docker: Use Scylla 1.4 RPM repository 2016-10-10 15:21:08 +03:00
Pekka Enberg
7ac9b6e9ca docs/docker: Tag --listen-address as 1.4 feature
The Docker Hub documentation is the same for all image versions. Tag
`--listen-address` as 1.4 feature.

Message-Id: <1475819164-7865-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 3b75ff1496)
2016-10-10 14:34:04 +03:00
Vlad Zolotarov
b5bab524e1 api::storage_service::slow_query: don't use duration_cast in GET
The slow_query_record_ttl() and slow_query_threshold() return the duration
of the appropriate type already - no need for an additional cast.
In addition there was a mistake in a cast of ttl.

Fixes #1734

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1475669400-5925-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 006999f46c)
2016-10-09 19:27:09 +03:00
Takuya ASADA
f4bb52096b dist/common/scripts/scylla_setup: use 'swapon -s' instead of 'swapon --show'
Since Ubuntu 14.04 doesn't support the --show option, we need to avoid using it.
Fixes #1740

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475788340-22939-2-git-send-email-syuu@scylladb.com>
(cherry picked from commit 469e9af1f4)
2016-10-09 19:27:07 +03:00
Raphael S. Carvalho
af26e7a691 lcs: fix starvation at higher levels
When the max sstable size is increased, higher levels suffer from
starvation, because we decide to compact a given level only if the following
calculation results in a number greater than 1.001:
level_size(L) / max_size_for_level_l(L)
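The trigger described above can be written out directly (the 1.001 threshold is from the message; units and values below are illustrative). Raising the max sstable size inflates the denominator, so a level can sit just under the threshold indefinitely.

```cpp
#include <cassert>

// Compact a level only when its size exceeds the per-level cap by the
// 1.001 factor quoted in the commit message.
inline bool should_compact(double level_size_bytes,
                           double max_size_for_level_bytes) {
    return level_size_bytes / max_size_for_level_bytes > 1.001;
}
```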

Fixes #1720.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit a8ab4b8f37)
2016-10-09 11:07:48 +03:00
Raphael S. Carvalho
770c982541 lcs: fix broken token range distribution at higher levels
Uniform token range distribution across sstables in a level > 1 was broken,
because we were only choosing the sstable with the lowest first key when
compacting a level > 0. This resulted in a performance problem, because
L1->L2 may have a huge overlap over time, for example.
The last compacted key will now be stored for each level to ensure a sort of
"round robin" selection of sstables for compactions at level >= 1.
That's also done by C*, and they were once affected by this, as described in
https://issues.apache.org/jira/browse/CASSANDRA-6284.

Fixes #1719.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit a3bf7558f2)
2016-10-09 11:07:39 +03:00
Tomasz Grabiec
2e22c027b2 db: Do not timeout streaming readers
There is a limit to concurrency of sstable readers on each shard. When
this limit is exhausted (currently 100 readers) readers queue. There
is a timeout after which queued readers are failed, equal to
read_request_timeout_in_ms (5s by default). The reason we have the
timeout here is primarily because the readers created for the purpose
of serving a CQL request no longer need to execute after waiting
longer than read_request_timeout_in_ms. The coordinator no longer
waits for the result so there is no point in proceeding with the read.

This timeout should not apply for readers created for streaming. The
streaming client currently times out after 10 minutes, so we could
wait at least that long. Timing out sooner makes streaming unreliable,
which under high load may prevent streaming from completing.

The change sets no timeout for streaming readers at replica level,
similarly as we do for system tables readers.
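The resulting policy can be sketched as follows (the function and the use of std::optional are a simplification, not Scylla's API; the 5s default is from the message): queued readers expire only when they were given a timeout, while streaming and system-table readers wait indefinitely.

```cpp
#include <cassert>
#include <chrono>
#include <optional>

// A reader queued behind the per-shard concurrency limit expires only if
// it carries a timeout; std::nullopt means "wait as long as it takes".
inline bool reader_expired(std::optional<std::chrono::milliseconds> timeout,
                           std::chrono::milliseconds waited) {
    return timeout.has_value() && waited > *timeout;
}
```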

Fixes #1741.

Message-Id: <1475840678-25606-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 2a5a90f391)
2016-10-09 10:33:00 +03:00
Avi Kivity
d2c0a5c318 Update seastar submodule
* seastar c2489c6...f9f4746 (2):
  > prometheus CPU should start in 0
  > Collectd: bytes ordering depends on the type

Fixes #1726.
Fixes #1727.
2016-10-06 11:26:17 +03:00
Pekka Enberg
52e2688c7b Update seastar submodule
* seastar 777ab50...c2489c6 (1):
  > Merge "Fix signal mask corruption" from Tomasz
2016-10-05 12:39:45 +03:00
Pekka Enberg
08b71b25c1 dist/docker: Add '--listen-address' to 'docker run'
Add a '--listen-address' command line parameter to the Docker image,
which can be used to set Scylla's listen address.

Refs #1723

Message-Id: <1475485165-6772-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit c3bebea1ef)
2016-10-04 14:08:36 +03:00
Avi Kivity
9682b36cdc Update seastar submodule
* seastar 9e1d5db...777ab50 (1):
  > prometheus: remove invalid chars from metric names

Fixes #1710.
2016-10-02 11:39:52 +03:00
Avi Kivity
e953955515 seastar: switch to branch-1.4 on scylla-seastar submodule
This allows us to backport individual commits, rather than the entire
master branch.
2016-10-02 11:37:48 +03:00
Tomasz Grabiec
c1f93c461c transport: Extend request memory footprint accounting to also cover execution
The CQL server is supposed to throttle requests so that they don't
overflow memory. The problem is that it currently accounts for a
request's memory only around reading its frame from the connection,
and not during actual request execution. As a result too many requests may
be allowed to execute and we may run out of memory.
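The accounting change can be sketched with an RAII counter (hypothetical names; the real code uses seastar semaphore units): the memory is returned only when the units object dies at the end of execution, not when the frame has merely been read.

```cpp
#include <cassert>
#include <cstddef>

// Toy memory accounting: admit() reserves bytes, and the returned units
// object gives them back only when it is destroyed, i.e. when the whole
// request (not just its frame read) has finished.
struct memory_limiter {
    std::size_t available;
    struct units {
        memory_limiter* l;
        std::size_t n;
        ~units() { l->available += n; }  // released at end of execution
    };
    bool can_admit(std::size_t n) const { return n <= available; }
    units admit(std::size_t n) { available -= n; return units{this, n}; }
};
```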

Fixes #1708.
Message-Id: <1475149302-11517-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 7e25b958ac)
2016-10-02 11:34:35 +03:00
Vlad Zolotarov
c0d32a8297 tracing: introduce the tracing::global_trace_state_ptr class
This object, similarly to a global_schema_ptr, allows dynamically
creating trace_state_ptr objects on different shards in the context
of the original tracing session.

This object would create a secondary tracing session object from the
original trace_state_ptr object when a trace_state_ptr object is needed
on a "remote" shard, similarly to what we do when we need it on a remote
Node.

Fixes #1678
Fixes #1647

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1474387767-21910-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 7e180c7bd3)
2016-10-02 11:33:16 +03:00
Paweł Dziepak
a9bd9289a4 query_pagers: fix clustering key range calculation
The paging code assumes that a clustering row range [a, a] contains only one
row, which may not be true. Another problem is that it tries to use
the range<> interface for dealing with clustering key ranges, which doesn't
work because of the lack of a correct comparator.

Refs #1446.
Fixes #1684.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1475236805-16223-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit eb1fcf3ecc)
2016-10-02 10:55:43 +03:00
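Why [a, a] can hold more than one row: clustering bounds are key *prefixes*, and a prefix bound (a) matches every full key (a, x). A minimal sketch with illustrative types (not Scylla's actual key classes):

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// A full clustering key with two components; the range bounds below are
// prefixes that constrain only the first component.
using full_key = std::pair<std::string, int>;

// Does `key` fall inside the inclusive prefix range [lo, hi]?
bool in_prefix_range(const full_key& key, const std::string& lo, const std::string& hi) {
    return key.first >= lo && key.first <= hi;
}

std::vector<full_key> rows_in_range(const std::vector<full_key>& rows,
                                    const std::string& lo, const std::string& hi) {
    std::vector<full_key> out;
    for (const auto& r : rows) {
        if (in_prefix_range(r, lo, hi)) out.push_back(r);
    }
    return out;
}
```

Here the "single point" range ["a", "a"] selects both ("a", 1) and ("a", 2), which is exactly the assumption the paging code got wrong.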
Takuya ASADA
a2feaa998c dist/redhat: add missing build time dependency for libunwind
There was a missing build-time dependency on libunwind, so add it.
Fixes #1722

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475260099-25881-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 9639cc840e)
2016-09-30 21:34:20 +03:00
Takuya ASADA
11950dcba3 dist/ubuntu: add missing build time dependency for libunwind
There was a missing build-time dependency on libunwind, so add it.
Fixes #1721

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475255706-26434-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit c89d9599b1)
2016-09-30 21:34:13 +03:00
Pekka Enberg
08ce047792 release: prepare for 1.4.rc1 2016-09-30 13:59:46 +03:00
52 changed files with 890 additions and 376 deletions

2
.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -1,6 +1,6 @@
#!/bin/sh
VERSION=666.development
VERSION=1.4.3
if test -f version
then

View File

@@ -777,7 +777,7 @@
]
},
{
"path": "/storage_proxy/metrics/read/moving_avrage_histogram",
"path": "/storage_proxy/metrics/read/moving_average_histogram",
"operations": [
{
"method": "GET",
@@ -792,7 +792,7 @@
]
},
{
"path": "/storage_proxy/metrics/range/moving_avrage_histogram",
"path": "/storage_proxy/metrics/range/moving_average_histogram",
"operations": [
{
"method": "GET",
@@ -942,7 +942,7 @@
]
},
{
"path": "/storage_proxy/metrics/write/moving_avrage_histogram",
"path": "/storage_proxy/metrics/write/moving_average_histogram",
"operations": [
{
"method": "GET",

View File

@@ -194,7 +194,7 @@ void set_cache_service(http_context& ctx, routes& r) {
});
cs::get_row_capacity.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
return map_reduce_cf(ctx, uint64_t(0), [](const column_family& cf) {
return cf.get_row_cache().get_cache_tracker().region().occupancy().used_space();
}, std::plus<uint64_t>());
});

View File

@@ -684,8 +684,8 @@ void set_storage_service(http_context& ctx, routes& r) {
ss::get_slow_query_info.set(r, [](const_req req) {
ss::slow_query_info res;
res.enable = tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled();
res.ttl = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_record_ttl()).count() ;
res.threshold = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_threshold()).count();
res.ttl = tracing::tracing::get_local_tracing_instance().slow_query_record_ttl().count() ;
res.threshold = tracing::tracing::get_local_tracing_instance().slow_query_threshold().count();
return res;
});

View File

@@ -47,11 +47,8 @@
const sstring auth::data_resource::ROOT_NAME("data");
auth::data_resource::data_resource(level l, const sstring& ks, const sstring& cf)
: _ks(ks), _cf(cf)
: _level(l), _ks(ks), _cf(cf)
{
if (l != get_level()) {
throw std::invalid_argument("level/keyspace/column mismatch");
}
}
auth::data_resource::data_resource()
@@ -67,14 +64,7 @@ auth::data_resource::data_resource(const sstring& ks, const sstring& cf)
{}
auth::data_resource::level auth::data_resource::get_level() const {
if (!_cf.empty()) {
assert(!_ks.empty());
return level::COLUMN_FAMILY;
}
if (!_ks.empty()) {
return level::KEYSPACE;
}
return level::ROOT;
return _level;
}
auth::data_resource auth::data_resource::from_name(

View File

@@ -56,6 +56,7 @@ private:
static const sstring ROOT_NAME;
level _level;
sstring _ks;
sstring _cf;

View File

@@ -218,12 +218,12 @@ future<::shared_ptr<auth::authenticated_user> > auth::password_authenticator::au
// obsolete prepared statements pretty quickly.
// Rely on query processing caching statements instead, and lets assume
// that a map lookup string->statement is not gonna kill us much.
auto& qp = cql3::get_local_query_processor();
return qp.process(
sprint("SELECT %s FROM %s.%s WHERE %s = ?", SALTED_HASH,
auth::AUTH_KS, CREDENTIALS_CF, USER_NAME),
consistency_for_user(username), { username }, true).then_wrapped(
[=](future<::shared_ptr<cql3::untyped_result_set>> f) {
return futurize_apply([this, username, password] {
auto& qp = cql3::get_local_query_processor();
return qp.process(sprint("SELECT %s FROM %s.%s WHERE %s = ?", SALTED_HASH,
auth::AUTH_KS, CREDENTIALS_CF, USER_NAME),
consistency_for_user(username), {username}, true);
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
try {
auto res = f.get0();
if (res->empty() || !checkpw(password, res->one().get_as<sstring>(SALTED_HASH))) {
@@ -234,6 +234,8 @@ future<::shared_ptr<auth::authenticated_user> > auth::password_authenticator::au
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));
}
});
}
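The `futurize_apply` wrapper added in this hunk makes a synchronous throw from `qp.process(...)` take the same path as an asynchronous failure. A simplified, synchronous stand-in for that idea (the `result`/`capture_errors` names are illustrative, not seastar's API):

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>

// Holds either a value or the exception that prevented producing it,
// loosely mirroring a resolved future.
template <typename T>
struct result {
    T value{};
    std::exception_ptr error;
};

// Run a callable and capture anything it throws, so that a synchronous
// exception is delivered through the same channel as an async failure.
template <typename Func>
auto capture_errors(Func f) -> result<decltype(f())> {
    result<decltype(f())> r;
    try {
        r.value = f();
    } catch (...) {
        r.error = std::current_exception();
    }
    return r;
}
```

Without such a wrapper, a throw before the first future is created would escape the continuation chain entirely, which is what the `.then_wrapped` error handling above would otherwise miss.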

View File

@@ -40,6 +40,7 @@
*/
#include <unordered_map>
#include <boost/algorithm/string.hpp>
#include "permission.hh"
const auth::permission_set auth::permissions::ALL_DATA =
@@ -75,7 +76,9 @@ const sstring& auth::permissions::to_string(permission p) {
}
auth::permission auth::permissions::from_string(const sstring& s) {
return permission_names.at(s);
sstring upper(s);
boost::to_upper(upper);
return permission_names.at(upper);
}
std::unordered_set<sstring> auth::permissions::to_strings(const permission_set& set) {
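The case-normalization pattern from the hunk above can be sketched in plain C++. The permission names here are an illustrative subset, and `std::toupper` stands in for `boost::to_upper`:

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>
#include <unordered_map>

enum class permission { CREATE, ALTER, SELECT };

// Normalize the input to upper case before the map lookup, so "select",
// "Select" and "SELECT" all resolve to the same permission.
permission permission_from_string(const std::string& s) {
    static const std::unordered_map<std::string, permission> names = {
        {"CREATE", permission::CREATE},
        {"ALTER", permission::ALTER},
        {"SELECT", permission::SELECT},
    };
    std::string upper(s);
    std::transform(upper.begin(), upper.end(), upper.begin(),
                   [](unsigned char c) { return std::toupper(c); });
    return names.at(upper); // throws std::out_of_range for unknown names
}
```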

View File

@@ -54,6 +54,10 @@ public:
// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
// Some strategies may look at the compacted and resulting sstables to
// get some useful information for subsequent compactions.
void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added);
// Return if parallel compaction is allowed by strategy.
bool parallel_compaction() const;

View File

@@ -222,6 +222,7 @@ scylla_tests = [
'tests/database_test',
'tests/nonwrapping_range_test',
'tests/input_stream_test',
'tests/sstable_atomic_deletion_test',
]
apps = [
@@ -309,6 +310,7 @@ scylla_core = (['database.cc',
'sstables/compaction.cc',
'sstables/compaction_strategy.cc',
'sstables/compaction_manager.cc',
'sstables/atomic_deletion.cc',
'transport/event.cc',
'transport/event_notifier.cc',
'transport/server.cc',

View File

@@ -232,7 +232,7 @@ uint32_t selection::add_column_for_ordering(const column_definition& c) {
raw_selector::to_selectables(raw_selectors, schema), db, schema, defs);
auto metadata = collect_metadata(schema, raw_selectors, *factories);
if (processes_selection(raw_selectors)) {
if (processes_selection(raw_selectors) || raw_selectors.size() != defs.size()) {
return ::make_shared<selection_with_processing>(schema, std::move(defs), std::move(metadata), std::move(factories));
} else {
return ::make_shared<simple_selection>(schema, std::move(defs), std::move(metadata), false);

View File

@@ -427,8 +427,14 @@ column_family::make_sstable_reader(schema_ptr s,
tracing::trace_state_ptr trace_state) const {
// restricts a reader's concurrency if the configuration specifies it
auto restrict_reader = [&] (mutation_reader&& in) {
if (_config.read_concurrency_config.sem) {
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
auto&& config = [this, &pc] () -> const restricted_mutation_reader_config& {
if (service::get_local_streaming_read_priority().id() == pc.id()) {
return _config.streaming_read_concurrency_config;
}
return _config.read_concurrency_config;
}();
if (config.sem) {
return make_restricted_reader(config, 1, std::move(in));
} else {
return std::move(in);
}
@@ -1241,10 +1247,17 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
// Second, delete the old sstables. This is done in the background, so we can
// consider this compaction completed.
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
auto current_sstables = _sstables;
auto new_sstable_list = make_lw_shared<sstable_list>();
return sstables::delete_atomically(sstables_to_remove).then_wrapped([this, sstables_to_remove] (future<> f) {
std::exception_ptr eptr;
try {
f.get();
} catch(...) {
eptr = std::current_exception();
}
// unconditionally remove compacted sstables from _sstables_compacted_but_not_deleted,
// or they could stay forever in the set, resulting in deleted files remaining
// opened and disk space not being released until shutdown.
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
@@ -1252,6 +1265,11 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
});
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
rebuild_statistics();
if (eptr) {
return make_exception_future<>(eptr);
}
return make_ready_future<>();
}).handle_exception([] (std::exception_ptr e) {
try {
std::rethrow_exception(e);
@@ -1283,6 +1301,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
};
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
_compaction_strategy.notify_completion(*sstables_to_compact, new_sstables);
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
});
});
@@ -2070,6 +2089,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.dirty_memory_manager = _config.dirty_memory_manager;
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
cfg.read_concurrency_config = _config.read_concurrency_config;
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
@@ -2559,6 +2579,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
++_stats->sstable_read_queue_overloaded;
throw std::runtime_error("sstable inactive read queue overloaded");
};
cfg.streaming_read_concurrency_config = cfg.read_concurrency_config;
cfg.streaming_read_concurrency_config.timeout = {};
cfg.cf_stats = &_cf_stats;
cfg.enable_incremental_backups = _enable_incremental_backups;
return cfg;
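The `then_wrapped` change in `rebuild_sstable_list` above follows a common pattern: capture any failure into an `std::exception_ptr`, run the cleanup unconditionally, and only then re-raise. A minimal synchronous sketch of that pattern (names are illustrative):

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>

// Run `step`; whether or not it throws, perform the cleanup (here modeled by
// setting a flag, mirroring the unconditional removal from
// _sstables_compacted_but_not_deleted), then re-raise any captured failure.
bool run_with_unconditional_cleanup(void (*step)(), bool& cleaned_up) {
    std::exception_ptr eptr;
    try {
        step();
    } catch (...) {
        eptr = std::current_exception();
    }
    cleaned_up = true; // always runs, even if step() failed
    if (eptr) {
        std::rethrow_exception(eptr);
    }
    return true;
}
```

This is why the fix prevents deleted sstables from lingering in the set: the bookkeeping no longer depends on `delete_atomically` succeeding.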

View File

@@ -325,6 +325,7 @@ public:
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
uint64_t max_cached_partition_size_in_bytes;
};
@@ -879,6 +880,7 @@ public:
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
};
private:

View File

@@ -1557,6 +1557,15 @@ db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func ne
subscription<temporary_buffer<char>, db::replay_position>
db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type off) {
struct work {
private:
file_input_stream_options make_file_input_stream_options() {
file_input_stream_options fo;
fo.buffer_size = db::commitlog::segment::default_size;
fo.read_ahead = 10;
fo.io_priority_class = service::get_local_commitlog_priority();
return fo;
}
public:
file f;
stream<temporary_buffer<char>, replay_position> s;
input_stream<char> fin;
@@ -1572,7 +1581,7 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
bool header = true;
work(file f, position_type o = 0)
: f(f), fin(make_file_input_stream(f)), start_off(o) {
: f(f), fin(make_file_input_stream(f, o, make_file_input_stream_options())), start_off(o) {
}
work(work&&) = default;
@@ -1755,6 +1764,8 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
throw segment_data_corruption_error("Data corruption", corrupt_size);
}
});
}).finally([this] {
return fin.close();
});
}
};
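The `.finally([this] { return fin.close(); })` added above guarantees the replay file's input stream is closed on every exit path. In plain C++ the same guarantee is what an RAII scope guard gives; a minimal sketch (this `scope_guard` is illustrative, not a seastar type):

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Runs its callback when the scope exits, whether normally or via exception,
// mirroring what .finally() does for a future chain.
class scope_guard {
    std::function<void()> _on_exit;
public:
    explicit scope_guard(std::function<void()> f) : _on_exit(std::move(f)) {}
    scope_guard(const scope_guard&) = delete;
    scope_guard& operator=(const scope_guard&) = delete;
    ~scope_guard() { if (_on_exit) _on_exit(); }
};
```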

View File

@@ -33,7 +33,7 @@ done
. /etc/os-release
case "$ID" in
"centos")
AMI=ami-f3102499
AMI=ami-4e1d5b59
REGION=us-east-1
SSH_USERNAME=centos
;;

View File

@@ -44,8 +44,8 @@ output_to_user()
}
if [ `is_developer_mode` -eq 0 ]; then
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([^ ]*\).*$/\2/"`
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[^ ]*\).*$/\1/"`
if [ $AMI_OPT -eq 1 ]; then
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`

View File

@@ -30,6 +30,6 @@ else
else
echo "Please upgrade to a newer kernel version."
fi
echo " see http://docs.scylladb.com/kb/kb-fs-not-qualified-aio/ for details"
echo " see http://www.scylladb.com/kb/kb-fs-not-qualified-aio/ for details"
fi
exit $RET

View File

@@ -84,7 +84,7 @@ get_unused_disks() {
if [ -f /usr/sbin/pvs ]; then
count_pvs=$(pvs|grep $dev|wc -l)
fi
count_swap=$(swapon --show |grep `realpath $dev`|wc -l)
count_swap=$(swapon -s |grep `realpath $dev`|wc -l)
if [ $count_raw -eq 0 -a $count_pvs -eq 0 -a $count_swap -eq 0 ]; then
echo -n "$dev "
fi
@@ -226,31 +226,32 @@ fi
if [ $INTERACTIVE -eq 1 ]; then
interactive_ask_service "Do you want to enable ScyllaDB services?" "Answer yes to automatically start Scylla when the node boots; answer no to skip this step." "yes" &&:
ENABLE_SERVICE=$?
if [ $ENABLE_SERVICE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
ENABLE_CHECK_VERSION=$?
fi
fi
if [ $ENABLE_SERVICE -eq 1 ]; then
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
systemctl enable scylla-server.service
systemctl enable collectd.service
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
systemctl unmask scylla-housekeeping.timer
else
systemctl mask scylla-housekeeping.timer
systemctl stop scylla-housekeeping.timer || true
fi
fi
if [ $INTERACTIVE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
ENABLE_CHECK_VERSION=$?
fi
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
printf "[housekeeping]\ncheck-version: True\n" > /etc/scylla.d/housekeeping.cfg
fi
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
systemctl unmask scylla-housekeeping.timer
fi
else
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
printf "[housekeeping]\ncheck-version: False\n" > /etc/scylla.d/housekeeping.cfg
fi
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
systemctl mask scylla-housekeeping.timer
systemctl stop scylla-housekeeping.timer || true
fi
fi
fi
@@ -374,6 +375,10 @@ if [ $INTERACTIVE -eq 1 ]; then
IO_SETUP=$?
fi
if [ $IO_SETUP -eq 1 ]; then
/usr/lib/scylla/scylla_io_setup
fi
if [ $INTERACTIVE -eq 1 ]; then
interactive_ask_service "Do you want to install node exporter, that exports prometheus data from the node?" "Answer yes to install it; answer no to skip this installation." "yes" &&:
NODE_EXPORTER=$?
@@ -383,10 +388,6 @@ if [ $NODE_EXPORTER -eq 1 ]; then
/usr/lib/scylla/node_exporter_install
fi
if [ $IO_SETUP -eq 1 ]; then
/usr/lib/scylla/scylla_io_setup
fi
if [ $DEV_MODE -eq 1 ]; then
/usr/lib/scylla/scylla_dev_mode_setup --developer-mode 1
fi

View File

@@ -3,8 +3,6 @@ Description=Node Exporter
[Service]
Type=simple
User=scylla
Group=scylla
ExecStart=/usr/bin/node_exporter
[Install]

View File

@@ -4,7 +4,8 @@ After=scylla-server.service
BindsTo=scylla-server.service
[Timer]
OnBootSec=0
# set OnActiveSec to 3 to safely avoid issues/1846
OnActiveSec=3
OnUnitActiveSec=1d
[Install]

View File

@@ -7,7 +7,7 @@ ENV container docker
VOLUME [ "/sys/fs/cgroup" ]
#install scylla
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.4.repo -o /etc/yum.repos.d/scylla.repo
RUN yum -y install epel-release
RUN yum -y clean expire-cache
RUN yum -y update

View File

@@ -9,6 +9,7 @@ def parse():
parser.add_argument('--smp', default=None, help="e.g --smp 2 to use two CPUs")
parser.add_argument('--memory', default=None, help="e.g. --memory 1G to use 1 GB of RAM")
parser.add_argument('--overprovisioned', default='0', choices=['0', '1'], help="run in overprovisioned environment")
parser.add_argument('--listen-address', default=None, dest='listenAddress')
parser.add_argument('--broadcast-address', default=None, dest='broadcastAddress')
parser.add_argument('--broadcast-rpc-address', default=None, dest='broadcastRpcAddress')
return parser.parse_args()

View File

@@ -8,6 +8,7 @@ class ScyllaSetup:
self._developerMode = arguments.developerMode
self._seeds = arguments.seeds
self._cpuset = arguments.cpuset
self._listenAddress = arguments.listenAddress
self._broadcastAddress = arguments.broadcastAddress
self._broadcastRpcAddress = arguments.broadcastRpcAddress
self._smp = arguments.smp
@@ -31,14 +32,15 @@ class ScyllaSetup:
def scyllaYAML(self):
configuration = yaml.load(open('/etc/scylla/scylla.yaml'))
IP = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
configuration['listen_address'] = IP
configuration['rpc_address'] = IP
if self._listenAddress is None:
self._listenAddress = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
configuration['listen_address'] = self._listenAddress
configuration['rpc_address'] = self._listenAddress
if self._seeds is None:
if self._broadcastAddress is not None:
self._seeds = self._broadcastAddress
else:
self._seeds = IP
self._seeds = self._listenAddress
configuration['seed_provider'] = [
{'class_name': 'org.apache.cassandra.locator.SimpleSeedProvider',
'parameters': [{'seeds': self._seeds}]}

View File

@@ -27,7 +27,7 @@ Group: Applications/Databases
Summary: The Scylla database server
License: AGPLv3
URL: http://www.scylladb.com/
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel
%{?fedora:BuildRequires: boost-devel ninja-build ragel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
%{?rhel:BuildRequires: scylla-libstdc++-static scylla-boost-devel scylla-ninja-build scylla-ragel scylla-antlr3-tool scylla-antlr3-C++-devel python34 scylla-gcc-c++ >= 5.1.1, python34-pyparsing}
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl bc util-linux
@@ -108,8 +108,8 @@ cp -r scylla-housekeeping $RPM_BUILD_ROOT%{_prefix}/lib/scylla/scylla-housekeepi
cp -P dist/common/sbin/* $RPM_BUILD_ROOT%{_sbindir}/
%pre server
/usr/sbin/groupadd scylla 2> /dev/null || :
/usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
getent group scylla || /usr/sbin/groupadd scylla 2> /dev/null || :
getent passwd scylla || /usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
%post server
# Upgrade coredump settings

View File

@@ -78,13 +78,13 @@ cp dist/ubuntu/scylla-server.install.in debian/scylla-server.install
if [ "$RELEASE" = "14.04" ]; then
sed -i -e "s/@@DH_INSTALLINIT@@/--upstart-only/g" debian/rules
sed -i -e "s/@@COMPILER@@/g++-5/g" debian/rules
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5/g" debian/control
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5, libunwind8-dev/g" debian/control
sed -i -e "s#@@INSTALL@@#dist/ubuntu/sudoers.d/scylla etc/sudoers.d#g" debian/scylla-server.install
sed -i -e "s#@@HKDOTTIMER@@##g" debian/scylla-server.install
else
sed -i -e "s/@@DH_INSTALLINIT@@//g" debian/rules
sed -i -e "s/@@COMPILER@@/g++/g" debian/rules
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++/g" debian/control
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++, libunwind-dev/g" debian/control
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
sed -i -e "s#@@HKDOTTIMER@@#dist/common/systemd/scylla-housekeeping.timer /lib/systemd/system#g" debian/scylla-server.install
fi
@@ -102,6 +102,7 @@ fi
cp dist/common/systemd/scylla-server.service.in debian/scylla-server.service
sed -i -e "s#@@SYSCONFDIR@@#/etc/default#g" debian/scylla-server.service
cp dist/common/systemd/scylla-housekeeping.service debian/scylla-server.scylla-housekeeping.service
cp dist/common/systemd/node-exporter.service debian/scylla-server.node-exporter.service
if [ "$RELEASE" = "14.04" ] && [ $REBUILD -eq 0 ]; then
if [ ! -f /etc/apt/sources.list.d/scylla-3rdparty-trusty.list ]; then

View File

@@ -16,7 +16,7 @@ Conflicts: scylla-server (<< 1.1)
Package: scylla-server
Architecture: amd64
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, @@DEPENDS@@
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, realpath, @@DEPENDS@@
Description: Scylla database server binaries
Scylla is a highly scalable, eventually consistent, distributed,
partitioned row DB.

View File

@@ -12,6 +12,7 @@ override_dh_auto_clean:
override_dh_installinit:
dh_installinit --no-start @@DH_INSTALLINIT@@
dh_installinit --no-start --name scylla-housekeeping @@DH_INSTALLINIT@@
dh_installinit --no-start --name node-exporter @@DH_INSTALLINIT@@
override_dh_strip:
dh_strip --dbg-package=scylla-server-dbg

View File

@@ -97,6 +97,18 @@ For example, to configure Scylla to run with two seed nodes `192.168.0.100` and
$ docker run --name some-scylla -d scylladb/scylla --seeds 192.168.0.100,192.168.0.200
```
### `--listen-address ADDR`
The `--listen-address` command line option configures the IP address the Scylla instance listens for client connections.
For example, to configure Scylla to use listen address `10.0.0.5`:
```console
$ docker run --name some-scylla -d scylladb/scylla --listen-address 10.0.0.5
```
**Since: 1.4**
### `--broadcast-address ADDR`
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.

View File

@@ -54,14 +54,17 @@ static void unwrap_first_range(std::vector<range<token>>& ret) {
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, const std::map<sstring, sstring>& config_options) {
assert(locator::i_endpoint_snitch::get_local_snitch_ptr());
return create_object<abstract_replication_strategy,
const sstring&,
token_metadata&,
snitch_ptr&,
const std::map<sstring, sstring>&>
(strategy_name, ks_name, tk_metadata,
locator::i_endpoint_snitch::get_local_snitch_ptr(), config_options);
try {
return create_object<abstract_replication_strategy,
const sstring&,
token_metadata&,
snitch_ptr&,
const std::map<sstring, sstring>&>
(strategy_name, ks_name, tk_metadata,
locator::i_endpoint_snitch::get_local_snitch_ptr(), config_options);
} catch (const no_such_class& e) {
throw exceptions::configuration_exception(e.what());
}
}
void abstract_replication_strategy::validate_replication_strategy(const sstring& ks_name,
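The hunk above translates an internal `no_such_class` error into the user-facing `configuration_exception`, so a misspelled replication strategy name surfaces as a configuration problem rather than an internal failure. A minimal sketch of that translation (the lookup body is a stand-in for `create_object`):

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

struct no_such_class : std::runtime_error { using std::runtime_error::runtime_error; };
struct configuration_exception : std::runtime_error { using std::runtime_error::runtime_error; };

// Translate the internal lookup failure into the domain-level exception the
// caller is prepared to report to the user.
int create_strategy(const std::string& name) {
    auto lookup = [](const std::string& n) -> int {
        if (n != "SimpleStrategy") throw no_such_class("unknown class: " + n);
        return 1; // stand-in for the created strategy object
    };
    try {
        return lookup(name);
    } catch (const no_such_class& e) {
        throw configuration_exception(e.what());
    }
}
```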

View File

@@ -183,8 +183,8 @@ public:
throw;
}
});
} catch (std::system_error& e) {
startlog.error("Directory '{}' not found. Tried to created it but failed: {}", path, e.what());
} catch (...) {
startlog.error("Directory '{}' cannot be initialized. Tried to do it but failed with: {}", path, std::current_exception());
throw;
}
});

View File

@@ -36,7 +36,8 @@ static void remove_or_mark_as_unique_owner(partition_version* current)
}
partition_version::partition_version(partition_version&& pv) noexcept
: _backref(pv._backref)
: anchorless_list_base_hook(std::move(pv))
, _backref(pv._backref)
, _partition(std::move(pv._partition))
{
if (_backref) {
@@ -326,9 +327,9 @@ partition_snapshot_reader::~partition_snapshot_reader()
try {
_read_section(_lsa_region, [this] {
_snapshot->merge_partition_versions();
_snapshot = {};
});
} catch (...) { }
_snapshot = {};
});
});
}
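The `partition_version` fix above adds the missing base-class move: the old move constructor initialized only the members, default-constructing the `anchorless_list_base_hook` base and losing its list linkage. The general rule, in a minimal sketch with illustrative types:

```cpp
#include <cassert>
#include <utility>

// Base class carrying state that must transfer on move (here, a stand-in
// for the intrusive-list linkage of anchorless_list_base_hook).
struct hook {
    bool linked = false;
    hook() = default;
    hook(hook&& other) noexcept : linked(other.linked) { other.linked = false; }
};

struct node : hook {
    int payload;
    explicit node(int p) : payload(p) { linked = true; }
    // Correct: move the base subobject explicitly, then the members.
    // Omitting hook(std::move(other)) would default-construct the base
    // and silently drop the linkage -- the bug this commit fixes.
    node(node&& other) noexcept
        : hook(std::move(other))
        , payload(other.payload)
    {}
};
```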

View File

@@ -375,6 +375,7 @@ public:
++_it;
_last = ce.key();
_cache.upgrade_entry(ce);
_cache._tracker.touch(ce);
cache_data cd { { }, _cache._tracker.continuity_flags_cleared(), ce.continuous() };
if (ce.wide_partition()) {
return ce.read_wide(_cache, _schema, _slice, _pc).then([this, cd = std::move(cd)] (auto smopt) mutable {

Submodule seastar updated: 9e1d5dbc66...7907baee37

View File

@@ -54,7 +54,7 @@ public:
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
std::vector<query::partition_range> ranges)
: _has_clustering_keys(s->clustering_key_size() > 0)
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
, _max(cmd->row_limit)
, _schema(std::move(s))
, _selection(selection)
@@ -65,6 +65,11 @@ public:
{}
private:
static bool has_clustering_keys(const schema& s, const query::read_command& cmd) {
return s.clustering_key_size() > 0
&& !cmd.slice.options.contains<query::partition_slice::option::distinct>();
}
future<> fetch_page(cql3::selection::result_set_builder& builder, uint32_t page_size, db_clock::time_point now) override {
auto state = _options.get_paging_state();
@@ -124,6 +129,43 @@ private:
logger.trace("Result ranges {}", ranges);
};
// Because of #1446 we don't have a comparator to use with
// range<clustering_key_prefix> which would produce correct results.
// This means we cannot reuse the same logic for dealing with
// partition and clustering keys.
auto modify_ck_ranges = [reversed] (const schema& s, auto& ranges, auto& lo) {
typedef typename std::remove_reference_t<decltype(ranges)>::value_type range_type;
typedef typename range_type::bound bound_type;
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
};
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.second : range.first;
};
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.first : range.second;
};
clustering_key_prefix::equality eq(s);
auto it = ranges.begin();
while (it != ranges.end()) {
auto range = bound_view::from_range(*it);
if (cmp(end_bound(range), lo) || eq(end_bound(range).prefix, lo)) {
logger.trace("Remove ck range {}", *it);
it = ranges.erase(it);
continue;
} else if (cmp(start_bound(range), lo)) {
assert(cmp(lo, end_bound(range)));
auto r = reversed ? range_type(it->start(), bound_type { lo, false })
: range_type(bound_type { lo, false }, it->end());
logger.trace("Modify ck range {} -> {}", *it, r);
*it = std::move(r);
}
++it;
}
};
// last ck can be empty depending on whether we
// deserialized state or not. This case means "last page ended on
// something-not-bound-by-clustering" (i.e. a static row, alone)
@@ -136,15 +178,7 @@ private:
if (has_ck) {
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
clustering_key_prefix::less_compare cmp_rt(*_schema);
modify_ranges(row_ranges, ckp, false, [&cmp_rt](auto& c1, auto c2) {
if (cmp_rt(c1, c2)) {
return -1;
} else if (cmp_rt(c2, c1)) {
return 1;
}
return 0;
});
modify_ck_ranges(*_schema, row_ranges, ckp);
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
}
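The `modify_ck_ranges` lambda above trims the remaining clustering ranges after a page ends at key `lo`: ranges ending at or before `lo` are dropped, and a range straddling `lo` is clipped so the next page resumes strictly after it. An illustrative version over plain ints (Scylla's real code works on clustering-key bound views and handles reversed order):

```cpp
#include <cassert>
#include <list>
#include <utility>

using ck_range = std::pair<int, int>; // inclusive [first, second]

// After a page ends at `lo`, remove fully-consumed ranges and clip the
// partially-consumed one so iteration resumes strictly after lo.
void trim_ranges_after(std::list<ck_range>& ranges, int lo) {
    for (auto it = ranges.begin(); it != ranges.end();) {
        if (it->second <= lo) {   // entirely consumed by previous pages
            it = ranges.erase(it);
            continue;
        }
        if (it->first <= lo) {    // partially consumed: resume after lo
            it->first = lo + 1;
        }
        ++it;
    }
}
```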

View File

@@ -2496,8 +2496,8 @@ storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr<query::re
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, query::result_request request, tracing::trace_state_ptr trace_state) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, trace_state = std::move(trace_state)] (database& db) mutable {
return db.query(gs, *cmd, request, prv, std::move(trace_state)).then([](auto&& f) {
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
return db.query(gs, *cmd, request, prv, gt).then([](auto&& f) {
return make_foreign(std::move(f));
});
});
@@ -2643,12 +2643,19 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
}
}
// estimate_result_rows_per_range() is currently broken, and this is not needed
// when paging is available in any case
#if 0
// our estimate of how many result rows there will be per-range
float result_rows_per_range = estimate_result_rows_per_range(cmd, ks);
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
// fetch enough rows in the first round
result_rows_per_range -= result_rows_per_range * CONCURRENT_SUBREQUESTS_MARGIN;
int concurrency_factor = result_rows_per_range == 0.0 ? 1 : std::max(1, std::min(int(ranges.size()), int(std::ceil(cmd->row_limit / result_rows_per_range))));
#else
int result_rows_per_range = 0;
int concurrency_factor = 1;
#endif
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
results.reserve(ranges.size()/concurrency_factor + 1);
@@ -3446,14 +3453,14 @@ future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) {
if (pr.is_singular()) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
return _db.invoke_on(shard, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) mutable {
return db.query_mutations(gs, *cmd, pr, std::move(trace_state)).then([] (reconcilable_result&& result) {
return _db.invoke_on(shard, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
});
} else {
return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) {
return db.query_mutations(gs, *cmd, pr, trace_state).then([] (reconcilable_result&& result) {
return _db.map_reduce(mutation_result_merger{ cmd, s }, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) {
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
}).then([] (reconcilable_result&& result) {

sstables/atomic_deletion.cc (new file)
@@ -0,0 +1,140 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "atomic_deletion.hh"
#include "to_string.hh"
#include <seastar/core/shared_future.hh>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
namespace sstables {
atomic_deletion_manager::atomic_deletion_manager(unsigned shard_count,
std::function<future<> (std::vector<sstring> sstables)> delete_sstables)
: _shard_count(shard_count)
, _delete_sstables(std::move(delete_sstables)) {
}
future<>
atomic_deletion_manager::delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
// runs on shard 0 only
_deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
if (_atomic_deletions_cancelled) {
_deletion_logger.debug("atomic deletions disabled, erroring out");
using boost::adaptors::transformed;
throw atomic_deletion_cancelled(atomic_deletion_set
| transformed(std::mem_fn(&sstable_to_delete::name)));
}
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
auto merged_set = make_lw_shared(pending_deletion());
for (auto&& sst_to_delete : atomic_deletion_set) {
merged_set->names.insert(sst_to_delete.name);
if (!sst_to_delete.shared) {
for (auto shard : boost::irange<shard_id>(0, _shard_count)) {
_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
}
}
new_atomic_deletion_sets.emplace(sst_to_delete.name, merged_set);
}
auto pr = make_lw_shared<promise<>>();
merged_set->completions.insert(pr);
auto ret = pr->get_future();
for (auto&& sst_to_delete : atomic_deletion_set) {
auto i = _atomic_deletion_sets.find(sst_to_delete.name);
// merge from old deletion set to new deletion set
// i->second can be nullptr, see below why
if (i != _atomic_deletion_sets.end() && i->second) {
boost::copy(i->second->names, std::inserter(merged_set->names, merged_set->names.end()));
boost::copy(i->second->completions, std::inserter(merged_set->completions, merged_set->completions.end()));
}
}
_deletion_logger.debug("new atomic set: {}", merged_set->names);
// we need to merge new_atomic_deletion_sets into g_atomic_deletion_sets,
// but beware of exceptions. We do that with a first pass that inserts
// nullptr as the value, so the second pass only replaces, and does not allocate
for (auto&& sst_to_delete : atomic_deletion_set) {
_atomic_deletion_sets.emplace(sst_to_delete.name, nullptr);
}
// now, no allocations are involved, so this commits the operation atomically
for (auto&& n : merged_set->names) {
auto i = _atomic_deletion_sets.find(n);
i->second = merged_set;
}
// Mark each sstable as being deleted from deleting_shard. We have to do
// this in a separate pass so that the check below of whether we can
// delete sees all the data from this pass.
for (auto&& sst : atomic_deletion_set) {
_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
}
// Figure out if the (possibly merged) set can be deleted
for (auto&& sst : merged_set->names) {
if (_shards_agreeing_to_delete_sstable[sst].size() != _shard_count) {
// Not everyone agrees, leave the set pending
_deletion_logger.debug("deferring deletion until all shards agree");
return ret;
}
}
// Cannot recover from a failed deletion
for (auto&& name : merged_set->names) {
_atomic_deletion_sets.erase(name);
_shards_agreeing_to_delete_sstable.erase(name);
}
// Everyone agrees, let's delete
auto names = boost::copy_range<std::vector<sstring>>(merged_set->names);
_deletion_logger.debug("deleting {}", names);
_delete_sstables(names).then_wrapped([this, merged_set] (future<> result) {
_deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
shared_future<> sf(std::move(result));
for (auto&& comp : merged_set->completions) {
sf.get_future().forward_to(std::move(*comp));
}
});
return ret;
}
void
atomic_deletion_manager::cancel_atomic_deletions() {
_atomic_deletions_cancelled = true;
for (auto&& pd : _atomic_deletion_sets) {
if (!pd.second) {
// Could happen if a delete_atomically() failed
continue;
}
for (auto&& c : pd.second->completions) {
c->set_exception(atomic_deletion_cancelled(pd.second->names));
}
// since sets are shared, make sure we don't hit the same one again
pd.second->completions.clear();
}
_atomic_deletion_sets.clear();
_shards_agreeing_to_delete_sstable.clear();
}
}


@@ -0,0 +1,92 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
// The atomic deletion manager solves the problem of orchestrating
// the deletion of files that must be deleted as a group, where each
// shard has different groups, and all shards delete a file for it to
// be deleted. For example,
//
// shard 0: delete "A"
// we can't delete anything because shard 1 hasn't agreed yet.
// shard 1: delete "A" and B"
// shard 1 agrees to delete "A", but we can't delete it yet,
// because shard 1 requires that it be deleted together with "B",
// and shard 0 hasn't agreed to delete "B" yet.
// shard 0: delete "B" and "C"
// shards 0 and 1 now both agree to delete "A" and "B", but shard 0
// doesn't allow us to delete "B" without "C".
// shard 1: delete "C"
// finally, we can delete "A", "B", and "C".
#include "log.hh"
#include <seastar/core/future.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/reactor.hh> // for shard_id
#include <unordered_set>
#include <unordered_map>
#include <vector>
namespace sstables {
struct sstable_to_delete {
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
sstring name;
bool shared = false;
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
};
class atomic_deletion_cancelled : public std::exception {
std::string _msg;
public:
explicit atomic_deletion_cancelled(std::vector<sstring> names);
template <typename StringRange>
explicit atomic_deletion_cancelled(StringRange range)
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
}
const char* what() const noexcept override;
};
class atomic_deletion_manager {
logging::logger _deletion_logger{"sstable-deletion"};
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
using sstables_to_delete_atomically_type = std::set<sstring>;
struct pending_deletion {
sstables_to_delete_atomically_type names;
std::unordered_set<lw_shared_ptr<promise<>>> completions;
};
bool _atomic_deletions_cancelled = false;
// map from sstable name to a set of sstables that must be deleted atomically, including itself
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> _atomic_deletion_sets;
std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> _shards_agreeing_to_delete_sstable;
unsigned _shard_count;
std::function<future<> (std::vector<sstring> sstables)> _delete_sstables;
public:
atomic_deletion_manager(unsigned shard_count,
std::function<future<> (std::vector<sstring> sstables)> delete_sstables);
future<> delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard);
void cancel_atomic_deletions();
};
}


@@ -216,6 +216,7 @@ protected:
public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) { }
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
return true;
@@ -583,6 +584,8 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
const sstring SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
int32_t _max_sstable_size_in_mb = DEFAULT_MAX_SSTABLE_SIZE_IN_MB;
std::vector<stdx::optional<dht::decorated_key>> _last_compacted_keys;
std::vector<int> _compaction_counter;
public:
leveled_compaction_strategy(const std::map<sstring, sstring>& options) {
using namespace cql3::statements;
@@ -596,10 +599,14 @@ public:
logger.warn("Max sstable size of {}MB is configured. Testing done for CASSANDRA-5727 indicates that performance improves up to 160MB",
_max_sstable_size_in_mb);
}
_last_compacted_keys.resize(leveled_manifest::MAX_LEVELS);
_compaction_counter.resize(leveled_manifest::MAX_LEVELS);
}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) override;
virtual int64_t estimated_pending_compactions(column_family& cf) const override;
virtual bool parallel_compaction() const override {
@@ -621,7 +628,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
// sstable in it may be marked for deletion after compacted.
// Currently, we create a new manifest whenever it's time for compaction.
leveled_manifest manifest = leveled_manifest::create(cfs, candidates, _max_sstable_size_in_mb);
auto candidate = manifest.get_compaction_candidates();
auto candidate = manifest.get_compaction_candidates(_last_compacted_keys, _compaction_counter);
if (candidate.sstables.empty()) {
return sstables::compaction_descriptor();
@@ -632,6 +639,24 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
return std::move(candidate);
}
void leveled_compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
if (removed.empty() || added.empty()) {
return;
}
auto min_level = std::numeric_limits<uint32_t>::max();
for (auto& sstable : removed) {
min_level = std::min(min_level, sstable->get_sstable_level());
}
const sstables::sstable *last = nullptr;
for (auto& candidate : added) {
if (!last || last->compare_by_first_key(*candidate) < 0) {
last = &*candidate;
}
}
_last_compacted_keys[min_level] = last->get_last_decorated_key();
}
int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
std::vector<sstables::shared_sstable> sstables;
sstables.reserve(cf.sstables_count());
@@ -686,6 +711,10 @@ compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_fa
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
}
void compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
_compaction_strategy_impl->notify_completion(removed, added);
}
bool compaction_strategy::parallel_compaction() const {
return _compaction_strategy_impl->parallel_compaction();
}


@@ -121,7 +121,11 @@ size_t compress_lz4(const char* input, size_t input_len,
output[1] = (input_len >> 8) & 0xFF;
output[2] = (input_len >> 16) & 0xFF;
output[3] = (input_len >> 24) & 0xFF;
#ifdef HAVE_LZ4_COMPRESS_DEFAULT
auto ret = LZ4_compress_default(input, output + 4, input_len, LZ4_compressBound(input_len));
#else
auto ret = LZ4_compress(input, output + 4, input_len);
#endif
if (ret == 0) {
throw std::runtime_error("LZ4 compression failure: LZ4_compress() failed");
}


@@ -37,7 +37,7 @@ public:
bool should_continue() {
return indexes.size() < max_quantity;
}
void consume_entry(index_entry&& ie) {
void consume_entry(index_entry&& ie, uint64_t offset) {
indexes.push_back(std::move(ie));
}
};
@@ -45,13 +45,14 @@ public:
// IndexConsumer is a concept that implements:
//
// bool should_continue();
// void consume_entry(index_entry&& ie);
// void consume_entry(index_entry&& ie, uint64_t offset);
template <class IndexConsumer>
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
using proceed = data_consumer::proceed;
using continuous_data_consumer = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
private:
IndexConsumer& _consumer;
uint64_t _entry_offset;
enum class state {
START,
@@ -109,9 +110,12 @@ public:
_state = state::CONSUME_ENTRY;
break;
}
case state::CONSUME_ENTRY:
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)));
case state::CONSUME_ENTRY: {
auto len = (_key.size() + _promoted.size() + 14);
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)), _entry_offset);
_entry_offset += len;
_state = state::START;
}
break;
default:
throw malformed_sstable_exception("unknown state");
@@ -120,10 +124,9 @@ public:
}
index_consume_entry_context(IndexConsumer& consumer,
input_stream<char>&& input, uint64_t maxlen)
input_stream<char>&& input, uint64_t start, uint64_t maxlen)
: continuous_data_consumer(std::move(input), maxlen)
, _consumer(consumer)
, _consumer(consumer), _entry_offset(start)
{}
};
}


@@ -64,16 +64,14 @@ class leveled_manifest {
schema_ptr _schema;
std::vector<std::list<sstables::shared_sstable>> _generations;
#if 0
private final RowPosition[] lastCompactedKeys;
#endif
uint64_t _max_sstable_size_in_bytes;
#if 0
private final SizeTieredCompactionStrategyOptions options;
private final int [] compactionCounter;
#endif
public:
static constexpr int MAX_LEVELS = 9; // log10(1000^3);
leveled_manifest(column_family& cfs, int max_sstable_size_in_MB)
: logger("LeveledManifest")
, _schema(cfs.schema())
@@ -82,15 +80,8 @@ public:
// allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
// updated, we will still have sstables of the older, potentially smaller size. So don't make this
// dependent on maxSSTableSize.)
uint64_t n = 9; // log10(1000^3)
_generations.resize(n);
_generations.resize(MAX_LEVELS);
#if 0
lastCompactedKeys = new RowPosition[n];
for (int i = 0; i < generations.length; i++)
{
generations[i] = new ArrayList<>();
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
}
compactionCounter = new int[n];
#endif
}
@@ -129,37 +120,6 @@ public:
_generations[level].push_back(sstable);
}
#if 0
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
{
assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
logDistribution();
if (logger.isDebugEnabled())
logger.debug("Replacing [{}]", toString(removed));
// the level for the added sstables is the max of the removed ones,
// plus one if the removed were all on the same level
int minLevel = Integer.MAX_VALUE;
for (SSTableReader sstable : removed)
{
int thisLevel = remove(sstable);
minLevel = Math.min(minLevel, thisLevel);
}
// it's valid to do a remove w/o an add (e.g. on truncate)
if (added.isEmpty())
return;
if (logger.isDebugEnabled())
logger.debug("Adding [{}]", toString(added));
for (SSTableReader ssTableReader : added)
add(ssTableReader);
lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
}
#endif
void repair_overlapping_sstables(int level) {
const sstables::sstable *previous = nullptr;
const schema& s = *_schema;
@@ -272,7 +232,8 @@ public:
* @return highest-priority sstables to compact, and level to compact them to
* If no compactions are necessary, will return null
*/
sstables::compaction_descriptor get_compaction_candidates() {
sstables::compaction_descriptor get_compaction_candidates(const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys,
std::vector<int>& compaction_counter) {
#if 0
// during bootstrap we only do size tiering in L0 to make sure
// the streamed files can be placed in their original levels
@@ -339,11 +300,12 @@ public:
}
}
// L0 is fine, proceed with this level
auto candidates = get_candidates_for(i);
auto candidates = get_candidates_for(i, last_compacted_keys);
if (!candidates.empty()) {
int next_level = get_next_level(candidates);
candidates = get_overlapping_starved_sstables(next_level, std::move(candidates), compaction_counter);
#if 0
candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
if (logger.isDebugEnabled())
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
#endif
@@ -359,7 +321,7 @@ public:
if (get_level(0).empty()) {
return sstables::compaction_descriptor();
}
auto candidates = get_candidates_for(0);
auto candidates = get_candidates_for(0, last_compacted_keys);
if (candidates.empty()) {
return sstables::compaction_descriptor();
}
@@ -391,49 +353,57 @@ public:
* @param candidates the original sstables to compact
* @return
*/
#if 0
private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
{
Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
std::vector<sstables::shared_sstable>
get_overlapping_starved_sstables(int target_level, std::vector<sstables::shared_sstable>&& candidates, std::vector<int>& compaction_counter) {
for (int i = _generations.size() - 1; i > 0; i--) {
compaction_counter[i]++;
}
compaction_counter[target_level] = 0;
for (int i = generations.length - 1; i > 0; i--)
compactionCounter[i]++;
compactionCounter[targetLevel] = 0;
if (logger.isDebugEnabled())
{
for (int j = 0; j < compactionCounter.length; j++)
logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
if (logger.level() == logging::log_level::debug) {
for (auto j = 0U; j < compaction_counter.size(); j++) {
logger.debug("CompactionCounter: {}: {}", j, compaction_counter[j]);
}
}
for (int i = generations.length - 1; i > 0; i--)
{
if (getLevelSize(i) > 0)
{
if (compactionCounter[i] > NO_COMPACTION_LIMIT)
{
for (int i = _generations.size() - 1; i > 0; i--) {
if (get_level_size(i) > 0) {
if (compaction_counter[i] > NO_COMPACTION_LIMIT) {
// we try to find an sstable that is fully contained within the boundaries we are compacting;
// say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
// this means that we will not create overlap in L2 if we add an sstable
// contained within 0 -> 33 to the compaction
RowPosition max = null;
RowPosition min = null;
for (SSTableReader candidate : candidates)
{
if (min == null || candidate.first.compareTo(min) < 0)
min = candidate.first;
if (max == null || candidate.last.compareTo(max) > 0)
max = candidate.last;
stdx::optional<dht::decorated_key> max;
stdx::optional<dht::decorated_key> min;
for (auto& candidate : candidates) {
auto& candidate_first = candidate->get_first_decorated_key();
if (!min || candidate_first.tri_compare(*_schema, *min) < 0) {
min = candidate_first;
}
auto& candidate_last = candidate->get_last_decorated_key();
if (!max || candidate_last.tri_compare(*_schema, *max) > 0) {
max = candidate_last;
}
}
#if 0
// NOTE: We don't need to filter out compacting sstables by now because strategy only deals with
// uncompacting sstables and parallel compaction is also disabled for lcs.
Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
Range<RowPosition> boundaries = new Range<>(min, max);
for (SSTableReader sstable : getLevel(i))
{
Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
if (boundaries.contains(r) && !compacting.contains(sstable))
{
logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
withStarvedCandidate.add(sstable);
return withStarvedCandidate;
#endif
auto boundaries = ::range<dht::decorated_key>::make(*min, *max);
for (auto& sstable : get_level(i)) {
auto r = ::range<dht::decorated_key>::make(sstable->get_first_decorated_key(), sstable->get_last_decorated_key());
if (boundaries.contains(r, dht::ring_position_comparator(*_schema))) {
logger.info("Adding high-level (L{}) {} to candidates", sstable->get_sstable_level(), sstable->get_filename());
auto result = std::find_if(std::begin(candidates), std::end(candidates), [&sstable] (auto& candidate) {
return sstable->generation() == candidate->generation();
});
if (result != std::end(candidates)) {
continue;
}
candidates.push_back(sstable);
return candidates;
}
}
}
@@ -443,7 +413,6 @@ public:
return candidates;
}
#endif
size_t get_level_size(uint32_t level) {
#if 0
@@ -557,7 +526,7 @@ public:
* If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
* for prior failure), will return an empty list. Never returns null.
*/
std::vector<sstables::shared_sstable> get_candidates_for(int level) {
std::vector<sstables::shared_sstable> get_candidates_for(int level, const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys) {
const schema& s = *_schema;
assert(!get_level(level).empty());
@@ -657,31 +626,35 @@ public:
}
// for non-L0 compactions, pick up where we left off last time
get_level(level).sort([&s] (auto& i, auto& j) {
std::list<sstables::shared_sstable>& sstables = get_level(level);
sstables.sort([&s] (auto& i, auto& j) {
return i->compare_by_first_key(*j) < 0;
});
int start = 0; // handles case where the prior compaction touched the very last range
#if 0
for (int i = 0; i < getLevel(level).size(); i++)
{
SSTableReader sstable = getLevel(level).get(i);
if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
{
start = i;
int idx = 0;
for (auto& sstable : sstables) {
if (uint32_t(level) >= last_compacted_keys.size()) {
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (last_compacted_keys.size() - 1)));
}
auto& sstable_first = sstable->get_first_decorated_key();
if (!last_compacted_keys[level] || sstable_first.tri_compare(s, *last_compacted_keys[level]) > 0) {
start = idx;
break;
}
idx++;
}
#endif
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
// and wrapping back to the beginning of the generation if necessary
for (auto i = 0U; i < get_level(level).size(); i++) {
for (auto i = 0U; i < sstables.size(); i++) {
// get an iterator to the element of position pos from the list get_level(level).
auto pos = (start + i) % get_level(level).size();
auto it = get_level(level).begin();
auto pos = (start + i) % sstables.size();
auto it = sstables.begin();
std::advance(it, pos);
auto sstable = *it;
auto& sstable = *it;
auto candidates = overlapping(*_schema, sstable, get_level(level + 1));
candidates.push_back(sstable);
#if 0
if (Iterables.any(candidates, suspectP))


@@ -902,7 +902,7 @@ future<index_list> sstable::read_indexes(uint64_t summary_idx, const io_priority
auto stream = make_file_input_stream(this->_index_file, position, end - position, std::move(options));
// TODO: it's redundant to constrain the consumer here to stop at
// index_size()-position, the input stream is already constrained.
auto ctx = make_lw_shared<index_consume_entry_context<index_consumer>>(ic, std::move(stream), this->index_size() - position);
auto ctx = make_lw_shared<index_consume_entry_context<index_consumer>>(ic, std::move(stream), position, this->index_size() - position);
return ctx->consume_input(*ctx).finally([ctx] {
return ctx->close();
}).then([ctx, &ic] {
@@ -1887,8 +1887,8 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
bool should_continue() {
return true;
}
void consume_entry(index_entry&& ie) {
maybe_add_summary_entry(_summary, ie.get_key_bytes(), ie.position());
void consume_entry(index_entry&& ie, uint64_t offset) {
maybe_add_summary_entry(_summary, ie.get_key_bytes(), offset);
if (!first_key) {
first_key = key(to_bytes(ie.get_key_bytes()));
} else {
@@ -1911,7 +1911,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
options.io_priority_class = pc;
auto stream = make_file_input_stream(index_file, 0, size, std::move(options));
return do_with(summary_generator(_summary), [this, &pc, stream = std::move(stream), size] (summary_generator& s) mutable {
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), size);
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), 0, size);
return ctx->consume_input(*ctx).finally([ctx] {
return ctx->close();
}).then([this, ctx, &s] {
@@ -2259,8 +2259,11 @@ remove_by_toc_name(sstring sstable_toc_name) {
dir = dirname(sstable_toc_name);
sstable_write_io_check(rename_file, sstable_toc_name, new_toc_name).get();
sstable_write_io_check(fsync_directory, dir).get();
} else {
} else if (sstable_write_io_check(file_exists, new_toc_name).get0()) {
dir = dirname(new_toc_name);
} else {
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
return;
}
auto toc_file = open_checked_file_dma(sstable_read_error, new_toc_name, open_flags::ro).get0();
@@ -2392,107 +2395,21 @@ operator<<(std::ostream& os, const sstable_to_delete& std) {
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
}
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
using sstables_to_delete_atomically_type = std::set<sstring>;
struct pending_deletion {
sstables_to_delete_atomically_type names;
std::vector<lw_shared_ptr<promise<>>> completions;
};
static thread_local bool g_atomic_deletions_cancelled = false;
static thread_local std::list<lw_shared_ptr<pending_deletion>> g_atomic_deletion_sets;
static thread_local std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> g_shards_agreeing_to_delete_sstable;
static logging::logger deletion_logger("sstable-deletion");
static
future<>
do_delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
// runs on shard 0 only
deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
if (g_atomic_deletions_cancelled) {
deletion_logger.debug("atomic deletions disabled, erroring out");
using boost::adaptors::transformed;
throw atomic_deletion_cancelled(atomic_deletion_set
| transformed(std::mem_fn(&sstable_to_delete::name)));
}
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
std::list<lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
auto merged_set = make_lw_shared(pending_deletion());
for (auto&& sst_to_delete : atomic_deletion_set) {
merged_set->names.insert(sst_to_delete.name);
if (!sst_to_delete.shared) {
for (auto shard : boost::irange<shard_id>(0, smp::count)) {
g_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
}
}
}
merged_set->completions.push_back(make_lw_shared<promise<>>());
auto ret = merged_set->completions.back()->get_future();
for (auto&& old_set : g_atomic_deletion_sets) {
auto intersection = sstables_to_delete_atomically_type();
boost::set_intersection(merged_set->names, old_set->names, std::inserter(intersection, intersection.end()));
if (intersection.empty()) {
// We copy old_set to avoid corrupting g_atomic_deletion_sets if we fail
// further on.
new_atomic_deletion_sets.push_back(old_set);
} else {
deletion_logger.debug("merging with {}", old_set->names);
merged_set->names.insert(old_set->names.begin(), old_set->names.end());
boost::push_back(merged_set->completions, old_set->completions);
}
}
deletion_logger.debug("new atomic set: {}", merged_set->names);
new_atomic_deletion_sets.push_back(merged_set);
// can now exception-safely commit:
g_atomic_deletion_sets = std::move(new_atomic_deletion_sets);
// Mark each sstable as being deleted from deleting_shard. We have to do
// this in a separate pass, so the consideration whether we can delete or not
// sees all the data from this pass.
for (auto&& sst : atomic_deletion_set) {
g_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
}
// Figure out if the (possibly merged) set can be deleted
for (auto&& sst : merged_set->names) {
if (g_shards_agreeing_to_delete_sstable[sst].size() != smp::count) {
// Not everyone agrees, leave the set pending
deletion_logger.debug("deferring deletion until all shards agree");
return ret;
}
}
// Cannot recover from a failed deletion
g_atomic_deletion_sets.pop_back();
for (auto&& name : merged_set->names) {
g_shards_agreeing_to_delete_sstable.erase(name);
}
// Everyone agrees, let's delete
delete_sstables(std::vector<sstring> tocs) {
// FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
parallel_for_each(merged_set->names, [] (sstring name) {
deletion_logger.debug("deleting {}", name);
return parallel_for_each(tocs, [] (sstring name) {
return remove_by_toc_name(name);
}).then_wrapped([merged_set] (future<> result) {
deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
shared_future<> sf(std::move(result));
for (auto&& comp : merged_set->completions) {
sf.get_future().forward_to(std::move(*comp));
}
});
return ret;
}
static thread_local atomic_deletion_manager g_atomic_deletion_manager(smp::count, delete_sstables);
future<>
delete_atomically(std::vector<sstable_to_delete> ssts) {
auto shard = engine().cpu_id();
return smp::submit_to(0, [=] {
return do_delete_atomically(ssts, shard);
return g_atomic_deletion_manager.delete_atomically(ssts, shard);
});
}
@@ -2505,16 +2422,8 @@ delete_atomically(std::vector<shared_sstable> ssts) {
return delete_atomically(std::move(sstables_to_delete_atomically));
}
void
cancel_atomic_deletions() {
g_atomic_deletions_cancelled = true;
for (auto&& pd : g_atomic_deletion_sets) {
for (auto&& c : pd->completions) {
c->set_exception(atomic_deletion_cancelled(pd->names));
}
}
g_atomic_deletion_sets.clear();
g_shards_agreeing_to_delete_sstable.clear();
void cancel_atomic_deletions() {
g_atomic_deletion_manager.cancel_atomic_deletions();
}
atomic_deletion_cancelled::atomic_deletion_cancelled(std::vector<sstring> names)


@@ -49,6 +49,7 @@
#include "query-request.hh"
#include "key_reader.hh"
#include "compound_compat.hh"
#include "atomic_deletion.hh"
namespace sstables {
@@ -697,14 +698,6 @@ future<> await_background_jobs();
// Invokes await_background_jobs() on all shards
future<> await_background_jobs_on_all_shards();
struct sstable_to_delete {
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
sstring name;
bool shared = false;
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
};
// When we compact sstables, we have to atomically instantiate the new
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
// and if A contained some data that was tombstoned by B, and if B was
@@ -723,17 +716,6 @@ struct sstable_to_delete {
future<> delete_atomically(std::vector<shared_sstable> ssts);
future<> delete_atomically(std::vector<sstable_to_delete> ssts);
class atomic_deletion_cancelled : public std::exception {
std::string _msg;
public:
explicit atomic_deletion_cancelled(std::vector<sstring> names);
template <typename StringRange>
explicit atomic_deletion_cancelled(StringRange range)
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
}
const char* what() const noexcept override;
};
// Cancel any deletions scheduled by delete_atomically() and make their
// futures complete (with an atomic_deletion_cancelled exception).
void cancel_atomic_deletions();
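The bookkeeping behind this interface can be sketched minimally as follows (a hypothetical Python sketch with illustrative names, not the Scylla implementation): a shared sstable is deleted only once every shard has asked for it, a non-shared sstable needs only its owning shard, and requests that overlap a pending set are merged so the whole merged set is deleted atomically or not at all.

```python
class AtomicDeletionManager:
    """Illustrative sketch: defer deletion until all shards agree."""
    def __init__(self, shard_count, delete_fn):
        self.shard_count = shard_count
        self.delete_fn = delete_fn       # called with a sorted list of names
        self.agreeing = {}               # sstable name -> set of agreeing shards
        self.pending = []                # pending deletion sets (sets of names)

    def delete_atomically(self, ssts, shard):
        # ssts: list of (name, shared) pairs, as in sstable_to_delete
        for name, shared in ssts:
            s = self.agreeing.setdefault(name, set())
            if shared:
                s.add(shard)
            else:
                # no other shard owns it, so everyone implicitly agrees
                s.update(range(self.shard_count))
        # merge with any pending set that shares an sstable with this request
        merged = {name for name, _ in ssts}
        still_pending = []
        for p in self.pending:
            if p & merged:
                merged |= p
            else:
                still_pending.append(p)
        self.pending = still_pending
        # delete only if every sstable in the merged set has full agreement
        if all(len(self.agreeing.get(n, ())) == self.shard_count for n in merged):
            for n in merged:
                self.agreeing.pop(n, None)
            self.delete_fn(sorted(merged))
        else:
            self.pending.append(merged)
```

Run against the shared-compaction scenario from the test file, the merged set `{"1", "2", "3"}` is deleted only after the third shard agrees on `"1"`.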

test.py

@@ -39,6 +39,7 @@ boost_tests = [
'storage_proxy_test',
'schema_change_test',
'sstable_mutation_test',
'sstable_atomic_deletion_test',
'commitlog_test',
'hash_test',
'test-serialization',
@@ -99,6 +100,10 @@ class Alarm(Exception):
def alarm_handler(signum, frame):
raise Alarm
def boost_test_wants_double_dash(path):
magic = b'All the arguments after the -- are ignored'
return magic in subprocess.check_output([path, '--help'], stderr=subprocess.STDOUT)
if __name__ == "__main__":
all_modes = ['debug', 'release']
@@ -130,14 +135,19 @@ if __name__ == "__main__":
test_to_run.append((os.path.join(prefix, test), 'boost'))
if 'release' in modes_to_run:
test_to_run.append(('build/release/tests/lsa_async_eviction_test -c1 -m200M --size 1024 --batch 3000 --count 2000000','other'))
test_to_run.append(('build/release/tests/lsa_sync_eviction_test -c1 -m100M --count 10 --standard-object-size 3000000','other'))
test_to_run.append(('build/release/tests/lsa_sync_eviction_test -c1 -m100M --count 24000 --standard-object-size 2048','other'))
test_to_run.append(('build/release/tests/lsa_sync_eviction_test -c1 -m1G --count 4000000 --standard-object-size 128','other'))
test_to_run.append(('build/release/tests/row_cache_alloc_stress -c1 -m1G','other'))
test_to_run.append(('build/release/tests/sstable_test -c1','boost'))
test_to_run.append(('build/release/tests/lsa_async_eviction_test', 'other',
'-c1 -m200M --size 1024 --batch 3000 --count 2000000'.split()))
test_to_run.append(('build/release/tests/lsa_sync_eviction_test', 'other',
'-c1 -m100M --count 10 --standard-object-size 3000000'.split()))
test_to_run.append(('build/release/tests/lsa_sync_eviction_test', 'other',
'-c1 -m100M --count 24000 --standard-object-size 2048'.split()))
test_to_run.append(('build/release/tests/lsa_sync_eviction_test', 'other',
'-c1 -m1G --count 4000000 --standard-object-size 128'.split()))
test_to_run.append(('build/release/tests/row_cache_alloc_stress', 'other',
'-c1 -m1G'.split()))
test_to_run.append(('build/release/tests/sstable_test', 'boost', ['-c1']))
if 'debug' in modes_to_run:
test_to_run.append(('build/debug/tests/sstable_test -c1','boost'))
test_to_run.append(('build/debug/tests/sstable_test', 'boost', ['-c1']))
if args.name:
test_to_run = [t for t in test_to_run if args.name in t[0]]
@@ -150,8 +160,10 @@ if __name__ == "__main__":
env['ASAN_OPTIONS'] = 'alloc_dealloc_mismatch=0'
for n, test in enumerate(test_to_run):
path = test[0]
exec_args = test[2] if len(test) >= 3 else []
boost_args = []
prefix = '[%d/%d]' % (n + 1, n_total)
path += ' --collectd 0'
exec_args += '--collectd 0'.split()
signal.signal(signal.SIGALRM, alarm_handler)
if args.jenkins and test[1] == 'boost':
mode = 'release'
@@ -159,9 +171,11 @@ if __name__ == "__main__":
mode = 'debug'
xmlout = (args.jenkins + "." + mode + "." +
os.path.basename(test[0].split()[0]) + ".boost.xml")
path = path + " --output_format=XML --log_level=test_suite --report_level=no --log_sink=" + xmlout
print_status('%s RUNNING %s' % (prefix, path))
proc = subprocess.Popen(shlex.split(path), stdout=subprocess.PIPE,
boost_args += ['--output_format=XML', '--log_level=test_suite', '--report_level=no', '--log_sink=' + xmlout]
print_status('%s RUNNING %s %s' % (prefix, path, ' '.join(boost_args + exec_args)))
if test[1] == 'boost' and boost_test_wants_double_dash(path):
boost_args += ['--']
proc = subprocess.Popen([path] + boost_args + exec_args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env, preexec_fn=os.setsid)
out = None


@@ -277,6 +277,21 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
}
SEASTAR_TEST_CASE(test_commitlog_reader){
static auto count_mutations_in_segment = [] (sstring path) -> future<size_t> {
auto count = make_lw_shared<size_t>(0);
return db::commitlog::read_log_file(path, [count](temporary_buffer<char> buf, db::replay_position rp) {
sstring str(buf.get(), buf.size());
BOOST_CHECK_EQUAL(str, "hej bubba cow");
(*count)++;
return make_ready_future<>();
}).then([](auto s) {
return do_with(std::move(s), [](auto& s) {
return s->done();
});
}).then([count] {
return *count;
});
};
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 1;
return cl_test(cfg, [](commitlog& log) {
@@ -309,18 +324,19 @@ SEASTAR_TEST_CASE(test_commitlog_reader){
if (i == segments.end()) {
throw std::runtime_error("Did not find expected log file");
}
return db::commitlog::read_log_file(*i, [count2](temporary_buffer<char> buf, db::replay_position rp) {
sstring str(buf.get(), buf.size());
BOOST_CHECK_EQUAL(str, "hej bubba cow");
(*count2)++;
return make_ready_future<>();
}).then([](auto s) {
return do_with(std::move(s), [](auto& s) {
return s->done();
});
return *i;
}).then([&log, count] (sstring segment_path) {
// Check reading from an unsynced segment
return count_mutations_in_segment(segment_path).then([count] (size_t replay_count) {
BOOST_CHECK_GE(*count, replay_count);
}).then([&log, count, segment_path] {
return log.sync_all_segments().then([count, segment_path] {
// Check reading from a synced segment
return count_mutations_in_segment(segment_path).then([count] (size_t replay_count) {
BOOST_CHECK_EQUAL(*count, replay_count);
});
}).then([count, count2] {
BOOST_CHECK_EQUAL(*count, *count2);
});
});
});
});
}


@@ -0,0 +1,170 @@
/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sstables/atomic_deletion.hh"
#include <seastar/tests/test-utils.hh>
#include <deque>
#include <boost/range/numeric.hpp>
#include <boost/range/adaptor/transformed.hpp>
using namespace sstables;
class atomic_deletion_test_env {
public:
using event = std::function<future<> (atomic_deletion_test_env& adm)>;
private:
struct a_hash {
size_t operator()(const std::unordered_set<sstring>& s) const {
auto h = std::hash<sstring>();
return boost::accumulate(s | boost::adaptors::transformed(h), size_t(0)); // sue me
}
};
atomic_deletion_manager _adm;
std::deque<event> _events;
std::unordered_set<std::unordered_set<sstring>, a_hash> _deletes;
semaphore _deletion_counter { 0 };
private:
future<> delete_sstables(std::vector<sstring> names) {
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
_deletes.insert(s1);
_deletion_counter.signal();
return make_ready_future<>();
}
public:
explicit atomic_deletion_test_env(unsigned shard_count, std::vector<event> events)
: _adm(shard_count, [this] (std::vector<sstring> names) {
return delete_sstables(names);
})
, _events(events.begin(), events.end()) {
}
void expect_no_deletion() {
BOOST_REQUIRE(_deletes.empty());
}
future<> schedule_delete(std::vector<sstable_to_delete> names, unsigned shard) {
_adm.delete_atomically(names, shard).discard_result();
return make_ready_future<>();
}
future<> expect_deletion(std::vector<sstring> names) {
return _deletion_counter.wait().then([this, names] {
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
auto erased = _deletes.erase(s1);
BOOST_REQUIRE_EQUAL(erased, 1);
});
}
future<> test() {
// run all _events sequentially
return repeat([this] {
if (_events.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto ev = std::move(_events.front());
_events.pop_front();
return ev(*this).then([] {
return stop_iteration::no;
});
});
}
};
future<> test_atomic_deletion_manager(unsigned shards, std::vector<atomic_deletion_test_env::event> events) {
auto env = make_lw_shared<atomic_deletion_test_env>(shards, events);
return env->test().finally([env] {});
}
atomic_deletion_test_env::event
delete_many(std::vector<sstable_to_delete> v, unsigned shard) {
return [v, shard] (atomic_deletion_test_env& env) {
// verify we didn't have an early delete from previous deletion
env.expect_no_deletion();
return env.schedule_delete(v, shard);
};
}
atomic_deletion_test_env::event
delete_one(sstable_to_delete s, unsigned shard) {
return delete_many({s}, shard);
}
atomic_deletion_test_env::event
expect_many(std::vector<sstring> names) {
return [names] (atomic_deletion_test_env& env) {
return env.expect_deletion(names);
};
}
atomic_deletion_test_env::event
expect_one(sstring name) {
return expect_many({name});
}
SEASTAR_TEST_CASE(test_single_shard_single_sstable) {
return test_atomic_deletion_manager(1, {
delete_one({"1", false}, 0),
expect_one("1"),
delete_one({"2", true}, 0),
expect_one("2"),
});
}
SEASTAR_TEST_CASE(test_multi_shard_single_sstable) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_one({"1", true}, 1),
delete_one({"1", true}, 2),
expect_one("1"),
delete_one({"2", false}, 1),
expect_one("2"),
});
}
SEASTAR_TEST_CASE(test_nonshared_compaction) {
return test_atomic_deletion_manager(5, {
delete_many({{"1", false}, {"2", false}, {"3", false}}, 2),
expect_many({"1", "2", "3"}),
});
}
SEASTAR_TEST_CASE(test_shared_compaction) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_many({{"1", true}, {"2", false}, {"3", false}}, 2),
delete_one({"1", true}, 1),
expect_many({"1", "2", "3"}),
});
}
SEASTAR_TEST_CASE(test_overlapping_compaction) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_one({"3", true}, 0),
delete_many({{"1", true}, {"2", false}, {"3", true}}, 2),
delete_one({"1", true}, 1),
delete_many({{"3", true}, {"4", false}}, 1),
expect_many({"1", "2", "3", "4"}),
});
}
#include "disk-error-handler.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;


@@ -1252,7 +1252,9 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
}
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, 1);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == sstables->size());
BOOST_REQUIRE(candidate.level == 1);
BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024);
@@ -1731,7 +1733,9 @@ SEASTAR_TEST_CASE(leveled_01) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 0);
@@ -1786,7 +1790,9 @@ SEASTAR_TEST_CASE(leveled_02) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 3);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 0);
@@ -1844,7 +1850,9 @@ SEASTAR_TEST_CASE(leveled_03) {
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
BOOST_REQUIRE(manifest.get_level_size(1) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 1);
@@ -1914,7 +1922,9 @@ SEASTAR_TEST_CASE(leveled_04) {
auto level2_score = (double) manifest.get_total_bytes(manifest.get_level(2)) / (double) manifest.max_bytes_for_level(2);
BOOST_REQUIRE(level2_score < 1.001);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 2);
@@ -1976,7 +1986,9 @@ SEASTAR_TEST_CASE(leveled_06) {
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
BOOST_REQUIRE(manifest.get_level_size(2) == 0);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.level == 2);
BOOST_REQUIRE(candidate.sstables.size() == 1);
auto& sst = (candidate.sstables)[0];


@@ -1474,12 +1474,13 @@ private:
class column_visitor : public Aggregator {
const schema& _s;
const query::partition_slice& _slice;
uint32_t _cell_limit;
const uint32_t _cell_limit;
uint32_t _current_cell_limit;
std::vector<std::pair<std::string, typename Aggregator::type>> _aggregation;
typename Aggregator::type* _current_aggregation;
public:
column_visitor(const schema& s, const query::partition_slice& slice, uint32_t cell_limit)
: _s(s), _slice(slice), _cell_limit(cell_limit)
: _s(s), _slice(slice), _cell_limit(cell_limit), _current_cell_limit(0)
{ }
std::vector<std::pair<std::string, typename Aggregator::type>>&& release() {
return std::move(_aggregation);
@@ -1492,6 +1493,7 @@ private:
void accept_new_partition(const partition_key& key, uint32_t row_count) {
_aggregation.emplace_back(partition_key_to_string(_s, key), typename Aggregator::type());
_current_aggregation = &_aggregation.back().second;
_current_cell_limit = _cell_limit;
}
void accept_new_partition(uint32_t row_count) {
// We always ask for the partition_key to be sent in query_opts().
@@ -1500,19 +1502,19 @@ private:
void accept_new_row(const clustering_key_prefix& key, const query::result_row_view& static_row, const query::result_row_view& row) {
auto it = row.iterator();
auto cell = it.next_atomic_cell();
if (cell && _cell_limit > 0) {
if (cell && _current_cell_limit > 0) {
bytes column_name = composite::serialize_value(key.components(), _s.thrift().has_compound_comparator());
Aggregator::on_column(_current_aggregation, column_name, *cell);
_cell_limit -= 1;
_current_cell_limit -= 1;
}
}
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) {
auto it = row.iterator();
for (auto&& id : _slice.regular_columns) {
auto cell = it.next_atomic_cell();
if (cell && _cell_limit > 0) {
if (cell && _current_cell_limit > 0) {
Aggregator::on_column(_current_aggregation, _s.regular_column_at(id).name(), *cell);
_cell_limit -= 1;
_current_cell_limit -= 1;
}
}
}
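The fix above keeps the configured cell limit constant and counts down a per-partition working copy instead, so a large first partition can no longer exhaust the budget of the partitions after it. A minimal sketch of that pattern (illustrative names, not the Scylla visitor API):

```python
class CellLimitVisitor:
    """Illustrative sketch: the cell limit is reset per partition."""
    def __init__(self, cell_limit):
        self.cell_limit = cell_limit   # configured limit, never mutated
        self.current = 0               # working counter for this partition
        self.accepted = []

    def accept_new_partition(self, key):
        self.current = self.cell_limit  # the reset the patch introduces

    def accept_cell(self, name):
        if self.current > 0:
            self.accepted.append(name)
            self.current -= 1
```

With a limit of 2, each partition contributes up to two cells, rather than only the first partition being served.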


@@ -600,4 +600,62 @@ inline void stop_foreground(const trace_state_ptr& state) {
state->stop_foreground_and_write();
}
}
// global_trace_state_ptr is a helper class that may be used for creating spans
// of an existing tracing session on other shards. When a tracing span on a
// different shard is needed, global_trace_state_ptr creates a secondary
// tracing session on that shard, similarly to what we do when we create
// tracing spans on remote nodes.
//
// The usage is straightforward:
// 1. Create a global_trace_state_ptr from the existing trace_state_ptr object.
// 2. Pass the global_trace_state_ptr object, instead of a trace_state_ptr
//    object, to the execution unit that (possibly) runs on a different shard.
class global_trace_state_ptr {
unsigned _cpu_of_origin;
trace_state_ptr _ptr;
public:
// Note: the trace_state_ptr must come from the current shard
global_trace_state_ptr(trace_state_ptr t)
: _cpu_of_origin(engine().cpu_id())
, _ptr(std::move(t))
{ }
// May be invoked across shards.
global_trace_state_ptr(const global_trace_state_ptr& other)
: global_trace_state_ptr(other.get())
{ }
// May be invoked across shards.
global_trace_state_ptr(global_trace_state_ptr&& other)
: global_trace_state_ptr(other.get())
{ }
global_trace_state_ptr& operator=(const global_trace_state_ptr&) = delete;
// May be invoked across shards.
trace_state_ptr get() const {
// optimize the "tracing not enabled" case
if (!_ptr) {
return nullptr;
}
if (_cpu_of_origin != engine().cpu_id()) {
auto opt_trace_info = make_trace_info(_ptr);
if (opt_trace_info) {
trace_state_ptr new_trace_state = tracing::get_local_tracing_instance().create_session(*opt_trace_info);
begin(new_trace_state);
return new_trace_state;
} else {
return nullptr;
}
}
return _ptr;
}
// May be invoked across shards.
operator trace_state_ptr() const { return get(); }
};
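The cross-shard handle pattern in the class above can be sketched generically (a hypothetical Python sketch; `GlobalHandle` and `make_local_copy` are illustrative names, not the seastar or Scylla API): the handle remembers its shard of origin and, when dereferenced on another shard, materializes a fresh local session rather than sharing the original object across shards.

```python
class GlobalHandle:
    """Illustrative sketch of a shard-of-origin handle."""
    def __init__(self, obj, current_shard, make_local_copy):
        self.origin = current_shard        # shard the object belongs to
        self.obj = obj
        self.make_local_copy = make_local_copy

    def get(self, current_shard):
        if self.obj is None:
            return None                    # "tracing not enabled" fast path
        if current_shard != self.origin:
            # off-origin access: create a secondary local session
            return self.make_local_copy(self.obj)
        return self.obj
```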
}


@@ -652,13 +652,13 @@ future<> cql_server::connection::process_request() {
f.length, mem_estimate, _server._max_request_size));
}
return with_semaphore(_server._memory_available, mem_estimate, [this, length = f.length, flags = f.flags, op, stream, tracing_requested] {
return read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested] (temporary_buffer<char> buf) {
return get_units(_server._memory_available, mem_estimate).then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
return this->read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested, mem_permit = std::move(mem_permit)] (temporary_buffer<char> buf) mutable {
++_server._requests_served;
++_server._requests_serving;
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested] () mutable {
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested, mem_permit = std::move(mem_permit)] () mutable {
auto bv = bytes_view{reinterpret_cast<const int8_t*>(buf.begin()), buf.size()};
auto cpu = pick_request_cpu();
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested] () mutable {
@@ -672,7 +672,7 @@ future<> cql_server::connection::process_request() {
}).then([this, flags] (auto&& response) {
_client_state.merge(response.second);
return this->write_response(std::move(response.first), _compression);
}).then([buf = std::move(buf)] {
}).then([buf = std::move(buf), mem_permit = std::move(mem_permit)] {
// Keep buf alive.
});
}).handle_exception([] (std::exception_ptr ex) {
@@ -1504,7 +1504,11 @@ std::vector<char> cql_server::response::compress_lz4(const std::vector<char>& bo
output[1] = (input_len >> 16) & 0xFF;
output[2] = (input_len >> 8) & 0xFF;
output[3] = input_len & 0xFF;
#ifdef HAVE_LZ4_COMPRESS_DEFAULT
auto ret = LZ4_compress_default(input, output + 4, input_len, LZ4_compressBound(input_len));
#else
auto ret = LZ4_compress(input, output + 4, input_len);
#endif
if (ret == 0) {
throw std::runtime_error("CQL frame LZ4 compression failure");
}
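The four header bytes written above store the uncompressed frame length big-endian, one byte per shift. A self-contained sketch of that encoding and its inverse:

```python
def put_be32(n):
    """Encode a 32-bit length big-endian, as the CQL LZ4 header does."""
    return bytes([(n >> 24) & 0xFF, (n >> 16) & 0xFF, (n >> 8) & 0xFF, n & 0xFF])

def get_be32(b):
    """Decode the 4-byte big-endian length back to an integer."""
    return (b[0] << 24) | (b[1] << 16) | (b[2] << 8) | b[3]
```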


@@ -39,8 +39,8 @@ class moving_average {
public:
moving_average(latency_counter::duration interval, latency_counter::duration tick_interval) :
_tick_interval(tick_interval) {
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::nanoseconds>(interval).count()/
static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(tick_interval).count()));
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::seconds>(tick_interval).count()/
static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(interval).count()));
}
void add(uint64_t val = 1) {
@@ -48,7 +48,7 @@ public:
}
void update() {
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(_tick_interval).count());
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(_tick_interval).count());
if (_initialized) {
_rate += (_alpha * (instant_rate - _rate));
} else {
@@ -70,7 +70,8 @@ public:
}
};
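The patch above swaps the two durations in the smoothing factor: for an exponentially weighted moving average ticked every `tick_interval` with a decay window of `interval`, the factor is `alpha = 1 - exp(-tick_interval / interval)` (the ratio the fixed code computes), not the inverse. A small sketch of the update rule, with illustrative names:

```python
import math

def make_ewma(interval_s, tick_s):
    """Illustrative EWMA: rate += alpha * (instant_rate - rate) each tick."""
    alpha = 1 - math.exp(-tick_s / interval_s)
    state = {"rate": None}
    def update(instant_rate):
        if state["rate"] is None:
            state["rate"] = instant_rate   # first tick initializes the rate
        else:
            state["rate"] += alpha * (instant_rate - state["rate"])
        return state["rate"]
    return update
```

With the buggy inverted ratio, `alpha` is essentially 1 for any window longer than a tick, so the "average" just tracks the instantaneous rate; with the fixed ratio a 1-minute window ticked every 10 s decays gradually.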
class ihistogram {
template <typename Unit>
class basic_ihistogram {
public:
// count holds all the events
int64_t count;
@@ -84,12 +85,13 @@ public:
double variance;
int64_t sample_mask;
boost::circular_buffer<int64_t> sample;
ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
basic_ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
: count(0), total(0), min(0), max(0), sum(0), started(0), mean(0), variance(0),
sample_mask(_sample_mask), sample(
size) {
}
void mark(int64_t value) {
void mark(int64_t ns_value) {
auto value = std::chrono::duration_cast<Unit>(std::chrono::nanoseconds(ns_value)).count();
if (total == 0 || value < min) {
min = value;
}
@@ -131,7 +133,7 @@ public:
/**
* Set the latency according to the sample rate.
*/
ihistogram& set_latency(latency_counter& lc) {
basic_ihistogram& set_latency(latency_counter& lc) {
if (should_sample()) {
lc.start();
}
@@ -144,7 +146,7 @@ public:
* Increment the total number of events without
* sampling the value.
*/
ihistogram& inc() {
basic_ihistogram& inc() {
count++;
return *this;
}
@@ -157,7 +159,7 @@ public:
return a * a;
}
ihistogram& operator +=(const ihistogram& o) {
basic_ihistogram& operator +=(const basic_ihistogram& o) {
if (count == 0) {
*this = o;
} else if (o.count > 0) {
@@ -190,14 +192,18 @@ public:
return mean * count;
}
friend ihistogram operator +(ihistogram a, const ihistogram& b);
template <typename U>
friend basic_ihistogram<U> operator +(basic_ihistogram<U> a, const basic_ihistogram<U>& b);
};
inline ihistogram operator +(ihistogram a, const ihistogram& b) {
template <typename Unit>
inline basic_ihistogram<Unit> operator +(basic_ihistogram<Unit> a, const basic_ihistogram<Unit>& b) {
a += b;
return a;
}
using ihistogram = basic_ihistogram<std::chrono::microseconds>;
struct rate_moving_average {
uint64_t count = 0;
double rates[3] = {0};
@@ -222,7 +228,7 @@ class timed_rate_moving_average {
static constexpr latency_counter::duration tick_interval() {
return std::chrono::seconds(10);
}
moving_average rates[3] = {{tick_interval(), std::chrono::minutes(1)}, {tick_interval(), std::chrono::minutes(5)}, {tick_interval(), std::chrono::minutes(15)}};
moving_average rates[3] = {{std::chrono::minutes(1), tick_interval()}, {std::chrono::minutes(5), tick_interval()}, {std::chrono::minutes(15), tick_interval()}};
latency_counter::time_point start_time;
timer<> _timer;
@@ -246,7 +252,7 @@ public:
rate_moving_average rate() const {
rate_moving_average res;
if (_count > 0) {
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(latency_counter::now() - start_time).count();
double elapsed = std::chrono::duration_cast<std::chrono::seconds>(latency_counter::now() - start_time).count();
res.mean_rate = (_count / elapsed);
}
res.count = _count;