From ef2cc9b05d09daa84e90cf5b267bfdbe3bfebfae Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 24 Jun 2015 11:27:37 +0200 Subject: [PATCH] BatchLogManager.java -> C++ Somewhat simplified version of the Origin code, since from what I can see, there is less need for us to do explicit query sends in the BLM itself; instead we can just go through storage_proxy. I could be wrong though. --- configure.py | 1 + db/BatchlogManager.java | 539 ---------------------------------------- db/batchlog_manager.cc | 240 ++++++++++++++++++ db/batchlog_manager.hh | 86 +++++++ 4 files changed, 327 insertions(+), 539 deletions(-) delete mode 100644 db/BatchlogManager.java create mode 100644 db/batchlog_manager.cc create mode 100644 db/batchlog_manager.hh diff --git a/configure.py b/configure.py index 98f5b35b6f..0148b71aa4 100755 --- a/configure.py +++ b/configure.py @@ -465,6 +465,7 @@ urchin_core = (['database.cc', 'db/config.cc', 'db/index/secondary_index.cc', 'db/marshal/type_parser.cc', + 'db/batchlog_manager.cc', 'io/io.cc', 'utils/utils.cc', 'utils/UUID_gen.cc', diff --git a/db/BatchlogManager.java b/db/BatchlogManager.java deleted file mode 100644 index dd84ac88b9..0000000000 --- a/db/BatchlogManager.java +++ /dev/null @@ -1,539 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.DataInputStream; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; -import com.google.common.util.concurrent.RateLimiter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.marshal.UUIDType; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.WriteFailureException; -import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.WriteResponseHandler; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.WrappedRunnable; -import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; - -public class BatchlogManager implements BatchlogManagerMBean -{ - private 
static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager"; - private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds - private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size. - - private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class); - public static final BatchlogManager instance = new BatchlogManager(); - - private final AtomicLong totalBatchesReplayed = new AtomicLong(); - - // Single-thread executor service for scheduling and serializing log replay. - private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); - - public void start() - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws ExecutionException, InterruptedException - { - replayAllFailedBatches(); - } - }; - - batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS); - } - - public static void shutdown() throws InterruptedException - { - batchlogTasks.shutdown(); - batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); - } - - public int countAllBatches() - { - String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG); - return (int) executeInternal(query).one().getLong("count"); - } - - public long getTotalBatchesReplayed() - { - return totalBatchesReplayed.longValue(); - } - - public void forceBatchlogReplay() - { - startBatchlogReplay(); - } - - public Future startBatchlogReplay() - { - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws ExecutionException, InterruptedException - { - replayAllFailedBatches(); - } - }; - // If a replay is 
already in progress this request will be executed after it completes. - return batchlogTasks.submit(runnable); - } - - public static Mutation getBatchlogMutationFor(Collection mutations, UUID uuid, int version) - { - return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros()); - } - - @VisibleForTesting - static Mutation getBatchlogMutationFor(Collection mutations, UUID uuid, int version, long now) - { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog); - CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now); - adder.add("data", serializeMutations(mutations, version)) - .add("written_at", new Date(now / 1000)) - .add("version", version); - return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid), cf); - } - - private static ByteBuffer serializeMutations(Collection mutations, int version) - { - try (DataOutputBuffer buf = new DataOutputBuffer()) - { - buf.writeInt(mutations.size()); - for (Mutation mutation : mutations) - Mutation.serializer.serialize(mutation, buf, version); - return buf.buffer(); - } - catch (IOException e) - { - throw new AssertionError(); // cannot happen. - } - } - - private void replayAllFailedBatches() throws ExecutionException, InterruptedException - { - logger.debug("Started replayAllFailedBatches"); - - // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). - // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). - int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); - RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024); - - UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", - SystemKeyspace.NAME, - SystemKeyspace.BATCHLOG, - PAGE_SIZE)); - - while (!page.isEmpty()) - { - UUID id = processBatchlogPage(page, rateLimiter); - - if (page.size() < PAGE_SIZE) - break; // we've exhausted the batchlog, next query would be empty. - - page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", - SystemKeyspace.NAME, - SystemKeyspace.BATCHLOG, - PAGE_SIZE), - id); - } - - cleanup(); - - logger.debug("Finished replayAllFailedBatches"); - } - - private void deleteBatch(UUID id) - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id)); - mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); - mutation.apply(); - } - - private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter) - { - UUID id = null; - ArrayList batches = new ArrayList<>(page.size()); - - // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others - for (UntypedResultSet.Row row : page) - { - id = row.getUUID("id"); - long writtenAt = row.getLong("written_at"); - // enough time for the actual write + batchlog entry mutation delivery (two separate requests). - long timeout = getBatchlogTimeout(); - if (System.currentTimeMillis() < writtenAt + timeout) - continue; // not ready to replay yet, might still get a deletion. - - int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; - Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version); - try - { - if (batch.replay(rateLimiter) > 0) - { - batches.add(batch); - } - else - { - deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated). 
- totalBatchesReplayed.incrementAndGet(); - } - } - catch (IOException e) - { - logger.warn("Skipped batch replay of {} due to {}", id, e); - deleteBatch(id); - } - } - - // now waiting for all batches to complete their processing - // schedule hints for timed out deliveries - for (Batch batch : batches) - { - batch.finish(); - deleteBatch(batch.id); - } - - totalBatchesReplayed.addAndGet(batches.size()); - - return id; - } - - public long getBatchlogTimeout() - { - return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation - } - - private static class Batch - { - private final UUID id; - private final long writtenAt; - private final ByteBuffer data; - private final int version; - - private List> replayHandlers; - - public Batch(UUID id, long writtenAt, ByteBuffer data, int version) - { - this.id = id; - this.writtenAt = writtenAt; - this.data = data; - this.version = version; - } - - public int replay(RateLimiter rateLimiter) throws IOException - { - logger.debug("Replaying batch {}", id); - - List mutations = replayingMutations(); - - if (mutations.isEmpty()) - return 0; - - int ttl = calculateHintTTL(mutations); - if (ttl <= 0) - return 0; - - replayHandlers = sendReplays(mutations, writtenAt, ttl); - - rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation. 
- - return replayHandlers.size(); - } - - public void finish() - { - for (int i = 0; i < replayHandlers.size(); i++) - { - ReplayWriteResponseHandler handler = replayHandlers.get(i); - try - { - handler.get(); - } - catch (WriteTimeoutException|WriteFailureException e) - { - logger.debug("Failed replaying a batched mutation to a node, will write a hint"); - logger.debug("Failure was : {}", e.getMessage()); - // writing hints for the rest to hints, starting from i - writeHintsForUndeliveredEndpoints(i); - return; - } - } - } - - private List replayingMutations() throws IOException - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); - int size = in.readInt(); - List mutations = new ArrayList<>(size); - for (int i = 0; i < size; i++) - { - Mutation mutation = Mutation.serializer.deserialize(in, version); - - // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis. - // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then - // truncated. - for (UUID cfId : mutation.getColumnFamilyIds()) - if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) - mutation = mutation.without(cfId); - - if (!mutation.isEmpty()) - mutations.add(mutation); - } - return mutations; - } - - private void writeHintsForUndeliveredEndpoints(int startFrom) - { - try - { - // Here we deserialize mutations 2nd time from byte buffer. 
- // but this is ok, because timeout on batch direct delivery is rare - // (it can happen only several seconds until node is marked dead) - // so trading some cpu to keep less objects - List replayingMutations = replayingMutations(); - for (int i = startFrom; i < replayHandlers.size(); i++) - { - Mutation undeliveredMutation = replayingMutations.get(i); - int ttl = calculateHintTTL(replayingMutations); - ReplayWriteResponseHandler handler = replayHandlers.get(i); - - if (ttl > 0 && handler != null) - for (InetAddress endpoint : handler.undelivered) - StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint); - } - } - catch (IOException e) - { - logger.error("Cannot schedule hints for undelivered batch", e); - } - } - - private List> sendReplays(List mutations, long writtenAt, int ttl) - { - List> handlers = new ArrayList<>(mutations.size()); - for (Mutation mutation : mutations) - { - ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl); - if (handler != null) - handlers.add(handler); - } - return handlers; - } - - /** - * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints - * when a replica is down or a write request times out. 
- * - * @return direct delivery handler to wait on or null, if no live nodes found - */ - private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl) - { - Set liveEndpoints = new HashSet<>(); - String ks = mutation.getKeyspaceName(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - - for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), - StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) - { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - mutation.apply(); - else if (FailureDetector.instance.isAlive(endpoint)) - liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. - else - StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint); - } - - if (liveEndpoints.isEmpty()) - return null; - - ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler<>(liveEndpoints); - MessageOut message = mutation.createMessage(); - for (InetAddress endpoint : liveEndpoints) - MessagingService.instance().sendRR(message, endpoint, handler, false); - return handler; - } - - /* - * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog). - * This ensures that deletes aren't "undone" by an old batch replay. - */ - private int calculateHintTTL(Collection mutations) - { - int unadjustedTTL = Integer.MAX_VALUE; - for (Mutation mutation : mutations) - unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); - return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt); - } - - /** - * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from - * which we did not receive a successful reply. 
- */ - private static class ReplayWriteResponseHandler extends WriteResponseHandler - { - private final Set undelivered = Collections.newSetFromMap(new ConcurrentHashMap()); - - public ReplayWriteResponseHandler(Collection writeEndpoints) - { - super(writeEndpoints, Collections.emptySet(), null, null, null, WriteType.UNLOGGED_BATCH); - undelivered.addAll(writeEndpoints); - } - - @Override - protected int totalBlockFor() - { - return this.naturalEndpoints.size(); - } - - @Override - public void response(MessageIn m) - { - boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from); - assert removed; - super.response(m); - } - } - } - - // force flush + compaction to reclaim space from the replayed batches - private void cleanup() throws ExecutionException, InterruptedException - { - ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); - cfs.forceBlockingFlush(); - Collection descriptors = new ArrayList<>(); - for (SSTableReader sstr : cfs.getSSTables()) - descriptors.add(sstr.descriptor); - if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. - CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); - } - - public static class EndpointFilter - { - private final String localRack; - private final Multimap endpoints; - - public EndpointFilter(String localRack, Multimap endpoints) - { - this.localRack = localRack; - this.endpoints = endpoints; - } - - /** - * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. 
- */ - public Collection filter() - { - // special case for single-node data centers - if (endpoints.values().size() == 1) - return endpoints.values(); - - // strip out dead endpoints and localhost - ListMultimap validated = ArrayListMultimap.create(); - for (Map.Entry entry : endpoints.entries()) - if (isValid(entry.getValue())) - validated.put(entry.getKey(), entry.getValue()); - - if (validated.size() <= 2) - return validated.values(); - - if (validated.size() - validated.get(localRack).size() >= 2) - { - // we have enough endpoints in other racks - validated.removeAll(localRack); - } - - if (validated.keySet().size() == 1) - { - // we have only 1 `other` rack - Collection otherRack = Iterables.getOnlyElement(validated.asMap().values()); - return Lists.newArrayList(Iterables.limit(otherRack, 2)); - } - - // randomize which racks we pick from if more than 2 remaining - Collection racks; - if (validated.keySet().size() == 2) - { - racks = validated.keySet(); - } - else - { - racks = Lists.newArrayList(validated.keySet()); - Collections.shuffle((List) racks); - } - - // grab a random member of up to two racks - List result = new ArrayList<>(2); - for (String rack : Iterables.limit(racks, 2)) - { - List rackMembers = validated.get(rack); - result.add(rackMembers.get(getRandomInt(rackMembers.size()))); - } - - return result; - } - - @VisibleForTesting - protected boolean isValid(InetAddress input) - { - return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); - } - - @VisibleForTesting - protected int getRandomInt(int bound) - { - return ThreadLocalRandom.current().nextInt(bound); - } - } -} diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc new file mode 100644 index 0000000000..d3e03a9456 --- /dev/null +++ b/db/batchlog_manager.cc @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#include +#include "batchlog_manager.hh" +#include "service/storage_service.hh" +#include "service/storage_proxy.hh" +#include "system_keyspace.hh" +#include "utils/rate_limiter.hh" +#include "core/future-util.hh" +#include "core/do_with.hh" +#include "log.hh" +#include "serializer.hh" +#include "db_clock.hh" +#include "database.hh" +#include "unimplemented.hh" + +static thread_local logging::logger logger("BatchLog Manager"); + +const uint32_t db::batchlog_manager::replay_interval; +const uint32_t db::batchlog_manager::page_size; + +db::batchlog_manager::batchlog_manager(cql3::query_processor& qp) + : _qp(qp) +{} + +future<> db::batchlog_manager::start() { + _timer.set_callback( + std::bind(&batchlog_manager::replay_all_failed_batches, this)); + _timer.arm( + lowres_clock::now() + + std::chrono::milliseconds( + service::storage_service::RING_DELAY), + std::experimental::optional { + std::chrono::milliseconds(replay_interval) }); + return make_ready_future<>(); +} + +future<> db::batchlog_manager::stop() { + _stop = true; + _timer.cancel(); + return _sem.wait(std::chrono::milliseconds(60)); +} + +future db::batchlog_manager::count_all_batches() const { + sstring query = sprint("SELECT 
count(*) FROM %s.%s", system_keyspace::NAME, system_keyspace::BATCHLOG); + return _qp.execute_internal(query).then([](::shared_ptr rs) { + return size_t(rs->one().get_as("count")); + }); +} + +mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector mutations, const utils::UUID& id, int32_t version) { + return get_batch_log_mutation_for(std::move(mutations), id, version, db_clock::now()); +} + +mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector mutations, const utils::UUID& id, int32_t version, db_clock::time_point now) { + auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); + auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)}); + auto timestamp = db_clock::now_in_usecs(); + auto data = [this, &mutations] { + std::vector fm(mutations.begin(), mutations.end()); + const auto size = std::accumulate(fm.begin(), fm.end(), size_t(0), [](size_t s, auto& m) { + return s + serializer{m}.size(); + }); + bytes buf(bytes::initialized_later(), size); + data_output out(buf); + for (auto& m : fm) { + serializer{m}(out); + } + return buf; + }(); + + mutation m(key, schema); + m.set_cell({}, to_bytes("version"), version, timestamp); + m.set_cell({}, to_bytes("written_at"), now, timestamp); + m.set_cell({}, to_bytes("data"), std::move(data), timestamp); + + return m; +} + +db_clock::duration db::batchlog_manager::get_batch_log_timeout() const { + // enough time for the actual write + BM removal mutation + return db_clock::duration(_qp.db().local().get_config().write_request_timeout_in_ms()) * 2; +} + +future<> db::batchlog_manager::replay_all_failed_batches() { + typedef db_clock::rep clock_type; + + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). 
+ auto throttle_in_kb = _qp.db().local().get_config().batchlog_replay_throttle_in_kb() / service::get_storage_service().local().get_token_metadata().get_all_endpoints().size(); + auto limiter = make_lw_shared(throttle_in_kb * 1000); + + auto batch = [this, limiter](const cql3::untyped_result_set::row& row) { + auto written_at = row.get_as("written_at"); + // enough time for the actual write + batchlog entry mutation delivery (two separate requests). + // (timeout is 2x the write request timeout, see get_batch_log_timeout()) + auto timeout = get_batch_log_timeout(); + if (db_clock::now() < written_at + timeout) { + return make_ready_future<>(); + } + // not used currently. ever? + //auto version = row.has("version") ? row.get_as("version") : /*MessagingService.VERSION_12*/6u; + auto id = row.get_as("id"); + auto data = row.get_blob("data"); + + logger.debug("Replaying batch {}", id); + + auto fms = make_lw_shared>(); + data_input in(data); + while (in.has_next()) { + fms->emplace_back(serializer::read(in)); + } + + auto mutations = make_lw_shared>(); + auto size = data.size(); + + return repeat([this, fms = std::move(fms), written_at, mutations]() mutable { + if (fms->empty()) { + return make_ready_future(stop_iteration::yes); + } + auto& fm = fms->front(); + auto mid = fm.column_family_id(); + return system_keyspace::get_truncated_at(_qp, mid).then([this, &fm, written_at, mutations](db_clock::time_point t) { + auto schema = _qp.db().local().find_schema(fm.column_family_id()); + if (written_at > t) { + auto schema = _qp.db().local().find_schema(fm.column_family_id()); + mutations->emplace_back(fm.unfreeze(schema)); + } + }).then([fms] { + fms->pop_front(); + return make_ready_future(stop_iteration::no); + }); + }).then([this, id, mutations, limiter, written_at, size] { + if (mutations->empty()) { + return make_ready_future<>(); + } + const auto ttl = [this, mutations, written_at]() -> clock_type { + /* + * Calculate ttl for the mutations' hints
(and reduce ttl by the time the mutations spent in the batchlog). + * This ensures that deletes aren't "undone" by an old batch replay. + */ + auto unadjusted_ttl = std::numeric_limits::max(); + warn(unimplemented::cause::HINT); +#if 0 + for (auto& m : *mutations) { + unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); + } +#endif + return unadjusted_ttl - std::chrono::duration_cast(db_clock::now() - written_at).count(); + }(); + + if (ttl <= 0) { + return make_ready_future<>(); + } + // Origin does the send manually, however I can't see a super great reason to do so. + // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send + // in both cases. + // FIXME: verify that the above is reasonably true. + return limiter->reserve(size).then([this, mutations, id] { + return _qp.proxy().local().mutate(*mutations, db::consistency_level::ANY); + }); + }).then([this, id] { + // delete batch + auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); + auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)}); + mutation m(key, schema); + auto now = service::client_state(service::client_state::internal_tag()).get_timestamp(); + m.partition().apply_delete(*schema, {}, tombstone(now, gc_clock::now())); + return _qp.proxy().local().mutate_locally(m); + }); + }; + + return _sem.wait().then([this, batch = std::move(batch)] { + logger.debug("Started replayAllFailedBatches"); + + typedef ::shared_ptr page_ptr; + sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size); + return _qp.execute_internal(query).then([this, batch = std::move(batch)](page_ptr page) { + return do_with(std::move(page), [this, batch = std::move(batch)](page_ptr & page) mutable { + return repeat([this, &page, batch = std::move(batch)]() mutable { + if (page->empty()) { + return 
make_ready_future(stop_iteration::yes); + } + auto id = page->back().get_as("id"); + return parallel_for_each(*page, batch).then([this, &page, id]() { + if (page->size() < page_size) { + return make_ready_future(stop_iteration::yes); // we've exhausted the batchlog, next query would be empty. + } + sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", + system_keyspace::NAME, + system_keyspace::BATCHLOG, + page_size); + return _qp.execute_internal(query, {id}).then([&page](auto res) { + page = std::move(res); + return make_ready_future(stop_iteration::no); + }); + }); + }); + }); + }).then([this] { + // TODO FIXME : cleanup() +#if 0 + ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); + cfs.forceBlockingFlush(); + Collection descriptors = new ArrayList<>(); + for (SSTableReader sstr : cfs.getSSTables()) + descriptors.add(sstr.descriptor); + if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. + CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); + +#endif + + }).then([this] { + logger.debug("Finished replayAllFailedBatches"); + }); + }).finally([this] { + _sem.signal(); + }); +} diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh new file mode 100644 index 0000000000..62174bf0d8 --- /dev/null +++ b/db/batchlog_manager.hh @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include +#include "core/future.hh" +#include "core/distributed.hh" +#include "core/timer.hh" +#include "cql3/query_processor.hh" +#include "gms/inet_address.hh" +#include "db_clock.hh" + +namespace db { + +class batchlog_manager { +private: + static constexpr uint32_t replay_interval = 60 * 1000; // milliseconds + static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size. + + using clock_type = lowres_clock; + + size_t _total_batches_replayed = 0; + cql3::query_processor& _qp; + timer _timer; + semaphore _sem; + bool _stop = false; + + future<> replay_all_failed_batches(); +public: + // Takes a QP, not a distributed one. Because this object is supposed + // to be per shard and does no dispatching beyond delegating to the + // shard qp (which is what you feed here). + batchlog_manager(cql3::query_processor&); + + future<> start(); + future<> stop(); + + // for testing.
+ future<> do_batch_log_replay() { + return replay_all_failed_batches(); + } + future count_all_batches() const; + size_t get_total_batches_replayed() const { + return _total_batches_replayed; + } + mutation get_batch_log_mutation_for(std::vector, const utils::UUID&, int32_t); + mutation get_batch_log_mutation_for(std::vector, const utils::UUID&, int32_t, db_clock::time_point); + db_clock::duration get_batch_log_timeout() const; + + class endpoint_filter { + private: + const sstring _local_rack; + const std::unordered_map> _endpoints; + + public: + endpoint_filter(sstring, std::unordered_map>); + /** + * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. + */ + std::vector filter() const; + }; +}; + +}