/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.cassandra.streaming; import java.io.IOException; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.Collection; import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.io.util.DataOutputStreamAndChannel; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; /** * ConnectionHandler manages incoming/outgoing message exchange for the {@link StreamSession}. * *
* Internally, ConnectionHandler manages a thread to receive incoming {@link StreamMessage}s and a thread to
* send outgoing messages. Messages are encoded/decoded on those threads and handed to
* {@link StreamSession#messageReceived(org.apache.cassandra.streaming.messages.StreamMessage)}.
*/
public class ConnectionHandler
{
// Logger shared by all ConnectionHandler instances.
private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
// Session this handler works for; it supplies the plan id, the peer and new connections used below.
private final StreamSession session;
// Handler for the receiving direction; instantiated in the constructor and given its socket via start(...).
// NOTE(review): non-final and null-checked in close(); no reassignment is visible in this part of the file.
private IncomingMessageHandler incoming;
// Handler for the sending direction; instantiated in the constructor and given its socket via start(...).
// NOTE(review): non-final and null-checked in close() and isOutgoingConnected(); confirm whether it can become null.
private OutgoingMessageHandler outgoing;
/**
 * Creates a connection handler for the given session and instantiates both message handlers.
 *
 * No socket is passed to the handlers here; they receive one later through {@code start(...)}.
 *
 * @param session stream session this handler exchanges messages for
 */
ConnectionHandler(StreamSession session)
{
    this.session = session;

    // Construction order is kept: incoming handler first, then outgoing handler.
    incoming = new IncomingMessageHandler(session);
    outgoing = new OutgoingMessageHandler(session);
}
/**
 * Opens the two streaming connections and sends a stream init message over each of them.
 *
 * This method is called once on initiator. A separate connection is requested from the session for each
 * direction; the incoming handler is started and sends its init message first, then the outgoing handler
 * does the same on its own socket.
 *
 * @throws IOException propagated from any of the connection or handler calls in this method
 */
public void initiate() throws IOException
{
    // --- receiving direction ---
    logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
    final Socket receiveSocket = session.createConnection();
    incoming.start(receiveSocket, StreamMessage.CURRENT_VERSION);
    // NOTE(review): the flag is true for our incoming socket; presumably it is expressed from the
    // peer's point of view (see initiateOnReceivingSide, where true selects the outgoing handler) - confirm.
    incoming.sendInitMessage(receiveSocket, true);

    // --- sending direction ---
    logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
    final Socket sendSocket = session.createConnection();
    outgoing.start(sendSocket, StreamMessage.CURRENT_VERSION);
    outgoing.sendInitMessage(sendSocket, false);
}
/**
 * Starts one of the two message handlers on the receiving side with the given socket.
 *
 * @param socket socket handed to the selected handler
 * @param isForOutgoing {@code true} to start the outgoing handler with this socket,
 *                      {@code false} to start the incoming handler
 * @param version Streaming message version
 * @throws IOException propagated from the selected handler's {@code start} call
 */
public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException
{
    if (!isForOutgoing)
    {
        incoming.start(socket, version);
        return;
    }

    outgoing.start(socket, version);
}
public ListenableFuture> close()
{
logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer);
ListenableFuture> inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close();
ListenableFuture> outClosed = outgoing == null ? Futures.immediateFuture(null) : outgoing.close();
return Futures.allAsList(inClosed, outClosed);
}
/**
* Enqueue messages to be sent.
*
* @param messages messages to send
*/
public void sendMessages(Collection extends StreamMessage> messages)
{
for (StreamMessage message : messages)
sendMessage(message);
}
/**
 * Enqueue a single message on the outgoing handler.
 *
 * @param message message to send
 * @throws RuntimeException if the outgoing handler reports that it is closed
 */
public void sendMessage(StreamMessage message)
{
    final boolean handlerClosed = outgoing.isClosed();
    if (handlerClosed)
    {
        throw new RuntimeException("Outgoing stream handler has been closed");
    }

    outgoing.enqueue(message);
}
/**
 * Tells whether the outgoing handler exists and is not closed.
 *
 * @return true if outgoing connection is opened and ready to send messages
 */
public boolean isOutgoingConnected()
{
    // No handler at all counts as "not connected".
    if (outgoing == null)
        return false;

    return !outgoing.isClosed();
}
abstract static class MessageHandler implements Runnable
{
protected final StreamSession session;
protected int protocolVersion;
protected Socket socket;
private final AtomicReference