mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-09 08:23:29 +00:00
Merge "Unify logged and unlogged batch write code paths" from Gleb
"Logic duplication bothered me for a long time. This series consolidates both cases as much as possible."
This commit is contained in:
@@ -250,7 +250,8 @@ abstract_write_response_handler& storage_proxy::get_write_response_handler(stora
|
||||
return *_response_handlers.find(id)->second.handler;
|
||||
}
|
||||
|
||||
storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints)
|
||||
storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation,
|
||||
std::unordered_set<gms::inet_address> targets, const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints)
|
||||
{
|
||||
std::unique_ptr<abstract_write_response_handler> h;
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
@@ -258,7 +259,6 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(key
|
||||
|
||||
auto m = make_lw_shared<const frozen_mutation>(std::move(mutation));
|
||||
|
||||
// for now make it simple
|
||||
if (db::is_datacenter_local(cl)) {
|
||||
pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local);
|
||||
h = std::make_unique<datacenter_write_response_handler>(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints));
|
||||
@@ -720,7 +720,7 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_
|
||||
|
||||
db::assure_sufficient_live_nodes(cl, ks, live_endpoints);
|
||||
|
||||
return create_write_response_handler(ks, cl, type, freeze(m), std::move(live_endpoints), pending_endpoints, dead_endpoints);
|
||||
return create_write_response_handler(ks, cl, type, freeze(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -735,6 +735,73 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level
|
||||
}
|
||||
}
|
||||
|
||||
template<typename Range, typename CreateWriteHandler>
|
||||
future<std::vector<storage_proxy::response_id_type>> storage_proxy::mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler create_handler) {
|
||||
std::vector<response_id_type> ids;
|
||||
|
||||
try {
|
||||
ids.reserve(mutations.size());
|
||||
for (auto& m : mutations) {
|
||||
ids.emplace_back(create_handler(m, cl, type));
|
||||
}
|
||||
return make_ready_future<std::vector<response_id_type>>(std::move(ids));
|
||||
} catch(...) {
|
||||
boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, this, std::placeholders::_1));
|
||||
return make_exception_future<std::vector<response_id_type>>(std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
// Convenience overload of mutate_prepare() for a vector of mutations: each handler
// is created through create_write_response_handler(mutation, cl, type).
future<std::vector<storage_proxy::response_id_type>> storage_proxy::mutate_prepare(std::vector<mutation>& mutations, db::consistency_level cl, db::write_type type) {
    auto make_handler = [this] (const mutation& mut, db::consistency_level level, db::write_type kind) {
        return create_write_response_handler(mut, level, kind);
    };
    return mutate_prepare<>(mutations, cl, type, std::move(make_handler));
}
|
||||
|
||||
// Starts the writes for the already registered response handlers in `ids`.
//
// For every id: hints are issued for dead endpoints, the handler's completion
// future is obtained, and the mutation is sent to the live endpoints (`local_dc`
// is forwarded to send_to_live_endpoints()). The returned future is the
// parallel_for_each() result over all ids. When waiting on a handler fails, that
// handler is removed and the same exception is propagated.
future<> storage_proxy::mutate_begin(std::vector<storage_proxy::response_id_type> ids, db::consistency_level cl, const sstring& local_dc) {
    auto start_write = [this, cl, &local_dc] (storage_proxy::response_id_type id) {
        // Sending first and hinting afterwards would reduce latency, but the request
        // may complete before hint_to_dead_endpoints() runs and the handler for this
        // id would already be removed. Doing so would require hinting from a separate
        // frozen_mutation copy, or managing the handler lifetime differently.
        hint_to_dead_endpoints(id, cl);

        // Obtain the completion future before send_to_live_endpoints(), for the
        // same reason as above.
        auto replied = response_wait(id);
        send_to_live_endpoints(id, local_dc);

        return replied.handle_exception([this, id] (std::exception_ptr failure) {
            // Removing the handler cancels expire_timer, so no hint will happen.
            remove_response_handler(id);
            return make_exception_future<>(failure);
        });
    };
    return parallel_for_each(ids, std::move(start_write));
}
|
||||
|
||||
// this function should be called with a future that holds result of mutation attempt (usually
|
||||
// future returned by mutate_begin()). The future should be ready when function is called.
|
||||
future<> storage_proxy::mutate_end(future<> mutate_result, utils::latency_counter lc) {
|
||||
assert(mutate_result.available());
|
||||
_stats.write.mark(lc.stop().latency_in_nano());
|
||||
try {
|
||||
mutate_result.get();
|
||||
return make_ready_future<>();
|
||||
} catch (no_such_keyspace& ex) {
|
||||
logger.trace("Write to non existing keyspace: {}", ex.what());
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch(mutation_write_timeout_exception& ex) {
|
||||
// timeout
|
||||
logger.trace("Write timeout; received {} of {} required replies", ex.received, ex.block_for);
|
||||
_stats.write_timeouts++;
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.write_unavailables++;
|
||||
logger.trace("Unavailable");
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch(overloaded_exception& ex) {
|
||||
_stats.write_unavailables++;
|
||||
logger.trace("Overloaded");
|
||||
return make_exception_future<>(std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method to have these Mutations applied
|
||||
* across all replicas. This method will take care
|
||||
@@ -746,51 +813,17 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level
|
||||
*/
|
||||
future<>
|
||||
storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl) {
|
||||
auto local_addr = utils::fb_utilities::get_broadcast_address();
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
sstring local_dc = snitch_ptr->get_datacenter(local_addr);
|
||||
auto type = mutations.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH;
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
return parallel_for_each(mutations, [this, cl, type, &local_dc] (mutation& m) {
|
||||
storage_proxy::response_id_type response_id = create_write_response_handler(m, cl, type);
|
||||
// it is better to send first and hint afterwards to reduce latency
|
||||
// but request may complete before hint_to_dead_endpoints() is called and
|
||||
// response_id handler will be removed, so we will have to do hint with separate
|
||||
// frozen_mutation copy, or manage handler live time differently.
|
||||
hint_to_dead_endpoints(response_id, cl);
|
||||
|
||||
// call before send_to_live_endpoints() for the same reason as above
|
||||
auto f = response_wait(response_id);
|
||||
send_to_live_endpoints(response_id, local_dc);
|
||||
return f.handle_exception([this, response_id, cl] (std::exception_ptr exp) {
|
||||
remove_response_handler(response_id); // cancel expire_timer, so no hint will happen
|
||||
//std::rethrow_exception(ex);
|
||||
return make_exception_future<>(exp);
|
||||
});
|
||||
}).then_wrapped([this, p = shared_from_this(), lc] (future<>&& f) mutable {
|
||||
_stats.write.mark(lc.stop().latency_in_nano());
|
||||
try {
|
||||
f.get();
|
||||
return make_ready_future<>();
|
||||
} catch (no_such_keyspace& ex) {
|
||||
logger.trace("Write to non existing keyspace: {}", ex.what());
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch(mutation_write_timeout_exception& ex) {
|
||||
// timeout
|
||||
logger.trace("Write timeout; received {} of {} required replies", ex.received, ex.block_for);
|
||||
_stats.write_timeouts++;
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.write_unavailables++;
|
||||
logger.trace("Unavailable");
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch(overloaded_exception& ex) {
|
||||
_stats.write_unavailables++;
|
||||
logger.trace("Overloaded");
|
||||
return make_exception_future<>(std::current_exception());
|
||||
}
|
||||
return mutate_prepare(mutations, cl, type).then([this, cl] (std::vector<storage_proxy::response_id_type> ids) {
|
||||
auto local_addr = utils::fb_utilities::get_broadcast_address();
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
sstring local_dc = snitch_ptr->get_datacenter(local_addr);
|
||||
return mutate_begin(std::move(ids), cl, local_dc);
|
||||
}).then_wrapped([p = shared_from_this(), lc] (future<> f) {
|
||||
return p->mutate_end(std::move(f), lc);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -829,7 +862,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
|
||||
lc.start();
|
||||
|
||||
class context {
|
||||
storage_proxy & _p;
|
||||
storage_proxy& _p;
|
||||
std::vector<mutation> _mutations;
|
||||
db::consistency_level _cl;
|
||||
|
||||
@@ -862,18 +895,18 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
|
||||
return chosen_endpoints;
|
||||
}()) {
|
||||
}
|
||||
|
||||
future<> send_batchlog_mutation(mutation m) {
|
||||
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, _cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) {
|
||||
auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name());
|
||||
return _p.create_write_response_handler(ks, cl, type, freeze(m), _batchlog_endpoints, {}, {});
|
||||
}).then([this] (std::vector<response_id_type> ids) {
|
||||
return _p.mutate_begin(std::move(ids), _cl, _local_dc);
|
||||
});
|
||||
}
|
||||
future<> sync_write_to_batchlog() {
|
||||
auto m = db::get_batchlog_manager().local().get_batch_log_mutation_for(_mutations, _batch_uuid, net::messaging_service::current_version);
|
||||
auto h = std::make_unique<write_response_handler>(_p._db.local().find_keyspace(db::system_keyspace::NAME), db::consistency_level::ONE, db::write_type::BATCH_LOG, make_lw_shared<frozen_mutation>(freeze(m)), _batchlog_endpoints);
|
||||
response_id_type response_id = _p.register_response_handler(std::move(h));
|
||||
|
||||
auto f = _p.response_wait(response_id);
|
||||
_p.send_to_live_endpoints(response_id, _local_dc);
|
||||
return f.handle_exception([this, response_id](auto ex) {
|
||||
_p.remove_response_handler(response_id); // cancel expire_timer, so no hint will happen
|
||||
//return make_exception_future(ex);
|
||||
std::rethrow_exception(ex);
|
||||
});
|
||||
return send_batchlog_mutation(std::move(m));
|
||||
};
|
||||
void async_remove_from_batchlog() {
|
||||
// delete batch
|
||||
@@ -883,70 +916,38 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
|
||||
mutation m(key, schema);
|
||||
m.partition().apply_delete(*schema, {}, tombstone(now, gc_clock::now()));
|
||||
|
||||
auto h = std::make_unique<write_response_handler>(_p._db.local().find_keyspace(db::system_keyspace::NAME), db::consistency_level::ONE, db::write_type::BATCH_LOG, make_lw_shared<frozen_mutation>(freeze(m)), _batchlog_endpoints);
|
||||
auto response_id = _p.register_response_handler(std::move(h));
|
||||
|
||||
auto f = _p.response_wait(response_id);
|
||||
_p.send_to_live_endpoints(response_id, _local_dc);
|
||||
f.handle_exception([&p = _p, response_id](std::exception_ptr ex) {
|
||||
p.remove_response_handler(response_id); // cancel expire_timer, so no hint will happen
|
||||
std::rethrow_exception(ex);
|
||||
});
|
||||
send_batchlog_mutation(std::move(m));
|
||||
};
|
||||
|
||||
future<> run() {
|
||||
std::vector<response_id_type> ids;
|
||||
|
||||
ids.reserve(_mutations.size());
|
||||
try {
|
||||
for (auto& m : _mutations) {
|
||||
ids.emplace_back(_p.create_write_response_handler(m, _cl, db::write_type::BATCH));
|
||||
}
|
||||
} catch(...) {
|
||||
boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, &_p, std::placeholders::_1));
|
||||
throw;
|
||||
}
|
||||
|
||||
return sync_write_to_batchlog().then([this, ids = std::move(ids)] {
|
||||
return parallel_for_each(ids.begin(), ids.end(), [this](auto response_id) {
|
||||
// it is better to send first and hint afterwards to reduce latency
|
||||
// but request may complete before hint_to_dead_endpoints() is called and
|
||||
// response_id handler will be removed, so we will have to do hint with separate
|
||||
// frozen_mutation copy, or manage handler live time differently.
|
||||
_p.hint_to_dead_endpoints(response_id, _cl);
|
||||
// call before send_to_live_endpoints() for the same reason as above
|
||||
auto f = _p.response_wait(response_id);
|
||||
_p.send_to_live_endpoints(response_id, _local_dc);
|
||||
return f.handle_exception([this, response_id](std::exception_ptr p) {
|
||||
_p.remove_response_handler(response_id); // cancel expire_timer, so no hint will happen
|
||||
try {
|
||||
std::rethrow_exception(p);
|
||||
} catch (mutation_write_timeout_exception& ex) {
|
||||
logger.trace("Write timeout; received {} of {} required replies", ex.received, ex.block_for);
|
||||
throw;
|
||||
}
|
||||
});
|
||||
return _p.mutate_prepare(_mutations, _cl, db::write_type::BATCH).then([this] (std::vector<response_id_type> ids) {
|
||||
return sync_write_to_batchlog().then_wrapped([this, ids = std::move(ids)] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
return _p.mutate_begin(std::move(ids), _cl, _local_dc);
|
||||
} catch(...) {
|
||||
// writing to the batchlog failed; remove the response handlers that will not be used now
|
||||
boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, &_p, std::placeholders::_1));
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}).finally(std::bind(&context::async_remove_from_batchlog, this));
|
||||
}
|
||||
};
|
||||
try {
|
||||
auto ctxt = make_lw_shared<context>(*this, std::move(mutations), cl);
|
||||
|
||||
return ctxt->run().finally([p = shared_from_this(), lc, ctxt]() mutable {
|
||||
p->_stats.write.mark(lc.stop().latency_in_nano());
|
||||
});
|
||||
} catch (no_such_keyspace& ex) {
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.write_unavailables++;
|
||||
logger.trace("Unavailable");
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch (overloaded_exception& ex) {
|
||||
_stats.write_unavailables++;
|
||||
logger.trace("Overloaded");
|
||||
return make_exception_future<>(std::current_exception());
|
||||
}
|
||||
auto mk_ctxt = [this] (std::vector<mutation> mutations, db::consistency_level cl) {
|
||||
try {
|
||||
return make_ready_future<lw_shared_ptr<context>>(make_lw_shared<context>(*this, std::move(mutations), cl));
|
||||
} catch(...) {
|
||||
return make_exception_future<lw_shared_ptr<context>>(std::current_exception());
|
||||
}
|
||||
};
|
||||
|
||||
return mk_ctxt(std::move(mutations), cl).then([this] (lw_shared_ptr<context> ctxt) {
|
||||
return ctxt->run().finally([ctxt]{});
|
||||
}).then_wrapped([p = shared_from_this(), lc] (future<> f) mutable {
|
||||
return p->mutate_end(std::move(f), lc);
|
||||
});
|
||||
}
|
||||
|
||||
bool storage_proxy::cannot_hint(gms::inet_address target) {
|
||||
|
||||
@@ -97,7 +97,8 @@ private:
|
||||
void got_response(response_id_type id, gms::inet_address from);
|
||||
future<> response_wait(response_id_type id);
|
||||
abstract_write_response_handler& get_write_response_handler(storage_proxy::response_id_type id);
|
||||
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address>);
|
||||
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set<gms::inet_address> targets,
|
||||
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address>);
|
||||
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type);
|
||||
future<> send_to_live_endpoints(response_id_type response_id, sstring local_data_center);
|
||||
template<typename Range>
|
||||
@@ -124,6 +125,12 @@ private:
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
std::vector<query::partition_range>&& partition_ranges,
|
||||
db::consistency_level cl);
|
||||
template<typename Range, typename CreateWriteHandler>
|
||||
future<std::vector<storage_proxy::response_id_type>> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler);
|
||||
future<std::vector<storage_proxy::response_id_type>> mutate_prepare(std::vector<mutation>& mutations, db::consistency_level cl, db::write_type type);
|
||||
future<> mutate_begin(const std::vector<storage_proxy::response_id_type> ids, db::consistency_level cl, const sstring& local_dc);
|
||||
future<> mutate_end(future<> mutate_result, utils::latency_counter);
|
||||
|
||||
public:
|
||||
storage_proxy(distributed<database>& db);
|
||||
~storage_proxy();
|
||||
|
||||
Reference in New Issue
Block a user