BatchLogManager.java -> C++
Somewhat simplified version of the Origin code: from what I can see, there is less need for us to send queries explicitly in the BLM itself; instead we can just go through storage_proxy. I could be wrong though.
This commit is contained in:
@@ -465,6 +465,7 @@ urchin_core = (['database.cc',
|
||||
'db/config.cc',
|
||||
'db/index/secondary_index.cc',
|
||||
'db/marshal/type_parser.cc',
|
||||
'db/batchlog_manager.cc',
|
||||
'io/io.cc',
|
||||
'utils/utils.cc',
|
||||
'utils/UUID_gen.cc',
|
||||
|
||||
@@ -1,539 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.cassandra.db;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.*;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
|
||||
import org.apache.cassandra.config.DatabaseDescriptor;
|
||||
import org.apache.cassandra.cql3.UntypedResultSet;
|
||||
import org.apache.cassandra.db.compaction.CompactionManager;
|
||||
import org.apache.cassandra.db.marshal.UUIDType;
|
||||
import org.apache.cassandra.dht.Token;
|
||||
import org.apache.cassandra.exceptions.WriteFailureException;
|
||||
import org.apache.cassandra.exceptions.WriteTimeoutException;
|
||||
import org.apache.cassandra.gms.FailureDetector;
|
||||
import org.apache.cassandra.io.sstable.Descriptor;
|
||||
import org.apache.cassandra.io.sstable.format.SSTableReader;
|
||||
import org.apache.cassandra.io.util.DataOutputBuffer;
|
||||
import org.apache.cassandra.net.MessageIn;
|
||||
import org.apache.cassandra.net.MessageOut;
|
||||
import org.apache.cassandra.net.MessagingService;
|
||||
import org.apache.cassandra.service.StorageProxy;
|
||||
import org.apache.cassandra.service.StorageService;
|
||||
import org.apache.cassandra.service.WriteResponseHandler;
|
||||
import org.apache.cassandra.utils.ByteBufferUtil;
|
||||
import org.apache.cassandra.utils.FBUtilities;
|
||||
import org.apache.cassandra.utils.WrappedRunnable;
|
||||
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
|
||||
|
||||
public class BatchlogManager implements BatchlogManagerMBean
|
||||
{
|
||||
private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
|
||||
private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
|
||||
private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
|
||||
public static final BatchlogManager instance = new BatchlogManager();
|
||||
|
||||
private final AtomicLong totalBatchesReplayed = new AtomicLong();
|
||||
|
||||
// Single-thread executor service for scheduling and serializing log replay.
|
||||
private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
|
||||
|
||||
public void start()
|
||||
{
|
||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
try
|
||||
{
|
||||
mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
Runnable runnable = new WrappedRunnable()
|
||||
{
|
||||
public void runMayThrow() throws ExecutionException, InterruptedException
|
||||
{
|
||||
replayAllFailedBatches();
|
||||
}
|
||||
};
|
||||
|
||||
batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public static void shutdown() throws InterruptedException
|
||||
{
|
||||
batchlogTasks.shutdown();
|
||||
batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public int countAllBatches()
|
||||
{
|
||||
String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG);
|
||||
return (int) executeInternal(query).one().getLong("count");
|
||||
}
|
||||
|
||||
public long getTotalBatchesReplayed()
|
||||
{
|
||||
return totalBatchesReplayed.longValue();
|
||||
}
|
||||
|
||||
public void forceBatchlogReplay()
|
||||
{
|
||||
startBatchlogReplay();
|
||||
}
|
||||
|
||||
public Future<?> startBatchlogReplay()
|
||||
{
|
||||
Runnable runnable = new WrappedRunnable()
|
||||
{
|
||||
public void runMayThrow() throws ExecutionException, InterruptedException
|
||||
{
|
||||
replayAllFailedBatches();
|
||||
}
|
||||
};
|
||||
// If a replay is already in progress this request will be executed after it completes.
|
||||
return batchlogTasks.submit(runnable);
|
||||
}
|
||||
|
||||
public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
|
||||
{
|
||||
return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now)
|
||||
{
|
||||
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog);
|
||||
CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now);
|
||||
adder.add("data", serializeMutations(mutations, version))
|
||||
.add("written_at", new Date(now / 1000))
|
||||
.add("version", version);
|
||||
return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid), cf);
|
||||
}
|
||||
|
||||
private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
|
||||
{
|
||||
try (DataOutputBuffer buf = new DataOutputBuffer())
|
||||
{
|
||||
buf.writeInt(mutations.size());
|
||||
for (Mutation mutation : mutations)
|
||||
Mutation.serializer.serialize(mutation, buf, version);
|
||||
return buf.buffer();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new AssertionError(); // cannot happen.
|
||||
}
|
||||
}
|
||||
|
||||
private void replayAllFailedBatches() throws ExecutionException, InterruptedException
|
||||
{
|
||||
logger.debug("Started replayAllFailedBatches");
|
||||
|
||||
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
|
||||
// max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
|
||||
int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
|
||||
RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
|
||||
|
||||
UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
|
||||
SystemKeyspace.NAME,
|
||||
SystemKeyspace.BATCHLOG,
|
||||
PAGE_SIZE));
|
||||
|
||||
while (!page.isEmpty())
|
||||
{
|
||||
UUID id = processBatchlogPage(page, rateLimiter);
|
||||
|
||||
if (page.size() < PAGE_SIZE)
|
||||
break; // we've exhausted the batchlog, next query would be empty.
|
||||
|
||||
page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
|
||||
SystemKeyspace.NAME,
|
||||
SystemKeyspace.BATCHLOG,
|
||||
PAGE_SIZE),
|
||||
id);
|
||||
}
|
||||
|
||||
cleanup();
|
||||
|
||||
logger.debug("Finished replayAllFailedBatches");
|
||||
}
|
||||
|
||||
private void deleteBatch(UUID id)
|
||||
{
|
||||
Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id));
|
||||
mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
|
||||
mutation.apply();
|
||||
}
|
||||
|
||||
private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
|
||||
{
|
||||
UUID id = null;
|
||||
ArrayList<Batch> batches = new ArrayList<>(page.size());
|
||||
|
||||
// Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
|
||||
for (UntypedResultSet.Row row : page)
|
||||
{
|
||||
id = row.getUUID("id");
|
||||
long writtenAt = row.getLong("written_at");
|
||||
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
|
||||
long timeout = getBatchlogTimeout();
|
||||
if (System.currentTimeMillis() < writtenAt + timeout)
|
||||
continue; // not ready to replay yet, might still get a deletion.
|
||||
|
||||
int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
|
||||
Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
|
||||
try
|
||||
{
|
||||
if (batch.replay(rateLimiter) > 0)
|
||||
{
|
||||
batches.add(batch);
|
||||
}
|
||||
else
|
||||
{
|
||||
deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
|
||||
totalBatchesReplayed.incrementAndGet();
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
logger.warn("Skipped batch replay of {} due to {}", id, e);
|
||||
deleteBatch(id);
|
||||
}
|
||||
}
|
||||
|
||||
// now waiting for all batches to complete their processing
|
||||
// schedule hints for timed out deliveries
|
||||
for (Batch batch : batches)
|
||||
{
|
||||
batch.finish();
|
||||
deleteBatch(batch.id);
|
||||
}
|
||||
|
||||
totalBatchesReplayed.addAndGet(batches.size());
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
public long getBatchlogTimeout()
|
||||
{
|
||||
return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
|
||||
}
|
||||
|
||||
private static class Batch
|
||||
{
|
||||
private final UUID id;
|
||||
private final long writtenAt;
|
||||
private final ByteBuffer data;
|
||||
private final int version;
|
||||
|
||||
private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
|
||||
|
||||
public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
|
||||
{
|
||||
this.id = id;
|
||||
this.writtenAt = writtenAt;
|
||||
this.data = data;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public int replay(RateLimiter rateLimiter) throws IOException
|
||||
{
|
||||
logger.debug("Replaying batch {}", id);
|
||||
|
||||
List<Mutation> mutations = replayingMutations();
|
||||
|
||||
if (mutations.isEmpty())
|
||||
return 0;
|
||||
|
||||
int ttl = calculateHintTTL(mutations);
|
||||
if (ttl <= 0)
|
||||
return 0;
|
||||
|
||||
replayHandlers = sendReplays(mutations, writtenAt, ttl);
|
||||
|
||||
rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
|
||||
|
||||
return replayHandlers.size();
|
||||
}
|
||||
|
||||
public void finish()
|
||||
{
|
||||
for (int i = 0; i < replayHandlers.size(); i++)
|
||||
{
|
||||
ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
|
||||
try
|
||||
{
|
||||
handler.get();
|
||||
}
|
||||
catch (WriteTimeoutException|WriteFailureException e)
|
||||
{
|
||||
logger.debug("Failed replaying a batched mutation to a node, will write a hint");
|
||||
logger.debug("Failure was : {}", e.getMessage());
|
||||
// writing hints for the rest to hints, starting from i
|
||||
writeHintsForUndeliveredEndpoints(i);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Mutation> replayingMutations() throws IOException
|
||||
{
|
||||
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
|
||||
int size = in.readInt();
|
||||
List<Mutation> mutations = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++)
|
||||
{
|
||||
Mutation mutation = Mutation.serializer.deserialize(in, version);
|
||||
|
||||
// Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
|
||||
// We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
|
||||
// truncated.
|
||||
for (UUID cfId : mutation.getColumnFamilyIds())
|
||||
if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
|
||||
mutation = mutation.without(cfId);
|
||||
|
||||
if (!mutation.isEmpty())
|
||||
mutations.add(mutation);
|
||||
}
|
||||
return mutations;
|
||||
}
|
||||
|
||||
private void writeHintsForUndeliveredEndpoints(int startFrom)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Here we deserialize mutations 2nd time from byte buffer.
|
||||
// but this is ok, because timeout on batch direct delivery is rare
|
||||
// (it can happen only several seconds until node is marked dead)
|
||||
// so trading some cpu to keep less objects
|
||||
List<Mutation> replayingMutations = replayingMutations();
|
||||
for (int i = startFrom; i < replayHandlers.size(); i++)
|
||||
{
|
||||
Mutation undeliveredMutation = replayingMutations.get(i);
|
||||
int ttl = calculateHintTTL(replayingMutations);
|
||||
ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
|
||||
|
||||
if (ttl > 0 && handler != null)
|
||||
for (InetAddress endpoint : handler.undelivered)
|
||||
StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint);
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
logger.error("Cannot schedule hints for undelivered batch", e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
|
||||
{
|
||||
List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
|
||||
for (Mutation mutation : mutations)
|
||||
{
|
||||
ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
|
||||
if (handler != null)
|
||||
handlers.add(handler);
|
||||
}
|
||||
return handlers;
|
||||
}
|
||||
|
||||
/**
|
||||
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
|
||||
* when a replica is down or a write request times out.
|
||||
*
|
||||
* @return direct delivery handler to wait on or null, if no live nodes found
|
||||
*/
|
||||
private ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
|
||||
{
|
||||
Set<InetAddress> liveEndpoints = new HashSet<>();
|
||||
String ks = mutation.getKeyspaceName();
|
||||
Token tk = StorageService.getPartitioner().getToken(mutation.key());
|
||||
|
||||
for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
|
||||
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
|
||||
{
|
||||
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
|
||||
mutation.apply();
|
||||
else if (FailureDetector.instance.isAlive(endpoint))
|
||||
liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
|
||||
else
|
||||
StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
|
||||
}
|
||||
|
||||
if (liveEndpoints.isEmpty())
|
||||
return null;
|
||||
|
||||
ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
|
||||
MessageOut<Mutation> message = mutation.createMessage();
|
||||
for (InetAddress endpoint : liveEndpoints)
|
||||
MessagingService.instance().sendRR(message, endpoint, handler, false);
|
||||
return handler;
|
||||
}
|
||||
|
||||
/*
|
||||
* Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
|
||||
* This ensures that deletes aren't "undone" by an old batch replay.
|
||||
*/
|
||||
private int calculateHintTTL(Collection<Mutation> mutations)
|
||||
{
|
||||
int unadjustedTTL = Integer.MAX_VALUE;
|
||||
for (Mutation mutation : mutations)
|
||||
unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
|
||||
return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
|
||||
* which we did not receive a successful reply.
|
||||
*/
|
||||
private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
|
||||
{
|
||||
private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
|
||||
|
||||
public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
|
||||
{
|
||||
super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
|
||||
undelivered.addAll(writeEndpoints);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int totalBlockFor()
|
||||
{
|
||||
return this.naturalEndpoints.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void response(MessageIn<T> m)
|
||||
{
|
||||
boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
|
||||
assert removed;
|
||||
super.response(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// force flush + compaction to reclaim space from the replayed batches
|
||||
private void cleanup() throws ExecutionException, InterruptedException
|
||||
{
|
||||
ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
|
||||
cfs.forceBlockingFlush();
|
||||
Collection<Descriptor> descriptors = new ArrayList<>();
|
||||
for (SSTableReader sstr : cfs.getSSTables())
|
||||
descriptors.add(sstr.descriptor);
|
||||
if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
|
||||
CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
|
||||
}
|
||||
|
||||
public static class EndpointFilter
|
||||
{
|
||||
private final String localRack;
|
||||
private final Multimap<String, InetAddress> endpoints;
|
||||
|
||||
public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
|
||||
{
|
||||
this.localRack = localRack;
|
||||
this.endpoints = endpoints;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
|
||||
*/
|
||||
public Collection<InetAddress> filter()
|
||||
{
|
||||
// special case for single-node data centers
|
||||
if (endpoints.values().size() == 1)
|
||||
return endpoints.values();
|
||||
|
||||
// strip out dead endpoints and localhost
|
||||
ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
|
||||
for (Map.Entry<String, InetAddress> entry : endpoints.entries())
|
||||
if (isValid(entry.getValue()))
|
||||
validated.put(entry.getKey(), entry.getValue());
|
||||
|
||||
if (validated.size() <= 2)
|
||||
return validated.values();
|
||||
|
||||
if (validated.size() - validated.get(localRack).size() >= 2)
|
||||
{
|
||||
// we have enough endpoints in other racks
|
||||
validated.removeAll(localRack);
|
||||
}
|
||||
|
||||
if (validated.keySet().size() == 1)
|
||||
{
|
||||
// we have only 1 `other` rack
|
||||
Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
|
||||
return Lists.newArrayList(Iterables.limit(otherRack, 2));
|
||||
}
|
||||
|
||||
// randomize which racks we pick from if more than 2 remaining
|
||||
Collection<String> racks;
|
||||
if (validated.keySet().size() == 2)
|
||||
{
|
||||
racks = validated.keySet();
|
||||
}
|
||||
else
|
||||
{
|
||||
racks = Lists.newArrayList(validated.keySet());
|
||||
Collections.shuffle((List) racks);
|
||||
}
|
||||
|
||||
// grab a random member of up to two racks
|
||||
List<InetAddress> result = new ArrayList<>(2);
|
||||
for (String rack : Iterables.limit(racks, 2))
|
||||
{
|
||||
List<InetAddress> rackMembers = validated.get(rack);
|
||||
result.add(rackMembers.get(getRandomInt(rackMembers.size())));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected boolean isValid(InetAddress input)
|
||||
{
|
||||
return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected int getRandomInt(int bound)
|
||||
{
|
||||
return ThreadLocalRandom.current().nextInt(bound);
|
||||
}
|
||||
}
|
||||
}
|
||||
240
db/batchlog_manager.cc
Normal file
240
db/batchlog_manager.cc
Normal file
@@ -0,0 +1,240 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#include <chrono>
|
||||
#include "batchlog_manager.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "utils/rate_limiter.hh"
|
||||
#include "core/future-util.hh"
|
||||
#include "core/do_with.hh"
|
||||
#include "log.hh"
|
||||
#include "serializer.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "database.hh"
|
||||
#include "unimplemented.hh"
|
||||
|
||||
static thread_local logging::logger logger("BatchLog Manager");
|
||||
|
||||
const uint32_t db::batchlog_manager::replay_interval;
|
||||
const uint32_t db::batchlog_manager::page_size;
|
||||
|
||||
// Constructs a batchlog manager that issues all of its queries through the given
// query processor; only a reference is kept.
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp) : _qp(qp) {
}
|
||||
|
||||
// Arms the replay timer: the first replay pass runs RING_DELAY milliseconds from
// now, and subsequent passes are triggered every replay_interval milliseconds.
// Returns an already-resolved future.
future<> db::batchlog_manager::start() {
    // Each timer tick kicks off a replay pass; the future it returns is not kept here.
    _timer.set_callback([this] {
        replay_all_failed_batches();
    });

    const auto first_run = lowres_clock::now()
            + std::chrono::milliseconds(service::storage_service::RING_DELAY);
    std::experimental::optional<lowres_clock::duration> period {
            std::chrono::milliseconds(replay_interval) };
    _timer.arm(first_run, period);

    return make_ready_future<>();
}
|
||||
|
||||
// Stops scheduling further replays and waits for a replay that is in progress.
//
// Marks the manager as stopped, cancels the periodic timer and then waits on the
// replay semaphore, which replay_all_failed_batches() holds for the duration of a
// pass. The wait is bounded to 60 seconds, matching Origin's
// awaitTermination(60, TimeUnit.SECONDS); the previous value of 60 milliseconds
// gave an in-flight replay almost no time to finish.
future<> db::batchlog_manager::stop() {
    _stop = true;
    _timer.cancel();
    return _sem.wait(std::chrono::seconds(60));
}
|
||||
|
||||
// Runs "SELECT count(*)" against the batchlog table through the query processor
// and resolves to the value of the "count" column of the single result row.
future<size_t> db::batchlog_manager::count_all_batches() const {
    const sstring count_query = sprint("SELECT count(*) FROM %s.%s", system_keyspace::NAME, system_keyspace::BATCHLOG);
    return _qp.execute_internal(count_query).then([](::shared_ptr<cql3::untyped_result_set> rows) {
        const auto count = rows->one().get_as<int64_t>("count");
        return size_t(count);
    });
}
|
||||
|
||||
// Convenience overload: builds the batchlog mutation using the current db_clock
// time as the "written_at" value.
mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector<mutation> mutations, const utils::UUID& id, int32_t version) {
    const auto written_at = db_clock::now();
    return get_batch_log_mutation_for(std::move(mutations), id, version, written_at);
}
|
||||
|
||||
// Builds the mutation that records a batch in the system batchlog table.
//
// The partition key is the batch id. The row gets three cells: "version",
// "written_at" (the supplied `now`) and "data", which holds the given mutations
// frozen and serialized back to back. All cells share one write timestamp taken
// from db_clock::now_in_usecs().
mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector<mutation> mutations, const utils::UUID& id, int32_t version, db_clock::time_point now) {
    auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
    auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)});
    auto timestamp = db_clock::now_in_usecs();

    // Freeze every mutation, then size the output buffer exactly before writing.
    std::vector<frozen_mutation> frozen(mutations.begin(), mutations.end());
    size_t total_size = 0;
    for (auto& f : frozen) {
        total_size += serializer<frozen_mutation>{f}.size();
    }

    bytes data(bytes::initialized_later(), total_size);
    {
        data_output out(data);
        for (auto& f : frozen) {
            serializer<frozen_mutation>{f}(out);
        }
    }

    mutation m(key, schema);
    m.set_cell({}, to_bytes("version"), version, timestamp);
    m.set_cell({}, to_bytes("written_at"), now, timestamp);
    m.set_cell({}, to_bytes("data"), std::move(data), timestamp);

    return m;
}
|
||||
|
||||
// Returns how long a batchlog entry must have existed before it may be replayed:
// twice the configured write request timeout, i.e. enough time for the actual
// write plus the batchlog removal mutation.
db_clock::duration db::batchlog_manager::get_batch_log_timeout() const {
    auto&& cfg = _qp.db().local().get_config();
    const db_clock::duration write_timeout(cfg.write_request_timeout_in_ms());
    return write_timeout * 2;
}
|
||||
|
||||
// Replays every batch in the system batchlog table that is old enough to be
// considered failed, then deletes it from the batchlog.
//
// The batchlog is read page by page (page_size rows per query). Rows of a page are
// processed with parallel_for_each; for each row that is past the batchlog timeout
// the stored mutations are deserialized, mutations written before their column
// family was last truncated are dropped, the remainder is sent through
// storage_proxy, and finally the batchlog row itself is deleted locally.
//
// The whole pass runs while holding _sem, so passes are serialized and stop() can
// wait for one that is in progress. The returned future resolves when the pass is
// done and the semaphore has been released.
future<> db::batchlog_manager::replay_all_failed_batches() {
    typedef db_clock::rep clock_type;

    // rate limit is in bytes per second.
    // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
    // NOTE(review): Origin treats a throttle of 0 as "disabled" (unlimited). Whether
    // utils::rate_limiter handles 0 the same way is not visible here - confirm.
    auto endpoint_count = service::get_storage_service().local().get_token_metadata().get_all_endpoints().size();
    if (endpoint_count == 0) {
        // No endpoints known (yet): avoid dividing by zero and apply the unscaled limit.
        endpoint_count = 1;
    }
    auto throttle_in_kb = _qp.db().local().get_config().batchlog_replay_throttle_in_kb() / endpoint_count;
    // KB -> bytes, using 1024 like Origin does.
    auto limiter = make_lw_shared<utils::rate_limiter>(throttle_in_kb * 1024);

    // Processes a single batchlog row.
    auto batch = [this, limiter](const cql3::untyped_result_set::row& row) {
        auto written_at = row.get_as<db_clock::time_point>("written_at");
        // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
        auto timeout = get_batch_log_timeout();
        if (db_clock::now() < written_at + timeout) {
            // not ready to replay yet, might still get a deletion.
            return make_ready_future<>();
        }
        // not used currently. ever?
        //auto version = row.has("version") ? row.get_as<uint32_t>("version") : /*MessagingService.VERSION_12*/6u;
        auto id = row.get_as<utils::UUID>("id");
        auto data = row.get_blob("data");

        logger.debug("Replaying batch {}", id);

        // Deserialize all frozen mutations stored in the row up front.
        auto fms = make_lw_shared<std::deque<frozen_mutation>>();
        data_input in(data);
        while (in.has_next()) {
            fms->emplace_back(serializer<frozen_mutation>::read(in));
        }

        auto mutations = make_lw_shared<std::vector<mutation>>();
        auto size = data.size();

        // Consume the frozen mutations one at a time, keeping only those written
        // after the last truncation of their column family.
        return repeat([this, fms = std::move(fms), written_at, mutations]() mutable {
            if (fms->empty()) {
                return make_ready_future<stop_iteration>(stop_iteration::yes);
            }
            auto& fm = fms->front();
            auto mid = fm.column_family_id();
            return system_keyspace::get_truncated_at(_qp, mid).then([this, &fm, written_at, mutations](db_clock::time_point t) {
                if (written_at > t) {
                    auto schema = _qp.db().local().find_schema(fm.column_family_id());
                    mutations->emplace_back(fm.unfreeze(schema));
                }
            }).then([fms] {
                fms->pop_front();
                return make_ready_future<stop_iteration>(stop_iteration::no);
            });
        }).then([this, id, mutations, limiter, written_at, size] {
            if (mutations->empty()) {
                return make_ready_future<>();
            }
            const auto ttl = [this, mutations, written_at]() -> clock_type {
                /*
                 * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
                 * This ensures that deletes aren't "undone" by an old batch replay.
                 */
                auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
                warn(unimplemented::cause::HINT);
#if 0
                for (auto& m : *mutations) {
                    unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
                }
#endif
                return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
            }();

            if (ttl <= 0) {
                return make_ready_future<>();
            }
            // Origin does the send manually, however I can't see a super great reason to do so.
            // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
            // in both cases.
            // FIXME: verify that the above is reasonably true.
            return limiter->reserve(size).then([this, mutations, id] {
                return _qp.proxy().local().mutate(*mutations, db::consistency_level::ANY);
            });
        }).then([this, id] {
            // delete batch
            auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
            auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)});
            mutation m(key, schema);
            auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
            m.partition().apply_delete(*schema, {}, tombstone(now, gc_clock::now()));
            return _qp.proxy().local().mutate_locally(m);
        }).then([this] {
            // The batch has been handled and removed from the log: account for it,
            // as Origin does, so get_total_batches_replayed() reports real numbers.
            ++_total_batches_replayed;
        });
    };

    return _sem.wait().then([this, batch = std::move(batch)] {
        logger.debug("Started replayAllFailedBatches");

        typedef ::shared_ptr<cql3::untyped_result_set> page_ptr;
        sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size);
        return _qp.execute_internal(query).then([this, batch = std::move(batch)](page_ptr page) {
            return do_with(std::move(page), [this, batch = std::move(batch)](page_ptr & page) mutable {
                return repeat([this, &page, batch = std::move(batch)]() mutable {
                    if (page->empty()) {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    }
                    // The id of the last row is the paging cursor for the next query.
                    auto id = page->back().get_as<utils::UUID>("id");
                    return parallel_for_each(*page, batch).then([this, &page, id]() {
                        if (page->size() < page_size) {
                            return make_ready_future<stop_iteration>(stop_iteration::yes); // we've exhausted the batchlog, next query would be empty.
                        }
                        sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
                                system_keyspace::NAME,
                                system_keyspace::BATCHLOG,
                                page_size);
                        return _qp.execute_internal(query, {id}).then([&page](auto res) {
                            page = std::move(res);
                            return make_ready_future<stop_iteration>(stop_iteration::no);
                        });
                    });
                });
            });
        }).then([this] {
            // TODO FIXME : cleanup()
#if 0
            ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
            cfs.forceBlockingFlush();
            Collection<Descriptor> descriptors = new ArrayList<>();
            for (SSTableReader sstr : cfs.getSSTables())
                descriptors.add(sstr.descriptor);
            if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
                CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();

#endif

        }).then([this] {
            logger.debug("Finished replayAllFailedBatches");
        });
    }).finally([this] {
        _sem.signal();
    });
}
|
||||
86
db/batchlog_manager.hh
Normal file
86
db/batchlog_manager.hh
Normal file
@@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include "core/future.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "core/timer.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "db_clock.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
class batchlog_manager {
|
||||
private:
|
||||
static constexpr uint32_t replay_interval = 60 * 1000; // milliseconds
|
||||
static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
|
||||
|
||||
using clock_type = lowres_clock;
|
||||
|
||||
size_t _total_batches_replayed = 0;
|
||||
cql3::query_processor& _qp;
|
||||
timer<clock_type> _timer;
|
||||
semaphore _sem;
|
||||
bool _stop = false;
|
||||
|
||||
future<> replay_all_failed_batches();
|
||||
public:
|
||||
// Takes a QP, not a distributed<QP>, because this object is supposed
|
||||
// to be per shard and does no dispatching beyond delegating to the
|
||||
// shard qp (which is what you feed here).
|
||||
batchlog_manager(cql3::query_processor&);
|
||||
|
||||
future<> start();
|
||||
future<> stop();
|
||||
|
||||
// for testing.
|
||||
future<> do_batch_log_replay() {
|
||||
return replay_all_failed_batches();
|
||||
}
|
||||
future<size_t> count_all_batches() const;
|
||||
size_t get_total_batches_replayed() const {
|
||||
return _total_batches_replayed;
|
||||
}
|
||||
mutation get_batch_log_mutation_for(std::vector<mutation>, const utils::UUID&, int32_t);
|
||||
mutation get_batch_log_mutation_for(std::vector<mutation>, const utils::UUID&, int32_t, db_clock::time_point);
|
||||
db_clock::duration get_batch_log_timeout() const;
|
||||
|
||||
class endpoint_filter {
|
||||
private:
|
||||
const sstring _local_rack;
|
||||
const std::unordered_map<sstring, std::vector<gms::inet_address>> _endpoints;
|
||||
|
||||
public:
|
||||
endpoint_filter(sstring, std::unordered_map<sstring, std::vector<gms::inet_address>>);
|
||||
/**
|
||||
* @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
|
||||
*/
|
||||
std::vector<gms::inet_address> filter() const;
|
||||
};
|
||||
};
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user