batchlog_manager: Store canonical_mutations

We need to be able to replay mutations created using older versions of
the table's schema. frozen_mutation can be only read using the version
it was serialized with, and there is no guarantee that the node will
know this version at the time of replay. Currently versions kept
in-memory so a node forgets all past versions when it restarts.

To solve this, let's store canonical_mutations which, like data in
sstables, can be read using any later schema version of given table.
This commit is contained in:
Tomasz Grabiec
2016-01-19 13:39:00 +01:00
parent e21049328f
commit ec12b75426

View File

@@ -45,6 +45,7 @@
#include <boost/range/adaptor/sliced.hpp>
#include "batchlog_manager.hh"
#include "canonical_mutation.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "system_keyspace.hh"
@@ -117,14 +118,14 @@ mutation db::batchlog_manager::get_batch_log_mutation_for(const std::vector<muta
auto key = partition_key::from_singular(*schema, id);
auto timestamp = api::new_timestamp();
auto data = [this, &mutations] {
std::vector<frozen_mutation> fm(mutations.begin(), mutations.end());
std::vector<canonical_mutation> fm(mutations.begin(), mutations.end());
const auto size = std::accumulate(fm.begin(), fm.end(), size_t(0), [](size_t s, auto& m) {
return s + serializer<frozen_mutation>{m}.size();
return s + serializer<canonical_mutation>{m}.size();
});
bytes buf(bytes::initialized_later(), size);
data_output out(buf);
for (auto& m : fm) {
serializer<frozen_mutation>{m}(out);
serializer<canonical_mutation>{m}(out);
}
return buf;
}();
@@ -166,10 +167,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
logger.debug("Replaying batch {}", id);
auto fms = make_lw_shared<std::deque<frozen_mutation>>();
auto fms = make_lw_shared<std::deque<canonical_mutation>>();
data_input in(data);
while (in.has_next()) {
fms->emplace_back(serializer<frozen_mutation>::read(in));
fms->emplace_back(serializer<canonical_mutation>::read(in));
}
auto mutations = make_lw_shared<std::vector<mutation>>();
@@ -181,11 +182,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
}
auto& fm = fms->front();
auto mid = fm.column_family_id();
return system_keyspace::get_truncated_at(mid).then([this, &fm, written_at, mutations](db_clock::time_point t) {
warn(unimplemented::cause::SCHEMA_CHANGE);
auto schema = local_schema_registry().get(fm.schema_version());
return system_keyspace::get_truncated_at(mid).then([this, mid, &fm, written_at, mutations](db_clock::time_point t) {
schema_ptr s = _qp.db().local().find_schema(mid);
if (written_at > t) {
mutations->emplace_back(fm.unfreeze(schema));
mutations->emplace_back(fm.to_mutation(s));
}
}).then([fms] {
fms->pop_front();