Compare commits

...

44 Commits

Author SHA1 Message Date
Gleb Natapov
23c340bed8 api: fix use after free in sum_sstable
get_sstables_including_compacted_undeleted() may return a temporary shared
pointer which will be destroyed before the loop completes if it is not stored locally.

Fixes #1514

Message-Id: <20160728100504.GD2502@scylladb.com>
(cherry picked from commit 3531dd8d71)
2016-07-28 14:28:25 +03:00
Pekka Enberg
503f6c6755 release: prepare for 1.3.rc2 2016-07-28 10:57:11 +03:00
Tomasz Grabiec
7d73599acd tests: lsa_async_eviction_test: Use chunked_fifo<>
To protect against large reallocations during push(), which are done
under the reclaim lock and may fail.
2016-07-28 09:43:51 +02:00
Piotr Jastrzebski
bf27379583 Add tests for wide partiton handling in cache.
Wide partitions shouldn't be cached.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit 7d29cdf81f)
2016-07-27 14:09:45 +03:00
Piotr Jastrzebski
02cf5a517a Add collectd counter for uncached wide partitions.
Keep track of every read of a wide partition that is
not cached.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit 37a7d49676)
2016-07-27 14:09:40 +03:00
Piotr Jastrzebski
ec3d59bf13 Add flag to configure
max size of a cached partition.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit 636a4acfd0)
2016-07-27 14:09:34 +03:00
Piotr Jastrzebski
30c72ef3b4 Try to read whole streamed_mutation up to limit
If the limit is exceeded, then return the streamed_mutation
and don't cache it.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit 98c12dc2e2)
2016-07-27 14:09:29 +03:00
Piotr Jastrzebski
15e69a32ba Implement mutation_from_streamed_mutation_with_limit
If a mutation is bigger than this limit,
it won't be read and mutation_from_streamed_mutation
will return an empty optional.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit 0d39bb1ad0)
2016-07-27 14:09:23 +03:00
Paweł Dziepak
4e43cb84ff tests/sstables: test reading sstable with duplicated range tombstones
Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
(cherry picked from commit b405ff8ad2)
2016-07-27 14:09:02 +03:00
Paweł Dziepak
07d5e939be sstables: avoid recursion in sstable_streamed_mutation::read_next()
Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
(cherry picked from commit 04f2c278c2)
2016-07-27 14:06:03 +03:00
Paweł Dziepak
a2a5a22504 sstables: protect against duplicated range tombstones
The promoted index may cause an sstable to have range tombstones duplicated
several times. These duplicates appear in the "wrong" place, since they
are smaller than the entity preceding them.

This patch ignores such duplicates by skipping range tombstones that are
smaller than previously read ones.

Moreover, these duplicated range tombstones may appear in the middle of a
clustering row, so the sstable reader has also gained the ability to
merge parts of the row in such cases.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
(cherry picked from commit 08032db269)
2016-07-27 14:05:58 +03:00
Paweł Dziepak
a39bec0e24 tests: extract streamed_mutation assertions
Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
(cherry picked from commit 50469e5ef3)
2016-07-27 14:05:43 +03:00
Duarte Nunes
f0af5719d5 thrift: Preserve partition order when accumulating
This patch changes the column_visitor so that it preserves the order
of the partitions it visits when building the accumulation result.

This is required by verbs such as get_range_slice, on top of which
users can implement paging. In such cases, the last key returned by
the query will be the start of the range for the next query. If
that key is not actually the last in the partitioner's order, then
the new request will likely result in duplicate values being sent.

Ref #693

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1469568135-19644-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 5aaf43d1bc)
2016-07-27 12:11:41 +03:00
Avi Kivity
0523000af5 size_estimates_recorder: unwrap ranges before searching for sstables
column_family::select_sstables() requires unwrapped ranges, so unwrap
them.  Fixes crash with Leveled Compaction Strategy.

Fixes #1507.

Reviewed-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1469563488-14869-1-git-send-email-avi@scylladb.com>
(cherry picked from commit 64d0cf58ea)
2016-07-27 10:07:13 +03:00
Paweł Dziepak
69a0e6e002 sstables: fix skipping partitions with no rows
If a partition contains no static or clustering rows and no range tombstones,
mp_row_consumer will return a disengaged mutation_fragment_opt with the
is_mutation_end flag set to mark the end of this partition.

Currently, the mutation_reader::impl code incorrectly recognizes a disengaged
mutation fragment as the end of the stream of all mutations. This patch
fixes that by using the is_mutation_end flag to determine whether the end of
a partition or the end of the stream was reached.

Fixes #1503.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1469525449-15525-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit efa690ce8c)
2016-07-26 13:10:31 +03:00
Amos Kong
58d4de295c scylla-housekeeping: fix typo of script path
I tried to start the scylla-housekeeping service with:
 # sudo systemctl restart scylla-housekeeping.service

But it failed because of a wrong script path; error detail:
 systemd[5605]: Failed at step EXEC spawning
 /usr/lib/scylla/scylla-Housekeeping: No such file or directory

The right script name is 'scylla-housekeeping'.

Signed-off-by: Amos Kong <amos@scylladb.com>
Message-Id: <c11319a3c7d3f22f613f5f6708699be0aa6bd740.1469506477.git.amos@scylladb.com>
(cherry picked from commit 64530e9686)
2016-07-26 09:19:15 +03:00
Vlad Zolotarov
026061733f tracing: set a default TTL for system_traces tables when they are created
Fixes #1482

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1469104164-4452-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 4647ad9d8a)
2016-07-25 13:50:43 +03:00
Vlad Zolotarov
1d7ed190f8 SELECT tracing instrumentation: improve inter-node communication stage messages
Add and fix "sending to"/"received from" messages.

With this patch, the trace of a single-key select with data on an external
node looks as follows:

Tracing session: 65dbfcc0-4f51-11e6-8dd2-000000000001

 activity                                                                                                                        | timestamp                  | source    | source_elapsed
---------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------
                                                                                                              Execute CQL3 query | 2016-07-21 17:42:50.124000 | 127.0.0.2 |              0
                                                                                                   Parsing a statement [shard 1] | 2016-07-21 17:42:50.124127 | 127.0.0.2 |             --
                                                                                                Processing a statement [shard 1] | 2016-07-21 17:42:50.124190 | 127.0.0.2 |             64
 Creating read executor for token 2309717968349690594 with all: {127.0.0.1} targets: {127.0.0.1} repair decision: NONE [shard 1] | 2016-07-21 17:42:50.124229 | 127.0.0.2 |            103
                                                                            read_data: sending a message to /127.0.0.1 [shard 1] | 2016-07-21 17:42:50.124234 | 127.0.0.2 |            108
                                                                           read_data: message received from /127.0.0.2 [shard 1] | 2016-07-21 17:42:50.124358 | 127.0.0.1 |             14
                                                          read_data handling is done, sending a response to /127.0.0.2 [shard 1] | 2016-07-21 17:42:50.124434 | 127.0.0.1 |             89
                                                                               read_data: got response from /127.0.0.1 [shard 1] | 2016-07-21 17:42:50.124662 | 127.0.0.2 |            536
                                                                                  Done processing - preparing a result [shard 1] | 2016-07-21 17:42:50.124695 | 127.0.0.2 |            569
                                                                                                                Request complete | 2016-07-21 17:42:50.124580 | 127.0.0.2 |            580

Fixes #1481

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1469112271-22818-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 57b58cad8e)
2016-07-25 13:50:39 +03:00
Raphael S. Carvalho
2d66a4621a compaction: do not convert timestamp resolution to uppercase
C* only allows timestamp resolution in uppercase, so we shouldn't
be forgiving about it; otherwise migration to C* will not work.
Note that timestamp resolution is stored in the compaction strategy
options of the schema.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <d64878fc9bbcf40fd8de3d0f08cce9f6c2fde717.1469133851.git.raphaelsc@scylladb.com>
(cherry picked from commit c4f34f5038)
2016-07-25 13:47:23 +03:00
Duarte Nunes
aaa9b5ace8 system_keyspace: Add query_size_estimates() function
The query_size_estimates() function queries the size_estimates system
table for a given keyspace and table, filtering out the token ranges
according to the specified tokens.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit ecfa04da77)
2016-07-25 13:43:16 +03:00
Duarte Nunes
8d491e9879 size_estimates_recorder: Fix stop()
This patch fixes stop() by checking the current CPU instead of
whether the service is active (which it won't be at the time stop() is
called).

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit d984cc30bf)
2016-07-25 13:43:08 +03:00
Duarte Nunes
b63c9fb84b system_keyspace: Avoid pointers in range_estimates
This patch makes range_estimates a proper struct, where tokens are
represented as dht::tokens rather than dht::ring_position*.

We also pass the other arguments to update_ and clear_size_estimates by
copy, since a copy will already be required.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit e16f3f2969)
2016-07-25 13:42:53 +03:00
Duarte Nunes
b229f03198 thrift: Fail when creating mixed CF
This patch ensures we fail when creating a mixed column family, either
when adding columns to a dynamic CF through updated_column_family() or
when adding a dynamic column upon insertion.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1469378658-19853-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 5c4a2044d5)
2016-07-25 13:42:05 +03:00
Duarte Nunes
6caa59560b thrift: Correctly translate no_such_column_family
The no_such_column_family exception was translated to
InvalidRequestException instead of to NotFoundException.

Commit 8991d35231 exposed this problem.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1469376674-14603-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 560cc12fd7)
2016-07-25 13:41:58 +03:00
Duarte Nunes
79196af9fb thrift: Implement describe_splits verb
This patch implements the describe_splits verb on top of
describe_splits_ex.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit ab08561b89)
2016-07-25 13:41:54 +03:00
Duarte Nunes
afe09da858 thrift: Implement describe_splits_ex verb
This patch implements the describe_splits_ex verbs by querying the
size_estimates system table for all the estimates in the specified
token range.

If the keys_per_split argument is bigger than the
estimated partition count, we merge ranges until keys_per_split
is met. Note that since the tokens can't be split any further,
keys_per_split might be less than the reported number of keys in one
or more ranges.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit 472c23d7d2)
2016-07-25 13:41:46 +03:00
Duarte Nunes
d6cb41ff24 thrift: Handle and convert invalid_request_exception
This patch converts an exceptions::invalid_request_exception
into a Thrift InvalidRequestException instead of into a generic one.

This makes TitanDB work correctly; it expects an
InvalidRequestException when setting a non-existent keyspace.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1469362086-1013-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 2be45c4806)
2016-07-24 16:46:18 +03:00
Duarte Nunes
6bf77c7b49 thrift: Use database::find_schema directly
This patch changes lookup_schema() so it directly calls
database::find_schema() instead of going through
database::find_column_family(). It also drops the conversion of the
no_such_column_family exception, as that is already handled at a higher
layer.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit 8991d35231)
2016-07-24 16:46:05 +03:00
Duarte Nunes
6d34b4dab7 thrift: Remove hardcoded version constant
...and use the one in thrift_server.hh instead.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit 038d42c589)
2016-07-24 16:45:46 +03:00
Duarte Nunes
d367f1e9ab thrift: Remove unused with_cob_dereference function
Signed-off-by: Duarte Nunes <duarte@scylladb.com>
(cherry picked from commit 8bb43d09b1)
2016-07-24 16:45:22 +03:00
Avi Kivity
75a36ae453 bloom_filter: fix overflow for large filters
We use ::abs(), which has an int parameter, on long arguments, resulting
in incorrect results.

Switch to std::abs() instead, which has the correct overloads.

Fixes #1494.

Message-Id: <1469347802-28933-1-git-send-email-avi@scylladb.com>
(cherry picked from commit 900639915d)
2016-07-24 11:32:28 +03:00
Tomasz Grabiec
35c1781913 schema_tables: Fix hang during keyspace drop
Fixes #1484.

We drop tables as part of keyspace drop. Table drop starts with
creating a snapshot on all shards. All shards must use the same
snapshot timestamp which, among other things, is part of the snapshot
name. The timestamp is generated using supplied timestamp generating
function (joinpoint object). The joinpoint object will wait for all
shards to arrive and then generate and return the timestamp.

However, we drop tables in parallel, using the same joinpoint
instance. So joinpoint may be contacted by snapshotting shards of
tables A and B concurrently, generating timestamp t1 for some shards
of table A and some shards of table B. Later the remaining shards of
table A will get a different timestamp. As a result, different shards
may use different snapshot names for the same table. The snapshot
creation will never complete because the sealing fiber waits for all
shards to signal it, on the same name.

The fix is to give each table a separate joinpoint instance.

Message-Id: <1469117228-17879-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 5e8f0efc85)
2016-07-22 15:36:45 +02:00
Vlad Zolotarov
1489b28ffd cql_server::connection::process_prepare(): don't std::move() a shared_ptr captured by reference in value_of() lambda
A seastar::value_of() lambda used in a trace point was doing the unthinkable:
it called std::move() on a value captured by reference. Not only did it compile(!!!),
it also actually std::move()d the shared_ptr before it was used in make_result(),
which naturally caused a SIGSEGV crash.

Fixes #1491

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1469193763-27631-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 9423c13419)
2016-07-22 16:33:17 +03:00
Avi Kivity
f975653c94 Update seastar submodule to point at scylla-seastar 2016-07-21 12:31:09 +03:00
Duarte Nunes
96f5cbb604 thrift: Omit regular columns for dynamic CFs
This patch skips adding the auto-generated regular column when
describing a dynamic column family for the describe_keyspace(s) verbs.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1469091720-10113-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit a436cf945c)
2016-07-21 12:06:29 +03:00
Raphael S. Carvalho
66ebef7d10 tests: add new test for date tiered strategy
This test sets the time window to 1 hour and checks that the strategy
works accordingly.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit cf54af9e58)
2016-07-21 12:00:26 +03:00
Raphael S. Carvalho
789fb0db97 compaction: implement date tiered compaction strategy options
The date tiered compaction strategy now takes into account the
strategy options defined in the schema.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit eaa6e281a2)
2016-07-21 12:00:18 +03:00
Pekka Enberg
af7c0f6433 Revert "Merge seastar upstream"
This reverts commit aaf6786997.

We should backport the iotune fixes for 1.3 and not pull everything.
2016-07-21 11:19:50 +03:00
Pekka Enberg
aaf6786997 Merge seastar upstream
* seastar 103543a...9d1db3f (8):
  > reactor: limit task backlog
  > iotune: Fix SIGFPE with some executions
  > Merge "Preparation for protobuf" from Amnon
  > byteorder: add missing cpu_to_be(), be_to_cpu() functions
  > rpc: fix gcc-7 compilation error
  > reactor: Register the smp metrics disabled
  > scollectd: Allow creating metric that is disabled
  > Merge "Propagate timeout to a server" from Gleb
2016-07-21 11:04:31 +03:00
Pekka Enberg
e8cb163cdf db/config: Start Thrift server by default
We have Thrift support now so start the server by default.
Message-Id: <1469002000-26767-1-git-send-email-penberg@scylladb.com>

(cherry picked from commit aff8cf319d)
2016-07-20 11:29:24 +03:00
Duarte Nunes
2d7c322805 thrift: Actually concatenate strings
This patch fixes the concatenation of a char[] with an int by using sprint
instead of just advancing the pointer.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <1468971542-9600-1-git-send-email-duarte@scylladb.com>
(cherry picked from commit 64dff69077)
2016-07-20 11:09:15 +03:00
Tomasz Grabiec
13f18c6445 database: Add table name to log message about sealing
Message-Id: <1468917744-2539-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 0d26294fac)
2016-07-20 10:13:32 +03:00
Tomasz Grabiec
9c430c2cff schema_tables: Add more logging
Message-Id: <1468917771-2592-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit a0832f08d2)
2016-07-20 10:13:28 +03:00
Pekka Enberg
c84e030fe9 release: prepare for 1.3.rc1 2016-07-19 20:15:26 +03:00
38 changed files with 760 additions and 243 deletions

.gitmodules

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 	path = seastar
-	url = ../seastar
+	url = ../scylla-seastar
 	ignore = dirty
 [submodule "swagger-ui"]
 	path = swagger-ui


@@ -1,6 +1,6 @@
 #!/bin/sh
-VERSION=666.development
+VERSION=1.3.rc2
 if test -f version
 then


@@ -219,8 +219,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
     auto uuid = get_uuid(name, ctx.db.local());
     return ctx.db.map_reduce0([uuid, total](database& db) {
         std::unordered_map<sstring, uint64_t> m;
-        for (auto t :*((total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
-                db.find_column_family(uuid).get_sstables()).get()) {
+        auto sstables = (total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
+                db.find_column_family(uuid).get_sstables();
+        for (auto t : *sstables) {
             m[t->get_filename()] = t->bytes_on_disk();
         }
         return m;
@@ -234,8 +235,9 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
 static future<json::json_return_type> sum_sstable(http_context& ctx, bool total) {
     return map_reduce_cf_raw(ctx, std::unordered_map<sstring, uint64_t>(), [total](column_family& cf) {
         std::unordered_map<sstring, uint64_t> m;
-        for (auto t :*((total) ? cf.get_sstables_including_compacted_undeleted() :
-                cf.get_sstables()).get()) {
+        auto sstables = (total) ? cf.get_sstables_including_compacted_undeleted() :
+                cf.get_sstables();
+        for (auto t : *sstables) {
             m[t->get_filename()] = t->bytes_on_disk();
         }
         return m;


@@ -127,7 +127,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl
     , _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
     , _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
     , _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema)))
-    , _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
+    , _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker(), _config.max_cached_partition_size_in_bytes)
     , _commitlog(cl)
     , _compaction_manager(compaction_manager)
     , _flush_queue(std::make_unique<memtable_flush_queue>())
@@ -785,7 +785,7 @@ future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_bi
 future<>
 column_family::seal_active_memtable(memtable_list::flush_behavior ignored) {
     auto old = _memtables->back();
-    dblog.debug("Sealing active memtable, partitions: {}, occupancy: {}", old->partition_count(), old->occupancy());
+    dblog.debug("Sealing active memtable of {}.{}, partitions: {}, occupancy: {}", _schema->cf_name(), _schema->ks_name(), old->partition_count(), old->occupancy());
     if (old->empty()) {
         dblog.debug("Memtable is empty");
@@ -1581,7 +1581,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
     return parallel_for_each(tables.begin(), tables.end(), [this] (auto& t) {
         auto s = t.second;
         auto& ks = this->find_keyspace(s->ks_name());
-        auto cfg = ks.make_column_family_config(*s);
+        auto cfg = ks.make_column_family_config(*s, this->get_config());
         this->add_column_family(s, std::move(cfg));
         return ks.make_directory_for_column_family(s->cf_name(), s->id()).then([s] {});
     });
@@ -1838,7 +1838,7 @@ void keyspace::update_from(::lw_shared_ptr<keyspace_metadata> ksm) {
 }
 column_family::config
-keyspace::make_column_family_config(const schema& s) const {
+keyspace::make_column_family_config(const schema& s, const db::config& db_config) const {
     column_family::config cfg;
     cfg.datadir = column_family_directory(s.cf_name(), s.id());
     cfg.enable_disk_reads = _config.enable_disk_reads;
@@ -1852,6 +1852,7 @@ keyspace::make_column_family_config(const schema& s) const {
     cfg.read_concurrency_config = _config.read_concurrency_config;
     cfg.cf_stats = _config.cf_stats;
     cfg.enable_incremental_backups = _config.enable_incremental_backups;
+    cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
     return cfg;
 }
@@ -2502,6 +2503,7 @@ future<> update_schema_version_and_announce(distributed<service::storage_proxy>&
         return make_ready_future<>();
     }).then([uuid] {
         return db::system_keyspace::update_schema_version(uuid).then([uuid] {
+            dblog.info("Schema version changed to {}", uuid);
             return service::get_local_migration_manager().passive_announce(uuid);
         });
     });


@@ -316,6 +316,7 @@ public:
         ::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
         restricted_mutation_reader_config read_concurrency_config;
         ::cf_stats* cf_stats = nullptr;
+        uint64_t max_cached_partition_size_in_bytes;
     };
     struct no_commitlog {};
     struct stats {
@@ -884,7 +885,7 @@ public:
      */
     locator::abstract_replication_strategy& get_replication_strategy();
     const locator::abstract_replication_strategy& get_replication_strategy() const;
-    column_family::config make_column_family_config(const schema& s) const;
+    column_family::config make_column_family_config(const schema& s, const db::config& db_config) const;
     future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
     void add_or_update_column_family(const schema_ptr& s) {
         _metadata->add_or_update_column_family(s);


@@ -369,6 +369,9 @@ public:
     val(reduce_cache_sizes_at, double, .85, Invalid, \
         "When Java heap usage (after a full concurrent mark sweep (CMS) garbage collection) exceeds this percentage, Cassandra reduces the cache capacity to the fraction of the current size as specified by reduce_cache_capacity_to. To disable, set the value to 1.0." \
     ) \
+    val(max_cached_partition_size_in_kb, uint64_t, 10240uLL, Used, \
+        "Partitions with size greater than this value won't be cached." \
+    ) \
     /* Disks settings */ \
     val(stream_throughput_outbound_megabits_per_sec, uint32_t, 400, Unused, \
         "Throttles all outbound streaming file transfers on a node to the specified throughput. Cassandra does mostly sequential I/O when streaming data during bootstrap or repair, which can lead to saturating the network connection and degrading client (RPC) performance." \
@@ -556,7 +559,7 @@ public:
     val(rpc_port, uint16_t, 9160, Used, \
         "Thrift port for client connections." \
     ) \
-    val(start_rpc, bool, false, Used, \
+    val(start_rpc, bool, true, Used, \
         "Starts the Thrift RPC server" \
     ) \
     val(rpc_keepalive, bool, true, Used, \


@@ -665,13 +665,16 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
     auto diff = difference(before, after, indirect_equal_to<lw_shared_ptr<query::result_set>>());
     for (auto&& key : diff.entries_only_on_left) {
+        logger.info("Dropping keyspace {}", key);
         dropped.emplace(key);
     }
     for (auto&& key : diff.entries_only_on_right) {
         auto&& value = after[key];
+        logger.info("Creating keyspace {}", key);
         created.emplace_back(schema_result_value_type{key, std::move(value)});
     }
     for (auto&& key : diff.entries_differing) {
+        logger.info("Altering keyspace {}", key);
         altered.emplace_back(key);
     }
     return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) mutable {
@@ -713,15 +716,21 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
     std::map<qualified_name, schema_mutations>&& before,
     std::map<qualified_name, schema_mutations>&& after)
 {
+    struct dropped_table {
+        global_schema_ptr schema;
+        utils::joinpoint<db_clock::time_point> jp{[] {
+            return make_ready_future<db_clock::time_point>(db_clock::now());
+        }};
+    };
     std::vector<global_schema_ptr> created;
     std::vector<global_schema_ptr> altered;
-    std::vector<global_schema_ptr> dropped;
+    std::vector<dropped_table> dropped;
     auto diff = difference(before, after);
     for (auto&& key : diff.entries_only_on_left) {
         auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
         logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
-        dropped.emplace_back(s);
+        dropped.emplace_back(dropped_table{s});
     }
     for (auto&& key : diff.entries_only_on_right) {
         auto s = create_table_from_mutations(after.at(key));
@@ -734,14 +743,12 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
         altered.emplace_back(s);
     }
-    do_with(utils::make_joinpoint([] { return db_clock::now();})
-        , [&created, &dropped, &altered, &proxy](auto& tsf) {
-        return proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, &tsf] (database& db) {
+    proxy.local().get_db().invoke_on_all([&created, &dropped, &altered] (database& db) {
         return seastar::async([&] {
             for (auto&& gs : created) {
                 schema_ptr s = gs.get();
                 auto& ks = db.find_keyspace(s->ks_name());
-                auto cfg = ks.make_column_family_config(*s);
+                auto cfg = ks.make_column_family_config(*s, db.get_config());
                 db.add_column_family(s, cfg);
                 auto& cf = db.find_column_family(s);
                 cf.mark_ready_for_writes();
@@ -751,14 +758,13 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
             for (auto&& gs : altered) {
                 update_column_family(db, gs.get()).get();
             }
-            parallel_for_each(dropped.begin(), dropped.end(), [&db, &tsf](auto&& gs) {
-                schema_ptr s = gs.get();
-                return db.drop_column_family(s->ks_name(), s->cf_name(), [&tsf] { return tsf.value(); }).then([s] {
+            parallel_for_each(dropped.begin(), dropped.end(), [&db](dropped_table& dt) {
+                schema_ptr s = dt.schema.get();
+                return db.drop_column_family(s->ks_name(), s->cf_name(), [&dt] { return dt.jp.value(); }).then([s] {
                     return service::get_local_migration_manager().notify_drop_column_family(s);
                 });
             }).get();
         });
-        });
     }).get();
 }


@@ -71,14 +71,30 @@ static std::vector<db::system_keyspace::range_estimates> estimates_for(const col
     std::vector<db::system_keyspace::range_estimates> estimates;
     estimates.reserve(local_ranges.size());
+    std::vector<query::partition_range> unwrapped;
+    // Each range defines both bounds.
     for (auto& range : local_ranges) {
         int64_t count{0};
         sstables::estimated_histogram hist{0};
-        for (auto&& sstable : cf.select_sstables(range)) {
+        unwrapped.clear();
+        if (range.is_wrap_around(dht::ring_position_comparator(*cf.schema()))) {
+            auto uw = range.unwrap();
+            unwrapped.push_back(std::move(uw.first));
+            unwrapped.push_back(std::move(uw.second));
+        } else {
+            unwrapped.push_back(range);
+        }
+        for (auto&& uwr : unwrapped) {
+            for (auto&& sstable : cf.select_sstables(uwr)) {
                 count += sstable->get_estimated_key_count();
                 hist.merge(sstable->get_stats_metadata().estimated_row_size);
             }
+        }
-        estimates.emplace_back(&range, db::system_keyspace::partition_estimates{count, count > 0 ? hist.mean() : 0});
+        estimates.emplace_back(db::system_keyspace::range_estimates{
+            range.start()->value().token(),
+            range.end()->value().token(),
+            count,
+            count > 0 ? hist.mean() : 0});
     }
     return estimates;
@@ -130,7 +146,7 @@ future<> size_estimates_recorder::record_size_estimates() {
 }
 future<> size_estimates_recorder::stop() {
-    if (get_size_estimates_recorder().local_is_initialized()) {
+    if (engine().cpu_id() == 0) {
         service::get_local_migration_manager().unregister_listener(this);
         _timer.cancel();
         return _gate.close();


@@ -1043,7 +1043,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
db.add_keyspace(NAME, std::move(_ks));
auto& ks = db.find_keyspace(NAME);
for (auto&& table : all_tables()) {
db.add_column_family(table, ks.make_column_family_config(*table));
db.add_column_family(table, ks.make_column_family_config(*table, db.get_config()));
}
}
@@ -1195,10 +1195,10 @@ future<int> increment_and_get_generation() {
});
}
future<> update_size_estimates(const sstring& ks_name, const sstring& cf_name, std::vector<range_estimates> estimates) {
future<> update_size_estimates(sstring ks_name, sstring cf_name, std::vector<range_estimates> estimates) {
auto&& schema = size_estimates();
auto timestamp = api::new_timestamp();
mutation m_to_apply{partition_key::from_singular(*schema, ks_name), schema};
mutation m_to_apply{partition_key::from_single_value(*schema, to_bytes(ks_name)), schema};
// delete all previous values with a single range tombstone.
auto ck = clustering_key_prefix::from_single_value(*schema, utf8_type->decompose(cf_name));
@@ -1206,28 +1206,50 @@ future<> update_size_estimates(const sstring& ks_name, const sstring& cf_name, s
// add a CQL row for each primary token range.
for (auto&& e : estimates) {
// This range has both start and end bounds. We're only interested in the tokens.
const range<dht::ring_position>* range = e.first;
auto ck = clustering_key_prefix(std::vector<bytes>{
utf8_type->decompose(cf_name),
utf8_type->decompose(dht::global_partitioner().to_sstring(range->start()->value().token())),
utf8_type->decompose(dht::global_partitioner().to_sstring(range->end()->value().token()))});
utf8_type->decompose(dht::global_partitioner().to_sstring(e.range_start_token)),
utf8_type->decompose(dht::global_partitioner().to_sstring(e.range_end_token))});
auto mean_partition_size_col = schema->get_column_definition("mean_partition_size");
auto cell = atomic_cell::make_live(timestamp, long_type->decompose(e.second.mean_partition_size), { });
auto cell = atomic_cell::make_live(timestamp, long_type->decompose(e.mean_partition_size), { });
m_to_apply.set_clustered_cell(ck, *mean_partition_size_col, std::move(cell));
auto partitions_count_col = schema->get_column_definition("partitions_count");
cell = atomic_cell::make_live(timestamp, long_type->decompose(e.second.partitions_count), { });
cell = atomic_cell::make_live(timestamp, long_type->decompose(e.partitions_count), { });
m_to_apply.set_clustered_cell(std::move(ck), *partitions_count_col, std::move(cell));
}
return service::get_local_storage_proxy().mutate_locally(std::move(m_to_apply));
}
future<> clear_size_estimates(const sstring& ks_name, const sstring& cf_name) {
future<> clear_size_estimates(sstring ks_name, sstring cf_name) {
sstring req = "DELETE FROM system.%s WHERE keyspace_name = ? AND table_name = ?";
return execute_cql(req, SIZE_ESTIMATES, ks_name, cf_name).discard_result();
return execute_cql(std::move(req), SIZE_ESTIMATES, std::move(ks_name), std::move(cf_name)).discard_result();
}
future<std::vector<range_estimates>> query_size_estimates(sstring ks_name, sstring cf_name, dht::token start_token, dht::token end_token) {
sstring req = "SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.%s WHERE keyspace_name = ? AND table_name = ?";
return execute_cql(req, SIZE_ESTIMATES, std::move(ks_name), std::move(cf_name))
.then([start_token = std::move(start_token), end_token = std::move(end_token)](::shared_ptr<cql3::untyped_result_set> result) {
std::vector<range_estimates> estimates;
for (auto&& row : *result) {
auto range_start = dht::global_partitioner().from_sstring(row.get_as<sstring>("range_start"));
if (range_start < start_token) {
continue;
}
auto range_end = dht::global_partitioner().from_sstring(row.get_as<sstring>("range_end"));
if (range_end > end_token) {
break;
}
estimates.emplace_back(range_estimates{
std::move(range_start),
std::move(range_end),
row.get_as<int64_t>("partitions_count"),
row.get_as<int64_t>("mean_partition_size")});
}
return estimates;
});
}
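The filtering loop in `query_size_estimates()` above relies on the rows being sorted by `range_start`: rows before `start_token` are skipped, and the scan stops at the first range ending past `end_token`. A minimal standalone sketch of that rule, with plain `long` values standing in for `dht::token` (names here are illustrative, not Scylla's):

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Keep only the ranges falling inside [start, end], assuming `rows` is
// sorted by range start (as the size_estimates query guarantees).
std::vector<std::pair<long, long>>
filter_ranges(const std::vector<std::pair<long, long>>& rows, long start, long end) {
    std::vector<std::pair<long, long>> out;
    for (auto& r : rows) {
        if (r.first < start) {
            continue;  // range starts before the requested window
        }
        if (r.second > end) {
            break;     // sorted input: no later range can fit either
        }
        out.push_back(r);
    }
    return out;
}
```

The early `break` mirrors the `range_end > end_token` check in the real code and is only valid because the input is ordered.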
} // namespace system_keyspace

View File

@@ -80,13 +80,13 @@ static constexpr auto SSTABLE_ACTIVITY = "sstable_activity";
static constexpr auto SIZE_ESTIMATES = "size_estimates";
// Partition estimates for a given range of tokens.
struct partition_estimates {
struct range_estimates {
dht::token range_start_token;
dht::token range_end_token;
int64_t partitions_count;
int64_t mean_partition_size;
};
using range_estimates = std::pair<const range<dht::ring_position>*, partition_estimates>;
extern schema_ptr hints();
extern schema_ptr batchlog();
extern schema_ptr built_indexes(); // TODO (from Cassandra): make private
@@ -572,12 +572,17 @@ future<> set_bootstrap_state(bootstrap_state state);
/**
* Writes the current partition count and size estimates into SIZE_ESTIMATES_CF
*/
future<> update_size_estimates(const sstring& ks_name, const sstring& cf_name, std::vector<range_estimates> estimates);
future<> update_size_estimates(sstring ks_name, sstring cf_name, std::vector<range_estimates> estimates);
/**
* Clears size estimates for a table (on table drop)
*/
future<> clear_size_estimates(const sstring& ks_name, const sstring& cf_name);
future<> clear_size_estimates(sstring ks_name, sstring cf_name);
/**
* Queries the size estimates within the specified range
*/
future<std::vector<range_estimates>> query_size_estimates(sstring ks_name, sstring cf_name, dht::token start_token, dht::token end_token);
} // namespace system_keyspace
} // namespace db

View File

@@ -5,7 +5,7 @@ Description=Scylla Housekeeping
Type=simple
User=scylla
Group=scylla
ExecStart=/usr/lib/scylla/scylla-Housekeeping -q version
ExecStart=/usr/lib/scylla/scylla-housekeeping -q version
[Install]
WantedBy=multi-user.target

View File

@@ -213,51 +213,90 @@ mutation& mutation::operator=(const mutation& m) {
return *this = mutation(m);
}
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm)
{
class rebuilder {
mutation& _m;
public:
rebuilder(mutation& m) : _m(m) { }
enum class limit_mutation_size { yes, no };
stop_iteration consume(tombstone t) {
_m.partition().apply(t);
return stop_iteration::no;
template <limit_mutation_size with_limit>
class mutation_rebuilder {
mutation _m;
streamed_mutation& _sm;
size_t _remaining_limit;
template <typename T> bool check_remaining_limit(const T& e) {
if (with_limit == limit_mutation_size::no) {
return true;
}
stop_iteration consume(static_row&& sr) {
_m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells()));
return stop_iteration::no;
size_t size = e.memory_usage();
if (_remaining_limit <= size) {
_remaining_limit = 0;
} else {
_remaining_limit -= size;
}
return _remaining_limit > 0;
}
public:
mutation_rebuilder(streamed_mutation& sm)
: _m(sm.decorated_key(), sm.schema()), _sm(sm), _remaining_limit(0) {
static_assert(with_limit == limit_mutation_size::no,
"This constructor should be used only for mutation_rebuilder with no limit");
}
mutation_rebuilder(streamed_mutation& sm, size_t limit)
: _m(sm.decorated_key(), sm.schema()), _sm(sm), _remaining_limit(limit) {
static_assert(with_limit == limit_mutation_size::yes,
"This constructor should be used only for mutation_rebuilder with limit");
check_remaining_limit(_m.key());
}
stop_iteration consume(range_tombstone&& rt) {
_m.partition().apply_row_tombstone(*_m.schema(), std::move(rt));
return stop_iteration::no;
stop_iteration consume(tombstone t) {
_m.partition().apply(t);
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
if (!check_remaining_limit(rt)) {
return stop_iteration::yes;
}
_m.partition().apply_row_tombstone(*_m.schema(), std::move(rt));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
auto& dr = _m.partition().clustered_row(std::move(cr.key()));
dr.apply(cr.tomb());
dr.apply(cr.marker());
dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells()));
return stop_iteration::no;
stop_iteration consume(static_row&& sr) {
if (!check_remaining_limit(sr)) {
return stop_iteration::yes;
}
_m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells()));
return stop_iteration::no;
}
void consume_end_of_stream() { }
};
stop_iteration consume(clustering_row&& cr) {
if (!check_remaining_limit(cr)) {
return stop_iteration::yes;
}
auto& dr = _m.partition().clustered_row(std::move(cr.key()));
dr.apply(cr.tomb());
dr.apply(cr.marker());
dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells()));
return stop_iteration::no;
}
struct data {
mutation m;
streamed_mutation sm;
};
mutation_opt consume_end_of_stream() {
return with_limit == limit_mutation_size::yes && _remaining_limit == 0 ? mutation_opt()
: mutation_opt(std::move(_m));
}
};
future<mutation_opt>
mutation_from_streamed_mutation_with_limit(streamed_mutation sm, size_t limit) {
return do_with(std::move(sm), [limit] (auto& sm) {
return consume(sm, mutation_rebuilder<limit_mutation_size::yes>(sm, limit));
});
}
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm) {
if (!sm) {
return make_ready_future<mutation_opt>();
}
mutation m(sm->decorated_key(), sm->schema());
return do_with(data { std::move(m), std::move(*sm) }, [] (auto& d) {
return consume(d.sm, rebuilder(d.m)).then([&d] {
return mutation_opt(std::move(d.m));
});
return do_with(std::move(*sm), [] (auto& sm) {
return consume(sm, mutation_rebuilder<limit_mutation_size::no>(sm));
});
}
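The `check_remaining_limit()` helper in the rebuilder above implements a saturating byte budget: each fragment's `memory_usage()` is charged against the remaining limit, clamping at zero, and consumption stops once the budget is exhausted (which `consume_end_of_stream()` then reports as an empty `mutation_opt`). A minimal sketch of just that budget, with illustrative names rather than the actual Scylla types:

```cpp
#include <cassert>
#include <cstddef>

// Saturating byte budget: charge() subtracts `size` from the remaining
// limit, clamping at zero, and returns true while budget remains.
class byte_budget {
    size_t _remaining;
public:
    explicit byte_budget(size_t limit) : _remaining(limit) {}
    bool charge(size_t size) {
        if (_remaining <= size) {
            _remaining = 0;   // clamp instead of underflowing
        } else {
            _remaining -= size;
        }
        return _remaining > 0;
    }
    bool exhausted() const { return _remaining == 0; }
};
```

Note that an exact hit (`size == remaining`) also exhausts the budget, matching the `<=` comparison in the diff.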

View File

@@ -182,3 +182,5 @@ boost::iterator_range<std::vector<mutation>::const_iterator> slice(
const query::partition_range&);
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm);
future<mutation_opt>
mutation_from_streamed_mutation_with_limit(streamed_mutation sm, size_t limit);

View File

@@ -38,6 +38,22 @@ static logging::logger logger("cache");
thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduling_group(1ms, 0.2);
enum class is_wide_partition { yes, no };
future<is_wide_partition, mutation_opt>
try_to_read(uint64_t max_cached_partition_size_in_bytes, streamed_mutation_opt&& sm) {
if (!sm) {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, mutation_opt());
}
return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_cached_partition_size_in_bytes).then(
[] (mutation_opt&& omo) mutable {
if (omo) {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, std::move(omo));
} else {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::yes, mutation_opt());
}
});
}
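The contract of `try_to_read()` above is that a disengaged `mutation_opt` coming back from the limited rebuild means the size limit was hit, so the partition is classified as wide and left uncached. A stripped-down sketch of that two-outcome mapping, with `std::optional<int>` standing in for `mutation_opt` (hypothetical types, not Scylla's):

```cpp
#include <cassert>
#include <optional>
#include <utility>

enum class is_wide_partition { yes, no };

// An engaged optional means the partition fit under the limit; a
// disengaged one means the limited rebuild gave up, i.e. wide.
std::pair<is_wide_partition, std::optional<int>>
classify(std::optional<int> limited_read_result) {
    if (limited_read_result) {
        return {is_wide_partition::no, std::move(limited_read_result)};
    }
    return {is_wide_partition::yes, std::nullopt};
}
```

In the real reader, the `yes` branch bumps the `uncached_wide_partitions` counter and re-reads the partition directly from the underlying source instead of populating the cache.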
cache_tracker& global_cache_tracker() {
static thread_local cache_tracker instance;
@@ -103,6 +119,11 @@ cache_tracker::setup_collectd() {
, "total_operations", "misses")
, scollectd::make_typed(scollectd::data_type::DERIVE, _misses)
),
scollectd::add_polled_metric(scollectd::type_instance_id("cache"
, scollectd::per_cpu_plugin_instance
, "total_operations", "uncached_wide_partitions")
, scollectd::make_typed(scollectd::data_type::DERIVE, _uncached_wide_partitions)
),
scollectd::add_polled_metric(scollectd::type_instance_id("cache"
, scollectd::per_cpu_plugin_instance
, "total_operations", "insertions")
@@ -180,6 +201,10 @@ void cache_tracker::on_miss() {
++_misses;
}
void cache_tracker::on_uncached_wide_partition() {
++_uncached_wide_partitions;
}
allocation_strategy& cache_tracker::allocator() {
return _region.allocator();
}
@@ -196,29 +221,50 @@ const logalloc::region& cache_tracker::region() const {
class single_partition_populating_reader final : public mutation_reader::impl {
schema_ptr _schema;
row_cache& _cache;
mutation_source& _underlying;
mutation_reader _delegate;
const io_priority_class _pc;
query::clustering_key_filtering_context _ck_filtering;
public:
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate, query::clustering_key_filtering_context ck_filtering)
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_source& underlying,
mutation_reader delegate, const io_priority_class pc, query::clustering_key_filtering_context ck_filtering)
: _schema(std::move(s))
, _cache(cache)
, _underlying(underlying)
, _delegate(std::move(delegate))
, _pc(pc)
, _ck_filtering(ck_filtering)
{ }
virtual future<streamed_mutation_opt> operator()() override {
return _delegate().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return streamed_mutation_from_mutation(std::move(*mo));
auto op = _cache._populate_phaser.start();
return _delegate().then([this, op = std::move(op)] (auto sm) mutable {
if (!sm) {
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
}
return { };
dht::decorated_key dk = sm->decorated_key();
return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then(
[this, op = std::move(op), dk = std::move(dk)]
(is_wide_partition wide_partition, mutation_opt&& mo) {
if (wide_partition == is_wide_partition::no) {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
}
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
} else {
_cache.on_uncached_wide_partition();
auto reader = _underlying(_schema,
query::partition_range::make_singular(dht::ring_position(std::move(dk))),
_ck_filtering,
_pc);
return reader();
}
});
});
}
};
@@ -233,6 +279,10 @@ void row_cache::on_miss() {
_tracker.on_miss();
}
void row_cache::on_uncached_wide_partition() {
_tracker.on_uncached_wide_partition();
}
class just_cache_scanning_reader final {
schema_ptr _schema;
row_cache& _cache;
@@ -397,22 +447,38 @@ public:
{}
virtual future<streamed_mutation_opt> operator()() override {
update_reader();
return _reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
_last_key = dht::ring_position(mo->decorated_key());
_last_key_populate_phase = _cache._populate_phaser.phase();
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return streamed_mutation_from_mutation(std::move(*mo));
}
maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
return {};
auto op = _cache._populate_phaser.start();
return _reader().then([this, op = std::move(op)] (auto sm) mutable {
stdx::optional<dht::decorated_key> dk = (sm) ? stdx::optional<dht::decorated_key>(sm->decorated_key())
: stdx::optional<dht::decorated_key>(stdx::nullopt);
return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then(
[this, op = std::move(op), dk = std::move(dk)]
(is_wide_partition wide_partition, mutation_opt&& mo) mutable {
if (wide_partition == is_wide_partition::no) {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
this->maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
_last_key = dht::ring_position(mo->decorated_key());
_last_key_populate_phase = _cache._populate_phaser.phase();
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
}
this->maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
} else {
assert(bool(dk));
_last_key = std::experimental::optional<dht::ring_position>();
_cache.on_uncached_wide_partition();
auto reader = _underlying(_schema,
query::partition_range::make_singular(dht::ring_position(std::move(*dk))),
_ck_filtering,
_pc);
return reader();
}
});
});
}
};
@@ -678,8 +744,8 @@ row_cache::make_reader(schema_ptr s,
return make_reader_returning(e.read(*this, s, ck_filtering));
} else {
on_miss();
return make_mutation_reader<single_partition_populating_reader>(s, *this,
_underlying(_schema, range, query::no_clustering_key_filtering, pc),
return make_mutation_reader<single_partition_populating_reader>(s, *this, _underlying,
_underlying(_schema, range, query::no_clustering_key_filtering, pc), pc,
ck_filtering);
}
});
@@ -912,12 +978,13 @@ void row_cache::invalidate_unwrapped(const query::partition_range& range) {
}
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,
cache_tracker& tracker)
cache_tracker& tracker, uint64_t max_cached_partition_size_in_bytes)
: _tracker(tracker)
, _schema(std::move(s))
, _partitions(cache_entry::compare(_schema))
, _underlying(std::move(fallback_factory))
, _underlying_keys(std::move(underlying_keys))
, _max_cached_partition_size_in_bytes(max_cached_partition_size_in_bytes)
{
with_allocator(_tracker.allocator(), [this] {
cache_entry* entry = current_allocator().construct<cache_entry>(_schema);

View File

@@ -149,6 +149,7 @@ public:
private:
uint64_t _hits = 0;
uint64_t _misses = 0;
uint64_t _uncached_wide_partitions = 0;
uint64_t _insertions = 0;
uint64_t _merges = 0;
uint64_t _evictions = 0;
@@ -170,11 +171,13 @@ public:
void on_merge();
void on_hit();
void on_miss();
void on_uncached_wide_partition();
allocation_strategy& allocator();
logalloc::region& region();
const logalloc::region& region() const;
uint64_t modification_count() const { return _modification_count; }
uint64_t partitions() const { return _partitions; }
uint64_t uncached_wide_partitions() const { return _uncached_wide_partitions; }
};
// Returns a reference to shard-wide cache_tracker.
@@ -211,6 +214,7 @@ private:
partitions_type _partitions; // Cached partitions are complete.
mutation_source _underlying;
key_source _underlying_keys;
uint64_t _max_cached_partition_size_in_bytes;
// Synchronizes populating reads with updates of underlying data source to ensure that cache
// remains consistent across flushes with the underlying data source.
@@ -231,6 +235,7 @@ private:
query::clustering_key_filtering_context ck_filtering);
void on_hit();
void on_miss();
void on_uncached_wide_partition();
void upgrade_entry(cache_entry&);
void invalidate_locked(const dht::decorated_key&);
void invalidate_unwrapped(const query::partition_range&);
@@ -238,7 +243,7 @@ private:
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
public:
~row_cache();
row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&);
row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&, uint64_t _max_cached_partition_size_in_bytes = 10 * 1024 * 1024);
row_cache(row_cache&&) = default;
row_cache(const row_cache&) = delete;
row_cache& operator=(row_cache&&) = default;

View File

@@ -2128,10 +2128,13 @@ protected:
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> make_mutation_data_request(lw_shared_ptr<query::read_command> cmd, gms::inet_address ep, clock_type::time_point timeout) {
++_proxy->_stats.mutation_data_read_attempts.get_ep_stat(ep);
if (is_me(ep)) {
tracing::trace(_trace_state, "read_mutation_data: querying locally");
return _proxy->query_mutations_locally(_schema, cmd, _partition_range);
} else {
auto& ms = net::get_local_messaging_service();
return ms.send_read_mutation_data(net::messaging_service::msg_addr{ep, 0}, timeout, *cmd, _partition_range).then([this](reconcilable_result&& result) {
tracing::trace(_trace_state, "read_mutation_data: sending a message to /{}", ep);
return ms.send_read_mutation_data(net::messaging_service::msg_addr{ep, 0}, timeout, *cmd, _partition_range).then([this, ep](reconcilable_result&& result) {
tracing::trace(_trace_state, "read_mutation_data: got response from /{}", ep);
return make_foreign(::make_lw_shared<reconcilable_result>(std::move(result)));
});
}
@@ -2139,10 +2142,13 @@ protected:
future<foreign_ptr<lw_shared_ptr<query::result>>> make_data_request(gms::inet_address ep, clock_type::time_point timeout) {
++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
if (is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
return _proxy->query_singular_local(_schema, _cmd, _partition_range);
} else {
auto& ms = net::get_local_messaging_service();
return ms.send_read_data(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([this](query::result&& result) {
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
return ms.send_read_data(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([this, ep](query::result&& result) {
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
return make_foreign(::make_lw_shared<query::result>(std::move(result)));
});
}
@@ -2150,10 +2156,13 @@ protected:
future<query::result_digest, api::timestamp_type> make_digest_request(gms::inet_address ep, clock_type::time_point timeout) {
++_proxy->_stats.digest_read_attempts.get_ep_stat(ep);
if (is_me(ep)) {
tracing::trace(_trace_state, "read_digest: querying locally");
return _proxy->query_singular_local_digest(_schema, _cmd, _partition_range);
} else {
auto& ms = net::get_local_messaging_service();
return ms.send_read_digest(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([] (query::result_digest d, rpc::optional<api::timestamp_type> t) {
tracing::trace(_trace_state, "read_digest: sending a message to /{}", ep);
return ms.send_read_digest(net::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range).then([this, ep] (query::result_digest d, rpc::optional<api::timestamp_type> t) {
tracing::trace(_trace_state, "read_digest: got response from /{}", ep);
return make_ready_future<query::result_digest, api::timestamp_type>(d, t ? t.value() : api::missing_timestamp);
});
}
@@ -3271,10 +3280,11 @@ void storage_proxy::init_messaging_service() {
}
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
auto src_ip = src_addr.addr;
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p] (schema_ptr s) {
return p->query_singular_local(std::move(s), cmd, pr);
}).finally([&trace_state_ptr] () mutable {
tracing::trace(trace_state_ptr, "read_data handling is done");
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
});
});
});
@@ -3287,10 +3297,11 @@ void storage_proxy::init_messaging_service() {
tracing::trace(trace_state_ptr, "read_mutation_data: message received from /{}", src_addr.addr);
}
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
auto src_ip = src_addr.addr;
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p] (schema_ptr s) {
return p->query_mutations_locally(std::move(s), cmd, pr);
}).finally([&trace_state_ptr] () mutable {
tracing::trace(trace_state_ptr, "read_mutation_data handling is done");
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_mutation_data handling is done, sending a response to /{}", src_ip);
});
});
});
@@ -3303,10 +3314,11 @@ void storage_proxy::init_messaging_service() {
tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr);
}
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
auto src_ip = src_addr.addr;
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p] (schema_ptr s) {
return p->query_singular_local_digest(std::move(s), cmd, pr);
}).finally([&trace_state_ptr] () mutable {
tracing::trace(trace_state_ptr, "read_digest handling is done");
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip);
});
});
});

View File

@@ -48,24 +48,93 @@
#include <iterator>
#include "sstables.hh"
#include "compaction.hh"
#include "timestamp.hh"
#include "cql3/statements/property_definitions.hh"
static constexpr double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365;
static constexpr int64_t DEFAULT_BASE_TIME_SECONDS = 60;
struct duration_conversor {
// Convert the given duration to TargetDuration and return the value as a timestamp.
template <typename TargetDuration, typename SourceDuration>
static api::timestamp_type convert(SourceDuration d) {
return std::chrono::duration_cast<TargetDuration>(d).count();
}
// Convert the given duration to the duration represented by the string
// target_duration, and return the value as a timestamp.
template <typename SourceDuration>
static api::timestamp_type convert(const sstring& target_duration, SourceDuration d) {
if (target_duration == "HOURS") {
return convert<std::chrono::hours>(d);
} else if (target_duration == "MICROSECONDS") {
return convert<std::chrono::microseconds>(d);
} else if (target_duration == "MILLISECONDS") {
return convert<std::chrono::milliseconds>(d);
} else if (target_duration == "MINUTES") {
return convert<std::chrono::minutes>(d);
} else if (target_duration == "NANOSECONDS") {
return convert<std::chrono::nanoseconds>(d);
} else if (target_duration == "SECONDS") {
return convert<std::chrono::seconds>(d);
} else {
throw std::runtime_error(sprint("target duration %s is not available", target_duration));
}
}
};
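The dispatch in `duration_conversor` above maps the `timestamp_resolution` option's unit name onto a `std::chrono::duration_cast` and returns the raw count. A self-contained sketch of the same idea, using a hypothetical free function rather than the struct from the diff:

```cpp
#include <cassert>
#include <chrono>
#include <stdexcept>
#include <string>

// Convert `d` to the chrono unit named by `target` and return its count,
// mirroring how the strategy stores ages directly as timestamp counts.
template <typename SourceDuration>
long long convert_to_unit(const std::string& target, SourceDuration d) {
    using namespace std::chrono;
    if (target == "HOURS")        return duration_cast<hours>(d).count();
    if (target == "MICROSECONDS") return duration_cast<microseconds>(d).count();
    if (target == "MILLISECONDS") return duration_cast<milliseconds>(d).count();
    if (target == "MINUTES")      return duration_cast<minutes>(d).count();
    if (target == "NANOSECONDS")  return duration_cast<nanoseconds>(d).count();
    if (target == "SECONDS")      return duration_cast<seconds>(d).count();
    throw std::runtime_error("target duration " + target + " is not available");
}
```

This is why the refactored manifest can compare `max_sstable_age` directly against sstable timestamps without any further `duration_cast` at the call sites.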
class date_tiered_compaction_strategy_options {
const sstring DEFAULT_TIMESTAMP_RESOLUTION = "MICROSECONDS";
const sstring TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
const sstring MAX_SSTABLE_AGE_KEY = "max_sstable_age_days";
const sstring BASE_TIME_KEY = "base_time_seconds";
api::timestamp_type max_sstable_age;
api::timestamp_type base_time;
public:
date_tiered_compaction_strategy_options(const std::map<sstring, sstring>& options) {
using namespace cql3::statements;
auto tmp_value = get_value(options, TIMESTAMP_RESOLUTION_KEY);
auto target_unit = tmp_value ? tmp_value.value() : DEFAULT_TIMESTAMP_RESOLUTION;
tmp_value = get_value(options, MAX_SSTABLE_AGE_KEY);
auto fractional_days = property_definitions::to_double(MAX_SSTABLE_AGE_KEY, tmp_value, DEFAULT_MAX_SSTABLE_AGE_DAYS);
int64_t max_sstable_age_in_hours = std::lround(fractional_days * 24);
max_sstable_age = duration_conversor::convert(target_unit, std::chrono::hours(max_sstable_age_in_hours));
tmp_value = get_value(options, BASE_TIME_KEY);
auto base_time_seconds = property_definitions::to_long(BASE_TIME_KEY, tmp_value, DEFAULT_BASE_TIME_SECONDS);
base_time = duration_conversor::convert(target_unit, std::chrono::seconds(base_time_seconds));
}
date_tiered_compaction_strategy_options() {
auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24);
max_sstable_age = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::hours(max_sstable_age_in_hours)).count();
base_time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS)).count();
}
private:
static std::experimental::optional<sstring> get_value(const std::map<sstring, sstring>& options, const sstring& name) {
auto it = options.find(name);
if (it == options.end()) {
return std::experimental::nullopt;
}
return it->second;
}
friend class date_tiered_manifest;
};
class date_tiered_manifest {
static logging::logger logger;
// TODO: implement date_tiered_compaction_strategy_options.
db_clock::duration _max_sstable_age;
db_clock::duration _base_time;
date_tiered_compaction_strategy_options _options;
public:
date_tiered_manifest() = delete;
date_tiered_manifest(const std::map<sstring, sstring>& options) {
auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24);
_max_sstable_age = std::chrono::duration_cast<db_clock::duration>(std::chrono::hours(max_sstable_age_in_hours));
_base_time = std::chrono::duration_cast<db_clock::duration>(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS));
date_tiered_manifest(const std::map<sstring, sstring>& options)
: _options(options)
{
// FIXME: implement option to disable tombstone compaction.
#if 0
if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
@@ -119,8 +188,8 @@ public:
for (auto& entry : *cf.get_sstables()) {
sstables.push_back(entry);
}
auto candidates = filter_old_sstables(sstables, _max_sstable_age, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
auto candidates = filter_old_sstables(sstables, _options.max_sstable_age, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now);
for (auto& bucket : buckets) {
if (bucket.size() >= size_t(cf.schema()->min_compaction_threshold())) {
@@ -161,11 +230,11 @@ private:
get_compaction_candidates(column_family& cf, std::vector<sstables::shared_sstable> candidate_sstables, int64_t now, int base) {
int min_threshold = cf.schema()->min_compaction_threshold();
int max_threshold = cf.schema()->max_compaction_threshold();
auto candidates = filter_old_sstables(candidate_sstables, _max_sstable_age, now);
auto candidates = filter_old_sstables(candidate_sstables, _options.max_sstable_age, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _options.base_time, base, now);
return newest_bucket(buckets, min_threshold, max_threshold, now, _base_time);
return newest_bucket(buckets, min_threshold, max_threshold, now, _options.base_time);
}
/**
@@ -186,12 +255,11 @@ private:
* @return a list of sstables with the oldest sstables excluded
*/
static std::vector<sstables::shared_sstable>
filter_old_sstables(std::vector<sstables::shared_sstable> sstables, db_clock::duration max_sstable_age, int64_t now) {
int64_t max_sstable_age_count = std::chrono::duration_cast<std::chrono::microseconds>(max_sstable_age).count();
if (max_sstable_age_count == 0) {
filter_old_sstables(std::vector<sstables::shared_sstable> sstables, api::timestamp_type max_sstable_age, int64_t now) {
if (max_sstable_age == 0) {
return sstables;
}
int64_t cutoff = now - max_sstable_age_count;
int64_t cutoff = now - max_sstable_age;
sstables.erase(std::remove_if(sstables.begin(), sstables.end(), [cutoff] (auto& sst) {
return sst->get_stats_metadata().max_timestamp < cutoff;
@@ -275,14 +343,14 @@ private:
* Each bucket is also a list of files ordered from newest to oldest.
*/
std::vector<std::vector<sstables::shared_sstable>>
get_buckets(std::vector<std::pair<sstables::shared_sstable,int64_t>>&& files, db_clock::duration time_unit, int base, int64_t now) const {
get_buckets(std::vector<std::pair<sstables::shared_sstable,int64_t>>&& files, api::timestamp_type time_unit, int base, int64_t now) const {
// Sort files by age. Newest first.
std::sort(files.begin(), files.end(), [] (auto& i, auto& j) {
return i.second > j.second;
});
std::vector<std::vector<sstables::shared_sstable>> buckets;
auto target = get_initial_target(now, std::chrono::duration_cast<std::chrono::microseconds>(time_unit).count());
auto target = get_initial_target(now, time_unit);
auto it = files.begin();
while (it != files.end()) {
@@ -329,12 +397,12 @@ private:
*/
std::vector<sstables::shared_sstable>
newest_bucket(std::vector<std::vector<sstables::shared_sstable>>& buckets, int min_threshold, int max_threshold,
int64_t now, db_clock::duration base_time) {
int64_t now, api::timestamp_type base_time) {
// If the "incoming window" has at least minThreshold SSTables, choose that one.
// For any other bucket, at least 2 SSTables is enough.
// In any case, limit to maxThreshold SSTables.
target incoming_window = get_initial_target(now, std::chrono::duration_cast<std::chrono::microseconds>(base_time).count());
target incoming_window = get_initial_target(now, base_time);
for (auto& bucket : buckets) {
auto min_timestamp = bucket.front()->get_stats_metadata().min_timestamp;
if (bucket.size() >= size_t(min_threshold) ||

View File

@@ -555,8 +555,8 @@ public:
return *_pc;
}
bool is_mutation_end() const {
return _is_mutation_end;
bool get_and_reset_is_mutation_end() {
return std::exchange(_is_mutation_end, false);
}
stdx::optional<new_mutation> get_mutation() {
@@ -577,46 +577,72 @@ public:
};
class sstable_streamed_mutation : public streamed_mutation::impl {
const schema& _schema;
data_consume_context& _context;
mp_row_consumer& _consumer;
tombstone _t;
bool _finished = false;
range_tombstone_stream _range_tombstones;
mutation_fragment_opt _current_candidate;
mutation_fragment_opt _next_candidate;
stdx::optional<position_in_partition> _last_position;
position_in_partition::less_compare _cmp;
position_in_partition::equal_compare _eq;
private:
future<mutation_fragment_opt> read_next() {
future<stdx::optional<mutation_fragment_opt>> read_next() {
// Because of #1203 we may encounter sstables with range tombstones
placed earlier than expected.
if (_next_candidate) {
auto mf = _range_tombstones.get_next(*_next_candidate);
if (_next_candidate || (_current_candidate && _finished)) {
assert(_current_candidate);
auto mf = _range_tombstones.get_next(*_current_candidate);
if (!mf) {
mf = move_and_disengage(_next_candidate);
mf = move_and_disengage(_current_candidate);
_current_candidate = move_and_disengage(_next_candidate);
}
return make_ready_future<mutation_fragment_opt>(std::move(mf));
return make_ready_future<stdx::optional<mutation_fragment_opt>>(std::move(mf));
}
if (_finished) {
return make_ready_future<mutation_fragment_opt>(_range_tombstones.get_next());
// No need to update _last_position here. We've already read everything from the sstable.
return make_ready_future<stdx::optional<mutation_fragment_opt>>(_range_tombstones.get_next());
}
return _context.read().then([this] {
if (_consumer.is_mutation_end()) {
_finished = true;
}
_finished = _consumer.get_and_reset_is_mutation_end();
auto mf = _consumer.get_mutation_fragment();
if (mf && mf->is_range_tombstone()) {
_range_tombstones.apply(std::move(mf->as_range_tombstone()));
} else {
_next_candidate = std::move(mf);
if (mf) {
if (mf->is_range_tombstone()) {
// If the sstable uses a promoted index, it will repeat relevant range tombstones in
// each block. Do not emit these duplicates as they would break the guarantee
// that mutation fragments are produced in ascending order.
if (!_last_position || !_cmp(*mf, *_last_position)) {
_last_position = mf->position();
_range_tombstones.apply(std::move(mf->as_range_tombstone()));
}
} else {
// mp_row_consumer may produce a mutation_fragment in parts if it is
// interrupted by a range tombstone duplicate. Make sure the parts are
// merged before emitting them.
_last_position = mf->position();
if (!_current_candidate) {
_current_candidate = std::move(mf);
} else if (_current_candidate && _eq(*_current_candidate, *mf)) {
_current_candidate->apply(_schema, std::move(*mf));
} else {
_next_candidate = std::move(mf);
}
}
}
return read_next();
return stdx::optional<mutation_fragment_opt>();
});
}
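read_next() now returns a doubly-wrapped optional so it can be driven by repeat_until_value(): a disengaged outer optional means "no fragment decided yet, call again", while an engaged outer optional carries the final answer (which may itself be a disengaged mutation_fragment_opt, signalling end of stream). A hypothetical synchronous stand-in for the future-based Seastar driver:

```cpp
#include <cassert>
#include <optional>

// Sketch of repeat_until_value() semantics (synchronous; the real Seastar
// version is future-based): keep calling step() until its outer optional is
// engaged, then unwrap one level. A disengaged outer optional means
// "not decided yet, loop again".
template <typename Step>
auto repeat_until_value(Step step) {
    for (;;) {
        if (auto v = step()) {
            return *v; // may itself be a disengaged optional == end of stream
        }
    }
}
```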
public:
sstable_streamed_mutation(schema_ptr s, dht::decorated_key dk, data_consume_context& context, mp_row_consumer& consumer, tombstone t)
: streamed_mutation::impl(s, std::move(dk), t), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s) { }
: streamed_mutation::impl(s, std::move(dk), t), _schema(*s), _context(context), _consumer(consumer), _t(t), _range_tombstones(*s), _cmp(*s), _eq(*s) { }
virtual future<> fill_buffer() final override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return read_next().then([this] (mutation_fragment_opt&& mfopt) {
return repeat_until_value([this] {
return read_next();
}).then([this] (mutation_fragment_opt&& mfopt) {
if (!mfopt) {
_end_of_stream = true;
} else {
@@ -800,7 +826,7 @@ private:
return _context->read().then([this] {
auto mut = _consumer.get_mutation();
if (!mut) {
if (_consumer.get_mutation_fragment()) {
if (_consumer.get_mutation_fragment() || _consumer.get_and_reset_is_mutation_end()) {
// We are still in the middle of the previous mutation.
_consumer.skip_partition();
return do_read();

View File

@@ -116,6 +116,16 @@ std::ostream& operator<<(std::ostream& os, const streamed_mutation& sm) {
return os;
}
std::ostream& operator<<(std::ostream& os, mutation_fragment::kind k)
{
switch (k) {
case mutation_fragment::kind::static_row: return os << "static row";
case mutation_fragment::kind::clustering_row: return os << "clustering row";
case mutation_fragment::kind::range_tombstone: return os << "range tombstone";
}
abort();
}
streamed_mutation streamed_mutation_from_mutation(mutation m)
{
class reader final : public streamed_mutation::impl {

View File

@@ -249,6 +249,8 @@ public:
}
};
std::ostream& operator<<(std::ostream&, mutation_fragment::kind);
class position_in_partition {
int _bound_weight = 0;
stdx::optional<clustering_key_prefix> _ck;

View File

@@ -59,7 +59,7 @@ int main(int argc, char** argv) {
auto objects_in_batch = app.configuration()["batch"].as<unsigned>();
return seastar::async([obj_size, obj_count, objects_in_batch] {
std::deque<managed_bytes> refs;
chunked_fifo<managed_bytes> refs;
logalloc::region r;
with_allocator(r.allocator(), [&] {

View File

@@ -110,3 +110,55 @@ mutation_opt_assertions assert_that(streamed_mutation_opt smo) {
return { std::move(mo) };
}
class streamed_mutation_assertions {
streamed_mutation _sm;
clustering_key::equality _ck_eq;
public:
streamed_mutation_assertions(streamed_mutation sm)
: _sm(std::move(sm)), _ck_eq(*_sm.schema()) { }
streamed_mutation_assertions& produces_static_row() {
auto mfopt = _sm().get0();
if (!mfopt) {
BOOST_FAIL("Expected static row, got end of stream");
}
if (mfopt->mutation_fragment_kind() != mutation_fragment::kind::static_row) {
BOOST_FAIL(sprint("Expected static row, got: %s", mfopt->mutation_fragment_kind()));
}
return *this;
}
streamed_mutation_assertions& produces(mutation_fragment::kind k, std::vector<int> ck_elements) {
std::vector<bytes> ck_bytes;
for (auto&& e : ck_elements) {
ck_bytes.emplace_back(int32_type->decompose(e));
}
auto ck = clustering_key_prefix::from_exploded(*_sm.schema(), std::move(ck_bytes));
auto mfopt = _sm().get0();
if (!mfopt) {
BOOST_FAIL(sprint("Expected mutation fragment %s, got end of stream", ck));
}
if (mfopt->mutation_fragment_kind() != k) {
BOOST_FAIL(sprint("Expected mutation fragment kind %s, got: %s", k, mfopt->mutation_fragment_kind()));
}
if (!_ck_eq(mfopt->key(), ck)) {
BOOST_FAIL(sprint("Expected key %s, got: %s", ck, mfopt->key()));
}
return *this;
}
streamed_mutation_assertions& produces_end_of_stream() {
auto mfopt = _sm().get0();
if (mfopt) {
BOOST_FAIL(sprint("Expected end of stream, got: %s", mfopt->mutation_fragment_kind()));
}
return *this;
}
};
static inline streamed_mutation_assertions assert_that_stream(streamed_mutation sm)
{
return streamed_mutation_assertions(std::move(sm));
}

View File

@@ -112,6 +112,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying) {
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(m)
.produces_end_of_stream();
assert(tracker.uncached_wide_partitions() == 0);
});
}
@@ -140,6 +141,58 @@ SEASTAR_TEST_CASE(test_cache_works_after_clearing) {
});
}
SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_always_for_wide_partition_full_range) {
return seastar::async([] {
auto s = make_schema();
auto m = make_new_mutation(s);
int secondary_calls_count = 0;
cache_tracker tracker;
row_cache cache(s, mutation_source([&secondary_calls_count, &m] (schema_ptr s, const query::partition_range& range) {
++secondary_calls_count;
return make_reader_returning(m);
}), key_source([&m] (auto&&) {
return make_key_from_mutation_reader(make_reader_returning(m));
}), tracker, 0);
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(m)
.produces_end_of_stream();
assert(secondary_calls_count == 2);
assert(tracker.uncached_wide_partitions() == 1);
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(m)
.produces_end_of_stream();
assert(secondary_calls_count == 4);
assert(tracker.uncached_wide_partitions() == 2);
});
}
SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_always_for_wide_partition_single_partition) {
return seastar::async([] {
auto s = make_schema();
auto m = make_new_mutation(s);
int secondary_calls_count = 0;
cache_tracker tracker;
row_cache cache(s, mutation_source([&secondary_calls_count, &m] (schema_ptr s, const query::partition_range& range) {
++secondary_calls_count;
return make_reader_returning(m);
}), key_source([&m] (auto&&) {
return make_key_from_mutation_reader(make_reader_returning(m));
}), tracker, 0);
assert_that(cache.make_reader(s, query::partition_range::make_singular(query::ring_position(m.decorated_key()))))
.produces(m)
.produces_end_of_stream();
assert(secondary_calls_count == 2);
assert(tracker.uncached_wide_partitions() == 1);
assert_that(cache.make_reader(s, query::partition_range::make_singular(query::ring_position(m.decorated_key()))))
.produces(m)
.produces_end_of_stream();
assert(secondary_calls_count == 4);
assert(tracker.uncached_wide_partitions() == 2);
});
}
SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_full_range) {
return seastar::async([] {
auto s = make_schema();

View File

@@ -41,6 +41,7 @@
#include "range.hh"
#include "partition_slice_builder.hh"
#include "sstables/date_tiered_compaction_strategy.hh"
#include "mutation_assertions.hh"
#include <stdio.h>
#include <ftw.h>
@@ -2596,43 +2597,25 @@ SEASTAR_TEST_CASE(test_wrong_range_tombstone_order) {
auto smopt = reader().get0();
BOOST_REQUIRE(smopt);
auto& sm = *smopt;
using kind = mutation_fragment::kind;
auto then_expect = [&] (kind k, std::vector<int> ck_elems) {
std::vector<bytes> ck_bytes;
for (auto&& e : ck_elems) {
ck_bytes.emplace_back(int32_type->decompose(e));
}
auto ck = clustering_key_prefix::from_exploded(*s, std::move(ck_bytes));
auto mfopt = sm().get0();
BOOST_REQUIRE(mfopt);
if (mfopt->mutation_fragment_kind() != k) {
abort();
}
BOOST_REQUIRE(mfopt->mutation_fragment_kind() == k);
BOOST_REQUIRE(ck_eq(mfopt->key(), ck));
};
then_expect(kind::range_tombstone, { 0 });
then_expect(kind::clustering_row, { 1 });
then_expect(kind::clustering_row, { 1, 1 });
then_expect(kind::clustering_row, { 1, 2 });
then_expect(kind::clustering_row, { 1, 2, 3 });
then_expect(kind::range_tombstone, { 1, 3 });
then_expect(kind::clustering_row, { 1, 3 });
then_expect(kind::clustering_row, { 1, 3, 4 });
then_expect(kind::clustering_row, { 1, 4 });
then_expect(kind::clustering_row, { 1, 4, 0 });
then_expect(kind::range_tombstone, { 2 });
then_expect(kind::range_tombstone, { 2, 1 });
then_expect(kind::range_tombstone, { 2, 1 });
then_expect(kind::range_tombstone, { 2, 2 });
then_expect(kind::range_tombstone, { 2, 2 });
auto mfopt = sm().get0();
BOOST_REQUIRE(!mfopt);
assert_that_stream(std::move(*smopt))
.produces(kind::range_tombstone, { 0 })
.produces(kind::clustering_row, { 1 })
.produces(kind::clustering_row, { 1, 1 })
.produces(kind::clustering_row, { 1, 2 })
.produces(kind::clustering_row, { 1, 2, 3 })
.produces(kind::range_tombstone, { 1, 3 })
.produces(kind::clustering_row, { 1, 3 })
.produces(kind::clustering_row, { 1, 3, 4 })
.produces(kind::clustering_row, { 1, 4 })
.produces(kind::clustering_row, { 1, 4, 0 })
.produces(kind::range_tombstone, { 2 })
.produces(kind::range_tombstone, { 2, 1 })
.produces(kind::range_tombstone, { 2, 1 })
.produces(kind::range_tombstone, { 2, 2 })
.produces(kind::range_tombstone, { 2, 2 })
.produces_end_of_stream();
smopt = reader().get0();
BOOST_REQUIRE(!smopt);
@@ -2790,3 +2773,108 @@ SEASTAR_TEST_CASE(basic_date_tiered_strategy_test) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
compaction_manager cm;
column_family::config cfg;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
// deterministic timestamp for Fri, 01 Jan 2016 00:00:00 GMT.
auto tp = db_clock::from_time_t(1451606400);
int64_t timestamp = tp.time_since_epoch().count() * 1000; // in microseconds.
std::vector<sstables::shared_sstable> candidates;
int min_threshold = cf->schema()->min_compaction_threshold();
// add sstables that belong to the same time window until the min threshold is satisfied.
for (auto i = 1; i <= min_threshold; i++) {
auto sst = add_sstable_for_overlapping_test(cf, /*gen*/i, "a", "a",
build_stats(timestamp, timestamp, std::numeric_limits<int32_t>::max()));
candidates.push_back(sst);
}
// belongs to the time window
auto tp2 = tp + std::chrono::seconds(1800);
timestamp = tp2.time_since_epoch().count() * 1000;
auto sst = add_sstable_for_overlapping_test(cf, /*gen*/min_threshold + 1, "a", "a",
build_stats(timestamp, timestamp, std::numeric_limits<int32_t>::max()));
candidates.push_back(sst);
// doesn't belong to the time window above
auto tp3 = tp + std::chrono::seconds(4000);
timestamp = tp3.time_since_epoch().count() * 1000;
auto sst2 = add_sstable_for_overlapping_test(cf, /*gen*/min_threshold + 2, "a", "a",
build_stats(timestamp, timestamp, std::numeric_limits<int32_t>::max()));
candidates.push_back(sst2);
std::map<sstring, sstring> options;
// Use a 1-hour time window.
options.emplace(sstring("base_time_seconds"), sstring("3600"));
date_tiered_manifest manifest(options);
auto gc_before = gc_clock::time_point(std::chrono::seconds(0)); // disable gc before.
auto sstables = manifest.get_next_sstables(*cf, candidates, gc_before);
std::unordered_set<int64_t> gens;
for (auto sst : sstables) {
gens.insert(sst->generation());
}
BOOST_REQUIRE(sstables.size() == size_t(min_threshold + 1));
BOOST_REQUIRE(gens.count(min_threshold + 1));
BOOST_REQUIRE(!gens.count(min_threshold + 2));
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_promoted_index_read) {
// create table promoted_index_read (
// pk int,
// ck1 int,
// ck2 int,
// v int,
// primary key (pk, ck1, ck2)
// );
//
// column_index_size_in_kb: 0
//
// delete from promoted_index_read where pk = 0 and ck1 = 0;
// insert into promoted_index_read (pk, ck1, ck2, v) values (0, 0, 0, 0);
// insert into promoted_index_read (pk, ck1, ck2, v) values (0, 0, 1, 1);
//
// SSTable:
// [
// {"key": "0",
// "cells": [["0:_","0:!",1468923292708929,"t",1468923292],
// ["0:_","0:!",1468923292708929,"t",1468923292],
// ["0:0:","",1468923308379491],
// ["0:_","0:!",1468923292708929,"t",1468923292],
// ["0:0:v","0",1468923308379491],
// ["0:_","0:!",1468923292708929,"t",1468923292],
// ["0:1:","",1468923311744298],
// ["0:_","0:!",1468923292708929,"t",1468923292],
// ["0:1:v","1",1468923311744298]]}
// ]
return seastar::async([] {
auto s = schema_builder("ks", "promoted_index_read")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck1", int32_type, column_kind::clustering_key)
.with_column("ck2", int32_type, column_kind::clustering_key)
.with_column("v", int32_type)
.build();
auto sst = make_lw_shared<sstable>("ks", "promoted_index_read", "tests/sstables/promoted_index_read", 1, sstables::sstable::version_types::ka, big);
sst->load().get0();
auto rd = sstable_reader(sst, s);
auto smopt = rd().get0();
BOOST_REQUIRE(smopt);
using kind = mutation_fragment::kind;
assert_that_stream(std::move(*smopt))
.produces(kind::range_tombstone, { 0 })
.produces(kind::clustering_row, { 0, 0 })
.produces(kind::clustering_row, { 0, 1 })
.produces_end_of_stream();
});
}

View File

@@ -0,0 +1 @@
1158289805

View File

@@ -0,0 +1,8 @@
Data.db
TOC.txt
Digest.sha1
Filter.db
Summary.db
Statistics.db
Index.db
CompressionInfo.db

View File

@@ -51,6 +51,8 @@
#include <boost/range/adaptor/reversed.hpp>
#include <boost/range/adaptor/indirected.hpp>
#include "query-result-reader.hh"
#include "thrift/server.hh"
#include "db/size_estimates_recorder.hh"
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
@@ -92,8 +94,10 @@ public:
throw make_exception<InvalidRequestException>(ae.what());
} catch (exceptions::configuration_exception& ce) {
throw make_exception<InvalidRequestException>(ce.what());
} catch (no_such_column_family&) {
throw NotFoundException();
} catch (exceptions::invalid_request_exception& ire) {
throw make_exception<InvalidRequestException>(ire.what());
} catch (no_such_column_family& nocf) {
throw make_exception<InvalidRequestException>(nocf.what());
} catch (no_such_keyspace&) {
throw NotFoundException();
} catch (exceptions::syntax_exception& se) {
@@ -130,23 +134,6 @@ with_cob(tcxx::function<void (const T& ret)>&& cob,
});
}
template <typename Func, typename T>
void
with_cob_dereference(tcxx::function<void (const T& ret)>&& cob,
tcxx::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob,
Func&& func) {
using ptr_type = foreign_ptr<lw_shared_ptr<T>>;
// then_wrapped() terminates the fiber by calling one of the cob objects
futurize<ptr_type>::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<ptr_type> f) {
try {
cob(*f.get0());
} catch (...) {
delayed_exception_wrapper dew(std::current_exception());
exn_cob(&dew);
}
});
}
template <typename Func>
void
with_cob(tcxx::function<void ()>&& cob,
@@ -274,7 +261,7 @@ public:
return query::result_view::do_with(*result, [schema, cmd, cell_limit](query::result_view v) {
column_aggregator aggregator(*schema, cmd->slice, cell_limit);
v.consume(cmd->slice, aggregator);
return aggregator.release();
return aggregator.release_as_map();
});
});
});
@@ -300,7 +287,7 @@ public:
return query::result_view::do_with(*result, [schema, cmd, cell_limit](query::result_view v) {
column_counter counter(*schema, cmd->slice, cell_limit);
v.consume(cmd->slice, counter);
return counter.release();
return counter.release_as_map();
});
});
});
@@ -637,7 +624,7 @@ public:
}
void describe_version(tcxx::function<void(std::string const& _return)> cob) {
cob("20.1.0");
cob(org::apache::cassandra::thrift_version);
}
void do_describe_ring(tcxx::function<void(std::vector<TokenRange> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace, bool local) {
@@ -707,13 +694,15 @@ public:
}
void describe_splits(tcxx::function<void(std::vector<std::string> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
// FIXME: Maybe implement.
// Origin's thrift interface has this to say about the verb:
// "experimental API for hadoop/parallel query support. may change violently and without warning.".
// Some drivers have moved away from depending on this verb (SPARKC-94). The correct way to implement
// this, as well as describe_splits_ex, is to use the size_estimates system table (CASSANDRA-7688).
// However, we currently don't populate that table, which is done by SizeEstimatesRecorder.java in Origin.
return pass_unimplemented(exn_cob);
return describe_splits_ex([cob = std::move(cob)](auto&& results) {
std::vector<std::string> res;
res.reserve(results.size() + 1);
res.emplace_back(results[0].start_token);
for (auto&& s : results) {
res.emplace_back(std::move(s.end_token));
}
return cob(std::move(res));
}, exn_cob, cfName, start_token, end_token, keys_per_split);
}
void trace_next_query(tcxx::function<void(std::string const& _return)> cob) {
@@ -723,8 +712,40 @@ public:
}
void describe_splits_ex(tcxx::function<void(std::vector<CfSplit> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
// FIXME: To implement. See describe_splits.
return pass_unimplemented(exn_cob);
with_cob(std::move(cob), std::move(exn_cob), [&]{
auto tstart = start_token.empty() ? dht::minimum_token() : dht::global_partitioner().from_sstring(sstring(start_token));
auto tend = end_token.empty() ? dht::maximum_token() : dht::global_partitioner().from_sstring(sstring(end_token));
return db::get_local_size_estimates_recorder().record_size_estimates()
.then([this, keys_per_split, cf = sstring(cfName), tstart = std::move(tstart), tend = std::move(tend)] {
return db::system_keyspace::query_size_estimates(current_keyspace(), std::move(cf), std::move(tstart), std::move(tend))
.then([keys_per_split](auto&& estimates) {
std::vector<CfSplit> splits;
if (estimates.empty()) {
return splits;
}
auto&& acc = estimates[0];
auto emplace_acc = [&] {
splits.emplace_back();
auto start_token = dht::global_partitioner().to_sstring(acc.range_start_token);
auto end_token = dht::global_partitioner().to_sstring(acc.range_end_token);
splits.back().__set_start_token(bytes_to_string(to_bytes_view(start_token)));
splits.back().__set_end_token(bytes_to_string(to_bytes_view(end_token)));
splits.back().__set_row_count(acc.partitions_count);
};
for (auto&& e : estimates | boost::adaptors::sliced(1, estimates.size())) {
if (acc.partitions_count + e.partitions_count > keys_per_split) {
emplace_acc();
acc = std::move(e);
} else {
acc.range_end_token = std::move(e.range_end_token);
acc.partitions_count += e.partitions_count;
}
}
emplace_acc();
return splits;
});
});
});
}
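The accumulation loop in describe_splits_ex above merges adjacent size estimates until the running partition count would exceed keys_per_split, then emits one split covering the merged token range. A synchronous sketch with simplified types (estimate and coalesce are illustrative names, not the Scylla API):

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Simplified size estimate: a token range with a partition count.
struct estimate { std::string start, end; int64_t partitions; };

// Greedily merge adjacent estimates; emit a split whenever adding the next
// estimate would push the accumulated count over keys_per_split.
std::vector<estimate> coalesce(std::vector<estimate> est, int64_t keys_per_split) {
    std::vector<estimate> splits;
    if (est.empty()) {
        return splits;
    }
    estimate acc = est[0];
    for (size_t i = 1; i < est.size(); ++i) {
        if (acc.partitions + est[i].partitions > keys_per_split) {
            splits.push_back(acc);
            acc = est[i];
        } else {
            acc.end = est[i].end;
            acc.partitions += est[i].partitions;
        }
    }
    splits.push_back(acc);
    return splits;
}
```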
void system_add_column_family(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const CfDef& cf_def) {
@@ -810,6 +831,9 @@ public:
}
auto s = schema_from_thrift(cf_def, cf_def.keyspace, schema->id());
if (schema->thrift().is_dynamic() && s->regular_columns_count() > 1) {
fail(unimplemented::cause::MIXED_CF);
}
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::ALTER).then([this, s = std::move(s)] {
return service::get_local_migration_manager().announce_column_family_update(std::move(s), true, false).then([this] {
return std::string(_db.local().get_version().to_sstring());
@@ -1041,7 +1065,7 @@ private:
}
cf_def.__set_comment(s->comment());
cf_def.__set_read_repair_chance(s->read_repair_chance());
if (s->regular_columns_count()) {
if (!s->thrift().is_dynamic()) {
std::vector<ColumnDef> columns;
for (auto&& c : s->regular_columns()) {
ColumnDef c_def;
@@ -1104,7 +1128,7 @@ private:
builder.with_column(to_bytes(cf_def.key_alias), std::move(pk_types.back()), column_kind::partition_key);
} else {
for (uint32_t i = 0; i < pk_types.size(); ++i) {
builder.with_column(to_bytes("key" + (i + 1)), std::move(pk_types[i]), column_kind::partition_key);
builder.with_column(to_bytes(sprint("key%d", i + 1)), std::move(pk_types[i]), column_kind::partition_key);
}
}
} else {
@@ -1120,7 +1144,7 @@ private:
auto ck_types = std::move(p.first);
builder.set_is_compound(p.second);
for (uint32_t i = 0; i < ck_types.size(); ++i) {
builder.with_column(to_bytes("column" + (i + 1)), std::move(ck_types[i]), column_kind::clustering_key);
builder.with_column(to_bytes(sprint("column%d", i + 1)), std::move(ck_types[i]), column_kind::clustering_key);
}
auto&& vtype = cf_def.__isset.default_validation_class
? db::marshal::type_parser::parse(to_sstring(cf_def.default_validation_class))
@@ -1205,18 +1229,11 @@ private:
ks_def.durable_writes,
std::move(cf_defs));
}
static column_family& lookup_column_family(database& db, const sstring& ks_name, const sstring& cf_name) {
static schema_ptr lookup_schema(database& db, const sstring& ks_name, const sstring& cf_name) {
if (ks_name.empty()) {
throw make_exception<InvalidRequestException>("keyspace not set");
}
try {
return db.find_column_family(ks_name, cf_name);
} catch (no_such_column_family&) {
throw make_exception<InvalidRequestException>("column family %s not found", cf_name);
}
}
static schema_ptr lookup_schema(database& db, const sstring& ks_name, const sstring& cf_name) {
return lookup_column_family(db, ks_name, cf_name).schema();
return db.find_schema(ks_name, cf_name);
}
static partition_key key_from_thrift(const schema& s, bytes_view k) {
thrift_validation::validate_key(s, k);
@@ -1433,17 +1450,23 @@ private:
const schema& _s;
const query::partition_slice& _slice;
uint32_t _cell_limit;
std::map<std::string, typename Aggregator::type> _aggregation;
std::vector<std::pair<std::string, typename Aggregator::type>> _aggregation;
typename Aggregator::type* _current_aggregation;
public:
column_visitor(const schema& s, const query::partition_slice& slice, uint32_t cell_limit)
: _s(s), _slice(slice), _cell_limit(cell_limit)
{ }
std::map<std::string, typename Aggregator::type>&& release() {
std::vector<std::pair<std::string, typename Aggregator::type>>&& release() {
return std::move(_aggregation);
}
std::map<std::string, typename Aggregator::type> release_as_map() {
return std::map<std::string, typename Aggregator::type>(
boost::make_move_iterator(_aggregation.begin()),
boost::make_move_iterator(_aggregation.end()));
}
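release_as_map() above converts the order-preserving vector of pairs into a std::map only on demand, moving each entry rather than copying. A sketch with simplified types, using std::make_move_iterator where the diff uses the boost equivalent:

```cpp
#include <cassert>
#include <iterator>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-in for column_visitor::release_as_map(): the scan
// accumulates into a vector<pair> (cheap appends, preserves partition
// order) and the map is built once at the end, with move iterators so
// keys and values are moved, not copied. The real Aggregator::type varies.
std::map<std::string, int> release_as_map(std::vector<std::pair<std::string, int>>&& agg) {
    return std::map<std::string, int>(
        std::make_move_iterator(agg.begin()),
        std::make_move_iterator(agg.end()));
}
```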
void accept_new_partition(const partition_key& key, uint32_t row_count) {
_current_aggregation = &_aggregation[partition_key_to_string(_s, key)];
_aggregation.emplace_back(partition_key_to_string(_s, key), typename Aggregator::type());
_current_aggregation = &_aggregation.back().second;
}
void accept_new_partition(uint32_t row_count) {
// We always ask for the partition_key to be sent in query_opts().
@@ -1658,7 +1681,7 @@ private:
}
add_live_cell(s, col, *def, clustering_key_prefix::make_empty(s), m_to_apply);
} else {
throw make_exception<InvalidRequestException>("No such column %s", col.name);
fail(unimplemented::cause::MIXED_CF);
}
}
}

View File

@@ -73,7 +73,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
"parameters map<text, text>,"
"request text,"
"started_at timestamp,"
"PRIMARY KEY ((session_id)))", KEYSPACE_NAME, SESSIONS);
"PRIMARY KEY ((session_id))) "
"WITH default_time_to_live = 86400", KEYSPACE_NAME, SESSIONS);
_events_create_cql = sprint("CREATE TABLE %s.%s ("
"session_id uuid,"
@@ -82,7 +83,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
"source inet,"
"source_elapsed int,"
"thread text,"
"PRIMARY KEY ((session_id), event_id))", KEYSPACE_NAME, EVENTS);
"PRIMARY KEY ((session_id), event_id)) "
"WITH default_time_to_live = 86400", KEYSPACE_NAME, EVENTS);
}
future<> trace_keyspace_helper::setup_table(const sstring& name, const sstring& cql) const {

View File

@@ -803,7 +803,7 @@ future<response_type> cql_server::connection::process_prepare(uint16_t stream, b
tracing::trace(cs.get_trace_state(), "Done preparing on remote shards");
return _server._query_processor.local().prepare(query, cs, false).then([this, stream, &cs] (auto msg) {
tracing::trace(cs.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
return messages::result_message::prepared::cql::get_id(std::move(msg));
return messages::result_message::prepared::cql::get_id(msg);
}));
return this->make_result(stream, msg);
});

View File

@@ -59,6 +59,7 @@ std::ostream& operator<<(std::ostream& out, cause c) {
case cause::STORAGE_SERVICE: return out << "STORAGE_SERVICE";
case cause::API: return out << "API";
case cause::SCHEMA_CHANGE: return out << "SCHEMA_CHANGE";
case cause::MIXED_CF: return out << "MIXED_CF";
}
assert(0);
}

View File

@@ -55,6 +55,7 @@ enum class cause {
WRAP_AROUND, // Support for handling wrap around ranges in queries on database level and below
STORAGE_SERVICE,
SCHEMA_CHANGE,
MIXED_CF,
};
void fail(cause what) __attribute__((noreturn));

View File

@@ -55,7 +55,7 @@ static thread_local auto reusable_indexes = std::vector<long>();
void bloom_filter::set_indexes(int64_t base, int64_t inc, int count, long max, std::vector<long>& results) {
for (int i = 0; i < count; i++) {
results[i] = abs(base % max);
results[i] = std::abs(base % max);
base = static_cast<int64_t>(static_cast<uint64_t>(base) + static_cast<uint64_t>(inc));
}
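The last hunk swaps abs() for std::abs(). The C library's abs() takes an int, so passing the 64-bit `base % max` silently narrows it; std::abs() has long and long long overloads that keep the full width. A small sketch of the fixed computation (index_for is an illustrative name, not the bloom_filter API):

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

// Compute a bucket index from a 64-bit hash value. std::abs resolves to
// the wide overload, so no bits of the 64-bit remainder are discarded;
// plain ::abs(int) would silently narrow the argument first.
int64_t index_for(int64_t base, int64_t max) {
    return std::abs(base % max);
}
```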
}