Compare commits


46 Commits

Author SHA1 Message Date
Asias He
38470ca6e8 main: Defer initialization of streaming
Streaming is used by bootstrap and repair. Streaming uses the storage_proxy
class to apply the frozen_mutation and the db/column_family class to
invalidate the row cache. Defer the initialization until just before repair
and bootstrap init.
Message-Id: <8e99cf443239dd8e17e6b6284dab171f7a12365c.1458034320.git.asias@scylladb.com>

(cherry picked from commit d79dbfd4e8)
2016-03-15 11:59:16 +02:00
Pekka Enberg
5bb25954b4 main: Defer REPAIR_CHECKSUM_RANGE RPC verb registration after commitlog replay
Register the REPAIR_CHECKSUM_RANGE messaging service verb handler after
we have replayed the commitlog to avoid responding with bogus checksums.
Message-Id: <1458027934-8546-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit eb13f65949)
2016-03-15 11:59:10 +02:00
Gleb Natapov
1d9ca3ef1f main: Defer storage proxy RPC verb registration after commitlog replay
Message-Id: <20160315071229.GM6117@scylladb.com>
(cherry picked from commit 5076f4878b)
2016-03-15 09:41:21 +02:00
Gleb Natapov
cb97e5dfe8 messaging: enable keepalive tcp option for inter-node communication
Some network equipment that does TCP session tracking tend to drop TCP
sessions after a period of inactivity. Use keepalive mechanism to
prevent this from happening for our inter-node communication.

Message-Id: <20160314173344.GI31837@scylladb.com>
(cherry picked from commit e228ef1bd9)
2016-03-14 20:33:12 +02:00
Pekka Enberg
831b5af999 Merge scylla-seastar branch-0.18
* seastar 60643a0...e039c46 (2):
  > rpc: allow configuring keepalive for rpc client
  > net: add keepalive configuration to socket interface
2016-03-14 20:32:52 +02:00
Pekka Enberg
7f1048efb4 main: Defer migration manager RPC verb registration after commitlog replay
Defer registering migration manager RPC verbs until after the commitlog has
been replayed so that our own schema is fully loaded before other
nodes start querying it or sending schema updates.
Message-Id: <1457971028-7325-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 1429213b4c)
2016-03-14 20:11:04 +02:00
Glauber Costa
510b1a3afc main: when scanning SSTables, run shard 0 first
Deletion of previous stale, temporary SSTables is done by shard 0. Therefore,
let's run shard 0 first. Technically, we could just have all shards agree on the
deletion and delete it later, but that is prone to races.

Those races are not supposed to happen during normal operation, but if we have
bugs, they can. Scylla's Github Issue #1014 is an example of a situation where
that can happen, making existing problems worse. So running a single shard
first and making sure that all temporary SSTables are deleted provides
extra protection against such situations.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 6c4e31bbdb)
2016-03-14 20:10:56 +02:00
Gleb Natapov
dd831f6463 make initialization run in a thread
While looking at the initialization code I felt like my head was going to
explode. Moving initialization into a thread makes things a little bit
better. Only lightly tested.

Message-Id: <20160310163142.GE28529@scylladb.com>
(cherry picked from commit 16135c2084)
2016-03-14 20:10:48 +02:00
Gleb Natapov
8bf59afb42 fix developer-mode parameter application on SMP
I am almost sure we want to apply it once on each shard, and not multiple
times on a single shard.

Message-Id: <20160310155804.GB28529@scylladb.com>
(cherry picked from commit 176aa25d35)
2016-03-14 20:10:37 +02:00
Avi Kivity
f29bc8918b main: sanity check cpu support
We require SSE 4.2 (for the commitlog CRC32), so verify it exists early and
bail out if it does not.

We need to check early, because the compiler may use newer instructions
in the generated code; the earlier we check, the lower the probability
we hit an undefined opcode exception.

Message-Id: <1456665401-18252-1-git-send-email-avi@scylladb.com>
(cherry picked from commit a1ff21f6ea)
2016-03-14 20:10:29 +02:00
Takuya ASADA
4c6d655e99 main: notify service start completion earlier, to reduce systemd unit startup time
Fixes #910

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1455830245-11782-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 0f87922aa6)
2016-03-14 20:10:19 +02:00
Nadav Har'El
fafe166d2c repair: stop ongoing repairs during shutdown
When shutting down a node gracefully, this patch asks all ongoing repairs
started on this node to stop as soon as possible (without completing
their work), and then waits for these repairs to finish (with failure,
usually, because they didn't complete).

We need to do this, because if the repair loop continues to run while we
start destructing the various services it relies on, it can crash (as
reported in #699, although the specific crash reported there no longer
occurs after some changes in the streaming code). Additionally, it is
important to stop the ongoing repair rather than wait for it to complete
its normal operation, because that can take a very long time, and shutdown
is supposed to take no more than a few seconds.

Fixes #699.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1455218873-6201-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit 7dc843fc1c)
2016-03-14 20:10:13 +02:00
Avi Kivity
3380340750 Merge scylla-seastar branch-0.18
* seastar 353b1a1...60643a0 (2):
  > rpc: make client connection error more clear
  > reactor: fix work item leak in syscall work queue
2016-03-14 20:04:03 +02:00
Avi Kivity
4d3dac7f98 gitmodules: point seastar submodule at scylla-seastar repository
Prepare for branch-0.18 specific seastar commits.
2016-03-14 20:02:46 +02:00
Pekka Enberg
7f6891341e release: prepare for 0.18.2 2016-03-14 16:02:25 +02:00
Glauber Costa
ece77cce90 database: turn sstable generation number into an optional
This patch makes sure that every time we need to create a new generation number
(the very first step in the creation of a new SSTable), the respective CF is
already initialized and populated. Failure to do so can lead to data being
overwritten. Extensive details about why this is important can be found in
Scylla's Github Issue #1014.

Nothing should be writing to SSTables before we have had the chance to populate
the existing SSTables and calculate what the next generation number should be.

However, if that happens, we want to protect against it in a way that does not
involve overwriting existing tables. This is one of the ways to do it: every
column family starts in an unwriteable state, and when it can finally be written
to, we mark it as writeable.

Note that this *cannot* be a part of add_column_family. That adds a column family
to a db in memory only, and if anybody is about to write to a CF, that was most
likely already called. We need to call this explicitly when we are sure we're ready
to issue disk operations safely.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit a339296385)
2016-03-14 15:52:52 +02:00
Glauber Costa
d4a10a0a3c database: remove unused parameter
We are no longer using the in_flight_seals gate, but forgot to remove it.
To guarantee that all seal operations will have finished when we're done,
we are using the memtable_flush_queue, which also guarantees order. But
that gate was never removed.

The FIXME code should also be removed, since such an interface now exists.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 8eb4e69053)
2016-03-14 15:51:14 +02:00
Glauber Costa
e885eacbe4 column_family: do not open code generation calculation
We already have a function that wraps this; re-use it. This FIXME is still
relevant, so just move it there. Let's not lose it.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 94e90d4a17)
2016-03-14 15:51:06 +02:00
Glauber Costa
3f67277804 column_family: remove mutation_count
We use memory usage as a threshold these days, and nowhere is _mutation_count
checked. Get rid of it.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 46fdeec60a)
2016-03-14 15:50:57 +02:00
Asias He
05aea2b65a storage_service: Fix pending_range_calculator_service
Since calculate_pending_ranges will modify token_metadata, we need to
replicate it to the other shards. With this patch, when we call
calculate_pending_ranges, token_metadata will be replicated to the other,
non-zero shards.

In addition, pending_range_calculator_service is not useful as a standalone
class, so we can merge it into storage_service. Kill one singleton class.

Fixes #1033
Refs #962
Message-Id: <fb5b26311cafa4d315eb9e72d823c5ade2ab4bda.1457943074.git.asias@scylladb.com>

(cherry picked from commit 9f64c36a08)
2016-03-14 14:39:39 +02:00
Vlad Zolotarov
a2751a9592 sstables: properly account removal requests
The same shard may create an sstables::sstable object for the same SStable
that doesn't belong to it more than once and mark it
for deletion (e.g. in a 'nodetool refresh' flow).

In that case, the destructor of sstables::sstable accounted for deletion
requests from the same shard more than once, since it used a simple counter
incremented on every deletion request, while requests from the same shard
should be counted as a single request. This matters because the removal logic
waited for all shards to agree on the removal of a specific SStable by
comparing the counter mentioned above to the total number of shards; once
they were equal, the SStable files were actually removed.

This patch fixes this by replacing the counter with an std::unordered_set<unsigned>
that stores the ids of the shards requesting the deletion of the sstable
object, and compares the size() of this set to smp::count in order to decide
whether to actually delete the corresponding SStable files.

Fixes #1004

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1457886812-32345-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit ce47fcb1ba)
2016-03-14 14:38:17 +02:00
Raphael S. Carvalho
eda8732b8e sstables: make write_simple() safer by using exclusive flag
We should guarantee that write_simple() will not try to overwrite
an existing file.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <194bd055f1f2dc1bb9766a67225ec38c88e7b005.1457818073.git.raphaelsc@scylladb.com>
(cherry picked from commit 1ff7d32272)
2016-03-14 14:38:07 +02:00
Raphael S. Carvalho
b24f5ece1f sstables: fix race condition when writing to the same sstable in parallel
When we are about to write a new sstable, we check if the sstable exists
by checking if the respective TOC exists. That check was added to handle a
possible attempt to write a new sstable with a generation already in use.
Gleb was worried that a TOC could appear after the check, and that's indeed
possible if there is an ongoing sstable write that uses the same generation
(running in parallel).
If the TOC appears after the check, we would again clobber an existing sstable
with a temporary one, and the user wouldn't be able to boot scylla anymore
without manual intervention.

Then Nadav proposed the following solution:
"We could do this by the following variant of Raphael's idea:

   1. create .txt.tmp unconditionally, as before the commit 031bf57c1
(if we can't create it, fail).
   2. Now confirm that .txt does not exist. If it does, delete the .txt.tmp
we just created and fail.
   3. continue as usual
   4. and at the end, as before, rename .txt.tmp to .txt.

The key to solving the race is step 1: Since we created .txt.tmp in step 1
and know this creation succeeded, we know that we cannot be running in
parallel with another writer - because such a writer too would have tried to
create the same file, and kept it existing until the very last step of its
work (step 4)."

This patch implements the solution described above.
Let me also say that the race is theoretical and scylla has not been affected
by it so far.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <ef630f5ac1bd0d11632c343d9f77a5f6810d18c1.1457818331.git.raphaelsc@scylladb.com>
(cherry picked from commit 0af786f3ea)
2016-03-14 14:37:58 +02:00
Raphael S. Carvalho
1322ec6d6b sstables: bail out if toc exists for generation used by write_components
Currently, if sstable::write_components() is called to write a new sstable
using the same generation as a sstable that exists, a temporary TOC will
be unconditionally created. Afterwards, the same sstable::write_components()
will fail when it reaches sstable::create_data(), for the obvious reason
that the data component exists for that generation (in this scenario).
After that, the user will not be able to boot scylla anymore because there is
a generation with both a TOC and a temporary TOC. We cannot simply remove a
generation with a TOC and a temporary TOC because user data would be lost
(again, in this scenario). After all, the temporary TOC was only created
because sstable::write_components() was wrongly called with the generation
of a sstable that exists.

The solution proposed by this patch is to throw an exception if a TOC file
exists for the generation used.

Some SSTable unit tests were also changed to guarantee that we don't try
to overwrite components of an existing sstable.

Refs #1014.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <caffc4e19cdcf25e4c6b9dd277d115422f8246c4.1457643565.git.raphaelsc@scylladb.com>
(cherry picked from commit 031bf57c19)
2016-03-14 14:37:50 +02:00
Glauber Costa
efbf51c00b sstables: improve error messages
The standard C++ exception messages that are thrown if anything goes wrong
writing the file are suboptimal: they barely tell us the name of the failing
file.

Use a specialized create function so that we can capture that better.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit f2a8bcabc2)
2016-03-14 14:37:41 +02:00
Pekka Enberg
5d901b19c4 main: Initialize system keyspace earlier
We start services like gossiper before system keyspace is initialized
which means we can start writing too early. Shuffle code so that system
keyspace is initialized earlier.

Refs #1014
Message-Id: <1457593758-9444-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 5dd1fda6cf)
2016-03-14 13:47:18 +02:00
Tomasz Grabiec
7085fc95d1 log: Fix operator<<(std::ostream&, const std::exception_ptr&)
Attempting to print a std::nested_exception currently results in an exception
leaking outside the printer. Fix by capturing all exceptions in the
final catch block.

For a nested exception, the logger will now print just
"std::nested_exception". For nested exceptions specifically we should
log more, but that is a separate problem to solve.
Message-Id: <1457532215-7498-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 838a038cbd)
2016-03-09 16:11:14 +02:00
Pekka Enberg
776908fbf6 types: Implement to_string for timestamps and dates
The to_string() function is used for logging purposes, so use boost's
to_iso_extended_string() to format both timestamps and dates.

Fixes #968 (showstopper)
Message-Id: <1457528755-6164-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit ab502bcfa8)
2016-03-09 16:10:02 +02:00
Gleb Natapov
5f7f276ef6 fix EACH_QUORUM handling during bootstrapping
Currently, write acknowledgement handling does not take the bootstrapping
node into account for CL=EACH_QUORUM. The patch fixes that.

Fixes #994

Message-Id: <20160307121620.GR2253@scylladb.com>
(cherry picked from commit 626c9d046b)
2016-03-08 13:35:10 +02:00
Paweł Dziepak
5a38f3cbfd lsa: set _active to nullptr in region destructor
In the region destructor, after the active segment is freed, the pointer to
it is left unchanged. This confuses the remaining parts of the destructor
logic (namely, removal from the region group), which may rely on the
information in region_impl::_active.

In this particular case the problem was that the code removing from the
region group called region_impl::occupancy(), which dereferences
_active if it is not null.

Fixes #993.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1457341670-18266-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 99b61d3944)
2016-03-08 13:32:30 +02:00
Tomasz Grabiec
2d4309a926 validation: Fix validation of empty partition key
The validation wrongly assumed that an empty thrift key, which the
original C* code guards against, can only correspond to an empty
representation of our partition_key. This no longer holds after:

   commit 095efd01d6
   "keys: Make from_exploded() and components() work without schema"

This was responsible for dtest failure:
cql_additional_tests.TestCQL:column_name_validation_test

(cherry picked from commit 100b540a53)
2016-03-08 11:42:14 +02:00
Tomasz Grabiec
988d6cd153 cql3: Fix handling of lists with static columns
List operations and prefetching were not handling static columns
correctly. One issue was that prefetching was attaching static column
data to row data using ids which might overlap with clustered columns.

Another problem was that list operations were always constructing a
clustering key even if they worked on a static column. For static
columns the key would always be empty and the lookup would fail.

The effect was that list operations which depend on the current state had
no effect. A similar problem could be observed on C* 2.1.9, but not on 2.2.3.

Fixes #903.

(cherry picked from commit 383296c05b)
2016-03-06 11:06:03 +02:00
Pekka Enberg
bf71575fd7 release: prepare for 0.18.1 2016-03-05 08:53:07 +02:00
Gleb Natapov
cd75075214 storage_proxy: fix race between read cl completion and timeout in digest resolver
If the timeout fires after the cl promise is fulfilled, but before the
continuation runs, it removes all the data that the cl continuation needs
to calculate the result. Fix this by calculating the result immediately and
returning it in the cl promise instead of delaying this work until the
continuation runs. This has a nice side effect of simplifying digest
mismatch handling and making it exception free.

Fixes #977.

Message-Id: <1457015870-2106-3-git-send-email-gleb@scylladb.com>
(cherry picked from commit b89b6f442b)
2016-03-03 17:10:38 +02:00
Gleb Natapov
e85f11566b storage_proxy: store only one data reply in digest resolver.
The read executor may ask for more than one data reply during the digest
resolving stage, but only one result is actually needed to satisfy a query,
so there is no need to store all of them.

Message-Id: <1457015870-2106-2-git-send-email-gleb@scylladb.com>
(cherry picked from commit e4ac5157bc)
2016-03-03 17:10:32 +02:00
Gleb Natapov
8f682f018e storage_proxy: fix cl achieved condition in digest resolver timeout handler
In the digest resolver, for cl to be achieved it is not enough to get the
correct number of replies; there must also be a data reply among them. The
condition in the digest timeout does not check that; fortunately, we have a
variable that is set to true when cl is achieved, so use it instead.

Message-Id: <1457015870-2106-1-git-send-email-gleb@scylladb.com>
(cherry picked from commit 69b61b81ce)
2016-03-03 17:10:26 +02:00
Tomasz Grabiec
dba2b617e7 db: Fix error handling in populate_keyspace()
When find_uuid() fails, Scylla terminates with:

  Exiting on unhandled exception of type 'std::out_of_range': _Map_base::at

But we are supposed to ignore directories for unknown column
families. The try {} catch block does just that when
no_such_column_family is thrown from the find_column_family() call
which follows find_uuid(). Fix by converting std::out_of_range to
no_such_column_family.

Message-Id: <1456056280-3933-1-git-send-email-tgrabiec@scylladb.com>
2016-03-03 11:37:26 +02:00
Paweł Dziepak
f4e11007cf Revert "do not use boost::multiprecision::msb()"
This reverts commit dadd097f9c.

That commit caused serialized forms of varint and decimal to have some
excess leading zeros. They didn't affect deserialization in any way but
caused computed tokens to differ from the Cassandra ones.

Fixes #898.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1455537278-20106-1-git-send-email-pdziepak@scylladb.com>
2016-03-03 10:54:19 +02:00
Asias He
fdfa1df395 locator: Fix get token from a range<token>
With a range{t1, t2}, if t2 == {}, the range.end() will contain no
value. Fix getting t2 in this case.

Fixes #911.
Message-Id: <4462e499d706d275c03b116c4645e8aaee7821e1.1456128310.git.asias@scylladb.com>
2016-03-03 10:53:21 +02:00
Tomasz Grabiec
116055cc6f bytes_ostream: Avoid recursion when freeing chunks
When there are a lot of chunks we may get a stack overflow.

This seems to fix issue #906, a memory corruption during schema
merge. I suspect that what causes the corruption there is overflowing
the stack allocated for the seastar thread. Those stacks don't have
red zones which would catch the overflow.

Message-Id: <1456056288-3983-1-git-send-email-tgrabiec@scylladb.com>
2016-03-03 10:53:01 +02:00
Calle Wilund
04c19344de database: Fix use and assumptions about pending compactions
Fixes #934 - faulty assert in discard_sstables

run_with_compaction_disabled clears a CF out of the compaction
manager queue. discard_sstables wants to assert on this, but looks
at the wrong counters.
at the wrong counters.

pending_compactions is an indicator of how much interested parties
want a CF compacted (again and again). It should not be considered
an indicator of compactions actually being done.

This modifies the usage slightly so that:
1.) The counter is always incremented, even if compaction is disallowed.
    The counter's value at the end of run_with_compaction_disabled is then
    instead used as an indicator of whether a compaction should be
    re-triggered. (If compactions finished, it will be zero.)
2.) Document the use and purpose of the pending counter, and add a
    method to re-add a CF to compaction for r_w_c_d above.
3.) discard_sstables now asserts on the right things.

Message-Id: <1456332824-23349-1-git-send-email-calle@scylladb.com>
2016-03-03 10:51:27 +02:00
Raphael S. Carvalho
df19e546f9 tests: sstable_test: submit compaction request through column family
That's needed for reverted commit 9586793c to work. It's also the
correct thing to do, i.e. the column family submits itself to the manager.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <2a1d141ad929c1957933f57412083dd52af0390b.1456415398.git.raphaelsc@scylladb.com>
2016-03-03 10:51:23 +02:00
Takuya ASADA
b532919c55 dist: add posix_net_conf.sh on Ubuntu package
Fixes #881

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1455522990-32044-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit fb3f4cc148)
2016-02-15 17:03:10 +02:00
Takuya ASADA
6ae6dcc2fc dist: switch AMI base image to 'CentOS7-Base2', uses CentOS official kernel
The previous CentOS base image accidentally used a non-standard kernel from
elrepo. This replaces the base image with a new one that contains the CentOS
default kernel.

Fixes #890

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1455398903-2865-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 3697cee76d)
2016-02-15 15:59:04 +02:00
Tomasz Grabiec
5716140a14 abstract_replication_strategy: Fix generation of token ranges
We can't move-from in the loop because the subject will be empty in
all but the first iteration.

Fixes a crash during node startup:

  "Exiting on unhandled exception of type 'runtime_exception': runtime error: Invalid token. Should have size 8, has size 0"

Fixes update_cluster_layout_tests.py:TestUpdateClusterLayout.simple_add_node_1_test (and probably others)

Signed-off-by: Tomasz Grabiec <tgrabiec@scylladb.com>
(cherry picked from commit efdbc3d6d7)
2016-02-14 14:39:31 +02:00
Avi Kivity
91cb9bae2e release: prepare for 0.18 2016-02-11 17:55:20 +02:00
43 changed files with 697 additions and 607 deletions

.gitmodules

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 	path = seastar
-	url = ../seastar
+	url = ../scylla-seastar
 	ignore = dirty
 [submodule "swagger-ui"]
 	path = swagger-ui


@@ -1,6 +1,6 @@
 #!/bin/sh
-VERSION=666.development
+VERSION=0.18.2
 if test -f version
 then


@@ -42,6 +42,14 @@ private:
     struct chunk {
         // FIXME: group fragment pointers to reduce pointer chasing when packetizing
         std::unique_ptr<chunk> next;
+        ~chunk() {
+            auto p = std::move(next);
+            while (p) {
+                // Avoid recursion when freeing chunks
+                auto p_next = std::move(p->next);
+                p = std::move(p_next);
+            }
+        }
         size_type offset; // Also means "size" after chunk is closed
         size_type size;
         value_type data[0];


@@ -416,7 +416,6 @@ scylla_core = (['database.cc',
                 'service/client_state.cc',
                 'service/migration_task.cc',
                 'service/storage_service.cc',
-                'service/pending_range_calculator_service.cc',
                 'service/load_broadcaster.cc',
                 'service/pager/paging_state.cc',
                 'service/pager/query_pagers.cc',


@@ -259,7 +259,10 @@ lists::setter_by_index::execute(mutation& m, const exploded_clustering_prefix& p
     // we should not get here for frozen lists
     assert(column.type->is_multi_cell()); // "Attempted to set an individual element on a frozen list";
-    auto row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
+    std::experimental::optional<clustering_key> row_key;
+    if (!column.is_static()) {
+        row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
+    }
     auto index = _idx->bind_and_get(params._options);
     auto value = _t->bind_and_get(params._options);
@@ -269,8 +272,7 @@ lists::setter_by_index::execute(mutation& m, const exploded_clustering_prefix& p
     }
     auto idx = net::ntoh(int32_t(*unaligned_cast<int32_t>(index->begin())));
-    auto existing_list_opt = params.get_prefetched_list(m.key(), row_key, column);
+    auto&& existing_list_opt = params.get_prefetched_list(m.key(), std::move(row_key), column);
     if (!existing_list_opt) {
         throw exceptions::invalid_request_exception("Attempted to set an element on a list which is null");
     }
@@ -383,8 +385,13 @@ lists::discarder::requires_read() {
 void
 lists::discarder::execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) {
     assert(column.type->is_multi_cell()); // "Attempted to delete from a frozen list";
-    auto&& row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
-    auto&& existing_list = params.get_prefetched_list(m.key(), row_key, column);
+    std::experimental::optional<clustering_key> row_key;
+    if (!column.is_static()) {
+        row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
+    }
+    auto&& existing_list = params.get_prefetched_list(m.key(), std::move(row_key), column);
     // We want to call bind before possibly returning to reject queries where the value provided is not a list.
     auto&& value = _t->bind(params._options);
@@ -444,8 +451,11 @@ lists::discarder_by_index::execute(mutation& m, const exploded_clustering_prefix
     auto cvalue = dynamic_pointer_cast<constants::value>(index);
     assert(cvalue);
-    auto row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
-    auto&& existing_list = params.get_prefetched_list(m.key(), row_key, column);
+    std::experimental::optional<clustering_key> row_key;
+    if (!column.is_static()) {
+        row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
+    }
+    auto&& existing_list = params.get_prefetched_list(m.key(), std::move(row_key), column);
     int32_t idx = read_simple_exactly<int32_t>(*cvalue->_bytes);
     if (!existing_list) {
         throw exceptions::invalid_request_exception("Attempted to delete an element from a list which is null");


@@ -186,11 +186,23 @@ modification_statement::make_update_parameters(
 class prefetch_data_builder {
     update_parameters::prefetch_data& _data;
     const query::partition_slice& _ps;
+    schema_ptr _schema;
     std::experimental::optional<partition_key> _pkey;
+private:
+    void add_cell(update_parameters::prefetch_data::row& cells, const column_definition& def, const std::experimental::optional<collection_mutation_view>& cell) {
+        if (cell) {
+            auto ctype = static_pointer_cast<const collection_type_impl>(def.type);
+            if (!ctype->is_multi_cell()) {
+                throw std::logic_error(sprint("cannot prefetch frozen collection: %s", def.name_as_text()));
+            }
+            cells.emplace(def.id, collection_mutation{*cell});
+        }
+    };
 public:
-    prefetch_data_builder(update_parameters::prefetch_data& data, const query::partition_slice& ps)
+    prefetch_data_builder(schema_ptr s, update_parameters::prefetch_data& data, const query::partition_slice& ps)
         : _data(data)
         , _ps(ps)
+        , _schema(std::move(s))
     { }
     void accept_new_partition(const partition_key& key, uint32_t row_count) {
@@ -205,20 +217,9 @@ public:
                            const query::result_row_view& row) {
         update_parameters::prefetch_data::row cells;
-        auto add_cell = [&cells] (column_id id, std::experimental::optional<collection_mutation_view>&& cell) {
-            if (cell) {
-                cells.emplace(id, collection_mutation{to_bytes(cell->data)});
-            }
-        };
-        auto static_row_iterator = static_row.iterator();
-        for (auto&& id : _ps.static_columns) {
-            add_cell(id, static_row_iterator.next_collection_cell());
-        }
         auto row_iterator = row.iterator();
         for (auto&& id : _ps.regular_columns) {
-            add_cell(id, row_iterator.next_collection_cell());
+            add_cell(cells, _schema->regular_column_at(id), row_iterator.next_collection_cell());
         }
         _data.rows.emplace(std::make_pair(*_pkey, key), std::move(cells));
@@ -228,7 +229,16 @@ public:
         assert(0);
     }
-    void accept_partition_end(const query::result_row_view& static_row) {}
+    void accept_partition_end(const query::result_row_view& static_row) {
+        update_parameters::prefetch_data::row cells;
+        auto static_row_iterator = static_row.iterator();
+        for (auto&& id : _ps.static_columns) {
+            add_cell(cells, _schema->static_column_at(id), static_row_iterator.next_collection_cell());
+        }
+        _data.rows.emplace(std::make_pair(*_pkey, std::experimental::nullopt), std::move(cells));
+    }
 };
 future<update_parameters::prefetched_rows_type>
@@ -278,7 +288,7 @@ modification_statement::read_required_rows(
         bytes_ostream buf(result->buf());
         query::result_view v(buf.linearize());
         auto prefetched_rows = update_parameters::prefetched_rows_type({update_parameters::prefetch_data(s)});
-        v.consume(ps, prefetch_data_builder(prefetched_rows.value(), ps));
+        v.consume(ps, prefetch_data_builder(s, prefetched_rows.value(), ps));
         return prefetched_rows;
     });
 }


@@ -45,15 +45,15 @@ namespace cql3 {
 std::experimental::optional<collection_mutation_view>
 update_parameters::get_prefetched_list(
-    const partition_key& pkey,
-    const clustering_key& row_key,
+    partition_key pkey,
+    std::experimental::optional<clustering_key> ckey,
     const column_definition& column) const
 {
     if (!_prefetched) {
         return {};
     }
-    auto i = _prefetched->rows.find(std::make_pair(pkey, row_key));
+    auto i = _prefetched->rows.find(std::make_pair(std::move(pkey), std::move(ckey)));
     if (i == _prefetched->rows.end()) {
         return {};
     }


@@ -58,8 +58,9 @@ namespace cql3 {
  */
 class update_parameters final {
 public:
+    // Holder for data needed by CQL list updates which depend on current state of the list.
     struct prefetch_data {
-        using key = std::pair<partition_key, clustering_key>;
+        using key = std::pair<partition_key, std::experimental::optional<clustering_key>>;
         struct key_hashing {
             partition_key::hashing pk_hash;
             clustering_key::hashing ck_hash;
@@ -70,7 +71,7 @@ public:
             { }
             size_t operator()(const key& k) const {
-                return pk_hash(k.first) ^ ck_hash(k.second);
+                return pk_hash(k.first) ^ (k.second ? ck_hash(*k.second) : 0);
             }
         };
         struct key_equality {
@@ -83,7 +84,8 @@ public:
             { }
             bool operator()(const key& k1, const key& k2) const {
-                return pk_eq(k1.first, k2.first) && ck_eq(k1.second, k2.second);
+                return pk_eq(k1.first, k2.first)
+                    && bool(k1.second) == bool(k2.second) && (!k1.second || ck_eq(*k1.second, *k2.second));
             }
         };
         using row = std::unordered_map<column_id, collection_mutation>;
@@ -183,8 +185,11 @@ public:
         return _timestamp;
     }
-    std::experimental::optional<collection_mutation_view> get_prefetched_list(
-        const partition_key& pkey, const clustering_key& row_key, const column_definition& column) const;
+    std::experimental::optional<collection_mutation_view>
+    get_prefetched_list(
+        partition_key pkey,
+        std::experimental::optional<clustering_key> ckey,
+        const column_definition& column) const;
 };
 }


@@ -589,9 +589,7 @@ column_family::seal_active_memtable() {
future<stop_iteration>
column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
// FIXME: better way of ensuring we don't attempt to
// overwrite an existing table.
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
auto gen = calculate_generation_for_new_table();
auto newtab = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(),
_config.datadir, gen,
@@ -859,14 +857,25 @@ void column_family::start_compaction() {
void column_family::trigger_compaction() {
// Submitting compaction job to compaction manager.
// #934 - always inc the pending counter, to help
// indicate the want for compaction.
_stats.pending_compactions++;
do_trigger_compaction(); // see below
}
void column_family::do_trigger_compaction() {
// But only submit if we're not locked out
if (!_compaction_disabled) {
_stats.pending_compactions++;
_compaction_manager.submit(this);
}
}
future<> column_family::run_compaction(sstables::compaction_descriptor descriptor) {
assert(_stats.pending_compactions > 0);
return compact_sstables(std::move(descriptor)).then([this] {
// only do this on success. (no exceptions)
// in that case, we rely on it being still set
// for requeueing
_stats.pending_compactions--;
});
}
@@ -1005,6 +1014,9 @@ future<> column_family::populate(sstring sstdir) {
return make_ready_future<>();
});
});
}).then([this] {
// Make sure this is called even if CF is empty
mark_ready_for_writes();
});
}
@@ -1082,7 +1094,13 @@ future<> database::populate_keyspace(sstring datadir, sstring ks_name) {
sstring uuidst = comps[1];
try {
auto&& uuid = find_uuid(ks_name, cfname);
auto&& uuid = [&] {
try {
return find_uuid(ks_name, cfname);
} catch (const std::out_of_range& e) {
std::throw_with_nested(no_such_column_family(ks_name, cfname));
}
}();
auto& cf = find_column_family(uuid);
// #870: Check that the directory name matches
@@ -1183,6 +1201,14 @@ database::init_system_keyspace() {
return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME).then([this]() {
return init_commitlog();
});
}).then([this] {
auto& ks = find_keyspace(db::system_keyspace::NAME);
return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) {
auto cfm = pair.second;
auto& cf = this->find_column_family(cfm);
cf.mark_ready_for_writes();
return make_ready_future<>();
});
});
}
@@ -1698,11 +1724,9 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const
void
column_family::seal_on_overflow() {
++_mutation_count;
if (active_memtable().occupancy().total_space() >= _config.max_memtable_size) {
// FIXME: if sparse, do some in-memory compaction first
// FIXME: maybe merge with other in-memory memtables
_mutation_count = 0;
seal_active_memtable();
}
}
@@ -2272,7 +2296,8 @@ void column_family::clear() {
// NOTE: does not need to be futurized, but might eventually, depending on
// if we implement notifications, whatnot.
future<db::replay_position> column_family::discard_sstables(db_clock::time_point truncated_at) {
assert(_stats.pending_compactions == 0);
assert(_compaction_disabled > 0);
assert(!compaction_manager_queued());
return with_lock(_sstables_lock.for_read(), [this, truncated_at] {
db::replay_position rp;

View File

@@ -159,8 +159,8 @@ private:
// the read lock, and the ones that wish to stop that process will take the write lock.
rwlock _sstables_lock;
mutable row_cache _cache; // Cache covers only sstables.
int64_t _sstable_generation = 1;
unsigned _mutation_count = 0;
std::experimental::optional<int64_t> _sstable_generation = {};
db::replay_position _highest_flushed_rp;
// Provided by the database that owns this commitlog
db::commitlog* _commitlog;
@@ -185,11 +185,17 @@ private:
// update the sstable generation, making sure that new sstables don't overwrite this one.
void update_sstables_known_generation(unsigned generation) {
_sstable_generation = std::max<uint64_t>(_sstable_generation, generation / smp::count + 1);
if (!_sstable_generation) {
_sstable_generation = 1;
}
_sstable_generation = std::max<uint64_t>(*_sstable_generation, generation / smp::count + 1);
}
uint64_t calculate_generation_for_new_table() {
return _sstable_generation++ * smp::count + engine().cpu_id();
assert(_sstable_generation);
// FIXME: better way of ensuring we don't attempt to
// overwrite an existing table.
return (*_sstable_generation)++ * smp::count + engine().cpu_id();
}
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
@@ -206,7 +212,29 @@ private:
key_source sstables_as_key_source() const;
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstable_list> old_sstables);
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
void do_trigger_compaction();
public:
// This function should be called when this column family is ready for writes, IOW,
// to produce SSTables. Extensive details about why this is important can be found
// in Scylla's Github Issue #1014
//
// Nothing should be writing to SSTables before we have the chance to populate the
// existing SSTables and calculate what the next generation number should be.
//
// However, if that happens, we want to protect against it in a way that does not
// involve overwriting existing tables. This is one of the ways to do it: every
// column family starts in an unwriteable state, and when it can finally be written
// to, we mark it as writeable.
//
// Note that this *cannot* be a part of add_column_family. That adds a column family
// to a db in memory only, and if anybody is about to write to a CF, that was most
// likely already called. We need to call this explicitly when we are sure we're ready
// to issue disk operations safely.
void mark_ready_for_writes() {
update_sstables_known_generation(0);
}
// Creates a mutation reader which covers all data sources for this column family.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// Note: for data queries use query() instead.
@@ -361,8 +389,12 @@ public:
Result run_with_compaction_disabled(Func && func) {
++_compaction_disabled;
return _compaction_manager.remove(this).then(std::forward<Func>(func)).finally([this] {
if (--_compaction_disabled == 0) {
trigger_compaction();
// #934. The pending counter is actually a great indicator into whether we
// actually need to trigger a compaction again.
if (--_compaction_disabled == 0 && _stats.pending_compactions > 0) {
// we're turning it on again; use the function that does not increment
// the counter further.
do_trigger_compaction();
}
});
}
@@ -379,16 +411,11 @@ private:
// But it is possible to synchronously wait for the seal to complete by
// waiting on this future. This is useful in situations where we want to
// synchronously flush data to disk.
//
// FIXME: A better interface would guarantee that all writes before this
// one are also complete
future<> seal_active_memtable();
// filter manifest.json files out
static bool manifest_json_filter(const sstring& fname);
seastar::gate _in_flight_seals;
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)

View File

@@ -703,6 +703,8 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
auto& ks = db.find_keyspace(s->ks_name());
auto cfg = ks.make_column_family_config(*s);
db.add_column_family(s, cfg);
auto& cf = db.find_column_family(s);
cf.mark_ready_for_writes();
ks.make_directory_for_column_family(s->cf_name(), s->id()).get();
service::get_local_migration_manager().notify_create_column_family(s);
}

View File

@@ -8,7 +8,7 @@
"security_group_id": "{{user `security_group_id`}}",
"region": "{{user `region`}}",
"associate_public_ip_address": "{{user `associate_public_ip_address`}}",
"source_ami": "ami-8ef1d6e4",
"source_ami": "ami-f3102499",
"user_data_file": "user_data.txt",
"instance_type": "{{user `instance_type`}}",
"ssh_username": "centos",

View File

@@ -40,7 +40,8 @@ override_dh_auto_install:
cp -r $(CURDIR)/licenses $(DOC)
mkdir -p $(SCRIPTS) && \
cp $(CURDIR)/seastar/dpdk/tools/dpdk_nic_bind.py $(SCRIPTS)
cp $(CURDIR)/seastar/scripts/dpdk_nic_bind.py $(SCRIPTS)
cp $(CURDIR)/seastar/scripts/posix_net_conf.sh $(SCRIPTS)
cp $(CURDIR)/dist/common/scripts/* $(SCRIPTS)
cp $(CURDIR)/dist/ubuntu/scripts/* $(SCRIPTS)

View File

@@ -409,8 +409,14 @@ future<> gossiper::apply_state_locally(const std::map<inet_address, endpoint_sta
// Runs inside seastar::async context
void gossiper::remove_endpoint(inet_address endpoint) {
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
_subscribers.for_each([endpoint] (auto& subscriber) {
subscriber->on_remove(endpoint);
// We cannot run on_remove callbacks here because on_remove in
// storage_service might take the gossiper::timer_callback_lock
seastar::async([this, endpoint] {
_subscribers.for_each([endpoint] (auto& subscriber) {
subscriber->on_remove(endpoint);
});
}).handle_exception([] (auto ep) {
logger.warn("Failed to call on_remove callback: {}", ep);
});
if(_seeds.count(endpoint)) {

10
init.cc
View File

@@ -24,7 +24,6 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
#include "service/pending_range_calculator_service.hh"
#include "to_string.hh"
#include "gms/inet_address.hh"
@@ -34,14 +33,9 @@
// until proper shutdown is done.
future<> init_storage_service(distributed<database>& db) {
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
return service::init_storage_service(db).then([] {
// #293 - do not stop anything
// engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
}).then([&db] {
return service::init_storage_service(db).then([] {
// #293 - do not stop anything
//engine().at_exit([] { return service::deinit_storage_service(); });
});
//engine().at_exit([] { return service::deinit_storage_service(); });
});
}

View File

@@ -168,12 +168,12 @@ abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
if (wrap) {
auto split_ranges = r.unwrap();
for (auto ep : eps) {
ret.emplace(ep, std::move(split_ranges.first));
ret.emplace(ep, std::move(split_ranges.second));
ret.emplace(ep, split_ranges.first);
ret.emplace(ep, split_ranges.second);
}
} else {
for (auto ep : eps) {
ret.emplace(ep, std::move(r));
ret.emplace(ep, r);
}
}
}
@@ -190,12 +190,12 @@ abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
if (wrap) {
auto split_ranges = r.unwrap();
for (auto ep : eps) {
ret.emplace(std::move(split_ranges.first), ep);
ret.emplace(std::move(split_ranges.second), ep);
ret.emplace(split_ranges.first, ep);
ret.emplace(split_ranges.second, ep);
}
} else {
for (auto ep : eps) {
ret.emplace(std::move(r), ep);
ret.emplace(r, ep);
}
}
}

View File

@@ -398,7 +398,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
// all leaving nodes are gone.
auto metadata = clone_only_token_map(); // don't do this in the loop! #7758
for (const auto& r : affected_ranges) {
auto t = r.end()->value();
auto t = r.end() ? r.end()->value() : dht::maximum_token();
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata);
std::vector<inet_address> diff;

2
log.cc
View File

@@ -246,6 +246,8 @@ std::ostream& operator<<(std::ostream& out, const std::exception_ptr& eptr) {
out << " (error " << e.code() << ", " << e.code().message() << ")";
} catch(const std::exception& e) {
out << " (" << e.what() << ")";
} catch(...) {
// no extra info
}
}
return out;

408
main.cc
View File

@@ -221,7 +221,19 @@ verify_rlimit(bool developer_mode) {
}
}
static bool cpu_sanity() {
if (!__builtin_cpu_supports("sse4.2")) {
std::cerr << "Scylla requires a processor with SSE 4.2 support\n";
return false;
}
return true;
}
int main(int ac, char** av) {
// early check to avoid triggering an illegal-instruction fault later
if (!cpu_sanity()) {
_exit(71);
}
runtime::init_uptime();
std::setvbuf(stdout, nullptr, _IOLBF, 1000);
app_template app;
@@ -264,7 +276,8 @@ int main(int ac, char** av) {
engine().set_strict_dma(false);
}
return read_config(opts, *cfg).then([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs]() {
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs] {
read_config(opts, *cfg).get();
apply_logger_settings(cfg->default_log_level(), cfg->logger_log_level(),
cfg->log_to_stdout(), cfg->log_to_syslog());
verify_rlimit(cfg->developer_mode());
@@ -320,230 +333,203 @@ int main(int ac, char** av) {
using namespace locator;
// Re-apply strict-dma after we've read the config file, this time
// to all reactors
return parallel_for_each(boost::irange(0u, smp::count), [devmode = opts.count("developer-mode")] (unsigned cpu) {
smp::invoke_on_all([devmode = opts.count("developer-mode")] {
if (devmode) {
engine().set_strict_dma(false);
}
return make_ready_future<>();
}).then([cfg] {
supervisor_notify("creating snitch");
return i_endpoint_snitch::create_snitch(cfg->endpoint_snitch());
// #293 - do not stop anything
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
}).then([api_address] {
supervisor_notify("determining DNS name");
return dns::gethostbyname(api_address);
}).then([&db, api_address, api_port, &ctx] (dns::hostent e){
supervisor_notify("starting API server");
auto ip = e.addresses[0].in.s_addr;
return ctx.http_server.start().then([api_address, api_port, ip, &ctx] {
return api::set_server_init(ctx);
}).then([api_address, api_port, ip, &ctx] {
return ctx.http_server.listen(ipv4_addr{ip, api_port});
}).then([api_address, api_port] {
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
}).get();
supervisor_notify("creating snitch");
i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
// #293 - do not stop anything
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
supervisor_notify("determining DNS name");
dns::hostent e = dns::gethostbyname(api_address).get0();
supervisor_notify("starting API server");
auto ip = e.addresses[0].in.s_addr;
ctx.http_server.start().get();
api::set_server_init(ctx).get();
ctx.http_server.listen(ipv4_addr{ip, api_port}).get();
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
supervisor_notify("initializing storage service");
init_storage_service(db).get();
api::set_server_storage_service(ctx).get();
supervisor_notify("starting per-shard database core");
// Note: changed from using a move here, because we want the config object intact.
db.start(std::ref(*cfg)).get();
engine().at_exit([&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the sharded<database> pointers alive.
return db.invoke_on_all([](auto& db) {
return db.stop();
}).then([] {
return sstables::await_background_jobs_on_all_shards();
}).then([] {
::_exit(0);
});
}).then([&db] {
supervisor_notify("initializing storage service");
return init_storage_service(db);
}).then([&ctx] {
return api::set_server_storage_service(ctx);
}).then([&db, cfg] {
});
supervisor_notify("creating data directories");
dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
supervisor_notify("creating commitlog directory");
dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
supervisor_notify("verifying data and commitlog directories");
std::unordered_set<sstring> directories;
directories.insert(db.local().get_config().data_file_directories().cbegin(),
db.local().get_config().data_file_directories().cend());
directories.insert(db.local().get_config().commitlog_directory());
parallel_for_each(directories, [&db] (sstring pathname) {
return disk_sanity(pathname, db.local().get_config().developer_mode());
}).get();
supervisor_notify("starting per-shard database core");
// Note: changed from using a move here, because we want the config object intact.
return db.start(std::ref(*cfg)).then([&db] {
engine().at_exit([&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the sharded<database> pointers alive.
return db.invoke_on_all([](auto& db) {
return db.stop();
}).then([] {
return sstables::await_background_jobs_on_all_shards();
}).then([] {
::_exit(0);
});
});
});
}).then([cfg, listen_address] {
supervisor_notify("starting gossip");
// Moved local parameters here, especially since with the
// ssl stuff it gets to be a lot.
uint16_t storage_port = cfg->storage_port();
uint16_t ssl_storage_port = cfg->ssl_storage_port();
double phi = cfg->phi_convict_threshold();
auto seed_provider = cfg->seed_provider();
sstring cluster_name = cfg->cluster_name();
const auto& ssl_opts = cfg->server_encryption_options();
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
auto trust_store = get_or_default(ssl_opts, "truststore");
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
return init_ms_fd_gossiper(listen_address
, storage_port
, ssl_storage_port
, encrypt_what
, trust_store
, cert
, key
, seed_provider
, cluster_name
, phi);
}).then([&ctx] {
return api::set_server_gossip(ctx);
}).then([&db] {
supervisor_notify("starting streaming service");
return streaming::stream_session::init_streaming_service(db);
}).then([&ctx] {
return api::set_server_stream_manager(ctx);
}).then([&db] {
supervisor_notify("starting messaging service");
// Start handling REPAIR_CHECKSUM_RANGE messages
return net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db] (auto& keyspace, auto& cf, auto& range) {
return checksum_range(db, keyspace, cf, range);
});
});
});
}).then([&ctx](){
return api::set_server_messaging_service(ctx);
}).then([&proxy, &db] {
supervisor_notify("starting storage proxy");
return proxy.start(std::ref(db)).then([&proxy] {
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
});
}).then([&ctx]() {
return api::set_server_storage_proxy(ctx);
}).then([&mm] {
supervisor_notify("starting migration manager");
return mm.start().then([&mm] {
// #293 - do not stop anything
// engine().at_exit([&mm] { return mm.stop(); });
});
}).then([&db, &proxy, &qp] {
supervisor_notify("starting query processor");
return qp.start(std::ref(proxy), std::ref(db)).then([&qp] {
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });
});
}).then([&qp] {
supervisor_notify("initializing batchlog manager");
return db::get_batchlog_manager().start(std::ref(qp)).then([] {
// #293 - do not stop anything
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
});
}).then([&db, &dirs] {
supervisor_notify("creating data directories");
return dirs.touch_and_lock(db.local().get_config().data_file_directories());
}).then([&db, &dirs] {
supervisor_notify("creating commitlog directory");
return dirs.touch_and_lock(db.local().get_config().commitlog_directory());
}).then([&db] {
supervisor_notify("verifying data and commitlog directories");
std::unordered_set<sstring> directories;
directories.insert(db.local().get_config().data_file_directories().cbegin(),
db.local().get_config().data_file_directories().cend());
directories.insert(db.local().get_config().commitlog_directory());
return do_with(std::move(directories), [&db] (auto& directories) {
return parallel_for_each(directories, [&db] (sstring pathname) {
return disk_sanity(pathname, db.local().get_config().developer_mode());
});
});
}).then([&db] {
supervisor_notify("loading sstables");
return db.invoke_on_all([] (database& db) {
return db.init_system_keyspace();
}).then([&db] {
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
return parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
auto cfm = pair.second;
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
});
});
}).then([&db, &proxy] {
supervisor_notify("loading sstables");
return db.invoke_on_all([&proxy] (database& db) {
return db.load_sstables(proxy);
});
}).then([&ctx] {
return api::set_server_load_sstable(ctx);
}).then([&db, &qp] {
supervisor_notify("setting up system keyspace");
return db::system_keyspace::setup(db, qp);
}).then([&db, &qp] {
supervisor_notify("starting commit log");
auto cl = db.local().commitlog();
if (cl == nullptr) {
// Deletion of previous stale, temporary SSTables is done by Shard0. Therefore,
// let's run Shard0 first. Technically, we could just have all shards agree on
// the deletion and just delete it later, but that is prone to races.
//
// Those races are not supposed to happen during normal operation, but if we have
// bugs, they can. Scylla's Github Issue #1014 is an example of a situation where
// that can happen, making existing problems worse. So running a single shard first
// and making sure that all temporary tables are deleted provides extra
// protection against such situations.
db.invoke_on(0, [] (database& db) { return db.init_system_keyspace(); }).get();
db.invoke_on_all([] (database& db) {
if (engine().cpu_id() == 0) {
return make_ready_future<>();
}
return cl->list_existing_segments().then([&db, &qp](auto paths) {
if (paths.empty()) {
return make_ready_future<>();
}
return db.init_system_keyspace();
}).get();
supervisor_notify("starting gossip");
// Moved local parameters here, especially since with the
// ssl stuff it gets to be a lot.
uint16_t storage_port = cfg->storage_port();
uint16_t ssl_storage_port = cfg->ssl_storage_port();
double phi = cfg->phi_convict_threshold();
auto seed_provider = cfg->seed_provider();
sstring cluster_name = cfg->cluster_name();
const auto& ssl_opts = cfg->server_encryption_options();
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
auto trust_store = get_or_default(ssl_opts, "truststore");
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
init_ms_fd_gossiper(listen_address
, storage_port
, ssl_storage_port
, encrypt_what
, trust_store
, cert
, key
, seed_provider
, cluster_name
, phi).get();
api::set_server_gossip(ctx).get();
supervisor_notify("starting messaging service");
api::set_server_messaging_service(ctx).get();
supervisor_notify("starting storage proxy");
proxy.start(std::ref(db)).get();
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
api::set_server_storage_proxy(ctx).get();
supervisor_notify("starting migration manager");
mm.start().get();
// #293 - do not stop anything
// engine().at_exit([&mm] { return mm.stop(); });
supervisor_notify("starting query processor");
qp.start(std::ref(proxy), std::ref(db)).get();
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });
supervisor_notify("initializing batchlog manager");
db::get_batchlog_manager().start(std::ref(qp)).get();
// #293 - do not stop anything
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
supervisor_notify("loading sstables");
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
auto cfm = pair.second;
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
}).get();
supervisor_notify("loading sstables");
// See the comment above our call to init_system_keyspace for why we invoke
// on shard 0 first. See Scylla's Github Issue #1014 for details.
db.invoke_on(0, [&proxy] (database& db) { return db.load_sstables(proxy); }).get();
db.invoke_on_all([&proxy] (database& db) {
if (engine().cpu_id() == 0) {
return make_ready_future<>();
}
return db.load_sstables(proxy);
}).get();
api::set_server_load_sstable(ctx).get();
supervisor_notify("setting up system keyspace");
db::system_keyspace::setup(db, qp).get();
supervisor_notify("starting commit log");
auto cl = db.local().commitlog();
if (cl != nullptr) {
auto paths = cl->list_existing_segments().get0();
if (!paths.empty()) {
supervisor_notify("replaying commit log");
return db::commitlog_replayer::create_replayer(qp).then([paths](auto rp) {
return do_with(std::move(rp), [paths = std::move(paths)](auto& rp) {
return rp.recover(paths);
});
}).then([&db] {
supervisor_notify("replaying commit log - flushing memtables");
return db.invoke_on_all([] (database& db) {
return db.flush_all_memtables();
});
}).then([paths] {
supervisor_notify("replaying commit log - removing old commitlog segments");
for (auto& path : paths) {
::unlink(path.c_str());
}
auto rp = db::commitlog_replayer::create_replayer(qp).get0();
rp.recover(paths).get();
supervisor_notify("replaying commit log - flushing memtables");
db.invoke_on_all([] (database& db) {
return db.flush_all_memtables();
}).get();
supervisor_notify("replaying commit log - removing old commitlog segments");
for (auto& path : paths) {
::unlink(path.c_str());
}
}
}
supervisor_notify("initializing migration manager RPC verbs");
service::get_migration_manager().invoke_on_all([] (auto& mm) {
mm.init_messaging_service();
}).get();
supervisor_notify("initializing storage proxy RPC verbs");
proxy.invoke_on_all([] (service::storage_proxy& p) {
p.init_messaging_service();
}).get();
supervisor_notify("starting streaming service");
streaming::stream_session::init_streaming_service(db).get();
api::set_server_stream_manager(ctx).get();
// Start handling REPAIR_CHECKSUM_RANGE messages
net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db] (auto& keyspace, auto& cf, auto& range) {
return checksum_range(db, keyspace, cf, range);
});
});
}).then([] {
supervisor_notify("starting storage service");
auto& ss = service::get_local_storage_service();
return ss.init_server();
}).then([&ctx] {
return api::set_server_storage_service(ctx);
}).then([] {
supervisor_notify("starting batchlog manager");
return db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();
});
}).then([&db] {
supervisor_notify("starting load broadcaster");
// should be unique_ptr, but then the lambda passed to at_exit will be non-copyable and
// casting to std::function<> will fail to compile
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
}).then([] {
return gms::get_local_gossiper().wait_for_gossip_to_settle();
}).then([&ctx] {
return api::set_server_gossip_settle(ctx);
}).then([start_thrift] () {
supervisor_notify("starting native transport");
return service::get_local_storage_service().start_native_transport().then([start_thrift] () {
if (start_thrift) {
return service::get_local_storage_service().start_rpc_server();
}
return make_ready_future<>();
});
}).then([&ctx] {
return api::set_server_done(ctx);
});
}).then([] {
supervisor_notify("serving", true);
}).get();
supervisor_notify("starting storage service", true);
auto& ss = service::get_local_storage_service();
ss.init_server().get();
api::set_server_storage_service(ctx).get();
supervisor_notify("starting batchlog manager");
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();
}).get();
supervisor_notify("starting load broadcaster");
// should be unique_ptr, but then the lambda passed to at_exit will be non-copyable and
// casting to std::function<> will fail to compile
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
gms::get_local_gossiper().wait_for_gossip_to_settle().get();
api::set_server_gossip_settle(ctx).get();
supervisor_notify("starting native transport");
service::get_local_storage_service().start_native_transport().get();
if (start_thrift) {
service::get_local_storage_service().start_rpc_server().get();
}
api::set_server_done(ctx).get();
supervisor_notify("serving");
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
engine().at_exit([] {
return service::get_local_storage_service().drain_on_shutdown();
});
engine().at_exit([] {
return repair_shutdown(service::get_local_storage_service().db());
});
}).or_terminate();
});
}

View File

@@ -112,11 +112,11 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
class messaging_service::rpc_protocol_client_wrapper {
std::unique_ptr<rpc_protocol::client> _p;
public:
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local = ipv4_addr())
: _p(std::make_unique<rpc_protocol::client>(proto, addr, local)) {
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr())
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
}
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
: _p(std::make_unique<rpc_protocol::client>(proto, addr, seastar::tls::connect(c, addr, local)))
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, seastar::tls::connect(c, addr, local)))
{}
auto get_stats() const { return _p->get_stats(); }
future<> stop() { return _p->stop(); }
@@ -390,10 +390,14 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
auto remote_addr = ipv4_addr(get_preferred_ip(id.addr).raw_addr(), must_encrypt ? _ssl_port : _port);
auto local_addr = ipv4_addr{_listen_address.raw_addr(), 0};
rpc::client_options opts;
// send keepalive messages each minute if connection is idle, drop connection after 10 failures
opts.keepalive = std::experimental::optional<net::tcp_keepalive_params>({60s, 60s, 10});
auto client = must_encrypt ?
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
remote_addr, local_addr, _credentials) :
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
remote_addr, local_addr);
it = _clients[idx].emplace(id, shard_info(std::move(client))).first;

View File

@@ -203,9 +203,14 @@ private:
// Successfully-finished repairs are those with id < _next_repair_command
// but aren't listed as running or failed the status map.
std::unordered_map<int, repair_status> _status;
// Variables used to allow shutting down all repair in progress
bool _in_shutdown = false;
promise<> _shutdown_done;
int _repairs_running = 0;
public:
void start(int id) {
_status[id] = repair_status::RUNNING;
++_repairs_running;
}
void done(int id, bool succeeded) {
if (succeeded) {
@@ -213,6 +218,9 @@ public:
} else {
_status[id] = repair_status::FAILED;
}
if (--_repairs_running == 0 && _in_shutdown) {
_shutdown_done.set_value();
}
}
repair_status get(int id) {
if (id >= _next_repair_command) {
@@ -228,8 +236,28 @@ public:
int next_repair_command() {
return _next_repair_command++;
}
future<> shutdown() {
assert(!_in_shutdown);
_in_shutdown = true;
if (_repairs_running == 0) {
return make_ready_future<>();
}
return _shutdown_done.get_future();
}
bool in_shutdown() {
return _in_shutdown;
}
} repair_tracker;
static void check_in_shutdown() {
// Only call this from the single CPU managing the repair - the only CPU
// which is allowed to use repair_tracker.
assert(engine().cpu_id() == 0);
if (repair_tracker.in_shutdown()) {
throw repair_stopped_exception();
}
}
partition_checksum::partition_checksum(const mutation& m) {
auto frozen = freeze(m);
@@ -445,6 +473,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
[&db, &neighbors, parallelism] (auto& sem, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) {
return do_for_each(ranges, [&sem, &success, &db, &neighbors, &keyspace, &cf]
(const auto& range) {
check_in_shutdown();
return sem.wait(1).then([&sem, &success, &db, &neighbors, &keyspace, &cf, &range] {
// Ask this node, and all neighbors, to calculate checksums in
// this range. When all are done, compare the results, and if
@@ -713,6 +742,7 @@ static future<> repair_ranges(seastar::sharded<database>& db, sstring keyspace,
// repair all the ranges in sequence
return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) {
#endif
check_in_shutdown();
return repair_range(db, keyspace, range, cfs, data_centers, hosts);
}).then([id] {
logger.info("repair {} completed successfully", id);
@@ -731,6 +761,7 @@ static future<> repair_ranges(seastar::sharded<database>& db, sstring keyspace,
// itself does very little (mainly tell other nodes and CPUs what to do).
static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
std::unordered_map<sstring, sstring> options_map) {
check_in_shutdown();
repair_options options(options_map);
@@ -806,3 +837,12 @@ future<repair_status> repair_get_status(seastar::sharded<database>& db, int id)
return repair_tracker.get(id);
});
}
future<> repair_shutdown(seastar::sharded<database>& db) {
logger.info("Starting shutdown of repair");
return db.invoke_on(0, [] (database& localdb) {
return repair_tracker.shutdown().then([] {
logger.info("Completed shutdown of repair");
});
});
}


@@ -40,6 +40,11 @@ public:
virtual const char* what() const noexcept override { return _what.c_str(); }
};
class repair_stopped_exception : public repair_exception {
public:
repair_stopped_exception() : repair_exception("Repair stopped") { }
};
// NOTE: repair_start() can be run on any node, but starts a node-global
// operation.
// repair_start() starts the requested repair on this node. It returns an
@@ -58,6 +63,13 @@ enum class repair_status { RUNNING, SUCCESSFUL, FAILED };
// different CPU (cpu 0) and that might be a deferring operation.
future<repair_status> repair_get_status(seastar::sharded<database>& db, int id);
// repair_shutdown() stops all ongoing repairs started on this node (and
// prevents any further repairs from being started). It returns a future
// saying when all repairs have stopped, and attempts to stop them as
// quickly as possible (we do not wait for repairs to finish but rather
// stop them abruptly).
future<> repair_shutdown(seastar::sharded<database>& db);
// The class partition_checksum calculates a 256-bit cryptographically-secure
// checksum of a set of partitions fed to it. The checksum of a partition set
// is calculated by calculating a strong hash function (SHA-256) of each

Submodule seastar updated: 353b1a1481...e039c46e8b


@@ -66,9 +66,43 @@ migration_manager::migration_manager()
future<> migration_manager::stop()
{
if (ms_inited) {
uninit_messaging_service();
}
return make_ready_future<>();
}
void migration_manager::init_messaging_service()
{
ms_inited = true;
auto& ms = net::get_local_messaging_service();
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
auto src = net::messaging_service::get_source(cinfo);
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
return service::get_local_migration_manager().merge_schema_from(src, mutations);
}).then_wrapped([src] (auto&& f) {
if (f.failed()) {
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
} else {
logger.debug("Applied definitions update from {}.", src);
}
});
return net::messaging_service::no_wait();
});
ms.register_migration_request([this] () {
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
// keep local proxy alive
});
});
}
void migration_manager::uninit_messaging_service()
{
auto& ms = net::get_local_messaging_service();
ms.unregister_migration_request();
ms.unregister_definitions_update();
}
void migration_manager::register_listener(migration_listener* listener)
{
_listeners.emplace_back(listener);


@@ -52,10 +52,12 @@
namespace service {
class migration_manager {
class migration_manager : public seastar::async_sharded_service<migration_manager> {
std::vector<migration_listener*> _listeners;
static const std::chrono::milliseconds migration_delay;
bool ms_inited = false;
public:
migration_manager();
@@ -118,6 +120,10 @@ public:
future<> stop();
bool is_ready_for_bootstrap();
void init_messaging_service();
private:
void uninit_messaging_service();
};
extern distributed<migration_manager> _the_migration_manager;


@@ -1,90 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Modified by ScyllaDB
* Copyright 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "service/pending_range_calculator_service.hh"
#include "service/storage_service.hh"
#include "database.hh"
#include <seastar/core/sleep.hh>
namespace service {
distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
void pending_range_calculator_service::run() {
// long start = System.currentTimeMillis();
auto keyspaces = _db.local().get_non_system_keyspaces();
for (auto& keyspace_name : keyspaces) {
auto& ks = _db.local().find_keyspace(keyspace_name);
calculate_pending_ranges(ks.get_replication_strategy(), keyspace_name);
}
_update_jobs--;
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
}
void pending_range_calculator_service::calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name) {
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
}
future<> pending_range_calculator_service::stop() {
return make_ready_future<>();
}
future<> pending_range_calculator_service::update() {
return smp::submit_to(0, [] {
get_local_pending_range_calculator_service().do_update();
});
}
void pending_range_calculator_service::do_update() {
assert(engine().cpu_id() == 0);
get_local_pending_range_calculator_service()._update_jobs++;
get_local_pending_range_calculator_service().run();
}
future<> pending_range_calculator_service::block_until_finished() {
// We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
return smp::submit_to(0, [] {
return do_until(
[] { return !(get_local_pending_range_calculator_service()._update_jobs > 0); },
[] { return sleep(std::chrono::milliseconds(100)); });
});
}
}


@@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Modified by ScyllaDB
* Copyright 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "locator/abstract_replication_strategy.hh"
#include "database.hh"
#include <seastar/core/sharded.hh>
namespace service {
class pending_range_calculator_service {
private:
int _update_jobs{0};
distributed<database>& _db;
void calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name);
void run();
public:
pending_range_calculator_service(distributed<database>& db) : _db(db) {}
void do_update();
future<> update();
future<> block_until_finished();
future<> stop();
};
extern distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
inline distributed<pending_range_calculator_service>& get_pending_range_calculator_service() {
return _the_pending_range_calculator_service;
}
inline pending_range_calculator_service& get_local_pending_range_calculator_service() {
return _the_pending_range_calculator_service.local();
}
} // service


@@ -206,12 +206,20 @@ class datacenter_write_response_handler : public abstract_write_response_handler
}
}
public:
using abstract_write_response_handler::abstract_write_response_handler;
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
std::move(targets), boost::range::count_if(pending_endpoints, db::is_local), std::move(dead_endpoints)) {}
};
class write_response_handler : public abstract_write_response_handler {
public:
using abstract_write_response_handler::abstract_write_response_handler;
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
std::move(targets), pending_endpoints.size(), std::move(dead_endpoints)) {}
};
class datacenter_sync_write_response_handler : public abstract_write_response_handler {
@@ -229,16 +237,20 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
public:
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
schema_ptr s,
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, size_t pending_endpoints,
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address> pending_endpoints,
std::vector<gms::inet_address> dead_endpoints) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, pending_endpoints, dead_endpoints) {
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, 0, dead_endpoints) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
for (auto& target : targets) {
auto dc = snitch_ptr->get_datacenter(target);
if (_dc_responses.find(dc) == _dc_responses.end()) {
_dc_responses.emplace(dc, db::local_quorum_for(ks, dc));
auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (gms::inet_address& ep){
return snitch_ptr->get_datacenter(ep) == dc;
});
_dc_responses.emplace(dc, db::local_quorum_for(ks, dc) + pending_for_dc).first;
_pending_endpoints += pending_for_dc;
}
}
}
@@ -317,24 +329,21 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(sch
{
std::unique_ptr<abstract_write_response_handler> h;
auto& rs = ks.get_replication_strategy();
size_t pending_count = pending_endpoints.size();
auto m = make_lw_shared<const frozen_mutation>(std::move(mutation));
if (db::is_datacenter_local(cl)) {
pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local);
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
} else {
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
}
return register_response_handler(std::move(h));
}
storage_proxy::~storage_proxy() {}
storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
init_messaging_service();
_collectd_registrations = std::make_unique<scollectd::registrations>(scollectd::registrations({
scollectd::add_polled_metric(scollectd::type_instance_id("storage_proxy"
, scollectd::per_cpu_plugin_instance
@@ -1390,11 +1399,6 @@ future<> storage_proxy::schedule_repair(std::unordered_map<gms::inet_address, st
}).finally([p = shared_from_this()] {});
}
class digest_mismatch_exception : public std::runtime_error {
public:
digest_mismatch_exception() : std::runtime_error("Digest mismatch") {}
};
class abstract_read_resolver {
protected:
db::consistency_level _cl;
@@ -1442,37 +1446,31 @@ public:
class digest_read_resolver : public abstract_read_resolver {
size_t _block_for;
size_t _cl_responses = 0;
promise<> _cl_promise; // cl is reached
promise<foreign_ptr<lw_shared_ptr<query::result>>, bool> _cl_promise; // cl is reached
bool _cl_reported = false;
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _data_results;
foreign_ptr<lw_shared_ptr<query::result>> _data_result;
std::vector<query::result_digest> _digest_results;
virtual void on_timeout() override {
if (_cl_responses < _block_for) {
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_results.size() != 0));
if (!_cl_reported) {
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_result));
}
// we will not need them any more
_data_results.clear();
_data_result = foreign_ptr<lw_shared_ptr<query::result>>();
_digest_results.clear();
}
virtual size_t response_count() const override {
return _digest_results.size();
}
bool digests_match() const {
assert(response_count());
if (response_count() == 1) {
return true;
}
auto& first = *_digest_results.begin();
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
}
public:
digest_read_resolver(db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(cl, 0, timeout), _block_for(block_for) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
if (!_timedout) {
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
_digest_results.emplace_back(_targets_count == 1 ? query::result_digest() : result->digest());
_data_results.emplace_back(std::move(result));
if (!_data_result) {
_data_result = std::move(result);
}
got_response(from);
}
}
@@ -1482,12 +1480,13 @@ public:
got_response(from);
}
}
foreign_ptr<lw_shared_ptr<query::result>> resolve() {
assert(_data_results.size());
if (!digests_match()) {
throw digest_mismatch_exception();
bool digests_match() const {
assert(response_count());
if (response_count() == 1) {
return true;
}
return std::move(*_data_results.begin());
auto& first = *_digest_results.begin();
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
}
bool waiting_for(gms::inet_address ep) {
return db::is_datacenter_local(_cl) ? is_me(ep) || db::is_local(ep) : true;
@@ -1497,9 +1496,9 @@ public:
if (waiting_for(ep)) {
_cl_responses++;
}
if (_cl_responses >= _block_for && _data_results.size()) {
if (_cl_responses >= _block_for && _data_result) {
_cl_reported = true;
_cl_promise.set_value();
_cl_promise.set_value(std::move(_data_result), digests_match());
}
}
if (is_completed()) {
@@ -1507,11 +1506,11 @@ public:
_done_promise.set_value();
}
}
future<> has_cl() {
future<foreign_ptr<lw_shared_ptr<query::result>>, bool> has_cl() {
return _cl_promise.get_future();
}
bool has_data() {
return _data_results.size() != 0;
return _data_result;
}
void add_wait_targets(size_t targets_count) {
_targets_count += targets_count;
@@ -1803,37 +1802,41 @@ public:
// hold on to executor until all queries are complete
});
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<> f) {
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<foreign_ptr<lw_shared_ptr<query::result>>, bool> f) {
try {
exec->got_cl();
f.get();
exec->_result_promise.set_value(digest_resolver->resolve()); // can throw digest mismatch exception
auto done = digest_resolver->done();
if (exec->_block_for < exec->_targets.size()) { // if there are more targets than needed for cl, check digest in background
exec->_proxy->_stats.background_reads++;
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
try {
f.get();
digest_resolver->resolve();
exec->_proxy->_stats.background_reads--;
} catch(digest_mismatch_exception& ex) {
exec->_proxy->_stats.read_repair_repaired_background++;
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
exec->reconcile(exec->_cl, timeout);
exec->_result_promise.get_future().then_wrapped([exec] (auto f) {
f.ignore_ready_future(); // ignore any failures during background repair
exec->_proxy->_stats.background_reads--;
});
} catch(...) {
// ignore all exceptions besides digest mismatch during background check
}
});
} else {
done.discard_result(); // no need for background check, discard done future explicitly
foreign_ptr<lw_shared_ptr<query::result>> result;
bool digests_match;
std::tie(result, digests_match) = f.get(); // can throw
if (digests_match) {
exec->_result_promise.set_value(std::move(result));
auto done = digest_resolver->done();
if (exec->_block_for < exec->_targets.size()) { // if there are more targets than needed for cl, check digest in background
exec->_proxy->_stats.background_reads++;
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
if (f.failed()) {
f.ignore_ready_future(); // ignore all exceptions besides digest mismatch during background check
} else {
if (!digest_resolver->digests_match()) {
exec->_proxy->_stats.read_repair_repaired_background++;
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
exec->reconcile(exec->_cl, timeout);
exec->_result_promise.get_future().then_wrapped([exec] (future<foreign_ptr<lw_shared_ptr<query::result>>> f) {
f.ignore_ready_future(); // ignore any failures during background repair
exec->_proxy->_stats.background_reads--;
});
} else {
exec->_proxy->_stats.background_reads--;
}
}
});
}
} else { // digest mismatch
exec->reconcile(exec->_cl, timeout);
exec->_proxy->_stats.read_repair_repaired_blocking++;
}
} catch (digest_mismatch_exception& ex) {
exec->reconcile(exec->_cl, timeout);
exec->_proxy->_stats.read_repair_repaired_blocking++;
} catch (read_timeout_exception& ex) {
exec->_result_promise.set_exception(ex);
}
@@ -2677,24 +2680,6 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
void storage_proxy::init_messaging_service() {
auto& ms = net::get_local_messaging_service();
ms.register_definitions_update([] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
auto src = net::messaging_service::get_source(cinfo);
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
return service::get_local_migration_manager().merge_schema_from(src, mutations);
}).then_wrapped([src] (auto&& f) {
if (f.failed()) {
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
} else {
logger.debug("Applied definitions update from {}.", src);
}
});
return net::messaging_service::no_wait();
});
ms.register_migration_request([] () {
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
// keep local proxy alive
});
});
ms.register_mutation([] (const rpc::client_info& cinfo, frozen_mutation in, std::vector<gms::inet_address> forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id) {
return do_with(std::move(in), get_local_shared_storage_proxy(), [&cinfo, forward = std::move(forward), reply_to, shard, response_id] (const frozen_mutation& m, shared_ptr<storage_proxy>& p) {
return when_all(
@@ -2777,8 +2762,6 @@ void storage_proxy::init_messaging_service() {
void storage_proxy::uninit_messaging_service() {
auto& ms = net::get_local_messaging_service();
ms.unregister_definitions_update();
ms.unregister_migration_request();
ms.unregister_mutation();
ms.unregister_mutation_done();
ms.unregister_read_data();


@@ -121,7 +121,6 @@ private:
std::uniform_real_distribution<> _read_repair_chance = std::uniform_real_distribution<>(0,1);
std::unique_ptr<scollectd::registrations> _collectd_registrations;
private:
void init_messaging_service();
void uninit_messaging_service();
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges, db::consistency_level cl);
response_id_type register_response_handler(std::unique_ptr<abstract_write_response_handler>&& h);
@@ -173,6 +172,8 @@ public:
return _db;
}
void init_messaging_service();
future<> mutate_locally(const mutation& m);
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m);
future<> mutate_locally(std::vector<mutation> mutations);


@@ -54,7 +54,6 @@
#include "locator/local_strategy.hh"
#include "version.hh"
#include "unimplemented.hh"
#include "service/pending_range_calculator_service.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "dht/range_streamer.hh"
@@ -265,7 +264,7 @@ void storage_service::join_token_ring(int delay) {
}
set_mode(mode::JOINING, "schema complete, ready to bootstrap", true);
set_mode(mode::JOINING, "waiting for pending range calculation", true);
get_local_pending_range_calculator_service().block_until_finished().get();
block_until_update_pending_ranges_finished().get();
set_mode(mode::JOINING, "calculation complete, ready to bootstrap", true);
logger.debug("... got ring + schema info");
@@ -292,7 +291,7 @@ void storage_service::join_token_ring(int delay) {
set_mode(mode::JOINING, "waiting for schema information to complete", true);
sleep(std::chrono::seconds(1)).get();
}
get_local_pending_range_calculator_service().block_until_finished().get();
block_until_update_pending_ranges_finished().get();
}
logger.info("Checking bootstrapping/leaving/moving nodes: ok");
@@ -463,7 +462,7 @@ void storage_service::handle_state_bootstrap(inet_address endpoint) {
}
_token_metadata.add_bootstrap_tokens(tokens, endpoint);
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
auto& gossiper = gms::get_local_gossiper();
if (gossiper.uses_host_id(endpoint)) {
@@ -561,7 +560,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
// a race where natural endpoint was updated to contain node A, but A was
// not yet removed from pending endpoints
_token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint);
get_local_pending_range_calculator_service().do_update();
do_update_pending_ranges();
for (auto ep : endpoints_to_remove) {
remove_endpoint(ep);
@@ -608,7 +607,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
}).get();
}
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
if (logger.is_enabled(logging::log_level::debug)) {
auto ver = _token_metadata.get_ring_version();
for (auto& x : _token_metadata.get_token_to_endpoint()) {
@@ -643,7 +642,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) {
// at this point the endpoint is certainly a member with this token, so let's proceed
// normally
_token_metadata.add_leaving_endpoint(endpoint);
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
}
void storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces) {
@@ -660,7 +659,7 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector<sst
auto token = dht::global_partitioner().from_sstring(pieces[1]);
logger.debug("Node {} state moving, new token {}", endpoint, token);
_token_metadata.add_moving_endpoint(token, endpoint);
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
}
void storage_service::handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
@@ -687,7 +686,7 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector<s
logger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
// Note that the endpoint is being removed
_token_metadata.add_leaving_endpoint(endpoint);
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
// find the endpoint coordinating this removal that we need to notify when we're done
auto state = gossiper.get_endpoint_state_for_endpoint(endpoint);
assert(state);
@@ -790,7 +789,7 @@ void storage_service::on_change(inet_address endpoint, application_state state,
void storage_service::on_remove(gms::inet_address endpoint) {
logger.debug("endpoint={} on_remove", endpoint);
_token_metadata.remove_endpoint(endpoint);
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
}
void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
@@ -1665,7 +1664,7 @@ future<> storage_service::decommission() {
throw std::runtime_error(sprint("Node in %s state; wait for status to become normal or restart", ss._operation_mode));
}
get_local_pending_range_calculator_service().block_until_finished().get();
ss.update_pending_ranges().get();
auto non_system_keyspaces = db.get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
@@ -1764,7 +1763,7 @@ future<> storage_service::remove_node(sstring host_id_string) {
}
ss._removing_node = endpoint;
tm.add_leaving_endpoint(endpoint);
get_local_pending_range_calculator_service().update().get();
ss.update_pending_ranges().get();
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
// we add our own token so other nodes to let us know when they're done
@@ -1968,7 +1967,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758
for (auto& r : ranges) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), metadata);
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, metadata);
current_replica_endpoints.emplace(r, std::move(eps));
}
@@ -1989,7 +1989,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
// range.
for (auto& r : ranges) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), temp);
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp);
auto rg = current_replica_endpoints.equal_range(r);
for (auto it = rg.first; it != rg.second; it++) {
@@ -2133,7 +2134,7 @@ void storage_service::excise(std::unordered_set<token> tokens, inet_address endp
}
}).get();
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
}
void storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, int64_t expire_time) {
@@ -2182,7 +2183,7 @@ future<> storage_service::confirm_replication(inet_address node) {
void storage_service::leave_ring() {
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get();
_token_metadata.remove_endpoint(get_broadcast_address());
get_local_pending_range_calculator_service().update().get();
update_pending_ranges().get();
auto& gossiper = gms::get_local_gossiper();
auto expire_time = gossiper.compute_expire_time().time_since_epoch().count();
@@ -2281,7 +2282,7 @@ future<> storage_service::start_leaving() {
auto& gossiper = gms::get_local_gossiper();
return gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens())).then([this] {
_token_metadata.add_leaving_endpoint(get_broadcast_address());
return get_local_pending_range_calculator_service().update();
return update_pending_ranges();
});
}
@@ -2503,8 +2504,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
if (r.contains(to_fetch, dht::token_comparator())) {
std::vector<inet_address> endpoints;
if (dht::range_streamer::use_strict_consistency()) {
auto end_token = to_fetch.end() ? to_fetch.end()->value() : dht::maximum_token();
std::vector<inet_address> old_endpoints = eps;
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_fetch.end()->value(), token_meta_clone_all_settled);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
//Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
//So we need to be careful to only be strict when endpoints == RF
@@ -2563,8 +2565,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
std::unordered_multimap<inet_address, range<token>> endpoint_ranges;
std::unordered_map<inet_address, std::vector<range<token>>> endpoint_ranges_map;
for (range<token> to_stream : ranges_per_keyspace.first) {
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone_all_settled);
auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token();
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
logger.debug("Range: {} Current endpoints: {} New endpoints: {}", to_stream, current_endpoints, new_endpoints);
std::sort(current_endpoints.begin(), current_endpoints.end());
std::sort(new_endpoints.begin(), new_endpoints.end());
@@ -2631,7 +2634,7 @@ future<> storage_service::move(token new_token) {
auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces();
get_local_pending_range_calculator_service().block_until_finished().get();
ss.block_until_update_pending_ranges_finished().get();
// checking if data is moving to this node
for (auto keyspace_name : keyspaces_to_process) {
@@ -2676,5 +2679,39 @@ std::chrono::milliseconds storage_service::get_ring_delay() {
return std::chrono::milliseconds(ring_delay);
}
void storage_service::do_update_pending_ranges() {
if (engine().cpu_id() != 0) {
throw std::runtime_error("do_update_pending_ranges should be called on cpu zero");
}
// long start = System.currentTimeMillis();
auto keyspaces = _db.local().get_non_system_keyspaces();
for (auto& keyspace_name : keyspaces) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
}
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
}
future<> storage_service::update_pending_ranges() {
return get_storage_service().invoke_on(0, [] (auto& ss){
ss._update_jobs++;
ss.do_update_pending_ranges();
// calculate_pending_ranges will modify token_metadata; we need to replicate it to other cores
return ss.replicate_to_all_cores().finally([&ss, ss0 = ss.shared_from_this()] {
ss._update_jobs--;
});
});
}
future<> storage_service::block_until_update_pending_ranges_finished() {
// We want to be sure the job we're blocking on is actually finished, and we can't trust the TPE's active job count
return smp::submit_to(0, [] {
return do_until(
[] { return !(get_local_storage_service()._update_jobs > 0); },
[] { return sleep(std::chrono::milliseconds(100)); });
});
}
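The pair above implements a simple cross-shard completion barrier: update_pending_ranges() bumps a job counter on shard 0 for the lifetime of the work, and block_until_update_pending_ranges_finished() polls that counter rather than trusting an executor's active-job count. A minimal non-Seastar sketch of the same idea, with a plain atomic standing in for _update_jobs:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Stand-in for storage_service::_update_jobs on shard 0.
std::atomic<int> update_jobs{0};

// Analogue of update_pending_ranges(): the counter covers the whole
// job, including the replicate-to-other-cores phase, so a waiter can
// never observe "finished" while replication is still in flight.
void run_update_job() {
    ++update_jobs;
    // ... calculate_pending_ranges() and replicate_to_all_cores() ...
    --update_jobs;
}

// Analogue of block_until_update_pending_ranges_finished(): poll the
// counter until it drops to zero, sleeping between checks.
void block_until_jobs_finished() {
    while (update_jobs.load() > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
```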
} // namespace service

View File

@@ -108,6 +108,7 @@ private:
private final AtomicLong notificationSerialNumber = new AtomicLong();
#endif
distributed<database>& _db;
int _update_jobs{0};
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
// given objective criteria
@@ -127,6 +128,10 @@ public:
// Needed by distributed<>
future<> stop();
void do_update_pending_ranges();
future<> update_pending_ranges();
future<> block_until_update_pending_ranges_finished();
const locator::token_metadata& get_token_metadata() const {
return _token_metadata;
}

View File

@@ -52,7 +52,14 @@ namespace sstables {
logging::logger sstlog("sstable");
thread_local std::unordered_map<sstring, unsigned> sstable::_shards_agreeing_to_remove_sstable;
future<file> new_sstable_component_file(sstring name, open_flags flags) {
return open_file_dma(name, flags).handle_exception([name] (auto ep) {
sstlog.error("Could not create SSTable component {}. Found exception: {}", name, ep);
return make_exception_future<file>(ep);
});
}
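new_sstable_component_file is used further down with open_flags::exclusive, which maps to POSIX O_EXCL: creating a component that already exists fails, and that failure is how a leftover or concurrently-written sstable component is detected. A reduced sketch of that semantic with the raw POSIX call (the path in the test is illustrative):

```cpp
#include <fcntl.h>
#include <unistd.h>

// Exclusive create: succeeds only if the file did not already exist,
// mirroring open_flags::wo | create | exclusive in the hunks below.
bool exclusive_create(const char* path) {
    int fd = ::open(path, O_WRONLY | O_CREAT | O_EXCL, 0644);
    if (fd < 0) {
        return false;  // exists: stale component or concurrent writer
    }
    ::close(fd);
    return true;
}
```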
thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> sstable::_shards_agreeing_to_remove_sstable;
static utils::phased_barrier& background_jobs() {
static thread_local utils::phased_barrier gate;
@@ -749,7 +756,19 @@ void sstable::write_toc(const io_priority_class& pc) {
sstlog.debug("Writing TOC file {} ", file_path);
// Writing TOC content to temporary file.
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
// If creation of the temporary TOC failed, it implies that boot failed to
// delete a sstable with a temporary TOC for this column family, or that a
// sstable with the same generation is being created in parallel.
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
bool toc_exists = file_exists(filename(sstable::component_type::TOC)).get0();
if (toc_exists) {
// TOC will exist at this point if write_components() was called with
// the generation of a sstable that exists.
f.close().get();
remove_file(file_path).get();
throw std::runtime_error(sprint("SSTable write failed due to existence of TOC file for generation %ld of %s.%s", _generation, _ks, _cf));
}
file_output_stream_options options;
options.buffer_size = 4096;
@@ -792,7 +811,7 @@ void write_crc(const sstring file_path, checksum& c) {
sstlog.debug("Writing CRC file {} ", file_path);
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
file f = open_file_dma(file_path, oflags).get0();
file f = new_sstable_component_file(file_path, oflags).get0();
file_output_stream_options options;
options.buffer_size = 4096;
@@ -806,7 +825,7 @@ void write_digest(const sstring file_path, uint32_t full_checksum) {
sstlog.debug("Writing Digest file {} ", file_path);
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
auto f = open_file_dma(file_path, oflags).get0();
auto f = new_sstable_component_file(file_path, oflags).get0();
file_output_stream_options options;
options.buffer_size = 4096;
@@ -877,7 +896,7 @@ template <sstable::component_type Type, typename T>
void sstable::write_simple(T& component, const io_priority_class& pc) {
auto file_path = filename(Type);
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
file_output_stream_options options;
options.buffer_size = sstable_buffer_size;
@@ -938,8 +957,8 @@ future<> sstable::open_data() {
future<> sstable::create_data() {
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
return when_all(open_file_dma(filename(component_type::Index), oflags),
open_file_dma(filename(component_type::Data), oflags)).then([this] (auto files) {
return when_all(new_sstable_component_file(filename(component_type::Index), oflags),
new_sstable_component_file(filename(component_type::Data), oflags)).then([this] (auto files) {
// FIXME: If both files could not be created, the first get() below will
// throw an exception, the second get() will not be attempted, and
// we'll get a warning about the second future being destroyed
@@ -1715,9 +1734,11 @@ sstable::shared_remove_by_toc_name(sstring toc_name, bool shared) {
return remove_by_toc_name(toc_name);
} else {
auto shard = std::hash<sstring>()(toc_name) % smp::count;
return smp::submit_to(shard, [toc_name] {
auto& counter = _shards_agreeing_to_remove_sstable[toc_name];
if (++counter == smp::count) {
return smp::submit_to(shard, [toc_name, src_shard = engine().cpu_id()] {
auto& remove_set = _shards_agreeing_to_remove_sstable[toc_name];
remove_set.insert(src_shard);
auto counter = remove_set.size();
if (counter == smp::count) {
_shards_agreeing_to_remove_sstable.erase(toc_name);
return remove_by_toc_name(toc_name);
} else {
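Replacing the per-TOC counter with a set of shard ids makes the vote idempotent: a shard that submits its agreement twice (e.g. on a retried removal) no longer inflates the tally past smp::count. A minimal sketch of the pattern, with a constant standing in for smp::count:

```cpp
#include <string>
#include <unordered_map>
#include <unordered_set>

constexpr unsigned shard_count = 4;  // stand-in for smp::count

std::unordered_map<std::string, std::unordered_set<unsigned>> votes;

// Record that src_shard agrees to remove the sstable named by toc_name.
// Returns true once every shard has voted; duplicate votes are no-ops,
// unlike the earlier ++counter scheme, which a retry could double-count.
bool agree_to_remove(const std::string& toc_name, unsigned src_shard) {
    auto& remove_set = votes[toc_name];
    remove_set.insert(src_shard);
    if (remove_set.size() == shard_count) {
        votes.erase(toc_name);
        return true;  // last vote in: safe to remove the file
    }
    return false;
}
```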

View File

@@ -343,7 +343,7 @@ private:
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
static thread_local std::unordered_map<sstring, unsigned> _shards_agreeing_to_remove_sstable;
static thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> _shards_agreeing_to_remove_sstable;
std::unordered_set<component_type, enum_hash<component_type>> _components;

View File

@@ -41,19 +41,14 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
#include "service/pending_range_calculator_service.hh"
// TODO : remove once shutdown is ok.
// Broke these tests when doing the horror patch for #293
// Simpler to copy the code from init.cc than trying to do clever parameterization
// and whatnot.
static future<> tst_init_storage_service(distributed<database>& db) {
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
}).then([&db] {
return service::init_storage_service(db).then([] {
engine().at_exit([] { return service::deinit_storage_service(); });
});
return service::init_storage_service(db).then([] {
engine().at_exit([] { return service::deinit_storage_service(); });
});
}
@@ -340,7 +335,6 @@ public:
_db->stop().get();
service::get_storage_service().stop().get();
service::get_pending_range_calculator_service().stop().get();
locator::i_endpoint_snitch::stop_snitch().get();

View File

@@ -29,7 +29,6 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "core/reactor.hh"
#include "service/pending_range_calculator_service.hh"
#include "service/storage_service.hh"
#include "core/distributed.hh"
#include "database.hh"
@@ -39,7 +38,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
distributed<database> db;
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
service::get_pending_range_calculator_service().start(std::ref(db));
service::get_storage_service().start(std::ref(db)).get();
db.start().get();
net::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
@@ -51,7 +49,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
net::get_messaging_service().stop().get();
db.stop().get();
service::get_storage_service().stop().get();
service::get_pending_range_calculator_service().stop().get();
locator::i_endpoint_snitch::stop_snitch().get();
});
}

View File

@@ -68,6 +68,7 @@ future<>
with_column_family(schema_ptr s, column_family::config cfg, Func func) {
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
return func(*cf).then([cf, cm] {
return cf->stop();
}).finally([cf, cm] {});
@@ -404,9 +405,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
auto cm = make_lw_shared<compaction_manager>();
return do_with(make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm), [s, cm] (auto& cf_ptr) mutable {
column_family& cf = *cf_ptr;
return with_column_family(s, cfg, [s] (auto& cf) mutable {
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;
const column_definition& r1_col = *s->get_column_definition("r1");

View File

@@ -987,6 +987,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
cfg.enable_incremental_backups = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
cf->start();
cf->mark_ready_for_writes();
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
auto generations = make_lw_shared<std::vector<unsigned long>>({1, 2, 3, 4});
@@ -1019,7 +1020,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
// were compacted.
BOOST_REQUIRE(cf->sstables_count() == generations->size());
cm->submit(&*cf);
cf->trigger_compaction();
BOOST_REQUIRE(cm->get_stats().pending_tasks == 1);
// wait for submitted job to finish.
@@ -1063,6 +1064,7 @@ SEASTAR_TEST_CASE(compact) {
auto s = builder.build();
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
return open_sstables("tests/sstables/compaction", {1,2,3}).then([s = std::move(s), cf, cm, generation] (auto sstables) {
return test_setup::do_with_test_directory([sstables, s, generation, cf, cm] {
@@ -1161,6 +1163,7 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type));
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
auto generations = make_lw_shared<std::vector<unsigned long>>(std::move(generations_to_compact));
auto sstables = make_lw_shared<std::vector<sstables::shared_sstable>>();
@@ -1670,6 +1673,7 @@ SEASTAR_TEST_CASE(leveled_01) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1714,6 +1718,7 @@ SEASTAR_TEST_CASE(leveled_02) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1768,6 +1773,7 @@ SEASTAR_TEST_CASE(leveled_03) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1826,6 +1832,7 @@ SEASTAR_TEST_CASE(leveled_04) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -2159,6 +2166,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
}).then([s, tmp, sstables] {
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
auto create = [tmp] {
return make_lw_shared<sstable>("ks", "cf", tmp->path, 3, la, big);
};
@@ -2259,6 +2267,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
};
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
std::vector<shared_sstable> sstables;
sstables.push_back(std::move(sstp));

View File

@@ -36,6 +36,7 @@
#include "database.hh"
#include <memory>
#include "sstable_test.hh"
#include "tmpdir.hh"
using namespace sstables;
@@ -169,10 +170,11 @@ SEASTAR_TEST_CASE(big_summary_query_32) {
return summary_query<32, 0xc4000, 182>("tests/sstables/bigsummary", 76);
}
static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
auto sst = make_lw_shared<sstable>("ks", "cf", dir, generation, la, big);
return sst->load().then([sst, generation] {
static future<sstable_ptr> do_write_sst(sstring load_dir, sstring write_dir, unsigned long generation) {
auto sst = make_lw_shared<sstable>("ks", "cf", load_dir, generation, la, big);
return sst->load().then([sst, write_dir, generation] {
sstables::test(sst).change_generation_number(generation + 1);
sstables::test(sst).change_dir(write_dir);
auto fut = sstables::test(sst).store();
return std::move(fut).then([sst = std::move(sst)] {
return make_ready_future<sstable_ptr>(std::move(sst));
@@ -180,8 +182,8 @@ static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
});
}
static future<> write_sst_info(sstring dir, unsigned long generation) {
return do_write_sst(dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
static future<> write_sst_info(sstring load_dir, sstring write_dir, unsigned long generation) {
return do_write_sst(load_dir, write_dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
}
using bufptr_t = std::unique_ptr<char [], free_deleter>;
@@ -223,11 +225,12 @@ static future<> compare_files(sstdesc file1, sstdesc file2, sstable::component_t
}
static future<> check_component_integrity(sstable::component_type component) {
return write_sst_info("tests/sstables/compressed", 1).then([component] {
auto tmp = make_lw_shared<tmpdir>();
return write_sst_info("tests/sstables/compressed", tmp->path, 1).then([component, tmp] {
return compare_files(sstdesc{"tests/sstables/compressed", 1 },
sstdesc{"tests/sstables/compressed", 2 },
sstdesc{tmp->path, 2 },
component);
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(check_compressed_info_func) {
@@ -235,8 +238,9 @@ SEASTAR_TEST_CASE(check_compressed_info_func) {
}
SEASTAR_TEST_CASE(check_summary_func) {
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
auto tmp = make_lw_shared<tmpdir>();
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
return sstables::test(sst2).read_summary().then([sst1, sst2] {
summary& sst1_s = sstables::test(sst1).get_summary();
summary& sst2_s = sstables::test(sst2).get_summary();
@@ -247,7 +251,7 @@ SEASTAR_TEST_CASE(check_summary_func) {
BOOST_REQUIRE(sst1_s.first_key.value == sst2_s.first_key.value);
BOOST_REQUIRE(sst1_s.last_key.value == sst2_s.last_key.value);
});
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(check_filter_func) {
@@ -255,8 +259,9 @@ SEASTAR_TEST_CASE(check_filter_func) {
}
SEASTAR_TEST_CASE(check_statistics_func) {
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
auto tmp = make_lw_shared<tmpdir>();
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
return sstables::test(sst2).read_statistics().then([sst1, sst2] {
statistics& sst1_s = sstables::test(sst1).get_statistics();
statistics& sst2_s = sstables::test(sst2).get_statistics();
@@ -271,19 +276,20 @@ SEASTAR_TEST_CASE(check_statistics_func) {
});
// TODO: compare the field contents from both sstables.
});
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(check_toc_func) {
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
auto tmp = make_lw_shared<tmpdir>();
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
return sstables::test(sst2).read_toc().then([sst1, sst2] {
auto& sst1_c = sstables::test(sst1).get_components();
auto& sst2_c = sstables::test(sst2).get_components();
BOOST_REQUIRE(sst1_c == sst2_c);
});
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(uncompressed_random_access_read) {
@@ -857,6 +863,7 @@ SEASTAR_TEST_CASE(reshuffle) {
cfg.enable_incremental_backups = false;
auto cf = make_lw_shared<column_family>(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm);
cf->start();
cf->mark_ready_for_writes();
return cf->reshuffle_sstables(3).then([cm, cf] (std::vector<sstables::entry_descriptor> reshuffled) {
BOOST_REQUIRE(reshuffled.size() == 2);
BOOST_REQUIRE(reshuffled[0].generation == 3);

View File

@@ -100,6 +100,10 @@ public:
_sst->_generation = generation;
}
void change_dir(sstring dir) {
_sst->_dir = dir;
}
future<> store() {
_sst->_components.erase(sstable::component_type::Index);
_sst->_components.erase(sstable::component_type::Data);

View File

@@ -135,6 +135,8 @@ void test_timestamp_like_string_conversions(data_type timestamp_type) {
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-03T12:30:00+1230"), timestamp_type->decompose(tp)));
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-02T23:00-0100"), timestamp_type->decompose(tp)));
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00");
auto now = time(nullptr);
auto local_now = *localtime(&now);
char buf[100];

View File

@@ -40,6 +40,14 @@
#include <boost/multiprecision/cpp_int.hpp>
#include "utils/big_decimal.hh"
template<typename T>
sstring time_point_to_string(const T& tp)
{
auto timestamp = tp.time_since_epoch().count();
auto time = boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(timestamp);
return boost::posix_time::to_iso_extended_string(time);
}
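time_point_to_string above leans on boost::posix_time; for whole seconds, the same ISO-8601 rendering can be sketched with just the standard library and POSIX gmtime_r (UTC assumed, helper name illustrative):

```cpp
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <string>

// Format a milliseconds-since-epoch timestamp as an ISO-8601 string,
// matching what to_iso_extended_string produces for whole seconds.
std::string ms_to_iso8601(int64_t ms) {
    std::time_t secs = ms / 1000;
    std::tm tm{};
    gmtime_r(&secs, &tm);
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%04d-%02d-%02dT%02d:%02d:%02d",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec);
    return buf;
}
```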
static const char* int32_type_name = "org.apache.cassandra.db.marshal.Int32Type";
static const char* long_type_name = "org.apache.cassandra.db.marshal.LongType";
static const char* ascii_type_name = "org.apache.cassandra.db.marshal.AsciiType";
@@ -421,7 +429,11 @@ public:
}
virtual bytes from_string(sstring_view s) const override;
virtual sstring to_string(const bytes& b) const override {
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
auto v = deserialize(b);
if (v.is_null()) {
return "";
}
return time_point_to_string(from_value(v).get());
}
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::timestamp;
@@ -684,7 +696,11 @@ public:
return b;
}
virtual sstring to_string(const bytes& b) const override {
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
auto v = deserialize(b);
if (v.is_null()) {
return "";
}
return time_point_to_string(from_value(v).get());
}
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::timestamp;
@@ -1059,7 +1075,8 @@ public:
if (!num) {
return 1;
}
return boost::multiprecision::cpp_int::canonical_value(num).size() * sizeof(boost::multiprecision::limb_type) + 1;
auto pnum = abs(num);
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
}
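The replacement serialized_size derives the byte count from the most significant bit of |num|: msb is a zero-based bit index, so msb + 2 bits cover the magnitude plus a sign bit, rounded up to whole bytes. A hedged sketch with uint64_t standing in for boost::multiprecision, and msb/align_up re-implemented locally:

```cpp
#include <cstdint>

// Local stand-ins for boost::multiprecision::msb() (zero-based index of
// the highest set bit) and seastar's align_up().
unsigned msb(uint64_t n) {
    unsigned b = 0;
    while (n >>= 1) {
        ++b;
    }
    return b;
}

uint64_t align_up(uint64_t v, uint64_t a) {
    return (v + a - 1) / a * a;
}

// Mirrors the new serialized_size() body: zero takes one byte; otherwise
// msb(|num|) + 2 bits, rounded up to whole bytes.
uint64_t varint_size(int64_t num) {
    if (num == 0) {
        return 1;
    }
    uint64_t pnum = num < 0 ? -static_cast<uint64_t>(num)
                            : static_cast<uint64_t>(num);
    return align_up(msb(pnum) + 2, 8u) / 8;
}
```

For example, 127 (msb = 6, 8 bits) fits in one byte, while 128 (msb = 7, 9 bits) needs two, matching two's-complement encoding.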
virtual int32_t compare(bytes_view v1, bytes_view v2) const override {
if (v1.empty()) {

View File

@@ -1214,6 +1214,7 @@ public:
if (_active) {
assert(_active->is_empty());
free_segment(_active);
_active = nullptr;
}
if (_group) {
_group->del(this);
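The added `_active = nullptr` line above closes a use-after-free window: without it, a second pass through this cleanup (or any later inspection of _active) would touch a freed segment. The idiom in isolation, with illustrative type names:

```cpp
#include <cassert>

struct segment {
    bool empty = true;
};

// Reduced analogue of the destructor path in the hunk: free the active
// segment, then clear the pointer so a repeated cleanup is a safe no-op.
struct region {
    segment* active = nullptr;

    void close() {
        if (active) {
            assert(active->empty);
            delete active;
            active = nullptr;  // the added line: prevents a double free
        }
    }
};
```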

View File

@@ -50,12 +50,14 @@ namespace validation {
*/
void
validate_cql_key(schema_ptr schema, const partition_key& key) {
bytes_view b(key);
if (b.empty()) {
// C* validates here that the thrift key is not empty.
// It can only be empty if it is not composite and its only component in CQL form is empty.
if (schema->partition_key_size() == 1 && key.begin(*schema)->empty()) {
throw exceptions::invalid_request_exception("Key may not be empty");
}
// check that key can be handled by FBUtilities.writeShortByteArray
auto b = key.representation();
if (b.size() > max_key_size) {
throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", b.size(), max_key_size));
}