Avi Kivity fe65122ccd Merge 'Distribute select count(*) queries' from Michał Sala
This pull request speeds up execution of `count(*)` queries. It does so by splitting the given query into sub-queries and distributing them across a group of nodes for parallel execution.

A new level of coordination was added. A node called the super-coordinator splits an aggregation query into sub-queries and distributes them across a group of coordinators. The super-coordinator is also responsible for merging the results.

To speed up `count(*)` queries, we first need to detect which queries have a `count(*)` selector. Because this pull request is a proof of concept, the detection is rudimentary: it only catches the simplest cases of `count(*)` queries (with only one selector and no column name specified).
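As a rough illustration of how narrow that detection is, the sketch below models a selection as a list of selectors and accepts only the single-selector `count(*)` form. The `Selector` type and the `is_count_star` function are hypothetical names invented for this example; they are not the actual `cql3::selection` code.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Selector:
    """Hypothetical stand-in for one entry of a SELECT clause."""
    function: Optional[str]  # e.g. "count", or None for a plain column
    column: Optional[str]    # None models the "*" argument (no column name)


def is_count_star(selectors: Sequence[Selector]) -> bool:
    """Accept only the simplest case: exactly one selector, count, no column."""
    if len(selectors) != 1:
        return False
    only = selectors[0]
    return only.function == "count" and only.column is None
```

Under this model, `count(v)` or `count(*)` combined with any other selector is rejected and would take the regular, non-parallelized path.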

Once a query is detected as a `count(*)`, it has to be split into sub-queries that are sent to other coordinators. The splitting part wasn't that difficult: it is achieved by limiting the original query's partition ranges. Sending the modified query to another node was much harder. The easiest scenario would be to send the whole `cql3::statements::select_statement`. Unfortunately, `cql3::statements::select_statement` can't be [de]serialized, so sending it was out of the question. Even more unfortunately, some non-[de]serializable members of `cql3::statements::select_statement` are required to start executing the statement. In the end, I decided to send a `query::read_command` paired with the required [de]serializable members. Objects that cannot be [de]serialized (such as the query's selector) are mocked on the receiving end.
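The idea of shipping only the serializable parts and rebuilding the rest on the receiver can be sketched as follows. Everything here is an assumption made for illustration: `ForwardRequest`, its fields, the JSON wire format, and `MockCountSelector` are hypothetical and do not reflect the real message layout or serialization used by the messaging service.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ForwardRequest:
    """Hypothetical payload: only plain, serializable data travels."""
    read_command: dict       # stand-in for the serializable read command
    partition_ranges: list   # list of [start_token, end_token] pairs
    reduction: str           # which aggregate to rebuild, e.g. "count"


def serialize(request: ForwardRequest) -> bytes:
    # JSON is used here only as a stand-in wire format.
    return json.dumps(asdict(request)).encode("utf-8")


def deserialize(payload: bytes) -> ForwardRequest:
    return ForwardRequest(**json.loads(payload.decode("utf-8")))


class MockCountSelector:
    """Receiver-side replacement for the selector that was never sent."""

    def __init__(self) -> None:
        self.count = 0

    def add_row(self, row: object) -> None:
        self.count += 1

    def result(self) -> int:
        return self.count


def make_selector(request: ForwardRequest) -> MockCountSelector:
    # The selector is reconstructed from the request, not deserialized.
    if request.reduction != "count":
        raise ValueError(f"unsupported reduction: {request.reduction}")
    return MockCountSelector()
```

The point of the design is that the receiver needs just enough information to recreate an equivalent selector locally, so the non-serializable object never has to cross the wire.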

When a super-coordinator receives a `count(*)` query, it splits it into sub-queries. It does so by splitting the original query's partition ranges into a list of vnodes, grouping them by owner, and creating one sub-query per group, with its partition ranges set to that group's ranges. Each sub-query is then sent to the owner of its partition ranges. The owner dispatches the received sub-query to all of its shards. Each shard slices the partition ranges of the received sub-query so that it only queries data it owns, then acts as a coordinator and executes the sub-query prepared this way.
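The grouping and merging steps can be sketched roughly as below. This is a simplified model, not the `forward_service` implementation: vnodes are assumed to arrive as `(token_range, owner)` pairs, the node names and token values are made up, and the per-shard slicing step is not modeled.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

TokenRange = Tuple[int, int]


def group_ranges_by_owner(
    vnodes: Iterable[Tuple[TokenRange, str]],
) -> Dict[str, List[TokenRange]]:
    """Collect the token ranges of all vnodes owned by the same node."""
    grouped: Dict[str, List[TokenRange]] = defaultdict(list)
    for token_range, owner in vnodes:
        grouped[owner].append(token_range)
    return dict(grouped)


def merge_counts(partial_counts: Iterable[int]) -> int:
    """The super-coordinator merges count(*) results by summing them."""
    return sum(partial_counts)


# Hypothetical ring: four vnodes spread over two nodes.
vnodes = [
    ((-100, -50), "node-a"),
    ((-50, 0), "node-b"),
    ((0, 50), "node-a"),
    ((50, 100), "node-b"),
]

# One sub-query per owner, restricted to that owner's ranges.
sub_queries = group_ranges_by_owner(vnodes)
```

Each entry of `sub_queries` corresponds to one sub-query sent to one owner; the counts that come back are combined with `merge_counts`.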

Benchmark: a 3-node cluster set up on powerful desktops located in the office (3x32 cores).
The cluster was filled with ~2 * 10^8 rows using scylla-bench, and then the following command was run:
```
time cqlsh <ip> <port> --request-timeout=3600 -e "select count(*) from scylla_bench.test using timeout 1h;"
```

* master: 68s
* this branch: 2s

Example trace from a 3-node cluster (each node had 2 shards, `murmur3_ignore_msb_bits` was set to 1, `num_tokens` was set to 3):

```
>  cqlsh -e 'tracing on; select count(*) from ks.t;'
Now Tracing is enabled

 count
-------
  1000

(1 rows)

Tracing session: e5852020-7fc3-11ec-8600-4c4c210dd657

 activity                                                                                                                                    | timestamp                  | source    | source_elapsed | client
---------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
                                                                                                                          Execute CQL3 query | 2022-01-27 22:53:08.770000 | 127.0.0.1 |              0 | 127.0.0.1
                                                                                                               Parsing a statement [shard 1] | 2022-01-27 22:53:08.770451 | 127.0.0.1 |             -- | 127.0.0.1
                                                                                                            Processing a statement [shard 1] | 2022-01-27 22:53:08.770487 | 127.0.0.1 |             36 | 127.0.0.1
                                                                                        Dispatching forward_request to 3 endpoints [shard 1] | 2022-01-27 22:53:08.770509 | 127.0.0.1 |             58 | 127.0.0.1
                                                                                            Sending forward_request to 127.0.0.1:0 [shard 1] | 2022-01-27 22:53:08.770516 | 127.0.0.1 |             64 | 127.0.0.1
                                                                                                         Executing forward_request [shard 1] | 2022-01-27 22:53:08.770519 | 127.0.0.1 |             -- | 127.0.0.1
                                                                                                       read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770528 | 127.0.0.1 |              9 | 127.0.0.1
                                             Start querying token range ({-4242912715832118944, end}, {-4075408479358018994, end}] [shard 1] | 2022-01-27 22:53:08.770531 | 127.0.0.1 |             12 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770537 | 127.0.0.1 |             18 | 127.0.0.1
                      Scanning cache for range ({-4242912715832118944, end}, {-4075408479358018994, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770541 | 127.0.0.1 |             22 | 127.0.0.1
    Page stats: 12 partition(s), 0 static row(s) (0 live, 0 dead), 12 clustering row(s) (12 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.770589 | 127.0.0.1 |             70 | 127.0.0.1
                                                                                            Sending forward_request to 127.0.0.2:0 [shard 1] | 2022-01-27 22:53:08.770600 | 127.0.0.1 |            149 | 127.0.0.1
                                                                                            Sending forward_request to 127.0.0.3:0 [shard 1] | 2022-01-27 22:53:08.770608 | 127.0.0.1 |            157 | 127.0.0.1
                                                                                                         Executing forward_request [shard 0] | 2022-01-27 22:53:08.770627 | 127.0.0.1 |             -- | 127.0.0.1
                                                                                                       read_data: querying locally [shard 0] | 2022-01-27 22:53:08.770639 | 127.0.0.1 |             11 | 127.0.0.1
                                               Start querying token range ({2507462623645193091, end}, {3897266736829642805, end}] [shard 0] | 2022-01-27 22:53:08.770643 | 127.0.0.1 |             15 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.770646 | 127.0.0.1 |             19 | 127.0.0.1
                        Scanning cache for range ({2507462623645193091, end}, {3897266736829642805, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.770649 | 127.0.0.1 |             22 | 127.0.0.1
                                                                                                         Executing forward_request [shard 1] | 2022-01-27 22:53:08.770658 | 127.0.0.2 |             -- | 127.0.0.1
                                                                                                         Executing forward_request [shard 1] | 2022-01-27 22:53:08.770674 | 127.0.0.3 |              5 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770698 | 127.0.0.2 |             40 | 127.0.0.1
                                             Start querying token range [{4611686018427387904, start}, {5592106830937975806, end}] [shard 1] | 2022-01-27 22:53:08.770704 | 127.0.0.2 |             46 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770710 | 127.0.0.2 |             52 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770712 | 127.0.0.3 |             43 | 127.0.0.1
                      Scanning cache for range [{4611686018427387904, start}, {5592106830937975806, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770714 | 127.0.0.2 |             56 | 127.0.0.1
                                           Start querying token range [{-4611686018427387904, start}, {-4242912715832118944, end}] [shard 1] | 2022-01-27 22:53:08.770718 | 127.0.0.3 |             49 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770739 | 127.0.0.3 |             70 | 127.0.0.1
                    Scanning cache for range [{-4611686018427387904, start}, {-4242912715832118944, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770743 | 127.0.0.3 |             73 | 127.0.0.1
    Page stats: 17 partition(s), 0 static row(s) (0 live, 0 dead), 17 clustering row(s) (17 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.770814 | 127.0.0.3 |            145 | 127.0.0.1
                                                                                                         Executing forward_request [shard 0] | 2022-01-27 22:53:08.770846 | 127.0.0.3 |             -- | 127.0.0.1
                                                                                                       read_data: querying locally [shard 0] | 2022-01-27 22:53:08.770862 | 127.0.0.3 |             16 | 127.0.0.1
    Page stats: 71 partition(s), 0 static row(s) (0 live, 0 dead), 71 clustering row(s) (71 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.770865 | 127.0.0.1 |            238 | 127.0.0.1
                                             Start querying token range ({-6683686776653114062, end}, {-6473446911791631266, end}] [shard 0] | 2022-01-27 22:53:08.770867 | 127.0.0.3 |             21 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.770874 | 127.0.0.3 |             28 | 127.0.0.1
                      Scanning cache for range ({-6683686776653114062, end}, {-6473446911791631266, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.770879 | 127.0.0.3 |             33 | 127.0.0.1
    Page stats: 48 partition(s), 0 static row(s) (0 live, 0 dead), 48 clustering row(s) (48 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.770880 | 127.0.0.2 |            222 | 127.0.0.1
                                                                                                                  Querying is done [shard 1] | 2022-01-27 22:53:08.770888 | 127.0.0.1 |            369 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770909 | 127.0.0.1 |            390 | 127.0.0.1
                                             Start querying token range ({-4075408479358018994, end}, {-3391415989210253693, end}] [shard 1] | 2022-01-27 22:53:08.770911 | 127.0.0.1 |            392 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.770914 | 127.0.0.1 |            395 | 127.0.0.1
                      Scanning cache for range ({-4075408479358018994, end}, {-3391415989210253693, end}] and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.770936 | 127.0.0.1 |            418 | 127.0.0.1
                                                                                                         Executing forward_request [shard 0] | 2022-01-27 22:53:08.770951 | 127.0.0.2 |             -- | 127.0.0.1
                                                                                                       read_data: querying locally [shard 0] | 2022-01-27 22:53:08.770966 | 127.0.0.2 |             15 | 127.0.0.1
    Page stats: 12 partition(s), 0 static row(s) (0 live, 0 dead), 12 clustering row(s) (12 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.770969 | 127.0.0.3 |            123 | 127.0.0.1
                                                                    Start querying token range (-inf, {-6683686776653114062, end}] [shard 0] | 2022-01-27 22:53:08.770969 | 127.0.0.2 |             18 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.770974 | 127.0.0.2 |             23 | 127.0.0.1
                                             Scanning cache for range (-inf, {-6683686776653114062, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.770977 | 127.0.0.2 |             26 | 127.0.0.1
                                                                                                                  Querying is done [shard 1] | 2022-01-27 22:53:08.770993 | 127.0.0.3 |            324 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 1] | 2022-01-27 22:53:08.770998 | 127.0.0.3 |            329 | 127.0.0.1
                                                              Start querying token range ({-3391415989210253693, end}, {0, start}) [shard 1] | 2022-01-27 22:53:08.771001 | 127.0.0.3 |            332 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.771004 | 127.0.0.3 |            335 | 127.0.0.1
                                       Scanning cache for range ({-3391415989210253693, end}, {0, start}) and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.771007 | 127.0.0.3 |            338 | 127.0.0.1
    Page stats: 48 partition(s), 0 static row(s) (0 live, 0 dead), 48 clustering row(s) (48 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.771044 | 127.0.0.1 |            525 | 127.0.0.1
                                                                                                                  Querying is done [shard 0] | 2022-01-27 22:53:08.771069 | 127.0.0.1 |            442 | 127.0.0.1
                                                                                                 On shard execution result is [71] [shard 0] | 2022-01-27 22:53:08.771145 | 127.0.0.1 |            518 | 127.0.0.1
                                                                                                                  Querying is done [shard 1] | 2022-01-27 22:53:08.771308 | 127.0.0.1 |            789 | 127.0.0.1
                                                                                                 On shard execution result is [60] [shard 1] | 2022-01-27 22:53:08.771351 | 127.0.0.1 |            832 | 127.0.0.1
 Page stats: 127 partition(s), 0 static row(s) (0 live, 0 dead), 127 clustering row(s) (127 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.771379 | 127.0.0.2 |            427 | 127.0.0.1
 Page stats: 183 partition(s), 0 static row(s) (0 live, 0 dead), 183 clustering row(s) (183 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.771385 | 127.0.0.3 |            716 | 127.0.0.1
                                                                                                                  Querying is done [shard 0] | 2022-01-27 22:53:08.771402 | 127.0.0.3 |            556 | 127.0.0.1
                                                                                                                  Querying is done [shard 1] | 2022-01-27 22:53:08.771403 | 127.0.0.2 |            745 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 1] | 2022-01-27 22:53:08.771408 | 127.0.0.2 |            750 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 0] | 2022-01-27 22:53:08.771409 | 127.0.0.3 |            563 | 127.0.0.1
                                                                     Start querying token range ({5592106830937975806, end}, +inf) [shard 1] | 2022-01-27 22:53:08.771411 | 127.0.0.2 |            754 | 127.0.0.1
                                           Start querying token range ({-6272011798787969456, end}, {-4611686018427387904, start}) [shard 0] | 2022-01-27 22:53:08.771412 | 127.0.0.3 |            566 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.771415 | 127.0.0.3 |            569 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 1 [shard 1] | 2022-01-27 22:53:08.771415 | 127.0.0.2 |            757 | 127.0.0.1
                                              Scanning cache for range ({5592106830937975806, end}, +inf) and slice {(-inf, +inf)} [shard 1] | 2022-01-27 22:53:08.771419 | 127.0.0.2 |            761 | 127.0.0.1
                    Scanning cache for range ({-6272011798787969456, end}, {-4611686018427387904, start}) and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.771419 | 127.0.0.3 |            573 | 127.0.0.1
                                                                                    Received forward_result=[131] from 127.0.0.1:0 [shard 1] | 2022-01-27 22:53:08.771454 | 127.0.0.1 |           1003 | 127.0.0.1
    Page stats: 74 partition(s), 0 static row(s) (0 live, 0 dead), 74 clustering row(s) (74 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.771764 | 127.0.0.3 |            918 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 0] | 2022-01-27 22:53:08.771768 | 127.0.0.3 |            922 | 127.0.0.1
                                                               Start querying token range [{0, start}, {2507462623645193091, end}] [shard 0] | 2022-01-27 22:53:08.771771 | 127.0.0.3 |            925 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.771775 | 127.0.0.3 |            929 | 127.0.0.1
                                        Scanning cache for range [{0, start}, {2507462623645193091, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.771779 | 127.0.0.3 |            933 | 127.0.0.1
                                                                                                                  Querying is done [shard 1] | 2022-01-27 22:53:08.771935 | 127.0.0.3 |           1265 | 127.0.0.1
                                                                                                                  Querying is done [shard 0] | 2022-01-27 22:53:08.771950 | 127.0.0.2 |            998 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 0] | 2022-01-27 22:53:08.771956 | 127.0.0.2 |           1004 | 127.0.0.1
                                             Start querying token range ({-6473446911791631266, end}, {-6272011798787969456, end}] [shard 0] | 2022-01-27 22:53:08.771959 | 127.0.0.2 |           1008 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.771963 | 127.0.0.2 |           1011 | 127.0.0.1
                      Scanning cache for range ({-6473446911791631266, end}, {-6272011798787969456, end}] and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.771966 | 127.0.0.2 |           1014 | 127.0.0.1
    Page stats: 13 partition(s), 0 static row(s) (0 live, 0 dead), 13 clustering row(s) (13 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.772008 | 127.0.0.2 |           1057 | 127.0.0.1
                                                                                                       read_data: querying locally [shard 0] | 2022-01-27 22:53:08.772012 | 127.0.0.2 |           1061 | 127.0.0.1
                                             Start querying token range ({3897266736829642805, end}, {4611686018427387904, start}) [shard 0] | 2022-01-27 22:53:08.772014 | 127.0.0.2 |           1063 | 127.0.0.1
                                                                                                 Creating shard reader on shard: 0 [shard 0] | 2022-01-27 22:53:08.772016 | 127.0.0.2 |           1065 | 127.0.0.1
                      Scanning cache for range ({3897266736829642805, end}, {4611686018427387904, start}) and slice {(-inf, +inf)} [shard 0] | 2022-01-27 22:53:08.772019 | 127.0.0.2 |           1067 | 127.0.0.1
                                                                                                On shard execution result is [200] [shard 1] | 2022-01-27 22:53:08.772053 | 127.0.0.3 |           1384 | 127.0.0.1
    Page stats: 56 partition(s), 0 static row(s) (0 live, 0 dead), 56 clustering row(s) (56 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.772138 | 127.0.0.2 |           1186 | 127.0.0.1
 Page stats: 190 partition(s), 0 static row(s) (0 live, 0 dead), 190 clustering row(s) (190 live, 0 dead) and 0 range tombstone(s) [shard 1] | 2022-01-27 22:53:08.772364 | 127.0.0.2 |           1706 | 127.0.0.1
 Page stats: 149 partition(s), 0 static row(s) (0 live, 0 dead), 149 clustering row(s) (149 live, 0 dead) and 0 range tombstone(s) [shard 0] | 2022-01-27 22:53:08.772407 | 127.0.0.3 |           1561 | 127.0.0.1
                                                                                                                  Querying is done [shard 0] | 2022-01-27 22:53:08.772417 | 127.0.0.3 |           1571 | 127.0.0.1
                                                                                                                  Querying is done [shard 1] | 2022-01-27 22:53:08.772418 | 127.0.0.2 |           1760 | 127.0.0.1
                                                                                                                  Querying is done [shard 0] | 2022-01-27 22:53:08.772426 | 127.0.0.2 |           1475 | 127.0.0.1
                                                                                                                  Querying is done [shard 0] | 2022-01-27 22:53:08.772428 | 127.0.0.2 |           1476 | 127.0.0.1
                                                                                                                  Querying is done [shard 0] | 2022-01-27 22:53:08.772449 | 127.0.0.3 |           1604 | 127.0.0.1
                                                                                                On shard execution result is [196] [shard 0] | 2022-01-27 22:53:08.772555 | 127.0.0.2 |           1603 | 127.0.0.1
                                                                                                On shard execution result is [238] [shard 1] | 2022-01-27 22:53:08.772674 | 127.0.0.2 |           2016 | 127.0.0.1
                                                                                                On shard execution result is [235] [shard 0] | 2022-01-27 22:53:08.772770 | 127.0.0.3 |           1924 | 127.0.0.1
                                                                                    Received forward_result=[435] from 127.0.0.3:0 [shard 1] | 2022-01-27 22:53:08.772933 | 127.0.0.1 |           2482 | 127.0.0.1
                                                                                    Received forward_result=[434] from 127.0.0.2:0 [shard 1] | 2022-01-27 22:53:08.773110 | 127.0.0.1 |           2658 | 127.0.0.1
                                                                                                           Merged result is [1000] [shard 1] | 2022-01-27 22:53:08.773111 | 127.0.0.1 |           2660 | 127.0.0.1
                                                                                              Done processing - preparing a result [shard 1] | 2022-01-27 22:53:08.773114 | 127.0.0.1 |           2663 | 127.0.0.1
                                                                                                                            Request complete | 2022-01-27 22:53:08.772666 | 127.0.0.1 |           2666 | 127.0.0.1
```
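The numbers in the trace above can be cross-checked with a little arithmetic: the `On shard execution result` values of each node should add up to that node's `forward_result`, and the three `forward_result` values should add up to the merged result. The snippet below does that check; the values are copied from the trace, while the variable names are just for this example.

```python
# "On shard execution result is [N]" values from the trace, per node and shard.
shard_results = {
    "127.0.0.1": {0: 71, 1: 60},
    "127.0.0.2": {0: 196, 1: 238},
    "127.0.0.3": {0: 235, 1: 200},
}

# Each node sums the results of its shards before replying.
forward_results = {
    node: sum(per_shard.values()) for node, per_shard in shard_results.items()
}

# The super-coordinator sums the per-node results.
merged = sum(forward_results.values())

print(forward_results)  # {'127.0.0.1': 131, '127.0.0.2': 434, '127.0.0.3': 435}
print(merged)           # 1000
```

These match the `Received forward_result=[131]`, `[434]`, and `[435]` lines and the final `Merged result is [1000]`.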

Fixes #1385

Closes #9209

* github.com:scylladb/scylla:
  docs: add parallel aggregations design doc
  db: config: add a flag to disable new parallelized aggregation algorithm
  test: add parallelized select count test
  forward_service: add metrics
  forward_service: parallelize execution across shards
  forward_service: add tracing
  cql3: statements: introduce parallelized_select_statement
  cql3: query_processor: add forward_service reference to query_processor
  gms: add PARALLELIZED_AGGREGATION feature
  service: introduce forward_service
  storage_proxy: extract query_ranges_to_vnodes_generator to a separate file
  messaging_service: add verb for count(*) request forwarding
  cql3: selection: detect if a selection represents count(*)
2022-02-04 12:34:19 +02:00