Compare commits
57 Commits
scylla-4.5
...
next-4.5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0a8465345 | ||
|
|
1e6fe6391f | ||
|
|
237c67367c | ||
|
|
f5707399c3 | ||
|
|
4c28744bfd | ||
|
|
3b253e1166 | ||
|
|
2dedfacabf | ||
|
|
414e2687d4 | ||
|
|
3d63f12d3b | ||
|
|
e09f2d0ea0 | ||
|
|
484b23c08e | ||
|
|
97bcbd3c1f | ||
|
|
85d3fe744b | ||
|
|
48d4759fad | ||
|
|
c4e8d2e761 | ||
|
|
45c93db71f | ||
|
|
bab3afca5e | ||
|
|
8a0f3cc136 | ||
|
|
c789a18195 | ||
|
|
195c8a6f80 | ||
|
|
8a263600dd | ||
|
|
da806c4451 | ||
|
|
619bfb7c4e | ||
|
|
06795d29c5 | ||
|
|
7e625f3397 | ||
|
|
865cfebfed | ||
|
|
1963d1cc25 | ||
|
|
53b0aaa4e8 | ||
|
|
ebfa2279a4 | ||
|
|
5e38a69f6d | ||
|
|
6a54033a63 | ||
|
|
ec44412cd9 | ||
|
|
8f45f65b09 | ||
|
|
5a7324c423 | ||
|
|
2eb0ad7b4f | ||
|
|
5d7064e00e | ||
|
|
b56b9f5ed5 | ||
|
|
c8f14886dc | ||
|
|
5c8057749b | ||
|
|
a35646b874 | ||
|
|
da8708932d | ||
|
|
78a545716a | ||
|
|
1488278fc1 | ||
|
|
d9455a910f | ||
|
|
406b4bce8d | ||
|
|
417e853b9b | ||
|
|
44c784cb79 | ||
|
|
ab425a11a8 | ||
|
|
2228a1a92a | ||
|
|
36b190a65e | ||
|
|
f7e5339c14 | ||
|
|
4c4972cb33 | ||
|
|
50ce5bef2c | ||
|
|
f864eea844 | ||
|
|
b9735ab079 | ||
|
|
766e16f19e | ||
|
|
e6520df41c |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.5.2
|
||||
VERSION=4.5.7
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2442,8 +2442,8 @@ static bool hierarchy_actions(
|
||||
if (newv) {
|
||||
rjson::set_with_string_name(v, attr, std::move(*newv));
|
||||
} else {
|
||||
throw api_error::validation(format("Can't remove document path {} - not present in item",
|
||||
subh.get_value()._path));
|
||||
// Removing a.b when a is a map but a.b doesn't exist
|
||||
// is silently ignored. It's not considered an error.
|
||||
}
|
||||
} else {
|
||||
throw api_error::validation(format("UpdateExpression: document paths not valid for this item:{}", h));
|
||||
|
||||
@@ -129,6 +129,10 @@ public:
|
||||
[&] (const json::json_return_type& json_return_value) {
|
||||
slogger.trace("api_handler success case");
|
||||
if (json_return_value._body_writer) {
|
||||
// Unfortunately, write_body() forces us to choose
|
||||
// from a fixed and irrelevant list of "mime-types"
|
||||
// at this point. But we'll override it with the
|
||||
// one (application/x-amz-json-1.0) below.
|
||||
rep->write_body("json", std::move(json_return_value._body_writer));
|
||||
} else {
|
||||
rep->_content += json_return_value._res;
|
||||
@@ -141,7 +145,7 @@ public:
|
||||
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
});
|
||||
}), _type("json") { }
|
||||
}) { }
|
||||
|
||||
api_handler(const api_handler&) = default;
|
||||
future<std::unique_ptr<reply>> handle(const sstring& path,
|
||||
@@ -149,7 +153,8 @@ public:
|
||||
handle_CORS(*req, *rep, false);
|
||||
return _f_handle(std::move(req), std::move(rep)).then(
|
||||
[this](std::unique_ptr<reply> rep) {
|
||||
rep->done(_type);
|
||||
rep->set_mime_type("application/x-amz-json-1.0");
|
||||
rep->done();
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
});
|
||||
}
|
||||
@@ -163,7 +168,6 @@ protected:
|
||||
}
|
||||
|
||||
future_handler_function _f_handle;
|
||||
sstring _type;
|
||||
};
|
||||
|
||||
class gated_handler : public handler_base {
|
||||
@@ -246,24 +250,31 @@ future<> server::verify_signature(const request& req, const chunked_content& con
|
||||
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
|
||||
}
|
||||
std::string host = host_it->second;
|
||||
std::vector<std::string_view> credentials_raw = split(authorization_it->second, ' ');
|
||||
std::string_view authorization_header = authorization_it->second;
|
||||
auto pos = authorization_header.find_first_of(' ');
|
||||
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
|
||||
throw api_error::invalid_signature(format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
|
||||
}
|
||||
authorization_header.remove_prefix(pos+1);
|
||||
std::string credential;
|
||||
std::string user_signature;
|
||||
std::string signed_headers_str;
|
||||
std::vector<std::string_view> signed_headers;
|
||||
for (std::string_view entry : credentials_raw) {
|
||||
do {
|
||||
// Either one of a comma or space can mark the end of an entry
|
||||
pos = authorization_header.find_first_of(" ,");
|
||||
std::string_view entry = authorization_header.substr(0, pos);
|
||||
if (pos != std::string_view::npos) {
|
||||
authorization_header.remove_prefix(pos + 1);
|
||||
}
|
||||
if (entry.empty()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<std::string_view> entry_split = split(entry, '=');
|
||||
if (entry_split.size() != 2) {
|
||||
if (entry != "AWS4-HMAC-SHA256") {
|
||||
throw api_error::invalid_signature(format("Only AWS4-HMAC-SHA256 algorithm is supported. Found: {}", entry));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
std::string_view auth_value = entry_split[1];
|
||||
// Commas appear as an additional (quite redundant) delimiter
|
||||
if (auth_value.back() == ',') {
|
||||
auth_value.remove_suffix(1);
|
||||
}
|
||||
if (entry_split[0] == "Credential") {
|
||||
credential = std::string(auth_value);
|
||||
} else if (entry_split[0] == "Signature") {
|
||||
@@ -273,7 +284,8 @@ future<> server::verify_signature(const request& req, const chunked_content& con
|
||||
signed_headers = split(auth_value, ';');
|
||||
std::sort(signed_headers.begin(), signed_headers.end());
|
||||
}
|
||||
}
|
||||
} while (pos != std::string_view::npos);
|
||||
|
||||
std::vector<std::string_view> credential_split = split(credential, '/');
|
||||
if (credential_split.size() != 5) {
|
||||
throw api_error::validation(format("Incorrect credential information format: {}", credential));
|
||||
|
||||
@@ -38,6 +38,7 @@ stats::stats() : api_operations{} {
|
||||
#define OPERATION_LATENCY(name, CamelCaseName) \
|
||||
seastar::metrics::make_histogram("op_latency", \
|
||||
seastar::metrics::description("Latency histogram of an operation via Alternator API"), {op(CamelCaseName)}, [this]{return to_metrics_histogram(api_operations.name);}),
|
||||
OPERATION(batch_get_item, "BatchGetItem")
|
||||
OPERATION(batch_write_item, "BatchWriteItem")
|
||||
OPERATION(create_backup, "CreateBackup")
|
||||
OPERATION(create_global_table, "CreateGlobalTable")
|
||||
|
||||
@@ -262,7 +262,7 @@ void set_repair(http_context& ctx, routes& r, sharded<netw::messaging_service>&
|
||||
try {
|
||||
res = fut.get0();
|
||||
} catch (std::exception& e) {
|
||||
return make_exception_future<json::json_return_type>(httpd::server_error_exception(e.what()));
|
||||
return make_exception_future<json::json_return_type>(httpd::bad_param_exception(e.what()));
|
||||
}
|
||||
return make_ready_future<json::json_return_type>(json::json_return_type(res));
|
||||
});
|
||||
|
||||
@@ -79,6 +79,54 @@ atomic_cell::atomic_cell(const abstract_type& type, atomic_cell_view other)
|
||||
set_view(_data);
|
||||
}
|
||||
|
||||
// Based on:
|
||||
// - org.apache.cassandra.db.AbstractCell#reconcile()
|
||||
// - org.apache.cassandra.db.BufferExpiringCell#reconcile()
|
||||
// - org.apache.cassandra.db.BufferDeletedCell#reconcile()
|
||||
int
|
||||
compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
|
||||
if (left.timestamp() != right.timestamp()) {
|
||||
return left.timestamp() > right.timestamp() ? 1 : -1;
|
||||
}
|
||||
if (left.is_live() != right.is_live()) {
|
||||
return left.is_live() ? -1 : 1;
|
||||
}
|
||||
if (left.is_live()) {
|
||||
auto c = compare_unsigned(left.value(), right.value());
|
||||
if (c != 0) {
|
||||
return c;
|
||||
}
|
||||
if (left.is_live_and_has_ttl() != right.is_live_and_has_ttl()) {
|
||||
// prefer expiring cells.
|
||||
return left.is_live_and_has_ttl() ? 1 : -1;
|
||||
}
|
||||
if (left.is_live_and_has_ttl()) {
|
||||
if (left.expiry() != right.expiry()) {
|
||||
return left.expiry() < right.expiry() ? -1 : 1;
|
||||
} else {
|
||||
// prefer the cell that was written later,
|
||||
// so it survives longer after it expires, until purged.
|
||||
if (left.ttl() != right.ttl()) {
|
||||
return left.ttl() < right.ttl() ? 1 : -1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Both are deleted
|
||||
if (left.deletion_time() != right.deletion_time()) {
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
< (uint64_t) right.deletion_time().time_since_epoch().count() ? -1 : 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
atomic_cell_or_collection atomic_cell_or_collection::copy(const abstract_type& type) const {
|
||||
if (_data.empty()) {
|
||||
return atomic_cell_or_collection();
|
||||
|
||||
@@ -576,8 +576,8 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
|
||||
clogger.trace("csm {}: insert dummy at {}", fmt::ptr(this), _lower_bound);
|
||||
auto it = with_allocator(_lsa_manager.region().allocator(), [&] {
|
||||
auto& rows = _snp->version()->partition().clustered_rows();
|
||||
auto new_entry = current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no);
|
||||
return rows.insert_before(_next_row.get_iterator_in_latest_version(), *new_entry);
|
||||
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no));
|
||||
return rows.insert_before(_next_row.get_iterator_in_latest_version(), std::move(new_entry));
|
||||
});
|
||||
_snp->tracker()->insert(*it);
|
||||
_last_row = partition_snapshot_row_weakref(*_snp, it, true);
|
||||
|
||||
26
cdc/log.cc
26
cdc/log.cc
@@ -74,7 +74,7 @@ using namespace std::chrono_literals;
|
||||
logging::logger cdc_log("cdc");
|
||||
|
||||
namespace cdc {
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {}, schema_ptr = nullptr);
|
||||
}
|
||||
|
||||
static constexpr auto cdc_group_name = "cdc";
|
||||
@@ -221,7 +221,7 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt, log_schema);
|
||||
|
||||
auto log_mut = log_schema
|
||||
? db::schema_tables::make_update_table_mutations(db, keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
|
||||
@@ -490,7 +490,7 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
|
||||
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
|
||||
}
|
||||
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid, schema_ptr old) {
|
||||
schema_builder b(s.ks_name(), log_name(s.cf_name()));
|
||||
b.with_partitioner("com.scylladb.dht.CDCPartitioner");
|
||||
b.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
@@ -571,6 +571,20 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
b.set_uuid(*uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* #10473 - if we are redefining the log table, we need to ensure any dropped
|
||||
* columns are registered in "dropped_columns" table, otherwise clients will not
|
||||
* be able to read data older than now.
|
||||
*/
|
||||
if (old) {
|
||||
// not super efficient, but we don't do this often.
|
||||
for (auto& col : old->all_columns()) {
|
||||
if (!b.has_column({col.name(), col.name_as_text() })) {
|
||||
b.without_column(col.name_as_text(), col.type, api::new_timestamp());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return b.build();
|
||||
}
|
||||
|
||||
@@ -716,16 +730,16 @@ private:
|
||||
}
|
||||
return false;
|
||||
}
|
||||
bool compare(const T&, const value_type& v);
|
||||
int32_t compare(const T&, const value_type& v);
|
||||
};
|
||||
|
||||
template<>
|
||||
bool maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
int32_t maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
return _type.compare(t, v.first);
|
||||
}
|
||||
|
||||
template<>
|
||||
bool maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
int32_t maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
return _type.compare(t, v);
|
||||
}
|
||||
|
||||
|
||||
@@ -35,6 +35,28 @@ struct authorized_prepared_statements_cache_size {
|
||||
class authorized_prepared_statements_cache_key {
|
||||
public:
|
||||
using cache_key_type = std::pair<auth::authenticated_user, typename cql3::prepared_cache_key_type::cache_key_type>;
|
||||
|
||||
struct view {
|
||||
const auth::authenticated_user& user_ref;
|
||||
const cql3::prepared_cache_key_type& prep_cache_key_ref;
|
||||
};
|
||||
|
||||
struct view_hasher {
|
||||
size_t operator()(const view& kv) {
|
||||
return cql3::authorized_prepared_statements_cache_key::hash(kv.user_ref, kv.prep_cache_key_ref.key());
|
||||
}
|
||||
};
|
||||
|
||||
struct view_equal {
|
||||
bool operator()(const authorized_prepared_statements_cache_key& k1, const view& k2) {
|
||||
return k1.key().first == k2.user_ref && k1.key().second == k2.prep_cache_key_ref.key();
|
||||
}
|
||||
|
||||
bool operator()(const view& k2, const authorized_prepared_statements_cache_key& k1) {
|
||||
return operator()(k1, k2);
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
cache_key_type _key;
|
||||
|
||||
@@ -100,10 +122,12 @@ private:
|
||||
|
||||
public:
|
||||
using key_type = cache_key_type;
|
||||
using key_view_type = typename key_type::view;
|
||||
using key_view_hasher = typename key_type::view_hasher;
|
||||
using key_view_equal = typename key_type::view_equal;
|
||||
using value_type = checked_weak_ptr;
|
||||
using entry_is_too_big = typename cache_type::entry_is_too_big;
|
||||
using iterator = typename cache_type::iterator;
|
||||
|
||||
using value_ptr = typename cache_type::value_ptr;
|
||||
private:
|
||||
cache_type _cache;
|
||||
logging::logger& _logger;
|
||||
@@ -124,38 +148,12 @@ public:
|
||||
}).discard_result();
|
||||
}
|
||||
|
||||
iterator find(const auth::authenticated_user& user, const cql3::prepared_cache_key_type& prep_cache_key) {
|
||||
struct key_view {
|
||||
const auth::authenticated_user& user_ref;
|
||||
const cql3::prepared_cache_key_type& prep_cache_key_ref;
|
||||
};
|
||||
|
||||
struct hasher {
|
||||
size_t operator()(const key_view& kv) {
|
||||
return cql3::authorized_prepared_statements_cache_key::hash(kv.user_ref, kv.prep_cache_key_ref.key());
|
||||
}
|
||||
};
|
||||
|
||||
struct equal {
|
||||
bool operator()(const key_type& k1, const key_view& k2) {
|
||||
return k1.key().first == k2.user_ref && k1.key().second == k2.prep_cache_key_ref.key();
|
||||
}
|
||||
|
||||
bool operator()(const key_view& k2, const key_type& k1) {
|
||||
return operator()(k1, k2);
|
||||
}
|
||||
};
|
||||
|
||||
return _cache.find(key_view{user, prep_cache_key}, hasher(), equal());
|
||||
}
|
||||
|
||||
iterator end() {
|
||||
return _cache.end();
|
||||
value_ptr find(const auth::authenticated_user& user, const cql3::prepared_cache_key_type& prep_cache_key) {
|
||||
return _cache.find(key_view_type{user, prep_cache_key}, key_view_hasher(), key_view_equal());
|
||||
}
|
||||
|
||||
void remove(const auth::authenticated_user& user, const cql3::prepared_cache_key_type& prep_cache_key) {
|
||||
iterator it = find(user, prep_cache_key);
|
||||
_cache.remove(it);
|
||||
_cache.remove(key_view_type{user, prep_cache_key}, key_view_hasher(), key_view_equal());
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
|
||||
@@ -103,9 +103,7 @@ public:
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const = 0;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const = 0;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;
|
||||
|
||||
virtual shared_ptr<const metadata> get_result_metadata() const = 0;
|
||||
|
||||
|
||||
@@ -102,13 +102,7 @@ private:
|
||||
using cache_key_type = typename prepared_cache_key_type::cache_key_type;
|
||||
using cache_type = utils::loading_cache<cache_key_type, prepared_cache_entry, utils::loading_cache_reload_enabled::no, prepared_cache_entry_size, utils::tuple_hash, std::equal_to<cache_key_type>, prepared_cache_stats_updater>;
|
||||
using cache_value_ptr = typename cache_type::value_ptr;
|
||||
using cache_iterator = typename cache_type::iterator;
|
||||
using checked_weak_ptr = typename statements::prepared_statement::checked_weak_ptr;
|
||||
struct value_extractor_fn {
|
||||
checked_weak_ptr operator()(prepared_cache_entry& e) const {
|
||||
return e->checked_weak_from_this();
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
static const std::chrono::minutes entry_expiry;
|
||||
@@ -116,12 +110,9 @@ public:
|
||||
using key_type = prepared_cache_key_type;
|
||||
using value_type = checked_weak_ptr;
|
||||
using statement_is_too_big = typename cache_type::entry_is_too_big;
|
||||
/// \note both iterator::reference and iterator::value_type are checked_weak_ptr
|
||||
using iterator = boost::transform_iterator<value_extractor_fn, cache_iterator>;
|
||||
|
||||
private:
|
||||
cache_type _cache;
|
||||
value_extractor_fn _value_extractor_fn;
|
||||
|
||||
public:
|
||||
prepared_statements_cache(logging::logger& logger, size_t size)
|
||||
@@ -135,16 +126,12 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
iterator find(const key_type& key) {
|
||||
return boost::make_transform_iterator(_cache.find(key.key()), _value_extractor_fn);
|
||||
}
|
||||
|
||||
iterator end() {
|
||||
return boost::make_transform_iterator(_cache.end(), _value_extractor_fn);
|
||||
}
|
||||
|
||||
iterator begin() {
|
||||
return boost::make_transform_iterator(_cache.begin(), _value_extractor_fn);
|
||||
value_type find(const key_type& key) {
|
||||
cache_value_ptr vp = _cache.find(key.key());
|
||||
if (vp) {
|
||||
return (*vp)->checked_weak_from_this();
|
||||
}
|
||||
return value_type();
|
||||
}
|
||||
|
||||
template <typename Pred>
|
||||
|
||||
@@ -943,7 +943,7 @@ bool query_processor::migration_subscriber::should_invalidate(
|
||||
sstring ks_name,
|
||||
std::optional<sstring> cf_name,
|
||||
::shared_ptr<cql_statement> statement) {
|
||||
return statement->depends_on_keyspace(ks_name) && (!cf_name || statement->depends_on_column_family(*cf_name));
|
||||
return statement->depends_on(ks_name, cf_name);
|
||||
}
|
||||
|
||||
future<> query_processor::query_internal(
|
||||
|
||||
@@ -179,10 +179,10 @@ public:
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
|
||||
if (user) {
|
||||
auto it = _authorized_prepared_cache.find(*user, key);
|
||||
if (it != _authorized_prepared_cache.end()) {
|
||||
auto vp = _authorized_prepared_cache.find(*user, key);
|
||||
if (vp) {
|
||||
try {
|
||||
return it->get()->checked_weak_from_this();
|
||||
return vp->get()->checked_weak_from_this();
|
||||
} catch (seastar::checked_ptr_is_null_exception&) {
|
||||
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
|
||||
_authorized_prepared_cache.remove(*user, key);
|
||||
@@ -193,11 +193,7 @@ public:
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) {
|
||||
auto it = _prepared_cache.find(key);
|
||||
if (it == _prepared_cache.end()) {
|
||||
return statements::prepared_statement::checked_weak_ptr();
|
||||
}
|
||||
return *it;
|
||||
return _prepared_cache.find(key);
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
|
||||
@@ -133,6 +133,7 @@ statement_restrictions::statement_restrictions(schema_ptr schema, bool allow_fil
|
||||
, _partition_key_restrictions(get_initial_partition_key_restrictions(allow_filtering))
|
||||
, _clustering_columns_restrictions(get_initial_clustering_key_restrictions(allow_filtering))
|
||||
, _nonprimary_key_restrictions(::make_shared<single_column_restrictions>(schema))
|
||||
, _partition_range_is_simple(true)
|
||||
{ }
|
||||
#if 0
|
||||
static const column_definition*
|
||||
@@ -335,7 +336,7 @@ statement_restrictions::statement_restrictions(database& db,
|
||||
}
|
||||
|
||||
if (!_nonprimary_key_restrictions->empty()) {
|
||||
if (_has_queriable_regular_index) {
|
||||
if (_has_queriable_regular_index && _partition_range_is_simple) {
|
||||
_uses_secondary_indexing = true;
|
||||
} else if (!allow_filtering) {
|
||||
throw exceptions::invalid_request_exception("Cannot execute this query as it might involve data filtering and "
|
||||
@@ -377,6 +378,7 @@ void statement_restrictions::add_single_column_restriction(::shared_ptr<single_c
|
||||
"Only EQ and IN relation are supported on the partition key (unless you use the token() function or allow filtering)");
|
||||
}
|
||||
_partition_key_restrictions = _partition_key_restrictions->merge_to(_schema, restriction);
|
||||
_partition_range_is_simple &= !find(restriction->expression, expr::oper_t::IN);
|
||||
} else if (def.is_clustering_key()) {
|
||||
_clustering_columns_restrictions = _clustering_columns_restrictions->merge_to(_schema, restriction);
|
||||
} else {
|
||||
|
||||
@@ -107,6 +107,8 @@ private:
|
||||
std::optional<expr::expression> _where; ///< The entire WHERE clause.
|
||||
std::vector<expr::expression> _clustering_prefix_restrictions; ///< Parts of _where defining the clustering slice.
|
||||
|
||||
bool _partition_range_is_simple; ///< False iff _partition_range_restrictions imply a Cartesian product.
|
||||
|
||||
public:
|
||||
/**
|
||||
* Creates a new empty <code>StatementRestrictions</code>.
|
||||
|
||||
@@ -46,13 +46,7 @@ uint32_t cql3::statements::authentication_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool cql3::statements::authentication_statement::depends_on_keyspace(
|
||||
const sstring& ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool cql3::statements::authentication_statement::depends_on_column_family(
|
||||
const sstring& cf_name) const {
|
||||
bool cql3::statements::authentication_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -56,9 +56,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -46,13 +46,7 @@ uint32_t cql3::statements::authorization_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool cql3::statements::authorization_statement::depends_on_keyspace(
|
||||
const sstring& ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool cql3::statements::authorization_statement::depends_on_column_family(
|
||||
const sstring& cf_name) const {
|
||||
bool cql3::statements::authorization_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -60,9 +60,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -93,14 +93,9 @@ batch_statement::batch_statement(type type_,
|
||||
{
|
||||
}
|
||||
|
||||
bool batch_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
bool batch_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool batch_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
{
|
||||
return false;
|
||||
return boost::algorithm::any_of(_statements, [&ks_name, &cf_name] (auto&& s) { return s.statement->depends_on(ks_name, cf_name); });
|
||||
}
|
||||
|
||||
uint32_t batch_statement::get_bound_terms() const
|
||||
|
||||
@@ -120,9 +120,7 @@ public:
|
||||
std::unique_ptr<attributes> attrs,
|
||||
cql_stats& stats);
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
|
||||
@@ -540,12 +540,8 @@ modification_statement::validate(service::storage_proxy&, const service::client_
|
||||
}
|
||||
}
|
||||
|
||||
bool modification_statement::depends_on_keyspace(const sstring& ks_name) const {
|
||||
return keyspace() == ks_name;
|
||||
}
|
||||
|
||||
bool modification_statement::depends_on_column_family(const sstring& cf_name) const {
|
||||
return column_family() == cf_name;
|
||||
bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
|
||||
}
|
||||
|
||||
void modification_statement::add_operation(::shared_ptr<operation> op) {
|
||||
|
||||
@@ -173,9 +173,7 @@ public:
|
||||
// Validate before execute, using client state and current schema
|
||||
void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
void add_operation(::shared_ptr<operation> op);
|
||||
|
||||
|
||||
@@ -67,12 +67,7 @@ future<> schema_altering_statement::grant_permissions_to_creator(const service::
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
bool schema_altering_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool schema_altering_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool schema_altering_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -81,9 +81,7 @@ protected:
|
||||
*/
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
|
||||
@@ -190,12 +190,8 @@ void select_statement::validate(service::storage_proxy&, const service::client_s
|
||||
// Nothing to do, all validation has been done by raw_statemet::prepare()
|
||||
}
|
||||
|
||||
bool select_statement::depends_on_keyspace(const sstring& ks_name) const {
|
||||
return keyspace() == ks_name;
|
||||
}
|
||||
|
||||
bool select_statement::depends_on_column_family(const sstring& cf_name) const {
|
||||
return column_family() == cf_name;
|
||||
bool select_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
|
||||
}
|
||||
|
||||
const sstring& select_statement::keyspace() const {
|
||||
@@ -965,6 +961,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
|
||||
}
|
||||
|
||||
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
|
||||
paging_state_copy->set_remaining(internal_paging_size);
|
||||
paging_state_copy->set_partition_key(std::move(index_pk));
|
||||
paging_state_copy->set_clustering_key(std::move(index_ck));
|
||||
return std::move(paging_state_copy);
|
||||
|
||||
@@ -121,8 +121,7 @@ public:
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const;
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor& qp,
|
||||
service::query_state& state, const query_options& options) const override;
|
||||
|
||||
@@ -66,12 +66,7 @@ std::unique_ptr<prepared_statement> truncate_statement::prepare(database& db,cql
|
||||
return std::make_unique<prepared_statement>(::make_shared<truncate_statement>(*this));
|
||||
}
|
||||
|
||||
bool truncate_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool truncate_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool truncate_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -60,9 +60,7 @@ public:
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@
|
||||
#include "types/list.hh"
|
||||
#include "types/user.hh"
|
||||
#include "concrete_types.hh"
|
||||
#include "validation.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -252,6 +253,7 @@ insert_prepared_json_statement::build_partition_keys(const query_options& option
|
||||
exploded.emplace_back(json_value->second);
|
||||
}
|
||||
auto pkey = partition_key::from_optional_exploded(*s, std::move(exploded));
|
||||
validation::validate_cql_key(*s, pkey);
|
||||
auto k = query::range<query::ring_position>::make_singular(dht::decorate_key(*s, std::move(pkey)));
|
||||
ranges.emplace_back(std::move(k));
|
||||
return ranges;
|
||||
|
||||
@@ -73,12 +73,7 @@ std::unique_ptr<prepared_statement> use_statement::prepare(database& db, cql_sta
|
||||
|
||||
}
|
||||
|
||||
bool use_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool use_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool use_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -61,9 +61,7 @@ public:
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
|
||||
|
||||
61
database.cc
61
database.cc
@@ -975,10 +975,9 @@ bool database::update_column_family(schema_ptr new_schema) {
|
||||
return columns_changed;
|
||||
}
|
||||
|
||||
future<> database::remove(const column_family& cf) noexcept {
|
||||
void database::remove(const table& cf) noexcept {
|
||||
auto s = cf.schema();
|
||||
auto& ks = find_keyspace(s->ks_name());
|
||||
_querier_cache.evict_all_for_table(s->id());
|
||||
_column_families.erase(s->id());
|
||||
ks.metadata()->remove_column_family(s);
|
||||
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
|
||||
@@ -989,20 +988,26 @@ future<> database::remove(const column_family& cf) noexcept {
|
||||
// Drop view mutations received after base table drop.
|
||||
}
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func tsf, bool snapshot) {
|
||||
auto& ks = find_keyspace(ks_name);
|
||||
auto uuid = find_uuid(ks_name, cf_name);
|
||||
auto cf = _column_families.at(uuid);
|
||||
co_await remove(*cf);
|
||||
remove(*cf);
|
||||
cf->clear_views();
|
||||
co_return co_await cf->await_pending_ops().then([this, &ks, cf, tsf = std::move(tsf), snapshot] {
|
||||
return truncate(ks, *cf, std::move(tsf), snapshot).finally([this, cf] {
|
||||
return cf->stop();
|
||||
});
|
||||
}).finally([cf] {});
|
||||
co_await cf->await_pending_ops();
|
||||
_querier_cache.evict_all_for_table(cf->schema()->id());
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await truncate(ks, *cf, std::move(tsf), snapshot);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await cf->stop();
|
||||
if (ex) {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
const utils::UUID& database::find_uuid(std::string_view ks, std::string_view cf) const {
|
||||
@@ -1384,44 +1389,6 @@ database::existing_index_names(const sstring& ks_name, const sstring& cf_to_excl
|
||||
return names;
|
||||
}
|
||||
|
||||
// Based on:
|
||||
// - org.apache.cassandra.db.AbstractCell#reconcile()
|
||||
// - org.apache.cassandra.db.BufferExpiringCell#reconcile()
|
||||
// - org.apache.cassandra.db.BufferDeletedCell#reconcile()
|
||||
int
|
||||
compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
|
||||
if (left.timestamp() != right.timestamp()) {
|
||||
return left.timestamp() > right.timestamp() ? 1 : -1;
|
||||
}
|
||||
if (left.is_live() != right.is_live()) {
|
||||
return left.is_live() ? -1 : 1;
|
||||
}
|
||||
if (left.is_live()) {
|
||||
auto c = compare_unsigned(left.value(), right.value());
|
||||
if (c != 0) {
|
||||
return c;
|
||||
}
|
||||
if (left.is_live_and_has_ttl() != right.is_live_and_has_ttl()) {
|
||||
// prefer expiring cells.
|
||||
return left.is_live_and_has_ttl() ? 1 : -1;
|
||||
}
|
||||
if (left.is_live_and_has_ttl() && left.expiry() != right.expiry()) {
|
||||
return left.expiry() < right.expiry() ? -1 : 1;
|
||||
}
|
||||
} else {
|
||||
// Both are deleted
|
||||
if (left.deletion_time() != right.deletion_time()) {
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
< (uint64_t) right.deletion_time().time_since_epoch().count() ? -1 : 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
|
||||
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
|
||||
|
||||
@@ -1367,6 +1367,7 @@ private:
|
||||
Future update_write_metrics(Future&& f);
|
||||
void update_write_metrics_for_timed_out_write();
|
||||
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, bool is_bootstrap, system_keyspace system);
|
||||
void remove(const table&) noexcept;
|
||||
public:
|
||||
static utils::UUID empty_version;
|
||||
|
||||
@@ -1550,7 +1551,6 @@ public:
|
||||
|
||||
bool update_column_family(schema_ptr s);
|
||||
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);
|
||||
future<> remove(const column_family&) noexcept;
|
||||
|
||||
const logalloc::region_group& dirty_memory_region_group() const {
|
||||
return _dirty_memory_manager.region_group();
|
||||
|
||||
@@ -215,6 +215,12 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
future<flush_permit> get_all_flush_permits() {
|
||||
return get_units(_background_work_flush_serializer, _max_background_work).then([this] (auto&& units) {
|
||||
return this->get_flush_permit(std::move(units));
|
||||
});
|
||||
}
|
||||
|
||||
bool has_extraneous_flushes_requested() const {
|
||||
return _extraneous_flushes > 0;
|
||||
}
|
||||
|
||||
50
dist/common/scripts/scylla_io_setup
vendored
50
dist/common/scripts/scylla_io_setup
vendored
@@ -229,6 +229,52 @@ if __name__ == "__main__":
|
||||
disk_properties["read_bandwidth"] = 507338935 * nr_disks
|
||||
disk_properties["write_iops"] = 57100 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 483141731 * nr_disks
|
||||
elif idata.instance_class() in ("c6gd", "m6gd", "r6gd", "x2gd"):
|
||||
if idata.instance_size() == "medium":
|
||||
disk_properties["read_iops"] = 14808
|
||||
disk_properties["read_bandwidth"] = 77869147
|
||||
disk_properties["write_iops"] = 5972
|
||||
disk_properties["write_bandwidth"] = 32820302
|
||||
elif idata.instance_size() == "large":
|
||||
disk_properties["read_iops"] = 29690
|
||||
disk_properties["read_bandwidth"] = 157712240
|
||||
disk_properties["write_iops"] = 12148
|
||||
disk_properties["write_bandwidth"] = 65978069
|
||||
elif idata.instance_size() == "xlarge":
|
||||
disk_properties["read_iops"] = 59688
|
||||
disk_properties["read_bandwidth"] = 318762880
|
||||
disk_properties["write_iops"] = 24449
|
||||
disk_properties["write_bandwidth"] = 133311808
|
||||
elif idata.instance_size() == "2xlarge":
|
||||
disk_properties["read_iops"] = 119353
|
||||
disk_properties["read_bandwidth"] = 634795733
|
||||
disk_properties["write_iops"] = 49069
|
||||
disk_properties["write_bandwidth"] = 266841680
|
||||
elif idata.instance_size() == "4xlarge":
|
||||
disk_properties["read_iops"] = 237196
|
||||
disk_properties["read_bandwidth"] = 1262309504
|
||||
disk_properties["write_iops"] = 98884
|
||||
disk_properties["write_bandwidth"] = 533938080
|
||||
elif idata.instance_size() == "8xlarge":
|
||||
disk_properties["read_iops"] = 442945
|
||||
disk_properties["read_bandwidth"] = 2522688939
|
||||
disk_properties["write_iops"] = 166021
|
||||
disk_properties["write_bandwidth"] = 1063041152
|
||||
elif idata.instance_size() == "12xlarge":
|
||||
disk_properties["read_iops"] = 353691 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 1908192256 * nr_disks
|
||||
disk_properties["write_iops"] = 146732 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 806399360 * nr_disks
|
||||
elif idata.instance_size() == "16xlarge":
|
||||
disk_properties["read_iops"] = 426893 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 2525781589 * nr_disks
|
||||
disk_properties["write_iops"] = 161740 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 1063389952 * nr_disks
|
||||
elif idata.instance_size() == "metal":
|
||||
disk_properties["read_iops"] = 416257 * nr_disks
|
||||
disk_properties["read_bandwidth"] = 2527296683 * nr_disks
|
||||
disk_properties["write_iops"] = 156326 * nr_disks
|
||||
disk_properties["write_bandwidth"] = 1063657088 * nr_disks
|
||||
properties_file = open(etcdir() + "/scylla.d/io_properties.yaml", "w")
|
||||
yaml.dump({ "disks": [ disk_properties ] }, properties_file, default_flow_style=False)
|
||||
ioconf = open(etcdir() + "/scylla.d/io.conf", "w")
|
||||
@@ -254,7 +300,7 @@ if __name__ == "__main__":
|
||||
disk_properties["read_bandwidth"] = 2650 * mbs
|
||||
disk_properties["write_iops"] = 360000
|
||||
disk_properties["write_bandwidth"] = 1400 * mbs
|
||||
elif nr_disks == "16":
|
||||
elif nr_disks == 16:
|
||||
disk_properties["read_iops"] = 1600000
|
||||
disk_properties["read_bandwidth"] = 4521251328
|
||||
#below is google, above is our measured
|
||||
@@ -263,7 +309,7 @@ if __name__ == "__main__":
|
||||
disk_properties["write_bandwidth"] = 2759452672
|
||||
#below is google, above is our measured
|
||||
#disk_properties["write_bandwidth"] = 3120 * mbs
|
||||
elif nr_disks == "24":
|
||||
elif nr_disks == 24:
|
||||
disk_properties["read_iops"] = 2400000
|
||||
disk_properties["read_bandwidth"] = 5921532416
|
||||
#below is google, above is our measured
|
||||
|
||||
11
dist/common/scripts/scylla_raid_setup
vendored
11
dist/common/scripts/scylla_raid_setup
vendored
@@ -115,10 +115,6 @@ if __name__ == '__main__':
|
||||
pkg_install('xfsprogs')
|
||||
if not shutil.which('mdadm'):
|
||||
pkg_install('mdadm')
|
||||
try:
|
||||
md_service = systemd_unit('mdmonitor.service')
|
||||
except SystemdException:
|
||||
md_service = systemd_unit('mdadm.service')
|
||||
|
||||
print('Creating {type} for scylla using {nr_disk} disk(s): {disks}'.format(type='RAID0' if raid else 'XFS volume', nr_disk=len(disks), disks=args.disks))
|
||||
procs=[]
|
||||
@@ -153,15 +149,11 @@ if __name__ == '__main__':
|
||||
os.makedirs(mount_at, exist_ok=True)
|
||||
|
||||
uuid = run(f'blkid -s UUID -o value {fsdev}', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
after = 'local-fs.target'
|
||||
if raid:
|
||||
after += f' {md_service}'
|
||||
unit_data = f'''
|
||||
[Unit]
|
||||
Description=Scylla data directory
|
||||
Before=scylla-server.service
|
||||
After={after}
|
||||
Wants={md_service}
|
||||
After=local-fs.target
|
||||
DefaultDependencies=no
|
||||
|
||||
[Mount]
|
||||
@@ -185,7 +177,6 @@ WantedBy=multi-user.target
|
||||
f.write(f'RequiresMountsFor={mount_at}\n')
|
||||
|
||||
systemd_unit.reload()
|
||||
md_service.start()
|
||||
mount = systemd_unit(mntunit_bn)
|
||||
mount.start()
|
||||
if args.enable_on_nextboot:
|
||||
|
||||
5
dist/common/scripts/scylla_util.py
vendored
5
dist/common/scripts/scylla_util.py
vendored
@@ -147,6 +147,11 @@ class gcp_instance:
|
||||
if af == socket.AF_INET:
|
||||
addr, port = sa
|
||||
if addr == "169.254.169.254":
|
||||
# Make sure it is not on GKE
|
||||
try:
|
||||
gcp_instance().__instance_metadata("machine-type")
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -6,7 +6,7 @@ ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=downloads.scylladb.com/unstable/scylla/branch-4.5/rpm/centos/latest/
|
||||
ARG VERSION=4.5.2
|
||||
ARG VERSION=4.5.7
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -4,3 +4,4 @@ stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
stopwaitsecs=900
|
||||
|
||||
5
dist/docker/redhat/scyllasetup.py
vendored
5
dist/docker/redhat/scyllasetup.py
vendored
@@ -121,12 +121,13 @@ class ScyllaSetup:
|
||||
if self._apiAddress is not None:
|
||||
args += ["--api-address %s" % self._apiAddress]
|
||||
|
||||
if self._alternatorPort is not None:
|
||||
if self._alternatorAddress is not None:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
|
||||
if self._alternatorPort is not None:
|
||||
args += ["--alternator-port %s" % self._alternatorPort]
|
||||
|
||||
if self._alternatorHttpsPort is not None:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-https-port %s" % self._alternatorHttpsPort]
|
||||
|
||||
if self._alternatorWriteIsolation is not None:
|
||||
|
||||
@@ -1448,7 +1448,7 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
logger.trace("marking as alive {}", addr);
|
||||
|
||||
// Do not mark a node with status shutdown as UP.
|
||||
auto status = get_gossip_status(local_state);
|
||||
auto status = sstring(get_gossip_status(local_state));
|
||||
if (status == sstring(versioned_value::SHUTDOWN)) {
|
||||
logger.warn("Skip marking node {} with status = {} as UP", addr, status);
|
||||
return;
|
||||
@@ -1467,6 +1467,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Make a copy for endpoint_state because the code below can yield
|
||||
endpoint_state state = local_state;
|
||||
_live_endpoints.push_back(addr);
|
||||
if (_endpoints_to_talk_with.empty()) {
|
||||
_endpoints_to_talk_with.push_back({addr});
|
||||
@@ -1478,8 +1480,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
logger.info("InetAddress {} is now UP, status = {}", addr, status);
|
||||
}
|
||||
|
||||
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_alive(addr, local_state);
|
||||
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_alive(addr, state);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
@@ -1488,11 +1490,12 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
logger.trace("marking as down {}", addr);
|
||||
local_state.mark_dead();
|
||||
endpoint_state state = local_state;
|
||||
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr)));
|
||||
_unreachable_endpoints[addr] = now();
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(local_state));
|
||||
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_dead(addr, local_state);
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
|
||||
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_dead(addr, state);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
|
||||
33
main.cc
33
main.cc
@@ -388,11 +388,38 @@ static auto defer_verbose_shutdown(const char* what, Func&& func) {
|
||||
startlog.info("Shutting down {}", what);
|
||||
try {
|
||||
func();
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
} catch (...) {
|
||||
startlog.error("Unexpected error shutting down {}: {}", what, std::current_exception());
|
||||
throw;
|
||||
auto ex = std::current_exception();
|
||||
bool do_abort = true;
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (const std::system_error& e) {
|
||||
// System error codes we consider "environmental",
|
||||
// i.e. not scylla's fault, therefore there is no point in
|
||||
// aborting and dumping core.
|
||||
for (int i : {EIO, EACCES, ENOSPC}) {
|
||||
if (e.code() == std::error_code(i, std::system_category())) {
|
||||
do_abort = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
}
|
||||
auto msg = fmt::format("Unexpected error shutting down {}: {}", what, ex);
|
||||
if (do_abort) {
|
||||
startlog.error("{}: aborting", msg);
|
||||
abort();
|
||||
} else {
|
||||
startlog.error("{}: exiting, at {}", msg, current_backtrace());
|
||||
|
||||
// Call _exit() rather than exit() to exit immediately
|
||||
// without calling exit handlers, avoiding
|
||||
// boost::intrusive::detail::destructor_impl assert failure
|
||||
// from ~segment_pool exit handler.
|
||||
_exit(255);
|
||||
}
|
||||
}
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
};
|
||||
|
||||
auto ret = deferred_action(std::move(vfunc));
|
||||
|
||||
@@ -1557,8 +1557,8 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
|
||||
private:
|
||||
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
|
||||
const unsigned _shard;
|
||||
const dht::partition_range* _pr;
|
||||
const query::partition_slice& _ps;
|
||||
dht::partition_range _pr;
|
||||
query::partition_slice _ps;
|
||||
const io_priority_class& _pc;
|
||||
tracing::global_trace_state_ptr _trace_state;
|
||||
const mutation_reader::forwarding _fwd_mr;
|
||||
@@ -1583,7 +1583,7 @@ public:
|
||||
: impl(std::move(schema), std::move(permit))
|
||||
, _lifecycle_policy(std::move(lifecycle_policy))
|
||||
, _shard(shard)
|
||||
, _pr(&pr)
|
||||
, _pr(pr)
|
||||
, _ps(ps)
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
@@ -1667,7 +1667,7 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
});
|
||||
auto s = gs.get();
|
||||
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
|
||||
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), _pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
|
||||
auto f = rreader->fill_buffer(timeout);
|
||||
return f.then([rreader = std::move(rreader)] () mutable {
|
||||
@@ -1721,7 +1721,7 @@ future<> shard_reader::next_partition() {
|
||||
}
|
||||
|
||||
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
_pr = &pr;
|
||||
_pr = pr;
|
||||
|
||||
if (!_reader && !_read_ahead) {
|
||||
// No need to fast-forward uncreated readers, they will be passed the new
|
||||
@@ -1730,12 +1730,12 @@ future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeo
|
||||
}
|
||||
|
||||
auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>();
|
||||
return f.then([this, &pr, timeout] {
|
||||
return f.then([this, timeout] {
|
||||
_end_of_stream = false;
|
||||
clear_buffer();
|
||||
|
||||
return smp::submit_to(_shard, [this, &pr, timeout] {
|
||||
return _reader->fast_forward_to(pr, timeout);
|
||||
return smp::submit_to(_shard, [this, timeout] {
|
||||
return _reader->fast_forward_to(_pr, timeout);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -57,6 +57,8 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
|
||||
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
|
||||
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
|
||||
});
|
||||
}).then([&wr] {
|
||||
wr.consume_end_of_stream();
|
||||
}).then_wrapped([&wr] (future<> f) {
|
||||
if (f.failed()) {
|
||||
auto ex = f.get_exception();
|
||||
@@ -70,7 +72,6 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
|
||||
return make_exception_future<>(std::move(ex));
|
||||
});
|
||||
} else {
|
||||
wr.consume_end_of_stream();
|
||||
return wr.close();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -417,11 +417,11 @@ public:
|
||||
} else {
|
||||
// Copy row from older version because rows in evictable versions must
|
||||
// hold values which are independently complete to be consistent on eviction.
|
||||
auto e = current_allocator().construct<rows_entry>(_schema, *_current_row[0].it);
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, *_current_row[0].it));
|
||||
e->set_continuous(latest_i != rows.end() && latest_i->continuous());
|
||||
_snp.tracker()->insert(*e);
|
||||
rows.insert_before(latest_i, *e);
|
||||
return {*e, true};
|
||||
auto e_i = rows.insert_before(latest_i, std::move(e));
|
||||
return ensure_result{*e_i, true};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -453,11 +453,11 @@ public:
|
||||
}
|
||||
auto&& rows = _snp.version()->partition().clustered_rows();
|
||||
auto latest_i = get_iterator_in_latest_version();
|
||||
auto e = current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
|
||||
is_continuous(latest_i != rows.end() && latest_i->continuous()));
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
|
||||
is_continuous(latest_i != rows.end() && latest_i->continuous())));
|
||||
_snp.tracker()->insert(*e);
|
||||
rows.insert_before(latest_i, *e);
|
||||
return ensure_result{*e, true};
|
||||
auto e_i = rows.insert_before(latest_i, std::move(e));
|
||||
return ensure_result{*e_i, true};
|
||||
}
|
||||
|
||||
// Brings the entry pointed to by the cursor to the front of the LRU
|
||||
|
||||
@@ -267,9 +267,14 @@ public:
|
||||
return _current_tombstone;
|
||||
}
|
||||
|
||||
const std::deque<range_tombstone>& range_tombstones_for_row(const clustering_key_prefix& ck) {
|
||||
std::vector<range_tombstone> range_tombstones_for_row(const clustering_key_prefix& ck) {
|
||||
drop_unneeded_tombstones(ck);
|
||||
return _range_tombstones;
|
||||
std::vector<range_tombstone> result(_range_tombstones.begin(), _range_tombstones.end());
|
||||
auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) {
|
||||
return _cmp(rt1.start_bound(), rt2.start_bound());
|
||||
};
|
||||
std::sort(result.begin(), result.end(), cmp);
|
||||
return result;
|
||||
}
|
||||
|
||||
std::deque<range_tombstone> range_tombstones() && {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 70ea9312a1...fd0d7c1c9a
@@ -89,7 +89,7 @@ template<typename Input>
|
||||
size_type read_frame_size(Input& in) {
|
||||
auto sz = deserialize(in, boost::type<size_type>());
|
||||
if (sz < sizeof(size_type)) {
|
||||
throw std::runtime_error("Truncated frame");
|
||||
throw std::runtime_error(fmt::format("IDL frame truncated: expected to have at least {} bytes, got {}", sizeof(size_type), sz));
|
||||
}
|
||||
return sz - sizeof(size_type);
|
||||
}
|
||||
|
||||
@@ -2623,7 +2623,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
auto& table = _db.local().find_column_family(table_id);
|
||||
auto s = table.schema();
|
||||
const auto cf_id = s->id();
|
||||
const auto reason = streaming::stream_reason::rebuild;
|
||||
const auto reason = streaming::stream_reason::repair;
|
||||
auto& rs = _db.local().find_keyspace(ks_name).get_replication_strategy();
|
||||
|
||||
size_t nr_sst_total = sstables.size();
|
||||
@@ -3348,7 +3348,7 @@ shared_ptr<node_ops_info> node_ops_meta_data::get_ops_info() {
|
||||
|
||||
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3358,7 +3358,7 @@ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
|
||||
|
||||
void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3369,7 +3369,7 @@ void storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
|
||||
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
|
||||
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
|
||||
@@ -124,7 +124,7 @@ void sstable_writer_k_l::maybe_flush_pi_block(file_writer& out,
|
||||
// block includes them), but we set block_next_start_offset after - so
|
||||
// even if we wrote a lot of open tombstones, we still get a full
|
||||
// block size of new data.
|
||||
auto& rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
|
||||
auto rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
|
||||
clustering_key_prefix::from_range(clustering_key.values()));
|
||||
for (const auto& rt : rts) {
|
||||
auto start = composite::from_clustering_element(*_pi_write.schemap, rt.start);
|
||||
|
||||
@@ -78,7 +78,11 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(colu
|
||||
}
|
||||
|
||||
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
|
||||
if (removed.empty() || added.empty()) {
|
||||
// All the update here is only relevant for regular compaction's round-robin picking policy, and if
|
||||
// last_compacted_keys wasn't generated by regular, it means regular is disabled since last restart,
|
||||
// therefore we can skip the updates here until regular runs for the first time. Once it runs,
|
||||
// it will be able to generate last_compacted_keys correctly by looking at metadata of files.
|
||||
if (removed.empty() || added.empty() || !_last_compacted_keys) {
|
||||
return;
|
||||
}
|
||||
auto min_level = std::numeric_limits<uint32_t>::max();
|
||||
|
||||
3
table.cc
3
table.cc
@@ -1473,12 +1473,13 @@ bool table::can_flush() const {
|
||||
}
|
||||
|
||||
future<> table::clear() {
|
||||
auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits();
|
||||
if (_commitlog) {
|
||||
_commitlog->discard_completed_segments(_schema->id());
|
||||
}
|
||||
_memtables->clear();
|
||||
_memtables->add_memtable();
|
||||
return _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
||||
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
||||
}
|
||||
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
|
||||
@@ -85,3 +85,20 @@ def test_signature_too_futuristic(dynamodb, test_table):
|
||||
response = requests.post(url, headers=headers, verify=False)
|
||||
assert not response.ok
|
||||
assert "InvalidSignatureException" in response.text and "Signature not yet current" in response.text
|
||||
|
||||
# A test that commas can be used instead of whitespace to separate components
|
||||
# of the Authorization headers - reproducing issue #9568.
|
||||
def test_authorization_no_whitespace(dynamodb, test_table):
|
||||
# Unlike the above tests which checked error cases so didn't need to
|
||||
# calculate a real signature, in this test we really need a correct signature,
|
||||
# so we use a function we already have in test_manual_requests.py.
|
||||
from test_manual_requests import get_signed_request
|
||||
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
|
||||
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||
# Boto3 separates the components of the Authorization header by spaces.
|
||||
# Let's remove all of them except the first one (which separates the
|
||||
# signature algorithm name from the rest) and check the result still works:
|
||||
a = req.headers['Authorization'].split()
|
||||
req.headers['Authorization'] = a[0] + ' ' + ''.join(a[1:])
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert response.ok
|
||||
|
||||
@@ -186,3 +186,25 @@ def test_incorrect_numbers(dynamodb, test_table):
|
||||
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert "ValidationException" in response.text and "numeric" in response.text
|
||||
|
||||
# Although the DynamoDB API responses are JSON, additional conventions apply
|
||||
# to these responses - such as how error codes are encoded in JSON. For this
|
||||
# reason, DynamoDB uses the content type 'application/x-amz-json-1.0' instead
|
||||
# of the standard 'application/json'. This test verifies that we return the
|
||||
# correct content type header.
|
||||
# While most DynamoDB libraries we tried do not care about an unexpected
|
||||
# content-type, it turns out that one (aiodynamo) does. Moreover, AWS already
|
||||
# defined x-amz-json-1.1 - see
|
||||
# https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
|
||||
# which differs (only) in how it encodes error replies.
|
||||
# So in the future it may become even more important that Scylla return the
|
||||
# correct content type.
|
||||
def test_content_type(dynamodb, test_table):
|
||||
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
|
||||
# Note that get_signed_request() uses x-amz-json-1.0 to encode the
|
||||
# *request*. In the future this may or may not affect the content type
|
||||
# in the response (today, DynamoDB doesn't allow any other content type
|
||||
# in the request anyway).
|
||||
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert response.headers['Content-Type'] == 'application/x-amz-json-1.0'
|
||||
|
||||
113
test/alternator/test_metrics.py
Normal file
113
test/alternator/test_metrics.py
Normal file
@@ -0,0 +1,113 @@
|
||||
# Copyright 2021-present ScyllaDB
|
||||
#
|
||||
# This file is part of Scylla.
|
||||
#
|
||||
# Scylla is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Scylla is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
##############################################################################
|
||||
# Tests for Scylla's metrics (see docs/design-notes/metrics.md) for Alternator
|
||||
# queries. Reproduces issue #9406, where although metrics were implemented for
|
||||
# Alternator requests, they were missing for some operations (BatchGetItem).
|
||||
# In the tests here we attempt to ensure that the metrics continue to work
|
||||
# for the relevant operations as the code evolves.
|
||||
#
|
||||
# Note that all tests in this file test Scylla-specific features, and are
|
||||
# "skipped" when not running against Scylla, or when unable to retrieve
|
||||
# metrics through out-of-band HTTP requests to Scylla's Prometheus port (9180).
|
||||
#
|
||||
# IMPORTANT: we do not want these tests to assume that they are not running in
|
||||
# parallel with any other tests or workload - because such an assumption
|
||||
# would limit our test deployment options in the future. NOT making this
|
||||
# assumption means that these tests can't check that a certain operation
|
||||
# increases a certain counter by exactly 1 - because other concurrent
|
||||
# operations might increase it further! So our test can only check that the
|
||||
# counter increases.
|
||||
##############################################################################
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
import re
|
||||
|
||||
from util import random_string
|
||||
|
||||
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
|
||||
# are not available on AWS (of course), but may also not be available for
|
||||
# Scylla if for some reason we have only access to the Alternator protocol
|
||||
# port but no access to the metrics port (9180).
|
||||
# If metrics are *not* available, tests using this fixture will be skipped.
|
||||
# Tests using this fixture may call get_metrics(metrics).
|
||||
@pytest.fixture(scope="module")
|
||||
def metrics(dynamodb):
|
||||
if dynamodb.meta.client._endpoint.host.endswith('.amazonaws.com'):
|
||||
pytest.skip('Scylla-only feature not supported by AWS')
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
# The Prometheus API is on port 9180, and always http, not https.
|
||||
url = re.sub(r':[0-9]+(/|$)', ':9180', url)
|
||||
url = re.sub(r'^https:', 'http:', url)
|
||||
url = url + '/metrics'
|
||||
resp = requests.get(url)
|
||||
if resp.status_code != 200:
|
||||
pytest.skip('Metrics port 9180 is not available')
|
||||
yield url
|
||||
|
||||
# Utility function for fetching all metrics from Scylla, using an HTTP request
|
||||
# to port 9180. The response format is defined by the Prometheus protocol.
|
||||
# Only use get_metrics() in a test using the metrics fixture.
|
||||
def get_metrics(metrics):
|
||||
response = requests.get(metrics)
|
||||
assert response.status_code == 200
|
||||
return response.text
|
||||
|
||||
# Utility function for fetching a metric with a given name and optionally a
|
||||
# given sub-metric label (which should be a name-value map). If multiple
|
||||
# matches are found, they are summed - this is useful for summing up the
|
||||
# counts from multiple shards.
|
||||
def get_metric(metrics, name, requested_labels=None):
|
||||
total = 0.0
|
||||
lines = re.compile('^'+name+'{.*$', re.MULTILINE)
|
||||
for match in re.findall(lines, get_metrics(metrics)):
|
||||
a = match.split()
|
||||
metric = a[0]
|
||||
val = float(a[1])
|
||||
# Check if match also matches the requested labels
|
||||
if requested_labels:
|
||||
# we know metric begins with name{ and ends with } - the labels
|
||||
# are what we have between those
|
||||
got_labels = metric[len(name)+1:-1].split(',')
|
||||
# Check that every one of the requested labels is in got_labels:
|
||||
for k, v in requested_labels.items():
|
||||
if not f'{k}="{v}"' in got_labels:
|
||||
# No match for requested label, skip this metric (python
|
||||
# doesn't have "continue 2" so let's just set val to 0...)
|
||||
val = 0
|
||||
break
|
||||
total += float(val)
|
||||
return total
|
||||
|
||||
def test_batch_write_item(test_table_s, metrics):
|
||||
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
|
||||
test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
|
||||
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
|
||||
assert n2 > n1
|
||||
|
||||
# Reproduces issue #9406:
|
||||
def test_batch_get_item(test_table_s, metrics):
|
||||
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
|
||||
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
|
||||
assert n2 > n1
|
||||
|
||||
# TODO: check the rest of the operations
|
||||
@@ -1014,6 +1014,20 @@ def test_nested_attribute_remove_from_missing_item(test_table_s):
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x[0]')
|
||||
|
||||
# Though in an above test (test_nested_attribute_update_bad_path_dot) we
|
||||
# showed that DynamoDB does not allow REMOVE x.y if attribute x doesn't
|
||||
# exist - and generates a ValidationException, if x *does* exist but y
|
||||
# doesn't, it's fine and the removal should just be silently ignored.
|
||||
def test_nested_attribute_remove_missing_leaf(test_table_s):
|
||||
p = random_string()
|
||||
item = {'p': p, 'a': {'x': 3}, 'b': ['hi']}
|
||||
test_table_s.put_item(Item=item)
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE a.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE b[7]')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE c')
|
||||
# The above UpdateItem calls didn't change anything...
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == item
|
||||
|
||||
# Similarly for other types of bad paths - using [0] on something which
|
||||
# doesn't exist or isn't an array.
|
||||
def test_nested_attribute_update_bad_path_array(test_table_s):
|
||||
|
||||
@@ -191,3 +191,32 @@ BOOST_AUTO_TEST_CASE(tests_reserve_partial) {
|
||||
BOOST_REQUIRE_EQUAL(v.capacity(), orig_size);
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
|
||||
// the last reserved chunk.
|
||||
BOOST_AUTO_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
|
||||
using vector_type = utils::chunked_vector<std::unique_ptr<uint64_t>>;
|
||||
vector_type v;
|
||||
|
||||
// Fill two chunks
|
||||
v.reserve(vector_type::max_chunk_capacity() * 3 / 2);
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 3 / 2; ++i) {
|
||||
v.emplace_back(std::make_unique<uint64_t>(i));
|
||||
}
|
||||
|
||||
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
|
||||
v.shrink_to_fit();
|
||||
|
||||
// Leave the last chunk reserved but empty
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity(); ++i) {
|
||||
v.pop_back();
|
||||
}
|
||||
|
||||
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
|
||||
// with _size not in the last chunk. Should not sigsegv.
|
||||
v.reserve(vector_type::max_chunk_capacity() * 4);
|
||||
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 2; ++i) {
|
||||
v.emplace_back(std::make_unique<uint64_t>(i));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -618,3 +618,38 @@ SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) {
|
||||
}
|
||||
}, std::move(cfg)).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
|
||||
auto& db = e.local_db();
|
||||
const auto ts = db_clock::now();
|
||||
auto& tbl = db.find_column_family("ks", "cf");
|
||||
|
||||
auto op = std::optional(tbl.read_in_progress());
|
||||
auto s = tbl.schema();
|
||||
auto q = query::data_querier(
|
||||
tbl.as_mutation_source(),
|
||||
tbl.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
default_priority_class(),
|
||||
nullptr);
|
||||
|
||||
auto f = e.db().invoke_on_all([ts] (database& db) {
|
||||
return db.drop_column_family("ks", "cf", [ts] { return make_ready_future<db_clock::time_point>(ts); });
|
||||
});
|
||||
|
||||
// we add a querier to the querier cache while the drop is ongoing
|
||||
auto& qc = db.get_querier_cache();
|
||||
qc.insert(utils::make_random_uuid(), std::move(q), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 1);
|
||||
|
||||
op.reset(); // this should allow the drop to finish
|
||||
f.get();
|
||||
|
||||
// the drop should have cleaned up all entries belonging to that table
|
||||
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
@@ -48,3 +50,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -197,9 +197,9 @@ SEASTAR_TEST_CASE(test_loading_shared_values_parallel_loading_explicit_eviction)
|
||||
}).get();
|
||||
|
||||
int rand_key = rand_int(num_loaders);
|
||||
BOOST_REQUIRE(shared_values.find(rand_key) != shared_values.end());
|
||||
BOOST_REQUIRE(shared_values.find(rand_key) != nullptr);
|
||||
anchors_vec[rand_key] = nullptr;
|
||||
BOOST_REQUIRE_MESSAGE(shared_values.find(rand_key) == shared_values.end(), format("explicit removal for key {} failed", rand_key));
|
||||
BOOST_REQUIRE_MESSAGE(shared_values.find(rand_key) == nullptr, format("explicit removal for key {} failed", rand_key));
|
||||
anchors_vec.clear();
|
||||
});
|
||||
}
|
||||
@@ -236,29 +236,10 @@ SEASTAR_THREAD_TEST_CASE(test_loading_cache_removing_key) {
|
||||
|
||||
loading_cache.get_ptr(0, loader).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(load_count, 1);
|
||||
BOOST_REQUIRE(loading_cache.find(0) != loading_cache.end());
|
||||
BOOST_REQUIRE(loading_cache.find(0) != nullptr);
|
||||
|
||||
loading_cache.remove(0);
|
||||
BOOST_REQUIRE(loading_cache.find(0) == loading_cache.end());
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_loading_cache_removing_iterator) {
|
||||
using namespace std::chrono;
|
||||
load_count = 0;
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
prepare().get();
|
||||
|
||||
loading_cache.get_ptr(0, loader).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(load_count, 1);
|
||||
|
||||
auto it = loading_cache.find(0);
|
||||
|
||||
BOOST_REQUIRE(it != loading_cache.end());
|
||||
|
||||
loading_cache.remove(it);
|
||||
BOOST_REQUIRE(loading_cache.find(0) == loading_cache.end());
|
||||
BOOST_REQUIRE(loading_cache.find(0) == nullptr);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_loading_different_keys) {
|
||||
@@ -292,10 +273,69 @@ SEASTAR_TEST_CASE(test_loading_cache_loading_expiry_eviction) {
|
||||
|
||||
loading_cache.get_ptr(0, loader).discard_result().get();
|
||||
|
||||
BOOST_REQUIRE(loading_cache.find(0) != loading_cache.end());
|
||||
BOOST_REQUIRE(loading_cache.find(0) != nullptr);
|
||||
|
||||
sleep(20ms).get();
|
||||
REQUIRE_EVENTUALLY_EQUAL(loading_cache.find(0), loading_cache.end());
|
||||
REQUIRE_EVENTUALLY_EQUAL(loading_cache.find(0), nullptr);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_loading_expiry_reset_on_sync_op) {
|
||||
return seastar::async([] {
|
||||
using namespace std::chrono;
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 30ms, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
prepare().get();
|
||||
|
||||
loading_cache.get_ptr(0, loader).discard_result().get();
|
||||
auto vp = loading_cache.find(0);
|
||||
auto load_time = steady_clock::now();
|
||||
|
||||
// Check that the expiration timer is reset every time we call a find()
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
// seastar::lowres_clock has 10ms resolution. This means that we should use 10ms threshold to compensate.
|
||||
if (steady_clock::now() <= load_time + 20ms) {
|
||||
BOOST_REQUIRE(vp != nullptr);
|
||||
} else {
|
||||
// If there was a delay and we weren't able to execute the next loop iteration during 20ms let's repopulate
|
||||
// the cache.
|
||||
loading_cache.get_ptr(0, loader).discard_result().get();
|
||||
BOOST_TEST_MESSAGE("Test " << i << " was skipped. Repopulating...");
|
||||
}
|
||||
vp = loading_cache.find(0);
|
||||
load_time = steady_clock::now();
|
||||
sleep(10ms).get();
|
||||
}
|
||||
|
||||
sleep(30ms).get();
|
||||
REQUIRE_EVENTUALLY_EQUAL(loading_cache.size(), 0);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_move_item_to_mru_list_front_on_sync_op) {
|
||||
return seastar::async([] {
|
||||
using namespace std::chrono;
|
||||
utils::loading_cache<int, sstring> loading_cache(2, 1h, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
prepare().get();
|
||||
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
loading_cache.get_ptr(i, loader).discard_result().get();
|
||||
}
|
||||
|
||||
auto vp0 = loading_cache.find(0);
|
||||
BOOST_REQUIRE(vp0 != nullptr);
|
||||
|
||||
loading_cache.get_ptr(2, loader).discard_result().get();
|
||||
|
||||
// "0" should be at the beginning of the list and "1" right after it before we try to add a new entry to the
|
||||
// cache ("2"). And hence "1" should get evicted.
|
||||
vp0 = loading_cache.find(0);
|
||||
auto vp1 = loading_cache.find(1);
|
||||
BOOST_REQUIRE(vp0 != nullptr);
|
||||
BOOST_REQUIRE(vp1 == nullptr);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -351,3 +391,87 @@ SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_loading_cache_remove_leaves_no_old_entries_behind) {
|
||||
using namespace std::chrono;
|
||||
load_count = 0;
|
||||
|
||||
auto load_v1 = [] (auto key) { return make_ready_future<sstring>("v1"); };
|
||||
auto load_v2 = [] (auto key) { return make_ready_future<sstring>("v2"); };
|
||||
auto load_v3 = [] (auto key) { return make_ready_future<sstring>("v3"); };
|
||||
|
||||
{
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
//
|
||||
// Test remove() concurrent with loading
|
||||
//
|
||||
|
||||
auto f = loading_cache.get_ptr(0, [&](auto key) {
|
||||
return later().then([&] {
|
||||
return load_v1(key);
|
||||
});
|
||||
});
|
||||
|
||||
loading_cache.remove(0);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
auto ptr1 = f.get0();
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
|
||||
loading_cache.remove(0);
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
|
||||
|
||||
//
|
||||
// Test that live ptr1, removed from cache, does not prevent reload of new value
|
||||
//
|
||||
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
|
||||
ptr1 = nullptr;
|
||||
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
|
||||
}
|
||||
|
||||
// Test remove_if()
|
||||
{
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
//
|
||||
// Test remove_if() concurrent with loading
|
||||
//
|
||||
auto f = loading_cache.get_ptr(0, [&](auto key) {
|
||||
return later().then([&] {
|
||||
return load_v1(key);
|
||||
});
|
||||
});
|
||||
|
||||
loading_cache.remove_if([] (auto&& v) { return v == "v1"; });
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
auto ptr1 = f.get0();
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
|
||||
loading_cache.remove_if([] (auto&& v) { return v == "v2"; });
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
|
||||
|
||||
//
|
||||
// Test that live ptr1, removed from cache, does not prevent reload of new value
|
||||
//
|
||||
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
|
||||
ptr1 = nullptr;
|
||||
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
|
||||
ptr2 = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -687,6 +687,7 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
};
|
||||
|
||||
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
|
||||
testlog.trace("Expected {} == {}", c1, c2);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
|
||||
};
|
||||
@@ -708,9 +709,11 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2));
|
||||
|
||||
// Origin doesn't compare ttl (is it wise?)
|
||||
assert_equal(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2));
|
||||
// But we do. See https://github.com/scylladb/scylla/issues/10156
|
||||
// and https://github.com/scylladb/scylla/issues/10173
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
|
||||
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value1")),
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "test/lib/mutation_source_test.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "test/lib/mutation_assertions.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
|
||||
using namespace sstables;
|
||||
using namespace std::chrono_literals;
|
||||
@@ -62,3 +64,69 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Regression test for scylladb/scylla-enterprise#2016
|
||||
SEASTAR_THREAD_TEST_CASE(test_produces_range_tombstone) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", int32_type, column_kind::regular_column)
|
||||
.build();
|
||||
|
||||
mutation m(s, partition_key::from_single_value(*s, int32_type->decompose(0)));
|
||||
m.partition().apply_row_tombstone(*s, range_tombstone{
|
||||
clustering_key::from_exploded(*s, {int32_type->decompose(6)}), bound_kind::excl_start,
|
||||
clustering_key::from_exploded(*s, {int32_type->decompose(10)}), bound_kind::incl_end,
|
||||
tombstone(0, gc_clock::time_point())
|
||||
});
|
||||
|
||||
{
|
||||
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(6)});
|
||||
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
|
||||
row.marker() = row_marker(4);
|
||||
}
|
||||
{
|
||||
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(8)});
|
||||
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
|
||||
row.apply(tombstone(2, gc_clock::time_point()));
|
||||
row.marker() = row_marker(5);
|
||||
}
|
||||
|
||||
testlog.info("m: {}", m);
|
||||
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make(
|
||||
{clustering_key::from_exploded(*s, {int32_type->decompose(8)}), false},
|
||||
{clustering_key::from_exploded(*s, {int32_type->decompose(10)}), true}
|
||||
))
|
||||
.build();
|
||||
|
||||
auto pr = dht::partition_range::make_singular(m.decorated_key());
|
||||
|
||||
std::vector<tmpdir> dirs;
|
||||
dirs.emplace_back();
|
||||
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
|
||||
storage_service_for_tests ssft;
|
||||
auto version = sstable_version_types::la;
|
||||
auto index_block_size = 1;
|
||||
sstable_writer_config cfg = env.manager().configure_writer();
|
||||
cfg.promoted_index_block_size = index_block_size;
|
||||
|
||||
auto source = make_sstable_mutation_source(env, s, dirs.back().path().string(), {m}, cfg, version, gc_clock::now());
|
||||
|
||||
{
|
||||
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
|
||||
while (auto mf = rd(db::no_timeout).get0()) {
|
||||
testlog.info("produced {}", mutation_fragment::printer(*s, *mf));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
|
||||
mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
|
||||
BOOST_REQUIRE(bool(sliced_m));
|
||||
|
||||
assert_that(*sliced_m).is_equal_to(m, slice.row_ranges(*m.schema(), m.key()));
|
||||
}
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -86,3 +86,16 @@ def test_filtering_contiguous_nonmatching_single_partition(cql, test_keyspace):
|
||||
# Scylla won't count its size as part of the 1MB limit, and will not
|
||||
# return empty pages - the first page will contain the result.
|
||||
assert list(cql.execute(f"SELECT c, s FROM {table} WHERE p=1 AND v={count-1} ALLOW FILTERING")) == [(count-1, long)]
|
||||
|
||||
# Test that the fact that a column is indexed does not cause us to fetch
|
||||
# incorrect results from a filtering query (issue #10300).
|
||||
def test_index_with_in_relation(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int, c int, v boolean, primary key (p,c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"create index on {table}(v)")
|
||||
for p, c, v in [(0,0,True),(0,1,False),(0,2,True),(0,3,False),
|
||||
(1,0,True),(1,1,False),(1,2,True),(1,3,False),
|
||||
(2,0,True),(2,1,False),(2,2,True),(2,3,False)]:
|
||||
cql.execute(f"insert into {table} (p,c,v) values ({p}, {c}, {v})")
|
||||
res = cql.execute(f"select * from {table} where p in (0,1) and v = False ALLOW FILTERING")
|
||||
assert set(res) == set([(0,1,False),(0,3,False),(1,1,False), (1,3,False)])
|
||||
|
||||
@@ -64,8 +64,9 @@ def test_insert_null_key(cql, table1):
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
cql.execute(stmt, [None, s])
|
||||
|
||||
# Tests handling of "key_column in ?" where ? is bound to null.
|
||||
# Reproduces issue #8265.
|
||||
def test_primary_key_in_null(cql, table1):
|
||||
'''Tests handling of "key_column in ?" where ? is bound to null.'''
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
cql.execute(cql.prepare(f"SELECT p FROM {table1} WHERE p IN ?"), [None])
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
@@ -80,3 +81,98 @@ def test_regular_column_in_null(scylla_only, cql, table1):
|
||||
cql.execute(f"INSERT INTO {table1} (p,c) VALUES ('p', 'c')")
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
cql.execute(cql.prepare(f"SELECT v FROM {table1} WHERE v IN ? ALLOW FILTERING"), [None])
|
||||
|
||||
# Though nonsensical, this operation is allowed by Cassandra. Ensure we allow it, too.
|
||||
def test_delete_impossible_clustering_range(cql, table1):
|
||||
cql.execute(f"DELETE FROM {table1} WHERE p='p' and c<'a' and c>'a'")
|
||||
|
||||
@pytest.mark.xfail(reason="issue #7852")
|
||||
def test_delete_null_key(cql, table1):
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
cql.execute(f"DELETE FROM {table1} WHERE p=null")
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
cql.execute(cql.prepare(f"DELETE FROM {table1} WHERE p=?"), [None])
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
cql.execute(f"DELETE FROM {table1} WHERE p='p' AND c=null")
|
||||
with pytest.raises(InvalidRequest, match='null value'):
|
||||
cql.execute(cql.prepare(f"DELETE FROM {table1} WHERE p='p' AND c=?"), [None])
|
||||
|
||||
# Test what SELECT does with the restriction "WHERE v=NULL".
|
||||
# In SQL, "WHERE v=NULL" doesn't match anything - because nothing is equal
|
||||
# to null - not even null. SQL also provides a more useful restriction
|
||||
# "WHERE v IS NULL" which matches all rows where v is unset.
|
||||
# Scylla and Cassandra do *not* support the "IS NULL" syntax yet (they do
|
||||
# have "IS NOT NULL" but only in a definition of a materialized view),
|
||||
# so it is commonly requested that "WHERE v=NULL" should do what "IS NULL"
|
||||
# is supposed to do - see issues #4776 and #8489 for Scylla and
|
||||
# CASSANDRA-10715 for Cassandra, where this feature was requested.
|
||||
# Nevertheless, in Scylla we decided to follow SQL: "WHERE v=NULL" should
|
||||
# match nothing, not even rows where v is unset. This is what the following
|
||||
# test verifies.
|
||||
# This test fails on Cassandra (hence cassandra_bug) because Cassandra
|
||||
# refuses the "WHERE v=NULL" relation, rather than matching nothing.
|
||||
# We consider this a mistake, and not something we want to emulate in Scylla.
|
||||
def test_filtering_eq_null(cassandra_bug, cql, table1):
|
||||
p = random_string()
|
||||
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('{p}', '1', 'hello')")
|
||||
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('{p}', '2', '')")
|
||||
cql.execute(f"INSERT INTO {table1} (p,c) VALUES ('{p}', '3')")
|
||||
# As explained above, none of the above-inserted rows should match -
|
||||
# not even the one with an unset v:
|
||||
assert list(cql.execute(f"SELECT c FROM {table1} WHERE p='{p}' AND v=NULL ALLOW FILTERING")) == []
|
||||
|
||||
# In test_insert_null_key() above we verified that a null value is not
|
||||
# allowed as a key column - neither as a partition key nor clustering key.
|
||||
# An *empty string*, in contrast, is NOT a null. So ideally it should have been
|
||||
# allowed as a key. However, for undocumented reasons (having to do with how
|
||||
# partition keys are serialized in sstables), an empty string is NOT allowed
|
||||
# as a partition key. It is allowed as a clustering key, though. In the
|
||||
# following test we confirm those things.
|
||||
# See issue #9352.
|
||||
def test_insert_empty_string_key(cql, table1):
|
||||
s = random_string()
|
||||
# An empty-string clustering key *is* allowed:
|
||||
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('{s}', '', 'cat')")
|
||||
assert list(cql.execute(f"SELECT v FROM {table1} WHERE p='{s}' AND c=''")) == [('cat',)]
|
||||
# But an empty-string partition key is *not* allowed, with a specific
|
||||
# error that a "Key may not be empty":
|
||||
with pytest.raises(InvalidRequest, match='Key may not be empty'):
|
||||
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('', '{s}', 'dog')")
|
||||
|
||||
# test_update_empty_string_key() is the same as test_insert_empty_string_key()
|
||||
# just uses an UPDATE instead of INSERT. It turns out that exactly the cases
|
||||
# which are allowed by INSERT are also allowed by UPDATE.
|
||||
def test_update_empty_string_key(cql, table1):
|
||||
s = random_string()
|
||||
# An empty-string clustering key *is* allowed:
|
||||
cql.execute(f"UPDATE {table1} SET v = 'cat' WHERE p='{s}' AND c=''")
|
||||
assert list(cql.execute(f"SELECT v FROM {table1} WHERE p='{s}' AND c=''")) == [('cat',)]
|
||||
# But an empty-string partition key is *not* allowed, with a specific
|
||||
# error that a "Key may not be empty":
|
||||
with pytest.raises(InvalidRequest, match='Key may not be empty'):
|
||||
cql.execute(f"UPDATE {table1} SET v = 'dog' WHERE p='' AND c='{s}'")
|
||||
|
||||
# ... and same for DELETE
|
||||
def test_delete_empty_string_key(cql, table1):
|
||||
s = random_string()
|
||||
# An empty-string clustering key *is* allowed:
|
||||
cql.execute(f"DELETE FROM {table1} WHERE p='{s}' AND c=''")
|
||||
# But an empty-string partition key is *not* allowed, with a specific
|
||||
# error that a "Key may not be empty":
|
||||
with pytest.raises(InvalidRequest, match='Key may not be empty'):
|
||||
cql.execute(f"DELETE FROM {table1} WHERE p='' AND c='{s}'")
|
||||
|
||||
# Another test like test_insert_empty_string_key() just using an INSERT JSON
|
||||
# instead of a regular INSERT. Because INSERT JSON takes a different code path
|
||||
# from regular INSERT, we need the emptiness test in yet another place.
|
||||
# Reproduces issue #9853 (the empty-string partition key was allowed, and
|
||||
# actually inserted into the table.)
|
||||
def test_insert_json_empty_string_key(cql, table1):
|
||||
s = random_string()
|
||||
# An empty-string clustering key *is* allowed:
|
||||
cql.execute("""INSERT INTO %s JSON '{"p": "%s", "c": "", "v": "cat"}'""" % (table1, s))
|
||||
assert list(cql.execute(f"SELECT v FROM {table1} WHERE p='{s}' AND c=''")) == [('cat',)]
|
||||
# But an empty-string partition key is *not* allowed, with a specific
|
||||
# error that a "Key may not be empty":
|
||||
with pytest.raises(InvalidRequest, match='Key may not be empty'):
|
||||
cql.execute("""INSERT INTO %s JSON '{"p": "", "c": "%s", "v": "cat"}'""" % (table1, s))
|
||||
|
||||
Submodule tools/java updated: dbcea78e7d...42151ec974
@@ -548,7 +548,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
break;
|
||||
case auth_state::AUTHENTICATION:
|
||||
// Support both SASL auth from protocol v2 and the older style Credentials auth from v1
|
||||
assert(cqlop == cql_binary_opcode::AUTH_RESPONSE || cqlop == cql_binary_opcode::CREDENTIALS);
|
||||
if (cqlop != cql_binary_opcode::AUTH_RESPONSE && cqlop != cql_binary_opcode::CREDENTIALS) {
|
||||
throw exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop)));
|
||||
}
|
||||
if (res_op == cql_binary_opcode::READY || res_op == cql_binary_opcode::AUTH_SUCCESS) {
|
||||
client_state.set_auth_state(auth_state::READY);
|
||||
}
|
||||
@@ -793,7 +795,7 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
|
||||
if (flags & cql_frame_flags::compression) {
|
||||
if (_compression == cql_compression::lz4) {
|
||||
if (length < 4) {
|
||||
throw std::runtime_error("Truncated frame");
|
||||
throw std::runtime_error(fmt::format("CQL frame truncated: expected to have at least 4 bytes, got {}", length));
|
||||
}
|
||||
return _buffer_reader.read_exactly(_read_buf, length).then([this] (fragmented_temporary_buffer buf) {
|
||||
auto linearization_buffer = bytes_ostream();
|
||||
@@ -1294,7 +1296,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_read_timeout_error(stream, err, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
||||
return make_read_timeout_error(stream, exceptions::exception_code::READ_TIMEOUT, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
||||
}
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -1322,7 +1324,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_writ
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_mutation_write_timeout_error(stream, err, std::move(msg), cl, received, blockfor, type, tr_state);
|
||||
return make_mutation_write_timeout_error(stream, exceptions::exception_code::WRITE_TIMEOUT, std::move(msg), cl, received, blockfor, type, tr_state);
|
||||
}
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
|
||||
@@ -52,10 +52,11 @@ class chunked_vector {
|
||||
utils::small_vector<chunk_ptr, 1> _chunks;
|
||||
size_t _size = 0;
|
||||
size_t _capacity = 0;
|
||||
private:
|
||||
public:
|
||||
static size_t max_chunk_capacity() {
|
||||
return std::max(max_contiguous_allocation / sizeof(T), size_t(1));
|
||||
}
|
||||
private:
|
||||
void reserve_for_push_back() {
|
||||
if (_size == _capacity) {
|
||||
do_reserve_for_push_back();
|
||||
@@ -387,7 +388,9 @@ chunked_vector<T, max_contiguous_allocation>::make_room(size_t n, bool stop_afte
|
||||
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
|
||||
// FIXME: realloc? maybe not worth the complication; only works for PODs
|
||||
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
|
||||
if (_size > _capacity - last_chunk_capacity) {
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
|
||||
}
|
||||
_chunks.back() = std::move(new_last_chunk);
|
||||
_capacity += capacity_increase;
|
||||
}
|
||||
|
||||
@@ -278,6 +278,13 @@ decltype(auto) with_simplified(const View& v, Function&& fn)
|
||||
}
|
||||
}
|
||||
|
||||
template<FragmentedView View>
|
||||
void skip_empty_fragments(View& v) {
|
||||
while (!v.empty() && v.current_fragment().empty()) {
|
||||
v.remove_current();
|
||||
}
|
||||
}
|
||||
|
||||
template<FragmentedView V1, FragmentedView V2>
|
||||
int compare_unsigned(V1 v1, V2 v2) {
|
||||
while (!v1.empty() && !v2.empty()) {
|
||||
@@ -287,6 +294,8 @@ int compare_unsigned(V1 v1, V2 v2) {
|
||||
}
|
||||
v1.remove_prefix(n);
|
||||
v2.remove_prefix(n);
|
||||
skip_empty_fragments(v1);
|
||||
skip_empty_fragments(v2);
|
||||
}
|
||||
return v1.size_bytes() - v2.size_bytes();
|
||||
}
|
||||
@@ -306,6 +315,8 @@ void write_fragmented(Dest& dest, Src src) {
|
||||
memcpy(dest.current_fragment().data(), src.current_fragment().data(), n);
|
||||
dest.remove_prefix(n);
|
||||
src.remove_prefix(n);
|
||||
skip_empty_fragments(dest);
|
||||
skip_empty_fragments(src);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -319,6 +330,8 @@ void copy_fragmented_view(Dest dest, Src src) {
|
||||
memcpy(dest.current_fragment().data(), src.current_fragment().data(), n);
|
||||
dest.remove_prefix(n);
|
||||
src.remove_prefix(n);
|
||||
skip_empty_fragments(dest);
|
||||
skip_empty_fragments(src);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -111,9 +111,10 @@ public:
|
||||
|
||||
private:
|
||||
void touch() noexcept {
|
||||
assert(_lru_entry_ptr);
|
||||
_last_read = loading_cache_clock_type::now();
|
||||
_lru_entry_ptr->touch();
|
||||
if (_lru_entry_ptr) {
|
||||
_lru_entry_ptr->touch();
|
||||
}
|
||||
}
|
||||
|
||||
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
|
||||
@@ -142,10 +143,21 @@ private:
|
||||
timestamped_val_ptr _ts_val_ptr;
|
||||
|
||||
public:
|
||||
value_ptr(timestamped_val_ptr ts_val_ptr) : _ts_val_ptr(std::move(ts_val_ptr)) { _ts_val_ptr->touch(); }
|
||||
value_ptr(timestamped_val_ptr ts_val_ptr) : _ts_val_ptr(std::move(ts_val_ptr)) {
|
||||
if (_ts_val_ptr) {
|
||||
_ts_val_ptr->touch();
|
||||
}
|
||||
}
|
||||
value_ptr(std::nullptr_t) noexcept : _ts_val_ptr() {}
|
||||
bool operator==(const value_ptr& x) const { return _ts_val_ptr == x._ts_val_ptr; }
|
||||
bool operator!=(const value_ptr& x) const { return !operator==(x); }
|
||||
explicit operator bool() const noexcept { return bool(_ts_val_ptr); }
|
||||
value_type& operator*() const noexcept { return _ts_val_ptr->value(); }
|
||||
value_type* operator->() const noexcept { return &_ts_val_ptr->value(); }
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const value_ptr& vp) {
|
||||
return os << vp._ts_val_ptr;
|
||||
}
|
||||
};
|
||||
|
||||
/// \brief This is an LRU list entry which is also an anchor for a loading_cache value.
|
||||
@@ -258,14 +270,8 @@ private:
|
||||
using loading_values_type = typename ts_value_type::loading_values_type;
|
||||
using timestamped_val_ptr = typename loading_values_type::entry_ptr;
|
||||
using ts_value_lru_entry = typename ts_value_type::lru_entry;
|
||||
using set_iterator = typename loading_values_type::iterator;
|
||||
using lru_list_type = typename ts_value_lru_entry::lru_list_type;
|
||||
using list_iterator = typename lru_list_type::iterator;
|
||||
struct value_extractor_fn {
|
||||
Tp& operator()(ts_value_lru_entry& le) const {
|
||||
return le.timestamped_value().value();
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
using value_type = Tp;
|
||||
@@ -273,7 +279,6 @@ public:
|
||||
using value_ptr = typename ts_value_type::value_ptr;
|
||||
|
||||
class entry_is_too_big : public std::exception {};
|
||||
using iterator = boost::transform_iterator<value_extractor_fn, list_iterator>;
|
||||
|
||||
private:
|
||||
loading_cache(size_t max_size, std::chrono::milliseconds expiry, std::chrono::milliseconds refresh, logging::logger& logger)
|
||||
@@ -338,7 +343,7 @@ public:
|
||||
});
|
||||
}).then([this, k] (timestamped_val_ptr ts_val_ptr) {
|
||||
// check again since it could have already been inserted and initialized
|
||||
if (!ts_val_ptr->ready()) {
|
||||
if (!ts_val_ptr->ready() && !ts_val_ptr.orphaned()) {
|
||||
_logger.trace("{}: storing the value for the first time", k);
|
||||
|
||||
if (ts_val_ptr->size() > _max_size) {
|
||||
@@ -383,23 +388,31 @@ public:
|
||||
return _timer_reads_gate.close().finally([this] { _timer.cancel(); });
|
||||
}
|
||||
|
||||
/// Find a value for a specific Key value and touch() it.
|
||||
/// \tparam KeyType Key type
|
||||
/// \tparam KeyHasher Hash functor type
|
||||
/// \tparam KeyEqual Equality functor type
|
||||
///
|
||||
/// \param key Key value to look for
|
||||
/// \param key_hasher_func Hash functor
|
||||
/// \param key_equal_func Equality functor
|
||||
/// \return cache_value_ptr object pointing to the found value or nullptr otherwise.
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
iterator find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
return boost::make_transform_iterator(to_list_iterator(set_find(key, std::move(key_hasher_func), std::move(key_equal_func))), _value_extractor_fn);
|
||||
value_ptr find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
// cache_value_ptr constructor is going to update a "last read" timestamp of the corresponding object and move
|
||||
// the object to the front of the LRU
|
||||
return set_find(key, std::move(key_hasher_func), std::move(key_equal_func));
|
||||
};
|
||||
|
||||
iterator find(const Key& k) noexcept {
|
||||
return boost::make_transform_iterator(to_list_iterator(set_find(k)), _value_extractor_fn);
|
||||
}
|
||||
|
||||
iterator end() {
|
||||
return boost::make_transform_iterator(list_end(), _value_extractor_fn);
|
||||
}
|
||||
|
||||
iterator begin() {
|
||||
return boost::make_transform_iterator(list_begin(), _value_extractor_fn);
|
||||
value_ptr find(const Key& k) noexcept {
|
||||
return set_find(k);
|
||||
}
|
||||
|
||||
// Removes all values matching a given predicate and values which are currently loading.
|
||||
// Guarantees that no values which match the predicate and whose loading was initiated
|
||||
// before this call will be present after this call (or appear at any time later).
|
||||
// The predicate may be invoked multiple times on the same value.
|
||||
// It must return the same result for a given value (it must be a pure function).
|
||||
template <typename Pred>
|
||||
void remove_if(Pred&& pred) {
|
||||
static_assert(std::is_same<bool, std::result_of_t<Pred(const value_type&)>>::value, "Bad Pred signature");
|
||||
@@ -409,24 +422,29 @@ public:
|
||||
}, [this] (ts_value_lru_entry* p) {
|
||||
loading_cache::destroy_ts_value(p);
|
||||
});
|
||||
_loading_values.remove_if([&pred] (const ts_value_type& v) {
|
||||
return pred(v.value());
|
||||
});
|
||||
}
|
||||
|
||||
// Removes a given key from the cache.
|
||||
// The key is removed immediately.
|
||||
// After this, get_ptr() is guaranteed to reload the value before returning it.
|
||||
// As a consequence of the above, if there is a concurrent get_ptr() in progress with this,
|
||||
// its value will not populate the cache. It will still succeed.
|
||||
void remove(const Key& k) {
|
||||
auto it = set_find(k);
|
||||
if (it == set_end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
_lru_list.erase_and_dispose(_lru_list.iterator_to(*it->lru_entry_ptr()), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
|
||||
remove_ts_value(set_find(k));
|
||||
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
|
||||
_loading_values.remove(k);
|
||||
}
|
||||
|
||||
void remove(iterator it) {
|
||||
if (it == end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const ts_value_type& val = ts_value_type::container_of(*it);
|
||||
_lru_list.erase_and_dispose(_lru_list.iterator_to(*val.lru_entry_ptr()), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
|
||||
// Removes a given key from the cache.
|
||||
// Same guarantees as with remove(key).
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
remove_ts_value(set_find(key, key_hasher_func, key_equal_func));
|
||||
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
|
||||
_loading_values.remove(key, key_hasher_func, key_equal_func);
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
@@ -439,49 +457,29 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
/// Should only be called on values for which the following holds: set_it == set_end() || set_it->ready()
|
||||
/// For instance this always holds for iterators returned by set_find(...).
|
||||
list_iterator to_list_iterator(set_iterator set_it) {
|
||||
if (set_it != set_end()) {
|
||||
return _lru_list.iterator_to(*set_it->lru_entry_ptr());
|
||||
void remove_ts_value(timestamped_val_ptr ts_ptr) {
|
||||
if (!ts_ptr) {
|
||||
return;
|
||||
}
|
||||
return list_end();
|
||||
_lru_list.erase_and_dispose(_lru_list.iterator_to(*ts_ptr->lru_entry_ptr()), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
|
||||
}
|
||||
|
||||
set_iterator ready_entry_iterator(set_iterator it) {
|
||||
set_iterator end_it = set_end();
|
||||
|
||||
if (it == end_it || !it->ready()) {
|
||||
return end_it;
|
||||
timestamped_val_ptr ready_entry_ptr(timestamped_val_ptr tv_ptr) {
|
||||
if (!tv_ptr || !tv_ptr->ready()) {
|
||||
return nullptr;
|
||||
}
|
||||
return it;
|
||||
return std::move(tv_ptr);
|
||||
}
|
||||
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
set_iterator set_find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
return ready_entry_iterator(_loading_values.find(key, std::move(key_hasher_func), std::move(key_equal_func)));
|
||||
timestamped_val_ptr set_find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
return ready_entry_ptr(_loading_values.find(key, std::move(key_hasher_func), std::move(key_equal_func)));
|
||||
}
|
||||
|
||||
// keep the default non-templated overloads to ease on the compiler for specifications
|
||||
// that do not require the templated find().
|
||||
set_iterator set_find(const Key& key) noexcept {
|
||||
return ready_entry_iterator(_loading_values.find(key));
|
||||
}
|
||||
|
||||
set_iterator set_end() noexcept {
|
||||
return _loading_values.end();
|
||||
}
|
||||
|
||||
set_iterator set_begin() noexcept {
|
||||
return _loading_values.begin();
|
||||
}
|
||||
|
||||
list_iterator list_end() noexcept {
|
||||
return _lru_list.end();
|
||||
}
|
||||
|
||||
list_iterator list_begin() noexcept {
|
||||
return _lru_list.begin();
|
||||
timestamped_val_ptr set_find(const Key& key) noexcept {
|
||||
return ready_entry_ptr(_loading_values.find(key));
|
||||
}
|
||||
|
||||
bool caching_enabled() const {
|
||||
@@ -613,7 +611,6 @@ private:
|
||||
std::function<future<Tp>(const Key&)> _load;
|
||||
timer<loading_cache_clock_type> _timer;
|
||||
seastar::gate _timer_reads_gate;
|
||||
value_extractor_fn _value_extractor_fn;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/bitops.hh>
|
||||
#include <boost/intrusive/unordered_set.hpp>
|
||||
#include <boost/iterator/transform_iterator.hpp>
|
||||
#include <boost/lambda/bind.hpp>
|
||||
#include "seastarx.hh"
|
||||
|
||||
@@ -98,6 +97,10 @@ private:
|
||||
_val.emplace(std::move(new_val));
|
||||
}
|
||||
|
||||
bool orphaned() const {
|
||||
return !is_linked();
|
||||
}
|
||||
|
||||
shared_promise<>& loaded() {
|
||||
return _loaded;
|
||||
}
|
||||
@@ -110,7 +113,9 @@ private:
|
||||
: _parent(parent), _key(std::move(k)) {}
|
||||
|
||||
~entry() {
|
||||
_parent._set.erase(_parent._set.iterator_to(*this));
|
||||
if (is_linked()) {
|
||||
_parent._set.erase(_parent._set.iterator_to(*this));
|
||||
}
|
||||
Stats::inc_evictions();
|
||||
}
|
||||
|
||||
@@ -137,16 +142,8 @@ private:
|
||||
using set_type = bi::unordered_set<entry, bi::power_2_buckets<true>, bi::compare_hash<true>>;
|
||||
using bi_set_bucket_traits = typename set_type::bucket_traits;
|
||||
using set_iterator = typename set_type::iterator;
|
||||
struct value_extractor_fn {
|
||||
value_type& operator()(entry& e) const {
|
||||
return e.value();
|
||||
}
|
||||
};
|
||||
enum class shrinking_is_allowed { no, yes };
|
||||
|
||||
public:
|
||||
using iterator = boost::transform_iterator<value_extractor_fn, set_iterator>;
|
||||
|
||||
public:
|
||||
// Pointer to entry value
|
||||
class entry_ptr {
|
||||
@@ -154,12 +151,15 @@ public:
|
||||
public:
|
||||
using element_type = value_type;
|
||||
entry_ptr() = default;
|
||||
entry_ptr(std::nullptr_t) noexcept : _e() {};
|
||||
explicit entry_ptr(lw_shared_ptr<entry> e) : _e(std::move(e)) {}
|
||||
entry_ptr& operator=(std::nullptr_t) noexcept {
|
||||
_e = nullptr;
|
||||
return *this;
|
||||
}
|
||||
explicit operator bool() const noexcept { return bool(_e); }
|
||||
bool operator==(const entry_ptr& x) const { return _e == x._e; }
|
||||
bool operator!=(const entry_ptr& x) const { return !operator==(x); }
|
||||
element_type& operator*() const noexcept { return _e->value(); }
|
||||
element_type* operator->() const noexcept { return &_e->value(); }
|
||||
|
||||
@@ -173,13 +173,27 @@ public:
|
||||
return res;
|
||||
}
|
||||
|
||||
// Returns the key this entry is associated with.
|
||||
// Valid if bool(*this).
|
||||
const key_type& key() const {
|
||||
return _e->key();
|
||||
}
|
||||
|
||||
// Returns true iff the entry is not linked in the set.
|
||||
// Call only when bool(*this).
|
||||
bool orphaned() const {
|
||||
return _e->orphaned();
|
||||
}
|
||||
|
||||
friend class loading_shared_values;
|
||||
friend std::ostream& operator<<(std::ostream& os, const entry_ptr& ep) {
|
||||
return os << ep._e.get();
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
std::vector<typename set_type::bucket_type> _buckets;
|
||||
set_type _set;
|
||||
value_extractor_fn _value_extractor_fn;
|
||||
|
||||
public:
|
||||
static const key_type& to_key(const entry_ptr& e_ptr) noexcept {
|
||||
@@ -274,26 +288,62 @@ public:
|
||||
return _set.size();
|
||||
}
|
||||
|
||||
iterator end() {
|
||||
return boost::make_transform_iterator(_set.end(), _value_extractor_fn);
|
||||
}
|
||||
|
||||
iterator begin() {
|
||||
return boost::make_transform_iterator(_set.begin(), _value_extractor_fn);
|
||||
}
|
||||
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
iterator find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
entry_ptr find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
|
||||
if (it == _set.end() || !it->ready()) {
|
||||
return end();
|
||||
return entry_ptr();
|
||||
}
|
||||
return boost::make_transform_iterator(it, _value_extractor_fn);
|
||||
return entry_ptr(it->shared_from_this());
|
||||
};
|
||||
|
||||
// Removes a given key from this container.
|
||||
// If a given key is currently loading, the loading will succeed and will return entry_ptr
|
||||
// to the caller, but the value will not be present in the container. It will be removed
|
||||
// when the last entry_ptr dies, as usual.
|
||||
//
|
||||
// Post-condition: !find(key)
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) {
|
||||
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
|
||||
if (it != _set.end()) {
|
||||
_set.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
// Removes a given key from this container.
|
||||
// If a given key is currently loading, the loading will succeed and will return entry_ptr
|
||||
// to the caller, but the value will not be present in the container. It will be removed
|
||||
// when the last entry_ptr dies, as usual.
|
||||
//
|
||||
// Post-condition: !find(key)
|
||||
template<typename KeyType>
|
||||
void remove(const KeyType& key) {
|
||||
remove(key, Hash(), EqualPred());
|
||||
}
|
||||
|
||||
// Removes all values which match a given predicate or are currently loading.
|
||||
// Guarantees that no values which match the predicate and whose loading was initiated
|
||||
// before this call will be present after this call (or appear at any time later).
|
||||
// Same effects as if remove(e.key()) was called on each matching entry.
|
||||
template<typename Pred>
|
||||
requires std::is_invocable_r_v<bool, Pred, const Tp&>
|
||||
void remove_if(const Pred& pred) {
|
||||
auto it = _set.begin();
|
||||
while (it != _set.end()) {
|
||||
if (!it->ready() || pred(it->value())) {
|
||||
auto next = std::next(it);
|
||||
_set.erase(it);
|
||||
it = next;
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// keep the default non-templated overloads to ease on the compiler for specifications
|
||||
// that do not require the templated find().
|
||||
iterator find(const key_type& key) noexcept {
|
||||
entry_ptr find(const key_type& key) noexcept {
|
||||
return find(key, Hash(), EqualPred());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user