Files
scylladb/tests/sstable_mutation_test.cc
Vladimir Krivopalov 3a9cb54c76 Merge the pair of index_readers into just one tracking a range.
Historically, we had two index_readers per sstable_mutation_reader,
one for the lower bound and one for the upper bound. Most of the public
members of the index_reader class were only called on one or the other.
With the changes introduced in #2981, two readers are even more tied
together as they now have a shared-per-pair list of index pages that
needs proper cleanup and was protruding woefully into the caller code.

This fix re-structures index_reader so that it now keeps track of both
lower and upper bounds. The shared_index_lists structure is encapsulated
within index_reader and becomes an internal detail rather than a
liability.

Fixes #3220.

Tests: unit (debug, release)
+
Tested using cassandra-stress commands from #3189.

perf_fast_forward results indicate there is no performance degradation
caused by this fix.

=========================== Baseline ===================================
running: large-partition-skips
Testing scanning large partition with skips.
Reads whole range interleaving reads with skips according to read-skip pattern:
read    skip      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
1       0         0.494458   1000000    2022418   1018     126960      27       0        0        0        0        0        0        0  97.6%
1       1         1.754717    500000     284946    997     127064       6       0        0        3        3        0        0        0  99.9%
1       8         0.551664    111112     201413    997     127064       6       0        0        3        3        0        0        0  99.7%
1       16        0.383888     58824     153232   1001     127080      10       0        0        5        5        0        0        0  99.5%
1       32        0.289073     30304     104832    997     127064      28       0        0        3        3        0        0        0  99.3%
1       64        0.236963     15385      64926    997     127064     122       0        0        3        3        0        0        0  99.2%
1       256       0.172901      3892      22510    997     127064     217       0        0        3        3        0        0        0  95.5%
1       1024      0.117570       976       8301    997     127064     235       0        0        3        3        0        0        0  49.0%
1       4096      0.085811       245       2855    664      27172     375     274        0        3        3        0        0        0  21.4%
64      1         0.512781    984616    1920149   1142     127064     139       0        0        3        3        0        0        0  98.7%
64      8         0.479232    888896    1854833   1001     127080      10       0        0        5        5        0        0        0  99.6%
64      16        0.451193    800000    1773078    997     127064       6       0        0        3        3        0        0        0  99.6%
64      32        0.408684    666688    1631305    997     127064       6       0        0        3        3        0        0        0  99.5%
64      64        0.351906    500032    1420924    997     127064      14       0        0        3        3        0        0        0  99.5%
64      256       0.227008    200000     881026    997     127064     211       0        0        3        3        0        0        0  99.1%
64      1024      0.125803     58880     468032    997     127064     290       0        0        3        3        0        0        0  65.1%
64      4096      0.098155     15424     157139    703      27856     401     267        0        3        3        0        0        0  25.8%

running: large-partition-slicing
Testing slicing of large partition:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.000701         1       1427      9        296       6       4        0        3        3        0        0        0  12.4%
0       32        0.000698        32      45827      9        296       6       3        0        3        3        0        0        0  13.9%
0       256       0.000808       256     316920     10        328       6       3        0        3        3        0        0        0  24.9%
0       4096      0.004368      4096     937697     25        808      14       3        0        3        3        0        0        0  45.9%
500000  1         0.001196         1        836     13        412       9       4        0        3        3        0        0        0  22.7%
500000  32        0.001200        32      26664     13        412       9       4        0        3        3        0        0        0  22.2%
500000  256       0.001503       256     170338     14        444      10       4        0        3        3        0        0        0  25.3%
500000  4096      0.004351      4096     941465     30        956      20       4        0        3        3        0        0        0  50.7%

running: large-partition-slicing-clustering-keys
Testing slicing of large partition using clustering keys:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.000625         1       1601      7        176       6       0        0        3        3        0        0        0  23.2%
0       32        0.000604        32      53016      7        176       6       0        0        3        3        0        0        0  24.7%
0       256       0.000695       256     368498      8        180       6       0        0        3        3        0        0        0  36.4%
0       4096      0.004083      4096    1003106     20        692      12       1        0        3        3        0        0        0  47.0%
500000  1         0.001198         1        835     12        516       9       3        0        3        3        0        0        0  22.8%
500000  32        0.000981        32      32631     12        388       9       3        0        3        3        0        0        0  29.2%
500000  256       0.001320       256     194011     13        384      10       3        0        3        3        0        0        0  29.0%
500000  4096      0.003944      4096    1038567     25        840      17       2        0        3        3        0        0        0  52.2%

running: large-partition-slicing-single-key-reader
Testing slicing of large partition, single-partition reader:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.000849         1       1178      9        488       6       0        0        3        3        0        0        0  16.5%
0       32        0.000661        32      48415      9        296       6       0        0        3        3        0        0        0  22.2%
0       256       0.000756       256     338648     10        328       6       0        0        3        3        0        0        0  33.3%
0       4096      0.004147      4096     987610     22        840      12       1        0        3        3        0        0        0  47.9%
500000  1         0.001041         1        960     13        476       9       3        0        3        3        0        0        0  25.9%
500000  32        0.001020        32      31375     13        412       9       3        0        3        3        0        0        0  29.1%
500000  256       0.001265       256     202373     14        444      10       3        0        3        3        0        0        0  32.0%
500000  4096      0.004121      4096     994014     30        988      18       3        0        3        3        0        0        0  52.7%

running: large-partition-select-few-rows
Testing selecting few rows from a large partition:
stride  rows      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
1000000 1         0.000668         1       1498      9        296       6       4        0        3        3        0        0        0  19.8%
500000  2         0.000976         2       2048     13        412       9       4        0        3        3        0        0        0  29.0%
250000  4         0.001408         4       2842     18        572      12       6        0        3        3        0        0        0  28.8%
125000  8         0.002004         8       3993     29        912      19      10        0        3        3        0        0        0  34.0%
62500   16        0.002883        16       5551     50       1584      32      18        0        3        3        0        0        0  41.9%
2       500000    1.053215    500000     474737   1138     127080     120       0        0        5        5        0        0        0  99.7%

running: large-partition-forwarding
Testing forwarding with clustering restriction in a large partition:
pk-scan   time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
yes       0.002717         2        736     24       2684       8      16        0        3        3        0        0        0  19.7%
no        0.001004         2       1992     13        412       8       2        0        3        3        0        0        0  30.2%

running: small-partition-skips
Testing scanning small partitions with skips.
Reads whole range interleaving reads with skips according to read-skip pattern:
   read    skip      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
-> 1       0         1.466523   1000000     681885   1369     139732      33       1        0        0        0        0        0        0  99.7%
-> 1       1        12.792183    500000      39086   6235     177736    5155       0        0     5123     7663        0        0        0  96.4%
-> 1       8         3.451431    111112      32193   6235     177736    5155       0        0     5123     9673        0        0        0  84.8%
-> 1       16        2.223815     58824      26452   6234     177704    5154       0        0     5122     9965        0        0        0  75.0%
-> 1       32        1.512511     30304      20036   6233     177680    5155       1        0     5123    10090        0        0        0  61.8%
-> 1       64        1.129465     15385      13621   6227     177464    5154       0        0     5122    10159        0        0        0  49.5%
-> 1       256       0.733282      3892       5308   6211     175464    5178      24        0     5122    10220        0        0        0  33.8%
-> 1       1024      0.397302       976       2457   5946     142152    5369     217        0     5120    10235        0        0        0  32.1%
-> 1       4096      0.187746       245       1305   5499      81992    5296     142        0     5122    10240        0        0        0  46.8%
-> 64      1         2.428488    984616     405444   7332     177736    5155      25        0     5123     5208        0        0        0  79.9%
-> 64      8         2.262876    888896     392817   6235     177736    5155       0        0     5123     5654        0        0        0  78.1%
-> 64      16        2.137544    800000     374261   6234     177732    5154       0        0     5122     6110        0        0        0  77.1%
-> 64      32        1.862466    666688     357960   6235     177736    5155       0        0     5123     6844        0        0        0  73.7%
-> 64      64        1.547757    500032     323069   6234     177728    5155       0        0     5123     7651        0        0        0  68.7%
-> 64      256       0.914612    200000     218672   6233     177704    5154       0        0     5122     9202        0        0        0  55.5%
-> 64      1024      0.475472     58880     123835   6229     177492    5154       5        0     5122     9930        0        0        0  45.4%
-> 64      4096      0.271239     15424      56865   6158     169480    5257     114        0     5115    10142        0        0        0  44.1%

running: small-partition-slicing
Testing slicing small partitions:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.003209         1        312      3        260       2       7        0        1        1        0        0        0  15.5%
0       32        0.004205        32       7610     16       1428      10       0        0        5        5        0        0        0  15.7%
0       256       0.009830       256      26042     97       8572      62       0        0       31       31        0        0        0  18.7%
0       4096      0.015471      4096     264748    100       8704      64       0        0       32       32        0        0        0  48.4%
500000  1         0.003654         1        274     34        492      33       0        0       32       64        0        0        0  28.7%
500000  32        0.004287        32       7464     40       1260      36       0        0       32       64        0        0        0  26.0%
500000  256       0.009598       256      26673    100       8748      64       4        0       32       64        0        0        0  20.6%
500000  4096      0.014151      4096     289449    119       7892      85       0        0       53       64        0        0        0  54.1%

========================  With the patch ================================
running: large-partition-skips
Testing scanning large partition with skips.
Reads whole range interleaving reads with skips according to read-skip pattern:
read    skip      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
1       0         0.468887   1000000    2132711   1018     126960      29       0        0        0        0        0        0        0  98.4%
1       1         1.735113    500000     288166   1001     127080      10       0        0        5        5        0        0        0  99.9%
1       8         0.535616    111112     207447    997     127064       6       0        0        3        3        0        0        0  99.6%
1       16        0.365487     58824     160947   1001     127080      15       0        0        5        5        0        0        0  99.5%
1       32        0.272208     30304     111326    997     127064      21       0        0        3        3        0        0        0  99.3%
1       64        0.224049     15385      68668    997     127064     208       0        0        3        3        0        0        0  99.1%
1       256       0.159247      3892      24440    997     127064     250       0        0        3        3        0        0        0  94.7%
1       1024      0.102107       976       9559    997     127064     292       0        0        3        3        0        0        0  53.6%
1       4096      0.084310       245       2906    664      27172     371     273        0        3        3        0        0        0  20.2%
64      1         0.508340    984616    1936923   1142     127064     129       0        0        3        3        0        0        0  98.1%
64      8         0.470369    888896    1889786    997     127064       6       0        0        3        3        0        0        0  99.6%
64      16        0.439917    800000    1818526   1001     127080      10       0        0        5        5        0        0        0  99.6%
64      32        0.397938    666688    1675358    997     127064       6       0        0        3        3        0        0        0  99.5%
64      64        0.344144    500032    1452972    997     127064      18       0        0        3        3        0        0        0  99.4%
64      256       0.219996    200000     909107    997     127064     251       0        0        3        3        0        0        0  99.1%
64      1024      0.124294     58880     473715    997     127064     284       1        0        3        3        0        0        0  62.2%
64      4096      0.097580     15424     158065    703      27856     400     267        0        3        3        0        0        0  25.3%

running: large-partition-slicing
Testing slicing of large partition:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.000733         1       1365      9        296       6       4        0        3        3        0        0        0  19.3%
0       32        0.000705        32      45417      9        296       6       3        0        3        3        0        0        0  15.3%
0       256       0.000830       256     308364     10        328       6       3        0        3        3        0        0        0  26.7%
0       4096      0.004631      4096     884529     25        808      14       3        0        3        3        0        0        0  48.1%
500000  1         0.001184         1        845     13        412       9       4        0        3        3        0        0        0  23.7%
500000  32        0.001199        32      26690     13        412       9       4        0        3        3        0        0        0  21.9%
500000  256       0.001530       256     167296     14        444      10       4        0        3        3        0        0        0  26.8%
500000  4096      0.004379      4096     935474     30        956      19       4        0        3        3        0        0        0  51.5%

running: large-partition-slicing-clustering-keys
Testing slicing of large partition using clustering keys:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.000620         1       1614      7        176       6       0        0        3        3        0        0        0  27.4%
0       32        0.000625        32      51218      7        176       6       0        0        3        3        0        0        0  27.0%
0       256       0.000701       256     365148      8        180       6       0        0        3        3        0        0        0  35.2%
0       4096      0.004063      4096    1008130     20        692      12       1        0        3        3        0        0        0  47.6%
500000  1         0.001208         1        827     12        516       9       3        0        3        3        0        0        0  24.3%
500000  32        0.000973        32      32876     12        388       9       3        0        3        3        0        0        0  28.7%
500000  256       0.001315       256     194612     13        384      10       3        0        3        3        0        0        0  29.0%
500000  4096      0.003950      4096    1037068     25        840      17       2        0        3        3        0        0        0  52.7%

running: large-partition-slicing-single-key-reader
Testing slicing of large partition, single-partition reader:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.000844         1       1185      9        488       6       0        0        3        3        0        0        0  16.5%
0       32        0.000656        32      48753      9        296       6       0        0        3        3        0        0        0  23.1%
0       256       0.000751       256     341011     10        328       6       0        0        3        3        0        0        0  34.0%
0       4096      0.004173      4096     981632     22        840      12       1        0        3        3        0        0        0  47.0%
500000  1         0.001036         1        966     13        476       9       3        0        3        3        0        0        0  25.4%
500000  32        0.001014        32      31573     13        412       9       3        0        3        3        0        0        0  27.4%
500000  256       0.001280       256     200044     14        444      10       3        0        3        3        0        0        0  31.8%
500000  4096      0.004081      4096    1003746     30        988      18       3        0        3        3        0        0        0  51.6%

running: large-partition-select-few-rows
Testing selecting few rows from a large partition:
stride  rows      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
1000000 1         0.000668         1       1498      9        296       6       3        0        3        3        0        0        0  21.7%
500000  2         0.000958         2       2088     13        412       9       4        0        3        3        0        0        0  27.7%
250000  4         0.001495         4       2676     18        572      12       6        0        3        3        0        0        0  25.8%
125000  8         0.002069         8       3866     29        912      19      10        0        3        3        0        0        0  30.8%
62500   16        0.002856        16       5603     50       1584      32      18        0        3        3        0        0        0  41.7%
2       500000    1.063129    500000     470310   1138     127080     120       0        0        5        5        0        0        0  99.7%

running: large-partition-forwarding
Testing forwarding with clustering restriction in a large partition:
pk-scan   time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
yes       0.002567         2        779     24       2684       8      16        0        3        3        0        0        0  21.5%
no        0.001013         2       1975     13        412       8       2        0        3        3        0        0        0  28.9%

running: small-partition-skips
Testing scanning small partitions with skips.
Reads whole range interleaving reads with skips according to read-skip pattern:
   read    skip      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
-> 1       0         1.349959   1000000     740763   1369     139732      33       1        0        0        0        0        0        0  99.7%
-> 1       1        12.640751    500000      39555   8144     191168    7064       0        0     7032    11481        0        0        0  96.2%
-> 1       8         3.404269    111112      32639   6651     180660    5571       0        0     5539    10505        0        0        0  84.5%
-> 1       16        2.175424     58824      27040   6434     179116    5354       0        0     5322    10365        0        0        0  74.3%
-> 1       32        1.493365     30304      20292   6335     178404    5257       0        0     5225    10294        0        0        0  61.1%
-> 1       64        1.112168     15385      13833   6256     177672    5183       0        0     5151    10217        0        0        0  48.7%
-> 1       256       0.719282      3892       5411   6211     175464    5178      24        0     5122    10220        0        0        0  33.3%
-> 1       1024      0.393236       976       2482   5946     142152    5369     217        0     5120    10235        0        0        0  30.7%
-> 1       4096      0.185284       245       1322   5499      81992    5296     142        0     5122    10240        0        0        0  44.7%
-> 64      1         2.356711    984616     417792   7361     177944    5184      21        0     5152     5266        0        0        0  79.1%
-> 64      8         2.192331    888896     405457   6253     177868    5173       0        0     5141     5690        0        0        0  77.2%
-> 64      16        2.029835    800000     394121   6245     177812    5165       0        0     5133     6132        0        0        0  75.7%
-> 64      32        1.806448    666688     369060   6245     177808    5165       0        0     5133     6864        0        0        0  72.6%
-> 64      64        1.508492    500032     331478   6242     177788    5163       0        0     5131     7667        0        0        0  67.7%
-> 64      256       0.892881    200000     223994   6233     177704    5154       0        0     5122     9202        0        0        0  54.2%
-> 64      1024      0.465715     58880     126429   6229     177492    5154       0        0     5122     9930        0        0        0  44.0%
-> 64      4096      0.266582     15424      57858   6158     169480    5257     114        0     5115    10142        0        0        0  42.3%

running: small-partition-slicing
Testing slicing small partitions:
offset  read      time (s)     frags     frag/s    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
0       1         0.003113         1        321      3        260       2       0        0        1        1        0        0        0  13.4%
0       32        0.004166        32       7682     16       1428      10       0        0        5        5        0        0        0  14.9%
0       256       0.009813       256      26088     97       8572      62       0        0       31       31        0        0        0  18.4%
0       4096      0.014798      4096     276794    100       8704      64       0        0       32       32        0        0        0  46.3%
500000  1         0.003700         1        270     34        492      33       0        0       32       64        0        0        0  28.4%
500000  32        0.004030        32       7940     40       1260      36       0        0       32       64        0        0        0  27.8%
500000  256       0.009514       256      26908    100       8748      64       0        0       32       64        0        0        0  20.2%
500000  4096      0.013368      4096     306413    119       7892      85       0        0       53       64        0        0        0  53.6%

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
Message-Id: <a72818f79ca4081a606424545b0053fa581d49e7.1522173144.git.vladimir@scylladb.com>
2018-03-29 15:23:31 +03:00

1198 lines
56 KiB
C++

/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/test/unit_test.hpp>
#include "tests/test-utils.hh"
#include "sstable_test.hh"
#include "sstables/key.hh"
#include "core/do_with.hh"
#include "core/thread.hh"
#include "sstables/sstables.hh"
#include "database.hh"
#include "timestamp.hh"
#include "schema_builder.hh"
#include "mutation_reader.hh"
#include "mutation_source_test.hh"
#include "partition_slice_builder.hh"
#include "tmpdir.hh"
#include "memtable-sstable.hh"
#include "tests/sstable_assertions.hh"
#include "tests/test_services.hh"
#include "flat_mutation_reader_assertions.hh"
#include "simple_schema.hh"
#include "tests/sstable_utils.hh"
using namespace sstables;
// Looks up the key "invalid_key" in generation 1 of the uncompressed test
// sstable and requires that the very first read from the reader yields
// nothing (an empty/false result).
SEASTAR_TEST_CASE(nonexistent_key) {
    return reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([] (auto sstp) {
        // do_with() owns the decorated key and hands it to the lambda by reference.
        return do_with(make_dkey(uncompressed_schema(), "invalid_key"), [sstp] (auto& absent_key) {
            auto sch = uncompressed_schema();
            auto reader = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(sch, absent_key));
            // The captures below are not used in the body; presumably they are held
            // so the sstable, schema and reader outlive the pending read.
            return (*reader)().then([sstp, sch, &absent_key, reader] (auto mutation) {
                BOOST_REQUIRE(!mutation);
                return make_ready_future<>();
            });
        });
    });
}
// Reads partition `key` from generation 1 of the uncompressed test sstable and
// verifies its contents against `map` (column name -> expected live value).
//
// For every row returned for an unrestricted clustering range it requires that:
//   - the clustering key serializes to the same bytes as "",
//   - the row has exactly map.size() cells,
//   - each (name, value) pair in `map` matches a live cell of the row.
//
// Both arguments are consumed (moved from).
future<> test_no_clustered(bytes&& key, std::unordered_map<bytes, data_value> &&map) {
    return reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([k = std::move(key), map = std::move(map)] (auto sstp) mutable {
        // do_with() owns the decorated key and hands it to the lambda by reference.
        return do_with(make_dkey(uncompressed_schema(), std::move(k)), [sstp, map = std::move(map)] (auto& dk) {
            auto sch = uncompressed_schema();
            auto reader = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(sch, dk));
            // sstp, sch, dk and reader are not used in the continuation body; presumably
            // they are captured so they outlive the asynchronous read.
            return read_mutation_from_flat_mutation_reader(*reader).then([sstp, sch, &dk, reader, map = std::move(map)] (auto mutation) {
                BOOST_REQUIRE(mutation);
                auto& part = mutation->partition();
                // A default-constructed range places no restriction on the clustering key.
                for (auto&& e : part.range(*sch, nonwrapping_range<clustering_key_prefix>())) {
                    BOOST_REQUIRE(to_bytes(e.key()) == to_bytes(""));
                    BOOST_REQUIRE(e.row().cells().size() == map.size());

                    auto& cells = e.row().cells();
                    for (auto&& expected : map) {
                        match_live_cell(cells, *sch, expected.first, expected.second);
                    }
                }
                return make_ready_future<>();
            });
        });
    });
}
// Partition "vinna" of the uncompressed sstable: expects col1 = "daughter", col2 = 3.
SEASTAR_TEST_CASE(uncompressed_1) {
    return test_no_clustered(
        "vinna",
        {
            { "col1", to_sstring("daughter") },
            { "col2", 3 }
        });
}
// Partition "gustaf" of the uncompressed sstable: expects col1 = "son", col2 = 0.
SEASTAR_TEST_CASE(uncompressed_2) {
    return test_no_clustered(
        "gustaf",
        {
            { "col1", to_sstring("son") },
            { "col2", 0 }
        });
}
// Partition "isak" of the uncompressed sstable: expects col1 = "son", col2 = 1.
SEASTAR_TEST_CASE(uncompressed_3) {
    return test_no_clustered(
        "isak",
        {
            { "col1", to_sstring("son") },
            { "col2", 1 }
        });
}
// Partition "finna" of the uncompressed sstable: expects col1 = "daughter", col2 = 2.
SEASTAR_TEST_CASE(uncompressed_4) {
    return test_no_clustered(
        "finna",
        {
            { "col1", to_sstring("daughter") },
            { "col2", 2 }
        });
}
/*
*
* insert into todata.complex_schema (key, clust1, clust2, reg_set, reg, static_obj) values ('key1', 'cl1.1', 'cl2.1', { '1', '2' }, 'v1', 'static_value');
* insert into todata.complex_schema (key, clust1, clust2, reg_list, reg, static_obj) values ('key1', 'cl1.2', 'cl2.2', [ '2', '1'], 'v2','static_value');
* insert into todata.complex_schema (key, clust1, clust2, reg_map, reg, static_obj) values ('key2', 'kcl1.1', 'kcl2.1', { '3': '1', '4' : '2' }, 'v3', 'static_value');
* insert into todata.complex_schema (key, clust1, clust2, reg_fset, reg, static_obj) values ('key2', 'kcl1.2', 'kcl2.2', { '3', '1', '4' , '2' }, 'v4', 'static_value');
* insert into todata.complex_schema (key, static_collection) values ('key2', { '1', '2', '3' , '4' });
* (flush)
*
* delete reg from todata.complex_schema where key = 'key2' and clust1 = 'kcl1.2' and clust2 = 'kcl2.2';
* insert into todata.complex_schema (key, clust1, clust2, reg, static_obj) values ('key3', 'tcl1.1', 'tcl2.1', 'v5', 'static_value_3') using ttl 86400;
* delete from todata.complex_schema where key = 'key1' and clust1='cl1.1';
* delete static_obj from todata.complex_schema where key = 'key2';
* delete reg_list[0] from todata.complex_schema where key = 'key1' and clust1='cl1.2' and clust2='cl2.2';
* delete reg_fset from todata.complex_schema where key = 'key2' and clust1='kcl1.2' and clust2='kcl2.2';
* delete reg_map['3'] from todata.complex_schema where key = 'key2' and clust1='kcl1.1' and clust2='kcl2.1';
* delete static_collection['1'] from todata.complex_schema where key = 'key2';
* (flush)
*
* insert into todata.complex_schema (key, static_obj) values('key2', 'final_static');
* update todata.complex_schema set reg_map = reg_map + { '6': '1' } where key = 'key2' and clust1='kcl1.1' and clust2='kcl2.1';
* update todata.complex_schema set reg_list = reg_list + [ '6' ] where key = 'key1' and clust1='cl1.2' and clust2='cl2.2';
* update todata.complex_schema set reg_set = reg_set + { '6' } where key = 'key1' and clust1='cl1.2' and clust2='cl2.2';
* (flush)
*/
// FIXME: we are lacking a full deletion test
// Loads partition `key` from the "tests/sstables/complex" sstable whose
// generation number is the template argument, and resolves to the mutation
// that was read. The test fails (BOOST_REQUIRE) if no mutation is returned.
//
// `key` is consumed (moved from).
template <int Generation>
future<mutation> generate_clustered(bytes&& key) {
    return reusable_sst(complex_schema(), "tests/sstables/complex", Generation).then([k = std::move(key)] (auto sstp) mutable {
        // do_with() owns the decorated key and hands it to the lambda by reference.
        return do_with(make_dkey(complex_schema(), std::move(k)), [sstp] (auto& dk) {
            auto sch = complex_schema();
            auto reader = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(sch, dk));
            // The captures are not used in the body; presumably they are held so the
            // sstable, schema, key reference and reader outlive the asynchronous read.
            return read_mutation_from_flat_mutation_reader(*reader).then([sstp, sch, &dk, reader] (auto mutation) {
                BOOST_REQUIRE(mutation);
                // Move the mutation out of its optional-like wrapper.
                return std::move(*mutation);
            });
        });
    });
}
// Returns (by value) the clustered row of `mutation` whose clustering key is
// made of the components in `v`.
inline auto clustered_row(mutation& mutation, const schema& s, std::vector<bytes>&& v) {
    exploded_clustering_prefix prefix(std::move(v));
    auto ckey = clustering_key::from_clustering_prefix(s, prefix);
    auto& partition = mutation.partition();
    return partition.clustered_row(s, ckey);
}
// Partition 'key1' in generation 1 of the "complex" sstable: checks the static
// column and the cells of the two clustered rows.
SEASTAR_TEST_CASE(complex_sst1_k1) {
    return generate_clustered<1>("key1").then([] (auto&& m) {
        auto schema = complex_schema();

        // Static row.
        auto static_cells = m.partition().static_row();
        match_live_cell(static_cells, *schema, "static_obj", data_value(to_bytes("static_value")));

        // Row (cl1.1, cl2.1): a regular value plus a two-element set.
        auto first = clustered_row(m, *schema, {"cl1.1", "cl2.1"});
        match_live_cell(first.cells(), *schema, "reg", data_value(to_bytes("v1")));
        match_absent(first.cells(), *schema, "reg_list");
        match_absent(first.cells(), *schema, "reg_map");
        match_absent(first.cells(), *schema, "reg_fset");
        auto set_cells = match_collection(first.cells(), *schema, "reg_set", tombstone(deletion_time{1431451390, 1431451390209521L}));
        match_collection_element<status::live>(set_cells.cells[0], to_bytes("1"), bytes_opt{});
        match_collection_element<status::live>(set_cells.cells[1], to_bytes("2"), bytes_opt{});

        // Row (cl1.2, cl2.2): a regular value plus a two-element list.
        auto second = clustered_row(m, *schema, {"cl1.2", "cl2.2"});
        match_live_cell(second.cells(), *schema, "reg", data_value(to_bytes("v2")));
        match_absent(second.cells(), *schema, "reg_set");
        match_absent(second.cells(), *schema, "reg_map");
        match_absent(second.cells(), *schema, "reg_fset");
        auto list_cells = match_collection(second.cells(), *schema, "reg_list", tombstone(deletion_time{1431451390, 1431451390213471L}));
        match_collection_element<status::live>(list_cells.cells[0], bytes_opt{}, to_bytes("2"));
        match_collection_element<status::live>(list_cells.cells[1], bytes_opt{}, to_bytes("1"));

        return make_ready_future<>();
    });
}
// Partition 'key2' in generation 1 of the "complex" sstable: checks the static
// column, the static collection and the cells of the two clustered rows.
SEASTAR_TEST_CASE(complex_sst1_k2) {
    return generate_clustered<1>("key2").then([] (auto&& m) {
        auto schema = complex_schema();

        // Static row: plain value plus a four-element collection.
        auto static_cells = m.partition().static_row();
        match_live_cell(static_cells, *schema, "static_obj", data_value(to_bytes("static_value")));
        auto static_coll = match_collection(static_cells, *schema, "static_collection", tombstone(deletion_time{1431451390, 1431451390225257L}));
        match_collection_element<status::live>(static_coll.cells[0], to_bytes("1"), bytes_opt{});
        match_collection_element<status::live>(static_coll.cells[1], to_bytes("2"), bytes_opt{});
        match_collection_element<status::live>(static_coll.cells[2], to_bytes("3"), bytes_opt{});
        match_collection_element<status::live>(static_coll.cells[3], to_bytes("4"), bytes_opt{});

        // Row (kcl1.1, kcl2.1): a regular value plus a two-entry map.
        auto first = clustered_row(m, *schema, {"kcl1.1", "kcl2.1"});
        match_live_cell(first.cells(), *schema, "reg", data_value(to_bytes("v3")));
        match_absent(first.cells(), *schema, "reg_list");
        match_absent(first.cells(), *schema, "reg_set");
        match_absent(first.cells(), *schema, "reg_fset");
        auto map_cells = match_collection(first.cells(), *schema, "reg_map", tombstone(deletion_time{1431451390, 1431451390217436L}));
        match_collection_element<status::live>(map_cells.cells[0], to_bytes("3"), to_bytes("1"));
        match_collection_element<status::live>(map_cells.cells[1], to_bytes("4"), to_bytes("2"));

        // Row (kcl1.2, kcl2.2): only the regular value is checked as live.
        auto second = clustered_row(m, *schema, {"kcl1.2", "kcl2.2"});
        match_live_cell(second.cells(), *schema, "reg", data_value(to_bytes("v4")));
        match_absent(second.cells(), *schema, "reg_set");
        match_absent(second.cells(), *schema, "reg_map");
        match_absent(second.cells(), *schema, "reg_list");

        return make_ready_future<>();
    });
}
// Partition 'key1' in generation 2 of the "complex" sstable: checks the
// tombstone covering row (cl1.1, cl2.1) and the dead list element in row
// (cl1.2, cl2.2).
SEASTAR_TEST_CASE(complex_sst2_k1) {
    return generate_clustered<2>("key1").then([] (auto&& m) {
        auto schema = complex_schema();

        // Tombstone that applies to the deleted row.
        auto prefix = exploded_clustering_prefix({"cl1.1", "cl2.1"});
        auto deleted_key = clustering_key::from_clustering_prefix(*schema, prefix);
        auto row_tomb = m.partition().range_tombstone_for_row(*schema, deleted_key);
        BOOST_REQUIRE(row_tomb.timestamp == 1431451394600754L);
        BOOST_REQUIRE(row_tomb.deletion_time == gc_clock::time_point(gc_clock::duration(1431451394)));

        // The list in the surviving row carries one dead element.
        auto survivor = clustered_row(m, *schema, {"cl1.2", "cl2.2"});
        auto list_cells = match_collection(survivor.cells(), *schema, "reg_list", tombstone(deletion_time{0, api::missing_timestamp}));
        match_collection_element<status::dead>(list_cells.cells[0], bytes_opt{}, bytes_opt{});

        return make_ready_future<>();
    });
}
// Partition 'key2' in generation 2 of the "complex" sstable: checks the dead
// static column, the dead static collection element and the per-row deletions.
SEASTAR_TEST_CASE(complex_sst2_k2) {
    return generate_clustered<2>("key2").then([] (auto&& mutation) {
        auto s = complex_schema();

        // Static row: the plain value is dead and the collection has one dead element.
        auto sr = mutation.partition().static_row();
        match_dead_cell(sr, *s, "static_obj");
        auto static_set = match_collection(sr, *s, "static_collection", tombstone(deletion_time{0, api::missing_timestamp}));
        match_collection_element<status::dead>(static_set.cells[0], to_bytes("1"), bytes_opt{});

        // Row (kcl1.1, kcl2.1): only the map column is present.
        auto row1 = clustered_row(mutation, *s, {"kcl1.1", "kcl2.1"});
        match_absent(row1.cells(), *s, "reg_list");
        match_absent(row1.cells(), *s, "reg_set");
        match_absent(row1.cells(), *s, "reg_fset");
        match_absent(row1.cells(), *s, "reg");
        match_collection(row1.cells(), *s, "reg_map", tombstone(deletion_time{0, api::missing_timestamp}));

        // Row (kcl1.2, kcl2.2): 'reg' and 'reg_fset' are dead, everything else is absent.
        // Each column is checked exactly once.
        auto row2 = clustered_row(mutation, *s, {"kcl1.2", "kcl2.2"});
        match_dead_cell(row2.cells(), *s, "reg");
        match_absent(row2.cells(), *s, "reg_map");
        match_absent(row2.cells(), *s, "reg_list");
        match_absent(row2.cells(), *s, "reg_set");
        match_dead_cell(row2.cells(), *s, "reg_fset");

        return make_ready_future<>();
    });
}
// Partition 'key3' in generation 2 of the "complex" sstable: checks the
// expiring static column, the row marker timestamp and the expiring 'reg' cell.
SEASTAR_TEST_CASE(complex_sst2_k3) {
    return generate_clustered<2>("key3").then([] (auto&& m) {
        auto schema = complex_schema();

        auto static_cells = m.partition().static_row();
        match_expiring_cell(static_cells, *schema, "static_obj", data_value(to_bytes("static_value_3")), 1431451394597062L, 1431537794);

        auto ttl_row = clustered_row(m, *schema, {"tcl1.1", "tcl2.1"});
        BOOST_REQUIRE(ttl_row.created_at() == 1431451394597062L);
        match_expiring_cell(ttl_row.cells(), *schema, "reg", data_value(to_bytes("v5")), 1431451394597062L, 1431537794);
        match_absent(ttl_row.cells(), *schema, "reg_list");
        match_absent(ttl_row.cells(), *schema, "reg_set");
        match_absent(ttl_row.cells(), *schema, "reg_map");
        match_absent(ttl_row.cells(), *schema, "reg_fset");

        return make_ready_future<>();
    });
}
// Partition 'key1' in generation 3 of the "complex" sstable: row (cl1.2, cl2.2)
// must contain only the appended set and list elements.
SEASTAR_TEST_CASE(complex_sst3_k1) {
    return generate_clustered<3>("key1").then([] (auto&& m) {
        auto schema = complex_schema();
        auto updated = clustered_row(m, *schema, {"cl1.2", "cl2.2"});

        auto set_cells = match_collection(updated.cells(), *schema, "reg_set", tombstone(deletion_time{0, api::missing_timestamp}));
        match_collection_element<status::live>(set_cells.cells[0], to_bytes("6"), bytes_opt{});

        auto list_cells = match_collection(updated.cells(), *schema, "reg_list", tombstone(deletion_time{0, api::missing_timestamp}));
        match_collection_element<status::live>(list_cells.cells[0], bytes_opt{}, to_bytes("6"));

        match_absent(updated.cells(), *schema, "static_obj");
        match_absent(updated.cells(), *schema, "reg_map");
        match_absent(updated.cells(), *schema, "reg");
        match_absent(updated.cells(), *schema, "reg_fset");

        return make_ready_future<>();
    });
}
// Partition 'key2' in generation 3 of the "complex" sstable: checks the
// rewritten static column and the map entry added to row (kcl1.1, kcl2.1).
SEASTAR_TEST_CASE(complex_sst3_k2) {
    return generate_clustered<3>("key2").then([] (auto&& m) {
        auto schema = complex_schema();

        auto static_cells = m.partition().static_row();
        match_live_cell(static_cells, *schema, "static_obj", data_value(to_bytes("final_static")));

        auto updated = clustered_row(m, *schema, {"kcl1.1", "kcl2.1"});
        auto map_cells = match_collection(updated.cells(), *schema, "reg_map", tombstone(deletion_time{0, api::missing_timestamp}));
        match_collection_element<status::live>(map_cells.cells[0], to_bytes("6"), to_bytes("1"));
        match_absent(updated.cells(), *schema, "reg_list");
        match_absent(updated.cells(), *schema, "reg_set");
        match_absent(updated.cells(), *schema, "reg");
        match_absent(updated.cells(), *schema, "reg_fset");

        return make_ready_future<>();
    });
}
// Reads the partitions of the uncompressed test sstable (generation 1) whose
// tokens fall in the range [min, max] and checks their keys against `expected`.
//
// `expected` is consumed from the back: each partition start read must match
// expected.back(), which is then popped. Callers therefore list the keys in
// the reverse of the order in which they are read.
//
// `expected` is captured by reference by the continuations below, so the
// caller must keep it alive until the returned future resolves.
future<> test_range_reads(const dht::token& min, const dht::token& max, std::vector<bytes>& expected) {
return reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([min, max, &expected] (auto sstp) mutable {
auto s = uncompressed_schema();
// Number of partitions seen so far.
auto count = make_lw_shared<size_t>(0);
// Remembered up front because `expected` shrinks as partitions are matched.
auto expected_size = expected.size();
// Set once the reader returns no more fragments.
auto stop = make_lw_shared<bool>(false);
return do_with(dht::partition_range::make(dht::ring_position::starting_at(min),
dht::ring_position::ending_at(max)), [&, sstp, s] (auto& pr) {
auto mutations = make_lw_shared<flat_mutation_reader>(sstp->read_range_rows_flat(s, pr));
return do_until([stop] { return *stop; },
// Note: The data in the following lambda, including
// "mutations", continues to live until after the last
// iteration's future completes, so its lifetime is safe.
[sstp, mutations = std::move(mutations), &expected, expected_size, count, stop] () mutable {
return (*mutations)().then([&expected, expected_size, count, stop, mutations] (mutation_fragment_opt mfopt) mutable {
if (mfopt) {
// Only partition starts are expected here: the rest of each
// partition is skipped with next_partition() below.
BOOST_REQUIRE(mfopt->is_partition_start());
BOOST_REQUIRE(*count < expected_size);
BOOST_REQUIRE(std::vector<bytes>({expected.back()}) == mfopt->as_partition_start().key().key().explode());
expected.pop_back();
(*count)++;
mutations->next_partition();
} else {
*stop = true;
}
});
}).then([count, expected_size] {
// Every expected key must have been seen.
BOOST_REQUIRE(*count == expected_size);
});
});
});
}
// Reads the full token range and expects all four partitions.
// Keys are listed in the reverse of the order in which they are read.
SEASTAR_TEST_CASE(read_range) {
    return do_with(std::vector<bytes>{ to_bytes("finna"), to_bytes("isak"), to_bytes("gustaf"), to_bytes("vinna") }, [] (auto& keys) {
        return test_range_reads(dht::minimum_token(), dht::maximum_token(), keys);
    });
}
// Reads from the token of the last listed key ("isak") up to the maximum token.
SEASTAR_TEST_CASE(read_partial_range) {
    return do_with(std::vector<bytes>{ to_bytes("finna"), to_bytes("isak") }, [] (auto& keys) {
        auto lower = dht::global_partitioner().get_token(key_view(bytes_view(keys.back())));
        return test_range_reads(lower, dht::maximum_token(), keys);
    });
}
// Reads from the minimum token up to the token of the first listed key ("gustaf").
SEASTAR_TEST_CASE(read_partial_range_2) {
    return do_with(std::vector<bytes>{ to_bytes("gustaf"), to_bytes("vinna") }, [] (auto& keys) {
        auto upper = dht::global_partitioner().get_token(key_view(bytes_view(keys.front())));
        return test_range_reads(dht::minimum_token(), upper, keys);
    });
}
// Runs the generic mutation_source test suite against sstables written with
// the given writer configuration and sstable version.
//
// Must be run in a seastar thread (the populator blocks on futures with get()).
static
void test_mutation_source(sstable_writer_config cfg, sstables::sstable::version_types version) {
    // Keeps the temporary directories alive until the whole suite has run.
    std::vector<tmpdir> live_dirs;
    auto populate = [&live_dirs, &cfg, version] (schema_ptr s, const std::vector<mutation>& partitions) -> mutation_source {
        tmpdir scratch;
        auto sst = sstables::make_sstable(s,
                scratch.path,
                1 /* generation */,
                version,
                sstables::sstable::format_types::big);
        live_dirs.emplace_back(std::move(scratch));

        // Stage the partitions in a memtable, then flush it to the sstable.
        auto staging = make_lw_shared<memtable>(s);
        for (const mutation& part : partitions) {
            staging->apply(part);
        }
        sst->write_components(staging->make_flat_reader(s), partitions.size(), s, cfg).get();
        sst->load().get();
        return as_mutation_source(sst);
    };
    run_mutation_source_tests(populate);
}
// Runs the mutation_source conformance suite for the ka and la sstable
// versions, each with several promoted index block sizes.
SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
    return seastar::async([] {
        storage_service_for_tests ssft;
        const auto versions = {sstables::sstable::version_types::ka, sstables::sstable::version_types::la};
        const auto block_sizes = {1, 128, 64*1024};
        for (auto ver : versions) {
            for (auto block_size : block_sizes) {
                sstable_writer_config writer_cfg;
                writer_cfg.promoted_index_block_size = block_size;
                test_mutation_source(writer_cfg, ver);
            }
        }
    });
}
// Writes a single range tombstone (1, 2), exclusive on both ends, through a
// memtable into an 'la' sstable and checks that reading it back yields exactly
// the same tombstone.
SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) {
    return seastar::async([] {
        storage_service_for_tests ssft;
        auto dir = make_lw_shared<tmpdir>();
        auto s = make_lw_shared(schema({}, "ks", "cf",
            {{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));

        auto pkey = partition_key::from_exploded(*s, {to_bytes(make_local_key(s))});
        auto lower = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
        auto upper = clustering_key::from_exploded(*s, {int32_type->decompose(2)});
        auto deleted_at = gc_clock::now() + std::chrono::seconds(1);

        // Builds the tombstone that is both written and expected back.
        auto make_rt = [&] {
            return range_tombstone(lower, bound_kind::excl_start, upper, bound_kind::excl_end, tombstone(9, deleted_at));
        };

        mutation m(s, pkey);
        m.partition().apply_delete(*s, make_rt());

        auto mt = make_lw_shared<memtable>(s);
        mt->apply(std::move(m));

        auto sst = sstables::make_sstable(s,
                dir->path,
                1 /* generation */,
                sstables::sstable::version_types::la,
                sstables::sstable::format_types::big);
        write_memtable_to_sstable(*mt, sst).get();
        sst->load().get();

        auto reader = sst->read_rows_flat(s);
        auto read_back = read_mutation_from_flat_mutation_reader(reader).get0();
        BOOST_REQUIRE(bool(read_back));

        auto& tombstones = read_back->partition().row_tombstones();
        BOOST_REQUIRE(tombstones.size() == 1);
        auto first = tombstones.begin();
        BOOST_REQUIRE(first->equal(*s, make_rt()));
    });
}
// Reads partition 'first_row' from the compact_sparse sstable and checks the
// two live cells stored under the empty clustering key.
SEASTAR_TEST_CASE(compact_storage_sparse_read) {
    return reusable_sst(compact_sparse_schema(), "tests/sstables/compact_sparse", 1).then([] (auto sstp) {
        auto decorated = make_dkey(compact_sparse_schema(), "first_row");
        return do_with(std::move(decorated), [sstp] (auto& dkey) {
            auto schema = compact_sparse_schema();
            auto reader = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(schema, dkey));
            // sstp, dkey and reader are captured to keep them alive during the read.
            return read_mutation_from_flat_mutation_reader(*reader).then([sstp, schema, &dkey, reader] (auto mopt) {
                BOOST_REQUIRE(mopt);
                auto& partition = mopt->partition();
                auto only_row = partition.clustered_row(*schema, clustering_key::make_empty());
                match_live_cell(only_row.cells(), *schema, "cl1", data_value(to_bytes("cl1")));
                match_live_cell(only_row.cells(), *schema, "cl2", data_value(to_bytes("cl2")));
                return make_ready_future<>();
            });
        });
    });
}
// Reads partition 'first_row' from the compact_simple_dense sstable and checks
// the live cell 'cl2' of the row with clustering key ('cl1').
SEASTAR_TEST_CASE(compact_storage_simple_dense_read) {
    return reusable_sst(compact_simple_dense_schema(), "tests/sstables/compact_simple_dense", 1).then([] (auto sstp) {
        return do_with(make_dkey(compact_simple_dense_schema(), "first_row"), [sstp] (auto& key) {
            auto s = compact_simple_dense_schema();
            auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
            // sstp, key and rd are captured to keep them alive during the read.
            return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
                // Fail the test cleanly instead of dereferencing an empty
                // optional when the partition is missing.
                BOOST_REQUIRE(mutation);
                auto& mp = mutation->partition();
                auto exploded = exploded_clustering_prefix({"cl1"});
                auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
                auto row = mp.clustered_row(*s, clustering);
                match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
                return make_ready_future<>();
            });
        });
    });
}
// Reads partition 'first_row' from the compact_dense sstable and checks the
// live cell 'cl3' of the row with clustering key ('cl1', 'cl2').
SEASTAR_TEST_CASE(compact_storage_dense_read) {
    return reusable_sst(compact_dense_schema(), "tests/sstables/compact_dense", 1).then([] (auto sstp) {
        return do_with(make_dkey(compact_dense_schema(), "first_row"), [sstp] (auto& key) {
            auto s = compact_dense_schema();
            auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
            // sstp, key and rd are captured to keep them alive during the read.
            return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
                // Fail the test cleanly instead of dereferencing an empty
                // optional when the partition is missing.
                BOOST_REQUIRE(mutation);
                auto& mp = mutation->partition();
                auto exploded = exploded_clustering_prefix({"cl1", "cl2"});
                auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
                auto row = mp.clustered_row(*s, clustering);
                match_live_cell(row.cells(), *s, "cl3", data_value(to_bytes("cl3")));
                return make_ready_future<>();
            });
        });
    });
}
// We recently had an issue, documented at #188, where range-reading from an
// sstable would break if collections were used.
//
// Make sure we don't regress on that.
SEASTAR_TEST_CASE(broken_ranges_collection) {
    return reusable_sst(peers_schema(), "tests/sstables/broken_ranges", 2).then([] (auto sstp) {
        auto schema = peers_schema();
        auto source = sstp->as_mutation_source();
        auto rd = make_lw_shared<flat_mutation_reader>(source.make_reader(schema, query::full_partition_range));
        // Walk every partition in the sstable; each one is identified by its peer address.
        return repeat([schema, rd] {
            return read_mutation_from_flat_mutation_reader(*rd).then([schema, rd] (mutation_opt part) {
                if (!part) {
                    // End of stream.
                    return stop_iteration::yes;
                }
                auto is_peer = [schema, &part] (sstring ip) {
                    return part->key().equal(*schema, partition_key::from_deeply_exploded(*schema, { net::ipv4_address(ip) }));
                };
                if (is_peer("127.0.0.1")) {
                    // No 'tokens' collection for this peer.
                    auto peer_row = part->partition().clustered_row(*schema, clustering_key::make_empty());
                    match_absent(peer_row.cells(), *schema, "tokens");
                } else if (is_peer("127.0.0.3")) {
                    // One live token under a collection tombstone.
                    auto peer_row = part->partition().clustered_row(*schema, clustering_key::make_empty());
                    auto token_cells = match_collection(peer_row.cells(), *schema, "tokens", tombstone(deletion_time{0x55E5F2D5, 0x051EB3FC99715Dl }));
                    match_collection_element<status::live>(token_cells.cells[0], to_bytes("-8180144272884242102"), bytes_opt{});
                } else {
                    // The only remaining peer carries a partition tombstone.
                    BOOST_REQUIRE(is_peer("127.0.0.2"));
                    auto part_tomb = part->partition().partition_tombstone();
                    BOOST_REQUIRE(part_tomb.timestamp == 0x051EB3FB016850l);
                }
                return stop_iteration::no;
            });
        });
    });
}
// Schema of table try1.tab used by the tombstone overlap tests:
// partition key 'pk', clustering key (ck1, ck2), one regular column 'data'.
// Built once per thread and cached.
static schema_ptr tombstone_overlap_schema() {
    static thread_local auto cached = [] {
        auto legacy = make_lw_shared(schema(generate_legacy_id("try1", "tab"), "try1", "tab",
            {{"pk", utf8_type}},                        // partition key
            {{"ck1", utf8_type}, {"ck2", utf8_type}},   // clustering key
            {{"data", utf8_type}},                      // regular columns
            {},                                         // static columns
            utf8_type,                                  // regular column name type
            ""                                          // comment
        ));
        schema_builder builder(std::move(legacy));
        return builder.build(schema_builder::compact_storage::no);
    }();
    return cached;
}
// Creates a 'ka'-version sstable object for `generation` in `dir`, loads it
// and resolves to the loaded sstable.
static future<sstable_ptr> ka_sst(schema_ptr schema, sstring dir, unsigned long generation) {
    auto table = make_sstable(std::move(schema), dir, generation, sstables::sstable::version_types::ka, big);
    // Start the load before `table` is moved into the continuation.
    auto loaded = table->load();
    return std::move(loaded).then([table = std::move(table)] {
        return make_ready_future<sstable_ptr>(std::move(table));
    });
}
// Considering the schema above, the sstable looks like:
// {"key": "pk",
// "cells": [["aaa:_","aaa:bbb:_",1459334681228103,"t",1459334681],
// ["aaa:bbb:_","aaa:bbb:!",1459334681244989,"t",1459334681],
// ["aaa:bbb:!","aaa:!",1459334681228103,"t",1459334681]]}
// ]
// Reads generation 1 of the tombstone_overlap sstable and checks that the
// three on-disk range tombstones are read back as two range tombstones around
// (aaa, bbb) plus one deleted clustering row for (aaa, bbb).
SEASTAR_TEST_CASE(tombstone_in_tombstone) {
    return ka_sst(tombstone_overlap_schema(), "tests/sstables/tombstone_overlap", 1).then([] (auto sstp) {
        auto s = tombstone_overlap_schema();
        return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
            return repeat([sstp, s, &reader] {
                return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) {
                    if (!mut) {
                        return stop_iteration::yes;
                    }
                    auto make_pkey = [s] (sstring b) {
                        return partition_key::from_deeply_exploded(*s, { data_value(b) });
                    };
                    // Builds a clustering key (or prefix, when c2 is empty).
                    auto make_ckey = [s] (sstring c1, sstring c2 = {}) {
                        std::vector<data_value> v;
                        v.push_back(data_value(c1));
                        if (!c2.empty()) {
                            v.push_back(data_value(c2));
                        }
                        return clustering_key::from_deeply_exploded(*s, std::move(v));
                    };
                    BOOST_REQUIRE(mut->key().equal(*s, make_pkey("pk")));
                    // Somewhat counterintuitively, scylla represents
                    // deleting a small row with all clustering keys set - not
                    // as a "row tombstone" but rather as a deleted clustering row.
                    auto& rts = mut->partition().row_tombstones();
                    BOOST_REQUIRE(rts.size() == 2);
                    auto it = rts.begin();
                    // The deletion time is taken from the tombstone itself, so
                    // only the bounds and the timestamp are really compared.
                    BOOST_REQUIRE(it->equal(*s, range_tombstone(
                                    make_ckey("aaa"),
                                    bound_kind::incl_start,
                                    make_ckey("aaa", "bbb"),
                                    bound_kind::excl_end,
                                    tombstone(1459334681228103LL, it->tomb.deletion_time))));
                    ++it;
                    BOOST_REQUIRE(it->equal(*s, range_tombstone(
                                    make_ckey("aaa", "bbb"),
                                    bound_kind::excl_start,
                                    make_ckey("aaa"),
                                    bound_kind::incl_end,
                                    tombstone(1459334681228103LL, it->tomb.deletion_time))));
                    auto& rows = mut->partition().clustered_rows();
                    BOOST_REQUIRE(rows.calculate_size() == 1);
                    // Iterate by const reference: the entries are only inspected.
                    for (const auto& e : rows) {
                        BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb")));
                        BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459334681244989LL);
                    }
                    return stop_iteration::no;
                });
            });
        });
    });
}
// Same schema as above, the sstable looks like:
// {"key": "pk",
// "cells": [["aaa:_","aaa:bbb:_",1459334681228103,"t",1459334681],
// ["aaa:bbb:_","aaa:ccc:!",1459334681228103,"t",1459334681],
// ["aaa:ccc:!","aaa:ddd:!",1459334681228103,"t",1459334681],
// ["aaa:ddd:!","aaa:!",1459334681228103,"t",1459334681]]}
//
// We're not sure how this sort of sstable can be generated with Cassandra 2's
// CQL, but we saw a similar thing in a real use case.
// Reads generation 4 of the tombstone_overlap sstable and checks that the
// adjacent on-disk range tombstones are read back as a single range tombstone
// covering the whole 'aaa' prefix, with no clustered rows.
SEASTAR_TEST_CASE(range_tombstone_reading) {
    return ka_sst(tombstone_overlap_schema(), "tests/sstables/tombstone_overlap", 4).then([] (auto sstp) {
        auto schema = tombstone_overlap_schema();
        return do_with(sstp->read_rows_flat(schema), [sstp, schema] (auto& rd) {
            return repeat([sstp, schema, &rd] {
                return read_mutation_from_flat_mutation_reader(rd).then([schema] (mutation_opt part) {
                    if (!part) {
                        return stop_iteration::yes;
                    }
                    // Builds a one-component clustering prefix.
                    auto prefix_of = [schema] (sstring component) {
                        std::vector<data_value> parts;
                        parts.push_back(data_value(component));
                        return clustering_key::from_deeply_exploded(*schema, std::move(parts));
                    };
                    auto expected_pkey = partition_key::from_deeply_exploded(*schema, { data_value(sstring("pk")) });
                    BOOST_REQUIRE(part->key().equal(*schema, expected_pkey));

                    auto& tombstones = part->partition().row_tombstones();
                    BOOST_REQUIRE(tombstones.size() == 1);
                    auto only = tombstones.begin();
                    // The deletion time is copied from the tombstone under test.
                    BOOST_REQUIRE(only->equal(*schema, range_tombstone(
                                    prefix_of("aaa"),
                                    bound_kind::incl_start,
                                    prefix_of("aaa"),
                                    bound_kind::incl_end,
                                    tombstone(1459334681228103LL, only->tomb.deletion_time))));

                    auto& rows = part->partition().clustered_rows();
                    BOOST_REQUIRE(rows.calculate_size() == 0);
                    return stop_iteration::no;
                });
            });
        });
    });
}
// In this test case we have *three* levels of tombstones:
// create COLUMNFAMILY tab2 (pk text, ck1 text, ck2 text, ck3 text, data text, primary key(pk, ck1, ck2, ck3));
// delete from tab2 where pk = 'pk' and ck1 = 'aaa';
// delete from tab2 where pk = 'pk' and ck1 = 'aaa' and ck2 = 'bbb';
// delete from tab2 where pk = 'pk' and ck1 = 'aaa' and ck2 = 'bbb' and ck3 = 'ccc';
// And then, to have more fun, I edited the resulting sstable manually (using
// Cassandra's json2sstable and sstable2json tools) to further split the
// resulting tombstones into even more tombstones:
// {"key": "pk",
// "cells":
// [["aaa:_","aaa:bba:_",1459438519943668,"t",1459438519],
// ["aaa:bba:_","aaa:bbb:_",1459438519943668,"t",1459438519],
// ["aaa:bbb:_","aaa:bbb:ccb:_",1459438519950348,"t",1459438519],
// ["aaa:bbb:ccb:_","aaa:bbb:ccc:_",1459438519950348,"t",1459438519],
// ["aaa:bbb:ccc:_","aaa:bbb:ccc:!",1459438519958850,"t",1459438519],
// ["aaa:bbb:ccc:!","aaa:bbb:ddd:!",1459438519950348,"t",1459438519],
// ["aaa:bbb:ddd:!","aaa:bbb:!",1459438519950348,"t",1459438519],
// ["aaa:bbb:!","aaa:!",1459438519943668,"t",1459438519]]}
// Schema of table try1.tab2: partition key 'pk', clustering key
// (ck1, ck2, ck3), one regular column 'data'. Built once per thread and cached.
static schema_ptr tombstone_overlap_schema2() {
    static thread_local auto cached = [] {
        auto legacy = make_lw_shared(schema(generate_legacy_id("try1", "tab2"), "try1", "tab2",
            {{"pk", utf8_type}},                                            // partition key
            {{"ck1", utf8_type}, {"ck2", utf8_type}, {"ck3", utf8_type}},   // clustering key
            {{"data", utf8_type}},                                          // regular columns
            {},                                                             // static columns
            utf8_type,                                                      // regular column name type
            ""                                                              // comment
        ));
        schema_builder builder(std::move(legacy));
        return builder.build(schema_builder::compact_storage::no);
    }();
    return cached;
}
// Reads generation 3 of the tombstone_overlap sstable (three nested levels of
// deletion) and checks that it is read back as four range tombstones plus one
// deleted clustering row for (aaa, bbb, ccc).
SEASTAR_TEST_CASE(tombstone_in_tombstone2) {
    return ka_sst(tombstone_overlap_schema2(), "tests/sstables/tombstone_overlap", 3).then([] (auto sstp) {
        auto s = tombstone_overlap_schema2();
        return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
            return repeat([sstp, s, &reader] {
                return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) {
                    if (!mut) {
                        return stop_iteration::yes;
                    }
                    auto make_pkey = [s] (sstring b) {
                        return partition_key::from_deeply_exploded(*s, { data_value(b) });
                    };
                    // Builds a clustering key or prefix; empty components are omitted.
                    auto make_ckey = [s] (sstring c1, sstring c2 = {}, sstring c3 = {}) {
                        std::vector<data_value> v;
                        v.push_back(data_value(c1));
                        if (!c2.empty()) {
                            v.push_back(data_value(c2));
                        }
                        if (!c3.empty()) {
                            v.push_back(data_value(c3));
                        }
                        return clustering_key::from_deeply_exploded(*s, std::move(v));
                    };
                    BOOST_REQUIRE(mut->key().equal(*s, make_pkey("pk")));
                    auto& rows = mut->partition().clustered_rows();
                    auto& rts = mut->partition().row_tombstones();
                    // Check the count before walking the list, so that a short
                    // list fails the test instead of dereferencing end().
                    BOOST_REQUIRE(rts.size() == 4);
                    auto it = rts.begin();
                    // [aaa, aaa:bbb)
                    BOOST_REQUIRE(it->start_bound().equal(*s, bound_view(make_ckey("aaa"), bound_kind::incl_start)));
                    BOOST_REQUIRE(it->end_bound().equal(*s, bound_view(make_ckey("aaa", "bbb"), bound_kind::excl_end)));
                    BOOST_REQUIRE(it->tomb.timestamp == 1459438519943668L);
                    ++it;
                    // [aaa:bbb, aaa:bbb:ccc)
                    BOOST_REQUIRE(it->start_bound().equal(*s, bound_view(make_ckey("aaa", "bbb"), bound_kind::incl_start)));
                    BOOST_REQUIRE(it->end_bound().equal(*s, bound_view(make_ckey("aaa", "bbb", "ccc"), bound_kind::excl_end)));
                    BOOST_REQUIRE(it->tomb.timestamp == 1459438519950348L);
                    ++it;
                    // (aaa:bbb:ccc, aaa:bbb]
                    BOOST_REQUIRE(it->start_bound().equal(*s, bound_view(make_ckey("aaa", "bbb", "ccc"), bound_kind::excl_start)));
                    BOOST_REQUIRE(it->end_bound().equal(*s, bound_view(make_ckey("aaa", "bbb"), bound_kind::incl_end)));
                    BOOST_REQUIRE(it->tomb.timestamp == 1459438519950348L);
                    ++it;
                    // (aaa:bbb, aaa]
                    BOOST_REQUIRE(it->start_bound().equal(*s, bound_view(make_ckey("aaa", "bbb"), bound_kind::excl_start)));
                    BOOST_REQUIRE(it->end_bound().equal(*s, bound_view(make_ckey("aaa"), bound_kind::incl_end)));
                    BOOST_REQUIRE(it->tomb.timestamp == 1459438519943668L);
                    ++it;
                    BOOST_REQUIRE(it == rts.end());
                    BOOST_REQUIRE(rows.calculate_size() == 1);
                    // Iterate by const reference: the entries are only inspected.
                    for (const auto& e : rows) {
                        BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb", "ccc")));
                        BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459438519958850LL);
                    }
                    return stop_iteration::no;
                });
            });
        });
    });
}
// For every sstable version: writes one row of a compact-storage table with a
// single clustering column whose value is 0xffff0000, then checks that the
// partition can be read back from the sstable.
SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) {
    return seastar::async([] {
        for (const auto version : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto dir = make_lw_shared<tmpdir>();

            auto s = schema_builder("ks", "cf")
                .with_column("p", utf8_type, column_kind::partition_key)
                .with_column("c", int32_type, column_kind::clustering_key)
                .with_column("v", int32_type)
                .build(schema_builder::compact_storage::yes);

            auto pkey = partition_key::from_exploded(*s, {to_bytes(make_local_key(s))});
            auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(static_cast<int32_t>(0xffff0000))});

            mutation m(s, pkey);
            auto live_cell = atomic_cell::make_live(1, int32_type->decompose(17), { });
            m.set_clustered_cell(ckey, *s->get_column_definition("v"), std::move(live_cell));

            auto mt = make_lw_shared<memtable>(s);
            mt->apply(std::move(m));

            auto sst = sstables::make_sstable(s,
                    dir->path,
                    1 /* generation */,
                    version,
                    sstables::sstable::format_types::big);
            write_memtable_to_sstable(*mt, sst).get();
            sst->load().get();

            auto reader = sst->read_rows_flat(s);
            auto read_back = read_mutation_from_flat_mutation_reader(reader).get0();
            BOOST_REQUIRE(bool(read_back));
        }
    });
}
// Creates an index_reader for `sst` using the default I/O priority class.
static std::unique_ptr<index_reader> get_index_reader(shared_sstable sst) {
    auto& priority = default_priority_class();
    return std::make_unique<index_reader>(sst, priority);
}
// Writes four rows and a range tombstone with a promoted index block size of 1
// into a 'ka' sstable, then checks that the index reader reports monotonic
// positions.
SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic) {
    return seastar::async([] {
        storage_service_for_tests ssft;
        auto dir = make_lw_shared<tmpdir>();

        auto s = schema_builder("ks", "cf")
            .with_column("p", utf8_type, column_kind::partition_key)
            .with_column("c1", int32_type, column_kind::clustering_key)
            .with_column("c2", int32_type, column_kind::clustering_key)
            .with_column("v", int32_type)
            .build();

        auto pkey = partition_key::from_exploded(*s, {to_bytes(make_local_key(s))});
        auto cell = atomic_cell::make_live(1, int32_type->decompose(88), { });
        mutation m(s, pkey);

        // Adds a row (c1, c2) carrying the same live cell in column 'v'.
        auto add_row = [&] (int c1, int c2) {
            auto ck = clustering_key::from_exploded(*s, {int32_type->decompose(c1), int32_type->decompose(c2)});
            m.set_clustered_cell(ck, *s->get_column_definition("v"), cell);
        };
        add_row(1, 2);
        add_row(1, 4);
        add_row(1, 6);
        add_row(3, 9);

        // Range tombstone over the prefix range (1, 2].
        m.partition().apply_row_tombstone(*s, range_tombstone(
                clustering_key_prefix::from_exploded(*s, {int32_type->decompose(1)}),
                bound_kind::excl_start,
                clustering_key_prefix::from_exploded(*s, {int32_type->decompose(2)}),
                bound_kind::incl_end,
                {1, gc_clock::now()}));

        auto mt = make_lw_shared<memtable>(s);
        mt->apply(std::move(m));

        auto sst = sstables::make_sstable(s,
                dir->path,
                1 /* generation */,
                sstables::sstable::version_types::ka,
                sstables::sstable::format_types::big);
        sstable_writer_config writer_cfg;
        writer_cfg.promoted_index_block_size = 1;
        sst->write_components(mt->make_flat_reader(s), 1, s, writer_cfg).get();
        sst->load().get();

        assert_that(get_index_reader(sst)).has_monotonic_positions(*s);
    });
}
// For every sstable version: writes four rows and a range tombstone of a
// compact-storage table with a two-column clustering key, using a promoted
// index block size of 1. Then checks that
//  - the index reader reports monotonic positions, and
//  - a sliced read starting at the first row returns the whole mutation.
SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
    return seastar::async([] {
        for (const auto version : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto dir = make_lw_shared<tmpdir>();
            schema_builder builder("ks", "cf");
            builder.with_column("p", utf8_type, column_kind::partition_key);
            builder.with_column("c1", int32_type, column_kind::clustering_key);
            builder.with_column("c2", int32_type, column_kind::clustering_key);
            builder.with_column("v", int32_type);
            auto s = builder.build(schema_builder::compact_storage::yes);
            auto dk = dht::global_partitioner().decorate_key(*s, partition_key::from_exploded(*s, {to_bytes(make_local_key(s))}));
            auto cell = atomic_cell::make_live(1, int32_type->decompose(88), { });
            mutation m(s, dk);
            auto ck1 = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(2)});
            m.set_clustered_cell(ck1, *s->get_column_definition("v"), cell);
            auto ck2 = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(4)});
            m.set_clustered_cell(ck2, *s->get_column_definition("v"), cell);
            auto ck3 = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(6)});
            m.set_clustered_cell(ck3, *s->get_column_definition("v"), cell);
            auto ck4 = clustering_key::from_exploded(*s, {int32_type->decompose(3), int32_type->decompose(9)});
            m.set_clustered_cell(ck4, *s->get_column_definition("v"), cell);
            m.partition().apply_row_tombstone(*s, range_tombstone(
                    clustering_key_prefix::from_exploded(*s, {int32_type->decompose(1)}),
                    bound_kind::incl_start,
                    clustering_key_prefix::from_exploded(*s, {int32_type->decompose(2)}),
                    bound_kind::incl_end,
                    {1, gc_clock::now()}));
            auto mt = make_lw_shared<memtable>(s);
            // `m` is compared against the read result below, so it must not be
            // moved from here.
            mt->apply(m);
            auto sst = sstables::make_sstable(s,
                    dir->path,
                    1 /* generation */,
                    version,
                    sstables::sstable::format_types::big);
            sstable_writer_config cfg;
            cfg.promoted_index_block_size = 1;
            sst->write_components(mt->make_flat_reader(s), 1, s, cfg).get();
            sst->load().get();
            {
                assert_that(get_index_reader(sst)).has_monotonic_positions(*s);
            }
            {
                auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build();
                assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
                        .produces(m)
                        .produces_end_of_stream();
            }
        }
    });
}
// For every sstable version: writes three rows and a range tombstone of a
// compact-storage table with a single clustering column, using a promoted
// index block size of 1. Then checks that
//  - the index reader reports monotonic positions, and
//  - a sliced read starting at the first row returns the whole mutation.
SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
    return seastar::async([] {
        for (const auto version : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto dir = make_lw_shared<tmpdir>();
            schema_builder builder("ks", "cf");
            builder.with_column("p", utf8_type, column_kind::partition_key);
            builder.with_column("c1", int32_type, column_kind::clustering_key);
            builder.with_column("v", int32_type);
            auto s = builder.build(schema_builder::compact_storage::yes);
            auto dk = dht::global_partitioner().decorate_key(*s, partition_key::from_exploded(*s, {to_bytes(make_local_key(s))}));
            auto cell = atomic_cell::make_live(1, int32_type->decompose(88), { });
            mutation m(s, dk);
            auto ck1 = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
            m.set_clustered_cell(ck1, *s->get_column_definition("v"), cell);
            auto ck2 = clustering_key::from_exploded(*s, {int32_type->decompose(2)});
            m.set_clustered_cell(ck2, *s->get_column_definition("v"), cell);
            auto ck3 = clustering_key::from_exploded(*s, {int32_type->decompose(3)});
            m.set_clustered_cell(ck3, *s->get_column_definition("v"), cell);
            m.partition().apply_row_tombstone(*s, range_tombstone(
                    clustering_key_prefix::from_exploded(*s, {int32_type->decompose(1)}),
                    bound_kind::incl_start,
                    clustering_key_prefix::from_exploded(*s, {int32_type->decompose(2)}),
                    bound_kind::incl_end,
                    {1, gc_clock::now()}));
            auto mt = make_lw_shared<memtable>(s);
            // `m` is compared against the read result below, so it must not be
            // moved from here.
            mt->apply(m);
            auto sst = sstables::make_sstable(s,
                    dir->path,
                    1 /* generation */,
                    version,
                    sstables::sstable::format_types::big);
            sstable_writer_config cfg;
            cfg.promoted_index_block_size = 1;
            sst->write_components(mt->make_flat_reader(s), 1, s, cfg).get();
            sst->load().get();
            {
                assert_that(get_index_reader(sst)).has_monotonic_positions(*s);
            }
            {
                auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build();
                assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
                        .produces(m)
                        .produces_end_of_stream();
            }
        }
    });
}
// For every sstable version and for both compact_storage::no and
// compact_storage::yes: writes a partition holding a range tombstone over
// ["ck1", "ck5"] plus one row at "ck3" with promoted_index_block_size = 1,
// then reads with a clustering range starting at "ck3" and expects the whole
// mutation back.
SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
    return seastar::async([] {
        for (const auto ver : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto tmp = make_lw_shared<tmpdir>();
            int next_generation = 0;

            for (const auto storage_mode : { schema_builder::compact_storage::no, schema_builder::compact_storage::yes }) {
                // One table name / sstable generation per storage mode.
                const int generation = next_generation++;

                schema_builder sb("ks", sprint("cf%d", generation));
                sb.with_column("p", utf8_type, column_kind::partition_key);
                sb.with_column("c1", bytes_type, column_kind::clustering_key);
                sb.with_column("v", int32_type);
                auto sch = sb.build(storage_mode);

                auto pkey = dht::global_partitioner().decorate_key(*sch, partition_key::from_exploded(*sch, {to_bytes(make_local_key(sch))}));
                auto live_cell = atomic_cell::make_live(1, int32_type->decompose(88), { });

                mutation mut(sch, pkey);
                mut.partition().apply_row_tombstone(*sch, range_tombstone(
                        clustering_key_prefix::from_exploded(*sch, {bytes_type->decompose(data_value(to_bytes("ck1")))}),
                        bound_kind::incl_start,
                        clustering_key_prefix::from_exploded(*sch, {bytes_type->decompose(data_value(to_bytes("ck5")))}),
                        bound_kind::incl_end,
                        {1, gc_clock::now()}));

                // A single row located inside the tombstone's range.
                auto row_key = clustering_key::from_exploded(*sch, {bytes_type->decompose(data_value(to_bytes("ck3")))});
                const auto& value_col = *sch->get_column_definition("v");
                mut.set_clustered_cell(row_key, value_col, live_cell);

                auto mem = make_lw_shared<memtable>(sch);
                mem->apply(mut);

                auto sstp = sstables::make_sstable(sch,
                                                   tmp->path,
                                                   generation,
                                                   ver,
                                                   sstables::sstable::format_types::big);
                sstable_writer_config writer_cfg;
                writer_cfg.promoted_index_block_size = 1;
                sstp->write_components(mem->make_flat_reader(sch), 1, sch, writer_cfg).get();
                sstp->load().get();

                const auto pr = dht::partition_range::make_singular(pkey);
                auto ck_slice = partition_slice_builder(*sch)
                        .with_range(query::clustering_range::make_starting_with({row_key}))
                        .build();
                assert_that(sstp->as_mutation_source().make_reader(sch, pr, ck_slice))
                        .produces(mut)
                        .produces_end_of_stream();
            }
        }
    });
}
// For every sstable version: writes a partition of a compact-storage schema
// with one int32 clustering column that contains only a range tombstone over
// [1, 2] (inclusive bounds), using a default sstable_writer_config, and
// expects a full-slice read to return the same mutation.
SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound_dense_schemas) {
    return seastar::async([] {
        for (const auto ver : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto tmp = make_lw_shared<tmpdir>();

            schema_builder sb("ks", "cf");
            sb.with_column("p", utf8_type, column_kind::partition_key);
            sb.with_column("c", int32_type, column_kind::clustering_key);
            sb.with_column("v", int32_type);
            auto sch = sb.build(schema_builder::compact_storage::yes);

            auto pkey = dht::global_partitioner().decorate_key(*sch, partition_key::from_exploded(*sch, {to_bytes(make_local_key(sch))}));

            mutation mut(sch, pkey);
            mut.partition().apply_row_tombstone(*sch, range_tombstone(
                    clustering_key_prefix::from_exploded(*sch, {int32_type->decompose(1)}),
                    bound_kind::incl_start,
                    clustering_key_prefix::from_exploded(*sch, {int32_type->decompose(2)}),
                    bound_kind::incl_end,
                    {1, gc_clock::now()}));

            auto mem = make_lw_shared<memtable>(sch);
            mem->apply(mut);

            auto sstp = sstables::make_sstable(sch,
                                               tmp->path,
                                               1 /* generation */,
                                               ver,
                                               sstables::sstable::format_types::big);
            sstable_writer_config writer_cfg;
            sstp->write_components(mem->make_flat_reader(sch), 1, sch, writer_cfg).get();
            sstp->load().get();

            // Read the partition back with an unrestricted slice.
            const auto pr = dht::partition_range::make_singular(pkey);
            auto full_slice = partition_slice_builder(*sch).build();
            assert_that(sstp->as_mutation_source().make_reader(sch, pr, full_slice))
                    .produces(mut)
                    .produces_end_of_stream();
        }
    });
}
// For every sstable version: writes a partition of a compact-storage schema
// that has no clustering column, with promoted_index_block_size = 1, and
// asserts that the sstable's index reader is reported as empty.
SEASTAR_TEST_CASE(test_promoted_index_is_absent_for_schemas_without_clustering_key) {
    return seastar::async([] {
        for (const auto ver : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto tmp = make_lw_shared<tmpdir>();

            schema_builder sb("ks", "cf");
            sb.with_column("p", utf8_type, column_kind::partition_key);
            sb.with_column("v", int32_type);
            auto sch = sb.build(schema_builder::compact_storage::yes);

            auto pkey = dht::global_partitioner().decorate_key(*sch, partition_key::from_exploded(*sch, {to_bytes(make_local_key(sch))}));

            mutation mut(sch, pkey);
            // Write "v" four times under the empty clustering key.
            for (const int value : { 1, 2, 3, 4 }) {
                auto live_cell = atomic_cell::make_live(1, int32_type->decompose(value), { });
                mut.set_clustered_cell(clustering_key_prefix::make_empty(), *sch->get_column_definition("v"), live_cell);
            }

            auto mem = make_lw_shared<memtable>(sch);
            mem->apply(mut);

            auto sstp = sstables::make_sstable(sch,
                                               tmp->path,
                                               1 /* generation */,
                                               ver,
                                               sstables::sstable::format_types::big);
            sstable_writer_config writer_cfg;
            writer_cfg.promoted_index_block_size = 1;
            sstp->write_components(mem->make_flat_reader(sch), 1, sch, writer_cfg).get();
            sstp->load().get();

            assert_that(get_index_reader(sstp)).is_empty(*sch);
        }
    });
}
// For every sstable version: writes a range tombstone over [1, 2] (inclusive
// bounds) for a compact-storage schema with one int32 clustering column while
// correctly_serialize_non_compound_range_tombstones is switched off in the
// writer config, and expects a full-slice read to return the same mutation.
SEASTAR_TEST_CASE(test_can_write_and_read_non_compound_range_tombstone_as_compound) {
    return seastar::async([] {
        for (const auto ver : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto tmp = make_lw_shared<tmpdir>();

            schema_builder sb("ks", "cf");
            sb.with_column("p", utf8_type, column_kind::partition_key);
            sb.with_column("c", int32_type, column_kind::clustering_key);
            sb.with_column("v", int32_type);
            auto sch = sb.build(schema_builder::compact_storage::yes);

            auto pkey = dht::global_partitioner().decorate_key(*sch, partition_key::from_exploded(*sch, {to_bytes(make_local_key(sch))}));

            mutation mut(sch, pkey);
            mut.partition().apply_row_tombstone(*sch, range_tombstone(
                    clustering_key_prefix::from_exploded(*sch, {int32_type->decompose(1)}),
                    bound_kind::incl_start,
                    clustering_key_prefix::from_exploded(*sch, {int32_type->decompose(2)}),
                    bound_kind::incl_end,
                    {1, gc_clock::now()}));

            auto mem = make_lw_shared<memtable>(sch);
            mem->apply(mut);

            auto sstp = sstables::make_sstable(sch,
                                               tmp->path,
                                               1 /* generation */,
                                               ver,
                                               sstables::sstable::format_types::big);
            sstable_writer_config writer_cfg;
            // The only non-default writer setting used by this test.
            writer_cfg.correctly_serialize_non_compound_range_tombstones = false;
            sstp->write_components(mem->make_flat_reader(sch), 1, sch, writer_cfg).get();
            sstp->load().get();

            const auto pr = dht::partition_range::make_singular(pkey);
            auto full_slice = partition_slice_builder(*sch).build();
            assert_that(sstp->as_mutation_source().make_reader(sch, pr, full_slice))
                    .produces(mut)
                    .produces_end_of_stream();
        }
    });
}
// For every sstable version: builds two mutations for the same partition key
// whose range tombstones start at the same clustering position, places them
// in separate memtables, writes an sstable from the combined reader over
// both, and expects reading it back to yield m1 + m2.
SEASTAR_TEST_CASE(test_writing_combined_stream_with_tombstones_at_the_same_position) {
    return seastar::async([] {
        for (const auto ver : all_sstable_versions) {
            storage_service_for_tests ssft;
            auto tmp = make_lw_shared<tmpdir>();

            // NOTE: the order of the simple_schema calls below is kept as-is on purpose.
            simple_schema table;
            auto sch = table.schema();

            auto rt1 = table.make_range_tombstone(table.make_ckey_range(1, 10));
            auto rt2 = table.make_range_tombstone(table.make_ckey_range(1, 5)); // rt1 + rt2 = {[1, 5], (5, 10]}

            auto local_key = make_local_key(sch);

            mutation m1 = table.new_mutation(local_key);
            table.add_row(m1, table.make_ckey(0), "v0"); // So that we don't hit workaround for #1203, which would cover up bugs
            m1.partition().apply_delete(*sch, rt1);
            m1.partition().apply_delete(*sch, table.make_ckey(4), table.new_tombstone());

            auto rt3 = table.make_range_tombstone(table.make_ckey_range(20, 21));
            m1.partition().apply_delete(*sch, table.make_ckey(20), table.new_tombstone());
            m1.partition().apply_delete(*sch, rt3);

            mutation m2 = table.new_mutation(local_key);
            m2.partition().apply_delete(*sch, rt2);
            table.add_row(m2, table.make_ckey(4), "v2"); // position inside rt2

            auto first_mt = make_lw_shared<memtable>(sch);
            first_mt->apply(m1);

            auto second_mt = make_lw_shared<memtable>(sch);
            second_mt->apply(m2);

            auto sstp = sstables::make_sstable(sch,
                                               tmp->path,
                                               1 /* generation */,
                                               ver,
                                               sstables::sstable::format_types::big);
            sstable_writer_config writer_cfg;
            sstp->write_components(
                    make_combined_reader(sch,
                                         first_mt->make_flat_reader(sch),
                                         second_mt->make_flat_reader(sch)),
                    1, sch, writer_cfg).get();
            sstp->load().get();

            // The sstable must contain the merge of both input mutations.
            const auto expected = m1 + m2;
            assert_that(sstp->as_mutation_source().make_reader(sch))
                    .produces(expected)
                    .produces_end_of_stream();
        }
    });
}