diff --git a/configure.py b/configure.py index 9ec9b3d785..16f3281122 100755 --- a/configure.py +++ b/configure.py @@ -465,6 +465,10 @@ urchin_core = (['database.cc', 'service/storage_service.cc', 'streaming/streaming.cc', 'streaming/stream_session.cc', + 'streaming/stream_request.cc', + 'streaming/stream_summary.cc', + 'streaming/stream_transfer_task.cc', + 'streaming/stream_receive_task.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] diff --git a/streaming/messages/prepare_message.hh b/streaming/messages/prepare_message.hh index 3d7abd3eb6..ff5ad6a39c 100644 --- a/streaming/messages/prepare_message.hh +++ b/streaming/messages/prepare_message.hh @@ -22,6 +22,8 @@ #pragma once #include "streaming/messages/stream_message.hh" +#include "streaming/stream_request.hh" +#include "streaming/stream_summary.hh" namespace streaming { namespace messages { @@ -57,17 +59,19 @@ class prepare_message : public stream_message { StreamSummary.serializer.serialize(summary, out, version); } }; - +#endif +public: /** * Streaming requests */ - public final Collection requests = new ArrayList<>(); + std::vector requests; /** * Summaries of streaming out */ - public final Collection summaries = new ArrayList<>(); + std::vector summaries; +#if 0 public PrepareMessage() { super(Type.PREPARE); diff --git a/streaming/messages/stream_init_message.hh b/streaming/messages/stream_init_message.hh index 60bf1a6c30..9f0971d953 100644 --- a/streaming/messages/stream_init_message.hh +++ b/streaming/messages/stream_init_message.hh @@ -21,38 +21,44 @@ #pragma once +#include "core/sstring.hh" #include "bytes.hh" +#include "gms/inet_address.hh" +#include "utils/UUID.hh" namespace streaming { namespace messages { + /** * StreamInitMessage is first sent from the node where {@link org.apache.cassandra.streaming.StreamSession} is started, * to initiate corresponding {@link org.apache.cassandra.streaming.StreamSession} on the other side. */ class stream_init_message { -#if 0 - public static IVersionedSerializer serializer = new StreamInitMessageSerializer(); - - public final InetAddress from; - public final int sessionIndex; - public final UUID planId; - public final String description; +public: + using inet_address = gms::inet_address; + using UUID = utils::UUID; + inet_address from; + int session_index; + UUID plan_id; + sstring description; // true if this init message is to connect for outgoing message on receiving side - public final boolean isForOutgoing; - public final boolean keepSSTableLevel; + bool is_for_outgoing; + bool keep_ss_table_level; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel) - { - this.from = from; - this.sessionIndex = sessionIndex; - this.planId = planId; - this.description = description; - this.isForOutgoing = isForOutgoing; - this.keepSSTableLevel = keepSSTableLevel; + stream_init_message() = default; + + stream_init_message(inet_address _from, int _session_index, UUID _plan_id, sstring _description, bool _is_for_outgoing, bool _keep_ss_table_level) + : from(_from) + , session_index(_session_index) + , plan_id(_plan_id) + , description(_description) + , is_for_outgoing(_is_for_outgoing) + , keep_ss_table_level(-keep_ss_table_level) { } +#if 0 /** * Create serialized message. * diff --git a/streaming/stream_receive_task.cc b/streaming/stream_receive_task.cc new file mode 100644 index 0000000000..c5c1ef7e6d --- /dev/null +++ b/streaming/stream_receive_task.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_receive_task.hh" diff --git a/streaming/stream_receive_task.hh b/streaming/stream_receive_task.hh new file mode 100644 index 0000000000..4c6e3330ce --- /dev/null +++ b/streaming/stream_receive_task.hh @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#pragma once + +#include "utils/UUID.hh" +#include "streaming/stream_session.hh" +#include "streaming/stream_task.hh" +#include "streaming/messages/outgoing_file_message.hh" + +namespace streaming { + +/** + * Task that manages receiving files for the session for certain ColumnFamily. + */ +class stream_receive_task : public stream_task { +private: + // number of files to receive + int total_files; + // total size of files to receive + long total_size; + + // true if task is done (either completed or aborted) + bool done = false; + + // holds references to SSTables received + // protected Collection sstables; +public: + stream_receive_task(stream_session& _session, UUID _cf_id, int _total_files, long _total_size) + : stream_task(_session, _cf_id) + , total_files(_total_files) + , total_size(_total_size) { + } + +#if 0 + /** + * Process received file. + * + * @param sstable SSTable file received. + */ + public synchronized void received(SSTableWriter sstable) + { + if (done) + return; + + assert cfId.equals(sstable.metadata.cfId); + + sstables.add(sstable); + if (sstables.size() == totalFiles) + { + done = true; + executor.submit(new OnCompletionRunnable(this)); + } + } + + public int getTotalNumberOfFiles() + { + return totalFiles; + } + + public long getTotalSize() + { + return totalSize; + } + + private static class OnCompletionRunnable implements Runnable + { + private final StreamReceiveTask task; + + public OnCompletionRunnable(StreamReceiveTask task) + { + this.task = task; + } + + public void run() + { + Pair kscf = Schema.instance.getCF(task.cfId); + if (kscf == null) + { + // schema was dropped during streaming + for (SSTableWriter writer : task.sstables) + writer.abort(); + task.sstables.clear(); + return; + } + ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + + File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256); + if (lockfiledir == null) + throw new IOError(new IOException("All disks full")); + StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID()); + lockfile.create(task.sstables); + List readers = new ArrayList<>(); + for (SSTableWriter writer : task.sstables) + readers.add(writer.closeAndOpenReader()); + lockfile.delete(); + task.sstables.clear(); + + if (!SSTableReader.acquireReferences(readers)) + throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred"); + try + { + // add sstables and build secondary indexes + cfs.addSSTables(readers); + cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames()); + } + finally + { + SSTableReader.releaseReferences(readers); + } + + task.session.taskCompleted(task); + } + } + + /** + * Abort this task. + * If the task already received all files and + * {@link org.apache.cassandra.streaming.StreamReceiveTask.OnCompletionRunnable} task is submitted, + * then task cannot be aborted. + */ + public synchronized void abort() + { + if (done) + return; + + done = true; + for (SSTableWriter writer : sstables) + writer.abort(); + sstables.clear(); + } +#endif +}; + +} // namespace streaming diff --git a/streaming/stream_request.cc b/streaming/stream_request.cc new file mode 100644 index 0000000000..73579a318d --- /dev/null +++ b/streaming/stream_request.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_request.hh" diff --git a/streaming/stream_request.hh b/streaming/stream_request.hh new file mode 100644 index 0000000000..ed3305b1a2 --- /dev/null +++ b/streaming/stream_request.hh @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#pragma once + +#include "core/sstring.hh" +#include "query-request.hh" +#include "dht/i_partitioner.hh" +#include + +namespace streaming { + +class stream_request { +public: + using token = dht::token; + sstring keyspace; + std::vector> ranges; + std::vector column_families; + long repaired_at; + stream_request(sstring _keyspace, std::vector> _ranges, std::vector _column_families, long _repaired_at) + : keyspace(std::move(_keyspace)) + , ranges(std::move(_ranges)) + , column_families(std::move(_column_families)) + , repaired_at(_repaired_at) { + } + +#if 0 + public static class StreamRequestSerializer implements IVersionedSerializer + { + public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException + { + out.writeUTF(request.keyspace); + out.writeLong(request.repairedAt); + out.writeInt(request.ranges.size()); + for (Range range : request.ranges) + { + Token.serializer.serialize(range.left, out); + Token.serializer.serialize(range.right, out); + } + out.writeInt(request.columnFamilies.size()); + for (sstring cf : request.columnFamilies) + out.writeUTF(cf); + } + + public StreamRequest deserialize(DataInput in, int version) throws IOException + { + sstring keyspace = in.readUTF(); + long repairedAt = in.readLong(); + int rangeCount = in.readInt(); + List> ranges = new ArrayList<>(rangeCount); + for (int i = 0; i < rangeCount; i++) + { + Token left = Token.serializer.deserialize(in); + Token right = Token.serializer.deserialize(in); + ranges.add(new Range<>(left, right)); + } + int cfCount = in.readInt(); + List columnFamilies = new ArrayList<>(cfCount); + for (int i = 0; i < cfCount; i++) + columnFamilies.add(in.readUTF()); + return new StreamRequest(keyspace, ranges, columnFamilies, repairedAt); + } + + public long serializedSize(StreamRequest request, int version) + { + int size = TypeSizes.NATIVE.sizeof(request.keyspace); + size += TypeSizes.NATIVE.sizeof(request.repairedAt); + size += TypeSizes.NATIVE.sizeof(request.ranges.size()); + for (Range range : request.ranges) + { + size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE); + size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE); + } + size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size()); + for (sstring cf : request.columnFamilies) + size += TypeSizes.NATIVE.sizeof(cf); + return size; + } + } +#endif +}; + +} // namespace streaming diff --git a/streaming/stream_summary.cc b/streaming/stream_summary.cc new file mode 100644 index 0000000000..513b65cf36 --- /dev/null +++ b/streaming/stream_summary.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_summary.hh" diff --git a/streaming/stream_summary.hh b/streaming/stream_summary.hh new file mode 100644 index 0000000000..672d0a88f6 --- /dev/null +++ b/streaming/stream_summary.hh @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#pragma once + +#include "utils/UUID.hh" + +namespace streaming { + +/** + * Summary of streaming. + */ +class stream_summary { +public: + using UUID = utils::UUID; + UUID cf_id; + + /** + * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. + */ + int files; + long total_size; + + stream_summary(UUID _cf_id, int _files, long _total_size) + : cf_id (_cf_id) + , files(_files) + , total_size(_total_size) { + } + +#if 0 + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StreamSummary summary = (StreamSummary) o; + return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId); + } + + @Override + public int hashCode() + { + return Objects.hashCode(cfId, files, totalSize); + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder("StreamSummary{"); + sb.append("path=").append(cfId); + sb.append(", files=").append(files); + sb.append(", totalSize=").append(totalSize); + sb.append('}'); + return sb.toString(); + } + + public static class StreamSummarySerializer implements IVersionedSerializer + { + // arbitrary version is fine for UUIDSerializer for now... + public void serialize(StreamSummary summary, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version); + out.writeInt(summary.files); + out.writeLong(summary.totalSize); + } + + public StreamSummary deserialize(DataInput in, int version) throws IOException + { + UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); + int files = in.readInt(); + long totalSize = in.readLong(); + return new StreamSummary(cfId, files, totalSize); + } + + public long serializedSize(StreamSummary summary, int version) + { + long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version); + size += TypeSizes.NATIVE.sizeof(summary.files); + size += TypeSizes.NATIVE.sizeof(summary.totalSize); + return size; + } + } +#endif +}; + +} // namespace streaming diff --git a/streaming/stream_task.hh b/streaming/stream_task.hh new file mode 100644 index 0000000000..00d1473f61 --- /dev/null +++ b/streaming/stream_task.hh @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#pragma once + +#include "utils/UUID.hh" +#include "streaming/stream_session.hh" +#include "streaming/stream_summary.hh" + +namespace streaming { +/** + * StreamTask is an abstraction of the streaming task performed over specific ColumnFamily. + */ +class stream_task { +protected: + using UUID = utils::UUID; + /** StreamSession that this task belongs */ + stream_session& session; + + UUID cf_id; + + stream_task(stream_session& _session, UUID _cf_id) + : session(_session) + , cf_id(std::move(_cf_id)) { + } + +public: + /** + * @return total number of files this task receives/streams. + */ + virtual int get_total_number_of_files() = 0; + + /** + * @return total bytes expected to receive + */ + virtual long get_total_size() = 0; + + /** + * Abort the task. + * Subclass should implement cleaning up resources. + */ + virtual void abort() = 0; + + /** + * @return StreamSummary that describes this task + */ + virtual stream_summary get_summary() { + return stream_summary(this->cf_id, this->get_total_number_of_files(), this->get_total_size()); + } +}; + +} // namespace streaming diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc new file mode 100644 index 0000000000..4bc7ce1671 --- /dev/null +++ b/streaming/stream_transfer_task.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#include "streaming/stream_transfer_task.hh" diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh new file mode 100644 index 0000000000..8c8f12b583 --- /dev/null +++ b/streaming/stream_transfer_task.hh @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Cloudius Systems. + * Copyright 2015 Cloudius Systems. + */ + +#pragma once + +#include "utils/UUID.hh" +#include "streaming/stream_session.hh" +#include "streaming/stream_task.hh" +#include "streaming/messages/outgoing_file_message.hh" +#include + +namespace streaming { + +/** + * StreamTransferTask sends sections of SSTable files in certain ColumnFamily. + */ +class stream_transfer_task : public stream_task { +private: + //final AtomicInteger sequenceNumber = new AtomicInteger(0); + bool aborted = false; + + std::map files; + //final Map timeoutTasks = new HashMap<>(); + + long total_size; +public: + using UUID = utils::UUID; + stream_transfer_task(stream_session& session, UUID cf_id) + : stream_task(session, cf_id) { + } + +#if 0 + public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List> sections, long repairedAt) + { + assert sstable != null && cfId.equals(sstable.metadata.cfId); + OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); + files.put(message.header.sequenceNumber, message); + totalSize += message.header.size(); + } + + /** + * Received ACK for file at {@code sequenceNumber}. + * + * @param sequenceNumber sequence number of file + */ + public void complete(int sequenceNumber) + { + boolean signalComplete; + synchronized (this) + { + ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber); + if (timeout != null) + timeout.cancel(false); + + OutgoingFileMessage file = files.remove(sequenceNumber); + if (file != null) + file.sstable.releaseReference(); + + signalComplete = files.isEmpty(); + } + + // all file sent, notify session this task is complete. + if (signalComplete) + session.taskCompleted(this); + } + + public synchronized void abort() + { + if (aborted) + return; + aborted = true; + + for (ScheduledFuture future : timeoutTasks.values()) + future.cancel(false); + timeoutTasks.clear(); + + for (OutgoingFileMessage file : files.values()) + file.sstable.releaseReference(); + } + + public synchronized int getTotalNumberOfFiles() + { + return files.size(); + } + + public long getTotalSize() + { + return totalSize; + } + + public synchronized Collection getFileMessages() + { + // We may race between queuing all those messages and the completion of the completion of + // the first ones. So copy tthe values to avoid a ConcurrentModificationException + return new ArrayList<>(files.values()); + } + + public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber) + { + // remove previous time out task to be rescheduled later + ScheduledFuture future = timeoutTasks.remove(sequenceNumber); + if (future != null) + future.cancel(false); + return files.get(sequenceNumber); + } + + /** + * Schedule timeout task to release reference for file sent. + * When not receiving ACK after sending to receiver in given time, + * the task will release reference. + * + * @param sequenceNumber sequence number of file sent. + * @param time time to timeout + * @param unit unit of given time + * @return scheduled future for timeout task + */ + public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) + { + if (!files.containsKey(sequenceNumber)) + return null; + + ScheduledFuture future = timeoutExecutor.schedule(new Runnable() + { + public void run() + { + synchronized (StreamTransferTask.this) + { + // remove so we don't cancel ourselves + timeoutTasks.remove(sequenceNumber); + StreamTransferTask.this.complete(sequenceNumber); + } + } + }, time, unit); + + ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future); + assert prev == null; + return future; + } +#endif +}; + +} // namespace streaming diff --git a/streaming/streaming.cc b/streaming/streaming.cc index f6aa71e4b8..a63731ef49 100644 --- a/streaming/streaming.cc +++ b/streaming/streaming.cc @@ -16,3 +16,4 @@ #include "streaming/messages/retry_message.hh" #include "streaming/messages/session_failed_message.hh" #include "streaming/stream_session.hh" +#include "streaming/stream_task.hh"