Merge "streaming updates" from Asias

This commit is contained in:
Avi Kivity
2015-06-18 15:49:07 +03:00
13 changed files with 711 additions and 20 deletions

View File

@@ -465,6 +465,10 @@ urchin_core = (['database.cc',
'service/storage_service.cc',
'streaming/streaming.cc',
'streaming/stream_session.cc',
'streaming/stream_request.cc',
'streaming/stream_summary.cc',
'streaming/stream_transfer_task.cc',
'streaming/stream_receive_task.cc',
]
+ [Antlr3Grammar('cql3/Cql.g')]
+ [Thrift('interface/cassandra.thrift', 'Cassandra')]

View File

@@ -22,6 +22,8 @@
#pragma once
#include "streaming/messages/stream_message.hh"
#include "streaming/stream_request.hh"
#include "streaming/stream_summary.hh"
namespace streaming {
namespace messages {
@@ -57,17 +59,19 @@ class prepare_message : public stream_message {
StreamSummary.serializer.serialize(summary, out, version);
}
};
#endif
public:
/**
* Streaming requests
*/
public final Collection<StreamRequest> requests = new ArrayList<>();
std::vector<stream_request> requests;
/**
* Summaries of streaming out
*/
public final Collection<StreamSummary> summaries = new ArrayList<>();
std::vector<stream_summary> summaries;
#if 0
public PrepareMessage()
{
super(Type.PREPARE);

View File

@@ -21,38 +21,44 @@
#pragma once
#include "core/sstring.hh"
#include "bytes.hh"
#include "gms/inet_address.hh"
#include "utils/UUID.hh"
namespace streaming {
namespace messages {
/**
* StreamInitMessage is first sent from the node where {@link org.apache.cassandra.streaming.StreamSession} is started,
* to initiate corresponding {@link org.apache.cassandra.streaming.StreamSession} on the other side.
*/
class stream_init_message {
#if 0
public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
public final InetAddress from;
public final int sessionIndex;
public final UUID planId;
public final String description;
public:
using inet_address = gms::inet_address;
using UUID = utils::UUID;
inet_address from;
int session_index;
UUID plan_id;
sstring description;
// true if this init message is to connect for outgoing message on receiving side
public final boolean isForOutgoing;
public final boolean keepSSTableLevel;
bool is_for_outgoing;
bool keep_ss_table_level;
public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel)
{
this.from = from;
this.sessionIndex = sessionIndex;
this.planId = planId;
this.description = description;
this.isForOutgoing = isForOutgoing;
this.keepSSTableLevel = keepSSTableLevel;
stream_init_message() = default;
stream_init_message(inet_address _from, int _session_index, UUID _plan_id, sstring _description, bool _is_for_outgoing, bool _keep_ss_table_level)
: from(_from)
, session_index(_session_index)
, plan_id(_plan_id)
, description(_description)
, is_for_outgoing(_is_for_outgoing)
, keep_ss_table_level(-keep_ss_table_level) {
}
#if 0
/**
* Create serialized message.
*

View File

@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#include "streaming/stream_receive_task.hh"

View File

@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "utils/UUID.hh"
#include "streaming/stream_session.hh"
#include "streaming/stream_task.hh"
#include "streaming/messages/outgoing_file_message.hh"
namespace streaming {
/**
* Task that manages receiving files for the session for certain ColumnFamily.
*/
class stream_receive_task : public stream_task {
private:
// number of files to receive
int total_files;
// total size of files to receive
long total_size;
// true if task is done (either completed or aborted)
bool done = false;
// holds references to SSTables received
// protected Collection<SSTableWriter> sstables;
public:
stream_receive_task(stream_session& _session, UUID _cf_id, int _total_files, long _total_size)
: stream_task(_session, _cf_id)
, total_files(_total_files)
, total_size(_total_size) {
}
#if 0
/**
* Process received file.
*
* @param sstable SSTable file received.
*/
public synchronized void received(SSTableWriter sstable)
{
if (done)
return;
assert cfId.equals(sstable.metadata.cfId);
sstables.add(sstable);
if (sstables.size() == totalFiles)
{
done = true;
executor.submit(new OnCompletionRunnable(this));
}
}
public int getTotalNumberOfFiles()
{
return totalFiles;
}
public long getTotalSize()
{
return totalSize;
}
private static class OnCompletionRunnable implements Runnable
{
private final StreamReceiveTask task;
public OnCompletionRunnable(StreamReceiveTask task)
{
this.task = task;
}
public void run()
{
Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
if (kscf == null)
{
// schema was dropped during streaming
for (SSTableWriter writer : task.sstables)
writer.abort();
task.sstables.clear();
return;
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256);
if (lockfiledir == null)
throw new IOError(new IOException("All disks full"));
StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
readers.add(writer.closeAndOpenReader());
lockfile.delete();
task.sstables.clear();
if (!SSTableReader.acquireReferences(readers))
throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
try
{
// add sstables and build secondary indexes
cfs.addSSTables(readers);
cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
}
finally
{
SSTableReader.releaseReferences(readers);
}
task.session.taskCompleted(task);
}
}
/**
* Abort this task.
* If the task already received all files and
* {@link org.apache.cassandra.streaming.StreamReceiveTask.OnCompletionRunnable} task is submitted,
* then task cannot be aborted.
*/
public synchronized void abort()
{
if (done)
return;
done = true;
for (SSTableWriter writer : sstables)
writer.abort();
sstables.clear();
}
#endif
};
} // namespace streaming

View File

@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#include "streaming/stream_request.hh"

101
streaming/stream_request.hh Normal file
View File

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "core/sstring.hh"
#include "query-request.hh"
#include "dht/i_partitioner.hh"
#include <vector>
namespace streaming {
class stream_request {
public:
using token = dht::token;
sstring keyspace;
std::vector<query::range<token>> ranges;
std::vector<sstring> column_families;
long repaired_at;
stream_request(sstring _keyspace, std::vector<query::range<token>> _ranges, std::vector<sstring> _column_families, long _repaired_at)
: keyspace(std::move(_keyspace))
, ranges(std::move(_ranges))
, column_families(std::move(_column_families))
, repaired_at(_repaired_at) {
}
#if 0
public static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
{
public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException
{
out.writeUTF(request.keyspace);
out.writeLong(request.repairedAt);
out.writeInt(request.ranges.size());
for (Range<Token> range : request.ranges)
{
Token.serializer.serialize(range.left, out);
Token.serializer.serialize(range.right, out);
}
out.writeInt(request.columnFamilies.size());
for (sstring cf : request.columnFamilies)
out.writeUTF(cf);
}
public StreamRequest deserialize(DataInput in, int version) throws IOException
{
sstring keyspace = in.readUTF();
long repairedAt = in.readLong();
int rangeCount = in.readInt();
List<Range<Token>> ranges = new ArrayList<>(rangeCount);
for (int i = 0; i < rangeCount; i++)
{
Token left = Token.serializer.deserialize(in);
Token right = Token.serializer.deserialize(in);
ranges.add(new Range<>(left, right));
}
int cfCount = in.readInt();
List<sstring> columnFamilies = new ArrayList<>(cfCount);
for (int i = 0; i < cfCount; i++)
columnFamilies.add(in.readUTF());
return new StreamRequest(keyspace, ranges, columnFamilies, repairedAt);
}
public long serializedSize(StreamRequest request, int version)
{
int size = TypeSizes.NATIVE.sizeof(request.keyspace);
size += TypeSizes.NATIVE.sizeof(request.repairedAt);
size += TypeSizes.NATIVE.sizeof(request.ranges.size());
for (Range<Token> range : request.ranges)
{
size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE);
size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE);
}
size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size());
for (sstring cf : request.columnFamilies)
size += TypeSizes.NATIVE.sizeof(cf);
return size;
}
}
#endif
};
} // namespace streaming

View File

@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#include "streaming/stream_summary.hh"

104
streaming/stream_summary.hh Normal file
View File

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "utils/UUID.hh"
namespace streaming {
/**
* Summary of streaming.
*/
class stream_summary {
public:
using UUID = utils::UUID;
UUID cf_id;
/**
* Number of files to transfer. Can be 0 if nothing to transfer for some streaming request.
*/
int files;
long total_size;
stream_summary(UUID _cf_id, int _files, long _total_size)
: cf_id (_cf_id)
, files(_files)
, total_size(_total_size) {
}
#if 0
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamSummary summary = (StreamSummary) o;
return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
}
@Override
public int hashCode()
{
return Objects.hashCode(cfId, files, totalSize);
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder("StreamSummary{");
sb.append("path=").append(cfId);
sb.append(", files=").append(files);
sb.append(", totalSize=").append(totalSize);
sb.append('}');
return sb.toString();
}
public static class StreamSummarySerializer implements IVersionedSerializer<StreamSummary>
{
// arbitrary version is fine for UUIDSerializer for now...
public void serialize(StreamSummary summary, DataOutputPlus out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version);
out.writeInt(summary.files);
out.writeLong(summary.totalSize);
}
public StreamSummary deserialize(DataInput in, int version) throws IOException
{
UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
int files = in.readInt();
long totalSize = in.readLong();
return new StreamSummary(cfId, files, totalSize);
}
public long serializedSize(StreamSummary summary, int version)
{
long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version);
size += TypeSizes.NATIVE.sizeof(summary.files);
size += TypeSizes.NATIVE.sizeof(summary.totalSize);
return size;
}
}
#endif
};
} // namespace streaming

70
streaming/stream_task.hh Normal file
View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "utils/UUID.hh"
#include "streaming/stream_session.hh"
#include "streaming/stream_summary.hh"
namespace streaming {
/**
* StreamTask is an abstraction of the streaming task performed over specific ColumnFamily.
*/
class stream_task {
protected:
using UUID = utils::UUID;
/** StreamSession that this task belongs */
stream_session& session;
UUID cf_id;
stream_task(stream_session& _session, UUID _cf_id)
: session(_session)
, cf_id(std::move(_cf_id)) {
}
public:
/**
* @return total number of files this task receives/streams.
*/
virtual int get_total_number_of_files() = 0;
/**
* @return total bytes expected to receive
*/
virtual long get_total_size() = 0;
/**
* Abort the task.
* Subclass should implement cleaning up resources.
*/
virtual void abort() = 0;
/**
* @return StreamSummary that describes this task
*/
virtual stream_summary get_summary() {
return stream_summary(this->cf_id, this->get_total_number_of_files(), this->get_total_size());
}
};
} // namespace streaming

View File

@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#include "streaming/stream_transfer_task.hh"

View File

@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "utils/UUID.hh"
#include "streaming/stream_session.hh"
#include "streaming/stream_task.hh"
#include "streaming/messages/outgoing_file_message.hh"
#include <map>
namespace streaming {
/**
* StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
*/
class stream_transfer_task : public stream_task {
private:
//final AtomicInteger sequenceNumber = new AtomicInteger(0);
bool aborted = false;
std::map<int, messages::outgoing_file_message> files;
//final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
long total_size;
public:
using UUID = utils::UUID;
stream_transfer_task(stream_session& session, UUID cf_id)
: stream_task(session, cf_id) {
}
#if 0
public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
assert sstable != null && cfId.equals(sstable.metadata.cfId);
OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
files.put(message.header.sequenceNumber, message);
totalSize += message.header.size();
}
/**
* Received ACK for file at {@code sequenceNumber}.
*
* @param sequenceNumber sequence number of file
*/
public void complete(int sequenceNumber)
{
boolean signalComplete;
synchronized (this)
{
ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber);
if (timeout != null)
timeout.cancel(false);
OutgoingFileMessage file = files.remove(sequenceNumber);
if (file != null)
file.sstable.releaseReference();
signalComplete = files.isEmpty();
}
// all file sent, notify session this task is complete.
if (signalComplete)
session.taskCompleted(this);
}
public synchronized void abort()
{
if (aborted)
return;
aborted = true;
for (ScheduledFuture future : timeoutTasks.values())
future.cancel(false);
timeoutTasks.clear();
for (OutgoingFileMessage file : files.values())
file.sstable.releaseReference();
}
public synchronized int getTotalNumberOfFiles()
{
return files.size();
}
public long getTotalSize()
{
return totalSize;
}
public synchronized Collection<OutgoingFileMessage> getFileMessages()
{
// We may race between queuing all those messages and the completion of the completion of
// the first ones. So copy tthe values to avoid a ConcurrentModificationException
return new ArrayList<>(files.values());
}
public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber)
{
// remove previous time out task to be rescheduled later
ScheduledFuture future = timeoutTasks.remove(sequenceNumber);
if (future != null)
future.cancel(false);
return files.get(sequenceNumber);
}
/**
* Schedule timeout task to release reference for file sent.
* When not receiving ACK after sending to receiver in given time,
* the task will release reference.
*
* @param sequenceNumber sequence number of file sent.
* @param time time to timeout
* @param unit unit of given time
* @return scheduled future for timeout task
*/
public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit)
{
if (!files.containsKey(sequenceNumber))
return null;
ScheduledFuture future = timeoutExecutor.schedule(new Runnable()
{
public void run()
{
synchronized (StreamTransferTask.this)
{
// remove so we don't cancel ourselves
timeoutTasks.remove(sequenceNumber);
StreamTransferTask.this.complete(sequenceNumber);
}
}
}, time, unit);
ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future);
assert prev == null;
return future;
}
#endif
};
} // namespace streaming

View File

@@ -16,3 +16,4 @@
#include "streaming/messages/retry_message.hh"
#include "streaming/messages/session_failed_message.hh"
#include "streaming/stream_session.hh"
#include "streaming/stream_task.hh"