Compare commits

...

28 Commits

Author SHA1 Message Date
Avi Kivity
b252bba4a2 Merge "mvcc: Fix incorrect schema version being used to copy the mutation when applying (#5099)" from Tomasz
"
Currently affects only counter tables.

Introduced in 27014a2.

mutation_partition(s, mp) is incorrect because it uses s to interpret
mp, while it should use mp_schema.

We may hit this if the current node has a newer schema than the
incoming mutation. This can happen during a table schema alteration
when we receive the mutation from a node which hasn't yet processed
the schema change.

This is undefined behavior in general. If the alter was adding or
removing columns, this may result in corruption of the write where
values of one column are inserted into a different column.

Fixes #5095.
"

* 'fix-schema-alter-counter-tables' of https://github.com/tgrabiec/scylla:
  mvcc: Fix incorrect schema version being used to copy the mutation when applying
  mutation_partition: Track and validate schema version in debug builds
  tests: Use the correct schema to access mutation_partition

(cherry picked from commit 83bc59a89f)
2019-09-28 19:48:49 +03:00
Botond Dénes
a0b9fcc041 cache_flat_mutation_reader: read_from_underlying(): propagate timeout
Propagate the timeout to `consume_mutation_fragments_until()` and hence
to the underlying reader, to ensure queued sstable reads that belong
to timed-out requests are dropped from the queue, instead of
pointlessly serving them.

consume_mutation_fragments_until() gained a `timeout` parameter, as it
previously didn't have one.

Fixes: #1068
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20190906135629.67342-1-bdenes@scylladb.com>
2019-09-06 16:05:42 +02:00
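The shape of this fix can be sketched as follows. This is an illustrative model with hypothetical names (`consume_until`, `read_row`), not Scylla's actual reader API: each layer accepts the caller's deadline and threads it down, so the lowest layer can drop queued work whose deadline already passed instead of serving it.

```cpp
#include <cassert>
#include <chrono>

using timeout_point = std::chrono::steady_clock::time_point;

bool deadline_passed(timeout_point timeout) {
    return std::chrono::steady_clock::now() >= timeout;
}

// Lowest layer: refuses queued work whose deadline already expired.
int read_row(timeout_point timeout) {
    if (deadline_passed(timeout)) {
        return -1;  // dropped from the queue, not served
    }
    return 42;      // pretend row payload
}

// Middle layer: previously had no timeout parameter; now it propagates
// the caller's deadline instead of serving timed-out requests anyway.
int consume_until(timeout_point timeout) {
    return read_row(timeout);
}
```

The point of the patch is exactly this plumbing: adding the missing parameter so no layer loses track of the original deadline.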
Paweł Dziepak
35c9b675c1 mutation_partition: verify row::append_cell() precondition
row::append_cell() has a precondition that the new cell column id needs
to be larger than that of any other already existing cell. If this
precondition is violated the row will end up in an invalid state. This
patch adds assertion to make sure we fail early in such cases.

(cherry picked from commit 060e3f8ac2)
2019-08-23 15:06:35 +02:00
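The precondition described above can be modeled in a few lines. This is a hypothetical simplified row keyed by column id, not Scylla's actual `row` type; it shows the fail-early assertion on monotonically increasing column ids.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

struct cell { int64_t value; };

class row {
    // Cells are stored sorted by column id; append_cell assumes ids
    // arrive in strictly increasing order.
    std::vector<std::pair<uint32_t, cell>> _cells;
public:
    void append_cell(uint32_t id, cell c) {
        // Precondition: the new cell's column id must be larger than any
        // existing cell's id, otherwise the sorted invariant is silently
        // broken. Assert so we fail early instead of corrupting the row.
        assert(_cells.empty() || id > _cells.back().first);
        _cells.emplace_back(id, std::move(c));
    }
    std::size_t size() const { return _cells.size(); }
};
```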
Jenkins
d71836fef7 release: prepare for 2.3.6 by hagitsegev 2019-08-17 13:12:40 +03:00
Tomasz Grabiec
f8e150e97c Merge "Fix the system.size_estimates table" from Kamil
Fixes a segfault when querying for an empty keyspace.

Also, fixes an infinite loop on smp > 1. Queries to
system.size_estimates table which are not single-partition queries
caused Scylla to go into an infinite loop inside
multishard_combining_reader::fill_buffer. This happened because
multishard_combining_reader assumes that shards return rows belonging
to separate partitions, which was not the case for
size_estimates_mutation_reader.

Fixes #4689
2019-08-14 15:33:33 +02:00
Kamil Braun
10c300f894 Fix infinite looping when performing a range query on system.size_estimates.
Queries to system.size_estimates table which are not single-partition queries
caused Scylla to go into an infinite loop inside multishard_combining_reader::fill_buffer.
This happened because multishard_combining_reader assumes that shards return rows belonging
to separate partitions, which was not the case for size_estimates_mutation_reader.
This commit fixes the issue and closes #4689.
2019-08-14 13:11:56 +02:00
Kamil Braun
de1d3e5c6b Fix segmentation fault when querying system.size_estimates for an empty keyspace. 2019-08-14 13:11:56 +02:00
Kamil Braun
69810c13ca Refactor size_estimates_virtual_reader
Move the implementation of size_estimates_mutation_reader
to a separate compilation unit to speed up compilation times
and increase readability.

Refactor tests to use seastar::thread.
2019-08-14 13:11:54 +02:00
Raphael S. Carvalho
9b025a5742 table: do not rely on undefined behavior in cleanup_sstables
It shouldn't rely on argument evaluation order, which is undefined behavior.

Fixes #4718.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry-picked from commit 0e732ed1cf)
2019-08-07 22:01:11 +03:00
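The bug class is worth a minimal sketch (hypothetical names, not the actual Scylla types): passing `std::move(sst)` and `sst->level` as arguments to the same call relies on which argument is evaluated first, which C++ leaves unspecified. The fix is to read everything needed from the object before moving it.

```cpp
#include <cassert>
#include <memory>
#include <vector>

struct sstable { int level; };

struct descriptor {
    std::vector<std::shared_ptr<sstable>> ssts;
    int level;
};

descriptor make_descriptor(std::shared_ptr<sstable> sst) {
    // Wrong: descriptor{{std::move(sst)}, sst->level}
    //   -- if the move is evaluated first, sst is null when dereferenced.
    // Fix: hoist the value out before the move, so ordering cannot matter.
    int level = sst->level;
    return descriptor{{std::move(sst)}, level};
}
```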
Piotr Jastrzębski
74eebc4cab sstables: ka/la: reader: Make sure push_ready_fragments() does not miss to emit partition_end (#4790)
Currently, if there is a fragment in _ready and _out_of_range was set
after the row end was consumed, push_ready_fragments() would return
without emitting partition_end.

This is problematic once we make consume_row_start() emit
partition_start directly, because we will want to assume that all
fragments for the previous partition are emitted by then. If they're
not, then we'd emit partition_start before partition_end for the
previous partition. The fix is to make sure that
push_ready_fragments() emits everything.

Fixes #4786

(cherry picked from commit 9b8ac5ecbc)
Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
2019-08-05 10:17:42 +03:00
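A minimal model of the invariant (illustrative names, not the actual sstable reader): once the source is out of range, the pusher must drain everything it has ready, including the closing partition_end, or the next partition_start would be emitted before the previous partition was closed.

```cpp
#include <cassert>
#include <queue>
#include <vector>

enum class fragment { partition_start, row, partition_end };

struct pusher {
    std::queue<fragment> ready;   // fragments parsed but not yet emitted
    bool out_of_range = false;    // set once the row end was consumed
    std::vector<fragment> out;    // the emitted stream

    void push_ready_fragments() {
        // Drain _ready completely; returning early here (as the buggy
        // version effectively did when out_of_range was set) would drop
        // the partition_end that closes the current partition.
        while (!ready.empty()) {
            out.push_back(ready.front());
            ready.pop();
        }
    }
};
```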
Piotr Sarna
9b2ca4ee44 main: stop view builder conditionally
The view builder is started only if it's enabled in config,
via the view_building=true variable. Unfortunately, stopping
the builder was unconditional, which may result in failed
assertions during shutdown. To remedy this, view building
is stopped only if it was previously started.

Fixes #4589

(cherry picked from commit efa7951ea5)
2019-06-26 11:06:02 +03:00
Benny Halevy
773bf45774 time_window_backlog_tracker: fix use after free
Fixes #4465

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20190430094209.13958-1-bhalevy@scylladb.com>
(cherry picked from commit 3a2fa82d6e)
2019-05-06 09:38:49 +03:00
Asias He
c6705b4335 streaming: Get rid of the keep alive timer in streaming
There is no guarantee that rpc streaming makes progress in some time
period. Remove the keep alive timer in streaming to avoid killing the
session when the rpc streaming is just slow.

The keep alive timer is used to close the session in the following case:

n2 (the rpc streaming sender) streams to n1 (the rpc streaming receiver)
kill -9 n2

We need this because we do not kill the session when gossip thinks a
node is down: the node being down might only be temporary, and it would
be a waste to drop the work already done, especially when the stream
session takes a long time.

Since range_streamer does not stream all data in a single stream
session (it streams 10% of the data at a time, with retry logic), it is
fine to kill a stream session when gossip thinks a node is down. This
patch instead closes all stream sessions with a node that gossip
reports as down.
Message-Id: <bdbb9486a533eee25fcaf4a23a946629ba946537.1551773823.git.asias@scylladb.com>

(cherry picked from commit b8158dd65d)
Message-Id: <4ebc544c85261873591fd5ac30043e693d74434a.1555466551.git.asias@scylladb.com>
2019-04-17 17:40:08 +03:00
Tomasz Grabiec
3997871b4d lsa: Cover more bad_alloc cases with abort
When --abort-on-lsa-bad-alloc is enabled we want to abort whenever
we think we can be out of memory.

We covered failures due to bad_alloc thrown from inside of the
allocation section, but did not cover failures from reservations done
at the beginning of with_reserve(). Fix by moving the trap into
reserve().

Message-Id: <1553258915-27929-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 3356a085d2)
2019-04-17 15:21:35 +02:00
Tomasz Grabiec
4ff1d731bd lsa: Fix spurious abort with --enable-abort-on-lsa-bad-alloc
allocate_segment() can fail even though we're not out of memory, when
it's invoked inside an allocating section with the cache region
locked. That section may succeed when later retried after memory
reclamation.

We should ignore bad_alloc thrown inside allocating section body and
fail only when the whole section fails.

Fixes #2924

Message-Id: <1550597493-22500-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit dafe22dd83)
2019-04-17 15:20:21 +02:00
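The retry semantics described above can be sketched like this. It is a hypothetical stand-in for LSA's allocating section, not its real API: an inner bad_alloc only triggers reclamation and a retry, and the failure propagates only when the whole section keeps failing.

```cpp
#include <cassert>
#include <new>

// Run f(), retrying on bad_alloc after (simulated) memory reclamation.
// Only if retries are exhausted does the failure escape -- mirroring
// "ignore bad_alloc thrown inside the section body and fail only when
// the whole section fails".
template <typename Func>
auto with_allocating_section(Func f, int max_retries = 3) {
    for (int attempt = 0;; ++attempt) {
        try {
            return f();
        } catch (const std::bad_alloc&) {
            if (attempt == max_retries) {
                throw;  // the whole section failed; abort policy applies here
            }
            // reclaim_memory();  // reclamation would run here before retrying
        }
    }
}
```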
Jenkins
0e0f9143c9 release: prepare for 2.3.5 by hagitsegev 2019-04-17 11:47:53 +03:00
Raphael S. Carvalho
9d809d6ea4 database: fix 2x increase in disk usage during cleanup compaction
Don't hold reference to sstables cleaned up, so that file descriptors
for their index and data files will be closed and consequently disk
space released.

Fixes #3735.

Backport note:
To reduce risk considerably, we'll not backport the mechanism for
releasing sstables introduced in the incremental compaction work.
Instead, only one sstable is passed to table::cleanup_sstables() at a
time (it won't affect performance because the operation is serialized
anyway), to make it easy to release the reference to the cleaned
sstable held by the compaction manager.

tests: release mode.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20180914194047.26288-1-raphaelsc@scylladb.com>
(cherry picked from commit 5bc028f78b)
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20190416025801.15048-1-raphaelsc@scylladb.com>
2019-04-16 17:41:30 +03:00
Tomasz Grabiec
630d599c34 schema_tables: Serialize schema merges fairly
All schema changes made to the node locally are serialized on a
semaphore which lives on shard 0. For historical reasons, they don't
queue but rather try to take the lock without blocking and retry on
failure with a random delay from the range [0, 100 us]. Contenders
which do not originate on shard 0 will have an extra disadvantage as
each lock attempt will be longer by the across-shard round trip
latency. If there is constant contention on shard 0, contenders
originating from other shards may keep losing the race to take the lock.

Schema merge executed on behalf of a DDL statement may originate on
any shard. Same for the schema merge which is coming from a push
notification. Schema merge executed as part of the background schema
pull will originate on shard 0 only, where the application state
change listeners run. So if there are constant schema pulls, DDL
statements may take a long time to get through.

The fix is to serialize merge requests fairly, by using the blocking
semaphore::wait(), which is fair.

We don't have to back-off any more, since submit_to() no longer has a
global concurrency limit.

Fixes #4436.

Message-Id: <1555349915-27703-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 3fd82021b1)
2019-04-16 10:21:10 +03:00
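The fairness property the patch relies on can be shown with a minimal FIFO ("ticket") semaphore. This is an illustrative single-unit sketch using the standard library, not seastar's semaphore: waiters take a ticket and are admitted strictly in ticket order, so no contender can be starved the way the old try_wait()-plus-random-sleep loop allowed.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

class fair_semaphore {
    std::mutex _m;
    std::condition_variable _cv;
    uint64_t _next_ticket = 0;  // next ticket to hand out
    uint64_t _now_serving = 0;  // ticket currently allowed to proceed
public:
    void wait() {
        std::unique_lock<std::mutex> lk(_m);
        uint64_t ticket = _next_ticket++;   // join the queue in FIFO order
        _cv.wait(lk, [&] { return ticket == _now_serving; });
    }
    void signal() {
        std::lock_guard<std::mutex> lk(_m);
        ++_now_serving;                     // admit the next ticket holder
        _cv.notify_all();
    }
    uint64_t served() {
        std::lock_guard<std::mutex> lk(_m);
        return _now_serving;
    }
};
```

By contrast, a try-lock with random backoff admits whoever happens to retry at the right moment, which systematically disadvantages contenders paying a cross-shard round trip per attempt.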
Takuya ASADA
0933c1a00a dist/docker/redhat: switch to python36
Since EPEL switched python3 default version to 3.6, we need to follow
the change.

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <20190409094139.4797-2-syuu@scylladb.com>
(cherry picked from commit d527ef19f7)
2019-04-14 21:17:57 +03:00
Takuya ASADA
7a7099fcfb dist/ami: switch to python36
Since EPEL switched python3 default version to 3.6, we need to follow
the change.

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <20190409094139.4797-1-syuu@scylladb.com>
2019-04-14 21:17:03 +03:00
Avi Kivity
50235aacb4 compaction: fix use-after-free when calculating backlog after schema change
The problem happens after a schema change: we fail to properly remove
an ongoing compaction, which stopped being tracked, from the list used
to calculate backlog. As a result, a compaction read monitor (which
ceases to exist after compaction ends) may be used after it is freed.

Fixes #4410.
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20190409024936.23775-1-raphaelsc@scylladb.com>
(cherry-picked from commit 8a117c338a)
2019-04-14 14:00:36 +03:00
Takuya ASADA
e888009f12 dist/redhat: switch to python36
Since EPEL switched python3 default version to 3.6, we need to follow
the change.

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <20190408160934.32701-1-syuu@scylladb.com>
(cherry picked from commit 6e51a95668)
2019-04-09 09:08:28 +03:00
Piotr Sarna
a19615ee9b types: fix varint and decimal serialization
Varint and decimal type serialization did not update the output
iterator after generating a value, which may lead to corrupted
sstables - variable-length integers were properly serialized,
but if anything followed them directly in the buffer (e.g. in a tuple),
their value would be overwritten.

Fixes #4348

Tests: unit (dev)
dtest: json_test.FromJsonUpdateTests.complex_data_types_test
       json_test.FromJsonInsertTests.complex_data_types_test
       json_test.ToJsonSelectTests.complex_data_types_test

Note that dtests still do not succeed 100% due to formatting differences
in compared results (e.g. 1.0e+07 vs 1.0E7), but it's no longer a query
correctness issue.

(cherry picked from commit 287a02dc05)
2019-03-26 16:38:58 +02:00
Tomasz Grabiec
357ca67fda row_cache: Fix abort in cache populating read concurrent with memtable flush
When we're populating a partition range and the population range ends
with a partition key (not a token) which is present in sstables and
there was a concurrent memtable flush, we would abort on the following
assert in cache::autoupdating_underlying_reader:

     utils::phased_barrier::phase_type creation_phase() const {
         assert(_reader);
         return _reader_creation_phase;
     }

That's because autoupdating_underlying_reader::move_to_next_partition()
clears the _reader field when it tries to recreate a reader but it finds
the new range to be empty:

         if (!_reader || _reader_creation_phase != phase) {
            if (_last_key) {
                auto cmp = dht::ring_position_comparator(*_cache._schema);
                auto&& new_range = _range.split_after(*_last_key, cmp);
                if (!new_range) {
                    _reader = {};
                    return make_ready_future<mutation_fragment_opt>();
                }

Fix by not asserting on _reader. creation_phase() will now be
meaningful even after we clear the _reader. The meaning of
creation_phase() is now "the phase in which the reader was last
created or 0", which makes it valid in more cases than before.

If the reader was never created we will return 0, which is smaller
than any phase returned by cache::phase_of(), since cache starts from
phase 1. This shouldn't affect current behavior, since we'd previously
abort() when called in this case; it just makes the value more
appropriate for the new semantics.

Tests:

  - unit.row_cache_test (debug)

Fixes #4236
Message-Id: <1553107389-16214-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 69775c5721)
2019-03-22 09:34:15 -03:00
Jenkins
7818c63eb1 release: prepare for 2.3.4 by hagitsegev 2019-03-18 12:44:10 +02:00
Eliran Sinvani
da10eae18c cql3 : fix a crash upon preparing select with an IN restriction due to memory violation
When preparing a select query with a multi-column IN restriction, the
node crashed due to using a parameter after it had been moved from.

Tests:
1. UnitTests (release)
2. Preparing a select statement that previously crashed the system,
and verifying that it no longer crashes.

Fixes #3204
Fixes #3692

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>
Message-Id: <7ebd210cd714a460ee5557ac612da970cee03270.1537947897.git.eliransin@scylladb.com>
(cherry picked from commit 22ad5434d1)
2019-03-07 10:10:03 +00:00
Tomasz Grabiec
d5292cd3ec sstable/compaction: Use correct schema in the writing consumer
Introduced in 2a437ab427.

regular_compaction::select_sstable_writer() creates the sstable writer
when the first partition is consumed from the combined mutation
fragment stream. It gets the schema directly from the table
object. That may be a different schema than the one used by the
readers if there was a concurrent schema alter during that small time
window. As a result, the writing consumer attached to readers will
interpret fragments using the wrong version of the schema.

One effect of this is storing values of some columns under a different
column.

This patch replaces all column_family::schema() accesses with accesses
to the _schema member, which is obtained once per compaction and is
the same schema which readers use.

Fixes #4304.

Tests:

  - manual tests with hard-coded schema change injection to reproduce the bug
  - build/dev/scylla boot
  - tests/sstable_mutation_test

Message-Id: <1551698056-23386-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 58e7ad20eb)
2019-03-05 15:06:28 +02:00
Avi Kivity
9cb35361d9 Update seastar submodule
* seastar 10ac122...efda428 (1):
  > net: fix tcp load balancer accounting leak while moving socket to other shard

Fixes #4269.
2019-03-05 15:06:07 +02:00
34 changed files with 788 additions and 579 deletions

View File

@@ -1,6 +1,6 @@
#!/bin/sh
-VERSION=2.3.3
+VERSION=2.3.6
if test -f version
then

View File

@@ -364,7 +364,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
}
});
return make_ready_future<>();
-});
+}, timeout);
}
inline

View File

@@ -516,6 +516,7 @@ scylla_core = (['database.cc',
'db/consistency_level.cc',
'db/system_keyspace.cc',
'db/system_distributed_keyspace.cc',
+'db/size_estimates_virtual_reader.cc',
'db/schema_tables.cc',
'db/cql_type_parser.cc',
'db/legacy_schema_migrator.cc',

View File

@@ -405,7 +405,7 @@ public:
in_marker(int32_t bind_index, ::shared_ptr<column_specification> receiver)
: abstract_marker(bind_index, std::move(receiver))
{
-assert(dynamic_pointer_cast<const list_type_impl>(receiver->type));
+assert(dynamic_pointer_cast<const list_type_impl>(_receiver->type));
}
virtual shared_ptr<terminal> bind(const query_options& options) override {

View File

@@ -1480,7 +1480,10 @@ future<> table::cleanup_sstables(sstables::compaction_descriptor descriptor) {
static thread_local semaphore sem(1);
return with_semaphore(sem, 1, [this, &sst] {
-return this->compact_sstables(sstables::compaction_descriptor({ sst }, sst->get_sstable_level()), true);
+// release reference to sstables cleaned up, otherwise space usage from their data and index
+// components cannot be reclaimed until all of them are cleaned.
+auto sstable_level = sst->get_sstable_level();
+return this->compact_sstables(sstables::compaction_descriptor({ std::move(sst) }, sstable_level), true);
});
});
});

View File

@@ -686,33 +686,7 @@ read_keyspace_mutation(distributed<service::storage_proxy>& proxy, const sstring
static semaphore the_merge_lock {1};
future<> merge_lock() {
-// ref: #1088
-// to avoid deadlocks, we don't want long-standing calls to the shard 0
-// as they can cause a deadlock:
-//
-// fiber1 fiber2
-// merge_lock() (succeeds)
-// merge_lock() (waits)
-// invoke_on_all() (waits on merge_lock to relinquish smp::submit_to slot)
-//
-// so we issue the lock calls with a timeout; the slot will be relinquished, and invoke_on_all()
-// can complete
-return repeat([] () mutable {
-return smp::submit_to(0, [] {
-return the_merge_lock.try_wait();
-}).then([] (bool result) {
-if (result) {
-return make_ready_future<stop_iteration>(stop_iteration::yes);
-} else {
-static thread_local auto rand_engine = std::default_random_engine();
-auto dist = std::uniform_int_distribution<int>(0, 100);
-auto to = std::chrono::microseconds(dist(rand_engine));
-return sleep(to).then([] {
-return make_ready_future<stop_iteration>(stop_iteration::no);
-});
-}
-});
-});
+return smp::submit_to(0, [] { return the_merge_lock.wait(); });
}
future<> merge_unlock() {

View File

@@ -0,0 +1,329 @@
/*
* Copyright (C) 2019 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/find_if.hpp>
#include "clustering_bounds_comparator.hh"
#include "database.hh"
#include "db/system_keyspace.hh"
#include "dht/i_partitioner.hh"
#include "partition_range_compat.hh"
#include "range.hh"
#include "service/storage_service.hh"
#include "stdx.hh"
#include "mutation_fragment.hh"
#include "sstables/sstables.hh"
#include "db/timeout_clock.hh"
#include "database.hh"
#include "db/size_estimates_virtual_reader.hh"
namespace db {
namespace size_estimates {
struct virtual_row {
const bytes& cf_name;
const token_range& tokens;
clustering_key_prefix as_key() const {
return clustering_key_prefix::from_exploded(std::vector<bytes_view>{cf_name, tokens.start, tokens.end});
}
};
struct virtual_row_comparator {
schema_ptr _schema;
virtual_row_comparator(schema_ptr schema) : _schema(schema) { }
bool operator()(const clustering_key_prefix& key1, const clustering_key_prefix& key2) {
return clustering_key_prefix::prefix_equality_less_compare(*_schema)(key1, key2);
}
bool operator()(const virtual_row& row, const clustering_key_prefix& key) {
return operator()(row.as_key(), key);
}
bool operator()(const clustering_key_prefix& key, const virtual_row& row) {
return operator()(key, row.as_key());
}
};
// Iterating over the cartesian product of cf_names and token_ranges.
class virtual_row_iterator : public std::iterator<std::input_iterator_tag, const virtual_row> {
std::reference_wrapper<const std::vector<bytes>> _cf_names;
std::reference_wrapper<const std::vector<token_range>> _ranges;
size_t _cf_names_idx = 0;
size_t _ranges_idx = 0;
public:
struct end_iterator_tag {};
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges)
: _cf_names(std::ref(cf_names))
, _ranges(std::ref(ranges))
{ }
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges, end_iterator_tag)
: _cf_names(std::ref(cf_names))
, _ranges(std::ref(ranges))
, _cf_names_idx(cf_names.size())
, _ranges_idx(ranges.size())
{
if (cf_names.empty() || ranges.empty()) {
// The product of an empty range with any range is an empty range.
// In this case we want the end iterator to be equal to the begin iterator,
// which has _ranges_idx = _cf_names_idx = 0.
_ranges_idx = _cf_names_idx = 0;
}
}
virtual_row_iterator& operator++() {
if (++_ranges_idx == _ranges.get().size() && ++_cf_names_idx < _cf_names.get().size()) {
_ranges_idx = 0;
}
return *this;
}
virtual_row_iterator operator++(int) {
virtual_row_iterator i(*this);
++(*this);
return i;
}
const value_type operator*() const {
return { _cf_names.get()[_cf_names_idx], _ranges.get()[_ranges_idx] };
}
bool operator==(const virtual_row_iterator& i) const {
return _cf_names_idx == i._cf_names_idx
&& _ranges_idx == i._ranges_idx;
}
bool operator!=(const virtual_row_iterator& i) const {
return !(*this == i);
}
};
/**
* Returns the keyspaces, ordered by name, as selected by the partition_range.
*/
static std::vector<sstring> get_keyspaces(const schema& s, const database& db, dht::partition_range range) {
struct keyspace_less_comparator {
const schema& _s;
keyspace_less_comparator(const schema& s) : _s(s) { }
dht::ring_position as_ring_position(const sstring& ks) {
auto pkey = partition_key::from_single_value(_s, utf8_type->decompose(ks));
return dht::global_partitioner().decorate_key(_s, std::move(pkey));
}
bool operator()(const sstring& ks1, const sstring& ks2) {
return as_ring_position(ks1).less_compare(_s, as_ring_position(ks2));
}
bool operator()(const sstring& ks, const dht::ring_position& rp) {
return as_ring_position(ks).less_compare(_s, rp);
}
bool operator()(const dht::ring_position& rp, const sstring& ks) {
return rp.less_compare(_s, as_ring_position(ks));
}
};
auto keyspaces = db.get_non_system_keyspaces();
auto cmp = keyspace_less_comparator(s);
boost::sort(keyspaces, cmp);
return boost::copy_range<std::vector<sstring>>(
range.slice(keyspaces, std::move(cmp)) | boost::adaptors::filtered([&s] (const auto& ks) {
// If this is a range query, results are divided between shards by the partition key (keyspace_name).
return shard_of(dht::global_partitioner().get_token(s,
partition_key::from_single_value(s, utf8_type->decompose(ks))))
== engine().cpu_id();
})
);
}
/**
* Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
*/
static dht::partition_range as_ring_position_range(dht::token_range& r) {
stdx::optional<range<dht::ring_position>::bound> start_bound, end_bound;
if (r.start()) {
start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
}
if (r.end()) {
end_bound = {{ dht::ring_position(r.end()->value(), dht::ring_position::token_bound::end), r.end()->is_inclusive() }};
}
return dht::partition_range(std::move(start_bound), std::move(end_bound), r.is_singular());
}
/**
* Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
*/
static system_keyspace::range_estimates estimate(const column_family& cf, const token_range& r) {
int64_t count{0};
utils::estimated_histogram hist{0};
auto from_bytes = [] (auto& b) {
return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
};
dht::token_range_vector ranges;
compat::unwrap_into(
wrapping_range<dht::token>({{ from_bytes(r.start) }}, {{ from_bytes(r.end) }}),
dht::token_comparator(),
[&] (auto&& rng) { ranges.push_back(std::move(rng)); });
for (auto&& r : ranges) {
auto rp_range = as_ring_position_range(r);
for (auto&& sstable : cf.select_sstables(rp_range)) {
count += sstable->estimated_keys_for_range(r);
hist.merge(sstable->get_stats_metadata().estimated_row_size);
}
}
return {cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
}
future<std::vector<token_range>> get_local_ranges() {
auto& ss = service::get_local_storage_service();
return ss.get_local_tokens().then([&ss] (auto&& tokens) {
auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens));
std::vector<token_range> local_ranges;
auto to_bytes = [](const stdx::optional<dht::token_range::bound>& b) {
assert(b);
return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
};
// We merge the ranges to be compatible with how Cassandra shows its size estimates table.
// All queries will be on that table, where all entries are text and there's no notion of
// token ranges from the CQL point of view.
auto left_inf = boost::find_if(ranges, [] (auto&& r) {
return !r.start() || r.start()->value() == dht::minimum_token();
});
auto right_inf = boost::find_if(ranges, [] (auto&& r) {
return !r.end() || r.start()->value() == dht::maximum_token();
});
if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});
ranges.erase(left_inf);
ranges.erase(right_inf);
}
for (auto&& r : ranges) {
local_ranges.push_back(token_range{to_bytes(r.start()), to_bytes(r.end())});
}
boost::sort(local_ranges, [] (auto&& tr1, auto&& tr2) {
return utf8_type->less(tr1.start, tr2.start);
});
return local_ranges;
});
}
size_estimates_mutation_reader::size_estimates_mutation_reader(schema_ptr schema, const dht::partition_range& prange, const query::partition_slice& slice, streamed_mutation::forwarding fwd)
: impl(schema)
, _schema(std::move(schema))
, _prange(&prange)
, _slice(slice)
, _fwd(fwd)
{ }
future<> size_estimates_mutation_reader::get_next_partition() {
auto& db = service::get_local_storage_proxy().get_db().local();
if (!_keyspaces) {
_keyspaces = get_keyspaces(*_schema, db, *_prange);
_current_partition = _keyspaces->begin();
}
if (_current_partition == _keyspaces->end()) {
_end_of_stream = true;
return make_ready_future<>();
}
return get_local_ranges().then([&db, this] (auto&& ranges) {
auto estimates = this->estimates_for_current_keyspace(db, std::move(ranges));
auto mutations = db::system_keyspace::make_size_estimates_mutation(*_current_partition, std::move(estimates));
++_current_partition;
std::vector<mutation> ms;
ms.emplace_back(std::move(mutations));
_partition_reader = flat_mutation_reader_from_mutations(std::move(ms), _fwd);
});
}
future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_point timeout) {
return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
if (!_partition_reader) {
return get_next_partition();
}
return _partition_reader->consume_pausable([this] (mutation_fragment mf) {
push_mutation_fragment(std::move(mf));
return stop_iteration(is_buffer_full());
}, timeout).then([this] {
if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
_partition_reader = stdx::nullopt;
}
});
});
}
void size_estimates_mutation_reader::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_partition_reader = stdx::nullopt;
}
}
future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
clear_buffer();
_prange = &pr;
_keyspaces = stdx::nullopt;
_partition_reader = stdx::nullopt;
_end_of_stream = false;
return make_ready_future<>();
}
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
forward_buffer_to(pr.start());
_end_of_stream = false;
if (_partition_reader) {
return _partition_reader->fast_forward_to(std::move(pr), timeout);
}
return make_ready_future<>();
}
size_t size_estimates_mutation_reader::buffer_size() const {
if (_partition_reader) {
return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size();
}
return flat_mutation_reader::impl::buffer_size();
}
std::vector<db::system_keyspace::range_estimates>
size_estimates_mutation_reader::estimates_for_current_keyspace(const database& db, std::vector<token_range> local_ranges) const {
// For each specified range, estimate (crudely) mean partition size and partitions count.
auto pkey = partition_key::from_single_value(*_schema, utf8_type->decompose(*_current_partition));
auto cfs = db.find_keyspace(*_current_partition).metadata()->cf_meta_data();
auto cf_names = boost::copy_range<std::vector<bytes>>(cfs | boost::adaptors::transformed([] (auto&& cf) {
return utf8_type->decompose(cf.first);
}));
boost::sort(cf_names, [] (auto&& n1, auto&& n2) {
return utf8_type->less(n1, n2);
});
std::vector<db::system_keyspace::range_estimates> estimates;
for (auto& range : _slice.row_ranges(*_schema, pkey)) {
auto rows = boost::make_iterator_range(
virtual_row_iterator(cf_names, local_ranges),
virtual_row_iterator(cf_names, local_ranges, virtual_row_iterator::end_iterator_tag()));
auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema));
for (auto&& r : rows_to_estimate) {
auto& cf = db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
estimates.push_back(estimate(cf, r.tokens));
if (estimates.size() >= _slice.partition_row_limit()) {
return estimates;
}
}
}
return estimates;
}
} // namespace size_estimates
} // namespace db

View File

@@ -21,33 +21,19 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/find_if.hpp>
#include "clustering_bounds_comparator.hh"
#include "database.hh"
#include "db/system_keyspace.hh"
#include "dht/i_partitioner.hh"
#include "mutation_reader.hh"
#include "partition_range_compat.hh"
#include "range.hh"
#include "service/storage_service.hh"
#include "stdx.hh"
#include "mutation_fragment.hh"
#include "sstables/sstables.hh"
#include "db/timeout_clock.hh"
namespace db {
namespace size_estimates {
struct token_range {
bytes start;
bytes end;
};
class size_estimates_mutation_reader final : public flat_mutation_reader::impl {
struct token_range {
bytes start;
bytes end;
};
schema_ptr _schema;
const dht::partition_range* _prange;
const query::partition_slice& _slice;
@@ -57,267 +43,18 @@ class size_estimates_mutation_reader final : public flat_mutation_reader::impl {
streamed_mutation::forwarding _fwd;
flat_mutation_reader_opt _partition_reader;
public:
size_estimates_mutation_reader(schema_ptr schema, const dht::partition_range& prange, const query::partition_slice& slice, streamed_mutation::forwarding fwd)
: impl(schema)
, _schema(std::move(schema))
, _prange(&prange)
, _slice(slice)
, _fwd(fwd)
{ }
size_estimates_mutation_reader(schema_ptr, const dht::partition_range&, const query::partition_slice&, streamed_mutation::forwarding);
virtual future<> fill_buffer(db::timeout_clock::time_point) override;
virtual void next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override;
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override;
virtual size_t buffer_size() const override;
private:
future<> get_next_partition() {
// For each specified range, estimate (crudely) mean partition size and partitions count.
auto& db = service::get_local_storage_proxy().get_db().local();
if (!_keyspaces) {
_keyspaces = get_keyspaces(*_schema, db, *_prange);
_current_partition = _keyspaces->begin();
}
if (_current_partition == _keyspaces->end()) {
_end_of_stream = true;
return make_ready_future<>();
}
return get_local_ranges().then([&db, this] (auto&& ranges) {
auto estimates = this->estimates_for_current_keyspace(db, std::move(ranges));
auto mutations = db::system_keyspace::make_size_estimates_mutation(*_current_partition, std::move(estimates));
++_current_partition;
std::vector<mutation> ms;
ms.emplace_back(std::move(mutations));
_partition_reader = flat_mutation_reader_from_mutations(std::move(ms), _fwd);
});
}
public:
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
if (!_partition_reader) {
return get_next_partition();
}
return _partition_reader->consume_pausable([this] (mutation_fragment mf) {
push_mutation_fragment(std::move(mf));
return stop_iteration(is_buffer_full());
}, timeout).then([this] {
if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
_partition_reader = stdx::nullopt;
}
});
});
}
virtual void next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_partition_reader = stdx::nullopt;
}
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();
_prange = &pr;
_keyspaces = stdx::nullopt;
_partition_reader = stdx::nullopt;
_end_of_stream = false;
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
forward_buffer_to(pr.start());
_end_of_stream = false;
if (_partition_reader) {
return _partition_reader->fast_forward_to(std::move(pr), timeout);
}
return make_ready_future<>();
}
virtual size_t buffer_size() const override {
if (_partition_reader) {
return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size();
}
return flat_mutation_reader::impl::buffer_size();
}
/**
* Returns the primary ranges for the local node.
* Used for testing as well.
*/
static future<std::vector<token_range>> get_local_ranges() {
auto& ss = service::get_local_storage_service();
return ss.get_local_tokens().then([&ss] (auto&& tokens) {
auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens));
std::vector<token_range> local_ranges;
auto to_bytes = [](const stdx::optional<dht::token_range::bound>& b) {
assert(b);
return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
};
// We merge the ranges to be compatible with how Cassandra shows its size_estimates table.
// All queries will be on that table, where all entries are text and there's no notion of
// token ranges from the CQL point of view.
auto left_inf = boost::find_if(ranges, [] (auto&& r) {
return !r.start() || r.start()->value() == dht::minimum_token();
});
auto right_inf = boost::find_if(ranges, [] (auto&& r) {
return !r.end() || r.end()->value() == dht::maximum_token();
});
if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});
ranges.erase(left_inf);
ranges.erase(right_inf);
}
for (auto&& r : ranges) {
local_ranges.push_back(token_range{to_bytes(r.start()), to_bytes(r.end())});
}
boost::sort(local_ranges, [] (auto&& tr1, auto&& tr2) {
return utf8_type->less(tr1.start, tr2.start);
});
return local_ranges;
});
}
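The wrap-around merge in `get_local_ranges()` above can be sketched in isolation. This is a toy stand-in (plain `int` tokens, a hypothetical `range_t` instead of `dht::token_range`), not the Scylla types:

```cpp
#include <algorithm>
#include <optional>
#include <vector>

// Hypothetical stand-in for dht::token_range: a range with optional bounds,
// where a missing bound means "extends to the ring edge".
struct range_t {
    std::optional<int> start, end;
};

// Merge the range touching the ring start with the range touching the ring
// end into one wrap-around entry, mirroring what get_local_ranges() does
// before sorting the result for the size_estimates table.
std::vector<range_t> merge_ring_edges(std::vector<range_t> ranges) {
    auto left = std::find_if(ranges.begin(), ranges.end(),
                             [](const range_t& r) { return !r.start; });
    auto right = std::find_if(ranges.begin(), ranges.end(),
                              [](const range_t& r) { return !r.end; });
    std::vector<range_t> out;
    if (left != right && left != ranges.end() && right != ranges.end()) {
        // (right->start, left->end] wraps around the ring edge.
        out.push_back(range_t{right->start, left->end});
        // Erase the later iterator first so the earlier one stays valid.
        if (left < right) { ranges.erase(right); ranges.erase(left); }
        else              { ranges.erase(left); ranges.erase(right); }
    }
    for (auto& r : ranges) out.push_back(r);
    return out;
}
```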
private:
struct virtual_row {
const bytes& cf_name;
const token_range& tokens;
clustering_key_prefix as_key() const {
return clustering_key_prefix::from_exploded(std::vector<bytes_view>{cf_name, tokens.start, tokens.end});
}
};
struct virtual_row_comparator {
schema_ptr _schema;
virtual_row_comparator(schema_ptr schema) : _schema(schema) { }
bool operator()(const clustering_key_prefix& key1, const clustering_key_prefix& key2) {
return clustering_key_prefix::prefix_equality_less_compare(*_schema)(key1, key2);
}
bool operator()(const virtual_row& row, const clustering_key_prefix& key) {
return operator()(row.as_key(), key);
}
bool operator()(const clustering_key_prefix& key, const virtual_row& row) {
return operator()(key, row.as_key());
}
};
class virtual_row_iterator : public std::iterator<std::input_iterator_tag, const virtual_row> {
std::reference_wrapper<const std::vector<bytes>> _cf_names;
std::reference_wrapper<const std::vector<token_range>> _ranges;
size_t _cf_names_idx = 0;
size_t _ranges_idx = 0;
public:
struct end_iterator_tag {};
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges)
: _cf_names(std::ref(cf_names))
, _ranges(std::ref(ranges))
{ }
virtual_row_iterator(const std::vector<bytes>& cf_names, const std::vector<token_range>& ranges, end_iterator_tag)
: _cf_names(std::ref(cf_names))
, _ranges(std::ref(ranges))
, _cf_names_idx(cf_names.size())
, _ranges_idx(ranges.size())
{ }
virtual_row_iterator& operator++() {
if (++_ranges_idx == _ranges.get().size() && ++_cf_names_idx < _cf_names.get().size()) {
_ranges_idx = 0;
}
return *this;
}
virtual_row_iterator operator++(int) {
virtual_row_iterator i(*this);
++(*this);
return i;
}
const value_type operator*() const {
return { _cf_names.get()[_cf_names_idx], _ranges.get()[_ranges_idx] };
}
bool operator==(const virtual_row_iterator& i) const {
return _cf_names_idx == i._cf_names_idx
&& _ranges_idx == i._ranges_idx;
}
bool operator!=(const virtual_row_iterator& i) const {
return !(*this == i);
}
};
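`virtual_row_iterator` walks the cartesian product of `cf_names` and `ranges` with a pair of indices. A minimal generic sketch of the same two-index pattern (illustrative names, not the Scylla class):

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Iterate the cartesian product of two vectors (outer = table names in the
// original, inner = token ranges) using a pair of indices.
template <typename A, typename B>
class product_iterator {
    const std::vector<A>* _as;
    const std::vector<B>* _bs;
    size_t _ai = 0, _bi = 0;
public:
    struct end_tag {};
    product_iterator(const std::vector<A>& as, const std::vector<B>& bs)
        : _as(&as), _bs(&bs) {}
    product_iterator(const std::vector<A>& as, const std::vector<B>& bs, end_tag)
        : _as(&as), _bs(&bs), _ai(as.size()), _bi(bs.size()) {}
    product_iterator& operator++() {
        // Advance the inner index; on wrap, advance the outer one. As in the
        // original, the inner index is reset only while the outer index is
        // still in range, so the iterator ends at (as.size(), bs.size()),
        // matching the end_tag-constructed iterator.
        if (++_bi == _bs->size() && ++_ai < _as->size()) {
            _bi = 0;
        }
        return *this;
    }
    std::pair<A, B> operator*() const { return {(*_as)[_ai], (*_bs)[_bi]}; }
    bool operator==(const product_iterator& o) const {
        return _ai == o._ai && _bi == o._bi;
    }
    bool operator!=(const product_iterator& o) const { return !(*this == o); }
};
```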
future<> get_next_partition();
std::vector<db::system_keyspace::range_estimates>
estimates_for_current_keyspace(const database& db, std::vector<token_range> local_ranges) const {
auto pkey = partition_key::from_single_value(*_schema, utf8_type->decompose(*_current_partition));
auto cfs = db.find_keyspace(*_current_partition).metadata()->cf_meta_data();
auto cf_names = boost::copy_range<std::vector<bytes>>(cfs | boost::adaptors::transformed([] (auto&& cf) {
return utf8_type->decompose(cf.first);
}));
boost::sort(cf_names, [] (auto&& n1, auto&& n2) {
return utf8_type->less(n1, n2);
});
std::vector<db::system_keyspace::range_estimates> estimates;
for (auto& range : _slice.row_ranges(*_schema, pkey)) {
auto rows = boost::make_iterator_range(
virtual_row_iterator(cf_names, local_ranges),
virtual_row_iterator(cf_names, local_ranges, virtual_row_iterator::end_iterator_tag()));
auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema));
for (auto&& r : rows_to_estimate) {
auto& cf = db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
estimates.push_back(estimate(cf, r.tokens));
if (estimates.size() >= _slice.partition_row_limit()) {
return estimates;
}
}
}
return estimates;
}
/**
* Returns the keyspaces, ordered by name, as selected by the partition_range.
*/
static ks_range get_keyspaces(const schema& s, const database& db, dht::partition_range range) {
struct keyspace_less_comparator {
const schema& _s;
keyspace_less_comparator(const schema& s) : _s(s) { }
dht::ring_position as_ring_position(const sstring& ks) {
auto pkey = partition_key::from_single_value(_s, utf8_type->decompose(ks));
return dht::global_partitioner().decorate_key(_s, std::move(pkey));
}
bool operator()(const sstring& ks1, const sstring& ks2) {
return as_ring_position(ks1).less_compare(_s, as_ring_position(ks2));
}
bool operator()(const sstring& ks, const dht::ring_position& rp) {
return as_ring_position(ks).less_compare(_s, rp);
}
bool operator()(const dht::ring_position& rp, const sstring& ks) {
return rp.less_compare(_s, as_ring_position(ks));
}
};
auto keyspaces = db.get_non_system_keyspaces();
auto cmp = keyspace_less_comparator(s);
boost::sort(keyspaces, cmp);
return boost::copy_range<ks_range>(range.slice(keyspaces, std::move(cmp)));
}
/**
* Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
*/
static dht::partition_range as_ring_position_range(dht::token_range& r) {
stdx::optional<range<dht::ring_position>::bound> start_bound, end_bound;
if (r.start()) {
start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
}
if (r.end()) {
end_bound = {{ dht::ring_position(r.end()->value(), dht::ring_position::token_bound::end), r.end()->is_inclusive() }};
}
return dht::partition_range(std::move(start_bound), std::move(end_bound), r.is_singular());
}
/**
* Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
*/
static system_keyspace::range_estimates estimate(const column_family& cf, const token_range& r) {
int64_t count{0};
utils::estimated_histogram hist{0};
auto from_bytes = [] (auto& b) {
return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
};
dht::token_range_vector ranges;
compat::unwrap_into(
wrapping_range<dht::token>({{ from_bytes(r.start) }}, {{ from_bytes(r.end) }}),
dht::token_comparator(),
[&] (auto&& rng) { ranges.push_back(std::move(rng)); });
for (auto&& r : ranges) {
auto rp_range = as_ring_position_range(r);
for (auto&& sstable : cf.select_sstables(rp_range)) {
count += sstable->estimated_keys_for_range(r);
hist.merge(sstable->get_stats_metadata().estimated_row_size);
}
}
return {cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
}
estimates_for_current_keyspace(const database&, std::vector<token_range> local_ranges) const;
};
struct virtual_reader {
@@ -332,6 +69,12 @@ struct virtual_reader {
}
};
/**
* Returns the primary ranges for the local node.
* Used for testing as well.
*/
future<std::vector<token_range>> get_local_ranges();
} // namespace size_estimates
} // namespace db

View File

@@ -68,7 +68,7 @@
"type": "shell",
"inline": [
"sudo yum install -y epel-release",
"sudo yum install -y python34",
"sudo yum install -y python36",
"sudo /home/{{user `ssh_username`}}/scylla-ami/scylla_install_ami {{ user `install_args` }}"
]
}

View File

@@ -33,7 +33,7 @@ RUN curl http://downloads.scylladb.com/rpm/centos/scylla-2.3.repo -o /etc/yum.re
yum -y remove boost-thread boost-system && \
yum -y install scylla hostname supervisor && \
yum clean all && \
yum -y install python34 python34-PyYAML && \
yum -y install python36 python36-PyYAML && \
cat /scylla_bashrc >> /etc/bashrc && \
mkdir -p /etc/supervisor.conf.d && \
mkdir -p /var/log/scylla && \

View File

@@ -56,9 +56,9 @@ License: AGPLv3
URL: http://www.scylladb.com/
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler systemtap-sdt-devel ninja-build cmake python ragel grep kernel-headers
%{?fedora:BuildRequires: boost-devel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum python2-pystache}
%{?rhel:BuildRequires: scylla-libstdc++73-static scylla-boost163-devel scylla-boost163-static scylla-antlr35-tool scylla-antlr35-C++-devel python34 scylla-gcc73-c++, scylla-python34-pyparsing20 yaml-cpp-static pystache python-setuptools}
%{?rhel:BuildRequires: scylla-libstdc++73-static scylla-boost163-devel scylla-boost163-static scylla-antlr35-tool scylla-antlr35-C++-devel python36 scylla-gcc73-c++, scylla-python36-pyparsing20 yaml-cpp-static pystache python-setuptools}
Requires: scylla-conf systemd-libs hwloc PyYAML python-urwid pciutils pyparsing python-requests curl util-linux python-setuptools pciutils python3-pyudev mdadm xfsprogs
%{?rhel:Requires: python34 python34-PyYAML kernel >= 3.10.0-514}
%{?rhel:Requires: python36 python36-PyYAML kernel >= 3.10.0-514}
%{?fedora:Requires: python3 python3-PyYAML}
Conflicts: abrt
%ifarch x86_64
@@ -97,7 +97,7 @@ cflags="--cflags=${defines[*]}"
%endif
%if 0%{?rhel}
. /etc/profile.d/scylla.sh
python3.4 ./configure.py %{?configure_opt} --with=scylla --with=iotune --mode=release "$cflags" --static-boost --static-yaml-cpp --compiler=/opt/scylladb/bin/g++-7.3 --python python3.4 --ldflag=-Wl,-rpath=/opt/scylladb/lib64
python3.6 ./configure.py %{?configure_opt} --with=scylla --with=iotune --mode=release "$cflags" --static-boost --static-yaml-cpp --compiler=/opt/scylladb/bin/g++-7.3 --python python3.6 --ldflag=-Wl,-rpath=/opt/scylladb/lib64
%endif
ninja-build %{?_smp_mflags} build/release/scylla build/release/iotune

View File

@@ -449,9 +449,13 @@ GCC6_CONCEPT(requires requires(StopCondition stop, ConsumeMutationFragment consu
{ consume_mf(std::move(mf)) } -> void;
{ consume_eos() } -> future<>;
})
future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition&& stop,
ConsumeMutationFragment&& consume_mf, ConsumeEndOfStream&& consume_eos) {
return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos] {
future<> consume_mutation_fragments_until(
flat_mutation_reader& r,
StopCondition&& stop,
ConsumeMutationFragment&& consume_mf,
ConsumeEndOfStream&& consume_eos,
db::timeout_clock::time_point timeout) {
return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos, timeout] {
while (!r.is_buffer_empty()) {
consume_mf(r.pop_mutation_fragment());
if (stop()) {
@@ -461,7 +465,7 @@ future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition
if (r.is_end_of_stream()) {
return consume_eos();
}
return r.fill_buffer();
return r.fill_buffer(timeout);
});
}
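The point of this change is that the consume loop must thread the caller's deadline down into the refill step; otherwise refills queued for already-timed-out requests keep running. A toy synchronous sketch of the same shape (names and types are illustrative, not Seastar's):

```cpp
#include <chrono>
#include <deque>
#include <functional>

using deadline_t = std::chrono::steady_clock::time_point;

// Stand-in for flat_mutation_reader: a buffer plus a refill step that now
// receives the deadline, mirroring r.fill_buffer(timeout) in the patch.
struct toy_reader {
    std::deque<int> buffer;
    bool eos = false;
    std::function<void(toy_reader&, deadline_t)> fill;
};

template <typename Stop, typename Consume>
void consume_until(toy_reader& r, Stop stop, Consume consume, deadline_t timeout) {
    while (!stop()) {
        while (!r.buffer.empty()) {
            consume(r.buffer.front());
            r.buffer.pop_front();
            if (stop()) return;
        }
        if (r.eos) return;
        r.fill(r, timeout);  // the timeout is propagated to the source
    }
}
```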

View File

@@ -42,5 +42,5 @@ elif [ "$ID" = "fedora" ]; then
yum install -y yaml-cpp-devel thrift-devel antlr3-tool antlr3-C++-devel jsoncpp-devel snappy-devel
elif [ "$ID" = "centos" ]; then
yum install -y yaml-cpp-devel thrift-devel scylla-antlr35-tool scylla-antlr35-C++-devel jsoncpp-devel snappy-devel scylla-boost163-static scylla-python34-pyparsing20 systemd-devel
echo -e "Configure example:\n\tpython3.4 ./configure.py --enable-dpdk --mode=release --static-boost --compiler=/opt/scylladb/bin/g++-7.3 --python python3.4 --ldflag=-Wl,-rpath=/opt/scylladb/lib64 --cflags=-I/opt/scylladb/include --with-antlr3=/opt/scylladb/bin/antlr3"
echo -e "Configure example:\n\tpython3.6 ./configure.py --enable-dpdk --mode=release --static-boost --compiler=/opt/scylladb/bin/g++-7.3 --python python3.6 --ldflag=-Wl,-rpath=/opt/scylladb/lib64 --cflags=-I/opt/scylladb/include --with-antlr3=/opt/scylladb/bin/antlr3"
fi

View File

@@ -763,8 +763,11 @@ int main(int ac, char** av) {
return service::get_local_storage_service().drain_on_shutdown();
});
engine().at_exit([] {
return view_builder.stop();
engine().at_exit([cfg] {
if (cfg->view_building()) {
return view_builder.stop();
}
return make_ready_future<>();
});
engine().at_exit([&db] {

View File

@@ -144,7 +144,14 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
, _static_row(s, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _row_tombstones(x._row_tombstones) {
, _row_tombstones(x._row_tombstones)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{
#ifdef SEASTAR_DEBUG
assert(x._schema_version == _schema_version);
#endif
auto cloner = [&s] (const auto& x) {
return current_allocator().construct<rows_entry>(s, x);
};
@@ -157,7 +164,14 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
, _static_row(schema, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only()) {
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(schema.version())
#endif
{
#ifdef SEASTAR_DEBUG
assert(x._schema_version == _schema_version);
#endif
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
@@ -180,7 +194,13 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
, _static_row_continuous(x._static_row_continuous)
, _rows(std::move(x._rows))
, _row_tombstones(std::move(x._row_tombstones))
#ifdef SEASTAR_DEBUG
, _schema_version(schema.version())
#endif
{
#ifdef SEASTAR_DEBUG
assert(x._schema_version == _schema_version);
#endif
{
auto deleter = current_deleter<rows_entry>();
auto it = _rows.begin();
@@ -220,6 +240,7 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
}
void mutation_partition::ensure_last_dummy(const schema& s) {
check_schema(s);
if (_rows.empty() || !_rows.rbegin()->is_last_dummy()) {
_rows.insert_before(_rows.end(),
*current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::yes));
@@ -276,11 +297,16 @@ void deletable_row::apply(const schema& s, clustering_row cr) {
void
mutation_partition::apply(const schema& s, const mutation_fragment& mf) {
check_schema(s);
mutation_fragment_applier applier{s, *this};
mf.visit(applier);
}
void mutation_partition::apply_monotonically(const schema& s, mutation_partition&& p, cache_tracker* tracker) {
#ifdef SEASTAR_DEBUG
assert(s.version() == _schema_version);
assert(p._schema_version == _schema_version);
#endif
_tombstone.apply(p._tombstone);
_row_tombstones.apply_monotonically(s, std::move(p._row_tombstones));
_static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row));
@@ -356,6 +382,7 @@ void mutation_partition::apply_weak(const schema& s, mutation_partition&& p) {
tombstone
mutation_partition::range_tombstone_for_row(const schema& schema, const clustering_key& key) const {
check_schema(schema);
tombstone t = _tombstone;
if (!_row_tombstones.empty()) {
auto found = _row_tombstones.search_tombstone_covering(schema, key);
@@ -366,6 +393,7 @@ mutation_partition::range_tombstone_for_row(const schema& schema, const clusteri
row_tombstone
mutation_partition::tombstone_for_row(const schema& schema, const clustering_key& key) const {
check_schema(schema);
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
auto j = _rows.find(key, rows_entry::compare(schema));
@@ -378,6 +406,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
row_tombstone
mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e) const {
check_schema(schema);
row_tombstone t = e.row().deleted_at();
t.apply(range_tombstone_for_row(schema, e.key()));
return t;
@@ -385,6 +414,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e)
void
mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t) {
check_schema(schema);
assert(!prefix.is_full(schema));
auto start = prefix;
_row_tombstones.apply(schema, {std::move(start), std::move(prefix), std::move(t)});
@@ -392,11 +422,13 @@ mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_pre
void
mutation_partition::apply_row_tombstone(const schema& schema, range_tombstone rt) {
check_schema(schema);
_row_tombstones.apply(schema, std::move(rt));
}
void
mutation_partition::apply_delete(const schema& schema, const clustering_key_prefix& prefix, tombstone t) {
check_schema(schema);
if (prefix.is_empty(schema)) {
apply(t);
} else if (prefix.is_full(schema)) {
@@ -408,6 +440,7 @@ mutation_partition::apply_delete(const schema& schema, const clustering_key_pref
void
mutation_partition::apply_delete(const schema& schema, range_tombstone rt) {
check_schema(schema);
if (range_tombstone::is_single_clustering_row_tombstone(schema, rt.start, rt.start_kind, rt.end, rt.end_kind)) {
apply_delete(schema, std::move(rt.start), std::move(rt.tomb));
return;
@@ -417,6 +450,7 @@ mutation_partition::apply_delete(const schema& schema, range_tombstone rt) {
void
mutation_partition::apply_delete(const schema& schema, clustering_key&& prefix, tombstone t) {
check_schema(schema);
if (prefix.is_empty(schema)) {
apply(t);
} else if (prefix.is_full(schema)) {
@@ -428,6 +462,7 @@ mutation_partition::apply_delete(const schema& schema, clustering_key&& prefix,
void
mutation_partition::apply_delete(const schema& schema, clustering_key_prefix_view prefix, tombstone t) {
check_schema(schema);
if (prefix.is_empty(schema)) {
apply(t);
} else if (prefix.is_full(schema)) {
@@ -451,12 +486,14 @@ void mutation_partition::insert_row(const schema& s, const clustering_key& key,
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
check_schema(s);
auto e = current_allocator().construct<rows_entry>(s, key, row);
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
}
const row*
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
return nullptr;
@@ -466,6 +503,7 @@ mutation_partition::find_row(const schema& s, const clustering_key& key) const {
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(std::move(key));
@@ -477,6 +515,7 @@ mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
deletable_row&
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(key);
@@ -488,6 +527,7 @@ mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(key);
@@ -499,6 +539,7 @@ mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
deletable_row&
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
auto i = _rows.find(pos, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(s, pos, dummy, continuous);
@@ -510,6 +551,7 @@ mutation_partition::clustered_row(const schema& s, position_in_partition_view po
mutation_partition::rows_type::const_iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (!r.start()) {
return std::cbegin(_rows);
}
@@ -518,6 +560,7 @@ mutation_partition::lower_bound(const schema& schema, const query::clustering_ra
mutation_partition::rows_type::const_iterator
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (!r.end()) {
return std::cend(_rows);
}
@@ -526,6 +569,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
boost::iterator_range<mutation_partition::rows_type::const_iterator>
mutation_partition::range(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
return boost::make_iterator_range(lower_bound(schema, r), upper_bound(schema, r));
}
@@ -562,6 +606,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
template<typename Func>
void mutation_partition::for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const
{
check_schema(schema);
auto r = range(schema, row_range);
if (!reversed) {
for (const auto& e : r) {
@@ -778,6 +823,7 @@ bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tomb
void
mutation_partition::query_compacted(query::result::partition_writer& pw, const schema& s, uint32_t limit) const {
check_schema(s);
const query::partition_slice& slice = pw.slice();
max_timestamp max_ts{pw.last_modified()};
@@ -996,6 +1042,10 @@ bool mutation_partition::equal(const schema& s, const mutation_partition& p) con
}
bool mutation_partition::equal(const schema& this_schema, const mutation_partition& p, const schema& p_schema) const {
#ifdef SEASTAR_DEBUG
assert(_schema_version == this_schema.version());
assert(p._schema_version == p_schema.version());
#endif
if (_tombstone != p._tombstone) {
return false;
}
@@ -1124,6 +1174,7 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
void
row::append_cell(column_id id, atomic_cell_or_collection value) {
if (_type == storage_type::vector && id < max_vector_size) {
assert(_storage.vector.v.size() <= id);
_storage.vector.v.resize(id);
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
_storage.vector.present.set(id);
@@ -1188,6 +1239,7 @@ size_t rows_entry::memory_usage(const schema& s) const {
}
size_t mutation_partition::external_memory_usage(const schema& s) const {
check_schema(s);
size_t sum = 0;
sum += static_row().external_memory_usage(s, column_kind::static_column);
for (auto& clr : clustered_rows()) {
@@ -1206,6 +1258,7 @@ void mutation_partition::trim_rows(const schema& s,
const std::vector<query::clustering_range>& row_ranges,
Func&& func)
{
check_schema(s);
static_assert(std::is_same<stop_iteration, std::result_of_t<Func(rows_entry&)>>::value, "Bad func signature");
stop_iteration stop = stop_iteration::no;
@@ -1250,6 +1303,7 @@ uint32_t mutation_partition::do_compact(const schema& s,
uint32_t row_limit,
can_gc_fn& can_gc)
{
check_schema(s);
assert(row_limit > 0);
auto gc_before = saturating_subtract(query_time, s.gc_grace_seconds());
@@ -1315,12 +1369,14 @@ mutation_partition::compact_for_query(
bool reverse,
uint32_t row_limit)
{
check_schema(s);
return do_compact(s, query_time, row_ranges, reverse, row_limit, always_gc);
}
void mutation_partition::compact_for_compaction(const schema& s,
can_gc_fn& can_gc, gc_clock::time_point compaction_time)
{
check_schema(s);
static const std::vector<query::clustering_range> all_rows = {
query::clustering_range::make_open_ended_both_sides()
};
@@ -1354,11 +1410,13 @@ row::is_live(const schema& s, column_kind kind, tombstone base_tombstone, gc_clo
bool
mutation_partition::is_static_row_live(const schema& s, gc_clock::time_point query_time) const {
check_schema(s);
return has_any_live_data(s, column_kind::static_column, static_row(), _tombstone, query_time);
}
size_t
mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_time) const {
check_schema(s);
size_t count = 0;
for (const rows_entry& e : non_dummy_rows()) {
@@ -1704,6 +1762,7 @@ row row::difference(const schema& s, column_kind kind, const row& other) const
mutation_partition mutation_partition::difference(schema_ptr s, const mutation_partition& other) const
{
check_schema(*s);
mutation_partition mp(s);
if (_tombstone > other._tombstone) {
mp.apply(_tombstone);
@@ -1734,6 +1793,7 @@ mutation_partition mutation_partition::difference(schema_ptr s, const mutation_p
}
void mutation_partition::accept(const schema& s, mutation_partition_visitor& v) const {
check_schema(s);
v.accept_partition_tombstone(_tombstone);
_static_row.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
const column_definition& def = s.static_column_at(id);
@@ -2167,6 +2227,9 @@ mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const
, _static_row_continuous(!s.has_static_columns())
, _rows()
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{
_rows.insert_before(_rows.end(),
*current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
@@ -2198,6 +2261,7 @@ void mutation_partition::make_fully_continuous() {
}
clustering_interval_set mutation_partition::get_continuity(const schema& s, is_continuous cont) const {
check_schema(s);
clustering_interval_set result;
auto i = _rows.begin();
auto prev_pos = position_in_partition::before_all_clustered_rows();
@@ -2247,6 +2311,7 @@ stop_iteration mutation_partition::clear_gently(cache_tracker* tracker) noexcept
bool
mutation_partition::check_continuity(const schema& s, const position_range& r, is_continuous cont) const {
check_schema(s);
auto less = rows_entry::compare(s);
auto i = _rows.lower_bound(r.start(), less);
auto end = _rows.lower_bound(r.end(), less);

View File

@@ -905,6 +905,9 @@ private:
// Contains only strict prefixes so that we don't have to lookup full keys
// in both _row_tombstones and _rows.
range_tombstone_list _row_tombstones;
#ifdef SEASTAR_DEBUG
table_schema_version _schema_version;
#endif
friend class mutation_partition_applier;
friend class converting_mutation_partition_applier;
@@ -919,10 +922,16 @@ public:
mutation_partition(schema_ptr s)
: _rows()
, _row_tombstones(*s)
#ifdef SEASTAR_DEBUG
, _schema_version(s->version())
#endif
{ }
mutation_partition(mutation_partition& other, copy_comparators_only)
: _rows()
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(other._schema_version)
#endif
{ }
mutation_partition(mutation_partition&&) = default;
mutation_partition(const schema& s, const mutation_partition&);
@@ -1122,6 +1131,12 @@ private:
template<typename Func>
void for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const;
friend class counter_write_query_result_builder;
void check_schema(const schema& s) const {
#ifdef SEASTAR_DEBUG
assert(s.version() == _schema_version);
#endif
}
};
inline

View File
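The `_schema_version`/`check_schema()` pattern above compiles away entirely outside debug builds, so release builds pay nothing for the validation. A minimal sketch of the idiom, with `SEASTAR_DEBUG` defined locally to stand in for the real build flag:

```cpp
#include <cassert>

#define SEASTAR_DEBUG 1   // pretend this is a debug build

struct schema { unsigned version; };

struct partition {
#ifdef SEASTAR_DEBUG
    unsigned _schema_version;   // exists only in debug builds
#endif
    explicit partition(const schema& s)
#ifdef SEASTAR_DEBUG
        : _schema_version(s.version)
#endif
    { }
    // Every schema-taking method calls this first, so a caller passing a
    // mismatched schema fails fast instead of corrupting data.
    void check_schema(const schema& s) const {
#ifdef SEASTAR_DEBUG
        assert(s.version == _schema_version);
#else
        (void)s;
#endif
    }
};
```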

@@ -312,7 +312,7 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
void partition_entry::apply(const schema& s, const mutation_partition& mp, const schema& mp_schema)
{
apply(s, mutation_partition(s, mp), mp_schema);
apply(s, mutation_partition(mp_schema, mp), mp_schema);
}
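The one-line fix above matters because a mutation stores cells by column position, so copying it must use the schema that produced it (`mp_schema`), not the destination schema (`s`). A toy illustration of how a mismatched schema swaps column values after an ALTER (all types here are illustrative stand-ins):

```cpp
#include <map>
#include <string>
#include <vector>

using schema_t = std::vector<std::string>;   // column names, by position
using row_t = std::vector<int>;              // cell values, by position

// Interpret a positional row through a schema into name -> value form.
// Interpreting with the wrong schema maps values to the wrong columns,
// which is the corruption described in #5095.
std::map<std::string, int> interpret(const schema_t& s, const row_t& r) {
    std::map<std::string, int> out;
    for (size_t i = 0; i < r.size() && i < s.size(); ++i) {
        out[s[i]] = r[i];
    }
    return out;
}
```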
void partition_entry::apply(const schema& s, mutation_partition&& mp, const schema& mp_schema)

View File

@@ -38,7 +38,7 @@ class autoupdating_underlying_reader final {
row_cache& _cache;
read_context& _read_context;
stdx::optional<flat_mutation_reader> _reader;
utils::phased_barrier::phase_type _reader_creation_phase;
utils::phased_barrier::phase_type _reader_creation_phase = 0;
dht::partition_range _range = { };
stdx::optional<dht::decorated_key> _last_key;
stdx::optional<dht::decorated_key> _new_last_key;
@@ -105,7 +105,6 @@ public:
return make_ready_future<>();
}
utils::phased_barrier::phase_type creation_phase() const {
assert(_reader);
return _reader_creation_phase;
}
const dht::partition_range& range() const {

Submodule seastar updated: 10ac122dd6...efda4281c2

View File

@@ -179,6 +179,8 @@ struct compaction_read_monitor_generator final : public read_monitor_generator {
void remove_sstable(bool is_tracking) {
if (is_tracking) {
_cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst);
+} else if (_sst) {
+_cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst);
}
_sst = {};
}
@@ -303,6 +305,7 @@ public:
class compaction {
protected:
column_family& _cf;
+schema_ptr _schema;
std::vector<shared_sstable> _sstables;
uint64_t _max_sstable_size;
uint32_t _sstable_level;
@@ -313,6 +316,7 @@ protected:
protected:
compaction(column_family& cf, std::vector<shared_sstable> sstables, uint64_t max_sstable_size, uint32_t sstable_level)
: _cf(cf)
+, _schema(cf.schema())
, _sstables(std::move(sstables))
, _max_sstable_size(max_sstable_size)
, _sstable_level(sstable_level)
@@ -361,10 +365,9 @@ private:
virtual flat_mutation_reader make_sstable_reader(lw_shared_ptr<sstables::sstable_set> ssts) const = 0;
flat_mutation_reader setup() {
-auto ssts = make_lw_shared<sstables::sstable_set>(_cf.get_compaction_strategy().make_sstable_set(_cf.schema()));
-auto schema = _cf.schema();
+auto ssts = make_lw_shared<sstables::sstable_set>(_cf.get_compaction_strategy().make_sstable_set(_schema));
sstring formatted_msg = "[";
-auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - schema->gc_grace_seconds());
+auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds());
for (auto& sst : _sstables) {
// Compacted sstable keeps track of its ancestors.
@@ -396,8 +399,8 @@ private:
}
formatted_msg += "]";
_info->sstables = _sstables.size();
-_info->ks = schema->ks_name();
-_info->cf = schema->cf_name();
+_info->ks = _schema->ks_name();
+_info->cf = _schema->cf_name();
report_start(formatted_msg);
return make_sstable_reader(std::move(ssts));
@@ -462,7 +465,7 @@ private:
}
const schema_ptr& schema() const {
-return _cf.schema();
+return _schema;
}
public:
static future<compaction_info> run(std::unique_ptr<compaction> c);
@@ -518,10 +521,10 @@ public:
}
flat_mutation_reader make_sstable_reader(lw_shared_ptr<sstables::sstable_set> ssts) const override {
-return ::make_local_shard_sstable_reader(_cf.schema(),
+return ::make_local_shard_sstable_reader(_schema,
std::move(ssts),
query::full_partition_range,
-_cf.schema()->full_slice(),
+_schema->full_slice(),
service::get_local_compaction_priority(),
no_resource_tracking(),
nullptr,
@@ -570,7 +573,7 @@ public:
cfg.monitor = &_active_write_monitors.back();
cfg.large_partition_handler = _cf.get_large_partition_handler();
// TODO: calculate encoding_stats based on statistics of compacted sstables
-_writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, encoding_stats{}, priority));
+_writer.emplace(_sst->get_writer(*_schema, partitions_per_sstable(), cfg, encoding_stats{}, priority));
}
return &*_writer;
}
@@ -610,7 +613,7 @@ public:
}
std::function<bool(const dht::decorated_key&)> filter_func() const override {
-dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_cf.schema()->ks_name());
+dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
return [this, owned_ranges = std::move(owned_ranges)] (const dht::decorated_key& dk) {
if (dht::shard_of(dk.token()) != engine().cpu_id()) {
@@ -684,10 +687,10 @@ public:
// Use a reader that makes sure non-local mutations are not filtered out.
flat_mutation_reader make_sstable_reader(lw_shared_ptr<sstables::sstable_set> ssts) const override {
-return ::make_range_sstable_reader(_cf.schema(),
+return ::make_range_sstable_reader(_schema,
std::move(ssts),
query::full_partition_range,
-_cf.schema()->full_slice(),
+_schema->full_slice(),
service::get_local_compaction_priority(),
no_resource_tracking(),
nullptr,
@@ -719,7 +722,7 @@ public:
cfg.large_partition_handler = _cf.get_large_partition_handler();
auto&& priority = service::get_local_compaction_priority();
// TODO: calculate encoding_stats based on statistics of compacted sstables
-writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, encoding_stats{}, priority, _shard));
+writer.emplace(sst->get_writer(*_schema, partitions_per_sstable(_shard), cfg, encoding_stats{}, priority, _shard));
}
return &*writer;
}


@@ -66,6 +66,14 @@ public:
_cm->deregister_compacting_sstables(_compacting);
}
}
+// Explicitly release compacting sstables
+void release_compacting(const std::vector<sstables::shared_sstable>& sstables) {
+_cm->deregister_compacting_sstables(sstables);
+for (auto& sst : sstables) {
+_compacting.erase(boost::remove(_compacting, sst), _compacting.end());
+}
+}
};
compaction_weight_registration::compaction_weight_registration(compaction_manager* cm, int weight)
@@ -564,17 +572,23 @@ future<> compaction_manager::perform_cleanup(column_family* cf) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
column_family& cf = *task->compacting_cf;
-sstables::compaction_descriptor descriptor = sstables::compaction_descriptor(get_candidates(cf));
-auto compacting = compacting_sstable_registration(this, descriptor.sstables);
+auto sstables = get_candidates(cf);
+auto compacting = make_lw_shared<compacting_sstable_registration>(this, sstables);
_stats.pending_tasks--;
_stats.active_tasks++;
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
-return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
-return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
-return cf.cleanup_sstables(std::move(descriptor));
+return do_with(std::move(user_initiated), std::move(sstables), [this, &cf, compacting] (compaction_backlog_tracker& bt,
+std::vector<sstables::shared_sstable>& sstables) mutable {
+return with_scheduling_group(_scheduling_group, [this, &cf, &sstables, compacting] () mutable {
+return do_for_each(sstables, [this, &cf, compacting] (auto& sst) {
+return cf.cleanup_sstables(sstables::compaction_descriptor({sst})).then([&sst, compacting] {
+// Releases reference to cleaned sstable such that respective used disk space can be freed.
+compacting->release_compacting({std::move(sst)});
+});
+});
+});
-}).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
+}).then_wrapped([this, task, compacting] (future<> f) mutable {
_stats.active_tasks--;
if (!can_proceed(task)) {
maybe_stop_on_error(std::move(f));


@@ -404,11 +404,6 @@ public:
auto itw = writes_per_window.find(bound);
if (itw != writes_per_window.end()) {
ow_this_window = &itw->second;
-// We will erase here so we can keep track of which
-// writes belong to existing windows. Writes that don't belong to any window
-// are writes in progress to new windows and will be accounted in the final
-// loop before we return
-writes_per_window.erase(itw);
}
auto* oc_this_window = &no_oc;
auto itc = compactions_per_window.find(bound);
@@ -416,6 +411,13 @@ public:
oc_this_window = &itc->second;
}
b += windows.second.backlog(*ow_this_window, *oc_this_window);
+if (itw != writes_per_window.end()) {
+// We will erase here so we can keep track of which
+// writes belong to existing windows. Writes that don't belong to any window
+// are writes in progress to new windows and will be accounted in the final
+// loop before we return
+writes_per_window.erase(itw);
+}
}
// Partial writes that don't belong to any window are accounted here.


@@ -695,9 +695,12 @@ public:
// Sets streamed_mutation::_end_of_range when there are no more fragments for the query range.
// Returns information whether the parser should continue to parse more
// input and produce more fragments or we have collected enough and should yield.
+// Returns proceed:yes only when all pending fragments have been pushed.
proceed push_ready_fragments() {
if (_ready) {
-return push_ready_fragments_with_ready_set();
+if (push_ready_fragments_with_ready_set() == proceed::no) {
+return proceed::no;
+}
}
if (_out_of_range) {


@@ -292,7 +292,7 @@ void stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state)
}
void stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
-if (has_peer(endpoint) && ep_state.is_shutdown()) {
+if (has_peer(endpoint)) {
sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint);
get_stream_manager().invoke_on_all([endpoint] (auto& sm) {
sm.fail_sessions(endpoint);


@@ -509,8 +509,7 @@ void stream_session::close_session(stream_session_state final_state) {
_stream_result->handle_session_complete(shared_from_this());
}
-sslog.debug("[Stream #{}] close_session session={}, state={}, cancel keep_alive timer", plan_id(), this, final_state);
-_keep_alive.cancel();
+sslog.debug("[Stream #{}] close_session session={}, state={}", plan_id(), this, final_state);
}
}
@@ -537,41 +536,6 @@ bool stream_session::is_initialized() const {
void stream_session::init(shared_ptr<stream_result_future> stream_result_) {
_stream_result = stream_result_;
-_keep_alive.set_callback([this] {
-auto plan_id = this->plan_id();
-auto peer = this->peer;
-get_local_stream_manager().get_progress_on_all_shards(plan_id, peer).then([this, peer, plan_id] (stream_bytes sbytes) {
-if (this->_is_aborted) {
-sslog.info("[Stream #{}] The session {} is closed, keep alive timer will do nothing", plan_id, this);
-return;
-}
-auto now = lowres_clock::now();
-sslog.debug("[Stream #{}] keep alive timer callback sbytes old: tx={}, rx={} new: tx={} rx={}",
-plan_id, this->_last_stream_bytes.bytes_sent, this->_last_stream_bytes.bytes_received,
-sbytes.bytes_sent, sbytes.bytes_received);
-if (sbytes.bytes_sent > this->_last_stream_bytes.bytes_sent ||
-sbytes.bytes_received > this->_last_stream_bytes.bytes_received) {
-sslog.debug("[Stream #{}] The session {} made progress with peer {}", plan_id, this, peer);
-// Progress has been made
-this->_last_stream_bytes = sbytes;
-this->_last_stream_progress = now;
-this->start_keep_alive_timer();
-} else if (now - this->_last_stream_progress >= this->_keep_alive_timeout) {
-// Timeout
-sslog.info("[Stream #{}] The session {} is idle for {} seconds, the peer {} is probably gone, close it",
-plan_id, this, this->_keep_alive_timeout.count(), peer);
-this->on_error();
-} else {
-// Start the timer to check again
-sslog.info("[Stream #{}] The session {} made no progress with peer {}", plan_id, this, peer);
-this->start_keep_alive_timer();
-}
-}).handle_exception([plan_id, peer, session = this->shared_from_this()] (auto ep) {
-sslog.info("[Stream #{}] keep alive timer callback fails with peer {}: {}", plan_id, peer, ep);
-});
-});
-_last_stream_progress = lowres_clock::now();
-start_keep_alive_timer();
}
utils::UUID stream_session::plan_id() {


@@ -175,20 +175,8 @@ private:
bool _complete_sent = false;
bool _received_failed_complete_message = false;
-// If the session is idle for 10 minutes, close the session
-std::chrono::seconds _keep_alive_timeout{60 * 10};
-// Check every 1 minutes
-std::chrono::seconds _keep_alive_interval{60};
-timer<lowres_clock> _keep_alive;
-stream_bytes _last_stream_bytes;
-lowres_clock::time_point _last_stream_progress;
session_info _session_info;
public:
-void start_keep_alive_timer() {
-_keep_alive.rearm(lowres_clock::now() + _keep_alive_interval);
-}
void add_bytes_sent(int64_t bytes) {
_bytes_sent += bytes;
}


@@ -161,7 +161,6 @@ future<> stream_transfer_task::execute() {
});
}).then([this, id, plan_id, cf_id] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
-session->start_keep_alive_timer();
}).handle_exception([this, plan_id, id] (auto ep){
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
std::rethrow_exception(ep);


@@ -1171,7 +1171,7 @@ static mutation_sets generate_mutation_sets() {
auto tomb = new_tombstone();
m1.partition().apply_delete(*s1, ck2, tomb);
result.unequal.emplace_back(mutations{m1, m2});
-m2.partition().apply_delete(*s1, ck2, tomb);
+m2.partition().apply_delete(*s2, ck2, tomb);
result.equal.emplace_back(mutations{m1, m2});
}
@@ -1180,7 +1180,7 @@ static mutation_sets generate_mutation_sets() {
auto key = clustering_key_prefix::from_deeply_exploded(*s1, {data_value(bytes("ck2_0"))});
m1.partition().apply_row_tombstone(*s1, key, tomb);
result.unequal.emplace_back(mutations{m1, m2});
-m2.partition().apply_row_tombstone(*s1, key, tomb);
+m2.partition().apply_row_tombstone(*s2, key, tomb);
result.equal.emplace_back(mutations{m1, m2});
}
@@ -1204,7 +1204,7 @@ static mutation_sets generate_mutation_sets() {
auto ts = new_timestamp();
m1.partition().apply_insert(*s1, ck2, ts);
result.unequal.emplace_back(mutations{m1, m2});
-m2.partition().apply_insert(*s1, ck2, ts);
+m2.partition().apply_insert(*s2, ck2, ts);
result.equal.emplace_back(mutations{m1, m2});
}


@@ -620,6 +620,66 @@ SEASTAR_TEST_CASE(test_single_key_queries_after_population_in_reverse_order) {
});
}
// Reproducer for https://github.com/scylladb/scylla/issues/4236
SEASTAR_TEST_CASE(test_partition_range_population_with_concurrent_memtable_flushes) {
return seastar::async([] {
auto s = make_schema();
std::vector<mutation> mutations = make_ring(s, 3);
auto mt = make_lw_shared<memtable>(s);
for (auto&& m : mutations) {
mt->apply(m);
}
cache_tracker tracker;
row_cache cache(s, snapshot_source_from_snapshot(mt->as_data_source()), tracker);
bool cancel_updater = false;
auto updater = repeat([&] {
if (cancel_updater) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return later().then([&] {
auto mt = make_lw_shared<memtable>(s);
return cache.update([]{}, *mt).then([mt] {
return stop_iteration::no;
});
});
});
{
auto pr = dht::partition_range::make_singular(query::ring_position(mutations[1].decorated_key()));
assert_that(cache.make_reader(s, pr))
.produces(mutations[1])
.produces_end_of_stream();
}
{
auto pr = dht::partition_range::make_ending_with(
{query::ring_position(mutations[2].decorated_key()), true});
assert_that(cache.make_reader(s, pr))
.produces(mutations[0])
.produces(mutations[1])
.produces(mutations[2])
.produces_end_of_stream();
}
cache.invalidate([]{}).get();
{
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(mutations[0])
.produces(mutations[1])
.produces(mutations[2])
.produces_end_of_stream();
}
cancel_updater = true;
updater.get();
});
}
SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) {
return seastar::async([] {
cache_tracker tracker;


@@ -4649,3 +4649,74 @@ SEASTAR_TEST_CASE(sstable_timestamp_metadata_correcness_with_negative) {
BOOST_REQUIRE(sst->get_stats_metadata().max_timestamp == 5);
});
}
SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
return async([] {
storage_service_for_tests ssft;
cell_locker_stats cl_stats;
auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type);
auto s = builder.build();
auto tmp = make_lw_shared<tmpdir>();
auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
auto sst = make_sstable(s, tmp->path, (*gen)++, la, big);
sst->set_unshared();
return sst;
};
column_family_for_tests cf(s);
cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled);
{
auto tokens = token_generation_for_current_shard(4);
auto make_insert = [&] (auto p) {
auto key = partition_key::from_exploded(*s, {to_bytes(p.first)});
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 1 /* ts */);
BOOST_REQUIRE(m.decorated_key().token() == p.second);
return m;
};
auto mut1 = make_insert(tokens[0]);
auto mut2 = make_insert(tokens[1]);
auto mut3 = make_insert(tokens[2]);
auto mut4 = make_insert(tokens[3]);
std::vector<shared_sstable> ssts = {
make_sstable_containing(sst_gen, {mut1, mut2}),
make_sstable_containing(sst_gen, {mut3, mut4})
};
for (auto& sst : ssts) {
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
}
// Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog.
// That's done to assert backlog will work for compaction that is finished and was stopped tracking.
auto fut = sstables::compact_sstables(sstables::compaction_descriptor(ssts), *cf, sst_gen);
bool stopped_tracking = false;
for (auto& info : cf._data->cm.get_compactions()) {
if (info->cf == cf->schema()->cf_name()) {
info->stop_tracking();
stopped_tracking = true;
}
}
BOOST_REQUIRE(stopped_tracking);
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window);
for (auto& sst : ssts) {
cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst);
}
auto ret = fut.get0();
BOOST_REQUIRE(ret.new_sstables.size() == 1);
BOOST_REQUIRE(ret.tracking == false);
}
// triggers code that iterates through registered compactions.
cf._data->cm.backlog();
cf->get_compaction_strategy().get_backlog_tracker().backlog();
});
}


@@ -45,180 +45,143 @@
using namespace std::literals::chrono_literals;
SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) {
-return do_with_cql_env([] (auto& e) {
-auto ranges = db::size_estimates::size_estimates_mutation_reader::get_local_ranges().get0();
+return do_with_cql_env_thread([] (cql_test_env& e) {
+auto ranges = db::size_estimates::get_local_ranges().get0();
auto start_token1 = utf8_type->to_string(ranges[3].start);
auto start_token2 = utf8_type->to_string(ranges[5].start);
auto end_token1 = utf8_type->to_string(ranges[3].end);
auto end_token2 = utf8_type->to_string(ranges[5].end);
auto &qp = e.local_qp();
return e.execute_cql("create table cf1(pk text PRIMARY KEY, v int);").discard_result().then([&e] {
return e.execute_cql("create table cf2(pk text PRIMARY KEY, v int);").discard_result();
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 512);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' limit 100;").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 100);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 256);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name > 'cf1';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 256);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name >= 'cf1';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 512);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name < 'cf2';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 256);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name <= 'cf2';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 512);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name in ('cf1', 'cf2');").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 512);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name >= 'cf1' and table_name <= 'cf1';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 256);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name >= 'cf1' and table_name <= 'cf2';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 512);
});
}).then([&qp] {
return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name > 'cf1' and table_name < 'cf2';").then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 0);
});
}).then([&qp, start_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s';", start_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s';", start_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 253);
});
}).then([&qp, start_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start > '%s';", start_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 252);
});
}).then([&qp, start_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start <= '%s';", start_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 4);
});
}).then([&qp, start_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start < '%s';", start_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 3);
});
}).then([&qp, start_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1, start_token2] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token2)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 3);
});
}).then([&qp, start_token1, start_token2] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start > '%s' and range_start < '%s';", start_token1, start_token2)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1, start_token2] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start in ('%s', '%s');", start_token1, start_token2)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 2);
});
}).then([&qp, start_token1, start_token2] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start > '%s' and range_start <= '%s';", start_token1, start_token2)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 2);
});
}).then([&qp, start_token1, start_token2] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s' and range_start < '%s';", start_token1, start_token2)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 2);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end = '%s';", start_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s';", start_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s';", start_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 0);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end <= '%s';", start_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end < '%s';", start_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 0);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s' and range_end <= '%s';", start_token1, end_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s' and range_end < '%s';", start_token1, end_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 0);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start, range_end) = ('cf1', '%s', '%s');", start_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 1);
});
}).then([&qp, start_token1, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') and (table_name) <= ('cf2');", start_token1, end_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 509);
});
}).then([&qp, start_token1, start_token2, end_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') "
"and (table_name, range_start) <= ('cf2', '%s');", start_token1, end_token1, start_token2)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 259);
});
}).then([&qp, start_token1] {
return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start) < ('cf2', '%s');", start_token1)).then([](auto rs) {
BOOST_REQUIRE_EQUAL(rs->size(), 259);
});
}).discard_result();
// Should not timeout.
e.execute_cql("select * from system.size_estimates;").discard_result().get();
auto rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks';").get0();
assert_that(rs).is_rows().with_size(0);
e.execute_cql("create table cf1(pk text PRIMARY KEY, v int);").discard_result().get();
e.execute_cql("create table cf2(pk text PRIMARY KEY, v int);").discard_result().get();
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks';").get0();
assert_that(rs).is_rows().with_size(512);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' limit 100;").get0();
assert_that(rs).is_rows().with_size(100);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name = 'cf1';").get0();
assert_that(rs).is_rows().with_size(256);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name > 'cf1';").get0();
assert_that(rs).is_rows().with_size(256);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1';").get0();
assert_that(rs).is_rows().with_size(512);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name < 'cf2';").get0();
assert_that(rs).is_rows().with_size(256);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name <= 'cf2';").get0();
assert_that(rs).is_rows().with_size(512);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name in ('cf1', 'cf2');").get0();
assert_that(rs).is_rows().with_size(512);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1' and table_name <= 'cf1';").get0();
assert_that(rs).is_rows().with_size(256);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1' and table_name <= 'cf2';").get0();
assert_that(rs).is_rows().with_size(512);
rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name > 'cf1' and table_name < 'cf2';").get0();
assert_that(rs).is_rows().with_size(0);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s';", start_token1)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s';", start_token1)).get0();
assert_that(rs).is_rows().with_size(253);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start > '%s';", start_token1)).get0();
assert_that(rs).is_rows().with_size(252);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start <= '%s';", start_token1)).get0();
assert_that(rs).is_rows().with_size(4);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start < '%s';", start_token1)).get0();
assert_that(rs).is_rows().with_size(3);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token1)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token2)).get0();
assert_that(rs).is_rows().with_size(3);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start > '%s' and range_start < '%s';", start_token1, start_token2)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start in ('%s', '%s');", start_token1, start_token2)).get0();
assert_that(rs).is_rows().with_size(2);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start > '%s' and range_start <= '%s';", start_token1, start_token2)).get0();
assert_that(rs).is_rows().with_size(2);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start >= '%s' and range_start < '%s';", start_token1, start_token2)).get0();
assert_that(rs).is_rows().with_size(2);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end = '%s';", start_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s';", start_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s';", start_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(0);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end <= '%s';", start_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end < '%s';", start_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(0);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end >= '%s' and range_end <= '%s';", start_token1, end_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and table_name = 'cf1' and range_start = '%s' and range_end > '%s' and range_end < '%s';", start_token1, end_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(0);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start, range_end) = ('cf1', '%s', '%s');", start_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(1);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') and (table_name) <= ('cf2');", start_token1, end_token1)).get0();
assert_that(rs).is_rows().with_size(509);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') "
"and (table_name, range_start) <= ('cf2', '%s');", start_token1, end_token1, start_token2)).get0();
assert_that(rs).is_rows().with_size(259);
rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
"and (table_name, range_start) < ('cf2', '%s');", start_token1)).get0();
assert_that(rs).is_rows().with_size(259);
});
}
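The multi-column relations exercised above, such as `(table_name, range_start, range_end) >= ('cf1', ...)`, compare clustering columns lexicographically, the same ordering `std::tuple` gives in C++. A minimal sketch of that ordering (hypothetical key values, not the actual tokens from the test):

```cpp
#include <string>
#include <tuple>

// A clustering prefix modeled as a tuple: (table_name, range_start).
using clustering_prefix = std::tuple<std::string, std::string>;

// Lexicographic order: table_name decides first; range_start only
// breaks ties, mirroring CQL's multi-column relation semantics.
bool precedes(const clustering_prefix& a, const clustering_prefix& b) {
    return a < b;
}
```

This is why `(table_name, range_start) < ('cf2', '%s')` in the test matches every `cf1` range plus the leading part of `cf2`: any `cf1` prefix orders before any `cf2` prefix regardless of `range_start`.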


@@ -1471,7 +1471,7 @@ public:
             }
             b.push_back(v);
         }
-        std::copy(b.crbegin(), b.crend(), out);
+        out = std::copy(b.crbegin(), b.crend(), out);
     }
     virtual size_t serialized_size(const void* value) const override {
         if (!value) {
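The one-line fix in this hunk matters because `std::copy` returns the output iterator advanced past the last element written; discarding that return value leaves `out` stale, so a later copy through it would overwrite rather than append. A self-contained illustration (generic names, not the serializer's actual code):

```cpp
#include <algorithm>
#include <vector>

// Writes the reversed contents of `a`, then of `b`, into one buffer.
// Reassigning `out` after each std::copy (as in the fix) keeps the
// write position advancing; dropping the assignment would make the
// second copy clobber the first chunk.
std::vector<int> concat_reversed(const std::vector<int>& a,
                                 const std::vector<int>& b) {
    std::vector<int> result(a.size() + b.size());
    auto out = result.begin();
    out = std::copy(a.crbegin(), a.crend(), out);
    out = std::copy(b.crbegin(), b.crend(), out);
    return result;
}
```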


@@ -667,6 +667,7 @@ segment* segment_pool::allocate_segment(size_t reserve, size_t reclamation_step)
             return seg;
         }
         if (can_allocate_more_memory(segment::size)) {
+            memory::disable_abort_on_alloc_failure_temporarily dfg;
             auto p = aligned_alloc(segment::size, segment::size);
             if (!p) {
                 continue;
@@ -677,10 +678,6 @@ segment* segment_pool::allocate_segment(size_t reserve, size_t reclamation_step)
             return seg;
         }
     } while (shard_tracker().get_impl().compact_and_evict(reclamation_step * segment::size));
-    if (shard_tracker().should_abort_on_bad_alloc()) {
-        llogger.error("Aborting due to segment allocation failure");
-        abort();
-    }
     return nullptr;
 }
@@ -2261,6 +2258,7 @@ allocating_section::guard::~guard() {
 #ifndef SEASTAR_DEFAULT_ALLOCATOR
 void allocating_section::reserve() {
+    try {
         shard_segment_pool.set_emergency_reserve_max(std::max(_lsa_reserve, _minimum_lsa_emergency_reserve));
         shard_segment_pool.refill_emergency_reserve();
@@ -2275,6 +2273,13 @@ void allocating_section::reserve() {
         }
         shard_segment_pool.clear_allocation_failure_flag();
+    } catch (const std::bad_alloc&) {
+        if (shard_tracker().should_abort_on_bad_alloc()) {
+            llogger.error("Aborting due to allocation failure");
+            abort();
+        }
+        throw;
+    }
 }

 void allocating_section::on_alloc_failure(logalloc::region& r) {
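The `memory::disable_abort_on_alloc_failure_temporarily` object used in these hunks is an RAII guard: while it is in scope, an allocation failure surfaces as a null pointer or `std::bad_alloc` instead of aborting the process, so the caller can fall back to compaction and eviction. A generic sketch of that scoped-flag pattern (hypothetical names, not Seastar's actual implementation):

```cpp
// Hypothetical process-wide policy: abort on allocation failure by default.
inline thread_local bool abort_on_alloc_failure = true;

// RAII guard: suppresses the abort policy for its scope, then restores
// whatever value was in effect before, so guards may nest safely.
class disable_abort_temporarily {
    bool _saved;
public:
    disable_abort_temporarily() : _saved(abort_on_alloc_failure) {
        abort_on_alloc_failure = false;
    }
    ~disable_abort_temporarily() { abort_on_alloc_failure = _saved; }
};

bool policy_inside_guard() {
    disable_abort_temporarily g;
    return abort_on_alloc_failure;  // false while the guard is alive
}
```

Restoring the saved value in the destructor (rather than unconditionally setting `true`) is what makes nesting and exception unwinding safe.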


@@ -710,6 +710,7 @@ public:
     while (true) {
         try {
             logalloc::reclaim_lock _(r);
+            memory::disable_abort_on_alloc_failure_temporarily dfg;
             return fn();
         } catch (const std::bad_alloc&) {
             on_alloc_failure(r);