Compare commits
42 Commits
next
...
scylla-4.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
000585522e | ||
|
|
47b121130a | ||
|
|
15f55141ec | ||
|
|
69fbeaa27e | ||
|
|
a366de2a63 | ||
|
|
5bd52e4dba | ||
|
|
8a3a69bc3e | ||
|
|
50c01f7331 | ||
|
|
ecfe466e7b | ||
|
|
69e5caadb6 | ||
|
|
0ff3c0dcb5 | ||
|
|
2148a194c2 | ||
|
|
77ab7b1221 | ||
|
|
59bcd7f029 | ||
|
|
bc5008b165 | ||
|
|
dd7e3d3eab | ||
|
|
3b617164dc | ||
|
|
bb99d7ced6 | ||
|
|
9877246251 | ||
|
|
d966e2d500 | ||
|
|
81831d93d2 | ||
|
|
542a7d28a3 | ||
|
|
1310e6cb48 | ||
|
|
99a6ecb25d | ||
|
|
bc922a743f | ||
|
|
1ec4f50e3c | ||
|
|
9c7ff01c5d | ||
|
|
da29b65e04 | ||
|
|
8c3e8350d6 | ||
|
|
708588bf8b | ||
|
|
b2271800a5 | ||
|
|
209c3512e7 | ||
|
|
4896ce0fd4 | ||
|
|
9d84b1f13d | ||
|
|
a8e372bf94 | ||
|
|
17e5ac9ab1 | ||
|
|
d1d968c6e9 | ||
|
|
e186f66bfe | ||
|
|
78a39e8364 | ||
|
|
bbef05ae3c | ||
|
|
6f324cb732 | ||
|
|
239499a35a |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=4.3.0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1881,7 +1881,8 @@ static std::string get_item_type_string(const rjson::value& v) {
|
||||
|
||||
// calculate_attrs_to_get() takes either AttributesToGet or
|
||||
// ProjectionExpression parameters (having both is *not* allowed),
|
||||
// and returns the list of cells we need to read.
|
||||
// and returns the list of cells we need to read, or an empty set when
|
||||
// *all* attributes are to be returned.
|
||||
// In our current implementation, only top-level attributes are stored
|
||||
// as cells, and nested documents are stored serialized as JSON.
|
||||
// So this function currently returns only the the top-level attributes
|
||||
@@ -2571,6 +2572,10 @@ public:
|
||||
std::unordered_set<std::string>& used_attribute_values);
|
||||
bool check(const rjson::value& item) const;
|
||||
bool filters_on(std::string_view attribute) const;
|
||||
// for_filters_on() runs the given function on the attributes that the
|
||||
// filter works on. It may run for the same attribute more than once if
|
||||
// used more than once in the filter.
|
||||
void for_filters_on(const noncopyable_function<void(std::string_view)>& func) const;
|
||||
operator bool() const { return bool(_imp); }
|
||||
};
|
||||
|
||||
@@ -2651,10 +2656,26 @@ bool filter::filters_on(std::string_view attribute) const {
|
||||
}, *_imp);
|
||||
}
|
||||
|
||||
void filter::for_filters_on(const noncopyable_function<void(std::string_view)>& func) const {
|
||||
if (_imp) {
|
||||
std::visit(overloaded_functor {
|
||||
[&] (const conditions_filter& f) -> void {
|
||||
for (auto it = f.conditions.MemberBegin(); it != f.conditions.MemberEnd(); ++it) {
|
||||
func(rjson::to_string_view(it->name));
|
||||
}
|
||||
},
|
||||
[&] (const expression_filter& f) -> void {
|
||||
return for_condition_expression_on(f.expression, func);
|
||||
}
|
||||
}, *_imp);
|
||||
}
|
||||
}
|
||||
|
||||
class describe_items_visitor {
|
||||
typedef std::vector<const column_definition*> columns_t;
|
||||
const columns_t& _columns;
|
||||
const std::unordered_set<std::string>& _attrs_to_get;
|
||||
std::unordered_set<std::string> _extra_filter_attrs;
|
||||
const filter& _filter;
|
||||
typename columns_t::const_iterator _column_it;
|
||||
rjson::value _item;
|
||||
@@ -2670,7 +2691,20 @@ public:
|
||||
, _item(rjson::empty_object())
|
||||
, _items(rjson::empty_array())
|
||||
, _scanned_count(0)
|
||||
{ }
|
||||
{
|
||||
// _filter.check() may need additional attributes not listed in
|
||||
// _attrs_to_get (i.e., not requested as part of the output).
|
||||
// We list those in _extra_filter_attrs. We will include them in
|
||||
// the JSON but take them out before finally returning the JSON.
|
||||
if (!_attrs_to_get.empty()) {
|
||||
_filter.for_filters_on([&] (std::string_view attr) {
|
||||
std::string a(attr); // no heterogenous maps searches :-(
|
||||
if (!_attrs_to_get.contains(a)) {
|
||||
_extra_filter_attrs.emplace(std::move(a));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void start_row() {
|
||||
_column_it = _columns.begin();
|
||||
@@ -2684,7 +2718,7 @@ public:
|
||||
result_bytes_view->with_linearized([this] (bytes_view bv) {
|
||||
std::string column_name = (*_column_it)->name_as_text();
|
||||
if (column_name != executor::ATTRS_COLUMN_NAME) {
|
||||
if (_attrs_to_get.empty() || _attrs_to_get.contains(column_name)) {
|
||||
if (_attrs_to_get.empty() || _attrs_to_get.contains(column_name) || _extra_filter_attrs.contains(column_name)) {
|
||||
if (!_item.HasMember(column_name.c_str())) {
|
||||
rjson::set_with_string_name(_item, column_name, rjson::empty_object());
|
||||
}
|
||||
@@ -2696,7 +2730,7 @@ public:
|
||||
auto keys_and_values = value_cast<map_type_impl::native_type>(deserialized);
|
||||
for (auto entry : keys_and_values) {
|
||||
std::string attr_name = value_cast<sstring>(entry.first);
|
||||
if (_attrs_to_get.empty() || _attrs_to_get.contains(attr_name)) {
|
||||
if (_attrs_to_get.empty() || _attrs_to_get.contains(attr_name) || _extra_filter_attrs.contains(attr_name)) {
|
||||
bytes value = value_cast<bytes>(entry.second);
|
||||
rjson::set_with_string_name(_item, attr_name, deserialize_item(value));
|
||||
}
|
||||
@@ -2708,6 +2742,11 @@ public:
|
||||
|
||||
void end_row() {
|
||||
if (_filter.check(_item)) {
|
||||
// Remove the extra attributes _extra_filter_attrs which we had
|
||||
// to add just for the filter, and not requested to be returned:
|
||||
for (const auto& attr : _extra_filter_attrs) {
|
||||
rjson::remove_member(_item, attr);
|
||||
}
|
||||
rjson::push_back(_items, std::move(_item));
|
||||
}
|
||||
_item = rjson::empty_object();
|
||||
@@ -2742,7 +2781,7 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
|
||||
for (const column_definition& cdef : schema.partition_key_columns()) {
|
||||
rjson::set_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
|
||||
rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()];
|
||||
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), rjson::parse(to_json_string(*cdef.type, *exploded_pk_it)));
|
||||
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_pk_it, cdef));
|
||||
++exploded_pk_it;
|
||||
}
|
||||
auto ck = paging_state.get_clustering_key();
|
||||
@@ -2752,7 +2791,7 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
|
||||
for (const column_definition& cdef : schema.clustering_key_columns()) {
|
||||
rjson::set_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
|
||||
rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()];
|
||||
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), rjson::parse(to_json_string(*cdef.type, *exploded_ck_it)));
|
||||
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_ck_it, cdef));
|
||||
++exploded_ck_it;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -348,6 +348,39 @@ bool condition_expression_on(const parsed::condition_expression& ce, std::string
|
||||
}, ce._expression);
|
||||
}
|
||||
|
||||
// for_condition_expression_on() runs a given function over all the attributes
|
||||
// mentioned in the expression. If the same attribute is mentioned more than
|
||||
// once, the function will be called more than once for the same attribute.
|
||||
|
||||
static void for_value_on(const parsed::value& v, const noncopyable_function<void(std::string_view)>& func) {
|
||||
std::visit(overloaded_functor {
|
||||
[&] (const parsed::constant& c) { },
|
||||
[&] (const parsed::value::function_call& f) {
|
||||
for (const parsed::value& value : f._parameters) {
|
||||
for_value_on(value, func);
|
||||
}
|
||||
},
|
||||
[&] (const parsed::path& p) {
|
||||
func(p.root());
|
||||
}
|
||||
}, v._value);
|
||||
}
|
||||
|
||||
void for_condition_expression_on(const parsed::condition_expression& ce, const noncopyable_function<void(std::string_view)>& func) {
|
||||
std::visit(overloaded_functor {
|
||||
[&] (const parsed::primitive_condition& cond) {
|
||||
for (const parsed::value& value : cond._values) {
|
||||
for_value_on(value, func);
|
||||
}
|
||||
},
|
||||
[&] (const parsed::condition_expression::condition_list& list) {
|
||||
for (const parsed::condition_expression& cond : list.conditions) {
|
||||
for_condition_expression_on(cond, func);
|
||||
}
|
||||
}
|
||||
}, ce._expression);
|
||||
}
|
||||
|
||||
// The following calculate_value() functions calculate, or evaluate, a parsed
|
||||
// expression. The parsed expression is assumed to have been "resolved", with
|
||||
// the matching resolve_* function.
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
#include <unordered_set>
|
||||
#include <string_view>
|
||||
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
#include "expressions_types.hh"
|
||||
#include "utils/rjson.hh"
|
||||
|
||||
@@ -59,6 +61,11 @@ void validate_value(const rjson::value& v, const char* caller);
|
||||
|
||||
bool condition_expression_on(const parsed::condition_expression& ce, std::string_view attribute);
|
||||
|
||||
// for_condition_expression_on() runs the given function on the attributes
|
||||
// that the expression uses. It may run for the same attribute more than once
|
||||
// if the same attribute is used more than once in the expression.
|
||||
void for_condition_expression_on(const parsed::condition_expression& ce, const noncopyable_function<void(std::string_view)>& func);
|
||||
|
||||
// calculate_value() behaves slightly different (especially, different
|
||||
// functions supported) when used in different types of expressions, as
|
||||
// enumerated in this enum:
|
||||
|
||||
@@ -849,6 +849,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
|
||||
static const bytes timestamp_column_name = cdc::log_meta_column_name_bytes("time");
|
||||
static const bytes op_column_name = cdc::log_meta_column_name_bytes("operation");
|
||||
static const bytes eor_column_name = cdc::log_meta_column_name_bytes("end_of_batch");
|
||||
|
||||
auto key_names = boost::copy_range<std::unordered_set<std::string>>(
|
||||
boost::range::join(std::move(base->partition_key_columns()), std::move(base->clustering_key_columns()))
|
||||
@@ -872,7 +873,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
|
||||
|
||||
auto regular_columns = boost::copy_range<query::column_id_vector>(schema->regular_columns()
|
||||
| boost::adaptors::filtered([](const column_definition& cdef) { return cdef.name() == op_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); })
|
||||
| boost::adaptors::filtered([](const column_definition& cdef) { return cdef.name() == op_column_name || cdef.name() == eor_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); })
|
||||
| boost::adaptors::transformed([&] (const column_definition& cdef) { columns.emplace_back(&cdef); return cdef.id; })
|
||||
);
|
||||
|
||||
@@ -905,6 +906,11 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
return cdef->name->name() == timestamp_column_name;
|
||||
})
|
||||
);
|
||||
auto eor_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == eor_column_name;
|
||||
})
|
||||
);
|
||||
|
||||
std::optional<utils::UUID> timestamp;
|
||||
auto dynamodb = rjson::empty_object();
|
||||
@@ -930,15 +936,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
for (auto& row : result_set->rows()) {
|
||||
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
|
||||
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
|
||||
|
||||
if (timestamp && timestamp != ts) {
|
||||
maybe_add_record();
|
||||
if (limit == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
timestamp = ts;
|
||||
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
|
||||
|
||||
if (!dynamodb.HasMember("Keys")) {
|
||||
auto keys = rjson::empty_object();
|
||||
@@ -991,9 +989,13 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
rjson::set(record, "eventName", "REMOVE");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (limit > 0 && timestamp) {
|
||||
maybe_add_record();
|
||||
if (eor) {
|
||||
maybe_add_record();
|
||||
timestamp = ts;
|
||||
if (limit == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto ret = rjson::empty_object();
|
||||
@@ -1047,6 +1049,9 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche
|
||||
if (!db.features().cluster_supports_cdc()) {
|
||||
throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster.");
|
||||
}
|
||||
if (!db.features().cluster_supports_alternator_streams()) {
|
||||
throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster.");
|
||||
}
|
||||
|
||||
cdc::options opts;
|
||||
opts.enabled(true);
|
||||
|
||||
@@ -20,10 +20,16 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "serializer.hh"
|
||||
#include "db/extensions.hh"
|
||||
#include "cdc/cdc_options.hh"
|
||||
#include "schema.hh"
|
||||
#include "serializer_impl.hh"
|
||||
|
||||
namespace cdc {
|
||||
|
||||
|
||||
@@ -154,7 +154,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
|
||||
future<db_clock::time_point> get_local_streams_timestamp();
|
||||
|
||||
/* Generate a new set of CDC streams and insert it into the distributed cdc_generation_descriptions table.
|
||||
* Returns the timestamp of this new generation.
|
||||
* Returns the timestamp of this new generation
|
||||
*
|
||||
* Should be called when starting the node for the first time (i.e., joining the ring).
|
||||
*
|
||||
|
||||
32
cdc/log.cc
32
cdc/log.cc
@@ -519,6 +519,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
|
||||
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
|
||||
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
|
||||
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
|
||||
b.set_caching_options(caching_options::get_disabled_caching_options());
|
||||
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
|
||||
for (const auto& column : columns) {
|
||||
@@ -880,14 +881,26 @@ public:
|
||||
return _base_schema;
|
||||
}
|
||||
|
||||
clustering_key create_ck(int batch) const {
|
||||
return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) });
|
||||
}
|
||||
|
||||
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
|
||||
// The numbering of batch sequence numbers starts from 0.
|
||||
clustering_key allocate_new_log_row() {
|
||||
auto log_ck = clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(_batch_no++) });
|
||||
auto log_ck = create_ck(_batch_no++);
|
||||
set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
|
||||
return log_ck;
|
||||
}
|
||||
|
||||
bool has_rows() const {
|
||||
return _batch_no != 0;
|
||||
}
|
||||
|
||||
clustering_key last_row_key() const {
|
||||
return create_ck(_batch_no - 1);
|
||||
}
|
||||
|
||||
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
|
||||
clustering_key allocate_new_log_row(operation op) {
|
||||
auto log_ck = allocate_new_log_row();
|
||||
@@ -944,6 +957,11 @@ public:
|
||||
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
|
||||
}
|
||||
|
||||
void end_record() {
|
||||
if (has_rows()) {
|
||||
_log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
|
||||
}
|
||||
}
|
||||
private:
|
||||
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<bytes>& key) {
|
||||
size_t pos = 0;
|
||||
@@ -1272,6 +1290,13 @@ struct process_change_visitor {
|
||||
_clustering_row_states, _generate_delta_values);
|
||||
visit_row_cells(v);
|
||||
|
||||
if (_enable_updating_state) {
|
||||
// #7716: if there are no regular columns, our visitor would not have visited any cells,
|
||||
// hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced.
|
||||
// Ensure that the row state exists.
|
||||
_clustering_row_states.try_emplace(ckey);
|
||||
}
|
||||
|
||||
_builder.set_operation(log_ck, v._cdc_op);
|
||||
_builder.set_ttl(log_ck, v._ttl_column);
|
||||
}
|
||||
@@ -1519,6 +1544,11 @@ public:
|
||||
cdc::inspect_mutation(m, v);
|
||||
}
|
||||
|
||||
void end_record() override {
|
||||
assert(_builder);
|
||||
_builder->end_record();
|
||||
}
|
||||
|
||||
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
|
||||
// The `transformer` object on which this method was called on should not be used anymore.
|
||||
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {
|
||||
|
||||
@@ -684,6 +684,8 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
|
||||
processor.produce_postimage(&ck);
|
||||
}
|
||||
}
|
||||
|
||||
processor.end_record();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,6 +733,8 @@ void process_changes_without_splitting(const mutation& base_mutation, change_pro
|
||||
processor.produce_postimage(&cr.key());
|
||||
}
|
||||
}
|
||||
|
||||
processor.end_record();
|
||||
}
|
||||
|
||||
} // namespace cdc
|
||||
|
||||
@@ -77,6 +77,10 @@ public:
|
||||
// both columns have different timestamp or TTL set.
|
||||
// m - the small mutation to be converted into CDC log rows.
|
||||
virtual void process_change(const mutation& m) = 0;
|
||||
|
||||
// Tells processor we have reached end of record - last part
|
||||
// of a given timestamp batch
|
||||
virtual void end_record() = 0;
|
||||
};
|
||||
|
||||
bool should_split(const mutation& base_mutation);
|
||||
|
||||
15
db/config.cc
15
db/config.cc
@@ -31,6 +31,7 @@
|
||||
#include <seastar/core/print.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
|
||||
#include "cdc/cdc_extension.hh"
|
||||
#include "config.hh"
|
||||
#include "extensions.hh"
|
||||
#include "log.hh"
|
||||
@@ -694,7 +695,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
@@ -792,6 +793,10 @@ db::config::config()
|
||||
db::config::~config()
|
||||
{}
|
||||
|
||||
void db::config::add_cdc_extension() {
|
||||
_extensions->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
|
||||
}
|
||||
|
||||
void db::config::setup_directories() {
|
||||
maybe_in_workdir(commitlog_directory, "commitlog");
|
||||
maybe_in_workdir(data_file_directories, "data");
|
||||
@@ -874,7 +879,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
|
||||
}
|
||||
|
||||
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
||||
if (experimental() && f != experimental_features_t::UNUSED) {
|
||||
if (experimental() && f != experimental_features_t::UNUSED && f != experimental_features_t::UNUSED_CDC) {
|
||||
return true;
|
||||
}
|
||||
const auto& optval = experimental_features();
|
||||
@@ -928,11 +933,13 @@ std::unordered_map<sstring, db::experimental_features_t::feature> db::experiment
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
// Lightweight transactions are no longer experimental. Map them
|
||||
// to UNUSED switch for a while, then remove altogether.
|
||||
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
|
||||
// Change Data Capture is no longer experimental. Map it
|
||||
// to UNUSED_CDC switch for a while, then remove altogether.
|
||||
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", UNUSED_CDC}, {"alternator-streams", ALTERNATOR_STREAMS}};
|
||||
}
|
||||
|
||||
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
|
||||
return {UDF, CDC};
|
||||
return {UDF, ALTERNATOR_STREAMS};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
@@ -81,7 +81,7 @@ namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { UNUSED, UDF, CDC };
|
||||
enum feature { UNUSED, UDF, UNUSED_CDC, ALTERNATOR_STREAMS };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
static std::vector<enum_option<experimental_features_t>> all();
|
||||
};
|
||||
@@ -92,6 +92,9 @@ public:
|
||||
config(std::shared_ptr<db::extensions>);
|
||||
~config();
|
||||
|
||||
// For testing only
|
||||
void add_cdc_extension();
|
||||
|
||||
/// True iff the feature is enabled.
|
||||
bool check_experimental(experimental_features_t::feature f) const;
|
||||
|
||||
|
||||
@@ -111,27 +111,12 @@ public:
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> maybe_delete_large_data_entries(const schema& s, sstring filename, uint64_t data_size) {
|
||||
future<> maybe_delete_large_data_entries(const schema& /*s*/, sstring /*filename*/, uint64_t /*data_size*/) {
|
||||
assert(running());
|
||||
future<> large_partitions = make_ready_future<>();
|
||||
if (__builtin_expect(data_size > _partition_threshold_bytes, false)) {
|
||||
large_partitions = with_sem([&s, filename, this] () mutable {
|
||||
return delete_large_data_entries(s, std::move(filename), db::system_keyspace::LARGE_PARTITIONS);
|
||||
});
|
||||
}
|
||||
future<> large_rows = make_ready_future<>();
|
||||
if (__builtin_expect(data_size > _row_threshold_bytes, false)) {
|
||||
large_rows = with_sem([&s, filename, this] () mutable {
|
||||
return delete_large_data_entries(s, std::move(filename), db::system_keyspace::LARGE_ROWS);
|
||||
});
|
||||
}
|
||||
future<> large_cells = make_ready_future<>();
|
||||
if (__builtin_expect(data_size > _cell_threshold_bytes, false)) {
|
||||
large_cells = with_sem([&s, filename, this] () mutable {
|
||||
return delete_large_data_entries(s, std::move(filename), db::system_keyspace::LARGE_CELLS);
|
||||
});
|
||||
}
|
||||
return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result();
|
||||
|
||||
// Deletion of large data entries is disabled due to #7668
|
||||
// They will evetually expire based on the 30 days TTL.
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
const large_data_handler::stats& stats() const { return _stats; }
|
||||
|
||||
@@ -58,6 +58,7 @@
|
||||
#include "schema_registry.hh"
|
||||
#include "mutation_query.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "system_distributed_keyspace.hh"
|
||||
#include "cql3/cql3_type.hh"
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "cql3/util.hh"
|
||||
@@ -104,6 +105,11 @@ using namespace std::chrono_literals;
|
||||
|
||||
static logging::logger diff_logger("schema_diff");
|
||||
|
||||
static bool is_extra_durable(const sstring& ks_name, const sstring& cf_name) {
|
||||
return (is_system_keyspace(ks_name) && db::system_keyspace::is_extra_durable(cf_name))
|
||||
|| (ks_name == db::system_distributed_keyspace::NAME && db::system_distributed_keyspace::is_extra_durable(cf_name));
|
||||
}
|
||||
|
||||
|
||||
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
|
||||
namespace db {
|
||||
@@ -2499,7 +2505,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
|
||||
builder.with_sharder(smp::count, ctxt.murmur3_partitioner_ignore_msb_bits());
|
||||
}
|
||||
|
||||
if (is_system_keyspace(ks_name) && is_extra_durable(cf_name)) {
|
||||
if (is_extra_durable(ks_name, cf_name)) {
|
||||
builder.set_wait_for_sync_to_commitlog(true);
|
||||
}
|
||||
|
||||
@@ -3035,10 +3041,6 @@ future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manage
|
||||
// format, where "token" is not marked as computed. Once we're sure that all indexes have their
|
||||
// columns marked as computed (because they were either created on a node that supports computed
|
||||
// columns or were fixed by this utility function), it's safe to remove this function altogether.
|
||||
if (!db.features().cluster_supports_computed_columns()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
if (v->clustering_key_size() == 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -201,10 +201,10 @@ static future<std::vector<token_range>> get_local_ranges(database& db) {
|
||||
// All queries will be on that table, where all entries are text and there's no notion of
|
||||
// token ranges form the CQL point of view.
|
||||
auto left_inf = boost::find_if(ranges, [] (auto&& r) {
|
||||
return !r.start() || r.start()->value() == dht::minimum_token();
|
||||
return r.end() && (!r.start() || r.start()->value() == dht::minimum_token());
|
||||
});
|
||||
auto right_inf = boost::find_if(ranges, [] (auto&& r) {
|
||||
return !r.end() || r.start()->value() == dht::maximum_token();
|
||||
return r.start() && (!r.end() || r.end()->value() == dht::maximum_token());
|
||||
});
|
||||
if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
|
||||
local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});
|
||||
|
||||
@@ -113,6 +113,10 @@ static std::vector<schema_ptr> all_tables() {
|
||||
};
|
||||
}
|
||||
|
||||
bool system_distributed_keyspace::is_extra_durable(const sstring& cf_name) {
|
||||
return cf_name == CDC_TOPOLOGY_DESCRIPTION;
|
||||
}
|
||||
|
||||
system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm)
|
||||
: _qp(qp)
|
||||
, _mm(mm) {
|
||||
|
||||
@@ -64,6 +64,10 @@ private:
|
||||
service::migration_manager& _mm;
|
||||
|
||||
public:
|
||||
/* Should writes to the given table always be synchronized by commitlog (flushed to disk)
|
||||
* before being acknowledged? */
|
||||
static bool is_extra_durable(const sstring& cf_name);
|
||||
|
||||
system_distributed_keyspace(cql3::query_processor&, service::migration_manager&);
|
||||
|
||||
future<> start();
|
||||
|
||||
@@ -1241,6 +1241,14 @@ future<> mutate_MV(
|
||||
}
|
||||
}
|
||||
}
|
||||
// It's still possible that a target endpoint is dupliated in the remote endpoints list,
|
||||
// so let's get rid of the duplicate if it exists
|
||||
if (target_endpoint) {
|
||||
auto remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint);
|
||||
if (remote_it != remote_endpoints.end()) {
|
||||
remote_endpoints.erase(remote_it);
|
||||
}
|
||||
}
|
||||
|
||||
if (target_endpoint && *target_endpoint == my_address) {
|
||||
++stats.view_updates_pushed_local;
|
||||
|
||||
31
dist/common/scripts/scylla_raid_setup
vendored
31
dist/common/scripts/scylla_raid_setup
vendored
@@ -36,7 +36,7 @@ if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Configure RAID volume for Scylla.')
|
||||
parser.add_argument('--disks', required=True,
|
||||
help='specify disks for RAID')
|
||||
parser.add_argument('--raiddev', default='/dev/md0',
|
||||
parser.add_argument('--raiddev',
|
||||
help='MD device name for RAID')
|
||||
parser.add_argument('--enable-on-nextboot', '--update-fstab', action='store_true', default=False,
|
||||
help='mount RAID on next boot')
|
||||
@@ -73,9 +73,25 @@ if __name__ == '__main__':
|
||||
print('{} is busy'.format(disk))
|
||||
sys.exit(1)
|
||||
|
||||
if os.path.exists(args.raiddev):
|
||||
print('{} is already using'.format(args.raiddev))
|
||||
sys.exit(1)
|
||||
if len(disks) == 1 and not args.force_raid:
|
||||
raid = False
|
||||
fsdev = disks[0]
|
||||
else:
|
||||
raid = True
|
||||
if args.raiddev is None:
|
||||
raiddevs_to_try = [f'/dev/md{i}' for i in range(10)]
|
||||
else:
|
||||
raiddevs_to_try = [args.raiddev, ]
|
||||
for fsdev in raiddevs_to_try:
|
||||
raiddevname = os.path.basename(fsdev)
|
||||
if not os.path.exists(f'/sys/block/{raiddevname}/md/array_state'):
|
||||
break
|
||||
print(f'{fsdev} is already using')
|
||||
else:
|
||||
if args.raiddev is None:
|
||||
print("Can't find unused /dev/mdX")
|
||||
sys.exit(1)
|
||||
print(f'{fsdev} will be used to setup a RAID')
|
||||
|
||||
if os.path.ismount(mount_at):
|
||||
print('{} is already mounted'.format(mount_at))
|
||||
@@ -94,13 +110,6 @@ if __name__ == '__main__':
|
||||
except SystemdException:
|
||||
md_service = systemd_unit('mdadm.service')
|
||||
|
||||
if len(disks) == 1 and not args.force_raid:
|
||||
raid = False
|
||||
fsdev = disks[0]
|
||||
else:
|
||||
raid = True
|
||||
fsdev = args.raiddev
|
||||
|
||||
print('Creating {type} for scylla using {nr_disk} disk(s): {disks}'.format(type='RAID0' if raid else 'XFS volume', nr_disk=len(disks), disks=args.disks))
|
||||
if distro.name() == 'Ubuntu' and distro.version() == '14.04':
|
||||
if raid:
|
||||
|
||||
54
dist/common/scripts/scylla_util.py
vendored
54
dist/common/scripts/scylla_util.py
vendored
@@ -92,7 +92,7 @@ def scyllabindir():
|
||||
|
||||
|
||||
# @param headers dict of k:v
|
||||
def curl(url, headers=None, byte=False, timeout=3, max_retries=5):
|
||||
def curl(url, headers=None, byte=False, timeout=3, max_retries=5, retry_interval=5):
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
@@ -102,9 +102,8 @@ def curl(url, headers=None, byte=False, timeout=3, max_retries=5):
|
||||
return res.read()
|
||||
else:
|
||||
return res.read().decode('utf-8')
|
||||
except urllib.error.HTTPError:
|
||||
logging.warning("Failed to grab %s..." % url)
|
||||
time.sleep(5)
|
||||
except urllib.error.URLError:
|
||||
time.sleep(retry_interval)
|
||||
retries += 1
|
||||
if retries >= max_retries:
|
||||
raise
|
||||
@@ -188,7 +187,7 @@ class gcp_instance:
|
||||
"""get list of nvme disks from metadata server"""
|
||||
import json
|
||||
try:
|
||||
disksREST=self.__instance_metadata("disks")
|
||||
disksREST=self.__instance_metadata("disks", True)
|
||||
disksobj=json.loads(disksREST)
|
||||
nvmedisks=list(filter(self.isNVME, disksobj))
|
||||
except Exception as e:
|
||||
@@ -236,7 +235,8 @@ class gcp_instance:
|
||||
|
||||
def instance_size(self):
|
||||
"""Returns the size of the instance we are running in. i.e.: 2"""
|
||||
return self.instancetype.split("-")[2]
|
||||
instancetypesplit = self.instancetype.split("-")
|
||||
return instancetypesplit[2] if len(instancetypesplit)>2 else 0
|
||||
|
||||
def instance_class(self):
|
||||
"""Returns the class of the instance we are running in. i.e.: n2"""
|
||||
@@ -298,22 +298,30 @@ class gcp_instance:
|
||||
return self.__firstNvmeSize
|
||||
|
||||
def is_recommended_instance(self):
|
||||
if self.is_recommended_instance_size() and not self.is_unsupported_instance_class() and self.is_supported_instance_class():
|
||||
if not self.is_unsupported_instance_class() and self.is_supported_instance_class() and self.is_recommended_instance_size():
|
||||
# at least 1:2GB cpu:ram ratio , GCP is at 1:4, so this should be fine
|
||||
if self.cpu/self.memoryGB < 0.5:
|
||||
# 30:1 Disk/RAM ratio must be kept at least(AWS), we relax this a little bit
|
||||
# on GCP we are OK with 50:1 , n1-standard-2 can cope with 1 disk, not more
|
||||
diskCount = self.nvmeDiskCount
|
||||
# to reach max performance for > 16 disks we mandate 32 or more vcpus
|
||||
# https://cloud.google.com/compute/docs/disks/local-ssd#performance
|
||||
if diskCount >= 16 and self.cpu < 32:
|
||||
return False
|
||||
diskSize= self.firstNvmeSize
|
||||
if diskCount < 1:
|
||||
return False
|
||||
disktoramratio = (diskCount*diskSize)/self.memoryGB
|
||||
if (disktoramratio <= 50) and (disktoramratio > 0):
|
||||
return True
|
||||
diskCount = self.nvmeDiskCount
|
||||
# to reach max performance for > 16 disks we mandate 32 or more vcpus
|
||||
# https://cloud.google.com/compute/docs/disks/local-ssd#performance
|
||||
if diskCount >= 16 and self.cpu < 32:
|
||||
logging.warning(
|
||||
"This machine doesn't have enough CPUs for allocated number of NVMEs (at least 32 cpus for >=16 disks). Performance will suffer.")
|
||||
return False
|
||||
diskSize = self.firstNvmeSize
|
||||
if diskCount < 1:
|
||||
return False
|
||||
max_disktoramratio = 105
|
||||
# 30:1 Disk/RAM ratio must be kept at least(AWS), we relax this a little bit
|
||||
# on GCP we are OK with {max_disktoramratio}:1 , n1-standard-2 can cope with 1 disk, not more
|
||||
disktoramratio = (diskCount * diskSize) / self.memoryGB
|
||||
if (disktoramratio > max_disktoramratio):
|
||||
logging.warning(
|
||||
f"Instance disk-to-RAM ratio is {disktoramratio}, which is higher than the recommended ratio {max_disktoramratio}. Performance may suffer.")
|
||||
return False
|
||||
return True
|
||||
else:
|
||||
logging.warning("At least 2G of RAM per CPU is needed. Performance will suffer.")
|
||||
return False
|
||||
|
||||
def private_ipv4(self):
|
||||
@@ -398,7 +406,7 @@ class aws_instance:
|
||||
def is_aws_instance(cls):
|
||||
"""Check if it's AWS instance via query to metadata server."""
|
||||
try:
|
||||
curl(cls.META_DATA_BASE_URL, max_retries=2)
|
||||
curl(cls.META_DATA_BASE_URL, max_retries=2, retry_interval=1)
|
||||
return True
|
||||
except (urllib.error.URLError, urllib.error.HTTPError):
|
||||
return False
|
||||
@@ -462,7 +470,7 @@ class aws_instance:
|
||||
|
||||
def ebs_disks(self):
|
||||
"""Returns all EBS disks"""
|
||||
return set(self._disks["ephemeral"])
|
||||
return set(self._disks["ebs"])
|
||||
|
||||
def public_ipv4(self):
|
||||
"""Returns the public IPv4 address of this instance"""
|
||||
@@ -490,9 +498,7 @@ class aws_instance:
|
||||
return curl(self.META_DATA_BASE_URL + "user-data")
|
||||
|
||||
|
||||
# When a CLI tool is not installed, use relocatable CLI tool provided by Scylla
|
||||
scylla_env = os.environ.copy()
|
||||
scylla_env['PATH'] = '{}:{}'.format(scyllabindir(), scylla_env['PATH'])
|
||||
scylla_env['DEBIAN_FRONTEND'] = 'noninteractive'
|
||||
|
||||
def run(cmd, shell=False, silent=False, exception=True):
|
||||
|
||||
4
dist/common/sysctl.d/99-scylla-inotify.conf
vendored
Normal file
4
dist/common/sysctl.d/99-scylla-inotify.conf
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
# allocate enough inotify instances for large machines
|
||||
# each tls instance needs 1 inotify instance, and there can be
|
||||
# multiple tls instances per shard.
|
||||
fs.inotify.max_user_instances = 1200
|
||||
@@ -11,6 +11,7 @@ else
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-sched.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-aio.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-vm.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-inotify.conf || :
|
||||
fi
|
||||
|
||||
#DEBHELPER#
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=666.development
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/scylla-4.3/latest/scylla.repo
|
||||
ARG VERSION=4.3.rc0
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
6
dist/redhat/scylla.spec
vendored
6
dist/redhat/scylla.spec
vendored
@@ -129,10 +129,9 @@ rm -rf $RPM_BUILD_ROOT
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla-housekeeping
|
||||
%ghost /etc/systemd/system/scylla-helper.slice.d/
|
||||
%ghost /etc/systemd/system/scylla-helper.slice.d/memory.conf
|
||||
%ghost /etc/systemd/system/scylla-server.service.d/
|
||||
%ghost /etc/systemd/system/scylla-server.service.d/capabilities.conf
|
||||
%ghost /etc/systemd/system/scylla-server.service.d/mounts.conf
|
||||
%ghost /etc/systemd/system/scylla-server.service.d/dependencies.conf
|
||||
/etc/systemd/system/scylla-server.service.d/dependencies.conf
|
||||
%ghost /etc/systemd/system/var-lib-systemd-coredump.mount
|
||||
%ghost /etc/systemd/system/scylla-cpupower.service
|
||||
%ghost /etc/systemd/system/var-lib-scylla.mount
|
||||
@@ -190,6 +189,8 @@ Summary: Scylla configuration package for the Linux kernel
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
Requires: kmod
|
||||
# tuned overwrites our sysctl settings
|
||||
Obsoletes: tuned
|
||||
|
||||
%description kernel-conf
|
||||
This package contains Linux kernel configuration changes for the Scylla database. Install this package
|
||||
@@ -201,6 +202,7 @@ if Scylla is the main application on your server and you wish to optimize its la
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-aio.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-vm.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-inotify.conf >/dev/null 2>&1 || :
|
||||
|
||||
%files kernel-conf
|
||||
%defattr(-,root,root)
|
||||
|
||||
@@ -143,6 +143,7 @@ extern const std::string_view LWT;
|
||||
extern const std::string_view PER_TABLE_PARTITIONERS;
|
||||
extern const std::string_view PER_TABLE_CACHING;
|
||||
extern const std::string_view DIGEST_FOR_NULL_VALUES;
|
||||
extern const std::string_view ALTERNATOR_STREAMS;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -62,6 +62,7 @@ constexpr std::string_view features::LWT = "LWT";
|
||||
constexpr std::string_view features::PER_TABLE_PARTITIONERS = "PER_TABLE_PARTITIONERS";
|
||||
constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING";
|
||||
constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES";
|
||||
constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS";
|
||||
|
||||
static logging::logger logger("features");
|
||||
|
||||
@@ -86,6 +87,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
|
||||
, _per_table_partitioners_feature(*this, features::PER_TABLE_PARTITIONERS)
|
||||
, _per_table_caching_feature(*this, features::PER_TABLE_CACHING)
|
||||
, _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES)
|
||||
, _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS)
|
||||
{}
|
||||
|
||||
feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring> disabled) {
|
||||
@@ -116,8 +118,8 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
|
||||
}
|
||||
}
|
||||
|
||||
if (!cfg.check_experimental(db::experimental_features_t::CDC)) {
|
||||
fcfg._disabled_features.insert(sstring(gms::features::CDC));
|
||||
if (!cfg.check_experimental(db::experimental_features_t::ALTERNATOR_STREAMS)) {
|
||||
fcfg._disabled_features.insert(sstring(gms::features::ALTERNATOR_STREAMS));
|
||||
}
|
||||
|
||||
return fcfg;
|
||||
@@ -187,6 +189,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
|
||||
gms::features::UDF,
|
||||
gms::features::CDC,
|
||||
gms::features::DIGEST_FOR_NULL_VALUES,
|
||||
gms::features::ALTERNATOR_STREAMS,
|
||||
};
|
||||
|
||||
for (const sstring& s : _config._disabled_features) {
|
||||
@@ -266,6 +269,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
|
||||
std::ref(_per_table_partitioners_feature),
|
||||
std::ref(_per_table_caching_feature),
|
||||
std::ref(_digest_for_null_values_feature),
|
||||
std::ref(_alternator_streams_feature),
|
||||
})
|
||||
{
|
||||
if (list.contains(f.name())) {
|
||||
|
||||
@@ -92,6 +92,7 @@ private:
|
||||
gms::feature _per_table_partitioners_feature;
|
||||
gms::feature _per_table_caching_feature;
|
||||
gms::feature _digest_for_null_values_feature;
|
||||
gms::feature _alternator_streams_feature;
|
||||
|
||||
public:
|
||||
bool cluster_supports_user_defined_functions() const {
|
||||
@@ -160,6 +161,10 @@ public:
|
||||
bool cluster_supports_lwt() const {
|
||||
return bool(_lwt_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_alternator_streams() const {
|
||||
return bool(_alternator_streams_feature);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
|
||||
@@ -142,7 +142,7 @@ DEBIAN_SSL_CERT_FILE="/etc/ssl/certs/ca-certificates.crt"
|
||||
if [ -f "\${DEBIAN_SSL_CERT_FILE}" ]; then
|
||||
c=\${DEBIAN_SSL_CERT_FILE}
|
||||
fi
|
||||
PYTHONPATH="\${d}:\${d}/libexec:\$PYTHONPATH" PATH="\${d}/$pythonpath:\${PATH}" SSL_CERT_FILE="\${c}" exec -a "\$0" "\${d}/libexec/\${b}" "\$@"
|
||||
PYTHONPATH="\${d}:\${d}/libexec:\$PYTHONPATH" PATH="\${d}/../bin:\${d}/$pythonpath:\${PATH}" SSL_CERT_FILE="\${c}" exec -a "\$0" "\${d}/libexec/\${b}" "\$@"
|
||||
EOF
|
||||
chmod +x "$install"
|
||||
}
|
||||
@@ -412,6 +412,10 @@ elif ! $packaging; then
|
||||
chown -R scylla:scylla $rdata
|
||||
chown -R scylla:scylla $rhkdata
|
||||
|
||||
for file in dist/common/sysctl.d/*.conf; do
|
||||
bn=$(basename "$file")
|
||||
sysctl -p "$rusr"/lib/sysctl.d/"$bn"
|
||||
done
|
||||
$rprefix/scripts/scylla_post_install.sh
|
||||
echo "Scylla offline install completed."
|
||||
fi
|
||||
|
||||
3
main.cc
3
main.cc
@@ -1023,8 +1023,7 @@ int main(int ac, char** av) {
|
||||
proxy.invoke_on_all([] (service::storage_proxy& local_proxy) {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.register_subscriber(&local_proxy);
|
||||
//FIXME: discarded future
|
||||
(void)local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
|
||||
return local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
|
||||
}).get();
|
||||
|
||||
supervisor::notify("starting messaging service");
|
||||
|
||||
@@ -2044,11 +2044,13 @@ public:
|
||||
}
|
||||
}
|
||||
void abort(std::exception_ptr ep) {
|
||||
_end_of_stream = true;
|
||||
_ex = std::move(ep);
|
||||
if (_full) {
|
||||
_full->set_exception(_ex);
|
||||
_full.reset();
|
||||
} else if (_not_full) {
|
||||
_not_full->set_exception(_ex);
|
||||
_not_full.reset();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -36,8 +36,14 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
|
||||
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
|
||||
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
|
||||
});
|
||||
}).finally([&wr] {
|
||||
return wr.consume_end_of_stream();
|
||||
}).then_wrapped([&wr] (future<> f) {
|
||||
if (f.failed()) {
|
||||
auto ex = f.get_exception();
|
||||
wr.abort(ex);
|
||||
return make_exception_future<>(ex);
|
||||
} else {
|
||||
return wr.consume_end_of_stream();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -57,6 +57,9 @@ class shard_based_splitting_mutation_writer {
|
||||
}
|
||||
return std::move(_consume_fut);
|
||||
}
|
||||
void abort(std::exception_ptr ep) {
|
||||
_handle.abort(ep);
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
@@ -110,6 +113,13 @@ public:
|
||||
return shard->consume_end_of_stream();
|
||||
});
|
||||
}
|
||||
void abort(std::exception_ptr ep) {
|
||||
for (auto&& shard : _shards) {
|
||||
if (shard) {
|
||||
shard->abort(ep);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer) {
|
||||
|
||||
@@ -144,6 +144,9 @@ class timestamp_based_splitting_mutation_writer {
|
||||
}
|
||||
return std::move(_consume_fut);
|
||||
}
|
||||
void abort(std::exception_ptr ep) {
|
||||
_handle.abort(ep);
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
@@ -188,6 +191,11 @@ public:
|
||||
return bucket.second.consume_end_of_stream();
|
||||
});
|
||||
}
|
||||
void abort(std::exception_ptr ep) {
|
||||
for (auto&& b : _buckets) {
|
||||
b.second.abort(ep);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment&& mf) {
|
||||
|
||||
@@ -542,12 +542,12 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
|
||||
return partition_snapshot_ptr(std::move(snp));
|
||||
}
|
||||
|
||||
std::vector<range_tombstone>
|
||||
partition_snapshot::range_tombstone_result
|
||||
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end)
|
||||
{
|
||||
partition_version* v = &*version();
|
||||
if (!v->next()) {
|
||||
return boost::copy_range<std::vector<range_tombstone>>(
|
||||
return boost::copy_range<range_tombstone_result>(
|
||||
v->partition().row_tombstones().slice(*_schema, start, end));
|
||||
}
|
||||
range_tombstone_list list(*_schema);
|
||||
@@ -557,10 +557,10 @@ partition_snapshot::range_tombstones(position_in_partition_view start, position_
|
||||
}
|
||||
v = v->next();
|
||||
}
|
||||
return boost::copy_range<std::vector<range_tombstone>>(list.slice(*_schema, start, end));
|
||||
return boost::copy_range<range_tombstone_result>(list.slice(*_schema, start, end));
|
||||
}
|
||||
|
||||
std::vector<range_tombstone>
|
||||
partition_snapshot::range_tombstone_result
|
||||
partition_snapshot::range_tombstones()
|
||||
{
|
||||
return range_tombstones(
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "utils/anchorless_list.hh"
|
||||
#include "utils/logalloc.hh"
|
||||
#include "utils/coroutine.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
|
||||
#include <boost/intrusive/parent_from_member.hpp>
|
||||
#include <boost/intrusive/slist.hpp>
|
||||
@@ -400,10 +401,13 @@ public:
|
||||
::static_row static_row(bool digest_requested) const;
|
||||
bool static_row_continuous() const;
|
||||
mutation_partition squashed() const;
|
||||
|
||||
using range_tombstone_result = utils::chunked_vector<range_tombstone>;
|
||||
|
||||
// Returns range tombstones overlapping with [start, end)
|
||||
std::vector<range_tombstone> range_tombstones(position_in_partition_view start, position_in_partition_view end);
|
||||
range_tombstone_result range_tombstones(position_in_partition_view start, position_in_partition_view end);
|
||||
// Returns all range tombstones
|
||||
std::vector<range_tombstone> range_tombstones();
|
||||
range_tombstone_result range_tombstones();
|
||||
};
|
||||
|
||||
class partition_snapshot_ptr {
|
||||
|
||||
@@ -509,7 +509,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class repair_writer {
|
||||
class repair_writer : public enable_lw_shared_from_this<repair_writer> {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
uint64_t _estimated_partitions;
|
||||
@@ -569,8 +569,9 @@ public:
|
||||
table& t = db.local().find_column_family(_schema->id());
|
||||
auto [queue_reader, queue_handle] = make_queue_reader(_schema, _permit);
|
||||
_mq[node_idx] = std::move(queue_handle);
|
||||
auto writer = shared_from_this();
|
||||
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
|
||||
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
|
||||
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions, writer] (flat_mutation_reader reader) {
|
||||
auto& t = db.local().find_column_family(reader.schema());
|
||||
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
|
||||
//FIXME: for better estimations this should be transmitted from remote
|
||||
@@ -598,13 +599,13 @@ public:
|
||||
return consumer(std::move(reader));
|
||||
});
|
||||
},
|
||||
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||
t.stream_in_progress()).then([node_idx, writer] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
writer->_schema->ks_name(), writer->_schema->cf_name(), partitions);
|
||||
}).handle_exception([node_idx, writer] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
_mq[node_idx]->abort(ep);
|
||||
writer->_schema->ks_name(), writer->_schema->cf_name(), ep);
|
||||
writer->_mq[node_idx]->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
@@ -718,7 +719,7 @@ private:
|
||||
size_t _nr_peer_nodes= 1;
|
||||
repair_stats _stats;
|
||||
repair_reader _repair_reader;
|
||||
repair_writer _repair_writer;
|
||||
lw_shared_ptr<repair_writer> _repair_writer;
|
||||
// Contains rows read from disk
|
||||
std::list<repair_row> _row_buf;
|
||||
// Contains rows we are working on to sync between peers
|
||||
@@ -822,7 +823,7 @@ public:
|
||||
_seed,
|
||||
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
|
||||
)
|
||||
, _repair_writer(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason)
|
||||
, _repair_writer(make_lw_shared<repair_writer>(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason))
|
||||
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
|
||||
[&ms] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
|
||||
return ms.local().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
|
||||
@@ -855,7 +856,7 @@ public:
|
||||
auto f2 = _sink_source_for_get_row_diff.close();
|
||||
auto f3 = _sink_source_for_put_row_diff.close();
|
||||
return when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
|
||||
return _repair_writer.wait_for_writer_done();
|
||||
return _repair_writer->wait_for_writer_done();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1340,8 +1341,8 @@ private:
|
||||
|
||||
future<> do_apply_rows(std::list<repair_row>&& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
|
||||
return do_with(std::move(row_diff), [this, node_idx, update_buf] (std::list<repair_row>& row_diff) {
|
||||
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer.create_writer(_db, node_idx);
|
||||
return with_semaphore(_repair_writer->sem(), 1, [this, node_idx, update_buf, &row_diff] {
|
||||
_repair_writer->create_writer(_db, node_idx);
|
||||
return repeat([this, node_idx, update_buf, &row_diff] () mutable {
|
||||
if (row_diff.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
@@ -1355,7 +1356,7 @@ private:
|
||||
// to_repair_rows_list above where the repair_row is created.
|
||||
mutation_fragment mf = std::move(r.get_mutation_fragment());
|
||||
auto dk_with_hash = r.get_dk_with_hash();
|
||||
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
|
||||
return _repair_writer->do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
|
||||
row_diff.pop_front();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
|
||||
@@ -298,7 +298,7 @@ void storage_service::prepare_to_join(
|
||||
_token_metadata.update_normal_tokens(my_tokens, get_broadcast_address());
|
||||
|
||||
_cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0();
|
||||
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
|
||||
if (!_cdc_streams_ts) {
|
||||
// We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
|
||||
// unless we are restarting after upgrading from non-CDC supported version.
|
||||
// In that case we won't begin a CDC generation: it should be done by one of the nodes
|
||||
@@ -550,7 +550,7 @@ void storage_service::join_token_ring(int delay) {
|
||||
assert(should_bootstrap() || db().local().is_replacing() || !_cdc_streams_ts);
|
||||
}
|
||||
|
||||
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
|
||||
if (!_cdc_streams_ts) {
|
||||
// If we didn't choose a CDC streams timestamp at this point, then either
|
||||
// 1. we're replacing a node which didn't gossip a CDC streams timestamp for whatever reason,
|
||||
// 2. we've already bootstrapped, but are upgrading from a non-CDC version,
|
||||
@@ -570,10 +570,15 @@ void storage_service::join_token_ring(int delay) {
|
||||
if (!db().local().is_replacing()
|
||||
&& (!db::system_keyspace::bootstrap_complete()
|
||||
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
|
||||
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
|
||||
_bootstrap_tokens, _token_metadata, _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
try {
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
|
||||
_bootstrap_tokens, _token_metadata, _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
} catch (...) {
|
||||
cdc_log.warn(
|
||||
"Could not create a new CDC generation: {}. This may make it impossible to use CDC. Use nodetool checkAndRepairCdcStreams to fix CDC generation",
|
||||
std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -893,24 +898,18 @@ void storage_service::bootstrap() {
|
||||
// It doesn't hurt: other nodes will (potentially) just do more generation switches.
|
||||
// We do this because with this new attempt at bootstrapping we picked a different set of tokens.
|
||||
|
||||
if (db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
|
||||
// Update pending ranges now, so we correctly count ourselves as a pending replica
|
||||
// when inserting the new CDC generation.
|
||||
_token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address());
|
||||
update_pending_ranges().get();
|
||||
// Update pending ranges now, so we correctly count ourselves as a pending replica
|
||||
// when inserting the new CDC generation.
|
||||
_token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address());
|
||||
update_pending_ranges().get();
|
||||
|
||||
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
|
||||
// We don't do any other generation switches (unless we crash before complecting bootstrap).
|
||||
assert(!_cdc_streams_ts);
|
||||
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
|
||||
// We don't do any other generation switches (unless we crash before complecting bootstrap).
|
||||
assert(!_cdc_streams_ts);
|
||||
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
|
||||
_bootstrap_tokens, _token_metadata, _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
} else {
|
||||
// We should not be able to join the cluster if other nodes support CDC but we don't.
|
||||
// The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`).
|
||||
assert(!_feature_service.cluster_supports_cdc());
|
||||
}
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
|
||||
_bootstrap_tokens, _token_metadata, _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
|
||||
_gossiper.add_local_application_state({
|
||||
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
|
||||
@@ -2036,9 +2035,8 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) {
|
||||
return seastar::async([&ss, do_bind] {
|
||||
if (!ss._initialized) {
|
||||
slogger.warn("Starting gossip by operator request");
|
||||
bool cdc_enabled = ss.db().local().get_config().check_experimental(db::experimental_features_t::CDC);
|
||||
ss.set_gossip_tokens(db::system_keyspace::get_local_tokens().get0(),
|
||||
cdc_enabled ? std::make_optional(cdc::get_local_streams_timestamp().get0()) : std::nullopt);
|
||||
std::make_optional(cdc::get_local_streams_timestamp().get0()));
|
||||
ss._gossiper.force_newer_generation();
|
||||
ss._gossiper.start_gossiping(utils::get_generation_number(), gms::bind_messaging_port(bool(do_bind))).then([&ss] {
|
||||
ss._initialized = true;
|
||||
|
||||
@@ -212,16 +212,18 @@ public:
|
||||
};
|
||||
|
||||
struct compaction_writer {
|
||||
shared_sstable sst;
|
||||
// We use a ptr for pointer stability and so that it can be null
|
||||
// when using a noop monitor.
|
||||
sstable_writer writer;
|
||||
// The order in here is important. A monitor must be destroyed before the writer it is monitoring since it has a
|
||||
// periodic timer that checks the writer.
|
||||
// The writer must be destroyed before the shared_sstable since the it may depend on the sstable
|
||||
// (as in the mx::writer over compressed_file_data_sink_impl case that depends on sstables::compression).
|
||||
std::unique_ptr<compaction_write_monitor> monitor;
|
||||
shared_sstable sst;
|
||||
|
||||
compaction_writer(std::unique_ptr<compaction_write_monitor> monitor, sstable_writer writer, shared_sstable sst)
|
||||
: writer(std::move(writer)), monitor(std::move(monitor)), sst(std::move(sst)) {}
|
||||
: sst(std::move(sst)), writer(std::move(writer)), monitor(std::move(monitor)) {}
|
||||
compaction_writer(sstable_writer writer, shared_sstable sst)
|
||||
: compaction_writer(nullptr, std::move(writer), std::move(sst)) {}
|
||||
};
|
||||
@@ -609,10 +611,12 @@ private:
|
||||
std::move(gc_consumer));
|
||||
|
||||
return seastar::async([cfc = std::move(cfc), reader = std::move(reader), this] () mutable {
|
||||
reader.consume_in_thread(std::move(cfc), make_partition_filter(), db::no_timeout);
|
||||
reader.consume_in_thread(std::move(cfc), db::no_timeout);
|
||||
});
|
||||
});
|
||||
return consumer(make_sstable_reader());
|
||||
// producer will filter out a partition before it reaches the consumer(s)
|
||||
auto producer = make_filtering_reader(make_sstable_reader(), make_partition_filter());
|
||||
return consumer(std::move(producer));
|
||||
}
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) {
|
||||
|
||||
@@ -378,6 +378,7 @@ private:
|
||||
_fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows();
|
||||
_out_of_range = false;
|
||||
_range_tombstones.reset();
|
||||
_ready = {};
|
||||
_first_row_encountered = false;
|
||||
}
|
||||
public:
|
||||
|
||||
@@ -86,7 +86,7 @@ ln -s "$SCYLLA" "$SCYLLA_LINK"
|
||||
--alternator-write-isolation=always_use_lwt \
|
||||
--alternator-streams-time-window-s=0 \
|
||||
--developer-mode=1 \
|
||||
--experimental-features=cdc \
|
||||
--experimental-features=alternator-streams \
|
||||
--ring-delay-ms 0 --collectd 0 \
|
||||
--smp 2 -m 1G \
|
||||
--overprovisioned --unsafe-bypass-fsync 1 \
|
||||
|
||||
@@ -658,7 +658,6 @@ def test_filter_expression_and_sort_key_condition(test_table_sn_with_data):
|
||||
# In particular, test that FilterExpression may inspect attributes which will
|
||||
# not be returned by the query, because of the ProjectionExpression.
|
||||
# This test reproduces issue #6951.
|
||||
@pytest.mark.xfail(reason="issue #6951: cannot filter on non-returned attributes")
|
||||
def test_filter_expression_and_projection_expression(test_table):
|
||||
p = random_string()
|
||||
test_table.put_item(Item={'p': p, 'c': 'hi', 'x': 'dog', 'y': 'cat'})
|
||||
|
||||
@@ -386,3 +386,38 @@ def test_query_missing_key(test_table):
|
||||
full_query(test_table, KeyConditions={})
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
full_query(test_table)
|
||||
|
||||
# The paging tests above used a numeric sort key. Let's now also test paging
|
||||
# with a bytes sort key. We already have above a test that bytes sort keys
|
||||
# work and are sorted correctly (test_query_sort_order_bytes), but the
|
||||
# following test adds a check that *paging* works correctly for such keys.
|
||||
# We used to have a bug in this (issue #7768) - the returned LastEvaluatedKey
|
||||
# was incorrectly formatted, breaking the boto3's parsing of the response.
|
||||
# Note we only check the case of bytes *sort* keys in this test. For bytes
|
||||
# *partition* keys, see test_scan_paging_bytes().
|
||||
def test_query_paging_bytes(test_table_sb):
|
||||
p = random_string()
|
||||
items = [{'p': p, 'c': random_bytes()} for i in range(10)]
|
||||
with test_table_sb.batch_writer() as batch:
|
||||
for item in items:
|
||||
batch.put_item(item)
|
||||
# Deliberately pass Limit=1 to enforce paging even though we have
|
||||
# just 10 items in the partition.
|
||||
got_items = full_query(test_table_sb, Limit=1,
|
||||
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'}})
|
||||
got_sort_keys = [x['c'] for x in got_items]
|
||||
expected_sort_keys = sorted(x['c'] for x in items)
|
||||
assert got_sort_keys == expected_sort_keys
|
||||
|
||||
# Similar for test for string clustering keys
|
||||
def test_query_paging_string(test_table_ss):
|
||||
p = random_string()
|
||||
items = [{'p': p, 'c': random_string()} for i in range(10)]
|
||||
with test_table_ss.batch_writer() as batch:
|
||||
for item in items:
|
||||
batch.put_item(item)
|
||||
got_items = full_query(test_table_ss, Limit=1,
|
||||
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'}})
|
||||
got_sort_keys = [x['c'] for x in got_items]
|
||||
expected_sort_keys = sorted(x['c'] for x in items)
|
||||
assert got_sort_keys == expected_sort_keys
|
||||
|
||||
@@ -539,7 +539,6 @@ def test_query_filter_paging(test_table_sn_with_data):
|
||||
# In particular, test that QueryFilter may inspect attributes which will
|
||||
# not be returned by the query, because the AttributesToGet.
|
||||
# This test reproduces issue #6951.
|
||||
@pytest.mark.xfail(reason="issue #6951: cannot filter on non-returned attributes")
|
||||
def test_query_filter_and_attributes_to_get(test_table):
|
||||
p = random_string()
|
||||
test_table.put_item(Item={'p': p, 'c': 'hi', 'x': 'dog', 'y': 'cat'})
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
from util import random_string, full_scan, full_scan_and_count, multiset
|
||||
from util import random_string, random_bytes, full_scan, full_scan_and_count, multiset
|
||||
from boto3.dynamodb.conditions import Attr
|
||||
|
||||
# Test that scanning works fine with/without pagination
|
||||
@@ -264,3 +264,20 @@ def test_scan_parallel_incorrect(filled_test_table):
|
||||
for segment in [7, 9]:
|
||||
with pytest.raises(ClientError, match='ValidationException.*Segment'):
|
||||
full_scan(test_table, TotalSegments=5, Segment=segment)
|
||||
|
||||
# We used to have a bug with formatting of LastEvaluatedKey in the response
|
||||
# of Query and Scan with bytes keys (issue #7768). In test_query_paging_byte()
|
||||
# (test_query.py) we tested the case of bytes *sort* keys. In the following
|
||||
# test we check bytes *partition* keys.
|
||||
def test_scan_paging_bytes(test_table_b):
|
||||
# We will not Scan the entire table - we have no idea what it contains.
|
||||
# But we don't need to scan the entire table - we just need the table
|
||||
# to contain at least two items, and then Scan it with Limit=1 and stop
|
||||
# after one page. Before #7768 was fixed, the test failed when the
|
||||
# LastEvaluatedKey in the response could not be parsed.
|
||||
items = [{'p': random_bytes()}, {'p': random_bytes()}]
|
||||
with test_table_b.batch_writer() as batch:
|
||||
for item in items:
|
||||
batch.put_item(item)
|
||||
response = test_table_b.scan(ConsistentRead=True, Limit=1)
|
||||
assert 'LastEvaluatedKey' in response
|
||||
|
||||
@@ -41,8 +41,10 @@ def test_fetch_from_system_tables(scylla_only, dynamodb):
|
||||
|
||||
key_columns = [item['column_name'] for item in col_response['Items'] if item['kind'] == 'clustering' or item['kind'] == 'partition_key']
|
||||
qualified_name = "{}{}.{}".format(internal_prefix, ks_name, table_name)
|
||||
response = client.scan(TableName=qualified_name, AttributesToGet=key_columns)
|
||||
print(ks_name, table_name, response)
|
||||
import time
|
||||
start = time.time()
|
||||
response = client.scan(TableName=qualified_name, AttributesToGet=key_columns, Limit=50)
|
||||
print(ks_name, table_name, len(str(response)), time.time()-start)
|
||||
|
||||
def test_block_access_to_non_system_tables_with_virtual_interface(scylla_only, test_table_s, dynamodb):
|
||||
client = dynamodb.meta.client
|
||||
|
||||
@@ -42,16 +42,6 @@
|
||||
|
||||
using namespace std::string_literals;
|
||||
|
||||
static cql_test_config mk_cdc_test_config() {
|
||||
auto ext = std::make_shared<db::extensions>();
|
||||
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
|
||||
auto cfg = ::make_shared<db::config>(std::move(ext));
|
||||
auto features = cfg->experimental_features();
|
||||
features.emplace_back(db::experimental_features_t::CDC);
|
||||
cfg->experimental_features(features);
|
||||
return cql_test_config(std::move(cfg));
|
||||
};
|
||||
|
||||
namespace cdc {
|
||||
api::timestamp_type find_timestamp(const mutation&);
|
||||
utils::UUID generate_timeuuid(api::timestamp_type);
|
||||
@@ -131,7 +121,7 @@ SEASTAR_THREAD_TEST_CASE(test_find_mutation_timestamp) {
|
||||
check_stmt("DELETE vut.b FROM t WHERE pk = 0 AND ck = 0");
|
||||
check_stmt("DELETE vfut FROM t WHERE pk = 0 AND ck = 0");
|
||||
check_stmt("DELETE vstatic FROM t WHERE pk = 0");
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_generate_timeuuid) {
|
||||
@@ -199,7 +189,7 @@ SEASTAR_THREAD_TEST_CASE(test_with_cdc_parameter) {
|
||||
test("WITH cdc = {'enabled':'false'}", "{'enabled':'true'}", "{'enabled':'false'}", {false}, {true}, {false});
|
||||
test("", "{'enabled':'true','preimage':'true','postimage':'true','ttl':'1'}", "{'enabled':'false'}", {false}, {true, true, true, 1}, {false});
|
||||
test("WITH cdc = {'enabled':'true','preimage':'true','postimage':'true','ttl':'1'}", "{'enabled':'false'}", "{'enabled':'true','preimage':'false','postimage':'true','ttl':'2'}", {true, true, true, 1}, {false}, {true, false, true, 2});
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_detecting_conflict_of_cdc_log_table_with_existing_table) {
|
||||
@@ -213,7 +203,7 @@ SEASTAR_THREAD_TEST_CASE(test_detecting_conflict_of_cdc_log_table_with_existing_
|
||||
e.execute_cql("CREATE TABLE ks.tbl (a int PRIMARY KEY)").get();
|
||||
e.require_table_exists("ks", "tbl").get();
|
||||
BOOST_REQUIRE_THROW(e.execute_cql("ALTER TABLE ks.tbl WITH cdc = {'enabled': true}").get(), exceptions::invalid_request_exception);
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_log_table) {
|
||||
@@ -247,7 +237,7 @@ SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_log_table) {
|
||||
|
||||
// Disallow DROP
|
||||
assert_unauthorized("DROP TABLE " + log_table);
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_disallow_cdc_on_materialized_view) {
|
||||
@@ -257,7 +247,7 @@ SEASTAR_THREAD_TEST_CASE(test_disallow_cdc_on_materialized_view) {
|
||||
|
||||
BOOST_REQUIRE_THROW(e.execute_cql("CREATE MATERIALIZED VIEW ks.mv AS SELECT a FROM ks.tbl PRIMARY KEY (a) WITH cdc = {'enabled': true}").get(), exceptions::invalid_request_exception);
|
||||
e.require_table_does_not_exist("ks", "mv").get();
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_description) {
|
||||
@@ -285,7 +275,7 @@ SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_description) {
|
||||
|
||||
test_table("cdc_streams_descriptions");
|
||||
test_table("cdc_generation_descriptions");
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
|
||||
@@ -326,6 +316,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
|
||||
// cdc log clustering key
|
||||
assert_has_column(cdc::log_meta_column_name("operation"), byte_type);
|
||||
assert_has_column(cdc::log_meta_column_name("ttl"), long_type);
|
||||
assert_has_column(cdc::log_meta_column_name("end_of_batch"), boolean_type);
|
||||
|
||||
// pk
|
||||
assert_has_column(cdc::log_data_column_name("pk"), int32_type);
|
||||
@@ -370,7 +361,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
|
||||
|
||||
// Check if we missed something
|
||||
BOOST_REQUIRE_EQUAL(required_column_count, log_schema->all_columns_count());
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
static std::vector<std::vector<bytes_opt>> to_bytes(const cql_transport::messages::result_message::rows& rows) {
|
||||
@@ -512,7 +503,7 @@ SEASTAR_THREAD_TEST_CASE(test_primary_key_logging) {
|
||||
// DELETE FROM ks.tbl WHERE pk = 1 AND pk2 = 11
|
||||
assert_row(1, 11);
|
||||
BOOST_REQUIRE(actual_i == actual_end);
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
@@ -534,6 +525,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
auto val_index = column_index(*rows, cdc::log_data_column_name("val"));
|
||||
auto val2_index = column_index(*rows, cdc::log_data_column_name("val2"));
|
||||
auto ttl_index = column_index(*rows, cdc::log_meta_column_name("ttl"));
|
||||
auto eor_index = column_index(*rows, cdc::log_meta_column_name("end_of_batch"));
|
||||
|
||||
auto val_type = int32_type;
|
||||
auto val = *first[0][val_index];
|
||||
@@ -567,7 +559,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
BOOST_REQUIRE_EQUAL(pre_image.size(), i + 1);
|
||||
|
||||
val = *pre_image.back()[val_index];
|
||||
// note: no val2 in pre-image, because we are not modifying it.
|
||||
// note: no val2 in pre-image, because we are not modifying it.
|
||||
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *pre_image.back()[ck2_index]);
|
||||
BOOST_REQUIRE_EQUAL(data_value(last), val_type->deserialize(bytes_view(val)));
|
||||
BOOST_REQUIRE_EQUAL(bytes_opt(), pre_image.back()[ttl_index]);
|
||||
@@ -583,10 +575,12 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
if (post_enabled) {
|
||||
val = *post_image.back()[val_index];
|
||||
val2 = *post_image.back()[val2_index];
|
||||
auto eor = *post_image.back()[eor_index];
|
||||
|
||||
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *post_image.back()[ck2_index]);
|
||||
BOOST_REQUIRE_EQUAL(data_value(nv), val_type->deserialize(bytes_view(val)));
|
||||
BOOST_REQUIRE_EQUAL(data_value(22222), val_type->deserialize(bytes_view(val2)));
|
||||
BOOST_REQUIRE_EQUAL(data_value(true), boolean_type->deserialize(bytes_view(eor)));
|
||||
}
|
||||
|
||||
const auto& ttl_cell = second[second.size() - 2][ttl_index];
|
||||
@@ -608,7 +602,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging_static_row) {
|
||||
@@ -682,7 +676,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging_static_row) {
|
||||
test(true, false);
|
||||
test(false, true);
|
||||
test(false, false);
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_range_deletion) {
|
||||
@@ -691,7 +685,7 @@ SEASTAR_THREAD_TEST_CASE(test_range_deletion) {
|
||||
cquery_nofail(e, "DELETE FROM ks.tbl WHERE pk = 123 AND ck > 1 AND ck < 23");
|
||||
cquery_nofail(e, "DELETE FROM ks.tbl WHERE pk = 123 AND ck >= 4 AND ck <= 56");
|
||||
|
||||
auto msg = e.execute_cql(format("SELECT \"{}\", \"{}\", \"{}\", \"{}\" FROM ks.{}",
|
||||
auto msg = e.execute_cql(format("SELECT \"{}\", \"{}\", \"{}\", \"{}\" FROM ks.{}",
|
||||
cdc::log_meta_column_name("time"),
|
||||
cdc::log_data_column_name("pk"),
|
||||
cdc::log_data_column_name("ck"),
|
||||
@@ -726,7 +720,7 @@ SEASTAR_THREAD_TEST_CASE(test_range_deletion) {
|
||||
// ck >= 4 AND ck <= 56
|
||||
check_row(4, cdc::operation::range_delete_start_inclusive);
|
||||
check_row(56, cdc::operation::range_delete_end_inclusive);
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_add_columns) {
|
||||
@@ -750,11 +744,11 @@ SEASTAR_THREAD_TEST_CASE(test_add_columns) {
|
||||
auto kokos = *inserts.back()[kokos_index];
|
||||
|
||||
BOOST_REQUIRE_EQUAL(data_value("kaka"), kokos_type->deserialize(bytes_view(kokos)));
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
// #5582 - just quickly test that we can create the cdc enabled table on a different shard
|
||||
// and still get the logs proper.
|
||||
// #5582 - just quickly test that we can create the cdc enabled table on a different shard
|
||||
// and still get the logs proper.
|
||||
SEASTAR_THREAD_TEST_CASE(test_cdc_across_shards) {
|
||||
do_with_cql_env_thread([](cql_test_env& e) {
|
||||
if (smp::count < 2) {
|
||||
@@ -772,7 +766,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_across_shards) {
|
||||
auto rows = select_log(e, "tbl");
|
||||
|
||||
BOOST_REQUIRE(!to_bytes_filtered(*rows, cdc::operation::insert).empty());
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_negative_ttl_fail) {
|
||||
@@ -780,7 +774,7 @@ SEASTAR_THREAD_TEST_CASE(test_negative_ttl_fail) {
|
||||
BOOST_REQUIRE_EXCEPTION(e.execute_cql("CREATE TABLE ks.fail (a int PRIMARY KEY, b int) WITH cdc = {'enabled':true,'ttl':'-1'}").get0(),
|
||||
exceptions::configuration_exception,
|
||||
exception_predicate::message_contains("ttl"));
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_ttls) {
|
||||
@@ -830,11 +824,11 @@ SEASTAR_THREAD_TEST_CASE(test_ttls) {
|
||||
auto cell_ttl_seconds = value_cast<int32_t>(cell_ttl);
|
||||
// 30% tolerance in case of slow execution (a little flaky...)
|
||||
BOOST_REQUIRE_CLOSE((float)cell_ttl_seconds, (float)ttl_seconds, 30.f);
|
||||
}
|
||||
}
|
||||
};
|
||||
test_ttl(0);
|
||||
test_ttl(10);
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
// helper funcs + structs for collection testing
|
||||
@@ -851,13 +845,13 @@ struct col_test {
|
||||
data_value post = data_value::make_null(int32_type); // whatever
|
||||
};
|
||||
|
||||
// iterate a set of updates and verify pre and delta values.
|
||||
// iterate a set of updates and verify pre and delta values.
|
||||
static void test_collection(cql_test_env& e, data_type val_type, data_type del_type, std::vector<col_test> tests, translate_func f = [](data_value v) { return v; }) {
|
||||
auto col_type = val_type;
|
||||
|
||||
for (auto& t : tests) {
|
||||
cquery_nofail(e, t.update);
|
||||
|
||||
|
||||
auto rows = select_log(e, "tbl");
|
||||
auto pre_image = to_bytes_filtered(*rows, cdc::operation::pre_image);
|
||||
auto updates = to_bytes_filtered(*rows, cdc::operation::update);
|
||||
@@ -918,7 +912,7 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
|
||||
auto map_keys_type = set_type_impl::get_instance(utf8_type, false);
|
||||
|
||||
test_collection(e, map_type, map_keys_type, {
|
||||
{
|
||||
{
|
||||
"UPDATE ks.tbl set val = { 'apa':'ko' } where pk=1 and pk2=11 and ck=111",
|
||||
data_value::make_null(map_type), // no previous value
|
||||
{
|
||||
@@ -930,7 +924,7 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
|
||||
},
|
||||
::make_map_value(map_type, { { "apa", "ko" } })
|
||||
},
|
||||
{
|
||||
{
|
||||
"UPDATE ks.tbl set val = val + { 'ninja':'mission' } where pk=1 and pk2=11 and ck=111",
|
||||
::make_map_value(map_type, { { "apa", "ko" } }),
|
||||
{
|
||||
@@ -941,9 +935,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
|
||||
},
|
||||
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } })
|
||||
},
|
||||
{
|
||||
{
|
||||
"UPDATE ks.tbl set val['ninja'] = 'shuriken' where pk=1 and pk2=11 and ck=111",
|
||||
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } }),
|
||||
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } }),
|
||||
{
|
||||
{
|
||||
::make_map_value(map_type, { { "ninja", "shuriken" } }),
|
||||
@@ -952,9 +946,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
|
||||
},
|
||||
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } })
|
||||
},
|
||||
{
|
||||
{
|
||||
"UPDATE ks.tbl set val['apa'] = null where pk=1 and pk2=11 and ck=111",
|
||||
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } }),
|
||||
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } }),
|
||||
{
|
||||
{
|
||||
data_value::make_null(map_type),
|
||||
@@ -963,9 +957,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
|
||||
},
|
||||
::make_map_value(map_type, { { "ninja", "shuriken" } })
|
||||
},
|
||||
{
|
||||
{
|
||||
"UPDATE ks.tbl set val['ninja'] = null, val['ola'] = 'kokos' where pk=1 and pk2=11 and ck=111",
|
||||
::make_map_value(map_type, { { "ninja", "shuriken" } }),
|
||||
::make_map_value(map_type, { { "ninja", "shuriken" } }),
|
||||
{
|
||||
{
|
||||
::make_map_value(map_type, { { "ola", "kokos" } }),
|
||||
@@ -974,9 +968,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
|
||||
},
|
||||
::make_map_value(map_type, { { "ola", "kokos" } })
|
||||
},
|
||||
{
|
||||
{
|
||||
"UPDATE ks.tbl set val = { 'bolla':'trolla', 'kork':'skruv' } where pk=1 and pk2=11 and ck=111",
|
||||
::make_map_value(map_type, { { "ola", "kokos" } }),
|
||||
::make_map_value(map_type, { { "ola", "kokos" } }),
|
||||
{
|
||||
{
|
||||
::make_map_value(map_type, { { "bolla", "trolla" }, { "kork", "skruv" } }),
|
||||
@@ -988,7 +982,7 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
|
||||
}
|
||||
|
||||
});
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_set_logging) {
|
||||
@@ -999,7 +993,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
|
||||
});
|
||||
|
||||
auto set_type = set_type_impl::get_instance(utf8_type, false);
|
||||
|
||||
|
||||
test_collection(e, set_type, set_type, {
|
||||
{
|
||||
"UPDATE ks.tbl set val = { 'apa', 'ko' } where pk=1 and pk2=11 and ck=111",
|
||||
@@ -1026,7 +1020,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
|
||||
},
|
||||
{
|
||||
"UPDATE ks.tbl set val = val - { 'apa' } where pk=1 and pk2=11 and ck=111",
|
||||
::make_set_value(set_type, { "apa", "ko", "mission", "ninja" }),
|
||||
::make_set_value(set_type, { "apa", "ko", "mission", "ninja" }),
|
||||
{
|
||||
{
|
||||
data_value::make_null(set_type),
|
||||
@@ -1037,7 +1031,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
|
||||
},
|
||||
{
|
||||
"UPDATE ks.tbl set val = val - { 'mission' }, val = val + { 'nils' } where pk=1 and pk2=11 and ck=111",
|
||||
::make_set_value(set_type, { "ko", "mission", "ninja" }),
|
||||
::make_set_value(set_type, { "ko", "mission", "ninja" }),
|
||||
{
|
||||
{
|
||||
::make_set_value(set_type, { "nils" }),
|
||||
@@ -1059,7 +1053,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
|
||||
::make_set_value(set_type, { "bolla", "trolla" })
|
||||
}
|
||||
});
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_list_logging) {
|
||||
@@ -1072,11 +1066,11 @@ SEASTAR_THREAD_TEST_CASE(test_list_logging) {
|
||||
auto list_type = list_type_impl::get_instance(utf8_type, false);
|
||||
auto uuids_type = set_type_impl::get_instance(timeuuid_type, false);
|
||||
auto val_type = map_type_impl::get_instance(list_type->name_comparator(), list_type->value_comparator(), false);
|
||||
|
||||
|
||||
test_collection(e, val_type, uuids_type, {
|
||||
{
|
||||
"UPDATE ks.tbl set val = [ 'apa', 'ko' ] where pk=1 and pk2=11 and ck=111",
|
||||
data_value::make_null(list_type),
|
||||
data_value::make_null(list_type),
|
||||
{
|
||||
{
|
||||
::make_list_value(list_type, { "apa", "ko" }),
|
||||
@@ -1121,7 +1115,7 @@ SEASTAR_THREAD_TEST_CASE(test_list_logging) {
|
||||
},
|
||||
{
|
||||
"UPDATE ks.tbl set val[0] = 'babar' where pk=1 and pk2=11 and ck=111",
|
||||
::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }),
|
||||
::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }),
|
||||
{
|
||||
{
|
||||
::make_list_value(list_type, { "babar" }),
|
||||
@@ -1151,7 +1145,7 @@ SEASTAR_THREAD_TEST_CASE(test_list_logging) {
|
||||
}
|
||||
return ::make_list_value(list_type, std::move(cpy));
|
||||
});
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
|
||||
@@ -1163,7 +1157,7 @@ SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
|
||||
e.execute_cql("DROP TYPE ks.mytype").get();
|
||||
});
|
||||
|
||||
auto udt_type = user_type_impl::get_instance("ks", to_bytes("mytype"),
|
||||
auto udt_type = user_type_impl::get_instance("ks", to_bytes("mytype"),
|
||||
{ to_bytes("field0"), to_bytes("field1") },
|
||||
{ int32_type, utf8_type },
|
||||
false
|
||||
@@ -1171,18 +1165,18 @@ SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
|
||||
auto index_set_type = set_type_impl::get_instance(short_type, false);
|
||||
auto f0_type = int32_type;
|
||||
auto f1_type = utf8_type;
|
||||
|
||||
|
||||
auto make_tuple = [&](std::optional<std::optional<int32_t>> i, std::optional<std::optional<sstring>> s) {
|
||||
return ::make_user_value(udt_type, {
|
||||
i ? ::data_value(*i) : data_value::make_null(f0_type),
|
||||
s ? ::data_value(*s) : data_value::make_null(f1_type),
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
test_collection(e, udt_type, index_set_type, {
|
||||
{
|
||||
"UPDATE ks.tbl set val = { field0: 12, field1: 'ko' } where pk=1 and pk2=11 and ck=111",
|
||||
data_value::make_null(udt_type),
|
||||
data_value::make_null(udt_type),
|
||||
{
|
||||
{
|
||||
make_tuple(12, "ko"),
|
||||
@@ -1238,7 +1232,7 @@ SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
|
||||
make_tuple(1, "bolla")
|
||||
},
|
||||
});
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_frozen_logging) {
|
||||
@@ -1289,7 +1283,7 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_logging) {
|
||||
test_frozen("frozen<set<text>>", "{'a', 'bb', 'ccc'}");
|
||||
test_frozen("frozen<map<text, text>>", "{'a': 'bb', 'ccc': 'dddd'}");
|
||||
test_frozen("frozen<udt>", "{a: 'bb', ccc: 'dddd'}");
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_update_insert_delete_distinction) {
|
||||
@@ -1321,7 +1315,32 @@ SEASTAR_THREAD_TEST_CASE(test_update_insert_delete_distinction) {
|
||||
|
||||
BOOST_REQUIRE_EQUAL(results[3].size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(*results[3].front(), data_value(static_cast<int8_t>(cdc::operation::row_delete)).serialize_nonnull()); // log entry from (3)
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
static std::vector<std::vector<data_value>> get_result(cql_test_env& e,
|
||||
const std::vector<data_type>& col_types, const sstring& query) {
|
||||
auto deser = [] (const data_type& t, const bytes_opt& b) -> data_value {
|
||||
if (!b) {
|
||||
return data_value::make_null(t);
|
||||
}
|
||||
return t->deserialize(*b);
|
||||
};
|
||||
|
||||
auto msg = e.execute_cql(query).get0();
|
||||
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
|
||||
BOOST_REQUIRE(rows);
|
||||
|
||||
std::vector<std::vector<data_value>> res;
|
||||
for (auto&& r: to_bytes(*rows)) {
|
||||
BOOST_REQUIRE_LE(col_types.size(), r.size());
|
||||
std::vector<data_value> res_r;
|
||||
for (size_t i = 0; i < col_types.size(); ++i) {
|
||||
res_r.push_back(deser(col_types[i], r[i]));
|
||||
}
|
||||
res.push_back(std::move(res_r));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
|
||||
@@ -1346,28 +1365,8 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
|
||||
return make_set_value(keys_type, std::move(s));
|
||||
};
|
||||
|
||||
auto deser = [] (const data_type& t, const bytes_opt& b) -> data_value {
|
||||
if (!b) {
|
||||
return data_value::make_null(t);
|
||||
}
|
||||
return t->deserialize(*b);
|
||||
};
|
||||
|
||||
auto get_result = [&] (const std::vector<data_type>& col_types, const sstring& s) -> std::vector<std::vector<data_value>> {
|
||||
auto msg = e.execute_cql(s).get0();
|
||||
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
|
||||
BOOST_REQUIRE(rows);
|
||||
|
||||
std::vector<std::vector<data_value>> res;
|
||||
for (auto&& r: to_bytes(*rows)) {
|
||||
BOOST_REQUIRE_LE(col_types.size(), r.size());
|
||||
std::vector<data_value> res_r;
|
||||
for (size_t i = 0; i < col_types.size(); ++i) {
|
||||
res_r.push_back(deser(col_types[i], r[i]));
|
||||
}
|
||||
res.push_back(std::move(res_r));
|
||||
}
|
||||
return res;
|
||||
return ::get_result(e, col_types, s);
|
||||
};
|
||||
|
||||
cquery_nofail(e, "create table ks.t (pk int, ck int, s int static, v1 int, v2 int, m map<int, int>, primary key (pk, ck)) with cdc = {'enabled':true}");
|
||||
@@ -1566,7 +1565,7 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
|
||||
};
|
||||
BOOST_REQUIRE_EQUAL(expected, result);
|
||||
}
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_batch_with_row_delete) {
|
||||
@@ -1630,7 +1629,7 @@ SEASTAR_THREAD_TEST_CASE(test_batch_with_row_delete) {
|
||||
BOOST_REQUIRE_EQUAL(deser(s_type, r[3]), er[3]);
|
||||
BOOST_REQUIRE_EQUAL(deser(oper_type, r[4]), er[4]);
|
||||
}
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
struct image_set {
|
||||
@@ -1939,7 +1938,7 @@ void test_batch_images(bool preimage, bool postimage) {
|
||||
}
|
||||
}
|
||||
}, preimage, postimage);
|
||||
}, mk_cdc_test_config()).get();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_batch_pre_image) {
|
||||
@@ -1953,3 +1952,24 @@ SEASTAR_THREAD_TEST_CASE(test_batch_post_image) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_batch_pre_post_image) {
|
||||
test_batch_images(true, true);
|
||||
}
|
||||
|
||||
// Regression test for #7716
|
||||
SEASTAR_THREAD_TEST_CASE(test_postimage_with_no_regular_columns) {
|
||||
do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
using oper_ut = std::underlying_type_t<cdc::operation>;
|
||||
|
||||
cquery_nofail(e, "create table ks.t (pk int, ck int, primary key (pk, ck)) with cdc = {'enabled': true, 'postimage': true}");
|
||||
cquery_nofail(e, "insert into ks.t (pk, ck) values (1, 2)");
|
||||
|
||||
auto result = get_result(e,
|
||||
{data_type_for<oper_ut>(), int32_type, int32_type},
|
||||
"select \"cdc$operation\", pk, ck from ks.t_scylla_cdc_log");
|
||||
|
||||
std::vector<std::vector<data_value>> expected = {
|
||||
{ oper_ut(cdc::operation::insert), int32_t(1), int32_t(2) },
|
||||
{ oper_ut(cdc::operation::post_image), int32_t(1), int32_t(2) },
|
||||
};
|
||||
|
||||
BOOST_REQUIRE_EQUAL(expected, result);
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -931,10 +931,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED_CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -943,9 +944,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -954,9 +956,22 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_alternator_streams) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - alternator-streams\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::ALTERNATOR_STREAMS});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -964,10 +979,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::UNUSED_CDC, ef::UNUSED, ef::UNUSED_CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -979,9 +995,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
[&cfg] (const sstring& opt, const sstring& msg, std::optional<value_status> status) {
|
||||
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
});
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -990,9 +1007,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -1000,8 +1018,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -128,12 +128,14 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
|
||||
});
|
||||
}).get();
|
||||
|
||||
// Since deletion of large data entries has been deleted,
|
||||
// expect the record to be present.
|
||||
assert_that(e.execute_cql("select partition_key from system.large_rows where table_name = 'tbl' allow filtering;").get0())
|
||||
.is_rows()
|
||||
.is_empty();
|
||||
.with_size(1);
|
||||
assert_that(e.execute_cql("select partition_key from system.large_cells where table_name = 'tbl' allow filtering;").get0())
|
||||
.is_rows()
|
||||
.is_empty();
|
||||
.with_size(1);
|
||||
|
||||
return make_ready_future<>();
|
||||
}, cfg).get();
|
||||
|
||||
@@ -118,7 +118,6 @@ SEASTAR_TEST_CASE(cdc_schema_extension) {
|
||||
// Extensions have to be registered here - config needs to have them before construction of test env.
|
||||
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
|
||||
auto cfg = ::make_shared<db::config>(ext);
|
||||
cfg->experimental_features({db::experimental_features_t::feature::CDC});
|
||||
|
||||
return do_with_cql_env([] (cql_test_env& e) {
|
||||
auto assert_ext_correctness = [] (cql_test_env& e, cdc::cdc_extension expected_ext) {
|
||||
|
||||
@@ -2715,7 +2715,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
|
||||
}
|
||||
}
|
||||
|
||||
// abort()
|
||||
// abort() -- check that consumer is aborted
|
||||
{
|
||||
auto [reader, handle] = make_queue_reader(gen.schema(), tests::make_permit());
|
||||
auto fill_buffer_fut = reader.fill_buffer(db::no_timeout);
|
||||
@@ -2730,6 +2730,28 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
|
||||
|
||||
BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
|
||||
BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), tests::make_permit(), partition_end{})).get(), std::runtime_error);
|
||||
BOOST_REQUIRE(!reader.is_end_of_stream());
|
||||
}
|
||||
|
||||
// abort() -- check that producer is aborted
|
||||
{
|
||||
auto [reader, handle] = make_queue_reader(gen.schema(), tests::make_permit());
|
||||
reader.set_max_buffer_size(1);
|
||||
|
||||
auto expected_reader = flat_mutation_reader_from_mutations(tests::make_permit(), expected_muts);
|
||||
|
||||
auto push_fut = make_ready_future<>();
|
||||
while (push_fut.available()) {
|
||||
push_fut = handle.push(std::move(*expected_reader(db::no_timeout).get0()));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(!push_fut.available());
|
||||
|
||||
handle.abort(std::make_exception_ptr<std::runtime_error>(std::runtime_error("error")));
|
||||
|
||||
BOOST_REQUIRE_THROW(reader.fill_buffer(db::no_timeout).get(), std::runtime_error);
|
||||
BOOST_REQUIRE_THROW(push_fut.get(), std::runtime_error);
|
||||
BOOST_REQUIRE(!reader.is_end_of_stream());
|
||||
}
|
||||
|
||||
// Detached handle
|
||||
|
||||
@@ -49,7 +49,7 @@ static thread_local mutation_application_stats app_stats_for_tests;
|
||||
// Verifies that tombstones in "list" are monotonic, overlap with the requested range,
|
||||
// and have information equivalent with "expected" in that range.
|
||||
static
|
||||
void check_tombstone_slice(const schema& s, std::vector<range_tombstone> list,
|
||||
void check_tombstone_slice(const schema& s, const utils::chunked_vector<range_tombstone>& list,
|
||||
const query::clustering_range& range,
|
||||
std::initializer_list<range_tombstone> expected)
|
||||
{
|
||||
|
||||
@@ -607,7 +607,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
auto db_cfg_ptr = ::make_shared<db::config>(std::move(extensions));
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
db_cfg.enable_user_defined_functions({true}, db::config::config_source::CommandLine);
|
||||
db_cfg.experimental_features({experimental_features_t::UDF, experimental_features_t::CDC}, db::config::config_source::CommandLine);
|
||||
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
|
||||
if (regenerate) {
|
||||
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
|
||||
} else {
|
||||
|
||||
@@ -75,6 +75,8 @@ cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
|
||||
// which all get thrown away when the test is done. This can cause timeouts
|
||||
// if /tmp is not tmpfs.
|
||||
db_config->commitlog_use_o_dsync.set(false);
|
||||
|
||||
db_config->add_cdc_extension();
|
||||
}
|
||||
|
||||
cql_test_config::cql_test_config(const cql_test_config&) = default;
|
||||
|
||||
Submodule tools/jmx updated: c51906ed01...47b355ec66
@@ -39,7 +39,7 @@ class LiveData(object):
|
||||
def _discoverMetrics(self):
|
||||
results = metric.Metric.discover(self._metric_source)
|
||||
logging.debug('_discoverMetrics: {} results discovered'.format(len(results)))
|
||||
for symbol in results:
|
||||
for symbol in list(results):
|
||||
if not self._matches(symbol, self._metricPatterns):
|
||||
results.pop(symbol)
|
||||
logging.debug('_initializeMetrics: {} results matched'.format(len(results)))
|
||||
|
||||
Reference in New Issue
Block a user