Mirror of https://github.com/scylladb/scylladb.git, synced 2026-05-13 19:32:02 +00:00

Compare commits: scylla-4.6 ... next-4.6 (27 commits)
Commits (SHA1):
6c0825e2a6
db3dd3bdf6
4ad24180f5
755c7eeb6a
8914ca8c58
e82e4bbed3
f9c457778e
8315a7b164
291ca8db60
4da5fbaa24
fc16664d81
80bea5341e
6ecc772b56
0b2e951954
f2a738497f
badf7c816f
bfb86f2c78
18e7a46038
cbcfa31e51
5ee69ff3a9
949103d22a
549cb60f4c
37633c5576
abd9f43fa7
d41d4db5c0
c500043a78
af4752a526
@@ -60,7 +60,7 @@ fi

 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=4.6.8
+VERSION=4.6.11

 if test -f version
 then
@@ -415,6 +415,11 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
     rjson::add(table_description, "BillingModeSummary", rjson::empty_object());
     rjson::add(table_description["BillingModeSummary"], "BillingMode", "PAY_PER_REQUEST");
     rjson::add(table_description["BillingModeSummary"], "LastUpdateToPayPerRequestDateTime", rjson::value(creation_date_seconds));
+    // In PAY_PER_REQUEST billing mode, provisioned capacity should return 0
+    rjson::add(table_description, "ProvisionedThroughput", rjson::empty_object());
+    rjson::add(table_description["ProvisionedThroughput"], "ReadCapacityUnits", 0);
+    rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", 0);
+    rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);

     std::unordered_map<std::string,std::string> key_attribute_types;
     // Add base table's KeySchema and collect types for AttributeDefinitions:
@@ -604,15 +604,21 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
             return make_exception_future<json::json_return_type>(
                     std::runtime_error("Can not perform cleanup operation when topology changes"));
         }
-        return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
-            std::vector<column_family*> column_families_vec;
-            auto& cm = db.get_compaction_manager();
-            for (auto cf : column_families) {
-                column_families_vec.push_back(&db.find_column_family(keyspace, cf));
-            }
-            return parallel_for_each(column_families_vec, [&cm, &db] (column_family* cf) {
-                return cm.perform_cleanup(db, cf);
+        return ctx.db.invoke_on_all([keyspace, column_families] (database& db) -> future<> {
+            auto table_ids = boost::copy_range<std::vector<utils::UUID>>(column_families | boost::adaptors::transformed([&] (auto& table_name) {
+                return db.find_uuid(keyspace, table_name);
+            }));
+            // cleanup smaller tables first, to increase chances of success if low on space.
+            std::ranges::sort(table_ids, std::less<>(), [&] (const utils::UUID& id) {
+                return db.find_column_family(id).get_stats().live_disk_space_used;
+            });
+            auto& cm = db.get_compaction_manager();
+            // as a table can be dropped during loop below, let's find it before issuing the cleanup request.
+            for (auto& id : table_ids) {
+                table& t = db.find_column_family(id);
+                co_await cm.perform_cleanup(db, &t);
+            }
+            co_return;
         }).then([]{
             return make_ready_future<json::json_return_type>(0);
         });
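
Note: the std::ranges::sort call above uses the comparator-plus-projection form — entries are ordered by a per-id lookup rather than by the ids themselves. A minimal standalone sketch of that call shape (the names here are illustrative, not Scylla's):

    #include <algorithm>
    #include <cassert>
    #include <functional>
    #include <map>
    #include <vector>

    int main() {
        std::map<int, long> disk_space{{1, 300}, {2, 100}, {3, 200}}; // id -> live bytes
        std::vector<int> ids{1, 2, 3};
        // Sort ids ascending by the projected disk-space value, as the
        // cleanup handler above does with live_disk_space_used.
        std::ranges::sort(ids, std::less<>(), [&] (int id) { return disk_space.at(id); });
        assert((ids == std::vector<int>{2, 3, 1})); // smallest table first
    }
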
@@ -527,16 +527,11 @@ future<> compaction_manager::stop() {
     }
 }

-void compaction_manager::really_do_stop() {
-    if (_state == state::none || _state == state::stopped) {
-        return;
-    }
-
-    _state = state::stopped;
+future<> compaction_manager::really_do_stop() {
     cmlog.info("Asked to stop");
     // Reset the metrics registry
     _metrics.clear();
-    _stop_future.emplace(stop_ongoing_compactions("shutdown").then([this] () mutable {
+    return stop_ongoing_compactions("shutdown").then([this] () mutable {
         reevaluate_postponed_compactions();
         return std::move(_waiting_reevalution);
     }).then([this] {
@@ -544,12 +539,34 @@ void compaction_manager::really_do_stop() {
         _compaction_submission_timer.cancel();
         cmlog.info("Stopped");
         return _compaction_controller.shutdown();
-    }));
+    });
 }

+template <typename Ex>
+requires std::is_base_of_v<std::exception, Ex> &&
+         requires (const Ex& ex) {
+             { ex.code() } noexcept -> std::same_as<const std::error_code&>;
+         }
+auto swallow_enospc(const Ex& ex) noexcept {
+    if (ex.code().value() != ENOSPC) {
+        return make_exception_future<>(std::make_exception_ptr(ex));
+    }
+
+    cmlog.warn("Got ENOSPC on stop, ignoring...");
+    return make_ready_future<>();
+}
+
 void compaction_manager::do_stop() noexcept {
     if (_state == state::none || _state == state::stopped) {
         return;
     }

     try {
-        really_do_stop();
+        _state = state::stopped;
+        _stop_future = really_do_stop()
+            .handle_exception_type([] (const std::system_error& ex) { return swallow_enospc(ex); })
+            .handle_exception_type([] (const storage_io_error& ex) { return swallow_enospc(ex); })
+        ;
     } catch (...) {
         try {
             cmlog.error("Failed to stop the manager: {}", std::current_exception());
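
Note: the swallow_enospc() template above is constrained so it only accepts exception types that expose a code() accessor returning a std::error_code. A minimal standalone sketch of the same constraint, checked against std::system_error (plain C++, no Seastar futures — the future-returning parts are this diff's own code):

    #include <cerrno>
    #include <concepts>
    #include <system_error>
    #include <type_traits>

    template <typename Ex>
    requires std::is_base_of_v<std::exception, Ex> && requires (const Ex& ex) {
        { ex.code() } -> std::convertible_to<const std::error_code&>;
    }
    bool is_enospc(const Ex& ex) noexcept {
        return ex.code().value() == ENOSPC; // "disk full" is tolerated on shutdown
    }

    int main() {
        std::system_error e(std::make_error_code(std::errc::no_space_on_device));
        return is_enospc(e) ? 0 : 1;
    }
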
@@ -209,7 +209,7 @@ public:

     // Stop all fibers, without waiting. Safe to be called multiple times.
     void do_stop() noexcept;
-    void really_do_stop();
+    future<> really_do_stop();

     // Submit a column family to be compacted.
     void submit(column_family* cf);
@@ -1403,7 +1403,7 @@ serviceLevelOrRoleName returns [sstring name]
         std::transform($name.begin(), $name.end(), $name.begin(), ::tolower); }
     | t=STRING_LITERAL { $name = sstring($t.text); }
     | t=QUOTED_NAME { $name = sstring($t.text); }
-    | k=unreserved_keyword { $name = sstring($t.text);
+    | k=unreserved_keyword { $name = k;
         std::transform($name.begin(), $name.end(), $name.begin(), ::tolower);}
     | QMARK {add_recognition_error("Bind variables cannot be used for service levels or role names");}
     ;
@@ -25,6 +25,7 @@

 #include "cql3_type.hh"
+#include "cql3/util.hh"
 #include "exceptions/exceptions.hh"
 #include "ut_name.hh"
 #include "database.hh"
 #include "user_types_metadata.hh"
@@ -448,7 +449,20 @@ sstring maybe_quote(const sstring& identifier) {
     }

     if (!need_quotes) {
-        return identifier;
+        // A seemingly valid identifier matching [a-z][a-z0-9_]* may still
+        // need quoting if it is a CQL keyword, e.g., "to" (see issue #9450).
+        // While our parser Cql.g has different production rules for different
+        // types of identifiers (column names, table names, etc.), all of
+        // these behave identically for alphanumeric strings: they exclude
+        // many keywords but allow keywords listed as "unreserved keywords".
+        // So we can use any of them, for example cident.
+        try {
+            cql3::util::do_with_parser(identifier, std::mem_fn(&cql3_parser::CqlParser::cident));
+            return identifier;
+        } catch(exceptions::syntax_exception&) {
+            // This alphanumeric string is not a valid identifier, so fall
+            // through to have it quoted:
+        }
     }
     if (num_quotes == 0) {
         return make_sstring("\"", identifier, "\"");
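
Note: for intuition, the intended behavior (mirrored by the unit test later in this comparison) is that bare lowercase identifiers stay bare unless they are reserved keywords; everything else gets quoted, with embedded double-quotes doubled. A simplified standalone sketch, with a hypothetical three-keyword reserved set standing in for the real parser check:

    #include <cassert>
    #include <cctype>
    #include <string>
    #include <unordered_set>

    std::string maybe_quote_sketch(const std::string& id) {
        // Hypothetical reserved subset; the real code asks the CQL parser instead.
        static const std::unordered_set<std::string> reserved{"to", "where", "from"};
        bool plain = !id.empty() && std::islower((unsigned char)id[0]);
        for (char c : id) {
            plain = plain && (std::islower((unsigned char)c) || std::isdigit((unsigned char)c) || c == '_');
        }
        if (plain && !reserved.count(id)) {
            return id; // safe to leave unquoted
        }
        std::string out = "\"";
        for (char c : id) {
            out += c;
            if (c == '"') {
                out += c; // a double-quote is quoted by doubling it
            }
        }
        return out + "\"";
    }

    int main() {
        assert(maybe_quote_sketch("ttl") == "ttl");   // unreserved keyword: unquoted
        assert(maybe_quote_sketch("to") == "\"to\""); // reserved keyword: quoted
        assert(maybe_quote_sketch("Dog") == "\"Dog\"");
    }
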
@@ -450,11 +450,16 @@ bool result_set_builder::restrictions_filter::do_filter(const selection& selecti
     }

     auto clustering_columns_restrictions = _restrictions->get_clustering_columns_restrictions();
-    if (dynamic_pointer_cast<cql3::restrictions::multi_column_restriction>(clustering_columns_restrictions)) {
+    bool has_multi_col_clustering_restrictions =
+        dynamic_pointer_cast<cql3::restrictions::multi_column_restriction>(clustering_columns_restrictions) != nullptr;
+    if (has_multi_col_clustering_restrictions) {
         clustering_key_prefix ckey = clustering_key_prefix::from_exploded(clustering_key);
-        return expr::is_satisfied_by(
+        bool multi_col_clustering_satisfied = expr::is_satisfied_by(
                 clustering_columns_restrictions->expression,
                 partition_key, clustering_key, static_row, row, selection, _options);
+        if (!multi_col_clustering_satisfied) {
+            return false;
+        }
     }

     auto static_row_iterator = static_row.iterator();
@@ -502,6 +507,13 @@ bool result_set_builder::restrictions_filter::do_filter(const selecti
             if (_skip_ck_restrictions) {
                 continue;
             }
+            if (has_multi_col_clustering_restrictions) {
+                // Mixing multi column and single column restrictions on clustering
+                // key columns is forbidden.
+                // Since there are multi column restrictions we have to skip
+                // evaluating single column restrictions or we will get an error.
+                continue;
+            }
             auto clustering_key_restrictions_map = _restrictions->get_single_column_clustering_key_restrictions();
             auto restr_it = clustering_key_restrictions_map.find(cdef);
             if (restr_it == clustering_key_restrictions_map.end()) {
@@ -46,6 +46,7 @@
 #include "cdc/cdc_extension.hh"
 #include "gms/feature.hh"
 #include "gms/feature_service.hh"
+#include "utils/bloom_calculations.hh"

 #include <boost/algorithm/string/predicate.hpp>

@@ -168,6 +169,16 @@ void cf_prop_defs::validate(const database& db, const schema::extensions_map& sc
         throw exceptions::configuration_exception(KW_MAX_INDEX_INTERVAL + " must be greater than " + KW_MIN_INDEX_INTERVAL);
     }

+    if (get_simple(KW_BF_FP_CHANCE)) {
+        double bloom_filter_fp_chance = get_double(KW_BF_FP_CHANCE, 0/*not used*/);
+        double min_bloom_filter_fp_chance = utils::bloom_calculations::min_supported_bloom_filter_fp_chance();
+        if (bloom_filter_fp_chance <= min_bloom_filter_fp_chance || bloom_filter_fp_chance > 1.0) {
+            throw exceptions::configuration_exception(format(
+                "{} must be larger than {} and less than or equal to 1.0 (got {})",
+                KW_BF_FP_CHANCE, min_bloom_filter_fp_chance, bloom_filter_fp_chance));
+        }
+    }
+
     speculative_retry::from_sstring(get_string(KW_SPECULATIVE_RETRY, speculative_retry(speculative_retry::type::NONE, 0).to_sstring()));
 }
@@ -31,6 +31,8 @@
 #include "types/listlike_partial_deserializing_iterator.hh"
 #include "utils/managed_bytes.hh"
 #include "exceptions/exceptions.hh"
+#include <boost/algorithm/string/trim_all.hpp>
+#include <boost/algorithm/string.hpp>

 static inline bool is_control_char(char c) {
     return c >= 0 && c <= 0x1F;
@@ -212,6 +214,17 @@ struct from_json_object_visitor {
     }
     bytes operator()(const boolean_type_impl& t) {
         if (!value.IsBool()) {
+            if (value.IsString()) {
+                std::string str(rjson::to_string_view(value));
+                boost::trim_all(str);
+                boost::to_lower(str);
+
+                if (str == "true") {
+                    return t.decompose(true);
+                } else if (str == "false") {
+                    return t.decompose(false);
+                }
+            }
             throw marshal_exception(format("Invalid JSON object {}", value));
         }
         return t.decompose(value.GetBool());
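
Note: the new string fallback is small enough to restate standalone — trim, lowercase, then accept exactly "true" or "false". A sketch using only the standard library (boost::trim_all above also collapses inner whitespace runs; plain end-trimming is shown here):

    #include <algorithm>
    #include <cctype>
    #include <optional>
    #include <string>

    std::optional<bool> parse_bool(std::string s) {
        auto not_space = [] (unsigned char c) { return !std::isspace(c); };
        s.erase(s.begin(), std::find_if(s.begin(), s.end(), not_space));
        s.erase(std::find_if(s.rbegin(), s.rend(), not_space).base(), s.end());
        std::transform(s.begin(), s.end(), s.begin(), [] (unsigned char c) { return std::tolower(c); });
        if (s == "true") { return true; }
        if (s == "false") { return false; }
        return std::nullopt; // the real code throws marshal_exception here
    }
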
@@ -87,6 +87,13 @@ std::unique_ptr<cql3::statements::raw::select_statement> build_select_statement(
 /// forbids non-alpha-numeric characters in identifier names.
 /// Quoting involves wrapping the string in double-quotes ("). A double-quote
 /// character itself is quoted by doubling it.
+/// maybe_quote() also quotes reserved CQL keywords (e.g., "to", "where")
+/// but doesn't quote *unreserved* keywords (like ttl, int or as).
+/// Note that this means that if new reserved keywords are added to the
+/// parser, a saved output of maybe_quote() may no longer be parsable by
+/// the parser. To avoid this forward-compatibility issue, use quote() instead
+/// of maybe_quote() - to unconditionally quote an identifier even if it is
+/// lowercase and not (yet) a keyword.
 sstring maybe_quote(const sstring& s);

 // Check whether timestamp is not too far in the future as this probably
@@ -39,6 +39,7 @@
  */

 #include <chrono>
+#include <exception>
 #include <seastar/core/future-util.hh>
 #include <seastar/core/do_with.hh>
 #include <seastar/core/semaphore.hh>
@@ -306,6 +307,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
             } catch (no_such_keyspace& ex) {
                 // should probably ignore and drop the batch
             } catch (...) {
+                blogger.warn("Replay failed (will retry): {}", std::current_exception());
                 // timeout, overload etc.
                 // Do _not_ remove the batch, assuming we got a node write error.
                 // Since we don't have hints (which origin is satisfied with),
@@ -891,13 +891,18 @@ void view_updates::generate_update(
     bool same_row = true;
     for (auto col_id : col_ids) {
         auto* after = update.cells().find_cell(col_id);
-        // Note: multi-cell columns can't be part of the primary key.
         auto& cdef = _base->regular_column_at(col_id);
         if (existing) {
             auto* before = existing->cells().find_cell(col_id);
+            // Note that this cell is necessarily atomic, because col_ids are
+            // view key columns, and keys must be atomic.
             if (before && before->as_atomic_cell(cdef).is_live()) {
                 if (after && after->as_atomic_cell(cdef).is_live()) {
-                    auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(cdef), after->as_atomic_cell(cdef));
+                    // We need to compare just the values of the keys, not
+                    // metadata like the timestamp. This is because below,
+                    // if the old and new view row have the same key, we need
+                    // to be sure to reach the update_entry() case.
+                    auto cmp = compare_unsigned(before->as_atomic_cell(cdef).value(), after->as_atomic_cell(cdef).value());
                     if (cmp != 0) {
                         same_row = false;
                     }
@@ -917,7 +922,13 @@ void view_updates::generate_update(
         if (same_row) {
             update_entry(base_key, update, *existing, now);
         } else {
-            replace_entry(base_key, update, *existing, now);
+            // This code doesn't work if the old and new view row have the
+            // same key, because if they do we get both data and tombstone
+            // for the same timestamp (now) and the tombstone wins. This
+            // is why we need the "same_row" case above - it's not just a
+            // performance optimization.
+            delete_old_entry(base_key, *existing, update, now);
+            create_entry(base_key, update, now);
         }
     } else {
         delete_old_entry(base_key, *existing, update, now);
@@ -164,10 +164,7 @@ private:
     void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
     void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
     void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
-    void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
-        create_entry(base_key, update, now);
-        delete_old_entry(base_key, existing, update, now);
-    }
     void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
 };

 class view_update_builder {
@@ -45,7 +45,7 @@
 logging::logger fmr_logger("flat_mutation_reader");

 flat_mutation_reader& flat_mutation_reader::operator=(flat_mutation_reader&& o) noexcept {
-    if (_impl) {
+    if (_impl && _impl->is_close_required()) {
         impl* ip = _impl.get();
         // Abort to enforce calling close() before readers are destroyed
         // to prevent leaks and potential use-after-free due to background
@@ -58,7 +58,7 @@ flat_mutation_reader& flat_mutation_reader::operator=(flat_mutation_reader&& o)
 }

 flat_mutation_reader::~flat_mutation_reader() {
-    if (_impl) {
+    if (_impl && _impl->is_close_required()) {
         impl* ip = _impl.get();
         // Abort to enforce calling close() before readers are destroyed
         // to prevent leaks and potential use-after-free due to background
@@ -1344,7 +1344,7 @@ void mutation_fragment_stream_validating_filter::on_end_of_stream() {
 }

 flat_mutation_reader_v2& flat_mutation_reader_v2::operator=(flat_mutation_reader_v2&& o) noexcept {
-    if (_impl) {
+    if (_impl && _impl->is_close_required()) {
         impl* ip = _impl.get();
         // Abort to enforce calling close() before readers are destroyed
         // to prevent leaks and potential use-after-free due to background
@@ -1357,7 +1357,7 @@ flat_mutation_reader_v2& flat_mutation_reader_v2::operator=(flat_mutation_reader
 }

 flat_mutation_reader_v2::~flat_mutation_reader_v2() {
-    if (_impl) {
+    if (_impl && _impl->is_close_required()) {
         impl* ip = _impl.get();
         // Abort to enforce calling close() before readers are destroyed
         // to prevent leaks and potential use-after-free due to background
@@ -142,6 +142,7 @@ public:
 private:
     tracked_buffer _buffer;
     size_t _buffer_size = 0;
+    bool _close_required = false;
 protected:
     size_t max_buffer_size_in_bytes = default_max_buffer_size_in_bytes();
     bool _end_of_stream = false;
@@ -175,6 +176,8 @@ public:
     bool is_end_of_stream() const { return _end_of_stream; }
     bool is_buffer_empty() const { return _buffer.empty(); }
     bool is_buffer_full() const { return _buffer_size >= max_buffer_size_in_bytes; }
+    bool is_close_required() const { return _close_required; }
+    void set_close_required() { _close_required = true; }
     static constexpr size_t default_max_buffer_size_in_bytes() { return 8 * 1024; }

     mutation_fragment pop_mutation_fragment() {
@@ -506,9 +509,15 @@ public:
     //
     // Can be used to skip over entire partitions if interleaved with
     // `operator()()` calls.
-    future<> next_partition() { return _impl->next_partition(); }
+    future<> next_partition() {
+        _impl->set_close_required();
+        return _impl->next_partition();
+    }

-    future<> fill_buffer() { return _impl->fill_buffer(); }
+    future<> fill_buffer() {
+        _impl->set_close_required();
+        return _impl->fill_buffer();
+    }

     // Changes the range of partitions to pr. The range can only be moved
     // forwards. pr.begin() needs to be larger than pr.end() of the previously
@@ -517,6 +526,7 @@ public:
     // pr needs to be valid until the reader is destroyed or fast_forward_to()
     // is called again.
     future<> fast_forward_to(const dht::partition_range& pr) {
+        _impl->set_close_required();
         return _impl->fast_forward_to(pr);
     }
     // Skips to a later range of rows.
@@ -546,6 +556,7 @@ public:
     // In particular one must first enter a partition by fetching a `partition_start`
     // fragment before calling `fast_forward_to`.
     future<> fast_forward_to(position_range cr) {
+        _impl->set_close_required();
         return _impl->fast_forward_to(std::move(cr));
     }
     // Closes the reader.
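
Note: the pattern added here is that the destructor only treats a missing close() as a bug when the reader actually started an operation — next_partition(), fill_buffer(), and fast_forward_to() all latch the flag. A minimal sketch of that latch with generic names (not the reader API itself):

    #include <cassert>

    class guarded_resource {
        bool _close_required = false;
        bool _closed = false;
    public:
        void start_operation() { _close_required = true; } // analog of set_close_required()
        void close() { _closed = true; }
        ~guarded_resource() {
            // Destroying an untouched object is fine; destroying one that ran
            // operations without close() would abort in the real reader.
            assert(!_close_required || _closed);
        }
    };

    int main() {
        guarded_resource idle;   // never used: safe to drop without close()
        guarded_resource used;
        used.start_operation();
        used.close();            // required before destruction
    }
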
@@ -177,6 +177,7 @@ public:
 private:
     tracked_buffer _buffer;
     size_t _buffer_size = 0;
+    bool _close_required = false;
 protected:
     size_t max_buffer_size_in_bytes = default_max_buffer_size_in_bytes();
@@ -216,6 +217,8 @@ public:
     bool is_end_of_stream() const { return _end_of_stream; }
     bool is_buffer_empty() const { return _buffer.empty(); }
     bool is_buffer_full() const { return _buffer_size >= max_buffer_size_in_bytes; }
+    bool is_close_required() const { return _close_required; }
+    void set_close_required() { _close_required = true; }
     static constexpr size_t default_max_buffer_size_in_bytes() { return 8 * 1024; }

     mutation_fragment_v2 pop_mutation_fragment() {
@@ -547,9 +550,15 @@ public:
     //
     // Can be used to skip over entire partitions if interleaved with
     // `operator()()` calls.
-    future<> next_partition() { return _impl->next_partition(); }
+    future<> next_partition() {
+        _impl->set_close_required();
+        return _impl->next_partition();
+    }

-    future<> fill_buffer() { return _impl->fill_buffer(); }
+    future<> fill_buffer() {
+        _impl->set_close_required();
+        return _impl->fill_buffer();
+    }

     // Changes the range of partitions to pr. The range can only be moved
     // forwards. pr.begin() needs to be larger than pr.end() of the previously
@@ -558,6 +567,7 @@ public:
     // pr needs to be valid until the reader is destroyed or fast_forward_to()
     // is called again.
     future<> fast_forward_to(const dht::partition_range& pr) {
+        _impl->set_close_required();
         return _impl->fast_forward_to(pr);
     }
     // Skips to a later range of rows.
@@ -587,6 +597,7 @@ public:
     // In particular one must first enter a partition by fetching a `partition_start`
     // fragment before calling `fast_forward_to`.
     future<> fast_forward_to(position_range cr) {
+        _impl->set_close_required();
         return _impl->fast_forward_to(std::move(cr));
     }
     // Closes the reader.
@@ -1,5 +1,7 @@
 #include "locator/ec2_snitch.hh"
+#include <seastar/core/seastar.hh>
+#include <seastar/core/sleep.hh>
 #include <seastar/core/do_with.hh>

 #include <boost/algorithm/string/classification.hpp>
 #include <boost/algorithm/string/split.hpp>
@@ -67,6 +69,30 @@ future<> ec2_snitch::start() {
 }

 future<sstring> ec2_snitch::aws_api_call(sstring addr, uint16_t port, sstring cmd) {
+    return do_with(int(0), [this, addr, port, cmd] (int& i) {
+        return repeat_until_value([this, addr, port, cmd, &i]() -> future<std::optional<sstring>> {
+            ++i;
+            return aws_api_call_once(addr, port, cmd).then([] (auto res) {
+                return make_ready_future<std::optional<sstring>>(std::move(res));
+            }).handle_exception([&i] (auto ep) {
+                try {
+                    std::rethrow_exception(ep);
+                } catch (const std::system_error &e) {
+                    logger().error(e.what());
+                    if (i >= AWS_API_CALL_RETRIES - 1) {
+                        logger().error("Maximum number of retries exceeded");
+                        throw e;
+                    }
+                }
+                return sleep(AWS_API_CALL_RETRY_INTERVAL).then([] {
+                    return make_ready_future<std::optional<sstring>>(std::nullopt);
+                });
+            });
+        });
+    });
+}
+
+future<sstring> ec2_snitch::aws_api_call_once(sstring addr, uint16_t port, sstring cmd) {
     return connect(socket_address(inet_address{addr}, port))
     .then([this, addr, cmd] (connected_socket fd) {
         _sd = std::move(fd);
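
Note: a synchronous sketch of the bounded retry loop introduced above (the real code drives it asynchronously with seastar::repeat_until_value and seastar::sleep; the names below are illustrative):

    #include <chrono>
    #include <functional>
    #include <stdexcept>
    #include <string>
    #include <thread>

    std::string call_with_retries(const std::function<std::string()>& call_once,
                                  int max_attempts, std::chrono::seconds interval) {
        for (int i = 0; ; ++i) {
            try {
                return call_once();      // success: stop retrying
            } catch (const std::exception&) {
                if (i >= max_attempts - 1) {
                    throw;               // maximum number of retries exceeded
                }
                std::this_thread::sleep_for(interval);
            }
        }
    }
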
@@ -29,6 +29,8 @@ public:
     static constexpr const char* ZONE_NAME_QUERY_REQ = "/latest/meta-data/placement/availability-zone";
     static constexpr const char* AWS_QUERY_SERVER_ADDR = "169.254.169.254";
     static constexpr uint16_t AWS_QUERY_SERVER_PORT = 80;
+    static constexpr int AWS_API_CALL_RETRIES = 5;
+    static constexpr auto AWS_API_CALL_RETRY_INTERVAL = std::chrono::seconds{5};

     ec2_snitch(const sstring& fname = "", unsigned io_cpu_id = 0);
     virtual future<> start() override;
@@ -45,5 +47,6 @@ private:
     output_stream<char> _out;
     http_response_parser _parser;
     sstring _zone_req;
+    future<sstring> aws_api_call_once(sstring addr, uint16_t port, const sstring cmd);
 };
 } // namespace locator
@@ -30,6 +30,7 @@
 #include <seastar/core/io_priority_class.hh>

 class memtable;
+class reader_permit;
 class flat_mutation_reader;

 namespace sstables {
@@ -442,6 +442,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
     case messaging_verb::GOSSIP_ECHO:
     case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
     case messaging_verb::GET_SCHEMA_VERSION:
+    // ATTN -- if moving GOSSIP_ verbs elsewhere, mind updating the tcp_nodelay
+    // setting in get_rpc_client(), which assumes gossiper verbs live in idx 0
         return 0;
     case messaging_verb::PREPARE_MESSAGE:
     case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -689,7 +691,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
     }();

     auto must_tcp_nodelay = [&] {
-        if (idx == 1) {
+        if (idx == 0) {
             return true; // gossip
         }
         if (_cfg.tcp_nodelay == tcp_nodelay_what::local) {
@@ -283,8 +283,8 @@ public:

     future<> lookup_readers(db::timeout_clock::time_point timeout);

-    future<> save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, detached_compaction_state compaction_state,
-            std::optional<clustering_key_prefix> last_ckey);
+    future<> save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, std::optional<detached_compaction_state> compaction_state,
+            dht::decorated_key last_pkey, std::optional<clustering_key_prefix> last_ckey);

     future<> stop();
 };
@@ -583,19 +583,22 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) {
     });
 }

-future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, detached_compaction_state compaction_state,
-        std::optional<clustering_key_prefix> last_ckey) {
+future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, std::optional<detached_compaction_state> compaction_state,
+        dht::decorated_key last_pkey, std::optional<clustering_key_prefix> last_ckey) {
     if (_cmd.query_uuid == utils::UUID{}) {
         return make_ready_future<>();
     }

-    auto last_pkey = compaction_state.partition_start.key();
-
     const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey);
     tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats);

-    const auto cs_stats = dismantle_compaction_state(std::move(compaction_state));
-    tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
+    auto cs_stats = dismantle_buffer_stats{};
+    if (compaction_state) {
+        cs_stats = dismantle_compaction_state(std::move(*compaction_state));
+        tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
+    } else {
+        tracing::trace(_trace_state, "No compaction state to dismantle, partition exhausted", cs_stats);
+    }

     return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey,
             const std::optional<clustering_key_prefix>& last_ckey) {
@@ -703,7 +706,9 @@ future<typename ResultBuilder::result_type> do_query(
             std::move(result_builder));

     if (compaction_state->are_limits_reached() || result.is_short_read()) {
-        co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_ckey));
+        // Must be read before calling detach_state().
+        auto last_pkey = *compaction_state->current_partition();
+        co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_pkey), std::move(last_ckey));
     }

     co_await ctx->stop();
@@ -175,6 +175,9 @@ class compact_mutation_state {
     std::unique_ptr<mutation_compactor_garbage_collector> _collector;

     compaction_stats _stats;

+    // Remember if we requested to stop mid-partition.
+    stop_iteration _stop = stop_iteration::no;
+
 private:
     static constexpr bool only_live() {
         return OnlyLive == emit_only_live_rows::yes;
@@ -270,6 +273,7 @@ public:
     }

     void consume_new_partition(const dht::decorated_key& dk) {
+        _stop = stop_iteration::no;
         auto& pk = dk.key();
         _dk = &dk;
         _return_static_content_on_partition_with_no_rows =
@@ -323,9 +327,9 @@ public:
         _static_row_live = is_live;
         if (is_live || (!only_live() && !sr.empty())) {
             partition_is_not_empty(consumer);
-            return consumer.consume(std::move(sr), current_tombstone, is_live);
+            _stop = consumer.consume(std::move(sr), current_tombstone, is_live);
         }
-        return stop_iteration::no;
+        return _stop;
     }

     template <typename Consumer, typename GCConsumer>
@@ -370,23 +374,22 @@ public:

         if (only_live() && is_live) {
             partition_is_not_empty(consumer);
-            auto stop = consumer.consume(std::move(cr), t, true);
+            _stop = consumer.consume(std::move(cr), t, true);
             if (++_rows_in_current_partition == _current_partition_limit) {
-                return stop_iteration::yes;
+                _stop = stop_iteration::yes;
             }
-            return stop;
+            return _stop;
         } else if (!only_live()) {
-            auto stop = stop_iteration::no;
             if (!cr.empty()) {
                 partition_is_not_empty(consumer);
-                stop = consumer.consume(std::move(cr), t, is_live);
+                _stop = consumer.consume(std::move(cr), t, is_live);
             }
             if (!sstable_compaction() && is_live && ++_rows_in_current_partition == _current_partition_limit) {
-                return stop_iteration::yes;
+                _stop = stop_iteration::yes;
             }
-            return stop;
+            return _stop;
         }
-        return stop_iteration::no;
+        return _stop;
     }

     template <typename Consumer, typename GCConsumer>
@@ -398,13 +401,13 @@ public:
         if (rt.tomb > _range_tombstones.get_partition_tombstone()) {
             if (can_purge_tombstone(rt.tomb)) {
                 partition_is_not_empty_for_gc_consumer(gc_consumer);
-                return gc_consumer.consume(std::move(rt));
+                _stop = gc_consumer.consume(std::move(rt));
             } else {
                 partition_is_not_empty(consumer);
-                return consumer.consume(std::move(rt));
+                _stop = consumer.consume(std::move(rt));
             }
         }
-        return stop_iteration::no;
+        return _stop;
     }

     template <typename Consumer, typename GCConsumer>
@@ -492,9 +495,24 @@ public:
     /// compactor will result in the new compactor being in the same state *this
     /// is (given the same outside parameters of course). Practically this
     /// allows the compaction state to be stored in the compacted reader.
-    detached_compaction_state detach_state() && {
+    /// If the currently compacted partition is exhausted a disengaged optional
+    /// is returned -- in this case there is no state to detach.
+    std::optional<detached_compaction_state> detach_state() && {
+        // If we exhausted the partition, there is no need to detach-restore the
+        // compaction state.
+        // We exhausted the partition if `consume_partition_end()` was called
+        // without us requesting the consumption to stop (remembered in _stop)
+        // from one of the consume() overloads.
+        // The consume algorithm calls `consume_partition_end()` in two cases:
+        // * on a partition-end fragment
+        // * consume() requested to stop
+        // In the latter case, the partition is not exhausted. Even if the next
+        // fragment to process is a partition-end, it will not be consumed.
+        if (!_stop) {
+            return {};
+        }
         partition_start ps(std::move(_last_dk), _range_tombstones.get_partition_tombstone());
-        return {std::move(ps), std::move(_last_static_row), std::move(_range_tombstones).range_tombstones()};
+        return detached_compaction_state{std::move(ps), std::move(_last_static_row), std::move(_range_tombstones).range_tombstones()};
     }

     const compaction_stats& stats() const { return _stats; }
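
Note: the key detail of the change above is that _stop is a per-partition latch — cleared in consume_new_partition() and set by any consume() step that asks to stop — so detach_state() can distinguish "stopped mid-partition" (state must be detached and later restored) from "partition exhausted" (nothing to detach). A toy model of that latch:

    #include <cassert>

    struct stop_latch {
        bool stop = false;                       // analog of _stop
        void new_partition() { stop = false; }   // reset per partition
        bool consume(bool consumer_says_stop) {
            if (consumer_says_stop) {
                stop = true;                     // remember the request
            }
            return stop;                         // later steps keep reporting it
        }
        bool has_state_to_detach() const { return stop; }
    };

    int main() {
        stop_latch l;
        l.new_partition();
        l.consume(false);
        l.consume(true);                         // stopped mid-partition
        assert(l.has_state_to_detach());
        l.new_partition();
        l.consume(false);                        // ran to partition end
        assert(!l.has_state_to_detach());
    }
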
@@ -843,7 +843,6 @@ public:

     void apply(shadowable_tombstone deleted_at) {
         _deleted_at.apply(deleted_at, _marker);
-        maybe_shadow();
     }

     void apply(row_tombstone deleted_at) {
@@ -109,7 +109,7 @@ void range_tombstone_list::insert_from(const schema& s,
     if (cmp(end, it->position()) < 0) {
         // not overlapping
         if (it->tombstone().tomb == tomb && cmp(end, it->position()) == 0) {
-            rev.update(it, {std::move(start), std::move(start), tomb});
+            rev.update(it, {std::move(start), std::move(end), tomb});
         } else {
             auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), tomb);
             rev.insert(it, *rt);
@@ -22,6 +22,7 @@
 #pragma once

 #include "mutation_fragment.hh"
+#include "mutation_fragment_v2.hh"
 #include "converting_mutation_partition_applier.hh"

 // A StreamedMutationTransformer which transforms the stream to a different schema
@@ -1745,9 +1745,7 @@ public:
         _monitor.on_read_started(_context->reader_position());
     }
 public:
-    void on_out_of_clustering_range() override {
-        push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
-    }
+    void on_out_of_clustering_range() override { }
     virtual future<> fast_forward_to(const dht::partition_range& pr) override {
         on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
     }
@@ -107,9 +107,9 @@ def test_describe_table_size(test_table):
 # Test the ProvisionedThroughput attribute returned by DescribeTable.
 # This is a very partial test: Our test table is configured without
 # provisioned throughput, so obviously it will not have interesting settings
-# for it. DynamoDB returns zeros for some of the attributes, even though
-# the documentation suggests missing values should have been fine too.
-@pytest.mark.xfail(reason="DescribeTable does not return provisioned throughput")
+# for it. But DynamoDB documents that zeros are returned for WriteCapacityUnits
+# and ReadCapacityUnits, and does this in practice as well - and some
+# applications assume these numbers are always there (even if 0).
 def test_describe_table_provisioned_throughput(test_table):
     got = test_table.meta.client.describe_table(TableName=test_table.name)['Table']
     assert got['ProvisionedThroughput']['NumberOfDecreasesToday'] == 0
@@ -438,6 +438,126 @@ def test_gsi_update_second_regular_base_column(test_table_gsi_3):
         KeyConditions={'a': {'AttributeValueList': [items[3]['a']], 'ComparisonOperator': 'EQ'},
                        'b': {'AttributeValueList': [items[3]['b']], 'ComparisonOperator': 'EQ'}})

+# Test reproducing issue #11801: In issue #5006 we noticed that in the special
+# case of a GSI with two non-key attributes as keys (test_table_gsi_3),
+# an update of the second attribute forgot to delete the old row. We fixed
+# that bug, but a bug remained for updates which update the value to the *same*
+# value - in that case the old row shouldn't be deleted, but we did - as
+# noticed in issue #11801.
+def test_11801(test_table_gsi_3):
+    p = random_string()
+    a = random_string()
+    b = random_string()
+    item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
+    test_table_gsi_3.put_item(Item=item)
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+    # Update the attribute 'b' to the same value b that it already had.
+    # This shouldn't change anything in the base table or in the GSI.
+    test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'}})
+    assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
+    # In issue #11801, the following assertion failed (the view row was
+    # deleted and nothing matched the query).
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+    # Above we checked that setting 'b' to the same value didn't remove
+    # the old GSI row. But the same update may actually modify the GSI row
+    # (e.g., an unrelated attribute d) - check this modification took place:
+    item['d'] = random_string()
+    test_table_gsi_3.update_item(Key={'p': p},
+        AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'},
+                          'd': {'Value': item['d'], 'Action': 'PUT'}})
+    assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+
+# This test is the same as test_11801, but updating the first attribute (a)
+# instead of the second (b). This test didn't fail, showing that issue #11801
+# is - like #5006 - specific to the case of updating the second attribute.
+def test_11801_variant1(test_table_gsi_3):
+    p = random_string()
+    a = random_string()
+    b = random_string()
+    d = random_string()
+    item = {'p': p, 'a': a, 'b': b, 'd': d}
+    test_table_gsi_3.put_item(Item=item)
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+    test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'a': {'Value': a, 'Action': 'PUT'}})
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+
+# This test is the same as test_11801, but updates b to a different value
+# (newb) instead of to the same one. This test didn't fail, showing that
+# issue #11801 is specific to updates to the same value. This test basically
+# reproduces the already-fixed #5006 (we also have another test above which
+# reproduces that issue - test_gsi_update_second_regular_base_column())
+def test_11801_variant2(test_table_gsi_3):
+    p = random_string()
+    a = random_string()
+    b = random_string()
+    item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
+    test_table_gsi_3.put_item(Item=item)
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+    newb = random_string()
+    item['b'] = newb
+    test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'b': {'Value': newb, 'Action': 'PUT'}})
+    assert_index_query(test_table_gsi_3, 'hello', [],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [newb], 'ComparisonOperator': 'EQ'}})
+
+# This test is the same as test_11801, but uses a different table schema
+# (test_table_gsi_5) where there is only one new key column in the view (x).
+# This test passed, showing that issue #11801 was specific to the special
+# case of a view with two new key columns (test_table_gsi_3).
+def test_11801_variant3(test_table_gsi_5):
+    p = random_string()
+    c = random_string()
+    x = random_string()
+    item = {'p': p, 'c': c, 'x': x, 'd': random_string()}
+    test_table_gsi_5.put_item(Item=item)
+    assert_index_query(test_table_gsi_5, 'hello', [item],
+        KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
+                       'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
+    test_table_gsi_5.update_item(Key={'p': p, 'c': c}, AttributeUpdates={'x': {'Value': x, 'Action': 'PUT'}})
+    assert item == test_table_gsi_5.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item']
+    assert_index_query(test_table_gsi_5, 'hello', [item],
+        KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
+                       'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
+
+# Another test similar to test_11801, but instead of updating a view key
+# column to the same value it already has, simply don't update it at all
+# (and just modify some other regular column). This test passed, showing
+# that issue #11801 is specific to the case of updating a view key column
+# to the same value it already had.
+def test_11801_variant4(test_table_gsi_3):
+    p = random_string()
+    a = random_string()
+    b = random_string()
+    item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
+    test_table_gsi_3.put_item(Item=item)
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+    # An update that doesn't change the GSI keys (a or b), just a regular
+    # column d.
+    item['d'] = random_string()
+    test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'d': {'Value': item['d'], 'Action': 'PUT'}})
+    assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
+    assert_index_query(test_table_gsi_3, 'hello', [item],
+        KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
+                       'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
+
 # Test that when a table has a GSI, if the indexed attribute is missing, the
 # item is added to the base table but not the index.
 # This is the same feature we already tested in test_gsi_missing_attribute()
@@ -32,7 +32,7 @@
 #include "cql3/util.hh"

 //
-// Test basic CQL string quoting
+// Test basic CQL identifier quoting
 //
 BOOST_AUTO_TEST_CASE(maybe_quote) {
     std::string s(65536, 'x');
@@ -67,6 +67,16 @@ BOOST_AUTO_TEST_CASE(maybe_quote) {
     BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"\""), "\"\"\"\"\"\"");
     BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"hell0\""), "\"\"\"hell0\"\"\"");
     BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello \"my\" world"), "\"hello \"\"my\"\" world\"");

+    // Reproducer for issue #9450. Reserved keywords like "to" or "where"
+    // need quoting, but unreserved keywords like "ttl", "int" or "as"
+    // do not.
+    BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("to"), "\"to\"");
+    BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("where"), "\"where\"");
+    BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("ttl"), "ttl");
+    BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("int"), "int");
+    BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("as"), "as");
+    BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("ttl hi"), "\"ttl hi\"");
 }

 //
@@ -941,3 +941,28 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_is_mutation_source) {
     };
     run_mutation_source_tests(populate);
 }
+
+SEASTAR_THREAD_TEST_CASE(test_allow_reader_early_destruction) {
+    struct test_reader_impl : public flat_mutation_reader::impl {
+        using flat_mutation_reader::impl::impl;
+        virtual future<> fill_buffer() override { return make_ready_future<>(); }
+        virtual future<> next_partition() override { return make_ready_future<>(); }
+        virtual future<> fast_forward_to(const dht::partition_range&) override { return make_ready_future<>(); }
+        virtual future<> fast_forward_to(position_range) override { return make_ready_future<>(); }
+        virtual future<> close() noexcept override { return make_ready_future<>(); };
+    };
+    struct test_reader_v2_impl : public flat_mutation_reader_v2::impl {
+        using flat_mutation_reader_v2::impl::impl;
+        virtual future<> fill_buffer() override { return make_ready_future<>(); }
+        virtual future<> next_partition() override { return make_ready_future<>(); }
+        virtual future<> fast_forward_to(const dht::partition_range&) override { return make_ready_future<>(); }
+        virtual future<> fast_forward_to(position_range) override { return make_ready_future<>(); }
+        virtual future<> close() noexcept override { return make_ready_future<>(); };
+    };
+
+    simple_schema s;
+    tests::reader_concurrency_semaphore_wrapper semaphore;
+    // These readers are not closed, but didn't start any operations, so it's safe for them to be destroyed.
+    auto reader = make_flat_mutation_reader<test_reader_impl>(s.schema(), semaphore.make_permit());
+    auto reader_v2 = make_flat_mutation_reader_v2<test_reader_v2_impl>(s.schema(), semaphore.make_permit());
+}
@@ -763,9 +763,8 @@ SEASTAR_THREAD_TEST_CASE(multi_col_in) {
     cquery_nofail(e, "insert into t(pk,ck1,ck2,r) values (4,13,23,'a')");
     require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) allow filtering", {{I(3)}, {I(4)}});
     require_rows(e, "select pk from t where (ck1) in ((13),(33),(44)) allow filtering", {{I(3)}, {I(4)}});
-    // TODO: uncomment when #6200 is fixed.
-    // require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) and r='a' allow filtering",
-    //              {{I(4), I(13), F(23), T("a")}});
+    require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) and r='a' allow filtering",
+                 {{I(4), I(13), F(23), T("a")}});
     cquery_nofail(e, "delete from t where pk=4");
     require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) allow filtering", {{I(3)}});
     auto stmt = e.prepare("select ck1 from t where (ck1,ck2) in ? allow filtering").get0();
@@ -72,6 +72,7 @@
 #include "test/lib/reader_concurrency_semaphore.hh"
 #include "test/lib/sstable_utils.hh"
 #include "test/lib/random_utils.hh"
+#include "test/lib/random_schema.hh"

 namespace fs = std::filesystem;

@@ -3003,3 +3004,58 @@ SEASTAR_TEST_CASE(sstable_reader_with_timeout) {
         });
     });
 }
+
+SEASTAR_TEST_CASE(test_crawling_reader_out_of_range_last_range_tombstone_change) {
+    return test_env::do_with_async([] (test_env& env) {
+        simple_schema table;
+
+        auto mut = table.new_mutation("pk0");
+        auto ckeys = table.make_ckeys(4);
+        table.add_row(mut, ckeys[0], "v0");
+        table.add_row(mut, ckeys[1], "v1");
+        table.add_row(mut, ckeys[2], "v2");
+        using bound = query::clustering_range::bound;
+        table.delete_range(mut, query::clustering_range::make(bound{ckeys[3], true}, bound{clustering_key::make_empty(), true}), tombstone(1, gc_clock::now()));
+
+        auto tmp = tmpdir();
+        auto sst_gen = [&env, &table, &tmp] () {
+            return env.make_sstable(table.schema(), tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
+        };
+        auto sst = make_sstable_containing(sst_gen, {mut});
+
+        assert_that(sst->make_crawling_reader(table.schema(), env.make_reader_permit())).has_monotonic_positions();
+    });
+}
+
+SEASTAR_TEST_CASE(test_crawling_reader_random_schema_random_mutations) {
+    return test_env::do_with_async([this] (test_env& env) {
+        auto random_spec = tests::make_random_schema_specification(
+                get_name(),
+                std::uniform_int_distribution<size_t>(1, 4),
+                std::uniform_int_distribution<size_t>(2, 4),
+                std::uniform_int_distribution<size_t>(2, 8),
+                std::uniform_int_distribution<size_t>(2, 8));
+        auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
+        auto schema = random_schema.schema();
+
+        testlog.info("Random schema:\n{}", random_schema.cql());
+
+        const auto muts = tests::generate_random_mutations(random_schema, 20).get();
+
+        auto tmp = tmpdir();
+        auto sst_gen = [&env, schema, &tmp] () {
+            return env.make_sstable(schema, tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
+        };
+        auto sst = make_sstable_containing(sst_gen, muts);
+
+        {
+            auto rd = assert_that(sst->make_crawling_reader(schema, env.make_reader_permit()));
+
+            for (const auto& mut : muts) {
+                rd.produces(mut);
+            }
+        }
+
+        assert_that(sst->make_crawling_reader(schema, env.make_reader_permit())).has_monotonic_positions();
+    });
+}
@@ -112,3 +112,37 @@ def test_filter_with_unused_static_column(cql, test_keyspace):
         cql.execute(f"INSERT INTO {table} (p,c,v) VALUES (42,43,44)")
         cql.execute(f"INSERT INTO {table} (p,c,v) VALUES (1,2,3)")
         assert list(cql.execute(f"SELECT * FROM {mv}")) == [(42, 43, 44)]
+
+# Reproducer for issue #9450 - when a view's key column name is a (quoted)
+# keyword, writes used to fail because they generated internally broken CQL
+# with the column name not quoted.
+def test_mv_quoted_column_names(cql, test_keyspace):
+    for colname in ['"dog"', '"Dog"', 'DOG', '"to"', 'int']:
+        with new_test_table(cql, test_keyspace, f'p int primary key, {colname} int') as table:
+            with new_materialized_view(cql, table, '*', f'{colname}, p', f'{colname} is not null and p is not null') as mv:
+                cql.execute(f'INSERT INTO {table} (p, {colname}) values (1, 2)')
+                # Validate that not only did the write not fail, it actually
+                # wrote the right thing to the view. NOTE: on a single-node
+                # Scylla, view update is synchronous so we can just read and
+                # don't need to wait or retry.
+                assert list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]
+
+# Same as test_mv_quoted_column_names above (reproducing issue #9450), just
+# checking *view building* - i.e., pre-existing data in the base table that
+# needs to be copied to the view. The view building cannot return an error
+# to the user, but can fail to write the desired data into the view.
+def test_mv_quoted_column_names_build(cql, test_keyspace):
+    for colname in ['"dog"', '"Dog"', 'DOG', '"to"', 'int']:
+        with new_test_table(cql, test_keyspace, f'p int primary key, {colname} int') as table:
+            cql.execute(f'INSERT INTO {table} (p, {colname}) values (1, 2)')
+            with new_materialized_view(cql, table, '*', f'{colname}, p', f'{colname} is not null and p is not null') as mv:
+                # When Scylla's view builder fails as it did in issue #9450,
+                # there is no way to tell this state apart from a view build
+                # that simply hasn't completed (besides looking at the logs,
+                # which we don't). This means, unfortunately, that a failure
+                # of this test is slow - it needs to wait for a timeout.
+                start_time = time.time()
+                while time.time() < start_time + 30:
+                    if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]:
+                        break
+                assert list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]
test/cql-pytest/test_scan.py (new file, 64 lines)
@@ -0,0 +1,64 @@
+# Copyright 2022-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+#############################################################################
+# Tests for scanning SELECT requests (which read many rows and/or many
+# partitions).
+# We have a separate test file test_filtering.py for scans which also involve
+# filtering, and test_allow_filtering.py for checking when "ALLOW FILTERING"
+# is needed in a scan. test_secondary_index.py also contains tests for scanning
+# using a secondary index.
+#############################################################################
+
+import pytest
+from util import new_test_table
+from cassandra.query import SimpleStatement
+
+# Regression test for #9482
+def test_scan_ending_with_static_row(cql, test_keyspace):
+    with new_test_table(cql, test_keyspace, "pk int, ck int, s int STATIC, v int, PRIMARY KEY (pk, ck)") as table:
+        stmt = cql.prepare(f"UPDATE {table} SET s = ? WHERE pk = ?")
+        for pk in range(100):
+            cql.execute(stmt, (0, pk))
+
+        statement = SimpleStatement(f"SELECT * FROM {table}", fetch_size=10)
+        # This will trigger an error in either processing or building the query
+        # results. The success criterion for this test is the query finishing
+        # without errors.
+        res = list(cql.execute(statement))
+
+
+# Test that if we have multi-column restrictions on the clustering key
+# and additional filtering on regular columns, both restrictions are obeyed.
+# Reproduces #6200.
+def test_multi_column_restrictions_and_filtering(cql, test_keyspace):
+    with new_test_table(cql, test_keyspace, "p int, c1 int, c2 int, r int, PRIMARY KEY (p, c1, c2)") as table:
+        stmt = cql.prepare(f"INSERT INTO {table} (p, c1, c2, r) VALUES (1, ?, ?, ?)")
+        for i in range(2):
+            for j in range(2):
+                cql.execute(stmt, [i, j, j])
+        assert list(cql.execute(f"SELECT c1,c2,r FROM {table} WHERE p=1 AND (c1, c2) = (0,1)")) == [(0,1,1)]
+        # Since in that result r=1, adding "AND r=1" should return the same
+        # result, and adding "AND r=0" should return nothing.
+        assert list(cql.execute(f"SELECT c1,c2,r FROM {table} WHERE p=1 AND (c1, c2) = (0,1) AND r=1 ALLOW FILTERING")) == [(0,1,1)]
+        # Reproduces #6200:
+        assert list(cql.execute(f"SELECT c1,c2,r FROM {table} WHERE p=1 AND (c1, c2) = (0,1) AND r=0 ALLOW FILTERING")) == []
+
+# Test that if we have a range multi-column restriction on the clustering key
+# and additional filtering on regular columns, both restrictions are obeyed.
+# Similar to test_multi_column_restrictions_and_filtering, but uses a range
+# restriction on the clustering key columns.
+# Reproduces #12014, the code is taken from a reproducer provided by a user.
+def test_multi_column_range_restrictions_and_filtering(cql, test_keyspace):
+    with new_test_table(cql, test_keyspace, "pk int, ts timestamp, id int, processed boolean, PRIMARY KEY (pk, ts, id)") as table:
+        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 0, true)")
+        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 1, true)")
+        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 2, false)")
+        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 3, false)")
+        # This select doesn't use multi-column restrictions; the result shouldn't change when it does.
+        rows1 = list(cql.execute(f"SELECT id, processed FROM {table} WHERE pk = 0 AND ts >= 0 AND processed = false ALLOW FILTERING"))
+        assert rows1 == [(2, False), (3, False)]
+        # Reproduces #12014
+        rows2 = list(cql.execute(f"SELECT id, processed FROM {table} WHERE pk = 0 AND (ts, id) >= (0, 0) AND processed = false ALLOW FILTERING"))
+        assert rows1 == rows2
@@ -2298,8 +2298,14 @@ public:
     };

     size_t row_count = row_count_dist(_gen);
-    for (size_t i = 0; i < row_count; ++i) {
-        auto ckey = make_random_key();
+
+    std::unordered_set<clustering_key, clustering_key::hashing, clustering_key::equality> keys(
+            0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema));
+    while (keys.size() < row_count) {
+        keys.emplace(make_random_key());
+    }
+
+    for (auto&& ckey : keys) {
         is_continuous continuous = is_continuous(_bool_dist(_gen));
         if (_not_dummy_dist(_gen)) {
             deletable_row& row = m.partition().clustered_row(*_schema, ckey, is_dummy::no, continuous);
@@ -1115,4 +1115,12 @@ future<std::vector<mutation>> generate_random_mutations(
     });
 }

+future<std::vector<mutation>> generate_random_mutations(tests::random_schema& random_schema, size_t partition_count) {
+    return generate_random_mutations(
+            random_schema,
+            default_timestamp_generator(),
+            no_expiry_expiry_generator(),
+            std::uniform_int_distribution<size_t>(partition_count, partition_count));
+}
+
 } // namespace tests
@@ -255,4 +255,7 @@ future<std::vector<mutation>> generate_random_mutations(
         std::uniform_int_distribution<size_t> clustering_row_count_dist = std::uniform_int_distribution<size_t>(16, 128),
         std::uniform_int_distribution<size_t> range_tombstone_count_dist = std::uniform_int_distribution<size_t>(4, 16));

+/// Generate exactly partition_count partitions. See the more general overload above.
+future<std::vector<mutation>> generate_random_mutations(tests::random_schema& random_schema, size_t partition_count);
+
 } // namespace tests
@@ -25,6 +25,8 @@
 #include <seastar/core/weak_ptr.hh>
 #include <seastar/core/condition-variable.hh>

+#include "test/raft/logical_timer.hh"
+
 using namespace seastar;

 // A set of futures that can be polled to obtain the result of some ready future in the set.
@@ -26,6 +26,7 @@
 #include <seastar/core/future-util.hh>

 #include "raft/logical_clock.hh"
+#include "raft/raft.hh"

 using namespace seastar;
@@ -22,6 +22,7 @@
 #pragma once

 #include <utility>
+#include <fmt/format.h>

 namespace thrift {
@@ -153,6 +153,18 @@ namespace bloom_calculations {
         }
         return std::min(probs.size() - 1, size_t(v));
     }

+    /**
+     * Retrieves the minimum supported bloom_filter_fp_chance value;
+     * if compute_bloom_spec() above is attempted with a bloom_filter_fp_chance
+     * lower than this, it will throw an unsupported_operation_exception.
+     */
+    inline double min_supported_bloom_filter_fp_chance() {
+        int max_buckets = probs.size() - 1;
+        int max_K = probs[max_buckets].size() - 1;
+        return probs[max_buckets][max_K];
+    }
+
 }

 }
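
Note: for intuition, min_supported_bloom_filter_fp_chance() just reads the last (smallest) entry of the precomputed probs table, which is what lets cf_prop_defs::validate() earlier in this comparison reject anything at or below it. A toy illustration with a made-up table (not the real values):

    #include <cassert>
    #include <vector>

    int main() {
        // probs[buckets_per_element][num_hashes] = false-positive rate.
        // These numbers are illustrative only.
        std::vector<std::vector<double>> probs{
            {1.0},
            {1.0, 0.39},
            {1.0, 0.28, 0.17},
        };
        auto max_buckets = probs.size() - 1;
        auto max_K = probs[max_buckets].size() - 1;
        double min_fp_chance = probs[max_buckets][max_K];
        assert(min_fp_chance == 0.17); // validate() must see a value strictly above this
    }
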
@@ -21,6 +21,7 @@

 #pragma once

+#include <cassert>
 #include <boost/intrusive/parent_from_member.hpp>

 // A movable pointer-like object paired with exactly one other object of the same type.
@@ -49,5 +49,5 @@ public:
         return _what.c_str();
     }

-    const std::error_code& code() const { return _code; }
+    const std::error_code& code() const noexcept { return _code; }
 };
@@ -21,6 +21,7 @@

 #pragma once

 #include <type_traits>
 #include <utility>
+#include <seastar/util/concepts.hh>

 namespace utils {