Streaming has currently one class, that can be used to contain the read
operations being generated by the streaming process. Those reads come from two
places:
- checksums (if doing repair)
- reading mutations to be sent over the wire.
Depending on the amount of data we're dealing with, that can generate a
significant chunk of data, with seconds worth of backlog, and if we need to
have the incoming writes intertwined with those reads, those can take a long
time.
Even if one node is only acting as a receiver, it may still read a lot for the
checksums - if we're talking about repairs, those are coming from the
checksums.
However, in more complicated failure scenarios, it is not hard to imagine a
node that will be both sending and receiving a lot of data.
The best way to guarantee progress on both fronts, is to put both kinds of
operations into different classes.
This patch introduces a new write class, and rename the old read class so it
can have a more meaningful name.
Signed-off-by: Glauber Costa <glauber@scylladb.com>
Fix bootstrap_test.py:TestBootstrap.failed_bootstap_wiped_node_can_join_test
Logs on node 1:
INFO 2016-03-11 15:53:43,287 [shard 0] gossip - FatClient 127.0.0.2 has been silent for 30000ms, removing from gossip
INFO 2016-03-11 15:53:43,287 [shard 0] stream_session - stream_manager: Close all stream_session with peer = 127.0.0.2 in on_remove
WARN 2016-03-11 15:53:43,498 [shard 0] stream_session - [Stream #4e411ba0-e75e-11e5-81f8-000000000000] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to 127.0.0.2:0: std::runtime_error ([Stream #4e411ba0-e75e-11e5-81f8-000000000000] GOT STREAM_ MUTATION_DONE 127.0.0.1: Can not find stream_manager)
terminate called without an active exception
Backtrace on node 1:
#0 0x00007fb74723da98 in raise () from /lib64/libc.so.6
#1 0x00007fb74723f69a in abort () from /lib64/libc.so.6
#2 0x00007fb74ab84aed in __gnu_cxx::__verbose_terminate_handler() () from /lib64/libstdc++.so.6
#3 0x00007fb74ab82936 in ?? () from /lib64/libstdc++.so.6
#4 0x00007fb74ab82981 in std::terminate() () from /lib64/libstdc++.so.6
#5 0x00007fb74ab82be9 in __cxa_rethrow () from /lib64/libstdc++.so.6
#6 0x0000000000f3521e in streaming::stream_transfer_task::<lambda()>::<lambda(auto:44)>::operator()<std::__exception_ptr::exception_ptr> (ep=..., __closure=0x7ffce74d8630) at streaming/stream_transfer_task.cc:169
#7 do_void_futurize_apply<const streaming::stream_transfer_task::start()::<lambda()>::<lambda(auto:44)>&, std::__exception_ptr::exception_ptr> (func=...) at /home/asias/src/cloudius-systems/scylla/seastar/core/future.hh:1142
#8 futurize<void>::apply<const streaming::stream_transfer_task::start()::<lambda()>::<lambda(auto:44)>&, std::__exception_ptr::exception_ptr> (func=...) at /home/asias/src/cloudius-systems/scylla/seastar/core/future.hh:1190
#9 future<>::<lambda(auto:7&&)>::operator()<future<> > ( fut=fut@entry=<unknown type in /home/asias/src/cloudius-systems/scylla/build/release/scylla, CU 0xec84d00, DIE 0xee2561d>, __closure=__closure@entry=0x7ffce74d8630) at /home/asias/src/cloudius-systems/scylla/seastar/core/future.hh:1014
Message-Id: <1457684884-4776-2-git-send-email-asias@scylladb.com>
In the preparation phase of streaming, we check that remote node has all
the cf_id which are needed for the entire streaming process, including the
cf_id which local node will send to remote node and wise versa.
So, at later time, if the cf_id is missing, it must be that the cf_id is
deleted. It is fine to ingore no_such_column_family exception. In this
patch, we change the code to ignore at server side to avoid sending the
exception back, to avoid handle exception in an IDL compatiable way.
One thing we can improve is that the sender might know the cf is deleted
later than the receiver does. In this case, the sender will send some
more mutations if we send back the no_such_column_family back to the
sender. However, since we do not throw exceptions in the receiver stream
mutation handler, it will not cause a lot of overhead, the receiver will
just ignore the mutation received.
Fixes#979
It is possible that a cf is deleted after we make the cf reader. Avoid
sending them to avoid the unnecessary overhead to send them on the wire and
the peer node to drop the received mutations.
Currently, only the shard where the stream_plan is created on will send
streaing mutations. To utilize all the available cores, we can make each
shard send mutations which it is responsbile for. On the receiver side,
we do not forward the mutations to the shard where the stream_session is
created, so that we can avoid unnecessary forwarding.
Note: the downside is that it is now harder to:
1) to track number of bytes sent and received
2) to update the keep alive timer upon receive of the STREAM_MUTATION
To fix, we now store the sent/recieved bytes info on all shards. When
the keep alive timer expires, we check if any progress has been made.
Hopefully, this patch will make the streaming much faster and in turn
make the repair/decommission/adding a node faster.
Refs: https://github.com/scylladb/scylla/issues/849
Tested with decommission/repair dtest.
Message-Id: <96b419ab11b736a297edd54a0b455ffdc2511ac5.1454645370.git.asias@scylladb.com>
There are only two messages: prepare_message and outgoing_file_message.
Actually only the prepare_message is the message we send on wire.
Flatten the namespace.
- int connections_per_host
Scylla does not create connections per stream_session, instead it uses
rpc, thus connections_per_host is not relevant to scylla.
- bool keep_ss_table_level
- int repaired_at
Scylla does not stream sstable files. They are not relevant to scylla.
- Add debug for the peer address info
- Add debug in stream_transfer_task and stream_receive_task
- Add debug when cancel the keep_alive timer
- Add debug for has_active_sessions in stream_result_future::maybe_complete
messaging_service will use private ip address automatically to connect a
peer node if possible. There is no need for the upper level like
streaming to worry about it. Drop it simplifies things a bit.
"When a node gain or regain responsibility for certain token ranges, streaming
will be performed, upon receiving of the stream data, the row cache
is invalidated for that range.
Refs #484."
If the session is idle for 10 minutes, close the session. This can
detect the following hangs:
1) if the sending node is gone, the receiving peer will wait forever
2) if the node which should send COMPLETE_MESSAGE to the peer node is
gone, the peer node will wait forever
Fixes simple_kill_streaming_node_while_bootstrapping_test.
When a node gain or regain responsibility for certain token ranges,
streaming will be performed, upon receiving of the stream data, the
row cache is invalidated for that range.
Refs #484.
When we start to sending mutations for cf_id to remote node, remote node
might do not have the cf_id anymore due to dropping of the cf for
instance.
We should not fail the streaming if this happens, since the cf does not
exist anymore there is no point streaming it.
Fixes#566
Many mutation_reader implementations capture 'this', which, if copied,
becomes invalid. Protect against this error my making mutation_reader
a non-copyable object.
Fix inadvertant copied around the code base.
The problem is that in start_streaming_files we iterate the _transfers
map, however in task.start() we can delete the task from _transfers:
stream_transfer_task::start() -> stream_transfer_task::complete ->
stream_session::task_completed -> _transfers.erase(completed_task.cf_id)
To fix, we advance the iterator before we start the task.
std::_Rb_tree_increment(std::_Rb_tree_node_base const*) () from
/lib64/libstdc++.so.6
/usr/include/c++/5.1.1/bits/stl_tree.h:205
(this=this@entry=0x6000000dc290) at streaming/stream_transfer_task.cc:55
streaming::stream_session::start_streaming_files
(this=this@entry=0x6000000ab500) at streaming/stream_session.cc:526
(this=0x6000000ab500, requests=std::vector of length 1, capacity 1 =
{...}, summaries=std::vector of length 1, capacity 1 = {...})
at streaming/stream_session.cc:356
streaming/stream_session.cc:83
At the moment, when local node send a mutation to remote node, it will
wait for remote node to apply the mutation and send back a response,
then it will send the next mutation. This means the sender are sending
mutations one by one. To optimize, we can make the sender send more
mutations in parallel without waiting for the response. In order to
apply back pressure from remote node, a per shard mutation send limiter
is introduced so that the sender will not overwhelm the receiver.
I tried our lw_shared_ptr, the compiler complained endless usage of
incomplete type stream_session. I can not include stream_session.hh
everywhere due to circular dependency.
For now, I'm using std::shared_ptr which works fine.
In streaming code, we need core to core connection(the second connection
from B to A). That is when node A initiates a stream to node B, it is
possible that node A will transfer data to node B and vice verse, so we
need two connections. When node A creates a tcp connection (within the
messaging_service) to node B, we have a connection ip_a:core_a to
ip_b:core_b. When node B creates a connection to node B, we can not
guarantee it is ip_b:core_b to ip_a:core_a.
Current messaging_service does not support core to core connection yet,
although we use shard_id{ip, cpu_id} as the destination of the message.
We can solve the issue in upper layer. We can pass extra cpu_id as a
user msg.
Node A sends stream_init_message with my_cpu_id = current_cpu_id
Node B receives stream_init_message, it runs on whatever cpu this
connection goes to, then it sends response back with Node B's
current_cpu_id.
After this, each node knows which cpu_id to send to each other.
TODO: we need to handle the case when peer node reboots with different
number of cpus.
Each outgoing_file_message might contain multiple mutations. Send them
one mutation per RPC call (using frozen_mutation), instead of one big
outgoing_file_message per one RPC call.