From 806fc04b37e8ec3aa08c4a46a8b7da4488032018 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 24 Jun 2015 11:12:27 +0200 Subject: [PATCH 1/7] token_metadata.hh : implement get_all_endpoints + add number_of_endpoints() --- locator/token_metadata.hh | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 5bf5697c2f..86b2431398 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -835,6 +835,20 @@ public: } } +#endif + size_t number_of_endpoints() const { + return _endpoint_to_host_id_map.size(); + } + + std::vector get_all_endpoints() const { + std::vector tmp; + std::transform(_endpoint_to_host_id_map.begin(), _endpoint_to_host_id_map.end(), std::back_inserter(tmp), [](const auto& p) { + return p.first; + }); + return tmp; + } + +#if 0 public Set getAllEndpoints() { lock.readLock().lock(); From 8a8694cbbfd07c3c01d93e488791c6c46c6d19be Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 24 Jun 2015 11:15:24 +0200 Subject: [PATCH 2/7] Add naive rate limiter object This is mostly a placeholder, since the "limiting" is rather coarse and stuttering --- configure.py | 1 + utils/rate_limiter.cc | 33 +++++++++++++++++++++++++++++++++ utils/rate_limiter.hh | 30 ++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+) create mode 100644 utils/rate_limiter.cc create mode 100644 utils/rate_limiter.hh diff --git a/configure.py b/configure.py index da17fcd52a..98f5b35b6f 100755 --- a/configure.py +++ b/configure.py @@ -471,6 +471,7 @@ urchin_core = (['database.cc', 'utils/i_filter.cc', 'utils/bloom_filter.cc', 'utils/bloom_calculations.cc', + 'utils/rate_limiter.cc', 'gms/version_generator.cc', 'gms/versioned_value.cc', 'gms/gossiper.cc', diff --git a/utils/rate_limiter.cc b/utils/rate_limiter.cc new file mode 100644 index 0000000000..789ef18200 --- /dev/null +++ b/utils/rate_limiter.cc @@ -0,0 +1,33 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include 
"rate_limiter.hh" + +utils::rate_limiter::rate_limiter(size_t rate) + : _units_per_s(rate) { + if (_units_per_s != 0) { + _timer.set_callback(std::bind(&rate_limiter::on_timer, this)); + _timer.arm(lowres_clock::now() + std::chrono::seconds(1), + std::experimental::optional { + std::chrono::seconds(1) }); + } +} + +void utils::rate_limiter::on_timer() { + _sem.signal(_units_per_s - _sem.current()); +} + +future<> utils::rate_limiter::reserve(size_t u) { + if (_units_per_s == 0) { + return make_ready_future<>(); + } + if (u <= _units_per_s) { + return _sem.wait(u); + } + auto n = std::min(u, _units_per_s); + auto r = u - n; + return _sem.wait(n).then([this, r] { + return reserve(r); + }); +} diff --git a/utils/rate_limiter.hh b/utils/rate_limiter.hh new file mode 100644 index 0000000000..066654afc9 --- /dev/null +++ b/utils/rate_limiter.hh @@ -0,0 +1,30 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once + +#include "core/timer.hh" +#include "core/semaphore.hh" +#include "core/reactor.hh" + +namespace utils { + +/** + * 100% naive rate limiter. Consider it a placeholder + * Will let you process X "units" per second, then reset this every s. + * Obviously, accuracy is virtually non-existant and steady rate will fluctuate. 
+ */ +class rate_limiter { +private: + timer _timer; + size_t _units_per_s; + semaphore _sem; + + void on_timer(); +public: + rate_limiter(size_t rate); + future<> reserve(size_t u); +}; + +} From 0e500b675983636740bf5080f47f8f98327a3b6e Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 24 Jun 2015 11:16:07 +0200 Subject: [PATCH 3/7] db::serializer : add replay_position serializer --- db/serializer.cc | 18 ++++++++++++++++++ db/serializer.hh | 7 +++++++ 2 files changed, 25 insertions(+) diff --git a/db/serializer.cc b/db/serializer.cc index 63d4283c9a..3b4150da96 100644 --- a/db/serializer.cc +++ b/db/serializer.cc @@ -234,6 +234,23 @@ frozen_mutation db::serializer::read(input& in) { return frozen_mutation(bytes_serializer::read(in)); } +template<> +db::serializer::serializer(const db::replay_position& rp) + : _item(rp), _size(sizeof(uint64_t) * 2) { +} + +template<> +void db::serializer::write(output& out, const db::replay_position& rp) { + out.write(rp.id); + out.write(rp.pos); +} + +template<> +void db::serializer::read(db::replay_position& rp, input& in) { + rp.id = in.read(); + rp.pos = in.read(); +} + template class db::serializer ; template class db::serializer ; template class db::serializer ; @@ -245,3 +262,4 @@ template class db::serializer ; template class db::serializer ; template class db::serializer ; template class db::serializer ; +template class db::serializer ; diff --git a/db/serializer.hh b/db/serializer.hh index 54a9bf3e51..eb37db8411 100644 --- a/db/serializer.hh +++ b/db/serializer.hh @@ -13,6 +13,7 @@ #include "keys.hh" #include "database_fwd.hh" #include "frozen_mutation.hh" +#include "db/commitlog/replay_position.hh" namespace db { /** @@ -116,6 +117,10 @@ template<> void serializer::write(output&, const clu template<> void serializer::read(clustering_key_prefix_view&, input&); template<> clustering_key_prefix_view serializer::read(input&); +template<> serializer::serializer(const db::replay_position&); +template<> void 
serializer::write(output&, const db::replay_position&); +template<> void serializer::read(db::replay_position&, input&); + template T serializer::read(input& in) { type t; @@ -131,6 +136,7 @@ extern template class serializer; extern template class serializer; extern template class serializer; extern template class serializer; +extern template class serializer; typedef serializer tombstone_serializer; typedef serializer bytes_serializer; // Compatible with bytes_view_serializer @@ -143,6 +149,7 @@ typedef serializer partition_key_view_serializer; typedef serializer clustering_key_view_serializer; typedef serializer clustering_key_prefix_view_serializer; typedef serializer frozen_mutation_serializer; +typedef serializer replay_position_serializer; } From 4ba0bf7ac6733937e5faf9d7f34f50ab851a0feb Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 24 Jun 2015 11:25:02 +0200 Subject: [PATCH 4/7] system_keyspace : add support for storing and reading truncation * Straight re-impl of origin code * Uses system tables not yet created, so does not actually work... 
--- db/system_keyspace.cc | 156 +++++++++++++++++++++--------------------- db/system_keyspace.hh | 99 ++++----------------------- 2 files changed, 90 insertions(+), 165 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index cbb0629dfe..2902aeefe0 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -37,6 +37,8 @@ #include "version.hh" #include "thrift/server.hh" #include "exceptions/exceptions.hh" +#include "cql3/query_processor.hh" +#include "db/serializer.hh" namespace db { namespace system_keyspace { @@ -501,89 +503,76 @@ future<> setup(distributed& db, distributed& qp UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); return CompactionHistoryTabularData.from(queryResultSet); } +#endif - public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); - truncationRecords = null; - forceBlockingFlush(LOCAL); + +typedef std::pair truncation_entry; +typedef std::unordered_map truncation_map; +static thread_local std::experimental::optional truncation_records; + +future<> save_truncation_record(cql3::query_processor& qp, const column_family& cf, db_clock::time_point truncated_at, const db::replay_position& rp) { + db::serializer rps(rp); + bytes buf(bytes::initialized_later(), sizeof(db_clock::rep) + rps.size()); + data_output out(buf); + rps(out); + out.write(truncated_at.time_since_epoch().count()); + + map_type_impl::native_type tmp; + tmp.emplace_back(boost::any{ cf.schema()->id() }, boost::any{ buf }); + + sstring req = sprint("UPDATE system.%s SET truncated_at = truncated_at + ? 
WHERE key = '%s'", LOCAL, LOCAL); + return qp.execute_internal(req, {tmp}).then([&qp](auto rs) { + truncation_records = {}; + return force_blocking_flush(qp.db(), LOCAL); + }); +} + +/** + * This method is used to remove information about truncation time for specified column family + */ +future<> remove_truncation_record(cql3::query_processor& qp, utils::UUID id) { + sstring req = sprint("DELETE truncated_at[?] from system.%s WHERE key = '%s'", LOCAL, LOCAL); + return qp.execute_internal(req, {id}).then([&qp](auto rs) { + truncation_records = {}; + return force_blocking_flush(qp.db(), LOCAL); + }); +} + +static future get_truncation_record(cql3::query_processor& qp, utils::UUID cf_id) { + if (!truncation_records) { + sstring req = sprint("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL); + return qp.execute_internal(req).then([&qp, cf_id](::shared_ptr rs) { + truncation_map tmp; + if (!rs->empty() && rs->one().has("truncated_at")) { + auto map = rs->one().get_map("truncated_at"); + for (auto& p : map) { + truncation_entry e; + data_input in(p.second); + e.first = db::serializer::read(in); + e.second = db_clock::time_point(db_clock::duration(in.read())); + tmp[p.first] = e; + } + } + truncation_records = std::move(tmp); + return get_truncation_record(qp, cf_id); + }); } + return make_ready_future((*truncation_records)[cf_id]); +} - - /** - * This method is used to remove information about truncation time for specified column family - */ - public static synchronized void removeTruncationRecord(UUID cfId) - { - String req = "DELETE truncated_at[?] 
from system.%s WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), cfId); - truncationRecords = null; - forceBlockingFlush(LOCAL); - } +future get_truncated_position(cql3::query_processor& qp, utils::UUID cf_id) { + return get_truncation_record(qp, cf_id).then([](truncation_entry e) { + return make_ready_future(e.first); + }); +} - private static Map truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - DataOutputBuffer out = new DataOutputBuffer(); - try - { - ReplayPosition.serializer.serialize(position, out); - out.writeLong(truncatedAt); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength())); - } - - public static ReplayPosition getTruncatedPosition(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? null : record.left; - } - - public static long getTruncatedAt(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? 
Long.MIN_VALUE : record.right; - } - - private static synchronized Pair getTruncationRecord(UUID cfId) - { - if (truncationRecords == null) - truncationRecords = readTruncationRecords(); - return truncationRecords.get(cfId); - } - - private static Map> readTruncationRecords() - { - UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); - - Map> records = new HashMap<>(); - - if (!rows.isEmpty() && rows.one().has("truncated_at")) - { - Map map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance); - for (Map.Entry entry : map.entrySet()) - records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); - } - - return records; - } - - private static Pair truncationRecordFromBlob(ByteBuffer bytes) - { - try - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes)); - return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } +future get_truncated_at(cql3::query_processor& qp, utils::UUID cf_id) { + return get_truncation_record(qp, cf_id).then([](truncation_entry e) { + return make_ready_future(e.second); + }); +} +#if 0 /** * Record tokens being used by another node */ @@ -694,8 +683,19 @@ future<> force_blocking_flush(sstring cfname) { return cf.flush(&qctx->db()); } +future<> force_blocking_flush(distributed& db, sstring cf_name) { #if 0 + if (!Boolean.getBoolean("cassandra.unsafesystem")) +#endif + { + return db.invoke_on_all([cf_name](database& db) { + auto& cf = db.find_column_family(NAME, cf_name); + return cf.seal_active_memtable(&db); + }); + } +} +#if 0 /** * Return a map of stored tokens to IP addresses * diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 65e70e673b..8ad282d886 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -31,6 +31,8 @@ #include "gms/inet_address.hh" 
#include "query-result-set.hh" #include "locator/token_metadata.hh" +#include "db_clock.hh" +#include "db/commitlog/replay_position.hh" namespace service { @@ -39,7 +41,7 @@ class storage_proxy; } namespace cql3 { -class query_processor; + class query_processor; } namespace db { @@ -210,88 +212,13 @@ load_dc_rack_info(); UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); return CompactionHistoryTabularData.from(queryResultSet); } +#endif + future<> save_truncation_record(cql3::query_processor&, const column_family&, db_clock::time_point truncated_at, const db::replay_position&); + future<> remove_truncation_record(cql3::query_processor&, utils::UUID); + future get_truncated_position(cql3::query_processor&, utils::UUID); + future get_truncated_at(cql3::query_processor&, utils::UUID); - public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); - truncationRecords = null; - forceBlockingFlush(LOCAL); - } - - /** - * This method is used to remove information about truncation time for specified column family - */ - public static synchronized void removeTruncationRecord(UUID cfId) - { - String req = "DELETE truncated_at[?] 
from system.%s WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), cfId); - truncationRecords = null; - forceBlockingFlush(LOCAL); - } - - private static Map truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - DataOutputBuffer out = new DataOutputBuffer(); - try - { - ReplayPosition.serializer.serialize(position, out); - out.writeLong(truncatedAt); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength())); - } - - public static ReplayPosition getTruncatedPosition(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? null : record.left; - } - - public static long getTruncatedAt(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? Long.MIN_VALUE : record.right; - } - - private static synchronized Pair getTruncationRecord(UUID cfId) - { - if (truncationRecords == null) - truncationRecords = readTruncationRecords(); - return truncationRecords.get(cfId); - } - - private static Map> readTruncationRecords() - { - UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); - - Map> records = new HashMap<>(); - - if (!rows.isEmpty() && rows.one().has("truncated_at")) - { - Map map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance); - for (Map.Entry entry : map.entrySet()) - records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); - } - - return records; - } - - private static Pair truncationRecordFromBlob(ByteBuffer bytes) - { - try - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes)); - return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? 
in.readLong() : Long.MIN_VALUE); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } +#if 0 /** * Record tokens being used by another node @@ -390,13 +317,11 @@ load_dc_rack_info(); updateTokens(tokens); return tokens; } +#endif - public static void forceBlockingFlush(String cfname) - { - if (!Boolean.getBoolean("cassandra.unsafesystem")) - FBUtilities.waitOnFuture(Keyspace.open(NAME).getColumnFamilyStore(cfname).forceFlush()); - } + future<> force_blocking_flush(distributed&, sstring cf_name); +#if o /** * Return a map of stored tokens to IP addresses * From c1a5627c1270dc17e0c30fecacd30b17074868e7 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 15 Jun 2015 16:54:50 +0200 Subject: [PATCH 5/7] Import BatchlogManager.java --- db/BatchlogManager.java | 539 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 539 insertions(+) create mode 100644 db/BatchlogManager.java diff --git a/db/BatchlogManager.java b/db/BatchlogManager.java new file mode 100644 index 0000000000..dd84ac88b9 --- /dev/null +++ b/db/BatchlogManager.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.*; +import com.google.common.util.concurrent.RateLimiter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.WriteResponseHandler; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; + +public class BatchlogManager implements BatchlogManagerMBean +{ + private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager"; + private static final long 
REPLAY_INTERVAL = 60 * 1000; // milliseconds + private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size. + + private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class); + public static final BatchlogManager instance = new BatchlogManager(); + + private final AtomicLong totalBatchesReplayed = new AtomicLong(); + + // Single-thread executor service for scheduling and serializing log replay. + private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); + + public void start() + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws ExecutionException, InterruptedException + { + replayAllFailedBatches(); + } + }; + + batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS); + } + + public static void shutdown() throws InterruptedException + { + batchlogTasks.shutdown(); + batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); + } + + public int countAllBatches() + { + String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG); + return (int) executeInternal(query).one().getLong("count"); + } + + public long getTotalBatchesReplayed() + { + return totalBatchesReplayed.longValue(); + } + + public void forceBatchlogReplay() + { + startBatchlogReplay(); + } + + public Future startBatchlogReplay() + { + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws ExecutionException, InterruptedException + { + replayAllFailedBatches(); + } + }; + // If a replay is already in progress this request will be executed after it completes. 
+ return batchlogTasks.submit(runnable); + } + + public static Mutation getBatchlogMutationFor(Collection mutations, UUID uuid, int version) + { + return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros()); + } + + @VisibleForTesting + static Mutation getBatchlogMutationFor(Collection mutations, UUID uuid, int version, long now) + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog); + CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now); + adder.add("data", serializeMutations(mutations, version)) + .add("written_at", new Date(now / 1000)) + .add("version", version); + return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid), cf); + } + + private static ByteBuffer serializeMutations(Collection mutations, int version) + { + try (DataOutputBuffer buf = new DataOutputBuffer()) + { + buf.writeInt(mutations.size()); + for (Mutation mutation : mutations) + Mutation.serializer.serialize(mutation, buf, version); + return buf.buffer(); + } + catch (IOException e) + { + throw new AssertionError(); // cannot happen. + } + } + + private void replayAllFailedBatches() throws ExecutionException, InterruptedException + { + logger.debug("Started replayAllFailedBatches"); + + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). + int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); + RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024); + + UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", + SystemKeyspace.NAME, + SystemKeyspace.BATCHLOG, + PAGE_SIZE)); + + while (!page.isEmpty()) + { + UUID id = processBatchlogPage(page, rateLimiter); + + if (page.size() < PAGE_SIZE) + break; // we've exhausted the batchlog, next query would be empty. + + page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", + SystemKeyspace.NAME, + SystemKeyspace.BATCHLOG, + PAGE_SIZE), + id); + } + + cleanup(); + + logger.debug("Finished replayAllFailedBatches"); + } + + private void deleteBatch(UUID id) + { + Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id)); + mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); + mutation.apply(); + } + + private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter) + { + UUID id = null; + ArrayList batches = new ArrayList<>(page.size()); + + // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others + for (UntypedResultSet.Row row : page) + { + id = row.getUUID("id"); + long writtenAt = row.getLong("written_at"); + // enough time for the actual write + batchlog entry mutation delivery (two separate requests). + long timeout = getBatchlogTimeout(); + if (System.currentTimeMillis() < writtenAt + timeout) + continue; // not ready to replay yet, might still get a deletion. + + int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; + Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version); + try + { + if (batch.replay(rateLimiter) > 0) + { + batches.add(batch); + } + else + { + deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated). 
+ totalBatchesReplayed.incrementAndGet(); + } + } + catch (IOException e) + { + logger.warn("Skipped batch replay of {} due to {}", id, e); + deleteBatch(id); + } + } + + // now waiting for all batches to complete their processing + // schedule hints for timed out deliveries + for (Batch batch : batches) + { + batch.finish(); + deleteBatch(batch.id); + } + + totalBatchesReplayed.addAndGet(batches.size()); + + return id; + } + + public long getBatchlogTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation + } + + private static class Batch + { + private final UUID id; + private final long writtenAt; + private final ByteBuffer data; + private final int version; + + private List> replayHandlers; + + public Batch(UUID id, long writtenAt, ByteBuffer data, int version) + { + this.id = id; + this.writtenAt = writtenAt; + this.data = data; + this.version = version; + } + + public int replay(RateLimiter rateLimiter) throws IOException + { + logger.debug("Replaying batch {}", id); + + List mutations = replayingMutations(); + + if (mutations.isEmpty()) + return 0; + + int ttl = calculateHintTTL(mutations); + if (ttl <= 0) + return 0; + + replayHandlers = sendReplays(mutations, writtenAt, ttl); + + rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation. 
+ + return replayHandlers.size(); + } + + public void finish() + { + for (int i = 0; i < replayHandlers.size(); i++) + { + ReplayWriteResponseHandler handler = replayHandlers.get(i); + try + { + handler.get(); + } + catch (WriteTimeoutException|WriteFailureException e) + { + logger.debug("Failed replaying a batched mutation to a node, will write a hint"); + logger.debug("Failure was : {}", e.getMessage()); + // writing hints for the rest to hints, starting from i + writeHintsForUndeliveredEndpoints(i); + return; + } + } + } + + private List replayingMutations() throws IOException + { + DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); + int size = in.readInt(); + List mutations = new ArrayList<>(size); + for (int i = 0; i < size; i++) + { + Mutation mutation = Mutation.serializer.deserialize(in, version); + + // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis. + // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then + // truncated. + for (UUID cfId : mutation.getColumnFamilyIds()) + if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) + mutation = mutation.without(cfId); + + if (!mutation.isEmpty()) + mutations.add(mutation); + } + return mutations; + } + + private void writeHintsForUndeliveredEndpoints(int startFrom) + { + try + { + // Here we deserialize mutations 2nd time from byte buffer. 
+ // but this is ok, because timeout on batch direct delivery is rare + // (it can happen only several seconds until node is marked dead) + // so trading some cpu to keep less objects + List replayingMutations = replayingMutations(); + for (int i = startFrom; i < replayHandlers.size(); i++) + { + Mutation undeliveredMutation = replayingMutations.get(i); + int ttl = calculateHintTTL(replayingMutations); + ReplayWriteResponseHandler handler = replayHandlers.get(i); + + if (ttl > 0 && handler != null) + for (InetAddress endpoint : handler.undelivered) + StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint); + } + } + catch (IOException e) + { + logger.error("Cannot schedule hints for undelivered batch", e); + } + } + + private List> sendReplays(List mutations, long writtenAt, int ttl) + { + List> handlers = new ArrayList<>(mutations.size()); + for (Mutation mutation : mutations) + { + ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl); + if (handler != null) + handlers.add(handler); + } + return handlers; + } + + /** + * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints + * when a replica is down or a write request times out. 
+ * + * @return direct delivery handler to wait on or null, if no live nodes found + */ + private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl) + { + Set liveEndpoints = new HashSet<>(); + String ks = mutation.getKeyspaceName(); + Token tk = StorageService.getPartitioner().getToken(mutation.key()); + + for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), + StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) + { + if (endpoint.equals(FBUtilities.getBroadcastAddress())) + mutation.apply(); + else if (FailureDetector.instance.isAlive(endpoint)) + liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. + else + StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint); + } + + if (liveEndpoints.isEmpty()) + return null; + + ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler<>(liveEndpoints); + MessageOut message = mutation.createMessage(); + for (InetAddress endpoint : liveEndpoints) + MessagingService.instance().sendRR(message, endpoint, handler, false); + return handler; + } + + /* + * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog). + * This ensures that deletes aren't "undone" by an old batch replay. + */ + private int calculateHintTTL(Collection mutations) + { + int unadjustedTTL = Integer.MAX_VALUE; + for (Mutation mutation : mutations) + unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); + return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt); + } + + /** + * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from + * which we did not receive a successful reply. 
+ */ + private static class ReplayWriteResponseHandler extends WriteResponseHandler + { + private final Set undelivered = Collections.newSetFromMap(new ConcurrentHashMap()); + + public ReplayWriteResponseHandler(Collection writeEndpoints) + { + super(writeEndpoints, Collections.emptySet(), null, null, null, WriteType.UNLOGGED_BATCH); + undelivered.addAll(writeEndpoints); + } + + @Override + protected int totalBlockFor() + { + return this.naturalEndpoints.size(); + } + + @Override + public void response(MessageIn m) + { + boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from); + assert removed; + super.response(m); + } + } + } + + // force flush + compaction to reclaim space from the replayed batches + private void cleanup() throws ExecutionException, InterruptedException + { + ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); + cfs.forceBlockingFlush(); + Collection descriptors = new ArrayList<>(); + for (SSTableReader sstr : cfs.getSSTables()) + descriptors.add(sstr.descriptor); + if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. + CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); + } + + public static class EndpointFilter + { + private final String localRack; + private final Multimap endpoints; + + public EndpointFilter(String localRack, Multimap endpoints) + { + this.localRack = localRack; + this.endpoints = endpoints; + } + + /** + * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. 
+ */ + public Collection filter() + { + // special case for single-node data centers + if (endpoints.values().size() == 1) + return endpoints.values(); + + // strip out dead endpoints and localhost + ListMultimap validated = ArrayListMultimap.create(); + for (Map.Entry entry : endpoints.entries()) + if (isValid(entry.getValue())) + validated.put(entry.getKey(), entry.getValue()); + + if (validated.size() <= 2) + return validated.values(); + + if (validated.size() - validated.get(localRack).size() >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + // we have only 1 `other` rack + Collection otherRack = Iterables.getOnlyElement(validated.asMap().values()); + return Lists.newArrayList(Iterables.limit(otherRack, 2)); + } + + // randomize which racks we pick from if more than 2 remaining + Collection racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + Collections.shuffle((List) racks); + } + + // grab a random member of up to two racks + List result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List rackMembers = validated.get(rack); + result.add(rackMembers.get(getRandomInt(rackMembers.size()))); + } + + return result; + } + + @VisibleForTesting + protected boolean isValid(InetAddress input) + { + return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); + } + + @VisibleForTesting + protected int getRandomInt(int bound) + { + return ThreadLocalRandom.current().nextInt(bound); + } + } +} From ef2cc9b05d09daa84e90cf5b267bfdbe3bfebfae Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 24 Jun 2015 11:27:37 +0200 Subject: [PATCH 6/7] BatchLogManager.java -> C++ Somewhat simplifies version of the Origin code, since from what I can see, there is less need for us to do explicit query sends in the BLM itself, instead we can 
just go through storage_proxy. I could be wrong though. --- configure.py | 1 + db/BatchlogManager.java | 539 ---------------------------------------- db/batchlog_manager.cc | 240 ++++++++++++++++++ db/batchlog_manager.hh | 86 +++++++ 4 files changed, 327 insertions(+), 539 deletions(-) delete mode 100644 db/BatchlogManager.java create mode 100644 db/batchlog_manager.cc create mode 100644 db/batchlog_manager.hh diff --git a/configure.py b/configure.py index 98f5b35b6f..0148b71aa4 100755 --- a/configure.py +++ b/configure.py @@ -465,6 +465,7 @@ urchin_core = (['database.cc', 'db/config.cc', 'db/index/secondary_index.cc', 'db/marshal/type_parser.cc', + 'db/batchlog_manager.cc', 'io/io.cc', 'utils/utils.cc', 'utils/UUID_gen.cc', diff --git a/db/BatchlogManager.java b/db/BatchlogManager.java deleted file mode 100644 index dd84ac88b9..0000000000 --- a/db/BatchlogManager.java +++ /dev/null @@ -1,539 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db; - -import java.io.DataInputStream; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; -import com.google.common.util.concurrent.RateLimiter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.marshal.UUIDType; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.WriteFailureException; -import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.WriteResponseHandler; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.WrappedRunnable; -import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; - -public class BatchlogManager implements BatchlogManagerMBean -{ - private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager"; - private static final long 
REPLAY_INTERVAL = 60 * 1000; // milliseconds - private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size. - - private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class); - public static final BatchlogManager instance = new BatchlogManager(); - - private final AtomicLong totalBatchesReplayed = new AtomicLong(); - - // Single-thread executor service for scheduling and serializing log replay. - private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); - - public void start() - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws ExecutionException, InterruptedException - { - replayAllFailedBatches(); - } - }; - - batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS); - } - - public static void shutdown() throws InterruptedException - { - batchlogTasks.shutdown(); - batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); - } - - public int countAllBatches() - { - String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG); - return (int) executeInternal(query).one().getLong("count"); - } - - public long getTotalBatchesReplayed() - { - return totalBatchesReplayed.longValue(); - } - - public void forceBatchlogReplay() - { - startBatchlogReplay(); - } - - public Future startBatchlogReplay() - { - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws ExecutionException, InterruptedException - { - replayAllFailedBatches(); - } - }; - // If a replay is already in progress this request will be executed after it completes. 
- return batchlogTasks.submit(runnable); - } - - public static Mutation getBatchlogMutationFor(Collection mutations, UUID uuid, int version) - { - return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros()); - } - - @VisibleForTesting - static Mutation getBatchlogMutationFor(Collection mutations, UUID uuid, int version, long now) - { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog); - CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now); - adder.add("data", serializeMutations(mutations, version)) - .add("written_at", new Date(now / 1000)) - .add("version", version); - return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid), cf); - } - - private static ByteBuffer serializeMutations(Collection mutations, int version) - { - try (DataOutputBuffer buf = new DataOutputBuffer()) - { - buf.writeInt(mutations.size()); - for (Mutation mutation : mutations) - Mutation.serializer.serialize(mutation, buf, version); - return buf.buffer(); - } - catch (IOException e) - { - throw new AssertionError(); // cannot happen. - } - } - - private void replayAllFailedBatches() throws ExecutionException, InterruptedException - { - logger.debug("Started replayAllFailedBatches"); - - // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). - // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). - int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); - RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024); - - UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", - SystemKeyspace.NAME, - SystemKeyspace.BATCHLOG, - PAGE_SIZE)); - - while (!page.isEmpty()) - { - UUID id = processBatchlogPage(page, rateLimiter); - - if (page.size() < PAGE_SIZE) - break; // we've exhausted the batchlog, next query would be empty. - - page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", - SystemKeyspace.NAME, - SystemKeyspace.BATCHLOG, - PAGE_SIZE), - id); - } - - cleanup(); - - logger.debug("Finished replayAllFailedBatches"); - } - - private void deleteBatch(UUID id) - { - Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id)); - mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); - mutation.apply(); - } - - private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter) - { - UUID id = null; - ArrayList batches = new ArrayList<>(page.size()); - - // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others - for (UntypedResultSet.Row row : page) - { - id = row.getUUID("id"); - long writtenAt = row.getLong("written_at"); - // enough time for the actual write + batchlog entry mutation delivery (two separate requests). - long timeout = getBatchlogTimeout(); - if (System.currentTimeMillis() < writtenAt + timeout) - continue; // not ready to replay yet, might still get a deletion. - - int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; - Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version); - try - { - if (batch.replay(rateLimiter) > 0) - { - batches.add(batch); - } - else - { - deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated). 
- totalBatchesReplayed.incrementAndGet(); - } - } - catch (IOException e) - { - logger.warn("Skipped batch replay of {} due to {}", id, e); - deleteBatch(id); - } - } - - // now waiting for all batches to complete their processing - // schedule hints for timed out deliveries - for (Batch batch : batches) - { - batch.finish(); - deleteBatch(batch.id); - } - - totalBatchesReplayed.addAndGet(batches.size()); - - return id; - } - - public long getBatchlogTimeout() - { - return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation - } - - private static class Batch - { - private final UUID id; - private final long writtenAt; - private final ByteBuffer data; - private final int version; - - private List> replayHandlers; - - public Batch(UUID id, long writtenAt, ByteBuffer data, int version) - { - this.id = id; - this.writtenAt = writtenAt; - this.data = data; - this.version = version; - } - - public int replay(RateLimiter rateLimiter) throws IOException - { - logger.debug("Replaying batch {}", id); - - List mutations = replayingMutations(); - - if (mutations.isEmpty()) - return 0; - - int ttl = calculateHintTTL(mutations); - if (ttl <= 0) - return 0; - - replayHandlers = sendReplays(mutations, writtenAt, ttl); - - rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation. 
- - return replayHandlers.size(); - } - - public void finish() - { - for (int i = 0; i < replayHandlers.size(); i++) - { - ReplayWriteResponseHandler handler = replayHandlers.get(i); - try - { - handler.get(); - } - catch (WriteTimeoutException|WriteFailureException e) - { - logger.debug("Failed replaying a batched mutation to a node, will write a hint"); - logger.debug("Failure was : {}", e.getMessage()); - // writing hints for the rest to hints, starting from i - writeHintsForUndeliveredEndpoints(i); - return; - } - } - } - - private List replayingMutations() throws IOException - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); - int size = in.readInt(); - List mutations = new ArrayList<>(size); - for (int i = 0; i < size; i++) - { - Mutation mutation = Mutation.serializer.deserialize(in, version); - - // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis. - // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then - // truncated. - for (UUID cfId : mutation.getColumnFamilyIds()) - if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) - mutation = mutation.without(cfId); - - if (!mutation.isEmpty()) - mutations.add(mutation); - } - return mutations; - } - - private void writeHintsForUndeliveredEndpoints(int startFrom) - { - try - { - // Here we deserialize mutations 2nd time from byte buffer. 
- // but this is ok, because timeout on batch direct delivery is rare - // (it can happen only several seconds until node is marked dead) - // so trading some cpu to keep less objects - List replayingMutations = replayingMutations(); - for (int i = startFrom; i < replayHandlers.size(); i++) - { - Mutation undeliveredMutation = replayingMutations.get(i); - int ttl = calculateHintTTL(replayingMutations); - ReplayWriteResponseHandler handler = replayHandlers.get(i); - - if (ttl > 0 && handler != null) - for (InetAddress endpoint : handler.undelivered) - StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint); - } - } - catch (IOException e) - { - logger.error("Cannot schedule hints for undelivered batch", e); - } - } - - private List> sendReplays(List mutations, long writtenAt, int ttl) - { - List> handlers = new ArrayList<>(mutations.size()); - for (Mutation mutation : mutations) - { - ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl); - if (handler != null) - handlers.add(handler); - } - return handlers; - } - - /** - * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints - * when a replica is down or a write request times out. 
- * - * @return direct delivery handler to wait on or null, if no live nodes found - */ - private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl) - { - Set liveEndpoints = new HashSet<>(); - String ks = mutation.getKeyspaceName(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - - for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), - StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) - { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - mutation.apply(); - else if (FailureDetector.instance.isAlive(endpoint)) - liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. - else - StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint); - } - - if (liveEndpoints.isEmpty()) - return null; - - ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler<>(liveEndpoints); - MessageOut message = mutation.createMessage(); - for (InetAddress endpoint : liveEndpoints) - MessagingService.instance().sendRR(message, endpoint, handler, false); - return handler; - } - - /* - * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog). - * This ensures that deletes aren't "undone" by an old batch replay. - */ - private int calculateHintTTL(Collection mutations) - { - int unadjustedTTL = Integer.MAX_VALUE; - for (Mutation mutation : mutations) - unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); - return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt); - } - - /** - * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from - * which we did not receive a successful reply. 
- */ - private static class ReplayWriteResponseHandler extends WriteResponseHandler - { - private final Set undelivered = Collections.newSetFromMap(new ConcurrentHashMap()); - - public ReplayWriteResponseHandler(Collection writeEndpoints) - { - super(writeEndpoints, Collections.emptySet(), null, null, null, WriteType.UNLOGGED_BATCH); - undelivered.addAll(writeEndpoints); - } - - @Override - protected int totalBlockFor() - { - return this.naturalEndpoints.size(); - } - - @Override - public void response(MessageIn m) - { - boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from); - assert removed; - super.response(m); - } - } - } - - // force flush + compaction to reclaim space from the replayed batches - private void cleanup() throws ExecutionException, InterruptedException - { - ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); - cfs.forceBlockingFlush(); - Collection descriptors = new ArrayList<>(); - for (SSTableReader sstr : cfs.getSSTables()) - descriptors.add(sstr.descriptor); - if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. - CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); - } - - public static class EndpointFilter - { - private final String localRack; - private final Multimap endpoints; - - public EndpointFilter(String localRack, Multimap endpoints) - { - this.localRack = localRack; - this.endpoints = endpoints; - } - - /** - * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. 
- */ - public Collection filter() - { - // special case for single-node data centers - if (endpoints.values().size() == 1) - return endpoints.values(); - - // strip out dead endpoints and localhost - ListMultimap validated = ArrayListMultimap.create(); - for (Map.Entry entry : endpoints.entries()) - if (isValid(entry.getValue())) - validated.put(entry.getKey(), entry.getValue()); - - if (validated.size() <= 2) - return validated.values(); - - if (validated.size() - validated.get(localRack).size() >= 2) - { - // we have enough endpoints in other racks - validated.removeAll(localRack); - } - - if (validated.keySet().size() == 1) - { - // we have only 1 `other` rack - Collection otherRack = Iterables.getOnlyElement(validated.asMap().values()); - return Lists.newArrayList(Iterables.limit(otherRack, 2)); - } - - // randomize which racks we pick from if more than 2 remaining - Collection racks; - if (validated.keySet().size() == 2) - { - racks = validated.keySet(); - } - else - { - racks = Lists.newArrayList(validated.keySet()); - Collections.shuffle((List) racks); - } - - // grab a random member of up to two racks - List result = new ArrayList<>(2); - for (String rack : Iterables.limit(racks, 2)) - { - List rackMembers = validated.get(rack); - result.add(rackMembers.get(getRandomInt(rackMembers.size()))); - } - - return result; - } - - @VisibleForTesting - protected boolean isValid(InetAddress input) - { - return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); - } - - @VisibleForTesting - protected int getRandomInt(int bound) - { - return ThreadLocalRandom.current().nextInt(bound); - } - } -} diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc new file mode 100644 index 0000000000..d3e03a9456 --- /dev/null +++ b/db/batchlog_manager.cc @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#include +#include "batchlog_manager.hh" +#include "service/storage_service.hh" +#include "service/storage_proxy.hh" +#include "system_keyspace.hh" +#include "utils/rate_limiter.hh" +#include "core/future-util.hh" +#include "core/do_with.hh" +#include "log.hh" +#include "serializer.hh" +#include "db_clock.hh" +#include "database.hh" +#include "unimplemented.hh" + +static thread_local logging::logger logger("BatchLog Manager"); + +const uint32_t db::batchlog_manager::replay_interval; +const uint32_t db::batchlog_manager::page_size; + +db::batchlog_manager::batchlog_manager(cql3::query_processor& qp) + : _qp(qp) +{} + +future<> db::batchlog_manager::start() { + _timer.set_callback( + std::bind(&batchlog_manager::replay_all_failed_batches, this)); + _timer.arm( + lowres_clock::now() + + std::chrono::milliseconds( + service::storage_service::RING_DELAY), + std::experimental::optional { + std::chrono::milliseconds(replay_interval) }); + return make_ready_future<>(); +} + +future<> db::batchlog_manager::stop() { + _stop = true; + _timer.cancel(); + return _sem.wait(std::chrono::milliseconds(60)); +} + +future db::batchlog_manager::count_all_batches() const { + sstring query = sprint("SELECT 
count(*) FROM %s.%s", system_keyspace::NAME, system_keyspace::BATCHLOG); + return _qp.execute_internal(query).then([](::shared_ptr rs) { + return size_t(rs->one().get_as("count")); + }); +} + +mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector mutations, const utils::UUID& id, int32_t version) { + return get_batch_log_mutation_for(std::move(mutations), id, version, db_clock::now()); +} + +mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector mutations, const utils::UUID& id, int32_t version, db_clock::time_point now) { + auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); + auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)}); + auto timestamp = db_clock::now_in_usecs(); + auto data = [this, &mutations] { + std::vector fm(mutations.begin(), mutations.end()); + const auto size = std::accumulate(fm.begin(), fm.end(), size_t(0), [](size_t s, auto& m) { + return s + serializer{m}.size(); + }); + bytes buf(bytes::initialized_later(), size); + data_output out(buf); + for (auto& m : fm) { + serializer{m}(out); + } + return buf; + }(); + + mutation m(key, schema); + m.set_cell({}, to_bytes("version"), version, timestamp); + m.set_cell({}, to_bytes("written_at"), now, timestamp); + m.set_cell({}, to_bytes("data"), std::move(data), timestamp); + + return m; +} + +db_clock::duration db::batchlog_manager::get_batch_log_timeout() const { + // enough time for the actual write + BM removal mutation + return db_clock::duration(_qp.db().local().get_config().write_request_timeout_in_ms()) * 2; +} + +future<> db::batchlog_manager::replay_all_failed_batches() { + typedef db_clock::rep clock_type; + + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). 
+ auto throttle_in_kb = _qp.db().local().get_config().batchlog_replay_throttle_in_kb() / service::get_storage_service().local().get_token_metadata().get_all_endpoints().size(); + auto limiter = make_lw_shared(throttle_in_kb * 1000); + + auto batch = [this, limiter](const cql3::untyped_result_set::row& row) { + auto written_at = row.get_as("written_at"); + // enough time for the actual write + batchlog entry mutation delivery (two separate requests). + // enough time for the actual write + batchlog entry mutation delivery (two separate requests). + auto timeout = get_batch_log_timeout(); + if (db_clock::now() < written_at + timeout) { + return make_ready_future<>(); + } + // not used currently. ever? + //auto version = row.has("version") ? row.get_as("version") : /*MessagingService.VERSION_12*/6u; + auto id = row.get_as("id"); + auto data = row.get_blob("data"); + + logger.debug("Replaying batch {}", id); + + auto fms = make_lw_shared>(); + data_input in(data); + while (in.has_next()) { + fms->emplace_back(serializer::read(in)); + } + + auto mutations = make_lw_shared>(); + auto size = data.size(); + + return repeat([this, fms = std::move(fms), written_at, mutations]() mutable { + if (fms->empty()) { + return make_ready_future(stop_iteration::yes); + } + auto& fm = fms->front(); + auto mid = fm.column_family_id(); + return system_keyspace::get_truncated_at(_qp, mid).then([this, &fm, written_at, mutations](db_clock::time_point t) { + auto schema = _qp.db().local().find_schema(fm.column_family_id()); + if (written_at > t) { + auto schema = _qp.db().local().find_schema(fm.column_family_id()); + mutations->emplace_back(fm.unfreeze(schema)); + } + }).then([fms] { + fms->pop_front(); + return make_ready_future(stop_iteration::no); + }); + }).then([this, id, mutations, limiter, written_at, size] { + if (mutations->empty()) { + return make_ready_future<>(); + } + const auto ttl = [this, mutations, written_at]() -> clock_type { + /* + * Calculate ttl for the mutations' hints 
(and reduce ttl by the time the mutations spent in the batchlog). + * This ensures that deletes aren't "undone" by an old batch replay. + */ + auto unadjusted_ttl = std::numeric_limits::max(); + warn(unimplemented::cause::HINT); +#if 0 + for (auto& m : *mutations) { + unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); + } +#endif + return unadjusted_ttl - std::chrono::duration_cast(db_clock::now() - written_at).count(); + }(); + + if (ttl <= 0) { + return make_ready_future<>(); + } + // Origin does the send manually, however I can't see a super great reason to do so. + // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send + // in both cases. + // FIXME: verify that the above is reasonably true. + return limiter->reserve(size).then([this, mutations, id] { + return _qp.proxy().local().mutate(*mutations, db::consistency_level::ANY); + }); + }).then([this, id] { + // delete batch + auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); + auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)}); + mutation m(key, schema); + auto now = service::client_state(service::client_state::internal_tag()).get_timestamp(); + m.partition().apply_delete(*schema, {}, tombstone(now, gc_clock::now())); + return _qp.proxy().local().mutate_locally(m); + }); + }; + + return _sem.wait().then([this, batch = std::move(batch)] { + logger.debug("Started replayAllFailedBatches"); + + typedef ::shared_ptr page_ptr; + sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size); + return _qp.execute_internal(query).then([this, batch = std::move(batch)](page_ptr page) { + return do_with(std::move(page), [this, batch = std::move(batch)](page_ptr & page) mutable { + return repeat([this, &page, batch = std::move(batch)]() mutable { + if (page->empty()) { + return 
make_ready_future(stop_iteration::yes); + } + auto id = page->back().get_as("id"); + return parallel_for_each(*page, batch).then([this, &page, id]() { + if (page->size() < page_size) { + return make_ready_future(stop_iteration::yes); // we've exhausted the batchlog, next query would be empty. + } + sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", + system_keyspace::NAME, + system_keyspace::BATCHLOG, + page_size); + return _qp.execute_internal(query, {id}).then([&page](auto res) { + page = std::move(res); + return make_ready_future(stop_iteration::no); + }); + }); + }); + }); + }).then([this] { + // TODO FIXME : cleanup() +#if 0 + ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); + cfs.forceBlockingFlush(); + Collection descriptors = new ArrayList<>(); + for (SSTableReader sstr : cfs.getSSTables()) + descriptors.add(sstr.descriptor); + if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. + CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); + +#endif + + }).then([this] { + logger.debug("Finished replayAllFailedBatches"); + }); + }).finally([this] { + _sem.signal(); + }); +} diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh new file mode 100644 index 0000000000..62174bf0d8 --- /dev/null +++ b/db/batchlog_manager.hh @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include +#include "core/future.hh" +#include "core/distributed.hh" +#include "core/timer.hh" +#include "cql3/query_processor.hh" +#include "gms/inet_address.hh" +#include "db_clock.hh" + +namespace db { + +class batchlog_manager { +private: + static constexpr uint32_t replay_interval = 60 * 1000; // milliseconds + static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size. + + using clock_type = lowres_clock; + + size_t _total_batches_replayed = 0; + cql3::query_processor& _qp; + timer _timer; + semaphore _sem; + bool _stop = false; + + future<> replay_all_failed_batches(); +public: + // Takes a QP, not a distributes. Because this object is supposed + // to be per shard and does no dispatching beyond delegating the the + // shard qp (which is what you feed here). + batchlog_manager(cql3::query_processor&); + + future<> start(); + future<> stop(); + + // for testing. 
+ future<> do_batch_log_replay() { + return replay_all_failed_batches(); + } + future<size_t> count_all_batches() const; + size_t get_total_batches_replayed() const { + return _total_batches_replayed; + } + mutation get_batch_log_mutation_for(std::vector<mutation>, const utils::UUID&, int32_t); + mutation get_batch_log_mutation_for(std::vector<mutation>, const utils::UUID&, int32_t, db_clock::time_point); + db_clock::duration get_batch_log_timeout() const; + + class endpoint_filter { + private: + const sstring _local_rack; + const std::unordered_map<sstring, std::vector<gms::inet_address>> _endpoints; + + public: + endpoint_filter(sstring, std::unordered_map<sstring, std::vector<gms::inet_address>>); + /** + * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. + */ + std::vector<gms::inet_address> filter() const; + }; +}; + +} From 6cf2230cb67e071b9cd0c7fd0e88b535c06d0807 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 7 Jul 2015 16:19:52 +0200 Subject: [PATCH 7/7] Add batchlog_manager_test Very simple, and limited, but at least checks that very basic replay works. 
--- configure.py | 2 + test.py | 1 + tests/urchin/batchlog_manager_test.cc | 63 +++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 tests/urchin/batchlog_manager_test.cc diff --git a/configure.py b/configure.py index 0148b71aa4..29ce991544 100755 --- a/configure.py +++ b/configure.py @@ -193,6 +193,7 @@ urchin_tests = [ 'tests/urchin/gossiping_property_file_snitch_test', 'tests/urchin/network_topology_strategy_test', 'tests/urchin/query_processor_test', + 'tests/urchin/batchlog_manager_test', ] tests = [ @@ -582,6 +583,7 @@ deps['tests/urchin/gossiping_property_file_snitch_test'] += boost_test_lib deps['tests/urchin/network_topology_strategy_test'] += boost_test_lib deps['tests/urchin/row_cache_test'] += boost_test_lib deps['tests/urchin/query_processor_test'] += boost_test_lib +deps['tests/urchin/batchlog_manager_test'] += boost_test_lib deps['tests/urchin/bytes_ostream_test'] = ['tests/urchin/bytes_ostream_test.cc'] deps['tests/urchin/UUID_test'] = ['utils/UUID_gen.cc', 'tests/urchin/UUID_test.cc'] diff --git a/test.py b/test.py index 68d467ecc8..11aefb820d 100755 --- a/test.py +++ b/test.py @@ -57,6 +57,7 @@ boost_tests = [ 'urchin/row_cache_test', 'urchin/network_topology_strategy_test', 'urchin/query_processor_test', + 'urchin/batchlog_manager_test', ] other_tests = [ diff --git a/tests/urchin/batchlog_manager_test.cc b/tests/urchin/batchlog_manager_test.cc new file mode 100644 index 0000000000..b7db946a51 --- /dev/null +++ b/tests/urchin/batchlog_manager_test.cc @@ -0,0 +1,63 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK + +#include +#include +#include +#include +#include + +#include "tests/test-utils.hh" +#include "tests/urchin/cql_test_env.hh" +#include "tests/urchin/cql_assertions.hh" + +#include "core/future-util.hh" +#include "core/shared_ptr.hh" +#include "transport/messages/result_message.hh" +#include "cql3/query_processor.hh" +#include "db/batchlog_manager.hh" + +static atomic_cell 
make_atomic_cell(bytes value) { + return atomic_cell::make_live(0, std::move(value)); +}; + +SEASTAR_TEST_CASE(test_execute_batch) { + return do_with_cql_env([] (auto& e) { + auto& qp = e.local_qp(); + auto bp = make_lw_shared<db::batchlog_manager>(qp); + + return e.execute_cql("create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").discard_result().then([&qp, &e, bp] { + auto& db = e.local_db(); + auto s = db.find_schema("ks", "cf"); + + const column_definition& r1_col = *s->get_column_definition("r1"); + auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); + auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)}); + + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(100))); + + using namespace std::chrono_literals; + + auto bm = bp->get_batch_log_mutation_for({ m }, s->id(), 9, db_clock::now() - db_clock::duration(3h)); + + return qp.proxy().local().mutate_locally(bm).then([bp] { + return bp->count_all_batches().then([](auto n) { + BOOST_CHECK_EQUAL(n, 1); + }).then([bp] { + return bp->do_batch_log_replay(); + }); + }); + }).then([&qp, bp] { + return qp.execute_internal("select * from ks.cf where p1 = ? and c1 = ?;", { sstring("key1"), 1 }).then([](auto rs) { + BOOST_REQUIRE(!rs->empty()); + auto i = rs->one().template get_as<int32_t>("r1"); + BOOST_CHECK_EQUAL(i, int32_t(100)); + }); + }); + }); +}