Compare commits

...

47 Commits

Author SHA1 Message Date
Nadav Har'El
8d8932cb15 sstable: fix use-after-free of temporary ioclass copy
Commit 6a3872b355 fixed some use-after-free
bugs but introduced a new one because of a typo:

Instead of capturing a reference to the long-living io-class object, as
all the code does, one place in the code accidentally captured a *copy*
of this object. This copy had a very temporary life, and when a reference
to that *copy* was passed to sstable reading code which assumed that it
lives at least as long as the read call, a use-after-free resulted.
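The bug pattern can be sketched outside Scylla's code base. In this sketch, io_class and the function names are illustrative stand-ins, not Scylla's actual types; the closure is kept alive deliberately so the identity mismatch can be shown safely (in the real bug the copy was temporary, so the reference dangled):

```cpp
#include <cassert>

// Illustrative stand-in for the long-living io-class object.
struct io_class { int id; };

io_class g_io_class{7};   // the long-living object

// Buggy pattern: the lambda captures a *copy*, so the reference it hands
// out points at closure-local storage, not at g_io_class.
const io_class& get_copy_reference() {
    static auto lam = [copy = g_io_class]() -> const io_class& { return copy; };
    return lam();
}

// Fixed pattern: capture a reference to the long-living object.
const io_class& get_real_reference() {
    static auto lam = [&obj = g_io_class]() -> const io_class& { return obj; };
    return lam();
}
```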

Fixes #1072

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <1458595629-9314-1-git-send-email-nyh@scylladb.com>
(cherry picked from commit 2eb0627665)
2016-03-22 08:11:00 +02:00
Asias He
891476dfc6 gossip: Handle unknown application_state when printing
In case an unknown application_state is received, we should be able to
handle it when printing.

Message-Id: <98d2307359292e90c8925f38f67a74b69e45bebe.1458553057.git.asias@scylladb.com>
(cherry picked from commit 7acc9816d2)
2016-03-21 11:59:53 +02:00
Pekka Enberg
346c729531 main: Defer API server hooks until commitlog replay
Defer registering services to the API server until commitlog has been
replayed to ensure that nobody is able to trigger sstable operations via
'nodetool' before we are ready for them.
Message-Id: <1458116227-4671-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 972fc6e014)
2016-03-18 09:20:45 +02:00
Pekka Enberg
2c06609fc1 main: Fix broadcast_address and listen_address validation errors
Fix the validation error message to look like this:

  Scylla version 666.development-20160316.49af399 starting ...
  WARN  2016-03-17 12:24:15,137 [shard 0] config - Option partitioner is not (yet) used.
  WARN  2016-03-17 12:24:15,138 [shard 0] init - NOFILE rlimit too low (recommended setting 200000, minimum setting 10000; you may run out of file descriptors.
  ERROR 2016-03-17 12:24:15,138 [shard 0] init - Bad configuration: invalid 'listen_address': eth0: boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> > (Invalid argument)
  Exiting on unhandled exception of type 'bad_configuration_error': std::exception

Instead of:

  Exiting on unhandled exception of type 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >': Invalid argument

Fixes #1051.

Message-Id: <1458210329-4488-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 69dacf9063)
2016-03-18 09:00:18 +02:00
Takuya ASADA
0f22c3ebe9 dist: follow sysconfig setting when counting number of cpus on scylla_io_setup
When NR_CPU >= 8, we disable cpu0 for AMI in scylla_sysconfig_setup.
But scylla_io_setup doesn't know that and tries to assign NR_CPU queues, so scylla fails to start because queues > cpus.
With this fix, scylla_io_setup checks the sysconfig settings: if '--smp <n>' is specified in SCYLLA_ARGS, it uses n to limit the number of queues.
Also, when the instance type has no pre-configured parameters, we need to pass the --cpuset parameter to iotune. Otherwise iotune will run on a different set of CPUs, which may have different performance characteristics.

Fixes #996, #1043, #1046

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1458221762-10595-2-git-send-email-syuu@scylladb.com>
(cherry picked from commit 4cc589872d)
2016-03-18 08:57:52 +02:00
Takuya ASADA
ee1ce3c6b4 dist: On scylla_sysconfig_setup, don't disable cpu0 on non-AMI environments
Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1458221762-10595-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 6f71173827)
2016-03-18 08:57:42 +02:00
Paweł Dziepak
b2f07c0d44 lsa: update _closed_occupancy after freeing all segments
_closed_occupancy will be used when a region is removed from its region
group, so make sure that it is accurate.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
(cherry picked from commit 338fd34770)
2016-03-18 08:11:25 +02:00
Glauber Costa
6c6447c131 sstables: do not assume mutation_reader will be kept alive
Our sstables::mutation_reader has a specialization in which start and end
ranges are passed as futures. That is needed because we may have to read the
index file for those.

This works well under the assumption that every time a mutation_reader will be
created it will be used, since whoever is using it will surely keep the state
of the reader alive.

However, that assumption has not been true for a while. We use a reader
interface for reading everything from mutations and sstables to cache entries,
and when we create an sstable mutation_reader, that does not mean we'll use it.
In fact we won't, if the read can be serviced first by a higher-level entity.

If that happens to be the case, the reader will be destructed. However, since
it may take more time than that for the start and end futures to resolve, by
the time they are resolved the state of the mutation reader will no longer be
valid.

The proposed fix is to only resolve the futures inside mutation_reader's
read() function. If that function is called, we can reasonably expect that
the caller object is being kept alive.
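The delayed-resolution idea can be sketched as follows. Names here are illustrative, not the real sstables API: the reader stores a thunk standing in for the start/end future and only evaluates it inside read(), so a reader that is created but never used never touches the "index file" at all.

```cpp
#include <cassert>
#include <functional>
#include <optional>

int g_range_resolutions = 0;   // counts simulated index-file reads

struct lazy_reader {
    std::function<int()> resolve_range;   // stand-in for the range future
    std::optional<int> range;             // resolved lazily

    int read() {
        if (!range) {
            range = resolve_range();      // resolved only on first use
        }
        return *range;
    }
};
```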

A second way to fix this would be to force the mutation reader to be kept alive
by transforming it into a shared pointer and acquiring a reference to itself.
However, because the reader may turn out not to be used, the delayed read
actually has the advantage of not even reading anything from the disk if there
is no need for it.

Also, because sstables can be compacted, we can't guarantee that the sst
object itself, used in the resolution of start and end, stays alive, so it
has the same problem. Delaying those calls solves this as well. We assume
here that the outer reader is keeping the SSTable object alive.

I must note that I have not reproduced this problem. What is written above is
the result of the analysis we made in #1036. That being the case, a thorough
review is appreciated.

Fixes #1036

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <a7e4e722f76774d0b1f263d86c973061fb7fe2f2.1458135770.git.glauber@scylladb.com>
(cherry picked from commit 6a3872b355)
2016-03-18 07:56:35 +02:00
Asias He
ade185e518 storage_service: Update pending ranges when keyspace is changed
If a keyspace is created after we calculate the pending ranges during
bootstrap, we will ignore that keyspace in the pending ranges when handling
write requests for it, which causes data loss if rf = 1.

Fixes #1000

(cherry picked from commit d63281b256)
2016-03-16 14:29:45 +02:00
Asias He
d7001cad04 migration_manager: Make the migration callbacks run inside a seastar thread
At the moment the callbacks return void, so it is impossible to wait for
them to complete. Make the callbacks run inside a seastar thread, so that
if we need to wait for a callback we can call foo_operation().get() in it.
That is easier than making the callbacks return future<>.

(cherry picked from commit 93015bcc54)
2016-03-16 14:29:43 +02:00
Takuya ASADA
14504bdb25 dist: do not auto-start scylla-server job on Ubuntu package install time
Fixes #1017

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1458122424-22889-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit f1d18e9980)
2016-03-16 13:56:58 +02:00
Asias He
22682636ae main: Defer initialization of streaming
Streaming is used by bootstrap and repair. Streaming uses the storage_proxy
class to apply frozen_mutations and the db/column_family class to
invalidate the row cache. Defer the initialization until just before repair
and bootstrap init.
Message-Id: <8e99cf443239dd8e17e6b6284dab171f7a12365c.1458034320.git.asias@scylladb.com>

(cherry picked from commit d79dbfd4e8)
2016-03-15 11:59:32 +02:00
Pekka Enberg
c409e3508e main: Defer REPAIR_CHECKSUM_RANGE RPC verb registration after commitlog replay
Register the REPAIR_CHECKSUM_RANGE messaging service verb handler after
we have replayed the commitlog to avoid responding with bogus checksums.
Message-Id: <1458027934-8546-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit eb13f65949)
2016-03-15 11:59:28 +02:00
Gleb Natapov
f137536c1c main: Defer storage proxy RPC verb registration after commitlog replay
Message-Id: <20160315071229.GM6117@scylladb.com>
(cherry picked from commit 5076f4878b)
2016-03-15 09:41:28 +02:00
Gleb Natapov
d738863ed6 messaging: enable keepalive tcp option for inter-node communication
Some network equipment that does TCP session tracking tends to drop TCP
sessions after a period of inactivity. Use the keepalive mechanism to
prevent this from happening to our inter-node communication.
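At the POSIX level, enabling keepalive amounts to the sketch below; the actual change goes through seastar's socket interface rather than raw setsockopt, so this is only an illustration of the mechanism:

```cpp
#include <cassert>
#include <sys/socket.h>
#include <unistd.h>

// Minimal sketch: with SO_KEEPALIVE set, the kernel probes idle
// connections, so stateful middleboxes see periodic traffic and keep
// the session alive.
int make_keepalive_socket() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return fd;
    }
    int on = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```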

Message-Id: <20160314173344.GI31837@scylladb.com>
(cherry picked from commit e228ef1bd9)
2016-03-14 21:07:43 +02:00
Pekka Enberg
7426cf980e Merge seastar upstream
* seastar 88cc232...0739576 (4):
  > rpc: allow configuring keepalive for rpc client
  > net: add keepalive configuration to socket interface
  > iotune: refuse to run if there is not enough space available
  > rpc: make client connection error more clear
2016-03-14 21:07:05 +02:00
Pekka Enberg
8b88789dfb main: Defer migration manager RPC verb registration after commitlog replay
Defer registering migration manager RPC verbs until commitlog has
been replayed so that our own schema is fully loaded before other
nodes start querying it or sending schema updates.
Message-Id: <1457971028-7325-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 1429213b4c)
2016-03-14 21:06:35 +02:00
Glauber Costa
7dbcd5f2ca main: when scanning SSTables, run shard 0 first
Deletion of previous stale, temporary SSTables is done by Shard0. Therefore,
let's run Shard0 first. Technically, we could just have all shards agree on the
deletion and just delete it later, but that is prone to races.

Those races are not supposed to happen during normal operation, but if we have
bugs, they can. Scylla's GitHub issue #1014 is an example of a situation where
that can happen, making existing problems worse. So running a single shard
first and making sure that all temporary tables are deleted provides
extra protection against such situations.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 6c4e31bbdb)
2016-03-14 21:06:27 +02:00
Gleb Natapov
ffbf02deb5 make initialization run in a thread
While looking at the initialization code I felt like my head was going to
explode. Moving initialization into a thread makes things a little bit
better. Only lightly tested.

Message-Id: <20160310163142.GE28529@scylladb.com>
(cherry picked from commit 16135c2084)
2016-03-14 21:06:19 +02:00
Gleb Natapov
65aa036c75 fix developer-mode parameter application on SMP
I am almost sure we want to apply it once on each shard, and not multiple
times on a single shard.

Message-Id: <20160310155804.GB28529@scylladb.com>
(cherry picked from commit 176aa25d35)
2016-03-14 21:06:12 +02:00
Glauber Costa
6ab1b2d453 database: turn sstable generation number into an optional
This patch makes sure that every time we need to create a new generation
number (the very first step in the creation of a new SSTable), the respective
CF is already initialized and populated. Failure to do so can lead to data
being overwritten. Extensive details about why this is important can be found
in Scylla's GitHub issue #1014.

Nothing should be writing SSTables before we have had the chance to populate
the existing SSTables and calculate what the next generation number should be.

However, if that happens, we want to protect against it in a way that does not
involve overwriting existing tables. This is one of the ways to do it: every
column family starts in an unwriteable state, and when it can finally be written
to, we mark it as writeable.

Note that this *cannot* be a part of add_column_family. That adds a column family
to a db in memory only, and if anybody is about to write to a CF, that was most
likely already called. We need to call this explicitly when we are sure we're ready
to issue disk operations safely.
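The guard can be sketched as below, mirroring the shape of the change; gen stands in for the optional _sstable_generation, and smp_count/cpu_id stand in for smp::count and engine().cpu_id(). The counter starts disengaged, so handing out a generation before the CF was populated trips an assert instead of overwriting data:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>

struct generation_source {
    std::optional<int64_t> gen;   // unset => CF not yet ready for writes

    void mark_ready_for_writes(unsigned highest_seen, unsigned smp_count) {
        if (!gen) {
            gen = 1;
        }
        gen = std::max<int64_t>(*gen, highest_seen / smp_count + 1);
    }

    bool ready() const { return gen.has_value(); }

    int64_t next_generation(unsigned smp_count, unsigned cpu_id) {
        assert(gen);   // writing before populate() would be a bug
        return (*gen)++ * smp_count + cpu_id;
    }
};
```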

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit a339296385)
2016-03-14 16:04:23 +02:00
Glauber Costa
11679b28f5 database: remove unused parameter
We are no longer using the in_flight_seals gate, but forgot to remove it.
To guarantee that all seal operations will have finished when we're done,
we are using the memtable_flush_queue, which also guarantees order. But
that gate was never removed.

The FIXME code should also be removed, since such an interface now exists.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 8eb4e69053)
2016-03-14 16:04:15 +02:00
Glauber Costa
ffb5e6f01e column_family: do not open code generation calculation
We already have a function that wraps this, so re-use it. This FIXME is still
relevant, so just move it there; let's not lose it.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit 94e90d4a17)
2016-03-14 16:04:08 +02:00
Glauber Costa
6aea747275 column_family: remove mutation_count
We use memory usage as a threshold these days, and nowhere is _mutation_count
checked. Get rid of it.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
2016-03-14 16:04:02 +02:00
Asias He
2a840788fd storage_service: Fix pending_range_calculator_service
Since calculate_pending_ranges modifies token_metadata, we need to
replicate it to other shards. With this patch, when we call
calculate_pending_ranges, token_metadata is replicated to the other
non-zero shards.

In addition, it is not useful as a standalone class, so we can merge it
into storage_service and kill one singleton class.

Fixes #1033
Refs #962
Message-Id: <fb5b26311cafa4d315eb9e72d823c5ade2ab4bda.1457943074.git.asias@scylladb.com>

(cherry picked from commit 9f64c36a08)
2016-03-14 14:38:31 +02:00
Pekka Enberg
6b443db4d9 main: Initialize system keyspace earlier
We start services like the gossiper before the system keyspace is
initialized, which means we can start writing too early. Shuffle the code so
that the system keyspace is initialized earlier.

Refs #1014
Message-Id: <1457593758-9444-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit 5dd1fda6cf)
2016-03-14 13:50:15 +02:00
Asias He
9919211c25 storage_service: Do not stop messaging_service more than once
If we do
   - Decommission a node
   - Stop a node
we will shut down messaging_service more than once in:
   - storage_service::decommission
   - storage_service::drain_on_shutdown

Fixes #1005
Refs #1013

This fixes a dtest failure in the debug build.

update_cluster_layout_tests.TestUpdateClusterLayout.simple_decommission_node_1_test/

/data/jenkins/workspace/urchin-dtest/label/monster/mode/debug/scylla/seastar/core/future.hh:802:35:
runtime error: member call on null pointer of type 'struct
future_state'
core/future.hh:334:49: runtime error: member access within null
pointer of type 'const struct future_state'
ASAN:SIGSEGV
=================================================================
==4557==ERROR: AddressSanitizer: SEGV on unknown address
0x000000000000 (pc 0x00000065923e bp 0x7fbf6ffac430 sp 0x7fbf6ffac420
T0)
    #0 0x65923d in future_state<>::available() const
/data/jenkins/workspace/urchin-dtest/label/monster/mode/debug/scylla/seastar/core/future.hh:334
    #1 0x41458f1 in future<>::available()
/data/jenkins/workspace/urchin-dtest/label/monster/mode/debug/scylla/seastar/core/future.hh:802
    #2 0x41458f1 in then_wrapped<parallel_for_each(Iterator, Iterator,
Func&&)::<lambda(parallel_for_each_state&)> [with Iterator =
std::__detail::_Node_iterator<std::pair<const net::msg_addr,
net::messaging_service::shard_info>, false, true>; Func =
net::messaging_service::stop()::<lambda(auto:39&)> [with auto:39 =
std::unordered_map<net::msg_addr, net::messaging_service::shard_info,
net::msg_addr::hash>]::<lambda(std::pair<const net::msg_addr,
net::messaging_service::shard_info>&)>]::<lambda(future<>)>, future<>
> /data/jenkins/workspace/urchin-dtest/label/monster/mode/debug/scylla/seastar/core/future.hh:878

(cherry picked from commit 138c5f5834)
2016-03-14 12:20:57 +02:00
Vlad Zolotarov
c4f73f4e12 sstables: properly account removal requests
The same shard may create an sstables::sstable object more than once for
the same SSTable that doesn't belong to it, and mark it
for deletion (e.g. in a 'nodetool refresh' flow).

In that case the destructor of sstables::sstable accounted the
deletion requests from the same shard more than once, since it used a simple
counter incremented on every deletion request, whereas requests from the same
shard should be accounted as a single request. This matters because
the removal logic waited for all shards to agree on the removal of a specific
SSTable by comparing the counter mentioned above to the total number of
shards, and only once they were equal were the SSTable files actually removed.

This patch fixes this by replacing the counter with a std::unordered_set<unsigned>
that stores the shard ids of the shards requesting the deletion
of the sstable object, and comparing the size() of this set
to smp::count in order to decide whether to actually delete the corresponding
SSTable files.
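The counting fix can be sketched as follows; deletion_tracker and request_deletion are illustrative names, and smp_count stands in for smp::count. Recording *which* shards asked for deletion, instead of counting requests, collapses repeated requests from one shard into a single one:

```cpp
#include <cassert>
#include <unordered_set>

struct deletion_tracker {
    std::unordered_set<unsigned> shards;

    // Returns true once every one of smp_count shards has agreed.
    bool request_deletion(unsigned shard_id, unsigned smp_count) {
        shards.insert(shard_id);      // duplicate inserts are no-ops
        return shards.size() == smp_count;
    }
};
```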

Fixes #1004

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1457886812-32345-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit ce47fcb1ba)
2016-03-14 12:00:10 +02:00
Raphael S. Carvalho
dcd62cc0be sstables: make write_simple() safer by using exclusive flag
We should guarantee that write_simple() will not try to overwrite
an existing file.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <194bd055f1f2dc1bb9766a67225ec38c88e7b005.1457818073.git.raphaelsc@scylladb.com>
(cherry picked from commit 1ff7d32272)
2016-03-14 12:00:03 +02:00
Raphael S. Carvalho
dcd2b85e02 sstables: fix race condition when writing to the same sstable in parallel
When we are about to write a new sstable, we check whether the sstable exists
by checking whether the respective TOC exists. That check was added to handle a
possible attempt to write a new sstable with a generation already in use.
Gleb was worried that a TOC could appear after the check, and that's indeed
possible if there is an ongoing sstable write that uses the same generation
(running in parallel).
If the TOC appears after the check, we would again clobber an existing sstable
with a temporary one, and the user wouldn't be able to boot scylla anymore
without manual intervention.

Then Nadav proposed the following solution:
"We could do this by the following variant of Raphael's idea:

   1. create .txt.tmp unconditionally, as before the commit 031bf57c1
(if we can't create it, fail).
   2. Now confirm that .txt does not exist. If it does, delete the .txt.tmp
we just created and fail.
   3. continue as usual
   4. and at the end, as before, rename .txt.tmp to .txt.

The key to solving the race is step 1: Since we created .txt.tmp in step 1
and know this creation succeeded, we know that we cannot be running in
parallel with another writer - because such a writer too would have tried to
create the same file, and kept it existing until the very last step of its
work (step 4)."

This patch implements the solution described above.
Let me also note that the race is theoretical; scylla hasn't been affected by
it so far.
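Step 1 of the quoted protocol hinges on exclusive file creation, which at the POSIX level can be sketched as below. The function name and path handling are illustrative (the real code goes through seastar's file API): creating with O_CREAT|O_EXCL makes a parallel writer using the same generation fail instead of silently clobbering files.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

bool create_tmp_toc_exclusively(const char* path) {
    // O_EXCL makes the create fail with EEXIST if the file already exists,
    // so only one writer per generation can win this step.
    int fd = open(path, O_CREAT | O_EXCL | O_WRONLY, 0644);
    if (fd < 0) {
        return false;   // another writer holds this generation
    }
    close(fd);
    return true;
}
```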

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <ef630f5ac1bd0d11632c343d9f77a5f6810d18c1.1457818331.git.raphaelsc@scylladb.com>
(cherry picked from commit 0af786f3ea)
2016-03-14 11:59:55 +02:00
Raphael S. Carvalho
1d1416f841 sstables: bail out if toc exists for generation used by write_components
Currently, if sstable::write_components() is called to write a new sstable
using the same generation as an existing sstable, a temporary TOC will
be unconditionally created. Afterwards, the same sstable::write_components()
call will fail when it reaches sstable::create_data(), for the obvious reason
that a data component already exists for that generation (in this scenario).
After that, the user will not be able to boot scylla anymore because there is
a generation with both a TOC and a temporary TOC. We cannot simply remove a
generation with a TOC and a temporary TOC because user data would be lost
(again, in this scenario). After all, the temporary TOC was only created
because sstable::write_components() was wrongly called with the generation of
an existing sstable.

The solution proposed by this patch is to throw an exception if a TOC file
exists for the generation used.

Some SSTable unit tests were also changed to guarantee that we don't try
to overwrite components of an existing sstable.

Refs #1014.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <caffc4e19cdcf25e4c6b9dd277d115422f8246c4.1457643565.git.raphaelsc@scylladb.com>
(cherry picked from commit 031bf57c19)
2016-03-14 11:59:46 +02:00
Glauber Costa
be552139ce sstables: improve error messages
The standard C++ exception messages thrown when anything goes wrong while
writing the file are suboptimal: they don't even tell us the name of the
failing file.

Use a specialized create function so that we can capture that information.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit f2a8bcabc2)
2016-03-14 11:59:39 +02:00
Avi Kivity
1b45b5d649 Merge seastar upstream
* seastar 906b562...88cc232 (2):
  > reactor: fix work item leak in syscall work queue
  > rpc_test: add missing header
2016-03-14 11:16:22 +02:00
Tomasz Grabiec
7c1268765c log: Fix operator<<(std::ostream&, const std::exception_ptr&)
An attempt to print std::nested_exception currently results in the exception
leaking outside the printer. Fix by catching all exceptions in the
final catch block.

For a nested exception, the logger will now print just
"std::nested_exception". For nested exceptions specifically we should
log more, but that is a separate problem to solve.
Message-Id: <1457532215-7498-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 838a038cbd)
2016-03-09 16:10:27 +02:00
Pekka Enberg
9ef84d1f01 types: Implement to_string for timestamps and dates
The to_string() function is used for logging purposes, so use boost's
to_iso_extended_string() to format both timestamps and dates.

Fixes #968 (showstopper)
Message-Id: <1457528755-6164-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit ab502bcfa8)
2016-03-09 16:10:18 +02:00
Calle Wilund
4e3b98f281 lists.cc: fix update insert of frozen list
Fixes #967

Frozen lists are just atomic cells. However, the old code inserted the
frozen data directly as an atomic_cell_or_collection, which in turn
meant it lacked the header data of a cell. When it was later
handled by internal serialization (freeze), since the schema said
it was not a (non-frozen) collection, we tried to interpret the frozen
list data as a cell header, so the cell was most likely considered dead.
Message-Id: <1457432538-28836-1-git-send-email-calle@scylladb.com>

(cherry picked from commit 8575f1391f)
2016-03-08 15:36:29 +02:00
Pekka Enberg
124489e8d8 Update scylla-ami submodule
* dist/ami/files/scylla-ami d4a0e18...84bcd0d (1):
  > Add --ami parameter

(cherry picked from commit 81af486b69)
2016-03-08 14:10:53 +02:00
Takuya ASADA
7a2c57d6bd dist: export all entries on /etc/default/scylla-server on Ubuntu
Signed-off-by: Takuya ASADA <syuu@scylladb.com>
(cherry picked from commit 18a27de3c8)
2016-03-08 14:10:46 +02:00
Takuya ASADA
10543bf81e dist: export sysconfig for scylla-io-setup.service
Signed-off-by: Takuya ASADA <syuu@scylladb.com>
(cherry picked from commit 9ee14abf24)
2016-03-08 14:10:22 +02:00
Takuya ASADA
579a220162 Revert "Revert "dist: align ami option with others (-a --> --ami)""
This reverts commit 66c5feb9e9.

Conflicts:
	dist/common/scripts/scylla_sysconfig_setup

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
(cherry picked from commit 3d9dc52f5f)
2016-03-08 14:10:14 +02:00
Takuya ASADA
8c5ffb84ce Revert "Revert "Revert "dist: remove AMI entry from sysconfig, since there is no script refering it"""
This reverts commit 643beefc8c.

Conflicts:
	dist/common/scripts/scylla_sysconfig_setup
	dist/common/sysconfig/scylla-server

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
(cherry picked from commit c9882bc2c4)
2016-03-08 14:10:05 +02:00
Takuya ASADA
d05cdb0f6e dist: add /etc/scylla.d/io.conf on Ubuntu
Signed-off-by: Takuya ASADA <syuu@scylladb.com>
(cherry picked from commit c888eaac74)
2016-03-08 14:09:58 +02:00
Gleb Natapov
df02fb7a3e fix EACH_QUORUM handling during bootstrapping
Currently, write acknowledgement handling does not take the bootstrapping
node into account for CL=EACH_QUORUM. This patch fixes it.

Fixes #994

Message-Id: <20160307121620.GR2253@scylladb.com>
(cherry picked from commit 626c9d046b)
2016-03-08 13:35:58 +02:00
Gleb Natapov
559a8b41f2 log: add space between log level and date in the output
It was dropped by 6dc51027a3

Message-Id: <20160306125313.GI2253@scylladb.com>
(cherry picked from commit 8dad399256)
2016-03-08 13:31:07 +02:00
Paweł Dziepak
8b1f18ee1a lsa: set _active to nullptr in region destructor
In the region destructor, after the active segment is freed, the pointer to
it is left unchanged. This confuses the remaining parts of the destructor
logic (namely, removal from the region group), which may rely on the
information in region_impl::_active.

In this particular case the problem was that the code removing the region
from the region group called region_impl::occupancy(), which
dereferenced _active if not null.

Fixes #993.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1457341670-18266-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit 99b61d3944)
2016-03-08 13:30:37 +02:00
Takuya ASADA
cbbd18a249 dist: show message to use XFS for scylla data directory and also notify about developer mode, when iotune fails
Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1457426286-15925-1-git-send-email-syuu@scylladb.com>
2016-03-08 12:21:02 +02:00
Pekka Enberg
4db985e505 release: prepare for 0.19 2016-03-06 13:26:44 +02:00
43 changed files with 657 additions and 594 deletions

View File

@@ -1,6 +1,6 @@
#!/bin/sh
VERSION=666.development
VERSION=0.19
if test -f version
then

View File

@@ -423,7 +423,6 @@ scylla_core = (['database.cc',
'service/client_state.cc',
'service/migration_task.cc',
'service/storage_service.cc',
'service/pending_range_calculator_service.cc',
'service/load_broadcaster.cc',
'service/pager/paging_state.cc',
'service/pager/query_pagers.cc',

View File

@@ -338,13 +338,8 @@ lists::do_append(shared_ptr<term> t,
if (!value) {
m.set_cell(prefix, column, params.make_dead_cell());
} else {
auto&& to_add = list_value->_elements;
auto deref = [] (const bytes_opt& v) { return *v; };
auto&& newv = collection_mutation{list_type_impl::pack(
boost::make_transform_iterator(to_add.begin(), deref),
boost::make_transform_iterator(to_add.end(), deref),
to_add.size(), cql_serialization_format::internal())};
m.set_cell(prefix, column, atomic_cell_or_collection::from_collection_mutation(std::move(newv)));
auto newv = list_value->get_with_protocol_version(cql_serialization_format::internal());
m.set_cell(prefix, column, params.make_cell(std::move(newv)));
}
}
}

View File

@@ -588,9 +588,7 @@ column_family::seal_active_memtable() {
future<stop_iteration>
column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
// FIXME: better way of ensuring we don't attempt to
// overwrite an existing table.
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
auto gen = calculate_generation_for_new_table();
auto newtab = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(),
_config.datadir, gen,
@@ -1017,6 +1015,9 @@ future<> column_family::populate(sstring sstdir) {
return make_ready_future<>();
});
});
}).then([this] {
// Make sure this is called even if CF is empty
mark_ready_for_writes();
});
}
@@ -1201,6 +1202,14 @@ database::init_system_keyspace() {
return populate_keyspace(_cfg->data_file_directories()[0], db::system_keyspace::NAME).then([this]() {
return init_commitlog();
});
}).then([this] {
auto& ks = find_keyspace(db::system_keyspace::NAME);
return parallel_for_each(ks.metadata()->cf_meta_data(), [this] (auto& pair) {
auto cfm = pair.second;
auto& cf = this->find_column_family(cfm);
cf.mark_ready_for_writes();
return make_ready_future<>();
});
});
}
@@ -1718,11 +1727,9 @@ column_family::apply(const frozen_mutation& m, const schema_ptr& m_schema, const
void
column_family::seal_on_overflow() {
++_mutation_count;
if (active_memtable().occupancy().total_space() >= _config.max_memtable_size) {
// FIXME: if sparse, do some in-memory compaction first
// FIXME: maybe merge with other in-memory memtables
_mutation_count = 0;
seal_active_memtable();
}
}

View File

@@ -159,8 +159,8 @@ private:
// the read lock, and the ones that wish to stop that process will take the write lock.
rwlock _sstables_lock;
mutable row_cache _cache; // Cache covers only sstables.
int64_t _sstable_generation = 1;
unsigned _mutation_count = 0;
std::experimental::optional<int64_t> _sstable_generation = {};
db::replay_position _highest_flushed_rp;
// Provided by the database that owns this commitlog
db::commitlog* _commitlog;
@@ -182,11 +182,17 @@ private:
// update the sstable generation, making sure that new new sstables don't overwrite this one.
void update_sstables_known_generation(unsigned generation) {
_sstable_generation = std::max<uint64_t>(_sstable_generation, generation / smp::count + 1);
if (!_sstable_generation) {
_sstable_generation = 1;
}
_sstable_generation = std::max<uint64_t>(*_sstable_generation, generation / smp::count + 1);
}
uint64_t calculate_generation_for_new_table() {
return _sstable_generation++ * smp::count + engine().cpu_id();
assert(_sstable_generation);
// FIXME: better way of ensuring we don't attempt to
// overwrite an existing table.
return (*_sstable_generation)++ * smp::count + engine().cpu_id();
}
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
@@ -205,6 +211,27 @@ private:
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
void do_trigger_compaction();
public:
// This function should be called when this column family is ready for writes, IOW,
// to produce SSTables. Extensive details about why this is important can be found
// in Scylla's Github Issue #1014
//
// Nothing should be writing to SSTables before we have the chance to populate the
// existing SSTables and calculate what should the next generation number be.
//
// However, if that happens, we want to protect against it in a way that does not
// involve overwriting existing tables. This is one of the ways to do it: every
// column family starts in an unwriteable state, and when it can finally be written
// to, we mark it as writeable.
//
// Note that this *cannot* be a part of add_column_family. That adds a column family
// to a db in memory only, and if anybody is about to write to a CF, that was most
// likely already called. We need to call this explicitly when we are sure we're ready
// to issue disk operations safely.
void mark_ready_for_writes() {
update_sstables_known_generation(0);
}
// Creates a mutation reader which covers all data sources for this column family.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// Note: for data queries use query() instead.
@@ -377,16 +404,11 @@ private:
// But it is possible to synchronously wait for the seal to complete by
// waiting on this future. This is useful in situations where we want to
// synchronously flush data to disk.
//
// FIXME: A better interface would guarantee that all writes before this
// one are also complete
future<> seal_active_memtable();
// filter manifest.json files out
static bool manifest_json_filter(const sstring& fname);
seastar::gate _in_flight_seals;
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)


@@ -607,10 +607,10 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
#endif
proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
// it is safe to drop a keyspace only when all nested ColumnFamilies were deleted
for (auto&& keyspace_to_drop : keyspaces_to_drop) {
return do_for_each(keyspaces_to_drop, [&db] (auto keyspace_to_drop) {
db.drop_keyspace(keyspace_to_drop);
service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
}
return service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
});
}).get0();
});
}
@@ -650,7 +650,7 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
return do_for_each(created, [&db](auto&& val) {
auto ksm = create_keyspace_from_schema_partition(val);
return db.create_keyspace(ksm).then([ksm] {
service::get_local_migration_manager().notify_create_keyspace(ksm);
return service::get_local_migration_manager().notify_create_keyspace(ksm);
});
}).then([&altered, &db] () mutable {
for (auto&& name : altered) {
@@ -710,6 +710,8 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
auto& ks = db.find_keyspace(s->ks_name());
auto cfg = ks.make_column_family_config(*s);
db.add_column_family(s, cfg);
auto& cf = db.find_column_family(s);
cf.mark_ready_for_writes();
ks.make_directory_for_column_family(s->cf_name(), s->id()).get();
service::get_local_migration_manager().notify_create_column_family(s);
}


@@ -1,4 +1,4 @@
#!/bin/sh -e
#!/bin/sh
is_ami() {
if [ "`dmidecode --string system-version | grep \.amazon`" != "" ] && \
@@ -18,20 +18,21 @@ is_supported_instance_type() {
}
is_developer_mode() {
. /etc/os-release
if [ "$NAME" = "Ubuntu" ]; then
. /etc/default/scylla-server
else
. /etc/sysconfig/scylla-server
fi
echo $SCYLLA_ARGS|egrep -c "\-\-developer-mode(\s+|=)1"
}
if [ ! -f /etc/scylla/io_configured ] && [ `is_developer_mode` -eq 0 ]; then
if [ `is_ami` -eq 1 ]; then
SMP=`echo $SCYLLA_ARGS|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
CPUSET=`echo $SCYLLA_ARGS|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
fi
if [ `is_ami` -eq 1 ] && [ `is_supported_instance_type` -eq 1 ]; then
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
NR_DISKS=`curl http://169.254.169.254/latest/meta-data/block-device-mapping/|grep ephemeral|wc -l`
if [ "$SMP" != "" ]; then
NR_CPU=$SMP
fi
NR_SHARDS=$NR_CPU
if [ $NR_CPU -ge 8 ] && [ "$SET_NIC" = "no" ]; then
NR_SHARDS=$((NR_CPU - 1))
@@ -49,7 +50,13 @@ if [ ! -f /etc/scylla/io_configured ] && [ `is_developer_mode` -eq 0 ]; then
echo "SCYLLA_IO=\"--num-io-queues $NR_IO_QUEUES --max-io-requests $NR_REQS\"" > /etc/scylla.d/io.conf
else
iotune --evaluation-directory /var/lib/scylla --format envfile --options-file /etc/scylla.d/io.conf
iotune --evaluation-directory /var/lib/scylla --format envfile --options-file /etc/scylla.d/io.conf $CPUSET
if [ $? -ne 0 ]; then
logger -p user.err "/var/lib/scylla did not pass validation tests; it may not be on XFS and/or may have limited disk space."
logger -p user.err "This is an unsupported setup, and performance is expected to be very bad."
logger -p user.err "For better performance, place your data on XFS-formatted directories."
logger -p user.err "To override this error, see the developer_mode configuration option."
fi
fi
touch /etc/scylla/io_configured
fi


@@ -1,6 +1,6 @@
#!/bin/sh -e
if [ -f /etc/scylla/ami_disabled ]; then
if [ "$AMI" = "yes" ] && [ -f /etc/scylla/ami_disabled ]; then
rm /etc/scylla/ami_disabled
exit 1
fi


@@ -12,6 +12,7 @@ print_usage() {
echo " --homedir scylla home directory"
echo " --confdir scylla config directory"
echo " --setup-nic setup NIC's interrupts, RPS, XPS"
echo " --ami AMI instance mode"
exit 1
}
@@ -60,6 +61,10 @@ while [ $# -gt 0 ]; do
SETUP_NIC=1
shift 1
;;
"--ami")
AMI=yes
shift 1
;;
*)
print_usage
;;
@@ -71,7 +76,7 @@ echo Setting parameters on $SYSCONFIG/scylla-server
ETHDRV=`/usr/lib/scylla/dpdk_nic_bind.py --status | grep if=$NIC | sed -e "s/^.*drv=//" -e "s/ .*$//"`
ETHPCIID=`/usr/lib/scylla/dpdk_nic_bind.py --status | grep if=$NIC | awk '{print $1}'`
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
if [ $NR_CPU -ge 8 ] && [ "$SET_NIC" = "no" ]; then
if [ "$AMI" = "yes" ] && [ $NR_CPU -ge 8 ] && [ "$SET_NIC" = "no" ]; then
NR=$((NR_CPU - 1))
SET_NIC="yes"
SCYLLA_ARGS="$SCYLLA_ARGS --cpuset 1-$NR --smp $NR"
@@ -86,5 +91,6 @@ sed -e s#^NETWORK_MODE=.*#NETWORK_MODE=$NETWORK_MODE# \
-e s#^SCYLLA_CONF=.*#SCYLLA_CONF=$SCYLLA_CONF# \
-e s#^SET_NIC=.*#SET_NIC=$SET_NIC# \
-e "s#^SCYLLA_ARGS=.*#SCYLLA_ARGS=\"$SCYLLA_ARGS\"#" \
-e s#^AMI=.*#AMI=$AMI# \
$SYSCONFIG/scylla-server > /tmp/scylla-server
mv /tmp/scylla-server $SYSCONFIG/scylla-server


@@ -40,5 +40,5 @@ SCYLLA_ARGS="--log-to-syslog 1 --log-to-stdout 0 --default-log-level info --coll
## scylla arguments (for dpdk mode)
#SCYLLA_ARGS="--log-to-syslog 1 --log-to-stdout 0 --default-log-level info --collectd-address=127.0.0.1:25826 --collectd=1 --collectd-poll-period 3000 --network-stack native --dpdk-pmd"
# scylla IO parameters (max-io-requests and max-io-queues) are automatically
# configured, saved at /etc/scylla.d/io.conf
# setup as AMI instance
AMI=no


@@ -4,6 +4,7 @@ After=network.target
[Service]
Type=oneshot
EnvironmentFile=/etc/sysconfig/scylla-server
ExecStart=/usr/lib/scylla/scylla_io_setup
RemainAfterExit=yes
TimeoutStartSec=1800


@@ -29,8 +29,8 @@ SCRIPTNAME=/etc/init.d/$NAME
[ -x "$DAEMON" ] || exit 0
# Read configuration variable file if it is present
[ -r /etc/default/$NAME ] && . /etc/default/$NAME
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
# Define LSB log_* functions.
. /lib/lsb/init-functions


@@ -25,20 +25,20 @@ chdir /var/lib/scylla
env HOME=/var/lib/scylla
pre-start script
. /etc/default/scylla-server
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS SCYLLA_IO
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
sudo /usr/lib/scylla/scylla_prepare
sudo /usr/lib/scylla/scylla_io_setup
end script
script
. /etc/default/scylla-server
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS SCYLLA_IO
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
exec /usr/bin/scylla $SCYLLA_ARGS $SCYLLA_IO
end script
post-stop script
. /etc/default/scylla-server
export NETWORK_MODE TAP BRIDGE ETHDRV ETHPCIID NR_HUGEPAGES USER GROUP SCYLLA_HOME SCYLLA_CONF SCYLLA_ARGS SCYLLA_IO
eval "`grep -v -e "^\s*#" -e "^$" /etc/default/scylla-server|sed -e 's/^/export /'`"
eval "`grep -v -e "^\s*#" -e "^$" /etc/scylla.d/*.conf|sed -e 's/^/export /'`"
sudo /usr/lib/scylla/scylla_stop
end script


@@ -8,6 +8,7 @@ SYSCTL = $(CURDIR)/debian/scylla-server/etc/sysctl.d
SUDOERS = $(CURDIR)/debian/scylla-server/etc/sudoers.d
LIMITS= $(CURDIR)/debian/scylla-server/etc/security/limits.d
COLLECTD= $(CURDIR)/debian/scylla-server/etc/collectd/collectd.conf.d
SCYLLAD= $(CURDIR)/debian/scylla-server/etc/scylla.d
LIBS = $(CURDIR)/debian/scylla-server/usr/lib
CONF = $(CURDIR)/debian/scylla-server/etc/scylla
@@ -33,6 +34,9 @@ override_dh_auto_install:
mkdir -p $(COLLECTD) && \
cp $(CURDIR)/dist/common/collectd.d/scylla.conf $(COLLECTD)
mkdir -p $(SCYLLAD) && \
cp $(CURDIR)/dist/common/scylla.d/io.conf $(SCYLLAD)
mkdir -p $(CONF) && \
cp $(CURDIR)/conf/scylla.yaml $(CONF)
cp $(CURDIR)/conf/cassandra-rackdc.properties $(CONF)
@@ -68,6 +72,9 @@ override_dh_auto_install:
mkdir -p $(CURDIR)/debian/scylla-server/var/lib/scylla/commitlog
mkdir -p $(CURDIR)/debian/scylla-server/var/lib/scylla/coredump
override_dh_installinit:
dh_installinit --no-start
override_dh_strip:
dh_strip --dbg-package=scylla-server-dbg
%:


@@ -62,7 +62,12 @@ static const std::map<application_state, sstring> application_state_names = {
};
std::ostream& operator<<(std::ostream& os, const application_state& m) {
os << application_state_names.at(m);
auto it = application_state_names.find(m);
if (it != application_state_names.end()) {
os << application_state_names.at(m);
} else {
os << "UNKNOWN";
}
return os;
}


@@ -414,8 +414,14 @@ future<> gossiper::apply_state_locally(const std::map<inet_address, endpoint_sta
// Runs inside seastar::async context
void gossiper::remove_endpoint(inet_address endpoint) {
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
_subscribers.for_each([endpoint] (auto& subscriber) {
subscriber->on_remove(endpoint);
// We cannot run on_remove callbacks here because on_remove in
// storage_service might take the gossiper::timer_callback_lock
seastar::async([this, endpoint] {
_subscribers.for_each([endpoint] (auto& subscriber) {
subscriber->on_remove(endpoint);
});
}).handle_exception([] (auto ep) {
logger.warn("Failed to call on_remove callback: {}", ep);
});
if(_seeds.count(endpoint)) {
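The deadlock this change avoids can be illustrated without seastar: callbacks queued while a lock is held and drained only after it is released cannot re-enter that lock. A plain-C++ sketch (names are hypothetical; the diff defers into `seastar::async` rather than an explicit queue):

```cpp
#include <functional>
#include <mutex>
#include <vector>

// Sketch of the re-entrancy problem: remove_endpoint runs under a lock,
// but a subscriber callback may try to take the same lock again. Running
// the callbacks inline would deadlock; queuing them and draining the
// queue after the lock is released breaks the cycle.
struct gossiper_like {
    std::mutex timer_callback_lock;
    std::vector<std::function<void()>> subscribers;
    std::vector<std::function<void()>> deferred;  // run outside the lock

    void remove_endpoint() {
        std::lock_guard<std::mutex> g(timer_callback_lock);
        // Calling subscribers here could re-enter timer_callback_lock;
        // defer them instead.
        for (auto& s : subscribers) {
            deferred.push_back(s);
        }
    }

    void drain() {  // called with the lock released, like the async block
        for (auto& cb : deferred) {
            cb();
        }
        deferred.clear();
    }
};
```

A subscriber that itself takes `timer_callback_lock` runs safely only via `drain()`, never inline from `remove_endpoint()`.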

init.cc

@@ -24,7 +24,6 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
#include "service/pending_range_calculator_service.hh"
#include "to_string.hh"
#include "gms/inet_address.hh"
@@ -34,14 +33,9 @@
// until proper shutdown is done.
future<> init_storage_service(distributed<database>& db) {
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
return service::init_storage_service(db).then([] {
// #293 - do not stop anything
// engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
}).then([&db] {
return service::init_storage_service(db).then([] {
// #293 - do not stop anything
//engine().at_exit([] { return service::deinit_storage_service(); });
});
//engine().at_exit([] { return service::deinit_storage_service(); });
});
}

log.cc

@@ -100,7 +100,7 @@ logger::really_do_log(log_level level, const char* fmt, stringer** s, size_t n)
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() % 1000;
auto tm = std::chrono::system_clock::to_time_t(now);
char tmp[100];
strftime(tmp, sizeof(tmp), "%Y-%m-%d %T", std::localtime(&tm));
strftime(tmp, sizeof(tmp), " %Y-%m-%d %T", std::localtime(&tm));
out << tmp << sprint(",%03d", residual_millis);
syslog_offset += 24;
}
@@ -258,6 +258,8 @@ std::ostream& operator<<(std::ostream& out, const std::exception_ptr& eptr) {
out << " (error " << e.code() << ", " << e.code().message() << ")";
} catch(const std::exception& e) {
out << " (" << e.what() << ")";
} catch(...) {
// no extra info
}
}
return out;

main.cc

@@ -276,7 +276,8 @@ int main(int ac, char** av) {
engine().set_strict_dma(false);
}
return read_config(opts, *cfg).then([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs]() {
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs] {
read_config(opts, *cfg).get();
apply_logger_settings(cfg->default_log_level(), cfg->logger_log_level(),
cfg->log_to_stdout(), cfg->log_to_syslog());
verify_rlimit(cfg->developer_mode());
@@ -292,9 +293,19 @@ int main(int ac, char** av) {
sstring broadcast_rpc_address = cfg->broadcast_rpc_address();
if (!broadcast_address.empty()) {
utils::fb_utilities::set_broadcast_address(broadcast_address);
try {
utils::fb_utilities::set_broadcast_address(broadcast_address);
} catch (...) {
startlog.error("Bad configuration: invalid 'broadcast_address': {}: {}", broadcast_address, std::current_exception());
throw bad_configuration_error();
}
} else if (!listen_address.empty()) {
utils::fb_utilities::set_broadcast_address(listen_address);
try {
utils::fb_utilities::set_broadcast_address(listen_address);
} catch (...) {
startlog.error("Bad configuration: invalid 'listen_address': {}: {}", listen_address, std::current_exception());
throw bad_configuration_error();
}
} else {
startlog.error("Bad configuration: neither listen_address nor broadcast_address are defined\n");
throw bad_configuration_error();
@@ -332,225 +343,195 @@ int main(int ac, char** av) {
using namespace locator;
// Re-apply strict-dma after we've read the config file, this time
// to all reactors
return parallel_for_each(boost::irange(0u, smp::count), [devmode = opts.count("developer-mode")] (unsigned cpu) {
smp::invoke_on_all([devmode = opts.count("developer-mode")] {
if (devmode) {
engine().set_strict_dma(false);
}
return make_ready_future<>();
}).then([cfg] {
supervisor_notify("creating snitch");
return i_endpoint_snitch::create_snitch(cfg->endpoint_snitch());
// #293 - do not stop anything
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
}).then([api_address] {
supervisor_notify("determining DNS name");
return dns::gethostbyname(api_address);
}).then([&db, api_address, api_port, &ctx] (dns::hostent e){
supervisor_notify("starting API server");
auto ip = e.addresses[0].in.s_addr;
return ctx.http_server.start().then([api_address, api_port, ip, &ctx] {
return api::set_server_init(ctx);
}).then([api_address, api_port, ip, &ctx] {
return ctx.http_server.listen(ipv4_addr{ip, api_port});
}).then([api_address, api_port] {
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
}).get();
supervisor_notify("creating snitch");
i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
// #293 - do not stop anything
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
supervisor_notify("determining DNS name");
dns::hostent e = dns::gethostbyname(api_address).get0();
supervisor_notify("starting API server");
auto ip = e.addresses[0].in.s_addr;
ctx.http_server.start().get();
api::set_server_init(ctx).get();
ctx.http_server.listen(ipv4_addr{ip, api_port}).get();
print("Scylla API server listening on %s:%s ...\n", api_address, api_port);
supervisor_notify("initializing storage service");
init_storage_service(db).get();
supervisor_notify("starting per-shard database core");
// Note: changed from using a move here, because we want the config object intact.
db.start(std::ref(*cfg)).get();
engine().at_exit([&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the sharded<database> pointers alive.
return db.invoke_on_all([](auto& db) {
return db.stop();
}).then([] {
return sstables::await_background_jobs_on_all_shards();
}).then([] {
::_exit(0);
});
}).then([&db] {
supervisor_notify("initializing storage service");
return init_storage_service(db);
}).then([&ctx] {
return api::set_server_storage_service(ctx);
}).then([&db, cfg] {
});
supervisor_notify("creating data directories");
dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
supervisor_notify("creating commitlog directory");
dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
supervisor_notify("verifying data and commitlog directories");
std::unordered_set<sstring> directories;
directories.insert(db.local().get_config().data_file_directories().cbegin(),
db.local().get_config().data_file_directories().cend());
directories.insert(db.local().get_config().commitlog_directory());
parallel_for_each(directories, [&db] (sstring pathname) {
return disk_sanity(pathname, db.local().get_config().developer_mode());
}).get();
supervisor_notify("starting per-shard database core");
// Note: changed from using a move here, because we want the config object intact.
return db.start(std::ref(*cfg)).then([&db] {
engine().at_exit([&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the sharded<database> pointers alive.
return db.invoke_on_all([](auto& db) {
return db.stop();
}).then([] {
return sstables::await_background_jobs_on_all_shards();
}).then([] {
::_exit(0);
});
});
});
}).then([cfg, listen_address] {
supervisor_notify("starting gossip");
// Moved local parameters here, especially since with the
// SSL options there are quite a few of them.
uint16_t storage_port = cfg->storage_port();
uint16_t ssl_storage_port = cfg->ssl_storage_port();
double phi = cfg->phi_convict_threshold();
auto seed_provider= cfg->seed_provider();
sstring cluster_name = cfg->cluster_name();
const auto& ssl_opts = cfg->server_encryption_options();
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
auto trust_store = get_or_default(ssl_opts, "truststore");
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
return init_ms_fd_gossiper(listen_address
, storage_port
, ssl_storage_port
, encrypt_what
, trust_store
, cert
, key
, seed_provider
, cluster_name
, phi);
}).then([&ctx] {
return api::set_server_gossip(ctx);
}).then([&db] {
supervisor_notify("starting streaming service");
return streaming::stream_session::init_streaming_service(db);
}).then([&ctx] {
return api::set_server_stream_manager(ctx);
}).then([&db] {
supervisor_notify("starting messaging service");
// Start handling REPAIR_CHECKSUM_RANGE messages
return net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db] (auto& keyspace, auto& cf, auto& range) {
return checksum_range(db, keyspace, cf, range);
});
});
});
}).then([&ctx](){
return api::set_server_messaging_service(ctx);
}).then([&proxy, &db] {
supervisor_notify("starting storage proxy");
return proxy.start(std::ref(db)).then([&proxy] {
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
});
}).then([&ctx]() {
return api::set_server_storage_proxy(ctx);
}).then([&mm] {
supervisor_notify("starting migration manager");
return mm.start().then([&mm] {
// #293 - do not stop anything
// engine().at_exit([&mm] { return mm.stop(); });
});
}).then([&db, &proxy, &qp] {
supervisor_notify("starting query processor");
return qp.start(std::ref(proxy), std::ref(db)).then([&qp] {
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });
});
}).then([&qp] {
supervisor_notify("initializing batchlog manager");
return db::get_batchlog_manager().start(std::ref(qp)).then([] {
// #293 - do not stop anything
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
});
}).then([&db, &dirs] {
supervisor_notify("creating data directories");
return dirs.touch_and_lock(db.local().get_config().data_file_directories());
}).then([&db, &dirs] {
supervisor_notify("creating commitlog directory");
return dirs.touch_and_lock(db.local().get_config().commitlog_directory());
}).then([&db] {
supervisor_notify("verifying data and commitlog directories");
std::unordered_set<sstring> directories;
directories.insert(db.local().get_config().data_file_directories().cbegin(),
db.local().get_config().data_file_directories().cend());
directories.insert(db.local().get_config().commitlog_directory());
return do_with(std::move(directories), [&db] (auto& directories) {
return parallel_for_each(directories, [&db] (sstring pathname) {
return disk_sanity(pathname, db.local().get_config().developer_mode());
});
});
}).then([&db] {
supervisor_notify("loading sstables");
return db.invoke_on_all([] (database& db) {
return db.init_system_keyspace();
}).then([&db] {
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
return parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
auto cfm = pair.second;
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
});
});
}).then([&db, &proxy] {
supervisor_notify("loading sstables");
return db.invoke_on_all([&proxy] (database& db) {
return db.load_sstables(proxy);
});
}).then([&ctx] {
return api::set_server_load_sstable(ctx);
}).then([&db, &qp] {
supervisor_notify("setting up system keyspace");
return db::system_keyspace::setup(db, qp);
}).then([&db, &qp] {
supervisor_notify("starting commit log");
auto cl = db.local().commitlog();
if (cl == nullptr) {
// Deletion of previous stale, temporary SSTables is done by Shard0. Therefore,
// let's run Shard0 first. Technically, we could just have all shards agree on
// the deletion and just delete it later, but that is prone to races.
//
// Those races are not supposed to happen during normal operation, but if we have
// bugs, they can. Scylla's Github Issue #1014 is an example of a situation where
// that can happen, making existing problems worse. So running a single shard first
// and making sure that all temporary tables are deleted provides extra
// protection against such situations.
db.invoke_on(0, [] (database& db) { return db.init_system_keyspace(); }).get();
db.invoke_on_all([] (database& db) {
if (engine().cpu_id() == 0) {
return make_ready_future<>();
}
return cl->list_existing_segments().then([&db, &qp](auto paths) {
if (paths.empty()) {
return make_ready_future<>();
}
return db.init_system_keyspace();
}).get();
supervisor_notify("starting gossip");
// Moved local parameters here, especially since with the
// SSL options there are quite a few of them.
uint16_t storage_port = cfg->storage_port();
uint16_t ssl_storage_port = cfg->ssl_storage_port();
double phi = cfg->phi_convict_threshold();
auto seed_provider= cfg->seed_provider();
sstring cluster_name = cfg->cluster_name();
const auto& ssl_opts = cfg->server_encryption_options();
auto encrypt_what = get_or_default(ssl_opts, "internode_encryption", "none");
auto trust_store = get_or_default(ssl_opts, "truststore");
auto cert = get_or_default(ssl_opts, "certificate", relative_conf_dir("scylla.crt").string());
auto key = get_or_default(ssl_opts, "keyfile", relative_conf_dir("scylla.key").string());
init_ms_fd_gossiper(listen_address
, storage_port
, ssl_storage_port
, encrypt_what
, trust_store
, cert
, key
, seed_provider
, cluster_name
, phi).get();
supervisor_notify("starting messaging service");
supervisor_notify("starting storage proxy");
proxy.start(std::ref(db)).get();
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor_notify("starting migration manager");
mm.start().get();
// #293 - do not stop anything
// engine().at_exit([&mm] { return mm.stop(); });
supervisor_notify("starting query processor");
qp.start(std::ref(proxy), std::ref(db)).get();
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });
supervisor_notify("initializing batchlog manager");
db::get_batchlog_manager().start(std::ref(qp)).get();
// #293 - do not stop anything
// engine().at_exit([] { return db::get_batchlog_manager().stop(); });
supervisor_notify("loading sstables");
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
auto cfm = pair.second;
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
}).get();
supervisor_notify("loading sstables");
// See the comment on top of our call to init_system_keyspace for why we invoke
// on Shard0 first. See Scylla's Github Issue #1014 for details.
db.invoke_on(0, [&proxy] (database& db) { return db.load_sstables(proxy); }).get();
db.invoke_on_all([&proxy] (database& db) {
if (engine().cpu_id() == 0) {
return make_ready_future<>();
}
return db.load_sstables(proxy);
}).get();
supervisor_notify("setting up system keyspace");
db::system_keyspace::setup(db, qp).get();
supervisor_notify("starting commit log");
auto cl = db.local().commitlog();
if (cl != nullptr) {
auto paths = cl->list_existing_segments().get0();
if (!paths.empty()) {
supervisor_notify("replaying commit log");
return db::commitlog_replayer::create_replayer(qp).then([paths](auto rp) {
return do_with(std::move(rp), [paths = std::move(paths)](auto& rp) {
return rp.recover(paths);
});
}).then([&db] {
supervisor_notify("replaying commit log - flushing memtables");
return db.invoke_on_all([] (database& db) {
return db.flush_all_memtables();
});
}).then([paths] {
supervisor_notify("replaying commit log - removing old commitlog segments");
for (auto& path : paths) {
::unlink(path.c_str());
}
auto rp = db::commitlog_replayer::create_replayer(qp).get0();
rp.recover(paths).get();
supervisor_notify("replaying commit log - flushing memtables");
db.invoke_on_all([] (database& db) {
return db.flush_all_memtables();
}).get();
supervisor_notify("replaying commit log - removing old commitlog segments");
for (auto& path : paths) {
::unlink(path.c_str());
}
}
}
api::set_server_storage_service(ctx).get();
api::set_server_gossip(ctx).get();
api::set_server_messaging_service(ctx).get();
api::set_server_storage_proxy(ctx).get();
api::set_server_load_sstable(ctx).get();
supervisor_notify("initializing migration manager RPC verbs");
service::get_migration_manager().invoke_on_all([] (auto& mm) {
mm.init_messaging_service();
}).get();
supervisor_notify("initializing storage proxy RPC verbs");
proxy.invoke_on_all([] (service::storage_proxy& p) {
p.init_messaging_service();
}).get();
supervisor_notify("starting streaming service");
streaming::stream_session::init_streaming_service(db).get();
api::set_server_stream_manager(ctx).get();
// Start handling REPAIR_CHECKSUM_RANGE messages
net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range) {
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db] (auto& keyspace, auto& cf, auto& range) {
return checksum_range(db, keyspace, cf, range);
});
});
}).then([] {
supervisor_notify("starting storage service", true);
auto& ss = service::get_local_storage_service();
return ss.init_server();
}).then([&ctx] {
return api::set_server_storage_service(ctx);
}).then([] {
supervisor_notify("starting batchlog manager");
return db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();
});
}).then([&db] {
supervisor_notify("starting load broadcaster");
// should be unique_ptr, but then the lambda passed to at_exit would be non-copyable and
// casting it to std::function<> would fail to compile
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
}).then([] {
return gms::get_local_gossiper().wait_for_gossip_to_settle();
}).then([&ctx] {
return api::set_server_gossip_settle(ctx);
}).then([start_thrift] () {
supervisor_notify("starting native transport");
return service::get_local_storage_service().start_native_transport().then([start_thrift] () {
if (start_thrift) {
return service::get_local_storage_service().start_rpc_server();
}
return make_ready_future<>();
});
}).then([&ctx] {
return api::set_server_done(ctx);
});
}).then([] {
}).get();
supervisor_notify("starting storage service", true);
auto& ss = service::get_local_storage_service();
ss.init_server().get();
api::set_server_storage_service(ctx).get();
supervisor_notify("starting batchlog manager");
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();
}).get();
supervisor_notify("starting load broadcaster");
// should be unique_ptr, but then the lambda passed to at_exit would be non-copyable and
// casting it to std::function<> would fail to compile
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
gms::get_local_gossiper().wait_for_gossip_to_settle().get();
api::set_server_gossip_settle(ctx).get();
supervisor_notify("starting native transport");
service::get_local_storage_service().start_native_transport().get();
if (start_thrift) {
service::get_local_storage_service().start_rpc_server().get();
}
api::set_server_done(ctx).get();
supervisor_notify("serving");
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
engine().at_exit([] {


@@ -113,11 +113,11 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
class messaging_service::rpc_protocol_client_wrapper {
std::unique_ptr<rpc_protocol::client> _p;
public:
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local = ipv4_addr())
: _p(std::make_unique<rpc_protocol::client>(proto, addr, local)) {
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr())
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
}
rpc_protocol_client_wrapper(rpc_protocol& proto, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
: _p(std::make_unique<rpc_protocol::client>(proto, addr, seastar::tls::connect(c, addr, local)))
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, seastar::tls::connect(c, addr, local)))
{}
auto get_stats() const { return _p->get_stats(); }
future<> stop() { return _p->stop(); }
@@ -391,10 +391,14 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
auto remote_addr = ipv4_addr(get_preferred_ip(id.addr).raw_addr(), must_encrypt ? _ssl_port : _port);
auto local_addr = ipv4_addr{_listen_address.raw_addr(), 0};
rpc::client_options opts;
// send keepalive messages each minute if connection is idle, drop connection after 10 failures
opts.keepalive = std::experimental::optional<net::tcp_keepalive_params>({60s, 60s, 10});
auto client = must_encrypt ?
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
remote_addr, local_addr, _credentials) :
::make_shared<rpc_protocol_client_wrapper>(*_rpc,
::make_shared<rpc_protocol_client_wrapper>(*_rpc, std::move(opts),
remote_addr, local_addr);
it = _clients[idx].emplace(id, shard_info(std::move(client))).first;

Submodule seastar updated: 906b562a04...0739576bd6


@@ -50,18 +50,21 @@ public:
virtual ~migration_listener()
{ }
// The callback runs inside seastar thread
virtual void on_create_keyspace(const sstring& ks_name) = 0;
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) = 0;
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) = 0;
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
// The callback runs inside seastar thread
virtual void on_update_keyspace(const sstring& ks_name) = 0;
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) = 0;
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) = 0;
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) = 0;
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
// The callback runs inside seastar thread
virtual void on_drop_keyspace(const sstring& ks_name) = 0;
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) = 0;
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) = 0;


@@ -66,9 +66,43 @@ migration_manager::migration_manager()
future<> migration_manager::stop()
{
if (ms_inited) {
uninit_messaging_service();
}
return make_ready_future<>();
}
void migration_manager::init_messaging_service()
{
ms_inited = true;
auto& ms = net::get_local_messaging_service();
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
auto src = net::messaging_service::get_source(cinfo);
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
return service::get_local_migration_manager().merge_schema_from(src, mutations);
}).then_wrapped([src] (auto&& f) {
if (f.failed()) {
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
} else {
logger.debug("Applied definitions update from {}.", src);
}
});
return net::messaging_service::no_wait();
});
ms.register_migration_request([this] () {
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
// keep local proxy alive
});
});
}
void migration_manager::uninit_messaging_service()
{
auto& ms = net::get_local_messaging_service();
ms.unregister_migration_request();
ms.unregister_definitions_update();
}
void migration_manager::register_listener(migration_listener* listener)
{
_listeners.emplace_back(listener);
@@ -197,8 +231,8 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
}
-void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
-{
+future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
+return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
try {
@@ -207,10 +241,11 @@ void migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_meta
logger.warn("Create keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
}
-void migration_manager::notify_create_column_family(const schema_ptr& cfm)
-{
+future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
+return seastar::async([this, cfm] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
for (auto&& listener : _listeners) {
@@ -220,6 +255,7 @@ void migration_manager::notify_create_column_family(const schema_ptr& cfm)
logger.warn("Create column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
#if 0
@@ -242,8 +278,8 @@ public void notifyCreateAggregate(UDAggregate udf)
}
#endif
-void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm)
-{
+future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
+return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
try {
@@ -252,10 +288,11 @@ void migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_meta
logger.warn("Update keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
}
-void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed)
-{
+future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed) {
+return seastar::async([this, cfm, columns_changed] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
for (auto&& listener : _listeners) {
@@ -265,6 +302,7 @@ void migration_manager::notify_update_column_family(const schema_ptr& cfm, bool
logger.warn("Update column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
#if 0
@@ -287,8 +325,8 @@ public void notifyUpdateAggregate(UDAggregate udf)
}
#endif
-void migration_manager::notify_drop_keyspace(const sstring& ks_name)
-{
+future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
+return seastar::async([this, ks_name] {
for (auto&& listener : _listeners) {
try {
listener->on_drop_keyspace(ks_name);
@@ -296,10 +334,11 @@ void migration_manager::notify_drop_keyspace(const sstring& ks_name)
logger.warn("Drop keyspace notification failed {}: {}", ks_name, std::current_exception());
}
}
});
}
-void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
-{
+future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
+return seastar::async([this, cfm] {
auto&& cf_name = cfm->cf_name();
auto&& ks_name = cfm->ks_name();
for (auto&& listener : _listeners) {
@@ -309,6 +348,7 @@ void migration_manager::notify_drop_column_family(const schema_ptr& cfm)
logger.warn("Drop column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
}
#if 0


@@ -52,10 +52,12 @@
namespace service {
-class migration_manager {
+class migration_manager : public seastar::async_sharded_service<migration_manager> {
std::vector<migration_listener*> _listeners;
static const std::chrono::milliseconds migration_delay;
bool ms_inited = false;
public:
migration_manager();
@@ -79,12 +81,12 @@ public:
// Keep mutations alive around whole async operation.
future<> merge_schema_from(net::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations);
-void notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
-void notify_create_column_family(const schema_ptr& cfm);
-void notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
-void notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
-void notify_drop_keyspace(const sstring& ks_name);
-void notify_drop_column_family(const schema_ptr& cfm);
+future<> notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
+future<> notify_create_column_family(const schema_ptr& cfm);
+future<> notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
+future<> notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
+future<> notify_drop_keyspace(const sstring& ks_name);
+future<> notify_drop_column_family(const schema_ptr& cfm);
bool should_pull_schema_from(const gms::inet_address& endpoint);
@@ -118,6 +120,10 @@ public:
future<> stop();
bool is_ready_for_bootstrap();
void init_messaging_service();
private:
void uninit_messaging_service();
};
extern distributed<migration_manager> _the_migration_manager;


@@ -1,90 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Modified by ScyllaDB
* Copyright 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "service/pending_range_calculator_service.hh"
#include "service/storage_service.hh"
#include "database.hh"
#include <seastar/core/sleep.hh>
namespace service {
distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
void pending_range_calculator_service::run() {
// long start = System.currentTimeMillis();
auto keyspaces = _db.local().get_non_system_keyspaces();
for (auto& keyspace_name : keyspaces) {
auto& ks = _db.local().find_keyspace(keyspace_name);
calculate_pending_ranges(ks.get_replication_strategy(), keyspace_name);
}
_update_jobs--;
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
}
void pending_range_calculator_service::calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name) {
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
}
future<> pending_range_calculator_service::stop() {
return make_ready_future<>();
}
future<> pending_range_calculator_service::update() {
return smp::submit_to(0, [] {
get_local_pending_range_calculator_service().do_update();
});
}
void pending_range_calculator_service::do_update() {
assert(engine().cpu_id() == 0);
get_local_pending_range_calculator_service()._update_jobs++;
get_local_pending_range_calculator_service().run();
}
future<> pending_range_calculator_service::block_until_finished() {
// We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
return smp::submit_to(0, [] {
return do_until(
[] { return !(get_local_pending_range_calculator_service()._update_jobs > 0); },
[] { return sleep(std::chrono::milliseconds(100)); });
});
}
}


@@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Modified by ScyllaDB
* Copyright 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "locator/abstract_replication_strategy.hh"
#include "database.hh"
#include <seastar/core/sharded.hh>
namespace service {
class pending_range_calculator_service {
private:
int _update_jobs{0};
distributed<database>& _db;
void calculate_pending_ranges(locator::abstract_replication_strategy& strategy, const sstring& keyspace_name);
void run();
public:
pending_range_calculator_service(distributed<database>& db) : _db(db) {}
void do_update();
future<> update();
future<> block_until_finished();
future<> stop();
};
extern distributed<pending_range_calculator_service> _the_pending_range_calculator_service;
inline distributed<pending_range_calculator_service>& get_pending_range_calculator_service() {
return _the_pending_range_calculator_service;
}
inline pending_range_calculator_service& get_local_pending_range_calculator_service() {
return _the_pending_range_calculator_service.local();
}
} // service


@@ -205,12 +205,20 @@ class datacenter_write_response_handler : public abstract_write_response_handler
}
}
public:
-using abstract_write_response_handler::abstract_write_response_handler;
+datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
+schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
+std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
+abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
+std::move(targets), boost::range::count_if(pending_endpoints, db::is_local), std::move(dead_endpoints)) {}
};
class write_response_handler : public abstract_write_response_handler {
public:
-using abstract_write_response_handler::abstract_write_response_handler;
+write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
+schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
+std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
+abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
+std::move(targets), pending_endpoints.size(), std::move(dead_endpoints)) {}
};
class datacenter_sync_write_response_handler : public abstract_write_response_handler {
@@ -228,16 +236,20 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
public:
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
schema_ptr s,
-lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, size_t pending_endpoints,
+lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address> pending_endpoints,
std::vector<gms::inet_address> dead_endpoints) :
-abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, pending_endpoints, dead_endpoints) {
+abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, 0, dead_endpoints) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
for (auto& target : targets) {
auto dc = snitch_ptr->get_datacenter(target);
if (_dc_responses.find(dc) == _dc_responses.end()) {
-_dc_responses.emplace(dc, db::local_quorum_for(ks, dc));
+auto pending_for_dc = boost::range::count_if(pending_endpoints, [&snitch_ptr, &dc] (gms::inet_address& ep){
+return snitch_ptr->get_datacenter(ep) == dc;
+});
+_dc_responses.emplace(dc, db::local_quorum_for(ks, dc) + pending_for_dc).first;
+_pending_endpoints += pending_for_dc;
}
}
}
@@ -316,24 +328,21 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(sch
{
std::unique_ptr<abstract_write_response_handler> h;
auto& rs = ks.get_replication_strategy();
-size_t pending_count = pending_endpoints.size();
auto m = make_lw_shared<const frozen_mutation>(std::move(mutation));
if (db::is_datacenter_local(cl)) {
-pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local);
-h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
+h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
-h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
+h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
} else {
-h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
+h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
}
}
return register_response_handler(std::move(h));
}
storage_proxy::~storage_proxy() {}
storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
init_messaging_service();
_collectd_registrations = std::make_unique<scollectd::registrations>(scollectd::registrations({
scollectd::add_polled_metric(scollectd::type_instance_id("storage_proxy"
, scollectd::per_cpu_plugin_instance
@@ -2686,24 +2695,6 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
void storage_proxy::init_messaging_service() {
auto& ms = net::get_local_messaging_service();
ms.register_definitions_update([] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
auto src = net::messaging_service::get_source(cinfo);
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
return service::get_local_migration_manager().merge_schema_from(src, mutations);
}).then_wrapped([src] (auto&& f) {
if (f.failed()) {
logger.error("Failed to update definitions from {}: {}", src, f.get_exception());
} else {
logger.debug("Applied definitions update from {}.", src);
}
});
return net::messaging_service::no_wait();
});
ms.register_migration_request([] () {
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
// keep local proxy alive
});
});
ms.register_mutation([] (const rpc::client_info& cinfo, frozen_mutation in, std::vector<gms::inet_address> forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id) {
return do_with(std::move(in), get_local_shared_storage_proxy(), [&cinfo, forward = std::move(forward), reply_to, shard, response_id] (const frozen_mutation& m, shared_ptr<storage_proxy>& p) {
return when_all(
@@ -2791,8 +2782,6 @@ void storage_proxy::init_messaging_service() {
void storage_proxy::uninit_messaging_service() {
auto& ms = net::get_local_messaging_service();
ms.unregister_definitions_update();
ms.unregister_migration_request();
ms.unregister_mutation();
ms.unregister_mutation_done();
ms.unregister_read_data();


@@ -121,7 +121,6 @@ private:
std::uniform_real_distribution<> _read_repair_chance = std::uniform_real_distribution<>(0,1);
std::unique_ptr<scollectd::registrations> _collectd_registrations;
private:
-void init_messaging_service();
void uninit_messaging_service();
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges, db::consistency_level cl);
response_id_type register_response_handler(std::unique_ptr<abstract_write_response_handler>&& h);
@@ -174,6 +173,8 @@ public:
return _db;
}
+void init_messaging_service();
future<> mutate_locally(const mutation& m);
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m);
future<> mutate_locally(std::vector<mutation> mutations);


@@ -54,7 +54,6 @@
#include "locator/local_strategy.hh"
#include "version.hh"
#include "unimplemented.hh"
-#include "service/pending_range_calculator_service.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "dht/range_streamer.hh"
@@ -266,7 +265,7 @@ void storage_service::join_token_ring(int delay) {
}
set_mode(mode::JOINING, "schema complete, ready to bootstrap", true);
set_mode(mode::JOINING, "waiting for pending range calculation", true);
-get_local_pending_range_calculator_service().block_until_finished().get();
+block_until_update_pending_ranges_finished().get();
set_mode(mode::JOINING, "calculation complete, ready to bootstrap", true);
logger.debug("... got ring + schema info");
@@ -293,7 +292,7 @@ void storage_service::join_token_ring(int delay) {
set_mode(mode::JOINING, "waiting for schema information to complete", true);
sleep(std::chrono::seconds(1)).get();
}
-get_local_pending_range_calculator_service().block_until_finished().get();
+block_until_update_pending_ranges_finished().get();
}
logger.info("Checking bootstrapping/leaving/moving nodes: ok");
@@ -477,7 +476,7 @@ void storage_service::handle_state_bootstrap(inet_address endpoint) {
}
_token_metadata.add_bootstrap_tokens(tokens, endpoint);
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
auto& gossiper = gms::get_local_gossiper();
if (gossiper.uses_host_id(endpoint)) {
@@ -575,7 +574,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
// a race where natural endpoint was updated to contain node A, but A was
// not yet removed from pending endpoints
_token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint);
-get_local_pending_range_calculator_service().do_update();
+do_update_pending_ranges();
for (auto ep : endpoints_to_remove) {
remove_endpoint(ep);
@@ -622,7 +621,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
}).get();
}
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
if (logger.is_enabled(logging::log_level::debug)) {
auto ver = _token_metadata.get_ring_version();
for (auto& x : _token_metadata.get_token_to_endpoint()) {
@@ -657,7 +656,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) {
// at this point the endpoint is certainly a member with this token, so let's proceed
// normally
_token_metadata.add_leaving_endpoint(endpoint);
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
}
void storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces) {
@@ -680,7 +679,7 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector<sst
auto token = dht::global_partitioner().from_sstring(pieces[1]);
logger.debug("Node {} state moving, new token {}", endpoint, token);
_token_metadata.add_moving_endpoint(token, endpoint);
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
}
void storage_service::handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
@@ -710,7 +709,7 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector<s
logger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
// Note that the endpoint is being removed
_token_metadata.add_leaving_endpoint(endpoint);
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
// find the endpoint coordinating this removal that we need to notify when we're done
auto state = gossiper.get_endpoint_state_for_endpoint(endpoint);
if (!state) {
@@ -831,7 +830,7 @@ void storage_service::on_change(inet_address endpoint, application_state state,
void storage_service::on_remove(gms::inet_address endpoint) {
logger.debug("endpoint={} on_remove", endpoint);
_token_metadata.remove_endpoint(endpoint);
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
}
void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
@@ -980,9 +979,7 @@ future<> storage_service::drain_on_shutdown() {
ss.shutdown_client_servers().get();
logger.info("Drain on shutdown: shutdown rpc and cql server done");
-net::get_messaging_service().invoke_on_all([] (auto& ms) {
-return ms.stop();
-}).get();
+ss.do_stop_ms().get();
logger.info("Drain on shutdown: shutdown messaging_service done");
auth::auth::shutdown().get();
@@ -995,6 +992,14 @@ future<> storage_service::drain_on_shutdown() {
return db.commitlog()->shutdown();
}).get();
logger.info("Drain on shutdown: shutdown commitlog done");
// NOTE: We currently don't destroy migration_manager nor
// storage_service in scylla, so when we reach here
// migration_manager should still be alive. Be careful when
// scylla starts to destroy migration_manager in the shutdown
// process.
service::get_local_migration_manager().unregister_listener(&ss);
logger.info("Drain on shutdown: done");
});
});
@@ -1066,6 +1071,10 @@ future<> storage_service::init_server(int delay) {
logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
#endif
_initialized = true;
// Register storage_service with migration_manager so we can update
// pending ranges when a keyspace is changed
service::get_local_migration_manager().register_listener(this);
#if 0
try
{
@@ -1470,6 +1479,18 @@ future<> storage_service::stop_gossiping() {
});
}
future<> storage_service::do_stop_ms() {
if (_ms_stopped) {
return make_ready_future<>();
}
_ms_stopped = true;
return net::get_messaging_service().invoke_on_all([] (auto& ms) {
return ms.stop();
}).then([] {
logger.info("messaging_service stopped");
});
}
future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) {
auto& ks = db.find_keyspace(ks_name);
return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
@@ -1763,7 +1784,7 @@ future<> storage_service::decommission() {
throw std::runtime_error(sprint("Node in %s state; wait for status to become normal or restart", ss._operation_mode));
}
-get_local_pending_range_calculator_service().block_until_finished().get();
+ss.update_pending_ranges().get();
auto non_system_keyspaces = db.get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
@@ -1788,9 +1809,7 @@ future<> storage_service::decommission() {
logger.debug("DECOMMISSIONING: shutdown rpc and cql server done");
gms::get_local_gossiper().stop_gossiping().get();
logger.debug("DECOMMISSIONING: stop_gossiping done");
-net::get_messaging_service().invoke_on_all([] (auto& ms) {
-return ms.stop();
-}).get();
+ss.do_stop_ms().get();
// StageManager.shutdownNow();
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::DECOMMISSIONED).get();
logger.debug("DECOMMISSIONING: set_bootstrap_state done");
@@ -1862,7 +1881,7 @@ future<> storage_service::remove_node(sstring host_id_string) {
}
ss._removing_node = endpoint;
tm.add_leaving_endpoint(endpoint);
-get_local_pending_range_calculator_service().update().get();
+ss.update_pending_ranges().get();
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
// we add our own token so other nodes to let us know when they're done
@@ -1942,10 +1961,8 @@ future<> storage_service::drain() {
ss.shutdown_client_servers().get();
gms::get_local_gossiper().stop_gossiping().get();
-ss.set_mode(mode::DRAINING, "shutting down MessageService", false);
-net::get_messaging_service().invoke_on_all([] (auto& ms) {
-return ms.stop();
-}).get();
+ss.set_mode(mode::DRAINING, "shutting down messaging_service", false);
+ss.do_stop_ms().get();
#if 0
StorageProxy.instance.verifyNoHintsInProgress();
@@ -2236,7 +2253,7 @@ void storage_service::excise(std::unordered_set<token> tokens, inet_address endp
}
}).get();
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
}
void storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, int64_t expire_time) {
@@ -2285,7 +2302,7 @@ future<> storage_service::confirm_replication(inet_address node) {
void storage_service::leave_ring() {
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get();
_token_metadata.remove_endpoint(get_broadcast_address());
-get_local_pending_range_calculator_service().update().get();
+update_pending_ranges().get();
auto& gossiper = gms::get_local_gossiper();
auto expire_time = gossiper.compute_expire_time().time_since_epoch().count();
@@ -2384,7 +2401,7 @@ future<> storage_service::start_leaving() {
auto& gossiper = gms::get_local_gossiper();
return gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens())).then([this] {
_token_metadata.add_leaving_endpoint(get_broadcast_address());
-return get_local_pending_range_calculator_service().update();
+return update_pending_ranges();
});
}
@@ -2741,7 +2758,7 @@ future<> storage_service::move(token new_token) {
auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces();
-get_local_pending_range_calculator_service().block_until_finished().get();
+ss.block_until_update_pending_ranges_finished().get();
// checking if data is moving to this node
for (auto keyspace_name : keyspaces_to_process) {
@@ -2786,5 +2803,46 @@ std::chrono::milliseconds storage_service::get_ring_delay() {
return std::chrono::milliseconds(ring_delay);
}
void storage_service::do_update_pending_ranges() {
if (engine().cpu_id() != 0) {
throw std::runtime_error("do_update_pending_ranges should be called on cpu zero");
}
// long start = System.currentTimeMillis();
auto keyspaces = _db.local().get_non_system_keyspaces();
for (auto& keyspace_name : keyspaces) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
get_local_storage_service().get_token_metadata().calculate_pending_ranges(strategy, keyspace_name);
}
// logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
}
future<> storage_service::update_pending_ranges() {
return get_storage_service().invoke_on(0, [] (auto& ss){
ss._update_jobs++;
ss.do_update_pending_ranges();
// calculate_pending_ranges will modify token_metadata, so we need to replicate it to the other cores
return ss.replicate_to_all_cores().finally([&ss, ss0 = ss.shared_from_this()] {
ss._update_jobs--;
});
});
}
future<> storage_service::block_until_update_pending_ranges_finished() {
// We want to be sure the job we're blocking for is actually finished and we can't trust the TPE's active job count
return smp::submit_to(0, [] {
return do_until(
[] { return !(get_local_storage_service()._update_jobs > 0); },
[] { return sleep(std::chrono::milliseconds(100)); });
});
}
future<> storage_service::keyspace_changed(const sstring& ks_name) {
// Update pending ranges since keyspace can be changed after we calculate pending ranges.
return update_pending_ranges().handle_exception([ks_name] (auto ep) {
logger.warn("Failed to update pending ranges for ks = {}: {}", ks_name, ep);
});
}
} // namespace service


@@ -84,7 +84,7 @@ int get_generation_number();
* This class will also maintain histograms of the load information
* of other nodes in the cluster.
*/
-class storage_service : public gms::i_endpoint_state_change_subscriber, public seastar::async_sharded_service<storage_service> {
+class storage_service : public service::migration_listener, public gms::i_endpoint_state_change_subscriber, public seastar::async_sharded_service<storage_service> {
public:
struct snapshot_details {
int64_t live;
@@ -108,6 +108,7 @@ private:
private final AtomicLong notificationSerialNumber = new AtomicLong();
#endif
distributed<database>& _db;
int _update_jobs{0};
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
// given objective criteria
@@ -119,6 +120,7 @@ private:
shared_ptr<distributed<transport::cql_server>> _cql_server;
shared_ptr<distributed<thrift_server>> _thrift_server;
sstring _operation_in_progress;
bool _ms_stopped = false;
public:
storage_service(distributed<database>& db)
: _db(db) {
@@ -127,6 +129,11 @@ public:
// Needed by distributed<>
future<> stop();
future<> keyspace_changed(const sstring& ks_name);
void do_update_pending_ranges();
future<> update_pending_ranges();
future<> block_until_update_pending_ranges_finished();
const locator::token_metadata& get_token_metadata() const {
return _token_metadata;
}
@@ -285,6 +292,7 @@ public:
private:
future<> do_stop_rpc_server();
future<> do_stop_native_transport();
future<> do_stop_ms();
#if 0
public void stopTransports()
{
@@ -696,6 +704,26 @@ public:
virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override;
virtual void on_remove(gms::inet_address endpoint) override;
virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
public:
// For migration_listener
virtual void on_create_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_update_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
private:
void update_peer_info(inet_address endpoint);
void do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value);

View File

@@ -495,52 +495,59 @@ class mutation_reader::impl {
private:
mp_row_consumer _consumer;
std::experimental::optional<data_consume_context> _context;
std::experimental::optional<future<data_consume_context>> _context_future;
std::function<future<data_consume_context> ()> _get_context;
public:
impl(sstable& sst, schema_ptr schema, uint64_t start, uint64_t end,
const io_priority_class &pc)
: _consumer(schema, pc)
, _context(sst.data_consume_rows(_consumer, start, end)) { }
, _get_context([&sst, this, start, end] {
return make_ready_future<data_consume_context>(sst.data_consume_rows(_consumer, start, end));
}) { }
impl(sstable& sst, schema_ptr schema,
const io_priority_class &pc)
: _consumer(schema, pc)
, _context(sst.data_consume_rows(_consumer)) { }
impl(sstable& sst, schema_ptr schema, future<uint64_t> start, future<uint64_t> end, const io_priority_class& pc)
, _get_context([this, &sst] {
return make_ready_future<data_consume_context>(sst.data_consume_rows(_consumer));
}) { }
impl(sstable& sst, schema_ptr schema, std::function<future<uint64_t>()> start, std::function<future<uint64_t>()> end, const io_priority_class& pc)
: _consumer(schema, pc)
, _context_future(start.then([this, &sst, end = std::move(end)] (uint64_t start) mutable {
return end.then([this, &sst, start] (uint64_t end) mutable {
return sst.data_consume_rows(_consumer, start, end);
});
})) { }
impl() : _consumer() { }
, _get_context([this, &sst, start = std::move(start), end = std::move(end)] () {
return start().then([this, &sst, end = std::move(end)] (uint64_t start) {
return end().then([this, &sst, start] (uint64_t end) {
return make_ready_future<data_consume_context>(sst.data_consume_rows(_consumer, start, end));
});
});
}) { }
impl() : _consumer(), _get_context() { }
// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
impl(impl&&) = delete;
impl(const impl&) = delete;
future<mutation_opt> read() {
if (_context) {
return _context->read().then([this] {
// We want _consumer.mut to be left in an unengaged state after
// returning a mutation (so on EOF we return an unengaged
// optional). Moving _consumer.mut is *not* enough.
auto ret = std::move(_consumer.mut);
_consumer.mut = {};
return std::move(ret);
});
} else if (_context_future) {
return _context_future->then([this] (auto context) {
_context = std::move(context);
return _context->read().then([this] {
auto ret = std::move(_consumer.mut);
_consumer.mut = {};
return std::move(ret);
});
});
} else {
if (!_get_context) {
// empty mutation reader returns EOF immediately
return make_ready_future<mutation_opt>();
}
if (_context) {
return do_read();
}
return (_get_context)().then([this] (data_consume_context context) {
_context = std::move(context);
return do_read();
});
}
private:
future<mutation_opt> do_read() {
return _context->read().then([this] {
// We want _consumer.mut to be left in an unengaged state after
// returning a mutation (so on EOF we return an unengaged
// optional). Moving _consumer.mut is *not* enough.
auto ret = std::move(_consumer.mut);
_consumer.mut = {};
return std::move(ret);
});
}
};
@@ -649,17 +656,19 @@ sstable::read_range_rows(schema_ptr schema, const query::partition_range& range,
fail(unimplemented::cause::WRAP_AROUND);
}
future<uint64_t> start = range.start()
? (range.start()->is_inclusive()
auto start = [this, range, schema, &pc] {
return range.start() ? (range.start()->is_inclusive()
? lower_bound(schema, range.start()->value(), pc)
: upper_bound(schema, range.start()->value(), pc))
: make_ready_future<uint64_t>(0);
};
future<uint64_t> end = range.end()
? (range.end()->is_inclusive()
auto end = [this, range, schema, &pc] {
return range.end() ? (range.end()->is_inclusive()
? upper_bound(schema, range.end()->value(), pc)
: lower_bound(schema, range.end()->value(), pc))
: make_ready_future<uint64_t>(data_size());
};
return std::make_unique<mutation_reader::impl>(
*this, std::move(schema), std::move(start), std::move(end), pc);

View File

@@ -52,7 +52,14 @@ namespace sstables {
logging::logger sstlog("sstable");
thread_local std::unordered_map<sstring, unsigned> sstable::_shards_agreeing_to_remove_sstable;
future<file> new_sstable_component_file(sstring name, open_flags flags) {
return open_file_dma(name, flags).handle_exception([name] (auto ep) {
sstlog.error("Could not create SSTable component {}. Found exception: {}", name, ep);
return make_exception_future<file>(ep);
});
}
thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> sstable::_shards_agreeing_to_remove_sstable;
static utils::phased_barrier& background_jobs() {
static thread_local utils::phased_barrier gate;
@@ -749,7 +756,19 @@ void sstable::write_toc(const io_priority_class& pc) {
sstlog.debug("Writing TOC file {} ", file_path);
// Writing TOC content to temporary file.
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
// If creation of the temporary TOC failed, it implies that boot failed to
// delete an sstable with a temporary TOC for this column family, or that an
// sstable is being created in parallel with the same generation.
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
bool toc_exists = file_exists(filename(sstable::component_type::TOC)).get0();
if (toc_exists) {
// TOC will exist at this point if write_components() was called with
// the generation of a sstable that exists.
f.close().get();
remove_file(file_path).get();
throw std::runtime_error(sprint("SSTable write failed due to existence of TOC file for generation %ld of %s.%s", _generation, _ks, _cf));
}
file_output_stream_options options;
options.buffer_size = 4096;
@@ -792,7 +811,7 @@ void write_crc(const sstring file_path, checksum& c) {
sstlog.debug("Writing CRC file {} ", file_path);
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
file f = open_file_dma(file_path, oflags).get0();
file f = new_sstable_component_file(file_path, oflags).get0();
file_output_stream_options options;
options.buffer_size = 4096;
@@ -806,7 +825,7 @@ void write_digest(const sstring file_path, uint32_t full_checksum) {
sstlog.debug("Writing Digest file {} ", file_path);
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
auto f = open_file_dma(file_path, oflags).get0();
auto f = new_sstable_component_file(file_path, oflags).get0();
file_output_stream_options options;
options.buffer_size = 4096;
@@ -877,7 +896,7 @@ template <sstable::component_type Type, typename T>
void sstable::write_simple(T& component, const io_priority_class& pc) {
auto file_path = filename(Type);
sstlog.debug(("Writing " + _component_map[Type] + " file {} ").c_str(), file_path);
file f = open_file_dma(file_path, open_flags::wo | open_flags::create | open_flags::truncate).get0();
file f = new_sstable_component_file(file_path, open_flags::wo | open_flags::create | open_flags::exclusive).get0();
file_output_stream_options options;
options.buffer_size = sstable_buffer_size;
@@ -946,8 +965,8 @@ future<> sstable::open_data() {
future<> sstable::create_data() {
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
return when_all(open_file_dma(filename(component_type::Index), oflags),
open_file_dma(filename(component_type::Data), oflags)).then([this] (auto files) {
return when_all(new_sstable_component_file(filename(component_type::Index), oflags),
new_sstable_component_file(filename(component_type::Data), oflags)).then([this] (auto files) {
// FIXME: If both files could not be created, the first get below will
// throw an exception, and second get() will not be attempted, and
// we'll get a warning about the second future being destructed
@@ -1723,9 +1742,11 @@ sstable::shared_remove_by_toc_name(sstring toc_name, bool shared) {
return remove_by_toc_name(toc_name);
} else {
auto shard = std::hash<sstring>()(toc_name) % smp::count;
return smp::submit_to(shard, [toc_name] {
auto& counter = _shards_agreeing_to_remove_sstable[toc_name];
if (++counter == smp::count) {
return smp::submit_to(shard, [toc_name, src_shard = engine().cpu_id()] {
auto& remove_set = _shards_agreeing_to_remove_sstable[toc_name];
remove_set.insert(src_shard);
auto counter = remove_set.size();
if (counter == smp::count) {
_shards_agreeing_to_remove_sstable.erase(toc_name);
return remove_by_toc_name(toc_name);
} else {

View File

@@ -343,7 +343,7 @@ private:
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
static thread_local std::unordered_map<sstring, unsigned> _shards_agreeing_to_remove_sstable;
static thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> _shards_agreeing_to_remove_sstable;
std::unordered_set<component_type, enum_hash<component_type>> _components;

View File

@@ -41,7 +41,6 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
#include "service/pending_range_calculator_service.hh"
#include "auth/auth.hh"
// TODO : remove once shutdown is ok.
@@ -49,12 +48,8 @@
// Simpler to copy the code from init.cc than trying to do clever parameterization
// and whatnot.
static future<> tst_init_storage_service(distributed<database>& db) {
return service::get_pending_range_calculator_service().start(std::ref(db)).then([] {
engine().at_exit([] { return service::get_pending_range_calculator_service().stop(); });
}).then([&db] {
return service::init_storage_service(db).then([] {
engine().at_exit([] { return service::deinit_storage_service(); });
});
return service::init_storage_service(db).then([] {
engine().at_exit([] { return service::deinit_storage_service(); });
});
}
@@ -343,7 +338,6 @@ public:
_db->stop().get();
service::get_storage_service().stop().get();
service::get_pending_range_calculator_service().stop().get();
locator::i_endpoint_snitch::stop_snitch().get();

View File

@@ -29,7 +29,6 @@
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "core/reactor.hh"
#include "service/pending_range_calculator_service.hh"
#include "service/storage_service.hh"
#include "core/distributed.hh"
#include "database.hh"
@@ -39,7 +38,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
distributed<database> db;
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
service::get_pending_range_calculator_service().start(std::ref(db));
service::get_storage_service().start(std::ref(db)).get();
db.start().get();
net::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
@@ -51,7 +49,6 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
net::get_messaging_service().stop().get();
db.stop().get();
service::get_storage_service().stop().get();
service::get_pending_range_calculator_service().stop().get();
locator::i_endpoint_snitch::stop_snitch().get();
});
}

View File

@@ -68,6 +68,7 @@ future<>
with_column_family(schema_ptr s, column_family::config cfg, Func func) {
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
return func(*cf).then([cf, cm] {
return cf->stop();
}).finally([cf, cm] {});
@@ -404,9 +405,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
auto cm = make_lw_shared<compaction_manager>();
return do_with(make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm), [s, cm] (auto& cf_ptr) mutable {
column_family& cf = *cf_ptr;
return with_column_family(s, cfg, [s] (auto& cf) mutable {
std::map<int32_t, std::map<int32_t, int32_t>> shadow, result;
const column_definition& r1_col = *s->get_column_definition("r1");

View File

@@ -987,6 +987,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
cfg.enable_incremental_backups = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
cf->start();
cf->mark_ready_for_writes();
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
auto generations = make_lw_shared<std::vector<unsigned long>>({1, 2, 3, 4});
@@ -1063,6 +1064,7 @@ SEASTAR_TEST_CASE(compact) {
auto s = builder.build();
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
return open_sstables("tests/sstables/compaction", {1,2,3}).then([s = std::move(s), cf, cm, generation] (auto sstables) {
return test_setup::do_with_test_directory([sstables, s, generation, cf, cm] {
@@ -1161,6 +1163,7 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", utf8_type}}, {}, utf8_type));
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
auto generations = make_lw_shared<std::vector<unsigned long>>(std::move(generations_to_compact));
auto sstables = make_lw_shared<std::vector<sstables::shared_sstable>>();
@@ -1670,6 +1673,7 @@ SEASTAR_TEST_CASE(leveled_01) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1714,6 +1718,7 @@ SEASTAR_TEST_CASE(leveled_02) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1768,6 +1773,7 @@ SEASTAR_TEST_CASE(leveled_03) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -1826,6 +1832,7 @@ SEASTAR_TEST_CASE(leveled_04) {
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(50);
auto min_key = key_and_token_pair[0].first;
@@ -2159,6 +2166,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
}).then([s, tmp, sstables] {
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
auto create = [tmp] {
return make_lw_shared<sstable>("ks", "cf", tmp->path, 3, la, big);
};
@@ -2259,6 +2267,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
};
auto cm = make_lw_shared<compaction_manager>();
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm);
cf->mark_ready_for_writes();
std::vector<shared_sstable> sstables;
sstables.push_back(std::move(sstp));

View File

@@ -36,6 +36,7 @@
#include "database.hh"
#include <memory>
#include "sstable_test.hh"
#include "tmpdir.hh"
using namespace sstables;
@@ -169,10 +170,11 @@ SEASTAR_TEST_CASE(big_summary_query_32) {
return summary_query<32, 0xc4000, 182>("tests/sstables/bigsummary", 76);
}
static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
auto sst = make_lw_shared<sstable>("ks", "cf", dir, generation, la, big);
return sst->load().then([sst, generation] {
static future<sstable_ptr> do_write_sst(sstring load_dir, sstring write_dir, unsigned long generation) {
auto sst = make_lw_shared<sstable>("ks", "cf", load_dir, generation, la, big);
return sst->load().then([sst, write_dir, generation] {
sstables::test(sst).change_generation_number(generation + 1);
sstables::test(sst).change_dir(write_dir);
auto fut = sstables::test(sst).store();
return std::move(fut).then([sst = std::move(sst)] {
return make_ready_future<sstable_ptr>(std::move(sst));
@@ -180,8 +182,8 @@ static future<sstable_ptr> do_write_sst(sstring dir, unsigned long generation) {
});
}
static future<> write_sst_info(sstring dir, unsigned long generation) {
return do_write_sst(dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
static future<> write_sst_info(sstring load_dir, sstring write_dir, unsigned long generation) {
return do_write_sst(load_dir, write_dir, generation).then([] (auto ptr) { return make_ready_future<>(); });
}
using bufptr_t = std::unique_ptr<char [], free_deleter>;
@@ -223,11 +225,12 @@ static future<> compare_files(sstdesc file1, sstdesc file2, sstable::component_t
}
static future<> check_component_integrity(sstable::component_type component) {
return write_sst_info("tests/sstables/compressed", 1).then([component] {
auto tmp = make_lw_shared<tmpdir>();
return write_sst_info("tests/sstables/compressed", tmp->path, 1).then([component, tmp] {
return compare_files(sstdesc{"tests/sstables/compressed", 1 },
sstdesc{"tests/sstables/compressed", 2 },
sstdesc{tmp->path, 2 },
component);
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(check_compressed_info_func) {
@@ -235,8 +238,9 @@ SEASTAR_TEST_CASE(check_compressed_info_func) {
}
SEASTAR_TEST_CASE(check_summary_func) {
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
auto tmp = make_lw_shared<tmpdir>();
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
return sstables::test(sst2).read_summary().then([sst1, sst2] {
summary& sst1_s = sstables::test(sst1).get_summary();
summary& sst2_s = sstables::test(sst2).get_summary();
@@ -247,7 +251,7 @@ SEASTAR_TEST_CASE(check_summary_func) {
BOOST_REQUIRE(sst1_s.first_key.value == sst2_s.first_key.value);
BOOST_REQUIRE(sst1_s.last_key.value == sst2_s.last_key.value);
});
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(check_filter_func) {
@@ -255,8 +259,9 @@ SEASTAR_TEST_CASE(check_filter_func) {
}
SEASTAR_TEST_CASE(check_statistics_func) {
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
auto tmp = make_lw_shared<tmpdir>();
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
return sstables::test(sst2).read_statistics().then([sst1, sst2] {
statistics& sst1_s = sstables::test(sst1).get_statistics();
statistics& sst2_s = sstables::test(sst2).get_statistics();
@@ -271,19 +276,20 @@ SEASTAR_TEST_CASE(check_statistics_func) {
});
// TODO: compare the field contents from both sstables.
});
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(check_toc_func) {
return do_write_sst("tests/sstables/compressed", 1).then([] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", "tests/sstables/compressed", 2, la, big);
auto tmp = make_lw_shared<tmpdir>();
return do_write_sst("tests/sstables/compressed", tmp->path, 1).then([tmp] (auto sst1) {
auto sst2 = make_lw_shared<sstable>("ks", "cf", tmp->path, 2, la, big);
return sstables::test(sst2).read_toc().then([sst1, sst2] {
auto& sst1_c = sstables::test(sst1).get_components();
auto& sst2_c = sstables::test(sst2).get_components();
BOOST_REQUIRE(sst1_c == sst2_c);
});
});
}).then([tmp] {});
}
SEASTAR_TEST_CASE(uncompressed_random_access_read) {
@@ -857,6 +863,7 @@ SEASTAR_TEST_CASE(reshuffle) {
cfg.enable_incremental_backups = false;
auto cf = make_lw_shared<column_family>(uncompressed_schema(), cfg, column_family::no_commitlog(), *cm);
cf->start();
cf->mark_ready_for_writes();
return cf->reshuffle_sstables(3).then([cm, cf] (std::vector<sstables::entry_descriptor> reshuffled) {
BOOST_REQUIRE(reshuffled.size() == 2);
BOOST_REQUIRE(reshuffled[0].generation == 3);

View File

@@ -100,6 +100,10 @@ public:
_sst->_generation = generation;
}
void change_dir(sstring dir) {
_sst->_dir = dir;
}
future<> store() {
_sst->_components.erase(sstable::component_type::Index);
_sst->_components.erase(sstable::component_type::Data);

View File

@@ -135,6 +135,8 @@ void test_timestamp_like_string_conversions(data_type timestamp_type) {
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-03T12:30:00+1230"), timestamp_type->decompose(tp)));
BOOST_REQUIRE(timestamp_type->equal(timestamp_type->from_string("2015-07-02T23:00-0100"), timestamp_type->decompose(tp)));
BOOST_REQUIRE_EQUAL(timestamp_type->to_string(timestamp_type->decompose(tp)), "2015-07-03T00:00:00");
auto now = time(nullptr);
auto local_now = *localtime(&now);
char buf[100];

View File

@@ -40,6 +40,14 @@
#include <boost/multiprecision/cpp_int.hpp>
#include "utils/big_decimal.hh"
template<typename T>
sstring time_point_to_string(const T& tp)
{
auto timestamp = tp.time_since_epoch().count();
auto time = boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(timestamp);
return boost::posix_time::to_iso_extended_string(time);
}
static const char* int32_type_name = "org.apache.cassandra.db.marshal.Int32Type";
static const char* long_type_name = "org.apache.cassandra.db.marshal.LongType";
static const char* ascii_type_name = "org.apache.cassandra.db.marshal.AsciiType";
@@ -421,7 +429,11 @@ public:
}
virtual bytes from_string(sstring_view s) const override;
virtual sstring to_string(const bytes& b) const override {
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
auto v = deserialize(b);
if (v.is_null()) {
return "";
}
return time_point_to_string(from_value(v).get());
}
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::timestamp;
@@ -684,7 +696,11 @@ public:
return b;
}
virtual sstring to_string(const bytes& b) const override {
throw std::runtime_error(sprint("%s not implemented", __PRETTY_FUNCTION__));
auto v = deserialize(b);
if (v.is_null()) {
return "";
}
return time_point_to_string(from_value(v).get());
}
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() const override {
return cql3::cql3_type::timestamp;

View File

@@ -1211,9 +1211,11 @@ public:
assert(seg->is_empty());
free_segment(seg);
}
_closed_occupancy = {};
if (_active) {
assert(_active->is_empty());
free_segment(_active);
_active = nullptr;
}
if (_group) {
_group->del(this);