Compare commits
44 Commits
copilot/fi
...
scylla-1.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
23c340bed8 | ||
|
|
503f6c6755 | ||
|
|
7d73599acd | ||
|
|
bf27379583 | ||
|
|
02cf5a517a | ||
|
|
ec3d59bf13 | ||
|
|
30c72ef3b4 | ||
|
|
15e69a32ba | ||
|
|
4e43cb84ff | ||
|
|
07d5e939be | ||
|
|
a2a5a22504 | ||
|
|
a39bec0e24 | ||
|
|
f0af5719d5 | ||
|
|
0523000af5 | ||
|
|
69a0e6e002 | ||
|
|
58d4de295c | ||
|
|
026061733f | ||
|
|
1d7ed190f8 | ||
|
|
2d66a4621a | ||
|
|
aaa9b5ace8 | ||
|
|
8d491e9879 | ||
|
|
b63c9fb84b | ||
|
|
b229f03198 | ||
|
|
6caa59560b | ||
|
|
79196af9fb | ||
|
|
afe09da858 | ||
|
|
d6cb41ff24 | ||
|
|
6bf77c7b49 | ||
|
|
6d34b4dab7 | ||
|
|
d367f1e9ab | ||
|
|
75a36ae453 | ||
|
|
35c1781913 | ||
|
|
1489b28ffd | ||
|
|
f975653c94 | ||
|
|
96f5cbb604 | ||
|
|
66ebef7d10 | ||
|
|
789fb0db97 | ||
|
|
af7c0f6433 | ||
|
|
aaf6786997 | ||
|
|
e8cb163cdf | ||
|
|
2d7c322805 | ||
|
|
13f18c6445 | ||
|
|
9c430c2cff | ||
|
|
c84e030fe9 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.3.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -219,8 +219,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([uuid, total](database& db) {
|
||||
std::unordered_map<sstring, uint64_t> m;
|
||||
for (auto t :*((total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
|
||||
db.find_column_family(uuid).get_sstables()).get()) {
|
||||
auto sstables = (total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
|
||||
db.find_column_family(uuid).get_sstables();
|
||||
for (auto t : *sstables) {
|
||||
m[t->get_filename()] = t->bytes_on_disk();
|
||||
}
|
||||
return m;
|
||||
@@ -234,8 +235,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
|
||||
static future<json::json_return_type> sum_sstable(http_context& ctx, bool total) {
|
||||
return map_reduce_cf_raw(ctx, std::unordered_map<sstring, uint64_t>(), [total](column_family& cf) {
|
||||
std::unordered_map<sstring, uint64_t> m;
|
||||
for (auto t :*((total) ? cf.get_sstables_including_compacted_undeleted() :
|
||||
cf.get_sstables()).get()) {
|
||||
auto sstables = (total) ? cf.get_sstables_including_compacted_undeleted() :
|
||||
cf.get_sstables();
|
||||
for (auto t : *sstables) {
|
||||
m[t->get_filename()] = t->bytes_on_disk();
|
||||
}
|
||||
return m;
|
||||
|
||||
10
database.cc
10
database.cc
@@ -127,7 +127,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl
|
||||
, _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
|
||||
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
||||
, _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema)))
|
||||
, _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
|
||||
, _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker(), _config.max_cached_partition_size_in_bytes)
|
||||
, _commitlog(cl)
|
||||
, _compaction_manager(compaction_manager)
|
||||
, _flush_queue(std::make_unique<memtable_flush_queue>())
|
||||
@@ -785,7 +785,7 @@ future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_bi
|
||||
future<>
|
||||
column_family::seal_active_memtable(memtable_list::flush_behavior ignored) {
|
||||
auto old = _memtables->back();
|
||||
dblog.debug("Sealing active memtable, partitions: {}, occupancy: {}", old->partition_count(), old->occupancy());
|
||||
dblog.debug("Sealing active memtable of {}.{}, partitions: {}, occupancy: {}", _schema->cf_name(), _schema->ks_name(), old->partition_count(), old->occupancy());
|
||||
|
||||
if (old->empty()) {
|
||||
dblog.debug("Memtable is empty");
|
||||
@@ -1581,7 +1581,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
|
||||
return parallel_for_each(tables.begin(), tables.end(), [this] (auto& t) {
|
||||
auto s = t.second;
|
||||
auto& ks = this->find_keyspace(s->ks_name());
|
||||
auto cfg = ks.make_column_family_config(*s);
|
||||
auto cfg = ks.make_column_family_config(*s, this->get_config());
|
||||
this->add_column_family(s, std::move(cfg));
|
||||
return ks.make_directory_for_column_family(s->cf_name(), s->id()).then([s] {});
|
||||
});
|
||||
@@ -1838,7 +1838,7 @@ void keyspace::update_from(::lw_shared_ptr<keyspace_metadata> ksm) {
|
||||
}
|
||||
|
||||
column_family::config
|
||||
keyspace::make_column_family_config(const schema& s) const {
|
||||
keyspace::make_column_family_config(const schema& s, const db::config& db_config) const {
|
||||
column_family::config cfg;
|
||||
cfg.datadir = column_family_directory(s.cf_name(), s.id());
|
||||
cfg.enable_disk_reads = _config.enable_disk_reads;
|
||||
@@ -1852,6 +1852,7 @@ keyspace::make_column_family_config(const schema& s) const {
|
||||
cfg.read_concurrency_config = _config.read_concurrency_config;
|
||||
cfg.cf_stats = _config.cf_stats;
|
||||
cfg.enable_incremental_backups = _config.enable_incremental_backups;
|
||||
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
|
||||
|
||||
return cfg;
|
||||
}
|
||||
@@ -2502,6 +2503,7 @@ future<> update_schema_version_and_announce(distributed<service::storage_proxy>&
|
||||
return make_ready_future<>();
|
||||
}).then([uuid] {
|
||||
return db::system_keyspace::update_schema_version(uuid).then([uuid] {
|
||||
dblog.info("Schema version changed to {}", uuid);
|
||||
return service::get_local_migration_manager().passive_announce(uuid);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -316,6 +316,7 @@ public:
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
uint64_t max_cached_partition_size_in_bytes;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
struct stats {
|
||||
@@ -884,7 +885,7 @@ public:
|
||||
*/
|
||||
locator::abstract_replication_strategy& get_replication_strategy();
|
||||
const locator::abstract_replication_strategy& get_replication_strategy() const;
|
||||
column_family::config make_column_family_config(const schema& s) const;
|
||||
column_family::config make_column_family_config(const schema& s, const db::config& db_config) const;
|
||||
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
|
||||
void add_or_update_column_family(const schema_ptr& s) {
|
||||
_metadata->add_or_update_column_family(s);
|
||||
|
||||
@@ -369,6 +369,9 @@ public:
|
||||
val(reduce_cache_sizes_at, double, .85, Invalid, \
|
||||
"When Java heap usage (after a full concurrent mark sweep (CMS) garbage collection) exceeds this percentage, Cassandra reduces the cache capacity to the fraction of the current size as specified by reduce_cache_capacity_to. To disable, set the value to 1.0." \
|
||||
) \
|
||||
val(max_cached_partition_size_in_kb, uint64_t, 10240uLL, Used, \
|
||||
"Partitions with size greater than this value won't be cached." \
|
||||
) \
|
||||
/* Disks settings */ \
|
||||
val(stream_throughput_outbound_megabits_per_sec, uint32_t, 400, Unused, \
|
||||
"Throttles all outbound streaming file transfers on a node to the specified throughput. Cassandra does mostly sequential I/O when streaming data during bootstrap or repair, which can lead to saturating the network connection and degrading client (RPC) performance." \
|
||||
@@ -556,7 +559,7 @@ public:
|
||||
val(rpc_port, uint16_t, 9160, Used, \
|
||||
"Thrift port for client connections." \
|
||||
) \
|
||||
val(start_rpc, bool, false, Used, \
|
||||
val(start_rpc, bool, true, Used, \
|
||||
"Starts the Thrift RPC server" \
|
||||
) \
|
||||
val(rpc_keepalive, bool, true, Used, \
|
||||
|
||||
@@ -665,13 +665,16 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
|
||||
auto diff = difference(before, after, indirect_equal_to<lw_shared_ptr<query::result_set>>());
|
||||
|
||||
for (auto&& key : diff.entries_only_on_left) {
|
||||
logger.info("Dropping keyspace {}", key);
|
||||
dropped.emplace(key);
|
||||
}
|
||||
for (auto&& key : diff.entries_only_on_right) {
|
||||
auto&& value = after[key];
|
||||
logger.info("Creating keyspace {}", key);
|
||||
created.emplace_back(schema_result_value_type{key, std::move(value)});
|
||||
}
|
||||
for (auto&& key : diff.entries_differing) {
|
||||
logger.info("Altering keyspace {}", key);
|
||||
altered.emplace_back(key);
|
||||
}
|
||||
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) mutable {
|
||||
@@ -713,15 +716,21 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
std::map<qualified_name, schema_mutations>&& before,
|
||||
std::map<qualified_name, schema_mutations>&& after)
|
||||
{
|
||||
struct dropped_table {
|
||||
global_schema_ptr schema;
|
||||
utils::joinpoint<db_clock::time_point> jp{[] {
|
||||
return make_ready_future<db_clock::time_point>(db_clock::now());
|
||||
}};
|
||||
};
|
||||
std::vector<global_schema_ptr> created;
|
||||
std::vector<global_schema_ptr> altered;
|
||||
std::vector<global_schema_ptr> dropped;
|
||||
std::vector<dropped_table> dropped;
|
||||
|
||||
auto diff = difference(before, after);
|
||||
for (auto&& key : diff.entries_only_on_left) {
|
||||
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
|
||||
logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
|
||||
dropped.emplace_back(s);
|
||||
dropped.emplace_back(dropped_table{s});
|
||||
}
|
||||
for (auto&& key : diff.entries_only_on_right) {
|
||||
auto s = create_table_from_mutations(after.at(key));
|
||||
@@ -734,14 +743,12 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
altered.emplace_back(s);
|
||||
}
|
||||
|
||||
do_with(utils::make_joinpoint([] { return db_clock::now();})
|
||||
, [&created, &dropped, &altered, &proxy](auto& tsf) {
|
||||
return proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, &tsf] (database& db) {
|
||||
proxy.local().get_db().invoke_on_all([&created, &dropped, &altered] (database& db) {
|
||||
return seastar::async([&] {
|
||||
for (auto&& gs : created) {
|
||||
schema_ptr s = gs.get();
|
||||
auto& ks = db.find_keyspace(s->ks_name());
|
||||
auto cfg = ks.make_column_family_config(*s);
|
||||
auto cfg = ks.make_column_family_config(*s, db.get_config());
|
||||
db.add_column_family(s, cfg);
|
||||
auto& cf = db.find_column_family(s);
|
||||
cf.mark_ready_for_writes();
|
||||
@@ -751,14 +758,13 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
|
||||
for (auto&& gs : altered) {
|
||||
update_column_family(db, gs.get()).get();
|
||||
}
|
||||
parallel_for_each(dropped.begin(), dropped.end(), [&db, &tsf](auto&& gs) {
|
||||
schema_ptr s = gs.get();
|
||||
return db.drop_column_family(s->ks_name(), s->cf_name(), [&tsf] { return tsf.value(); }).then([s] {
|
||||
parallel_for_each(dropped.begin(), dropped.end(), [&db](dropped_table& dt) {
|
||||
schema_ptr s = dt.schema.get();
|
||||
return db.drop_column_family(s->ks_name(), s->cf_name(), [&dt] { return dt.jp.value(); }).then([s] {
|
||||
return service::get_local_migration_manager().notify_drop_column_family(s);
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
}
|
||||
|
||||
|
||||
@@ -71,14 +71,30 @@ static std::vector<db::system_keyspace::range_estimates> estimates_for(const col
|
||||
std::vector<db::system_keyspace::range_estimates> estimates;
|
||||
estimates.reserve(local_ranges.size());
|
||||
|
||||
std::vector<query::partition_range> unwrapped;
|
||||
// Each range defines both bounds.
|
||||
for (auto& range : local_ranges) {
|
||||
int64_t count{0};
|
||||
sstables::estimated_histogram hist{0};
|
||||
for (auto&& sstable : cf.select_sstables(range)) {
|
||||
unwrapped.clear();
|
||||
if (range.is_wrap_around(dht::ring_position_comparator(*cf.schema()))) {
|
||||
auto uw = range.unwrap();
|
||||
unwrapped.push_back(std::move(uw.first));
|
||||
unwrapped.push_back(std::move(uw.second));
|
||||
} else {
|
||||
unwrapped.push_back(range);
|
||||
}
|
||||
for (auto&& uwr : unwrapped) {
|
||||
for (auto&& sstable : cf.select_sstables(uwr)) {
|
||||
count += sstable->get_estimated_key_count();
|
||||
hist.merge(sstable->get_stats_metadata().estimated_row_size);
|
||||
}
|
||||
}
|
||||
estimates.emplace_back(&range, db::system_keyspace::partition_estimates{count, count > 0 ? hist.mean() : 0});
|
||||
estimates.emplace_back(db::system_keyspace::range_estimates{
|
||||
range.start()->value().token(),
|
||||
range.end()->value().token(),
|
||||
count,
|
||||
count > 0 ? hist.mean() : 0});
|
||||
}
|
||||
|
||||
return estimates;
|
||||
@@ -130,7 +146,7 @@ future<> size_estimates_recorder::record_size_estimates() {
|
||||
}
|
||||
|
||||
future<> size_estimates_recorder::stop() {
|
||||
if (get_size_estimates_recorder().local_is_initialized()) {
|
||||
if (engine().cpu_id() == 0) {
|
||||
service::get_local_migration_manager().unregister_listener(this);
|
||||
_timer.cancel();
|
||||
return _gate.close();
|
||||
|
||||
@@ -1043,7 +1043,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
|
||||
db.add_keyspace(NAME, std::move(_ks));
|
||||
auto& ks = db.find_keyspace(NAME);
|
||||
for (auto&& table : all_tables()) {
|
||||
db.add_column_family(table, ks.make_column_family_config(*table));
|
||||
db.add_column_family(table, ks.make_column_family_config(*table, db.get_config()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1195,10 +1195,10 @@ future<int> increment_and_get_generation() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> update_size_estimates(const sstring& ks_name, const sstring& cf_name, std::vector<range_estimates> estimates) {
|
||||
future<> update_size_estimates(sstring ks_name, sstring cf_name, std::vector<range_estimates> estimates) {
|
||||
auto&& schema = size_estimates();
|
||||
auto timestamp = api::new_timestamp();
|
||||
mutation m_to_apply{partition_key::from_singular(*schema, ks_name), schema};
|
||||
mutation m_to_apply{partition_key::from_single_value(*schema, to_bytes(ks_name)), schema};
|
||||
|
||||
// delete all previous values with a single range tombstone.
|
||||
auto ck = clustering_key_prefix::from_single_value(*schema, utf8_type->decompose(cf_name));
|
||||
@@ -1206,28 +1206,50 @@ future<> update_size_estimates(const sstring& ks_name, const sstring& cf_name, s
|
||||
|
||||
// add a CQL row for each primary token range.
|
||||
for (auto&& e : estimates) {
|
||||
// This range has both start and end bounds. We're only interested in the tokens.
|
||||
const range<dht::ring_position>* range = e.first;
|
||||
auto ck = clustering_key_prefix(std::vector<bytes>{
|
||||
utf8_type->decompose(cf_name),
|
||||
utf8_type->decompose(dht::global_partitioner().to_sstring(range->start()->value().token())),
|
||||
utf8_type->decompose(dht::global_partitioner().to_sstring(range->end()->value().token()))});
|
||||
utf8_type->decompose(dht::global_partitioner().to_sstring(e.range_start_token)),
|
||||
utf8_type->decompose(dht::global_partitioner().to_sstring(e.range_end_token))});
|
||||
|
||||
auto mean_partition_size_col = schema->get_column_definition("mean_partition_size");
|
||||
auto cell = atomic_cell::make_live(timestamp, long_type->decompose(e.second.mean_partition_size), { });
|
||||
auto cell = atomic_cell::make_live(timestamp, long_type->decompose(e.mean_partition_size), { });
|
||||
m_to_apply.set_clustered_cell(ck, *mean_partition_size_col, std::move(cell));
|
||||
|
||||
auto partitions_count_col = schema->get_column_definition("partitions_count");
|
||||
cell = atomic_cell::make_live(timestamp, long_type->decompose(e.second.partitions_count), { });
|
||||
cell = atomic_cell::make_live(timestamp, long_type->decompose(e.partitions_count), { });
|
||||
m_to_apply.set_clustered_cell(std::move(ck), *partitions_count_col, std::move(cell));
|
||||
}
|
||||
|
||||
return service::get_local_storage_proxy().mutate_locally(std::move(m_to_apply));
|
||||
}
|
||||
|
||||
future<> clear_size_estimates(const sstring& ks_name, const sstring& cf_name) {
|
||||
future<> clear_size_estimates(sstring ks_name, sstring cf_name) {
|
||||
sstring req = "DELETE FROM system.%s WHERE keyspace_name = ? AND table_name = ?";
|
||||
return execute_cql(req, SIZE_ESTIMATES, ks_name, cf_name).discard_result();
|
||||
return execute_cql(std::move(req), SIZE_ESTIMATES, std::move(ks_name), std::move(cf_name)).discard_result();
|
||||
}
|
||||
|
||||
future<std::vector<range_estimates>> query_size_estimates(sstring ks_name, sstring cf_name, dht::token start_token, dht::token end_token) {
|
||||
sstring req = "SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.%s WHERE keyspace_name = ? AND table_name = ?";
|
||||
return execute_cql(req, SIZE_ESTIMATES, std::move(ks_name), std::move(cf_name))
|
||||
.then([start_token = std::move(start_token), end_token = std::move(end_token)](::shared_ptr<cql3::untyped_result_set> result) {
|
||||
std::vector<range_estimates> estimates;
|
||||
for (auto&& row : *result) {
|
||||
auto range_start = dht::global_partitioner().from_sstring(row.get_as<sstring>("range_start"));
|
||||
if (range_start < start_token) {
|
||||
continue;
|
||||
}
|
||||
auto range_end = dht::global_partitioner().from_sstring(row.get_as<sstring>("range_end"));
|
||||
if (range_end > end_token) {
|
||||
break;
|
||||
}
|
||||
estimates.emplace_back(range_estimates{
|
||||
std::move(range_start),
|
||||
std::move(range_end),
|
||||
row.get_as<int64_t>("partitions_count"),
|
||||
row.get_as<int64_t>("mean_partition_size")});
|
||||
}
|
||||
return estimates;
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace system_keyspace
|
||||
|
||||
@@ -80,13 +80,13 @@ static constexpr auto SSTABLE_ACTIVITY = "sstable_activity";
|
||||
static constexpr auto SIZE_ESTIMATES = "size_estimates";
|
||||
|
||||
// Partition estimates for a given range of tokens.
|
||||
struct partition_estimates {
|
||||
struct range_estimates {
|
||||
dht::token range_start_token;
|
||||
dht::token range_end_token;
|
||||
int64_t partitions_count;
|
||||
int64_t mean_partition_size;
|
||||
};
|
||||
|
||||
using range_estimates = std::pair<const range<dht::ring_position>*, partition_estimates>;
|
||||
|
||||
extern schema_ptr hints();
|
||||
extern schema_ptr batchlog();
|
||||
extern schema_ptr built_indexes(); // TODO (from Cassandra): make private
|
||||
@@ -572,12 +572,17 @@ future<> set_bootstrap_state(bootstrap_state state);
|
||||
/**
|
||||
* Writes the current partition count and size estimates into SIZE_ESTIMATES_CF
|
||||
*/
|
||||
future<> update_size_estimates(const sstring& ks_name, const sstring& cf_name, std::vector<range_estimates> estimates);
|
||||
future<> update_size_estimates(sstring ks_name, sstring cf_name, std::vector<range_estimates> estimates);
|
||||
|
||||
/**
|
||||
* Clears size estimates for a table (on table drop)
|
||||
*/
|
||||
future<> clear_size_estimates(const sstring& ks_name, const sstring& cf_name);
|
||||
future<> clear_size_estimates(sstring ks_name, sstring cf_name);
|
||||
|
||||
/**
|
||||
* Queries the size estimates within the specified range
|
||||
*/
|
||||
future<std::vector<range_estimates>> query_size_estimates(sstring ks_name, sstring cf_name, dht::token start_token, dht::token end_token);
|
||||
|
||||
} // namespace system_keyspace
|
||||
} // namespace db
|
||||
|
||||
@@ -5,7 +5,7 @@ Description=Scylla Housekeeping
|
||||
Type=simple
|
||||
User=scylla
|
||||
Group=scylla
|
||||
ExecStart=/usr/lib/scylla/scylla-Housekeeping -q version
|
||||
ExecStart=/usr/lib/scylla/scylla-housekeeping -q version
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
105
mutation.cc
105
mutation.cc
@@ -213,51 +213,90 @@ mutation& mutation::operator=(const mutation& m) {
|
||||
return *this = mutation(m);
|
||||
}
|
||||
|
||||
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm)
|
||||
{
|
||||
class rebuilder {
|
||||
mutation& _m;
|
||||
public:
|
||||
rebuilder(mutation& m) : _m(m) { }
|
||||
enum class limit_mutation_size { yes, no };
|
||||
|
||||
stop_iteration consume(tombstone t) {
|
||||
_m.partition().apply(t);
|
||||
return stop_iteration::no;
|
||||
template <limit_mutation_size with_limit>
|
||||
class mutation_rebuilder {
|
||||
mutation _m;
|
||||
streamed_mutation& _sm;
|
||||
size_t _remaining_limit;
|
||||
|
||||
template <typename T> bool check_remaining_limit(const T& e) {
|
||||
if (with_limit == limit_mutation_size::no) {
|
||||
return true;
|
||||
}
|
||||
|
||||
stop_iteration consume(static_row&& sr) {
|
||||
_m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells()));
|
||||
return stop_iteration::no;
|
||||
size_t size = e.memory_usage();
|
||||
if (_remaining_limit <= size) {
|
||||
_remaining_limit = 0;
|
||||
} else {
|
||||
_remaining_limit -= size;
|
||||
}
|
||||
return _remaining_limit > 0;
|
||||
}
|
||||
public:
|
||||
mutation_rebuilder(streamed_mutation& sm)
|
||||
: _m(sm.decorated_key(), sm.schema()), _sm(sm), _remaining_limit(0) {
|
||||
static_assert(with_limit == limit_mutation_size::no,
|
||||
"This constructor should be used only for mutation_rebuildeer with no limit");
|
||||
}
|
||||
mutation_rebuilder(streamed_mutation& sm, size_t limit)
|
||||
: _m(sm.decorated_key(), sm.schema()), _sm(sm), _remaining_limit(limit) {
|
||||
static_assert(with_limit == limit_mutation_size::yes,
|
||||
"This constructor should be used only for mutation_rebuildeer with limit");
|
||||
check_remaining_limit(_m.key());
|
||||
}
|
||||
|
||||
stop_iteration consume(range_tombstone&& rt) {
|
||||
_m.partition().apply_row_tombstone(*_m.schema(), std::move(rt));
|
||||
return stop_iteration::no;
|
||||
stop_iteration consume(tombstone t) {
|
||||
_m.partition().apply(t);
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration consume(range_tombstone&& rt) {
|
||||
if (!check_remaining_limit(rt)) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_m.partition().apply_row_tombstone(*_m.schema(), std::move(rt));
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration consume(clustering_row&& cr) {
|
||||
auto& dr = _m.partition().clustered_row(std::move(cr.key()));
|
||||
dr.apply(cr.tomb());
|
||||
dr.apply(cr.marker());
|
||||
dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells()));
|
||||
return stop_iteration::no;
|
||||
stop_iteration consume(static_row&& sr) {
|
||||
if (!check_remaining_limit(sr)) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells()));
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
void consume_end_of_stream() { }
|
||||
};
|
||||
stop_iteration consume(clustering_row&& cr) {
|
||||
if (!check_remaining_limit(cr)) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
auto& dr = _m.partition().clustered_row(std::move(cr.key()));
|
||||
dr.apply(cr.tomb());
|
||||
dr.apply(cr.marker());
|
||||
dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells()));
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
struct data {
|
||||
mutation m;
|
||||
streamed_mutation sm;
|
||||
};
|
||||
mutation_opt consume_end_of_stream() {
|
||||
return with_limit == limit_mutation_size::yes && _remaining_limit == 0 ? mutation_opt()
|
||||
: mutation_opt(std::move(_m));
|
||||
}
|
||||
};
|
||||
|
||||
future<mutation_opt>
|
||||
mutation_from_streamed_mutation_with_limit(streamed_mutation sm, size_t limit) {
|
||||
return do_with(std::move(sm), [limit] (auto& sm) {
|
||||
return consume(sm, mutation_rebuilder<limit_mutation_size::yes>(sm, limit));
|
||||
});
|
||||
}
|
||||
|
||||
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm) {
|
||||
if (!sm) {
|
||||
return make_ready_future<mutation_opt>();
|
||||
}
|
||||
mutation m(sm->decorated_key(), sm->schema());
|
||||
return do_with(data { std::move(m), std::move(*sm) }, [] (auto& d) {
|
||||
return consume(d.sm, rebuilder(d.m)).then([&d] {
|
||||
return mutation_opt(std::move(d.m));
|
||||
});
|
||||
return do_with(std::move(*sm), [] (auto& sm) {
|
||||
return consume(sm, mutation_rebuilder<limit_mutation_size::no>(sm));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -182,3 +182,5 @@ boost::iterator_range<std::vector<mutation>::const_iterator> slice(
|
||||
const query::partition_range&);
|
||||
|
||||
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm);
|
||||
future<mutation_opt>
|
||||
mutation_from_streamed_mutation_with_limit(streamed_mutation sm, size_t limit);
|
||||
|
||||
129
row_cache.cc
129
row_cache.cc
@@ -38,6 +38,22 @@ static logging::logger logger("cache");
|
||||
|
||||
thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduling_group(1ms, 0.2);
|
||||
|
||||
enum class is_wide_partition { yes, no };
|
||||
|
||||
future<is_wide_partition, mutation_opt>
|
||||
try_to_read(uint64_t max_cached_partition_size_in_bytes, streamed_mutation_opt&& sm) {
|
||||
if (!sm) {
|
||||
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, mutation_opt());
|
||||
}
|
||||
return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_cached_partition_size_in_bytes).then(
|
||||
[] (mutation_opt&& omo) mutable {
|
||||
if (omo) {
|
||||
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, std::move(omo));
|
||||
} else {
|
||||
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::yes, mutation_opt());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
cache_tracker& global_cache_tracker() {
|
||||
static thread_local cache_tracker instance;
|
||||
@@ -103,6 +119,11 @@ cache_tracker::setup_collectd() {
|
||||
, "total_operations", "misses")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _misses)
|
||||
),
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("cache"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "uncached_wide_partitions")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _uncached_wide_partitions)
|
||||
),
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("cache"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "insertions")
|
||||
@@ -180,6 +201,10 @@ void cache_tracker::on_miss() {
|
||||
++_misses;
|
||||
}
|
||||
|
||||
void cache_tracker::on_uncached_wide_partition() {
|
||||
++_uncached_wide_partitions;
|
||||
}
|
||||
|
||||
allocation_strategy& cache_tracker::allocator() {
|
||||
return _region.allocator();
|
||||
}
|
||||
@@ -196,29 +221,50 @@ const logalloc::region& cache_tracker::region() const {
|
||||
class single_partition_populating_reader final : public mutation_reader::impl {
|
||||
schema_ptr _schema;
|
||||
row_cache& _cache;
|
||||
mutation_source& _underlying;
|
||||
mutation_reader _delegate;
|
||||
const io_priority_class _pc;
|
||||
query::clustering_key_filtering_context _ck_filtering;
|
||||
public:
|
||||
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate, query::clustering_key_filtering_context ck_filtering)
|
||||
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_source& underlying,
|
||||
mutation_reader delegate, const io_priority_class pc, query::clustering_key_filtering_context ck_filtering)
|
||||
: _schema(std::move(s))
|
||||
, _cache(cache)
|
||||
, _underlying(underlying)
|
||||
, _delegate(std::move(delegate))
|
||||
, _pc(pc)
|
||||
, _ck_filtering(ck_filtering)
|
||||
{ }
|
||||
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return _delegate().then([] (auto sm) {
|
||||
return mutation_from_streamed_mutation(std::move(sm));
|
||||
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return streamed_mutation_from_mutation(std::move(*mo));
|
||||
auto op = _cache._populate_phaser.start();
|
||||
return _delegate().then([this, op = std::move(op)] (auto sm) mutable {
|
||||
if (!sm) {
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
|
||||
}
|
||||
return { };
|
||||
dht::decorated_key dk = sm->decorated_key();
|
||||
return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then(
|
||||
[this, op = std::move(op), dk = std::move(dk)]
|
||||
(is_wide_partition wide_partition, mutation_opt&& mo) {
|
||||
if (wide_partition == is_wide_partition::no) {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
|
||||
}
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
|
||||
} else {
|
||||
_cache.on_uncached_wide_partition();
|
||||
auto reader = _underlying(_schema,
|
||||
query::partition_range::make_singular(dht::ring_position(std::move(dk))),
|
||||
_ck_filtering,
|
||||
_pc);
|
||||
return reader();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -233,6 +279,10 @@ void row_cache::on_miss() {
|
||||
_tracker.on_miss();
|
||||
}
|
||||
|
||||
void row_cache::on_uncached_wide_partition() {
|
||||
_tracker.on_uncached_wide_partition();
|
||||
}
|
||||
|
||||
class just_cache_scanning_reader final {
|
||||
schema_ptr _schema;
|
||||
row_cache& _cache;
|
||||
@@ -397,22 +447,38 @@ public:
|
||||
{}
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
update_reader();
|
||||
return _reader().then([] (auto sm) {
|
||||
return mutation_from_streamed_mutation(std::move(sm));
|
||||
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
|
||||
_last_key = dht::ring_position(mo->decorated_key());
|
||||
_last_key_populate_phase = _cache._populate_phaser.phase();
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return streamed_mutation_from_mutation(std::move(*mo));
|
||||
}
|
||||
maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
|
||||
return {};
|
||||
auto op = _cache._populate_phaser.start();
|
||||
return _reader().then([this, op = std::move(op)] (auto sm) mutable {
|
||||
stdx::optional<dht::decorated_key> dk = (sm) ? stdx::optional<dht::decorated_key>(sm->decorated_key())
|
||||
: stdx::optional<dht::decorated_key>(stdx::nullopt);
|
||||
return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then(
|
||||
[this, op = std::move(op), dk = std::move(dk)]
|
||||
(is_wide_partition wide_partition, mutation_opt&& mo) mutable {
|
||||
if (wide_partition == is_wide_partition::no) {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
this->maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
|
||||
_last_key = dht::ring_position(mo->decorated_key());
|
||||
_last_key_populate_phase = _cache._populate_phaser.phase();
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
|
||||
}
|
||||
this->maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
|
||||
} else {
|
||||
assert(bool(dk));
|
||||
_last_key = std::experimental::optional<dht::ring_position>();
|
||||
_cache.on_uncached_wide_partition();
|
||||
auto reader = _underlying(_schema,
|
||||
query::partition_range::make_singular(dht::ring_position(std::move(*dk))),
|
||||
_ck_filtering,
|
||||
_pc);
|
||||
return reader();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -678,8 +744,8 @@ row_cache::make_reader(schema_ptr s,
|
||||
return make_reader_returning(e.read(*this, s, ck_filtering));
|
||||
} else {
|
||||
on_miss();
|
||||
return make_mutation_reader<single_partition_populating_reader>(s, *this,
|
||||
_underlying(_schema, range, query::no_clustering_key_filtering, pc),
|
||||
return make_mutation_reader<single_partition_populating_reader>(s, *this, _underlying,
|
||||
_underlying(_schema, range, query::no_clustering_key_filtering, pc), pc,
|
||||
ck_filtering);
|
||||
}
|
||||
});
|
||||
@@ -912,12 +978,13 @@ void row_cache::invalidate_unwrapped(const query::partition_range& range) {
|
||||
}
|
||||
|
||||
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,
|
||||
cache_tracker& tracker)
|
||||
cache_tracker& tracker, uint64_t max_cached_partition_size_in_bytes)
|
||||
: _tracker(tracker)
|
||||
, _schema(std::move(s))
|
||||
, _partitions(cache_entry::compare(_schema))
|
||||
, _underlying(std::move(fallback_factory))
|
||||
, _underlying_keys(std::move(underlying_keys))
|
||||
, _max_cached_partition_size_in_bytes(max_cached_partition_size_in_bytes)
|
||||
{
|
||||
with_allocator(_tracker.allocator(), [this] {
|
||||
cache_entry* entry = current_allocator().construct<cache_entry>(_schema);
|
||||
|
||||
@@ -149,6 +149,7 @@ public:
|
||||
private:
|
||||
uint64_t _hits = 0;
|
||||
uint64_t _misses = 0;
|
||||
uint64_t _uncached_wide_partitions = 0;
|
||||
uint64_t _insertions = 0;
|
||||
uint64_t _merges = 0;
|
||||
uint64_t _evictions = 0;
|
||||
@@ -170,11 +171,13 @@ public:
|
||||
void on_merge();
|
||||
void on_hit();
|
||||
void on_miss();
|
||||
void on_uncached_wide_partition();
|
||||
allocation_strategy& allocator();
|
||||
logalloc::region& region();
|
||||
const logalloc::region& region() const;
|
||||
uint64_t modification_count() const { return _modification_count; }
|
||||
uint64_t partitions() const { return _partitions; }
|
||||
uint64_t uncached_wide_partitions() const { return _uncached_wide_partitions; }
|
||||
};
|
||||
|
||||
// Returns a reference to shard-wide cache_tracker.
|
||||
@@ -211,6 +214,7 @@ private:
|
||||
partitions_type _partitions; // Cached partitions are complete.
|
||||
mutation_source _underlying;
|
||||
key_source _underlying_keys;
|
||||
uint64_t _max_cached_partition_size_in_bytes;
|
||||
|
||||
// Synchronizes populating reads with updates of underlying data source to ensure that cache
|
||||
// remains consistent across flushes with the underlying data source.
|
||||
@@ -231,6 +235,7 @@ private:
|
||||
query::clustering_key_filtering_context ck_filtering);
|
||||
void on_hit();
|
||||
void on_miss();
|
||||
void on_uncached_wide_partition();
|
||||
void upgrade_entry(cache_entry&);
|
||||
void invalidate_locked(const dht::decorated_key&);
|
||||
void invalidate_unwrapped(const query::partition_range&);
|
||||
@@ -238,7 +243,7 @@ private:
|
||||
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
|
||||
public:
|
||||
~row_cache();
|
||||
row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&);
|
||||
row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&, uint64_t _max_cached_partition_size_in_bytes = 10 * 1024 * 1024);
|
||||
row_cache(row_cache&&) = default;
|
||||
row_cache(const row_cache&) = delete;
|
||||
row_cache& operator=(row_cache&&) = default;
|
||||
|
||||
@@ -2128,10 +2128,13 @@ protected:
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> make_mutation_data_request(lw_shared_ptr<query::read_command> cmd, gms::inet_address ep, clock_type::time_point timeout) {
|
||||
++_proxy->_stats.mutation_data_read_attempts.get_ep_stat(ep);
|
||||
if (is_me(ep)) {
|
||||
tracing::trace(_trace_state, "read_mutation_data: querying locally");
|
||||
return _proxy->query_mutations_locally(_schema, cmd, _partition_range);
|
||||
} else {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
return ms.send_read_mutation_data(net::messaging_service::msg_addr{ep, 0}, timeout, *cmd, _partition_range).then([this](reconcilable_result&& result) {
|
||||
tracing::trace(_trace_state, "read_mutation_data: sending a message to /{}", ep);
|
||||
return ms.send_read_mutation_data(net::messaging_service::msg_addr{ep, 0}, timeout, *cmd, _partition_range).then([this, ep](reconcilable_result&& result) {
|
||||
tracing::trace(_trace_state, "read_mutation_data: got response from /{}", ep);
|
||||
return make_foreign(::make_lw_shared<reconcilable_result>(std::move(result)));
|
||||
});
|
||||
}
|
||||
@@ -2139,10 +2142,13 @@ protected:
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> make_data_request(gms::inet_address ep, clock_type::time_point timeout) {
|
||||
++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
|
||||
if (is_me(ep)) {
|
||||
tracing::trace(_trace_state, "read_data: querying locally");
|
||||
return _proxy->query_singular_local(_schema, _cmd, _partition_range);
|
||||
} else {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
return ms.send_read_data(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([this](query::result&& result) {
|
||||
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
|
||||
return ms.send_read_data(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([this, ep](query::result&& result) {
|
||||
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
|
||||
return make_foreign(::make_lw_shared<query::result>(std::move(result)));
|
||||
});
|
||||
}
|
||||
@@ -2150,10 +2156,13 @@ protected:
|
||||
future<query::result_digest, api::timestamp_type> make_digest_request(gms::inet_address ep, clock_type::time_point timeout) {
|
||||
++_proxy->_stats.digest_read_attempts.get_ep_stat(ep);
|
||||
if (is_me(ep)) {
|
||||
tracing::trace(_trace_state, "read_digest: querying locally");
|
||||
return _proxy->query_singular_local_digest(_schema, _cmd, _partition_range);
|
||||
} else {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
return ms.send_read_digest(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([] (query::result_digest d, rpc::optional<api::timestamp_type> t) {
|
||||
tracing::trace(_trace_state, "read_digest: sending a message to /{}", ep);
|
||||
return ms.send_read_digest(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([this, ep] (query::result_digest d, rpc::optional<api::timestamp_type> t) {
|
||||
tracing::trace(_trace_state, "read_digest: got response from /{}", ep);
|
||||
return make_ready_future<query::result_digest, api::timestamp_type>(d, t ? t.value() : api::missing_timestamp);
|
||||
});
|
||||
}
|
||||
@@ -3271,10 +3280,11 @@ void storage_proxy::init_messaging_service() {
|
||||
}
|
||||
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto src_ip = src_addr.addr;
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p] (schema_ptr s) {
|
||||
return p->query_singular_local(std::move(s), cmd, pr);
|
||||
}).finally([&trace_state_ptr] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_data handling is done");
|
||||
}).finally([&trace_state_ptr, src_ip] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -3287,10 +3297,11 @@ void storage_proxy::init_messaging_service() {
|
||||
tracing::trace(trace_state_ptr, "read_mutation_data: message received from /{}", src_addr.addr);
|
||||
}
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto src_ip = src_addr.addr;
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p] (schema_ptr s) {
|
||||
return p->query_mutations_locally(std::move(s), cmd, pr);
|
||||
}).finally([&trace_state_ptr] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_mutation_data handling is done");
|
||||
}).finally([&trace_state_ptr, src_ip] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_mutation_data handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -3303,10 +3314,11 @@ void storage_proxy::init_messaging_service() {
|
||||
tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr);
|
||||
}
|
||||
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
|
||||
auto src_ip = src_addr.addr;
|
||||
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p] (schema_ptr s) {
|
||||
return p->query_singular_local_digest(std::move(s), cmd, pr);
|
||||
}).finally([&trace_state_ptr] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_digest handling is done");
|
||||
}).finally([&trace_state_ptr, src_ip] () mutable {
|
||||
tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -48,24 +48,93 @@
|
||||
#include <iterator>
|
||||
#include "sstables.hh"
|
||||
#include "compaction.hh"
|
||||
#include "timestamp.hh"
|
||||
#include "cql3/statements/property_definitions.hh"
|
||||
|
||||
static constexpr double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365;
|
||||
static constexpr int64_t DEFAULT_BASE_TIME_SECONDS = 60;
|
||||
|
||||
struct duration_conversor {
|
||||
// Convert given duration to TargetDuration and return value as timestamp.
|
||||
template <typename TargetDuration, typename SourceDuration>
|
||||
static api::timestamp_type convert(SourceDuration d) {
|
||||
return std::chrono::duration_cast<TargetDuration>(d).count();
|
||||
}
|
||||
|
||||
// Convert given duration to duration that is represented by the string
|
||||
// target_duration, and return value as timestamp.
|
||||
template <typename SourceDuration>
|
||||
static api::timestamp_type convert(const sstring& target_duration, SourceDuration d) {
|
||||
if (target_duration == "HOURS") {
|
||||
return convert<std::chrono::hours>(d);
|
||||
} else if (target_duration == "MICROSECONDS") {
|
||||
return convert<std::chrono::microseconds>(d);
|
||||
} else if (target_duration == "MILLISECONDS") {
|
||||
return convert<std::chrono::milliseconds>(d);
|
||||
} else if (target_duration == "MINUTES") {
|
||||
return convert<std::chrono::minutes>(d);
|
||||
} else if (target_duration == "NANOSECONDS") {
|
||||
return convert<std::chrono::nanoseconds>(d);
|
||||
} else if (target_duration == "SECONDS") {
|
||||
return convert<std::chrono::seconds>(d);
|
||||
} else {
|
||||
throw std::runtime_error(sprint("target duration %s is not available", target_duration));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class date_tiered_compaction_strategy_options {
|
||||
const sstring DEFAULT_TIMESTAMP_RESOLUTION = "MICROSECONDS";
|
||||
const sstring TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
|
||||
const sstring MAX_SSTABLE_AGE_KEY = "max_sstable_age_days";
|
||||
const sstring BASE_TIME_KEY = "base_time_seconds";
|
||||
|
||||
api::timestamp_type max_sstable_age;
|
||||
api::timestamp_type base_time;
|
||||
public:
|
||||
date_tiered_compaction_strategy_options(const std::map<sstring, sstring>& options) {
|
||||
using namespace cql3::statements;
|
||||
|
||||
auto tmp_value = get_value(options, TIMESTAMP_RESOLUTION_KEY);
|
||||
auto target_unit = tmp_value ? tmp_value.value() : DEFAULT_TIMESTAMP_RESOLUTION;
|
||||
|
||||
tmp_value = get_value(options, MAX_SSTABLE_AGE_KEY);
|
||||
auto fractional_days = property_definitions::to_double(MAX_SSTABLE_AGE_KEY, tmp_value, DEFAULT_MAX_SSTABLE_AGE_DAYS);
|
||||
int64_t max_sstable_age_in_hours = std::lround(fractional_days * 24);
|
||||
max_sstable_age = duration_conversor::convert(target_unit, std::chrono::hours(max_sstable_age_in_hours));
|
||||
|
||||
tmp_value = get_value(options, BASE_TIME_KEY);
|
||||
auto base_time_seconds = property_definitions::to_long(BASE_TIME_KEY, tmp_value, DEFAULT_BASE_TIME_SECONDS);
|
||||
base_time = duration_conversor::convert(target_unit, std::chrono::seconds(base_time_seconds));
|
||||
}
|
||||
|
||||
date_tiered_compaction_strategy_options() {
|
||||
auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24);
|
||||
max_sstable_age = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::hours(max_sstable_age_in_hours)).count();
|
||||
base_time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS)).count();
|
||||
}
|
||||
private:
|
||||
static std::experimental::optional<sstring> get_value(const std::map<sstring, sstring>& options, const sstring& name) {
|
||||
auto it = options.find(name);
|
||||
if (it == options.end()) {
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
friend class date_tiered_manifest;
|
||||
};
|
||||
|
||||
class date_tiered_manifest {
|
||||
static logging::logger logger;
|
||||
|
||||
// TODO: implement date_tiered_compaction_strategy_options.
|
||||
db_clock::duration _max_sstable_age;
|
||||
db_clock::duration _base_time;
|
||||
date_tiered_compaction_strategy_options _options;
|
||||
public:
|
||||
date_tiered_manifest() = delete;
|
||||
|
||||
date_tiered_manifest(const std::map<sstring, sstring>& options) {
|
||||
auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24);
|
||||
_max_sstable_age = std::chrono::duration_cast<db_clock::duration>(std::chrono::hours(max_sstable_age_in_hours));
|
||||
_base_time = std::chrono::duration_cast<db_clock::duration>(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS));
|
||||
|
||||
date_tiered_manifest(const std::map<sstring, sstring>& options)
|
||||
: _options(options)
|
||||
{
|
||||
// FIXME: implement option to disable tombstone compaction.
|
||||
#if 0
|
||||
if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
|
||||
@@ -119,8 +188,8 @@ public:
|
||||
for (auto& entry : *cf.get_sstables()) {
|
||||
sstables.push_back(entry);
|
||||
}
|
||||
auto candidates = filter_old_sstables(sstables, _max_sstable_age, now);
|
||||
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
|
||||
auto candidates = filter_old_sstables(sstables, _options.max_sstable_age, now);
|
||||
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now);
|
||||
|
||||
for (auto& bucket : buckets) {
|
||||
if (bucket.size() >= size_t(cf.schema()->min_compaction_threshold())) {
|
||||
@@ -161,11 +230,11 @@ private:
|
||||
get_compaction_candidates(column_family& cf, std::vector<sstables::shared_sstable> candidate_sstables, int64_t now, int base) {
|
||||
int min_threshold = cf.schema()->min_compaction_threshold();
|
||||
int max_threshold = cf.schema()->max_compaction_threshold();
|
||||
auto candidates = filter_old_sstables(candidate_sstables, _max_sstable_age, now);
|
||||
auto candidates = filter_old_sstables(candidate_sstables, _options.max_sstable_age, now);
|
||||
|
||||
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
|
||||
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now);
|
||||
|
||||
return newest_bucket(buckets, min_threshold, max_threshold, now, _base_time);
|
||||
return newest_bucket(buckets, min_threshold, max_threshold, now, _options.base_time);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -186,12 +255,11 @@ private:
|
||||
* @return a list of sstables with the oldest sstables excluded
|
||||
*/
|
||||
static std::vector<sstables::shared_sstable>
|
||||
filter_old_sstables(std::vector<sstables::shared_sstable> sstables, db_clock::duration max_sstable_age, int64_t now) {
|
||||
int64_t max_sstable_age_count = std::chrono::duration_cast<std::chrono::microseconds>(max_sstable_age).count();
|
||||
if (max_sstable_age_count == 0) {
|
||||
filter_old_sstables(std::vector<sstables::shared_sstable> sstables, api::timestamp_type max_sstable_age, int64_t now) {
|
||||
if (max_sstable_age == 0) {
|
||||
return sstables;
|
||||
}
|
||||
int64_t cutoff = now - max_sstable_age_count;
|
||||
int64_t cutoff = now - max_sstable_age;
|
||||
|
||||
sstables.erase(std::remove_if(sstables.begin(), sstables.end(), [cutoff] (auto& sst) {
|
||||
return sst->get_stats_metadata().max_timestamp < cutoff;
|
||||
@@ -275,14 +343,14 @@ private:
|
||||
* Each bucket is also a list of files ordered from newest to oldest.
|
||||
*/
|
||||
std::vector<std::vector<sstables::shared_sstable>>
|
||||
get_buckets(std::vector<std::pair<sstables::shared_sstable,int64_t>>&& files, db_clock::duration time_unit, int base, int64_t now) const {
|
||||
get_buckets(std::vector<std::pair<sstables::shared_sstable,int64_t>>&& files, api::timestamp_type time_unit, int base, int64_t now) const {
|
||||
// Sort files by age. Newest first.
|
||||
std::sort(files.begin(), files.end(), [] (auto& i, auto& j) {
|
||||
return i.second > j.second;
|
||||
});
|
||||
|
||||
std::vector<std::vector<sstables::shared_sstable>> buckets;
|
||||
auto target = get_initial_target(now, std::chrono::duration_cast<std::chrono::microseconds>(time_unit).count());
|
||||
auto target = get_initial_target(now, time_unit);
|
||||
auto it = files.begin();
|
||||
|
||||
while (it != files.end()) {
|
||||
@@ -329,12 +397,12 @@ private:
|
||||
*/
|
||||
std::vector<sstables::shared_sstable>
|
||||
newest_bucket(std::vector<std::vector<sstables::shared_sstable>>& buckets, int min_threshold, int max_threshold,
|
||||
int64_t now, db_clock::duration base_time) {
|
||||
int64_t now, api::timestamp_type base_time) {
|
||||
|
||||
// If the "incoming window" has at least minThreshold SSTables, choose that one.
|
||||
// For any other bucket, at least 2 SSTables is enough.
|
||||
// In any case, limit to maxThreshold SSTables.
|
||||
target incoming_window = get_initial_target(now, std::chrono::duration_cast<std::chrono::microseconds>(base_time).count());
|
||||
target incoming_window = get_initial_target(now, base_time);
|
||||
for (auto& bucket : buckets) {
|
||||
auto min_timestamp = bucket.front()->get_stats_metadata().min_timestamp;
|
||||
if (bucket.size() >= size_t(min_threshold) ||
|
||||
|
||||
@@ -555,8 +555,8 @@ public:
|
||||
return *_pc;
|
||||
}
|
||||
|
||||
bool is_mutation_end() const {
|
||||
return _is_mutation_end;
|
||||
bool get_and_reset_is_mutation_end() {
|
||||
return std::exchange(_is_mutation_end, false);
|
||||
}
|
||||
|
||||
stdx::optional<new_mutation> get_mutation() {
|
||||
@@ -577,46 +577,72 @@ public:
|
||||
};
|
||||
|
||||
class sstable_streamed_mutation : public streamed_mutation::impl {
|
||||
const schema& _schema;
|
||||
data_consume_context& _context;
|
||||
mp_row_consumer& _consumer;
|
||||
tombstone _t;
|
||||
bool _finished = false;
|
||||
range_tombstone_stream _range_tombstones;
|
||||
mutation_fragment_opt _current_candidate;
|
||||
mutation_fragment_opt _next_candidate;
|
||||
stdx::optional<position_in_partition> _last_position;
|
||||
position_in_partition::less_compare _cmp;
|
||||
position_in_partition::equal_compare _eq;
|
||||
private:
|
||||
future<mutation_fragment_opt> read_next() {
|
||||
future<stdx::optional<mutation_fragment_opt>> read_next() {
|
||||
// Because of #1203 we may encounter sstables with range tombstones
|
||||
// placed earler than expected.
|
||||
if (_next_candidate) {
|
||||
auto mf = _range_tombstones.get_next(*_next_candidate);
|
||||
if (_next_candidate || (_current_candidate && _finished)) {
|
||||
assert(_current_candidate);
|
||||
auto mf = _range_tombstones.get_next(*_current_candidate);
|
||||
if (!mf) {
|
||||
mf = move_and_disengage(_next_candidate);
|
||||
mf = move_and_disengage(_current_candidate);
|
||||
_current_candidate = move_and_disengage(_next_candidate);
|
||||
}
|
||||
return make_ready_future<mutation_fragment_opt>(std::move(mf));
|
||||
return make_ready_future<stdx::optional<mutation_fragment_opt>>(std::move(mf));
|
||||
}
|
||||
if (_finished) {
|
||||
return make_ready_future<mutation_fragment_opt>(_range_tombstones.get_next());
|
||||
// No need to update _last_position here. We've already read everything from the sstable.
|
||||
return make_ready_future<stdx::optional<mutation_fragment_opt>>(_range_tombstones.get_next());
|
||||
}
|
||||
return _context.read().then([this] {
|
||||
if (_consumer.is_mutation_end()) {
|
||||
_finished = true;
|
||||
}
|
||||
_finished = _consumer.get_and_reset_is_mutation_end();
|
||||
auto mf = _consumer.get_mutation_fragment();
|
||||
if (mf && mf->is_range_tombstone()) {
|
||||
_range_tombstones.apply(std::move(mf->as_range_tombstone()));
|
||||
} else {
|
||||
_next_candidate = std::move(mf);
|
||||
if (mf) {
|
||||
if (mf->is_range_tombstone()) {
|
||||
// If sstable uses promoted index it will repeat relevant range tombstones in
|
||||
// each block. Do not emit these duplicates as they will break the guarantee
|
||||
// that mutation fragment are produced in ascending order.
|
||||
if (!_last_position || !_cmp(*mf, *_last_position)) {
|
||||
_last_position = mf->position();
|
||||
_range_tombstones.apply(std::move(mf->as_range_tombstone()));
|
||||
}
|
||||
} else {
|
||||
// mp_row_consumer may produce mutation_fragments in parts if they are
|
||||
// interrupted by range tombstone duplicate. Make sure they are merged
|
||||
// before emitting them.
|
||||
_last_position = mf->position();
|
||||
if (!_current_candidate) {
|
||||
_current_candidate = std::move(mf);
|
||||
} else if (_current_candidate && _eq(*_current_candidate, *mf)) {
|
||||
_current_candidate->apply(_schema, std::move(*mf));
|
||||
} else {
|
||||
_next_candidate = std::move(mf);
|
||||
}
|
||||
}
|
||||
}
|
||||
return read_next();
|
||||
return stdx::optional<mutation_fragment_opt>();
|
||||
});
|
||||
}
|
||||
public:
|
||||
sstable_streamed_mutation(schema_ptr s, dht::decorated_key dk, data_consume_context& context, mp_row_consumer& consumer, tombstone t)
|
||||
: streamed_mutation::impl(s, std::move(dk), t), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s) { }
|
||||
: streamed_mutation::impl(s, std::move(dk), t), _schema(*s), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s), _cmp(*s), _eq(*s) { }
|
||||
|
||||
virtual future<> fill_buffer() final override {
|
||||
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
|
||||
return read_next().then([this] (mutation_fragment_opt&& mfopt) {
|
||||
return repeat_until_value([this] {
|
||||
return read_next();
|
||||
}).then([this] (mutation_fragment_opt&& mfopt) {
|
||||
if (!mfopt) {
|
||||
_end_of_stream = true;
|
||||
} else {
|
||||
@@ -800,7 +826,7 @@ private:
|
||||
return _context->read().then([this] {
|
||||
auto mut = _consumer.get_mutation();
|
||||
if (!mut) {
|
||||
if (_consumer.get_mutation_fragment()) {
|
||||
if (_consumer.get_mutation_fragment() || _consumer.get_and_reset_is_mutation_end()) {
|
||||
// We are still in the middle of the previous mutation.
|
||||
_consumer.skip_partition();
|
||||
return do_read();
|
||||
|
||||
@@ -116,6 +116,16 @@ std::ostream& operator<<(std::ostream& os, const streamed_mutation& sm) {
|
||||
return os;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, mutation_fragment::kind k)
|
||||
{
|
||||
switch (k) {
|
||||
case mutation_fragment::kind::static_row: return os << "static row";
|
||||
case mutation_fragment::kind::clustering_row: return os << "clustering row";
|
||||
case mutation_fragment::kind::range_tombstone: return os << "range tombstone";
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
streamed_mutation streamed_mutation_from_mutation(mutation m)
|
||||
{
|
||||
class reader final : public streamed_mutation::impl {
|
||||
|
||||
@@ -249,6 +249,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream&, mutation_fragment::kind);
|
||||
|
||||
class position_in_partition {
|
||||
int _bound_weight = 0;
|
||||
stdx::optional<clustering_key_prefix> _ck;
|
||||
|
||||
@@ -59,7 +59,7 @@ int main(int argc, char** argv) {
|
||||
auto objects_in_batch = app.configuration()["batch"].as<unsigned>();
|
||||
|
||||
return seastar::async([obj_size, obj_count, objects_in_batch] {
|
||||
std::deque<managed_bytes> refs;
|
||||
chunked_fifo<managed_bytes> refs;
|
||||
logalloc::region r;
|
||||
|
||||
with_allocator(r.allocator(), [&] {
|
||||
|
||||
@@ -110,3 +110,55 @@ mutation_opt_assertions assert_that(streamed_mutation_opt smo) {
|
||||
return { std::move(mo) };
|
||||
}
|
||||
|
||||
class streamed_mutation_assertions {
|
||||
streamed_mutation _sm;
|
||||
clustering_key::equality _ck_eq;
|
||||
public:
|
||||
streamed_mutation_assertions(streamed_mutation sm)
|
||||
: _sm(std::move(sm)), _ck_eq(*_sm.schema()) { }
|
||||
|
||||
streamed_mutation_assertions& produces_static_row() {
|
||||
auto mfopt = _sm().get0();
|
||||
if (!mfopt) {
|
||||
BOOST_FAIL("Expected static row, got end of stream");
|
||||
}
|
||||
if (mfopt->mutation_fragment_kind() != mutation_fragment::kind::static_row) {
|
||||
BOOST_FAIL(sprint("Expected static row, got: %s", mfopt->mutation_fragment_kind()));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
streamed_mutation_assertions& produces(mutation_fragment::kind k, std::vector<int> ck_elements) {
|
||||
std::vector<bytes> ck_bytes;
|
||||
for (auto&& e : ck_elements) {
|
||||
ck_bytes.emplace_back(int32_type->decompose(e));
|
||||
}
|
||||
auto ck = clustering_key_prefix::from_exploded(*_sm.schema(), std::move(ck_bytes));
|
||||
|
||||
auto mfopt = _sm().get0();
|
||||
if (!mfopt) {
|
||||
BOOST_FAIL(sprint("Expected mutation fragment %s, got end of stream", ck));
|
||||
}
|
||||
if (mfopt->mutation_fragment_kind() != k) {
|
||||
BOOST_FAIL(sprint("Expected mutation fragment kind %s, got: %s", k, mfopt->mutation_fragment_kind()));
|
||||
}
|
||||
if (!_ck_eq(mfopt->key(), ck)) {
|
||||
BOOST_FAIL(sprint("Expected key %s, got: %s", ck, mfopt->key()));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
streamed_mutation_assertions& produces_end_of_stream() {
|
||||
auto mfopt = _sm().get0();
|
||||
BOOST_REQUIRE(!mfopt);
|
||||
if (mfopt) {
|
||||
BOOST_FAIL(sprint("Expected end of stream, got: %s", mfopt->mutation_fragment_kind()));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
};
|
||||
|
||||
static inline streamed_mutation_assertions assert_that_stream(streamed_mutation sm)
|
||||
{
|
||||
return streamed_mutation_assertions(std::move(sm));
|
||||
}
|
||||
@@ -112,6 +112,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying) {
|
||||
assert_that(cache.make_reader(s, query::full_partition_range))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
assert(tracker.uncached_wide_partitions() == 0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -140,6 +141,58 @@ SEASTAR_TEST_CASE(test_cache_works_after_clearing) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_always_for_wide_partition_full_range) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
auto m = make_new_mutation(s);
|
||||
int secondary_calls_count = 0;
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, mutation_source([&secondary_calls_count, &m] (schema_ptr s, const query::partition_range& range) {
|
||||
++secondary_calls_count;
|
||||
return make_reader_returning(m);
|
||||
}), key_source([&m] (auto&&) {
|
||||
return make_key_from_mutation_reader(make_reader_returning(m));
|
||||
}), tracker, 0);
|
||||
|
||||
assert_that(cache.make_reader(s, query::full_partition_range))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
assert(secondary_calls_count == 2);
|
||||
assert(tracker.uncached_wide_partitions() == 1);
|
||||
assert_that(cache.make_reader(s, query::full_partition_range))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
assert(secondary_calls_count == 4);
|
||||
assert(tracker.uncached_wide_partitions() == 2);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_always_for_wide_partition_single_partition) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
auto m = make_new_mutation(s);
|
||||
int secondary_calls_count = 0;
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, mutation_source([&secondary_calls_count, &m] (schema_ptr s, const query::partition_range& range) {
|
||||
++secondary_calls_count;
|
||||
return make_reader_returning(m);
|
||||
}), key_source([&m] (auto&&) {
|
||||
return make_key_from_mutation_reader(make_reader_returning(m));
|
||||
}), tracker, 0);
|
||||
|
||||
assert_that(cache.make_reader(s, query::partition_range::make_singular(query::ring_position(m.decorated_key()))))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
assert(secondary_calls_count == 2);
|
||||
assert(tracker.uncached_wide_partitions() == 1);
|
||||
assert_that(cache.make_reader(s, query::partition_range::make_singular(query::ring_position(m.decorated_key()))))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
assert(secondary_calls_count == 4);
|
||||
assert(tracker.uncached_wide_partitions() == 2);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_full_range) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
#include "range.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "sstables/date_tiered_compaction_strategy.hh"
|
||||
#include "mutation_assertions.hh"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <ftw.h>
|
||||
@@ -2596,43 +2597,25 @@ SEASTAR_TEST_CASE(test_wrong_range_tombstone_order) {
|
||||
|
||||
auto smopt = reader().get0();
|
||||
BOOST_REQUIRE(smopt);
|
||||
auto& sm = *smopt;
|
||||
|
||||
using kind = mutation_fragment::kind;
|
||||
auto then_expect = [&] (kind k, std::vector<int> ck_elems) {
|
||||
std::vector<bytes> ck_bytes;
|
||||
for (auto&& e : ck_elems) {
|
||||
ck_bytes.emplace_back(int32_type->decompose(e));
|
||||
}
|
||||
auto ck = clustering_key_prefix::from_exploded(*s, std::move(ck_bytes));
|
||||
|
||||
auto mfopt = sm().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
if (mfopt->mutation_fragment_kind() != k) {
|
||||
abort();
|
||||
}
|
||||
BOOST_REQUIRE(mfopt->mutation_fragment_kind() == k);
|
||||
BOOST_REQUIRE(ck_eq(mfopt->key(), ck));
|
||||
};
|
||||
|
||||
then_expect(kind::range_tombstone, { 0 });
|
||||
then_expect(kind::clustering_row, { 1 });
|
||||
then_expect(kind::clustering_row, { 1, 1 });
|
||||
then_expect(kind::clustering_row, { 1, 2 });
|
||||
then_expect(kind::clustering_row, { 1, 2, 3 });
|
||||
then_expect(kind::range_tombstone, { 1, 3 });
|
||||
then_expect(kind::clustering_row, { 1, 3 });
|
||||
then_expect(kind::clustering_row, { 1, 3, 4 });
|
||||
then_expect(kind::clustering_row, { 1, 4 });
|
||||
then_expect(kind::clustering_row, { 1, 4, 0 });
|
||||
then_expect(kind::range_tombstone, { 2 });
|
||||
then_expect(kind::range_tombstone, { 2, 1 });
|
||||
then_expect(kind::range_tombstone, { 2, 1 });
|
||||
then_expect(kind::range_tombstone, { 2, 2 });
|
||||
then_expect(kind::range_tombstone, { 2, 2 });
|
||||
|
||||
auto mfopt = sm().get0();
|
||||
BOOST_REQUIRE(!mfopt);
|
||||
assert_that_stream(std::move(*smopt))
|
||||
.produces(kind::range_tombstone, { 0 })
|
||||
.produces(kind::clustering_row, { 1 })
|
||||
.produces(kind::clustering_row, { 1, 1 })
|
||||
.produces(kind::clustering_row, { 1, 2 })
|
||||
.produces(kind::clustering_row, { 1, 2, 3 })
|
||||
.produces(kind::range_tombstone, { 1, 3 })
|
||||
.produces(kind::clustering_row, { 1, 3 })
|
||||
.produces(kind::clustering_row, { 1, 3, 4 })
|
||||
.produces(kind::clustering_row, { 1, 4 })
|
||||
.produces(kind::clustering_row, { 1, 4, 0 })
|
||||
.produces(kind::range_tombstone, { 2 })
|
||||
.produces(kind::range_tombstone, { 2, 1 })
|
||||
.produces(kind::range_tombstone, { 2, 1 })
|
||||
.produces(kind::range_tombstone, { 2, 2 })
|
||||
.produces(kind::range_tombstone, { 2, 2 })
|
||||
.produces_end_of_stream();
|
||||
|
||||
smopt = reader().get0();
|
||||
BOOST_REQUIRE(!smopt);
|
||||
@@ -2790,3 +2773,108 @@ SEASTAR_TEST_CASE(basic_date_tiered_strategy_test) {
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
|
||||
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
|
||||
compaction_manager cm;
|
||||
column_family::config cfg;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
|
||||
// deterministic timestamp for Fri, 01 Jan 2016 00:00:00 GMT.
|
||||
auto tp = db_clock::from_time_t(1451606400);
|
||||
int64_t timestamp = tp.time_since_epoch().count() * 1000; // in microseconds.
|
||||
|
||||
std::vector<sstables::shared_sstable> candidates;
|
||||
int min_threshold = cf->schema()->min_compaction_threshold();
|
||||
|
||||
// add sstables that belong to same time window until min threshold is satisfied.
|
||||
for (auto i = 1; i <= min_threshold; i++) {
|
||||
auto sst = add_sstable_for_overlapping_test(cf, /*gen*/i, "a", "a",
|
||||
build_stats(timestamp, timestamp, std::numeric_limits<int32_t>::max()));
|
||||
candidates.push_back(sst);
|
||||
}
|
||||
// belongs to the time window
|
||||
auto tp2 = tp + std::chrono::seconds(1800);
|
||||
timestamp = tp2.time_since_epoch().count() * 1000;
|
||||
auto sst = add_sstable_for_overlapping_test(cf, /*gen*/min_threshold + 1, "a", "a",
|
||||
build_stats(timestamp, timestamp, std::numeric_limits<int32_t>::max()));
|
||||
candidates.push_back(sst);
|
||||
|
||||
// doesn't belong to the time window above
|
||||
auto tp3 = tp + std::chrono::seconds(4000);
|
||||
timestamp = tp3.time_since_epoch().count() * 1000;
|
||||
auto sst2 = add_sstable_for_overlapping_test(cf, /*gen*/min_threshold + 2, "a", "a",
|
||||
build_stats(timestamp, timestamp, std::numeric_limits<int32_t>::max()));
|
||||
candidates.push_back(sst2);
|
||||
|
||||
std::map<sstring, sstring> options;
|
||||
// Use a 1-hour time window.
|
||||
options.emplace(sstring("base_time_seconds"), sstring("3600"));
|
||||
|
||||
date_tiered_manifest manifest(options);
|
||||
auto gc_before = gc_clock::time_point(std::chrono::seconds(0)); // disable gc before.
|
||||
auto sstables = manifest.get_next_sstables(*cf, candidates, gc_before);
|
||||
std::unordered_set<int64_t> gens;
|
||||
for (auto sst : sstables) {
|
||||
gens.insert(sst->generation());
|
||||
}
|
||||
BOOST_REQUIRE(sstables.size() == size_t(min_threshold + 1));
|
||||
BOOST_REQUIRE(gens.count(min_threshold + 1));
|
||||
BOOST_REQUIRE(!gens.count(min_threshold + 2));
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_promoted_index_read) {
|
||||
// create table promoted_index_read (
|
||||
// pk int,
|
||||
// ck1 int,
|
||||
// ck2 int,
|
||||
// v int,
|
||||
// primary key (pk, ck1, ck2)
|
||||
// );
|
||||
//
|
||||
// column_index_size_in_kb: 0
|
||||
//
|
||||
// delete from promoted_index_read where pk = 0 and ck1 = 0;
|
||||
// insert into promoted_index_read (pk, ck1, ck2, v) values (0, 0, 0, 0);
|
||||
// insert into promoted_index_read (pk, ck1, ck2, v) values (0, 0, 1, 1);
|
||||
//
|
||||
// SSTable:
|
||||
// [
|
||||
// {"key": "0",
|
||||
// "cells": [["0:_","0:!",1468923292708929,"t",1468923292],
|
||||
// ["0:_","0:!",1468923292708929,"t",1468923292],
|
||||
// ["0:0:","",1468923308379491],
|
||||
// ["0:_","0:!",1468923292708929,"t",1468923292],
|
||||
// ["0:0:v","0",1468923308379491],
|
||||
// ["0:_","0:!",1468923292708929,"t",1468923292],
|
||||
// ["0:1:","",1468923311744298],
|
||||
// ["0:_","0:!",1468923292708929,"t",1468923292],
|
||||
// ["0:1:v","1",1468923311744298]]}
|
||||
// ]
|
||||
|
||||
return seastar::async([] {
|
||||
auto s = schema_builder("ks", "promoted_index_read")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck1", int32_type, column_kind::clustering_key)
|
||||
.with_column("ck2", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", int32_type)
|
||||
.build();
|
||||
|
||||
auto sst = make_lw_shared<sstable>("ks", "promoted_index_read", "tests/sstables/promoted_index_read", 1, sstables::sstable::version_types::ka, big);
|
||||
sst->load().get0();
|
||||
|
||||
auto rd = sstable_reader(sst, s);
|
||||
auto smopt = rd().get0();
|
||||
BOOST_REQUIRE(smopt);
|
||||
|
||||
using kind = mutation_fragment::kind;
|
||||
assert_that_stream(std::move(*smopt))
|
||||
.produces(kind::range_tombstone, { 0 })
|
||||
.produces(kind::clustering_row, { 0, 0 })
|
||||
.produces(kind::clustering_row, { 0, 1 })
|
||||
.produces_end_of_stream();
|
||||
});
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
1158289805
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
Data.db
|
||||
TOC.txt
|
||||
Digest.sha1
|
||||
Filter.db
|
||||
Summary.db
|
||||
Statistics.db
|
||||
Index.db
|
||||
CompressionInfo.db
|
||||
@@ -51,6 +51,8 @@
|
||||
#include <boost/range/adaptor/reversed.hpp>
|
||||
#include <boost/range/adaptor/indirected.hpp>
|
||||
#include "query-result-reader.hh"
|
||||
#include "thrift/server.hh"
|
||||
#include "db/size_estimates_recorder.hh"
|
||||
|
||||
using namespace ::apache::thrift;
|
||||
using namespace ::apache::thrift::protocol;
|
||||
@@ -92,8 +94,10 @@ public:
|
||||
throw make_exception<InvalidRequestException>(ae.what());
|
||||
} catch (exceptions::configuration_exception& ce) {
|
||||
throw make_exception<InvalidRequestException>(ce.what());
|
||||
} catch (no_such_column_family&) {
|
||||
throw NotFoundException();
|
||||
} catch (exceptions::invalid_request_exception& ire) {
|
||||
throw make_exception<InvalidRequestException>(ire.what());
|
||||
} catch (no_such_column_family& nocf) {
|
||||
throw make_exception<InvalidRequestException>(nocf.what());
|
||||
} catch (no_such_keyspace&) {
|
||||
throw NotFoundException();
|
||||
} catch (exceptions::syntax_exception& se) {
|
||||
@@ -130,23 +134,6 @@ with_cob(tcxx::function<void (const T& ret)>&& cob,
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func, typename T>
|
||||
void
|
||||
with_cob_dereference(tcxx::function<void (const T& ret)>&& cob,
|
||||
tcxx::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob,
|
||||
Func&& func) {
|
||||
using ptr_type = foreign_ptr<lw_shared_ptr<T>>;
|
||||
// then_wrapped() terminates the fiber by calling one of the cob objects
|
||||
futurize<ptr_type>::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<ptr_type> f) {
|
||||
try {
|
||||
cob(*f.get0());
|
||||
} catch (...) {
|
||||
delayed_exception_wrapper dew(std::current_exception());
|
||||
exn_cob(&dew);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
void
|
||||
with_cob(tcxx::function<void ()>&& cob,
|
||||
@@ -274,7 +261,7 @@ public:
|
||||
return query::result_view::do_with(*result, [schema, cmd, cell_limit](query::result_view v) {
|
||||
column_aggregator aggregator(*schema, cmd->slice, cell_limit);
|
||||
v.consume(cmd->slice, aggregator);
|
||||
return aggregator.release();
|
||||
return aggregator.release_as_map();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -300,7 +287,7 @@ public:
|
||||
return query::result_view::do_with(*result, [schema, cmd, cell_limit](query::result_view v) {
|
||||
column_counter counter(*schema, cmd->slice, cell_limit);
|
||||
v.consume(cmd->slice, counter);
|
||||
return counter.release();
|
||||
return counter.release_as_map();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -637,7 +624,7 @@ public:
|
||||
}
|
||||
|
||||
void describe_version(tcxx::function<void(std::string const& _return)> cob) {
|
||||
cob("20.1.0");
|
||||
cob(org::apache::cassandra::thrift_version);
|
||||
}
|
||||
|
||||
void do_describe_ring(tcxx::function<void(std::vector<TokenRange> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace, bool local) {
|
||||
@@ -707,13 +694,15 @@ public:
|
||||
}
|
||||
|
||||
void describe_splits(tcxx::function<void(std::vector<std::string> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
|
||||
// FIXME: Maybe implement.
|
||||
// Origin's thrift interface has this to say about the verb:
|
||||
// "experimental API for hadoop/parallel query support. may change violently and without warning.".
|
||||
// Some drivers have moved away from depending on this verb (SPARKC-94). The correct way to implement
|
||||
// this, as well as describe_splits_ex, is to use the size_estimates system table (CASSANDRA-7688).
|
||||
// However, we currently don't populate that table, which is done by SizeEstimatesRecorder.java in Origin.
|
||||
return pass_unimplemented(exn_cob);
|
||||
return describe_splits_ex([cob = std::move(cob)](auto&& results) {
|
||||
std::vector<std::string> res;
|
||||
res.reserve(results.size() + 1);
|
||||
res.emplace_back(results[0].start_token);
|
||||
for (auto&& s : results) {
|
||||
res.emplace_back(std::move(s.end_token));
|
||||
}
|
||||
return cob(std::move(res));
|
||||
}, exn_cob, cfName, start_token, end_token, keys_per_split);
|
||||
}
|
||||
|
||||
void trace_next_query(tcxx::function<void(std::string const& _return)> cob) {
|
||||
@@ -723,8 +712,40 @@ public:
|
||||
}
|
||||
|
||||
void describe_splits_ex(tcxx::function<void(std::vector<CfSplit> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
|
||||
// FIXME: To implement. See describe_splits.
|
||||
return pass_unimplemented(exn_cob);
|
||||
with_cob(std::move(cob), std::move(exn_cob), [&]{
|
||||
auto tstart = start_token.empty() ? dht::minimum_token() : dht::global_partitioner().from_sstring(sstring(start_token));
|
||||
auto tend = end_token.empty() ? dht::maximum_token() : dht::global_partitioner().from_sstring(sstring(end_token));
|
||||
return db::get_local_size_estimates_recorder().record_size_estimates()
|
||||
.then([this, keys_per_split, cf = sstring(cfName), tstart = std::move(tstart), tend = std::move(tend)] {
|
||||
return db::system_keyspace::query_size_estimates(current_keyspace(), std::move(cf), std::move(tstart), std::move(tend))
|
||||
.then([keys_per_split](auto&& estimates) {
|
||||
std::vector<CfSplit> splits;
|
||||
if (estimates.empty()) {
|
||||
return splits;
|
||||
}
|
||||
auto&& acc = estimates[0];
|
||||
auto emplace_acc = [&] {
|
||||
splits.emplace_back();
|
||||
auto start_token = dht::global_partitioner().to_sstring(acc.range_start_token);
|
||||
auto end_token = dht::global_partitioner().to_sstring(acc.range_end_token);
|
||||
splits.back().__set_start_token(bytes_to_string(to_bytes_view(start_token)));
|
||||
splits.back().__set_end_token(bytes_to_string(to_bytes_view(end_token)));
|
||||
splits.back().__set_row_count(acc.partitions_count);
|
||||
};
|
||||
for (auto&& e : estimates | boost::adaptors::sliced(1, estimates.size())) {
|
||||
if (acc.partitions_count + e.partitions_count > keys_per_split) {
|
||||
emplace_acc();
|
||||
acc = std::move(e);
|
||||
} else {
|
||||
acc.range_end_token = std::move(e.range_end_token);
|
||||
acc.partitions_count += e.partitions_count;
|
||||
}
|
||||
}
|
||||
emplace_acc();
|
||||
return splits;
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void system_add_column_family(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const CfDef& cf_def) {
|
||||
@@ -810,6 +831,9 @@ public:
|
||||
}
|
||||
|
||||
auto s = schema_from_thrift(cf_def, cf_def.keyspace, schema->id());
|
||||
if (schema->thrift().is_dynamic() && s->regular_columns_count() > 1) {
|
||||
fail(unimplemented::cause::MIXED_CF);
|
||||
}
|
||||
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::ALTER).then([this, s = std::move(s)] {
|
||||
return service::get_local_migration_manager().announce_column_family_update(std::move(s), true, false).then([this] {
|
||||
return std::string(_db.local().get_version().to_sstring());
|
||||
@@ -1041,7 +1065,7 @@ private:
|
||||
}
|
||||
cf_def.__set_comment(s->comment());
|
||||
cf_def.__set_read_repair_chance(s->read_repair_chance());
|
||||
if (s->regular_columns_count()) {
|
||||
if (!s->thrift().is_dynamic()) {
|
||||
std::vector<ColumnDef> columns;
|
||||
for (auto&& c : s->regular_columns()) {
|
||||
ColumnDef c_def;
|
||||
@@ -1104,7 +1128,7 @@ private:
|
||||
builder.with_column(to_bytes(cf_def.key_alias), std::move(pk_types.back()), column_kind::partition_key);
|
||||
} else {
|
||||
for (uint32_t i = 0; i < pk_types.size(); ++i) {
|
||||
builder.with_column(to_bytes("key" + (i + 1)), std::move(pk_types[i]), column_kind::partition_key);
|
||||
builder.with_column(to_bytes(sprint("key%d", i + 1)), std::move(pk_types[i]), column_kind::partition_key);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -1120,7 +1144,7 @@ private:
|
||||
auto ck_types = std::move(p.first);
|
||||
builder.set_is_compound(p.second);
|
||||
for (uint32_t i = 0; i < ck_types.size(); ++i) {
|
||||
builder.with_column(to_bytes("column" + (i + 1)), std::move(ck_types[i]), column_kind::clustering_key);
|
||||
builder.with_column(to_bytes(sprint("column%d", i + 1)), std::move(ck_types[i]), column_kind::clustering_key);
|
||||
}
|
||||
auto&& vtype = cf_def.__isset.default_validation_class
|
||||
? db::marshal::type_parser::parse(to_sstring(cf_def.default_validation_class))
|
||||
@@ -1205,18 +1229,11 @@ private:
|
||||
ks_def.durable_writes,
|
||||
std::move(cf_defs));
|
||||
}
|
||||
static column_family& lookup_column_family(database& db, const sstring& ks_name, const sstring& cf_name) {
|
||||
static schema_ptr lookup_schema(database& db, const sstring& ks_name, const sstring& cf_name) {
|
||||
if (ks_name.empty()) {
|
||||
throw make_exception<InvalidRequestException>("keyspace not set");
|
||||
}
|
||||
try {
|
||||
return db.find_column_family(ks_name, cf_name);
|
||||
} catch (no_such_column_family&) {
|
||||
throw make_exception<InvalidRequestException>("column family %s not found", cf_name);
|
||||
}
|
||||
}
|
||||
static schema_ptr lookup_schema(database& db, const sstring& ks_name, const sstring& cf_name) {
|
||||
return lookup_column_family(db, ks_name, cf_name).schema();
|
||||
return db.find_schema(ks_name, cf_name);
|
||||
}
|
||||
static partition_key key_from_thrift(const schema& s, bytes_view k) {
|
||||
thrift_validation::validate_key(s, k);
|
||||
@@ -1433,17 +1450,23 @@ private:
|
||||
const schema& _s;
|
||||
const query::partition_slice& _slice;
|
||||
uint32_t _cell_limit;
|
||||
std::map<std::string, typename Aggregator::type> _aggregation;
|
||||
std::vector<std::pair<std::string, typename Aggregator::type>> _aggregation;
|
||||
typename Aggregator::type* _current_aggregation;
|
||||
public:
|
||||
column_visitor(const schema& s, const query::partition_slice& slice, uint32_t cell_limit)
|
||||
: _s(s), _slice(slice), _cell_limit(cell_limit)
|
||||
{ }
|
||||
std::map<std::string, typename Aggregator::type>&& release() {
|
||||
std::vector<std::pair<std::string, typename Aggregator::type>>&& release() {
|
||||
return std::move(_aggregation);
|
||||
}
|
||||
std::map<std::string, typename Aggregator::type> release_as_map() {
|
||||
return std::map<std::string, typename Aggregator::type>(
|
||||
boost::make_move_iterator(_aggregation.begin()),
|
||||
boost::make_move_iterator(_aggregation.end()));
|
||||
}
|
||||
void accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
_current_aggregation = &_aggregation[partition_key_to_string(_s, key)];
|
||||
_aggregation.emplace_back(partition_key_to_string(_s, key), typename Aggregator::type());
|
||||
_current_aggregation = &_aggregation.back().second;
|
||||
}
|
||||
void accept_new_partition(uint32_t row_count) {
|
||||
// We always ask for the partition_key to be sent in query_opts().
|
||||
@@ -1658,7 +1681,7 @@ private:
|
||||
}
|
||||
add_live_cell(s, col, *def, clustering_key_prefix::make_empty(s), m_to_apply);
|
||||
} else {
|
||||
throw make_exception<InvalidRequestException>("No such column %s", col.name);
|
||||
fail(unimplemented::cause::MIXED_CF);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +73,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
|
||||
"parameters map<text, text>,"
|
||||
"request text,"
|
||||
"started_at timestamp,"
|
||||
"PRIMARY KEY ((session_id)))", KEYSPACE_NAME, SESSIONS);
|
||||
"PRIMARY KEY ((session_id))) "
|
||||
"WITH default_time_to_live = 86400", KEYSPACE_NAME, SESSIONS);
|
||||
|
||||
_events_create_cql = sprint("CREATE TABLE %s.%s ("
|
||||
"session_id uuid,"
|
||||
@@ -82,7 +83,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
|
||||
"source inet,"
|
||||
"source_elapsed int,"
|
||||
"thread text,"
|
||||
"PRIMARY KEY ((session_id), event_id))", KEYSPACE_NAME, EVENTS);
|
||||
"PRIMARY KEY ((session_id), event_id)) "
|
||||
"WITH default_time_to_live = 86400", KEYSPACE_NAME, EVENTS);
|
||||
}
|
||||
|
||||
future<> trace_keyspace_helper::setup_table(const sstring& name, const sstring& cql) const {
|
||||
|
||||
@@ -803,7 +803,7 @@ future<response_type> cql_server::connection::process_prepare(uint16_t stream, b
|
||||
tracing::trace(cs.get_trace_state(), "Done preparing on remote shards");
|
||||
return _server._query_processor.local().prepare(query, cs, false).then([this, stream, &cs] (auto msg) {
|
||||
tracing::trace(cs.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
|
||||
return messages::result_message::prepared::cql::get_id(std::move(msg));
|
||||
return messages::result_message::prepared::cql::get_id(msg);
|
||||
}));
|
||||
return this->make_result(stream, msg);
|
||||
});
|
||||
|
||||
@@ -59,6 +59,7 @@ std::ostream& operator<<(std::ostream& out, cause c) {
|
||||
case cause::STORAGE_SERVICE: return out << "STORAGE_SERVICE";
|
||||
case cause::API: return out << "API";
|
||||
case cause::SCHEMA_CHANGE: return out << "SCHEMA_CHANGE";
|
||||
case cause::MIXED_CF: return out << "MIXED_CF";
|
||||
}
|
||||
assert(0);
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ enum class cause {
|
||||
WRAP_AROUND, // Support for handling wrap around ranges in queries on database level and below
|
||||
STORAGE_SERVICE,
|
||||
SCHEMA_CHANGE,
|
||||
MIXED_CF,
|
||||
};
|
||||
|
||||
void fail(cause what) __attribute__((noreturn));
|
||||
|
||||
@@ -55,7 +55,7 @@ static thread_local auto reusable_indexes = std::vector<long>();
|
||||
|
||||
void bloom_filter::set_indexes(int64_t base, int64_t inc, int count, long max, std::vector<long>& results) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
results[i] = abs(base % max);
|
||||
results[i] = std::abs(base % max);
|
||||
base = static_cast<int64_t>(static_cast<uint64_t>(base) + static_cast<uint64_t>(inc));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user