Compare commits
49 Commits
next-5.2
...
scylla-5.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9da666e778 | ||
|
|
aca355dec1 | ||
|
|
efbb2efd3f | ||
|
|
44dc5c4a1d | ||
|
|
6b34ba3a4f | ||
|
|
f1e25cb4a6 | ||
|
|
c9798746ae | ||
|
|
7f70ffc5ce | ||
|
|
551636ec89 | ||
|
|
e1130a01e7 | ||
|
|
b0233cb7c5 | ||
|
|
e480c5bf4d | ||
|
|
7d90f7e93f | ||
|
|
3e6e8579c6 | ||
|
|
3e98e17d18 | ||
|
|
a214f8cf6e | ||
|
|
e8b92fe34d | ||
|
|
fa479c84ac | ||
|
|
40c26dd2c5 | ||
|
|
2c6f069fd1 | ||
|
|
e27dff0c50 | ||
|
|
3f03260ffb | ||
|
|
1315135fca | ||
|
|
f92622e0de | ||
|
|
3bca608db5 | ||
|
|
a93b72d5dd | ||
|
|
d58ca2edbd | ||
|
|
75740ace2a | ||
|
|
d7a1bf6331 | ||
|
|
bbd7d657cc | ||
|
|
f5bf4c81d1 | ||
|
|
02e8336659 | ||
|
|
601812e11b | ||
|
|
ea466320d2 | ||
|
|
25ea831a15 | ||
|
|
8648c79c9e | ||
|
|
7ae4d0e6f8 | ||
|
|
f3564db941 | ||
|
|
97caf12836 | ||
|
|
839d9ef41a | ||
|
|
782bd50f92 | ||
|
|
0a4d971b4a | ||
|
|
22562f767f | ||
|
|
eb80dd1db5 | ||
|
|
51d699ee21 | ||
|
|
83a33bff8c | ||
|
|
273563b9ad | ||
|
|
891990ec09 | ||
|
|
da0cd2b107 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.0.dev
|
||||
VERSION=5.0.rc5
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2577,8 +2577,8 @@ static bool hierarchy_actions(
|
||||
// attr member so we can use add()
|
||||
rjson::add_with_string_name(v, attr, std::move(*newv));
|
||||
} else {
|
||||
throw api_error::validation(format("Can't remove document path {} - not present in item",
|
||||
subh.get_value()._path));
|
||||
// Removing a.b when a is a map but a.b doesn't exist
|
||||
// is silently ignored. It's not considered an error.
|
||||
}
|
||||
} else {
|
||||
throw api_error::validation(format("UpdateExpression: document paths not valid for this item:{}", h));
|
||||
|
||||
@@ -87,19 +87,24 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
|
||||
// prefer expiring cells.
|
||||
return left.is_live_and_has_ttl() ? std::strong_ordering::greater : std::strong_ordering::less;
|
||||
}
|
||||
if (left.is_live_and_has_ttl() && left.expiry() != right.expiry()) {
|
||||
return left.expiry() <=> right.expiry();
|
||||
if (left.is_live_and_has_ttl()) {
|
||||
if (left.expiry() != right.expiry()) {
|
||||
return left.expiry() <=> right.expiry();
|
||||
} else {
|
||||
// prefer the cell that was written later,
|
||||
// so it survives longer after it expires, until purged.
|
||||
return right.ttl() <=> left.ttl();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Both are deleted
|
||||
if (left.deletion_time() != right.deletion_time()) {
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
|
||||
}
|
||||
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
|
||||
}
|
||||
return std::strong_ordering::equal;
|
||||
}
|
||||
|
||||
20
cdc/log.cc
20
cdc/log.cc
@@ -59,7 +59,7 @@ using namespace std::chrono_literals;
|
||||
logging::logger cdc_log("cdc");
|
||||
|
||||
namespace cdc {
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {}, schema_ptr = nullptr);
|
||||
}
|
||||
|
||||
static constexpr auto cdc_group_name = "cdc";
|
||||
@@ -206,7 +206,7 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt, log_schema);
|
||||
|
||||
auto log_mut = log_schema
|
||||
? db::schema_tables::make_update_table_mutations(db, keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
|
||||
@@ -484,7 +484,7 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
|
||||
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
|
||||
}
|
||||
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid, schema_ptr old) {
|
||||
schema_builder b(s.ks_name(), log_name(s.cf_name()));
|
||||
b.with_partitioner("com.scylladb.dht.CDCPartitioner");
|
||||
b.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
@@ -571,6 +571,20 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
b.set_uuid(*uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* #10473 - if we are redefining the log table, we need to ensure any dropped
|
||||
* columns are registered in "dropped_columns" table, otherwise clients will not
|
||||
* be able to read data older than now.
|
||||
*/
|
||||
if (old) {
|
||||
// not super efficient, but we don't do this often.
|
||||
for (auto& col : old->all_columns()) {
|
||||
if (!b.has_column({col.name(), col.name_as_text() })) {
|
||||
b.without_column(col.name_as_text(), col.type, api::new_timestamp());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return b.build();
|
||||
}
|
||||
|
||||
|
||||
@@ -353,32 +353,50 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t)
|
||||
: _cm(cm)
|
||||
, _table(t)
|
||||
, _compaction_state(cm.get_compaction_state(_table))
|
||||
, _holder(_compaction_state.gate.hold())
|
||||
{
|
||||
_compaction_state.compaction_disabled_counter++;
|
||||
cmlog.debug("Temporarily disabled compaction for {}.{}. compaction_disabled_counter={}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), _compaction_state.compaction_disabled_counter);
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept
|
||||
: _cm(o._cm)
|
||||
, _table(std::exchange(o._table, nullptr))
|
||||
, _compaction_state(o._compaction_state)
|
||||
, _holder(std::move(o._holder))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_reenabler::~compaction_reenabler() {
|
||||
// submit compaction request if we're the last holder of the gate which is still opened.
|
||||
if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) {
|
||||
cmlog.debug("Reenabling compaction for {}.{}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name());
|
||||
try {
|
||||
_cm.submit(_table);
|
||||
} catch (...) {
|
||||
cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<compaction_manager::compaction_reenabler>
|
||||
compaction_manager::stop_and_disable_compaction(replica::table* t) {
|
||||
compaction_reenabler cre(*this, t);
|
||||
co_await stop_ongoing_compactions("user-triggered operation", t);
|
||||
co_return cre;
|
||||
}
|
||||
|
||||
future<>
|
||||
compaction_manager::run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func) {
|
||||
auto& c_state = _compaction_state[t];
|
||||
auto holder = c_state.gate.hold();
|
||||
compaction_reenabler cre = co_await stop_and_disable_compaction(t);
|
||||
|
||||
c_state.compaction_disabled_counter++;
|
||||
|
||||
std::exception_ptr err;
|
||||
try {
|
||||
co_await stop_ongoing_compactions("user-triggered operation", t);
|
||||
co_await func();
|
||||
} catch (...) {
|
||||
err = std::current_exception();
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
assert(_compaction_state.contains(t));
|
||||
#endif
|
||||
// submit compaction request if we're the last holder of the gate which is still opened.
|
||||
if (--c_state.compaction_disabled_counter == 0 && !c_state.gate.is_closed()) {
|
||||
submit(t);
|
||||
}
|
||||
if (err) {
|
||||
std::rethrow_exception(err);
|
||||
}
|
||||
co_return;
|
||||
co_await func();
|
||||
}
|
||||
|
||||
void compaction_manager::task::setup_new_compaction() {
|
||||
@@ -810,7 +828,8 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
auto run_identifier = sst->run_identifier();
|
||||
auto sstable_set_snapshot = can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), _maintenance_sg.io,
|
||||
// FIXME: this compaction should run with maintenance priority.
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
|
||||
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
||||
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
@@ -819,8 +838,9 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
};
|
||||
|
||||
auto maintenance_permit = co_await seastar::get_units(_maintenance_ops_sem, 1);
|
||||
// Take write lock for table to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard.
|
||||
auto write_lock_holder = co_await _compaction_state[&t].lock.hold_write_lock();
|
||||
// FIXME: acquiring the read lock is not needed after acquiring the _maintenance_ops_sem
|
||||
// only major compaction needs to acquire the write lock to synchronize with regular compaction.
|
||||
auto lock_holder = co_await _compaction_state[&t].lock.hold_read_lock();
|
||||
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
@@ -852,7 +872,7 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
};
|
||||
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
completed = co_await with_scheduling_group(_maintenance_sg.cpu, std::ref(perform_rewrite));
|
||||
completed = co_await with_scheduling_group(_compaction_controller.sg(), std::ref(perform_rewrite));
|
||||
} while (!completed);
|
||||
};
|
||||
|
||||
|
||||
@@ -269,6 +269,31 @@ public:
|
||||
// parameter job is a function that will carry the operation
|
||||
future<> run_custom_job(replica::table* t, sstables::compaction_type type, noncopyable_function<future<>(sstables::compaction_data&)> job);
|
||||
|
||||
class compaction_reenabler {
|
||||
compaction_manager& _cm;
|
||||
replica::table* _table;
|
||||
compaction_state& _compaction_state;
|
||||
gate::holder _holder;
|
||||
|
||||
public:
|
||||
compaction_reenabler(compaction_manager&, replica::table*);
|
||||
compaction_reenabler(compaction_reenabler&&) noexcept;
|
||||
|
||||
~compaction_reenabler();
|
||||
|
||||
replica::table* compacting_table() const noexcept {
|
||||
return _table;
|
||||
}
|
||||
|
||||
const compaction_state& compaction_state() const noexcept {
|
||||
return _compaction_state;
|
||||
}
|
||||
};
|
||||
|
||||
// Disable compaction temporarily for a table t.
|
||||
// Caller should call the compaction_reenabler::reenable
|
||||
future<compaction_reenabler> stop_and_disable_compaction(replica::table* t);
|
||||
|
||||
// Run a function with compaction temporarily disabled for a table T.
|
||||
future<> run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func);
|
||||
|
||||
|
||||
@@ -69,7 +69,11 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(tabl
|
||||
}
|
||||
|
||||
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
|
||||
if (removed.empty() || added.empty()) {
|
||||
// All the update here is only relevant for regular compaction's round-robin picking policy, and if
|
||||
// last_compacted_keys wasn't generated by regular, it means regular is disabled since last restart,
|
||||
// therefore we can skip the updates here until regular runs for the first time. Once it runs,
|
||||
// it will be able to generate last_compacted_keys correctly by looking at metadata of files.
|
||||
if (removed.empty() || added.empty() || !_last_compacted_keys) {
|
||||
return;
|
||||
}
|
||||
auto min_level = std::numeric_limits<uint32_t>::max();
|
||||
|
||||
@@ -217,6 +217,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
|
||||
auto compaction_time = gc_clock::now();
|
||||
|
||||
if (candidates.empty()) {
|
||||
_estimated_remaining_tasks = 0;
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
|
||||
@@ -81,9 +81,7 @@ public:
|
||||
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0;
|
||||
|
||||
virtual bool depends_on_keyspace(const seastar::sstring& ks_name) const = 0;
|
||||
|
||||
virtual bool depends_on_column_family(const seastar::sstring& cf_name) const = 0;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;
|
||||
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const = 0;
|
||||
|
||||
|
||||
@@ -103,7 +103,13 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
|
||||
if (!col_type->is_map()) {
|
||||
throw exceptions::invalid_request_exception(format("subscripting non-map column {}", cdef->name_as_text()));
|
||||
}
|
||||
const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[data.sel.index_of(*cdef)]));
|
||||
int32_t index = data.sel.index_of(*cdef);
|
||||
if (index == -1) {
|
||||
throw std::runtime_error(
|
||||
format("Column definition {} does not match any column in the query selection",
|
||||
cdef->name_as_text()));
|
||||
}
|
||||
const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[index]));
|
||||
const auto& data_map = value_cast<map_type_impl::native_type>(deserialized);
|
||||
const auto key = evaluate(*col.sub, options);
|
||||
auto&& key_type = col_type->name_comparator();
|
||||
@@ -121,8 +127,16 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
|
||||
case column_kind::clustering_key:
|
||||
return managed_bytes(data.clustering_key[cdef->id]);
|
||||
case column_kind::static_column:
|
||||
case column_kind::regular_column:
|
||||
return managed_bytes_opt(data.other_columns[data.sel.index_of(*cdef)]);
|
||||
[[fallthrough]];
|
||||
case column_kind::regular_column: {
|
||||
int32_t index = data.sel.index_of(*cdef);
|
||||
if (index == -1) {
|
||||
throw std::runtime_error(
|
||||
format("Column definition {} does not match any column in the query selection",
|
||||
cdef->name_as_text()));
|
||||
}
|
||||
return managed_bytes_opt(data.other_columns[index]);
|
||||
}
|
||||
default:
|
||||
throw exceptions::unsupported_operation_exception("Unknown column kind");
|
||||
}
|
||||
|
||||
@@ -953,7 +953,7 @@ bool query_processor::migration_subscriber::should_invalidate(
|
||||
sstring ks_name,
|
||||
std::optional<sstring> cf_name,
|
||||
::shared_ptr<cql_statement> statement) {
|
||||
return statement->depends_on_keyspace(ks_name) && (!cf_name || statement->depends_on_column_family(*cf_name));
|
||||
return statement->depends_on(ks_name, cf_name);
|
||||
}
|
||||
|
||||
future<> query_processor::query_internal(
|
||||
|
||||
@@ -514,7 +514,7 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
|
||||
}
|
||||
|
||||
if (!_nonprimary_key_restrictions->empty()) {
|
||||
if (_has_queriable_regular_index) {
|
||||
if (_has_queriable_regular_index && _partition_range_is_simple) {
|
||||
_uses_secondary_indexing = true;
|
||||
} else if (!allow_filtering) {
|
||||
throw exceptions::invalid_request_exception("Cannot execute this query as it might involve data filtering and "
|
||||
|
||||
@@ -165,7 +165,7 @@ public:
|
||||
|
||||
template<typename RowComparator>
|
||||
void sort(const RowComparator& cmp) {
|
||||
std::sort(_rows.begin(), _rows.end(), std::ref(cmp));
|
||||
std::sort(_rows.begin(), _rows.end(), cmp);
|
||||
}
|
||||
|
||||
metadata& get_metadata();
|
||||
|
||||
@@ -18,13 +18,7 @@ uint32_t cql3::statements::authentication_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool cql3::statements::authentication_statement::depends_on_keyspace(
|
||||
const sstring& ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool cql3::statements::authentication_statement::depends_on_column_family(
|
||||
const sstring& cf_name) const {
|
||||
bool cql3::statements::authentication_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -27,9 +27,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -20,13 +20,7 @@ uint32_t cql3::statements::authorization_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool cql3::statements::authorization_statement::depends_on_keyspace(
|
||||
const sstring& ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool cql3::statements::authorization_statement::depends_on_column_family(
|
||||
const sstring& cf_name) const {
|
||||
bool cql3::statements::authorization_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -31,9 +31,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -70,14 +70,9 @@ batch_statement::batch_statement(type type_,
|
||||
{
|
||||
}
|
||||
|
||||
bool batch_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
bool batch_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool batch_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
{
|
||||
return false;
|
||||
return boost::algorithm::any_of(_statements, [&ks_name, &cf_name] (auto&& s) { return s.statement->depends_on(ks_name, cf_name); });
|
||||
}
|
||||
|
||||
uint32_t batch_statement::get_bound_terms() const
|
||||
|
||||
@@ -88,9 +88,7 @@ public:
|
||||
std::unique_ptr<attributes> attrs,
|
||||
cql_stats& stats);
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
|
||||
@@ -539,12 +539,8 @@ modification_statement::validate(query_processor&, const service::client_state&
|
||||
}
|
||||
}
|
||||
|
||||
bool modification_statement::depends_on_keyspace(const sstring& ks_name) const {
|
||||
return keyspace() == ks_name;
|
||||
}
|
||||
|
||||
bool modification_statement::depends_on_column_family(const sstring& cf_name) const {
|
||||
return column_family() == cf_name;
|
||||
bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
|
||||
}
|
||||
|
||||
void modification_statement::add_operation(::shared_ptr<operation> op) {
|
||||
|
||||
@@ -137,9 +137,7 @@ public:
|
||||
// Validate before execute, using client state and current schema
|
||||
void validate(query_processor&, const service::client_state& state) const override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
void add_operation(::shared_ptr<operation> op);
|
||||
|
||||
|
||||
@@ -45,12 +45,7 @@ future<> schema_altering_statement::grant_permissions_to_creator(const service::
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
bool schema_altering_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool schema_altering_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool schema_altering_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -53,9 +53,7 @@ protected:
|
||||
*/
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
|
||||
@@ -167,12 +167,8 @@ void select_statement::validate(query_processor&, const service::client_state& s
|
||||
// Nothing to do, all validation has been done by raw_statemet::prepare()
|
||||
}
|
||||
|
||||
bool select_statement::depends_on_keyspace(const sstring& ks_name) const {
|
||||
return keyspace() == ks_name;
|
||||
}
|
||||
|
||||
bool select_statement::depends_on_column_family(const sstring& cf_name) const {
|
||||
return column_family() == cf_name;
|
||||
bool select_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
|
||||
}
|
||||
|
||||
const sstring& select_statement::keyspace() const {
|
||||
|
||||
@@ -100,8 +100,7 @@ public:
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
virtual void validate(query_processor&, const service::client_state& state) const override;
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor& qp,
|
||||
service::query_state& state, const query_options& options) const override;
|
||||
|
||||
@@ -17,13 +17,7 @@ uint32_t service_level_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool service_level_statement::depends_on_keyspace(
|
||||
const sstring &ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool service_level_statement::depends_on_column_family(
|
||||
const sstring &cf_name) const {
|
||||
bool service_level_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -43,9 +43,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -39,12 +39,7 @@ std::unique_ptr<prepared_statement> truncate_statement::prepare(data_dictionary:
|
||||
return std::make_unique<prepared_statement>(::make_shared<truncate_statement>(*this));
|
||||
}
|
||||
|
||||
bool truncate_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool truncate_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool truncate_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -30,9 +30,7 @@ public:
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -46,12 +46,7 @@ std::unique_ptr<prepared_statement> use_statement::prepare(data_dictionary::data
|
||||
|
||||
}
|
||||
|
||||
bool use_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool use_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool use_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -31,9 +31,7 @@ public:
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
virtual bool depends_on_keyspace(const seastar::sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const seastar::sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual seastar::future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
23
db/config.cc
23
db/config.cc
@@ -65,6 +65,25 @@ hinted_handoff_enabled_to_json(const db::config::hinted_handoff_enabled_type& h)
|
||||
return value_to_json(h.to_configuration_string());
|
||||
}
|
||||
|
||||
// Convert a value that can be printed with operator<<, or a vector of
|
||||
// such values, to JSON. An example is enum_option<T>, because enum_option<T>
|
||||
// has a operator<<.
|
||||
template <typename T>
|
||||
static json::json_return_type
|
||||
printable_to_json(const T& e) {
|
||||
return value_to_json(format("{}", e));
|
||||
}
|
||||
template <typename T>
|
||||
static json::json_return_type
|
||||
printable_vector_to_json(const std::vector<T>& e) {
|
||||
std::vector<sstring> converted;
|
||||
converted.reserve(e.size());
|
||||
for (const auto& option : e) {
|
||||
converted.push_back(format("{}", option));
|
||||
}
|
||||
return value_to_json(converted);
|
||||
}
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<bool> = config_type("bool", value_to_json<bool>);
|
||||
|
||||
@@ -109,11 +128,11 @@ const config_type config_type_for<db::seed_provider_type> = config_type("seed pr
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<std::vector<enum_option<db::experimental_features_t>>> = config_type(
|
||||
"experimental features", value_to_json<std::vector<sstring>>);
|
||||
"experimental features", printable_vector_to_json<enum_option<db::experimental_features_t>>);
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<enum_option<db::tri_mode_restriction_t>> = config_type(
|
||||
"restriction mode", value_to_json<sstring>);
|
||||
"restriction mode", printable_to_json<enum_option<db::tri_mode_restriction_t>>);
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<db::config::hinted_handoff_enabled_type> = config_type("hinted handoff enabled", hinted_handoff_enabled_to_json);
|
||||
|
||||
@@ -202,6 +202,12 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
future<flush_permit> get_all_flush_permits() {
|
||||
return get_units(_background_work_flush_serializer, _max_background_work).then([this] (auto&& units) {
|
||||
return this->get_flush_permit(std::move(units));
|
||||
});
|
||||
}
|
||||
|
||||
bool has_extraneous_flushes_requested() const {
|
||||
return _extraneous_flushes > 0;
|
||||
}
|
||||
|
||||
6
dist/common/supervisor/scylla_util.sh
vendored
6
dist/common/supervisor/scylla_util.sh
vendored
@@ -6,12 +6,16 @@ is_nonroot() {
|
||||
[ -f "$scylladir"/SCYLLA-NONROOT-FILE ]
|
||||
}
|
||||
|
||||
is_container() {
|
||||
[ -f "$scylladir"/SCYLLA-CONTAINER-FILE ]
|
||||
}
|
||||
|
||||
is_privileged() {
|
||||
[ ${EUID:-${UID}} = 0 ]
|
||||
}
|
||||
|
||||
execsudo() {
|
||||
if is_nonroot; then
|
||||
if is_nonroot || is_container; then
|
||||
exec "$@"
|
||||
else
|
||||
exec sudo -u scylla -g scylla "$@"
|
||||
|
||||
4
dist/docker/debian/build_docker.sh
vendored
4
dist/docker/debian/build_docker.sh
vendored
@@ -82,15 +82,17 @@ run bash -ec "echo 'debconf debconf/frontend select Noninteractive' | debconf-se
|
||||
run bash -ec "rm -rf /etc/rsyslog.conf"
|
||||
run apt-get -y install hostname supervisor openssh-server openssh-client openjdk-11-jre-headless python python-yaml curl rsyslog locales sudo
|
||||
run locale-gen en_US.UTF-8
|
||||
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF_8
|
||||
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
|
||||
run bash -ec "dpkg -i packages/*.deb"
|
||||
run apt-get -y clean all
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
|
||||
run mkdir -p /etc/supervisor.conf.d
|
||||
run mkdir -p /var/log/scylla
|
||||
run chown -R scylla:scylla /var/lib/scylla
|
||||
run sed -i -e 's/^SCYLLA_ARGS=".*"$/SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --default-log-level info --network-stack posix"/' /etc/default/scylla-server
|
||||
|
||||
run mkdir -p /opt/scylladb/supervisor
|
||||
run touch /opt/scylladb/SCYLLA-CONTAINER-FILE
|
||||
bcp dist/common/supervisor/scylla-server.sh /opt/scylladb/supervisor/scylla-server.sh
|
||||
bcp dist/common/supervisor/scylla-jmx.sh /opt/scylladb/supervisor/scylla-jmx.sh
|
||||
bcp dist/common/supervisor/scylla-node-exporter.sh /opt/scylladb/supervisor/scylla-node-exporter.sh
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
[program:scylla-server]
|
||||
[program:scylla]
|
||||
command=/opt/scylladb/supervisor/scylla-server.sh
|
||||
stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
|
||||
41
dist/docker/etc/sysconfig/scylla-server
vendored
41
dist/docker/etc/sysconfig/scylla-server
vendored
@@ -1,41 +0,0 @@
|
||||
# choose following mode: virtio, dpdk, posix
|
||||
NETWORK_MODE=posix
|
||||
|
||||
# tap device name(virtio)
|
||||
TAP=tap0
|
||||
|
||||
# bridge device name (virtio)
|
||||
BRIDGE=virbr0
|
||||
|
||||
# ethernet device name
|
||||
IFNAME=eth0
|
||||
|
||||
# setup NIC's and disks' interrupts, RPS, XPS, nomerges and I/O scheduler (posix)
|
||||
SET_NIC_AND_DISKS=no
|
||||
|
||||
# ethernet device driver (dpdk)
|
||||
ETHDRV=
|
||||
|
||||
# ethernet device PCI ID (dpdk)
|
||||
ETHPCIID=
|
||||
|
||||
# number of hugepages
|
||||
NR_HUGEPAGES=64
|
||||
|
||||
# user for process (must be root for dpdk)
|
||||
USER=scylla
|
||||
|
||||
# group for process
|
||||
GROUP=scylla
|
||||
|
||||
# scylla home dir
|
||||
SCYLLA_HOME=/var/lib/scylla
|
||||
|
||||
# scylla config dir
|
||||
SCYLLA_CONF=/etc/scylla
|
||||
|
||||
# scylla arguments
|
||||
SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --default-log-level info --network-stack posix"
|
||||
|
||||
# setup as AMI instance
|
||||
AMI=no
|
||||
@@ -508,8 +508,13 @@ relocate_python3 "$rprefix"/scripts fix_system_distributed_tables.py
|
||||
if $supervisor; then
|
||||
install -d -m755 `supervisor_dir $retc`
|
||||
for service in scylla-server scylla-jmx scylla-node-exporter; do
|
||||
if [ "$service" = "scylla-server" ]; then
|
||||
program="scylla"
|
||||
else
|
||||
program=$service
|
||||
fi
|
||||
cat << EOS > `supervisor_conf $retc $service`
|
||||
[program:$service]
|
||||
[program:$program]
|
||||
directory=$rprefix
|
||||
command=/bin/bash -c './supervisor/$service.sh'
|
||||
EOS
|
||||
|
||||
33
main.cc
33
main.cc
@@ -367,11 +367,38 @@ static auto defer_verbose_shutdown(const char* what, Func&& func) {
|
||||
startlog.info("Shutting down {}", what);
|
||||
try {
|
||||
func();
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
} catch (...) {
|
||||
startlog.error("Unexpected error shutting down {}: {}", what, std::current_exception());
|
||||
throw;
|
||||
auto ex = std::current_exception();
|
||||
bool do_abort = true;
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (const std::system_error& e) {
|
||||
// System error codes we consider "environmental",
|
||||
// i.e. not scylla's fault, therefore there is no point in
|
||||
// aborting and dumping core.
|
||||
for (int i : {EIO, EACCES, ENOSPC}) {
|
||||
if (e.code() == std::error_code(i, std::system_category())) {
|
||||
do_abort = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
}
|
||||
auto msg = fmt::format("Unexpected error shutting down {}: {}", what, ex);
|
||||
if (do_abort) {
|
||||
startlog.error("{}: aborting", msg);
|
||||
abort();
|
||||
} else {
|
||||
startlog.error("{}: exiting, at {}", msg, current_backtrace());
|
||||
|
||||
// Call _exit() rather than exit() to exit immediately
|
||||
// without calling exit handlers, avoiding
|
||||
// boost::intrusive::detail::destructor_impl assert failure
|
||||
// from ~segment_pool exit handler.
|
||||
_exit(255);
|
||||
}
|
||||
}
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
};
|
||||
|
||||
auto ret = deferred_action(std::move(vfunc));
|
||||
|
||||
@@ -96,7 +96,7 @@ void range_tombstone_list::insert_from(const schema& s,
|
||||
if (cmp(end, it->position()) < 0) {
|
||||
// not overlapping
|
||||
if (it->tombstone().tomb == tomb && cmp(end, it->position()) == 0) {
|
||||
rev.update(it, {std::move(start), std::move(start), tomb});
|
||||
rev.update(it, {std::move(start), std::move(end), tomb});
|
||||
} else {
|
||||
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), tomb);
|
||||
rev.insert(it, *rt);
|
||||
|
||||
@@ -910,10 +910,9 @@ bool database::update_column_family(schema_ptr new_schema) {
|
||||
return columns_changed;
|
||||
}
|
||||
|
||||
future<> database::remove(const column_family& cf) noexcept {
|
||||
void database::remove(const table& cf) noexcept {
|
||||
auto s = cf.schema();
|
||||
auto& ks = find_keyspace(s->ks_name());
|
||||
co_await _querier_cache.evict_all_for_table(s->id());
|
||||
_column_families.erase(s->id());
|
||||
ks.metadata()->remove_column_family(s);
|
||||
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
|
||||
@@ -937,13 +936,20 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
|
||||
on_internal_error(dblog, fmt::format("drop_column_family {}.{}: UUID={} not found", ks_name, cf_name, uuid));
|
||||
}
|
||||
dblog.debug("Dropping {}.{}", ks_name, cf_name);
|
||||
co_await remove(*cf);
|
||||
remove(*cf);
|
||||
cf->clear_views();
|
||||
co_return co_await cf->await_pending_ops().then([this, &ks, cf, tsf = std::move(tsf), snapshot] {
|
||||
return truncate(ks, *cf, std::move(tsf), snapshot).finally([this, cf] {
|
||||
return cf->stop();
|
||||
});
|
||||
}).finally([cf] {});
|
||||
co_await cf->await_pending_ops();
|
||||
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await truncate(ks, *cf, std::move(tsf), snapshot);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await cf->stop();
|
||||
if (ex) {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
const utils::UUID& database::find_uuid(std::string_view ks, std::string_view cf) const {
|
||||
@@ -2062,80 +2068,77 @@ future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf)
|
||||
|
||||
future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf, bool with_snapshot) {
|
||||
dblog.debug("Truncating {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name());
|
||||
return with_gate(cf.async_gate(), [this, &ks, &cf, tsf = std::move(tsf), with_snapshot] () mutable -> future<> {
|
||||
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
|
||||
const auto should_flush = auto_snapshot;
|
||||
auto holder = cf.async_gate().hold();
|
||||
|
||||
// Force mutations coming in to re-acquire higher rp:s
|
||||
// This creates a "soft" ordering, in that we will guarantee that
|
||||
// any sstable written _after_ we issue the flush below will
|
||||
// only have higher rp:s than we will get from the discard_sstable
|
||||
// call.
|
||||
auto low_mark = cf.set_low_replay_position_mark();
|
||||
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
|
||||
const auto should_flush = auto_snapshot;
|
||||
|
||||
const auto uuid = cf.schema()->id();
|
||||
// Force mutations coming in to re-acquire higher rp:s
|
||||
// This creates a "soft" ordering, in that we will guarantee that
|
||||
// any sstable written _after_ we issue the flush below will
|
||||
// only have higher rp:s than we will get from the discard_sstable
|
||||
// call.
|
||||
auto low_mark = cf.set_low_replay_position_mark();
|
||||
|
||||
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
|
||||
future<> f = make_ready_future<>();
|
||||
bool did_flush = false;
|
||||
if (should_flush && cf.can_flush()) {
|
||||
// TODO:
|
||||
// this is not really a guarantee at all that we've actually
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
f = cf.flush();
|
||||
did_flush = true;
|
||||
} else {
|
||||
f = cf.clear();
|
||||
}
|
||||
return f.then([this, &cf, auto_snapshot, tsf = std::move(tsf), low_mark, should_flush, did_flush] {
|
||||
dblog.debug("Discarding sstable data for truncated CF + indexes");
|
||||
// TODO: notify truncation
|
||||
const auto uuid = cf.schema()->id();
|
||||
|
||||
return tsf().then([this, &cf, auto_snapshot, low_mark, should_flush, did_flush](db_clock::time_point truncated_at) {
|
||||
future<> f = make_ready_future<>();
|
||||
if (auto_snapshot) {
|
||||
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
|
||||
f = cf.snapshot(*this, name);
|
||||
}
|
||||
return f.then([this, &cf, truncated_at, low_mark, should_flush, did_flush] {
|
||||
return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush, did_flush](db::replay_position rp) {
|
||||
// TODO: indexes.
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
|
||||
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
|
||||
// the low_mark assertion does not hold, because we maybe/probably never got around to
|
||||
// creating the sstables that would create them.
|
||||
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
return truncate_views(cf, truncated_at, should_flush).then([&cf, truncated_at, rp] {
|
||||
// save_truncation_record() may actually fail after we cached the truncation time
|
||||
// but this is not be worse that if failing without caching: at least the correct time
|
||||
// will be available until next reboot and a client will have to retry truncation anyway.
|
||||
cf.cache_truncation_record(truncated_at);
|
||||
return db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([this, uuid] {
|
||||
drop_repair_history_map_for_table(uuid);
|
||||
});
|
||||
});
|
||||
}
|
||||
std::vector<compaction_manager::compaction_reenabler> cres;
|
||||
cres.reserve(1 + cf.views().size());
|
||||
|
||||
future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) {
|
||||
return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) {
|
||||
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&cf));
|
||||
co_await parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> {
|
||||
auto& vcf = find_column_family(v);
|
||||
return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] {
|
||||
return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] {
|
||||
return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) {
|
||||
return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
|
||||
});
|
||||
});
|
||||
});
|
||||
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&vcf));
|
||||
});
|
||||
|
||||
bool did_flush = false;
|
||||
if (should_flush && cf.can_flush()) {
|
||||
// TODO:
|
||||
// this is not really a guarantee at all that we've actually
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
co_await cf.flush();
|
||||
did_flush = true;
|
||||
} else {
|
||||
co_await cf.clear();
|
||||
}
|
||||
|
||||
dblog.debug("Discarding sstable data for truncated CF + indexes");
|
||||
// TODO: notify truncation
|
||||
|
||||
db_clock::time_point truncated_at = co_await tsf();
|
||||
|
||||
if (auto_snapshot) {
|
||||
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
|
||||
co_await cf.snapshot(*this, name);
|
||||
}
|
||||
|
||||
db::replay_position rp = co_await cf.discard_sstables(truncated_at);
|
||||
// TODO: indexes.
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
|
||||
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
|
||||
// the low_mark assertion does not hold, because we maybe/probably never got around to
|
||||
// creating the sstables that would create them.
|
||||
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
co_await parallel_for_each(cf.views(), [this, truncated_at, should_flush] (view_ptr v) -> future<> {
|
||||
auto& vcf = find_column_family(v);
|
||||
if (should_flush) {
|
||||
co_await vcf.flush();
|
||||
} else {
|
||||
co_await vcf.clear();
|
||||
}
|
||||
db::replay_position rp = co_await vcf.discard_sstables(truncated_at);
|
||||
co_await db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
|
||||
});
|
||||
// save_truncation_record() may actually fail after we cached the truncation time
|
||||
// but this is not be worse that if failing without caching: at least the correct time
|
||||
// will be available until next reboot and a client will have to retry truncation anyway.
|
||||
cf.cache_truncation_record(truncated_at);
|
||||
co_await db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
|
||||
|
||||
drop_repair_history_map_for_table(uuid);
|
||||
}
|
||||
|
||||
const sstring& database::get_snitch_name() const {
|
||||
|
||||
@@ -1371,6 +1371,7 @@ private:
|
||||
Future update_write_metrics(Future&& f);
|
||||
void update_write_metrics_for_timed_out_write();
|
||||
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, bool is_bootstrap, system_keyspace system);
|
||||
void remove(const table&) noexcept;
|
||||
public:
|
||||
static utils::UUID empty_version;
|
||||
|
||||
@@ -1568,11 +1569,9 @@ public:
|
||||
/** Truncates the given column family */
|
||||
future<> truncate(sstring ksname, sstring cfname, timestamp_func);
|
||||
future<> truncate(const keyspace& ks, column_family& cf, timestamp_func, bool with_snapshot = true);
|
||||
future<> truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush);
|
||||
|
||||
bool update_column_family(schema_ptr s);
|
||||
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);
|
||||
future<> remove(const column_family&) noexcept;
|
||||
|
||||
const logalloc::region_group& dirty_memory_region_group() const {
|
||||
return _dirty_memory_manager.region_group();
|
||||
|
||||
@@ -454,12 +454,13 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
|
||||
});
|
||||
}
|
||||
|
||||
future<> distributed_loader::populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, bool must_exist) {
|
||||
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), must_exist] {
|
||||
future<> distributed_loader::populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
|
||||
dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", ks, cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist);
|
||||
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), do_allow_offstrategy_compaction, dir_must_exist] {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
if (!file_exists(sstdir).get0()) {
|
||||
if (must_exist) {
|
||||
if (dir_must_exist) {
|
||||
throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", ks, cf, sstdir));
|
||||
}
|
||||
return;
|
||||
@@ -529,12 +530,14 @@ future<> distributed_loader::populate_column_family(distributed<replica::databas
|
||||
return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big);
|
||||
}, eligible_for_reshape_on_boot).get();
|
||||
|
||||
directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot] (sstables::sstable_directory& dir) {
|
||||
return dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot] (sstables::shared_sstable sst) {
|
||||
auto requires_offstrategy = sstables::offstrategy(!eligible_for_reshape_on_boot(sst));
|
||||
directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) {
|
||||
return dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) {
|
||||
auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst));
|
||||
return global_table->add_sstable_and_update_cache(sst, requires_offstrategy);
|
||||
}).then([&global_table] {
|
||||
}).then([&global_table, do_allow_offstrategy_compaction] {
|
||||
if (do_allow_offstrategy_compaction) {
|
||||
global_table->trigger_offstrategy_compaction();
|
||||
}
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
@@ -560,11 +563,11 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
|
||||
auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
|
||||
dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version());
|
||||
return ks.make_directory_for_column_family(cfname, uuid).then([&db, sstdir, uuid, ks_name, cfname] {
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname);
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
|
||||
}).then([&db, sstdir, ks_name, cfname] {
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, false /* must_exist */);
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
|
||||
}).then([&db, sstdir, uuid, ks_name, cfname] {
|
||||
return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname);
|
||||
return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
|
||||
}).handle_exception([ks_name, cfname, sstdir](std::exception_ptr eptr) {
|
||||
std::string msg =
|
||||
format("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/file.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
#include <filesystem>
|
||||
@@ -67,7 +68,9 @@ class distributed_loader {
|
||||
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
|
||||
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
|
||||
std::filesystem::path datadir, sstring ks, sstring cf);
|
||||
static future<> populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, bool must_exist = true);
|
||||
using allow_offstrategy_compaction = bool_class<struct allow_offstrategy_compaction_tag>;
|
||||
using must_exist = bool_class<struct must_exist_tag>;
|
||||
static future<> populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, allow_offstrategy_compaction, must_exist = must_exist::yes);
|
||||
static future<> populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name);
|
||||
static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir);
|
||||
static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);
|
||||
|
||||
@@ -662,11 +662,21 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
|
||||
[] (const dht::decorated_key&) { return api::min_timestamp; });
|
||||
}
|
||||
|
||||
mutation_fragment* fragment = co_await reader.peek();
|
||||
if (!fragment) {
|
||||
std::exception_ptr err;
|
||||
try {
|
||||
mutation_fragment* fragment = co_await reader.peek();
|
||||
if (!fragment) {
|
||||
co_await reader.close();
|
||||
_memtables->erase(old);
|
||||
co_return stop_iteration::yes;
|
||||
}
|
||||
} catch (...) {
|
||||
err = std::current_exception();
|
||||
}
|
||||
if (err) {
|
||||
tlogger.error("failed to flush memtable for {}.{}: {}", old->schema()->ks_name(), old->schema()->cf_name(), err);
|
||||
co_await reader.close();
|
||||
_memtables->erase(old);
|
||||
co_return stop_iteration::yes;
|
||||
co_return stop_iteration(_async_gate.is_closed());
|
||||
}
|
||||
|
||||
auto f = consumer(upgrade_to_v2(std::move(reader)));
|
||||
@@ -1571,13 +1581,14 @@ bool table::can_flush() const {
|
||||
}
|
||||
|
||||
future<> table::clear() {
|
||||
auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits();
|
||||
if (_commitlog) {
|
||||
for (auto& t : *_memtables) {
|
||||
_commitlog->discard_completed_segments(_schema->id(), t->get_and_discard_rp_set());
|
||||
}
|
||||
}
|
||||
_memtables->clear_and_add();
|
||||
return _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
||||
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
||||
}
|
||||
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
@@ -2235,7 +2246,7 @@ std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double
|
||||
|
||||
void
|
||||
table::enable_auto_compaction() {
|
||||
// FIXME: unmute backlog. turn table backlog back on.
|
||||
// XXX: unmute backlog. turn table backlog back on.
|
||||
// see table::disable_auto_compaction() notes.
|
||||
_compaction_disabled_by_user = false;
|
||||
trigger_compaction();
|
||||
@@ -2243,7 +2254,7 @@ table::enable_auto_compaction() {
|
||||
|
||||
future<>
|
||||
table::disable_auto_compaction() {
|
||||
// FIXME: mute backlog. When we disable background compactions
|
||||
// XXX: mute backlog. When we disable background compactions
|
||||
// for the table, we must also disable current backlog of the
|
||||
// table compaction strategy that contributes to the scheduling
|
||||
// group resources prioritization.
|
||||
@@ -2270,9 +2281,8 @@ table::disable_auto_compaction() {
|
||||
// - it will break computation of major compaction descriptor
|
||||
// for new submissions
|
||||
_compaction_disabled_by_user = true;
|
||||
return with_gate(_async_gate, [this] {
|
||||
return compaction_manager().stop_ongoing_compactions("disable auto-compaction", this, sstables::compaction_type::Compaction);
|
||||
});
|
||||
// FIXME: stop ongoing compactions
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 0d250d15ac...4a30c44c4c
@@ -1030,6 +1030,20 @@ def test_nested_attribute_remove_from_missing_item(test_table_s):
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x[0]')
|
||||
|
||||
# Though in an above test (test_nested_attribute_update_bad_path_dot) we
|
||||
# showed that DynamoDB does not allow REMOVE x.y if attribute x doesn't
|
||||
# exist - and generates a ValidationException, if x *does* exist but y
|
||||
# doesn't, it's fine and the removal should just be silently ignored.
|
||||
def test_nested_attribute_remove_missing_leaf(test_table_s):
|
||||
p = random_string()
|
||||
item = {'p': p, 'a': {'x': 3}, 'b': ['hi']}
|
||||
test_table_s.put_item(Item=item)
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE a.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE b[7]')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE c')
|
||||
# The above UpdateItem calls didn't change anything...
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == item
|
||||
|
||||
# Similarly for other types of bad paths - using [0] on something which
|
||||
# doesn't exist or isn't an array.
|
||||
def test_nested_attribute_update_bad_path_array(test_table_s):
|
||||
|
||||
@@ -207,7 +207,9 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
|
||||
}
|
||||
|
||||
{
|
||||
cf_lru.evict_all();
|
||||
with_allocator(region.allocator(), [] {
|
||||
cf_lru.evict_all();
|
||||
});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.cached_bytes); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, cf.cached_bytes()); // change here
|
||||
@@ -215,6 +217,8 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(region.occupancy().used_space(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
#include <deque>
|
||||
#include <random>
|
||||
#include "utils/lsa/chunked_managed_vector.hh"
|
||||
#include "utils/managed_ref.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
#include <boost/range/algorithm/sort.hpp>
|
||||
#include <boost/range/algorithm/equal.hpp>
|
||||
@@ -203,3 +205,106 @@ SEASTAR_TEST_CASE(tests_reserve_partial) {
|
||||
});
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_clear_and_release) {
|
||||
region region;
|
||||
allocating_section as;
|
||||
|
||||
with_allocator(region.allocator(), [&] {
|
||||
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
|
||||
|
||||
for (uint64_t i = 1; i < 4000; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(i));
|
||||
});
|
||||
}
|
||||
|
||||
v.clear_and_release();
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_chunk_reserve) {
|
||||
region region;
|
||||
allocating_section as;
|
||||
|
||||
for (auto conf :
|
||||
{ // std::make_pair(reserve size, push count)
|
||||
std::make_pair(0, 4000),
|
||||
std::make_pair(100, 4000),
|
||||
std::make_pair(200, 4000),
|
||||
std::make_pair(1000, 4000),
|
||||
std::make_pair(2000, 4000),
|
||||
std::make_pair(3000, 4000),
|
||||
std::make_pair(5000, 4000),
|
||||
std::make_pair(500, 8000),
|
||||
std::make_pair(1000, 8000),
|
||||
std::make_pair(2000, 8000),
|
||||
std::make_pair(8000, 500),
|
||||
})
|
||||
{
|
||||
with_allocator(region.allocator(), [&] {
|
||||
auto [reserve_size, push_count] = conf;
|
||||
testlog.info("Testing reserve({}), {}x emplace_back()", reserve_size, push_count);
|
||||
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
|
||||
v.reserve(reserve_size);
|
||||
uint64_t seed = rand();
|
||||
for (uint64_t i = 0; i < push_count; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(seed + i));
|
||||
BOOST_REQUIRE(**v.begin() == seed);
|
||||
});
|
||||
}
|
||||
auto v_it = v.begin();
|
||||
for (uint64_t i = 0; i < push_count; ++i) {
|
||||
BOOST_REQUIRE(**v_it++ == seed + i);
|
||||
}
|
||||
v.clear_and_release();
|
||||
});
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
|
||||
// the last reserved chunk.
|
||||
SEASTAR_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
|
||||
region region;
|
||||
allocating_section as;
|
||||
|
||||
with_allocator(region.allocator(), [&] {
|
||||
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
|
||||
|
||||
// Fill two chunks
|
||||
v.reserve(2000);
|
||||
for (uint64_t i = 0; i < 2000; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(i));
|
||||
});
|
||||
}
|
||||
|
||||
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
|
||||
v.shrink_to_fit();
|
||||
|
||||
// Leave the last chunk reserved but empty
|
||||
for (uint64_t i = 0; i < 1000; ++i) {
|
||||
v.pop_back();
|
||||
}
|
||||
|
||||
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
|
||||
// with _size not in the last chunk. Should not sigsegv.
|
||||
v.reserve(8000);
|
||||
|
||||
for (uint64_t i = 0; i < 2000; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(i));
|
||||
});
|
||||
}
|
||||
|
||||
v.clear_and_release();
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
|
||||
@@ -178,3 +178,32 @@ BOOST_AUTO_TEST_CASE(tests_reserve_partial) {
|
||||
BOOST_REQUIRE_EQUAL(v.capacity(), orig_size);
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
|
||||
// the last reserved chunk.
|
||||
BOOST_AUTO_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
|
||||
using vector_type = utils::chunked_vector<std::unique_ptr<uint64_t>>;
|
||||
vector_type v;
|
||||
|
||||
// Fill two chunks
|
||||
v.reserve(vector_type::max_chunk_capacity() * 3 / 2);
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 3 / 2; ++i) {
|
||||
v.emplace_back(std::make_unique<uint64_t>(i));
|
||||
}
|
||||
|
||||
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
|
||||
v.shrink_to_fit();
|
||||
|
||||
// Leave the last chunk reserved but empty
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity(); ++i) {
|
||||
v.pop_back();
|
||||
}
|
||||
|
||||
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
|
||||
// with _size not in the last chunk. Should not sigsegv.
|
||||
v.reserve(vector_type::max_chunk_capacity() * 4);
|
||||
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 2; ++i) {
|
||||
v.emplace_back(std::make_unique<uint64_t>(i));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -985,3 +985,38 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
|
||||
BOOST_REQUIRE(expected.empty());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
|
||||
auto& db = e.local_db();
|
||||
const auto ts = db_clock::now();
|
||||
auto& tbl = db.find_column_family("ks", "cf");
|
||||
|
||||
auto op = std::optional(tbl.read_in_progress());
|
||||
auto s = tbl.schema();
|
||||
auto q = query::data_querier(
|
||||
tbl.as_mutation_source(),
|
||||
tbl.schema(),
|
||||
database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
default_priority_class(),
|
||||
nullptr);
|
||||
|
||||
auto f = e.db().invoke_on_all([ts] (replica::database& db) {
|
||||
return db.drop_column_family("ks", "cf", [ts] { return make_ready_future<db_clock::time_point>(ts); });
|
||||
});
|
||||
|
||||
// we add a querier to the querier cache while the drop is ongoing
|
||||
auto& qc = db.get_querier_cache();
|
||||
qc.insert(utils::make_random_uuid(), std::move(q), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 1);
|
||||
|
||||
op.reset(); // this should allow the drop to finish
|
||||
f.get();
|
||||
|
||||
// the drop should have cleaned up all entries belonging to that table
|
||||
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
|
||||
#include <vector>
|
||||
#include <numeric>
|
||||
@@ -428,6 +429,163 @@ SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged_minimum_size) {
|
||||
return seastar::async([] {
|
||||
// Test that unprivileged section is not starved.
|
||||
//
|
||||
// This scenario is tested: cache max_size is 50 and there are 49 entries in
|
||||
// privileged section. After adding 5 elements (that go to unprivileged
|
||||
// section) all of them should stay in unprivileged section and elements
|
||||
// in privileged section should get evicted.
|
||||
//
|
||||
// Wrong handling of this situation caused problems with BATCH statements
|
||||
// where all prepared statements in the batch have to stay in cache at
|
||||
// the same time for the batch to correctly execute.
|
||||
|
||||
using namespace std::chrono;
|
||||
utils::loading_cache<int, sstring, 1> loading_cache(50, 1h, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
prepare().get();
|
||||
|
||||
// Add 49 elements to privileged section
|
||||
for (int i = 0; i < 49; i++) {
|
||||
// Touch the value with the key "i" twice
|
||||
loading_cache.get_ptr(i, loader).discard_result().get();
|
||||
loading_cache.find(i);
|
||||
}
|
||||
|
||||
// Add 5 elements to unprivileged section
|
||||
for (int i = 50; i < 55; i++) {
|
||||
loading_cache.get_ptr(i, loader).discard_result().get();
|
||||
}
|
||||
|
||||
// Make sure that none of 5 elements were evicted
|
||||
for (int i = 50; i < 55; i++) {
|
||||
BOOST_REQUIRE(loading_cache.find(i) != nullptr);
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 50);
|
||||
});
|
||||
}
|
||||
|
||||
struct sstring_length_entry_size {
|
||||
size_t operator()(const sstring& val) {
|
||||
return val.size();
|
||||
}
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_section_size_correctly_calculated) {
|
||||
return seastar::async([] {
|
||||
auto load_len1 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(1)); };
|
||||
auto load_len5 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(5)); };
|
||||
auto load_len10 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(10)); };
|
||||
auto load_len95 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(95)); };
|
||||
|
||||
using namespace std::chrono;
|
||||
utils::loading_cache<int, sstring, 1, utils::loading_cache_reload_enabled::no, sstring_length_entry_size> loading_cache(100, 1h, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
loading_cache.get_ptr(1, load_len1).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
|
||||
|
||||
loading_cache.get_ptr(2, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);
|
||||
|
||||
// Move "2" to privileged section by touching it the second time.
|
||||
loading_cache.get_ptr(2, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);
|
||||
|
||||
loading_cache.get_ptr(3, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);
|
||||
|
||||
// Move "1" to privileged section. load_len10 should not get executed, as "1"
|
||||
// is already in the cache.
|
||||
loading_cache.get_ptr(1, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 10);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);
|
||||
|
||||
// Flood cache with elements of size 10,
|
||||
// unprivileged. "1" and "2" should stay in the privileged section.
|
||||
for (int i = 11; i < 30; i++) {
|
||||
loading_cache.get_ptr(i, load_len10).discard_result().get();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 100);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
|
||||
// Flood cache with elements of size 10, privileged.
|
||||
for (int i = 11; i < 30; i++) {
|
||||
loading_cache.get_ptr(i, load_len10).discard_result().get();
|
||||
loading_cache.get_ptr(i, load_len10).discard_result().get();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 100);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);
|
||||
|
||||
// Add one new unprivileged entry.
|
||||
loading_cache.get_ptr(31, load_len1).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);
|
||||
|
||||
// Add another unprivileged entry, privileged entry should get evicted.
|
||||
loading_cache.get_ptr(32, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);
|
||||
|
||||
// Make it privileged by touching it again.
|
||||
loading_cache.get_ptr(32, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);
|
||||
|
||||
// Add another unprivileged entry.
|
||||
loading_cache.get_ptr(33, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
|
||||
// Add another unprivileged entry, privileged entry should get evicted.
|
||||
loading_cache.get_ptr(34, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 85);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 21);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
|
||||
// Add a big unprivileged entry, filling almost entire cache.
|
||||
loading_cache.get_ptr(35, load_len95).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 75);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 95 + 21);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
|
||||
return seastar::async([] {
|
||||
using namespace std::chrono;
|
||||
@@ -449,3 +607,169 @@ SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_loading_cache_remove_leaves_no_old_entries_behind) {
|
||||
using namespace std::chrono;
|
||||
load_count = 0;
|
||||
|
||||
auto load_v1 = [] (auto key) { return make_ready_future<sstring>("v1"); };
|
||||
auto load_v2 = [] (auto key) { return make_ready_future<sstring>("v2"); };
|
||||
auto load_v3 = [] (auto key) { return make_ready_future<sstring>("v3"); };
|
||||
|
||||
{
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
//
|
||||
// Test remove() concurrent with loading
|
||||
//
|
||||
|
||||
auto f = loading_cache.get_ptr(0, [&](auto key) {
|
||||
return yield().then([&] {
|
||||
return load_v1(key);
|
||||
});
|
||||
});
|
||||
|
||||
loading_cache.remove(0);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
auto ptr1 = f.get0();
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
|
||||
loading_cache.remove(0);
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
|
||||
|
||||
//
|
||||
// Test that live ptr1, removed from cache, does not prevent reload of new value
|
||||
//
|
||||
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
|
||||
ptr1 = nullptr;
|
||||
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
|
||||
}
|
||||
|
||||
// Test remove_if()
|
||||
{
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
//
|
||||
// Test remove_if() concurrent with loading
|
||||
//
|
||||
auto f = loading_cache.get_ptr(0, [&](auto key) {
|
||||
return yield().then([&] {
|
||||
return load_v1(key);
|
||||
});
|
||||
});
|
||||
|
||||
loading_cache.remove_if([] (auto&& v) { return v == "v1"; });
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
auto ptr1 = f.get0();
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
|
||||
loading_cache.remove_if([] (auto&& v) { return v == "v2"; });
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
|
||||
|
||||
//
|
||||
// Test that live ptr1, removed from cache, does not prevent reload of new value
|
||||
//
|
||||
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
|
||||
ptr1 = nullptr;
|
||||
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
|
||||
ptr2 = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_prepared_statement_small_cache) {
|
||||
// CQL prepared statement cache uses loading_cache
|
||||
// internally.
|
||||
constexpr auto CACHE_SIZE = 950000;
|
||||
|
||||
cql_test_config small_cache_config;
|
||||
small_cache_config.qp_mcfg = {CACHE_SIZE, CACHE_SIZE};
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE tbl1 (a int, b int, PRIMARY KEY (a))").get();
|
||||
|
||||
auto current_uid = 0;
|
||||
|
||||
// Prepare 100 queries and execute them twice,
|
||||
// filling "privileged section" of loading_cache.
|
||||
std::vector<cql3::prepared_cache_key_type> prepared_ids_privileged;
|
||||
for (int i = 0; i < 100; i++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
prepared_ids_privileged.push_back(prepared_id);
|
||||
}
|
||||
|
||||
int how_many_in_cache = 0;
|
||||
for (auto& prepared_id : prepared_ids_privileged) {
|
||||
if (e.local_qp().get_prepared(prepared_id)) {
|
||||
how_many_in_cache++;
|
||||
}
|
||||
}
|
||||
|
||||
// Assumption: CACHE_SIZE should hold at least 50 queries,
|
||||
// but not more than 99 queries. Other checks in this
|
||||
// test rely on that fact.
|
||||
BOOST_REQUIRE(how_many_in_cache >= 50);
|
||||
BOOST_REQUIRE(how_many_in_cache <= 99);
|
||||
|
||||
// Then prepare 5 queries and execute them one time,
|
||||
// which will occupy "unprivileged section" of loading_cache.
|
||||
std::vector<cql3::prepared_cache_key_type> prepared_ids_unprivileged;
|
||||
for (int i = 0; i < 5; i++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
prepared_ids_unprivileged.push_back(prepared_id);
|
||||
}
|
||||
|
||||
// Check that all of those prepared queries can still be
|
||||
// executed. This simulates as if you wanted to execute
|
||||
// a BATCH with all of them, which requires all of those
|
||||
// prepared statements to be executable (in the cache).
|
||||
for (auto& prepared_id : prepared_ids_unprivileged) {
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
}
|
||||
|
||||
// Deterministic random for reproducibility.
|
||||
testing::local_random_engine.seed(12345);
|
||||
|
||||
// Prepare 500 queries and execute them a random number of times.
|
||||
for (int i = 0; i < 500; i++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
auto times = rand_int(4);
|
||||
for (int j = 0; j < times; j++) {
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare 100 simulated "batches" and execute them
|
||||
// a random number of times.
|
||||
for (int i = 0; i < 100; i++) {
|
||||
std::vector<cql3::prepared_cache_key_type> prepared_ids_batch;
|
||||
for (int j = 0; j < 5; j++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
prepared_ids_batch.push_back(prepared_id);
|
||||
}
|
||||
auto times = rand_int(4);
|
||||
for (int j = 0; j < times; j++) {
|
||||
for (auto& prepared_id : prepared_ids_batch) {
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
}
|
||||
}
|
||||
}
|
||||
}, small_cache_config);
|
||||
}
|
||||
|
||||
@@ -690,6 +690,7 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
};
|
||||
|
||||
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
|
||||
testlog.trace("Expected {} == {}", c1, c2);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
|
||||
};
|
||||
@@ -711,9 +712,11 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2));
|
||||
|
||||
// Origin doesn't compare ttl (is it wise?)
|
||||
assert_equal(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2));
|
||||
// But we do. See https://github.com/scylladb/scylla/issues/10156
|
||||
// and https://github.com/scylladb/scylla/issues/10173
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
|
||||
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value1")),
|
||||
|
||||
@@ -24,11 +24,13 @@ static void add_entry(logalloc::region& r,
|
||||
{
|
||||
logalloc::allocating_section as;
|
||||
as(r, [&] {
|
||||
sstables::key sst_key = sstables::key::from_partition_key(s, key);
|
||||
page._entries.push_back(make_managed<index_entry>(
|
||||
managed_bytes(sst_key.get_bytes()),
|
||||
position,
|
||||
managed_ref<promoted_index>()));
|
||||
with_allocator(r.allocator(), [&] {
|
||||
sstables::key sst_key = sstables::key::from_partition_key(s, key);
|
||||
page._entries.push_back(make_managed<index_entry>(
|
||||
managed_bytes(sst_key.get_bytes()),
|
||||
position,
|
||||
managed_ref<promoted_index>()));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ from cassandra.cluster import ConsistencyLevel
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
from util import new_test_table
|
||||
from nodetool import flush
|
||||
|
||||
def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
|
||||
'''Test that the stream IDs chosen for CDC log entries come from the CDC generation
|
||||
@@ -31,3 +32,16 @@ def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
|
||||
|
||||
assert(log_stream_ids.issubset(stream_ids))
|
||||
|
||||
|
||||
# Test for #10473 - reading logs (from sstable) after dropping
|
||||
# column in base.
|
||||
def test_cdc_alter_table_drop_column(scylla_only, cql, test_keyspace):
|
||||
schema = "pk int primary key, v int"
|
||||
extra = " with cdc = {'enabled': true}"
|
||||
with new_test_table(cql, test_keyspace, schema, extra) as table:
|
||||
cql.execute(f"insert into {table} (pk, v) values (0, 0)")
|
||||
cql.execute(f"insert into {table} (pk, v) values (1, null)")
|
||||
flush(cql, table)
|
||||
flush(cql, table + "_scylla_cdc_log")
|
||||
cql.execute(f"alter table {table} drop v")
|
||||
cql.execute(f"select * from {table}_scylla_cdc_log")
|
||||
|
||||
@@ -115,3 +115,16 @@ def test_operator_ne_not_supported(cql, table1):
|
||||
cql.execute(f'SELECT a FROM {table1} WHERE a != 0')
|
||||
with pytest.raises(InvalidRequest, match='Unsupported.*!='):
|
||||
cql.execute(f'SELECT a FROM {table1} WHERE token(a) != 0')
|
||||
|
||||
# Test that the fact that a column is indexed does not cause us to fetch
|
||||
# incorrect results from a filtering query (issue #10300).
|
||||
def test_index_with_in_relation(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int, c int, v boolean, primary key (p,c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"create index on {table}(v)")
|
||||
for p, c, v in [(0,0,True),(0,1,False),(0,2,True),(0,3,False),
|
||||
(1,0,True),(1,1,False),(1,2,True),(1,3,False),
|
||||
(2,0,True),(2,1,False),(2,2,True),(2,3,False)]:
|
||||
cql.execute(f"insert into {table} (p,c,v) values ({p}, {c}, {v})")
|
||||
res = cql.execute(f"select * from {table} where p in (0,1) and v = False ALLOW FILTERING")
|
||||
assert set(res) == set([(0,1,False),(0,3,False),(1,1,False), (1,3,False)])
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
import pytest
|
||||
import util
|
||||
import nodetool
|
||||
import json
|
||||
|
||||
def test_snapshots_table(scylla_only, cql, test_keyspace):
|
||||
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
|
||||
@@ -32,3 +33,31 @@ def test_runtime_info(scylla_only, cql):
|
||||
|
||||
def test_versions(scylla_only, cql):
|
||||
_check_exists(cql, "versions", ("key", "build_id", "build_mode", "version"))
|
||||
|
||||
# Check reading the system.config table, which should list all configuration
|
||||
# parameters. As we noticed in issue #10047, each type of configuration
|
||||
# parameter can have a different function for printing it out, and some of
|
||||
# those may be wrong so we want to check as many as we can - including
|
||||
# specifically the experimental_features option which was wrong in #10047.
|
||||
def test_system_config_read(scylla_only, cql):
|
||||
# All rows should have the columns name, source, type and value:
|
||||
rows = list(cql.execute("SELECT name, source, type, value FROM system.config"))
|
||||
values = dict()
|
||||
for row in rows:
|
||||
values[row.name] = row.value
|
||||
# Check that experimental_features exists and makes sense.
|
||||
# It needs to be a JSON-formatted strings, and the strings need to be
|
||||
# ASCII feature names - not binary garbage as it was in #10047.
|
||||
assert 'experimental_features' in values
|
||||
obj = json.loads(values['experimental_features'])
|
||||
assert isinstance(obj, list)
|
||||
assert isinstance(obj[0], str)
|
||||
assert obj[0] and obj[0].isascii() and obj[0].isprintable()
|
||||
# Check formatting of tri_mode_restriction like
|
||||
# restrict_replication_simplestrategy. These need to be one of
|
||||
# allowed string values 0, 1, true, false or warn - but in particular
|
||||
# non-empty and printable ASCII, not garbage.
|
||||
assert 'restrict_replication_simplestrategy' in values
|
||||
obj = json.loads(values['restrict_replication_simplestrategy'])
|
||||
assert isinstance(obj, str)
|
||||
assert obj and obj.isascii() and obj.isprintable()
|
||||
|
||||
@@ -626,7 +626,12 @@ public:
|
||||
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr)).get();
|
||||
auto stop_mm = defer([&mm] { mm.stop().get(); });
|
||||
|
||||
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
|
||||
cql3::query_processor::memory_config qp_mcfg;
|
||||
if (cfg_in.qp_mcfg) {
|
||||
qp_mcfg = *cfg_in.qp_mcfg;
|
||||
} else {
|
||||
qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
|
||||
}
|
||||
auto local_data_dict = seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db));
|
||||
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config)).get();
|
||||
auto stop_qp = defer([&qp] { qp.stop().get(); });
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "cql3/query_options_fwd.hh"
|
||||
#include "cql3/values.hh"
|
||||
#include "cql3/prepared_statements_cache.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "bytes.hh"
|
||||
#include "schema.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
@@ -85,6 +86,7 @@ public:
|
||||
// Scheduling groups are overwritten unconditionally, see get_scheduling_groups().
|
||||
std::optional<replica::database_config> dbcfg;
|
||||
std::set<sstring> disabled_features;
|
||||
std::optional<cql3::query_processor::memory_config> qp_mcfg;
|
||||
|
||||
cql_test_config();
|
||||
cql_test_config(const cql_test_config&);
|
||||
|
||||
Submodule tools/java updated: b1e09c8b8f...2241a63bda
@@ -444,7 +444,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
break;
|
||||
case auth_state::AUTHENTICATION:
|
||||
// Support both SASL auth from protocol v2 and the older style Credentials auth from v1
|
||||
assert(cqlop == cql_binary_opcode::AUTH_RESPONSE || cqlop == cql_binary_opcode::CREDENTIALS);
|
||||
if (cqlop != cql_binary_opcode::AUTH_RESPONSE && cqlop != cql_binary_opcode::CREDENTIALS) {
|
||||
throw exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop)));
|
||||
}
|
||||
if (res_op == cql_binary_opcode::READY || res_op == cql_binary_opcode::AUTH_SUCCESS) {
|
||||
client_state.set_auth_state(auth_state::READY);
|
||||
}
|
||||
@@ -1219,7 +1221,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_read_timeout_error(stream, err, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
||||
return make_read_timeout_error(stream, exceptions::exception_code::READ_TIMEOUT, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
||||
}
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -1247,7 +1249,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_writ
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_mutation_write_timeout_error(stream, err, std::move(msg), cl, received, blockfor, type, tr_state);
|
||||
return make_mutation_write_timeout_error(stream, exceptions::exception_code::WRITE_TIMEOUT, std::move(msg), cl, received, blockfor, type, tr_state);
|
||||
}
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
|
||||
@@ -326,6 +326,7 @@ public:
|
||||
}
|
||||
|
||||
size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
|
||||
return with_allocator(standard_allocator(), [&] {
|
||||
size_t count = 0;
|
||||
auto disposer = [] (auto* p) noexcept {};
|
||||
while (start != end) {
|
||||
@@ -338,6 +339,7 @@ public:
|
||||
}
|
||||
}
|
||||
return count;
|
||||
});
|
||||
}
|
||||
public:
|
||||
/// \brief Constructs a cached_file.
|
||||
@@ -464,8 +466,10 @@ public:
|
||||
inline
|
||||
void cached_file::cached_page::on_evicted() noexcept {
|
||||
parent->on_evicted(*this);
|
||||
cached_file::cache_type::iterator it(this);
|
||||
it.erase(page_idx_less_comparator());
|
||||
with_allocator(standard_allocator(), [this] {
|
||||
cached_file::cache_type::iterator it(this);
|
||||
it.erase(page_idx_less_comparator());
|
||||
});
|
||||
}
|
||||
|
||||
class cached_file_impl : public file_impl {
|
||||
|
||||
@@ -376,7 +376,9 @@ chunked_vector<T, max_contiguous_allocation>::make_room(size_t n, bool stop_afte
|
||||
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
|
||||
// FIXME: realloc? maybe not worth the complication; only works for PODs
|
||||
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
|
||||
if (_size > _capacity - last_chunk_capacity) {
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
|
||||
}
|
||||
_chunks.back() = std::move(new_last_chunk);
|
||||
_capacity += capacity_increase;
|
||||
}
|
||||
|
||||
@@ -108,81 +108,82 @@ template<typename Key,
|
||||
typename Alloc = std::pmr::polymorphic_allocator<>>
|
||||
class loading_cache {
|
||||
|
||||
using loading_cache_clock_type = seastar::lowres_clock;
|
||||
using safe_link_list_hook = bi::list_base_hook<bi::link_mode<bi::safe_link>>;
|
||||
using loading_cache_clock_type = seastar::lowres_clock;
|
||||
using safe_link_list_hook = bi::list_base_hook<bi::link_mode<bi::safe_link>>;
|
||||
|
||||
class timestamped_val {
|
||||
public:
|
||||
using value_type = Tp;
|
||||
using loading_values_type = typename utils::loading_shared_values<Key, timestamped_val, Hash, EqualPred, LoadingSharedValuesStats, 256>;
|
||||
class lru_entry;
|
||||
class value_ptr;
|
||||
class timestamped_val {
|
||||
public:
|
||||
using value_type = Tp;
|
||||
using loading_values_type = typename utils::loading_shared_values<Key, timestamped_val, Hash, EqualPred, LoadingSharedValuesStats, 256>;
|
||||
class lru_entry;
|
||||
class value_ptr;
|
||||
|
||||
private:
|
||||
value_type _value;
|
||||
loading_cache_clock_type::time_point _loaded;
|
||||
loading_cache_clock_type::time_point _last_read;
|
||||
lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back
|
||||
size_t _size = 0;
|
||||
private:
|
||||
value_type _value;
|
||||
loading_cache_clock_type::time_point _loaded;
|
||||
loading_cache_clock_type::time_point _last_read;
|
||||
lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back
|
||||
size_t _size = 0;
|
||||
|
||||
public:
|
||||
timestamped_val(value_type val)
|
||||
: _value(std::move(val))
|
||||
, _loaded(loading_cache_clock_type::now())
|
||||
, _last_read(_loaded)
|
||||
, _size(EntrySize()(_value))
|
||||
{}
|
||||
timestamped_val(timestamped_val&&) = default;
|
||||
public:
|
||||
timestamped_val(value_type val)
|
||||
: _value(std::move(val))
|
||||
, _loaded(loading_cache_clock_type::now())
|
||||
, _last_read(_loaded)
|
||||
, _size(EntrySize()(_value))
|
||||
{}
|
||||
timestamped_val(timestamped_val&&) = default;
|
||||
|
||||
timestamped_val& operator=(value_type new_val) {
|
||||
assert(_lru_entry_ptr);
|
||||
timestamped_val& operator=(value_type new_val) {
|
||||
assert(_lru_entry_ptr);
|
||||
|
||||
_value = std::move(new_val);
|
||||
_loaded = loading_cache_clock_type::now();
|
||||
_lru_entry_ptr->cache_size() -= _size;
|
||||
_size = EntrySize()(_value);
|
||||
_lru_entry_ptr->cache_size() += _size;
|
||||
return *this;
|
||||
}
|
||||
_value = std::move(new_val);
|
||||
_loaded = loading_cache_clock_type::now();
|
||||
_lru_entry_ptr->owning_section_size() -= _size;
|
||||
_size = EntrySize()(_value);
|
||||
_lru_entry_ptr->owning_section_size() += _size;
|
||||
return *this;
|
||||
}
|
||||
|
||||
value_type& value() noexcept { return _value; }
|
||||
const value_type& value() const noexcept { return _value; }
|
||||
value_type& value() noexcept { return _value; }
|
||||
const value_type& value() const noexcept { return _value; }
|
||||
|
||||
static const timestamped_val& container_of(const value_type& value) {
|
||||
return *bi::get_parent_from_member(&value, ×tamped_val::_value);
|
||||
}
|
||||
static const timestamped_val& container_of(const value_type& value) {
|
||||
return *bi::get_parent_from_member(&value, ×tamped_val::_value);
|
||||
}
|
||||
|
||||
loading_cache_clock_type::time_point last_read() const noexcept {
|
||||
return _last_read;
|
||||
}
|
||||
loading_cache_clock_type::time_point last_read() const noexcept {
|
||||
return _last_read;
|
||||
}
|
||||
|
||||
loading_cache_clock_type::time_point loaded() const noexcept {
|
||||
return _loaded;
|
||||
}
|
||||
loading_cache_clock_type::time_point loaded() const noexcept {
|
||||
return _loaded;
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
return _size;
|
||||
}
|
||||
size_t size() const {
|
||||
return _size;
|
||||
}
|
||||
|
||||
bool ready() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
bool ready() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
|
||||
lru_entry* lru_entry_ptr() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
lru_entry* lru_entry_ptr() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
|
||||
private:
|
||||
void touch() noexcept {
|
||||
assert(_lru_entry_ptr);
|
||||
_last_read = loading_cache_clock_type::now();
|
||||
_lru_entry_ptr->touch();
|
||||
}
|
||||
private:
|
||||
void touch() noexcept {
|
||||
_last_read = loading_cache_clock_type::now();
|
||||
if (_lru_entry_ptr) {
|
||||
_lru_entry_ptr->touch();
|
||||
}
|
||||
}
|
||||
|
||||
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
|
||||
_lru_entry_ptr = lru_entry_ptr;
|
||||
}
|
||||
};
|
||||
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
|
||||
_lru_entry_ptr = lru_entry_ptr;
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
using loading_values_type = typename timestamped_val::loading_values_type;
|
||||
@@ -265,7 +266,7 @@ public:
|
||||
});
|
||||
}).then([this, k] (timestamped_val_ptr ts_val_ptr) {
|
||||
// check again since it could have already been inserted and initialized
|
||||
if (!ts_val_ptr->ready()) {
|
||||
if (!ts_val_ptr->ready() && !ts_val_ptr.orphaned()) {
|
||||
_logger.trace("{}: storing the value for the first time", k);
|
||||
|
||||
if (ts_val_ptr->size() > _max_size) {
|
||||
@@ -331,6 +332,11 @@ public:
|
||||
return set_find(k);
|
||||
}
|
||||
|
||||
// Removes all values matching a given predicate and values which are currently loading.
|
||||
// Guarantees that no values which match the predicate and whose loading was initiated
|
||||
// before this call will be present after this call (or appear at any time later).
|
||||
// The predicate may be invoked multiple times on the same value.
|
||||
// It must return the same result for a given value (it must be a pure function).
|
||||
template <typename Pred>
|
||||
void remove_if(Pred&& pred) {
|
||||
static_assert(std::is_same<bool, std::result_of_t<Pred(const value_type&)>>::value, "Bad Pred signature");
|
||||
@@ -344,15 +350,29 @@ public:
|
||||
|
||||
_unprivileged_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
|
||||
_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
|
||||
_loading_values.remove_if([&pred] (const timestamped_val& v) {
|
||||
return pred(v.value());
|
||||
});
|
||||
}
|
||||
|
||||
// Removes a given key from the cache.
|
||||
// The key is removed immediately.
|
||||
// After this, get_ptr() is guaranteed to reload the value before returning it.
|
||||
// As a consequence of the above, if there is a concurrent get_ptr() in progress with this,
|
||||
// its value will not populate the cache. It will still succeed.
|
||||
void remove(const Key& k) {
|
||||
remove_ts_value(set_find(k));
|
||||
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
|
||||
_loading_values.remove(k);
|
||||
}
|
||||
|
||||
// Removes a given key from the cache.
|
||||
// Same guarantees as with remove(key).
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
remove_ts_value(set_find(key, std::move(key_hasher_func), std::move(key_equal_func)));
|
||||
remove_ts_value(set_find(key, key_hasher_func, key_equal_func));
|
||||
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
|
||||
_loading_values.remove(key, key_hasher_func, key_equal_func);
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
@@ -361,9 +381,18 @@ public:
|
||||
|
||||
/// \brief returns the memory size the currently cached entries occupy according to the EntrySize predicate.
|
||||
size_t memory_footprint() const {
|
||||
return _current_size;
|
||||
return _unprivileged_section_size + _privileged_section_size;
|
||||
}
|
||||
|
||||
/// \brief returns the memory size the currently cached entries occupy in the privileged section according to the EntrySize predicate.
|
||||
size_t privileged_section_memory_footprint() const noexcept {
|
||||
return _privileged_section_size;
|
||||
}
|
||||
|
||||
/// \brief returns the memory size the currently cached entries occupy in the unprivileged section according to the EntrySize predicate.
|
||||
size_t unprivileged_section_memory_footprint() const noexcept {
|
||||
return _unprivileged_section_size;
|
||||
}
|
||||
private:
|
||||
void remove_ts_value(timestamped_val_ptr ts_ptr) {
|
||||
if (!ts_ptr) {
|
||||
@@ -419,16 +448,22 @@ private:
|
||||
}
|
||||
|
||||
if (lru_entry.touch_count() < SectionHitThreshold) {
|
||||
_logger.trace("Putting key {} into the unpriviledged section", lru_entry.key());
|
||||
_logger.trace("Putting key {} into the unprivileged section", lru_entry.key());
|
||||
_unprivileged_lru_list.push_front(lru_entry);
|
||||
lru_entry.inc_touch_count();
|
||||
} else {
|
||||
_logger.trace("Putting key {} into the priviledged section", lru_entry.key());
|
||||
_logger.trace("Putting key {} into the privileged section", lru_entry.key());
|
||||
_lru_list.push_front(lru_entry);
|
||||
|
||||
// Bump it up only once to avoid a wrap around
|
||||
if (lru_entry.touch_count() == SectionHitThreshold) {
|
||||
// This code will run only once, when a promotion
|
||||
// from unprivileged to privileged section happens.
|
||||
// Update section size bookkeeping.
|
||||
|
||||
lru_entry.owning_section_size() -= lru_entry.timestamped_value().size();
|
||||
lru_entry.inc_touch_count();
|
||||
lru_entry.owning_section_size() += lru_entry.timestamped_value().size();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -495,17 +530,44 @@ private:
|
||||
void shrink() {
|
||||
using namespace std::chrono;
|
||||
|
||||
while (_current_size >= _max_size && !_unprivileged_lru_list.empty()) {
|
||||
ts_value_lru_entry& lru_entry = *_unprivileged_lru_list.rbegin();
|
||||
_logger.trace("shrink(): {}: dropping the unpriviledged entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
|
||||
loading_cache::destroy_ts_value(&lru_entry);
|
||||
LoadingCacheStats::inc_unprivileged_on_cache_size_eviction();
|
||||
}
|
||||
|
||||
while (_current_size >= _max_size) {
|
||||
auto drop_privileged_entry = [&] {
|
||||
ts_value_lru_entry& lru_entry = *_lru_list.rbegin();
|
||||
_logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
|
||||
loading_cache::destroy_ts_value(&lru_entry);
|
||||
};
|
||||
|
||||
auto drop_unprivileged_entry = [&] {
|
||||
ts_value_lru_entry& lru_entry = *_unprivileged_lru_list.rbegin();
|
||||
_logger.trace("shrink(): {}: dropping the unprivileged entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
|
||||
loading_cache::destroy_ts_value(&lru_entry);
|
||||
LoadingCacheStats::inc_unprivileged_on_cache_size_eviction();
|
||||
};
|
||||
|
||||
// When cache entries need to be evicted due to a size restriction,
|
||||
// unprivileged section entries are evicted first.
|
||||
//
|
||||
// However, we make sure that the unprivileged section does not get
|
||||
// too small, because this could lead to starving the unprivileged section.
|
||||
// For example if the cache could store at most 50 entries and there are 49 entries in
|
||||
// privileged section, after adding 5 entries (that would go to unprivileged
|
||||
// section) 4 of them would get evicted and only the 5th one would stay.
|
||||
// This caused problems with BATCH statements where all prepared statements
|
||||
// in the batch have to stay in cache at the same time for the batch to correctly
|
||||
// execute.
|
||||
auto minimum_unprivileged_section_size = _max_size / 2;
|
||||
while (memory_footprint() >= _max_size && _unprivileged_section_size > minimum_unprivileged_section_size) {
|
||||
drop_unprivileged_entry();
|
||||
}
|
||||
|
||||
while (memory_footprint() >= _max_size && !_lru_list.empty()) {
|
||||
drop_privileged_entry();
|
||||
}
|
||||
|
||||
// If dropping entries from privileged section did not help,
|
||||
// we have to drop entries from unprivileged section,
|
||||
// going below minimum_unprivileged_section_size.
|
||||
while (memory_footprint() >= _max_size) {
|
||||
drop_unprivileged_entry();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -558,7 +620,8 @@ private:
|
||||
loading_values_type _loading_values;
|
||||
lru_list_type _lru_list; // list containing "privileged" section entries
|
||||
lru_list_type _unprivileged_lru_list; // list containing "unprivileged" section entries
|
||||
size_t _current_size = 0;
|
||||
size_t _privileged_section_size = 0;
|
||||
size_t _unprivileged_section_size = 0;
|
||||
size_t _max_size = 0;
|
||||
loading_cache_clock_type::duration _expiry;
|
||||
loading_cache_clock_type::duration _refresh;
|
||||
@@ -624,7 +687,7 @@ public:
|
||||
static_assert(SectionHitThreshold <= std::numeric_limits<typeof(_touch_count)>::max() / 2, "SectionHitThreshold value is too big");
|
||||
|
||||
_ts_val_ptr->set_anchor_back_reference(this);
|
||||
cache_size() += _ts_val_ptr->size();
|
||||
owning_section_size() += _ts_val_ptr->size();
|
||||
}
|
||||
|
||||
void inc_touch_count() noexcept {
|
||||
@@ -640,12 +703,12 @@ public:
|
||||
lru_list_type& lru_list = _parent.container_list(*this);
|
||||
lru_list.erase(lru_list.iterator_to(*this));
|
||||
}
|
||||
cache_size() -= _ts_val_ptr->size();
|
||||
owning_section_size() -= _ts_val_ptr->size();
|
||||
_ts_val_ptr->set_anchor_back_reference(nullptr);
|
||||
}
|
||||
|
||||
size_t& cache_size() noexcept {
|
||||
return _parent._current_size;
|
||||
size_t& owning_section_size() noexcept {
|
||||
return _touch_count <= SectionHitThreshold ? _parent._unprivileged_section_size : _parent._privileged_section_size;
|
||||
}
|
||||
|
||||
void touch() noexcept {
|
||||
|
||||
@@ -83,6 +83,10 @@ private:
|
||||
_val.emplace(std::move(new_val));
|
||||
}
|
||||
|
||||
bool orphaned() const {
|
||||
return !is_linked();
|
||||
}
|
||||
|
||||
shared_promise<>& loaded() {
|
||||
return _loaded;
|
||||
}
|
||||
@@ -95,7 +99,9 @@ private:
|
||||
: _parent(parent), _key(std::move(k)) {}
|
||||
|
||||
~entry() {
|
||||
_parent._set.erase(_parent._set.iterator_to(*this));
|
||||
if (is_linked()) {
|
||||
_parent._set.erase(_parent._set.iterator_to(*this));
|
||||
}
|
||||
Stats::inc_evictions();
|
||||
}
|
||||
|
||||
@@ -153,6 +159,18 @@ public:
|
||||
return res;
|
||||
}
|
||||
|
||||
// Returns the key this entry is associated with.
|
||||
// Valid if bool(*this).
|
||||
const key_type& key() const {
|
||||
return _e->key();
|
||||
}
|
||||
|
||||
// Returns true iff the entry is not linked in the set.
|
||||
// Call only when bool(*this).
|
||||
bool orphaned() const {
|
||||
return _e->orphaned();
|
||||
}
|
||||
|
||||
friend class loading_shared_values;
|
||||
friend std::ostream& operator<<(std::ostream& os, const entry_ptr& ep) {
|
||||
return os << ep._e.get();
|
||||
@@ -265,6 +283,50 @@ public:
|
||||
return entry_ptr(it->shared_from_this());
|
||||
};
|
||||
|
||||
// Removes a given key from this container.
|
||||
// If a given key is currently loading, the loading will succeed and will return entry_ptr
|
||||
// to the caller, but the value will not be present in the container. It will be removed
|
||||
// when the last entry_ptr dies, as usual.
|
||||
//
|
||||
// Post-condition: !find(key)
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) {
|
||||
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
|
||||
if (it != _set.end()) {
|
||||
_set.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
// Removes a given key from this container.
|
||||
// If a given key is currently loading, the loading will succeed and will return entry_ptr
|
||||
// to the caller, but the value will not be present in the container. It will be removed
|
||||
// when the last entry_ptr dies, as usual.
|
||||
//
|
||||
// Post-condition: !find(key)
|
||||
template<typename KeyType>
|
||||
void remove(const KeyType& key) {
|
||||
remove(key, Hash(), EqualPred());
|
||||
}
|
||||
|
||||
// Removes all values which match a given predicate or are currently loading.
|
||||
// Guarantees that no values which match the predicate and whose loading was initiated
|
||||
// before this call will be present after this call (or appear at any time later).
|
||||
// Same effects as if remove(e.key()) was called on each matching entry.
|
||||
template<typename Pred>
|
||||
requires std::is_invocable_r_v<bool, Pred, const Tp&>
|
||||
void remove_if(const Pred& pred) {
|
||||
auto it = _set.begin();
|
||||
while (it != _set.end()) {
|
||||
if (!it->ready() || pred(it->value())) {
|
||||
auto next = std::next(it);
|
||||
_set.erase(it);
|
||||
it = next;
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// keep the default non-templated overloads to ease on the compiler for specifications
|
||||
// that do not require the templated find().
|
||||
entry_ptr find(const key_type& key) noexcept {
|
||||
|
||||
@@ -584,6 +584,10 @@ static constexpr auto max_used_space_ratio_for_compaction = 0.85;
|
||||
static constexpr size_t max_used_space_for_compaction = segment_size * max_used_space_ratio_for_compaction;
|
||||
static constexpr size_t min_free_space_for_compaction = segment_size - max_used_space_for_compaction;
|
||||
|
||||
struct [[gnu::packed]] non_lsa_object_cookie {
|
||||
uint64_t value = 0xbadcaffe;
|
||||
};
|
||||
|
||||
static_assert(min_free_space_for_compaction >= max_managed_object_size,
|
||||
"Segments which cannot fit max_managed_object_size must not be considered compactible for the sake of forward progress of compaction");
|
||||
|
||||
@@ -827,9 +831,13 @@ public:
|
||||
void clear_allocation_failure_flag() { _allocation_failure_flag = false; }
|
||||
bool allocation_failure_flag() { return _allocation_failure_flag; }
|
||||
void refill_emergency_reserve();
|
||||
void update_non_lsa_memory_in_use(ssize_t n) {
|
||||
void add_non_lsa_memory_in_use(size_t n) {
|
||||
_non_lsa_memory_in_use += n;
|
||||
}
|
||||
void subtract_non_lsa_memory_in_use(size_t n) {
|
||||
assert(_non_lsa_memory_in_use >= n);
|
||||
_non_lsa_memory_in_use -= n;
|
||||
}
|
||||
size_t non_lsa_memory_in_use() const {
|
||||
return _non_lsa_memory_in_use;
|
||||
}
|
||||
@@ -1630,17 +1638,18 @@ public:
|
||||
memory::on_alloc_point();
|
||||
shard_segment_pool.on_memory_allocation(size);
|
||||
if (size > max_managed_object_size) {
|
||||
auto ptr = standard_allocator().alloc(migrator, size, alignment);
|
||||
auto ptr = standard_allocator().alloc(migrator, size + sizeof(non_lsa_object_cookie), alignment);
|
||||
// This isn't very acurrate, the correct free_space value would be
|
||||
// malloc_usable_size(ptr) - size, but there is no way to get
|
||||
// the exact object size at free.
|
||||
auto allocated_size = malloc_usable_size(ptr);
|
||||
new ((char*)ptr + allocated_size - sizeof(non_lsa_object_cookie)) non_lsa_object_cookie();
|
||||
_non_lsa_occupancy += occupancy_stats(0, allocated_size);
|
||||
if (_group) {
|
||||
_evictable_space += allocated_size;
|
||||
_group->increase_usage(_heap_handle, allocated_size);
|
||||
}
|
||||
shard_segment_pool.update_non_lsa_memory_in_use(allocated_size);
|
||||
shard_segment_pool.add_non_lsa_memory_in_use(allocated_size);
|
||||
return ptr;
|
||||
} else {
|
||||
auto ptr = alloc_small(object_descriptor(migrator), (segment::size_type) size, alignment);
|
||||
@@ -1652,12 +1661,14 @@ public:
|
||||
private:
|
||||
void on_non_lsa_free(void* obj) noexcept {
|
||||
auto allocated_size = malloc_usable_size(obj);
|
||||
auto cookie = (non_lsa_object_cookie*)((char*)obj + allocated_size) - 1;
|
||||
assert(cookie->value == non_lsa_object_cookie().value);
|
||||
_non_lsa_occupancy -= occupancy_stats(0, allocated_size);
|
||||
if (_group) {
|
||||
_evictable_space -= allocated_size;
|
||||
_group->decrease_usage(_heap_handle, allocated_size);
|
||||
}
|
||||
shard_segment_pool.update_non_lsa_memory_in_use(-allocated_size);
|
||||
shard_segment_pool.subtract_non_lsa_memory_in_use(allocated_size);
|
||||
}
|
||||
public:
|
||||
virtual void free(void* obj) noexcept override {
|
||||
|
||||
@@ -60,6 +60,9 @@ private:
|
||||
throw std::out_of_range("chunked_managed_vector out of range access");
|
||||
}
|
||||
}
|
||||
chunk_ptr& back_chunk() {
|
||||
return _chunks[_size / max_chunk_capacity()];
|
||||
}
|
||||
static void migrate(T* begin, T* end, managed_vector<T>& result);
|
||||
public:
|
||||
using value_type = T;
|
||||
@@ -106,24 +109,24 @@ public:
|
||||
|
||||
void push_back(const T& x) {
|
||||
reserve_for_push_back();
|
||||
_chunks.back().emplace_back(x);
|
||||
back_chunk().emplace_back(x);
|
||||
++_size;
|
||||
}
|
||||
void push_back(T&& x) {
|
||||
reserve_for_push_back();
|
||||
_chunks.back().emplace_back(std::move(x));
|
||||
back_chunk().emplace_back(std::move(x));
|
||||
++_size;
|
||||
}
|
||||
template <typename... Args>
|
||||
T& emplace_back(Args&&... args) {
|
||||
reserve_for_push_back();
|
||||
auto& ret = _chunks.back().emplace_back(std::forward<Args>(args)...);
|
||||
auto& ret = back_chunk().emplace_back(std::forward<Args>(args)...);
|
||||
++_size;
|
||||
return ret;
|
||||
}
|
||||
void pop_back() {
|
||||
--_size;
|
||||
_chunks.back().pop_back();
|
||||
back_chunk().pop_back();
|
||||
}
|
||||
const T& back() const {
|
||||
return *addr(_size - 1);
|
||||
@@ -381,7 +384,9 @@ chunked_managed_vector<T>::make_room(size_t n, bool stop_after_one) {
|
||||
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
|
||||
// FIXME: realloc? maybe not worth the complication; only works for PODs
|
||||
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk);
|
||||
if (_size > _capacity - last_chunk_capacity) {
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk);
|
||||
}
|
||||
_chunks.back() = std::move(new_last_chunk);
|
||||
_capacity += capacity_increase;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user