Commit Graph

108 Commits

Author SHA1 Message Date
Avi Kivity
827a4d0010 Merge "streaming: Invalidate cache upon receiving of stream" from Asias
"When a node gain or regain responsibility for certain token ranges, streaming
will be performed, upon receiving of the stream data, the row cache
is invalidated for that range.

Refs #484."
2015-12-28 10:24:46 +02:00
Asias He
20c258f202 streaming: Fix session hang with maybe_completed: WAIT_COMPLETE -> WAIT_COMPLETE
The problem is that we set the session state to WAIT_COMPLETE in
send_complete_message's continuation, the peer node might send
COMPLETE_MESSAGE before we run the continuation, thus we set the wrong
status in COMPLETE_MESSAGE's handler and will not close the session.

Before:

   GOT STREAM_MUTATION_DONE
   receive  task_completed
   SEND COMPLETE_MESSAGE to 127.0.0.2:0
   GOT COMPLETE_MESSAGE, from=127.0.0.2, connecting=127.0.0.3, dst_cpu_id=0
   complete: PREPARING -> WAIT_COMPLETE
   GOT COMPLETE_MESSAGE Reply
   maybe_completed: WAIT_COMPLETE -> WAIT_COMPLETE

After:

   GOT STREAM_MUTATION_DONE
   receive  task_completed
   maybe_completed: PREPARING -> WAIT_COMPLETE
   SEND COMPLETE_MESSAGE to 127.0.0.2:0
   GOT COMPLETE_MESSAGE, from=127.0.0.2, connecting=127.0.0.3, dst_cpu_id=0
   complete: WAIT_COMPLETE -> COMPLETE
   Session with 127.0.0.2 is complete
2015-12-24 20:34:44 +08:00
Asias He
c971fad618 streaming: Introduce keep alive timer for each stream_session
If the session is idle for 10 minutes, close the session. This can
detect the following hangs:

1) if the sending node is gone, the receiving peer will wait forever
2) if the node which should send COMPLETE_MESSAGE to the peer node is
gone, the peer node will wait forever

Fixes simple_kill_streaming_node_while_bootstrapping_test.
2015-12-24 20:34:44 +08:00
Asias He
f527e07be6 streaming: Get stream_session in STREAM_MUTATION handler
Get from address from cinfo. It is needed to figure out which stream
session this mutation is belonged to, since we need to update the keep
alive timer for this stream session.
2015-12-24 20:34:44 +08:00
Asias He
d7a8c655a6 streaming: Print All sessions completed after state change message
close_session will print "All sessions completed" message, print the
state change message before that.
2015-12-24 20:34:44 +08:00
Asias He
eaea09ee71 streaming: Retransmit COMPLETE_MESSAGE message
It is oneway message at the moment. If a COMPLETE_MESSAGE is lost, no
one will close the session. The first step to fix the issue is to try to
retransmit the message.
2015-12-24 20:34:44 +08:00
Asias He
d1d6395978 streaming: Print old state before setting the new state 2015-12-24 20:34:44 +08:00
Asias He
2d32195c32 streaming: Invalidate cache upon receiving of stream
When a node gain or regain responsibility for certain token ranges,
streaming will be performed, upon receiving of the stream data, the
row cache is invalidated for that range.

Refs #484.
2015-12-21 14:44:13 +08:00
Asias He
b7d10b710e streaming: Propagate fail to send PREPARE_DONE_MESSAGE exception
Otherwise the stream_plan will not be marked as failed state.
2015-12-10 12:38:00 +02:00
Asias He
19a6dfcfd0 streaming: stream_session print stream_session_state properly 2015-11-10 15:39:34 +08:00
Asias He
860c7aff37 streaming: Print plan_id in logger 2015-11-10 15:39:34 +08:00
Asias He
d2e5d13e69 streaming: Set state to STREAMING only if we really have data to sent 2015-11-10 15:39:34 +08:00
Asias He
fcf7486d4c streaming: Improve state transition log for maybe_completed and complete 2015-11-10 15:39:34 +08:00
Asias He
72a7a6bd9b streaming: session close
Currently, there are multiple places we can close a session, this makes
the close code path hard to follow. Remove the call to maybe_completed
in follower_start_sent to simplify closing a bit.

- stream_session::follower_start_sent -> maybe_completed()
- stream_session::receive_task_completed -> maybe_completed()
- stream_session::transfer_task_completed -> maybe_completed()
- on receive of the COMPLETE_MESSAGE -> complete()
2015-11-10 15:39:34 +08:00
Asias He
7959c12073 stream_session: Support column_families is empty case
An empty column_families means to get all the column families.
2015-10-13 15:44:59 +08:00
Avi Kivity
d5cf0fb2b1 Add license notices 2015-09-20 10:43:39 +03:00
Calle Wilund
27421d55bf stream_session: Fix use of query_options::DEFAULT
Make (apparently dead?) test routine (not in test class)stream_session::test
use query_options::DEFAULT the way it is intended. Not copy it (semantically
prohibited, but accidentally possible in code)
2015-09-15 11:19:47 +02:00
Asias He
e7c0db0160 streaming: Fix a race between initiator and follower
1) Node A sends prepare message (msg1) to Node A
2) Node B sends prepare message (msg2) back to Node A
3) Node A prepares what to receive according to msg2

The issue is that, Node B might sends before Node A prepares to receive.

To fix, we send a PREPARE_DONE_MESSAGE after step 3 to notify
node B to start sending.
2015-08-17 14:28:11 +08:00
Asias He
fd1c0e0bb3 streaming: Fix iterate and delete
The problem is that in start_streaming_files we iterate the _transfers
map, however in task.start() we can delete the task from _transfers:
stream_transfer_task::start() -> stream_transfer_task::complete ->
stream_session::task_completed -> _transfers.erase(completed_task.cf_id)

To fix, we advance the iterator before we start the task.

std::_Rb_tree_increment(std::_Rb_tree_node_base const*) () from
/lib64/libstdc++.so.6
/usr/include/c++/5.1.1/bits/stl_tree.h:205
(this=this@entry=0x6000000dc290) at streaming/stream_transfer_task.cc:55
streaming::stream_session::start_streaming_files
(this=this@entry=0x6000000ab500) at streaming/stream_session.cc:526
(this=0x6000000ab500, requests=std::vector of length 1, capacity 1 =
{...}, summaries=std::vector of length 1, capacity 1 = {...})
    at streaming/stream_session.cc:356
streaming/stream_session.cc:83
2015-08-17 11:00:30 +08:00
Asias He
8c6e08c7e2 streaming: Log state in maybe_completed 2015-08-17 11:00:30 +08:00
Asias He
0f1f710b27 streaming: Introduce transfer_task_completed 2015-08-17 11:00:30 +08:00
Asias He
651200c123 streaming: Log exception
It is easier to tell what is going wrong.
2015-08-17 10:52:30 +08:00
Asias He
0e2f9beec4 streaming: Wait after create keyspace and create table
Give it some time to propagate the schema to other nodes.
2015-08-10 15:53:42 +08:00
Asias He
d724fd449c streaming: Avoid storing partition_range in stream_detail
Now, make_local_reader does not need partition_range to be alive when we
read the mutation reader. No need to store it in stream_detail for its
lifetime.
2015-08-10 15:51:13 +08:00
Asias He
62394cc9d0 streaming: Add error handling for PREPARE_MESSAGE 2015-08-10 15:05:10 +08:00
Asias He
9f83588e66 streaming: Add error handling for STREAM_INIT_MESSAGE 2015-08-10 15:01:29 +08:00
Asias He
924ca5915e stream_session: Make sure cf exists before streaming
We use storage_proxy::mutate_locally() to apply the mutations when we
receive them. mutate_locally() will ignore the mutation if the cf does not
exist. We check in the prepare phase to make sure all the cf's exist.
2015-08-04 16:21:40 +08:00
Asias He
02ae515541 streaming: Add sharding support
Thanks to the new mutation reader (storage_proxy::make_local_reader), we
can read mutations for a cf on all shard. This simplifies the sharding
handling a lot. When user of streaming creates a stream_plan on any
shard, it will send data from all shards to remote node and receive
data from all shards on remote node.
2015-08-04 16:21:40 +08:00
Avi Kivity
98ec451d6a Extract range<> into its own header
It's not just for queries any more.
2015-08-02 16:07:42 +03:00
Asias He
90ec97743b streaming: Introduce get_stream_result_future
Stream manager tracks two kind of streams: initiated or receiving. Give
a plan_id, search both lists to get a stream_result_future instance.
2015-07-31 16:27:55 +08:00
Asias He
59cae82470 streaming: Make stream_plan::execute return a future
Returns a ready future if the stream_plan completes successfully, or a
failed future otherwise.
2015-07-31 16:27:55 +08:00
Tomasz Grabiec
4d06c2aa1d Move to_partition_range() adaptor to global scope
It should be moved to i_partitioner.hh, but to do that range<> has to
be first moved out of query-request.hh to break cyclic dependency.
I didn't want to cause conflicts with in-flight patches to range<>.
2015-07-24 16:08:41 +02:00
Tomasz Grabiec
e5feff5d71 dht: ring_position: Switch to total ordering
range::is_wrap_around() and range::contains() rely on total ordering
on values to work properly. Current ring_position_comparator was only
imposing a weak ordering (token positions equal to all key positions
with that token).

range::before() and range::after() can't work for weak ordering. If
the bound is exclusive, we don't know if user-provided token position
is inside or outside.

Also, is_wrap_around() can't properly detect wrap around in all
cases. Consider this case:

 (1) ]A; B]
 (2) [A; B]

For A = (tok1) and B = (tok1, key1), (1) is a wrap around and (2) is
not. Without total ordering between A and B, range::is_wrap_around() can't
tell that.

I think the simplest soution is to define a total ordering on
ring_position by making token positions positioned either before or
after all keys with that token.
2015-07-24 16:08:41 +02:00
Asias He
54d482afe4 streaming: Test both pushing and pulling of data
stream_plan.transfer_ranges() sends data from local to remote node.

stream_plan.request_ranges() asks remote to send data to local.

After streaming, both nodes contains all the keys.

$ cat /tmp/out1|grep "\[Stream"
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Executing streaming plan for MYPLAN
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Starting streaming to 127.0.0.2
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Sending stream init for incoming stream
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Beginning stream session with 127.0.0.2
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Prepare completed.  Receiving 1 files(105553124400080 bytes), sending 1 files(105553124104160 bytes)
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Session with 127.0.0.2 is complete
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] All sessions completed

$ cat /tmp/out2|grep "\[Stream"
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Creating new streaming plan for MYPLAN
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Received streaming plan for MYPLAN
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Prepare completed.  Receiving 1 files(105553124104160 bytes), sending 1 files(105553124400080 bytes)
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Session with 127.0.0.1 is complete
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] All sessions completed

Node 1
$ sstable2json tmp/1/ks/*/la-1-big-Data.db | grep key | sort
{"key": "1",
{"key": "2",
{"key": "3",
{"key": "4",
{"key": "5",
{"key": "6",

Node 2
$ sstable2json tmp/2/ks/*/la-1-big-Data.db | grep key | sort
{"key": "1",
{"key": "2",
{"key": "3",
{"key": "4",
{"key": "5",
{"key": "6",
2015-07-22 11:49:30 +08:00
Asias He
96049a99cf streaming: Add more debug print for stream_session::prepare 2015-07-22 11:49:30 +08:00
Asias He
1c60844727 streaming: Always start_streaming_files upon receiving of PREPARE_MESSAGE reply 2015-07-22 11:49:30 +08:00
Asias He
19f46fdbe1 streaming: Remove redundant debug log info 2015-07-21 16:12:54 +08:00
Asias He
fc718dc87d streaming: Set up dst_cpu_id in PREPARE_MESSAGE hanlder 2015-07-21 16:12:54 +08:00
Asias He
a010829f0c streaming: Add src_cpu_id parameter for PREPARE_MESSAGE verb
We need it to setup dst_cpu_id for the session of the follower.
2015-07-21 16:12:54 +08:00
Asias He
6712e9404e streaming: Implement session completion logic 2015-07-21 16:12:54 +08:00
Asias He
f9109c33ba streaming: Implement stream_transfer_task completion logic 2015-07-21 16:12:54 +08:00
Asias He
f2960a7cb0 streaming: Send plan_id for STREAM_MUTATION
We need this to find session associated with this frozen_mutation.
2015-07-21 16:12:54 +08:00
Asias He
34e33a9afe streaming: Wire up handle_session_complete and friends 2015-07-21 16:12:54 +08:00
Asias He
f20b281e1c streaming: Update comments in stream_session::prepare
To make things a bit clear. Note: In Origin, prepare() can be called by
both initiator and follower.
2015-07-21 16:12:54 +08:00
Asias He
9b8d542b35 streaming: Improve handler of prepare_message from follower
Upon receiving of the prepare_message message from the follower,
msg.requests can only be empty. Don't bother look at the requests.
2015-07-21 16:12:54 +08:00
Asias He
2dac3306a0 streaming: Enable more loggers in stream_session 2015-07-21 16:12:54 +08:00
Asias He
8561315cf2 streaming: de-thread_local-ize logger 2015-07-21 16:12:54 +08:00
Asias He
521df14240 streaming: Rename sstable_details to stream_details 2015-07-21 16:12:54 +08:00
Asias He
7e365cb440 streaming: Handle cf_id does not exist case in STREAM_MUTATION handler 2015-07-21 16:12:54 +08:00
Asias He
69ce554a0f streaming: Init session when follower receives a PREPARE_MESSAGE 2015-07-21 16:12:54 +08:00