Database/commitlog: guard against replay position reordering

Commit log guarantees that once an RP is assigned to a data frame/caller, it
will not block before returning the result via future. However, this is not
enough, since we could
a.) Have blocked earlier, in which case the return value processing will be
async anyway
b.) Even if no blocking takes place, future chaining mechanism could decide
it has to reorder execution.

Assuming though that the case where this happens is rare, and cases where it
actually affects the rule of replay position ordering is even rarer, we can
guard against it by simply keeping track of the highest RP _discarded_ (sent
to sstable flush), and if we attempt to apply a mutation with a higher RP,
simply re-do the operation (i.e. write same entry to commit log again).

Signed-off-by: Calle Wilund <calle@cloudius-systems.com>
This commit is contained in:
Calle Wilund
2015-06-10 09:43:14 +02:00
committed by Avi Kivity
parent 4c4e90a948
commit 8b9a63a3c6
2 changed files with 27 additions and 1 deletions

View File

@@ -415,6 +415,8 @@ column_family::seal_active_memtable(database* db) {
return;
}
add_memtable();
assert(_highest_flushed_rp < old->replay_position());
_highest_flushed_rp = old->replay_position();
// FIXME: better way of ensuring we don't attemt to
// overwrite an existing table.
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
@@ -422,6 +424,7 @@ column_family::seal_active_memtable(database* db) {
_config.datadir,
_schema->ks_name(), _schema->cf_name(),
gen);
// FIXME: this does not clear CL. Should it?
if (!_config.enable_disk_writes) {
return;
}
@@ -978,7 +981,17 @@ future<> database::apply(const frozen_mutation& m) {
bytes_view repr = m.representation();
auto write_repr = [repr] (data_output& out) { out.write(repr.begin(), repr.end()); };
return _commitlog->add_mutation(uuid, repr.size(), write_repr).then([&m, this](auto rp) {
return this->apply_in_memory(m, rp);
try {
return this->apply_in_memory(m, rp);
} catch (replay_position_reordered_exception&) {
// expensive, but we're assuming this is super rare.
// if we failed to apply the mutation due to future re-ordering
// (which should be the ever only reason for rp mismatch in CF)
// let's just try again, add the mutation to the CL once more,
// and assume success in inevitable eventually.
dblog.warn("replay_position reordering detected");
return this->apply(m);
}
});
}
return apply_in_memory(m, db::replay_position());

View File

@@ -58,6 +58,8 @@ class commitlog;
class config;
}
class replay_position_reordered_exception : public std::exception {};
class column_family {
public:
struct config {
@@ -75,6 +77,7 @@ private:
lw_shared_ptr<sstable_list> _sstables;
unsigned _sstable_generation = 1;
unsigned _mutation_count = 0;
db::replay_position _highest_flushed_rp;
private:
void add_sstable(sstables::sstable&& sstable);
void add_memtable();
@@ -111,6 +114,7 @@ private:
future<bool> for_all_partitions(Func&& func) const;
future<> probe_file(sstring sstdir, sstring fname);
void seal_on_overflow(database*);
void check_valid_rp(const db::replay_position&) const;
public:
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
@@ -334,9 +338,18 @@ column_family::seal_on_overflow(database* db) {
}
}
inline
void
column_family::check_valid_rp(const db::replay_position& rp) const {
if (rp < _highest_flushed_rp) {
throw replay_position_reordered_exception();
}
}
inline
void
column_family::apply(const frozen_mutation& m, const db::replay_position& rp, database* db) {
check_valid_rp(rp);
active_memtable().apply(m, rp);
seal_on_overflow(db);
}