From 4604ae7d4bd78858ecd9836e92831082e8a7c299 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 19 Jun 2015 14:23:14 +0800 Subject: [PATCH] streaming: Import ConnectionHandler.java --- streaming/ConnectionHandler.java | 360 +++++++++++++++++++++++++++++++ 1 file changed, 360 insertions(+) create mode 100644 streaming/ConnectionHandler.java diff --git a/streaming/ConnectionHandler.java b/streaming/ConnectionHandler.java new file mode 100644 index 0000000000..7a7ccbf8be --- /dev/null +++ b/streaming/ConnectionHandler.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.streaming.messages.StreamInitMessage; +import org.apache.cassandra.streaming.messages.StreamMessage; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; + +/** + * ConnectionHandler manages incoming/outgoing message exchange for the {@link StreamSession}. + * + *

+ * Internally, ConnectionHandler manages thread to receive incoming {@link StreamMessage} and thread to + * send outgoing message. Messages are encoded/decoded on those thread and handed to + * {@link StreamSession#messageReceived(org.apache.cassandra.streaming.messages.StreamMessage)}. + */ +public class ConnectionHandler +{ + private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class); + + private final StreamSession session; + + private IncomingMessageHandler incoming; + private OutgoingMessageHandler outgoing; + + ConnectionHandler(StreamSession session) + { + this.session = session; + this.incoming = new IncomingMessageHandler(session); + this.outgoing = new OutgoingMessageHandler(session); + } + + /** + * Set up incoming message handler and initiate streaming. + * + * This method is called once on initiator. + * + * @throws IOException + */ + public void initiate() throws IOException + { + logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId()); + Socket incomingSocket = session.createConnection(); + incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION); + incoming.sendInitMessage(incomingSocket, true); + + logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId()); + Socket outgoingSocket = session.createConnection(); + outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION); + outgoing.sendInitMessage(outgoingSocket, false); + } + + /** + * Set up outgoing message handler on receiving side. + * + * @param socket socket to use for {@link org.apache.cassandra.streaming.ConnectionHandler.OutgoingMessageHandler}. + * @param version Streaming message version + * @throws IOException + */ + public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException + { + if (isForOutgoing) + outgoing.start(socket, version); + else + incoming.start(socket, version); + } + + public ListenableFuture close() + { + logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer); + + ListenableFuture inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close(); + ListenableFuture outClosed = outgoing == null ? Futures.immediateFuture(null) : outgoing.close(); + + return Futures.allAsList(inClosed, outClosed); + } + + /** + * Enqueue messages to be sent. + * + * @param messages messages to send + */ + public void sendMessages(Collection messages) + { + for (StreamMessage message : messages) + sendMessage(message); + } + + public void sendMessage(StreamMessage message) + { + if (outgoing.isClosed()) + throw new RuntimeException("Outgoing stream handler has been closed"); + + outgoing.enqueue(message); + } + + /** + * @return true if outgoing connection is opened and ready to send messages + */ + public boolean isOutgoingConnected() + { + return outgoing != null && !outgoing.isClosed(); + } + + abstract static class MessageHandler implements Runnable + { + protected final StreamSession session; + + protected int protocolVersion; + protected Socket socket; + + private final AtomicReference> closeFuture = new AtomicReference<>(); + + protected MessageHandler(StreamSession session) + { + this.session = session; + } + + protected abstract String name(); + + protected static DataOutputStreamAndChannel getWriteChannel(Socket socket) throws IOException + { + WritableByteChannel out = socket.getChannel(); + // socket channel is null when encrypted(SSL) + if (out == null) + out = Channels.newChannel(socket.getOutputStream()); + return new DataOutputStreamAndChannel(socket.getOutputStream(), out); + } + + protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException + { + ReadableByteChannel in = socket.getChannel(); + // socket channel is null when encrypted(SSL) + return in == null + ? Channels.newChannel(socket.getInputStream()) + : in; + } + + public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException + { + StreamInitMessage message = new StreamInitMessage( + FBUtilities.getBroadcastAddress(), + session.sessionIndex(), + session.planId(), + session.description(), + isForOutgoing, + session.keepSSTableLevel()); + ByteBuffer messageBuf = message.createMessage(false, protocolVersion); + getWriteChannel(socket).write(messageBuf); + } + + public void start(Socket socket, int protocolVersion) + { + this.socket = socket; + this.protocolVersion = protocolVersion; + + new Thread(this, name() + "-" + session.peer).start(); + } + + public ListenableFuture close() + { + // Assume it wasn't closed. Not a huge deal if we create a future on a race + SettableFuture future = SettableFuture.create(); + return closeFuture.compareAndSet(null, future) + ? future + : closeFuture.get(); + } + + public boolean isClosed() + { + return closeFuture.get() != null; + } + + protected void signalCloseDone() + { + closeFuture.get().set(null); + + // We can now close the socket + try + { + socket.close(); + } + catch (IOException ignore) {} + } + } + + /** + * Incoming streaming message handler + */ + static class IncomingMessageHandler extends MessageHandler + { + IncomingMessageHandler(StreamSession session) + { + super(session); + } + + protected String name() + { + return "STREAM-IN"; + } + + public void run() + { + try + { + ReadableByteChannel in = getReadChannel(socket); + while (!isClosed()) + { + // receive message + StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session); + // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok + // to ignore here since we'll have asked for a retry. + if (message != null) + { + logger.debug("[Stream #{}] Received {}", session.planId(), message); + session.messageReceived(message); + } + } + } + catch (SocketException e) + { + // socket is closed + close(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + session.onError(t); + } + finally + { + signalCloseDone(); + } + } + } + + /** + * Outgoing file transfer thread + */ + static class OutgoingMessageHandler extends MessageHandler + { + /* + * All out going messages are queued up into messageQueue. + * The size will grow when received streaming request. + * + * Queue is also PriorityQueue so that prior messages can go out fast. + */ + private final PriorityBlockingQueue messageQueue = new PriorityBlockingQueue<>(64, new Comparator() + { + public int compare(StreamMessage o1, StreamMessage o2) + { + return o2.getPriority() - o1.getPriority(); + } + }); + + OutgoingMessageHandler(StreamSession session) + { + super(session); + } + + protected String name() + { + return "STREAM-OUT"; + } + + public void enqueue(StreamMessage message) + { + messageQueue.put(message); + } + + public void run() + { + try + { + DataOutputStreamAndChannel out = getWriteChannel(socket); + + StreamMessage next; + while (!isClosed()) + { + if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null) + { + logger.debug("[Stream #{}] Sending {}", session.planId(), next); + sendMessage(out, next); + if (next.type == StreamMessage.Type.SESSION_FAILED) + close(); + } + } + + // Sends the last messages on the queue + while ((next = messageQueue.poll()) != null) + sendMessage(out, next); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (Throwable e) + { + session.onError(e); + } + finally + { + signalCloseDone(); + } + } + + private void sendMessage(DataOutputStreamAndChannel out, StreamMessage message) + { + try + { + StreamMessage.serialize(message, out, protocolVersion, session); + } + catch (SocketException e) + { + session.onError(e); + close(); + } + catch (IOException e) + { + session.onError(e); + } + } + } +}