diff --git a/streaming/ConnectionHandler.java b/streaming/ConnectionHandler.java new file mode 100644 index 0000000000..7a7ccbf8be --- /dev/null +++ b/streaming/ConnectionHandler.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.streaming.messages.StreamInitMessage; +import org.apache.cassandra.streaming.messages.StreamMessage; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; + +/** + * ConnectionHandler manages incoming/outgoing message exchange for the {@link StreamSession}. + * + *
+ * Internally, ConnectionHandler manages thread to receive incoming {@link StreamMessage} and thread to
+ * send outgoing message. Messages are encoded/decoded on those thread and handed to
+ * {@link StreamSession#messageReceived(org.apache.cassandra.streaming.messages.StreamMessage)}.
+ */
+public class ConnectionHandler
+{
+ private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
+
+ private final StreamSession session;
+
+ private IncomingMessageHandler incoming;
+ private OutgoingMessageHandler outgoing;
+
+ ConnectionHandler(StreamSession session)
+ {
+ this.session = session;
+ this.incoming = new IncomingMessageHandler(session);
+ this.outgoing = new OutgoingMessageHandler(session);
+ }
+
+ /**
+ * Set up incoming message handler and initiate streaming.
+ *
+ * This method is called once on initiator.
+ *
+ * @throws IOException
+ */
+ public void initiate() throws IOException
+ {
+ logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
+ Socket incomingSocket = session.createConnection();
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
+ incoming.sendInitMessage(incomingSocket, true);
+
+ logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
+ Socket outgoingSocket = session.createConnection();
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
+ outgoing.sendInitMessage(outgoingSocket, false);
+ }
+
+ /**
+ * Set up outgoing message handler on receiving side.
+ *
+ * @param socket socket to use for {@link org.apache.cassandra.streaming.ConnectionHandler.OutgoingMessageHandler}.
+ * @param version Streaming message version
+ * @throws IOException
+ */
+ public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException
+ {
+ if (isForOutgoing)
+ outgoing.start(socket, version);
+ else
+ incoming.start(socket, version);
+ }
+
+ public ListenableFuture> close()
+ {
+ logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer);
+
+ ListenableFuture> inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close();
+ ListenableFuture> outClosed = outgoing == null ? Futures.immediateFuture(null) : outgoing.close();
+
+ return Futures.allAsList(inClosed, outClosed);
+ }
+
+ /**
+ * Enqueue messages to be sent.
+ *
+ * @param messages messages to send
+ */
+ public void sendMessages(Collection extends StreamMessage> messages)
+ {
+ for (StreamMessage message : messages)
+ sendMessage(message);
+ }
+
+ public void sendMessage(StreamMessage message)
+ {
+ if (outgoing.isClosed())
+ throw new RuntimeException("Outgoing stream handler has been closed");
+
+ outgoing.enqueue(message);
+ }
+
+ /**
+ * @return true if outgoing connection is opened and ready to send messages
+ */
+ public boolean isOutgoingConnected()
+ {
+ return outgoing != null && !outgoing.isClosed();
+ }
+
+ abstract static class MessageHandler implements Runnable
+ {
+ protected final StreamSession session;
+
+ protected int protocolVersion;
+ protected Socket socket;
+
+ private final AtomicReference