storage_proxy: cleanup commented origin code
Remove code that was already reimplemented. Makes file navigation much easier.
This commit is contained in:
@@ -261,62 +261,8 @@ storage_proxy::storage_proxy(distributed<database>& db) : _db(db) {
|
||||
storage_proxy::rh_entry::rh_entry(std::unique_ptr<abstract_write_response_handler>&& h, std::function<void()>&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {}
|
||||
|
||||
#if 0
|
||||
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
|
||||
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
|
||||
static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
|
||||
|
||||
public static final String UNREACHABLE = "UNREACHABLE";
|
||||
|
||||
private static final WritePerformer standardWritePerformer;
|
||||
private static final WritePerformer counterWritePerformer;
|
||||
private static final WritePerformer counterWriteOnCoordinatorPerformer;
|
||||
|
||||
public static final StorageProxy instance = new StorageProxy();
|
||||
|
||||
private static volatile int maxHintsInProgress = 128 * FBUtilities.getAvailableProcessors();
|
||||
private static final CacheLoader<InetAddress, AtomicInteger> hintsInProgress = new CacheLoader<InetAddress, AtomicInteger>()
|
||||
{
|
||||
public AtomicInteger load(InetAddress inetAddress)
|
||||
{
|
||||
return new AtomicInteger(0);
|
||||
}
|
||||
};
|
||||
private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read");
|
||||
private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
|
||||
private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
|
||||
private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
|
||||
private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
|
||||
|
||||
private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
|
||||
|
||||
private StorageProxy() {}
|
||||
|
||||
static
|
||||
{
|
||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
try
|
||||
{
|
||||
mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
standardWritePerformer = new WritePerformer()
|
||||
{
|
||||
public void apply(IMutation mutation,
|
||||
Iterable<InetAddress> targets,
|
||||
AbstractWriteResponseHandler responseHandler,
|
||||
String localDataCenter,
|
||||
ConsistencyLevel consistency_level)
|
||||
throws OverloadedException
|
||||
{
|
||||
assert mutation instanceof Mutation;
|
||||
sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter);
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or
|
||||
* in CounterMutationVerbHandler on a replica othewise. The write must be executed on the COUNTER_MUTATION stage
|
||||
@@ -986,166 +932,6 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
|
||||
throws WriteTimeoutException
|
||||
{
|
||||
AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
|
||||
Collections.<InetAddress>emptyList(),
|
||||
ConsistencyLevel.ONE,
|
||||
Keyspace.open(SystemKeyspace.NAME),
|
||||
null,
|
||||
WriteType.BATCH_LOG);
|
||||
|
||||
MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
|
||||
.createMessage();
|
||||
for (InetAddress target : endpoints)
|
||||
{
|
||||
int targetVersion = MessagingService.instance().getVersion(target);
|
||||
if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
|
||||
{
|
||||
insertLocal(message.payload, handler);
|
||||
}
|
||||
else if (targetVersion == MessagingService.current_version)
|
||||
{
|
||||
MessagingService.instance().sendRR(message, target, handler, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
|
||||
.createMessage(),
|
||||
target,
|
||||
handler,
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
handler.get();
|
||||
}
|
||||
|
||||
private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
|
||||
{
|
||||
AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
|
||||
Collections.<InetAddress>emptyList(),
|
||||
ConsistencyLevel.ANY,
|
||||
Keyspace.open(SystemKeyspace.NAME),
|
||||
null,
|
||||
WriteType.SIMPLE);
|
||||
Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid));
|
||||
mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
|
||||
MessageOut<Mutation> message = mutation.createMessage();
|
||||
for (InetAddress target : endpoints)
|
||||
{
|
||||
if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
|
||||
insertLocal(message.payload, handler);
|
||||
else
|
||||
MessagingService.instance().sendRR(message, target, handler, false);
|
||||
}
|
||||
}
|
||||
|
||||
private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter)
|
||||
throws WriteTimeoutException, OverloadedException
|
||||
{
|
||||
for (WriteResponseHandlerWrapper wrapper : wrappers)
|
||||
{
|
||||
Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
|
||||
sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter);
|
||||
}
|
||||
|
||||
for (WriteResponseHandlerWrapper wrapper : wrappers)
|
||||
wrapper.handler.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the write of a mutation given a WritePerformer.
|
||||
* Gather the list of write endpoints, apply locally and/or forward the mutation to
|
||||
* said write endpoint (deletaged to the actual WritePerformer) and wait for the
|
||||
* responses based on consistency level.
|
||||
*
|
||||
* @param mutation the mutation to be applied
|
||||
* @param consistency_level the consistency level for the write operation
|
||||
* @param performer the WritePerformer in charge of appliying the mutation
|
||||
* given the list of write endpoints (either standardWritePerformer for
|
||||
* standard writes or counterWritePerformer for counter writes).
|
||||
* @param callback an optional callback to be run if and when the write is
|
||||
* successful.
|
||||
*/
|
||||
public static AbstractWriteResponseHandler performWrite(IMutation mutation,
|
||||
ConsistencyLevel consistency_level,
|
||||
String localDataCenter,
|
||||
WritePerformer performer,
|
||||
Runnable callback,
|
||||
WriteType writeType)
|
||||
throws UnavailableException, OverloadedException
|
||||
{
|
||||
String keyspaceName = mutation.getKeyspaceName();
|
||||
AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
|
||||
|
||||
Token tk = StorageService.getPartitioner().getToken(mutation.key());
|
||||
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
|
||||
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
|
||||
|
||||
AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
|
||||
|
||||
// exit early if we can't fulfill the CL at this time
|
||||
responseHandler.assureSufficientLiveNodes();
|
||||
|
||||
performer.apply(mutation, Iterables.concat(naturalEndpoints, pendingEndpoints), responseHandler, localDataCenter, consistency_level);
|
||||
return responseHandler;
|
||||
}
|
||||
|
||||
// same as above except does not initiate writes (but does perform availability checks).
|
||||
private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
|
||||
{
|
||||
AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
|
||||
String keyspaceName = mutation.getKeyspaceName();
|
||||
Token tk = StorageService.getPartitioner().getToken(mutation.key());
|
||||
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
|
||||
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
|
||||
AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
|
||||
return new WriteResponseHandlerWrapper(responseHandler, mutation);
|
||||
}
|
||||
|
||||
// used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints.
|
||||
private static class WriteResponseHandlerWrapper
|
||||
{
|
||||
final AbstractWriteResponseHandler handler;
|
||||
final Mutation mutation;
|
||||
|
||||
WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, Mutation mutation)
|
||||
{
|
||||
this.handler = handler;
|
||||
this.mutation = mutation;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Replicas are picked manually:
|
||||
* - replicas should be alive according to the failure detector
|
||||
* - replicas should be in the local datacenter
|
||||
* - choose min(2, number of qualifying candiates above)
|
||||
* - allow the local node to be the only replica only if it's a single-node DC
|
||||
*/
|
||||
private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
|
||||
throws UnavailableException
|
||||
{
|
||||
TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
|
||||
Multimap<String, InetAddress> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
|
||||
String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
|
||||
|
||||
Collection<InetAddress> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
|
||||
if (chosenEndpoints.isEmpty())
|
||||
{
|
||||
if (consistencyLevel == ConsistencyLevel.ANY)
|
||||
return Collections.singleton(FBUtilities.getBroadcastAddress());
|
||||
|
||||
throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
|
||||
}
|
||||
|
||||
return chosenEndpoints;
|
||||
}
|
||||
#endif
|
||||
|
||||
bool storage_proxy::cannot_hint(gms::inet_address target) {
|
||||
return _total_hints_in_progress > _max_hints_in_progress
|
||||
&& (get_hints_in_progress_for(target) > 0 && should_hint(target));
|
||||
@@ -1280,58 +1066,6 @@ bool storage_proxy::submit_hint(lw_shared_ptr<const frozen_mutation> m, gms::ine
|
||||
StorageMetrics.totalHints.inc();
|
||||
}
|
||||
|
||||
private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
|
||||
{
|
||||
Iterator<InetAddress> iter = targets.iterator();
|
||||
InetAddress target = iter.next();
|
||||
|
||||
// Add the other destinations of the same message as a FORWARD_HEADER entry
|
||||
DataOutputBuffer out = new DataOutputBuffer();
|
||||
try
|
||||
{
|
||||
out.writeInt(targets.size() - 1);
|
||||
while (iter.hasNext())
|
||||
{
|
||||
InetAddress destination = iter.next();
|
||||
CompactEndpointSerializationHelper.serialize(destination, out);
|
||||
int id = MessagingService.instance().addCallback(handler,
|
||||
message,
|
||||
destination,
|
||||
message.getTimeout(),
|
||||
handler.consistencyLevel,
|
||||
true);
|
||||
out.writeInt(id);
|
||||
logger.trace("Adding FWD message to {}@{}", id, destination);
|
||||
}
|
||||
message = message.withParameter(Mutation.FORWARD_TO, out.getData());
|
||||
// send the combined message + forward headers
|
||||
int id = MessagingService.instance().sendRR(message, target, handler, true);
|
||||
logger.trace("Sending message to {}@{}", id, target);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
// DataOutputBuffer is in-memory, doesn't throw IOException
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
|
||||
{
|
||||
|
||||
StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
|
||||
{
|
||||
public void runMayThrow()
|
||||
{
|
||||
IMutation processed = SinkManager.processWriteRequest(mutation);
|
||||
if (processed != null)
|
||||
{
|
||||
((Mutation) processed).apply();
|
||||
responseHandler.response(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle counter mutation on the coordinator host.
|
||||
*
|
||||
@@ -2093,33 +1827,6 @@ storage_proxy::do_query(schema_ptr s,
|
||||
}
|
||||
|
||||
#if 0
|
||||
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
|
||||
throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
|
||||
{
|
||||
// When using serial CL, the ClientState should be provided
|
||||
assert !consistencyLevel.isSerialConsistency();
|
||||
return read(commands, consistencyLevel, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the actual reading of a row out of the StorageService, fetching
|
||||
* a specific set of column names from a given column family.
|
||||
*/
|
||||
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
|
||||
throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
|
||||
{
|
||||
if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands))
|
||||
{
|
||||
readMetrics.unavailables.mark();
|
||||
ClientRequestMetrics.readUnavailables.inc();
|
||||
throw new IsBootstrappingException();
|
||||
}
|
||||
|
||||
return consistencyLevel.isSerialConsistency()
|
||||
? readWithPaxos(commands, consistencyLevel, state)
|
||||
: readRegular(commands, consistencyLevel);
|
||||
}
|
||||
|
||||
private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
|
||||
throws InvalidRequestException, UnavailableException, ReadTimeoutException
|
||||
{
|
||||
@@ -2183,263 +1890,6 @@ storage_proxy::do_query(schema_ptr s,
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
|
||||
throws UnavailableException, ReadTimeoutException
|
||||
{
|
||||
long start = System.nanoTime();
|
||||
List<Row> rows = null;
|
||||
|
||||
try
|
||||
{
|
||||
rows = fetchRows(commands, consistencyLevel);
|
||||
}
|
||||
catch (UnavailableException e)
|
||||
{
|
||||
readMetrics.unavailables.mark();
|
||||
ClientRequestMetrics.readUnavailables.inc();
|
||||
throw e;
|
||||
}
|
||||
catch (ReadTimeoutException e)
|
||||
{
|
||||
readMetrics.timeouts.mark();
|
||||
ClientRequestMetrics.readTimeouts.inc();
|
||||
throw e;
|
||||
}
|
||||
finally
|
||||
{
|
||||
long latency = System.nanoTime() - start;
|
||||
readMetrics.addNano(latency);
|
||||
// TODO avoid giving every command the same latency number. Can fix this in CASSADRA-5329
|
||||
for (ReadCommand command : commands)
|
||||
Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function executes local and remote reads, and blocks for the results:
|
||||
*
|
||||
* 1. Get the replica locations, sorted by response time according to the snitch
|
||||
* 2. Send a data request to the closest replica, and digest requests to either
|
||||
* a) all the replicas, if read repair is enabled
|
||||
* b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel
|
||||
* 3. Wait for a response from R replicas
|
||||
* 4. If the digests (if any) match the data return the data
|
||||
* 5. else carry out read repair by getting data from all the nodes.
|
||||
*/
|
||||
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
|
||||
throws UnavailableException, ReadTimeoutException
|
||||
{
|
||||
List<Row> rows = new ArrayList<>(initialCommands.size());
|
||||
// (avoid allocating a new list in the common case of nothing-to-retry)
|
||||
List<ReadCommand> commandsToRetry = Collections.emptyList();
|
||||
|
||||
do
|
||||
{
|
||||
List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
|
||||
AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
|
||||
|
||||
if (!commandsToRetry.isEmpty())
|
||||
Tracing.trace("Retrying {} commands", commandsToRetry.size());
|
||||
|
||||
// send out read requests
|
||||
for (int i = 0; i < commands.size(); i++)
|
||||
{
|
||||
ReadCommand command = commands.get(i);
|
||||
assert !command.isDigestQuery();
|
||||
|
||||
AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
|
||||
exec.executeAsync();
|
||||
readExecutors[i] = exec;
|
||||
}
|
||||
|
||||
for (AbstractReadExecutor exec : readExecutors)
|
||||
exec.maybeTryAdditionalReplicas();
|
||||
|
||||
// read results and make a second pass for any digest mismatches
|
||||
List<ReadCommand> repairCommands = null;
|
||||
List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
|
||||
for (AbstractReadExecutor exec: readExecutors)
|
||||
{
|
||||
try
|
||||
{
|
||||
Row row = exec.get();
|
||||
if (row != null)
|
||||
{
|
||||
exec.command.maybeTrim(row);
|
||||
rows.add(row);
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
|
||||
}
|
||||
catch (ReadTimeoutException ex)
|
||||
{
|
||||
int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
|
||||
int responseCount = exec.handler.getReceivedCount();
|
||||
String gotData = responseCount > 0
|
||||
? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
|
||||
: "";
|
||||
|
||||
if (Tracing.isTracing())
|
||||
{
|
||||
Tracing.trace("Timed out; received {} of {} responses{}",
|
||||
new Object[]{ responseCount, blockFor, gotData });
|
||||
}
|
||||
else if (logger.isDebugEnabled())
|
||||
{
|
||||
logger.debug("Read timeout; received {} of {} responses{}", responseCount, blockFor, gotData);
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
catch (DigestMismatchException ex)
|
||||
{
|
||||
Tracing.trace("Digest mismatch: {}", ex);
|
||||
|
||||
ReadRepairMetrics.repairedBlocking.mark();
|
||||
|
||||
// Do a full data read to resolve the correct response (and repair node that need be)
|
||||
RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size());
|
||||
ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver,
|
||||
ConsistencyLevel.ALL,
|
||||
exec.getContactedReplicas().size(),
|
||||
exec.command,
|
||||
Keyspace.open(exec.command.getKeyspace()),
|
||||
exec.handler.endpoints);
|
||||
|
||||
if (repairCommands == null)
|
||||
{
|
||||
repairCommands = new ArrayList<>();
|
||||
repairResponseHandlers = new ArrayList<>();
|
||||
}
|
||||
repairCommands.add(exec.command);
|
||||
repairResponseHandlers.add(repairHandler);
|
||||
|
||||
MessageOut<ReadCommand> message = exec.command.createMessage();
|
||||
for (InetAddress endpoint : exec.getContactedReplicas())
|
||||
{
|
||||
Tracing.trace("Enqueuing full data read to {}", endpoint);
|
||||
MessagingService.instance().sendRR(message, endpoint, repairHandler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
commandsToRetry.clear();
|
||||
|
||||
// read the results for the digest mismatch retries
|
||||
if (repairResponseHandlers != null)
|
||||
{
|
||||
for (int i = 0; i < repairCommands.size(); i++)
|
||||
{
|
||||
ReadCommand command = repairCommands.get(i);
|
||||
ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i);
|
||||
|
||||
Row row;
|
||||
try
|
||||
{
|
||||
row = handler.get();
|
||||
}
|
||||
catch (DigestMismatchException e)
|
||||
{
|
||||
throw new AssertionError(e); // full data requested from each node here, no digests should be sent
|
||||
}
|
||||
catch (ReadTimeoutException e)
|
||||
{
|
||||
if (Tracing.isTracing())
|
||||
Tracing.trace("Timed out waiting on digest mismatch repair requests");
|
||||
else
|
||||
logger.debug("Timed out waiting on digest mismatch repair requests");
|
||||
// the caught exception here will have CL.ALL from the repair command,
|
||||
// not whatever CL the initial command was at (CASSANDRA-7947)
|
||||
int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
|
||||
throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
|
||||
}
|
||||
|
||||
RowDataResolver resolver = (RowDataResolver)handler.resolver;
|
||||
try
|
||||
{
|
||||
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
|
||||
// behind on writes in case the out-of-sync row is read multiple times in quick succession
|
||||
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
|
||||
}
|
||||
catch (TimeoutException e)
|
||||
{
|
||||
if (Tracing.isTracing())
|
||||
Tracing.trace("Timed out waiting on digest mismatch repair acknowledgements");
|
||||
else
|
||||
logger.debug("Timed out waiting on digest mismatch repair acknowledgements");
|
||||
int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
|
||||
throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
|
||||
}
|
||||
|
||||
// retry any potential short reads
|
||||
ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
|
||||
if (retryCommand != null)
|
||||
{
|
||||
Tracing.trace("Issuing retry for read command");
|
||||
if (commandsToRetry == Collections.EMPTY_LIST)
|
||||
commandsToRetry = new ArrayList<>();
|
||||
commandsToRetry.add(retryCommand);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (row != null)
|
||||
{
|
||||
command.maybeTrim(row);
|
||||
rows.add(row);
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (!commandsToRetry.isEmpty());
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
static class LocalReadRunnable extends DroppableRunnable
|
||||
{
|
||||
private final ReadCommand command;
|
||||
private final ReadCallback<ReadResponse, Row> handler;
|
||||
private final long start = System.nanoTime();
|
||||
|
||||
LocalReadRunnable(ReadCommand command, ReadCallback<ReadResponse, Row> handler)
|
||||
{
|
||||
super(MessagingService.Verb.READ);
|
||||
this.command = command;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
protected void runMayThrow()
|
||||
{
|
||||
Keyspace keyspace = Keyspace.open(command.ksName);
|
||||
Row r = command.getRow(keyspace);
|
||||
ReadResponse result = ReadVerbHandler.getResponse(command, r);
|
||||
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
|
||||
handler.response(result);
|
||||
}
|
||||
}
|
||||
|
||||
static class LocalRangeSliceRunnable extends DroppableRunnable
|
||||
{
|
||||
private final AbstractRangeCommand command;
|
||||
private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
|
||||
private final long start = System.nanoTime();
|
||||
|
||||
LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
|
||||
{
|
||||
super(MessagingService.Verb.RANGE_SLICE);
|
||||
this.command = command;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
protected void runMayThrow()
|
||||
{
|
||||
RangeSliceReply result = new RangeSliceReply(command.executeLocally());
|
||||
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
|
||||
handler.response(result);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) {
|
||||
@@ -2527,85 +1977,6 @@ float storage_proxy::estimate_result_rows_per_range(lw_shared_ptr<query::read_co
|
||||
else
|
||||
return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
|
||||
}
|
||||
|
||||
public Map<String, List<String>> getSchemaVersions()
|
||||
{
|
||||
return describeSchemaVersions();
|
||||
}
|
||||
|
||||
/**
|
||||
* initiate a request/response session with each live node to check whether or not everybody is using the same
|
||||
* migration id. This is useful for determining if a schema change has propagated through the cluster. Disagreement
|
||||
* is assumed if any node fails to respond.
|
||||
*/
|
||||
public static Map<String, List<String>> describeSchemaVersions()
|
||||
{
|
||||
final String myVersion = Schema.instance.getVersion().toString();
|
||||
final Map<InetAddress, UUID> versions = new ConcurrentHashMap<InetAddress, UUID>();
|
||||
final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
|
||||
final CountDownLatch latch = new CountDownLatch(liveHosts.size());
|
||||
|
||||
IAsyncCallback<UUID> cb = new IAsyncCallback<UUID>()
|
||||
{
|
||||
public void response(MessageIn<UUID> message)
|
||||
{
|
||||
// record the response from the remote node.
|
||||
versions.put(message.from, message.payload);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
public boolean isLatencyForSnitch()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
};
|
||||
// an empty message acts as a request to the SchemaCheckVerbHandler.
|
||||
MessageOut message = new MessageOut(MessagingService.Verb.SCHEMA_CHECK);
|
||||
for (InetAddress endpoint : liveHosts)
|
||||
MessagingService.instance().sendRR(message, endpoint, cb);
|
||||
|
||||
try
|
||||
{
|
||||
// wait for as long as possible. timeout-1s if possible.
|
||||
latch.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (InterruptedException ex)
|
||||
{
|
||||
throw new AssertionError("This latch shouldn't have been interrupted.");
|
||||
}
|
||||
|
||||
// maps versions to hosts that are on that version.
|
||||
Map<String, List<String>> results = new HashMap<String, List<String>>();
|
||||
Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
|
||||
for (InetAddress host : allHosts)
|
||||
{
|
||||
UUID version = versions.get(host);
|
||||
String stringVersion = version == null ? UNREACHABLE : version.toString();
|
||||
List<String> hosts = results.get(stringVersion);
|
||||
if (hosts == null)
|
||||
{
|
||||
hosts = new ArrayList<String>();
|
||||
results.put(stringVersion, hosts);
|
||||
}
|
||||
hosts.add(host.getHostAddress());
|
||||
}
|
||||
|
||||
// we're done: the results map is ready to return to the client. the rest is just debug logging:
|
||||
if (results.get(UNREACHABLE) != null)
|
||||
logger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", StringUtils.join(results.get(UNREACHABLE), ","));
|
||||
for (Map.Entry<String, List<String>> entry : results.entrySet())
|
||||
{
|
||||
// check for version disagreement. log the hosts that don't agree.
|
||||
if (entry.getKey().equals(UNREACHABLE) || entry.getKey().equals(myVersion))
|
||||
continue;
|
||||
for (String host : entry.getValue())
|
||||
logger.debug("{} disagrees ({})", host, entry.getKey());
|
||||
}
|
||||
if (results.size() == 1)
|
||||
logger.debug("Schemas are in agreement.");
|
||||
|
||||
return results;
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
@@ -2669,113 +2040,6 @@ storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::parti
|
||||
return ranges;
|
||||
}
|
||||
|
||||
#if 0
|
||||
public long getReadOperations()
|
||||
{
|
||||
return readMetrics.latency.count();
|
||||
}
|
||||
|
||||
public long getTotalReadLatencyMicros()
|
||||
{
|
||||
return readMetrics.totalLatency.count();
|
||||
}
|
||||
|
||||
public double getRecentReadLatencyMicros()
|
||||
{
|
||||
return readMetrics.getRecentLatency();
|
||||
}
|
||||
|
||||
public long[] getTotalReadLatencyHistogramMicros()
|
||||
{
|
||||
return readMetrics.totalLatencyHistogram.getBuckets(false);
|
||||
}
|
||||
|
||||
public long[] getRecentReadLatencyHistogramMicros()
|
||||
{
|
||||
return readMetrics.recentLatencyHistogram.getBuckets(true);
|
||||
}
|
||||
|
||||
public long getRangeOperations()
|
||||
{
|
||||
return rangeMetrics.latency.count();
|
||||
}
|
||||
|
||||
public long getTotalRangeLatencyMicros()
|
||||
{
|
||||
return rangeMetrics.totalLatency.count();
|
||||
}
|
||||
|
||||
public double getRecentRangeLatencyMicros()
|
||||
{
|
||||
return rangeMetrics.getRecentLatency();
|
||||
}
|
||||
|
||||
public long[] getTotalRangeLatencyHistogramMicros()
|
||||
{
|
||||
return rangeMetrics.totalLatencyHistogram.getBuckets(false);
|
||||
}
|
||||
|
||||
public long[] getRecentRangeLatencyHistogramMicros()
|
||||
{
|
||||
return rangeMetrics.recentLatencyHistogram.getBuckets(true);
|
||||
}
|
||||
|
||||
public long getWriteOperations()
|
||||
{
|
||||
return writeMetrics.latency.count();
|
||||
}
|
||||
|
||||
public long getTotalWriteLatencyMicros()
|
||||
{
|
||||
return writeMetrics.totalLatency.count();
|
||||
}
|
||||
|
||||
public double getRecentWriteLatencyMicros()
|
||||
{
|
||||
return writeMetrics.getRecentLatency();
|
||||
}
|
||||
|
||||
public long[] getTotalWriteLatencyHistogramMicros()
|
||||
{
|
||||
return writeMetrics.totalLatencyHistogram.getBuckets(false);
|
||||
}
|
||||
|
||||
public long[] getRecentWriteLatencyHistogramMicros()
|
||||
{
|
||||
return writeMetrics.recentLatencyHistogram.getBuckets(true);
|
||||
}
|
||||
|
||||
public boolean getHintedHandoffEnabled()
|
||||
{
|
||||
return DatabaseDescriptor.hintedHandoffEnabled();
|
||||
}
|
||||
|
||||
public Set<String> getHintedHandoffEnabledByDC()
|
||||
{
|
||||
return DatabaseDescriptor.hintedHandoffEnabledByDC();
|
||||
}
|
||||
|
||||
public void setHintedHandoffEnabled(boolean b)
|
||||
{
|
||||
DatabaseDescriptor.setHintedHandoffEnabled(b);
|
||||
}
|
||||
|
||||
public void setHintedHandoffEnabledByDCList(String dcNames)
|
||||
{
|
||||
DatabaseDescriptor.setHintedHandoffEnabled(dcNames);
|
||||
}
|
||||
|
||||
public int getMaxHintWindow()
|
||||
{
|
||||
return DatabaseDescriptor.getMaxHintWindow();
|
||||
}
|
||||
|
||||
public void setMaxHintWindow(int ms)
|
||||
{
|
||||
DatabaseDescriptor.setMaxHintWindow(ms);
|
||||
}
|
||||
#endif
|
||||
|
||||
bool storage_proxy::should_hint(gms::inet_address ep) {
|
||||
if (is_me(ep)) { // do not hint to local address
|
||||
return false;
|
||||
|
||||
Reference in New Issue
Block a user