Merge "Drop mutations that raced with truncate" from Duarte

Instead of retrying, just drop mutations that raced with a truncate.

* git@github.com:duarten/scylla.git truncate-reorder/v1:
  database: Rename replay_position_reordered_exception
  database: Drop mutations that raced with truncate

(cherry picked from commit 63caa58b70)
This commit is contained in:
Tomasz Grabiec
2017-07-18 12:53:36 +02:00
parent 0291a4491e
commit f4d3e5cdcf
2 changed files with 15 additions and 28 deletions

View File

@@ -3055,7 +3055,7 @@ void column_family::apply_streaming_big_mutation(schema_ptr m_schema, utils::UUI
void
column_family::check_valid_rp(const db::replay_position& rp) const {
if (rp != db::replay_position() && rp < _lowest_allowed_rp) {
throw replay_position_reordered_exception();
throw mutation_reordered_with_truncate_exception();
}
}
@@ -3201,25 +3201,24 @@ future<mutation> database::apply_counter_update(schema_ptr s, const frozen_mutat
}
}
static future<> maybe_handle_reorder(std::exception_ptr exp) {
try {
std::rethrow_exception(exp);
return make_exception_future(exp);
} catch (mutation_reordered_with_truncate_exception&) {
// This mutation raced with a truncate, so we can just drop it.
dblog.debug("replay_position reordering detected");
return make_ready_future<>();
}
}
future<> database::apply_with_commitlog(column_family& cf, const mutation& m, timeout_clock::time_point timeout) {
if (cf.commitlog() != nullptr) {
return do_with(freeze(m), [this, &m, &cf, timeout] (frozen_mutation& fm) {
commitlog_entry_writer cew(m.schema(), fm);
return cf.commitlog()->add_entry(m.schema()->id(), cew, timeout);
}).then([this, &m, &cf, timeout] (db::rp_handle h) {
return apply_in_memory(m, cf, std::move(h), timeout).handle_exception([this, &cf, &m, timeout] (auto ep) {
try {
std::rethrow_exception(ep);
} catch (replay_position_reordered_exception&) {
// expensive, but we're assuming this is super rare.
// if we failed to apply the mutation due to future re-ordering
// (which should be the ever only reason for rp mismatch in CF)
// let's just try again, add the mutation to the CL once more,
// and assume success in inevitable eventually.
dblog.debug("replay_position reordering detected");
return this->apply_with_commitlog(cf, m, timeout);
}
});
return apply_in_memory(m, cf, std::move(h), timeout).handle_exception(maybe_handle_reorder);
});
}
return apply_in_memory(m, cf, {}, timeout);
@@ -3230,19 +3229,7 @@ future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::
if (cl != nullptr) {
commitlog_entry_writer cew(s, m);
return cf.commitlog()->add_entry(uuid, cew, timeout).then([&m, this, s, timeout, cl](db::rp_handle h) {
return this->apply_in_memory(m, s, std::move(h), timeout).handle_exception([this, s, &m, timeout] (auto ep) {
try {
std::rethrow_exception(ep);
} catch (replay_position_reordered_exception&) {
// expensive, but we're assuming this is super rare.
// if we failed to apply the mutation due to future re-ordering
// (which should be the ever only reason for rp mismatch in CF)
// let's just try again, add the mutation to the CL once more,
// and assume success in inevitable eventually.
dblog.debug("replay_position reordering detected");
return this->apply(s, m, timeout);
}
});
return this->apply_in_memory(m, s, std::move(h), timeout).handle_exception(maybe_handle_reorder);
});
}
return apply_in_memory(m, std::move(s), {}, timeout);

View File

@@ -116,7 +116,7 @@ void make(database& db, bool durable, bool volatile_testing_only);
}
}
class replay_position_reordered_exception : public std::exception {};
class mutation_reordered_with_truncate_exception : public std::exception {};
using shared_memtable = lw_shared_ptr<memtable>;
class memtable_list;