Merge 'secondary_index: fix returned rows token ordering' from Piotr Grabowski

Fixes returned rows ordering to proper signed token ordering. Before this change, rows were sorted by token, but using unsigned comparison, meaning that negative tokens appeared after positive tokens.

Rename `token_column_computation` to `legacy_token_column_computation` and add some comments describing this computation.

Added (new) `token_column_computation` which returns token as `long_type`, which is sorted using signed comparison - the correct ordering of tokens.

Add new `correct_idx_token_in_secondary_index` feature, which flags that the whole cluster is able to use new `token_column_computation`.

Switch token computation in secondary indexes to (new) `token_column_computation`, which fixes the ordering. This column computation type is only set if cluster supports `correct_idx_token_in_secondary_index` feature to make sure that all nodes
will be able to compute new `token_column_computation`. Also old indexes will need to be rebuilt to take advantage of this fix, as new token column computation type is only set for new indexes.

Fix tests according to new token ordering and add one new test to validate this aspect explicitly.

Fixes #7443

Manually tested a scenario in which someone created an index on an old version of Scylla and then migrated to the new Scylla. The old index continued to work properly (but returned rows in the wrong order). Upon dropping and re-creating the index, it still returned the same data, but now in the correct order.

Closes #7534

* github.com:scylladb/scylla:
  tests: add token ordering test of indexed selects
  tests: fix tests according to new token ordering
  secondary_index: use new token_column_computation
  feature: add correct_idx_token_in_secondary_index
  column_computation: add token_column_computation
  token_column_computation: rename as legacy
This commit is contained in:
Nadav Har'El
2020-11-04 18:32:58 +02:00
committed by Tomasz Grabiec
12 changed files with 135 additions and 24 deletions

View File

@@ -54,6 +54,36 @@ public:
virtual bytes_opt compute_value(const schema& schema, const partition_key& key, const clustering_row& row) const = 0;
};
/*
* Computes token value of partition key and returns it as bytes.
*
* Should NOT be used (use token_column_computation), because ordering
* of bytes is different than ordering of tokens (signed vs unsigned comparison).
*
* The type name stored for computations of this class is "token" - this was
* the original implementation. (now deprecated for new tables)
*/
class legacy_token_column_computation : public column_computation {
public:
// Returns a newly allocated copy of this computation object.
virtual column_computation_ptr clone() const override {
return std::make_unique<legacy_token_column_computation>(*this);
}
// Produces the serialized description of this computation (defined out of line).
virtual bytes serialize() const override;
// Computes the column value for the given schema, partition key and row (defined out of line).
virtual bytes_opt compute_value(const schema& schema, const partition_key& key, const clustering_row& row) const override;
};
/*
* Computes token value of partition key and returns it as long_type.
* The return type means that it can be trivially sorted (for example
* if computed column using this computation is a clustering key),
* preserving the correct order of tokens (using signed comparisons).
*
* Please use this class instead of legacy_token_column_computation.
*
* The type name stored for computations of this class is "token_v2".
* (the name "token" refers to the deprecated legacy_token_column_computation)
*/
class token_column_computation : public column_computation {
public:
virtual column_computation_ptr clone() const override {

View File

@@ -891,6 +891,23 @@ static void append_base_key_to_index_ck(std::vector<bytes_view>& exploded_index_
std::move(begin, key_view.end(), std::back_inserter(exploded_index_ck));
}
// Computes the value of the index view's first clustering key column
// (the idx_token column) for the given base-table partition key.
//
// If that column is marked as computed, the computation attached to the
// column is used; otherwise the legacy token computation is applied.
//
// Returns the computed value.
// Throws std::logic_error if the computation produced no value.
bytes indexed_table_select_statement::compute_idx_token(const partition_key& key) const {
    const column_definition& cdef = *_view_schema->clustering_key_columns().begin();
    // compute_value() requires a clustering row argument; an empty one is supplied here.
    clustering_row empty_row(clustering_key_prefix::make_empty());
    bytes_opt computed_value;
    if (!cdef.is_computed()) {
        // FIXME(pgrabowski): this legacy code is here for backward compatibility and should be removed
        // once "computed_columns feature" is supported by every node
        computed_value = legacy_token_column_computation().compute_value(*_schema, key, empty_row);
    } else {
        computed_value = cdef.get_computation().compute_value(*_schema, key, empty_row);
    }
    if (!computed_value) {
        // Use the textual column name so that the error message is human-readable.
        throw std::logic_error(format("No value computed for idx_token column {}", cdef.name_as_text()));
    }
    return *computed_value;
}
lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement::generate_view_paging_state_from_base_query_results(lw_shared_ptr<const service::pager::paging_state> paging_state,
const foreign_ptr<lw_shared_ptr<query::result>>& results, service::storage_proxy& proxy, service::query_state& state, const query_options& options) const {
const column_definition* cdef = _schema->get_column_definition(to_bytes(_index.target_column()));
@@ -924,7 +941,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
if (_index.metadata().local()) {
exploded_index_ck.push_back(bytes_view(*indexed_column_value));
} else {
token_bytes = dht::get_token(*_schema, last_base_pk).data();
token_bytes = compute_idx_token(last_base_pk);
exploded_index_ck.push_back(bytes_view(token_bytes));
append_base_key_to_index_ck<partition_key>(exploded_index_ck, last_base_pk, *cdef);
}
@@ -1108,7 +1125,7 @@ query::partition_slice indexed_table_select_statement::get_partition_slice_for_g
// Computed token column needs to be added to index view restrictions
const column_definition& token_cdef = *_view_schema->clustering_key_columns().begin();
auto base_pk = partition_key::from_optional_exploded(*_schema, single_pk_restrictions->values(options));
bytes token_value = dht::get_token(*_schema, base_pk).data();
bytes token_value = compute_idx_token(base_pk);
auto token_restriction = ::make_shared<restrictions::single_column_restriction>(token_cdef);
token_restriction->expression = expr::binary_operator{
&token_cdef, expr::oper_t::EQ,

View File

@@ -300,6 +300,8 @@ private:
query::partition_slice get_partition_slice_for_local_index_posting_list(const query_options& options) const;
query::partition_slice get_partition_slice_for_global_index_posting_list(const query_options& options) const;
// Computes the idx_token clustering column value of the index view for the given base partition key.
bytes compute_idx_token(const partition_key& key) const;
};
}

View File

@@ -3050,7 +3050,7 @@ future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manage
// and as such it must be recreated properly.
if (!base_schema->columns_by_name().contains(first_view_ck.name())) {
schema_builder builder{schema_ptr(v)};
builder.mark_column_computed(first_view_ck.name(), std::make_unique<token_column_computation>());
builder.mark_column_computed(first_view_ck.name(), std::make_unique<legacy_token_column_computation>());
return mm.announce_view_update(view_ptr(builder.build()), true);
}
return make_ready_future<>();

View File

@@ -416,7 +416,7 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
}
computed_value = token_column_computation().compute_value(*_base, base_key, update);
computed_value = legacy_token_column_computation().compute_value(*_base, base_key, update);
} else {
computed_value = cdef.get_computation().compute_value(*_base, base_key, update);
}

View File

@@ -143,6 +143,7 @@ extern const std::string_view LWT;
extern const std::string_view PER_TABLE_PARTITIONERS;
extern const std::string_view PER_TABLE_CACHING;
extern const std::string_view DIGEST_FOR_NULL_VALUES;
extern const std::string_view CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX;
}

View File

@@ -62,6 +62,7 @@ constexpr std::string_view features::LWT = "LWT";
constexpr std::string_view features::PER_TABLE_PARTITIONERS = "PER_TABLE_PARTITIONERS";
constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING";
constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES";
constexpr std::string_view features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX = "CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX";
static logging::logger logger("features");
@@ -86,6 +87,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
, _per_table_partitioners_feature(*this, features::PER_TABLE_PARTITIONERS)
, _per_table_caching_feature(*this, features::PER_TABLE_CACHING)
, _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES)
, _correct_idx_token_in_secondary_index_feature(*this, features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX)
{}
feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring> disabled) {
@@ -187,6 +189,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
gms::features::UDF,
gms::features::CDC,
gms::features::DIGEST_FOR_NULL_VALUES,
gms::features::CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX
};
for (const sstring& s : _config._disabled_features) {
@@ -266,6 +269,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
std::ref(_per_table_partitioners_feature),
std::ref(_per_table_caching_feature),
std::ref(_digest_for_null_values_feature),
std::ref(_correct_idx_token_in_secondary_index_feature)
})
{
if (list.contains(f.name())) {

View File

@@ -92,6 +92,7 @@ private:
gms::feature _per_table_partitioners_feature;
gms::feature _per_table_caching_feature;
gms::feature _digest_for_null_values_feature;
gms::feature _correct_idx_token_in_secondary_index_feature;
public:
bool cluster_supports_user_defined_functions() const {
@@ -160,6 +161,10 @@ public:
bool cluster_supports_lwt() const {
return bool(_lwt_feature);
}
// Reports the current boolean state of the CORRECT_IDX_TOKEN_IN_SECONDARY_INDEX feature object.
bool cluster_supports_correct_idx_token_in_secondary_index() const {
    return static_cast<bool>(_correct_idx_token_in_secondary_index_feature);
}
};
} // namespace gms

View File

@@ -48,6 +48,7 @@
#include "schema_builder.hh"
#include "database.hh"
#include "db/view/view.hh"
#include "service/storage_service.hh"
#include <boost/range/adaptor/map.hpp>
#include <boost/algorithm/cxx11/any_of.hpp>
@@ -144,7 +145,13 @@ view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im
builder.with_column(index_target->name(), index_target->type, column_kind::partition_key);
// Additional token column is added to ensure token order on secondary index queries
bytes token_column_name = get_available_token_column_name(*schema);
builder.with_computed_column(token_column_name, bytes_type, column_kind::clustering_key, std::make_unique<token_column_computation>());
if (service::get_local_storage_service().db().local().features().cluster_supports_correct_idx_token_in_secondary_index()) {
builder.with_computed_column(token_column_name, long_type, column_kind::clustering_key, std::make_unique<token_column_computation>());
} else {
// FIXME(pgrabowski): this legacy code is here for backward compatibility and should be removed
// once "supports_correct_idx_token_in_secondary_index" is supported by every node
builder.with_computed_column(token_column_name, bytes_type, column_kind::clustering_key, std::make_unique<legacy_token_column_computation>());
}
for (auto& col : schema->partition_key_columns()) {
if (col == *index_target) {
continue;

View File

@@ -1602,21 +1602,35 @@ column_computation_ptr column_computation::deserialize(const rjson::value& parse
throw std::runtime_error(format("Type {} is not convertible to string", *type_json));
}
if (rjson::to_string_view(*type_json) == "token") {
return std::make_unique<legacy_token_column_computation>();
}
if (rjson::to_string_view(*type_json) == "token_v2") {
return std::make_unique<token_column_computation>();
}
throw std::runtime_error(format("Incorrect column computation type {} found when parsing {}", *type_json, parsed));
}
bytes token_column_computation::serialize() const {
// Serializes this computation as a JSON object whose "type" field is "token".
bytes legacy_token_column_computation::serialize() const {
    rjson::value description = rjson::empty_object();
    rjson::set(description, "type", rjson::from_string("token"));
    auto printed = rjson::print(description);
    return to_bytes(printed);
}
bytes_opt token_column_computation::compute_value(const schema& schema, const partition_key& key, const clustering_row& row) const {
// Returns the byte representation (token::data()) of the partition key's token.
// The row argument is not used by this computation.
bytes_opt legacy_token_column_computation::compute_value(const schema& schema, const partition_key& key, const clustering_row& row) const {
    auto partition_token = dht::get_token(schema, key);
    return partition_token.data();
}
// Serializes this computation as a JSON object whose "type" field is "token_v2".
bytes token_column_computation::serialize() const {
    rjson::value description = rjson::empty_object();
    rjson::set(description, "type", rjson::from_string("token_v2"));
    auto printed = rjson::print(description);
    return to_bytes(printed);
}
// Converts the partition key's token to a 64-bit integer and returns it
// encoded with long_type. The row argument is not used by this computation.
bytes_opt token_column_computation::compute_value(const schema& schema, const partition_key& key, const clustering_row& row) const {
    auto partition_token = dht::get_token(schema, key);
    auto token_as_long = dht::token::to_int64(partition_token);
    return long_type->decompose(token_as_long);
}
bool operator==(const raw_view_info& x, const raw_view_info& y) {
return x._base_id == y._base_id
&& x._base_name == y._base_name

View File

@@ -280,9 +280,9 @@ SEASTAR_TEST_CASE(test_many_columns) {
auto res = e.execute_cql("SELECT * from tab WHERE b = 2").get0();
assert_that(res).is_rows().with_size(3)
.with_rows({
{{int32_type->decompose(0)}, {int32_type->decompose(2)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(4)}, {int32_type->decompose(5)}, {int32_type->decompose(6)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(7)}, {int32_type->decompose(5)}, {int32_type->decompose(0)}},
{{int32_type->decompose(0)}, {int32_type->decompose(2)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}},
});
});
BOOST_TEST_PASSPOINT();
@@ -290,9 +290,9 @@ SEASTAR_TEST_CASE(test_many_columns) {
auto res = e.execute_cql("SELECT * from tab WHERE c = 3").get0();
assert_that(res).is_rows().with_size(3)
.with_rows({
{{int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(3)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(4)}, {int32_type->decompose(5)}, {int32_type->decompose(6)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(7)}, {int32_type->decompose(5)}, {int32_type->decompose(0)}},
{{int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(3)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}},
});
});
BOOST_TEST_PASSPOINT();
@@ -300,8 +300,8 @@ SEASTAR_TEST_CASE(test_many_columns) {
auto res = e.execute_cql("SELECT * from tab WHERE d = 4").get0();
assert_that(res).is_rows().with_size(2)
.with_rows({
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(4)}, {int32_type->decompose(5)}, {int32_type->decompose(6)}},
{{int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(4)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(4)}, {int32_type->decompose(5)}, {int32_type->decompose(6)}},
});
});
BOOST_TEST_PASSPOINT();
@@ -309,9 +309,9 @@ SEASTAR_TEST_CASE(test_many_columns) {
auto res = e.execute_cql("SELECT * from tab WHERE e = 5").get0();
assert_that(res).is_rows().with_size(3)
.with_rows({
{{int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(5)}, {int32_type->decompose(0)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(4)}, {int32_type->decompose(5)}, {int32_type->decompose(6)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(7)}, {int32_type->decompose(5)}, {int32_type->decompose(0)}},
{{int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(5)}, {int32_type->decompose(0)}},
});
});
BOOST_TEST_PASSPOINT();
@@ -319,8 +319,8 @@ SEASTAR_TEST_CASE(test_many_columns) {
auto res = e.execute_cql("SELECT * from tab WHERE f = 6").get0();
assert_that(res).is_rows().with_size(2)
.with_rows({
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(4)}, {int32_type->decompose(5)}, {int32_type->decompose(6)}},
{{int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(0)}, {int32_type->decompose(7)}, {int32_type->decompose(0)}, {int32_type->decompose(6)}},
{{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(3)}, {int32_type->decompose(4)}, {int32_type->decompose(5)}, {int32_type->decompose(6)}},
});
});
});
@@ -477,7 +477,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
expect_more_pages(res, true);
assert_that(res).is_rows().with_rows({{
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
@@ -487,7 +487,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
paging_state = extract_paging_state(res);
assert_that(res).is_rows().with_rows({{
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
@@ -496,7 +496,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
paging_state = extract_paging_state(res);
assert_that(res).is_rows().with_rows({{
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
// Due to implementation differences from origin (Scylla is allowed to return empty pages with has_more_pages == true
@@ -521,7 +521,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
auto paging_state = extract_paging_state(res);
assert_that(res).is_rows().with_rows({{
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
@@ -529,7 +529,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
assert_that(res).is_rows().with_rows({{
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
});
@@ -540,7 +540,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
auto paging_state = extract_paging_state(res);
assert_that(res).is_rows().with_rows({{
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
// Override the actual paging state with one with empty keys,
@@ -1518,6 +1518,37 @@ SEASTAR_TEST_CASE(test_computed_columns) {
});
}
// Ref: #3423 - rows should be returned in token order,
// using signed comparison (ref: #7443).
// Inserts seven partitions sharing the same indexed value and checks that
// both the plain scan and the indexed select return them in the same
// expected order.
SEASTAR_TEST_CASE(test_token_order) {
    return do_with_cql_env_thread([] (auto& env) {
        cquery_nofail(env, "CREATE TABLE t (pk int, v int, PRIMARY KEY(pk))");
        cquery_nofail(env, "CREATE INDEX ON t(v)");

        for (int pk = 0; pk < 7; pk++) {
            auto insert_stmt = format("INSERT INTO t (pk, v) VALUES ({}, 1)", pk);
            cquery_nofail(env, insert_stmt.c_str());
        }

        // Partition keys listed in ascending signed token order.
        std::vector<std::vector<bytes_opt>> rows_in_token_order = {
            { int32_type->decompose(5) }, // token(pk) = -7509452495886106294
            { int32_type->decompose(1) }, // token(pk) = -4069959284402364209
            { int32_type->decompose(0) }, // token(pk) = -3485513579396041028
            { int32_type->decompose(2) }, // token(pk) = -3248873570005575792
            { int32_type->decompose(4) }, // token(pk) = -2729420104000364805
            { int32_type->decompose(6) }, // token(pk) = +2705480034054113608
            { int32_type->decompose(3) }, // token(pk) = +9010454139840013625
        };

        eventually([&] {
            auto full_scan_result = cquery_nofail(env, "SELECT pk FROM t");
            auto indexed_result = cquery_nofail(env, "SELECT pk FROM t WHERE v = 1");

            assert_that(full_scan_result).is_rows().with_rows(rows_in_token_order);
            assert_that(indexed_result).is_rows().with_rows(rows_in_token_order);
        });
    });
}
// Ref: #5708 - filtering should be applied on an indexed column
// if the restriction is not eligible for indexing (it's not EQ)
SEASTAR_TEST_CASE(test_filtering_indexed_column) {

View File

@@ -53,17 +53,17 @@ SELECT * FROM t WHERE v = 6;
{
"rows" :
[
{
"c" : "6",
"p1" : "4",
"p2" : "5",
"v" : "6"
},
{
"c" : "5",
"p1" : "3",
"p2" : "4",
"v" : "6"
},
{
"c" : "6",
"p1" : "4",
"p2" : "5",
"v" : "6"
}
]
}