From ec12b75426daae3d2f7269c430ed43da37a8941b Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 19 Jan 2016 13:39:00 +0100 Subject: [PATCH] batchlog_manager: Store canonical_mutations We need to be able to replay mutations created using older versions of the table's schema. frozen_mutation can be only read using the version it was serialized with, and there is no guarantee that the node will know this version at the time of replay. Currently versions kept in-memory so a node forgets all past versions when it restarts. To solve this, let's store canonical_mutations which, like data in sstables, can be read using any later schema version of given table. --- db/batchlog_manager.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 558385d1ba..5f3d815ce3 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -45,6 +45,7 @@ #include #include "batchlog_manager.hh" +#include "canonical_mutation.hh" #include "service/storage_service.hh" #include "service/storage_proxy.hh" #include "system_keyspace.hh" @@ -117,14 +118,14 @@ mutation db::batchlog_manager::get_batch_log_mutation_for(const std::vector fm(mutations.begin(), mutations.end()); + std::vector fm(mutations.begin(), mutations.end()); const auto size = std::accumulate(fm.begin(), fm.end(), size_t(0), [](size_t s, auto& m) { - return s + serializer{m}.size(); + return s + serializer{m}.size(); }); bytes buf(bytes::initialized_later(), size); data_output out(buf); for (auto& m : fm) { - serializer{m}(out); + serializer{m}(out); } return buf; }(); @@ -166,10 +167,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() { logger.debug("Replaying batch {}", id); - auto fms = make_lw_shared>(); + auto fms = make_lw_shared>(); data_input in(data); while (in.has_next()) { - fms->emplace_back(serializer::read(in)); + fms->emplace_back(serializer::read(in)); } auto mutations = make_lw_shared>(); @@ -181,11 +182,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() { } auto& fm = fms->front(); auto mid = fm.column_family_id(); - return system_keyspace::get_truncated_at(mid).then([this, &fm, written_at, mutations](db_clock::time_point t) { - warn(unimplemented::cause::SCHEMA_CHANGE); - auto schema = local_schema_registry().get(fm.schema_version()); + return system_keyspace::get_truncated_at(mid).then([this, mid, &fm, written_at, mutations](db_clock::time_point t) { + schema_ptr s = _qp.db().local().find_schema(mid); if (written_at > t) { - mutations->emplace_back(fm.unfreeze(schema)); + mutations->emplace_back(fm.to_mutation(s)); } }).then([fms] { fms->pop_front();