Merge "BatchLogManager" from Calle

"(Partial?) implementation of BatchLogManager.
Requires the token function/restriction series.

Functional as in that it can create batchlog mutations, and do replay
of data in this system table.
Since range queries do not yet work, it only handles very small
table contents.

It is not used yet either, but will eventually be needed for batch statements
etc."
This commit is contained in:
Avi Kivity
2015-07-08 14:48:33 +03:00
12 changed files with 586 additions and 165 deletions

View File

@@ -193,6 +193,7 @@ urchin_tests = [
'tests/urchin/gossiping_property_file_snitch_test',
'tests/urchin/network_topology_strategy_test',
'tests/urchin/query_processor_test',
'tests/urchin/batchlog_manager_test',
]
tests = [
@@ -465,12 +466,14 @@ urchin_core = (['database.cc',
'db/config.cc',
'db/index/secondary_index.cc',
'db/marshal/type_parser.cc',
'db/batchlog_manager.cc',
'io/io.cc',
'utils/utils.cc',
'utils/UUID_gen.cc',
'utils/i_filter.cc',
'utils/bloom_filter.cc',
'utils/bloom_calculations.cc',
'utils/rate_limiter.cc',
'gms/version_generator.cc',
'gms/versioned_value.cc',
'gms/gossiper.cc',
@@ -580,6 +583,7 @@ deps['tests/urchin/gossiping_property_file_snitch_test'] += boost_test_lib
deps['tests/urchin/network_topology_strategy_test'] += boost_test_lib
deps['tests/urchin/row_cache_test'] += boost_test_lib
deps['tests/urchin/query_processor_test'] += boost_test_lib
deps['tests/urchin/batchlog_manager_test'] += boost_test_lib
deps['tests/urchin/bytes_ostream_test'] = ['tests/urchin/bytes_ostream_test.cc']
deps['tests/urchin/UUID_test'] = ['utils/UUID_gen.cc', 'tests/urchin/UUID_test.cc']

240
db/batchlog_manager.cc Normal file
View File

@@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#include <chrono>
#include "batchlog_manager.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "system_keyspace.hh"
#include "utils/rate_limiter.hh"
#include "core/future-util.hh"
#include "core/do_with.hh"
#include "log.hh"
#include "serializer.hh"
#include "db_clock.hh"
#include "database.hh"
#include "unimplemented.hh"
// Shard-local logger for batchlog operations.
static thread_local logging::logger logger("BatchLog Manager");
// Out-of-class definitions for the class' static constexpr members
// (needed when they are odr-used, e.g. taken by reference).
const uint32_t db::batchlog_manager::replay_interval;
const uint32_t db::batchlog_manager::page_size;
// Keeps a reference to the shard-local query processor; all queries and
// mutations are issued through it.
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp)
: _qp(qp)
{}
future<> db::batchlog_manager::start() {
    // Kick off periodic replay: first run after RING_DELAY, then once every
    // replay_interval milliseconds. The timer callback discards the returned
    // future; a replay run guards itself with _sem.
    _timer.set_callback([this] {
        replay_all_failed_batches();
    });
    auto first_run = lowres_clock::now()
            + std::chrono::milliseconds(service::storage_service::RING_DELAY);
    auto period = std::experimental::optional<lowres_clock::duration> {
            std::chrono::milliseconds(replay_interval) };
    _timer.arm(first_run, period);
    return make_ready_future<>();
}
future<> db::batchlog_manager::stop() {
    _stop = true;
    _timer.cancel();
    // Wait for an in-flight replay (which holds _sem) to finish before
    // declaring the manager stopped. Origin's BatchlogManager.stop() awaits
    // task termination for 60 *seconds*; the previous milliseconds(60) was
    // almost certainly a unit mix-up.
    return _sem.wait(std::chrono::seconds(60));
}
future<size_t> db::batchlog_manager::count_all_batches() const {
    // Issue a COUNT(*) over the batchlog table and narrow the result to size_t.
    sstring cql = sprint("SELECT count(*) FROM %s.%s", system_keyspace::NAME, system_keyspace::BATCHLOG);
    return _qp.execute_internal(cql).then([](::shared_ptr<cql3::untyped_result_set> result) {
        auto count = result->one().get_as<int64_t>("count");
        return size_t(count);
    });
}
// Convenience overload: stamps the batch's "written_at" cell with the current
// wall-clock time.
mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector<mutation> mutations, const utils::UUID& id, int32_t version) {
return get_batch_log_mutation_for(std::move(mutations), id, version, db_clock::now());
}
// Builds the mutation recording a batch (id -> version, written_at, serialized
// mutations blob) in the system batchlog table. `now` becomes the value of the
// "written_at" cell; the cell *timestamps* use the current time in
// microseconds regardless of `now`, so tests can back-date written_at.
mutation db::batchlog_manager::get_batch_log_mutation_for(std::vector<mutation> mutations, const utils::UUID& id, int32_t version, db_clock::time_point now) {
auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)});
auto timestamp = db_clock::now_in_usecs();
// Freeze all mutations and concatenate them into a single blob; the replay
// path (replay_all_failed_batches) reads them back with the same serializer.
auto data = [this, &mutations] {
std::vector<frozen_mutation> fm(mutations.begin(), mutations.end());
// Total serialized size first, so the buffer is allocated exactly once.
const auto size = std::accumulate(fm.begin(), fm.end(), size_t(0), [](size_t s, auto& m) {
return s + serializer<frozen_mutation>{m}.size();
});
bytes buf(bytes::initialized_later(), size);
data_output out(buf);
for (auto& m : fm) {
serializer<frozen_mutation>{m}(out);
}
return buf;
}();
mutation m(key, schema);
m.set_cell({}, to_bytes("version"), version, timestamp);
m.set_cell({}, to_bytes("written_at"), now, timestamp);
m.set_cell({}, to_bytes("data"), std::move(data), timestamp);
return m;
}
db_clock::duration db::batchlog_manager::get_batch_log_timeout() const {
    // Twice the configured write timeout: enough time for the actual write
    // plus the batchlog-removal mutation.
    const auto write_timeout_ms = _qp.db().local().get_config().write_request_timeout_in_ms();
    return db_clock::duration(write_timeout_ms) * 2;
}
// Scans the system batchlog table in pages and replays every batch that is
// old enough (past get_batch_log_timeout()) to be considered failed: its
// mutations are deserialized, applied (skipping tables truncated after the
// batch was written), and the batch row is then deleted.
// Runs under _sem so at most one replay is in flight per shard; stop() waits
// on the same semaphore.
// TODO(review): _total_batches_replayed is never incremented here, so the
// class' getter always returns 0 — wire it up when replay is used for real.
future<> db::batchlog_manager::replay_all_failed_batches() {
    typedef db_clock::rep clock_type;
    // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
    // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
    auto throttle_in_kb = _qp.db().local().get_config().batchlog_replay_throttle_in_kb() / service::get_storage_service().local().get_token_metadata().get_all_endpoints().size();
    auto limiter = make_lw_shared<utils::rate_limiter>(throttle_in_kb * 1000);
    // Processes one batchlog row: deserialize its mutations, collect the ones
    // whose tables were not truncated after the batch was written, apply
    // them, then delete the batch row.
    auto batch = [this, limiter](const cql3::untyped_result_set::row& row) {
        auto written_at = row.get_as<db_clock::time_point>("written_at");
        // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
        auto timeout = get_batch_log_timeout();
        if (db_clock::now() < written_at + timeout) {
            // Too young to be considered failed; leave it alone.
            return make_ready_future<>();
        }
        // not used currently. ever?
        //auto version = row.has("version") ? row.get_as<uint32_t>("version") : /*MessagingService.VERSION_12*/6u;
        auto id = row.get_as<utils::UUID>("id");
        auto data = row.get_blob("data");
        logger.debug("Replaying batch {}", id);
        // The blob is a concatenation of frozen mutations, written by
        // get_batch_log_mutation_for() with the same serializer.
        auto fms = make_lw_shared<std::deque<frozen_mutation>>();
        data_input in(data);
        while (in.has_next()) {
            fms->emplace_back(serializer<frozen_mutation>::read(in));
        }
        auto mutations = make_lw_shared<std::vector<mutation>>();
        auto size = data.size();
        return repeat([this, fms = std::move(fms), written_at, mutations]() mutable {
            if (fms->empty()) {
                return make_ready_future<stop_iteration>(stop_iteration::yes);
            }
            auto& fm = fms->front();
            auto mid = fm.column_family_id();
            return system_keyspace::get_truncated_at(_qp, mid).then([this, &fm, written_at, mutations](db_clock::time_point t) {
                // Only replay if the target table was not truncated after
                // this batch was written. (The previous duplicate
                // find_schema() before the check was dead code.)
                if (written_at > t) {
                    auto schema = _qp.db().local().find_schema(fm.column_family_id());
                    mutations->emplace_back(fm.unfreeze(schema));
                }
            }).then([fms] {
                fms->pop_front();
                return make_ready_future<stop_iteration>(stop_iteration::no);
            });
        }).then([this, id, mutations, limiter, written_at, size] {
            if (mutations->empty()) {
                return make_ready_future<>();
            }
            const auto ttl = [this, mutations, written_at]() -> clock_type {
                /*
                 * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
                 * This ensures that deletes aren't "undone" by an old batch replay.
                 */
                auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
                warn(unimplemented::cause::HINT);
#if 0
                for (auto& m : *mutations) {
                    unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
                }
#endif
                return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
            }();
            if (ttl <= 0) {
                // The batch lingered longer than any hint would live; drop it.
                return make_ready_future<>();
            }
            // Origin does the send manually, however I can't see a super great reason to do so.
            // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
            // in both cases.
            // FIXME: verify that the above is reasonably true.
            return limiter->reserve(size).then([this, mutations, id] {
                return _qp.proxy().local().mutate(*mutations, db::consistency_level::ANY);
            });
        }).then([this, id] {
            // delete batch: tombstone the whole partition for this batch id.
            auto schema = _qp.db().local().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
            auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(id)});
            mutation m(key, schema);
            auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
            m.partition().apply_delete(*schema, {}, tombstone(now, gc_clock::now()));
            return _qp.proxy().local().mutate_locally(m);
        });
    };
    return _sem.wait().then([this, batch = std::move(batch)] {
        logger.debug("Started replayAllFailedBatches");
        typedef ::shared_ptr<cql3::untyped_result_set> page_ptr;
        sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size);
        return _qp.execute_internal(query).then([this, batch = std::move(batch)](page_ptr page) {
            return do_with(std::move(page), [this, batch = std::move(batch)](page_ptr & page) mutable {
                return repeat([this, &page, batch = std::move(batch)]() mutable {
                    if (page->empty()) {
                        return make_ready_future<stop_iteration>(stop_iteration::yes);
                    }
                    // Remember the last id of this page so the next page can
                    // be keyed past it by token.
                    auto id = page->back().get_as<utils::UUID>("id");
                    return parallel_for_each(*page, batch).then([this, &page, id]() {
                        if (page->size() < page_size) {
                            return make_ready_future<stop_iteration>(stop_iteration::yes); // we've exhausted the batchlog, next query would be empty.
                        }
                        sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
                                system_keyspace::NAME,
                                system_keyspace::BATCHLOG,
                                page_size);
                        return _qp.execute_internal(query, {id}).then([&page](auto res) {
                            page = std::move(res);
                            return make_ready_future<stop_iteration>(stop_iteration::no);
                        });
                    });
                });
            });
        }).then([this] {
            // TODO FIXME : cleanup()
#if 0
            ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
            cfs.forceBlockingFlush();
            Collection<Descriptor> descriptors = new ArrayList<>();
            for (SSTableReader sstr : cfs.getSSTables())
                descriptors.add(sstr.descriptor);
            if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
                CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
#endif
        }).then([this] {
            logger.debug("Finished replayAllFailedBatches");
        });
    }).finally([this] {
        _sem.signal();
    });
}

86
db/batchlog_manager.hh Normal file
View File

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include <unordered_map>
#include "core/future.hh"
#include "core/distributed.hh"
#include "core/timer.hh"
#include "cql3/query_processor.hh"
#include "gms/inet_address.hh"
#include "db_clock.hh"
namespace db {
// Per-shard manager for the system batchlog table: creates batchlog
// mutations and periodically replays batches that failed to complete.
class batchlog_manager {
private:
static constexpr uint32_t replay_interval = 60 * 1000; // milliseconds
static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
using clock_type = lowres_clock;
// NOTE(review): never incremented by the replay code yet, so the getter
// below currently always returns 0 — confirm before relying on it.
size_t _total_batches_replayed = 0;
cql3::query_processor& _qp;
// Periodic replay trigger; armed in start(), cancelled in stop().
timer<clock_type> _timer;
// Serializes replay runs; stop() waits on it for an in-flight replay.
semaphore _sem;
bool _stop = false;
// Scans the batchlog table and replays batches old enough to be
// considered failed.
future<> replay_all_failed_batches();
public:
// Takes a query_processor, not a distributed<> one, because this object
// is supposed to be per shard and does no dispatching beyond delegating
// to the shard's qp (which is what you feed here).
batchlog_manager(cql3::query_processor&);
// Arms the periodic replay timer (first run after RING_DELAY).
future<> start();
// Cancels the timer and waits for an in-flight replay to finish.
future<> stop();
// for testing.
future<> do_batch_log_replay() {
return replay_all_failed_batches();
}
// Counts rows in the system batchlog table.
future<size_t> count_all_batches() const;
size_t get_total_batches_replayed() const {
return _total_batches_replayed;
}
// Builds the batchlog-table mutation recording the given mutations under
// batch `id`. The time_point overload lets callers (e.g. tests)
// back-date the written_at cell.
mutation get_batch_log_mutation_for(std::vector<mutation>, const utils::UUID&, int32_t);
mutation get_batch_log_mutation_for(std::vector<mutation>, const utils::UUID&, int32_t, db_clock::time_point);
// Grace period before a batch is considered failed (2x write timeout).
db_clock::duration get_batch_log_timeout() const;
class endpoint_filter {
private:
const sstring _local_rack;
const std::unordered_map<sstring, std::vector<gms::inet_address>> _endpoints;
public:
endpoint_filter(sstring, std::unordered_map<sstring, std::vector<gms::inet_address>>);
/**
* @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
*/
std::vector<gms::inet_address> filter() const;
};
};
}

View File

@@ -234,6 +234,23 @@ frozen_mutation db::serializer<frozen_mutation>::read(input& in) {
return frozen_mutation(bytes_serializer::read(in));
}
// Fixed-size serializer for db::replay_position: two uint64_t fields
// (segment id, then position), 16 bytes total.
template<>
db::serializer<db::replay_position>::serializer(const db::replay_position& rp)
: _item(rp), _size(sizeof(uint64_t) * 2) {
}
template<>
void db::serializer<db::replay_position>::write(output& out, const db::replay_position& rp) {
out.write<uint64_t>(rp.id);
out.write<uint64_t>(rp.pos);
}
template<>
void db::serializer<db::replay_position>::read(db::replay_position& rp, input& in) {
// Must read in the same order write() emits: id first, then pos.
rp.id = in.read<uint64_t>();
rp.pos = in.read<uint64_t>();
}
template class db::serializer<tombstone> ;
template class db::serializer<bytes> ;
template class db::serializer<bytes_view> ;
@@ -245,3 +262,4 @@ template class db::serializer<partition_key_view> ;
template class db::serializer<clustering_key_view> ;
template class db::serializer<clustering_key_prefix_view> ;
template class db::serializer<frozen_mutation> ;
template class db::serializer<db::replay_position> ;

View File

@@ -13,6 +13,7 @@
#include "keys.hh"
#include "database_fwd.hh"
#include "frozen_mutation.hh"
#include "db/commitlog/replay_position.hh"
namespace db {
/**
@@ -116,6 +117,10 @@ template<> void serializer<clustering_key_prefix_view>::write(output&, const clu
template<> void serializer<clustering_key_prefix_view>::read(clustering_key_prefix_view&, input&);
template<> clustering_key_prefix_view serializer<clustering_key_prefix_view>::read(input&);
template<> serializer<db::replay_position>::serializer(const db::replay_position&);
template<> void serializer<db::replay_position>::write(output&, const db::replay_position&);
template<> void serializer<db::replay_position>::read(db::replay_position&, input&);
template<typename T>
T serializer<T>::read(input& in) {
type t;
@@ -131,6 +136,7 @@ extern template class serializer<utils::UUID>;
extern template class serializer<partition_key_view>;
extern template class serializer<clustering_key_view>;
extern template class serializer<clustering_key_prefix_view>;
extern template class serializer<db::replay_position>;
typedef serializer<tombstone> tombstone_serializer;
typedef serializer<bytes> bytes_serializer; // Compatible with bytes_view_serializer
@@ -143,6 +149,7 @@ typedef serializer<partition_key_view> partition_key_view_serializer;
typedef serializer<clustering_key_view> clustering_key_view_serializer;
typedef serializer<clustering_key_prefix_view> clustering_key_prefix_view_serializer;
typedef serializer<frozen_mutation> frozen_mutation_serializer;
typedef serializer<db::replay_position> replay_position_serializer;
}

View File

@@ -37,6 +37,8 @@
#include "version.hh"
#include "thrift/server.hh"
#include "exceptions/exceptions.hh"
#include "cql3/query_processor.hh"
#include "db/serializer.hh"
namespace db {
namespace system_keyspace {
@@ -501,89 +503,76 @@ future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp
UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
return CompactionHistoryTabularData.from(queryResultSet);
}
#endif
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
truncationRecords = null;
forceBlockingFlush(LOCAL);
// Shard-local cache of the system local row's "truncated_at" map:
// cf id -> (replay position, truncation time). Unset (nullopt) means the
// cache must be (re)loaded from disk; see get_truncation_record().
typedef std::pair<db::replay_position, db_clock::time_point> truncation_entry;
typedef std::unordered_map<utils::UUID, truncation_entry> truncation_map;
static thread_local std::experimental::optional<truncation_map> truncation_records;
// Persists a truncation record for `cf`: appends a blob of
// (replay_position, truncated_at) under the table's id in the
// "truncated_at" map column of the system local row, then invalidates the
// cache and flushes the local table.
future<> save_truncation_record(cql3::query_processor& qp, const column_family& cf, db_clock::time_point truncated_at, const db::replay_position& rp) {
db::serializer<replay_position> rps(rp);
// Blob layout: serialized replay_position followed by the truncation time
// (db_clock::rep); get_truncation_record() reads it back in this order.
bytes buf(bytes::initialized_later(), sizeof(db_clock::rep) + rps.size());
data_output out(buf);
rps(out);
out.write<db_clock::rep>(truncated_at.time_since_epoch().count());
map_type_impl::native_type tmp;
tmp.emplace_back(boost::any{ cf.schema()->id() }, boost::any{ buf });
sstring req = sprint("UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'", LOCAL, LOCAL);
return qp.execute_internal(req, {tmp}).then([&qp](auto rs) {
// Drop the cache so the next lookup re-reads the row, and make the
// record durable.
truncation_records = {};
return force_blocking_flush(qp.db(), LOCAL);
});
}
/**
 * Removes the stored truncation record for the given column family and
 * invalidates the in-memory truncation cache.
 */
future<> remove_truncation_record(cql3::query_processor& qp, utils::UUID id) {
    sstring req = sprint("DELETE truncated_at[?] from system.%s WHERE key = '%s'", LOCAL, LOCAL);
    return qp.execute_internal(req, {id}).then([&qp](auto res) {
        // Invalidate the cache, then make sure the deletion hits disk.
        truncation_records = {};
        return force_blocking_flush(qp.db(), LOCAL);
    });
}
// Returns the truncation record (replay position + truncation time) for the
// given column family, lazily populating the shard-local cache from the
// system local row on first use. A cf with no record yields a
// default-constructed entry.
static future<truncation_entry> get_truncation_record(cql3::query_processor& qp, utils::UUID cf_id) {
    if (!truncation_records) {
        sstring req = sprint("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL);
        return qp.execute_internal(req).then([&qp, cf_id](::shared_ptr<cql3::untyped_result_set> rs) {
            truncation_map tmp;
            // The column is "truncated_at" — the previous check against
            // "truncated_set" could never match a selected column, so the
            // cache was always populated empty.
            if (!rs->empty() && rs->one().has("truncated_at")) {
                auto map = rs->one().get_map<utils::UUID, bytes>("truncated_at");
                for (auto& p : map) {
                    // Blob layout (see save_truncation_record): serialized
                    // replay_position, then the truncation time.
                    truncation_entry e;
                    data_input in(p.second);
                    e.first = db::serializer<replay_position>::read(in);
                    e.second = db_clock::time_point(db_clock::duration(in.read<db_clock::rep>()));
                    tmp[p.first] = e;
                }
            }
            truncation_records = std::move(tmp);
            // Recurse; the cache is populated now, so this hits the fast path.
            return get_truncation_record(qp, cf_id);
        });
    }
    return make_ready_future<truncation_entry>((*truncation_records)[cf_id]);
}
/**
* This method is used to remove information about truncation time for specified column family
*/
public static synchronized void removeTruncationRecord(UUID cfId)
{
String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), cfId);
truncationRecords = null;
forceBlockingFlush(LOCAL);
}
// Projects the replay position out of the (position, time) truncation record.
future<db::replay_position> get_truncated_position(cql3::query_processor& qp, utils::UUID cf_id) {
    return get_truncation_record(qp, cf_id).then([](truncation_entry entry) {
        return make_ready_future<db::replay_position>(entry.first);
    });
}
private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
DataOutputBuffer out = new DataOutputBuffer();
try
{
ReplayPosition.serializer.serialize(position, out);
out.writeLong(truncatedAt);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
}
public static ReplayPosition getTruncatedPosition(UUID cfId)
{
Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
return record == null ? null : record.left;
}
public static long getTruncatedAt(UUID cfId)
{
Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
return record == null ? Long.MIN_VALUE : record.right;
}
private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
{
if (truncationRecords == null)
truncationRecords = readTruncationRecords();
return truncationRecords.get(cfId);
}
private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
{
UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));
Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
if (!rows.isEmpty() && rows.one().has("truncated_at"))
{
Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
}
return records;
}
private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
{
try
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
// Projects the truncation time out of the (position, time) truncation record.
future<db_clock::time_point> get_truncated_at(cql3::query_processor& qp, utils::UUID cf_id) {
    return get_truncation_record(qp, cf_id).then([](truncation_entry entry) {
        return make_ready_future<db_clock::time_point>(entry.second);
    });
}
#if 0
/**
* Record tokens being used by another node
*/
@@ -694,8 +683,19 @@ future<> force_blocking_flush(sstring cfname) {
return cf.flush(&qctx->db());
}
// Flushes (seals the active memtable of) the named system table on every
// shard. NOTE(review): Origin skips this when the "cassandra.unsafesystem"
// property is set; that opt-out is not ported (the #if 0 below), so the
// flush is unconditional here.
future<> force_blocking_flush(distributed<database>& db, sstring cf_name) {
#if 0
if (!Boolean.getBoolean("cassandra.unsafesystem"))
#endif
{
return db.invoke_on_all([cf_name](database& db) {
auto& cf = db.find_column_family(NAME, cf_name);
return cf.seal_active_memtable(&db);
});
}
}
#if 0
/**
* Return a map of stored tokens to IP addresses
*

View File

@@ -31,6 +31,8 @@
#include "gms/inet_address.hh"
#include "query-result-set.hh"
#include "locator/token_metadata.hh"
#include "db_clock.hh"
#include "db/commitlog/replay_position.hh"
namespace service {
@@ -39,7 +41,7 @@ class storage_proxy;
}
namespace cql3 {
class query_processor;
class query_processor;
}
namespace db {
@@ -210,88 +212,13 @@ load_dc_rack_info();
UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY));
return CompactionHistoryTabularData.from(queryResultSet);
}
#endif
future<> save_truncation_record(cql3::query_processor&, const column_family&, db_clock::time_point truncated_at, const db::replay_position&);
future<> remove_truncation_record(cql3::query_processor&, utils::UUID);
future<db::replay_position> get_truncated_position(cql3::query_processor&, utils::UUID);
future<db_clock::time_point> get_truncated_at(cql3::query_processor&, utils::UUID);
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
truncationRecords = null;
forceBlockingFlush(LOCAL);
}
/**
* This method is used to remove information about truncation time for specified column family
*/
public static synchronized void removeTruncationRecord(UUID cfId)
{
String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), cfId);
truncationRecords = null;
forceBlockingFlush(LOCAL);
}
private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
DataOutputBuffer out = new DataOutputBuffer();
try
{
ReplayPosition.serializer.serialize(position, out);
out.writeLong(truncatedAt);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
}
public static ReplayPosition getTruncatedPosition(UUID cfId)
{
Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
return record == null ? null : record.left;
}
public static long getTruncatedAt(UUID cfId)
{
Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
return record == null ? Long.MIN_VALUE : record.right;
}
private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
{
if (truncationRecords == null)
truncationRecords = readTruncationRecords();
return truncationRecords.get(cfId);
}
private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
{
UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));
Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
if (!rows.isEmpty() && rows.one().has("truncated_at"))
{
Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
}
return records;
}
private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
{
try
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
#if 0
/**
* Record tokens being used by another node
@@ -390,13 +317,11 @@ load_dc_rack_info();
updateTokens(tokens);
return tokens;
}
#endif
public static void forceBlockingFlush(String cfname)
{
if (!Boolean.getBoolean("cassandra.unsafesystem"))
FBUtilities.waitOnFuture(Keyspace.open(NAME).getColumnFamilyStore(cfname).forceFlush());
}
future<> force_blocking_flush(distributed<database>&, sstring cf_name);
#if 0
/**
* Return a map of stored tokens to IP addresses
*

View File

@@ -835,6 +835,20 @@ public:
}
}
#endif
// Number of endpoints currently known (size of the endpoint -> host id map).
size_t number_of_endpoints() const {
return _endpoint_to_host_id_map.size();
}
std::vector<inet_address> get_all_endpoints() const {
std::vector<inet_address> tmp;
std::transform(_endpoint_to_host_id_map.begin(), _endpoint_to_host_id_map.end(), std::back_inserter(tmp), [](const auto& p) {
return p.first;
});
return tmp;
}
#if 0
public Set<InetAddress> getAllEndpoints()
{
lock.readLock().lock();

View File

@@ -57,6 +57,7 @@ boost_tests = [
'urchin/row_cache_test',
'urchin/network_topology_strategy_test',
'urchin/query_processor_test',
'urchin/batchlog_manager_test',
]
other_tests = [

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2015 Cloudius Systems
*/
#define BOOST_TEST_DYN_LINK
#include <boost/range/irange.hpp>
#include <boost/range/adaptors.hpp>
#include <boost/range/algorithm.hpp>
#include <boost/test/unit_test.hpp>
#include <stdint.h>
#include "tests/test-utils.hh"
#include "tests/urchin/cql_test_env.hh"
#include "tests/urchin/cql_assertions.hh"
#include "core/future-util.hh"
#include "core/shared_ptr.hh"
#include "transport/messages/result_message.hh"
#include "cql3/query_processor.hh"
#include "db/batchlog_manager.hh"
// Helper: builds a live atomic cell with timestamp 0 holding `value`.
// (Removed the stray ';' that followed the function body.)
static atomic_cell make_atomic_cell(bytes value) {
    return atomic_cell::make_live(0, std::move(value));
}
// End-to-end batchlog test: write a back-dated batch mutation directly into
// the system batchlog table, verify it is counted, replay it, and check that
// the batched write became visible in the target table.
SEASTAR_TEST_CASE(test_execute_batch) {
return do_with_cql_env([] (auto& e) {
auto& qp = e.local_qp();
auto bp = make_lw_shared<db::batchlog_manager>(qp);
return e.execute_cql("create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").discard_result().then([&qp, &e, bp] {
auto& db = e.local_db();
auto s = db.find_schema("ks", "cf");
const column_definition& r1_col = *s->get_column_definition("r1");
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
// The batched mutation: cf["key1"][1].r1 = 100.
mutation m(key, s);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(100)));
using namespace std::chrono_literals;
// Back-date written_at by 3h so the batch is already past the replay
// timeout and is picked up immediately.
auto bm = bp->get_batch_log_mutation_for({ m }, s->id(), 9, db_clock::now() - db_clock::duration(3h));
return qp.proxy().local().mutate_locally(bm).then([bp] {
return bp->count_all_batches().then([](auto n) {
BOOST_CHECK_EQUAL(n, 1);
}).then([bp] {
return bp->do_batch_log_replay();
});
});
}).then([&qp, bp] {
// After replay, the batched write must be visible.
return qp.execute_internal("select * from ks.cf where p1 = ? and c1 = ?;", { sstring("key1"), 1 }).then([](auto rs) {
BOOST_REQUIRE(!rs->empty());
auto i = rs->one().template get_as<int32_t>("r1");
BOOST_CHECK_EQUAL(i, int32_t(100));
});
});
});
}

33
utils/rate_limiter.cc Normal file
View File

@@ -0,0 +1,33 @@
/*
* Copyright 2015 Cloudius Systems
*/
#include "rate_limiter.hh"
// rate == 0 disables limiting entirely (reserve() becomes a no-op and no
// timer is armed); otherwise arm a 1s periodic timer that replenishes the
// semaphore to `rate` units each second.
utils::rate_limiter::rate_limiter(size_t rate)
: _units_per_s(rate) {
if (_units_per_s != 0) {
_timer.set_callback(std::bind(&rate_limiter::on_timer, this));
_timer.arm(lowres_clock::now() + std::chrono::seconds(1),
std::experimental::optional<lowres_clock::duration> {
std::chrono::seconds(1) });
}
}
// Once per second: top the semaphore back up to the full per-second budget.
// NOTE(review): assumes _sem.current() never exceeds _units_per_s, otherwise
// the subtraction would wrap — confirm against the semaphore's initial count.
void utils::rate_limiter::on_timer() {
_sem.signal(_units_per_s - _sem.current());
}
// Reserves `u` units, waiting across as many one-second refill windows as
// needed. Requests larger than one window's budget are split: take a whole
// window's worth, then recurse for the remainder.
future<> utils::rate_limiter::reserve(size_t u) {
    if (_units_per_s == 0) {
        // Limiting disabled.
        return make_ready_future<>();
    }
    if (u <= _units_per_s) {
        return _sem.wait(u);
    }
    // u > _units_per_s on this path, so the per-window chunk is exactly the
    // full budget (the previous std::min(u, _units_per_s) always evaluated
    // to _units_per_s here — dead computation).
    auto n = _units_per_s;
    auto r = u - n;
    return _sem.wait(n).then([this, r] {
        return reserve(r);
    });
}

30
utils/rate_limiter.hh Normal file
View File

@@ -0,0 +1,30 @@
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include "core/timer.hh"
#include "core/semaphore.hh"
#include "core/reactor.hh"
namespace utils {
/**
 * 100% naive rate limiter. Consider it a placeholder.
 * Will let you process X "units" per second, then resets this every second.
 * Obviously, accuracy is virtually non-existent and the steady rate will fluctuate.
 */
class rate_limiter {
private:
// 1s periodic refill timer; armed only when the rate is non-zero.
timer<lowres_clock> _timer;
// Per-second budget; 0 means "unlimited".
size_t _units_per_s;
// Units available in the current window.
semaphore _sem;
// Timer callback: refills _sem up to _units_per_s.
void on_timer();
public:
// rate is the number of units allowed per second; 0 disables limiting.
rate_limiter(size_t rate);
// Waits until u units are available (possibly across several windows).
future<> reserve(size_t u);
};
}