Compare commits
11 Commits
scylla-2.1
...
next-2.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2d630e068b | ||
|
|
5a8e9698d8 | ||
|
|
64f1aa8d99 | ||
|
|
280e6eedb9 | ||
|
|
f80f15a6af | ||
|
|
d0eb0c0b90 | ||
|
|
1427c4d428 | ||
|
|
034f2cb42d | ||
|
|
e043a5c276 | ||
|
|
5da9bd3a6e | ||
|
|
3578027e2e |
@@ -141,7 +141,9 @@ static sstring gensalt() {
|
||||
// blowfish 2011 fix, blowfish, sha512, sha256, md5
|
||||
for (sstring pfx : { "$2y$", "$2a$", "$6$", "$5$", "$1$" }) {
|
||||
salt = pfx + input;
|
||||
if (crypt_r("fisk", salt.c_str(), &tlcrypt)) {
|
||||
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
|
||||
|
||||
if (e && (e[0] != '*')) {
|
||||
prefix = pfx;
|
||||
return salt;
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
// - _next_row_in_range = _next.position() < _upper_bound
|
||||
// - _last_row points at a direct predecessor of the next row which is going to be read.
|
||||
// Used for populating continuity.
|
||||
// - _population_range_starts_before_all_rows is set accordingly
|
||||
reading_from_underlying,
|
||||
|
||||
end_of_stream
|
||||
@@ -96,6 +97,13 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
partition_snapshot_row_cursor _next_row;
|
||||
bool _next_row_in_range = false;
|
||||
|
||||
// True iff current population interval, since the previous clustering row, starts before all clustered rows.
|
||||
// We cannot just look at _lower_bound, because emission of range tombstones changes _lower_bound and
|
||||
// because we mark clustering intervals as continuous when consuming a clustering_row, it would prevent
|
||||
// us from marking the interval as continuous.
|
||||
// Valid when _state == reading_from_underlying.
|
||||
bool _population_range_starts_before_all_rows;
|
||||
|
||||
future<> do_fill_buffer();
|
||||
void copy_from_cache_to_buffer();
|
||||
future<> process_static_row();
|
||||
@@ -225,6 +233,7 @@ inline
|
||||
future<> cache_flat_mutation_reader::do_fill_buffer() {
|
||||
if (_state == state::move_to_underlying) {
|
||||
_state = state::reading_from_underlying;
|
||||
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
|
||||
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
|
||||
: position_in_partition(_upper_bound);
|
||||
return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)}).then([this] {
|
||||
@@ -348,7 +357,7 @@ future<> cache_flat_mutation_reader::read_from_underlying() {
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_update_continuity() {
|
||||
if (can_populate() && (!_ck_ranges_curr->start() || _last_row.refresh(*_snp))) {
|
||||
if (can_populate() && (_population_range_starts_before_all_rows || _last_row.refresh(*_snp))) {
|
||||
if (_next_row.is_in_latest_version()) {
|
||||
clogger.trace("csm {}: mark {} continuous", this, _next_row.get_iterator_in_latest_version()->position());
|
||||
_next_row.get_iterator_in_latest_version()->set_continuous(true);
|
||||
@@ -387,6 +396,7 @@ inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
if (!can_populate()) {
|
||||
_last_row = nullptr;
|
||||
_population_range_starts_before_all_rows = false;
|
||||
_read_context->cache().on_mispopulate();
|
||||
return;
|
||||
}
|
||||
@@ -417,6 +427,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
_last_row = partition_snapshot_row_weakref(*_snp, it);
|
||||
});
|
||||
_population_range_starts_before_all_rows = false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -209,19 +209,18 @@ void query_options::prepare(const std::vector<::shared_ptr<column_specification>
|
||||
}
|
||||
|
||||
auto& names = *_names;
|
||||
std::vector<cql3::raw_value> ordered_values;
|
||||
std::vector<cql3::raw_value_view> ordered_values;
|
||||
ordered_values.reserve(specs.size());
|
||||
for (auto&& spec : specs) {
|
||||
auto& spec_name = spec->name->text();
|
||||
for (size_t j = 0; j < names.size(); j++) {
|
||||
if (names[j] == spec_name) {
|
||||
ordered_values.emplace_back(_values[j]);
|
||||
ordered_values.emplace_back(_value_views[j]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_values = std::move(ordered_values);
|
||||
fill_value_views();
|
||||
_value_views = std::move(ordered_values);
|
||||
}
|
||||
|
||||
void query_options::fill_value_views()
|
||||
|
||||
@@ -202,6 +202,14 @@ public:
|
||||
const query_options& options,
|
||||
gc_clock::time_point now) const override;
|
||||
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const = 0;
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret = values_raw(options);
|
||||
std::sort(ret.begin(),ret.end());
|
||||
ret.erase(std::unique(ret.begin(),ret.end()),ret.end());
|
||||
return ret;
|
||||
}
|
||||
#if 0
|
||||
@Override
|
||||
protected final boolean isSupportedBy(SecondaryIndex index)
|
||||
@@ -224,7 +232,7 @@ public:
|
||||
return abstract_restriction::term_uses_function(_values, ks_name, function_name);
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret;
|
||||
for (auto&& v : _values) {
|
||||
ret.emplace_back(to_bytes_opt(v->bind_and_get(options)));
|
||||
@@ -249,7 +257,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
auto&& lval = dynamic_pointer_cast<multi_item_terminal>(_marker->bind(options));
|
||||
if (!lval) {
|
||||
throw exceptions::invalid_request_exception("Invalid null value for IN restriction");
|
||||
|
||||
@@ -53,6 +53,9 @@ update_parameters::get_prefetched_list(
|
||||
return {};
|
||||
}
|
||||
|
||||
if (column.is_static()) {
|
||||
ckey = clustering_key_view::make_empty();
|
||||
}
|
||||
auto i = _prefetched->rows.find(std::make_pair(std::move(pkey), std::move(ckey)));
|
||||
if (i == _prefetched->rows.end()) {
|
||||
return {};
|
||||
|
||||
13
dist/common/scripts/scylla_setup
vendored
13
dist/common/scripts/scylla_setup
vendored
@@ -112,10 +112,15 @@ run_setup_script() {
|
||||
name=$1
|
||||
shift 1
|
||||
$* &&:
|
||||
if [ $? -ne 0 ] && [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
if [ $? -ne 0 ]; then
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
else
|
||||
printf "$name setup failed.\n"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
4
keys.hh
4
keys.hh
@@ -733,6 +733,10 @@ public:
|
||||
static const compound& get_compound_type(const schema& s) {
|
||||
return s.clustering_key_prefix_type();
|
||||
}
|
||||
|
||||
static clustering_key_prefix_view make_empty() {
|
||||
return { bytes_view() };
|
||||
}
|
||||
};
|
||||
|
||||
class clustering_key_prefix : public prefix_compound_wrapper<clustering_key_prefix, clustering_key_prefix_view, clustering_key> {
|
||||
|
||||
@@ -1849,9 +1849,10 @@ void mutation_querier::query_static_row(const row& r, tombstone current_tombston
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__static_row__cells<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
get_compacted_row_slice(_schema, slice, column_kind::static_column,
|
||||
r, slice.static_columns, _static_cells_wr);
|
||||
_memory_accounter.update(stream.size());
|
||||
r, slice.static_columns, out);
|
||||
_memory_accounter.update(stream.size() - start);
|
||||
}
|
||||
if (_pw.requested_digest()) {
|
||||
::feed_hash(_pw.digest(), current_tombstone);
|
||||
@@ -1909,8 +1910,9 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone curr
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__rows<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
write_row(out);
|
||||
stop = _memory_accounter.update_and_check(stream.size());
|
||||
stop = _memory_accounter.update_and_check(stream.size() - start);
|
||||
}
|
||||
|
||||
_live_clustering_rows++;
|
||||
|
||||
@@ -259,6 +259,11 @@ public:
|
||||
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight > 0);
|
||||
}
|
||||
|
||||
bool is_before_all_clustered_rows(const schema& s) const {
|
||||
return _type < partition_region::clustered
|
||||
|| (_type == partition_region::clustered && _ck->is_empty(s) && _bound_weight < 0);
|
||||
}
|
||||
|
||||
template<typename Hasher>
|
||||
void feed_hash(Hasher& hasher, const schema& s) const {
|
||||
::feed_hash(hasher, _bound_weight);
|
||||
|
||||
@@ -144,7 +144,11 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
|
||||
return _db.invoke_on_all([this, rates = std::move(rates), cpuid = engine().cpu_id()] (database& db) {
|
||||
sstring gstate;
|
||||
for (auto& cf : db.get_column_families() | boost::adaptors::filtered(non_system_filter)) {
|
||||
stat s = rates.at(cf.first);
|
||||
auto it = rates.find(cf.first);
|
||||
if (it == rates.end()) { // a table may be added before map/reduce compltes and this code runs
|
||||
continue;
|
||||
}
|
||||
stat s = it->second;
|
||||
float rate = 0;
|
||||
if (s.h) {
|
||||
rate = s.h / (s.h + s.m);
|
||||
|
||||
@@ -1987,10 +1987,9 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
return e.execute_cql("select r1 from tir where p1 in (2, 0, 2, 1);");
|
||||
}).then([&e] (auto msg) {
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
@@ -2012,6 +2011,22 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(1)},
|
||||
});
|
||||
return e.prepare("select r1 from tir where p1 in ?");
|
||||
}).then([&e] (cql3::prepared_cache_key_type prepared_id){
|
||||
auto my_list_type = list_type_impl::get_instance(int32_type, true);
|
||||
std::vector<cql3::raw_value> raw_values;
|
||||
auto in_values_list = my_list_type->decompose(make_list_value(my_list_type,
|
||||
list_type_impl::native_type{{int(2), int(0), int(2), int(1)}}));
|
||||
raw_values.emplace_back(cql3::raw_value::make_value(in_values_list));
|
||||
return e.execute_prepared(prepared_id,raw_values);
|
||||
}).then([&e] (shared_ptr<cql_transport::messages::result_message> msg) {
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2479,3 +2494,66 @@ SEASTAR_TEST_CASE(test_secondary_index_query) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_static_multi_cell_static_lists_with_ckey) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE t (p int, c int, slist list<int> static, v int, PRIMARY KEY (p, c));").get();
|
||||
e.execute_cql("INSERT INTO t (p, c, slist, v) VALUES (1, 1, [1], 1); ").get();
|
||||
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist[0] = 3, v = 3 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{3}}))) },
|
||||
{ int32_type->decompose(3) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [4], v = 4 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{4}}))) },
|
||||
{ int32_type->decompose(4) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [3] + slist , v = 5 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(5) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist + [5] , v = 6 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4, 5}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("DELETE slist[2] from t WHERE p = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist - [4] , v = 7 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3}))) },
|
||||
{ int32_type->decompose(7) }
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -26,11 +26,13 @@
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <query-result-set.hh>
|
||||
#include <query-result-writer.hh>
|
||||
|
||||
#include "tests/test_services.hh"
|
||||
#include "tests/test-utils.hh"
|
||||
#include "tests/mutation_assertions.hh"
|
||||
#include "tests/result_set_assertions.hh"
|
||||
#include "tests/mutation_source_test.hh"
|
||||
|
||||
#include "mutation_query.hh"
|
||||
#include "core/do_with.hh"
|
||||
@@ -530,3 +532,22 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
|
||||
std::vector<mutation> mutations = gen(1);
|
||||
schema_ptr s = gen.schema();
|
||||
mutation_source source = make_source(std::move(mutations));
|
||||
query::result_memory_limiter l(std::numeric_limits<ssize_t>::max());
|
||||
query::partition_slice slice = make_full_slice(*s);
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), digest_only_builder).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), result_and_digest_builder).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user