Asias He
0e2f9beec4
streaming: Wait after create keyspace and create table
...
Give it some time to propagate the schema to other nodes.
2015-08-10 15:53:42 +08:00
Asias He
d724fd449c
streaming: Avoid storing partition_range in stream_detail
...
Now, make_local_reader does not need partition_range to be alive when we
read the mutation reader. No need to store it in stream_detail for its
lifetime.
2015-08-10 15:51:13 +08:00
Asias He
62394cc9d0
streaming: Add error handling for PREPARE_MESSAGE
2015-08-10 15:05:10 +08:00
Asias He
9f83588e66
streaming: Add error handling for STREAM_INIT_MESSAGE
2015-08-10 15:01:29 +08:00
Asias He
e13d93b2ff
streaming: Improve error handling in stream_transfer_task::complete
2015-08-10 14:49:34 +08:00
Asias He
c7c33a9f44
streaming: Add error handling for STREAM_MUTATION sending
2015-08-10 14:44:25 +08:00
Asias He
be4d9c63b1
streaming: Drop do_with in stream_transfer_task::start
...
We can copy id instead, it is cheap.
2015-08-10 14:13:15 +08:00
Asias He
924ca5915e
stream_session: Make sure cf exists before streaming
...
We use storage_proxy::mutate_locally() to apply the mutations when we
receive them. mutate_locally() will ignore the mutation if the cf does not
exist. We check in the prepare phase to make sure all the cf's exist.
2015-08-04 16:21:40 +08:00
Asias He
36eb1d1d79
streaming: Add operator<< for stream_summary
2015-08-04 16:21:40 +08:00
Asias He
02ae515541
streaming: Add sharding support
...
Thanks to the new mutation reader (storage_proxy::make_local_reader), we
can read mutations for a cf on all shard. This simplifies the sharding
handling a lot. When user of streaming creates a stream_plan on any
shard, it will send data from all shards to remote node and receive
data from all shards on remote node.
2015-08-04 16:21:40 +08:00
Avi Kivity
98ec451d6a
Extract range<> into its own header
...
It's not just for queries any more.
2015-08-02 16:07:42 +03:00
Asias He
804564fe76
streaming: Remove completed stream in stream_manager
2015-07-31 16:27:55 +08:00
Asias He
90ec97743b
streaming: Introduce get_stream_result_future
...
Stream manager tracks two kind of streams: initiated or receiving. Give
a plan_id, search both lists to get a stream_result_future instance.
2015-07-31 16:27:55 +08:00
Asias He
1749305165
streaming: Fix stream_result_future::create_and_register
...
It is used to create stream initiated by us.
2015-07-31 16:27:55 +08:00
Asias He
588da78574
streaming: Introduce stream_manager::get_sending_stream
...
Return streams initiated by us.
2015-07-31 16:27:55 +08:00
Asias He
4d12f40b8a
streaming: Implement stream_manager::register_sending
...
It is used to track streams initiated by us.
2015-07-31 16:27:55 +08:00
Asias He
59cae82470
streaming: Make stream_plan::execute return a future
...
Returns a ready future if the stream_plan completes successfully, or a
failed future otherwise.
2015-07-31 16:27:55 +08:00
Asias He
3a5af0a7fb
streaming: Fix stream_coordinator::is_receiving
...
Unlike Origin, We can not use _connections_per_host == 0 to indicate we
are a follower since we set _connections_per_host to 1 for follower too.
2015-07-31 16:27:55 +08:00
Asias He
67acaf1a79
streaming: Convert StreamException.java to C++
2015-07-31 16:27:55 +08:00
Asias He
8864d0c25c
streaming: Import StreamException.java
2015-07-31 16:27:55 +08:00
Tomasz Grabiec
4d06c2aa1d
Move to_partition_range() adaptor to global scope
...
It should be moved to i_partitioner.hh, but to do that range<> has to
be first moved out of query-request.hh to break cyclic dependency.
I didn't want to cause conflicts with in-flight patches to range<>.
2015-07-24 16:08:41 +02:00
Tomasz Grabiec
e5feff5d71
dht: ring_position: Switch to total ordering
...
range::is_wrap_around() and range::contains() rely on total ordering
on values to work properly. Current ring_position_comparator was only
imposing a weak ordering (token positions equal to all key positions
with that token).
range::before() and range::after() can't work for weak ordering. If
the bound is exclusive, we don't know if user-provided token position
is inside or outside.
Also, is_wrap_around() can't properly detect wrap around in all
cases. Consider this case:
(1) ]A; B]
(2) [A; B]
For A = (tok1) and B = (tok1, key1), (1) is a wrap around and (2) is
not. Without total ordering between A and B, range::is_wrap_around() can't
tell that.
I think the simplest soution is to define a total ordering on
ring_position by making token positions positioned either before or
after all keys with that token.
2015-07-24 16:08:41 +02:00
Asias He
a216f258a2
streaming: Serialize query::range<token> in stream_request
2015-07-23 09:08:15 +08:00
Asias He
54d482afe4
streaming: Test both pushing and pulling of data
...
stream_plan.transfer_ranges() sends data from local to remote node.
stream_plan.request_ranges() asks remote to send data to local.
After streaming, both nodes contains all the keys.
$ cat /tmp/out1|grep "\[Stream"
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Executing streaming plan for MYPLAN
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Starting streaming to 127.0.0.2
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Sending stream init for incoming stream
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Beginning stream session with 127.0.0.2
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Prepare completed. Receiving 1 files(105553124400080 bytes), sending 1 files(105553124104160 bytes)
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Session with 127.0.0.2 is complete
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] All sessions completed
$ cat /tmp/out2|grep "\[Stream"
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Creating new streaming plan for MYPLAN
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Received streaming plan for MYPLAN
[Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Prepare completed. Receiving 1 files(105553124104160 bytes), sending 1 files(105553124400080 bytes)
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] Session with 127.0.0.1 is complete
[Stream #9fd8c3c0-3023-11e5-b450-000000000000] All sessions completed
Node 1
$ sstable2json tmp/1/ks/*/la-1-big-Data.db | grep key | sort
{"key": "1",
{"key": "2",
{"key": "3",
{"key": "4",
{"key": "5",
{"key": "6",
Node 2
$ sstable2json tmp/2/ks/*/la-1-big-Data.db | grep key | sort
{"key": "1",
{"key": "2",
{"key": "3",
{"key": "4",
{"key": "5",
{"key": "6",
2015-07-22 11:49:30 +08:00
Asias He
96049a99cf
streaming: Add more debug print for stream_session::prepare
2015-07-22 11:49:30 +08:00
Asias He
1c60844727
streaming: Always start_streaming_files upon receiving of PREPARE_MESSAGE reply
2015-07-22 11:49:30 +08:00
Asias He
736c0bc08f
streaming: Improve query::range<token> deserialization a bit
...
query::range<token> is not serialized ATM. Always use full range for
now.
2015-07-22 11:49:30 +08:00
Asias He
d39dd0f9d1
streaming: Add operator<< for stream_request
2015-07-22 11:49:30 +08:00
Asias He
d934b2c761
streaming: Fix prepare_message and stream_request deserialization
...
vector(size_type count) constructs the container with count
default-inserted instances of T. So, current code will end up with 2*num
elements which is wrong.
2015-07-22 11:49:30 +08:00
Asias He
f83d0bf7c4
streaming: Fix info printout format
2015-07-22 11:49:30 +08:00
Asias He
19f46fdbe1
streaming: Remove redundant debug log info
2015-07-21 16:12:54 +08:00
Asias He
93f17024a8
streaming: Fix a logger printout
...
Before:
[Stream #136c5310-2f7a-11e5-87df-000000000000 ID#0]
[Stream #136c5310-2f7a-11e5-87df-000000000000, ID#0]
After:
[Stream #136c5310-2f7a-11e5-87df-000000000000 ID#0]
[Stream #136c5310-2f7a-11e5-87df-000000000000 ID#0]
2015-07-21 16:12:54 +08:00
Asias He
0e5fa35bd2
streaming: Register to stream_manager in create_and_register
2015-07-21 16:12:54 +08:00
Asias He
fc718dc87d
streaming: Set up dst_cpu_id in PREPARE_MESSAGE hanlder
2015-07-21 16:12:54 +08:00
Asias He
ad86f5ea6e
streaming: Fix get_or_create_host_data
...
It is always creating a new host_streaming_data which is incorrect.
2015-07-21 16:12:54 +08:00
Asias He
a010829f0c
streaming: Add src_cpu_id parameter for PREPARE_MESSAGE verb
...
We need it to setup dst_cpu_id for the session of the follower.
2015-07-21 16:12:54 +08:00
Asias He
6712e9404e
streaming: Implement session completion logic
2015-07-21 16:12:54 +08:00
Asias He
f9109c33ba
streaming: Implement stream_transfer_task completion logic
2015-07-21 16:12:54 +08:00
Asias He
f2960a7cb0
streaming: Send plan_id for STREAM_MUTATION
...
We need this to find session associated with this frozen_mutation.
2015-07-21 16:12:54 +08:00
Asias He
ccb32ceec5
streaming: Add stream_transfer_task::complete
2015-07-21 16:12:54 +08:00
Asias He
34e33a9afe
streaming: Wire up handle_session_complete and friends
2015-07-21 16:12:54 +08:00
Asias He
1f5feee5f2
streaming: Implement stream_result_future::handle_progress
2015-07-21 16:12:54 +08:00
Asias He
d74e414d09
streaming: Implement stream_result_future::handle_session_complete
2015-07-21 16:12:54 +08:00
Asias He
5e6f84cba8
streaming: Implement stream_result_future::handle_session_prepared
...
Instead of playing the game of casting between stream_event and derived
class. We overload handle_stream_event with derived stream_event class.
virtual void handle_stream_event(session_complete_event event) {}
virtual void handle_stream_event(progress_event event) {}
virtual void handle_stream_event(session_prepared_event event) {}
Also, make the virtual function non pure virtual, so user can override
the interested event only without defining all of the three.
2015-07-21 16:12:54 +08:00
Asias He
5ff5abbe2a
streaming: handle_stream_event should be public
2015-07-21 16:12:54 +08:00
Asias He
d574e6e3c8
streaming: Rename getTotalFilesToSend to get_total_files_to_send
2015-07-21 16:12:54 +08:00
Asias He
f20b281e1c
streaming: Update comments in stream_session::prepare
...
To make things a bit clear. Note: In Origin, prepare() can be called by
both initiator and follower.
2015-07-21 16:12:54 +08:00
Asias He
9b8d542b35
streaming: Improve handler of prepare_message from follower
...
Upon receiving of the prepare_message message from the follower,
msg.requests can only be empty. Don't bother look at the requests.
2015-07-21 16:12:54 +08:00
Asias He
d9b7cb4142
streaming: Enable logger in stream_result_future
2015-07-21 16:12:54 +08:00
Asias He
2e8b34dd9c
streaming: Enable logger in stream_coordinator
...
Move connect_all_stream_sessions to source first.
2015-07-21 16:12:54 +08:00