scylladb/sstables/kl/reader.cc
Avi Kivity 99d5355007 Merge "Cache sstable indexes in memory" from Tomasz
"
The main goal of this series is to improve the efficiency of reads from large partitions by
reducing the amount of I/O needed to read the sstable index. This is achieved by caching
index file pages and partition index entries in memory.

Currently, the pages are cached by individual reads only for the duration of the read.
This was done to facilitate binary search in the promoted index (intra-partition index).
After this series, all reads share the index file page cache, which stays around even after reads stop.

The page cache is subject to eviction. It uses the same region as the current row cache and shares
the LRU with row cache entries. This means that LRU objects need to be virtualized. This series takes
an easy approach and does this by introducing a virtual base class, which adds the overhead of a
vtable pointer to each row cache entry.
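The shared-LRU idea can be sketched roughly as below. These are hypothetical names and a deliberately minimal structure, not the actual ScyllaDB classes:

```cpp
#include <cassert>
#include <list>
#include <memory>

// Hypothetical sketch: a common virtual base class lets heterogeneous cache
// entries (row cache entries, sstable index pages) share one LRU list.
// The cost is one vtable pointer per entry, as noted above.
struct evictable {
    virtual ~evictable() = default;
    virtual void on_evicted() = 0;
};

struct row_cache_entry : evictable {
    int& eviction_counter;
    explicit row_cache_entry(int& c) : eviction_counter(c) {}
    void on_evicted() override { ++eviction_counter; }
};

struct index_page_entry : evictable {
    int& eviction_counter;
    explicit index_page_entry(int& c) : eviction_counter(c) {}
    void on_evicted() override { ++eviction_counter; }
};

// A single LRU evicts from the front regardless of the concrete entry type.
struct shared_lru {
    std::list<std::unique_ptr<evictable>> entries; // front = least recent
    void add(std::unique_ptr<evictable> e) { entries.push_back(std::move(e)); }
    bool evict_one() {
        if (entries.empty()) {
            return false;
        }
        entries.front()->on_evicted();
        entries.pop_front();
        return true;
    }
};
```

The point of the sketch is only that eviction does not need to know which cache an entry belongs to once both caches share one list.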

SSTable indexes form a hierarchy. There is a summary, which is a sparse partition key index into the
full partition index; the summary is already kept in memory. The partition index is divided by the summary
into pages. Each entry in the partition index contains a promoted index, which is a sparse index into atoms
identified by the clustering key (rows, tombstones).

In order to read the promoted index, the reader needs to read the partition index entry first.
To speed this up, this series also adds caching of partition index entries. This cache survives
reads and is subject to eviction, just like the index file page cache. The unit of caching is
the partition index page. Without this cache, each access to the promoted index would have to be
preceded by parsing the partition index page containing the partition key.
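The three-level lookup described above can be illustrated with a toy in-memory model. All names and structures here are hypothetical simplifications, not the real on-disk or in-memory layout:

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

// Sketch of the hierarchy: summary -> partition index page -> promoted index.
struct promoted_index {                        // sparse intra-partition index
    std::map<std::string, long> block_offset;  // clustering key -> data offset
};
struct index_entry {                           // one partition index entry
    long partition_offset;
    promoted_index pidx;
};
using index_page = std::map<std::string, index_entry>; // the unit of caching

struct sstable_index {
    // The summary is sparse and always in memory; it maps the first
    // partition key of each page to the (parsed) page. After this series,
    // parsed pages would come from the partition index cache instead of
    // being re-parsed on every read.
    std::map<std::string, index_page> summary;

    std::optional<long> lookup_row(const std::string& pk, const std::string& ck) const {
        auto page_it = summary.upper_bound(pk);
        if (page_it == summary.begin()) return std::nullopt;
        --page_it;                              // page that may hold pk
        auto e = page_it->second.find(pk);
        if (e == page_it->second.end()) return std::nullopt;
        // The promoted index narrows the scan to a block at or before ck.
        auto b = e->second.pidx.block_offset.upper_bound(ck);
        if (b == e->second.pidx.block_offset.begin()) {
            return e->second.partition_offset;  // start of the partition
        }
        --b;
        return b->second;
    }
};
```

Caching the parsed `index_page` is what lets repeated promoted-index accesses skip the parsing step.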

Performance testing results follow.

1) scylla-bench large partition reads

  Populated with:

        perf_fast_forward --run-tests=large-partition-skips --datasets=sb-large-part-ds1 \
            -c1 -m1G --populate --value-size=1024 --rows=10000000

  Single partition, 9G data file, 4MB index file

  Test execution:

    build/release/scylla -c1 -m4G
    scylla-bench -workload uniform -mode read -limit 1 -concurrency 100 -partition-count 1 \
       -clustering-row-count 10000000 -duration 60m

  TL;DR: after: 2x throughput, 0.5x median latency

    Before (c1daf2bb24):

    Results
    Time (avg):	 5m21.033180213s
    Total ops:	 966951
    Total rows:	 966951
    Operations/s:	 3011.997048812112
    Rows/s:		 3011.997048812112
    Latency:
      max:		 74.055679ms
      99.9th:	 63.569919ms
      99th:		 41.320447ms
      95th:		 38.076415ms
      90th:		 37.158911ms
      median:	 34.537471ms
      mean:		 33.195994ms

    After:

    Results
    Time (avg):	 5m14.706669345s
    Total ops:	 2042831
    Total rows:	 2042831
    Operations/s:	 6491.22243800942
    Rows/s:		 6491.22243800942
    Latency:
      max:		 60.096511ms
      99.9th:	 35.520511ms
      99th:		 27.000831ms
      95th:		 23.986175ms
      90th:		 21.659647ms
      median:	 15.040511ms
      mean:		 15.402076ms

2) scylla-bench small partitions

  I tested several scenarios with varying data set sizes, e.g. data fully fitting in memory,
  half fitting, and much larger than memory. The improvement varied a bit, but in all cases
  the "after" code performed slightly better.

  Below is a representative run over a data set which does not fit in memory.

  scylla -c1 -m4G
  scylla-bench -workload uniform -mode read  -concurrency 400 -partition-count 10000000 \
      -clustering-row-count 1 -duration 60m -no-lower-bound

  Before:

    Time (avg):	 51.072411913s
    Total ops:	 3165885
    Total rows:	 3165885
    Operations/s:	 61988.164024260645
    Rows/s:		 61988.164024260645
    Latency:
      max:		 34.045951ms
      99.9th:	 25.985023ms
      99th:		 23.298047ms
      95th:		 19.070975ms
      90th:		 17.530879ms
      median:	 3.899391ms
      mean:		 6.450616ms

  After:

    Time (avg):	 50.232410679s
    Total ops:	 3778863
    Total rows:	 3778863
    Operations/s:	 75227.58014424688
    Rows/s:		 75227.58014424688
    Latency:
      max:		 37.027839ms
      99.9th:	 24.805375ms
      99th:		 18.219007ms
      95th:		 14.090239ms
      90th:		 12.124159ms
      median:	 4.030463ms
      mean:		 5.315111ms

  The results include the warmup phase, which populates the partition index cache, so the
  hot-cache effect is dampened in the statistics. See the 99th percentile: latency improves
  after the cache warms up, which pulls the percentiles down.

3) perf_fast_forward --run-tests=large-partition-skips

    Caching is not exercised here; the test is included to show there are no regressions in the cold-cache case.

    TL;DR: No significant change

    perf_fast_forward --run-tests=large-partition-skips --datasets=large-part-ds1 -c1 -m1G

    Config: rows: 10000000, value size: 2000

    Before:

    read    skip      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
    1       0        36.429822            4  10000000     274500         62     274521     274429   153889.2 153883   19696986  153853       0        0        0        0        0        0        0  22.5%
    1       1        36.856236            4   5000000     135662          7     135670     135650   155652.0 155652   19704117  139326       1        0        1        1        0        0        0  38.1%
    1       8        36.347667            4   1111112      30569          0      30570      30569   155652.0 155652   19704117  139071       1        0        1        1        0        0        0  19.5%
    1       16       36.278866            4    588236      16214          1      16215      16213   155652.0 155652   19704117  139073       1        0        1        1        0        0        0  16.6%
    1       32       36.174784            4    303031       8377          0       8377       8376   155652.0 155652   19704117  139056       1        0        1        1        0        0        0  12.3%
    1       64       36.147104            4    153847       4256          0       4256       4256   155652.0 155652   19704117  139109       1        0        1        1        0        0        0  11.1%
    1       256       9.895288            4     38911       3932          1       3933       3930   100869.2 100868    3178298   59944   38912        0        1        1        0        0        0  14.3%
    1       1024      2.599921            4      9757       3753          0       3753       3753    26604.0  26604     801850   15071    9758        0        1        1        0        0        0  14.6%
    1       4096      0.784568            4      2441       3111          1       3111       3109     7982.0   7982     205946    3772    2442        0        1        1        0        0        0  13.8%

    64      1        36.553975            4   9846154     269359         10     269369     269337   155663.8 155652   19704117  139230       1        0        1        1        0        0        0  28.2%
    64      8        36.509694            4   8888896     243467          8     243475     243449   155652.0 155652   19704117  139120       1        0        1        1        0        0        0  26.5%
    64      16       36.466282            4   8000000     219381          4     219385     219374   155652.0 155652   19704117  139232       1        0        1        1        0        0        0  24.8%
    64      32       36.395926            4   6666688     183171          6     183180     183165   155652.0 155652   19704117  139158       1        0        1        1        0        0        0  21.8%
    64      64       36.296856            4   5000000     137753          4     137757     137737   155652.0 155652   19704117  139105       1        0        1        1        0        0        0  17.7%
    64      256      20.590392            4   2000000      97133         18      97151      94996   135248.8 131395    7877402   98335   31282        0        1        1        0        0        0  15.7%
    64      1024      6.225773            4    588288      94492       1436      95434      88748    46066.5  41321    2324378   30360    9193        0        1        1        0        0        0  15.8%
    64      4096      1.856069            4    153856      82893         54      82948      82721    16115.0  16043     583674   11574    2675        0        1        1        0        0        0  16.3%

    After:

    read    skip      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
    1       0        36.429240            4  10000000     274505         38     274515     274417   153887.8 153883   19696986  153849       0        0        0        0        0        0        0  22.4%
    1       1        36.933806            4   5000000     135377         15     135385     135354   155658.0 155658   19704085  139398       1        0        1        1        0        0        0  40.0%
    1       8        36.419187            4   1111112      30509          2      30510      30507   155658.0 155658   19704085  139233       1        0        1        1        0        0        0  22.0%
    1       16       36.353475            4    588236      16181          0      16182      16181   155658.0 155658   19704085  139183       1        0        1        1        0        0        0  19.2%
    1       32       36.251356            4    303031       8359          0       8359       8359   155658.0 155658   19704085  139120       1        0        1        1        0        0        0  14.8%
    1       64       36.203692            4    153847       4249          0       4250       4249   155658.0 155658   19704085  139071       1        0        1        1        0        0        0  13.0%
    1       256       9.965876            4     38911       3904          0       3906       3904   100875.2 100874    3178266   60108   38912        0        1        1        0        0        0  17.9%
    1       1024      2.637501            4      9757       3699          1       3700       3697    26610.0  26610     801818   15071    9758        0        1        1        0        0        0  19.5%
    1       4096      0.806745            4      2441       3026          1       3027       3024     7988.0   7988     205914    3773    2442        0        1        1        0        0        0  18.3%

    64      1        36.611243            4   9846154     268938          5     268942     268921   155669.8 155705   19704085  139330       2        0        1        1        0        0        0  29.9%
    64      8        36.559471            4   8888896     243135         11     243156     243124   155658.0 155658   19704085  139261       1        0        1        1        0        0        0  28.1%
    64      16       36.510319            4   8000000     219116         15     219126     219101   155658.0 155658   19704085  139173       1        0        1        1        0        0        0  26.3%
    64      32       36.439069            4   6666688     182954          9     182964     182943   155658.0 155658   19704085  139274       1        0        1        1        0        0        0  23.2%
    64      64       36.334808            4   5000000     137609         11     137612     137596   155658.0 155658   19704085  139258       2        0        1        1        0        0        0  19.1%
    64      256      20.624759            4   2000000      96971         88      97059      92717   138296.0 131401    7877370   98332   31282        0        1        1        0        0        0  17.2%
    64      1024      6.260598            4    588288      93967       1429      94905      88051    45939.5  41327    2324346   30361    9193        0        1        1        0        0        0  17.8%
    64      4096      1.881338            4    153856      81780        140      81920      81520    16109.8  16092     582714   11617    2678        0        1        1        0        0        0  18.2%

4) perf_fast_forward --run-tests=large-partition-slicing

    Caching enabled, each line shows the median run from many iterations

    TL;DR: We can observe reduction in IO which translates to reduction in execution time,
           especially for slicing in the middle of partition.

    perf_fast_forward --run-tests=large-partition-slicing --datasets=large-part-ds1 -c1 -m1G --keep-cache-across-test-cases

    Config: rows: 10000000, value size: 2000

    Before:

    offset  read      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    allocs   tasks insns/f    cpu
    0       1         0.000491          127         1       2037         24       2109        127        4.0      4        128       2       2        0        1        1        0        0        0       157      80 3058208  15.0%
    0       32        0.000561         1740        32      56995        410      60031      47208        5.0      5        160       3       2        0        1        1        0        0        0       386     111  113353  17.5%
    0       256       0.002052          488       256     124736       7111     144762      89053       16.6     17        672      14       2        0        1        1        0        0        0      2113     446   52669  18.6%
    0       4096      0.016437           61      4096     249199        692     252389     244995       69.4     69       8640      57       5        0        1        1        0        0        0     26638    1717   23321  22.4%
    5000000 1         0.002171          221         1        461          2        466        221       25.0     25        268       3       3        0        1        1        0        0        0       638     376 14311524  10.2%
    5000000 32        0.002392          404        32      13376         48      13528      13015       27.0     27        332       5       3        0        1        1        0        0        0       931     432  489691  11.9%
    5000000 256       0.003659          279       256      69967        764      73130      52563       39.5     41        780      19       3        0        1        1        0        0        0      2689     825   93756  15.8%
    5000000 4096      0.018592           55      4096     220313        433     234214     218803       94.2     94       9484      62       9        0        1        1        0        0        0     27349    2213   26562  21.0%

    After:

    offset  read      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    allocs   tasks insns/f    cpu
    0       1         0.000229          115         1       4371         85       4585        115        2.1      2         64       1       1        1        0        0        0        0        0        90      31 1314749  22.2%
    0       32        0.000277         2174        32     115674       1015     128109      14144        3.0      3         96       2       1        1        0        0        0        0        0       319      62   52508  26.1%
    0       256       0.001786          576       256     143298       5534     179142     113715       14.7     17        544      15       1        1        0        0        0        0        0      2110     453   45419  21.4%
    0       4096      0.015498           61      4096     264289       2006     268850     259342       67.4     67       8576      59       4        1        0        0        0        0        0     26657    1738   22897  23.7%
    5000000 1         0.000415          233         1       2411         15       2456        234        4.1      4        128       2       2        1        0        0        0        0        0       199      72 2644719  16.8%
    5000000 32        0.000635         1413        32      50398        349      51149      46439        6.0      6        192       4       2        1        0        0        0        0        0       458     128  125893  18.6%
    5000000 256       0.002028          486       256     126228       3024     146327      82559       17.8     18       1024      13       4        1        0        0        0        0        0      2123     385   51787  19.6%
    5000000 4096      0.016836           61      4096     243294        814     263434     241660       73.0     73       9344      62       8        1        0        0        0        0        0     26922    1920   24389  22.4%

Future work:

 - Check the impact on non-uniform workloads. Caching sstable indexes takes space away from the row cache
   which may reduce the hit ratio.

 - Reduce memory footprint of partition index cache. Currently, about 8x bloat over the on-disk size.

 - Disable cache population for "bypass cache" reads

 - Add a switch to disable sstable index caching, per-node, maybe per-table

 - Better sstable index format. Current format leads to inefficiency in caching since only some elements of the cached
   page can be hot. A B-tree index would be more efficient. Same applies to the partition index. Only some elements in
   the partition index page can be hot.

 - Add a heuristic for reducing index file I/O size when large partitions are anticipated. If we're bound by the
   disk's bandwidth, it's wasteful to read the front of the promoted index using 32K I/O; better to use 4K, which
   should cover the partition entry, and then let the binary search read the rest.
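The proposed heuristic amounts to a simple choice of initial read size. This is only a hypothetical sketch of the future-work item, not code from this series:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical sketch: when large partitions are anticipated, start with a
// 4K read, which should cover the partition index entry and the front of
// the promoted index, and let the binary search page in the rest on demand;
// otherwise keep the 32K read that amortizes better for small partitions.
constexpr std::size_t small_io_size = 4 * 1024;
constexpr std::size_t large_io_size = 32 * 1024;

std::size_t index_read_size(bool large_partitions_anticipated) {
    return large_partitions_anticipated ? small_io_size : large_io_size;
}
```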

In V2:

 - Fixed a perf_fast_forward regression in the number of I/Os used to read a partition index page.
   The reader uses 32K reads, which were split by the page cache into 4K reads.
   Fixed by propagating I/O size hints to the page cache and using a single I/O to populate it.
   New patch: "cached_file: Issue single I/O for the whole read range on miss"

 - Avoid large allocations to store partition index page entries (due to managed_vector storage).
   There is a unit test which detects this and fails.
   Fixed by implementing chunked_managed_vector, based on chunked_vector.

 - Fixed a bug in cached_file::evict_gently() where the wrong allocation strategy was used to free btree chunks

 - Simplify region_impl::free_buf() according to Avi's suggestions

 - Fit segment_kind in segment_descriptor::_free_space and lift requirement that _buf_pointers emptiness determines the kind

 - Worked around a SIGSEGV, most likely caused by coroutine miscompilation, by manipulating local object scope.

 - Wire up system/drop_sstable_caches RESTful API

 - Fix use-after-move on permit for the old scanning ka/la index reader

 - Fixed more cases of double open_data() in tests leading to assert failure

 - Adjusted cached_file class doc to account for changes in behavior.

 - Rebased

Fixes #7079.
Refs #363.
"

* tag 'sstable-index-caching-v2' of github.com:tgrabiec/scylla: (39 commits)
  api: Drop sstable index caches on system/drop_sstable_caches
  cached_file: Issue single I/O for the whole read range on miss
  row_cache: cache_tracker: Do not register metrics when constructed for tests
  sstables, cached_file: Evict cache gently when sstable is destroyed
  sstables: Hide partition_index_cache implementation away from sstables.hh
  sstables: Drop shared_index_lists alias
  sstables: Destroy partition index cache gently
  sstables: Cache partition index pages in LSA and link to LRU
  utils: Introduce lsa::weak_ptr<>
  sstables: Rename index_list to partition_index_page and shared_index_lists to partition_index_cache
  sstables, cached_file: Avoid copying buffers from cache when parsing promoted index
  cached_file: Introduce get_page_units()
  sstables: read: Document that primitive_consumer::read_32() is alloc-free
  sstables: read: Count partition index page evictions
  sstables: Drop the _use_binary_search flag from index entries
  sstables: index_reader: Keep index objects under LSA
  lsa: chunked_managed_vector: Adapt more to managed_vector
  utils: lsa: chunked_managed_vector: Make LSA-aware
  test: chunked_managed_vector_test: Make exception_safe_class standard layout
  lsa: Copy chunked_vector to chunked_managed_vector
  ...
2021-07-07 18:17:10 +03:00


/*
 * Copyright (C) 2021-present ScyllaDB
 */

/*
 * This file is part of Scylla.
 *
 * Scylla is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Scylla is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
 */
#include "sstables/kl/reader.hh"
#include "sstables/kl/reader_impl.hh"
#include "sstables/sstable_mutation_reader.hh"
#include "concrete_types.hh"

namespace sstables {
namespace kl {

static inline bytes_view pop_back(std::vector<bytes_view>& vec) {
    auto b = std::move(vec.back());
    vec.pop_back();
    return b;
}
class mp_row_consumer_k_l : public row_consumer {
private:
    mp_row_consumer_reader_k_l* _reader;
    schema_ptr _schema;
    const query::partition_slice& _slice;
    bool _out_of_range = false;
    std::optional<query::clustering_key_filter_ranges> _ck_ranges;
    std::optional<clustering_ranges_walker> _ck_ranges_walker;
    // When set, the fragment pending in _in_progress should not be emitted.
    bool _skip_in_progress = false;
    // The value of _ck_ranges->lower_bound_counter() last time we tried to skip to _ck_ranges->lower_bound().
    size_t _last_lower_bound_counter = 0;
    // We don't have "end of clustering row" markers, so we know that the current
    // row has ended only once we get something (e.g. a live cell) that belongs to
    // another one. When that happens, the sstable reader is interrupted (proceed::no),
    // but we already have the whole row that just ended and a part of the new row.
    // The finished row is moved to _ready so that the upper layer can retrieve it,
    // and the part of the new row goes to _in_progress, which is where we will
    // continue accumulating data once the sstable reader is resumed.
    //
    // _ready only holds fragments which are in the query range, but _in_progress
    // does not necessarily.
    //
    // _in_progress may be disengaged only before reading the first fragment of a
    // partition or after all fragments of the partition were consumed. Fast-forwarding
    // within a partition should not clear it; we rely on it being set to detect
    // repeated tombstones.
    mutation_fragment_opt _in_progress;
    mutation_fragment_opt _ready;
    bool _is_mutation_end = true;
    position_in_partition _fwd_end = position_in_partition::after_all_clustered_rows(); // Restricts the stream on top of _ck_ranges_walker.
    streamed_mutation::forwarding _fwd;
    // Because of #1203 we may encounter sstables with range tombstones
    // placed earlier than expected. We fix the ordering by loading range tombstones
    // initially into _range_tombstones, until the first row is encountered,
    // and then merge the two streams in push_ready_fragments().
    //
    // _range_tombstones holds only tombstones which are relevant for the current ranges.
    range_tombstone_stream _range_tombstones;
    bool _first_row_encountered = false;
    // See #2986
    bool _treat_non_compound_rt_as_compound;
public:
    struct column {
        bool is_static;
        bytes_view col_name;
        std::vector<bytes_view> clustering;
        // See is_collection. Collections have an extra element aside from the name.
        // This will be non-zero size if this is a collection, and zero size otherwise.
        bytes_view collection_extra_data;
        bytes_view cell;
        const column_definition *cdef;
        bool is_present;

        static constexpr size_t static_size = 2;

        // For every normal column, we expect the clustering key, followed by the
        // extra element for the column name.
        //
        // For a collection, some auxiliary data will be embedded into the
        // column_name as seen by the row consumer. This means that if our
        // exploded clustering key has more elements than expected, we are dealing
        // with a collection.
        bool is_collection(const schema& s) const {
            auto expected_normal = s.clustering_key_size() + 1;
            // Note that we can have fewer than expected. That is the case for
            // incomplete prefixes, for instance.
            if (clustering.size() <= expected_normal) {
                return false;
            } else if (clustering.size() == (expected_normal + 1)) {
                return true;
            }
            throw malformed_sstable_exception(format("Found {:d} clustering elements in column name. Was not expecting that!", clustering.size()));
        }

        static bool check_static(const schema& schema, bytes_view col) {
            return composite_view(col, schema.is_compound()).is_static();
        }

        static bytes_view fix_static_name(const schema& schema, bytes_view col) {
            return fix_static_name(col, check_static(schema, col));
        }

        static bytes_view fix_static_name(bytes_view col, bool is_static) {
            if (is_static) {
                col.remove_prefix(static_size);
            }
            return col;
        }

        std::vector<bytes_view> extract_clustering_key(const schema& schema) {
            return composite_view(col_name, schema.is_compound()).explode();
        }

        column(const schema& schema, bytes_view col, api::timestamp_type timestamp)
            : is_static(check_static(schema, col))
            , col_name(fix_static_name(col, is_static))
            , clustering(extract_clustering_key(schema))
            , collection_extra_data(is_collection(schema) ? pop_back(clustering) : bytes()) // collections are not supported with COMPACT STORAGE, so this is fine
            , cell(!schema.is_dense() ? pop_back(clustering) : (*(schema.regular_begin())).name()) // dense: cell name is not provided. It is the only regular column
            , cdef(schema.get_column_definition(to_bytes(cell)))
            , is_present(cdef && timestamp > cdef->dropped_at())
        {
            if (is_static) {
                for (auto& e : clustering) {
                    if (e.size() != 0) {
                        throw malformed_sstable_exception("Static row has clustering key information. I didn't expect that!");
                    }
                }
            }
            if (is_present && is_static != cdef->is_static()) {
                throw malformed_sstable_exception(seastar::format("Mismatch between {} cell and {} column definition",
                        is_static ? "static" : "non-static", cdef->is_static() ? "static" : "non-static"));
            }
        }
    };
private:
    // Notes for collection mutation:
    //
    // While we could in theory generate the mutation for the elements as they
    // appear, that would be costly. We would need to keep deserializing and
    // serializing them, either explicitly or through a merge.
    //
    // The best way forward is to accumulate the collection data into a data
    // structure, and later on serialize it fully when this (sstable) row ends.
    class collection_mutation {
        const column_definition *_cdef;
    public:
        collection_mutation_description cm;

        // We need to get a copy of the prefix here, because the outer object may be short lived.
        collection_mutation(const column_definition *cdef)
            : _cdef(cdef) { }

        collection_mutation() : _cdef(nullptr) {}

        bool is_new_collection(const column_definition *c) const {
            if (!_cdef || ((_cdef->id != c->id) || (_cdef->kind != c->kind))) {
                return true;
            }
            return false;
        }

        void flush(const schema& s, mutation_fragment& mf) {
            if (!_cdef) {
                return;
            }
            auto ac = atomic_cell_or_collection::from_collection_mutation(cm.serialize(*_cdef->type));
            if (_cdef->is_static()) {
                mf.mutate_as_static_row(s, [&] (static_row& sr) mutable {
                    sr.set_cell(*_cdef, std::move(ac));
                });
            } else {
                mf.mutate_as_clustering_row(s, [&] (clustering_row& cr) {
                    cr.set_cell(*_cdef, std::move(ac));
                });
            }
        }
    };

    std::optional<collection_mutation> _pending_collection = {};

    collection_mutation& pending_collection(const column_definition *cdef) {
        assert(cdef->is_multi_cell() && "frozen set should behave like a cell\n");
        if (!_pending_collection || _pending_collection->is_new_collection(cdef)) {
            flush_pending_collection(*_schema);
            _pending_collection = collection_mutation(cdef);
        }
        return *_pending_collection;
    }
    proceed push_ready_fragments_out_of_range() {
        // Emit all range tombstones relevant to the current forwarding range first.
        while (!_reader->is_buffer_full()) {
            auto mfo = _range_tombstones.get_next(_fwd_end);
            if (!mfo) {
                if (!_reader->_partition_finished) {
                    _reader->on_out_of_clustering_range();
                }
                break;
            }
            _reader->push_mutation_fragment(std::move(*mfo));
        }
        return proceed::no;
    }

    proceed push_ready_fragments_with_ready_set() {
        // We're merging two streams here, one is _range_tombstones
        // and the other is the main fragment stream represented by
        // _ready and _out_of_range (which means end of stream).
        while (!_reader->is_buffer_full()) {
            auto mfo = _range_tombstones.get_next(*_ready);
            if (mfo) {
                _reader->push_mutation_fragment(std::move(*mfo));
            } else {
                _reader->push_mutation_fragment(std::move(*_ready));
                _ready = {};
                return proceed(!_reader->is_buffer_full());
            }
        }
        return proceed::no;
    }

    void update_pending_collection(const column_definition *cdef, bytes&& col, atomic_cell&& ac) {
        pending_collection(cdef).cm.cells.emplace_back(std::move(col), std::move(ac));
    }

    void update_pending_collection(const column_definition *cdef, tombstone&& t) {
        pending_collection(cdef).cm.tomb = std::move(t);
    }

    void flush_pending_collection(const schema& s) {
        if (_pending_collection) {
            _pending_collection->flush(s, *_in_progress);
            _pending_collection = {};
        }
    }
    // Assumes that this and the other advance_to() are called with monotonic positions.
    // We rely on the fact that the first 'S' in SSTables stands for 'sorted'
    // and the clustering row keys are always in an ascending order.
    void advance_to(position_in_partition_view pos) {
        position_in_partition::less_compare less(*_schema);
        if (!less(pos, _fwd_end)) {
            _out_of_range = true;
            _skip_in_progress = false;
        } else {
            _skip_in_progress = !_ck_ranges_walker->advance_to(pos);
            _out_of_range |= _ck_ranges_walker->out_of_range();
        }
        sstlog.trace("mp_row_consumer_k_l {}: advance_to({}) => out_of_range={}, skip_in_progress={}", fmt::ptr(this), pos, _out_of_range, _skip_in_progress);
    }

    // Assumes that this and other advance_to() overloads are called with monotonic positions.
    void advance_to(const range_tombstone& rt) {
        position_in_partition::less_compare less(*_schema);
        auto&& start = rt.position();
        auto&& end = rt.end_position();
        if (!less(start, _fwd_end)) {
            _out_of_range = true;
            _skip_in_progress = false; // It may become in range after the next forwarding, so we cannot drop it
        } else {
            _skip_in_progress = !_ck_ranges_walker->advance_to(start, end);
            _out_of_range |= _ck_ranges_walker->out_of_range();
        }
        sstlog.trace("mp_row_consumer_k_l {}: advance_to({}) => out_of_range={}, skip_in_progress={}", fmt::ptr(this), rt, _out_of_range, _skip_in_progress);
    }

    void advance_to(const mutation_fragment& mf) {
        if (mf.is_range_tombstone()) {
            advance_to(mf.as_range_tombstone());
        } else {
            advance_to(mf.position());
        }
    }

    void set_up_ck_ranges(const partition_key& pk) {
        sstlog.trace("mp_row_consumer_k_l {}: set_up_ck_ranges({})", fmt::ptr(this), pk);
        _ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk);
        _ck_ranges_walker.emplace(*_schema, _ck_ranges->ranges(), _schema->has_static_columns());
        _last_lower_bound_counter = 0;
        _fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows();
        _out_of_range = false;
        _range_tombstones.reset();
        _ready = {};
        _first_row_encountered = false;
    }
public:
    mutation_opt mut;

    mp_row_consumer_k_l(mp_row_consumer_reader_k_l* reader,
                        const schema_ptr schema,
                        reader_permit permit,
                        const query::partition_slice& slice,
                        const io_priority_class& pc,
                        tracing::trace_state_ptr trace_state,
                        streamed_mutation::forwarding fwd,
                        const shared_sstable& sst)
        : row_consumer(std::move(permit), std::move(trace_state), pc)
        , _reader(reader)
        , _schema(schema)
        , _slice(slice)
        , _fwd(fwd)
        , _range_tombstones(*_schema, this->permit())
        , _treat_non_compound_rt_as_compound(!sst->has_correct_non_compound_range_tombstones())
    { }

    mp_row_consumer_k_l(mp_row_consumer_reader_k_l* reader,
                        const schema_ptr schema,
                        reader_permit permit,
                        const io_priority_class& pc,
                        tracing::trace_state_ptr trace_state,
                        streamed_mutation::forwarding fwd,
                        const shared_sstable& sst)
        : mp_row_consumer_k_l(reader, schema, std::move(permit), schema->full_slice(), pc, std::move(trace_state), fwd, sst) { }

    virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
        if (!_is_mutation_end) {
            return proceed::yes;
        }
        auto pk = partition_key::from_exploded(key.explode(*_schema));
        setup_for_partition(pk);
        auto dk = dht::decorate_key(*_schema, pk);
        _reader->on_next_partition(std::move(dk), tombstone(deltime));
        return proceed::yes;
    }

    void setup_for_partition(const partition_key& pk) {
        _is_mutation_end = false;
        _skip_in_progress = false;
        set_up_ck_ranges(pk);
    }

    proceed flush() {
        sstlog.trace("mp_row_consumer_k_l {}: flush(in_progress={}, ready={}, skip={})", fmt::ptr(this),
                _in_progress ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_in_progress) : std::optional<mutation_fragment::printer>(),
                _ready ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_ready) : std::optional<mutation_fragment::printer>(),
                _skip_in_progress);
        flush_pending_collection(*_schema);
        // If _ready is already set we have a bug: get_mutation_fragment()
        // was not called, and below we will lose one clustering row!
        assert(!_ready);
        if (!_skip_in_progress) {
            _ready = std::exchange(_in_progress, { });
            return push_ready_fragments_with_ready_set();
        } else {
            _in_progress = { };
            _ready = { };
            _skip_in_progress = false;
            return proceed::yes;
        }
    }

    proceed flush_if_needed(range_tombstone&& rt) {
        sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed(in_progress={}, ready={}, skip={})", fmt::ptr(this),
                _in_progress ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_in_progress) : std::optional<mutation_fragment::printer>(),
_ready ? std::optional<mutation_fragment::printer>(std::in_place, *_schema, *_ready) : std::optional<mutation_fragment::printer>(),
_skip_in_progress);
proceed ret = proceed::yes;
if (_in_progress) {
ret = flush();
}
advance_to(rt);
auto rt_opt = _ck_ranges_walker->split_tombstone(rt, _range_tombstones);
if (rt_opt) {
_in_progress = mutation_fragment(*_schema, permit(), std::move(*rt_opt));
}
if (_out_of_range) {
ret = push_ready_fragments_out_of_range();
}
if (needs_skip()) {
ret = proceed::no;
}
return ret;
}
proceed flush_if_needed(bool is_static, position_in_partition&& pos) {
sstlog.trace("mp_row_consumer_k_l {}: flush_if_needed({})", fmt::ptr(this), pos);
// Part of workaround for #1203
_first_row_encountered = !is_static;
position_in_partition::equal_compare eq(*_schema);
proceed ret = proceed::yes;
if (_in_progress && !eq(_in_progress->position(), pos)) {
ret = flush();
}
if (!_in_progress) {
advance_to(pos);
if (is_static) {
_in_progress = mutation_fragment(*_schema, permit(), static_row());
} else {
_in_progress = mutation_fragment(*_schema, permit(), clustering_row(std::move(pos.key())));
}
if (_out_of_range) {
ret = push_ready_fragments_out_of_range();
}
if (needs_skip()) {
ret = proceed::no;
}
}
return ret;
}
proceed flush_if_needed(bool is_static, const std::vector<bytes_view>& ecp) {
auto pos = [&] {
if (is_static) {
return position_in_partition(position_in_partition::static_row_tag_t());
} else {
auto ck = clustering_key_prefix::from_exploded_view(ecp);
return position_in_partition(position_in_partition::clustering_row_tag_t(), std::move(ck));
}
}();
return flush_if_needed(is_static, std::move(pos));
}
proceed flush_if_needed(clustering_key_prefix&& ck) {
return flush_if_needed(false, position_in_partition(position_in_partition::clustering_row_tag_t(), std::move(ck)));
}
template<typename CreateCell>
//requires requires(CreateCell create_cell, column col) {
//    { create_cell(col) } -> std::same_as<void>;
//}
proceed do_consume_cell(bytes_view col_name, int64_t timestamp, int64_t ttl, int64_t expiration, CreateCell&& create_cell) {
struct column col(*_schema, col_name, timestamp);
auto ret = flush_if_needed(col.is_static, col.clustering);
if (_skip_in_progress) {
return ret;
}
if (col.cell.size() == 0) {
row_marker rm(timestamp, gc_clock::duration(ttl), gc_clock::time_point(gc_clock::duration(expiration)));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) {
cr.apply(std::move(rm));
});
return ret;
}
if (!col.is_present) {
return ret;
}
create_cell(std::move(col));
return ret;
}
virtual proceed consume_counter_cell(bytes_view col_name, fragmented_temporary_buffer::view value, int64_t timestamp) override {
return do_consume_cell(col_name, timestamp, 0, 0, [&] (auto&& col) {
auto ac = make_counter_cell(timestamp, value);
if (col.is_static) {
_in_progress->mutate_as_static_row(*_schema, [&] (static_row& sr) mutable {
sr.set_cell(*(col.cdef), std::move(ac));
});
} else {
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac)));
});
}
});
}
virtual proceed consume_cell(bytes_view col_name, fragmented_temporary_buffer::view value, int64_t timestamp, int64_t ttl, int64_t expiration) override {
return do_consume_cell(col_name, timestamp, ttl, expiration, [&] (auto&& col) {
bool is_multi_cell = col.collection_extra_data.size();
if (is_multi_cell != col.cdef->is_multi_cell()) {
return;
}
if (is_multi_cell) {
auto& value_type = visit(*col.cdef->type, make_visitor(
[] (const collection_type_impl& ctype) -> const abstract_type& { return *ctype.value_comparator(); },
[&] (const user_type_impl& utype) -> const abstract_type& {
if (col.collection_extra_data.size() != sizeof(int16_t)) {
throw malformed_sstable_exception(format("wrong size of field index while reading UDT column: expected {}, got {}",
sizeof(int16_t), col.collection_extra_data.size()));
}
auto field_idx = deserialize_field_index(col.collection_extra_data);
if (field_idx >= utype.size()) {
throw malformed_sstable_exception(format("field index too big while reading UDT column: type has {} fields, got {}",
utype.size(), field_idx));
}
return *utype.type(field_idx);
},
[] (const abstract_type& o) -> const abstract_type& {
throw malformed_sstable_exception(format("attempted to read multi-cell column, but expected type was {}", o.name()));
}
));
auto ac = make_atomic_cell(value_type,
api::timestamp_type(timestamp),
value,
gc_clock::duration(ttl),
gc_clock::time_point(gc_clock::duration(expiration)),
atomic_cell::collection_member::yes);
update_pending_collection(col.cdef, to_bytes(col.collection_extra_data), std::move(ac));
return;
}
auto ac = make_atomic_cell(*col.cdef->type,
api::timestamp_type(timestamp),
value,
gc_clock::duration(ttl),
gc_clock::time_point(gc_clock::duration(expiration)),
atomic_cell::collection_member::no);
if (col.is_static) {
_in_progress->mutate_as_static_row(*_schema, [&] (static_row& sr) mutable {
sr.set_cell(*(col.cdef), std::move(ac));
});
return;
}
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac)));
});
});
}
virtual proceed consume_deleted_cell(bytes_view col_name, sstables::deletion_time deltime) override {
auto timestamp = deltime.marked_for_delete_at;
struct column col(*_schema, col_name, timestamp);
gc_clock::duration secs(deltime.local_deletion_time);
return consume_deleted_cell(col, timestamp, gc_clock::time_point(secs));
}
proceed consume_deleted_cell(column &col, int64_t timestamp, gc_clock::time_point local_deletion_time) {
auto ret = flush_if_needed(col.is_static, col.clustering);
if (_skip_in_progress) {
return ret;
}
if (col.cell.size() == 0) {
row_marker rm(tombstone(timestamp, local_deletion_time));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(rm);
});
return ret;
}
if (!col.is_present) {
return ret;
}
auto ac = atomic_cell::make_dead(timestamp, local_deletion_time);
bool is_multi_cell = col.collection_extra_data.size();
if (is_multi_cell != col.cdef->is_multi_cell()) {
return ret;
}
if (is_multi_cell) {
update_pending_collection(col.cdef, to_bytes(col.collection_extra_data), std::move(ac));
} else if (col.is_static) {
_in_progress->mutate_as_static_row(*_schema, [&] (static_row& sr) {
sr.set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac)));
});
} else {
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac)));
});
}
return ret;
}
virtual proceed consume_row_end() override {
if (_in_progress) {
flush();
}
_is_mutation_end = true;
_out_of_range = true;
return proceed::no;
}
virtual proceed consume_shadowable_row_tombstone(bytes_view col_name, sstables::deletion_time deltime) override {
auto key = composite_view(column::fix_static_name(*_schema, col_name)).explode();
auto ck = clustering_key_prefix::from_exploded_view(key);
auto ret = flush_if_needed(std::move(ck));
if (!_skip_in_progress) {
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(shadowable_tombstone(tombstone(deltime)));
});
}
return ret;
}
static bound_kind start_marker_to_bound_kind(bytes_view component) {
auto found = composite::eoc(component.back());
switch (found) {
// start_col may have composite_marker::none in sstables
// from older versions of Cassandra (see CASSANDRA-7593).
case composite::eoc::none:
return bound_kind::incl_start;
case composite::eoc::start:
return bound_kind::incl_start;
case composite::eoc::end:
return bound_kind::excl_start;
}
throw malformed_sstable_exception(format("Unexpected start composite marker {:d}", uint16_t(uint8_t(found))));
}
static bound_kind end_marker_to_bound_kind(bytes_view component) {
auto found = composite::eoc(component.back());
switch (found) {
// end_col may have composite_marker::none in sstables
// from older versions of Cassandra (see CASSANDRA-7593).
case composite::eoc::none:
return bound_kind::incl_end;
case composite::eoc::start:
return bound_kind::excl_end;
case composite::eoc::end:
return bound_kind::incl_end;
}
throw malformed_sstable_exception(format("Unexpected end composite marker {:d}", uint16_t(uint8_t(found))));
}
virtual proceed consume_range_tombstone(
bytes_view start_col, bytes_view end_col,
sstables::deletion_time deltime) override {
auto compound = _schema->is_compound() || _treat_non_compound_rt_as_compound;
auto start = composite_view(column::fix_static_name(*_schema, start_col), compound).explode();
// Note how this is slightly different from the check in is_collection. Collection tombstones
// do not have extra data.
//
// Still, it is enough to check if we're dealing with a collection, since any other tombstone
// won't have a full clustering prefix (otherwise it isn't a range)
if (start.size() <= _schema->clustering_key_size()) {
auto start_ck = clustering_key_prefix::from_exploded_view(start);
auto start_kind = compound ? start_marker_to_bound_kind(start_col) : bound_kind::incl_start;
auto end = clustering_key_prefix::from_exploded_view(composite_view(column::fix_static_name(*_schema, end_col), compound).explode());
auto end_kind = compound ? end_marker_to_bound_kind(end_col) : bound_kind::incl_end;
if (range_tombstone::is_single_clustering_row_tombstone(*_schema, start_ck, start_kind, end, end_kind)) {
auto ret = flush_if_needed(std::move(start_ck));
if (!_skip_in_progress) {
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(tombstone(deltime));
});
}
return ret;
} else {
auto rt = range_tombstone(std::move(start_ck), start_kind, std::move(end), end_kind, tombstone(deltime));
position_in_partition::less_compare less(*_schema);
auto rt_pos = rt.position();
if (_in_progress && !less(_in_progress->position(), rt_pos)) {
return proceed::yes; // repeated tombstone, ignore
}
// Workaround for #1203
if (!_first_row_encountered) {
if (auto rt_opt = _ck_ranges_walker->split_tombstone(std::move(rt), _range_tombstones)) {
_range_tombstones.apply(std::move(*rt_opt));
}
return proceed::yes;
}
return flush_if_needed(std::move(rt));
}
} else {
auto&& column = pop_back(start);
auto cdef = _schema->get_column_definition(to_bytes(column));
if (cdef && cdef->is_multi_cell() && deltime.marked_for_delete_at > cdef->dropped_at()) {
auto ret = flush_if_needed(cdef->is_static(), start);
if (!_skip_in_progress) {
update_pending_collection(cdef, tombstone(deltime));
}
return ret;
}
}
return proceed::yes;
}
// Returns true if the consumer is positioned at a partition boundary,
// meaning that the next read will emit partition_start,
// or the end of stream has been reached.
bool is_mutation_end() const {
return _is_mutation_end;
}
bool is_out_of_range() const {
return _out_of_range;
}
// See the RowConsumer concept
void push_ready_fragments() {
if (_ready) {
if (push_ready_fragments_with_ready_set() == proceed::no) {
return;
}
}
if (_out_of_range) {
push_ready_fragments_out_of_range();
}
}
virtual void reset(indexable_element el) override {
sstlog.trace("mp_row_consumer_k_l {}: reset({})", fmt::ptr(this), static_cast<int>(el));
_ready = {};
if (el == indexable_element::partition) {
_pending_collection = {};
_in_progress = {};
_is_mutation_end = true;
_out_of_range = true;
} else {
// Do not reset _in_progress so that out-of-order tombstone detection works.
_is_mutation_end = false;
}
}
virtual position_in_partition_view position() override {
if (_in_progress) {
return _in_progress->position();
}
if (_ready) {
return _ready->position();
}
if (_is_mutation_end) {
return position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{});
}
return position_in_partition_view(position_in_partition_view::partition_start_tag_t{});
}
// Changes current fragment range.
//
// When there are no more fragments for current range,
// is_out_of_range() will return true.
//
// The new range must not overlap with the previous range and
// must be after it.
//
std::optional<position_in_partition_view> fast_forward_to(position_range r, db::timeout_clock::time_point timeout) {
sstlog.trace("mp_row_consumer_k_l {}: fast_forward_to({})", fmt::ptr(this), r);
_out_of_range = _is_mutation_end;
_fwd_end = std::move(r).end();
// range_tombstone::trim() requires !is_clustering_row().
if (r.start().is_clustering_row()) {
r.set_start(position_in_partition::before_key(r.start().key()));
}
if (r.end().is_clustering_row()) {
r.set_end(position_in_partition::before_key(r.end().key()));
}
_range_tombstones.forward_to(r.start());
_ck_ranges_walker->trim_front(std::move(r).start());
if (_ck_ranges_walker->out_of_range()) {
_out_of_range = true;
_ready = {};
sstlog.trace("mp_row_consumer_k_l {}: no more ranges", fmt::ptr(this));
return { };
}
auto start = _ck_ranges_walker->lower_bound();
if (_ready && !_ready->relevant_for_range(*_schema, start)) {
_ready = {};
}
if (_in_progress) {
advance_to(*_in_progress);
if (!_skip_in_progress) {
sstlog.trace("mp_row_consumer_k_l {}: _in_progress in range", fmt::ptr(this));
return { };
}
}
if (_out_of_range) {
sstlog.trace("mp_row_consumer_k_l {}: _out_of_range=true", fmt::ptr(this));
return { };
}
position_in_partition::less_compare less(*_schema);
if (!less(start, _fwd_end)) {
_out_of_range = true;
sstlog.trace("mp_row_consumer_k_l {}: no overlap with restrictions", fmt::ptr(this));
return { };
}
sstlog.trace("mp_row_consumer_k_l {}: advance_context({})", fmt::ptr(this), start);
_last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter();
return start;
}
bool needs_skip() const {
return (_skip_in_progress || !_in_progress)
&& _last_lower_bound_counter != _ck_ranges_walker->lower_bound_change_counter();
}
// Tries to fast forward the consuming context to the next position.
// Must be called outside consuming context.
std::optional<position_in_partition_view> maybe_skip() {
if (!needs_skip()) {
return { };
}
_last_lower_bound_counter = _ck_ranges_walker->lower_bound_change_counter();
sstlog.trace("mp_row_consumer_k_l {}: advance_context({})", fmt::ptr(this), _ck_ranges_walker->lower_bound());
return _ck_ranges_walker->lower_bound();
}
};
class sstable_mutation_reader : public mp_row_consumer_reader_k_l {
using DataConsumeRowsContext = kl::data_consume_rows_context;
using Consumer = mp_row_consumer_k_l;
static_assert(RowConsumer<Consumer>);
Consumer _consumer;
bool _will_likely_slice = false;
bool _read_enabled = true;
std::unique_ptr<DataConsumeRowsContext> _context;
std::unique_ptr<index_reader> _index_reader;
// Lets us avoid an unnecessary index lookup for single partition reads
bool _single_partition_read = false;
const dht::partition_range& _pr;
const query::partition_slice& _slice;
streamed_mutation::forwarding _fwd;
mutation_reader::forwarding _fwd_mr;
read_monitor& _monitor;
public:
sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& mon)
: mp_row_consumer_reader_k_l(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
// FIXME: I want to add `&& fwd_mr == mutation_reader::forwarding::no` below
// but can't because many call sites use the default value for
// `mutation_reader::forwarding` which is `yes`.
, _single_partition_read(pr.is_singular())
, _pr(pr)
, _slice(slice)
, _fwd(fwd)
, _fwd_mr(fwd_mr)
, _monitor(mon) { }
// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
sstable_mutation_reader(sstable_mutation_reader&&) = delete;
sstable_mutation_reader(const sstable_mutation_reader&) = delete;
~sstable_mutation_reader() {
if (_context || _index_reader) {
sstlog.warn("sstable_mutation_reader was not closed. Closing in the background. Backtrace: {}", current_backtrace());
// FIXME: discarded future.
(void)close();
}
}
private:
static bool will_likely_slice(const query::partition_slice& slice) {
return (!slice.default_row_ranges().empty() && !slice.default_row_ranges()[0].is_full())
|| slice.get_specific_ranges();
}
index_reader& get_index_reader() {
if (!_index_reader) {
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(), _consumer.trace_state());
}
return *_index_reader;
}
future<> advance_to_next_partition() {
sstlog.trace("reader {}: advance_to_next_partition()", fmt::ptr(this));
_before_partition = true;
auto& consumer = _consumer;
if (consumer.is_mutation_end()) {
sstlog.trace("reader {}: already at partition boundary", fmt::ptr(this));
_index_in_current_partition = false;
return make_ready_future<>();
}
return (_index_in_current_partition
? _index_reader->advance_to_next_partition()
: get_index_reader().advance_to(dht::ring_position_view::for_after_key(*_current_partition_key))).then([this] {
_index_in_current_partition = true;
auto [start, end] = _index_reader->data_file_positions();
if (end && start > *end) {
_read_enabled = false;
return make_ready_future<>();
}
assert(_index_reader->element_kind() == indexable_element::partition);
return skip_to(_index_reader->element_kind(), start).then([this] {
_sst->get_stats().on_partition_seek();
});
});
}
future<> read_from_index() {
sstlog.trace("reader {}: read from index", fmt::ptr(this));
auto tomb = _index_reader->partition_tombstone();
if (!tomb) {
sstlog.trace("reader {}: no tombstone", fmt::ptr(this));
return read_from_datafile();
}
auto pk = _index_reader->get_partition_key();
auto key = dht::decorate_key(*_schema, std::move(pk));
_consumer.setup_for_partition(key.key());
on_next_partition(std::move(key), tombstone(*tomb));
return make_ready_future<>();
}
future<> read_from_datafile() {
sstlog.trace("reader {}: read from data file", fmt::ptr(this));
return _context->consume_input();
}
// Assumes that we're currently positioned at partition boundary.
future<> read_partition() {
sstlog.trace("reader {}: reading partition", fmt::ptr(this));
_end_of_stream = true; // on_next_partition() will set it back to false
if (!_read_enabled) {
sstlog.trace("reader {}: eof", fmt::ptr(this));
return make_ready_future<>();
}
if (!_consumer.is_mutation_end()) {
throw malformed_sstable_exception(format("consumer not at partition boundary, position: {}",
position_in_partition_view::printer(*_schema, _consumer.position())), _sst->get_filename());
}
// It's better to obtain partition information from the index if we already have it.
// We can save on I/O if the user skips past the front of the partition immediately.
//
// It is also better to pay the cost of reading the index if we know that we will
// need to use the index anyway soon.
//
if (_index_in_current_partition) {
if (_context->eof()) {
sstlog.trace("reader {}: eof", fmt::ptr(this));
return make_ready_future<>();
}
if (_index_reader->partition_data_ready()) {
return read_from_index();
}
if (_will_likely_slice) {
return _index_reader->read_partition_data().then([this] {
return read_from_index();
});
}
}
// FIXME: advance index to current partition if _will_likely_slice
return read_from_datafile();
}
// Can be called from any position.
future<> read_next_partition() {
sstlog.trace("reader {}: read next partition", fmt::ptr(this));
// If the next partition exists then on_next_partition() will be called
// and _end_of_stream will be set back to false.
_end_of_stream = true;
if (!_read_enabled || _single_partition_read) {
sstlog.trace("reader {}: eof", fmt::ptr(this));
return make_ready_future<>();
}
return advance_to_next_partition().then([this] {
return read_partition();
});
}
future<> advance_context(std::optional<position_in_partition_view> pos) {
if (!pos || pos->is_before_all_fragments(*_schema)) {
return make_ready_future<>();
}
assert(_current_partition_key);
return [this] {
if (!_index_in_current_partition) {
_index_in_current_partition = true;
return get_index_reader().advance_to(*_current_partition_key);
}
return make_ready_future<>();
}().then([this, pos] {
return get_index_reader().advance_to(*pos).then([this] {
index_reader& idx = *_index_reader;
auto index_position = idx.data_file_positions();
if (index_position.start <= _context->position()) {
return make_ready_future<>();
}
return skip_to(idx.element_kind(), index_position.start).then([this] {
_sst->get_stats().on_partition_seek();
});
});
});
}
bool is_initialized() const {
return bool(_context);
}
future<> initialize() {
if (_single_partition_read) {
_sst->get_stats().on_single_partition_read();
const auto key = dht::ring_position_view(_pr.start()->value());
position_in_partition_view pos = get_slice_upper_bound(*_schema, _slice, key);
const auto present = co_await get_index_reader().advance_lower_and_check_if_present(key, pos);
if (!present) {
_sst->get_filter_tracker().add_false_positive();
co_return;
}
_sst->get_filter_tracker().add_true_positive();
} else {
_sst->get_stats().on_range_partition_read();
co_await get_index_reader().advance_to(_pr);
}
auto [begin, end] = _index_reader->data_file_positions();
assert(end);
if (_single_partition_read) {
_read_enabled = (begin != *end);
_context = data_consume_single_partition<DataConsumeRowsContext>(*_schema, _sst, _consumer, { begin, *end });
} else {
sstable::disk_read_range drr{begin, *end};
auto last_end = _fwd_mr ? _sst->data_size() : drr.end;
_read_enabled = bool(drr);
_context = data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, std::move(drr), last_end);
}
_monitor.on_read_started(_context->reader_position());
_index_in_current_partition = true;
_will_likely_slice = will_likely_slice(_slice);
}
future<> ensure_initialized() {
if (is_initialized()) {
return make_ready_future<>();
}
return initialize();
}
future<> skip_to(indexable_element el, uint64_t begin) {
sstlog.trace("sstable_reader: {}: skip_to({} -> {}, el={})", fmt::ptr(_context.get()), _context->position(), begin, static_cast<int>(el));
if (begin <= _context->position()) {
return make_ready_future<>();
}
_context->reset(el);
return _context->skip_to(begin);
}
public:
void on_out_of_clustering_range() override {
if (_fwd == streamed_mutation::forwarding::yes) {
_end_of_stream = true;
} else {
this->push_mutation_fragment(mutation_fragment(*_schema, _permit, partition_end()));
_partition_finished = true;
}
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
return ensure_initialized().then([this, &pr] {
if (!is_initialized()) {
_end_of_stream = true;
return make_ready_future<>();
} else {
clear_buffer();
_partition_finished = true;
_before_partition = true;
_end_of_stream = false;
assert(_index_reader);
auto f1 = _index_reader->advance_to(pr);
return f1.then([this] {
auto [start, end] = _index_reader->data_file_positions();
assert(end);
if (start != *end) {
_read_enabled = true;
_index_in_current_partition = true;
_context->reset(indexable_element::partition);
return _context->fast_forward_to(start, *end);
}
_index_in_current_partition = false;
_read_enabled = false;
return make_ready_future<>();
});
}
});
}
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
if (_end_of_stream) {
return make_ready_future<>();
}
if (!is_initialized()) {
return initialize().then([this, timeout] {
if (!is_initialized()) {
_end_of_stream = true;
return make_ready_future<>();
} else {
return fill_buffer(timeout);
}
});
}
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
if (_partition_finished) {
if (_before_partition) {
return read_partition();
} else {
return read_next_partition();
}
} else {
return do_until([this] { return is_buffer_full() || _partition_finished || _end_of_stream; }, [this] {
_consumer.push_ready_fragments();
if (is_buffer_full() || _partition_finished || _end_of_stream) {
return make_ready_future<>();
}
return advance_context(_consumer.maybe_skip()).then([this] {
return _context->consume_input();
});
});
}
}).then_wrapped([this] (future<> f) {
try {
f.get();
} catch(sstables::malformed_sstable_exception& e) {
throw sstables::malformed_sstable_exception(format("Failed to read partition from SSTable {} due to {}", _sst->get_filename(), e.what()));
}
});
}
virtual future<> next_partition() override {
if (is_initialized()) {
if (_fwd == streamed_mutation::forwarding::yes) {
clear_buffer();
_partition_finished = true;
_end_of_stream = false;
} else {
clear_buffer_to_next_partition();
if (!_partition_finished && is_buffer_empty()) {
_partition_finished = true;
}
}
}
// If the reader is not initialized then next_partition() has no effect
// because no partition_start was emitted yet.
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override {
forward_buffer_to(cr.start());
if (!_partition_finished) {
_end_of_stream = false;
return advance_context(_consumer.fast_forward_to(std::move(cr), timeout));
} else {
_end_of_stream = true;
return make_ready_future<>();
}
}
virtual future<> close() noexcept override {
auto close_context = make_ready_future<>();
if (_context) {
_monitor.on_read_completed();
// move _context to prevent double-close from destructor.
close_context = _context->close().finally([_ = std::move(_context)] {});
}
auto close_index_reader = make_ready_future<>();
if (_index_reader) {
// move _index_reader to prevent double-close from destructor.
close_index_reader = _index_reader->close().finally([_ = std::move(_index_reader)] {});
}
return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([] (std::exception_ptr ep) {
// close cannot fail as it is called either from the destructor or from flat_mutation_reader::close
sstlog.warn("Failed closing of sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
});
}
};
flat_mutation_reader make_reader(
shared_sstable sstable,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& monitor) {
return make_flat_mutation_reader<sstable_mutation_reader>(
std::move(sstable), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, monitor);
}
} // namespace kl
} // namespace sstables