diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 558385d1ba..5f3d815ce3 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -45,6 +45,7 @@ #include #include "batchlog_manager.hh" +#include "canonical_mutation.hh" #include "service/storage_service.hh" #include "service/storage_proxy.hh" #include "system_keyspace.hh" @@ -117,14 +118,14 @@ mutation db::batchlog_manager::get_batch_log_mutation_for(const std::vector fm(mutations.begin(), mutations.end()); + std::vector fm(mutations.begin(), mutations.end()); const auto size = std::accumulate(fm.begin(), fm.end(), size_t(0), [](size_t s, auto& m) { - return s + serializer{m}.size(); + return s + serializer{m}.size(); }); bytes buf(bytes::initialized_later(), size); data_output out(buf); for (auto& m : fm) { - serializer{m}(out); + serializer{m}(out); } return buf; }(); @@ -166,10 +167,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() { logger.debug("Replaying batch {}", id); - auto fms = make_lw_shared>(); + auto fms = make_lw_shared>(); data_input in(data); while (in.has_next()) { - fms->emplace_back(serializer::read(in)); + fms->emplace_back(serializer::read(in)); } auto mutations = make_lw_shared>(); @@ -181,11 +182,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() { } auto& fm = fms->front(); auto mid = fm.column_family_id(); - return system_keyspace::get_truncated_at(mid).then([this, &fm, written_at, mutations](db_clock::time_point t) { - warn(unimplemented::cause::SCHEMA_CHANGE); - auto schema = local_schema_registry().get(fm.schema_version()); + return system_keyspace::get_truncated_at(mid).then([this, mid, &fm, written_at, mutations](db_clock::time_point t) { + schema_ptr s = _qp.db().local().find_schema(mid); if (written_at > t) { - mutations->emplace_back(fm.unfreeze(schema)); + mutations->emplace_back(fm.to_mutation(s)); } }).then([fms] { fms->pop_front();