/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modified by Cloudius Systems * Copyright 2015 Cloudius Systems */ #include #include #include #include #include #include #include #include "commitlog.hh" #include "commitlog_replayer.hh" #include "database.hh" #include "sstables/sstables.hh" #include "db/system_keyspace.hh" #include "db/serializer.hh" #include "cql3/query_processor.hh" #include "log.hh" static logging::logger logger("commitlog_replayer"); class db::commitlog_replayer::impl { public: impl(seastar::sharded& db); future<> init(); struct stats { uint64_t invalid_mutations = 0; uint64_t skipped_mutations = 0; uint64_t applied_mutations = 0; }; future<> process(stats*, temporary_buffer buf, replay_position rp); future recover(sstring file); typedef std::unordered_map rp_map; typedef std::unordered_map shard_rpm_map; typedef std::unordered_map shard_rp_map; seastar::sharded& _qp; shard_rpm_map _rpm; shard_rp_map _min_pos; }; db::commitlog_replayer::impl::impl(seastar::sharded& qp) : _qp(qp) {} future<> db::commitlog_replayer::impl::init() { return _qp.map_reduce([this](shard_rpm_map map) { for (auto& p1 : map) { for (auto& p2 : p1.second) { auto& pp = _rpm[p1.first][p2.first]; pp = std::max(pp, p2.second); auto& min = _min_pos[p1.first]; min = (min == replay_position()) ? p2.second : std::min(p2.second, min); } } }, [this](cql3::query_processor& qp) { return do_with(shard_rpm_map{}, [this, &qp](shard_rpm_map& map) { return parallel_for_each(qp.db().local().get_column_families(), [&map, &qp](auto& cfp) { auto uuid = cfp.first; for (auto& sst : *cfp.second->get_sstables() | boost::adaptors::map_values) { try { auto p = sst->get_stats_metadata().position; logger.trace("sstable {} -> rp {}", sst->get_filename(), p); if (p != replay_position()) { auto& pp = map[p.shard_id()][uuid]; pp = std::max(pp, p); } } catch (...) { logger.warn("Could not read sstable metadata {}", std::current_exception()); } } // TODO: this is not correct. Truncation does not fully take sharding into consideration return db::system_keyspace::get_truncated_position(qp, uuid).then([&map, uuid](auto truncated_rp) { if (truncated_rp != replay_position()) { auto& pp = map[engine().cpu_id()][uuid]; pp = std::max(pp, truncated_rp); } }); }).then([&map] { return make_ready_future(map); }); }); }).finally([this] { for (auto&p : _min_pos) { logger.debug("minimum position for shard {}: {}", p.first, p.second); } for (auto&p1 : _rpm) { for (auto& p2 : p1.second) { logger.debug("replay position for shard/uuid {}/{}: {}", p1.first, p2.first, p2.second); } } }); } future db::commitlog_replayer::impl::recover(sstring file) { logger.info("Replaying {}", file); replay_position rp{commitlog::descriptor(file)}; auto gp = _min_pos[rp.shard_id()]; if (rp.id < gp.id) { logger.debug("skipping replay of fully-flushed {}", file); return make_ready_future(); } position_type p = 0; if (rp.id == gp.id) { p = gp.pos; } auto s = make_lw_shared(); return db::commitlog::read_log_file(file, std::bind(&impl::process, this, s.get(), std::placeholders::_1, std::placeholders::_2), p).then([](auto s) { auto f = s.done(); return f.finally([s = std::move(s)] {}); }).then([s] { return make_ready_future(*s); }); } future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer buf, replay_position rp) { auto shard = rp.shard_id(); if (rp < _min_pos[shard]) { logger.trace("entry {} is less than global min position. skipping", rp); s->skipped_mutations++; return make_ready_future<>(); } try { frozen_mutation fm(bytes(reinterpret_cast(buf.get()), buf.size())); auto uuid = fm.column_family_id(); auto& map = _rpm[shard]; auto i = map.find(uuid); if (i != map.end() && rp < i->second) { logger.trace("entry {} at {} is less than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second); s->skipped_mutations++; return make_ready_future<>(); } auto shard = _qp.local().db().local().shard_of(fm); return _qp.local().db().invoke_on(shard, [fm = std::move(fm), rp, shard, s] (database& db) -> future<> { // TODO: might need better verification that the deserialized mutation // is schema compatible. My guess is that just applying the mutation // will not do this. auto& cf = db.find_column_family(fm.column_family_id()); if (logger.is_enabled(logging::log_level::debug)) { logger.debug("replaying at {} {}:{} at {}", fm.column_family_id(), cf.schema()->ks_name(), cf.schema()->cf_name(), rp); } // Removed forwarding "new" RP. Instead give none/empty. // This is what origin does, and it should be fine. // The end result should be that once sstables are flushed out // their "replay_position" attribute will be empty, which is // lower than anything the new session will produce. cf.apply(fm); s->applied_mutations++; return make_ready_future<>(); }).handle_exception([s](auto ep) { s->invalid_mutations++; // TODO: write mutation to file like origin. logger.warn("error replaying: {}", ep); }); } catch (no_such_column_family&) { // No such CF now? Origin just ignores this. } catch (...) { s->invalid_mutations++; // TODO: write mutation to file like origin. logger.warn("error replaying: {}", std::current_exception()); } return make_ready_future<>(); } db::commitlog_replayer::commitlog_replayer(seastar::sharded& qp) : _impl(std::make_unique(qp)) {} db::commitlog_replayer::commitlog_replayer(commitlog_replayer&& r) : _impl(std::move(r._impl)) {} db::commitlog_replayer::~commitlog_replayer() {} future db::commitlog_replayer::create_replayer(seastar::sharded& qp) { return do_with(commitlog_replayer(qp), [](auto&& rp) { auto f = rp._impl->init(); return f.then([rp = std::move(rp)]() mutable { return make_ready_future(std::move(rp)); }); }); } future<> db::commitlog_replayer::recover(std::vector files) { logger.info("Replaying {}", files); return parallel_for_each(files, [this](auto f) { return this->recover(std::move(f)); }); } future<> db::commitlog_replayer::recover(sstring file) { return _impl->recover(file).then([file](impl::stats stats) { logger.info("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)" , file , stats.applied_mutations , stats.invalid_mutations , stats.skipped_mutations ); }); }