select count(*) queries' from Michał Sala
This pull request speeds up the execution of `count(*)` queries by splitting a given query into sub-queries and distributing them across a group of nodes for parallel execution.
A new level of coordination was added. A node called the super-coordinator splits an aggregation query into sub-queries and distributes them across a group of coordinators. The super-coordinator is also responsible for merging the results.
To speed up `count(*)` queries, we first need to detect which queries have a `count(*)` selector. Because this pull request is a proof of concept, the detection is rather crude: it only catches the simplest cases of `count(*)` queries (with only one selector and no column name specified).
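As an illustration of the "one selector, no column name" rule, here is a minimal Python sketch. It is not the code from this pull request: the `Selector` type and the `is_simple_count_star` helper are hypothetical names invented for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Selector:
    # Hypothetical stand-in for one entry of a SELECT clause.
    function: Optional[str]  # e.g. "count", or None for a plain column
    argument: Optional[str]  # a column name, or "*" for count(*)


def is_simple_count_star(selectors: list) -> bool:
    """Return True only for the simplest case: exactly one selector, count(*)."""
    if len(selectors) != 1:
        return False
    sel = selectors[0]
    # count(v) names a column, so it is deliberately not matched here.
    return sel.function == "count" and sel.argument == "*"
```

Under this rule `select count(*) from t` would be detected, while `select count(v) from t` and `select count(*), pk from t` would fall back to the regular path.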
After a query is detected to be a `count(*)`, it should be split into sub-queries and sent to other coordinators. The splitting part wasn't that difficult; it was achieved by limiting the original query's partition ranges. Sending the modified query to another node was much harder. The easiest scenario would be to send the whole `cql3::statements::select_statement`. Unfortunately, `cql3::statements::select_statement` can't be [de]serialized, so sending it was out of the question. Even more unfortunately, some non-[de]serializable members of `cql3::statements::select_statement` are required to start the execution of this statement. Finally, I decided to send a `query::read_command` paired with the required [de]serializable members. Objects that cannot be [de]serialized (such as the query's selector) are mocked on the receiving end.
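The idea of sending only serializable members and rebuilding the rest on the receiver can be sketched as follows. This is a Python illustration under loud assumptions: the `ForwardRequest` fields, the JSON encoding, and `MockedCountSelector` are all hypothetical and do not reflect the actual wire format or classes used by the pull request.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ForwardRequest:
    # Hypothetical payload: only plain, serializable fields travel on the wire.
    keyspace: str
    table: str
    partition_ranges: list  # list of [start, end] pairs
    timeout_ms: int


def serialize(req: ForwardRequest) -> bytes:
    return json.dumps(asdict(req)).encode()


def deserialize(payload: bytes) -> ForwardRequest:
    return ForwardRequest(**json.loads(payload))


class MockedCountSelector:
    """Rebuilt locally on the receiver instead of being sent over the network."""

    def __init__(self):
        self.count = 0

    def add_row(self, row):
        self.count += 1

    def result(self):
        return self.count


def execute_on_receiver(payload: bytes, rows_in_range) -> int:
    """Decode the request, mock the selector, and count rows in each range.

    `rows_in_range(start, end)` is an assumed callback yielding local rows.
    """
    req = deserialize(payload)
    selector = MockedCountSelector()
    for start, end in req.partition_ranges:
        for row in rows_in_range(start, end):
            selector.add_row(row)
    return selector.result()
```

Because the query is known to be a plain `count(*)`, the receiver needs no information about the selector beyond that fact, which is why mocking it is sufficient in this sketch.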
When a super-coordinator receives a `count(*)` query, it splits it into sub-queries. It does so by splitting the original query's partition ranges into a list of vnodes, grouping them by owner, and creating sub-queries whose partition ranges are set to the successive groups. After creation, each sub-query is sent to the owner of its partition ranges. The owner dispatches the received sub-query to all of its shards. Each shard slices the partition ranges of the received sub-query so that it queries only the data it owns. Each shard then becomes a coordinator and executes the sub-query prepared this way.
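The split, dispatch, and merge steps can be sketched in a few lines of Python. This is a simplified illustration, not the implementation: vnodes are given as `((start, end), owner)` pairs, each owner's data is a plain list of tokens, a thread pool stands in for sending requests to other nodes, and per-shard slicing is omitted. All function names are hypothetical.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def group_by_owner(vnodes):
    """Group vnode token ranges by the node that owns them."""
    groups = defaultdict(list)
    for token_range, owner in vnodes:
        groups[owner].append(token_range)
    return dict(groups)


def count_rows(tokens, ranges):
    """Count tokens that fall into any of the (start, end] ranges."""
    return sum(1 for t in tokens for (start, end) in ranges if start < t <= end)


def parallel_count(vnodes, data_by_owner):
    """Run one sub-query per owner in parallel and merge the partial counts."""
    sub_queries = group_by_owner(vnodes)
    with ThreadPoolExecutor() as pool:
        futures = {
            owner: pool.submit(count_rows, data_by_owner[owner], ranges)
            for owner, ranges in sub_queries.items()
        }
        partial = {owner: f.result() for owner, f in futures.items()}
    # The super-coordinator's merge step for count(*) is a plain sum.
    return sum(partial.values()), partial
```

For example, with vnodes `[((0, 10), "A"), ((10, 20), "B"), ((20, 30), "A")]`, owner `A` receives a single sub-query covering two ranges and owner `B` a sub-query covering one.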
A 3-node cluster was set up on powerful desktops located in the office (3x32 cores).
The cluster was filled with ~2 * 10^8 rows using scylla-bench, and the following command was run:
```
time cqlsh <ip> <port> --request-timeout=3600 -e "select count(*) from scylla_bench.test using timeout 1h;"
```
* master: 68s
* this branch: 2s
Example trace from a 3-node cluster (each node had 2 shards, `murmur3_ignore_msb_bits` was set to 1, `num_tokens` was set to 3):
```
> cqlsh -e 'tracing on; select count(*) from ks.t;'
Now Tracing is enabled
count
-------
1000
(1 rows)
Tracing session: e5852020-7fc3-11ec-8600-4c4c210dd657
activity | timestamp | source | source_elapsed | client
---------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
Execute CQL3 query | 2022-01-27 22:53:08.770000 | 127.0.0.1 | 0 | 127.0.0.1
Parsing a statement [shard 1] | 2022-01-27 22:53:08.770451 | 127.0.0.1 | -- | 127.0.0.1
Processing a statement [shard 1] | 2022-01-27 22:53:08.770487 | 127.0.0.1 | 36 | 127.0.0.1
Dispatching forward_request to 3 endpoints [shard 1] | 2022-01-27 22:53:08.770509 | 127.0.0.1 | 58 | 127.0.0.1
Sending forward_request to 127.0.0.1:0 [shard 1] | 2022-01-27 22:53:08.770516 | 127.0.0.1 | 64 | 127.0.0.1
Executing forward_request [shard 1] | 2022-01-27 22:53:08.770519 | 127.0.0.1 | -- | 127.0.0.1
read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770528 | 127.0.0.1 | 9 | 127.0.0.1
Start querying token range ({-4242912715832118944, end}, {-4075408479358018994, end}] [shard 1] | 2022-01-27 22:53:08.770531 | 127.0.0.1 | 12 | 127.0.0.1
Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770537 | 127.0.0.1 | 18 | 127.0.0.1
Scanning cache for range ({-4242912715832118944, end}, {-4075408479358018994, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770541 | 127.0.0.1 | 22 | 127.0.0.1
Page stats: 12 partition(s), 0 static row(s) (0 live, 0 dead), 12 clustering row(s) (12 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.770589 | 127.0.0.1 | 70 | 127.0.0.1
Sending forward_request to 127.0.0.2:0 [shard 1] | 2022-01-27 22:53:08.770600 | 127.0.0.1 | 149 | 127.0.0.1
Sending forward_request to 127.0.0.3:0 [shard 1] | 2022-01-27 22:53:08.770608 | 127.0.0.1 | 157 | 127.0.0.1
Executing forward_request [shard 0] | 2022-01-27 22:53:08.770627 | 127.0.0.1 | -- | 127.0.0.1
read_data: querying locally [shard 0] | 2022-01-27 22:53:08.770639 | 127.0.0.1 | 11 | 127.0.0.1
Start querying token range ({2507462623645193091, end}, {3897266736829642805, end}] [shard 0] | 2022-01-27 22:53:08.770643 | 127.0.0.1 | 15 | 127.0.0.1
Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.770646 | 127.0.0.1 | 19 | 127.0.0.1
Scanning cache for range ({2507462623645193091, end}, {3897266736829642805, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.770649 | 127.0.0.1 | 22 | 127.0.0.1
Executing forward_request [shard 1] | 2022-01-27 22:53:08.770658 | 127.0.0.2 | -- | 127.0.0.1
Executing forward_request [shard 1] | 2022-01-27 22:53:08.770674 | 127.0.0.3 | 5 | 127.0.0.1
read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770698 | 127.0.0.2 | 40 | 127.0.0.1
Start querying token range [{4611686018427387904, start}, {5592106830937975806, end}] [shard 1] | 2022-01-27 22:53:08.770704 | 127.0.0.2 | 46 | 127.0.0.1
Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770710 | 127.0.0.2 | 52 | 127.0.0.1
read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770712 | 127.0.0.3 | 43 | 127.0.0.1
Scanning cache for range [{4611686018427387904, start}, {5592106830937975806, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770714 | 127.0.0.2 | 56 | 127.0.0.1
Start querying token range [{-4611686018427387904, start}, {-4242912715832118944, end}] [shard 1] | 2022-01-27 22:53:08.770718 | 127.0.0.3 | 49 | 127.0.0.1
Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770739 | 127.0.0.3 | 70 | 127.0.0.1
Scanning cache for range [{-4611686018427387904, start}, {-4242912715832118944, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770743 | 127.0.0.3 | 73 | 127.0.0.1
Page stats: 17 partition(s), 0 static row(s) (0 live, 0 dead), 17 clustering row(s) (17 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.770814 | 127.0.0.3 | 145 | 127.0.0.1
Executing forward_request [shard 0] | 2022-01-27 22:53:08.770846 | 127.0.0.3 | -- | 127.0.0.1
read_data: querying locally [shard 0] | 2022-01-27 22:53:08.770862 | 127.0.0.3 | 16 | 127.0.0.1
Page stats: 71 partition(s), 0 static row(s) (0 live, 0 dead), 71 clustering row(s) (71 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.770865 | 127.0.0.1 | 238 | 127.0.0.1
Start querying token range ({-6683686776653114062, end}, {-6473446911791631266, end}] [shard 0] | 2022-01-27 22:53:08.770867 | 127.0.0.3 | 21 | 127.0.0.1
Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.770874 | 127.0.0.3 | 28 | 127.0.0.1
Scanning cache for range ({-6683686776653114062, end}, {-6473446911791631266, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.770879 | 127.0.0.3 | 33 | 127.0.0.1
Page stats: 48 partition(s), 0 static row(s) (0 live, 0 dead), 48 clustering row(s) (48 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.770880 | 127.0.0.2 | 222 | 127.0.0.1
Querying is done [shard 1] | 2022-01-27 22:53:08.770888 | 127.0.0.1 | 369 | 127.0.0.1
read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770909 | 127.0.0.1 | 390 | 127.0.0.1
Start querying token range ({-4075408479358018994, end}, {-3391415989210253693, end}] [shard 1] | 2022-01-27 22:53:08.770911 | 127.0.0.1 | 392 | 127.0.0.1
Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770914 | 127.0.0.1 | 395 | 127.0.0.1
Scanning cache for range ({-4075408479358018994, end}, {-3391415989210253693, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770936 | 127.0.0.1 | 418 | 127.0.0.1
Executing forward_request [shard 0] | 2022-01-27 22:53:08.770951 | 127.0.0.2 | -- | 127.0.0.1
read_data: querying locally [shard 0] | 2022-01-27 22:53:08.770966 | 127.0.0.2 | 15 | 127.0.0.1
Page stats: 12 partition(s), 0 static row(s) (0 live, 0 dead), 12 clustering row(s) (12 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.770969 | 127.0.0.3 | 123 | 127.0.0.1
Start querying token range (-inf, {-6683686776653114062, end}] [shard 0] | 2022-01-27 22:53:08.770969 | 127.0.0.2 | 18 | 127.0.0.1
Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.770974 | 127.0.0.2 | 23 | 127.0.0.1
Scanning cache for range (-inf, {-6683686776653114062, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.770977 | 127.0.0.2 | 26 | 127.0.0.1
Querying is done [shard 1] | 2022-01-27 22:53:08.770993 | 127.0.0.3 | 324 | 127.0.0.1
read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770998 | 127.0.0.3 | 329 | 127.0.0.1
Start querying token range ({-3391415989210253693, end}, {0, start}) [shard 1] | 2022-01-27 22:53:08.771001 | 127.0.0.3 | 332 | 127.0.0.1
Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.771004 | 127.0.0.3 | 335 | 127.0.0.1
Scanning cache for range ({-3391415989210253693, end}, {0, start}) and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.771007 | 127.0.0.3 | 338 | 127.0.0.1
Page stats: 48 partition(s), 0 static row(s) (0 live, 0 dead), 48 clustering row(s) (48 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.771044 | 127.0.0.1 | 525 | 127.0.0.1
Querying is done [shard 0] | 2022-01-27 22:53:08.771069 | 127.0.0.1 | 442 | 127.0.0.1
On shard execution result is [71] [shard 0] | 2022-01-27 22:53:08.771145 | 127.0.0.1 | 518 | 127.0.0.1
Querying is done [shard 1] | 2022-01-27 22:53:08.771308 | 127.0.0.1 | 789 | 127.0.0.1
On shard execution result is [60] [shard 1] | 2022-01-27 22:53:08.771351 | 127.0.0.1 | 832 | 127.0.0.1
Page stats: 127 partition(s), 0 static row(s) (0 live, 0 dead), 127 clustering row(s) (127 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.771379 | 127.0.0.2 | 427 | 127.0.0.1
Page stats: 183 partition(s), 0 static row(s) (0 live, 0 dead), 183 clustering row(s) (183 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.771385 | 127.0.0.3 | 716 | 127.0.0.1
Querying is done [shard 0] | 2022-01-27 22:53:08.771402 | 127.0.0.3 | 556 | 127.0.0.1
Querying is done [shard 1] | 2022-01-27 22:53:08.771403 | 127.0.0.2 | 745 | 127.0.0.1
read_data: querying locally [shard 1] | 2022-01-27 22:53:08.771408 | 127.0.0.2 | 750 | 127.0.0.1
read_data: querying locally [shard 0] | 2022-01-27 22:53:08.771409 | 127.0.0.3 | 563 | 127.0.0.1
Start querying token range ({5592106830937975806, end}, +inf) [shard 1] | 2022-01-27 22:53:08.771411 | 127.0.0.2 | 754 | 127.0.0.1
Start querying token range ({-6272011798787969456, end}, {-4611686018427387904, start}) [shard 0] | 2022-01-27 22:53:08.771412 | 127.0.0.3 | 566 | 127.0.0.1
Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.771415 | 127.0.0.3 | 569 | 127.0.0.1
Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.771415 | 127.0.0.2 | 757 | 127.0.0.1
Scanning cache for range ({5592106830937975806, end}, +inf) and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.771419 | 127.0.0.2 | 761 | 127.0.0.1
Scanning cache for range ({-6272011798787969456, end}, {-4611686018427387904, start}) and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.771419 | 127.0.0.3 | 573 | 127.0.0.1
Received forward_result=[131] from 127.0.0.1:0 [shard 1] | 2022-01-27 22:53:08.771454 | 127.0.0.1 | 1003 | 127.0.0.1
Page stats: 74 partition(s), 0 static row(s) (0 live, 0 dead), 74 clustering row(s) (74 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.771764 | 127.0.0.3 | 918 | 127.0.0.1
read_data: querying locally [shard 0] | 2022-01-27 22:53:08.771768 | 127.0.0.3 | 922 | 127.0.0.1
Start querying token range [{0, start}, {2507462623645193091, end}] [shard 0] | 2022-01-27 22:53:08.771771 | 127.0.0.3 | 925 | 127.0.0.1
Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.771775 | 127.0.0.3 | 929 | 127.0.0.1
Scanning cache for range [{0, start}, {2507462623645193091, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.771779 | 127.0.0.3 | 933 | 127.0.0.1
Querying is done [shard 1] | 2022-01-27 22:53:08.771935 | 127.0.0.3 | 1265 | 127.0.0.1
Querying is done [shard 0] | 2022-01-27 22:53:08.771950 | 127.0.0.2 | 998 | 127.0.0.1
read_data: querying locally [shard 0] | 2022-01-27 22:53:08.771956 | 127.0.0.2 | 1004 | 127.0.0.1
Start querying token range ({-6473446911791631266, end}, {-6272011798787969456, end}] [shard 0] | 2022-01-27 22:53:08.771959 | 127.0.0.2 | 1008 | 127.0.0.1
Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.771963 | 127.0.0.2 | 1011 | 127.0.0.1
Scanning cache for range ({-6473446911791631266, end}, {-6272011798787969456, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.771966 | 127.0.0.2 | 1014 | 127.0.0.1
Page stats: 13 partition(s), 0 static row(s) (0 live, 0 dead), 13 clustering row(s) (13 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.772008 | 127.0.0.2 | 1057 | 127.0.0.1
read_data: querying locally [shard 0] | 2022-01-27 22:53:08.772012 | 127.0.0.2 | 1061 | 127.0.0.1
Start querying token range ({3897266736829642805, end}, {4611686018427387904, start}) [shard 0] | 2022-01-27 22:53:08.772014 | 127.0.0.2 | 1063 | 127.0.0.1
Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.772016 | 127.0.0.2 | 1065 | 127.0.0.1
Scanning cache for range ({3897266736829642805, end}, {4611686018427387904, start}) and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.772019 | 127.0.0.2 | 1067 | 127.0.0.1
On shard execution result is [200] [shard 1] | 2022-01-27 22:53:08.772053 | 127.0.0.3 | 1384 | 127.0.0.1
Page stats: 56 partition(s), 0 static row(s) (0 live, 0 dead), 56 clustering row(s) (56 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.772138 | 127.0.0.2 | 1186 | 127.0.0.1
Page stats: 190 partition(s), 0 static row(s) (0 live, 0 dead), 190 clustering row(s) (190 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.772364 | 127.0.0.2 | 1706 | 127.0.0.1
Page stats: 149 partition(s), 0 static row(s) (0 live, 0 dead), 149 clustering row(s) (149 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.772407 | 127.0.0.3 | 1561 | 127.0.0.1
Querying is done [shard 0] | 2022-01-27 22:53:08.772417 | 127.0.0.3 | 1571 | 127.0.0.1
Querying is done [shard 1] | 2022-01-27 22:53:08.772418 | 127.0.0.2 | 1760 | 127.0.0.1
Querying is done [shard 0] | 2022-01-27 22:53:08.772426 | 127.0.0.2 | 1475 | 127.0.0.1
Querying is done [shard 0] | 2022-01-27 22:53:08.772428 | 127.0.0.2 | 1476 | 127.0.0.1
Querying is done [shard 0] | 2022-01-27 22:53:08.772449 | 127.0.0.3 | 1604 | 127.0.0.1
On shard execution result is [196] [shard 0] | 2022-01-27 22:53:08.772555 | 127.0.0.2 | 1603 | 127.0.0.1
On shard execution result is [238] [shard 1] | 2022-01-27 22:53:08.772674 | 127.0.0.2 | 2016 | 127.0.0.1
On shard execution result is [235] [shard 0] | 2022-01-27 22:53:08.772770 | 127.0.0.3 | 1924 | 127.0.0.1
Received forward_result=[435] from 127.0.0.3:0 [shard 1] | 2022-01-27 22:53:08.772933 | 127.0.0.1 | 2482 | 127.0.0.1
Received forward_result=[434] from 127.0.0.2:0 [shard 1] | 2022-01-27 22:53:08.773110 | 127.0.0.1 | 2658 | 127.0.0.1
Merged result is [1000] [shard 1] | 2022-01-27 22:53:08.773111 | 127.0.0.1 | 2660 | 127.0.0.1
Done processing - preparing a result [shard 1] | 2022-01-27 22:53:08.773114 | 127.0.0.1 | 2663 | 127.0.0.1
Request complete | 2022-01-27 22:53:08.772666 | 127.0.0.1 | 2666 | 127.0.0.1
```
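The trace above can be cross-checked with a short Python sketch. The per-shard counts are copied from the "On shard execution result" lines. The `shard_of` function is an assumption about how a token maps to a shard (bias the token to unsigned, drop the ignored most significant bits, then scale by the shard count); it is not stated in this message, although it agrees with the range boundaries at 0 and ±4611686018427387904 visible in the trace.

```python
# Per-shard results copied from the trace, as [shard 0, shard 1] per node.
shard_results = {
    "127.0.0.1": [71, 60],
    "127.0.0.2": [196, 238],
    "127.0.0.3": [200, 235],
}

# Each node merges its shards into one forward_result ...
forward_results = {node: sum(counts) for node, counts in shard_results.items()}
# ... and the super-coordinator merges the forward_results.
merged = sum(forward_results.values())


def shard_of(token, shard_count=2, ignore_msb_bits=1):
    """Assumed token-to-shard mapping (not taken from this message)."""
    biased = (token + 2**63) % 2**64                 # signed token -> unsigned
    shifted = (biased << ignore_msb_bits) % 2**64    # drop the ignored MSBs
    return (shifted * shard_count) >> 64             # scale into [0, shard_count)


print(forward_results, merged)
```

The sums match the `Received forward_result` lines (131, 434, 435) and the merged result of 1000, and every token range in the trace lies within the span that the assumed mapping assigns to the shard that queried it.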
Fixes #1385
Closes #9209
* github.com:scylladb/scylla:
docs: add parallel aggregations design doc
db: config: add a flag to disable new parallelized aggregation algorithm
test: add parallelized select count test
forward_service: add metrics
forward_service: parallelize execution across shards
forward_service: add tracing
cql3: statements: introduce parallelized_select_statement
cql3: query_processor: add forward_service reference to query_processor
gms: add PARALLELIZED_AGGREGATION feature
service: introduce forward_service
storage_proxy: extract query_ranges_to_vnodes_generator to a separate file
messaging_service: add verb for count(*) request forwarding
cql3: selection: detect if a selection represents count(*)
Scylla
What is Scylla?
Scylla is the real-time big data database that is API-compatible with Apache Cassandra and Amazon DynamoDB. Scylla embraces a shared-nothing approach that increases throughput and storage capacity to realize order-of-magnitude performance improvements and reduce hardware costs.
For more information, please see the ScyllaDB web site.
Build Prerequisites
Scylla is fairly fussy about its build environment, requiring very recent versions of the C++20 compiler and of many libraries to build. The document HACKING.md includes detailed information on building and developing Scylla, but to get Scylla building quickly on (almost) any build machine, Scylla offers a frozen toolchain. This is a pre-configured Docker image which includes recent versions of all the required compilers, libraries and build tools. Using the frozen toolchain allows you to avoid changing anything in your build machine to meet Scylla's requirements - you just need to meet the frozen toolchain's prerequisites (mostly, Docker or Podman being available).
Building Scylla
Building Scylla with the frozen toolchain dbuild is as easy as:
$ git submodule update --init --force --recursive
$ ./tools/toolchain/dbuild ./configure.py
$ ./tools/toolchain/dbuild ninja build/release/scylla
For further information, please see:
- Developer documentation for more information on building Scylla.
- Build documentation on how to build Scylla binaries, tests, and packages.
- Docker image build documentation for information on how to build Docker images.
Running Scylla
To start Scylla server, run:
$ ./tools/toolchain/dbuild ./build/release/scylla --workdir tmp --smp 1 --developer-mode 1
This will start a Scylla node with one CPU core allocated to it and data files stored in the tmp directory.
The --developer-mode is needed to disable the various checks Scylla performs at startup to ensure the machine is configured for maximum performance (not relevant on development workstations).
Please note that you need to run Scylla with dbuild if you built it with the frozen toolchain.
For more run options, run:
$ ./tools/toolchain/dbuild ./build/release/scylla --help
Testing
See test.py manual.
Scylla APIs and compatibility
By default, Scylla is compatible with Apache Cassandra and its APIs - CQL and Thrift. There is also support for the API of Amazon DynamoDB™, which needs to be enabled and configured in order to be used. For more information on how to enable the DynamoDB™ API in Scylla, and the current compatibility of this feature as well as Scylla-specific extensions, see Alternator and Getting started with Alternator.
Documentation
Documentation can be found here. Seastar documentation can be found here. User documentation can be found here.
Training
Training material and online courses can be found at Scylla University. The courses are free, self-paced and include hands-on examples. They cover a variety of topics including Scylla data modeling, administration, architecture, basic NoSQL concepts, using drivers for application development, Scylla setup, failover, compactions, multi-datacenters and how Scylla integrates with third-party applications.
Contributing to Scylla
If you want to report a bug or submit a pull request or a patch, please read the contribution guidelines.
If you are a developer working on Scylla, please read the developer guidelines.
Contact
- The users mailing list and Slack channel are for users to discuss configuration, management, and operations of the ScyllaDB open source.
- The developers mailing list is for developers and people interested in following the development of ScyllaDB to discuss technical topics.