Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2d630e068b | ||
|
|
5a8e9698d8 | ||
|
|
64f1aa8d99 | ||
|
|
280e6eedb9 | ||
|
|
f80f15a6af | ||
|
|
d0eb0c0b90 | ||
|
|
1427c4d428 | ||
|
|
034f2cb42d | ||
|
|
e043a5c276 | ||
|
|
5da9bd3a6e | ||
|
|
3578027e2e | ||
|
|
7d2150a057 | ||
|
|
afd3c571cc | ||
|
|
093c8512db | ||
|
|
9c0b8ec736 | ||
|
|
1794b732b0 | ||
|
|
c1ac4fb8b0 | ||
|
|
2e7e59fb50 | ||
|
|
af29d4bed3 | ||
|
|
72494bbe05 | ||
|
|
5784823888 | ||
|
|
a7633be1a9 | ||
|
|
e78ded74ce |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=2.1.4
|
||||
VERSION=2.1.6
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -141,7 +141,9 @@ static sstring gensalt() {
|
||||
// blowfish 2011 fix, blowfish, sha512, sha256, md5
|
||||
for (sstring pfx : { "$2y$", "$2a$", "$6$", "$5$", "$1$" }) {
|
||||
salt = pfx + input;
|
||||
if (crypt_r("fisk", salt.c_str(), &tlcrypt)) {
|
||||
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
|
||||
|
||||
if (e && (e[0] != '*')) {
|
||||
prefix = pfx;
|
||||
return salt;
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
// - _next_row_in_range = _next.position() < _upper_bound
|
||||
// - _last_row points at a direct predecessor of the next row which is going to be read.
|
||||
// Used for populating continuity.
|
||||
// - _population_range_starts_before_all_rows is set accordingly
|
||||
reading_from_underlying,
|
||||
|
||||
end_of_stream
|
||||
@@ -96,6 +97,13 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
partition_snapshot_row_cursor _next_row;
|
||||
bool _next_row_in_range = false;
|
||||
|
||||
// True iff current population interval, since the previous clustering row, starts before all clustered rows.
|
||||
// We cannot just look at _lower_bound, because emission of range tombstones changes _lower_bound and
|
||||
// because we mark clustering intervals as continuous when consuming a clustering_row, it would prevent
|
||||
// us from marking the interval as continuous.
|
||||
// Valid when _state == reading_from_underlying.
|
||||
bool _population_range_starts_before_all_rows;
|
||||
|
||||
future<> do_fill_buffer();
|
||||
void copy_from_cache_to_buffer();
|
||||
future<> process_static_row();
|
||||
@@ -225,6 +233,7 @@ inline
|
||||
future<> cache_flat_mutation_reader::do_fill_buffer() {
|
||||
if (_state == state::move_to_underlying) {
|
||||
_state = state::reading_from_underlying;
|
||||
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
|
||||
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
|
||||
: position_in_partition(_upper_bound);
|
||||
return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)}).then([this] {
|
||||
@@ -348,7 +357,7 @@ future<> cache_flat_mutation_reader::read_from_underlying() {
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_update_continuity() {
|
||||
if (can_populate() && (!_ck_ranges_curr->start() || _last_row.refresh(*_snp))) {
|
||||
if (can_populate() && (_population_range_starts_before_all_rows || _last_row.refresh(*_snp))) {
|
||||
if (_next_row.is_in_latest_version()) {
|
||||
clogger.trace("csm {}: mark {} continuous", this, _next_row.get_iterator_in_latest_version()->position());
|
||||
_next_row.get_iterator_in_latest_version()->set_continuous(true);
|
||||
@@ -387,6 +396,7 @@ inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
if (!can_populate()) {
|
||||
_last_row = nullptr;
|
||||
_population_range_starts_before_all_rows = false;
|
||||
_read_context->cache().on_mispopulate();
|
||||
return;
|
||||
}
|
||||
@@ -417,6 +427,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
_last_row = partition_snapshot_row_weakref(*_snp, it);
|
||||
});
|
||||
_population_range_starts_before_all_rows = false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -209,19 +209,18 @@ void query_options::prepare(const std::vector<::shared_ptr<column_specification>
|
||||
}
|
||||
|
||||
auto& names = *_names;
|
||||
std::vector<cql3::raw_value> ordered_values;
|
||||
std::vector<cql3::raw_value_view> ordered_values;
|
||||
ordered_values.reserve(specs.size());
|
||||
for (auto&& spec : specs) {
|
||||
auto& spec_name = spec->name->text();
|
||||
for (size_t j = 0; j < names.size(); j++) {
|
||||
if (names[j] == spec_name) {
|
||||
ordered_values.emplace_back(_values[j]);
|
||||
ordered_values.emplace_back(_value_views[j]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_values = std::move(ordered_values);
|
||||
fill_value_views();
|
||||
_value_views = std::move(ordered_values);
|
||||
}
|
||||
|
||||
void query_options::fill_value_views()
|
||||
|
||||
@@ -202,6 +202,14 @@ public:
|
||||
const query_options& options,
|
||||
gc_clock::time_point now) const override;
|
||||
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const = 0;
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret = values_raw(options);
|
||||
std::sort(ret.begin(),ret.end());
|
||||
ret.erase(std::unique(ret.begin(),ret.end()),ret.end());
|
||||
return ret;
|
||||
}
|
||||
#if 0
|
||||
@Override
|
||||
protected final boolean isSupportedBy(SecondaryIndex index)
|
||||
@@ -224,7 +232,7 @@ public:
|
||||
return abstract_restriction::term_uses_function(_values, ks_name, function_name);
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret;
|
||||
for (auto&& v : _values) {
|
||||
ret.emplace_back(to_bytes_opt(v->bind_and_get(options)));
|
||||
@@ -249,7 +257,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
auto&& lval = dynamic_pointer_cast<multi_item_terminal>(_marker->bind(options));
|
||||
if (!lval) {
|
||||
throw exceptions::invalid_request_exception("Invalid null value for IN restriction");
|
||||
|
||||
@@ -53,6 +53,9 @@ update_parameters::get_prefetched_list(
|
||||
return {};
|
||||
}
|
||||
|
||||
if (column.is_static()) {
|
||||
ckey = clustering_key_view::make_empty();
|
||||
}
|
||||
auto i = _prefetched->rows.find(std::make_pair(std::move(pkey), std::move(ckey)));
|
||||
if (i == _prefetched->rows.end()) {
|
||||
return {};
|
||||
|
||||
@@ -328,9 +328,13 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
|
||||
};
|
||||
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
|
||||
|
||||
// FIXME: Workaround for https://github.com/scylladb/scylla/issues/3552
|
||||
// and https://github.com/scylladb/scylla/issues/3553
|
||||
const bool filtering_broken = true;
|
||||
|
||||
// no clustering filtering is applied if schema defines no clustering key or
|
||||
// compaction strategy thinks it will not benefit from such an optimization.
|
||||
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter()) {
|
||||
if (filtering_broken || !schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter()) {
|
||||
return sstables;
|
||||
}
|
||||
::cf_stats* stats = cf.cf_stats();
|
||||
|
||||
2
dist/ami/build_ami.sh
vendored
2
dist/ami/build_ami.sh
vendored
@@ -43,7 +43,7 @@ done
|
||||
. /etc/os-release
|
||||
case "$ID" in
|
||||
"centos")
|
||||
AMI=ami-4bf3d731
|
||||
AMI=ami-ae7bfdb8
|
||||
REGION=us-east-1
|
||||
SSH_USERNAME=centos
|
||||
;;
|
||||
|
||||
2
dist/ami/files/scylla-ami
vendored
2
dist/ami/files/scylla-ami
vendored
Submodule dist/ami/files/scylla-ami updated: c5d9e9645b...0df779dcca
13
dist/common/scripts/scylla_setup
vendored
13
dist/common/scripts/scylla_setup
vendored
@@ -112,10 +112,15 @@ run_setup_script() {
|
||||
name=$1
|
||||
shift 1
|
||||
$* &&:
|
||||
if [ $? -ne 0 ] && [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
if [ $? -ne 0 ]; then
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
else
|
||||
printf "$name setup failed.\n"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
13
dist/debian/build_deb.sh
vendored
13
dist/debian/build_deb.sh
vendored
@@ -2,10 +2,11 @@
|
||||
|
||||
. /etc/os-release
|
||||
print_usage() {
|
||||
echo "build_deb.sh -target <codename> --dist --rebuild-dep"
|
||||
echo "build_deb.sh -target <codename> --dist --rebuild-dep --jobs 2"
|
||||
echo " --target target distribution codename"
|
||||
echo " --dist create a public distribution package"
|
||||
echo " --no-clean don't rebuild pbuilder tgz"
|
||||
echo " --jobs specify number of jobs"
|
||||
exit 1
|
||||
}
|
||||
install_deps() {
|
||||
@@ -19,6 +20,7 @@ install_deps() {
|
||||
DIST=0
|
||||
TARGET=
|
||||
NO_CLEAN=0
|
||||
JOBS=0
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--dist")
|
||||
@@ -33,6 +35,10 @@ while [ $# -gt 0 ]; do
|
||||
NO_CLEAN=1
|
||||
shift 1
|
||||
;;
|
||||
"--jobs")
|
||||
JOBS=$2
|
||||
shift 2
|
||||
;;
|
||||
*)
|
||||
print_usage
|
||||
;;
|
||||
@@ -239,10 +245,13 @@ if [ $NO_CLEAN -eq 0 ]; then
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder clean
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder create --allow-untrusted
|
||||
fi
|
||||
if [ $JOBS -ne 0 ]; then
|
||||
DEB_BUILD_OPTIONS="parallel=$JOBS"
|
||||
fi
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder update --allow-untrusted
|
||||
if [ "$TARGET" = "trusty" ] || [ "$TARGET" = "xenial" ] || [ "$TARGET" = "yakkety" ] || [ "$TARGET" = "zesty" ] || [ "$TARGET" = "artful" ] || [ "$TARGET" = "bionic" ]; then
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/ubuntu_enable_ppa.sh
|
||||
elif [ "$TARGET" = "jessie" ] || [ "$TARGET" = "stretch" ]; then
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/debian_install_gpgkey.sh
|
||||
fi
|
||||
sudo -H DIST=$TARGET pdebuild --buildresult build/debs
|
||||
sudo -H DIST=$TARGET DEB_BUILD_OPTIONS=$DEB_BUILD_OPTIONS pdebuild --buildresult build/debs
|
||||
|
||||
3
dist/debian/rules.in
vendored
3
dist/debian/rules.in
vendored
@@ -1,12 +1,13 @@
|
||||
#!/usr/bin/make -f
|
||||
|
||||
export PYBUILD_DISABLE=1
|
||||
jobs := $(shell echo $$DEB_BUILD_OPTIONS | sed -r "s/.*parallel=([0-9]+).*/-j\1/")
|
||||
|
||||
override_dh_auto_configure:
|
||||
./configure.py --enable-dpdk --mode=release --static-thrift --static-boost --compiler=@@COMPILER@@ --cflags="-I/opt/scylladb/include -L/opt/scylladb/lib/x86-linux-gnu/" --ldflags="-Wl,-rpath=/opt/scylladb/lib"
|
||||
|
||||
override_dh_auto_build:
|
||||
PATH="/opt/scylladb/bin:$$PATH" ninja
|
||||
PATH="/opt/scylladb/bin:$$PATH" ninja $(jobs)
|
||||
|
||||
override_dh_auto_clean:
|
||||
rm -rf build/release seastar/build
|
||||
|
||||
@@ -984,7 +984,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
|
||||
logger.warn("Assassinating {} via gossip", endpoint);
|
||||
if (es) {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
auto tokens = ss.get_token_metadata().get_tokens(endpoint);
|
||||
tokens = ss.get_token_metadata().get_tokens(endpoint);
|
||||
if (tokens.empty()) {
|
||||
logger.warn("Unable to calculate tokens for {}. Will use a random one", address);
|
||||
throw std::runtime_error(sprint("Unable to calculate tokens for %s", endpoint));
|
||||
|
||||
4
keys.hh
4
keys.hh
@@ -733,6 +733,10 @@ public:
|
||||
static const compound& get_compound_type(const schema& s) {
|
||||
return s.clustering_key_prefix_type();
|
||||
}
|
||||
|
||||
static clustering_key_prefix_view make_empty() {
|
||||
return { bytes_view() };
|
||||
}
|
||||
};
|
||||
|
||||
class clustering_key_prefix : public prefix_compound_wrapper<clustering_key_prefix, clustering_key_prefix_view, clustering_key> {
|
||||
|
||||
@@ -31,6 +31,7 @@ class md5_hasher {
|
||||
CryptoPP::Weak::MD5 hash{};
|
||||
public:
|
||||
void update(const char* ptr, size_t length) {
|
||||
using namespace CryptoPP;
|
||||
static_assert(sizeof(char) == sizeof(byte), "Assuming lengths will be the same");
|
||||
hash.Update(reinterpret_cast<const byte*>(ptr), length * sizeof(byte));
|
||||
}
|
||||
|
||||
@@ -1849,9 +1849,10 @@ void mutation_querier::query_static_row(const row& r, tombstone current_tombston
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__static_row__cells<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
get_compacted_row_slice(_schema, slice, column_kind::static_column,
|
||||
r, slice.static_columns, _static_cells_wr);
|
||||
_memory_accounter.update(stream.size());
|
||||
r, slice.static_columns, out);
|
||||
_memory_accounter.update(stream.size() - start);
|
||||
}
|
||||
if (_pw.requested_digest()) {
|
||||
::feed_hash(_pw.digest(), current_tombstone);
|
||||
@@ -1909,8 +1910,9 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone curr
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__rows<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
write_row(out);
|
||||
stop = _memory_accounter.update_and_check(stream.size());
|
||||
stop = _memory_accounter.update_and_check(stream.size() - start);
|
||||
}
|
||||
|
||||
_live_clustering_rows++;
|
||||
|
||||
@@ -259,6 +259,11 @@ public:
|
||||
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight > 0);
|
||||
}
|
||||
|
||||
bool is_before_all_clustered_rows(const schema& s) const {
|
||||
return _type < partition_region::clustered
|
||||
|| (_type == partition_region::clustered && _ck->is_empty(s) && _bound_weight < 0);
|
||||
}
|
||||
|
||||
template<typename Hasher>
|
||||
void feed_hash(Hasher& hasher, const schema& s) const {
|
||||
::feed_hash(hasher, _bound_weight);
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 2a2c1d2708...c89c8b8043
@@ -144,7 +144,11 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
|
||||
return _db.invoke_on_all([this, rates = std::move(rates), cpuid = engine().cpu_id()] (database& db) {
|
||||
sstring gstate;
|
||||
for (auto& cf : db.get_column_families() | boost::adaptors::filtered(non_system_filter)) {
|
||||
stat s = rates.at(cf.first);
|
||||
auto it = rates.find(cf.first);
|
||||
if (it == rates.end()) { // a table may be added before map/reduce compltes and this code runs
|
||||
continue;
|
||||
}
|
||||
stat s = it->second;
|
||||
float rate = 0;
|
||||
if (s.h) {
|
||||
rate = s.h / (s.h + s.m);
|
||||
|
||||
@@ -1987,10 +1987,9 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
return e.execute_cql("select r1 from tir where p1 in (2, 0, 2, 1);");
|
||||
}).then([&e] (auto msg) {
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
@@ -2012,6 +2011,22 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(1)},
|
||||
});
|
||||
return e.prepare("select r1 from tir where p1 in ?");
|
||||
}).then([&e] (cql3::prepared_cache_key_type prepared_id){
|
||||
auto my_list_type = list_type_impl::get_instance(int32_type, true);
|
||||
std::vector<cql3::raw_value> raw_values;
|
||||
auto in_values_list = my_list_type->decompose(make_list_value(my_list_type,
|
||||
list_type_impl::native_type{{int(2), int(0), int(2), int(1)}}));
|
||||
raw_values.emplace_back(cql3::raw_value::make_value(in_values_list));
|
||||
return e.execute_prepared(prepared_id,raw_values);
|
||||
}).then([&e] (shared_ptr<cql_transport::messages::result_message> msg) {
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2479,3 +2494,66 @@ SEASTAR_TEST_CASE(test_secondary_index_query) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_static_multi_cell_static_lists_with_ckey) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE t (p int, c int, slist list<int> static, v int, PRIMARY KEY (p, c));").get();
|
||||
e.execute_cql("INSERT INTO t (p, c, slist, v) VALUES (1, 1, [1], 1); ").get();
|
||||
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist[0] = 3, v = 3 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{3}}))) },
|
||||
{ int32_type->decompose(3) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [4], v = 4 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{4}}))) },
|
||||
{ int32_type->decompose(4) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [3] + slist , v = 5 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(5) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist + [5] , v = 6 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4, 5}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("DELETE slist[2] from t WHERE p = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist - [4] , v = 7 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3}))) },
|
||||
{ int32_type->decompose(7) }
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,9 @@
|
||||
#include "database.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "frozen_mutation.hh"
|
||||
#include "mutation_source_test.hh"
|
||||
#include "schema_registry.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
|
||||
@@ -79,3 +82,33 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
run_mutation_source_tests([&] (schema_ptr s, const std::vector<mutation>& partitions) -> mutation_source {
|
||||
try {
|
||||
e.local_db().find_column_family(s->ks_name(), s->cf_name());
|
||||
service::get_local_migration_manager().announce_column_family_drop(s->ks_name(), s->cf_name(), true).get();
|
||||
} catch (const no_such_column_family&) {
|
||||
// expected
|
||||
}
|
||||
service::get_local_migration_manager().announce_new_column_family(s, true).get();
|
||||
column_family& cf = e.local_db().find_column_family(s);
|
||||
for (auto&& m : partitions) {
|
||||
e.local_db().apply(cf.schema(), freeze(m)).get();
|
||||
}
|
||||
cf.flush().get();
|
||||
cf.get_row_cache().invalidate([] {}).get();
|
||||
return mutation_source([&] (schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return cf.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
});
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -26,11 +26,13 @@
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <query-result-set.hh>
|
||||
#include <query-result-writer.hh>
|
||||
|
||||
#include "tests/test_services.hh"
|
||||
#include "tests/test-utils.hh"
|
||||
#include "tests/mutation_assertions.hh"
|
||||
#include "tests/result_set_assertions.hh"
|
||||
#include "tests/mutation_source_test.hh"
|
||||
|
||||
#include "mutation_query.hh"
|
||||
#include "core/do_with.hh"
|
||||
@@ -530,3 +532,22 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
|
||||
std::vector<mutation> mutations = gen(1);
|
||||
schema_ptr s = gen.schema();
|
||||
mutation_source source = make_source(std::move(mutations));
|
||||
query::result_memory_limiter l(std::numeric_limits<ssize_t>::max());
|
||||
query::partition_slice slice = make_full_slice(*s);
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), digest_only_builder).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), result_and_digest_builder).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
|
||||
@@ -656,6 +656,46 @@ void test_streamed_mutation_fragments_have_monotonic_positions(populate_fn popul
|
||||
});
|
||||
}
|
||||
|
||||
static void test_date_tiered_clustering_slicing(populate_fn populate) {
|
||||
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
|
||||
|
||||
simple_schema ss;
|
||||
|
||||
auto s = schema_builder(ss.schema())
|
||||
.set_compaction_strategy(sstables::compaction_strategy_type::date_tiered)
|
||||
.build();
|
||||
|
||||
auto pkey = ss.make_pkey();
|
||||
|
||||
mutation m1(pkey, s);
|
||||
ss.add_static_row(m1, "s");
|
||||
m1.partition().apply(ss.new_tombstone());
|
||||
ss.add_row(m1, ss.make_ckey(0), "v1");
|
||||
|
||||
mutation_source ms = populate(s, {m1});
|
||||
|
||||
// query row outside the range of existing rows to exercise sstable clustering key filter
|
||||
{
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(ss.make_ckey_range(1, 2))
|
||||
.build();
|
||||
auto prange = dht::partition_range::make_singular(pkey);
|
||||
assert_that(ms(s, prange, slice))
|
||||
.produces(m1, slice.row_ranges(*s, pkey.key()))
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make_singular(ss.make_ckey(0)))
|
||||
.build();
|
||||
auto prange = dht::partition_range::make_singular(pkey);
|
||||
assert_that(ms(s, prange, slice))
|
||||
.produces(m1)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
}
|
||||
|
||||
static void test_clustering_slices(populate_fn populate) {
|
||||
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
|
||||
auto s = schema_builder("ks", "cf")
|
||||
@@ -935,6 +975,7 @@ void test_streamed_mutation_forwarding_succeeds_with_no_data(populate_fn populat
|
||||
}
|
||||
|
||||
void run_mutation_reader_tests(populate_fn populate) {
|
||||
test_date_tiered_clustering_slicing(populate);
|
||||
test_fast_forwarding_across_partitions_to_empty_range(populate);
|
||||
test_clustering_slices(populate);
|
||||
test_streamed_mutation_fragments_have_monotonic_positions(populate);
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "mutation.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "streamed_mutation.hh"
|
||||
#include "sstable_utils.hh"
|
||||
|
||||
// Helper for working with the following table:
|
||||
//
|
||||
@@ -147,12 +148,14 @@ public:
|
||||
|
||||
// Creates a sequence of keys in ring order
|
||||
std::vector<dht::decorated_key> make_pkeys(int n) {
|
||||
std::vector<dht::decorated_key> keys;
|
||||
for (int i = 0; i < n; ++i) {
|
||||
keys.push_back(make_pkey(i));
|
||||
}
|
||||
std::sort(keys.begin(), keys.end(), dht::decorated_key::less_comparator(_s));
|
||||
return keys;
|
||||
auto local_keys = make_local_keys(n, _s);
|
||||
return boost::copy_range<std::vector<dht::decorated_key>>(local_keys | boost::adaptors::transformed([this] (sstring& key) {
|
||||
return make_pkey(std::move(key));
|
||||
}));
|
||||
}
|
||||
|
||||
dht::decorated_key make_pkey() {
|
||||
return make_pkey(make_local_key(_s));
|
||||
}
|
||||
|
||||
static std::vector<dht::ring_position> to_ring_positions(const std::vector<dht::decorated_key>& keys) {
|
||||
|
||||
@@ -19,6 +19,45 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "sstables/sstables.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <boost/range/irange.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
|
||||
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, std::vector<mutation> muts);
|
||||
|
||||
//
|
||||
// Make set of keys sorted by token for current shard.
|
||||
//
|
||||
static std::vector<sstring> make_local_keys(unsigned n, const schema_ptr& s, size_t min_key_size = 1) {
|
||||
std::vector<std::pair<sstring, dht::decorated_key>> p;
|
||||
p.reserve(n);
|
||||
|
||||
auto key_id = 0U;
|
||||
auto generated = 0U;
|
||||
while (generated < n) {
|
||||
auto raw_key = sstring(std::max(min_key_size, sizeof(key_id)), int8_t(0));
|
||||
std::copy_n(reinterpret_cast<int8_t*>(&key_id), sizeof(key_id), raw_key.begin());
|
||||
auto dk = dht::global_partitioner().decorate_key(*s, partition_key::from_single_value(*s, to_bytes(raw_key)));
|
||||
key_id++;
|
||||
|
||||
if (engine_is_ready() && engine().cpu_id() != dht::global_partitioner().shard_of(dk.token())) {
|
||||
continue;
|
||||
}
|
||||
generated++;
|
||||
p.emplace_back(std::move(raw_key), std::move(dk));
|
||||
}
|
||||
boost::sort(p, [&] (auto& p1, auto& p2) {
|
||||
return p1.second.less_compare(*s, p2.second);
|
||||
});
|
||||
return boost::copy_range<std::vector<sstring>>(p | boost::adaptors::map_keys);
|
||||
}
|
||||
|
||||
//
|
||||
// Return one key for current shard. Note that it always returns the same key for a given shard.
|
||||
//
|
||||
inline sstring make_local_key(const schema_ptr& s, size_t min_key_size = 1) {
|
||||
return make_local_keys(1, s, min_key_size).front();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user