streaming: Drop flush_tables option for add_transfer_ranges

We do not stream sstable files. No need to flush it.
This commit is contained in:
Asias He
2016-01-29 14:46:05 +08:00
parent aa69d5ffb2
commit ed3da7b04c
4 changed files with 4 additions and 23 deletions

View File

@@ -62,7 +62,7 @@ stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_session(to);
session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families), _flush_before_transfer);
session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families));
return *this;
}
@@ -75,11 +75,6 @@ future<stream_state> stream_plan::execute() {
return stream_result_future::init_sending_side(_plan_id, _description, _handlers, _coordinator);
}
stream_plan& stream_plan::flush_before_transfer(bool flush_before_transfer_) {
_flush_before_transfer = flush_before_transfer_;
return *this;
}
stream_plan& stream_plan::listeners(std::vector<stream_event_handler*> handlers) {
std::copy(handlers.begin(), handlers.end(), std::back_inserter(_handlers));
return *this;

View File

@@ -67,8 +67,6 @@ private:
sstring _description;
std::vector<stream_event_handler*> _handlers;
shared_ptr<stream_coordinator> _coordinator;
bool _flush_before_transfer = true;
bool _range_added = false;
public:
@@ -157,15 +155,6 @@ public:
* @return Future {@link StreamState} that you can use to listen on progress of streaming.
*/
future<stream_state> execute();
/**
* Set flushBeforeTransfer option.
* When it's true, will flush before streaming ranges. (Default: true)
*
* @param flushBeforeTransfer set to true when the node should flush before transfer
* @return this object for chaining
*/
stream_plan& flush_before_transfer(bool flush_before_transfer_);
};
} // namespace streaming

View File

@@ -343,7 +343,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
throw std::runtime_error(err);
}
}
add_transfer_ranges(request.keyspace, request.ranges, request.column_families, true);
add_transfer_ranges(request.keyspace, request.ranges, request.column_families);
}
for (auto& summary : summaries) {
sslog.debug("[Stream #{}] prepare stream_summary={}", plan_id, summary);
@@ -544,12 +544,9 @@ std::vector<column_family*> stream_session::get_column_family_stores(const sstri
return stores;
}
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families, bool flush_tables) {
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
std::vector<stream_detail> stream_details;
auto cfs = get_column_family_stores(keyspace, column_families);
if (flush_tables) {
// FIXME: flushSSTables(stores);
}
for (auto& cf : cfs) {
std::vector<mutation_reader> readers;
auto cf_id = cf->schema()->id();

View File

@@ -257,7 +257,7 @@ public:
* @param flushTables flush tables?
* @param repairedAt the time the repair started.
*/
void add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families, bool flush_tables);
void add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
std::vector<column_family*> get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families);