From e2b986910bf738f34a8e89c77d7a2ce060803919 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 17 Jun 2015 16:29:43 +0800 Subject: [PATCH 01/15] streaming: Add RETRY_MESSAGE handler --- streaming/messages/retry_message.hh | 9 +++++++++ streaming/stream_session.cc | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/streaming/messages/retry_message.hh b/streaming/messages/retry_message.hh index a774103e0b..82bbf88651 100644 --- a/streaming/messages/retry_message.hh +++ b/streaming/messages/retry_message.hh @@ -61,6 +61,15 @@ class retry_message : public stream_message { return sb.toString(); } #endif +public: + void serialize(bytes::iterator& out) const { + } + static retry_message deserialize(bytes_view& v) { + return retry_message(); + } + size_t serialized_size() const { + return 0; + } }; } // namespace messages diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 3eaf3854bc..1149d3c532 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -25,6 +25,7 @@ #include "streaming/messages/prepare_message.hh" #include "streaming/messages/outgoing_file_message.hh" #include "streaming/messages/received_message.hh" +#include "streaming/messages/retry_message.hh" namespace streaming { @@ -52,6 +53,14 @@ void stream_session::init_messaging_service_handler() { return make_ready_future(std::move(msg_ret)); }); }); + ms().register_handler(messaging_verb::RETRY_MESSAGE, [] (messages::retry_message msg) { + auto cpu_id = 0; + return smp::submit_to(cpu_id, [msg = std::move(msg)] () mutable { + // TODO + messages::outgoing_file_message msg_ret; + return make_ready_future(std::move(msg_ret)); + }); + }); } future<> stream_session::start() { From 8170f6ee0adb8367b02ded7ff388aaeaf59d5d31 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 17 Jun 2015 16:45:00 +0800 Subject: [PATCH 02/15] streaming: Add COMPLETE_MESSAGE handler --- streaming/messages/complete_message.hh | 9 +++++++++ streaming/stream_session.cc | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/streaming/messages/complete_message.hh b/streaming/messages/complete_message.hh index 5a3424b7dd..c960fbbe98 100644 --- a/streaming/messages/complete_message.hh +++ b/streaming/messages/complete_message.hh @@ -49,6 +49,15 @@ class complete_message : public stream_message { return "Complete"; } #endif +public: + void serialize(bytes::iterator& out) const { + } + static complete_message deserialize(bytes_view& v) { + return complete_message(); + } + size_t serialized_size() const { + return 0; + } }; } // namespace messages diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 1149d3c532..6a75d8e347 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -26,6 +26,7 @@ #include "streaming/messages/outgoing_file_message.hh" #include "streaming/messages/received_message.hh" #include "streaming/messages/retry_message.hh" +#include "streaming/messages/complete_message.hh" namespace streaming { @@ -61,6 +62,14 @@ void stream_session::init_messaging_service_handler() { return make_ready_future(std::move(msg_ret)); }); }); + ms().register_handler(messaging_verb::COMPLETE_MESSAGE, [] (messages::complete_message msg) { + auto cpu_id = 0; + return smp::submit_to(cpu_id, [msg = std::move(msg)] () mutable { + // TODO + messages::complete_message msg_ret; + return make_ready_future(std::move(msg_ret)); + }); + }); } future<> stream_session::start() { From cde167627440a163ef632231c6db239ad47af0f0 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 17 Jun 2015 16:55:11 +0800 Subject: [PATCH 03/15] streaming: Add SESSION_FAILED_MESSAGE handler --- streaming/messages/session_failed_message.hh | 9 +++++++++ streaming/stream_session.cc | 14 ++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/streaming/messages/session_failed_message.hh b/streaming/messages/session_failed_message.hh index fdc3ed7150..1c3b683483 100644 --- a/streaming/messages/session_failed_message.hh +++ b/streaming/messages/session_failed_message.hh @@ -49,6 +49,15 @@ class session_failed_message : public stream_message { return "Session Failed"; } #endif +public: + void serialize(bytes::iterator& out) const { + } + static session_failed_message deserialize(bytes_view& v) { + return session_failed_message(); + } + size_t serialized_size() const { + return 0; + } }; } // namespace messages diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 6a75d8e347..4ff3903f14 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -27,6 +27,7 @@ #include "streaming/messages/received_message.hh" #include "streaming/messages/retry_message.hh" #include "streaming/messages/complete_message.hh" +#include "streaming/messages/session_failed_message.hh" namespace streaming { @@ -70,6 +71,19 @@ void stream_session::init_messaging_service_handler() { return make_ready_future(std::move(msg_ret)); }); }); + ms().register_handler(messaging_verb::SESSION_FAILED_MESSAGE, [] (messages::session_failed_message msg) { + auto cpu_id = 0; + smp::submit_to(cpu_id, [msg = std::move(msg)] () mutable { + // TODO + }).then_wrapped([] (auto&& f) { + try { + f.get(); + } catch (...) { + print("stream_session: SESSION_FAILED_MESSAGE error\n"); + } + }); + return messaging_service::no_wait(); + }); } future<> stream_session::start() { From a41445ac083da5ad5b2ef056573d1c6651cc8620 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 11:37:46 +0800 Subject: [PATCH 04/15] streaming: Import StreamRequest.java --- streaming/StreamRequest.java | 101 +++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 streaming/StreamRequest.java diff --git a/streaming/StreamRequest.java b/streaming/StreamRequest.java new file mode 100644 index 0000000000..9c5b974ee1 --- /dev/null +++ b/streaming/StreamRequest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.DataInput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class StreamRequest +{ + public static final IVersionedSerializer serializer = new StreamRequestSerializer(); + + public final String keyspace; + public final Collection> ranges; + public final Collection columnFamilies = new HashSet<>(); + public final long repairedAt; + public StreamRequest(String keyspace, Collection> ranges, Collection columnFamilies, long repairedAt) + { + this.keyspace = keyspace; + this.ranges = ranges; + this.columnFamilies.addAll(columnFamilies); + this.repairedAt = repairedAt; + } + + public static class StreamRequestSerializer implements IVersionedSerializer + { + public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException + { + out.writeUTF(request.keyspace); + out.writeLong(request.repairedAt); + out.writeInt(request.ranges.size()); + for (Range range : request.ranges) + { + Token.serializer.serialize(range.left, out); + Token.serializer.serialize(range.right, out); + } + out.writeInt(request.columnFamilies.size()); + for (String cf : request.columnFamilies) + out.writeUTF(cf); + } + + public StreamRequest deserialize(DataInput in, int version) throws IOException + { + String keyspace = in.readUTF(); + long repairedAt = in.readLong(); + int rangeCount = in.readInt(); + List> ranges = new ArrayList<>(rangeCount); + for (int i = 0; i < rangeCount; i++) + { + Token left = Token.serializer.deserialize(in); + Token right = Token.serializer.deserialize(in); + ranges.add(new Range<>(left, right)); + } + int cfCount = in.readInt(); + List columnFamilies = new ArrayList<>(cfCount); + for (int i = 0; i < cfCount; i++) + columnFamilies.add(in.readUTF()); + return new StreamRequest(keyspace, ranges, columnFamilies, repairedAt); + } + + public long serializedSize(StreamRequest request, int version) + { + int size = TypeSizes.NATIVE.sizeof(request.keyspace); + size += TypeSizes.NATIVE.sizeof(request.repairedAt); + size += TypeSizes.NATIVE.sizeof(request.ranges.size()); + for (Range range : request.ranges) + { + size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE); + size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE); + } + size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size()); + for (String cf : request.columnFamilies) + size += TypeSizes.NATIVE.sizeof(cf); + return size; + } + } +} From ab7602693852dd1faa780f824f6873b5369a446c Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 11:38:15 +0800 Subject: [PATCH 05/15] streaming: Convert StreamRequest.java to C++ --- configure.py | 1 + streaming/stream_request.cc | 22 +++++++ .../{StreamRequest.java => stream_request.hh} | 60 +++++++++---------- 3 files changed, 53 insertions(+), 30 deletions(-) create mode 100644 streaming/stream_request.cc rename streaming/{StreamRequest.java => stream_request.hh} (70%) diff --git a/configure.py b/configure.py index a546b10536..2f0b211577 100755 --- a/configure.py +++ b/configure.py @@ -465,6 +465,7 @@ urchin_core = (['database.cc', 'service/storage_service.cc', 'streaming/streaming.cc', 'streaming/stream_session.cc', + 'streaming/stream_request.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] diff --git a/streaming/stream_request.cc b/streaming/stream_request.cc new file mode 100644 index 0000000000..73579a318d --- /dev/null +++ b/streaming/stream_request.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_request.hh" diff --git a/streaming/StreamRequest.java b/streaming/stream_request.hh similarity index 70% rename from streaming/StreamRequest.java rename to streaming/stream_request.hh index 9c5b974ee1..ed3305b1a2 100644 --- a/streaming/StreamRequest.java +++ b/streaming/stream_request.hh @@ -14,38 +14,35 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. */ -package org.apache.cassandra.streaming; -import java.io.DataInput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; +#pragma once -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; +#include "core/sstring.hh" +#include "query-request.hh" +#include "dht/i_partitioner.hh" +#include -public class StreamRequest -{ - public static final IVersionedSerializer serializer = new StreamRequestSerializer(); +namespace streaming { - public final String keyspace; - public final Collection> ranges; - public final Collection columnFamilies = new HashSet<>(); - public final long repairedAt; - public StreamRequest(String keyspace, Collection> ranges, Collection columnFamilies, long repairedAt) - { - this.keyspace = keyspace; - this.ranges = ranges; - this.columnFamilies.addAll(columnFamilies); - this.repairedAt = repairedAt; +class stream_request { +public: + using token = dht::token; + sstring keyspace; + std::vector> ranges; + std::vector column_families; + long repaired_at; + stream_request(sstring _keyspace, std::vector> _ranges, std::vector _column_families, long _repaired_at) + : keyspace(std::move(_keyspace)) + , ranges(std::move(_ranges)) + , column_families(std::move(_column_families)) + , repaired_at(_repaired_at) { } +#if 0 public static class StreamRequestSerializer implements IVersionedSerializer { public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException @@ -59,13 +56,13 @@ public class StreamRequest Token.serializer.serialize(range.right, out); } out.writeInt(request.columnFamilies.size()); - for (String cf : request.columnFamilies) + for (sstring cf : request.columnFamilies) out.writeUTF(cf); } public StreamRequest deserialize(DataInput in, int version) throws IOException { - String keyspace = in.readUTF(); + sstring keyspace = in.readUTF(); long repairedAt = in.readLong(); int rangeCount = in.readInt(); List> ranges = new ArrayList<>(rangeCount); @@ -76,7 +73,7 @@ public class StreamRequest ranges.add(new Range<>(left, right)); } int cfCount = in.readInt(); - List columnFamilies = new ArrayList<>(cfCount); + List columnFamilies = new ArrayList<>(cfCount); for (int i = 0; i < cfCount; i++) columnFamilies.add(in.readUTF()); return new StreamRequest(keyspace, ranges, columnFamilies, repairedAt); @@ -93,9 +90,12 @@ public class StreamRequest size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE); } size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size()); - for (String cf : request.columnFamilies) + for (sstring cf : request.columnFamilies) size += TypeSizes.NATIVE.sizeof(cf); return size; } } -} +#endif +}; + +} // namespace streaming From 501eebce3997a685b25a058510e298baf78e50a1 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 11:52:12 +0800 Subject: [PATCH 06/15] streaming: Import StreamSummary.java --- streaming/StreamSummary.java | 107 +++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 streaming/StreamSummary.java diff --git a/streaming/StreamSummary.java b/streaming/StreamSummary.java new file mode 100644 index 0000000000..dc332cbb8a --- /dev/null +++ b/streaming/StreamSummary.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.DataInput; +import java.io.IOException; +import java.io.Serializable; +import java.util.UUID; + +import com.google.common.base.Objects; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.UUIDSerializer; + +/** + * Summary of streaming. + */ +public class StreamSummary implements Serializable +{ + public static final IVersionedSerializer serializer = new StreamSummarySerializer(); + + public final UUID cfId; + + /** + * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. + */ + public final int files; + public final long totalSize; + + public StreamSummary(UUID cfId, int files, long totalSize) + { + this.cfId = cfId; + this.files = files; + this.totalSize = totalSize; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StreamSummary summary = (StreamSummary) o; + return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId); + } + + @Override + public int hashCode() + { + return Objects.hashCode(cfId, files, totalSize); + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder("StreamSummary{"); + sb.append("path=").append(cfId); + sb.append(", files=").append(files); + sb.append(", totalSize=").append(totalSize); + sb.append('}'); + return sb.toString(); + } + + public static class StreamSummarySerializer implements IVersionedSerializer + { + // arbitrary version is fine for UUIDSerializer for now... + public void serialize(StreamSummary summary, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version); + out.writeInt(summary.files); + out.writeLong(summary.totalSize); + } + + public StreamSummary deserialize(DataInput in, int version) throws IOException + { + UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); + int files = in.readInt(); + long totalSize = in.readLong(); + return new StreamSummary(cfId, files, totalSize); + } + + public long serializedSize(StreamSummary summary, int version) + { + long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version); + size += TypeSizes.NATIVE.sizeof(summary.files); + size += TypeSizes.NATIVE.sizeof(summary.totalSize); + return size; + } + } +} From d1889fe7e5963eb4e6e39bd380f117bfdddf7954 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 11:52:39 +0800 Subject: [PATCH 07/15] streaming: Convert StreamSummary.java to C++ --- configure.py | 1 + streaming/stream_summary.cc | 22 +++++++++ .../{StreamSummary.java => stream_summary.hh} | 45 +++++++++---------- 3 files changed, 44 insertions(+), 24 deletions(-) create mode 100644 streaming/stream_summary.cc rename streaming/{StreamSummary.java => stream_summary.hh} (77%) diff --git a/configure.py b/configure.py index 2f0b211577..0f6675abaa 100755 --- a/configure.py +++ b/configure.py @@ -466,6 +466,7 @@ urchin_core = (['database.cc', 'streaming/streaming.cc', 'streaming/stream_session.cc', 'streaming/stream_request.cc', + 'streaming/stream_summary.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] diff --git a/streaming/stream_summary.cc b/streaming/stream_summary.cc new file mode 100644 index 0000000000..513b65cf36 --- /dev/null +++ b/streaming/stream_summary.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_summary.hh" diff --git a/streaming/StreamSummary.java b/streaming/stream_summary.hh similarity index 77% rename from streaming/StreamSummary.java rename to streaming/stream_summary.hh index dc332cbb8a..672d0a88f6 100644 --- a/streaming/StreamSummary.java +++ b/streaming/stream_summary.hh @@ -14,44 +14,38 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. */ -package org.apache.cassandra.streaming; -import java.io.DataInput; -import java.io.IOException; -import java.io.Serializable; -import java.util.UUID; +#pragma once -import com.google.common.base.Objects; +#include "utils/UUID.hh" -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.UUIDSerializer; +namespace streaming { /** * Summary of streaming. */ -public class StreamSummary implements Serializable -{ - public static final IVersionedSerializer serializer = new StreamSummarySerializer(); - - public final UUID cfId; +class stream_summary { +public: + using UUID = utils::UUID; + UUID cf_id; /** * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. */ - public final int files; - public final long totalSize; + int files; + long total_size; - public StreamSummary(UUID cfId, int files, long totalSize) - { - this.cfId = cfId; - this.files = files; - this.totalSize = totalSize; + stream_summary(UUID _cf_id, int _files, long _total_size) + : cf_id (_cf_id) + , files(_files) + , total_size(_total_size) { } +#if 0 @Override public boolean equals(Object o) { @@ -104,4 +98,7 @@ public class StreamSummary implements Serializable return size; } } -} +#endif +}; + +} // namespace streaming From 6c8b002e41a652d72ecef7b0c7c79b436cdf41dc Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 12:06:28 +0800 Subject: [PATCH 08/15] streaming: Enable requests and summaries in prepare_message --- streaming/messages/prepare_message.hh | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/streaming/messages/prepare_message.hh b/streaming/messages/prepare_message.hh index 3d7abd3eb6..ff5ad6a39c 100644 --- a/streaming/messages/prepare_message.hh +++ b/streaming/messages/prepare_message.hh @@ -22,6 +22,8 @@ #pragma once #include "streaming/messages/stream_message.hh" +#include "streaming/stream_request.hh" +#include "streaming/stream_summary.hh" namespace streaming { namespace messages { @@ -57,17 +59,19 @@ class prepare_message : public stream_message { StreamSummary.serializer.serialize(summary, out, version); } }; - +#endif +public: /** * Streaming requests */ - public final Collection requests = new ArrayList<>(); + std::vector requests; /** * Summaries of streaming out */ - public final Collection summaries = new ArrayList<>(); + std::vector summaries; +#if 0 public PrepareMessage() { super(Type.PREPARE); From 2f7ab6f293a5d8c1e448ba01494d889a76b38475 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 12:16:12 +0800 Subject: [PATCH 09/15] streaming: Add fields to stream_init_message --- streaming/messages/stream_init_message.hh | 40 +++++++++++++---------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/streaming/messages/stream_init_message.hh b/streaming/messages/stream_init_message.hh index 60bf1a6c30..9f0971d953 100644 --- a/streaming/messages/stream_init_message.hh +++ b/streaming/messages/stream_init_message.hh @@ -21,38 +21,44 @@ #pragma once +#include "core/sstring.hh" #include "bytes.hh" +#include "gms/inet_address.hh" +#include "utils/UUID.hh" namespace streaming { namespace messages { + /** * StreamInitMessage is first sent from the node where {@link org.apache.cassandra.streaming.StreamSession} is started, * to initiate corresponding {@link org.apache.cassandra.streaming.StreamSession} on the other side. */ class stream_init_message { -#if 0 - public static IVersionedSerializer serializer = new StreamInitMessageSerializer(); - - public final InetAddress from; - public final int sessionIndex; - public final UUID planId; - public final String description; +public: + using inet_address = gms::inet_address; + using UUID = utils::UUID; + inet_address from; + int session_index; + UUID plan_id; + sstring description; // true if this init message is to connect for outgoing message on receiving side - public final boolean isForOutgoing; - public final boolean keepSSTableLevel; + bool is_for_outgoing; + bool keep_ss_table_level; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel) - { - this.from = from; - this.sessionIndex = sessionIndex; - this.planId = planId; - this.description = description; - this.isForOutgoing = isForOutgoing; - this.keepSSTableLevel = keepSSTableLevel; + stream_init_message() = default; + + stream_init_message(inet_address _from, int _session_index, UUID _plan_id, sstring _description, bool _is_for_outgoing, bool _keep_ss_table_level) + : from(_from) + , session_index(_session_index) + , plan_id(_plan_id) + , description(_description) + , is_for_outgoing(_is_for_outgoing) + , keep_ss_table_level(-keep_ss_table_level) { } +#if 0 /** * Create serialized message. * From e119cb946363e6850e72b02c19839f45a9463fd9 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 12:19:20 +0800 Subject: [PATCH 10/15] streaming: Add StreamTask.java --- streaming/StreamTask.java | 61 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 streaming/StreamTask.java diff --git a/streaming/StreamTask.java b/streaming/StreamTask.java new file mode 100644 index 0000000000..ac72cffc66 --- /dev/null +++ b/streaming/StreamTask.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.util.UUID; + +/** + * StreamTask is an abstraction of the streaming task performed over specific ColumnFamily. + */ +public abstract class StreamTask +{ + /** StreamSession that this task belongs */ + protected final StreamSession session; + + protected final UUID cfId; + + protected StreamTask(StreamSession session, UUID cfId) + { + this.session = session; + this.cfId = cfId; + } + + /** + * @return total number of files this task receives/streams. + */ + public abstract int getTotalNumberOfFiles(); + + /** + * @return total bytes expected to receive + */ + public abstract long getTotalSize(); + + /** + * Abort the task. + * Subclass should implement cleaning up resources. + */ + public abstract void abort(); + + /** + * @return StreamSummary that describes this task + */ + public StreamSummary getSummary() + { + return new StreamSummary(cfId, getTotalNumberOfFiles(), getTotalSize()); + } +} From b315e6505a41d4e1d4c0e7921d6a00d90b73a538 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 12:19:56 +0800 Subject: [PATCH 11/15] streaming: Convert StreamTask to C++ --- streaming/{StreamTask.java => stream_task.hh} | 43 +++++++++++-------- streaming/streaming.cc | 1 + 2 files changed, 27 insertions(+), 17 deletions(-) rename streaming/{StreamTask.java => stream_task.hh} (63%) diff --git a/streaming/StreamTask.java b/streaming/stream_task.hh similarity index 63% rename from streaming/StreamTask.java rename to streaming/stream_task.hh index ac72cffc66..00d1473f61 100644 --- a/streaming/StreamTask.java +++ b/streaming/stream_task.hh @@ -14,48 +14,57 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. */ -package org.apache.cassandra.streaming; -import java.util.UUID; +#pragma once +#include "utils/UUID.hh" +#include "streaming/stream_session.hh" +#include "streaming/stream_summary.hh" + +namespace streaming { /** * StreamTask is an abstraction of the streaming task performed over specific ColumnFamily. */ -public abstract class StreamTask -{ +class stream_task { +protected: + using UUID = utils::UUID; /** StreamSession that this task belongs */ - protected final StreamSession session; + stream_session& session; - protected final UUID cfId; + UUID cf_id; - protected StreamTask(StreamSession session, UUID cfId) - { - this.session = session; - this.cfId = cfId; + stream_task(stream_session& _session, UUID _cf_id) + : session(_session) + , cf_id(std::move(_cf_id)) { } +public: /** * @return total number of files this task receives/streams. */ - public abstract int getTotalNumberOfFiles(); + virtual int get_total_number_of_files() = 0; /** * @return total bytes expected to receive */ - public abstract long getTotalSize(); + virtual long get_total_size() = 0; /** * Abort the task. * Subclass should implement cleaning up resources. */ - public abstract void abort(); + virtual void abort() = 0; /** * @return StreamSummary that describes this task */ - public StreamSummary getSummary() - { - return new StreamSummary(cfId, getTotalNumberOfFiles(), getTotalSize()); + virtual stream_summary get_summary() { + return stream_summary(this->cf_id, this->get_total_number_of_files(), this->get_total_size()); } -} +}; + +} // namespace streaming diff --git a/streaming/streaming.cc b/streaming/streaming.cc index f6aa71e4b8..a63731ef49 100644 --- a/streaming/streaming.cc +++ b/streaming/streaming.cc @@ -16,3 +16,4 @@ #include "streaming/messages/retry_message.hh" #include "streaming/messages/session_failed_message.hh" #include "streaming/stream_session.hh" +#include "streaming/stream_task.hh" From 0caad5f8f26c973bc5ba869db69677b4b178d0c8 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 12:34:42 +0800 Subject: [PATCH 12/15] streaming: Import StreamTransferTask.java --- streaming/StreamTransferTask.java | 156 ++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 streaming/StreamTransferTask.java diff --git a/streaming/StreamTransferTask.java b/streaming/StreamTransferTask.java new file mode 100644 index 0000000000..a3dd10f1ca --- /dev/null +++ b/streaming/StreamTransferTask.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.messages.OutgoingFileMessage; +import org.apache.cassandra.utils.Pair; + +/** + * StreamTransferTask sends sections of SSTable files in certain ColumnFamily. + */ +public class StreamTransferTask extends StreamTask +{ + private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts")); + + private final AtomicInteger sequenceNumber = new AtomicInteger(0); + private boolean aborted = false; + + private final Map files = new HashMap<>(); + private final Map timeoutTasks = new HashMap<>(); + + private long totalSize; + + public StreamTransferTask(StreamSession session, UUID cfId) + { + super(session, cfId); + } + + public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List> sections, long repairedAt) + { + assert sstable != null && cfId.equals(sstable.metadata.cfId); + OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); + files.put(message.header.sequenceNumber, message); + totalSize += message.header.size(); + } + + /** + * Received ACK for file at {@code sequenceNumber}. + * + * @param sequenceNumber sequence number of file + */ + public void complete(int sequenceNumber) + { + boolean signalComplete; + synchronized (this) + { + ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber); + if (timeout != null) + timeout.cancel(false); + + OutgoingFileMessage file = files.remove(sequenceNumber); + if (file != null) + file.sstable.releaseReference(); + + signalComplete = files.isEmpty(); + } + + // all file sent, notify session this task is complete. + if (signalComplete) + session.taskCompleted(this); + } + + public synchronized void abort() + { + if (aborted) + return; + aborted = true; + + for (ScheduledFuture future : timeoutTasks.values()) + future.cancel(false); + timeoutTasks.clear(); + + for (OutgoingFileMessage file : files.values()) + file.sstable.releaseReference(); + } + + public synchronized int getTotalNumberOfFiles() + { + return files.size(); + } + + public long getTotalSize() + { + return totalSize; + } + + public synchronized Collection getFileMessages() + { + // We may race between queuing all those messages and the completion of the completion of + // the first ones. So copy tthe values to avoid a ConcurrentModificationException + return new ArrayList<>(files.values()); + } + + public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber) + { + // remove previous time out task to be rescheduled later + ScheduledFuture future = timeoutTasks.remove(sequenceNumber); + if (future != null) + future.cancel(false); + return files.get(sequenceNumber); + } + + /** + * Schedule timeout task to release reference for file sent. + * When not receiving ACK after sending to receiver in given time, + * the task will release reference. + * + * @param sequenceNumber sequence number of file sent. + * @param time time to timeout + * @param unit unit of given time + * @return scheduled future for timeout task + */ + public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) + { + if (!files.containsKey(sequenceNumber)) + return null; + + ScheduledFuture future = timeoutExecutor.schedule(new Runnable() + { + public void run() + { + synchronized (StreamTransferTask.this) + { + // remove so we don't cancel ourselves + timeoutTasks.remove(sequenceNumber); + StreamTransferTask.this.complete(sequenceNumber); + } + } + }, time, unit); + + ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future); + assert prev == null; + return future; + } +} From 334b1f81fca0732c08bf71a30c76ef14b32cfd55 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 12:35:16 +0800 Subject: [PATCH 13/15] streaming: Convert StreamTransferTask.java to C++ --- configure.py | 1 + streaming/stream_transfer_task.cc | 22 ++++++++ ...nsferTask.java => stream_transfer_task.hh} | 50 ++++++++++--------- 3 files changed, 50 insertions(+), 23 deletions(-) create mode 100644 streaming/stream_transfer_task.cc rename streaming/{StreamTransferTask.java => stream_transfer_task.hh} (81%) diff --git a/configure.py b/configure.py index 0f6675abaa..305adff7f7 100755 --- a/configure.py +++ b/configure.py @@ -467,6 +467,7 @@ urchin_core = (['database.cc', 'streaming/stream_session.cc', 'streaming/stream_request.cc', 'streaming/stream_summary.cc', + 'streaming/stream_transfer_task.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc new file mode 100644 index 0000000000..4bc7ce1671 --- /dev/null +++ b/streaming/stream_transfer_task.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_transfer_task.hh" diff --git a/streaming/StreamTransferTask.java b/streaming/stream_transfer_task.hh similarity index 81% rename from streaming/StreamTransferTask.java rename to streaming/stream_transfer_task.hh index a3dd10f1ca..8c8f12b583 100644 --- a/streaming/StreamTransferTask.java +++ b/streaming/stream_transfer_task.hh @@ -14,39 +14,40 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. */ -package org.apache.cassandra.streaming; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicInteger; +#pragma once -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.streaming.messages.OutgoingFileMessage; -import org.apache.cassandra.utils.Pair; +#include "utils/UUID.hh" +#include "streaming/stream_session.hh" +#include "streaming/stream_task.hh" +#include "streaming/messages/outgoing_file_message.hh" +#include + +namespace streaming { /** * StreamTransferTask sends sections of SSTable files in certain ColumnFamily. */ -public class StreamTransferTask extends StreamTask -{ - private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts")); +class stream_transfer_task : public stream_task { +private: + //final AtomicInteger sequenceNumber = new AtomicInteger(0); + bool aborted = false; - private final AtomicInteger sequenceNumber = new AtomicInteger(0); - private boolean aborted = false; + std::map files; + //final Map timeoutTasks = new HashMap<>(); - private final Map files = new HashMap<>(); - private final Map timeoutTasks = new HashMap<>(); - - private long totalSize; - - public StreamTransferTask(StreamSession session, UUID cfId) - { - super(session, cfId); + long total_size; +public: + using UUID = utils::UUID; + stream_transfer_task(stream_session& session, UUID cf_id) + : stream_task(session, cf_id) { } +#if 0 public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List> sections, long repairedAt) { assert sstable != null && cfId.equals(sstable.metadata.cfId); @@ -153,4 +154,7 @@ public class StreamTransferTask extends StreamTask assert prev == null; return future; } -} +#endif +}; + +} // namespace streaming From 0391f55ffd5f3b9c8c5243de23383cf0edda86e1 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 14:40:41 +0800 Subject: [PATCH 14/15] streaming: Import StreamReceiveTask.java --- streaming/StreamReceiveTask.java | 162 +++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 streaming/StreamReceiveTask.java diff --git a/streaming/StreamReceiveTask.java b/streaming/StreamReceiveTask.java new file mode 100644 index 0000000000..0efcc93ec9 --- /dev/null +++ b/streaming/StreamReceiveTask.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.File; +import java.io.IOError; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +/** + * Task that manages receiving files for the session for certain ColumnFamily. + */ +public class StreamReceiveTask extends StreamTask +{ + private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask")); + + // number of files to receive + private final int totalFiles; + // total size of files to receive + private final long totalSize; + + // true if task is done (either completed or aborted) + private boolean done = false; + + // holds references to SSTables received + protected Collection sstables; + + public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) + { + super(session, cfId); + this.totalFiles = totalFiles; + this.totalSize = totalSize; + this.sstables = new ArrayList<>(totalFiles); + } + + /** + * Process received file. + * + * @param sstable SSTable file received. + */ + public synchronized void received(SSTableWriter sstable) + { + if (done) + return; + + assert cfId.equals(sstable.metadata.cfId); + + sstables.add(sstable); + if (sstables.size() == totalFiles) + { + done = true; + executor.submit(new OnCompletionRunnable(this)); + } + } + + public int getTotalNumberOfFiles() + { + return totalFiles; + } + + public long getTotalSize() + { + return totalSize; + } + + private static class OnCompletionRunnable implements Runnable + { + private final StreamReceiveTask task; + + public OnCompletionRunnable(StreamReceiveTask task) + { + this.task = task; + } + + public void run() + { + Pair kscf = Schema.instance.getCF(task.cfId); + if (kscf == null) + { + // schema was dropped during streaming + for (SSTableWriter writer : task.sstables) + writer.abort(); + task.sstables.clear(); + return; + } + ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + + File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256); + if (lockfiledir == null) + throw new IOError(new IOException("All disks full")); + StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID()); + lockfile.create(task.sstables); + List readers = new ArrayList<>(); + for (SSTableWriter writer : task.sstables) + readers.add(writer.closeAndOpenReader()); + lockfile.delete(); + task.sstables.clear(); + + if (!SSTableReader.acquireReferences(readers)) + throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred"); + try + { + // add sstables and build secondary indexes + cfs.addSSTables(readers); + cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames()); + } + finally + { + SSTableReader.releaseReferences(readers); + } + + task.session.taskCompleted(task); + } + } + + /** + * Abort this task. + * If the task already received all files and + * {@link org.apache.cassandra.streaming.StreamReceiveTask.OnCompletionRunnable} task is submitted, + * then task cannot be aborted. + */ + public synchronized void abort() + { + if (done) + return; + + done = true; + for (SSTableWriter writer : sstables) + writer.abort(); + sstables.clear(); + } +} From d1c04ec2e3b34e3f7f5f76e1c386840a9bbef7ea Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 18 Jun 2015 14:53:24 +0800 Subject: [PATCH 15/15] streaming: Convert StreamReceiveTask.java to C++ --- configure.py | 1 + streaming/stream_receive_task.cc | 22 +++++++ ...eceiveTask.java => stream_receive_task.hh} | 63 ++++++++----------- 3 files changed, 50 insertions(+), 36 deletions(-) create mode 100644 streaming/stream_receive_task.cc rename streaming/{StreamReceiveTask.java => stream_receive_task.hh} (74%) diff --git a/configure.py b/configure.py index 305adff7f7..d921450bdb 100755 --- a/configure.py +++ b/configure.py @@ -468,6 +468,7 @@ urchin_core = (['database.cc', 'streaming/stream_request.cc', 'streaming/stream_summary.cc', 'streaming/stream_transfer_task.cc', + 'streaming/stream_receive_task.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] diff --git a/streaming/stream_receive_task.cc b/streaming/stream_receive_task.cc new file mode 100644 index 0000000000..c5c1ef7e6d --- /dev/null +++ b/streaming/stream_receive_task.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_receive_task.hh" diff --git a/streaming/StreamReceiveTask.java b/streaming/stream_receive_task.hh similarity index 74% rename from streaming/StreamReceiveTask.java rename to streaming/stream_receive_task.hh index 0efcc93ec9..4c6e3330ce 100644 --- a/streaming/StreamReceiveTask.java +++ b/streaming/stream_receive_task.hh @@ -14,55 +14,43 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. */ -package org.apache.cassandra.streaming; -import java.io.File; -import java.io.IOError; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +#pragma once -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; +#include "utils/UUID.hh" +#include "streaming/stream_session.hh" +#include "streaming/stream_task.hh" +#include "streaming/messages/outgoing_file_message.hh" + +namespace streaming { /** * Task that manages receiving files for the session for certain ColumnFamily. */ -public class StreamReceiveTask extends StreamTask -{ - private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask")); - +class stream_receive_task : public stream_task { +private: // number of files to receive - private final int totalFiles; + int total_files; // total size of files to receive - private final long totalSize; + long total_size; // true if task is done (either completed or aborted) - private boolean done = false; + bool done = false; - // holds references to SSTables received - protected Collection sstables; - - public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) - { - super(session, cfId); - this.totalFiles = totalFiles; - this.totalSize = totalSize; - this.sstables = new ArrayList<>(totalFiles); + // holds references to SSTables received + // protected Collection sstables; +public: + stream_receive_task(stream_session& _session, UUID _cf_id, int _total_files, long _total_size) + : stream_task(_session, _cf_id) + , total_files(_total_files) + , total_size(_total_size) { } +#if 0 /** * Process received file. * @@ -159,4 +147,7 @@ public class StreamReceiveTask extends StreamTask writer.abort(); sstables.clear(); } -} +#endif +}; + +} // namespace streaming