diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index ee7b883d39..049727508b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -261,62 +261,8 @@ storage_proxy::storage_proxy(distributed& db) : _db(db) { storage_proxy::rh_entry::rh_entry(std::unique_ptr&& h, std::function&& cb) : handler(std::move(h)), expire_timer(std::move(cb)) {} #if 0 - public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy"; - private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class); - static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node - - public static final String UNREACHABLE = "UNREACHABLE"; - - private static final WritePerformer standardWritePerformer; - private static final WritePerformer counterWritePerformer; - private static final WritePerformer counterWriteOnCoordinatorPerformer; - - public static final StorageProxy instance = new StorageProxy(); - - private static volatile int maxHintsInProgress = 128 * FBUtilities.getAvailableProcessors(); - private static final CacheLoader hintsInProgress = new CacheLoader() - { - public AtomicInteger load(InetAddress inetAddress) - { - return new AtomicInteger(0); - } - }; - private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read"); - private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice"); - private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write"); - private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite"); - private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead"); - - private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10; - - private StorageProxy() {} - static { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(instance, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - standardWritePerformer = new WritePerformer() - { - public void apply(IMutation mutation, - Iterable targets, - AbstractWriteResponseHandler responseHandler, - String localDataCenter, - ConsistencyLevel consistency_level) - throws OverloadedException - { - assert mutation instanceof Mutation; - sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter); - } - }; - /* * We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or * in CounterMutationVerbHandler on a replica othewise. The write must be executed on the COUNTER_MUTATION stage @@ -986,166 +932,6 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc } } -#if 0 - private static void syncWriteToBatchlog(Collection mutations, Collection endpoints, UUID uuid) - throws WriteTimeoutException - { - AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, - Collections.emptyList(), - ConsistencyLevel.ONE, - Keyspace.open(SystemKeyspace.NAME), - null, - WriteType.BATCH_LOG); - - MessageOut message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version) - .createMessage(); - for (InetAddress target : endpoints) - { - int targetVersion = MessagingService.instance().getVersion(target); - if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) - { - insertLocal(message.payload, handler); - } - else if (targetVersion == MessagingService.current_version) - { - MessagingService.instance().sendRR(message, target, handler, false); - } - else - { - MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion) - .createMessage(), - target, - handler, - false); - } - } - - handler.get(); - } - - private static void asyncRemoveFromBatchlog(Collection endpoints, UUID uuid) - { - AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, - Collections.emptyList(), - ConsistencyLevel.ANY, - Keyspace.open(SystemKeyspace.NAME), - null, - WriteType.SIMPLE); - Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid)); - mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); - MessageOut message = mutation.createMessage(); - for (InetAddress target : endpoints) - { - if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) - insertLocal(message.payload, handler); - else - MessagingService.instance().sendRR(message, target, handler, false); - } - } - - private static void syncWriteBatchedMutations(List wrappers, String localDataCenter) - throws WriteTimeoutException, OverloadedException - { - for (WriteResponseHandlerWrapper wrapper : wrappers) - { - Iterable endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); - sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter); - } - - for (WriteResponseHandlerWrapper wrapper : wrappers) - wrapper.handler.get(); - } - - /** - * Perform the write of a mutation given a WritePerformer. - * Gather the list of write endpoints, apply locally and/or forward the mutation to - * said write endpoint (deletaged to the actual WritePerformer) and wait for the - * responses based on consistency level. - * - * @param mutation the mutation to be applied - * @param consistency_level the consistency level for the write operation - * @param performer the WritePerformer in charge of appliying the mutation - * given the list of write endpoints (either standardWritePerformer for - * standard writes or counterWritePerformer for counter writes). - * @param callback an optional callback to be run if and when the write is - * successful. - */ - public static AbstractWriteResponseHandler performWrite(IMutation mutation, - ConsistencyLevel consistency_level, - String localDataCenter, - WritePerformer performer, - Runnable callback, - WriteType writeType) - throws UnavailableException, OverloadedException - { - String keyspaceName = mutation.getKeyspaceName(); - AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); - - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - List naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType); - - // exit early if we can't fulfill the CL at this time - responseHandler.assureSufficientLiveNodes(); - - performer.apply(mutation, Iterables.concat(naturalEndpoints, pendingEndpoints), responseHandler, localDataCenter, consistency_level); - return responseHandler; - } - - // same as above except does not initiate writes (but does perform availability checks). - private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType) - { - AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy(); - String keyspaceName = mutation.getKeyspaceName(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - List naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); - return new WriteResponseHandlerWrapper(responseHandler, mutation); - } - - // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints. - private static class WriteResponseHandlerWrapper - { - final AbstractWriteResponseHandler handler; - final Mutation mutation; - - WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, Mutation mutation) - { - this.handler = handler; - this.mutation = mutation; - } - } - - /* - * Replicas are picked manually: - * - replicas should be alive according to the failure detector - * - replicas should be in the local datacenter - * - choose min(2, number of qualifying candiates above) - * - allow the local node to be the only replica only if it's a single-node DC - */ - private static Collection getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) - throws UnavailableException - { - TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); - Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); - String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress()); - - Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); - if (chosenEndpoints.isEmpty()) - { - if (consistencyLevel == ConsistencyLevel.ANY) - return Collections.singleton(FBUtilities.getBroadcastAddress()); - - throw new UnavailableException(ConsistencyLevel.ONE, 1, 0); - } - - return chosenEndpoints; - } -#endif - bool storage_proxy::cannot_hint(gms::inet_address target) { return _total_hints_in_progress > _max_hints_in_progress && (get_hints_in_progress_for(target) > 0 && should_hint(target)); @@ -1280,58 +1066,6 @@ bool storage_proxy::submit_hint(lw_shared_ptr m, gms::ine StorageMetrics.totalHints.inc(); } - private static void sendMessagesToNonlocalDC(MessageOut message, Collection targets, AbstractWriteResponseHandler handler) - { - Iterator iter = targets.iterator(); - InetAddress target = iter.next(); - - // Add the other destinations of the same message as a FORWARD_HEADER entry - DataOutputBuffer out = new DataOutputBuffer(); - try - { - out.writeInt(targets.size() - 1); - while (iter.hasNext()) - { - InetAddress destination = iter.next(); - CompactEndpointSerializationHelper.serialize(destination, out); - int id = MessagingService.instance().addCallback(handler, - message, - destination, - message.getTimeout(), - handler.consistencyLevel, - true); - out.writeInt(id); - logger.trace("Adding FWD message to {}@{}", id, destination); - } - message = message.withParameter(Mutation.FORWARD_TO, out.getData()); - // send the combined message + forward headers - int id = MessagingService.instance().sendRR(message, target, handler, true); - logger.trace("Sending message to {}@{}", id, target); - } - catch (IOException e) - { - // DataOutputBuffer is in-memory, doesn't throw IOException - throw new AssertionError(e); - } - } - - private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler) - { - - StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable() - { - public void runMayThrow() - { - IMutation processed = SinkManager.processWriteRequest(mutation); - if (processed != null) - { - ((Mutation) processed).apply(); - responseHandler.response(null); - } - } - }); - } - /** * Handle counter mutation on the coordinator host. * @@ -2093,33 +1827,6 @@ storage_proxy::do_query(schema_ptr s, } #if 0 - public static List read(List commands, ConsistencyLevel consistencyLevel) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException - { - // When using serial CL, the ClientState should be provided - assert !consistencyLevel.isSerialConsistency(); - return read(commands, consistencyLevel, null); - } - - /** - * Performs the actual reading of a row out of the StorageService, fetching - * a specific set of column names from a given column family. - */ - public static List read(List commands, ConsistencyLevel consistencyLevel, ClientState state) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException - { - if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands)) - { - readMetrics.unavailables.mark(); - ClientRequestMetrics.readUnavailables.inc(); - throw new IsBootstrappingException(); - } - - return consistencyLevel.isSerialConsistency() - ? readWithPaxos(commands, consistencyLevel, state) - : readRegular(commands, consistencyLevel); - } - private static List readWithPaxos(List commands, ConsistencyLevel consistencyLevel, ClientState state) throws InvalidRequestException, UnavailableException, ReadTimeoutException { @@ -2183,263 +1890,6 @@ storage_proxy::do_query(schema_ptr s, return rows; } - - private static List readRegular(List commands, ConsistencyLevel consistencyLevel) - throws UnavailableException, ReadTimeoutException - { - long start = System.nanoTime(); - List rows = null; - - try - { - rows = fetchRows(commands, consistencyLevel); - } - catch (UnavailableException e) - { - readMetrics.unavailables.mark(); - ClientRequestMetrics.readUnavailables.inc(); - throw e; - } - catch (ReadTimeoutException e) - { - readMetrics.timeouts.mark(); - ClientRequestMetrics.readTimeouts.inc(); - throw e; - } - finally - { - long latency = System.nanoTime() - start; - readMetrics.addNano(latency); - // TODO avoid giving every command the same latency number. Can fix this in CASSADRA-5329 - for (ReadCommand command : commands) - Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); - } - - return rows; - } - - /** - * This function executes local and remote reads, and blocks for the results: - * - * 1. Get the replica locations, sorted by response time according to the snitch - * 2. Send a data request to the closest replica, and digest requests to either - * a) all the replicas, if read repair is enabled - * b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel - * 3. Wait for a response from R replicas - * 4. If the digests (if any) match the data return the data - * 5. else carry out read repair by getting data from all the nodes. - */ - private static List fetchRows(List initialCommands, ConsistencyLevel consistencyLevel) - throws UnavailableException, ReadTimeoutException - { - List rows = new ArrayList<>(initialCommands.size()); - // (avoid allocating a new list in the common case of nothing-to-retry) - List commandsToRetry = Collections.emptyList(); - - do - { - List commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry; - AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()]; - - if (!commandsToRetry.isEmpty()) - Tracing.trace("Retrying {} commands", commandsToRetry.size()); - - // send out read requests - for (int i = 0; i < commands.size(); i++) - { - ReadCommand command = commands.get(i); - assert !command.isDigestQuery(); - - AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel); - exec.executeAsync(); - readExecutors[i] = exec; - } - - for (AbstractReadExecutor exec : readExecutors) - exec.maybeTryAdditionalReplicas(); - - // read results and make a second pass for any digest mismatches - List repairCommands = null; - List> repairResponseHandlers = null; - for (AbstractReadExecutor exec: readExecutors) - { - try - { - Row row = exec.get(); - if (row != null) - { - exec.command.maybeTrim(row); - rows.add(row); - } - - if (logger.isDebugEnabled()) - logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start)); - } - catch (ReadTimeoutException ex) - { - int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace())); - int responseCount = exec.handler.getReceivedCount(); - String gotData = responseCount > 0 - ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)" - : ""; - - if (Tracing.isTracing()) - { - Tracing.trace("Timed out; received {} of {} responses{}", - new Object[]{ responseCount, blockFor, gotData }); - } - else if (logger.isDebugEnabled()) - { - logger.debug("Read timeout; received {} of {} responses{}", responseCount, blockFor, gotData); - } - throw ex; - } - catch (DigestMismatchException ex) - { - Tracing.trace("Digest mismatch: {}", ex); - - ReadRepairMetrics.repairedBlocking.mark(); - - // Do a full data read to resolve the correct response (and repair node that need be) - RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size()); - ReadCallback repairHandler = new ReadCallback<>(resolver, - ConsistencyLevel.ALL, - exec.getContactedReplicas().size(), - exec.command, - Keyspace.open(exec.command.getKeyspace()), - exec.handler.endpoints); - - if (repairCommands == null) - { - repairCommands = new ArrayList<>(); - repairResponseHandlers = new ArrayList<>(); - } - repairCommands.add(exec.command); - repairResponseHandlers.add(repairHandler); - - MessageOut message = exec.command.createMessage(); - for (InetAddress endpoint : exec.getContactedReplicas()) - { - Tracing.trace("Enqueuing full data read to {}", endpoint); - MessagingService.instance().sendRR(message, endpoint, repairHandler); - } - } - } - - commandsToRetry.clear(); - - // read the results for the digest mismatch retries - if (repairResponseHandlers != null) - { - for (int i = 0; i < repairCommands.size(); i++) - { - ReadCommand command = repairCommands.get(i); - ReadCallback handler = repairResponseHandlers.get(i); - - Row row; - try - { - row = handler.get(); - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // full data requested from each node here, no digests should be sent - } - catch (ReadTimeoutException e) - { - if (Tracing.isTracing()) - Tracing.trace("Timed out waiting on digest mismatch repair requests"); - else - logger.debug("Timed out waiting on digest mismatch repair requests"); - // the caught exception here will have CL.ALL from the repair command, - // not whatever CL the initial command was at (CASSANDRA-7947) - int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())); - throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true); - } - - RowDataResolver resolver = (RowDataResolver)handler.resolver; - try - { - // wait for the repair writes to be acknowledged, to minimize impact on any replica that's - // behind on writes in case the out-of-sync row is read multiple times in quick succession - FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException e) - { - if (Tracing.isTracing()) - Tracing.trace("Timed out waiting on digest mismatch repair acknowledgements"); - else - logger.debug("Timed out waiting on digest mismatch repair acknowledgements"); - int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())); - throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true); - } - - // retry any potential short reads - ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row); - if (retryCommand != null) - { - Tracing.trace("Issuing retry for read command"); - if (commandsToRetry == Collections.EMPTY_LIST) - commandsToRetry = new ArrayList<>(); - commandsToRetry.add(retryCommand); - continue; - } - - if (row != null) - { - command.maybeTrim(row); - rows.add(row); - } - } - } - } while (!commandsToRetry.isEmpty()); - - return rows; - } - - static class LocalReadRunnable extends DroppableRunnable - { - private final ReadCommand command; - private final ReadCallback handler; - private final long start = System.nanoTime(); - - LocalReadRunnable(ReadCommand command, ReadCallback handler) - { - super(MessagingService.Verb.READ); - this.command = command; - this.handler = handler; - } - - protected void runMayThrow() - { - Keyspace keyspace = Keyspace.open(command.ksName); - Row r = command.getRow(keyspace); - ReadResponse result = ReadVerbHandler.getResponse(command, r); - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - handler.response(result); - } - } - - static class LocalRangeSliceRunnable extends DroppableRunnable - { - private final AbstractRangeCommand command; - private final ReadCallback> handler; - private final long start = System.nanoTime(); - - LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback> handler) - { - super(MessagingService.Verb.RANGE_SLICE); - this.command = command; - this.handler = handler; - } - - protected void runMayThrow() - { - RangeSliceReply result = new RangeSliceReply(command.executeLocally()); - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - handler.response(result); - } - } #endif std::vector storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) { @@ -2527,85 +1977,6 @@ float storage_proxy::estimate_result_rows_per_range(lw_shared_ptr command.limit() ? rows.subList(0, command.limit()) : rows; } - - public Map> getSchemaVersions() - { - return describeSchemaVersions(); - } - - /** - * initiate a request/response session with each live node to check whether or not everybody is using the same - * migration id. This is useful for determining if a schema change has propagated through the cluster. Disagreement - * is assumed if any node fails to respond. - */ - public static Map> describeSchemaVersions() - { - final String myVersion = Schema.instance.getVersion().toString(); - final Map versions = new ConcurrentHashMap(); - final Set liveHosts = Gossiper.instance.getLiveMembers(); - final CountDownLatch latch = new CountDownLatch(liveHosts.size()); - - IAsyncCallback cb = new IAsyncCallback() - { - public void response(MessageIn message) - { - // record the response from the remote node. - versions.put(message.from, message.payload); - latch.countDown(); - } - - public boolean isLatencyForSnitch() - { - return false; - } - }; - // an empty message acts as a request to the SchemaCheckVerbHandler. - MessageOut message = new MessageOut(MessagingService.Verb.SCHEMA_CHECK); - for (InetAddress endpoint : liveHosts) - MessagingService.instance().sendRR(message, endpoint, cb); - - try - { - // wait for as long as possible. timeout-1s if possible. - latch.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException ex) - { - throw new AssertionError("This latch shouldn't have been interrupted."); - } - - // maps versions to hosts that are on that version. - Map> results = new HashMap>(); - Iterable allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); - for (InetAddress host : allHosts) - { - UUID version = versions.get(host); - String stringVersion = version == null ? UNREACHABLE : version.toString(); - List hosts = results.get(stringVersion); - if (hosts == null) - { - hosts = new ArrayList(); - results.put(stringVersion, hosts); - } - hosts.add(host.getHostAddress()); - } - - // we're done: the results map is ready to return to the client. the rest is just debug logging: - if (results.get(UNREACHABLE) != null) - logger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", StringUtils.join(results.get(UNREACHABLE), ",")); - for (Map.Entry> entry : results.entrySet()) - { - // check for version disagreement. log the hosts that don't agree. - if (entry.getKey().equals(UNREACHABLE) || entry.getKey().equals(myVersion)) - continue; - for (String host : entry.getValue()) - logger.debug("{} disagrees ({})", host, entry.getKey()); - } - if (results.size() == 1) - logger.debug("Schemas are in agreement."); - - return results; - } #endif /** @@ -2669,113 +2040,6 @@ storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::parti return ranges; } -#if 0 - public long getReadOperations() - { - return readMetrics.latency.count(); - } - - public long getTotalReadLatencyMicros() - { - return readMetrics.totalLatency.count(); - } - - public double getRecentReadLatencyMicros() - { - return readMetrics.getRecentLatency(); - } - - public long[] getTotalReadLatencyHistogramMicros() - { - return readMetrics.totalLatencyHistogram.getBuckets(false); - } - - public long[] getRecentReadLatencyHistogramMicros() - { - return readMetrics.recentLatencyHistogram.getBuckets(true); - } - - public long getRangeOperations() - { - return rangeMetrics.latency.count(); - } - - public long getTotalRangeLatencyMicros() - { - return rangeMetrics.totalLatency.count(); - } - - public double getRecentRangeLatencyMicros() - { - return rangeMetrics.getRecentLatency(); - } - - public long[] getTotalRangeLatencyHistogramMicros() - { - return rangeMetrics.totalLatencyHistogram.getBuckets(false); - } - - public long[] getRecentRangeLatencyHistogramMicros() - { - return rangeMetrics.recentLatencyHistogram.getBuckets(true); - } - - public long getWriteOperations() - { - return writeMetrics.latency.count(); - } - - public long getTotalWriteLatencyMicros() - { - return writeMetrics.totalLatency.count(); - } - - public double getRecentWriteLatencyMicros() - { - return writeMetrics.getRecentLatency(); - } - - public long[] getTotalWriteLatencyHistogramMicros() - { - return writeMetrics.totalLatencyHistogram.getBuckets(false); - } - - public long[] getRecentWriteLatencyHistogramMicros() - { - return writeMetrics.recentLatencyHistogram.getBuckets(true); - } - - public boolean getHintedHandoffEnabled() - { - return DatabaseDescriptor.hintedHandoffEnabled(); - } - - public Set getHintedHandoffEnabledByDC() - { - return DatabaseDescriptor.hintedHandoffEnabledByDC(); - } - - public void setHintedHandoffEnabled(boolean b) - { - DatabaseDescriptor.setHintedHandoffEnabled(b); - } - - public void setHintedHandoffEnabledByDCList(String dcNames) - { - DatabaseDescriptor.setHintedHandoffEnabled(dcNames); - } - - public int getMaxHintWindow() - { - return DatabaseDescriptor.getMaxHintWindow(); - } - - public void setMaxHintWindow(int ms) - { - DatabaseDescriptor.setMaxHintWindow(ms); - } -#endif - bool storage_proxy::should_hint(gms::inet_address ep) { if (is_me(ep)) { // do not hint to local address return false;