Compare commits


66 Commits

Author SHA1 Message Date
Pekka Enberg
ba5d52c94e release: prepare for 1.2.4 2016-08-25 17:33:56 +03:00
Paweł Dziepak
ffed8a5603 mutation_partition: fix iterator invalidation in trim_rows
Reversed iterators are adaptors for 'normal' iterators. These underlying
iterators point to different objects than the reversed iterators
themselves.

The consequence of this is that removing an element pointed to by a
reversed iterator may invalidate a reversed iterator which points to a
completely different object.

This is what happens in trim_rows for reversed queries. Erasing a row
can invalidate end iterator and the loop would fail to stop.

The solution is to introduce
reversal_traits::erase_dispose_and_update_end(), a function which erases
and disposes of the object pointed to by a given iterator, but also takes
a reference to an end iterator and updates it if necessary to make sure
that it stays valid.

Fixes #1609.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1472080609-11642-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 6012a7e733)
2016-08-25 17:31:46 +03:00
Piotr Jastrzebski
ec51c8e1b8 Fix use-after-free bug in storage proxy
Due to speculative reads we can't guarantee that all
fibers started by storage_proxy::query will be finished
by the time the method returns a result.

We need to make sure that no parameter passed to this
method ever changes.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
Message-Id: <31952e323e599905814b7f378aafdf779f7072b8.1471005642.git.piotr@scylladb.com>
(cherry picked from commit f212a6cfcb)

[tgrabiec: resolved trivial conflict]
2016-08-12 16:38:21 +02:00
Avi Kivity
50056a6df6 Update seastar submodule
* seastar 27e13e7...d6ccc19 (1):
  > Merge "Fix the SMP queue poller" from Tomasz

Fixes #1553.
2016-08-10 10:17:06 +03:00
Duarte Nunes
184b62d790 schema_builder: Ensure dense tables have compact col
This patch ensures that when the schema is dense, regardless of
compact_storage being set, the single regular column is translated
into a compact column.

This fixes an issue where Thrift dynamic column families are
translated to a dense schema with a regular column, instead of a
compact one.

Since a compact column is also a regular column (e.g., for purposes of
querying), no further changes are required.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1470062410-1414-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 5995aebf39)

Fixes #1535.
2016-08-03 13:50:54 +02:00
Duarte Nunes
f5a1f402f5 schema: Dense schemas are correctly upgraded
When upgrading a dense schema, we would drop the cells of the regular
(compact) column. This patch fixes this by making the regular and
compact column kinds compatible.

Fixes #1536

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1470172097-7719-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 1516cd4c08)
2016-08-03 13:39:49 +02:00
Avi Kivity
9f09812733 checked_file: preserve DMA alignment
Inherit the alignment parameters from the underlying file instead of
defaulting to 4096.  This gives better read performance on disks with 512-byte
sectors.

Fixes #1532.
Message-Id: <1470122188-25548-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 9f35e4d328)
2016-08-02 12:23:21 +03:00
Duarte Nunes
e9b7352adb storage_service: Fix get_range_to_address_map_in_local_dc
This patch fixes a couple of bugs in
get_range_to_address_map_in_local_dc.

Fixes #1517

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1469782666-21320-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 7d1b7e8da3)
2016-07-29 11:24:50 +02:00
Pekka Enberg
2461a85c0f Update seastar submodule
* seastar 3558f41...27e13e7 (2):
  > iotune: Fix SIGFPE with some executions
  > iotune: provide a status dump if we can't calculate a proper number of io_queues
2016-07-29 11:13:58 +03:00
Gleb Natapov
9503145e38 api: fix use after free in sum_sstable
get_sstables_including_compacted_undeleted() may return a temporary shared
pointer which will be destroyed before the loop completes if it is not
stored locally.

Fixes #1514

Message-Id: <20160728100504.GD2502@scylladb.com>
(cherry picked from commit 3531dd8d71)
2016-07-28 14:34:08 +03:00
Tomasz Grabiec
9d99dd46cb tests: lsa_async_eviction_test: Use chunked_fifo<>
To protect against large reallocations during push(), which are done
under the reclaim lock and may fail.
2016-07-27 18:40:35 +02:00
Pekka Enberg
c9dfbf7913 release: prepare for 1.2.3 2016-07-27 13:32:11 +03:00
Avi Kivity
4f02a5f4b3 bloom_filter: fix overflow for large filters
We use ::abs(), which has an int parameter, on long arguments, resulting
in incorrect results.

Switch to std::abs() instead, which has the correct overloads.

Fixes #1494.

Message-Id: <1469347802-28933-1-git-send-email-avi@scylladb.com>
(cherry picked from commit 900639915d)
2016-07-24 11:32:54 +03:00
Tomasz Grabiec
7457ed982d schema_tables: Fix hang during keyspace drop
Fixes #1484.

We drop tables as part of keyspace drop. Table drop starts with
creating a snapshot on all shards. All shards must use the same
snapshot timestamp which, among other things, is part of the snapshot
name. The timestamp is generated using a supplied timestamp-generating
function (a joinpoint object). The joinpoint object will wait for all
shards to arrive and then generate and return the timestamp.

However, we drop tables in parallel, using the same joinpoint
instance. So the joinpoint may be contacted by snapshotting shards of
tables A and B concurrently, generating timestamp t1 for some shards
of table A and some shards of table B. Later, the remaining shards of
table A will get a different timestamp. As a result, different shards
may use different snapshot names for the same table. The snapshot
creation will never complete because the sealing fiber waits for all
shards to signal it on the same name.

The fix is to give each table a separate joinpoint instance.

Message-Id: <1469117228-17879-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 5e8f0efc85)
2016-07-22 15:53:46 +02:00
Avi Kivity
16a5be622c Update seastar submodule
* seastar 86d9b13...3558f41 (5):
  > Fix chunked_fifo move assignment
  > semaphore: switch to chunked_fifo
  > fair_queue: add missing include
  > chunked_fifo: implement back()
  > Chunked FIFO queue
2016-07-19 14:49:58 +03:00
Takuya ASADA
caab57bb24 dist/redhat/centos_dep: disable go and ada language on scylla-gcc package, since ScyllaDB never uses them
The centos-master jenkins job failed at building libgo, but we don't need
the Go language, so let's disable it on the scylla-gcc package.
Also, we never use Ada, so disable it too.

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1468166660-23323-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit d2caa486ba)
2016-07-19 11:03:42 +03:00
Tomasz Grabiec
3efa1211ec types: Fix update_types()
We should replace the old type, not insert the new type before the old type.

Fixes #1465

Message-Id: <1468861076-20397-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit ce768858f5)
2016-07-18 20:14:55 +03:00
Avi Kivity
3898216831 Update seastar submodule
* seastar 34c0f6c...86d9b13 (1):
  > rpc: do not call shutdown function on already closed fd

Fixes #1463.
2016-07-18 15:26:00 +03:00
Avi Kivity
0af39f2d0c Update seastar submodule
* seastar f3826f0...34c0f6c (2):
  > rpc: fix race between send loop and expiration timer
  > reactor: create new files with a more reasonable default mode
2016-07-17 13:36:18 +03:00
Avi Kivity
e296fef581 Fix bad backport (259b2592d4) 2016-07-15 14:18:50 +03:00
Avi Kivity
5ee6a00b0f db: don't over-allocate memory for mutation_reader
column_family::make_reader() doesn't deal with sstables directly, so it
doesn't need to reserve memory for them.

Fixes #1453.
Message-Id: <1468429143-4354-1-git-send-email-avi@scylladb.com>

(cherry picked from commit d3c87975b0)
2016-07-15 14:11:01 +03:00
Avi Kivity
64df5f3f38 db: estimate queued read size more conservatively
There are plenty of continuations involved, so don't assume it fits in 1k.
Message-Id: <1468429516-4591-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 23edc1861a)
2016-07-15 14:09:47 +03:00
Avi Kivity
259b2592d4 db: do not create column family directories belonging to foreign keyspaces
Currently, for any column family, we create a directory for it in all
keyspace directories.  This is incredibly awkward.

Fix by iterating over just the keyspace's column families, not all
column families in existence.

Fixes #1457.
Message-Id: <1468495182-18424-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 1048e1071b)
2016-07-15 14:08:46 +03:00
Avi Kivity
51eba96c14 transport: encode user-defined type metadata
Right now we fall back to tuples, which confuses the client.

Fixes #1443.

Reviewed-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1468167120-1945-1-git-send-email-avi@scylladb.com>
(cherry picked from commit f126efd7f2)
2016-07-12 11:12:37 +03:00
Avi Kivity
66e8204c79 Update seastar submodule
* seastar 31d988c...f3826f0 (3):
  > Fix boost version check
  > reactor: more fix for smp poll with older boost
  > reactor: fix build on older boost due to spsc_queue::read_available()
2016-07-05 00:43:11 +03:00
Avi Kivity
7f1c63afa3 auth: fix performance problem when looking up permissions
data_resource lookup uses data_resource::name(), which uses sprint(), which
uses (indirectly) locale, which takes a global lock.  This is a bottleneck
on large machines.

Fix by not using name() during lookup.

Fixes #1419
Message-Id: <1467616296-17645-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 76cc0c0cf9)
2016-07-04 17:56:29 +03:00
Avi Kivity
8547f34d60 mutation_reader: make restricting_mutation_reader even more restricting
While limiting the number of concurrently executing sstable readers reduces
our memory load, the queued readers, although consuming a small amount of
memory, can still grow without bounds.

To limit the damage, add two limits on the queue:
 - a timeout, which is equal to the read timeout
 - a queue length limit, which is equal to 2% of the shard memory divided
   by an estimate of the queued request size (1kb)

Together, these limits bound the amount of memory needed by queued disk
requests in case the disk can't keep up.
Message-Id: <1467206055-30769-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 9ac730dcc9)
2016-06-29 17:29:00 +03:00
Avi Kivity
a3078c9b9d Fix backport of restricting_mutation_reader 2016-06-27 19:57:31 +03:00
Avi Kivity
00692d891e db: add statistics about queued reads
Fixes #1398.

(cherry picked from commit f03cd6e913)
2016-06-27 19:43:16 +03:00
Avi Kivity
94aa879d19 db: restrict replica read concurrency
Since reading mutations can consume a large amount of memory, which, moreover,
is not predictable at the time the read is initiated, restrict the number
of reads to 100 per shard.  This is more than enough to saturate the disk,
and hopefully enough to prevent allocation failures.

Restriction is applied in column_family::make_sstable_reader(), which is
called either on a cache miss or if the cache is disabled.  This allows
cached reads to proceed without restriction, since their memory usage is
supposedly low.

Reads from the system keyspace use a separate semaphore, to prevent
user reads from blocking system reads.  Perhaps we should select the
semaphore based on the source of the read rather than the keyspace,
but for now using the keyspace is sufficient.

Fixes #1398.

(cherry picked from commit edeef03b34)
2016-06-27 19:43:07 +03:00
Avi Kivity
8361b01b9d mutation_reader: introduce restricting_reader
A restricting_reader wraps a mutation_reader and restricts its concurrency
using a provided semaphore; this allows controlling read concurrency, which
is important since reads can consume a lot of resources ((number of
participating sstables) * 128k after we have streaming mutations, and a lot
more before).

Fixes #1398.

(cherry picked from commit bea7d7ee94)
2016-06-27 19:42:59 +03:00
Avi Kivity
67e80fd595 Update seastar submodule
* seastar 0bcdd28...31d988c (2):
  > reactor: run idle poll handler with a pure poll function
  > resource: don't abort on too-high io queue count

Fixes #1395.
Fixes #1400.
2016-06-27 19:31:43 +03:00
Avi Kivity
b3915e0363 Seastar: prepare a branch for 1.2 backports 2016-06-27 19:30:13 +03:00
Avi Kivity
985c4ffcc6 release: prepare for 1.2.2 2016-06-27 19:29:57 +03:00
Avi Kivity
c56fc99b7f main: handle exceptions during startup
If we don't, std::terminate() causes a core dump, even though an
exception is sort-of-expected here and can be handled.

Add an exception handler to fix.

Fixes #1379.
Message-Id: <1466595221-20358-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 5af22f6cb1)
2016-06-23 10:03:30 +03:00
Pekka Enberg
85d33e2ee4 release: prepare for 1.2.1 2016-06-21 16:22:17 +03:00
Duarte Nunes
ffeef2f072 database: Actually decrease query_state limit
query_state expects the current row limit to be updated so it
can be enforced across partition ranges. A regression introduced
in e4e8acc946 prevented that from
happening by passing a copy of the limit to querying_reader.

This patch fixes the issue by having column_family::query update
the limit as it processes partitions from the querying_reader.

Fixes #1338

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1465804012-30535-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit c896309383)
2016-06-21 10:03:22 +03:00
Pekka Enberg
d3ffa00eb2 systemd: Use PermissionsStartOnly instead of running sudo
Use the PermissionsStartOnly systemd option to apply the permission
related configurations only to the start command. This allows us to stop
using "sudo" for ExecStartPre and ExecStopPost hooks and drop the
"requiretty" /etc/sudoers hack from Scylla's RPM.

Tested-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1466407587-31734-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 1d5f7be447)
2016-06-21 08:49:17 +03:00
Nadav Har'El
ad50d83302 Rewriting shared sstables only after all shards loaded sstables
After commit faa4581, each shard only starts splitting its shared sstables
after opening all sstables. This was important because compaction needs to
be aware of all sstables.

However, another bug remained: If one shard finishes loading its sstables
and starts the splitting compactions, and in parallel a different shard is
still opening sstables - the second shard might find a half-written sstable
being written by the first shard, and abort on a malformed sstable.

So in this patch we start the shared sstable rewrites - on all shards -
only after all shards finished loading their sstables. Doing this is easy,
because main.cc already contains a list of sequential steps where each
uses invoke_on_all() to make sure the step completes on all shards before
continuing to the next step.

Fixes #1371

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1466426641-3972-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit 3372052d48)
2016-06-20 18:20:01 +03:00
Avi Kivity
c6a9844dfe dist: fix scylla-kernel-conf postinstall scriptlet failure
Because we build on CentOS 7, which does not have the %sysctl_apply macro,
the macro is not expanded and therefore executes incorrectly even on 7.2,
which does have it.

Fix by expanding the macro manually.

Fixes #1360.
Message-Id: <1466250006-19476-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 07045ffd7c)
2016-06-20 09:37:06 +03:00
Nadav Har'El
dececbc0b9 Rewrite shared sstables only after entire CF is read
Starting in commit 721f7d1d4f, we start "rewriting" a shared sstable (i.e.,
splitting it into individual shards) as soon as it is loaded in each shard.

However as discovered in issue #1366, this is too soon: Our compaction
process relies in several places that compaction is only done after all
the sstables of the same CF have been loaded. One example is that we
need to know the content of the other sstables to decide which tombstones
we can expire (this is issue #1366). Another example is that we use the
last generation number we are aware of to decide the number of the next
compaction output - and this is wrong before we saw all sstables.

So with this patch, while loading sstables we only make a list of shared
sstables which need to be rewritten - and the actual rewrite is only started
when we finish reading all the sstables for this CF. We need to do this in
two cases: reboot (when we load all the existing sstables we find on disk)
and nodetool refresh (when we import a set of new sstables).

Fixes #1366.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1466344078-31290-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit faa45812b2)
2016-06-19 17:11:14 +03:00
Asias He
f2031bf3db repair: Switch log level to warn instead of error
dtest treats error-level log messages as serious errors. It is not a
serious error for streaming to fail to send a verb and fail a streaming
session, which triggers a repair failure - for example, when the peer node
is gone or stopped. Switch to log level warn instead of error.

Fixes repair_additional_test.py:RepairAdditionalTest.repair_kill_3_test

Fixes: #1335
Message-Id: <406fb0c4a45b81bd9c0aea2a898d7ca0787b23e9.1465979288.git.asias@scylladb.com>
(cherry picked from commit de0fd98349)
2016-06-18 11:42:21 +03:00
Asias He
da77b8885f streaming: Switch log level to warn instead of error
dtest treats error-level log messages as serious errors. It is not a
serious error for streaming to fail to send a verb and fail a streaming
session - for example, when the peer node is gone or stopped. Switch to
log level warn instead of error.

Fixes repair_additional_test.py:RepairAdditionalTest.repair_kill_3_test

Fixes: #1335
Message-Id: <0149d30044e6e4d80732f1a20cd20593de489fc8.1465979288.git.asias@scylladb.com>
(cherry picked from commit 94c9211b0e)
2016-06-18 11:42:10 +03:00
Asias He
86434378d1 streaming: Fix indentation in do_send_mutations
Message-Id: <bc8cfa7c7b29f08e70c0af6d2fb835124d0831ac.1464857352.git.asias@scylladb.com>
(cherry picked from commit 96463cc17c)
2016-06-18 11:41:51 +03:00
Pekka Enberg
e5d24d5940 service/storage_service: Make do_isolate_on_error() more robust
Currently, we only stop the CQL transport server. Extract a
stop_transport() function from drain_on_shutdown() and call it from
do_isolate_on_error() to also shut down the inter-node RPC transport,
Thrift, and other communications services.

Fixes #1353

(cherry picked from commit d72c608868)

Conflicts:
	service/storage_service.cc

(cherry picked from commit 7e052a4e91)
2016-06-16 14:01:33 +03:00
Nadav Har'El
0a2d4204bd Rewrite shared sstables soon after startup
Several shards may share the same sstable - e.g., when re-starting scylla
with a different number of shards, or when importing sstables from an
external source. Sharing an sstable is fine, but it can result in excessive
disk space use because the shared sstable cannot be deleted until all
the shards using it have finished compacting it. Normally, we have no idea
when the shards will decide to compact these sstables - e.g., with size-
tiered-compaction a large sstable will take a long time until we decide
to compact it. So what this patch does is initiate compaction of the
shared sstables - on each shard using them - so that, as soon as possible
after the restart, the original sstable is split into separate
sstables per shard and the original sstable can be deleted. If several
sstables are shared, we serialize this compaction process so that each
shard only rewrites one sstable at a time. Regular compactions may happen
in parallel, but they will not be able to choose any of the shared
sstables because those are already marked as being compacted.

Commit 3f2286d0 increased the need for this patch, because since that
commit, if we don't delete the shared sstable, we also cannot delete
additional sstables which the different shards compacted with it. For one
scylla user, this resulted in so much excessive disk space use, that it
literally filled the whole disk.

After this patch, commit 3f2286d0 - or the discussion in issue #1318 on how
to improve it - is no longer necessary, because we will never compact a shared
sstable together with any other sstable: as explained above, the shared
sstables are marked as "being compacted", so the regular compactions will
avoid them.

Fixes #1314.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1465406235-15378-1-git-send-email-nyh@scylladb.com>
Reviewed-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 721f7d1d4f)
2016-06-16 14:01:33 +03:00
Tomasz Grabiec
74b8f63e8f row_cache: Make stronger guarantees in clear/invalidate
Correctness of current uses of clear() and invalidate() relies on the fact
that the cache is not populated using readers created before
invalidation. Sstables are first modified and then the cache is
invalidated. This is not guaranteed by the current implementation,
though. As pointed out by Avi, a populating read may race with the
call to clear(). If that read started before clear() and completed
after it, the cache may be populated with data which does not
correspond to the new sstable set.

To provide such a guarantee, the invalidate() variants were adjusted to
synchronize using _populate_phaser, similarly to what row_cache::update()
does.

(cherry picked from commit 170a214628)

Conflicts:
	database.cc
2016-06-16 14:01:33 +03:00
Tomasz Grabiec
9b764b726b row_cache: Implement clear() using invalidate()
Reduces code duplication.

(cherry picked from commit 2ab18dcd2d)
2016-06-16 14:01:33 +03:00
Pekka Enberg
07ba03ce7b utils/exceptions: Whitelist EEXIST and ENOENT in should_stop_on_system_error()
There are various call-sites that explicitly check for EEXIST and
ENOENT:

  $ git grep "std::error_code(E"
  database.cc:                            if (e.code() != std::error_code(EEXIST, std::system_category())) {
  database.cc:            if (e.code() != std::error_code(ENOENT, std::system_category())) {
  database.cc:        if (e.code() != std::error_code(ENOENT, std::system_category())) {
  database.cc:                            if (e.code() != std::error_code(ENOENT, std::system_category())) {
  sstables/sstables.cc:            if (e.code() == std::error_code(ENOENT, std::system_category())) {
  sstables/sstables.cc:            if (e.code() == std::error_code(ENOENT, std::system_category())) {

Commit 961e80a ("Be more conservative when deciding when to shut down
due to disk errors") turned these errors into a storage_io_exception
that is not expected by the callers, which causes 'nodetool snapshot'
functionality to break, for example.

Whitelist the two error codes to revert back to the old behavior of
io_check().
Message-Id: <1465454446-17954-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 8df5aa7b0c)
2016-06-16 14:01:33 +03:00
Avi Kivity
de690a6997 Be more conservative when deciding when to shut down due to disk errors
Currently we only shut down on EIO.  Expand this to shut down on any
system_error.

This may cause us to shut down prematurely due to a transient error,
but this is better than not shutting down due to a permanent error
(such as ENOSPC or EPERM).  We may whitelist certain errors in the future
to improve the behavior.

Fixes #1311.
Message-Id: <1465136956-1352-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 961e80ab74)
2016-06-16 14:01:33 +03:00
Pekka Enberg
7b53e969d2 dist/docker: Use Scylla 1.2 RPM repository 2016-06-15 19:50:02 +03:00
Pekka Enberg
c384b23112 release: prepare for 1.2.0 2016-06-13 15:18:13 +03:00
Shlomi Livne
3688542323 dist/common: Update scylla_io_setup to use settings done in cpuset.conf
scylla_io_setup is searching for --smp and --cpuset setting in
SCYLLA_ARGS. We have moved the settings of this args into
/etc/scylla.d/cpuset.conf and they are set by scylla_cpuset_setup into
CPUSET.

Fixes: #1327

Signed-off-by: Shlomi Livne <shlomi@scylladb.com>
Message-Id: <2735e3abdd63d245ec96cfa1e65f766b1c12132e.1465508701.git.shlomi@scylladb.com>
(cherry picked from commit ac6f2b5c13)
2016-06-10 09:38:17 +03:00
Pekka Enberg
7916182cfa Revert "Be more conservative when deciding when to shut down due to disk errors"
This reverts commit a6179476c5.

The change breaks 'nodetool snapshot', for example.
2016-06-09 10:11:29 +03:00
Tomasz Grabiec
ec1fd3945f Revert "config: adjust boost::program_options validator to work with db::string_map"
This reverts commit 653e250d04.

Compilation is broken with this patch:

[155/264] CXX build/release/db/config.o
FAILED: g++ -MMD -MT build/release/db/config.o -MF build/release/db/config.o.d -std=gnu++1y -g  -Wall -Werror -fvisibility=hidden -pthread -I/home/shlomi/scylla/seastar -I/home/shlomi/scylla/seastar/build/release/gen  -march=nehalem -Wno-overloaded-virtual -DHAVE_HWLOC -DHAVE_NUMA  -O2 -I/usr/include/jsoncpp/  -Wno-maybe-uninitialized -DHAVE_LIBSYSTEMD=1 -I. -I build/release/gen -I seastar -I seastar/build/release/gen -c -o build/release/db/config.o db/config.cc
db/config.cc:57:13: error: ‘void db::validate(boost::any&, const std::vector<std::__cxx11::basic_string<char> >&, db::string_map*, int)’ defined but not used [-Werror=unused-function]
 static void validate(boost::any& out, const std::vector<std::string>& in,
             ^
cc1plus: all warnings being treated as errors

This branch doesn't have commits which introduce the problem which
this patch fixes, so let's just revert it.
2016-06-08 11:05:47 +02:00
Gleb Natapov
653e250d04 config: adjust boost::program_options validator to work with db::string_map
Fixes #1320

Message-Id: <20160607064511.GX9939@scylladb.com>
(cherry picked from commit 9635e67a84)
2016-06-07 10:43:30 +03:00
Amnon Heiman
6255076c20 rate_moving_average: mean_rate is not initialized
The rate_moving_average is used by timed_rate_moving_average to return
its internal values.

If there are no timed events, the mean_rate is not properly initialized.
To solve that, the mean_rate is now initialized to 0 in the structure
definition.

Refs #1306

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1465231006-7081-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit 2cf882c365)
2016-06-07 09:44:26 +03:00
Pekka Enberg
420ebe28fd release: prepare for 1.2.rc2 2016-06-06 16:17:26 +03:00
Avi Kivity
a6179476c5 Be more conservative when deciding when to shut down due to disk errors
Currently we only shut down on EIO.  Expand this to shut down on any
system_error.

This may cause us to shut down prematurely due to a transient error,
but this is better than not shutting down due to a permanent error
(such as ENOSPC or EPERM).  We may whitelist certain errors in the future
to improve the behavior.

Fixes #1311.
Message-Id: <1465136956-1352-1-git-send-email-avi@scylladb.com>

(cherry picked from commit 961e80ab74)
2016-06-06 16:15:25 +03:00
Raphael S. Carvalho
342726a23c compaction: leveled: improve log message for overlapping table
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <2dcbe3c8131f1d88a3536daa0b6cdd25c6e41d76.1464883077.git.raphaelsc@scylladb.com>
(cherry picked from commit 17b56eb459)
2016-06-06 16:13:40 +03:00
Raphael S. Carvalho
e9946032f4 compaction: disable parallel compaction for leveled strategy
It was discussed that leveled strategy may not benefit from the parallel
compaction feature because almost all compaction jobs will have similar
sizes. It was also found that leveled strategy wasn't working correctly
with it because two overlapping sstables (targeting the same level)
could be created in parallel by two ongoing compactions.

Fixes #1293.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <60fe165d611c0283ca203c6d3aa2662ab091e363.1464883077.git.raphaelsc@scylladb.com>
(cherry picked from commit 588ce915d6)
2016-06-06 16:13:36 +03:00
Pekka Enberg
5e0b113732 Update scylla-ami submodule
* dist/ami/files/scylla-ami 72ae258...863cc45 (3):
  > Move --cpuset/--smp parameter settings from scylla_sysconfig_setup to scylla_ami_setup
  > convert scylla_install_ami to bash script
  > 'sh -x -e' is not valid since all scripts converted to bash script, so remove them
2016-06-06 13:38:53 +03:00
Asias He
c70faa4f23 streaming: Reduce memory usage when sending mutations
Limit disk bandwidth to 5MB/s to emulate a slow disk:
echo "8:0 5000000" >
/cgroup/blkio/limit/blkio.throttle.write_bps_device
echo "8:0 5000000" >
/cgroup/blkio/limit/blkio.throttle.read_bps_device

Start scylla node 1 with low memory:
scylla -c 1 -m 128M --auto-bootstrap false

Run c-s:
taskset -c 7 cassandra-stress write duration=5m cl=ONE -schema
'replication(factor=1)' -pop seq=1..100000  -rate threads=20
limit=2000/s -node 127.0.0.1

Start scylla node 2 with low memory:
scylla -c 1 -m 128M --auto-bootstrap true

Without this patch, I saw std::bad_alloc during streaming

ERROR 2016-06-01 14:31:00,196 [shard 0] storage_proxy - exception during
mutation write to 127.0.0.1: std::bad_alloc (std::bad_alloc)
...
ERROR 2016-06-01 14:31:10,172 [shard 0] database - failed to move
memtable to cache: std::bad_alloc (std::bad_alloc)
...

To fix:

1. Apply the streaming mutation limiter before we read the mutation into
memory, to avoid wasting memory holding a mutation which we cannot
send.

2. Reduce the parallelism of sending streaming mutations. Before, we sent
each range in parallel; after, we send the ranges one by one.

   before: nr_vnode * nr_shard * (send_info + cf.make_reader memory usage)

   after: nr_shard * (send_info + cf.make_reader memory usage)

We can at least save memory usage by the factor of nr_vnode, 256 by
default.

In my setup, fix 1) alone is not enough; with both fixes 1) and 2), I saw
no std::bad_alloc. Also, I did not see streaming bandwidth drop due
to 2).

In addition, I tested grow_cluster_test.py:GrowClusterTest.test_grow_3_to_4,
as described:

https://github.com/scylladb/scylla/issues/1270#issuecomment-222585375

With this patch, I saw no std::bad_alloc any more.

Fixes: #1270

Message-Id: <7703cf7a9db40e53a87f0f7b5acbb03fff2daf43.1464785542.git.asias@scylladb.com>
(cherry picked from commit 206955e47c)
2016-06-02 11:18:59 +03:00
Gleb Natapov
15ad4c9033 storage_proxy: drop debug output
Message-Id: <20160601132641.GK2381@scylladb.com>
(cherry picked from commit 26b50eb8f4)
2016-06-01 17:14:32 +03:00
Pekka Enberg
d094329b6e Revert "Revert "main: change order between storage service and drain execution during exit""
This reverts commit b3ed55be1d.

The issue is in the failing dtest, not this commit. Gleb writes:

  "The bug is in the test, not the patch. Test waits for repair session
   to end one way or the other when node is killed, but for nodetool to
   know if repair is completed it needs to poll for it.  If node dies
   before nodetool managed to see repair completion it will stuck
   forever since jmx is alive, but does not provide answers any more.
   The patch changes timing, repair is completed much close to exit now,
   so problem appears, but it may happen even without the patch.

   The fix is for dtest to kill jmx as part of killing a node
   operation."

Now that Lucas fixed the problem in scylla-ccm, revert the revert.

(cherry picked from commit 0255318bf3)
2016-06-01 08:51:51 +03:00
Pekka Enberg
dcab915f21 release: prepare for 1.2.rc1 2016-05-30 13:14:38 +03:00
49 changed files with 836 additions and 307 deletions

.gitmodules

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 	path = seastar
-	url = ../seastar
+	url = ../scylla-seastar
 	ignore = dirty
 [submodule "swagger-ui"]
 	path = swagger-ui


@@ -1,6 +1,6 @@
 #!/bin/sh
-VERSION=666.development
+VERSION=1.2.4
 if test -f version
 then


@@ -219,8 +219,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
     auto uuid = get_uuid(name, ctx.db.local());
     return ctx.db.map_reduce0([uuid, total](database& db) {
         std::unordered_map<sstring, uint64_t> m;
-        for (auto t :*((total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
-                db.find_column_family(uuid).get_sstables()).get()) {
+        auto sstables = (total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
+                db.find_column_family(uuid).get_sstables();
+        for (auto t : *sstables) {
             m[t.second->get_filename()] = t.second->bytes_on_disk();
         }
         return m;
@@ -234,8 +235,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
 static future<json::json_return_type> sum_sstable(http_context& ctx, bool total) {
     return map_reduce_cf_raw(ctx, std::unordered_map<sstring, uint64_t>(), [total](column_family& cf) {
         std::unordered_map<sstring, uint64_t> m;
-        for (auto t :*((total) ? cf.get_sstables_including_compacted_undeleted() :
-                cf.get_sstables()).get()) {
+        auto sstables = (total) ? cf.get_sstables_including_compacted_undeleted() :
+                cf.get_sstables();
+        for (auto t : *sstables) {
             m[t.second->get_filename()] = t.second->bytes_on_disk();
         }
         return m;

View File

@@ -97,7 +97,7 @@ namespace std {
template <>
struct hash<auth::data_resource> {
size_t operator()(const auth::data_resource & v) const {
return std::hash<sstring>()(v.name());
return v.hash_value();
}
};

View File

@@ -41,6 +41,7 @@
#pragma once
#include "utils/hash.hh"
#include <iosfwd>
#include <set>
#include <seastar/core/sstring.hh>
@@ -137,6 +138,10 @@ public:
bool operator==(const data_resource&) const;
bool operator<(const data_resource&) const;
size_t hash_value() const {
return utils::tuple_hash()(_ks, _cf);
}
};
/**

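The `hash_value()` addition above replaces hashing the formatted resource name with a combined hash of the keyspace and column-family components. A minimal, self-contained sketch of that kind of combiner (the `pair_hash` name and the mixing constant are illustrative; `utils::tuple_hash` itself is not shown in this diff):

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical sketch of a tuple-style hash combiner in the spirit of
// utils::tuple_hash: hash each component, then fold the results together
// with a boost::hash_combine-style mixing step.
inline size_t hash_combine(size_t seed, size_t value) {
    // Constant and shifts follow the well-known boost::hash_combine mix.
    return seed ^ (value + 0x9e3779b9 + (seed << 6) + (seed >> 2));
}

struct pair_hash {
    size_t operator()(const std::string& ks, const std::string& cf) const {
        size_t seed = std::hash<std::string>{}(ks);
        return hash_combine(seed, std::hash<std::string>{}(cf));
    }
};
```

Because the combine step is order-sensitive, (`ks`, `cf`) and (`cf`, `ks`) hash differently, unlike naive XOR of the component hashes.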
View File

@@ -28,7 +28,11 @@ class checked_file_impl : public file_impl {
public:
checked_file_impl(disk_error_signal_type& s, file f)
: _signal(s) , _file(f) {}
: _signal(s) , _file(f) {
_memory_dma_alignment = f.memory_dma_alignment();
_disk_read_dma_alignment = f.disk_read_dma_alignment();
_disk_write_dma_alignment = f.disk_write_dma_alignment();
}
virtual future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) override {
return do_io_check(_signal, [&] {

View File

@@ -51,6 +51,9 @@ public:
// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
// Return if parallel compaction is allowed by strategy.
bool parallel_compaction() const;
static sstring name(compaction_strategy_type type) {
switch (type) {
case compaction_strategy_type::null:

View File

@@ -35,7 +35,7 @@ class converting_mutation_partition_applier : public mutation_partition_visitor
deletable_row* _current_row;
private:
static bool is_compatible(const column_definition& new_def, const data_type& old_type, column_kind kind) {
return new_def.kind == kind && new_def.type->is_value_compatible_with(*old_type);
return ::is_compatible(new_def.kind, kind) && new_def.type->is_value_compatible_with(*old_type);
}
void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
if (is_compatible(new_def, old_type, kind) && cell.timestamp() > new_def.dropped_at()) {

View File

@@ -251,15 +251,24 @@ column_family::make_sstable_reader(schema_ptr s,
const query::partition_range& pr,
query::clustering_key_filtering_context ck_filtering,
const io_priority_class& pc) const {
// restricts a reader's concurrency if the configuration specifies it
auto restrict_reader = [&] (mutation_reader&& in) {
if (_config.read_concurrency_config.sem) {
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
} else {
return std::move(in);
}
};
if (pr.is_singular() && pr.start()->value().has_key()) {
const dht::ring_position& pos = pr.start()->value();
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
return make_empty_reader(); // range doesn't belong to this shard
}
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc);
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc));
} else {
// range_sstable_reader is not movable so we need to wrap it
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc);
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc));
}
}
@@ -328,7 +337,7 @@ column_family::make_reader(schema_ptr s,
}
std::vector<mutation_reader> readers;
readers.reserve(_memtables->size() + _sstables->size());
readers.reserve(_memtables->size() + 1);
// We're assuming that cache and memtables are both read atomically
// for single-key queries, so we don't need to special case memtable
@@ -484,12 +493,75 @@ static bool belongs_to_current_shard(const schema& s, const partition_key& first
return (s1 <= me) && (me <= s2);
}
static bool belongs_to_other_shard(const schema& s, const partition_key& first, const partition_key& last) {
auto key_shard = [&s] (const partition_key& pk) {
auto token = dht::global_partitioner().get_token(s, pk);
return dht::shard_of(token);
};
auto s1 = key_shard(first);
auto s2 = key_shard(last);
auto me = engine().cpu_id();
return (s1 != me) || (me != s2);
}
static bool belongs_to_current_shard(const schema& s, range<partition_key> r) {
assert(r.start());
assert(r.end());
return belongs_to_current_shard(s, r.start()->value(), r.end()->value());
}
static bool belongs_to_other_shard(const schema& s, range<partition_key> r) {
assert(r.start());
assert(r.end());
return belongs_to_other_shard(s, r.start()->value(), r.end()->value());
}
future<> column_family::load_sstable(sstables::sstable&& sstab, bool reset_level) {
auto sst = make_lw_shared<sstables::sstable>(std::move(sstab));
return sst->get_sstable_key_range(*_schema).then([this, sst, reset_level] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, r)) {
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
sst->mark_for_deletion();
return make_ready_future<>();
}
bool in_other_shard = belongs_to_other_shard(*_schema, std::move(r));
return sst->load().then([this, sst, in_other_shard, reset_level] () mutable {
if (in_other_shard) {
// If we're here, this sstable is shared by this and other
// shard(s). Shared sstables cannot be deleted until all
// shards compacted them, so to reduce disk space usage we
// want to start splitting them now.
// However, we need to delay this compaction until we read all
// the sstables belonging to this CF, because we need all of
// them to know which tombstones we can drop, and what
// generation number is free.
_sstables_need_rewrite.push_back(sst);
}
if (reset_level) {
// When loading a migrated sstable, set level to 0 because
// it may overlap with existing tables in levels > 0.
// This step is optional, because even if we didn't do this
// scylla would detect the overlap, and bring back some of
// the sstables to level 0.
sst->set_sstable_level(0);
}
add_sstable(sst);
});
});
}
// load_sstable() wants to start rewriting sstables which are shared between
// several shards, but we can't start any compaction before all the sstables
// of this CF were loaded. So call this function to start rewrites, if any.
void column_family::start_rewrite() {
for (auto sst : _sstables_need_rewrite) {
dblog.info("Splitting {} for shard", sst->get_filename());
_compaction_manager.submit_sstable_rewrite(this, sst);
}
_sstables_need_rewrite.clear();
}
future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sstring fname) {
using namespace sstables;
@@ -514,24 +586,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
}
}
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
auto fut = sst->get_sstable_key_range(*_schema);
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, std::move(r))) {
dblog.debug("sstable {} not relevant for this shard, ignoring",
sstables::sstable::filename(sstdir, _schema->ks_name(), _schema->cf_name(), comps.version, comps.generation, comps.format,
sstables::sstable::component_type::Data));
sstable::mark_sstable_for_deletion(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
return make_ready_future<>();
}
auto fut = sst->load();
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
add_sstable(std::move(*sst));
return make_ready_future<>();
});
}).then_wrapped([fname, comps] (future<> f) {
return load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation,
comps.version, comps.format)).then_wrapped([fname, comps] (future<> f) {
try {
f.get();
} catch (malformed_sstable_exception& e) {
@@ -1033,29 +1090,14 @@ future<> column_family::cleanup_sstables(sstables::compaction_descriptor descrip
future<>
column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tables) {
return parallel_for_each(new_tables, [this] (auto comps) {
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
return sst->load().then([this, sst] {
// This sets in-memory level of sstable to 0.
// When loading a migrated sstable, it's important to set it to level 0 because
// leveled compaction relies on a level > 0 having no overlapping sstables.
// If Scylla reboots before migrated sstable gets compacted, leveled strategy
// is smart enough to detect a sstable that overlaps and set its in-memory
// level to 0.
return sst->set_sstable_level(0);
}).then([this, sst] {
auto first = sst->get_first_partition_key(*_schema);
auto last = sst->get_last_partition_key(*_schema);
if (belongs_to_current_shard(*_schema, first, last)) {
this->add_sstable(sst);
} else {
sst->mark_for_deletion();
}
return make_ready_future<>();
});
return this->load_sstable(sstables::sstable(
_schema->ks_name(), _schema->cf_name(), _config.datadir,
comps.generation, comps.version, comps.format), true);
}).then([this] {
start_rewrite();
// Drop entire cache for this column family because it may be populated
// with stale data.
get_row_cache().clear();
return get_row_cache().clear();
});
}
@@ -1338,6 +1380,38 @@ database::setup_collectd() {
, "total_operations", "total_reads")
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats->total_reads)
));
_collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("database"
, scollectd::per_cpu_plugin_instance
, "total_operations", "sstable_read_queue_overloads")
, scollectd::make_typed(scollectd::data_type::COUNTER, _stats->sstable_read_queue_overloaded)
));
_collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("database"
, scollectd::per_cpu_plugin_instance
, "queue_length", "active_reads")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return max_concurrent_reads() - _read_concurrency_sem.current(); })
));
_collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("database"
, scollectd::per_cpu_plugin_instance
, "queue_length", "queued_reads")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _read_concurrency_sem.waiters(); })
));
_collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("database"
, scollectd::per_cpu_plugin_instance
, "queue_length", "active_reads_system_keyspace")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return max_system_concurrent_reads() - _system_read_concurrency_sem.current(); })
));
_collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("database"
, scollectd::per_cpu_plugin_instance
, "queue_length", "queued_reads_system_keyspace")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _system_read_concurrency_sem.waiters(); })
));
}
database::~database() {
@@ -1360,10 +1434,10 @@ future<> database::populate_keyspace(sstring datadir, sstring ks_name) {
} else {
dblog.info("Populating Keyspace {}", ks_name);
auto& ks = i->second;
return parallel_for_each(std::cbegin(_column_families), std::cend(_column_families),
[ks_name, &ks] (const std::pair<utils::UUID, lw_shared_ptr<column_family>>& e) {
utils::UUID uuid = e.first;
lw_shared_ptr<column_family> cf = e.second;
return parallel_for_each(ks.metadata()->cf_meta_data() | boost::adaptors::map_values,
[ks_name, &ks, this] (schema_ptr s) {
utils::UUID uuid = s->id();
lw_shared_ptr<column_family> cf = _column_families[uuid];
sstring cfname = cf->schema()->cf_name();
auto sstdir = ks.column_family_directory(cfname, uuid);
dblog.info("Keyspace {}: Reading CF {} ", ks_name, cfname);
@@ -1707,6 +1781,7 @@ keyspace::make_column_family_config(const schema& s) const {
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
cfg.read_concurrency_config = _config.read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
@@ -1889,6 +1964,7 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
auto add_partition = [&qs] (uint32_t live_rows, mutation&& m) {
auto pb = qs.builder.add_partition(*qs.schema, m.key());
m.partition().query_compacted(pb, *qs.schema, live_rows);
qs.limit -= live_rows;
};
return do_with(querying_reader(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.cmd.timestamp, add_partition),
[] (auto&& rd) { return rd.read(); });
@@ -1896,10 +1972,10 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
return make_ready_future<lw_shared_ptr<query::result>>(
make_lw_shared<query::result>(qs.builder.build()));
}).finally([lc, this]() mutable {
_stats.reads.mark(lc);
if (lc.is_start()) {
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
}
_stats.reads.mark(lc);
if (lc.is_start()) {
_stats.estimated_read.add(lc.latency(), _stats.reads.hist.count);
}
});
}
}
@@ -2166,6 +2242,14 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
}
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
cfg.read_concurrency_config.sem = &_read_concurrency_sem;
cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms;
// Assume a queued read takes up 10kB of memory, and allow 2% of memory to be filled up with such reads.
cfg.read_concurrency_config.max_queue_length = memory::stats().total_memory() * 0.02 / 10000;
cfg.read_concurrency_config.raise_queue_overloaded_exception = [this] {
++_stats->sstable_read_queue_overloaded;
throw std::runtime_error("sstable inactive read queue overloaded");
};
cfg.cf_stats = &_cf_stats;
cfg.enable_incremental_backups = _enable_incremental_backups;
return cfg;
@@ -2257,7 +2341,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
// gotten all things to disk. Again, need queue-ish or something.
f = cf.flush();
} else {
cf.clear();
f = cf.clear();
}
return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable {
@@ -2633,21 +2717,29 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
// temporary counter measure.
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
return _streaming_memtables->seal_active_memtable().finally([this, ranges = std::move(ranges)] {
if (_config.enable_cache) {
for (auto& range : ranges) {
_cache.invalidate(range);
}
if (!_config.enable_cache) {
return make_ready_future<>();
}
return do_with(std::move(ranges), [this] (auto& ranges) {
return parallel_for_each(ranges, [this](auto&& range) {
return _cache.invalidate(range);
});
});
return do_with(std::move(ranges), [this] (auto& ranges) {
return parallel_for_each(ranges, [this](auto&& range) {
return _cache.invalidate(range);
});
});
});
});
}
void column_family::clear() {
_cache.clear();
future<> column_family::clear() {
_memtables->clear();
_memtables->add_memtable();
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
return _cache.clear();
}
// NOTE: does not need to be futurized, but might eventually, depending on
@@ -2673,13 +2765,13 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
_sstables = std::move(pruned);
dblog.debug("cleaning out row cache");
_cache.clear();
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
});
});
}

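The read-concurrency changes above wire a semaphore, a bounded waiter queue, and an overload exception through the sstable read path. A synchronous sketch of that admission pattern (all names, limits, and the blocking-free structure here are illustrative, not Scylla's actual types):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <stdexcept>

// Hypothetical sketch of the bounded read-admission pattern: at most
// `max_active` reads run at once, up to `max_queue` waiters may queue,
// and anything beyond that is rejected with an overload exception.
class read_gate {
    size_t _active = 0;
    size_t _max_active;
    size_t _max_queue;
    std::queue<std::function<void()>> _waiters;
public:
    read_gate(size_t max_active, size_t max_queue)
        : _max_active(max_active), _max_queue(max_queue) {}

    // Start a read immediately, queue it, or reject it.
    void submit(std::function<void()> fn) {
        if (_active < _max_active) {
            ++_active;
            fn();
        } else if (_waiters.size() < _max_queue) {
            _waiters.push(std::move(fn));
        } else {
            throw std::runtime_error("sstable read queue overloaded");
        }
    }

    // Called when a read finishes; admits one waiter if any are queued.
    void complete() {
        if (!_waiters.empty()) {
            auto fn = std::move(_waiters.front());
            _waiters.pop();
            fn();  // the freed slot passes directly to the waiter
        } else {
            --_active;
        }
    }

    size_t active() const { return _active; }
    size_t queued() const { return _waiters.size(); }
};
```

The metrics added in `setup_collectd()` correspond to the three observable states of this gate: active slots in use, waiters queued, and rejections counted when the queue is full.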
View File

@@ -249,6 +249,7 @@ public:
size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
restricted_mutation_reader_config read_concurrency_config;
::cf_stats* cf_stats = nullptr;
};
struct no_commitlog {};
@@ -310,6 +311,11 @@ private:
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
// sstables that are shared between several shards so we want to rewrite
// them (split the data belonging to this shard to a separate sstable),
// but for correct compaction we need to start the compaction only after
// reading all sstables.
std::vector<sstables::shared_sstable> _sstables_need_rewrite;
// Control background fibers waiting for sstables to be deleted
seastar::gate _sstable_deletion_gate;
// There are situations in which we need to stop writing sstables. Flushers will take
@@ -338,6 +344,7 @@ private:
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
void add_sstable(sstables::sstable&& sstable);
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
future<> load_sstable(sstables::sstable&& sstab, bool reset_level = false);
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
@@ -463,7 +470,7 @@ public:
future<> flush();
future<> flush(const db::replay_position&);
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
void clear(); // discards memtable(s) without flushing them to disk.
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
// Important warning: disabling writes will only have an effect in the current shard.
@@ -634,6 +641,7 @@ private:
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
void check_valid_rp(const db::replay_position&) const;
public:
void start_rewrite();
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
@@ -743,6 +751,7 @@ public:
size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
restricted_mutation_reader_config read_concurrency_config;
::cf_stats* cf_stats = nullptr;
};
private:
@@ -822,9 +831,12 @@ public:
class database {
::cf_stats _cf_stats;
static constexpr size_t max_concurrent_reads() { return 100; }
static constexpr size_t max_system_concurrent_reads() { return 10; }
struct db_stats {
uint64_t total_writes = 0;
uint64_t total_reads = 0;
uint64_t sstable_read_queue_overloaded = 0;
};
lw_shared_ptr<db_stats> _stats;
@@ -834,6 +846,10 @@ class database {
size_t _streaming_memtable_total_space = 500 << 20;
logalloc::region_group _dirty_memory_region_group;
logalloc::region_group _streaming_dirty_memory_region_group;
semaphore _read_concurrency_sem{max_concurrent_reads()};
restricted_mutation_reader_config _read_concurrency_config;
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
restricted_mutation_reader_config _system_read_concurrency_config;
std::unordered_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
@@ -979,6 +995,9 @@ public:
std::unordered_set<sstring> get_initial_tokens();
std::experimental::optional<gms::inet_address> get_replace_address();
bool is_replacing();
semaphore& system_keyspace_read_concurrency_sem() {
return _system_read_concurrency_sem;
}
};
// FIXME: stub

View File

@@ -711,15 +711,21 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after)
{
struct dropped_table {
global_schema_ptr schema;
utils::joinpoint<db_clock::time_point> jp{[] {
return make_ready_future<db_clock::time_point>(db_clock::now());
}};
};
std::vector<global_schema_ptr> created;
std::vector<global_schema_ptr> altered;
std::vector<global_schema_ptr> dropped;
std::vector<dropped_table> dropped;
auto diff = difference(before, after);
for (auto&& key : diff.entries_only_on_left) {
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
dropped.emplace_back(s);
dropped.emplace_back(dropped_table{s});
}
for (auto&& key : diff.entries_only_on_right) {
auto s = create_table_from_mutations(after.at(key));
@@ -732,9 +738,7 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
altered.emplace_back(s);
}
do_with(utils::make_joinpoint([] { return db_clock::now();})
, [&created, &dropped, &altered, &proxy](auto& tsf) {
return proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, &tsf] (database& db) {
proxy.local().get_db().invoke_on_all([&created, &dropped, &altered] (database& db) {
return seastar::async([&] {
for (auto&& gs : created) {
schema_ptr s = gs.get();
@@ -749,14 +753,13 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
for (auto&& gs : altered) {
update_column_family(db, gs.get()).get();
}
parallel_for_each(dropped.begin(), dropped.end(), [&db, &tsf](auto&& gs) {
schema_ptr s = gs.get();
return db.drop_column_family(s->ks_name(), s->cf_name(), [&tsf] { return tsf.value(); }).then([s] {
parallel_for_each(dropped.begin(), dropped.end(), [&db](dropped_table& dt) {
schema_ptr s = dt.schema.get();
return db.drop_column_family(s->ks_name(), s->cf_name(), [&dt] { return dt.jp.value(); }).then([s] {
return service::get_local_migration_manager().notify_drop_column_family(s);
});
}).get();
});
});
}).get();
}

View File

@@ -1022,6 +1022,10 @@ void make(database& db, bool durable, bool volatile_testing_only) {
kscfg.enable_disk_writes = !volatile_testing_only;
kscfg.enable_commitlog = !volatile_testing_only;
kscfg.enable_cache = true;
// don't make system keyspace reads wait for user reads
kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem();
kscfg.read_concurrency_config.timeout = {};
kscfg.read_concurrency_config.max_queue_length = std::numeric_limits<size_t>::max();
keyspace _ks{ksm, std::move(kscfg)};
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
_ks.set_replication_strategy(std::move(rs));

View File

@@ -36,6 +36,8 @@ extern thread_local disk_error_signal_type sstable_read_error;
extern thread_local disk_error_signal_type sstable_write_error;
extern thread_local disk_error_signal_type general_disk_error;
bool should_stop_on_system_error(const std::system_error& e);
template<typename Func, typename... Args>
std::enable_if_t<!is_future<std::result_of_t<Func(Args&&...)>>::value,
std::result_of_t<Func(Args&&...)>>
@@ -44,7 +46,7 @@ do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
// calling function
return func(std::forward<Args>(args)...);
} catch (std::system_error& e) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(e)) {
signal();
throw storage_io_error(e);
}
@@ -62,7 +64,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
try {
std::rethrow_exception(ep);
} catch (std::system_error& sys_err) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(sys_err)) {
signal();
throw storage_io_error(sys_err);
}
@@ -70,7 +72,7 @@ auto do_io_check(disk_error_signal_type& signal, Func&& func, Args&&... args) {
return futurize<std::result_of_t<Func(Args&&...)>>::make_exception_future(ep);
});
} catch (std::system_error& e) {
if (is_system_error_errno(EIO)) {
if (should_stop_on_system_error(e)) {
signal();
throw storage_io_error(e);
}

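The `do_io_check` change above centralizes the decision of which system errors are fatal into a single `should_stop_on_system_error()` predicate, instead of testing `EIO` at every catch site. A simplified sketch of the same shape (the error list, names, and synchronous structure are assumptions for illustration, not the real implementation):

```cpp
#include <cassert>
#include <cerrno>
#include <stdexcept>
#include <system_error>

// Hypothetical policy predicate: decide once, in one place, which
// system errors should stop the node. The error set is assumed.
inline bool should_stop_on_system_error(const std::system_error& e) {
    int err = e.code().value();
    return err == EIO || err == ENOSPC;  // illustrative, not Scylla's exact policy
}

// Wrapper type signalling a fatal storage error to the upper layers.
struct storage_io_error : std::runtime_error {
    explicit storage_io_error(const std::system_error& e)
        : std::runtime_error(e.what()) {}
};

// Run `func`, translating fatal disk errors into storage_io_error and
// letting benign system errors propagate unchanged.
template <typename Func>
auto do_io_check(Func&& func) {
    try {
        return func();
    } catch (const std::system_error& e) {
        if (should_stop_on_system_error(e)) {
            throw storage_io_error(e);
        }
        throw;
    }
}
```

With the predicate factored out this way, widening the fatal-error set (as this commit does) touches one function rather than every `catch` block in the file.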
View File

@@ -44,8 +44,8 @@ output_to_user()
}
if [ `is_developer_mode` -eq 0 ]; then
SMP=`echo $SCYLLA_ARGS|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
CPUSET=`echo $SCYLLA_ARGS|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
if [ $AMI_OPT -eq 1 ]; then
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`

View File

@@ -2,6 +2,7 @@
Description=Scylla Server
[Service]
PermissionsStartOnly=true
Type=notify
LimitMEMLOCK=infinity
LimitNOFILE=200000
@@ -10,9 +11,9 @@ LimitNPROC=8096
EnvironmentFile=@@SYSCONFDIR@@/scylla-server
EnvironmentFile=/etc/scylla.d/*.conf
WorkingDirectory=$SCYLLA_HOME
ExecStartPre=/usr/bin/sudo /usr/lib/scylla/scylla_prepare
ExecStartPre=/usr/lib/scylla/scylla_prepare
ExecStart=/usr/bin/scylla $SCYLLA_ARGS $SEASTAR_IO $DEV_MODE $CPUSET
ExecStopPost=/usr/bin/sudo /usr/lib/scylla/scylla_stop
ExecStopPost=/usr/lib/scylla/scylla_stop
TimeoutStartSec=900
KillMode=process
Restart=on-abnormal

View File

@@ -2,7 +2,7 @@ FROM centos:7
MAINTAINER Avi Kivity <avi@cloudius-systems.com>
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.2.repo -o /etc/yum.repos.d/scylla.repo
RUN yum -y install epel-release
RUN yum -y clean expire-cache
RUN yum -y update

View File

@@ -1,5 +1,5 @@
--- gcc.spec.orig 2015-12-08 16:03:46.000000000 +0000
+++ gcc.spec 2016-01-21 08:47:49.160667342 +0000
+++ gcc.spec 2016-07-10 06:07:27.612453480 +0000
@@ -1,6 +1,7 @@
%global DATE 20151207
%global SVNREV 231358
@@ -8,7 +8,24 @@
# Note, gcc_release must be integer, if you want to add suffixes to
# %{release}, append them after %{gcc_release} on Release: line.
%global gcc_release 2
@@ -84,7 +85,8 @@
@@ -9,16 +10,8 @@
# Hardening slows the compiler way too much.
%undefine _hardened_build
%global multilib_64_archs sparc64 ppc64 ppc64p7 s390x x86_64
-%ifarch %{ix86} x86_64 ia64 ppc ppc64 ppc64p7 alpha %{arm} aarch64
-%global build_ada 1
-%else
%global build_ada 0
-%endif
-%ifarch %{ix86} x86_64 ppc ppc64 ppc64le ppc64p7 s390 s390x %{arm} aarch64
-%global build_go 1
-%else
%global build_go 0
-%endif
%ifarch %{ix86} x86_64 ia64
%global build_libquadmath 1
%else
@@ -84,7 +77,8 @@
%global multilib_32_arch i686
%endif
Summary: Various compilers (C, C++, Objective-C, Java, ...)
@@ -18,7 +35,7 @@
Version: %{gcc_version}
Release: %{gcc_release}%{?dist}
# libgcc, libgfortran, libgomp, libstdc++ and crtstuff have
@@ -99,6 +101,7 @@
@@ -99,6 +93,7 @@
%global isl_version 0.14
URL: http://gcc.gnu.org
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
@@ -26,7 +43,7 @@
# Need binutils with -pie support >= 2.14.90.0.4-4
# Need binutils which can omit dot symbols and overlap .opd on ppc64 >= 2.15.91.0.2-4
# Need binutils which handle -msecure-plt on ppc >= 2.16.91.0.2-2
@@ -110,7 +113,7 @@
@@ -110,7 +105,7 @@
# Need binutils which support .cfi_sections >= 2.19.51.0.14-33
# Need binutils which support --no-add-needed >= 2.20.51.0.2-12
# Need binutils which support -plugin
@@ -35,7 +52,7 @@
# While gcc doesn't include statically linked binaries, during testing
# -static is used several times.
BuildRequires: glibc-static
@@ -145,15 +148,15 @@
@@ -145,15 +140,15 @@
BuildRequires: libunwind >= 0.98
%endif
%if %{build_isl}
@@ -55,7 +72,7 @@
# Need .eh_frame ld optimizations
# Need proper visibility support
# Need -pie support
@@ -168,7 +171,7 @@
@@ -168,7 +163,7 @@
# Need binutils that support .cfi_sections
# Need binutils that support --no-add-needed
# Need binutils that support -plugin
@@ -64,7 +81,7 @@
# Make sure gdb will understand DW_FORM_strp
Conflicts: gdb < 5.1-2
Requires: glibc-devel >= 2.2.90-12
@@ -176,17 +179,15 @@
@@ -176,17 +171,15 @@
# Make sure glibc supports TFmode long double
Requires: glibc >= 2.3.90-35
%endif
@@ -86,7 +103,7 @@
Requires(post): /sbin/install-info
Requires(preun): /sbin/install-info
AutoReq: true
@@ -228,12 +229,12 @@
@@ -228,12 +221,12 @@
The gcc package contains the GNU Compiler Collection version 5.
You'll need this package in order to compile C code.
@@ -101,7 +118,7 @@
%endif
Obsoletes: libmudflap
Obsoletes: libmudflap-devel
@@ -241,17 +242,19 @@
@@ -241,17 +234,19 @@
Obsoletes: libgcj < %{version}-%{release}
Obsoletes: libgcj-devel < %{version}-%{release}
Obsoletes: libgcj-src < %{version}-%{release}
@@ -125,7 +142,7 @@
Autoreq: true
%description c++
@@ -259,50 +262,55 @@
@@ -259,50 +254,55 @@
It includes support for most of the current C++ specification,
including templates and exception handling.
@@ -193,7 +210,7 @@
Autoreq: true
%description objc
@@ -313,29 +321,32 @@
@@ -313,29 +313,32 @@
%package objc++
Summary: Objective-C++ support for GCC
Group: Development/Languages
@@ -233,7 +250,7 @@
%endif
Requires(post): /sbin/install-info
Requires(preun): /sbin/install-info
@@ -345,260 +356,286 @@
@@ -345,260 +348,286 @@
The gcc-gfortran package provides support for compiling Fortran
programs with the GNU Compiler Collection.
@@ -592,7 +609,7 @@
Cpp is the GNU C-Compatible Compiler Preprocessor.
Cpp is a macro processor which is used automatically
by the C compiler to transform your program before actual
@@ -623,8 +660,9 @@
@@ -623,8 +652,9 @@
%package gnat
Summary: Ada 83, 95, 2005 and 2012 support for GCC
Group: Development/Languages
@@ -604,7 +621,7 @@
Requires(post): /sbin/install-info
Requires(preun): /sbin/install-info
Autoreq: true
@@ -633,82 +671,90 @@
@@ -633,82 +663,90 @@
GNAT is a GNU Ada 83, 95, 2005 and 2012 front-end to GCC. This package includes
development tools, the documents and Ada compiler.
@@ -717,7 +734,7 @@
Requires: gmp-devel >= 4.1.2-8, mpfr-devel >= 2.2.1, libmpc-devel >= 0.8.1
%description plugin-devel
@@ -728,7 +774,8 @@
@@ -728,7 +766,8 @@
Summary: Debug information for package %{name}
Group: Development/Debug
AutoReqProv: 0
@@ -727,7 +744,7 @@
%description debuginfo
This package provides debug information for package %{name}.
@@ -958,11 +1005,11 @@
@@ -958,11 +997,11 @@
--enable-gnu-unique-object --enable-linker-build-id --with-linker-hash-style=gnu \
--enable-plugin --enable-initfini-array \
--disable-libgcj \
@@ -741,7 +758,7 @@
%else
--without-isl \
%endif
@@ -971,11 +1018,9 @@
@@ -971,11 +1010,9 @@
%else
--disable-libmpx \
%endif
@@ -753,7 +770,7 @@
%ifarch %{arm}
--disable-sjlj-exceptions \
%endif
@@ -1006,9 +1051,6 @@
@@ -1006,9 +1043,6 @@
%if 0%{?rhel} >= 7
--with-cpu-32=power8 --with-tune-32=power8 --with-cpu-64=power8 --with-tune-64=power8 \
%endif
@@ -763,7 +780,7 @@
%endif
%ifarch ppc
--build=%{gcc_target_platform} --target=%{gcc_target_platform} --with-cpu=default32
@@ -1270,16 +1312,15 @@
@@ -1270,16 +1304,15 @@
mv %{buildroot}%{_prefix}/%{_lib}/libmpx.spec $FULLPATH/
%endif
@@ -786,7 +803,7 @@
%endif
%ifarch ppc
rm -f $FULLPATH/libgcc_s.so
@@ -1819,7 +1860,7 @@
@@ -1819,7 +1852,7 @@
chmod 755 %{buildroot}%{_prefix}/bin/c?9
cd ..
@@ -795,7 +812,7 @@
%find_lang cpplib
# Remove binaries we will not be including, so that they don't end up in
@@ -1869,11 +1910,7 @@
@@ -1869,11 +1902,7 @@
# run the tests.
make %{?_smp_mflags} -k check ALT_CC_UNDER_TEST=gcc ALT_CXX_UNDER_TEST=g++ \
@@ -807,7 +824,7 @@
echo ====================TESTING=========================
( LC_ALL=C ../contrib/test_summary || : ) 2>&1 | sed -n '/^cat.*EOF/,/^EOF/{/^cat.*EOF/d;/^EOF/d;/^LAST_UPDATED:/d;p;}'
echo ====================TESTING END=====================
@@ -1900,13 +1937,13 @@
@@ -1900,13 +1929,13 @@
--info-dir=%{_infodir} %{_infodir}/gcc.info.gz || :
fi
@@ -823,7 +840,7 @@
if [ $1 = 0 -a -f %{_infodir}/cpp.info.gz ]; then
/sbin/install-info --delete \
--info-dir=%{_infodir} %{_infodir}/cpp.info.gz || :
@@ -1945,19 +1982,19 @@
@@ -1945,19 +1974,19 @@
fi
%post go
@@ -846,7 +863,7 @@
if posix.access ("/sbin/ldconfig", "x") then
local pid = posix.fork ()
if pid == 0 then
@@ -1967,7 +2004,7 @@
@@ -1967,7 +1996,7 @@
end
end
@@ -855,7 +872,7 @@
if posix.access ("/sbin/ldconfig", "x") then
local pid = posix.fork ()
if pid == 0 then
@@ -1977,120 +2014,120 @@
@@ -1977,120 +2006,120 @@
end
end
@@ -1014,7 +1031,7 @@
%defattr(-,root,root,-)
%{_prefix}/bin/cc
%{_prefix}/bin/c89
@@ -2414,7 +2451,7 @@
@@ -2414,7 +2443,7 @@
%{!?_licensedir:%global license %%doc}
%license gcc/COPYING* COPYING.RUNTIME
@@ -1023,7 +1040,7 @@
%defattr(-,root,root,-)
%{_prefix}/lib/cpp
%{_prefix}/bin/cpp
@@ -2425,10 +2462,10 @@
@@ -2425,10 +2454,10 @@
%dir %{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}
%{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}/cc1
@@ -1037,7 +1054,7 @@
%{!?_licensedir:%global license %%doc}
%license gcc/COPYING* COPYING.RUNTIME
@@ -2469,7 +2506,7 @@
@@ -2469,7 +2498,7 @@
%endif
%doc rpm.doc/changelogs/gcc/cp/ChangeLog*
@@ -1046,7 +1063,7 @@
%defattr(-,root,root,-)
%{_prefix}/%{_lib}/libstdc++.so.6*
%dir %{_datadir}/gdb
@@ -2481,7 +2518,7 @@
@@ -2481,7 +2510,7 @@
%dir %{_prefix}/share/gcc-%{gcc_version}/python
%{_prefix}/share/gcc-%{gcc_version}/python/libstdcxx
@@ -1055,7 +1072,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/include/c++
%dir %{_prefix}/include/c++/%{gcc_version}
@@ -2507,7 +2544,7 @@
@@ -2507,7 +2536,7 @@
%endif
%doc rpm.doc/changelogs/libstdc++-v3/ChangeLog* libstdc++-v3/README*
@@ -1064,7 +1081,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2528,7 +2565,7 @@
@@ -2528,7 +2557,7 @@
%endif
%if %{build_libstdcxx_docs}
@@ -1073,7 +1090,7 @@
%defattr(-,root,root)
%{_mandir}/man3/*
%doc rpm.doc/libstdc++-v3/html
@@ -2567,7 +2604,7 @@
@@ -2567,7 +2596,7 @@
%dir %{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}
%{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}/cc1objplus
@@ -1082,7 +1099,7 @@
%defattr(-,root,root,-)
%{_prefix}/%{_lib}/libobjc.so.4*
@@ -2621,11 +2658,11 @@
@@ -2621,11 +2650,11 @@
%endif
%doc rpm.doc/gfortran/*
@@ -1096,7 +1113,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2671,12 +2708,12 @@
@@ -2671,12 +2700,12 @@
%{_prefix}/libexec/gcc/%{gcc_target_platform}/%{gcc_version}/gnat1
%doc rpm.doc/changelogs/gcc/ada/ChangeLog*
@@ -1111,7 +1128,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2702,7 +2739,7 @@
@@ -2702,7 +2731,7 @@
%exclude %{_prefix}/lib/gcc/%{gcc_target_platform}/%{gcc_version}/adalib/libgnarl.a
%endif
@@ -1120,7 +1137,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2726,7 +2763,7 @@
@@ -2726,7 +2755,7 @@
%endif
%endif
@@ -1129,7 +1146,7 @@
%defattr(-,root,root,-)
%{_prefix}/%{_lib}/libgomp.so.1*
%{_prefix}/%{_lib}/libgomp-plugin-host_nonshm.so.1*
@@ -2734,14 +2771,14 @@
@@ -2734,14 +2763,14 @@
%doc rpm.doc/changelogs/libgomp/ChangeLog*
%if %{build_libquadmath}
@@ -1146,7 +1163,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2754,7 +2791,7 @@
@@ -2754,7 +2783,7 @@
%endif
%doc rpm.doc/libquadmath/ChangeLog*
@@ -1155,7 +1172,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2773,12 +2810,12 @@
@@ -2773,12 +2802,12 @@
%endif
%if %{build_libitm}
@@ -1170,7 +1187,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2791,7 +2828,7 @@
@@ -2791,7 +2820,7 @@
%endif
%doc rpm.doc/libitm/ChangeLog*
@@ -1179,7 +1196,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2810,11 +2847,11 @@
@@ -2810,11 +2839,11 @@
%endif
%if %{build_libatomic}
@@ -1193,7 +1210,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2834,11 +2871,11 @@
@@ -2834,11 +2863,11 @@
%endif
%if %{build_libasan}
@@ -1207,7 +1224,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2860,11 +2897,11 @@
@@ -2860,11 +2889,11 @@
%endif
%if %{build_libubsan}
@@ -1221,7 +1238,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2886,11 +2923,11 @@
@@ -2886,11 +2915,11 @@
%endif
%if %{build_libtsan}
@@ -1235,7 +1252,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2902,11 +2939,11 @@
@@ -2902,11 +2931,11 @@
%endif
%if %{build_liblsan}
@@ -1249,7 +1266,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2918,11 +2955,11 @@
@@ -2918,11 +2947,11 @@
%endif
%if %{build_libcilkrts}
@@ -1263,7 +1280,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -2942,12 +2979,12 @@
@@ -2942,12 +2971,12 @@
%endif
%if %{build_libmpx}
@@ -1278,7 +1295,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -3009,12 +3046,12 @@
@@ -3009,12 +3038,12 @@
%endif
%doc rpm.doc/go/*
@@ -1293,7 +1310,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -3042,7 +3079,7 @@
@@ -3042,7 +3071,7 @@
%{_prefix}/lib/gcc/%{gcc_target_platform}/%{gcc_version}/libgo.so
%endif
@@ -1302,7 +1319,7 @@
%defattr(-,root,root,-)
%dir %{_prefix}/lib/gcc
%dir %{_prefix}/lib/gcc/%{gcc_target_platform}
@@ -3060,12 +3097,12 @@
@@ -3060,12 +3089,12 @@
%endif
%endif


@@ -104,11 +104,6 @@ cp -P dist/common/sbin/* $RPM_BUILD_ROOT%{_sbindir}/
%pre server
/usr/sbin/groupadd scylla 2> /dev/null || :
/usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
%if 0%{?rhel}
sed -e "s/Defaults requiretty/#Defaults requiretty/" /etc/sudoers > /tmp/sudoers
cp /tmp/sudoers /etc/sudoers
rm /tmp/sudoers
%endif
%post server
# Upgrade coredump settings
@@ -214,7 +209,9 @@ This package contains Linux kernel configuration changes for the Scylla database
if Scylla is the main application on your server and you wish to optimize its latency and throughput.
%post kernel-conf
%sysctl_apply 99-scylla-sched.conf
# We cannot use the sysctl_apply rpm macro because it is not present in 7.0
# the following is a "manual" expansion
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
%files kernel-conf
%defattr(-,root,root)

22
main.cc

@@ -277,6 +277,7 @@ verify_seastar_io_scheduler(bool has_max_io_requests, bool developer_mode) {
}
int main(int ac, char** av) {
try {
// early check to avoid triggering
if (!cpu_sanity()) {
_exit(71);
@@ -516,6 +517,18 @@ int main(int ac, char** av) {
}
return db.load_sstables(proxy);
}).get();
// If the same sstable is shared by several shards, it cannot be
// deleted until all shards decide to compact it. So we want to
// start these compactions now. Note we start compacting only after
// all sstables in this CF were loaded on all shards - otherwise
// we will have races between the compaction and loading processes
db.invoke_on_all([&proxy] (database& db) {
for (auto& x : db.get_column_families()) {
column_family& cf = *(x.second);
// We start the rewrite, but do not wait for it.
cf.start_rewrite();
}
}).get();
supervisor_notify("setting up system keyspace");
db::system_keyspace::setup(db, qp).get();
supervisor_notify("starting commit log");
@@ -595,10 +608,10 @@ int main(int ac, char** av) {
supervisor_notify("serving");
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
engine().at_exit([] {
return service::get_local_storage_service().drain_on_shutdown();
return repair_shutdown(service::get_local_storage_service().db());
});
engine().at_exit([] {
return repair_shutdown(service::get_local_storage_service().db());
return service::get_local_storage_service().drain_on_shutdown();
});
engine().at_exit([&db] {
return db.invoke_on_all([](auto& db) {
@@ -607,6 +620,11 @@ int main(int ac, char** av) {
});
}).or_terminate();
});
} catch (...) {
// reactor may not have been initialized, so can't use logger
fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception());
return 7; // 1 has a special meaning for upstart
}
}
namespace debug {


@@ -53,6 +53,14 @@ struct reversal_traits<false> {
return c.erase_and_dispose(begin, end, std::move(disposer));
}
template<typename Container, typename Disposer>
static typename Container::iterator erase_dispose_and_update_end(Container& c,
typename Container::iterator it, Disposer&& disposer,
typename Container::iterator&)
{
return c.erase_and_dispose(it, std::forward<Disposer>(disposer));
}
template <typename Container>
static boost::iterator_range<typename Container::iterator> maybe_reverse(
Container& c, boost::iterator_range<typename Container::iterator> r)
@@ -89,6 +97,24 @@ struct reversal_traits<true> {
);
}
// Erases the element pointed to by it and makes sure that the iterator end is not
// invalidated.
template<typename Container, typename Disposer>
static typename Container::reverse_iterator erase_dispose_and_update_end(Container& c,
typename Container::reverse_iterator it, Disposer&& disposer,
typename Container::reverse_iterator& end)
{
auto to_erase = std::next(it).base();
bool update_end = end.base() == to_erase;
auto ret = typename Container::reverse_iterator(
c.erase_and_dispose(to_erase, std::forward<Disposer>(disposer))
);
if (update_end) {
end = ret;
}
return ret;
}
template <typename Container>
static boost::iterator_range<typename Container::reverse_iterator> maybe_reverse(
Container& c, boost::iterator_range<typename Container::iterator> r)
@@ -1120,7 +1146,7 @@ void mutation_partition::trim_rows(const schema& s,
}
if (e.empty()) {
last = reversal_traits<reversed>::erase_and_dispose(_rows, last, std::next(last, 1), deleter);
last = reversal_traits<reversed>::erase_dispose_and_update_end(_rows, last, deleter, end);
} else {
++last;
}


@@ -218,3 +218,42 @@ public:
mutation_reader make_empty_reader() {
return make_mutation_reader<empty_reader>();
}
class restricting_mutation_reader : public mutation_reader::impl {
const restricted_mutation_reader_config& _config;
unsigned _weight = 0;
bool _waited = false;
mutation_reader _base;
public:
restricting_mutation_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base)
: _config(config), _weight(weight), _base(std::move(base)) {
if (_config.sem->waiters() >= _config.max_queue_length) {
_config.raise_queue_overloaded_exception();
}
}
~restricting_mutation_reader() {
if (_waited) {
_config.sem->signal(_weight);
}
}
future<mutation_opt> operator()() override {
// FIXME: we should defer freeing until the mutation is freed, perhaps,
// rather than just returned
if (_waited) {
return _base();
}
auto waited = _config.timeout.count() != 0
? _config.sem->wait(_config.timeout, _weight)
: _config.sem->wait(_weight);
return waited.then([this] {
_waited = true;
return _base();
});
}
};
mutation_reader
make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) {
return make_mutation_reader<restricting_mutation_reader>(config, weight, std::move(base));
}


@@ -85,6 +85,23 @@ mutation_reader make_empty_reader();
// when creating the reader involves disk I/O or a shard call
mutation_reader make_lazy_reader(std::function<mutation_reader ()> make_reader);
struct restricted_mutation_reader_config {
semaphore* sem = nullptr;
std::chrono::nanoseconds timeout = {};
size_t max_queue_length = std::numeric_limits<size_t>::max();
std::function<void ()> raise_queue_overloaded_exception = default_raise_queue_overloaded_exception;
static void default_raise_queue_overloaded_exception() {
throw std::runtime_error("restricted mutation reader queue overload");
}
};
// Restricts a given `mutation_reader` to a concurrency limit according to the settings in
// a restricted_mutation_reader_config. These settings include a semaphore for limiting the number
// of active concurrent readers, a timeout for inactive readers, and a maximum queue size for
// inactive readers.
mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base);
template <typename MutationFilter>
class filtering_reader : public mutation_reader::impl {
mutation_reader _rd;


@@ -397,7 +397,7 @@ static future<> sync_range(seastar::sharded<database>& db,
return sp_in.execute().discard_result().then([&sp_out] {
return sp_out.execute().discard_result();
}).handle_exception([] (auto ep) {
logger.error("repair's stream failed: {}", ep);
logger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
});
});


@@ -443,7 +443,16 @@ row_cache::make_reader(schema_ptr s,
}
row_cache::~row_cache() {
clear();
clear_now();
}
void row_cache::clear_now() noexcept {
with_allocator(_tracker.allocator(), [this] {
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
}
void row_cache::populate(const mutation& m) {
@@ -467,16 +476,8 @@ void row_cache::populate(const mutation& m) {
});
}
void row_cache::clear() {
with_allocator(_tracker.allocator(), [this] {
// We depend on clear_and_dispose() below not looking up any keys.
// Using with_linearized_managed_bytes() is no help, because we don't
// want to propagate an exception from here.
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
future<> row_cache::clear() {
return invalidate(query::full_partition_range);
}
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
@@ -502,8 +503,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
});
if (blow_cache) {
// We failed to invalidate the key, presumably due to with_linearized_managed_bytes()
// running out of memory. Recover using clear(), which doesn't throw.
clear();
// running out of memory. Recover using clear_now(), which doesn't throw.
clear_now();
}
});
});
@@ -577,7 +578,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
});
}
void row_cache::invalidate(const dht::decorated_key& dk) {
future<> row_cache::invalidate(const dht::decorated_key& dk) {
return _populate_phaser.advance_and_await().then([this, &dk] {
_read_section(_tracker.region(), [&] {
with_allocator(_tracker.allocator(), [this, &dk] {
with_linearized_managed_bytes([&] {
@@ -585,17 +587,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) {
});
});
});
});
}
void row_cache::invalidate(const query::partition_range& range) {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate(unwrapped.first);
invalidate(unwrapped.second);
return;
}
future<> row_cache::invalidate(const query::partition_range& range) {
return _populate_phaser.advance_and_await().then([this, &range] {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate_unwrapped(unwrapped.first);
invalidate_unwrapped(unwrapped.second);
} else {
invalidate_unwrapped(range);
}
});
});
}
void row_cache::invalidate_unwrapped(const query::partition_range& range) {
logalloc::reclaim_lock _(_tracker.region());
auto cmp = cache_entry::compare(_schema);
@@ -621,7 +630,6 @@ void row_cache::invalidate(const query::partition_range& range) {
deleter(p);
});
});
});
}
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,

View File

@@ -184,13 +184,13 @@ private:
mutation_source _underlying;
key_source _underlying_keys;
// Synchronizes populating reads with update() to ensure that cache
// Synchronizes populating reads with updates of underlying data source to ensure that cache
// remains consistent across flushes with the underlying data source.
// Readers obtained from the underlying data source in phases earlier than
// the current one must not be used to populate the cache, unless they hold
// phaser::operation created in the reader's phase of origin. Readers
// should hold to a phase only briefly because this inhibits progress of
// update(). Phase changes occur only in update(), which can be assumed to
// updates. Phase changes occur in update()/clear(), which can be assumed to
// be asynchronous wrt invoking of the underlying data source.
utils::phased_barrier _populate_phaser;
@@ -204,6 +204,8 @@ private:
void on_miss();
void upgrade_entry(cache_entry&);
void invalidate_locked(const dht::decorated_key&);
void invalidate_unwrapped(const query::partition_range&);
void clear_now() noexcept;
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
public:
~row_cache();
@@ -228,7 +230,9 @@ public:
void populate(const mutation& m);
// Clears the cache.
void clear();
// Guarantees that cache will not be populated using readers created
// before this method was invoked.
future<> clear();
// Synchronizes cache with the underlying data source from a memtable which
// has just been flushed to the underlying data source.
@@ -240,11 +244,21 @@ public:
void touch(const dht::decorated_key&);
// Removes given partition from cache.
void invalidate(const dht::decorated_key&);
//
// Guarantees that cache will not be populated with given key
// using readers created before this method was invoked.
//
// The key must be kept alive until the method resolves.
future<> invalidate(const dht::decorated_key& key);
// Removes given range of partitions from cache.
// The range can be a wrap around.
void invalidate(const query::partition_range&);
//
// Guarantees that cache will not be populated with partitions from that range
// using readers created before this method was invoked.
//
// The range must be kept alive until the method resolves.
future<> invalidate(const query::partition_range&);
auto num_entries() const {
return _partitions.size();


@@ -56,6 +56,14 @@ sstring to_sstring(index_type t) {
throw std::invalid_argument("unknown index type");
}
bool is_regular(column_kind k) {
return k == column_kind::regular_column || k == column_kind::compact_column;
}
bool is_compatible(column_kind k1, column_kind k2) {
return k1 == k2 || (is_regular(k1) && is_regular(k2));
}
column_mapping_entry::column_mapping_entry(bytes name, sstring type_name)
: _name(std::move(name))
, _type(db::marshal::type_parser::parse(type_name))
@@ -629,51 +637,60 @@ schema_builder& schema_builder::with_version(table_schema_version v) {
return *this;
}
schema_ptr schema_builder::build() {
if (_version) {
_raw._version = *_version;
} else {
_raw._version = utils::UUID_gen::get_time_UUID();
}
void schema_builder::prepare_dense_schema(schema::raw_schema& raw) {
if (raw._is_dense) {
auto regular_cols = boost::copy_range<std::vector<column_definition*>>(
raw._columns | boost::adaptors::filtered([](auto&& col) { return col.is_regular(); })
| boost::adaptors::transformed([](auto&& col) { return &col; }));
if (!_compact_storage) {
return make_lw_shared<schema>(schema(_raw));
}
schema s(_raw);
// Dense means that no part of the comparator stores a CQL column name. This means
// COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
s._raw._is_dense = (*_compact_storage == compact_storage::yes) && (s.clustering_key_size() > 0);
if (s.clustering_key_size() == 0) {
if (*_compact_storage == compact_storage::yes) {
s._raw._is_compound = false;
} else {
s._raw._is_compound = true;
}
} else {
if ((*_compact_storage == compact_storage::yes) && s.clustering_key_size() == 1) {
s._raw._is_compound = false;
} else {
s._raw._is_compound = true;
}
}
if (s._raw._is_dense) {
// In Origin, dense CFs always have at least one regular column
if (s.regular_columns_count() == 0) {
s._raw._columns.emplace_back(bytes(""), s.regular_column_name_type(), column_kind::regular_column, 0, index_info());
if (regular_cols.empty()) {
raw._columns.emplace_back(bytes(""), raw._regular_column_name_type, column_kind::compact_column, 0, index_info());
return;
}
if (s.regular_columns_count() != 1) {
throw exceptions::configuration_exception(sprint("Expecting exactly one regular column. Found %d", s.regular_columns_count()));
if (regular_cols.size() != 1) {
throw exceptions::configuration_exception(sprint("Expecting exactly one regular column. Found %d", regular_cols.size()));
}
s._raw._columns.at(s.column_offset(column_kind::regular_column)).kind = column_kind::compact_column;
regular_cols[0]->kind = column_kind::compact_column;
}
// We need to rebuild the schema in case we added some column. This is way simpler than trying to factor out the relevant code
// from the constructor
return make_lw_shared<schema>(schema(s._raw));
}
schema_ptr schema_builder::build() {
schema::raw_schema new_raw = _raw; // Copy so that build() remains idempotent.
if (_version) {
new_raw._version = *_version;
} else {
new_raw._version = utils::UUID_gen::get_time_UUID();
}
if (_compact_storage) {
// Dense means that no part of the comparator stores a CQL column name. This means
// COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
auto clustering_key_size = std::count_if(new_raw._columns.begin(), new_raw._columns.end(), [](auto&& col) {
return col.kind == column_kind::clustering_key;
});
new_raw._is_dense = (*_compact_storage == compact_storage::yes) && (clustering_key_size > 0);
if (clustering_key_size == 0) {
if (*_compact_storage == compact_storage::yes) {
new_raw._is_compound = false;
} else {
new_raw._is_compound = true;
}
} else {
if ((*_compact_storage == compact_storage::yes) && clustering_key_size == 1) {
new_raw._is_compound = false;
} else {
new_raw._is_compound = true;
}
}
}
prepare_dense_schema(new_raw);
return make_lw_shared<schema>(schema(new_raw));
}
schema_ptr schema_builder::build(compact_storage cp) {


@@ -72,6 +72,8 @@ void read_collections(schema_builder& builder, sstring comparator);
enum class column_kind { partition_key, clustering_key, static_column, regular_column, compact_column };
sstring to_sstring(column_kind k);
bool is_regular(column_kind k);
bool is_compatible(column_kind k1, column_kind k2);
// CMH this is also manually defined in thrift gen file.
enum class index_type {
@@ -224,7 +226,7 @@ public:
index_info idx_info;
bool is_static() const { return kind == column_kind::static_column; }
bool is_regular() const { return kind == column_kind::regular_column || kind == column_kind::compact_column; }
bool is_regular() const { return ::is_regular(kind); }
bool is_partition_key() const { return kind == column_kind::partition_key; }
bool is_clustering_key() const { return kind == column_kind::clustering_key; }
bool is_primary_key() const { return kind == column_kind::partition_key || kind == column_kind::clustering_key; }


@@ -220,4 +220,6 @@ public:
schema_ptr build(compact_storage cp);
schema_ptr build();
private:
void prepare_dense_schema(schema::raw_schema& raw);
};

Submodule seastar updated: 0bcdd282c5...d6ccc19f9b


@@ -166,7 +166,8 @@ private:
);
auto ranges = _ranges;
return get_local_storage_proxy().query(_schema, _cmd, std::move(ranges),
auto command = ::make_lw_shared<query::read_command>(*_cmd);
return get_local_storage_proxy().query(_schema, std::move(command), std::move(ranges),
_options.get_consistency()).then(
[this, &builder, page_size, now](foreign_ptr<lw_shared_ptr<query::result>> results) {
handle_result(builder, std::move(results), page_size, now);


@@ -2058,7 +2058,6 @@ public:
auto write_timeout = exec->_proxy->_db.local().get_config().write_request_timeout_in_ms() * 1000;
auto delta = __int128_t(digest_resolver->last_modified()) - __int128_t(exec->_cmd->read_timestamp);
if (std::abs(delta) <= write_timeout) {
print("HERE %d\n", int64_t(delta));
exec->_proxy->_stats.global_read_repairs_canceled_due_to_concurrent_write++;
// if CL is local and non-matching data was modified less than write_timeout ms ago, do only local repair
auto i = boost::range::remove_if(exec->_targets, std::not1(std::cref(db::is_local)));


@@ -289,6 +289,9 @@ public:
*
* Partitions for each range will be ordered according to decorated_key ordering. Results for
* each range from "partition_ranges" may appear in any order.
*
* IMPORTANT: Not all fibers started by this method have to be done by the time it returns so no
* parameter can be changed after being passed to this method.
*/
future<foreign_ptr<lw_shared_ptr<query::result>>> query(schema_ptr,
lw_shared_ptr<query::read_command> cmd,


@@ -972,6 +972,28 @@ void storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* subsc
static stdx::optional<future<>> drain_in_progress;
future<> storage_service::stop_transport() {
return run_with_no_api_lock([] (storage_service& ss) {
return seastar::async([&ss] {
logger.info("Stop transport: starts");
gms::get_local_gossiper().stop_gossiping().get();
logger.info("Stop transport: stop_gossiping done");
ss.shutdown_client_servers().get();
logger.info("Stop transport: shutdown rpc and cql server done");
ss.do_stop_ms().get();
logger.info("Stop transport: shutdown messaging_service done");
auth::auth::shutdown().get();
logger.info("Stop transport: auth shutdown");
logger.info("Stop transport: done");
});
});
}
future<> storage_service::drain_on_shutdown() {
return run_with_no_api_lock([] (storage_service& ss) {
if (drain_in_progress) {
@@ -980,17 +1002,8 @@ future<> storage_service::drain_on_shutdown() {
return seastar::async([&ss] {
logger.info("Drain on shutdown: starts");
gms::get_local_gossiper().stop_gossiping().get();
logger.info("Drain on shutdown: stop_gossiping done");
ss.shutdown_client_servers().get();
logger.info("Drain on shutdown: shutdown rpc and cql server done");
ss.do_stop_ms().get();
logger.info("Drain on shutdown: shutdown messaging_service done");
auth::auth::shutdown().get();
logger.info("Drain on shutdown: auth shutdown");
ss.stop_transport().get();
logger.info("Drain on shutdown: stop_transport done");
ss.flush_column_families();
logger.info("Drain on shutdown: flush column_families done");
@@ -3007,7 +3020,7 @@ void storage_service::do_isolate_on_error(disk_error type)
if (must_isolate && !isolated.exchange(true)) {
logger.warn("Shutting down communications due to I/O errors until operator intervention");
// isolated protect us against multiple stops
service::get_storage_service().invoke_on_all([] (service::storage_service& s) { s.stop_native_transport(); });
service::get_local_storage_service().stop_transport();
}
}


@@ -382,6 +382,8 @@ public:
future<> drain_on_shutdown();
future<> stop_transport();
void flush_column_families();
#if 0
/**
@@ -553,9 +555,9 @@ public:
auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc());
std::unordered_map<range<token>, std::vector<inet_address>> filtered_map;
for (auto entry : orig_map) {
filtered_map[entry.first].reserve(entry.second.size());
std::remove_copy_if(entry.second.begin(), entry.second.end(),
filtered_map[entry.first].begin(), filter);
auto& addresses = filtered_map[entry.first];
addresses.reserve(entry.second.size());
std::copy_if(entry.second.begin(), entry.second.end(), std::back_inserter(addresses), filter);
}
return filtered_map;


@@ -83,13 +83,17 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_
return weight;
}
bool compaction_manager::try_to_register_weight(column_family* cf, int weight) {
bool compaction_manager::try_to_register_weight(column_family* cf, int weight, bool parallel_compaction) {
auto it = _weight_tracker.find(cf);
if (it == _weight_tracker.end()) {
_weight_tracker.insert({cf, {weight}});
return true;
}
std::unordered_set<int>& s = it->second;
// Only one weight is allowed if parallel compaction is disabled.
if (!parallel_compaction && !s.empty()) {
return false;
}
// TODO: Maybe allow only *smaller* compactions to start? That can be done
// by returning true only if weight is not in the set and is lower than any
// entry in the set.
@@ -164,8 +168,7 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
sstables::compaction_strategy cs = cf.get_compaction_strategy();
descriptor = cs.get_sstables_for_compaction(cf, std::move(candidates));
weight = trim_to_compact(&cf, descriptor);
if (!try_to_register_weight(&cf, weight)) {
// Refusing compaction job because of an ongoing compaction with same weight.
if (!try_to_register_weight(&cf, weight, cs.parallel_compaction())) {
task->stopping = true;
_stats.pending_tasks--;
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}",
@@ -248,6 +251,51 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
return task;
}
// submit_sstable_rewrite() starts a compaction task, much like submit(),
// but rather than asking a compaction policy what to compact, this function
// compacts just a single sstable, and writes one new sstable. This operation
// is useful to split an sstable containing data belonging to multiple shards
// into a separate sstable on each shard.
void compaction_manager::submit_sstable_rewrite(column_family* cf, sstables::shared_sstable sst) {
// The semaphore ensures that the sstable rewrite operations submitted by
// submit_sstable_rewrite are run in sequence, and not all of them in
// parallel. Note that unlike general compaction which currently allows
// different cfs to compact in parallel, here we don't have a semaphore
// per cf, so we only get one rewrite at a time on each shard.
static thread_local semaphore sem(1);
// We cannot, and don't need to, compact an sstable which is already
// being compacted anyway.
if (_stopped || _compacting_sstables.count(sst)) {
return;
}
// Conversely, we don't want another compaction job to compact the
// sstable we are planning to work on:
_compacting_sstables.insert(sst);
auto task = make_lw_shared<compaction_manager::task>();
_tasks.push_back(task);
_stats.active_tasks++;
task->compaction_done = with_semaphore(sem, 1, [cf, sst] {
return cf->compact_sstables(sstables::compaction_descriptor(
std::vector<sstables::shared_sstable>{sst},
sst->get_sstable_level(),
std::numeric_limits<uint64_t>::max()), false);
}).then_wrapped([this, sst, task] (future<> f) {
_compacting_sstables.erase(sst);
_stats.active_tasks--;
_tasks.remove(task);
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stop_exception& e) {
cmlog.info("compaction info: {}", e.what());
_stats.errors++;
} catch (...) {
cmlog.error("compaction failed: {}", std::current_exception());
_stats.errors++;
}
});
}
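The `static thread_local semaphore sem(1)` plus `with_semaphore` above is what serializes sstable rewrites on a shard. A minimal stand-alone sketch of that pattern — a hand-rolled blocking semaphore standing in for `seastar::semaphore`, whose real API is future-based:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Illustrative counting semaphore (seastar::semaphore stand-in).
class semaphore {
    std::mutex _m;
    std::condition_variable _cv;
    long _count;
public:
    explicit semaphore(long count) : _count(count) {}
    void wait() {
        std::unique_lock<std::mutex> lk(_m);
        _cv.wait(lk, [&] { return _count > 0; });
        --_count;
    }
    void signal() {
        std::lock_guard<std::mutex> lk(_m);
        ++_count;
        _cv.notify_one();
    }
};

// with_semaphore: take one unit, run the job, release even on exception.
// With sem(1), queued rewrite jobs run strictly one at a time.
template <typename Func>
auto with_semaphore(semaphore& sem, long units, Func func) -> decltype(func()) {
    sem.wait();  // simplified: assumes units == 1
    try {
        auto r = func();
        sem.signal();
        return r;
    } catch (...) {
        sem.signal();
        throw;
    }
}
```

Unlike general compaction, which allows different column families to compact in parallel, a single unit-count semaphore gives one rewrite at a time per shard.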
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
task->stopping = true;
return task->compaction_gate.close().then([task] {


@@ -81,9 +81,9 @@ private:
// It will not accept new requests in case the manager was stopped.
bool can_submit();
-// If weight is not taken for the column family, weight is registered and
-// true is returned. Return false otherwise.
-bool try_to_register_weight(column_family* cf, int weight);
+// Return true if weight is not registered. If parallel_compaction is not
+// true, only one weight is allowed to be registered.
+bool try_to_register_weight(column_family* cf, int weight, bool parallel_compaction);
// Deregister weight for a column family.
void deregister_weight(column_family* cf, int weight);
@@ -109,6 +109,13 @@ public:
// Submit a column family to be cleaned up and wait for its termination.
future<> perform_cleanup(column_family* cf);
// Submit a specific sstable to be rewritten, while dropping data which
// does not belong to this shard. Meant to be used on startup when an
// sstable is shared by multiple shards, and we want to split it to a
// separate sstable for each shard.
void submit_sstable_rewrite(column_family* cf,
sstables::shared_sstable s);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);


@@ -56,6 +56,9 @@ public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
return true;
}
};
//
@@ -402,6 +405,10 @@ public:
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
virtual bool parallel_compaction() const override {
return false;
}
virtual compaction_strategy_type type() const {
return compaction_strategy_type::leveled;
}
@@ -439,6 +446,9 @@ compaction_strategy_type compaction_strategy::type() const {
compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) {
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
}
bool compaction_strategy::parallel_compaction() const {
return _compaction_strategy_impl->parallel_compaction();
}
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
::shared_ptr<compaction_strategy_impl> impl;


@@ -175,10 +175,8 @@ public:
if (previous != nullptr && current_first.tri_compare(s, previous->get_last_decorated_key(s)) <= 0) {
-logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 " \
-"or due to the fact that you have dropped sstables from another node into the data directory. " \
-"Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also " \
-"have rows out-of-order within an sstable",
+logger.warn("At level {}, {} [{}, {}] overlaps {} [{}, {}]. This could be caused by the fact that you have dropped " \
+"sstables from another node into the data directory. Sending back to L0.",
level, previous->get_filename(), previous->get_first_partition_key(s), previous->get_last_partition_key(s),
current->get_filename(), current->get_first_partition_key(s), current->get_last_partition_key(s));


@@ -228,7 +228,7 @@ future<> stream_session::on_initialization_complete() {
}
_stream_result->handle_session_prepared(this->shared_from_this());
} catch (...) {
-sslog.error("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
+sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
throw;
}
return make_ready_future<>();
@@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() {
return ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
}).handle_exception([id, plan_id] (auto ep) {
-sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
+sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
std::rethrow_exception(ep);
});
}).then([this] {
@@ -248,7 +248,7 @@ future<> stream_session::on_initialization_complete() {
}
void stream_session::on_error() {
-sslog.error("[Stream #{}] Streaming error occurred", plan_id());
+sslog.warn("[Stream #{}] Streaming error occurred", plan_id());
// fail session
close_session(stream_session_state::FAILED);
}
@@ -270,7 +270,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
db.find_column_family(ks, cf);
} catch (no_such_column_family) {
auto err = sprint("[Stream #{}] prepare requested ks={} cf={} does not exist", ks, cf);
-sslog.error(err.c_str());
+sslog.warn(err.c_str());
throw std::runtime_error(err);
}
}
@@ -284,7 +284,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
db.find_column_family(cf_id);
} catch (no_such_column_family) {
auto err = sprint("[Stream #{}] prepare cf_id=%s does not exist", plan_id, cf_id);
-sslog.error(err.c_str());
+sslog.warn(err.c_str());
throw std::runtime_error(err);
}
prepare_receiving(summary);


@@ -85,41 +85,41 @@ struct send_info {
};
future<stop_iteration> do_send_mutations(auto si, auto fm) {
-return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
-sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
-auto fm_size = fm.representation().size();
-net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
-sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
-get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
-si->mutations_done.signal();
-}).handle_exception([si] (auto ep) {
-// There might be larger number of STREAM_MUTATION inflight.
-// Log one error per column_family per range
-if (!si->error_logged) {
-si->error_logged = true;
-sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
-}
-si->mutations_done.broken();
-}).finally([] {
-get_local_stream_manager().mutation_send_limiter().signal();
-});
-return stop_iteration::no;
+sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
+auto fm_size = fm.representation().size();
+net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
+sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
+get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
+si->mutations_done.signal();
+}).handle_exception([si] (auto ep) {
+// There might be larger number of STREAM_MUTATION inflight.
+// Log one error per column_family per range
+if (!si->error_logged) {
+si->error_logged = true;
+sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
+}
+si->mutations_done.broken();
+});
+return make_ready_future<stop_iteration>(stop_iteration::no);
}
future<> send_mutations(auto si) {
auto& cf = si->db.find_column_family(si->cf_id);
auto& priority = service::get_local_streaming_read_priority();
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
-return repeat([si, &reader] () {
-return reader().then([si] (auto mopt) {
-if (mopt && si->db.column_family_exists(si->cf_id)) {
-si->mutations_nr++;
-auto fm = frozen_mutation(*mopt);
-return do_send_mutations(si, std::move(fm));
-} else {
-return make_ready_future<stop_iteration>(stop_iteration::yes);
-}
+return repeat([si, &reader] {
+return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
+return reader().then([si] (auto mopt) {
+if (mopt && si->db.column_family_exists(si->cf_id)) {
+si->mutations_nr++;
+auto fm = frozen_mutation(*mopt);
+return do_send_mutations(si, std::move(fm));
+} else {
+return make_ready_future<stop_iteration>(stop_iteration::yes);
+}
+});
+}).finally([] {
+get_local_stream_manager().mutation_send_limiter().signal();
+});
+});
}).then([si] {
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
auto cf_id = this->cf_id;
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
-parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
+do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
auto cf_id = this->cf_id;
@@ -153,7 +153,7 @@ void stream_transfer_task::start() {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,
cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) {
-sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
+sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
std::rethrow_exception(ep);
});
}).then([this, id, plan_id, cf_id] {
@@ -161,7 +161,7 @@ void stream_transfer_task::start() {
session->start_keep_alive_timer();
session->transfer_task_completed(cf_id);
}).handle_exception([this, plan_id, id] (auto ep){
-sslog.error("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
+sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
this->session->on_error();
});
}
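The `send_mutations` change above acquires a `mutation_send_limiter` unit before reading each mutation and returns it in `finally`, so a unit is released even when sending fails. A synchronous sketch of the same guarantee using an RAII guard (the `limiter` type here is illustrative, not Seastar's):

```cpp
#include <cassert>
#include <stdexcept>

// Illustrative stand-in for the stream manager's mutation_send_limiter.
struct limiter {
    long units;
    void wait() { --units; }   // simplified, non-blocking
    void signal() { ++units; }
};

// RAII equivalent of .finally([] { limiter.signal(); }): the unit is
// returned no matter how the scope exits.
struct limiter_guard {
    limiter& l;
    explicit limiter_guard(limiter& lim) : l(lim) { l.wait(); }
    ~limiter_guard() { l.signal(); }
};

// Send one item under the limiter; the unit is released even on throw.
bool send_one(limiter& lim, bool fail) {
    limiter_guard g(lim);
    if (fail) throw std::runtime_error("send failed");
    return true;
}
```

Without the `finally` (or guard), a failed send would leak a unit and eventually starve all senders.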


@@ -61,7 +61,7 @@ int main(int argc, char** argv) {
logalloc::region r;
with_allocator(r.allocator(), [&] {
-std::deque<managed_bytes> refs;
+chunked_fifo<managed_bytes> refs;
r.make_evictable([&] {
return with_allocator(r.allocator(), [&] {


@@ -1317,3 +1317,54 @@ SEASTAR_TEST_CASE(test_tombstone_purge) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_trim_rows) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type)
.build();
auto pk = partition_key::from_exploded(*s, { int32_type->decompose(0) });
mutation m(pk, s);
constexpr auto row_count = 8;
for (auto i = 0; i < row_count; i++) {
m.set_clustered_cell(clustering_key_prefix::from_single_value(*s, int32_type->decompose(i)),
to_bytes("v"), data_value(i), api::new_timestamp() - 5);
}
m.partition().apply(tombstone(api::new_timestamp(), gc_clock::now()));
auto now = gc_clock::now() + gc_clock::duration(std::chrono::hours(1));
auto compact_and_expect_empty = [&] (mutation m, std::vector<query::clustering_range> ranges) {
mutation m2 = m;
m.partition().compact_for_query(*s, now, ranges, false, query::max_rows);
BOOST_REQUIRE(m.partition().clustered_rows().empty());
std::reverse(ranges.begin(), ranges.end());
m2.partition().compact_for_query(*s, now, ranges, true, query::max_rows);
BOOST_REQUIRE(m2.partition().clustered_rows().empty());
};
std::vector<query::clustering_range> ranges = {
query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)))
};
compact_and_expect_empty(m, ranges);
ranges = {
query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(50)))
};
compact_and_expect_empty(m, ranges);
ranges = {
query::clustering_range::make_ending_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)))
};
compact_and_expect_empty(m, ranges);
ranges = {
query::clustering_range::make_open_ended_both_sides()
};
compact_and_expect_empty(m, ranges);
});
}
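`test_trim_rows` exercises the reversed-iterator bug described in the commit message: a `reverse_iterator` is an adaptor whose `base()` points one element past the one it names, so erasing the element underneath can invalidate reverse iterators (including the end) that appear to name a different object. A self-contained illustration of the safe erase idiom:

```cpp
#include <cassert>
#include <iterator>
#include <list>

// A reverse_iterator rit refers to *std::prev(rit.base()), so the element
// it names is reached via std::next(rit).base(); rebuilding the reverse
// iterator from erase()'s return keeps the traversal valid.
std::list<int> erase_reversed_greater_than(std::list<int> l, int limit) {
    for (auto rit = l.rbegin(); rit != l.rend(); ) {
        if (*rit > limit) {
            rit = std::list<int>::reverse_iterator(l.erase(std::next(rit).base()));
        } else {
            ++rit;
        }
    }
    return l;
}
```

Naively calling `l.erase(rit.base())` would remove the wrong element and leave `rit` dangling — the same class of bug `erase_dispose_and_update_end()` fixes for `trim_rows`.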


@@ -546,27 +546,33 @@ static std::vector<mutation> updated_ring(std::vector<mutation>& mutations) {
return result;
}
+static mutation_source make_mutation_source(std::vector<lw_shared_ptr<memtable>>& memtables) {
+return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) {
+std::vector<mutation_reader> readers;
+for (auto&& mt : memtables) {
+readers.emplace_back(mt->make_reader(s, pr));
+}
+return make_combined_reader(std::move(readers));
+});
+}
+static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtable>>& memtables) {
+return key_source([s, &memtables] (const query::partition_range& pr) {
+std::vector<key_reader> readers;
+for (auto&& mt : memtables) {
+readers.emplace_back(mt->as_key_source()(pr));
+}
+return make_combined_reader(s, std::move(readers));
+});
+}
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
-auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) {
-std::vector<mutation_reader> readers;
-for (auto&& mt : memtables) {
-readers.emplace_back(mt->make_reader(s, pr));
-}
-return make_combined_reader(std::move(readers));
-});
-auto memtables_key_source = key_source([&] (const query::partition_range& pr) {
-std::vector<key_reader> readers;
-for (auto&& mt : memtables) {
-readers.emplace_back(mt->as_key_source()(pr));
-}
-return make_combined_reader(s, std::move(readers));
-});
-throttled_mutation_source cache_source(memtables_data_source);
+throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
-row_cache cache(s, cache_source, memtables_key_source, tracker);
+row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
@@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
auto some_element = keys_in_cache.begin() + 547;
std::vector<dht::decorated_key> keys_not_in_cache;
keys_not_in_cache.push_back(*some_element);
-cache.invalidate(*some_element);
+cache.invalidate(*some_element).get();
keys_in_cache.erase(some_element);
for (auto&& key : keys_in_cache) {
@@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
{ *some_range_begin, true }, { *some_range_end, false }
);
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
-cache.invalidate(range);
+cache.invalidate(range).get();
keys_in_cache.erase(some_range_begin, some_range_end);
for (auto&& key : keys_in_cache) {
@@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) {
});
}
SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
auto ring = make_ring(s, 3);
for (auto&& m : ring) {
mt1->apply(m);
}
auto mt2 = make_lw_shared<memtable>(s);
auto ring2 = updated_ring(ring);
for (auto&& m : ring2) {
mt2->apply(m);
}
cache_source.block();
auto rd1 = cache.make_reader(s);
auto rd1_result = rd1();
sleep(10ms).get();
memtables.clear();
memtables.push_back(mt2);
// This update should miss on all partitions
auto cache_cleared = cache.clear();
auto rd2 = cache.make_reader(s);
// rd1, which is in progress, should not prevent forward progress of clear()
cache_source.unblock();
cache_cleared.get();
// Reads started before memtable flush should return previous value, otherwise this test
// doesn't trigger the conditions it is supposed to protect against.
assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]);
assert_that(rd1().get0()).has_no_mutation();
// Reads started after clear but before previous populations completed
// should already see the new data
assert_that(std::move(rd2))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
// Reads started after clear should see new data
assert_that(cache.make_reader(s))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
});
}
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
return seastar::async([] {
auto s = make_schema();
@@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
}
// wrap-around
-cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()}));
+cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());
@@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
verify_does_not_have(cache, ring[7].decorated_key());
// not wrap-around
-cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()}));
+cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());


@@ -195,6 +195,7 @@ public:
void write_long(int64_t n);
void write_short(uint16_t n);
void write_string(const sstring& s);
void write_bytes_as_string(bytes_view s);
void write_long_string(const sstring& s);
void write_uuid(utils::UUID uuid);
void write_string_list(std::vector<sstring> string_list);
@@ -1441,6 +1442,12 @@ void cql_server::response::write_string(const sstring& s)
_body.insert(_body.end(), s.begin(), s.end());
}
void cql_server::response::write_bytes_as_string(bytes_view s)
{
write_short(cast_if_fits<uint16_t>(s.size()));
_body.insert(_body.end(), s.begin(), s.end());
}
void cql_server::response::write_long_string(const sstring& s)
{
write_int(cast_if_fits<int32_t>(s.size()));
@@ -1587,6 +1594,18 @@ public:
if (type->is_reversed()) {
fail(unimplemented::cause::REVERSED);
}
if (type->is_user_type()) {
r.write_short(uint16_t(type_id::UDT));
auto udt = static_pointer_cast<const user_type_impl>(type);
r.write_string(udt->_keyspace);
r.write_bytes_as_string(udt->_name);
r.write_short(udt->size());
for (auto&& i : boost::irange<size_t>(0, udt->size())) {
r.write_bytes_as_string(udt->field_name(i));
encode(r, udt->field_type(i));
}
return;
}
if (type->is_tuple()) {
r.write_short(uint16_t(type_id::TUPLE));
auto ttype = static_pointer_cast<const tuple_type_impl>(type);
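The UDT branch above frames the option as: keyspace `[string]`, type name, a short field count, then a length-prefixed name (and encoded type) per field, every length a big-endian short. A sketch of just that framing, with hypothetical free-function helpers mirroring `write_short`/`write_string`/`write_bytes_as_string` (field types and the leading `type_id::UDT` short are omitted):

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Append a big-endian u16, as write_short() does in the response buffer.
void write_short(std::vector<uint8_t>& body, uint16_t n) {
    body.push_back(uint8_t(n >> 8));
    body.push_back(uint8_t(n & 0xff));
}

// Length-prefixed [string]: a short length, then the raw bytes
// (write_string / write_bytes_as_string).
void write_string(std::vector<uint8_t>& body, const std::string& s) {
    write_short(body, uint16_t(s.size()));
    body.insert(body.end(), s.begin(), s.end());
}

// Frame a UDT option: keyspace, type name, field count, then one
// length-prefixed field name each (encoded field types elided here).
std::vector<uint8_t> encode_udt_header(const std::string& ks, const std::string& name,
                                       const std::vector<std::string>& fields) {
    std::vector<uint8_t> body;
    write_string(body, ks);
    write_string(body, name);
    write_short(body, uint16_t(fields.size()));
    for (auto& f : fields) {
        write_string(body, f);
    }
    return body;
}
```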


@@ -2704,7 +2704,7 @@ update_types(const std::vector<data_type> types, const user_type updated) {
if (!new_types) {
new_types = types;
}
-new_types->emplace(new_types->begin() + i, std::move(*ut));
+(*new_types)[i] = std::move(*ut);
}
}
return new_types;
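The `update_types` fix swaps `emplace()` for index assignment because `vector::emplace` at an iterator *inserts* a new element and shifts the tail, rather than overwriting slot `i`. A minimal demonstration of the difference:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// emplace() at an iterator grows the vector by one (the buggy behavior).
std::vector<int> via_emplace(std::vector<int> v, std::size_t i, int x) {
    v.emplace(v.begin() + i, x);
    return v;
}

// operator[] assignment replaces the element in place (the fixed behavior).
std::vector<int> via_assign(std::vector<int> v, std::size_t i, int x) {
    v[i] = x;
    return v;
}
```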


@@ -55,7 +55,7 @@ static thread_local auto reusable_indexes = std::vector<long>();
void bloom_filter::set_indexes(int64_t base, int64_t inc, int count, long max, std::vector<long>& results) {
for (int i = 0; i < count; i++) {
-results[i] = abs(base % max);
+results[i] = std::abs(base % max);
base = static_cast<int64_t>(static_cast<uint64_t>(base) + static_cast<uint64_t>(inc));
}
}
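The `abs` → `std::abs` change matters because the C `abs` is declared only for `int`, so a 64-bit `base % max` could be silently truncated before the absolute value is taken; `std::abs` provides `long` and `long long` overloads that keep the full width:

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

// std::abs resolves to a 64-bit overload here, so remainders wider than
// int survive intact; C's int abs(int) would truncate values like this one.
int64_t bucket_index(int64_t base, int64_t max) {
    return std::abs(base % max);
}
```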


@@ -49,3 +49,17 @@ bool is_system_error_errno(int err_no)
code.category() == std::system_category();
});
}
bool should_stop_on_system_error(const std::system_error& e) {
if (e.code().category() == std::system_category()) {
// Whitelist of errors that don't require us to stop the server:
switch (e.code().value()) {
case EEXIST:
case ENOENT:
return false;
default:
break;
}
}
return true;
}
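`should_stop_on_system_error()` whitelists only `EEXIST` and `ENOENT` in the system category as survivable; any other system error stops the server. Reproduced from the diff with a quick check of how callers would use it:

```cpp
#include <cassert>
#include <cerrno>
#include <system_error>

// Reproduced from the diff: EEXIST/ENOENT in the system category are
// treated as non-fatal; everything else requires stopping the server.
bool should_stop_on_system_error(const std::system_error& e) {
    if (e.code().category() == std::system_category()) {
        // Whitelist of errors that don't require us to stop the server:
        switch (e.code().value()) {
        case EEXIST:
        case ENOENT:
            return false;
        default:
            break;
        }
    }
    return true;
}
```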


@@ -196,7 +196,7 @@ inline ihistogram operator +(ihistogram a, const ihistogram& b) {
struct rate_moving_average {
uint64_t count = 0;
double rates[3] = {0};
-double mean_rate;
+double mean_rate = 0;
rate_moving_average& operator +=(const rate_moving_average& o) {
count += o.count;
mean_rate += o.mean_rate;