diff --git a/configure.py b/configure.py index da17fcd52a..29ce991544 100755 --- a/configure.py +++ b/configure.py @@ -193,6 +193,7 @@ urchin_tests = [ 'tests/urchin/gossiping_property_file_snitch_test', 'tests/urchin/network_topology_strategy_test', 'tests/urchin/query_processor_test', + 'tests/urchin/batchlog_manager_test', ] tests = [ @@ -465,12 +466,14 @@ urchin_core = (['database.cc', 'db/config.cc', 'db/index/secondary_index.cc', 'db/marshal/type_parser.cc', + 'db/batchlog_manager.cc', 'io/io.cc', 'utils/utils.cc', 'utils/UUID_gen.cc', 'utils/i_filter.cc', 'utils/bloom_filter.cc', 'utils/bloom_calculations.cc', + 'utils/rate_limiter.cc', 'gms/version_generator.cc', 'gms/versioned_value.cc', 'gms/gossiper.cc', @@ -580,6 +583,7 @@ deps['tests/urchin/gossiping_property_file_snitch_test'] += boost_test_lib deps['tests/urchin/network_topology_strategy_test'] += boost_test_lib deps['tests/urchin/row_cache_test'] += boost_test_lib deps['tests/urchin/query_processor_test'] += boost_test_lib +deps['tests/urchin/batchlog_manager_test'] += boost_test_lib deps['tests/urchin/bytes_ostream_test'] = ['tests/urchin/bytes_ostream_test.cc'] deps['tests/urchin/UUID_test'] = ['utils/UUID_gen.cc', 'tests/urchin/UUID_test.cc'] diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc new file mode 100644 index 0000000000..d3e03a9456 --- /dev/null +++ b/db/batchlog_manager.cc @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#include +#include "batchlog_manager.hh" +#include "service/storage_service.hh" +#include "service/storage_proxy.hh" +#include "system_keyspace.hh" +#include "utils/rate_limiter.hh" +#include "core/future-util.hh" +#include "core/do_with.hh" +#include "log.hh" +#include "serializer.hh" +#include "db_clock.hh" +#include "database.hh" +#include "unimplemented.hh" + +static thread_local logging::logger logger("BatchLog Manager"); + +const uint32_t db::batchlog_manager::replay_interval; +const uint32_t db::batchlog_manager::page_size; + +db::batchlog_manager::batchlog_manager(cql3::query_processor& qp) + : _qp(qp) +{} + +future<> db::batchlog_manager::start() { + _timer.set_callback( + std::bind(&batchlog_manager::replay_all_failed_batches, this)); + _timer.arm( + lowres_clock::now() + + std::chrono::milliseconds( + service::storage_service::RING_DELAY), + std::experimental::optional { + std::chrono::milliseconds(replay_interval) }); + return make_ready_future<>(); +} + +future<> db::batchlog_manager::stop() { + _stop = true; + _timer.cancel(); + return _sem.wait(std::chrono::milliseconds(60)); +} + +future db::batchlog_manager::count_all_batches() const { + sstring query = sprint("SELECT count(*) FROM %s.%s", system_keyspace::NAME, system_keyspace::BATCHLOG); + return _qp.execute_internal(query).then([](::shared_ptr rs) { + return size_t(rs->one().get_as("count")); + }); +} + +mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector mutations, const utils::UUID& id, int32_t version) { + return get_batch_log_mutation_for(std::move(mutations), id, version, db_clock::now()); +} + +mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector mutations, const utils::UUID& id, int32_t version, db_clock::time_point now) { + auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); + auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)}); + auto timestamp = db_clock::now_in_usecs(); + auto data = [this, &mutations] { + std::vector fm(mutations.begin(), mutations.end()); + const auto size = std::accumulate(fm.begin(), fm.end(), size_t(0), [](size_t s, auto& m) { + return s + serializer{m}.size(); + }); + bytes buf(bytes::initialized_later(), size); + data_output out(buf); + for (auto& m : fm) { + serializer{m}(out); + } + return buf; + }(); + + mutation m(key, schema); + m.set_cell({}, to_bytes("version"), version, timestamp); + m.set_cell({}, to_bytes("written_at"), now, timestamp); + m.set_cell({}, to_bytes("data"), std::move(data), timestamp); + + return m; +} + +db_clock::duration db::batchlog_manager::get_batch_log_timeout() const { + // enough time for the actual write + BM removal mutation + return db_clock::duration(_qp.db().local().get_config().write_request_timeout_in_ms()) * 2; +} + +future<> db::batchlog_manager::replay_all_failed_batches() { + typedef db_clock::rep clock_type; + + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). + auto throttle_in_kb = _qp.db().local().get_config().batchlog_replay_throttle_in_kb() / service::get_storage_service().local().get_token_metadata().get_all_endpoints().size(); + auto limiter = make_lw_shared(throttle_in_kb * 1000); + + auto batch = [this, limiter](const cql3::untyped_result_set::row& row) { + auto written_at = row.get_as("written_at"); + // enough time for the actual write + batchlog entry mutation delivery (two separate requests). + // enough time for the actual write + batchlog entry mutation delivery (two separate requests). + auto timeout = get_batch_log_timeout(); + if (db_clock::now() < written_at + timeout) { + return make_ready_future<>(); + } + // not used currently. ever? + //auto version = row.has("version") ? row.get_as("version") : /*MessagingService.VERSION_12*/6u; + auto id = row.get_as("id"); + auto data = row.get_blob("data"); + + logger.debug("Replaying batch {}", id); + + auto fms = make_lw_shared>(); + data_input in(data); + while (in.has_next()) { + fms->emplace_back(serializer::read(in)); + } + + auto mutations = make_lw_shared>(); + auto size = data.size(); + + return repeat([this, fms = std::move(fms), written_at, mutations]() mutable { + if (fms->empty()) { + return make_ready_future(stop_iteration::yes); + } + auto& fm = fms->front(); + auto mid = fm.column_family_id(); + return system_keyspace::get_truncated_at(_qp, mid).then([this, &fm, written_at, mutations](db_clock::time_point t) { + auto schema = _qp.db().local().find_schema(fm.column_family_id()); + if (written_at > t) { + auto schema = _qp.db().local().find_schema(fm.column_family_id()); + mutations->emplace_back(fm.unfreeze(schema)); + } + }).then([fms] { + fms->pop_front(); + return make_ready_future(stop_iteration::no); + }); + }).then([this, id, mutations, limiter, written_at, size] { + if (mutations->empty()) { + return make_ready_future<>(); + } + const auto ttl = [this, mutations, written_at]() -> clock_type { + /* + * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog). + * This ensures that deletes aren't "undone" by an old batch replay. + */ + auto unadjusted_ttl = std::numeric_limits::max(); + warn(unimplemented::cause::HINT); +#if 0 + for (auto& m : *mutations) { + unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); + } +#endif + return unadjusted_ttl - std::chrono::duration_cast(db_clock::now() - written_at).count(); + }(); + + if (ttl <= 0) { + return make_ready_future<>(); + } + // Origin does the send manually, however I can't see a super great reason to do so. + // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send + // in both cases. + // FIXME: verify that the above is reasonably true. + return limiter->reserve(size).then([this, mutations, id] { + return _qp.proxy().local().mutate(*mutations, db::consistency_level::ANY); + }); + }).then([this, id] { + // delete batch + auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); + auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)}); + mutation m(key, schema); + auto now = service::client_state(service::client_state::internal_tag()).get_timestamp(); + m.partition().apply_delete(*schema, {}, tombstone(now, gc_clock::now())); + return _qp.proxy().local().mutate_locally(m); + }); + }; + + return _sem.wait().then([this, batch = std::move(batch)] { + logger.debug("Started replayAllFailedBatches"); + + typedef ::shared_ptr page_ptr; + sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size); + return _qp.execute_internal(query).then([this, batch = std::move(batch)](page_ptr page) { + return do_with(std::move(page), [this, batch = std::move(batch)](page_ptr & page) mutable { + return repeat([this, &page, batch = std::move(batch)]() mutable { + if (page->empty()) { + return make_ready_future(stop_iteration::yes); + } + auto id = page->back().get_as("id"); + return parallel_for_each(*page, batch).then([this, &page, id]() { + if (page->size() < page_size) { + return make_ready_future(stop_iteration::yes); // we've exhausted the batchlog, next query would be empty. + } + sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", + system_keyspace::NAME, + system_keyspace::BATCHLOG, + page_size); + return _qp.execute_internal(query, {id}).then([&page](auto res) { + page = std::move(res); + return make_ready_future(stop_iteration::no); + }); + }); + }); + }); + }).then([this] { + // TODO FIXME : cleanup() +#if 0 + ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); + cfs.forceBlockingFlush(); + Collection descriptors = new ArrayList<>(); + for (SSTableReader sstr : cfs.getSSTables()) + descriptors.add(sstr.descriptor); + if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. + CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); + +#endif + + }).then([this] { + logger.debug("Finished replayAllFailedBatches"); + }); + }).finally([this] { + _sem.signal(); + }); +} diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh new file mode 100644 index 0000000000..62174bf0d8 --- /dev/null +++ b/db/batchlog_manager.hh @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include +#include "core/future.hh" +#include "core/distributed.hh" +#include "core/timer.hh" +#include "cql3/query_processor.hh" +#include "gms/inet_address.hh" +#include "db_clock.hh" + +namespace db { + +class batchlog_manager { +private: + static constexpr uint32_t replay_interval = 60 * 1000; // milliseconds + static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size. + + using clock_type = lowres_clock; + + size_t _total_batches_replayed = 0; + cql3::query_processor& _qp; + timer _timer; + semaphore _sem; + bool _stop = false; + + future<> replay_all_failed_batches(); +public: + // Takes a QP, not a distributes. Because this object is supposed + // to be per shard and does no dispatching beyond delegating the the + // shard qp (which is what you feed here). + batchlog_manager(cql3::query_processor&); + + future<> start(); + future<> stop(); + + // for testing. + future<> do_batch_log_replay() { + return replay_all_failed_batches(); + } + future count_all_batches() const; + size_t get_total_batches_replayed() const { + return _total_batches_replayed; + } + mutation get_batch_log_mutation_for(std::vector, const utils::UUID&, int32_t); + mutation get_batch_log_mutation_for(std::vector, const utils::UUID&, int32_t, db_clock::time_point); + db_clock::duration get_batch_log_timeout() const; + + class endpoint_filter { + private: + const sstring _local_rack; + const std::unordered_map> _endpoints; + + public: + endpoint_filter(sstring, std::unordered_map>); + /** + * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. + */ + std::vector filter() const; + }; +}; + +} diff --git a/db/serializer.cc b/db/serializer.cc index 63d4283c9a..3b4150da96 100644 --- a/db/serializer.cc +++ b/db/serializer.cc @@ -234,6 +234,23 @@ frozen_mutation db::serializer::read(input& in) { return frozen_mutation(bytes_serializer::read(in)); } +template<> +db::serializer::serializer(const db::replay_position& rp) + : _item(rp), _size(sizeof(uint64_t) * 2) { +} + +template<> +void db::serializer::write(output& out, const db::replay_position& rp) { + out.write(rp.id); + out.write(rp.pos); +} + +template<> +void db::serializer::read(db::replay_position& rp, input& in) { + rp.id = in.read(); + rp.pos = in.read(); +} + template class db::serializer ; template class db::serializer ; template class db::serializer ; @@ -245,3 +262,4 @@ template class db::serializer ; template class db::serializer ; template class db::serializer ; template class db::serializer ; +template class db::serializer ; diff --git a/db/serializer.hh b/db/serializer.hh index 54a9bf3e51..eb37db8411 100644 --- a/db/serializer.hh +++ b/db/serializer.hh @@ -13,6 +13,7 @@ #include "keys.hh" #include "database_fwd.hh" #include "frozen_mutation.hh" +#include "db/commitlog/replay_position.hh" namespace db { /** @@ -116,6 +117,10 @@ template<> void serializer::write(output&, const clu template<> void serializer::read(clustering_key_prefix_view&, input&); template<> clustering_key_prefix_view serializer::read(input&); +template<> serializer::serializer(const db::replay_position&); +template<> void serializer::write(output&, const db::replay_position&); +template<> void serializer::read(db::replay_position&, input&); + template T serializer::read(input& in) { type t; @@ -131,6 +136,7 @@ extern template class serializer; extern template class serializer; extern template class serializer; extern template class serializer; +extern template class serializer; typedef serializer tombstone_serializer; typedef serializer bytes_serializer; // Compatible with bytes_view_serializer @@ -143,6 +149,7 @@ typedef serializer partition_key_view_serializer; typedef serializer clustering_key_view_serializer; typedef serializer clustering_key_prefix_view_serializer; typedef serializer frozen_mutation_serializer; +typedef serializer replay_position_serializer; } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index cbb0629dfe..2902aeefe0 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -37,6 +37,8 @@ #include "version.hh" #include "thrift/server.hh" #include "exceptions/exceptions.hh" +#include "cql3/query_processor.hh" +#include "db/serializer.hh" namespace db { namespace system_keyspace { @@ -501,89 +503,76 @@ future<> setup(distributed& db, distributed& qp UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); return CompactionHistoryTabularData.from(queryResultSet); } +#endif - public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); - truncationRecords = null; - forceBlockingFlush(LOCAL); + +typedef std::pair truncation_entry; +typedef std::unordered_map truncation_map; +static thread_local std::experimental::optional truncation_records; + +future<> save_truncation_record(cql3::query_processor& qp, const column_family& cf, db_clock::time_point truncated_at, const db::replay_position& rp) { + db::serializer rps(rp); + bytes buf(bytes::initialized_later(), sizeof(db_clock::rep) + rps.size()); + data_output out(buf); + rps(out); + out.write(truncated_at.time_since_epoch().count()); + + map_type_impl::native_type tmp; + tmp.emplace_back(boost::any{ cf.schema()->id() }, boost::any{ buf }); + + sstring req = sprint("UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'", LOCAL, LOCAL); + return qp.execute_internal(req, {tmp}).then([&qp](auto rs) { + truncation_records = {}; + return force_blocking_flush(qp.db(), LOCAL); + }); +} + +/** + * This method is used to remove information about truncation time for specified column family + */ +future<> remove_truncation_record(cql3::query_processor& qp, utils::UUID id) { + sstring req = sprint("DELETE truncated_at[?] from system.%s WHERE key = '%s'", LOCAL, LOCAL); + return qp.execute_internal(req, {id}).then([&qp](auto rs) { + truncation_records = {}; + return force_blocking_flush(qp.db(), LOCAL); + }); +} + +static future get_truncation_record(cql3::query_processor& qp, utils::UUID cf_id) { + if (!truncation_records) { + sstring req = sprint("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL); + return qp.execute_internal(req).then([&qp, cf_id](::shared_ptr rs) { + truncation_map tmp; + if (!rs->empty() && rs->one().has("truncated_set")) { + auto map = rs->one().get_map("truncated_at"); + for (auto& p : map) { + truncation_entry e; + data_input in(p.second); + e.first = db::serializer::read(in); + e.second = db_clock::time_point(db_clock::duration(in.read())); + tmp[p.first] = e; + } + } + truncation_records = std::move(tmp); + return get_truncation_record(qp, cf_id); + }); } + return make_ready_future((*truncation_records)[cf_id]); +} - /** - * This method is used to remove information about truncation time for specified column family - */ - public static synchronized void removeTruncationRecord(UUID cfId) - { - String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), cfId); - truncationRecords = null; - forceBlockingFlush(LOCAL); - } +future get_truncated_position(cql3::query_processor& qp, utils::UUID cf_id) { + return get_truncation_record(qp, cf_id).then([](truncation_entry e) { + return make_ready_future(e.first); + }); +} - private static Map truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - DataOutputBuffer out = new DataOutputBuffer(); - try - { - ReplayPosition.serializer.serialize(position, out); - out.writeLong(truncatedAt); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength())); - } - - public static ReplayPosition getTruncatedPosition(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? null : record.left; - } - - public static long getTruncatedAt(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? Long.MIN_VALUE : record.right; - } - - private static synchronized Pair getTruncationRecord(UUID cfId) - { - if (truncationRecords == null) - truncationRecords = readTruncationRecords(); - return truncationRecords.get(cfId); - } - - private static Map> readTruncationRecords() - { - UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); - - Map> records = new HashMap<>(); - - if (!rows.isEmpty() && rows.one().has("truncated_at")) - { - Map map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance); - for (Map.Entry entry : map.entrySet()) - records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); - } - - return records; - } - - private static Pair truncationRecordFromBlob(ByteBuffer bytes) - { - try - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes)); - return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } +future get_truncated_at(cql3::query_processor& qp, utils::UUID cf_id) { + return get_truncation_record(qp, cf_id).then([](truncation_entry e) { + return make_ready_future(e.second); + }); +} +#if 0 /** * Record tokens being used by another node */ @@ -694,8 +683,19 @@ future<> force_blocking_flush(sstring cfname) { return cf.flush(&qctx->db()); } +future<> force_blocking_flush(distributed& db, sstring cf_name) { #if 0 + if (!Boolean.getBoolean("cassandra.unsafesystem")) +#endif + { + return db.invoke_on_all([cf_name](database& db) { + auto& cf = db.find_column_family(NAME, cf_name); + return cf.seal_active_memtable(&db); + }); + } +} +#if 0 /** * Return a map of stored tokens to IP addresses * diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 65e70e673b..8ad282d886 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -31,6 +31,8 @@ #include "gms/inet_address.hh" #include "query-result-set.hh" #include "locator/token_metadata.hh" +#include "db_clock.hh" +#include "db/commitlog/replay_position.hh" namespace service { @@ -39,7 +41,7 @@ class storage_proxy; } namespace cql3 { -class query_processor; + class query_processor; } namespace db { @@ -210,88 +212,13 @@ load_dc_rack_info(); UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); return CompactionHistoryTabularData.from(queryResultSet); } +#endif + future<> save_truncation_record(cql3::query_processor&, const column_family&, db_clock::time_point truncated_at, const db::replay_position&); + future<> remove_truncation_record(cql3::query_processor&, utils::UUID); + future get_truncated_position(cql3::query_processor&, utils::UUID); + future get_truncated_at(cql3::query_processor&, utils::UUID); - public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); - truncationRecords = null; - forceBlockingFlush(LOCAL); - } - - /** - * This method is used to remove information about truncation time for specified column family - */ - public static synchronized void removeTruncationRecord(UUID cfId) - { - String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), cfId); - truncationRecords = null; - forceBlockingFlush(LOCAL); - } - - private static Map truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) - { - DataOutputBuffer out = new DataOutputBuffer(); - try - { - ReplayPosition.serializer.serialize(position, out); - out.writeLong(truncatedAt); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength())); - } - - public static ReplayPosition getTruncatedPosition(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? null : record.left; - } - - public static long getTruncatedAt(UUID cfId) - { - Pair record = getTruncationRecord(cfId); - return record == null ? Long.MIN_VALUE : record.right; - } - - private static synchronized Pair getTruncationRecord(UUID cfId) - { - if (truncationRecords == null) - truncationRecords = readTruncationRecords(); - return truncationRecords.get(cfId); - } - - private static Map> readTruncationRecords() - { - UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); - - Map> records = new HashMap<>(); - - if (!rows.isEmpty() && rows.one().has("truncated_at")) - { - Map map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance); - for (Map.Entry entry : map.entrySet()) - records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); - } - - return records; - } - - private static Pair truncationRecordFromBlob(ByteBuffer bytes) - { - try - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes)); - return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } +#if 0 /** * Record tokens being used by another node @@ -390,13 +317,11 @@ load_dc_rack_info(); updateTokens(tokens); return tokens; } +#endif - public static void forceBlockingFlush(String cfname) - { - if (!Boolean.getBoolean("cassandra.unsafesystem")) - FBUtilities.waitOnFuture(Keyspace.open(NAME).getColumnFamilyStore(cfname).forceFlush()); - } + future<> force_blocking_flush(distributed&, sstring cf_name); +#if o /** * Return a map of stored tokens to IP addresses * diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 5bf5697c2f..86b2431398 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -835,6 +835,20 @@ public: } } +#endif + size_t number_of_endpoints() const { + return _endpoint_to_host_id_map.size(); + } + + std::vector get_all_endpoints() const { + std::vector tmp; + std::transform(_endpoint_to_host_id_map.begin(), _endpoint_to_host_id_map.end(), std::back_inserter(tmp), [](const auto& p) { + return p.first; + }); + return tmp; + } + +#if 0 public Set getAllEndpoints() { lock.readLock().lock(); diff --git a/test.py b/test.py index 68d467ecc8..11aefb820d 100755 --- a/test.py +++ b/test.py @@ -57,6 +57,7 @@ boost_tests = [ 'urchin/row_cache_test', 'urchin/network_topology_strategy_test', 'urchin/query_processor_test', + 'urchin/batchlog_manager_test', ] other_tests = [ diff --git a/tests/urchin/batchlog_manager_test.cc b/tests/urchin/batchlog_manager_test.cc new file mode 100644 index 0000000000..b7db946a51 --- /dev/null +++ b/tests/urchin/batchlog_manager_test.cc @@ -0,0 +1,63 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK + +#include +#include +#include +#include +#include + +#include "tests/test-utils.hh" +#include "tests/urchin/cql_test_env.hh" +#include "tests/urchin/cql_assertions.hh" + +#include "core/future-util.hh" +#include "core/shared_ptr.hh" +#include "transport/messages/result_message.hh" +#include "cql3/query_processor.hh" +#include "db/batchlog_manager.hh" + +static atomic_cell make_atomic_cell(bytes value) { + return atomic_cell::make_live(0, std::move(value)); +}; + +SEASTAR_TEST_CASE(test_execute_batch) { + return do_with_cql_env([] (auto& e) { + auto& qp = e.local_qp(); + auto bp = make_lw_shared(qp); + + return e.execute_cql("create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").discard_result().then([&qp, &e, bp] { + auto& db = e.local_db(); + auto s = db.find_schema("ks", "cf"); + + const column_definition& r1_col = *s->get_column_definition("r1"); + auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); + auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)}); + + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(100))); + + using namespace std::chrono_literals; + + auto bm = bp->get_batch_log_mutation_for({ m }, s->id(), 9, db_clock::now() - db_clock::duration(3h)); + + return qp.proxy().local().mutate_locally(bm).then([bp] { + return bp->count_all_batches().then([](auto n) { + BOOST_CHECK_EQUAL(n, 1); + }).then([bp] { + return bp->do_batch_log_replay(); + }); + }); + }).then([&qp, bp] { + return qp.execute_internal("select * from ks.cf where p1 = ? and c1 = ?;", { sstring("key1"), 1 }).then([](auto rs) { + BOOST_REQUIRE(!rs->empty()); + auto i = rs->one().template get_as("r1"); + BOOST_CHECK_EQUAL(i, int32_t(100)); + }); + }); + }); +} + diff --git a/utils/rate_limiter.cc b/utils/rate_limiter.cc new file mode 100644 index 0000000000..789ef18200 --- /dev/null +++ b/utils/rate_limiter.cc @@ -0,0 +1,33 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include "rate_limiter.hh" + +utils::rate_limiter::rate_limiter(size_t rate) + : _units_per_s(rate) { + if (_units_per_s != 0) { + _timer.set_callback(std::bind(&rate_limiter::on_timer, this)); + _timer.arm(lowres_clock::now() + std::chrono::seconds(1), + std::experimental::optional { + std::chrono::seconds(1) }); + } +} + +void utils::rate_limiter::on_timer() { + _sem.signal(_units_per_s - _sem.current()); +} + +future<> utils::rate_limiter::reserve(size_t u) { + if (_units_per_s == 0) { + return make_ready_future<>(); + } + if (u <= _units_per_s) { + return _sem.wait(u); + } + auto n = std::min(u, _units_per_s); + auto r = u - n; + return _sem.wait(n).then([this, r] { + return reserve(r); + }); +} diff --git a/utils/rate_limiter.hh b/utils/rate_limiter.hh new file mode 100644 index 0000000000..066654afc9 --- /dev/null +++ b/utils/rate_limiter.hh @@ -0,0 +1,30 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once + +#include "core/timer.hh" +#include "core/semaphore.hh" +#include "core/reactor.hh" + +namespace utils { + +/** + * 100% naive rate limiter. Consider it a placeholder + * Will let you process X "units" per second, then reset this every s. + * Obviously, accuracy is virtually non-existant and steady rate will fluctuate. + */ +class rate_limiter { +private: + timer _timer; + size_t _units_per_s; + semaphore _sem; + + void on_timer(); +public: + rate_limiter(size_t rate); + future<> reserve(size_t u); +}; + +}