Compare commits
79 Commits
copilot/ad
...
scylla-5.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b177dacd36 | ||
|
|
283a722923 | ||
|
|
522d0a81e7 | ||
|
|
cd13911db4 | ||
|
|
32423ebc38 | ||
|
|
97054ee691 | ||
|
|
34085c364f | ||
|
|
323521f4c8 | ||
|
|
1ad59d6a7b | ||
|
|
d3045df9c9 | ||
|
|
be48b7aa8b | ||
|
|
3c4688bcfa | ||
|
|
cc22021876 | ||
|
|
c9e79cb4a3 | ||
|
|
f28542a71e | ||
|
|
527a75a4c0 | ||
|
|
df00f8fcfb | ||
|
|
41a00c744f | ||
|
|
2d7b6cd702 | ||
|
|
ff79228178 | ||
|
|
1803124cc6 | ||
|
|
6fcbf66bfb | ||
|
|
e9a3dee234 | ||
|
|
279cd44c7f | ||
|
|
c99f768381 | ||
|
|
89a540d54a | ||
|
|
338edcc02e | ||
|
|
a8eb5164b2 | ||
|
|
9accb44f9c | ||
|
|
8878007106 | ||
|
|
9da666e778 | ||
|
|
aca355dec1 | ||
|
|
efbb2efd3f | ||
|
|
44dc5c4a1d | ||
|
|
6b34ba3a4f | ||
|
|
f1e25cb4a6 | ||
|
|
c9798746ae | ||
|
|
7f70ffc5ce | ||
|
|
551636ec89 | ||
|
|
e1130a01e7 | ||
|
|
b0233cb7c5 | ||
|
|
e480c5bf4d | ||
|
|
7d90f7e93f | ||
|
|
3e6e8579c6 | ||
|
|
3e98e17d18 | ||
|
|
a214f8cf6e | ||
|
|
e8b92fe34d | ||
|
|
fa479c84ac | ||
|
|
40c26dd2c5 | ||
|
|
2c6f069fd1 | ||
|
|
e27dff0c50 | ||
|
|
3f03260ffb | ||
|
|
1315135fca | ||
|
|
f92622e0de | ||
|
|
3bca608db5 | ||
|
|
a93b72d5dd | ||
|
|
d58ca2edbd | ||
|
|
75740ace2a | ||
|
|
d7a1bf6331 | ||
|
|
bbd7d657cc | ||
|
|
f5bf4c81d1 | ||
|
|
02e8336659 | ||
|
|
601812e11b | ||
|
|
ea466320d2 | ||
|
|
25ea831a15 | ||
|
|
8648c79c9e | ||
|
|
7ae4d0e6f8 | ||
|
|
f3564db941 | ||
|
|
97caf12836 | ||
|
|
839d9ef41a | ||
|
|
782bd50f92 | ||
|
|
0a4d971b4a | ||
|
|
22562f767f | ||
|
|
eb80dd1db5 | ||
|
|
51d699ee21 | ||
|
|
83a33bff8c | ||
|
|
273563b9ad | ||
|
|
891990ec09 | ||
|
|
da0cd2b107 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.0.dev
|
||||
VERSION=5.0.1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2173,6 +2173,9 @@ static attrs_to_get calculate_attrs_to_get(const rjson::value& req, std::unorder
|
||||
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
|
||||
attribute_path_map_add("AttributesToGet", ret, it->GetString());
|
||||
}
|
||||
if (ret.empty()) {
|
||||
throw api_error::validation("Empty AttributesToGet is not allowed. Consider using Select=COUNT instead.");
|
||||
}
|
||||
return ret;
|
||||
} else if (has_projection_expression) {
|
||||
const rjson::value& projection_expression = req["ProjectionExpression"];
|
||||
@@ -2577,8 +2580,8 @@ static bool hierarchy_actions(
|
||||
// attr member so we can use add()
|
||||
rjson::add_with_string_name(v, attr, std::move(*newv));
|
||||
} else {
|
||||
throw api_error::validation(format("Can't remove document path {} - not present in item",
|
||||
subh.get_value()._path));
|
||||
// Removing a.b when a is a map but a.b doesn't exist
|
||||
// is silently ignored. It's not considered an error.
|
||||
}
|
||||
} else {
|
||||
throw api_error::validation(format("UpdateExpression: document paths not valid for this item:{}", h));
|
||||
|
||||
@@ -116,9 +116,6 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
|
||||
future<executor::request_return_type> executor::describe_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.describe_time_to_live++;
|
||||
if (!_proxy.data_dictionary().features().cluster_supports_alternator_ttl()) {
|
||||
co_return api_error::unknown_operation("DescribeTimeToLive not yet supported. Experimental support is available if the 'alternator_ttl' experimental feature is enabled on all nodes.");
|
||||
}
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
|
||||
rjson::value desc = rjson::empty_object();
|
||||
|
||||
@@ -624,7 +624,7 @@
|
||||
},
|
||||
{
|
||||
"name":"kn",
|
||||
"description":"Comma seperated keyspaces name to snapshot",
|
||||
"description":"Keyspace(s) to snapshot. Multiple keyspaces can be provided using a comma-separated list. If omitted, snapshot all keyspaces.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
@@ -632,7 +632,7 @@
|
||||
},
|
||||
{
|
||||
"name":"cf",
|
||||
"description":"the column family to snapshot",
|
||||
"description":"Table(s) to snapshot. Multiple tables (in a single keyspace) can be provided using a comma-separated list. If omitted, snapshot all tables in the given keyspace(s).",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -669,19 +669,16 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
});
|
||||
}));
|
||||
|
||||
ss::force_keyspace_flush.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
ss::force_keyspace_flush.set(r, [&ctx](std::unique_ptr<request> req) -> future<json::json_return_type> {
|
||||
auto keyspace = validate_keyspace(ctx, req->param);
|
||||
auto column_families = parse_tables(keyspace, ctx, req->query_parameters, "cf");
|
||||
auto &db = ctx.db.local();
|
||||
if (column_families.empty()) {
|
||||
column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
|
||||
co_await db.flush_on_all(keyspace);
|
||||
} else {
|
||||
co_await db.flush_on_all(keyspace, std::move(column_families));
|
||||
}
|
||||
return ctx.db.invoke_on_all([keyspace, column_families] (replica::database& db) {
|
||||
return parallel_for_each(column_families, [&db, keyspace](const sstring& cf) mutable {
|
||||
return db.find_column_family(keyspace, cf).flush();
|
||||
});
|
||||
}).then([]{
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
co_return json_void();
|
||||
});
|
||||
|
||||
|
||||
@@ -1284,40 +1281,46 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
});
|
||||
});
|
||||
|
||||
ss::take_snapshot.set(r, [&snap_ctl](std::unique_ptr<request> req) {
|
||||
apilog.debug("take_snapshot: {}", req->query_parameters);
|
||||
ss::take_snapshot.set(r, [&snap_ctl](std::unique_ptr<request> req) -> future<json::json_return_type> {
|
||||
apilog.info("take_snapshot: {}", req->query_parameters);
|
||||
auto tag = req->get_query_param("tag");
|
||||
auto column_families = split(req->get_query_param("cf"), ",");
|
||||
auto sfopt = req->get_query_param("sf");
|
||||
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
|
||||
|
||||
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
|
||||
|
||||
auto resp = make_ready_future<>();
|
||||
if (column_families.empty()) {
|
||||
resp = snap_ctl.local().take_snapshot(tag, keynames, sf);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
try {
|
||||
if (column_families.empty()) {
|
||||
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
}
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
|
||||
}
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
resp = snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
|
||||
co_return json_void();
|
||||
} catch (...) {
|
||||
apilog.error("take_snapshot failed: {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
return resp.then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
});
|
||||
|
||||
ss::del_snapshot.set(r, [&snap_ctl](std::unique_ptr<request> req) {
|
||||
ss::del_snapshot.set(r, [&snap_ctl](std::unique_ptr<request> req) -> future<json::json_return_type> {
|
||||
apilog.info("del_snapshot: {}", req->query_parameters);
|
||||
auto tag = req->get_query_param("tag");
|
||||
auto column_family = req->get_query_param("cf");
|
||||
|
||||
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
|
||||
return snap_ctl.local().clear_snapshot(tag, keynames, column_family).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
try {
|
||||
co_await snap_ctl.local().clear_snapshot(tag, keynames, column_family);
|
||||
co_return json_void();
|
||||
} catch (...) {
|
||||
apilog.error("del_snapshot failed: {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
|
||||
ss::true_snapshots_size.set(r, [&snap_ctl](std::unique_ptr<request> req) {
|
||||
|
||||
@@ -87,19 +87,24 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
|
||||
// prefer expiring cells.
|
||||
return left.is_live_and_has_ttl() ? std::strong_ordering::greater : std::strong_ordering::less;
|
||||
}
|
||||
if (left.is_live_and_has_ttl() && left.expiry() != right.expiry()) {
|
||||
return left.expiry() <=> right.expiry();
|
||||
if (left.is_live_and_has_ttl()) {
|
||||
if (left.expiry() != right.expiry()) {
|
||||
return left.expiry() <=> right.expiry();
|
||||
} else {
|
||||
// prefer the cell that was written later,
|
||||
// so it survives longer after it expires, until purged.
|
||||
return right.ttl() <=> left.ttl();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Both are deleted
|
||||
if (left.deletion_time() != right.deletion_time()) {
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
|
||||
}
|
||||
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
|
||||
}
|
||||
return std::strong_ordering::equal;
|
||||
}
|
||||
|
||||
20
cdc/log.cc
20
cdc/log.cc
@@ -59,7 +59,7 @@ using namespace std::chrono_literals;
|
||||
logging::logger cdc_log("cdc");
|
||||
|
||||
namespace cdc {
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {}, schema_ptr = nullptr);
|
||||
}
|
||||
|
||||
static constexpr auto cdc_group_name = "cdc";
|
||||
@@ -206,7 +206,7 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt, log_schema);
|
||||
|
||||
auto log_mut = log_schema
|
||||
? db::schema_tables::make_update_table_mutations(db, keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
|
||||
@@ -484,7 +484,7 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
|
||||
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
|
||||
}
|
||||
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid, schema_ptr old) {
|
||||
schema_builder b(s.ks_name(), log_name(s.cf_name()));
|
||||
b.with_partitioner("com.scylladb.dht.CDCPartitioner");
|
||||
b.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
@@ -571,6 +571,20 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
b.set_uuid(*uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* #10473 - if we are redefining the log table, we need to ensure any dropped
|
||||
* columns are registered in "dropped_columns" table, otherwise clients will not
|
||||
* be able to read data older than now.
|
||||
*/
|
||||
if (old) {
|
||||
// not super efficient, but we don't do this often.
|
||||
for (auto& col : old->all_columns()) {
|
||||
if (!b.has_column({col.name(), col.name_as_text() })) {
|
||||
b.without_column(col.name_as_text(), col.type, api::new_timestamp());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return b.build();
|
||||
}
|
||||
|
||||
|
||||
@@ -1281,6 +1281,13 @@ private:
|
||||
|
||||
const auto& key = _validator.previous_partition_key();
|
||||
|
||||
if (_validator.current_tombstone()) {
|
||||
throw compaction_aborted_exception(
|
||||
_schema->ks_name(),
|
||||
_schema->cf_name(),
|
||||
"scrub compaction cannot handle invalid fragments with an active range tombstone change");
|
||||
}
|
||||
|
||||
// If the unexpected fragment is a partition end, we just drop it.
|
||||
// The only case a partition end is invalid is when it comes after
|
||||
// another partition end, and we can just drop it in that case.
|
||||
|
||||
@@ -317,9 +317,9 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
|
||||
|
||||
auto job_ptr = std::make_unique<noncopyable_function<future<>(sstables::compaction_data&)>>(std::move(job));
|
||||
|
||||
task->compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, &job = *job_ptr] () mutable {
|
||||
// take read lock for table, so major compaction and resharding can't proceed in parallel.
|
||||
return with_lock(task->compaction_state.lock.for_read(), [this, task, &job] () mutable {
|
||||
task->compaction_done = with_semaphore(_custom_jobs_sem, 1, [this, task, &job = *job_ptr] () mutable {
|
||||
// We don't need to take task->compaction_state.lock.for_read() as it only serializes minor and major
|
||||
|
||||
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
|
||||
if (task->stopping) {
|
||||
throw sstables::compaction_stopped_exception(task->compacting_table->schema()->ks_name(), task->compacting_table->schema()->cf_name(),
|
||||
@@ -335,7 +335,6 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
|
||||
// no need to register shared sstables because they're excluded from non-resharding
|
||||
// compaction and some of them may not even belong to current shard.
|
||||
return job(task->compaction_data);
|
||||
});
|
||||
}).then_wrapped([this, task, job_ptr = std::move(job_ptr), type] (future<> f) {
|
||||
_stats.active_tasks--;
|
||||
_tasks.remove(task);
|
||||
@@ -353,32 +352,50 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t)
|
||||
: _cm(cm)
|
||||
, _table(t)
|
||||
, _compaction_state(cm.get_compaction_state(_table))
|
||||
, _holder(_compaction_state.gate.hold())
|
||||
{
|
||||
_compaction_state.compaction_disabled_counter++;
|
||||
cmlog.debug("Temporarily disabled compaction for {}.{}. compaction_disabled_counter={}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), _compaction_state.compaction_disabled_counter);
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept
|
||||
: _cm(o._cm)
|
||||
, _table(std::exchange(o._table, nullptr))
|
||||
, _compaction_state(o._compaction_state)
|
||||
, _holder(std::move(o._holder))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_reenabler::~compaction_reenabler() {
|
||||
// submit compaction request if we're the last holder of the gate which is still opened.
|
||||
if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) {
|
||||
cmlog.debug("Reenabling compaction for {}.{}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name());
|
||||
try {
|
||||
_cm.submit(_table);
|
||||
} catch (...) {
|
||||
cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<compaction_manager::compaction_reenabler>
|
||||
compaction_manager::stop_and_disable_compaction(replica::table* t) {
|
||||
compaction_reenabler cre(*this, t);
|
||||
co_await stop_ongoing_compactions("user-triggered operation", t);
|
||||
co_return cre;
|
||||
}
|
||||
|
||||
future<>
|
||||
compaction_manager::run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func) {
|
||||
auto& c_state = _compaction_state[t];
|
||||
auto holder = c_state.gate.hold();
|
||||
compaction_reenabler cre = co_await stop_and_disable_compaction(t);
|
||||
|
||||
c_state.compaction_disabled_counter++;
|
||||
|
||||
std::exception_ptr err;
|
||||
try {
|
||||
co_await stop_ongoing_compactions("user-triggered operation", t);
|
||||
co_await func();
|
||||
} catch (...) {
|
||||
err = std::current_exception();
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
assert(_compaction_state.contains(t));
|
||||
#endif
|
||||
// submit compaction request if we're the last holder of the gate which is still opened.
|
||||
if (--c_state.compaction_disabled_counter == 0 && !c_state.gate.is_closed()) {
|
||||
submit(t);
|
||||
}
|
||||
if (err) {
|
||||
std::rethrow_exception(err);
|
||||
}
|
||||
co_return;
|
||||
co_await func();
|
||||
}
|
||||
|
||||
void compaction_manager::task::setup_new_compaction() {
|
||||
@@ -742,6 +759,7 @@ future<> compaction_manager::perform_offstrategy(replica::table* t) {
|
||||
_stats.active_tasks++;
|
||||
task->setup_new_compaction();
|
||||
|
||||
return with_scheduling_group(_maintenance_sg.cpu, [this, task, t] {
|
||||
return t->run_offstrategy_compaction(task->compaction_data).then_wrapped([this, task, schema = t->schema()] (future<> f) mutable {
|
||||
_stats.active_tasks--;
|
||||
task->finish_compaction();
|
||||
@@ -763,6 +781,7 @@ future<> compaction_manager::perform_offstrategy(replica::table* t) {
|
||||
}
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}).finally([this, task] {
|
||||
@@ -810,7 +829,8 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
auto run_identifier = sst->run_identifier();
|
||||
auto sstable_set_snapshot = can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), _maintenance_sg.io,
|
||||
// FIXME: this compaction should run with maintenance priority.
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
|
||||
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
||||
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
@@ -819,8 +839,9 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
};
|
||||
|
||||
auto maintenance_permit = co_await seastar::get_units(_maintenance_ops_sem, 1);
|
||||
// Take write lock for table to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard.
|
||||
auto write_lock_holder = co_await _compaction_state[&t].lock.hold_write_lock();
|
||||
// FIXME: acquiring the read lock is not needed after acquiring the _maintenance_ops_sem
|
||||
// only major compaction needs to acquire the write lock to synchronize with regular compaction.
|
||||
auto lock_holder = co_await _compaction_state[&t].lock.hold_read_lock();
|
||||
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
@@ -852,7 +873,7 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
};
|
||||
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
completed = co_await with_scheduling_group(_maintenance_sg.cpu, std::ref(perform_rewrite));
|
||||
completed = co_await with_scheduling_group(_compaction_controller.sg(), std::ref(perform_rewrite));
|
||||
} while (!completed);
|
||||
};
|
||||
|
||||
|
||||
@@ -147,6 +147,8 @@ private:
|
||||
// If the operation must be serialized with regular, then the per-table write lock must be taken.
|
||||
seastar::named_semaphore _maintenance_ops_sem = {1, named_semaphore_exception_factory{"maintenance operation"}};
|
||||
|
||||
seastar::named_semaphore _custom_jobs_sem = {1, named_semaphore_exception_factory{"custom jobs"}};
|
||||
|
||||
std::function<void()> compaction_submission_callback();
|
||||
// all registered tables are reevaluated at a constant interval.
|
||||
// Submission is a NO-OP when there's nothing to do, so it's fine to call it regularly.
|
||||
@@ -269,6 +271,31 @@ public:
|
||||
// parameter job is a function that will carry the operation
|
||||
future<> run_custom_job(replica::table* t, sstables::compaction_type type, noncopyable_function<future<>(sstables::compaction_data&)> job);
|
||||
|
||||
class compaction_reenabler {
|
||||
compaction_manager& _cm;
|
||||
replica::table* _table;
|
||||
compaction_state& _compaction_state;
|
||||
gate::holder _holder;
|
||||
|
||||
public:
|
||||
compaction_reenabler(compaction_manager&, replica::table*);
|
||||
compaction_reenabler(compaction_reenabler&&) noexcept;
|
||||
|
||||
~compaction_reenabler();
|
||||
|
||||
replica::table* compacting_table() const noexcept {
|
||||
return _table;
|
||||
}
|
||||
|
||||
const compaction_state& compaction_state() const noexcept {
|
||||
return _compaction_state;
|
||||
}
|
||||
};
|
||||
|
||||
// Disable compaction temporarily for a table t.
|
||||
// Caller should call the compaction_reenabler::reenable
|
||||
future<compaction_reenabler> stop_and_disable_compaction(replica::table* t);
|
||||
|
||||
// Run a function with compaction temporarily disabled for a table T.
|
||||
future<> run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func);
|
||||
|
||||
|
||||
@@ -69,7 +69,11 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(tabl
|
||||
}
|
||||
|
||||
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
|
||||
if (removed.empty() || added.empty()) {
|
||||
// All the update here is only relevant for regular compaction's round-robin picking policy, and if
|
||||
// last_compacted_keys wasn't generated by regular, it means regular is disabled since last restart,
|
||||
// therefore we can skip the updates here until regular runs for the first time. Once it runs,
|
||||
// it will be able to generate last_compacted_keys correctly by looking at metadata of files.
|
||||
if (removed.empty() || added.empty() || !_last_compacted_keys) {
|
||||
return;
|
||||
}
|
||||
auto min_level = std::numeric_limits<uint32_t>::max();
|
||||
|
||||
@@ -217,6 +217,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
|
||||
auto compaction_time = gc_clock::now();
|
||||
|
||||
if (candidates.empty()) {
|
||||
_estimated_remaining_tasks = 0;
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
|
||||
@@ -81,9 +81,7 @@ public:
|
||||
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0;
|
||||
|
||||
virtual bool depends_on_keyspace(const seastar::sstring& ks_name) const = 0;
|
||||
|
||||
virtual bool depends_on_column_family(const seastar::sstring& cf_name) const = 0;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;
|
||||
|
||||
virtual seastar::shared_ptr<const metadata> get_result_metadata() const = 0;
|
||||
|
||||
|
||||
@@ -103,7 +103,13 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
|
||||
if (!col_type->is_map()) {
|
||||
throw exceptions::invalid_request_exception(format("subscripting non-map column {}", cdef->name_as_text()));
|
||||
}
|
||||
const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[data.sel.index_of(*cdef)]));
|
||||
int32_t index = data.sel.index_of(*cdef);
|
||||
if (index == -1) {
|
||||
throw std::runtime_error(
|
||||
format("Column definition {} does not match any column in the query selection",
|
||||
cdef->name_as_text()));
|
||||
}
|
||||
const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[index]));
|
||||
const auto& data_map = value_cast<map_type_impl::native_type>(deserialized);
|
||||
const auto key = evaluate(*col.sub, options);
|
||||
auto&& key_type = col_type->name_comparator();
|
||||
@@ -121,8 +127,16 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
|
||||
case column_kind::clustering_key:
|
||||
return managed_bytes(data.clustering_key[cdef->id]);
|
||||
case column_kind::static_column:
|
||||
case column_kind::regular_column:
|
||||
return managed_bytes_opt(data.other_columns[data.sel.index_of(*cdef)]);
|
||||
[[fallthrough]];
|
||||
case column_kind::regular_column: {
|
||||
int32_t index = data.sel.index_of(*cdef);
|
||||
if (index == -1) {
|
||||
throw std::runtime_error(
|
||||
format("Column definition {} does not match any column in the query selection",
|
||||
cdef->name_as_text()));
|
||||
}
|
||||
return managed_bytes_opt(data.other_columns[index]);
|
||||
}
|
||||
default:
|
||||
throw exceptions::unsupported_operation_exception("Unknown column kind");
|
||||
}
|
||||
|
||||
@@ -953,7 +953,7 @@ bool query_processor::migration_subscriber::should_invalidate(
|
||||
sstring ks_name,
|
||||
std::optional<sstring> cf_name,
|
||||
::shared_ptr<cql_statement> statement) {
|
||||
return statement->depends_on_keyspace(ks_name) && (!cf_name || statement->depends_on_column_family(*cf_name));
|
||||
return statement->depends_on(ks_name, cf_name);
|
||||
}
|
||||
|
||||
future<> query_processor::query_internal(
|
||||
|
||||
@@ -514,7 +514,7 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
|
||||
}
|
||||
|
||||
if (!_nonprimary_key_restrictions->empty()) {
|
||||
if (_has_queriable_regular_index) {
|
||||
if (_has_queriable_regular_index && _partition_range_is_simple) {
|
||||
_uses_secondary_indexing = true;
|
||||
} else if (!allow_filtering) {
|
||||
throw exceptions::invalid_request_exception("Cannot execute this query as it might involve data filtering and "
|
||||
|
||||
@@ -165,7 +165,7 @@ public:
|
||||
|
||||
template<typename RowComparator>
|
||||
void sort(const RowComparator& cmp) {
|
||||
std::sort(_rows.begin(), _rows.end(), std::ref(cmp));
|
||||
std::sort(_rows.begin(), _rows.end(), cmp);
|
||||
}
|
||||
|
||||
metadata& get_metadata();
|
||||
|
||||
@@ -18,13 +18,7 @@ uint32_t cql3::statements::authentication_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool cql3::statements::authentication_statement::depends_on_keyspace(
|
||||
const sstring& ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool cql3::statements::authentication_statement::depends_on_column_family(
|
||||
const sstring& cf_name) const {
|
||||
bool cql3::statements::authentication_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -27,9 +27,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -20,13 +20,7 @@ uint32_t cql3::statements::authorization_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool cql3::statements::authorization_statement::depends_on_keyspace(
|
||||
const sstring& ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool cql3::statements::authorization_statement::depends_on_column_family(
|
||||
const sstring& cf_name) const {
|
||||
bool cql3::statements::authorization_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -31,9 +31,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -70,14 +70,9 @@ batch_statement::batch_statement(type type_,
|
||||
{
|
||||
}
|
||||
|
||||
bool batch_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
bool batch_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool batch_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
{
|
||||
return false;
|
||||
return boost::algorithm::any_of(_statements, [&ks_name, &cf_name] (auto&& s) { return s.statement->depends_on(ks_name, cf_name); });
|
||||
}
|
||||
|
||||
uint32_t batch_statement::get_bound_terms() const
|
||||
|
||||
@@ -88,9 +88,7 @@ public:
|
||||
std::unique_ptr<attributes> attrs,
|
||||
cql_stats& stats);
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
|
||||
@@ -539,12 +539,8 @@ modification_statement::validate(query_processor&, const service::client_state&
|
||||
}
|
||||
}
|
||||
|
||||
bool modification_statement::depends_on_keyspace(const sstring& ks_name) const {
|
||||
return keyspace() == ks_name;
|
||||
}
|
||||
|
||||
bool modification_statement::depends_on_column_family(const sstring& cf_name) const {
|
||||
return column_family() == cf_name;
|
||||
bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
|
||||
}
|
||||
|
||||
void modification_statement::add_operation(::shared_ptr<operation> op) {
|
||||
|
||||
@@ -137,9 +137,7 @@ public:
|
||||
// Validate before execute, using client state and current schema
|
||||
void validate(query_processor&, const service::client_state& state) const override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
void add_operation(::shared_ptr<operation> op);
|
||||
|
||||
|
||||
@@ -45,12 +45,7 @@ future<> schema_altering_statement::grant_permissions_to_creator(const service::
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
bool schema_altering_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool schema_altering_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool schema_altering_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -53,9 +53,7 @@ protected:
|
||||
*/
|
||||
virtual future<> grant_permissions_to_creator(const service::client_state&) const;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
|
||||
@@ -167,12 +167,8 @@ void select_statement::validate(query_processor&, const service::client_state& s
|
||||
// Nothing to do, all validation has been done by raw_statemet::prepare()
|
||||
}
|
||||
|
||||
bool select_statement::depends_on_keyspace(const sstring& ks_name) const {
|
||||
return keyspace() == ks_name;
|
||||
}
|
||||
|
||||
bool select_statement::depends_on_column_family(const sstring& cf_name) const {
|
||||
return column_family() == cf_name;
|
||||
bool select_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
|
||||
}
|
||||
|
||||
const sstring& select_statement::keyspace() const {
|
||||
|
||||
@@ -100,8 +100,7 @@ public:
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
virtual void validate(query_processor&, const service::client_state& state) const override;
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor& qp,
|
||||
service::query_state& state, const query_options& options) const override;
|
||||
|
||||
@@ -17,13 +17,7 @@ uint32_t service_level_statement::get_bound_terms() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool service_level_statement::depends_on_keyspace(
|
||||
const sstring &ks_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool service_level_statement::depends_on_column_family(
|
||||
const sstring &cf_name) const {
|
||||
bool service_level_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -43,9 +43,7 @@ public:
|
||||
|
||||
uint32_t get_bound_terms() const override;
|
||||
|
||||
bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -39,12 +39,7 @@ std::unique_ptr<prepared_statement> truncate_statement::prepare(data_dictionary:
|
||||
return std::make_unique<prepared_statement>(::make_shared<truncate_statement>(*this));
|
||||
}
|
||||
|
||||
bool truncate_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool truncate_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool truncate_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -30,9 +30,7 @@ public:
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
|
||||
|
||||
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
@@ -46,12 +46,7 @@ std::unique_ptr<prepared_statement> use_statement::prepare(data_dictionary::data
|
||||
|
||||
}
|
||||
|
||||
bool use_statement::depends_on_keyspace(const sstring& ks_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool use_statement::depends_on_column_family(const sstring& cf_name) const
|
||||
bool use_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -31,9 +31,7 @@ public:
|
||||
|
||||
virtual uint32_t get_bound_terms() const override;
|
||||
|
||||
virtual bool depends_on_keyspace(const seastar::sstring& ks_name) const override;
|
||||
|
||||
virtual bool depends_on_column_family(const seastar::sstring& cf_name) const override;
|
||||
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
|
||||
|
||||
virtual seastar::future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
|
||||
23
db/config.cc
23
db/config.cc
@@ -65,6 +65,25 @@ hinted_handoff_enabled_to_json(const db::config::hinted_handoff_enabled_type& h)
|
||||
return value_to_json(h.to_configuration_string());
|
||||
}
|
||||
|
||||
// Convert a value that can be printed with operator<<, or a vector of
|
||||
// such values, to JSON. An example is enum_option<T>, because enum_option<T>
|
||||
// has a operator<<.
|
||||
template <typename T>
|
||||
static json::json_return_type
|
||||
printable_to_json(const T& e) {
|
||||
return value_to_json(format("{}", e));
|
||||
}
|
||||
template <typename T>
|
||||
static json::json_return_type
|
||||
printable_vector_to_json(const std::vector<T>& e) {
|
||||
std::vector<sstring> converted;
|
||||
converted.reserve(e.size());
|
||||
for (const auto& option : e) {
|
||||
converted.push_back(format("{}", option));
|
||||
}
|
||||
return value_to_json(converted);
|
||||
}
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<bool> = config_type("bool", value_to_json<bool>);
|
||||
|
||||
@@ -109,11 +128,11 @@ const config_type config_type_for<db::seed_provider_type> = config_type("seed pr
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<std::vector<enum_option<db::experimental_features_t>>> = config_type(
|
||||
"experimental features", value_to_json<std::vector<sstring>>);
|
||||
"experimental features", printable_vector_to_json<enum_option<db::experimental_features_t>>);
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<enum_option<db::tri_mode_restriction_t>> = config_type(
|
||||
"restriction mode", value_to_json<sstring>);
|
||||
"restriction mode", printable_to_json<enum_option<db::tri_mode_restriction_t>>);
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<db::config::hinted_handoff_enabled_type> = config_type("hinted handoff enabled", hinted_handoff_enabled_to_json);
|
||||
|
||||
@@ -574,12 +574,8 @@ public:
|
||||
}
|
||||
|
||||
future<> flush_schemas() {
|
||||
return _qp.proxy().get_db().invoke_on_all([this] (replica::database& db) {
|
||||
return parallel_for_each(db::schema_tables::all_table_names(schema_features::full()), [this, &db](const sstring& cf_name) {
|
||||
auto& cf = db.find_column_family(db::schema_tables::NAME, cf_name);
|
||||
return cf.flush();
|
||||
});
|
||||
});
|
||||
auto& db = _qp.db().real_database();
|
||||
return db.flush_on_all(db::schema_tables::NAME, db::schema_tables::all_table_names(schema_features::full()));
|
||||
}
|
||||
|
||||
future<> migrate() {
|
||||
|
||||
@@ -1042,12 +1042,9 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
|
||||
|
||||
if (do_flush) {
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
auto& cfs = column_families;
|
||||
co_await parallel_for_each(cfs.begin(), cfs.end(), [&] (const utils::UUID& id) -> future<> {
|
||||
auto& cf = db.find_column_family(id);
|
||||
co_await cf.flush();
|
||||
});
|
||||
auto& db = proxy.local().local_db();
|
||||
co_await parallel_for_each(column_families, [&db] (const utils::UUID& id) -> future<> {
|
||||
return db.flush_on_all(id);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
*/
|
||||
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include "db/snapshot-ctl.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
@@ -59,20 +61,17 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
|
||||
boost::copy(_db.local().get_keyspaces() | boost::adaptors::map_keys, std::back_inserter(keyspace_names));
|
||||
};
|
||||
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] {
|
||||
return parallel_for_each(keyspace_names, [tag, this] (auto& ks_name) {
|
||||
return check_snapshot_not_exist(ks_name, tag);
|
||||
}).then([this, tag, keyspace_names, sf] {
|
||||
return _db.invoke_on_all([tag = std::move(tag), keyspace_names, sf] (replica::database& db) {
|
||||
return parallel_for_each(keyspace_names, [&db, tag = std::move(tag), sf] (auto& ks_name) {
|
||||
auto& ks = db.find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, tag = std::move(tag), sf] (auto& pair) {
|
||||
auto& cf = db.find_column_family(pair.second);
|
||||
return cf.snapshot(db, tag, bool(sf));
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
|
||||
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
|
||||
co_await parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
|
||||
return check_snapshot_not_exist(ks_name, tag);
|
||||
});
|
||||
co_await parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
|
||||
return _db.local().snapshot_on_all(ks_name, tag, bool(sf));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -87,23 +86,23 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
throw std::runtime_error("You must supply a snapshot name.");
|
||||
}
|
||||
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] {
|
||||
return check_snapshot_not_exist(ks_name, tag, tables).then([this, ks_name, tables, tag, sf] {
|
||||
return do_with(std::vector<sstring>(std::move(tables)),[this, ks_name, tag, sf](const std::vector<sstring>& tables) {
|
||||
return do_for_each(tables, [ks_name, tag, sf, this] (const sstring& table_name) {
|
||||
if (table_name.find(".") != sstring::npos) {
|
||||
throw std::invalid_argument("Cannot take a snapshot of a secondary index by itself. Run snapshot on the table that owns the index.");
|
||||
}
|
||||
return _db.invoke_on_all([ks_name, table_name, tag, sf] (replica::database &db) {
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
return cf.snapshot(db, tag, bool(sf));
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
|
||||
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
|
||||
co_await check_snapshot_not_exist(ks_name, tag, tables);
|
||||
|
||||
for (const auto& table_name : tables) {
|
||||
auto& cf = _db.local().find_column_family(ks_name, table_name);
|
||||
if (cf.schema()->is_view()) {
|
||||
throw std::invalid_argument("Do not take a snapshot of a materialized view or a secondary index by itself. Run snapshot on the base table instead.");
|
||||
}
|
||||
}
|
||||
co_await _db.local().snapshot_on_all(ks_name, std::move(tables), std::move(tag), bool(sf));
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, sstring cf_name, sstring tag, skip_flush sf) {
|
||||
return take_column_family_snapshot(ks_name, std::vector<sstring>{cf_name}, tag, sf);
|
||||
}
|
||||
|
||||
@@ -97,6 +97,9 @@ private:
|
||||
|
||||
template <typename Func>
|
||||
std::result_of_t<Func()> run_snapshot_list_operation(Func&&);
|
||||
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -322,7 +322,11 @@ public:
|
||||
view_filter_checking_visitor(const schema& base, const view_info& view)
|
||||
: _base(base)
|
||||
, _view(view)
|
||||
, _selection(cql3::selection::selection::wildcard(_base.shared_from_this()))
|
||||
, _selection(cql3::selection::selection::for_columns(_base.shared_from_this(),
|
||||
boost::copy_range<std::vector<const column_definition*>>(
|
||||
_base.regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return &cdef; }))
|
||||
)
|
||||
)
|
||||
{}
|
||||
|
||||
void accept_new_partition(const partition_key& key, uint64_t row_count) {
|
||||
@@ -1293,7 +1297,7 @@ future<> mutate_MV(
|
||||
auto mut_ptr = remote_endpoints.empty() ? std::make_unique<frozen_mutation>(std::move(mut.fm)) : std::make_unique<frozen_mutation>(mut.fm);
|
||||
tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}",
|
||||
mut.s->ks_name(), mut.s->cf_name(), base_token, view_token);
|
||||
local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, std::move(tr_state), db::commitlog::force_sync::no).then_wrapped(
|
||||
local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped(
|
||||
[s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr),
|
||||
units = sem_units.split(sem_units.count())] (future<>&& f) {
|
||||
--stats.writes;
|
||||
|
||||
@@ -202,6 +202,12 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
future<flush_permit> get_all_flush_permits() {
|
||||
return get_units(_background_work_flush_serializer, _max_background_work).then([this] (auto&& units) {
|
||||
return this->get_flush_permit(std::move(units));
|
||||
});
|
||||
}
|
||||
|
||||
bool has_extraneous_flushes_requested() const {
|
||||
return _extraneous_flushes > 0;
|
||||
}
|
||||
|
||||
6
dist/common/scripts/scylla_coredump_setup
vendored
6
dist/common/scripts/scylla_coredump_setup
vendored
@@ -123,10 +123,14 @@ WantedBy=multi-user.target
|
||||
# - Storage: /path/to/file (inacessible)
|
||||
# - Storage: /path/to/file
|
||||
#
|
||||
# After systemd-v248, available coredump file output changed like this:
|
||||
# - Storage: /path/to/file (present)
|
||||
# We need to support both versions.
|
||||
#
|
||||
# reference: https://github.com/systemd/systemd/commit/47f50642075a7a215c9f7b600599cbfee81a2913
|
||||
|
||||
corefail = False
|
||||
res = re.findall(r'Storage: (.*)$', coreinfo, flags=re.MULTILINE)
|
||||
res = re.findall(r'Storage: (\S+)(?: \(.+\))?$', coreinfo, flags=re.MULTILINE)
|
||||
# v232 or later
|
||||
if res:
|
||||
corepath = res[0]
|
||||
|
||||
12
dist/common/scripts/scylla_sysconfig_setup
vendored
12
dist/common/scripts/scylla_sysconfig_setup
vendored
@@ -70,7 +70,17 @@ if __name__ == '__main__':
|
||||
network_mode = args.mode if args.mode else cfg.get('NETWORK_MODE')
|
||||
|
||||
if args.setup_nic_and_disks:
|
||||
rps_cpus = run('{} --tune net --nic {} --get-cpu-mask'.format(perftune_base_command(), ifname), shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
res = run('{} --tune net --nic {} --get-cpu-mask'.format(perftune_base_command(), ifname), shell=True, check=True, capture_output=True, encoding='utf-8').stdout
|
||||
# we need to extract CPU mask from output, since perftune.py may also print warning messages (#10082)
|
||||
match = re.match('(.*\n)?(0x[0-9a-f]+(?:,0x[0-9a-f]+)*)', res, re.DOTALL)
|
||||
try:
|
||||
warning = match.group(1)
|
||||
rps_cpus = match.group(2)
|
||||
except:
|
||||
raise Exception(f'Failed to retrive CPU mask: {res}')
|
||||
# print warning message if available
|
||||
if warning:
|
||||
print(warning.strip())
|
||||
if len(rps_cpus) > 0:
|
||||
cpuset = hex2list(rps_cpus)
|
||||
run('/opt/scylladb/scripts/scylla_cpuset_setup --cpuset {}'.format(cpuset), shell=True, check=True)
|
||||
|
||||
6
dist/common/supervisor/scylla_util.sh
vendored
6
dist/common/supervisor/scylla_util.sh
vendored
@@ -6,12 +6,16 @@ is_nonroot() {
|
||||
[ -f "$scylladir"/SCYLLA-NONROOT-FILE ]
|
||||
}
|
||||
|
||||
is_container() {
|
||||
[ -f "$scylladir"/SCYLLA-CONTAINER-FILE ]
|
||||
}
|
||||
|
||||
is_privileged() {
|
||||
[ ${EUID:-${UID}} = 0 ]
|
||||
}
|
||||
|
||||
execsudo() {
|
||||
if is_nonroot; then
|
||||
if is_nonroot || is_container; then
|
||||
exec "$@"
|
||||
else
|
||||
exec sudo -u scylla -g scylla "$@"
|
||||
|
||||
4
dist/docker/debian/build_docker.sh
vendored
4
dist/docker/debian/build_docker.sh
vendored
@@ -82,15 +82,17 @@ run bash -ec "echo 'debconf debconf/frontend select Noninteractive' | debconf-se
|
||||
run bash -ec "rm -rf /etc/rsyslog.conf"
|
||||
run apt-get -y install hostname supervisor openssh-server openssh-client openjdk-11-jre-headless python python-yaml curl rsyslog locales sudo
|
||||
run locale-gen en_US.UTF-8
|
||||
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF_8
|
||||
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
|
||||
run bash -ec "dpkg -i packages/*.deb"
|
||||
run apt-get -y clean all
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
|
||||
run mkdir -p /etc/supervisor.conf.d
|
||||
run mkdir -p /var/log/scylla
|
||||
run chown -R scylla:scylla /var/lib/scylla
|
||||
run sed -i -e 's/^SCYLLA_ARGS=".*"$/SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --default-log-level info --network-stack posix"/' /etc/default/scylla-server
|
||||
|
||||
run mkdir -p /opt/scylladb/supervisor
|
||||
run touch /opt/scylladb/SCYLLA-CONTAINER-FILE
|
||||
bcp dist/common/supervisor/scylla-server.sh /opt/scylladb/supervisor/scylla-server.sh
|
||||
bcp dist/common/supervisor/scylla-jmx.sh /opt/scylladb/supervisor/scylla-jmx.sh
|
||||
bcp dist/common/supervisor/scylla-node-exporter.sh /opt/scylladb/supervisor/scylla-node-exporter.sh
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
[program:scylla-server]
|
||||
[program:scylla]
|
||||
command=/opt/scylladb/supervisor/scylla-server.sh
|
||||
stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
|
||||
41
dist/docker/etc/sysconfig/scylla-server
vendored
41
dist/docker/etc/sysconfig/scylla-server
vendored
@@ -1,41 +0,0 @@
|
||||
# choose following mode: virtio, dpdk, posix
|
||||
NETWORK_MODE=posix
|
||||
|
||||
# tap device name(virtio)
|
||||
TAP=tap0
|
||||
|
||||
# bridge device name (virtio)
|
||||
BRIDGE=virbr0
|
||||
|
||||
# ethernet device name
|
||||
IFNAME=eth0
|
||||
|
||||
# setup NIC's and disks' interrupts, RPS, XPS, nomerges and I/O scheduler (posix)
|
||||
SET_NIC_AND_DISKS=no
|
||||
|
||||
# ethernet device driver (dpdk)
|
||||
ETHDRV=
|
||||
|
||||
# ethernet device PCI ID (dpdk)
|
||||
ETHPCIID=
|
||||
|
||||
# number of hugepages
|
||||
NR_HUGEPAGES=64
|
||||
|
||||
# user for process (must be root for dpdk)
|
||||
USER=scylla
|
||||
|
||||
# group for process
|
||||
GROUP=scylla
|
||||
|
||||
# scylla home dir
|
||||
SCYLLA_HOME=/var/lib/scylla
|
||||
|
||||
# scylla config dir
|
||||
SCYLLA_CONF=/etc/scylla
|
||||
|
||||
# scylla arguments
|
||||
SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --default-log-level info --network-stack posix"
|
||||
|
||||
# setup as AMI instance
|
||||
AMI=no
|
||||
@@ -1580,6 +1580,9 @@ bool mutation_fragment_stream_validator::operator()(dht::token t) {
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos) {
|
||||
if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
|
||||
return false;
|
||||
}
|
||||
if (_prev_kind == mutation_fragment_v2::kind::partition_end) {
|
||||
const bool valid = (kind == mutation_fragment_v2::kind::partition_start);
|
||||
if (valid) {
|
||||
@@ -1607,7 +1610,11 @@ bool mutation_fragment_stream_validator::operator()(mutation_fragment::kind kind
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validator::operator()(const mutation_fragment_v2& mf) {
|
||||
return (*this)(mf.mutation_fragment_kind(), mf.position());
|
||||
const auto valid = (*this)(mf.mutation_fragment_kind(), mf.position());
|
||||
if (valid && mf.is_range_tombstone_change()) {
|
||||
_current_tombstone = mf.as_range_tombstone_change().tombstone();
|
||||
}
|
||||
return valid;
|
||||
}
|
||||
bool mutation_fragment_stream_validator::operator()(const mutation_fragment& mf) {
|
||||
return (*this)(to_mutation_fragment_kind_v2(mf.mutation_fragment_kind()), mf.position());
|
||||
@@ -1646,11 +1653,17 @@ void mutation_fragment_stream_validator::reset(dht::decorated_key dk) {
|
||||
_prev_partition_key = dk;
|
||||
_prev_pos = position_in_partition::for_partition_start();
|
||||
_prev_kind = mutation_fragment_v2::kind::partition_start;
|
||||
_current_tombstone = {};
|
||||
}
|
||||
|
||||
void mutation_fragment_stream_validator::reset(const mutation_fragment_v2& mf) {
|
||||
_prev_pos = mf.position();
|
||||
_prev_kind = mf.mutation_fragment_kind();
|
||||
if (mf.is_range_tombstone_change()) {
|
||||
_current_tombstone = mf.as_range_tombstone_change().tombstone();
|
||||
} else {
|
||||
_current_tombstone = {};
|
||||
}
|
||||
}
|
||||
void mutation_fragment_stream_validator::reset(const mutation_fragment& mf) {
|
||||
_prev_pos = mf.position();
|
||||
@@ -1719,6 +1732,11 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment_v2
|
||||
|
||||
fmr_logger.debug("[validator {}] {}:{}", static_cast<void*>(this), kind, pos);
|
||||
|
||||
if (kind == mutation_fragment_v2::kind::partition_end && _current_tombstone) {
|
||||
on_validation_error(fmr_logger, format("[validator {} for {}] Unexpected active tombstone at partition-end: partition key {}: tombstone {}",
|
||||
static_cast<void*>(this), _name, _validator.previous_partition_key(), _current_tombstone));
|
||||
}
|
||||
|
||||
if (_validation_level >= mutation_fragment_stream_validation_level::clustering_key) {
|
||||
valid = _validator(kind, pos);
|
||||
} else {
|
||||
@@ -1745,7 +1763,11 @@ bool mutation_fragment_stream_validating_filter::operator()(mutation_fragment::k
|
||||
}
|
||||
|
||||
bool mutation_fragment_stream_validating_filter::operator()(const mutation_fragment_v2& mv) {
|
||||
return (*this)(mv.mutation_fragment_kind(), mv.position());
|
||||
auto valid = (*this)(mv.mutation_fragment_kind(), mv.position());
|
||||
if (valid && mv.is_range_tombstone_change()) {
|
||||
_current_tombstone = mv.as_range_tombstone_change().tombstone();
|
||||
}
|
||||
return valid;
|
||||
}
|
||||
bool mutation_fragment_stream_validating_filter::operator()(const mutation_fragment& mv) {
|
||||
return (*this)(to_mutation_fragment_kind_v2(mv.mutation_fragment_kind()), mv.position());
|
||||
|
||||
18
install.sh
18
install.sh
@@ -143,7 +143,7 @@ export LD_LIBRARY_PATH="$prefix/libreloc"
|
||||
export UBSAN_OPTIONS="${UBSAN_OPTIONS:+$UBSAN_OPTIONS:}suppressions=$prefix/libexec/ubsan-suppressions.supp"
|
||||
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
|
||||
EOF
|
||||
chmod +x "$root/$prefix/bin/$bin"
|
||||
chmod 755 "$root/$prefix/bin/$bin"
|
||||
}
|
||||
|
||||
relocate_python3() {
|
||||
@@ -156,11 +156,11 @@ relocate_python3() {
|
||||
local pythonpath="$(dirname "$pythoncmd")"
|
||||
|
||||
if [ ! -x "$script" ]; then
|
||||
cp "$script" "$install"
|
||||
install -m755 "$script" "$install"
|
||||
return
|
||||
fi
|
||||
mkdir -p "$relocateddir"
|
||||
cp "$script" "$relocateddir"
|
||||
install -d -m755 "$relocateddir"
|
||||
install -m755 "$script" "$relocateddir"
|
||||
cat > "$install"<<EOF
|
||||
#!/usr/bin/env bash
|
||||
[[ -z "\$LD_PRELOAD" ]] || { echo "\$0: not compatible with LD_PRELOAD" >&2; exit 110; }
|
||||
@@ -178,7 +178,7 @@ if [ -f "\${DEBIAN_SSL_CERT_FILE}" ]; then
|
||||
fi
|
||||
PYTHONPATH="\${d}:\${d}/libexec:\$PYTHONPATH" PATH="\${d}/../bin:\${d}/$pythonpath:\${PATH}" SSL_CERT_FILE="\${c}" exec -a "\$0" "\${d}/libexec/\${b}" "\$@"
|
||||
EOF
|
||||
chmod +x "$install"
|
||||
chmod 755 "$install"
|
||||
}
|
||||
|
||||
install() {
|
||||
@@ -392,6 +392,7 @@ install -d -m755 -d "$rprefix"/scyllatop
|
||||
cp -r tools/scyllatop/* "$rprefix"/scyllatop
|
||||
install -d -m755 -d "$rprefix"/scripts
|
||||
cp -r dist/common/scripts/* "$rprefix"/scripts
|
||||
chmod 755 "$rprefix"/scripts/*
|
||||
ln -srf "$rprefix/scyllatop/scyllatop.py" "$rprefix/bin/scyllatop"
|
||||
if $supervisor; then
|
||||
install -d -m755 "$rprefix"/supervisor
|
||||
@@ -508,8 +509,13 @@ relocate_python3 "$rprefix"/scripts fix_system_distributed_tables.py
|
||||
if $supervisor; then
|
||||
install -d -m755 `supervisor_dir $retc`
|
||||
for service in scylla-server scylla-jmx scylla-node-exporter; do
|
||||
if [ "$service" = "scylla-server" ]; then
|
||||
program="scylla"
|
||||
else
|
||||
program=$service
|
||||
fi
|
||||
cat << EOS > `supervisor_conf $retc $service`
|
||||
[program:$service]
|
||||
[program:$program]
|
||||
directory=$rprefix
|
||||
command=/bin/bash -c './supervisor/$service.sh'
|
||||
EOS
|
||||
|
||||
@@ -34,6 +34,10 @@ azure_snitch::azure_snitch(const sstring& fname, unsigned io_cpuid) : production
|
||||
}
|
||||
|
||||
future<> azure_snitch::load_config() {
|
||||
if (this_shard_id() != io_cpu_id()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
sstring region = co_await azure_api_call(REGION_NAME_QUERY_PATH);
|
||||
sstring azure_zone = co_await azure_api_call(ZONE_NAME_QUERY_PATH);
|
||||
|
||||
|
||||
33
main.cc
33
main.cc
@@ -367,11 +367,38 @@ static auto defer_verbose_shutdown(const char* what, Func&& func) {
|
||||
startlog.info("Shutting down {}", what);
|
||||
try {
|
||||
func();
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
} catch (...) {
|
||||
startlog.error("Unexpected error shutting down {}: {}", what, std::current_exception());
|
||||
throw;
|
||||
auto ex = std::current_exception();
|
||||
bool do_abort = true;
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (const std::system_error& e) {
|
||||
// System error codes we consider "environmental",
|
||||
// i.e. not scylla's fault, therefore there is no point in
|
||||
// aborting and dumping core.
|
||||
for (int i : {EIO, EACCES, ENOSPC}) {
|
||||
if (e.code() == std::error_code(i, std::system_category())) {
|
||||
do_abort = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
}
|
||||
auto msg = fmt::format("Unexpected error shutting down {}: {}", what, ex);
|
||||
if (do_abort) {
|
||||
startlog.error("{}: aborting", msg);
|
||||
abort();
|
||||
} else {
|
||||
startlog.error("{}: exiting, at {}", msg, current_backtrace());
|
||||
|
||||
// Call _exit() rather than exit() to exit immediately
|
||||
// without calling exit handlers, avoiding
|
||||
// boost::intrusive::detail::destructor_impl assert failure
|
||||
// from ~segment_pool exit handler.
|
||||
_exit(255);
|
||||
}
|
||||
}
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
};
|
||||
|
||||
auto ret = deferred_action(std::move(vfunc));
|
||||
|
||||
@@ -716,10 +716,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
}
|
||||
opts.tcp_nodelay = must_tcp_nodelay;
|
||||
opts.reuseaddr = true;
|
||||
// We send cookies only for non-default statement tenant clients.
|
||||
if (idx > 3) {
|
||||
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
|
||||
}
|
||||
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
|
||||
|
||||
auto client = must_encrypt ?
|
||||
::make_shared<rpc_protocol_client_wrapper>(_rpc->protocol(), std::move(opts),
|
||||
|
||||
@@ -28,6 +28,7 @@ class mutation_fragment_stream_validator {
|
||||
mutation_fragment_v2::kind _prev_kind;
|
||||
position_in_partition _prev_pos;
|
||||
dht::decorated_key _prev_partition_key;
|
||||
tombstone _current_tombstone;
|
||||
public:
|
||||
explicit mutation_fragment_stream_validator(const schema& s);
|
||||
|
||||
@@ -122,6 +123,12 @@ public:
|
||||
const position_in_partition& previous_position() const {
|
||||
return _prev_pos;
|
||||
}
|
||||
/// Get the current effective tombstone
|
||||
///
|
||||
/// Not meaningful, when operator()(mutation_fragment_v2) is not used.
|
||||
tombstone current_tombstone() const {
|
||||
return _current_tombstone;
|
||||
}
|
||||
/// The previous valid partition key.
|
||||
///
|
||||
/// Only valid if `operator()(const dht::decorated_key&)` or
|
||||
@@ -151,6 +158,7 @@ class mutation_fragment_stream_validating_filter {
|
||||
mutation_fragment_stream_validator _validator;
|
||||
sstring _name;
|
||||
mutation_fragment_stream_validation_level _validation_level;
|
||||
tombstone _current_tombstone;
|
||||
|
||||
public:
|
||||
/// Constructor.
|
||||
|
||||
@@ -96,7 +96,7 @@ void range_tombstone_list::insert_from(const schema& s,
|
||||
if (cmp(end, it->position()) < 0) {
|
||||
// not overlapping
|
||||
if (it->tombstone().tomb == tomb && cmp(end, it->position()) == 0) {
|
||||
rev.update(it, {std::move(start), std::move(start), tomb});
|
||||
rev.update(it, {std::move(start), std::move(end), tomb});
|
||||
} else {
|
||||
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), tomb);
|
||||
rev.insert(it, *rt);
|
||||
|
||||
194
repair/repair.cc
194
repair/repair.cc
@@ -25,6 +25,7 @@
|
||||
#include "utils/bit_cast.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "partition_range_compat.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
@@ -41,6 +42,7 @@
|
||||
#include <seastar/core/sleep.hh>
|
||||
|
||||
#include <cfloat>
|
||||
#include <algorithm>
|
||||
|
||||
#include "idl/partition_checksum.dist.hh"
|
||||
|
||||
@@ -118,6 +120,13 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo
|
||||
return out << "unknown";
|
||||
}
|
||||
|
||||
static size_t get_nr_tables(const replica::database& db, const sstring& keyspace) {
|
||||
auto& m = db.get_column_families_mapping();
|
||||
return std::count_if(m.begin(), m.end(), [&keyspace] (auto& e) {
|
||||
return e.first.first == keyspace;
|
||||
});
|
||||
}
|
||||
|
||||
static std::vector<sstring> list_column_families(const replica::database& db, const sstring& keyspace) {
|
||||
std::vector<sstring> ret;
|
||||
for (auto &&e : db.get_column_families_mapping()) {
|
||||
@@ -443,7 +452,7 @@ float tracker::report_progress(streaming::stream_reason reason) {
|
||||
for (auto& x : _repairs) {
|
||||
auto& ri = x.second;
|
||||
if (ri->reason == reason) {
|
||||
nr_ranges_total += ri->nr_ranges_total;
|
||||
nr_ranges_total += ri->ranges_size();
|
||||
nr_ranges_finished += ri->nr_ranges_finished;
|
||||
}
|
||||
}
|
||||
@@ -555,8 +564,8 @@ void repair_info::check_failed_ranges() {
|
||||
rlogger.info("repair id {} on shard {} stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
|
||||
id, shard, reason, keyspace, table_names(), ranges.size(), _stats.get_stats());
|
||||
if (nr_failed_ranges) {
|
||||
rlogger.warn("repair id {} on shard {} failed - {} out of {} ranges failed", id, shard, nr_failed_ranges, ranges.size());
|
||||
throw std::runtime_error(format("repair id {} on shard {} failed to repair {} out of {} ranges", id, shard, nr_failed_ranges, ranges.size()));
|
||||
rlogger.warn("repair id {} on shard {} failed - {} out of {} ranges failed", id, shard, nr_failed_ranges, ranges_size());
|
||||
throw std::runtime_error(format("repair id {} on shard {} failed to repair {} out of {} ranges", id, shard, nr_failed_ranges, ranges_size()));
|
||||
} else {
|
||||
if (dropped_tables.size()) {
|
||||
rlogger.warn("repair id {} on shard {} completed successfully, keyspace={}, ignoring dropped tables={}", id, shard, keyspace, dropped_tables);
|
||||
@@ -582,14 +591,18 @@ repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range
|
||||
neighbors[range];
|
||||
}
|
||||
|
||||
size_t repair_info::ranges_size() {
|
||||
return ranges.size() * table_ids.size();
|
||||
}
|
||||
|
||||
// Repair a single local range, multiple column families.
|
||||
// Comparable to RepairSession in Origin
|
||||
future<> repair_info::repair_range(const dht::token_range& range) {
|
||||
future<> repair_info::repair_range(const dht::token_range& range, utils::UUID table_id) {
|
||||
check_in_shutdown();
|
||||
check_in_abort();
|
||||
ranges_index++;
|
||||
repair_neighbors neighbors = get_repair_neighbors(range);
|
||||
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [this, range] (auto& neighbors, auto& mandatory_neighbors) {
|
||||
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [this, range, table_id] (auto& neighbors, auto& mandatory_neighbors) {
|
||||
auto live_neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors |
|
||||
boost::adaptors::filtered([this] (const gms::inet_address& node) { return gossiper.is_alive(node); }));
|
||||
for (auto& node : mandatory_neighbors) {
|
||||
@@ -598,7 +611,7 @@ future<> repair_info::repair_range(const dht::token_range& range) {
|
||||
nr_failed_ranges++;
|
||||
auto status = format("failed: mandatory neighbor={} is not alive", node);
|
||||
rlogger.error("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
ranges_index, ranges_size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
abort();
|
||||
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
|
||||
node, keyspace, mandatory_neighbors)));
|
||||
@@ -608,7 +621,7 @@ future<> repair_info::repair_range(const dht::token_range& range) {
|
||||
nr_failed_ranges++;
|
||||
auto status = live_neighbors.empty() ? "skipped" : "partial";
|
||||
rlogger.warn("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
ranges_index, ranges_size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
if (live_neighbors.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -617,13 +630,12 @@ future<> repair_info::repair_range(const dht::token_range& range) {
|
||||
if (neighbors.empty()) {
|
||||
auto status = "skipped_no_followers";
|
||||
rlogger.warn("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
ranges_index, ranges_size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}",
|
||||
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors);
|
||||
return mm.sync_schema(db.local(), neighbors).then([this, &neighbors, range] {
|
||||
return do_for_each(table_ids.begin(), table_ids.end(), [this, &neighbors, range] (utils::UUID table_id) {
|
||||
ranges_index, ranges_size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors);
|
||||
return mm.sync_schema(db.local(), neighbors).then([this, &neighbors, range, table_id] {
|
||||
sstring cf;
|
||||
try {
|
||||
cf = db.local().find_column_family(table_id).schema()->cf_name();
|
||||
@@ -641,7 +653,6 @@ future<> repair_info::repair_range(const dht::token_range& range) {
|
||||
nr_failed_ranges++;
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -914,27 +925,55 @@ private:
|
||||
|
||||
|
||||
static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
// repair all the ranges in limited parallelism
|
||||
return parallel_for_each(ri->ranges, [ri] (auto&& range) {
|
||||
return with_semaphore(ri->rs.repair_tracker().range_parallelism_semaphore(), 1, [ri, &range] {
|
||||
return ri->repair_range(range).then([ri] {
|
||||
if (ri->reason == streaming::stream_reason::bootstrap) {
|
||||
ri->rs.get_metrics().bootstrap_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::replace) {
|
||||
ri->rs.get_metrics().replace_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::rebuild) {
|
||||
ri->rs.get_metrics().rebuild_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::decommission) {
|
||||
ri->rs.get_metrics().decommission_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::removenode) {
|
||||
ri->rs.get_metrics().removenode_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::repair) {
|
||||
ri->rs.get_metrics().repair_finished_ranges_sum++;
|
||||
ri->nr_ranges_finished++;
|
||||
}
|
||||
// Repair tables in the keyspace one after another
|
||||
assert(ri->table_names().size() == ri->table_ids.size());
|
||||
for (int idx = 0; idx < ri->table_ids.size(); idx++) {
|
||||
auto table_id = ri->table_ids[idx];
|
||||
auto table_name = ri->table_names()[idx];
|
||||
// repair all the ranges in limited parallelism
|
||||
rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}",
|
||||
ri->id.uuid, idx + 1, ri->table_ids.size(), ri->keyspace, table_name, table_id, ri->reason);
|
||||
co_await parallel_for_each(ri->ranges, [ri, table_id] (auto&& range) {
|
||||
return with_semaphore(ri->rs.repair_tracker().range_parallelism_semaphore(), 1, [ri, &range, table_id] {
|
||||
return ri->repair_range(range, table_id).then([ri] {
|
||||
if (ri->reason == streaming::stream_reason::bootstrap) {
|
||||
ri->rs.get_metrics().bootstrap_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::replace) {
|
||||
ri->rs.get_metrics().replace_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::rebuild) {
|
||||
ri->rs.get_metrics().rebuild_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::decommission) {
|
||||
ri->rs.get_metrics().decommission_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::removenode) {
|
||||
ri->rs.get_metrics().removenode_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::repair) {
|
||||
ri->rs.get_metrics().repair_finished_ranges_sum++;
|
||||
ri->nr_ranges_finished++;
|
||||
}
|
||||
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
|
||||
ri->id.uuid,
|
||||
ri->rs.get_metrics().bootstrap_finished_percentage(),
|
||||
ri->rs.get_metrics().replace_finished_percentage(),
|
||||
ri->rs.get_metrics().rebuild_finished_percentage(),
|
||||
ri->rs.get_metrics().decommission_finished_percentage(),
|
||||
ri->rs.get_metrics().removenode_finished_percentage(),
|
||||
ri->rs.get_metrics().repair_finished_percentage());
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
if (ri->reason != streaming::stream_reason::repair) {
|
||||
try {
|
||||
auto& table = ri->db.local().find_column_family(table_id);
|
||||
rlogger.debug("repair[{}]: Trigger off-strategy compaction for keyspace={}, table={}",
|
||||
ri->id.uuid, table.schema()->ks_name(), table.schema()->cf_name());
|
||||
table.trigger_offstrategy_compaction();
|
||||
} catch (replica::no_such_column_family&) {
|
||||
// Ignore dropped table
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
|
||||
// repair_ranges repairs a list of token ranges, each assumed to be a token
|
||||
@@ -1060,33 +1099,48 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
|
||||
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
|
||||
auto uuid = id.uuid;
|
||||
|
||||
auto waiting_nodes = db.local().get_token_metadata().get_all_endpoints();
|
||||
std::erase_if(waiting_nodes, [&] (const auto& addr) {
|
||||
return ignore_nodes.contains(addr);
|
||||
});
|
||||
auto participants = get_hosts_participating_in_repair(db.local(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get();
|
||||
auto hints_timeout = std::chrono::seconds(300);
|
||||
auto batchlog_timeout = std::chrono::seconds(300);
|
||||
repair_flush_hints_batchlog_request req{id.uuid, participants, hints_timeout, batchlog_timeout};
|
||||
bool needs_flush_before_repair = false;
|
||||
if (db.local().features().cluster_supports_tombstone_gc_options()) {
|
||||
for (auto& table: cfs) {
|
||||
auto s = db.local().find_column_family(keyspace, table).schema();
|
||||
const auto& options = s->tombstone_gc_options();
|
||||
if (options.mode() == tombstone_gc_mode::repair) {
|
||||
needs_flush_before_repair = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool hints_batchlog_flushed = false;
|
||||
try {
|
||||
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
|
||||
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
|
||||
uuid, node, participants);
|
||||
try {
|
||||
auto& ms = get_messaging();
|
||||
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
|
||||
uuid, node, participants, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
}).get();
|
||||
hints_batchlog_flushed = true;
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
|
||||
uuid, participants);
|
||||
auto participants = get_hosts_participating_in_repair(db.local(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get();
|
||||
if (needs_flush_before_repair) {
|
||||
auto waiting_nodes = db.local().get_token_metadata().get_all_endpoints();
|
||||
std::erase_if(waiting_nodes, [&] (const auto& addr) {
|
||||
return ignore_nodes.contains(addr);
|
||||
});
|
||||
auto hints_timeout = std::chrono::seconds(300);
|
||||
auto batchlog_timeout = std::chrono::seconds(300);
|
||||
repair_flush_hints_batchlog_request req{id.uuid, participants, hints_timeout, batchlog_timeout};
|
||||
|
||||
try {
|
||||
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
|
||||
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
|
||||
uuid, node, participants);
|
||||
try {
|
||||
auto& ms = get_messaging();
|
||||
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
|
||||
uuid, node, participants, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
}).get();
|
||||
hints_batchlog_flushed = true;
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
|
||||
uuid, participants);
|
||||
}
|
||||
} else {
|
||||
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
|
||||
}
|
||||
|
||||
std::vector<future<>> repair_results;
|
||||
@@ -1288,7 +1342,8 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip).get0();
|
||||
seastar::thread::maybe_yield();
|
||||
nr_ranges_total += desired_ranges.size();
|
||||
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
|
||||
nr_ranges_total += desired_ranges.size() * nr_tables;
|
||||
}
|
||||
container().invoke_on_all([nr_ranges_total] (repair_service& rs) {
|
||||
rs.get_metrics().bootstrap_finished_ranges = 0;
|
||||
@@ -1320,7 +1375,8 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
//Collects the source that will have its range moved to the new node
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
|
||||
rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size());
|
||||
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
|
||||
rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size() * nr_tables);
|
||||
for (auto& desired_range : desired_ranges) {
|
||||
for (auto& x : range_addresses) {
|
||||
const range<dht::token>& src_range = x.first;
|
||||
@@ -1461,7 +1517,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
}
|
||||
auto& ks = db.local().find_keyspace(keyspace_name);
|
||||
dht::token_range_vector ranges = ks.get_effective_replication_map()->get_ranges(leaving_node);
|
||||
nr_ranges_total += ranges.size();
|
||||
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
|
||||
nr_ranges_total += ranges.size() * nr_tables;
|
||||
}
|
||||
if (reason == streaming::stream_reason::decommission) {
|
||||
container().invoke_on_all([nr_ranges_total] (repair_service& rs) {
|
||||
@@ -1485,8 +1542,9 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
auto erm = ks.get_effective_replication_map();
|
||||
// First get all ranges the leaving node is responsible for
|
||||
dht::token_range_vector ranges = erm->get_ranges(leaving_node);
|
||||
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size());
|
||||
size_t nr_ranges_total = ranges.size();
|
||||
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
|
||||
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size() * nr_tables);
|
||||
size_t nr_ranges_total = ranges.size() * nr_tables;
|
||||
size_t nr_ranges_skipped = 0;
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> current_replica_endpoints;
|
||||
// Find (for each range) all nodes that store replicas for these ranges as well
|
||||
@@ -1677,7 +1735,8 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
// Okay to yield since tm is immutable
|
||||
dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0();
|
||||
nr_ranges_total += ranges.size();
|
||||
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
|
||||
nr_ranges_total += ranges.size() * nr_tables;
|
||||
|
||||
}
|
||||
if (reason == streaming::stream_reason::rebuild) {
|
||||
@@ -1702,7 +1761,8 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0();
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}, ignore_nodes={}", op, keyspace_name, source_dc, ranges.size(), ignore_nodes);
|
||||
auto nr_tables = get_nr_tables(db.local(), keyspace_name);
|
||||
rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}, ignore_nodes={}", op, keyspace_name, source_dc, ranges.size() * nr_tables, ignore_nodes);
|
||||
for (auto it = ranges.begin(); it != ranges.end();) {
|
||||
auto& r = *it;
|
||||
seastar::thread::maybe_yield();
|
||||
@@ -1730,12 +1790,12 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
}
|
||||
}
|
||||
if (reason == streaming::stream_reason::rebuild) {
|
||||
container().invoke_on_all([nr_ranges_skipped] (repair_service& rs) {
|
||||
rs.get_metrics().rebuild_finished_ranges += nr_ranges_skipped;
|
||||
container().invoke_on_all([nr_ranges_skipped, nr_tables] (repair_service& rs) {
|
||||
rs.get_metrics().rebuild_finished_ranges += nr_ranges_skipped * nr_tables;
|
||||
}).get();
|
||||
} else if (reason == streaming::stream_reason::replace) {
|
||||
container().invoke_on_all([nr_ranges_skipped] (repair_service& rs) {
|
||||
rs.get_metrics().replace_finished_ranges += nr_ranges_skipped;
|
||||
container().invoke_on_all([nr_ranges_skipped, nr_tables] (repair_service& rs) {
|
||||
rs.get_metrics().replace_finished_ranges += nr_ranges_skipped * nr_tables;
|
||||
}).get();
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
|
||||
@@ -200,7 +200,9 @@ public:
|
||||
return _hints_batchlog_flushed;
|
||||
}
|
||||
|
||||
future<> repair_range(const dht::token_range& range);
|
||||
future<> repair_range(const dht::token_range& range, utils::UUID table_id);
|
||||
|
||||
size_t ranges_size();
|
||||
};
|
||||
|
||||
// The repair_tracker tracks ongoing repair operations and their progress.
|
||||
|
||||
@@ -67,6 +67,7 @@ public:
|
||||
uint64_t repair_finished_ranges_sum{0};
|
||||
private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
public:
|
||||
float bootstrap_finished_percentage();
|
||||
float replace_finished_percentage();
|
||||
float rebuild_finished_percentage();
|
||||
|
||||
@@ -910,10 +910,9 @@ bool database::update_column_family(schema_ptr new_schema) {
|
||||
return columns_changed;
|
||||
}
|
||||
|
||||
future<> database::remove(const column_family& cf) noexcept {
|
||||
void database::remove(const table& cf) noexcept {
|
||||
auto s = cf.schema();
|
||||
auto& ks = find_keyspace(s->ks_name());
|
||||
co_await _querier_cache.evict_all_for_table(s->id());
|
||||
_column_families.erase(s->id());
|
||||
ks.metadata()->remove_column_family(s);
|
||||
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
|
||||
@@ -937,13 +936,20 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
|
||||
on_internal_error(dblog, fmt::format("drop_column_family {}.{}: UUID={} not found", ks_name, cf_name, uuid));
|
||||
}
|
||||
dblog.debug("Dropping {}.{}", ks_name, cf_name);
|
||||
co_await remove(*cf);
|
||||
remove(*cf);
|
||||
cf->clear_views();
|
||||
co_return co_await cf->await_pending_ops().then([this, &ks, cf, tsf = std::move(tsf), snapshot] {
|
||||
return truncate(ks, *cf, std::move(tsf), snapshot).finally([this, cf] {
|
||||
return cf->stop();
|
||||
});
|
||||
}).finally([cf] {});
|
||||
co_await cf->await_pending_ops();
|
||||
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await truncate(ks, *cf, std::move(tsf), snapshot);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await cf->stop();
|
||||
if (ex) {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
const utils::UUID& database::find_uuid(std::string_view ks, std::string_view cf) const {
|
||||
@@ -2054,6 +2060,53 @@ future<> database::flush(const sstring& ksname, const sstring& cfname) {
|
||||
return cf.flush();
|
||||
}
|
||||
|
||||
future<> database::flush_on_all(utils::UUID id) {
|
||||
return container().invoke_on_all([id] (replica::database& db) {
|
||||
return db.find_column_family(id).flush();
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::flush_on_all(std::string_view ks_name, std::string_view table_name) {
|
||||
return flush_on_all(find_uuid(ks_name, table_name));
|
||||
}
|
||||
|
||||
future<> database::flush_on_all(std::string_view ks_name, std::vector<sstring> table_names) {
|
||||
return parallel_for_each(table_names, [this, ks_name] (const auto& table_name) {
|
||||
return flush_on_all(ks_name, table_name);
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::flush_on_all(std::string_view ks_name) {
|
||||
return parallel_for_each(find_keyspace(ks_name).metadata()->cf_meta_data(), [this] (auto& pair) {
|
||||
return flush_on_all(pair.second->id());
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::snapshot_on_all(std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
|
||||
co_await parallel_for_each(table_names, [this, ks_name, tag = std::move(tag), skip_flush] (const auto& table_name) -> future<> {
|
||||
if (!skip_flush) {
|
||||
co_await flush_on_all(ks_name, table_name);
|
||||
}
|
||||
co_await container().invoke_on_all([ks_name, &table_name, tag, skip_flush] (replica::database& db) {
|
||||
auto& t = db.find_column_family(ks_name, table_name);
|
||||
return t.snapshot(db, tag);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::snapshot_on_all(std::string_view ks_name, sstring tag, bool skip_flush) {
|
||||
auto& ks = find_keyspace(ks_name);
|
||||
co_await parallel_for_each(ks.metadata()->cf_meta_data(), [this, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
|
||||
if (!skip_flush) {
|
||||
co_await flush_on_all(pair.second->id());
|
||||
}
|
||||
co_await container().invoke_on_all([id = pair.second, tag, skip_flush] (replica::database& db) {
|
||||
auto& t = db.find_column_family(id);
|
||||
return t.snapshot(db, tag);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf) {
|
||||
auto& ks = find_keyspace(ksname);
|
||||
auto& cf = find_column_family(ksname, cfname);
|
||||
@@ -2062,80 +2115,77 @@ future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf)
|
||||
|
||||
future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf, bool with_snapshot) {
|
||||
dblog.debug("Truncating {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name());
|
||||
return with_gate(cf.async_gate(), [this, &ks, &cf, tsf = std::move(tsf), with_snapshot] () mutable -> future<> {
|
||||
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
|
||||
const auto should_flush = auto_snapshot;
|
||||
auto holder = cf.async_gate().hold();
|
||||
|
||||
// Force mutations coming in to re-acquire higher rp:s
|
||||
// This creates a "soft" ordering, in that we will guarantee that
|
||||
// any sstable written _after_ we issue the flush below will
|
||||
// only have higher rp:s than we will get from the discard_sstable
|
||||
// call.
|
||||
auto low_mark = cf.set_low_replay_position_mark();
|
||||
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
|
||||
const auto should_flush = auto_snapshot;
|
||||
|
||||
const auto uuid = cf.schema()->id();
|
||||
// Force mutations coming in to re-acquire higher rp:s
|
||||
// This creates a "soft" ordering, in that we will guarantee that
|
||||
// any sstable written _after_ we issue the flush below will
|
||||
// only have higher rp:s than we will get from the discard_sstable
|
||||
// call.
|
||||
auto low_mark = cf.set_low_replay_position_mark();
|
||||
|
||||
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
|
||||
future<> f = make_ready_future<>();
|
||||
bool did_flush = false;
|
||||
if (should_flush && cf.can_flush()) {
|
||||
// TODO:
|
||||
// this is not really a guarantee at all that we've actually
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
f = cf.flush();
|
||||
did_flush = true;
|
||||
} else {
|
||||
f = cf.clear();
|
||||
}
|
||||
return f.then([this, &cf, auto_snapshot, tsf = std::move(tsf), low_mark, should_flush, did_flush] {
|
||||
dblog.debug("Discarding sstable data for truncated CF + indexes");
|
||||
// TODO: notify truncation
|
||||
const auto uuid = cf.schema()->id();
|
||||
|
||||
return tsf().then([this, &cf, auto_snapshot, low_mark, should_flush, did_flush](db_clock::time_point truncated_at) {
|
||||
future<> f = make_ready_future<>();
|
||||
if (auto_snapshot) {
|
||||
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
|
||||
f = cf.snapshot(*this, name);
|
||||
}
|
||||
return f.then([this, &cf, truncated_at, low_mark, should_flush, did_flush] {
|
||||
return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush, did_flush](db::replay_position rp) {
|
||||
// TODO: indexes.
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
|
||||
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
|
||||
// the low_mark assertion does not hold, because we maybe/probably never got around to
|
||||
// creating the sstables that would create them.
|
||||
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
return truncate_views(cf, truncated_at, should_flush).then([&cf, truncated_at, rp] {
|
||||
// save_truncation_record() may actually fail after we cached the truncation time
|
||||
// but this is not be worse that if failing without caching: at least the correct time
|
||||
// will be available until next reboot and a client will have to retry truncation anyway.
|
||||
cf.cache_truncation_record(truncated_at);
|
||||
return db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([this, uuid] {
|
||||
drop_repair_history_map_for_table(uuid);
|
||||
});
|
||||
});
|
||||
}
|
||||
std::vector<compaction_manager::compaction_reenabler> cres;
|
||||
cres.reserve(1 + cf.views().size());
|
||||
|
||||
future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) {
|
||||
return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) {
|
||||
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&cf));
|
||||
co_await parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> {
|
||||
auto& vcf = find_column_family(v);
|
||||
return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] {
|
||||
return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] {
|
||||
return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) {
|
||||
return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
|
||||
});
|
||||
});
|
||||
});
|
||||
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&vcf));
|
||||
});
|
||||
|
||||
bool did_flush = false;
|
||||
if (should_flush && cf.can_flush()) {
|
||||
// TODO:
|
||||
// this is not really a guarantee at all that we've actually
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
co_await cf.flush();
|
||||
did_flush = true;
|
||||
} else {
|
||||
co_await cf.clear();
|
||||
}
|
||||
|
||||
dblog.debug("Discarding sstable data for truncated CF + indexes");
|
||||
// TODO: notify truncation
|
||||
|
||||
db_clock::time_point truncated_at = co_await tsf();
|
||||
|
||||
if (auto_snapshot) {
|
||||
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
|
||||
co_await cf.snapshot(*this, name);
|
||||
}
|
||||
|
||||
db::replay_position rp = co_await cf.discard_sstables(truncated_at);
|
||||
// TODO: indexes.
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
|
||||
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
|
||||
// the low_mark assertion does not hold, because we maybe/probably never got around to
|
||||
// creating the sstables that would create them.
|
||||
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
co_await parallel_for_each(cf.views(), [this, truncated_at, should_flush] (view_ptr v) -> future<> {
|
||||
auto& vcf = find_column_family(v);
|
||||
if (should_flush) {
|
||||
co_await vcf.flush();
|
||||
} else {
|
||||
co_await vcf.clear();
|
||||
}
|
||||
db::replay_position rp = co_await vcf.discard_sstables(truncated_at);
|
||||
co_await db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
|
||||
});
|
||||
// save_truncation_record() may actually fail after we cached the truncation time
|
||||
// but this is not be worse that if failing without caching: at least the correct time
|
||||
// will be available until next reboot and a client will have to retry truncation anyway.
|
||||
cf.cache_truncation_record(truncated_at);
|
||||
co_await db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
|
||||
|
||||
drop_repair_history_map_for_table(uuid);
|
||||
}
|
||||
|
||||
const sstring& database::get_snitch_name() const {
|
||||
|
||||
@@ -839,7 +839,11 @@ public:
|
||||
|
||||
db::replay_position set_low_replay_position_mark();
|
||||
|
||||
future<> snapshot(database& db, sstring name, bool skip_flush = false);
|
||||
private:
|
||||
future<> snapshot(database& db, sstring name);
|
||||
|
||||
friend class database;
|
||||
public:
|
||||
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
|
||||
|
||||
/*!
|
||||
@@ -1217,7 +1221,7 @@ struct string_pair_eq {
|
||||
// local metadata reads
|
||||
// use shard_of() for data
|
||||
|
||||
class database {
|
||||
class database : public peering_sharded_service<database> {
|
||||
friend class ::database_test;
|
||||
public:
|
||||
enum class table_kind {
|
||||
@@ -1371,6 +1375,7 @@ private:
|
||||
Future update_write_metrics(Future&& f);
|
||||
void update_write_metrics_for_timed_out_write();
|
||||
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, bool is_bootstrap, system_keyspace system);
|
||||
void remove(const table&) noexcept;
|
||||
public:
|
||||
static utils::UUID empty_version;
|
||||
|
||||
@@ -1560,6 +1565,17 @@ public:
|
||||
|
||||
future<> flush_all_memtables();
|
||||
future<> flush(const sstring& ks, const sstring& cf);
|
||||
// flush a table identified by the given id on all shards.
|
||||
future<> flush_on_all(utils::UUID id);
|
||||
// flush a single table in a keyspace on all shards.
|
||||
future<> flush_on_all(std::string_view ks_name, std::string_view table_name);
|
||||
// flush a list of tables in a keyspace on all shards.
|
||||
future<> flush_on_all(std::string_view ks_name, std::vector<sstring> table_names);
|
||||
// flush all tables in a keyspace on all shards.
|
||||
future<> flush_on_all(std::string_view ks_name);
|
||||
|
||||
future<> snapshot_on_all(std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
|
||||
future<> snapshot_on_all(std::string_view ks_name, sstring tag, bool skip_flush);
|
||||
|
||||
// See #937. Truncation now requires a callback to get a time stamp
|
||||
// that must be guaranteed to be the same for all shards.
|
||||
@@ -1568,11 +1584,9 @@ public:
|
||||
/** Truncates the given column family */
|
||||
future<> truncate(sstring ksname, sstring cfname, timestamp_func);
|
||||
future<> truncate(const keyspace& ks, column_family& cf, timestamp_func, bool with_snapshot = true);
|
||||
future<> truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush);
|
||||
|
||||
bool update_column_family(schema_ptr s);
|
||||
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);
|
||||
future<> remove(const column_family&) noexcept;
|
||||
|
||||
const logalloc::region_group& dirty_memory_region_group() const {
|
||||
return _dirty_memory_manager.region_group();
|
||||
|
||||
@@ -454,12 +454,13 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
|
||||
});
|
||||
}
|
||||
|
||||
future<> distributed_loader::populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, bool must_exist) {
|
||||
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), must_exist] {
|
||||
future<> distributed_loader::populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
|
||||
dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", ks, cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist);
|
||||
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), do_allow_offstrategy_compaction, dir_must_exist] {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
if (!file_exists(sstdir).get0()) {
|
||||
if (must_exist) {
|
||||
if (dir_must_exist) {
|
||||
throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", ks, cf, sstdir));
|
||||
}
|
||||
return;
|
||||
@@ -529,12 +530,14 @@ future<> distributed_loader::populate_column_family(distributed<replica::databas
|
||||
return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big);
|
||||
}, eligible_for_reshape_on_boot).get();
|
||||
|
||||
directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot] (sstables::sstable_directory& dir) {
|
||||
return dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot] (sstables::shared_sstable sst) {
|
||||
auto requires_offstrategy = sstables::offstrategy(!eligible_for_reshape_on_boot(sst));
|
||||
directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) {
|
||||
return dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) {
|
||||
auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst));
|
||||
return global_table->add_sstable_and_update_cache(sst, requires_offstrategy);
|
||||
}).then([&global_table] {
|
||||
}).then([&global_table, do_allow_offstrategy_compaction] {
|
||||
if (do_allow_offstrategy_compaction) {
|
||||
global_table->trigger_offstrategy_compaction();
|
||||
}
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
@@ -560,11 +563,11 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
|
||||
auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
|
||||
dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version());
|
||||
return ks.make_directory_for_column_family(cfname, uuid).then([&db, sstdir, uuid, ks_name, cfname] {
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname);
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
|
||||
}).then([&db, sstdir, ks_name, cfname] {
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, false /* must_exist */);
|
||||
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
|
||||
}).then([&db, sstdir, uuid, ks_name, cfname] {
|
||||
return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname);
|
||||
return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
|
||||
}).handle_exception([ks_name, cfname, sstdir](std::exception_ptr eptr) {
|
||||
std::string msg =
|
||||
format("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/file.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
#include <filesystem>
|
||||
@@ -67,7 +68,9 @@ class distributed_loader {
|
||||
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
|
||||
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
|
||||
std::filesystem::path datadir, sstring ks, sstring cf);
|
||||
static future<> populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, bool must_exist = true);
|
||||
using allow_offstrategy_compaction = bool_class<struct allow_offstrategy_compaction_tag>;
|
||||
using must_exist = bool_class<struct must_exist_tag>;
|
||||
static future<> populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, allow_offstrategy_compaction, must_exist = must_exist::yes);
|
||||
static future<> populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name);
|
||||
static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir);
|
||||
static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);
|
||||
|
||||
163
replica/table.cc
163
replica/table.cc
@@ -9,6 +9,7 @@
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
#include "replica/database.hh"
|
||||
@@ -662,11 +663,21 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
|
||||
[] (const dht::decorated_key&) { return api::min_timestamp; });
|
||||
}
|
||||
|
||||
mutation_fragment* fragment = co_await reader.peek();
|
||||
if (!fragment) {
|
||||
std::exception_ptr err;
|
||||
try {
|
||||
mutation_fragment* fragment = co_await reader.peek();
|
||||
if (!fragment) {
|
||||
co_await reader.close();
|
||||
_memtables->erase(old);
|
||||
co_return stop_iteration::yes;
|
||||
}
|
||||
} catch (...) {
|
||||
err = std::current_exception();
|
||||
}
|
||||
if (err) {
|
||||
tlogger.error("failed to flush memtable for {}.{}: {}", old->schema()->ks_name(), old->schema()->cf_name(), err);
|
||||
co_await reader.close();
|
||||
_memtables->erase(old);
|
||||
co_return stop_iteration::yes;
|
||||
co_return stop_iteration(_async_gate.is_closed());
|
||||
}
|
||||
|
||||
auto f = consumer(upgrade_to_v2(std::move(reader)));
|
||||
@@ -1426,70 +1437,86 @@ future<> table::write_schema_as_cql(database& db, sstring dir) const {
|
||||
|
||||
}
|
||||
|
||||
future<> table::snapshot(database& db, sstring name, bool skip_flush) {
|
||||
future<> table::snapshot(database& db, sstring name) {
|
||||
auto jsondir = _config.datadir + "/snapshots/" + name;
|
||||
tlogger.debug("snapshot {}: skip_flush={}", jsondir, skip_flush);
|
||||
auto f = skip_flush ? make_ready_future<>() : flush();
|
||||
return f.then([this, &db, jsondir = std::move(jsondir)]() {
|
||||
return with_semaphore(_sstable_deletion_sem, 1, [this, &db, jsondir = std::move(jsondir)]() {
|
||||
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables->all());
|
||||
return do_with(std::move(tables), std::move(jsondir), [this, &db] (std::vector<sstables::shared_sstable>& tables, const sstring& jsondir) {
|
||||
return io_check([&jsondir] { return recursive_touch_directory(jsondir); }).then([this, &db, &jsondir, &tables] {
|
||||
return max_concurrent_for_each(tables, db.get_config().initial_sstable_loading_concurrency(), [&db, &jsondir] (sstables::shared_sstable sstable) {
|
||||
return with_semaphore(db.get_sharded_sst_dir_semaphore().local(), 1, [&jsondir, sstable] {
|
||||
return io_check([sstable, &dir = jsondir] {
|
||||
return sstable->create_links(dir);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([&jsondir, &tables] {
|
||||
return io_check(sync_directory, jsondir);
|
||||
}).finally([this, &tables, &db, &jsondir] {
|
||||
auto shard = std::hash<sstring>()(jsondir) % smp::count;
|
||||
std::unordered_set<sstring> table_names;
|
||||
for (auto& sst : tables) {
|
||||
auto f = sst->get_filename();
|
||||
auto rf = f.substr(sst->get_dir().size() + 1);
|
||||
table_names.insert(std::move(rf));
|
||||
}
|
||||
return smp::submit_to(shard, [requester = this_shard_id(), &jsondir, this, &db,
|
||||
tables = std::move(table_names), datadir = _config.datadir] {
|
||||
tlogger.debug("snapshot {}", jsondir);
|
||||
|
||||
if (!pending_snapshots.contains(jsondir)) {
|
||||
pending_snapshots.emplace(jsondir, make_lw_shared<snapshot_manager>());
|
||||
}
|
||||
auto snapshot = pending_snapshots.at(jsondir);
|
||||
for (auto&& sst: tables) {
|
||||
snapshot->files.insert(std::move(sst));
|
||||
}
|
||||
auto sstable_deletion_guard = co_await get_units(_sstable_deletion_sem, 1);
|
||||
std::exception_ptr ex;
|
||||
|
||||
snapshot->requests.signal(1);
|
||||
auto my_work = make_ready_future<>();
|
||||
if (requester == this_shard_id()) {
|
||||
my_work = snapshot->requests.wait(smp::count).then([&jsondir,
|
||||
&db, snapshot, this] {
|
||||
// this_shard_id() here == requester == this_shard_id() before submit_to() above,
|
||||
// so the db reference is still local
|
||||
return write_schema_as_cql(db, jsondir).handle_exception([&jsondir](std::exception_ptr ptr) {
|
||||
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
|
||||
return make_ready_future<>();
|
||||
}).finally([&jsondir, snapshot] () mutable {
|
||||
return seal_snapshot(jsondir).handle_exception([&jsondir] (std::exception_ptr ex) {
|
||||
tlogger.error("Failed to seal snapshot in {}: {}. Ignored.", jsondir, ex);
|
||||
}).then([snapshot] {
|
||||
snapshot->manifest_write.signal(smp::count);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return my_work.finally([snapshot] {
|
||||
return snapshot->manifest_write.wait(1);
|
||||
}).then([snapshot] {});
|
||||
std::vector<sstables::shared_sstable> tables;
|
||||
try {
|
||||
tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables->all());
|
||||
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
|
||||
co_await max_concurrent_for_each(tables, db.get_config().initial_sstable_loading_concurrency(), [&db, &jsondir] (sstables::shared_sstable sstable) {
|
||||
return with_semaphore(db.get_sharded_sst_dir_semaphore().local(), 1, [&jsondir, sstable] {
|
||||
return io_check([sstable, &dir = jsondir] {
|
||||
return sstable->create_links(dir);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
co_await io_check(sync_directory, jsondir);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
auto shard = std::hash<sstring>()(jsondir) % smp::count;
|
||||
std::unordered_set<sstring> table_names;
|
||||
try {
|
||||
for (auto& sst : tables) {
|
||||
auto f = sst->get_filename();
|
||||
auto rf = f.substr(sst->get_dir().size() + 1);
|
||||
table_names.insert(std::move(rf));
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await smp::submit_to(shard, [requester = this_shard_id(), &jsondir, this, &db,
|
||||
tables = std::move(table_names), datadir = _config.datadir, ex = std::move(ex)] () mutable -> future<> {
|
||||
if (!pending_snapshots.contains(jsondir)) {
|
||||
try {
|
||||
pending_snapshots.emplace(jsondir, make_lw_shared<snapshot_manager>());
|
||||
} catch (...) {
|
||||
// abort since the process will hang if we can't coordinate
|
||||
// snapshot across shards, similar to failing to allocation a continuation.
|
||||
tlogger.error("Failed allocating snapshot_manager: {}. Aborting.", std::current_exception());
|
||||
abort();
|
||||
}
|
||||
}
|
||||
auto snapshot = pending_snapshots.at(jsondir);
|
||||
try {
|
||||
for (auto&& sst: tables) {
|
||||
snapshot->files.insert(std::move(sst));
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
tlogger.debug("snapshot {}: signal requests", jsondir);
|
||||
snapshot->requests.signal(1);
|
||||
if (requester == this_shard_id()) {
|
||||
tlogger.debug("snapshot {}: waiting for all shards", jsondir);
|
||||
co_await snapshot->requests.wait(smp::count);
|
||||
// this_shard_id() here == requester == this_shard_id() before submit_to() above,
|
||||
// so the db reference is still local
|
||||
tlogger.debug("snapshot {}: writing schema.cql", jsondir);
|
||||
co_await write_schema_as_cql(db, jsondir).handle_exception([&] (std::exception_ptr ptr) {
|
||||
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
|
||||
ex = std::move(ptr);
|
||||
});
|
||||
tlogger.debug("snapshot {}: seal_snapshot", jsondir);
|
||||
co_await seal_snapshot(jsondir).handle_exception([&] (std::exception_ptr ptr) {
|
||||
tlogger.error("Failed to seal snapshot in {}: {}.", jsondir, ptr);
|
||||
ex = std::move(ptr);
|
||||
});
|
||||
snapshot->manifest_write.signal(smp::count);
|
||||
}
|
||||
tlogger.debug("snapshot {}: waiting for manifest on behalf of shard {}", jsondir, requester);
|
||||
co_await snapshot->manifest_write.wait(1);
|
||||
tlogger.debug("snapshot {}: done: error={}", jsondir, ex);
|
||||
if (ex) {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1571,13 +1598,14 @@ bool table::can_flush() const {
|
||||
}
|
||||
|
||||
future<> table::clear() {
|
||||
auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits();
|
||||
if (_commitlog) {
|
||||
for (auto& t : *_memtables) {
|
||||
_commitlog->discard_completed_segments(_schema->id(), t->get_and_discard_rp_set());
|
||||
}
|
||||
}
|
||||
_memtables->clear_and_add();
|
||||
return _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
||||
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
|
||||
}
|
||||
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
@@ -2235,7 +2263,7 @@ std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double
|
||||
|
||||
void
|
||||
table::enable_auto_compaction() {
|
||||
// FIXME: unmute backlog. turn table backlog back on.
|
||||
// XXX: unmute backlog. turn table backlog back on.
|
||||
// see table::disable_auto_compaction() notes.
|
||||
_compaction_disabled_by_user = false;
|
||||
trigger_compaction();
|
||||
@@ -2243,7 +2271,7 @@ table::enable_auto_compaction() {
|
||||
|
||||
future<>
|
||||
table::disable_auto_compaction() {
|
||||
// FIXME: mute backlog. When we disable background compactions
|
||||
// XXX: mute backlog. When we disable background compactions
|
||||
// for the table, we must also disable current backlog of the
|
||||
// table compaction strategy that contributes to the scheduling
|
||||
// group resources prioritization.
|
||||
@@ -2270,9 +2298,8 @@ table::disable_auto_compaction() {
|
||||
// - it will break computation of major compaction descriptor
|
||||
// for new submissions
|
||||
_compaction_disabled_by_user = true;
|
||||
return with_gate(_async_gate, [this] {
|
||||
return compaction_manager().stop_ongoing_compactions("disable auto-compaction", this, sstables::compaction_type::Compaction);
|
||||
});
|
||||
// FIXME: stop ongoing compactions
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 0d250d15ac...9a7ba6d57e
@@ -55,7 +55,12 @@ private:
|
||||
entry(entry&&) noexcept = default;
|
||||
|
||||
~entry() {
|
||||
assert(!is_referenced());
|
||||
if (is_referenced()) {
|
||||
// Live entry_ptr should keep the entry alive, except when the entry failed on loading.
|
||||
// In that case, entry_ptr holders are not supposed to use the pointer, so it's safe
|
||||
// to nullify those entry_ptrs.
|
||||
assert(!ready());
|
||||
}
|
||||
}
|
||||
|
||||
void on_evicted() noexcept override;
|
||||
|
||||
@@ -361,6 +361,14 @@ def test_getitem_attributes_to_get_duplicate(dynamodb, test_table):
|
||||
with pytest.raises(ClientError, match='ValidationException.*Duplicate'):
|
||||
test_table.get_item(Key={'p': p, 'c': c}, AttributesToGet=['a', 'a'], ConsistentRead=True)
|
||||
|
||||
# Verify that it is forbidden to ask for an empty AttributesToGet
|
||||
# Reproduces issue #10332.
|
||||
def test_getitem_attributes_to_get_empty(dynamodb, test_table):
|
||||
p = random_string()
|
||||
c = random_string()
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
test_table.get_item(Key={'p': p, 'c': c}, AttributesToGet=[], ConsistentRead=True)
|
||||
|
||||
# Basic test for DeleteItem, with hash key only
|
||||
def test_delete_item_hash(test_table_s):
|
||||
p = random_string()
|
||||
|
||||
@@ -157,6 +157,13 @@ def test_query_attributes_to_get(dynamodb, test_table):
|
||||
expected_items = [{k: x[k] for k in wanted if k in x} for x in items]
|
||||
assert multiset(expected_items) == multiset(got_items)
|
||||
|
||||
# Verify that it is forbidden to ask for an empty AttributesToGet
|
||||
# Reproduces issue #10332.
|
||||
def test_query_attributes_to_get_empty(dynamodb, test_table):
|
||||
p = random_string()
|
||||
with pytest.raises(ClientError, match='ValidationException'):
|
||||
full_query(test_table, KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'}}, AttributesToGet=[])
|
||||
|
||||
# Test that in a table with both hash key and sort key, which keys we can
|
||||
# Query by: We can Query by the hash key, by a combination of both hash and
|
||||
# sort keys, but *cannot* query by just the sort key, and obviously not
|
||||
|
||||
@@ -1030,6 +1030,20 @@ def test_nested_attribute_remove_from_missing_item(test_table_s):
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x[0]')
|
||||
|
||||
# Though in an above test (test_nested_attribute_update_bad_path_dot) we
|
||||
# showed that DynamoDB does not allow REMOVE x.y if attribute x doesn't
|
||||
# exist - and generates a ValidationException, if x *does* exist but y
|
||||
# doesn't, it's fine and the removal should just be silently ignored.
|
||||
def test_nested_attribute_remove_missing_leaf(test_table_s):
|
||||
p = random_string()
|
||||
item = {'p': p, 'a': {'x': 3}, 'b': ['hi']}
|
||||
test_table_s.put_item(Item=item)
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE a.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE b[7]')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE c')
|
||||
# The above UpdateItem calls didn't change anything...
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == item
|
||||
|
||||
# Similarly for other types of bad paths - using [0] on something which
|
||||
# doesn't exist or isn't an array.
|
||||
def test_nested_attribute_update_bad_path_array(test_table_s):
|
||||
|
||||
@@ -207,7 +207,9 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
|
||||
}
|
||||
|
||||
{
|
||||
cf_lru.evict_all();
|
||||
with_allocator(region.allocator(), [] {
|
||||
cf_lru.evict_all();
|
||||
});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.cached_bytes); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, cf.cached_bytes()); // change here
|
||||
@@ -215,6 +217,8 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(region.occupancy().used_space(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
#include <deque>
|
||||
#include <random>
|
||||
#include "utils/lsa/chunked_managed_vector.hh"
|
||||
#include "utils/managed_ref.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
#include <boost/range/algorithm/sort.hpp>
|
||||
#include <boost/range/algorithm/equal.hpp>
|
||||
@@ -203,3 +205,106 @@ SEASTAR_TEST_CASE(tests_reserve_partial) {
|
||||
});
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_clear_and_release) {
|
||||
region region;
|
||||
allocating_section as;
|
||||
|
||||
with_allocator(region.allocator(), [&] {
|
||||
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
|
||||
|
||||
for (uint64_t i = 1; i < 4000; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(i));
|
||||
});
|
||||
}
|
||||
|
||||
v.clear_and_release();
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_chunk_reserve) {
|
||||
region region;
|
||||
allocating_section as;
|
||||
|
||||
for (auto conf :
|
||||
{ // std::make_pair(reserve size, push count)
|
||||
std::make_pair(0, 4000),
|
||||
std::make_pair(100, 4000),
|
||||
std::make_pair(200, 4000),
|
||||
std::make_pair(1000, 4000),
|
||||
std::make_pair(2000, 4000),
|
||||
std::make_pair(3000, 4000),
|
||||
std::make_pair(5000, 4000),
|
||||
std::make_pair(500, 8000),
|
||||
std::make_pair(1000, 8000),
|
||||
std::make_pair(2000, 8000),
|
||||
std::make_pair(8000, 500),
|
||||
})
|
||||
{
|
||||
with_allocator(region.allocator(), [&] {
|
||||
auto [reserve_size, push_count] = conf;
|
||||
testlog.info("Testing reserve({}), {}x emplace_back()", reserve_size, push_count);
|
||||
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
|
||||
v.reserve(reserve_size);
|
||||
uint64_t seed = rand();
|
||||
for (uint64_t i = 0; i < push_count; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(seed + i));
|
||||
BOOST_REQUIRE(**v.begin() == seed);
|
||||
});
|
||||
}
|
||||
auto v_it = v.begin();
|
||||
for (uint64_t i = 0; i < push_count; ++i) {
|
||||
BOOST_REQUIRE(**v_it++ == seed + i);
|
||||
}
|
||||
v.clear_and_release();
|
||||
});
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
|
||||
// the last reserved chunk.
|
||||
SEASTAR_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
|
||||
region region;
|
||||
allocating_section as;
|
||||
|
||||
with_allocator(region.allocator(), [&] {
|
||||
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
|
||||
|
||||
// Fill two chunks
|
||||
v.reserve(2000);
|
||||
for (uint64_t i = 0; i < 2000; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(i));
|
||||
});
|
||||
}
|
||||
|
||||
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
|
||||
v.shrink_to_fit();
|
||||
|
||||
// Leave the last chunk reserved but empty
|
||||
for (uint64_t i = 0; i < 1000; ++i) {
|
||||
v.pop_back();
|
||||
}
|
||||
|
||||
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
|
||||
// with _size not in the last chunk. Should not sigsegv.
|
||||
v.reserve(8000);
|
||||
|
||||
for (uint64_t i = 0; i < 2000; ++i) {
|
||||
as(region, [&] {
|
||||
v.emplace_back(make_managed<uint64_t>(i));
|
||||
});
|
||||
}
|
||||
|
||||
v.clear_and_release();
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
|
||||
@@ -178,3 +178,32 @@ BOOST_AUTO_TEST_CASE(tests_reserve_partial) {
|
||||
BOOST_REQUIRE_EQUAL(v.capacity(), orig_size);
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
|
||||
// the last reserved chunk.
|
||||
BOOST_AUTO_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
|
||||
using vector_type = utils::chunked_vector<std::unique_ptr<uint64_t>>;
|
||||
vector_type v;
|
||||
|
||||
// Fill two chunks
|
||||
v.reserve(vector_type::max_chunk_capacity() * 3 / 2);
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 3 / 2; ++i) {
|
||||
v.emplace_back(std::make_unique<uint64_t>(i));
|
||||
}
|
||||
|
||||
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
|
||||
v.shrink_to_fit();
|
||||
|
||||
// Leave the last chunk reserved but empty
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity(); ++i) {
|
||||
v.pop_back();
|
||||
}
|
||||
|
||||
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
|
||||
// with _size not in the last chunk. Should not sigsegv.
|
||||
v.reserve(vector_type::max_chunk_capacity() * 4);
|
||||
|
||||
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 2; ++i) {
|
||||
v.emplace_back(std::make_unique<uint64_t>(i));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -391,11 +391,14 @@ future<> do_with_some_data(std::function<future<> (cql_test_env& env)> func, lw_
|
||||
});
|
||||
}
|
||||
|
||||
future<> take_snapshot(sharded<replica::database>& db, bool skip_flush = false) {
|
||||
return db.invoke_on_all([skip_flush] (replica::database& db) {
|
||||
auto& cf = db.find_column_family("ks", "cf");
|
||||
return cf.snapshot(db, "test", skip_flush);
|
||||
});
|
||||
future<> take_snapshot(sharded<replica::database>& db, bool skip_flush = false, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test") {
|
||||
try {
|
||||
co_await db.local().snapshot_on_all(ks_name, {cf_name}, snapshot_name, skip_flush);
|
||||
} catch (...) {
|
||||
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
|
||||
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
future<> take_snapshot(cql_test_env& e, bool skip_flush = false) {
|
||||
@@ -985,3 +988,38 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
|
||||
BOOST_REQUIRE(expected.empty());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
|
||||
auto& db = e.local_db();
|
||||
const auto ts = db_clock::now();
|
||||
auto& tbl = db.find_column_family("ks", "cf");
|
||||
|
||||
auto op = std::optional(tbl.read_in_progress());
|
||||
auto s = tbl.schema();
|
||||
auto q = query::data_querier(
|
||||
tbl.as_mutation_source(),
|
||||
tbl.schema(),
|
||||
database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
default_priority_class(),
|
||||
nullptr);
|
||||
|
||||
auto f = e.db().invoke_on_all([ts] (replica::database& db) {
|
||||
return db.drop_column_family("ks", "cf", [ts] { return make_ready_future<db_clock::time_point>(ts); });
|
||||
});
|
||||
|
||||
// we add a querier to the querier cache while the drop is ongoing
|
||||
auto& qc = db.get_querier_cache();
|
||||
qc.insert(utils::make_random_uuid(), std::move(q), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 1);
|
||||
|
||||
op.reset(); // this should allow the drop to finish
|
||||
f.get();
|
||||
|
||||
// the drop should have cleaned up all entries belonging to that table
|
||||
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -89,10 +89,7 @@ SEASTAR_TEST_CASE(simple_sstable_extension) {
|
||||
// minimal data
|
||||
return e.execute_cql("insert into ks.cf (id, value) values (1, 100);").discard_result().then([&e] {
|
||||
// flush all shards
|
||||
return e.db().invoke_on_all([](replica::database& db) {
|
||||
auto& cf = db.find_column_family("ks", "cf");
|
||||
return cf.flush();
|
||||
}).then([] {
|
||||
return e.db().local().flush_on_all("ks", "cf").then([] {
|
||||
BOOST_REQUIRE(counter > 1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
|
||||
#include <vector>
|
||||
#include <numeric>
|
||||
@@ -428,6 +429,163 @@ SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged_minimum_size) {
|
||||
return seastar::async([] {
|
||||
// Test that unprivileged section is not starved.
|
||||
//
|
||||
// This scenario is tested: cache max_size is 50 and there are 49 entries in
|
||||
// privileged section. After adding 5 elements (that go to unprivileged
|
||||
// section) all of them should stay in unprivileged section and elements
|
||||
// in privileged section should get evicted.
|
||||
//
|
||||
// Wrong handling of this situation caused problems with BATCH statements
|
||||
// where all prepared statements in the batch have to stay in cache at
|
||||
// the same time for the batch to correctly execute.
|
||||
|
||||
using namespace std::chrono;
|
||||
utils::loading_cache<int, sstring, 1> loading_cache(50, 1h, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
prepare().get();
|
||||
|
||||
// Add 49 elements to privileged section
|
||||
for (int i = 0; i < 49; i++) {
|
||||
// Touch the value with the key "i" twice
|
||||
loading_cache.get_ptr(i, loader).discard_result().get();
|
||||
loading_cache.find(i);
|
||||
}
|
||||
|
||||
// Add 5 elements to unprivileged section
|
||||
for (int i = 50; i < 55; i++) {
|
||||
loading_cache.get_ptr(i, loader).discard_result().get();
|
||||
}
|
||||
|
||||
// Make sure that none of 5 elements were evicted
|
||||
for (int i = 50; i < 55; i++) {
|
||||
BOOST_REQUIRE(loading_cache.find(i) != nullptr);
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 50);
|
||||
});
|
||||
}
|
||||
|
||||
struct sstring_length_entry_size {
|
||||
size_t operator()(const sstring& val) {
|
||||
return val.size();
|
||||
}
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_section_size_correctly_calculated) {
|
||||
return seastar::async([] {
|
||||
auto load_len1 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(1)); };
|
||||
auto load_len5 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(5)); };
|
||||
auto load_len10 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(10)); };
|
||||
auto load_len95 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(95)); };
|
||||
|
||||
using namespace std::chrono;
|
||||
utils::loading_cache<int, sstring, 1, utils::loading_cache_reload_enabled::no, sstring_length_entry_size> loading_cache(100, 1h, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
loading_cache.get_ptr(1, load_len1).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
|
||||
|
||||
loading_cache.get_ptr(2, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);
|
||||
|
||||
// Move "2" to privileged section by touching it the second time.
|
||||
loading_cache.get_ptr(2, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);
|
||||
|
||||
loading_cache.get_ptr(3, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);
|
||||
|
||||
// Move "1" to privileged section. load_len10 should not get executed, as "1"
|
||||
// is already in the cache.
|
||||
loading_cache.get_ptr(1, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 10);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);
|
||||
|
||||
// Flood cache with elements of size 10,
|
||||
// unprivileged. "1" and "2" should stay in the privileged section.
|
||||
for (int i = 11; i < 30; i++) {
|
||||
loading_cache.get_ptr(i, load_len10).discard_result().get();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 100);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
|
||||
// Flood cache with elements of size 10, privileged.
|
||||
for (int i = 11; i < 30; i++) {
|
||||
loading_cache.get_ptr(i, load_len10).discard_result().get();
|
||||
loading_cache.get_ptr(i, load_len10).discard_result().get();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 100);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);
|
||||
|
||||
// Add one new unprivileged entry.
|
||||
loading_cache.get_ptr(31, load_len1).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);
|
||||
|
||||
// Add another unprivileged entry, privileged entry should get evicted.
|
||||
loading_cache.get_ptr(32, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);
|
||||
|
||||
// Make it privileged by touching it again.
|
||||
loading_cache.get_ptr(32, load_len5).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);
|
||||
|
||||
// Add another unprivileged entry.
|
||||
loading_cache.get_ptr(33, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
|
||||
// Add another unprivileged entry, privileged entry should get evicted.
|
||||
loading_cache.get_ptr(34, load_len10).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 85);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 21);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
|
||||
// Add a big unprivileged entry, filling almost entire cache.
|
||||
loading_cache.get_ptr(35, load_len95).discard_result().get();
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 75);
|
||||
// We shrink the cache BEFORE adding element,
|
||||
// so after adding the element, the cache
|
||||
// can exceed max_size...
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 95 + 21);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
|
||||
return seastar::async([] {
|
||||
using namespace std::chrono;
|
||||
@@ -449,3 +607,169 @@ SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_loading_cache_remove_leaves_no_old_entries_behind) {
|
||||
using namespace std::chrono;
|
||||
load_count = 0;
|
||||
|
||||
auto load_v1 = [] (auto key) { return make_ready_future<sstring>("v1"); };
|
||||
auto load_v2 = [] (auto key) { return make_ready_future<sstring>("v2"); };
|
||||
auto load_v3 = [] (auto key) { return make_ready_future<sstring>("v3"); };
|
||||
|
||||
{
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
//
|
||||
// Test remove() concurrent with loading
|
||||
//
|
||||
|
||||
auto f = loading_cache.get_ptr(0, [&](auto key) {
|
||||
return yield().then([&] {
|
||||
return load_v1(key);
|
||||
});
|
||||
});
|
||||
|
||||
loading_cache.remove(0);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
auto ptr1 = f.get0();
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
|
||||
loading_cache.remove(0);
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
|
||||
|
||||
//
|
||||
// Test that live ptr1, removed from cache, does not prevent reload of new value
|
||||
//
|
||||
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
|
||||
ptr1 = nullptr;
|
||||
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
|
||||
}
|
||||
|
||||
// Test remove_if()
|
||||
{
|
||||
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
|
||||
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
|
||||
|
||||
//
|
||||
// Test remove_if() concurrent with loading
|
||||
//
|
||||
auto f = loading_cache.get_ptr(0, [&](auto key) {
|
||||
return yield().then([&] {
|
||||
return load_v1(key);
|
||||
});
|
||||
});
|
||||
|
||||
loading_cache.remove_if([] (auto&& v) { return v == "v1"; });
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
auto ptr1 = f.get0();
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
|
||||
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
|
||||
|
||||
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
|
||||
loading_cache.remove_if([] (auto&& v) { return v == "v2"; });
|
||||
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
|
||||
|
||||
//
|
||||
// Test that live ptr1, removed from cache, does not prevent reload of new value
|
||||
//
|
||||
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
|
||||
ptr1 = nullptr;
|
||||
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
|
||||
ptr2 = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_prepared_statement_small_cache) {
|
||||
// CQL prepared statement cache uses loading_cache
|
||||
// internally.
|
||||
constexpr auto CACHE_SIZE = 950000;
|
||||
|
||||
cql_test_config small_cache_config;
|
||||
small_cache_config.qp_mcfg = {CACHE_SIZE, CACHE_SIZE};
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE tbl1 (a int, b int, PRIMARY KEY (a))").get();
|
||||
|
||||
auto current_uid = 0;
|
||||
|
||||
// Prepare 100 queries and execute them twice,
|
||||
// filling "privileged section" of loading_cache.
|
||||
std::vector<cql3::prepared_cache_key_type> prepared_ids_privileged;
|
||||
for (int i = 0; i < 100; i++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
prepared_ids_privileged.push_back(prepared_id);
|
||||
}
|
||||
|
||||
int how_many_in_cache = 0;
|
||||
for (auto& prepared_id : prepared_ids_privileged) {
|
||||
if (e.local_qp().get_prepared(prepared_id)) {
|
||||
how_many_in_cache++;
|
||||
}
|
||||
}
|
||||
|
||||
// Assumption: CACHE_SIZE should hold at least 50 queries,
|
||||
// but not more than 99 queries. Other checks in this
|
||||
// test rely on that fact.
|
||||
BOOST_REQUIRE(how_many_in_cache >= 50);
|
||||
BOOST_REQUIRE(how_many_in_cache <= 99);
|
||||
|
||||
// Then prepare 5 queries and execute them one time,
|
||||
// which will occupy "unprivileged section" of loading_cache.
|
||||
std::vector<cql3::prepared_cache_key_type> prepared_ids_unprivileged;
|
||||
for (int i = 0; i < 5; i++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
prepared_ids_unprivileged.push_back(prepared_id);
|
||||
}
|
||||
|
||||
// Check that all of those prepared queries can still be
|
||||
// executed. This simulates as if you wanted to execute
|
||||
// a BATCH with all of them, which requires all of those
|
||||
// prepared statements to be executable (in the cache).
|
||||
for (auto& prepared_id : prepared_ids_unprivileged) {
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
}
|
||||
|
||||
// Deterministic random for reproducibility.
|
||||
testing::local_random_engine.seed(12345);
|
||||
|
||||
// Prepare 500 queries and execute them a random number of times.
|
||||
for (int i = 0; i < 500; i++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
auto times = rand_int(4);
|
||||
for (int j = 0; j < times; j++) {
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare 100 simulated "batches" and execute them
|
||||
// a random number of times.
|
||||
for (int i = 0; i < 100; i++) {
|
||||
std::vector<cql3::prepared_cache_key_type> prepared_ids_batch;
|
||||
for (int j = 0; j < 5; j++) {
|
||||
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
|
||||
prepared_ids_batch.push_back(prepared_id);
|
||||
}
|
||||
auto times = rand_int(4);
|
||||
for (int j = 0; j < times; j++) {
|
||||
for (auto& prepared_id : prepared_ids_batch) {
|
||||
e.execute_prepared(prepared_id, {}).get();
|
||||
}
|
||||
}
|
||||
}
|
||||
}, small_cache_config);
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/testing/on_internal_error.hh>
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
@@ -467,3 +468,197 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
|
||||
BOOST_REQUIRE(available_res == sem.available_resources());
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
|
||||
testing::scoped_no_abort_on_internal_error _;
|
||||
|
||||
simple_schema ss;
|
||||
|
||||
const auto dkeys = ss.make_pkeys(3);
|
||||
const auto& dk_ = dkeys[0];
|
||||
const auto& dk0 = dkeys[1];
|
||||
const auto& dk1 = dkeys[2];
|
||||
const auto ck0 = ss.make_ckey(0);
|
||||
const auto ck1 = ss.make_ckey(1);
|
||||
const auto ck2 = ss.make_ckey(2);
|
||||
const auto ck3 = ss.make_ckey(3);
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout);
|
||||
|
||||
auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) {
|
||||
std::vector<mutation_fragment_v2> mfs;
|
||||
{
|
||||
bool need_inject_ps = false;
|
||||
if constexpr (std::is_same_v<std::remove_reference_t<decltype(first_mf)>, mutation_fragment_v2>) {
|
||||
need_inject_ps = !first_mf.is_partition_start();
|
||||
} else {
|
||||
need_inject_ps = !std::is_same_v<std::remove_reference_t<decltype(first_mf)>, partition_start>;
|
||||
}
|
||||
if (need_inject_ps) {
|
||||
testlog.trace("Injecting partition start");
|
||||
mfs.emplace_back(*ss.schema(), permit, partition_start(dk_, {}));
|
||||
if (at != std::numeric_limits<unsigned>::max()) {
|
||||
++at;
|
||||
}
|
||||
}
|
||||
mfs.emplace_back(*ss.schema(), permit, std::move(first_mf));
|
||||
auto _ = std::vector<mutation_fragment_v2*>{&mfs.emplace_back(*ss.schema(), permit, std::move(mf))..., };
|
||||
}
|
||||
|
||||
testlog.info("Checking scenario {} with validator", desc);
|
||||
{
|
||||
unsigned i = 0;
|
||||
mutation_fragment_stream_validator validator(*ss.schema());
|
||||
bool valid = true;
|
||||
for (const auto& mf : mfs) {
|
||||
testlog.trace("validate fragment [{}] {} @ {}", i, mf.mutation_fragment_kind(), mf.position());
|
||||
valid &= validator(mf);
|
||||
if (expect_valid) {
|
||||
if (!valid) {
|
||||
BOOST_FAIL(fmt::format("Unexpected invalid fragment {} @ {}", mf.mutation_fragment_kind(), mf.position()));
|
||||
}
|
||||
} else {
|
||||
if (i == at && valid) {
|
||||
BOOST_FAIL(fmt::format("Unexpected valid fragment {} @ {}", mf.mutation_fragment_kind(), mf.position()));
|
||||
}
|
||||
}
|
||||
++i;
|
||||
}
|
||||
if (expect_valid || i <= at) {
|
||||
valid &= validator.on_end_of_stream();
|
||||
BOOST_REQUIRE(valid == expect_valid);
|
||||
}
|
||||
}
|
||||
|
||||
testlog.info("Checking scenario {} with validating filter", desc);
|
||||
{
|
||||
unsigned i = 0;
|
||||
mutation_fragment_stream_validating_filter validator(get_name(), *ss.schema(), mutation_fragment_stream_validation_level::clustering_key);
|
||||
for (const auto& mf : mfs) {
|
||||
testlog.trace("validate fragment [{}] {} @ {}", i, mf.mutation_fragment_kind(), mf.position());
|
||||
try {
|
||||
validator(mf);
|
||||
if (!expect_valid && i == at) {
|
||||
BOOST_FAIL(fmt::format("Unexpected valid fragment {} @ {}", mf.mutation_fragment_kind(), mf.position()));
|
||||
}
|
||||
} catch (invalid_mutation_fragment_stream& e) {
|
||||
if (expect_valid || i < at) {
|
||||
BOOST_FAIL(fmt::format("Unexpected invalid fragment {} @ {}: {}", mf.mutation_fragment_kind(), mf.position(), e));
|
||||
} else {
|
||||
testlog.trace("Got expected exception for fragment {} @ {}: {}", mf.mutation_fragment_kind(), mf.position(), e);
|
||||
}
|
||||
}
|
||||
++i;
|
||||
}
|
||||
if (expect_valid || i <= at) {
|
||||
try {
|
||||
validator.on_end_of_stream();
|
||||
if (!expect_valid) {
|
||||
BOOST_FAIL("Unexpected valid EOS");
|
||||
}
|
||||
} catch (invalid_mutation_fragment_stream& e) {
|
||||
if (expect_valid) {
|
||||
BOOST_FAIL(fmt::format("Unexpected invalid EOS: {}", e));
|
||||
} else {
|
||||
testlog.trace("Got expected exception at EOS: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto expect_valid = [&] (const char* desc, auto&&... mf) {
|
||||
return expect(true, desc, std::numeric_limits<unsigned>::max(), std::move(mf)...);
|
||||
};
|
||||
|
||||
auto expect_invalid_at_eos = [&] (const char* desc, auto&&... mf) {
|
||||
return expect(false, desc, std::numeric_limits<unsigned>::max(), std::move(mf)...);
|
||||
};
|
||||
|
||||
auto expect_invalid_at_fragment = [&] (const char* desc, unsigned at, auto&&... mf) {
|
||||
return expect(false, desc, at, std::move(mf)...);
|
||||
};
|
||||
|
||||
expect_valid(
|
||||
"kitchen sink",
|
||||
partition_start(dk0, {}),
|
||||
ss.make_static_row_v2(permit, "v"),
|
||||
ss.make_row_v2(permit, ck0, "ck0"),
|
||||
ss.make_row_v2(permit, ck1, "ck1"),
|
||||
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
|
||||
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
|
||||
range_tombstone_change(position_in_partition::before_key(ck2), {ss.new_tombstone()}),
|
||||
ss.make_row_v2(permit, ck2, "ck2"),
|
||||
range_tombstone_change(position_in_partition::after_key(ck2), {}),
|
||||
partition_end{},
|
||||
partition_start(dk1, {}),
|
||||
partition_end{});
|
||||
|
||||
expect_valid(
|
||||
"static row alone",
|
||||
partition_start(dk0, {}),
|
||||
ss.make_static_row_v2(permit, "v"),
|
||||
partition_end{});
|
||||
|
||||
expect_valid(
|
||||
"clustering row alone",
|
||||
partition_start(dk0, {}),
|
||||
ss.make_row_v2(permit, ck0, "ck0"),
|
||||
partition_end{});
|
||||
|
||||
expect_valid(
|
||||
"2 range tombstone changes",
|
||||
partition_start(dk0, {}),
|
||||
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
|
||||
range_tombstone_change(position_in_partition::after_key(ck2), {}),
|
||||
partition_end{});
|
||||
|
||||
expect_valid(
|
||||
"null range tombstone change alone",
|
||||
partition_start(dk0, {}),
|
||||
range_tombstone_change(position_in_partition::after_key(ck2), {}),
|
||||
partition_end{});
|
||||
|
||||
expect_invalid_at_eos(
|
||||
"missing partition end at EOS",
|
||||
partition_start(dk0, {}));
|
||||
|
||||
expect_invalid_at_fragment(
|
||||
"active range tombstone end at partition end",
|
||||
2,
|
||||
partition_start(dk0, {}),
|
||||
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
|
||||
partition_end{});
|
||||
|
||||
const auto ps = mutation_fragment_v2(*ss.schema(), permit, partition_start(dk1, {}));
|
||||
const auto sr = ss.make_static_row_v2(permit, "v");
|
||||
const auto cr = ss.make_row_v2(permit, ck2, "ck2");
|
||||
const auto rtc = mutation_fragment_v2(*ss.schema(), permit, range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}));
|
||||
const auto pe = mutation_fragment_v2(*ss.schema(), permit, partition_end{});
|
||||
|
||||
auto check_invalid_after = [&] (auto&& mf_raw, std::initializer_list<const mutation_fragment_v2*> invalid_mfs) {
|
||||
auto mf = mutation_fragment_v2(*ss.schema(), permit, std::move(mf_raw));
|
||||
for (const auto invalid_mf : invalid_mfs) {
|
||||
std::string desc;
|
||||
if (mf.position().region() == partition_region::clustered) {
|
||||
desc = fmt::format("{} @ {} after {} @ {}", invalid_mf->mutation_fragment_kind(), invalid_mf->position(), mf.mutation_fragment_kind(), mf.position());
|
||||
} else {
|
||||
desc = fmt::format("{} after {}", invalid_mf->mutation_fragment_kind(), mf.mutation_fragment_kind());
|
||||
}
|
||||
|
||||
expect_invalid_at_fragment(
|
||||
desc.c_str(),
|
||||
1,
|
||||
mutation_fragment_v2(*ss.schema(), permit, mf),
|
||||
mutation_fragment_v2(*ss.schema(), permit, *invalid_mf));
|
||||
}
|
||||
};
|
||||
|
||||
check_invalid_after(partition_start(dk0, {}), {&ps});
|
||||
check_invalid_after(sr, {&sr, &ps});
|
||||
check_invalid_after(cr, {&ps, &sr, &cr});
|
||||
check_invalid_after(rtc, {&ps, &sr});
|
||||
check_invalid_after(pe, {&sr, &cr, &rtc, &pe});
|
||||
}
|
||||
|
||||
@@ -690,6 +690,7 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
};
|
||||
|
||||
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
|
||||
testlog.trace("Expected {} == {}", c1, c2);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
|
||||
};
|
||||
@@ -711,9 +712,11 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2));
|
||||
|
||||
// Origin doesn't compare ttl (is it wise?)
|
||||
assert_equal(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2));
|
||||
// But we do. See https://github.com/scylladb/scylla/issues/10156
|
||||
// and https://github.com/scylladb/scylla/issues/10173
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
|
||||
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value1")),
|
||||
|
||||
@@ -24,20 +24,30 @@ static void add_entry(logalloc::region& r,
|
||||
{
|
||||
logalloc::allocating_section as;
|
||||
as(r, [&] {
|
||||
sstables::key sst_key = sstables::key::from_partition_key(s, key);
|
||||
page._entries.push_back(make_managed<index_entry>(
|
||||
managed_bytes(sst_key.get_bytes()),
|
||||
position,
|
||||
managed_ref<promoted_index>()));
|
||||
with_allocator(r.allocator(), [&] {
|
||||
sstables::key sst_key = sstables::key::from_partition_key(s, key);
|
||||
page._entries.push_back(make_managed<index_entry>(
|
||||
managed_bytes(sst_key.get_bytes()),
|
||||
position,
|
||||
managed_ref<promoted_index>()));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static partition_index_page make_page0(logalloc::region& r, simple_schema& s) {
|
||||
partition_index_page page;
|
||||
auto destroy_page = defer([&] {
|
||||
with_allocator(r.allocator(), [&] {
|
||||
auto p = std::move(page);
|
||||
});
|
||||
});
|
||||
|
||||
add_entry(r, *s.schema(), page, s.make_pkey(0).key(), 0);
|
||||
add_entry(r, *s.schema(), page, s.make_pkey(1).key(), 1);
|
||||
add_entry(r, *s.schema(), page, s.make_pkey(2).key(), 2);
|
||||
add_entry(r, *s.schema(), page, s.make_pkey(3).key(), 3);
|
||||
|
||||
destroy_page.cancel();
|
||||
return page;
|
||||
}
|
||||
|
||||
@@ -128,6 +138,47 @@ SEASTAR_THREAD_TEST_CASE(test_caching) {
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static future<> ignore_result(future<T>&& f) {
|
||||
return f.then_wrapped([] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
// expected, silence warnings about ignored failed futures
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_exception_while_loading) {
|
||||
::lru lru;
|
||||
simple_schema s;
|
||||
logalloc::region r;
|
||||
partition_index_cache cache(lru, r);
|
||||
|
||||
auto clear_lru = defer([&] {
|
||||
with_allocator(r.allocator(), [&] {
|
||||
lru.evict_all();
|
||||
});
|
||||
});
|
||||
|
||||
auto page0_loader = [&] (partition_index_cache::key_type k) {
|
||||
return yield().then([&] {
|
||||
return make_page0(r, s);
|
||||
});
|
||||
};
|
||||
|
||||
memory::with_allocation_failures([&] {
|
||||
cache.evict_gently().get();
|
||||
auto f0 = ignore_result(cache.get_or_load(0, page0_loader));
|
||||
auto f1 = ignore_result(cache.get_or_load(0, page0_loader));
|
||||
f0.get();
|
||||
f1.get();
|
||||
});
|
||||
|
||||
auto ptr = cache.get_or_load(0, page0_loader).get0();
|
||||
has_page0(ptr);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_auto_clear) {
|
||||
::lru lru;
|
||||
simple_schema s;
|
||||
|
||||
@@ -6,6 +6,7 @@ from cassandra.cluster import ConsistencyLevel
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
from util import new_test_table
|
||||
from nodetool import flush
|
||||
|
||||
def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
|
||||
'''Test that the stream IDs chosen for CDC log entries come from the CDC generation
|
||||
@@ -31,3 +32,16 @@ def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
|
||||
|
||||
assert(log_stream_ids.issubset(stream_ids))
|
||||
|
||||
|
||||
# Test for #10473 - reading logs (from sstable) after dropping
|
||||
# column in base.
|
||||
def test_cdc_alter_table_drop_column(scylla_only, cql, test_keyspace):
|
||||
schema = "pk int primary key, v int"
|
||||
extra = " with cdc = {'enabled': true}"
|
||||
with new_test_table(cql, test_keyspace, schema, extra) as table:
|
||||
cql.execute(f"insert into {table} (pk, v) values (0, 0)")
|
||||
cql.execute(f"insert into {table} (pk, v) values (1, null)")
|
||||
flush(cql, table)
|
||||
flush(cql, table + "_scylla_cdc_log")
|
||||
cql.execute(f"alter table {table} drop v")
|
||||
cql.execute(f"select * from {table}_scylla_cdc_log")
|
||||
|
||||
@@ -115,3 +115,26 @@ def test_operator_ne_not_supported(cql, table1):
|
||||
cql.execute(f'SELECT a FROM {table1} WHERE a != 0')
|
||||
with pytest.raises(InvalidRequest, match='Unsupported.*!='):
|
||||
cql.execute(f'SELECT a FROM {table1} WHERE token(a) != 0')
|
||||
|
||||
# Test that the fact that a column is indexed does not cause us to fetch
|
||||
# incorrect results from a filtering query (issue #10300).
|
||||
def test_index_with_in_relation(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int, c int, v boolean, primary key (p,c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"create index on {table}(v)")
|
||||
for p, c, v in [(0,0,True),(0,1,False),(0,2,True),(0,3,False),
|
||||
(1,0,True),(1,1,False),(1,2,True),(1,3,False),
|
||||
(2,0,True),(2,1,False),(2,2,True),(2,3,False)]:
|
||||
cql.execute(f"insert into {table} (p,c,v) values ({p}, {c}, {v})")
|
||||
res = cql.execute(f"select * from {table} where p in (0,1) and v = False ALLOW FILTERING")
|
||||
assert set(res) == set([(0,1,False),(0,3,False),(1,1,False), (1,3,False)])
|
||||
|
||||
# Test that LIKE operator works fine as a filter when the filtered column
|
||||
# has descending order. Regression test for issue #10183, when it was incorrectly
|
||||
# rejected as a "non-string" column.
|
||||
def test_filter_like_on_desc_column(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, "a int, b text, primary key(a, b)",
|
||||
extra="with clustering order by (b desc)") as table:
|
||||
cql.execute(f"INSERT INTO {table} (a, b) VALUES (1, 'one')")
|
||||
res = cql.execute(f"SELECT b FROM {table} WHERE b LIKE '%%%' ALLOW FILTERING")
|
||||
assert res.one().b == "one"
|
||||
|
||||
@@ -85,3 +85,17 @@ def test_mv_empty_string_partition_key(cql, test_keyspace):
|
||||
# because Cassandra forbids an empty partition key on select
|
||||
with pytest.raises(InvalidRequest, match='Key may not be empty'):
|
||||
cql.execute(f"SELECT * FROM {mv} WHERE v=''")
|
||||
|
||||
# Refs #10851. The code used to create a wildcard selection for all columns,
|
||||
# which erroneously also includes static columns if such are present in the
|
||||
# base table. Currently views only operate on regular columns and the filtering
|
||||
# code assumes that. TODO: once we implement static column support for materialized
|
||||
# views, this test case will be a nice regression test to ensure that everything still
|
||||
# works if the static columns are *not* used in the view.
|
||||
def test_filter_with_unused_static_column(cql, test_keyspace):
|
||||
schema = 'p int, c int, v int, s int static, primary key (p,c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
with new_materialized_view(cql, table, select='p,c,v', pk='p,c,v', where='p IS NOT NULL and c IS NOT NULL and v = 44') as mv:
|
||||
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES (42,43,44)")
|
||||
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES (1,2,3)")
|
||||
assert list(cql.execute(f"SELECT * FROM {mv}")) == [(42, 43, 44)]
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
import pytest
|
||||
import util
|
||||
import nodetool
|
||||
import json
|
||||
|
||||
def test_snapshots_table(scylla_only, cql, test_keyspace):
|
||||
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
|
||||
@@ -32,3 +33,31 @@ def test_runtime_info(scylla_only, cql):
|
||||
|
||||
def test_versions(scylla_only, cql):
|
||||
_check_exists(cql, "versions", ("key", "build_id", "build_mode", "version"))
|
||||
|
||||
# Check reading the system.config table, which should list all configuration
|
||||
# parameters. As we noticed in issue #10047, each type of configuration
|
||||
# parameter can have a different function for printing it out, and some of
|
||||
# those may be wrong so we want to check as many as we can - including
|
||||
# specifically the experimental_features option which was wrong in #10047.
|
||||
def test_system_config_read(scylla_only, cql):
|
||||
# All rows should have the columns name, source, type and value:
|
||||
rows = list(cql.execute("SELECT name, source, type, value FROM system.config"))
|
||||
values = dict()
|
||||
for row in rows:
|
||||
values[row.name] = row.value
|
||||
# Check that experimental_features exists and makes sense.
|
||||
# It needs to be a JSON-formatted strings, and the strings need to be
|
||||
# ASCII feature names - not binary garbage as it was in #10047.
|
||||
assert 'experimental_features' in values
|
||||
obj = json.loads(values['experimental_features'])
|
||||
assert isinstance(obj, list)
|
||||
assert isinstance(obj[0], str)
|
||||
assert obj[0] and obj[0].isascii() and obj[0].isprintable()
|
||||
# Check formatting of tri_mode_restriction like
|
||||
# restrict_replication_simplestrategy. These need to be one of
|
||||
# allowed string values 0, 1, true, false or warn - but in particular
|
||||
# non-empty and printable ASCII, not garbage.
|
||||
assert 'restrict_replication_simplestrategy' in values
|
||||
obj = json.loads(values['restrict_replication_simplestrategy'])
|
||||
assert isinstance(obj, str)
|
||||
assert obj and obj.isascii() and obj.isprintable()
|
||||
|
||||
@@ -105,6 +105,19 @@ def new_materialized_view(cql, table, select, pk, where):
|
||||
finally:
|
||||
cql.execute(f"DROP MATERIALIZED VIEW {mv}")
|
||||
|
||||
# A utility function for creating a new temporary secondary index of
|
||||
# an existing table.
|
||||
@contextmanager
|
||||
def new_secondary_index(cql, table, column, name='', extra=''):
|
||||
keyspace = table.split('.')[0]
|
||||
if not name:
|
||||
name = unique_name()
|
||||
cql.execute(f"CREATE INDEX {name} ON {table} ({column}) {extra}")
|
||||
try:
|
||||
yield f"{keyspace}.{name}"
|
||||
finally:
|
||||
cql.execute(f"DROP INDEX {keyspace}.{name}")
|
||||
|
||||
def project(column_name_string, rows):
|
||||
"""Returns a list of column values from each of the rows."""
|
||||
return [getattr(r, column_name_string) for r in rows]
|
||||
|
||||
@@ -626,7 +626,12 @@ public:
|
||||
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr)).get();
|
||||
auto stop_mm = defer([&mm] { mm.stop().get(); });
|
||||
|
||||
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
|
||||
cql3::query_processor::memory_config qp_mcfg;
|
||||
if (cfg_in.qp_mcfg) {
|
||||
qp_mcfg = *cfg_in.qp_mcfg;
|
||||
} else {
|
||||
qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
|
||||
}
|
||||
auto local_data_dict = seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db));
|
||||
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config)).get();
|
||||
auto stop_qp = defer([&qp] { qp.stop().get(); });
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "cql3/query_options_fwd.hh"
|
||||
#include "cql3/values.hh"
|
||||
#include "cql3/prepared_statements_cache.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "bytes.hh"
|
||||
#include "schema.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
@@ -85,6 +86,7 @@ public:
|
||||
// Scheduling groups are overwritten unconditionally, see get_scheduling_groups().
|
||||
std::optional<replica::database_config> dbcfg;
|
||||
std::set<sstring> disabled_features;
|
||||
std::optional<cql3::query_processor::memory_config> qp_mcfg;
|
||||
|
||||
cql_test_config();
|
||||
cql_test_config(const cql_test_config&);
|
||||
|
||||
32
test/rest_api/rest_util.py
Normal file
32
test/rest_api/rest_util.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import time
|
||||
|
||||
from contextlib import contextmanager
|
||||
|
||||
# A utility function for creating a new temporary snapshot.
|
||||
# If no keyspaces are given, a snapshot is taken over all keyspaces and tables.
|
||||
# If no tables are given, a snapshot is taken over all tables in the keyspace.
|
||||
# If no tag is given, a unique tag will be computed using the current time, in milliseconds.
|
||||
# It can be used in a "with", as:
|
||||
# with new_test_snapshot(cql, tag, keyspace, [table(s)]) as snapshot:
|
||||
# This is not a fixture - see those in conftest.py.
|
||||
@contextmanager
|
||||
def new_test_snapshot(rest_api, keyspaces=[], tables=[], tag=""):
|
||||
if not tag:
|
||||
tag = f"test_snapshot_{int(time.time() * 1000)}"
|
||||
params = { "tag": tag }
|
||||
if type(keyspaces) is str:
|
||||
params["kn"] = keyspaces
|
||||
else:
|
||||
params["kn"] = ",".join(keyspaces)
|
||||
if tables:
|
||||
if type(tables) is str:
|
||||
params["cf"] = tables
|
||||
else:
|
||||
params["cf"] = ",".join(tables)
|
||||
resp = rest_api.send("POST", "storage_service/snapshots", params)
|
||||
resp.raise_for_status()
|
||||
try:
|
||||
yield tag
|
||||
finally:
|
||||
resp = rest_api.send("DELETE", "storage_service/snapshots", params)
|
||||
resp.raise_for_status()
|
||||
@@ -8,7 +8,8 @@ import requests
|
||||
|
||||
# Use the util.py library from ../cql-pytest:
|
||||
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
|
||||
from util import unique_name, new_test_table
|
||||
from util import unique_name, new_test_table, new_test_keyspace, new_materialized_view, new_secondary_index
|
||||
from rest_util import new_test_snapshot
|
||||
|
||||
# "keyspace" function: Creates and returns a temporary keyspace to be
|
||||
# used in tests that need a keyspace. The keyspace is created with RF=1,
|
||||
@@ -95,3 +96,110 @@ def test_storage_service_keyspace_offstrategy_compaction_tables(cql, this_dc, re
|
||||
assert resp.status_code == requests.codes.bad_request
|
||||
|
||||
cql.execute(f"DROP KEYSPACE {keyspace}")
|
||||
|
||||
def test_storage_service_snapshot(cql, this_dc, rest_api):
|
||||
resp = rest_api.send("GET", "storage_service/snapshots")
|
||||
resp.raise_for_status()
|
||||
|
||||
def verify_snapshot_details(expected):
|
||||
resp = rest_api.send("GET", "storage_service/snapshots")
|
||||
found = False
|
||||
for data in resp.json():
|
||||
if data['key'] == expected['key']:
|
||||
assert not found
|
||||
found = True
|
||||
sort_key = lambda v: f"{v['ks']}-{v['cf']}"
|
||||
value = sorted([v for v in data['value'] if not v['ks'].startswith('system')], key=sort_key)
|
||||
expected_value = sorted(expected['value'], key=sort_key)
|
||||
assert len(value) == len(expected_value), f"length mismatch: expected {expected_value} but got {value}"
|
||||
for i in range(len(value)):
|
||||
v = value[i]
|
||||
# normalize `total` and `live`
|
||||
# since we care only if they are zero or not
|
||||
v['total'] = 1 if v['total'] else 0
|
||||
v['live'] = 1 if v['live'] else 0
|
||||
ev = expected_value[i]
|
||||
assert v == ev
|
||||
assert found, f"key='{expected['key']}' not found in {resp.json()}"
|
||||
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace0:
|
||||
with new_test_table(cql, keyspace0, "p text PRIMARY KEY") as table00:
|
||||
ks0, cf00 = table00.split('.')
|
||||
stmt = cql.prepare(f"INSERT INTO {table00} (p) VALUES (?)")
|
||||
cql.execute(stmt, ["pk0"])
|
||||
|
||||
# single keyspace / table
|
||||
with new_test_snapshot(rest_api, ks0, cf00) as snapshot0:
|
||||
verify_snapshot_details({
|
||||
'key': snapshot0,
|
||||
'value': [{'ks': ks0, 'cf': cf00, 'total': 1, 'live': 0}]
|
||||
})
|
||||
|
||||
cql.execute(f"TRUNCATE {table00}")
|
||||
verify_snapshot_details({
|
||||
'key': snapshot0,
|
||||
'value': [{'ks': ks0, 'cf': cf00, 'total': 1, 'live': 1}]
|
||||
})
|
||||
|
||||
with new_test_table(cql, keyspace0, "p text PRIMARY KEY") as table01:
|
||||
_, cf01 = table01.split('.')
|
||||
stmt = cql.prepare(f"INSERT INTO {table01} (p) VALUES (?)")
|
||||
cql.execute(stmt, ["pk1"])
|
||||
|
||||
# single keyspace / multiple tables
|
||||
with new_test_snapshot(rest_api, ks0, [cf00, cf01]) as snapshot1:
|
||||
verify_snapshot_details({
|
||||
'key': snapshot1,
|
||||
'value': [
|
||||
{'ks': ks0, 'cf': cf00, 'total': 0, 'live': 0},
|
||||
{'ks': ks0, 'cf': cf01, 'total': 1, 'live': 0}
|
||||
]
|
||||
})
|
||||
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace1:
|
||||
with new_test_table(cql, keyspace1, "p text PRIMARY KEY") as table10:
|
||||
ks1, cf10 = table10.split('.')
|
||||
|
||||
# multiple keyspaces
|
||||
with new_test_snapshot(rest_api, [ks0, ks1]) as snapshot2:
|
||||
verify_snapshot_details({
|
||||
'key': snapshot2,
|
||||
'value': [
|
||||
{'ks': ks0, 'cf': cf00, 'total': 0, 'live': 0},
|
||||
{'ks': ks0, 'cf': cf01, 'total': 1, 'live': 0},
|
||||
{'ks': ks1, 'cf': cf10, 'total': 0, 'live': 0}
|
||||
]
|
||||
})
|
||||
|
||||
# all keyspaces
|
||||
with new_test_snapshot(rest_api, ) as snapshot3:
|
||||
verify_snapshot_details({
|
||||
'key': snapshot3,
|
||||
'value': [
|
||||
{'ks': ks0, 'cf': cf00, 'total': 0, 'live': 0},
|
||||
{'ks': ks0, 'cf': cf01, 'total': 1, 'live': 0},
|
||||
{'ks': ks1, 'cf': cf10, 'total': 0, 'live': 0}
|
||||
]
|
||||
})
|
||||
|
||||
# Verify that snapshots of materialized views and secondary indexes are disallowed.
|
||||
def test_storage_service_snapshot_mv_si(cql, this_dc, rest_api):
|
||||
resp = rest_api.send("GET", "storage_service/snapshots")
|
||||
resp.raise_for_status()
|
||||
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as table:
|
||||
with new_materialized_view(cql, table, '*', 'v, p', 'v is not null and p is not null') as mv:
|
||||
try:
|
||||
with new_test_snapshot(rest_api, keyspace, mv.split('.')[1]) as snap:
|
||||
pytest.fail(f"Snapshot of materialized view {mv} should have failed")
|
||||
except requests.HTTPError:
|
||||
pass
|
||||
|
||||
with new_secondary_index(cql, table, 'v') as si:
|
||||
try:
|
||||
with new_test_snapshot(rest_api, keyspace, si.split('.')[1]) as snap:
|
||||
pytest.fail(f"Snapshot of secondary index {si} should have failed")
|
||||
except requests.HTTPError:
|
||||
pass
|
||||
|
||||
Submodule tools/java updated: b1e09c8b8f...2241a63bda
@@ -444,7 +444,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
break;
|
||||
case auth_state::AUTHENTICATION:
|
||||
// Support both SASL auth from protocol v2 and the older style Credentials auth from v1
|
||||
assert(cqlop == cql_binary_opcode::AUTH_RESPONSE || cqlop == cql_binary_opcode::CREDENTIALS);
|
||||
if (cqlop != cql_binary_opcode::AUTH_RESPONSE && cqlop != cql_binary_opcode::CREDENTIALS) {
|
||||
throw exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop)));
|
||||
}
|
||||
if (res_op == cql_binary_opcode::READY || res_op == cql_binary_opcode::AUTH_SUCCESS) {
|
||||
client_state.set_auth_state(auth_state::READY);
|
||||
}
|
||||
@@ -1219,7 +1221,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_read_timeout_error(stream, err, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
||||
return make_read_timeout_error(stream, exceptions::exception_code::READ_TIMEOUT, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
||||
}
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
@@ -1247,7 +1249,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_writ
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
if (_version < 4) {
|
||||
return make_mutation_write_timeout_error(stream, err, std::move(msg), cl, received, blockfor, type, tr_state);
|
||||
return make_mutation_write_timeout_error(stream, exceptions::exception_code::WRITE_TIMEOUT, std::move(msg), cl, received, blockfor, type, tr_state);
|
||||
}
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
|
||||
1
types.cc
1
types.cc
@@ -777,6 +777,7 @@ bool abstract_type::is_native() const { return !is_collection() && !is_tuple();
|
||||
bool abstract_type::is_string() const {
|
||||
struct visitor {
|
||||
bool operator()(const abstract_type&) { return false; }
|
||||
bool operator()(const reversed_type_impl& t) { return t.underlying_type()->is_string(); }
|
||||
bool operator()(const string_type_impl&) { return true; }
|
||||
};
|
||||
return visit(*this, visitor{});
|
||||
|
||||
@@ -326,6 +326,7 @@ public:
|
||||
}
|
||||
|
||||
size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
|
||||
return with_allocator(standard_allocator(), [&] {
|
||||
size_t count = 0;
|
||||
auto disposer = [] (auto* p) noexcept {};
|
||||
while (start != end) {
|
||||
@@ -338,6 +339,7 @@ public:
|
||||
}
|
||||
}
|
||||
return count;
|
||||
});
|
||||
}
|
||||
public:
|
||||
/// \brief Constructs a cached_file.
|
||||
@@ -464,8 +466,10 @@ public:
|
||||
inline
|
||||
void cached_file::cached_page::on_evicted() noexcept {
|
||||
parent->on_evicted(*this);
|
||||
cached_file::cache_type::iterator it(this);
|
||||
it.erase(page_idx_less_comparator());
|
||||
with_allocator(standard_allocator(), [this] {
|
||||
cached_file::cache_type::iterator it(this);
|
||||
it.erase(page_idx_less_comparator());
|
||||
});
|
||||
}
|
||||
|
||||
class cached_file_impl : public file_impl {
|
||||
|
||||
@@ -376,7 +376,9 @@ chunked_vector<T, max_contiguous_allocation>::make_room(size_t n, bool stop_afte
|
||||
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
|
||||
// FIXME: realloc? maybe not worth the complication; only works for PODs
|
||||
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
|
||||
if (_size > _capacity - last_chunk_capacity) {
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
|
||||
}
|
||||
_chunks.back() = std::move(new_last_chunk);
|
||||
_capacity += capacity_increase;
|
||||
}
|
||||
|
||||
@@ -108,81 +108,82 @@ template<typename Key,
|
||||
typename Alloc = std::pmr::polymorphic_allocator<>>
|
||||
class loading_cache {
|
||||
|
||||
using loading_cache_clock_type = seastar::lowres_clock;
|
||||
using safe_link_list_hook = bi::list_base_hook<bi::link_mode<bi::safe_link>>;
|
||||
using loading_cache_clock_type = seastar::lowres_clock;
|
||||
using safe_link_list_hook = bi::list_base_hook<bi::link_mode<bi::safe_link>>;
|
||||
|
||||
class timestamped_val {
|
||||
public:
|
||||
using value_type = Tp;
|
||||
using loading_values_type = typename utils::loading_shared_values<Key, timestamped_val, Hash, EqualPred, LoadingSharedValuesStats, 256>;
|
||||
class lru_entry;
|
||||
class value_ptr;
|
||||
class timestamped_val {
|
||||
public:
|
||||
using value_type = Tp;
|
||||
using loading_values_type = typename utils::loading_shared_values<Key, timestamped_val, Hash, EqualPred, LoadingSharedValuesStats, 256>;
|
||||
class lru_entry;
|
||||
class value_ptr;
|
||||
|
||||
private:
|
||||
value_type _value;
|
||||
loading_cache_clock_type::time_point _loaded;
|
||||
loading_cache_clock_type::time_point _last_read;
|
||||
lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back
|
||||
size_t _size = 0;
|
||||
private:
|
||||
value_type _value;
|
||||
loading_cache_clock_type::time_point _loaded;
|
||||
loading_cache_clock_type::time_point _last_read;
|
||||
lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back
|
||||
size_t _size = 0;
|
||||
|
||||
public:
|
||||
timestamped_val(value_type val)
|
||||
: _value(std::move(val))
|
||||
, _loaded(loading_cache_clock_type::now())
|
||||
, _last_read(_loaded)
|
||||
, _size(EntrySize()(_value))
|
||||
{}
|
||||
timestamped_val(timestamped_val&&) = default;
|
||||
public:
|
||||
timestamped_val(value_type val)
|
||||
: _value(std::move(val))
|
||||
, _loaded(loading_cache_clock_type::now())
|
||||
, _last_read(_loaded)
|
||||
, _size(EntrySize()(_value))
|
||||
{}
|
||||
timestamped_val(timestamped_val&&) = default;
|
||||
|
||||
timestamped_val& operator=(value_type new_val) {
|
||||
assert(_lru_entry_ptr);
|
||||
timestamped_val& operator=(value_type new_val) {
|
||||
assert(_lru_entry_ptr);
|
||||
|
||||
_value = std::move(new_val);
|
||||
_loaded = loading_cache_clock_type::now();
|
||||
_lru_entry_ptr->cache_size() -= _size;
|
||||
_size = EntrySize()(_value);
|
||||
_lru_entry_ptr->cache_size() += _size;
|
||||
return *this;
|
||||
}
|
||||
_value = std::move(new_val);
|
||||
_loaded = loading_cache_clock_type::now();
|
||||
_lru_entry_ptr->owning_section_size() -= _size;
|
||||
_size = EntrySize()(_value);
|
||||
_lru_entry_ptr->owning_section_size() += _size;
|
||||
return *this;
|
||||
}
|
||||
|
||||
value_type& value() noexcept { return _value; }
|
||||
const value_type& value() const noexcept { return _value; }
|
||||
value_type& value() noexcept { return _value; }
|
||||
const value_type& value() const noexcept { return _value; }
|
||||
|
||||
static const timestamped_val& container_of(const value_type& value) {
|
||||
return *bi::get_parent_from_member(&value, ×tamped_val::_value);
|
||||
}
|
||||
static const timestamped_val& container_of(const value_type& value) {
|
||||
return *bi::get_parent_from_member(&value, ×tamped_val::_value);
|
||||
}
|
||||
|
||||
loading_cache_clock_type::time_point last_read() const noexcept {
|
||||
return _last_read;
|
||||
}
|
||||
loading_cache_clock_type::time_point last_read() const noexcept {
|
||||
return _last_read;
|
||||
}
|
||||
|
||||
loading_cache_clock_type::time_point loaded() const noexcept {
|
||||
return _loaded;
|
||||
}
|
||||
loading_cache_clock_type::time_point loaded() const noexcept {
|
||||
return _loaded;
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
return _size;
|
||||
}
|
||||
size_t size() const {
|
||||
return _size;
|
||||
}
|
||||
|
||||
bool ready() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
bool ready() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
|
||||
lru_entry* lru_entry_ptr() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
lru_entry* lru_entry_ptr() const noexcept {
|
||||
return _lru_entry_ptr;
|
||||
}
|
||||
|
||||
private:
|
||||
void touch() noexcept {
|
||||
assert(_lru_entry_ptr);
|
||||
_last_read = loading_cache_clock_type::now();
|
||||
_lru_entry_ptr->touch();
|
||||
}
|
||||
private:
|
||||
void touch() noexcept {
|
||||
_last_read = loading_cache_clock_type::now();
|
||||
if (_lru_entry_ptr) {
|
||||
_lru_entry_ptr->touch();
|
||||
}
|
||||
}
|
||||
|
||||
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
|
||||
_lru_entry_ptr = lru_entry_ptr;
|
||||
}
|
||||
};
|
||||
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
|
||||
_lru_entry_ptr = lru_entry_ptr;
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
using loading_values_type = typename timestamped_val::loading_values_type;
|
||||
@@ -265,7 +266,7 @@ public:
|
||||
});
|
||||
}).then([this, k] (timestamped_val_ptr ts_val_ptr) {
|
||||
// check again since it could have already been inserted and initialized
|
||||
if (!ts_val_ptr->ready()) {
|
||||
if (!ts_val_ptr->ready() && !ts_val_ptr.orphaned()) {
|
||||
_logger.trace("{}: storing the value for the first time", k);
|
||||
|
||||
if (ts_val_ptr->size() > _max_size) {
|
||||
@@ -331,6 +332,11 @@ public:
|
||||
return set_find(k);
|
||||
}
|
||||
|
||||
// Removes all values matching a given predicate and values which are currently loading.
|
||||
// Guarantees that no values which match the predicate and whose loading was initiated
|
||||
// before this call will be present after this call (or appear at any time later).
|
||||
// The predicate may be invoked multiple times on the same value.
|
||||
// It must return the same result for a given value (it must be a pure function).
|
||||
template <typename Pred>
|
||||
void remove_if(Pred&& pred) {
|
||||
static_assert(std::is_same<bool, std::result_of_t<Pred(const value_type&)>>::value, "Bad Pred signature");
|
||||
@@ -344,15 +350,29 @@ public:
|
||||
|
||||
_unprivileged_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
|
||||
_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
|
||||
_loading_values.remove_if([&pred] (const timestamped_val& v) {
|
||||
return pred(v.value());
|
||||
});
|
||||
}
|
||||
|
||||
// Removes a given key from the cache.
|
||||
// The key is removed immediately.
|
||||
// After this, get_ptr() is guaranteed to reload the value before returning it.
|
||||
// As a consequence of the above, if there is a concurrent get_ptr() in progress with this,
|
||||
// its value will not populate the cache. It will still succeed.
|
||||
void remove(const Key& k) {
|
||||
remove_ts_value(set_find(k));
|
||||
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
|
||||
_loading_values.remove(k);
|
||||
}
|
||||
|
||||
// Removes a given key from the cache.
|
||||
// Same guarantees as with remove(key).
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
|
||||
remove_ts_value(set_find(key, std::move(key_hasher_func), std::move(key_equal_func)));
|
||||
remove_ts_value(set_find(key, key_hasher_func, key_equal_func));
|
||||
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
|
||||
_loading_values.remove(key, key_hasher_func, key_equal_func);
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
@@ -361,9 +381,18 @@ public:
|
||||
|
||||
/// \brief returns the memory size the currently cached entries occupy according to the EntrySize predicate.
|
||||
size_t memory_footprint() const {
|
||||
return _current_size;
|
||||
return _unprivileged_section_size + _privileged_section_size;
|
||||
}
|
||||
|
||||
/// \brief returns the memory size the currently cached entries occupy in the privileged section according to the EntrySize predicate.
|
||||
size_t privileged_section_memory_footprint() const noexcept {
|
||||
return _privileged_section_size;
|
||||
}
|
||||
|
||||
/// \brief returns the memory size the currently cached entries occupy in the unprivileged section according to the EntrySize predicate.
|
||||
size_t unprivileged_section_memory_footprint() const noexcept {
|
||||
return _unprivileged_section_size;
|
||||
}
|
||||
private:
|
||||
void remove_ts_value(timestamped_val_ptr ts_ptr) {
|
||||
if (!ts_ptr) {
|
||||
@@ -419,16 +448,22 @@ private:
|
||||
}
|
||||
|
||||
if (lru_entry.touch_count() < SectionHitThreshold) {
|
||||
_logger.trace("Putting key {} into the unpriviledged section", lru_entry.key());
|
||||
_logger.trace("Putting key {} into the unprivileged section", lru_entry.key());
|
||||
_unprivileged_lru_list.push_front(lru_entry);
|
||||
lru_entry.inc_touch_count();
|
||||
} else {
|
||||
_logger.trace("Putting key {} into the priviledged section", lru_entry.key());
|
||||
_logger.trace("Putting key {} into the privileged section", lru_entry.key());
|
||||
_lru_list.push_front(lru_entry);
|
||||
|
||||
// Bump it up only once to avoid a wrap around
|
||||
if (lru_entry.touch_count() == SectionHitThreshold) {
|
||||
// This code will run only once, when a promotion
|
||||
// from unprivileged to privileged section happens.
|
||||
// Update section size bookkeeping.
|
||||
|
||||
lru_entry.owning_section_size() -= lru_entry.timestamped_value().size();
|
||||
lru_entry.inc_touch_count();
|
||||
lru_entry.owning_section_size() += lru_entry.timestamped_value().size();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -495,17 +530,44 @@ private:
|
||||
void shrink() {
|
||||
using namespace std::chrono;
|
||||
|
||||
while (_current_size >= _max_size && !_unprivileged_lru_list.empty()) {
|
||||
ts_value_lru_entry& lru_entry = *_unprivileged_lru_list.rbegin();
|
||||
_logger.trace("shrink(): {}: dropping the unpriviledged entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
|
||||
loading_cache::destroy_ts_value(&lru_entry);
|
||||
LoadingCacheStats::inc_unprivileged_on_cache_size_eviction();
|
||||
}
|
||||
|
||||
while (_current_size >= _max_size) {
|
||||
auto drop_privileged_entry = [&] {
|
||||
ts_value_lru_entry& lru_entry = *_lru_list.rbegin();
|
||||
_logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
|
||||
loading_cache::destroy_ts_value(&lru_entry);
|
||||
};
|
||||
|
||||
auto drop_unprivileged_entry = [&] {
|
||||
ts_value_lru_entry& lru_entry = *_unprivileged_lru_list.rbegin();
|
||||
_logger.trace("shrink(): {}: dropping the unprivileged entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
|
||||
loading_cache::destroy_ts_value(&lru_entry);
|
||||
LoadingCacheStats::inc_unprivileged_on_cache_size_eviction();
|
||||
};
|
||||
|
||||
// When cache entries need to be evicted due to a size restriction,
|
||||
// unprivileged section entries are evicted first.
|
||||
//
|
||||
// However, we make sure that the unprivileged section does not get
|
||||
// too small, because this could lead to starving the unprivileged section.
|
||||
// For example if the cache could store at most 50 entries and there are 49 entries in
|
||||
// privileged section, after adding 5 entries (that would go to unprivileged
|
||||
// section) 4 of them would get evicted and only the 5th one would stay.
|
||||
// This caused problems with BATCH statements where all prepared statements
|
||||
// in the batch have to stay in cache at the same time for the batch to correctly
|
||||
// execute.
|
||||
auto minimum_unprivileged_section_size = _max_size / 2;
|
||||
while (memory_footprint() >= _max_size && _unprivileged_section_size > minimum_unprivileged_section_size) {
|
||||
drop_unprivileged_entry();
|
||||
}
|
||||
|
||||
while (memory_footprint() >= _max_size && !_lru_list.empty()) {
|
||||
drop_privileged_entry();
|
||||
}
|
||||
|
||||
// If dropping entries from privileged section did not help,
|
||||
// we have to drop entries from unprivileged section,
|
||||
// going below minimum_unprivileged_section_size.
|
||||
while (memory_footprint() >= _max_size) {
|
||||
drop_unprivileged_entry();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -558,7 +620,8 @@ private:
|
||||
loading_values_type _loading_values;
|
||||
lru_list_type _lru_list; // list containing "privileged" section entries
|
||||
lru_list_type _unprivileged_lru_list; // list containing "unprivileged" section entries
|
||||
size_t _current_size = 0;
|
||||
size_t _privileged_section_size = 0;
|
||||
size_t _unprivileged_section_size = 0;
|
||||
size_t _max_size = 0;
|
||||
loading_cache_clock_type::duration _expiry;
|
||||
loading_cache_clock_type::duration _refresh;
|
||||
@@ -624,7 +687,7 @@ public:
|
||||
static_assert(SectionHitThreshold <= std::numeric_limits<typeof(_touch_count)>::max() / 2, "SectionHitThreshold value is too big");
|
||||
|
||||
_ts_val_ptr->set_anchor_back_reference(this);
|
||||
cache_size() += _ts_val_ptr->size();
|
||||
owning_section_size() += _ts_val_ptr->size();
|
||||
}
|
||||
|
||||
void inc_touch_count() noexcept {
|
||||
@@ -640,12 +703,12 @@ public:
|
||||
lru_list_type& lru_list = _parent.container_list(*this);
|
||||
lru_list.erase(lru_list.iterator_to(*this));
|
||||
}
|
||||
cache_size() -= _ts_val_ptr->size();
|
||||
owning_section_size() -= _ts_val_ptr->size();
|
||||
_ts_val_ptr->set_anchor_back_reference(nullptr);
|
||||
}
|
||||
|
||||
size_t& cache_size() noexcept {
|
||||
return _parent._current_size;
|
||||
size_t& owning_section_size() noexcept {
|
||||
return _touch_count <= SectionHitThreshold ? _parent._unprivileged_section_size : _parent._privileged_section_size;
|
||||
}
|
||||
|
||||
void touch() noexcept {
|
||||
|
||||
@@ -83,6 +83,10 @@ private:
|
||||
_val.emplace(std::move(new_val));
|
||||
}
|
||||
|
||||
bool orphaned() const {
|
||||
return !is_linked();
|
||||
}
|
||||
|
||||
shared_promise<>& loaded() {
|
||||
return _loaded;
|
||||
}
|
||||
@@ -95,7 +99,9 @@ private:
|
||||
: _parent(parent), _key(std::move(k)) {}
|
||||
|
||||
~entry() {
|
||||
_parent._set.erase(_parent._set.iterator_to(*this));
|
||||
if (is_linked()) {
|
||||
_parent._set.erase(_parent._set.iterator_to(*this));
|
||||
}
|
||||
Stats::inc_evictions();
|
||||
}
|
||||
|
||||
@@ -153,6 +159,18 @@ public:
|
||||
return res;
|
||||
}
|
||||
|
||||
// Returns the key this entry is associated with.
|
||||
// Valid if bool(*this).
|
||||
const key_type& key() const {
|
||||
return _e->key();
|
||||
}
|
||||
|
||||
// Returns true iff the entry is not linked in the set.
|
||||
// Call only when bool(*this).
|
||||
bool orphaned() const {
|
||||
return _e->orphaned();
|
||||
}
|
||||
|
||||
friend class loading_shared_values;
|
||||
friend std::ostream& operator<<(std::ostream& os, const entry_ptr& ep) {
|
||||
return os << ep._e.get();
|
||||
@@ -265,6 +283,50 @@ public:
|
||||
return entry_ptr(it->shared_from_this());
|
||||
};
|
||||
|
||||
// Removes a given key from this container.
|
||||
// If a given key is currently loading, the loading will succeed and will return entry_ptr
|
||||
// to the caller, but the value will not be present in the container. It will be removed
|
||||
// when the last entry_ptr dies, as usual.
|
||||
//
|
||||
// Post-condition: !find(key)
|
||||
template<typename KeyType, typename KeyHasher, typename KeyEqual>
|
||||
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) {
|
||||
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
|
||||
if (it != _set.end()) {
|
||||
_set.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
// Removes a given key from this container.
|
||||
// If a given key is currently loading, the loading will succeed and will return entry_ptr
|
||||
// to the caller, but the value will not be present in the container. It will be removed
|
||||
// when the last entry_ptr dies, as usual.
|
||||
//
|
||||
// Post-condition: !find(key)
|
||||
template<typename KeyType>
|
||||
void remove(const KeyType& key) {
|
||||
remove(key, Hash(), EqualPred());
|
||||
}
|
||||
|
||||
// Removes all values which match a given predicate or are currently loading.
|
||||
// Guarantees that no values which match the predicate and whose loading was initiated
|
||||
// before this call will be present after this call (or appear at any time later).
|
||||
// Same effects as if remove(e.key()) was called on each matching entry.
|
||||
template<typename Pred>
|
||||
requires std::is_invocable_r_v<bool, Pred, const Tp&>
|
||||
void remove_if(const Pred& pred) {
|
||||
auto it = _set.begin();
|
||||
while (it != _set.end()) {
|
||||
if (!it->ready() || pred(it->value())) {
|
||||
auto next = std::next(it);
|
||||
_set.erase(it);
|
||||
it = next;
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// keep the default non-templated overloads to ease on the compiler for specifications
|
||||
// that do not require the templated find().
|
||||
entry_ptr find(const key_type& key) noexcept {
|
||||
|
||||
@@ -584,6 +584,10 @@ static constexpr auto max_used_space_ratio_for_compaction = 0.85;
|
||||
static constexpr size_t max_used_space_for_compaction = segment_size * max_used_space_ratio_for_compaction;
|
||||
static constexpr size_t min_free_space_for_compaction = segment_size - max_used_space_for_compaction;
|
||||
|
||||
struct [[gnu::packed]] non_lsa_object_cookie {
|
||||
uint64_t value = 0xbadcaffe;
|
||||
};
|
||||
|
||||
static_assert(min_free_space_for_compaction >= max_managed_object_size,
|
||||
"Segments which cannot fit max_managed_object_size must not be considered compactible for the sake of forward progress of compaction");
|
||||
|
||||
@@ -827,9 +831,13 @@ public:
|
||||
void clear_allocation_failure_flag() { _allocation_failure_flag = false; }
|
||||
bool allocation_failure_flag() { return _allocation_failure_flag; }
|
||||
void refill_emergency_reserve();
|
||||
void update_non_lsa_memory_in_use(ssize_t n) {
|
||||
void add_non_lsa_memory_in_use(size_t n) {
|
||||
_non_lsa_memory_in_use += n;
|
||||
}
|
||||
void subtract_non_lsa_memory_in_use(size_t n) {
|
||||
assert(_non_lsa_memory_in_use >= n);
|
||||
_non_lsa_memory_in_use -= n;
|
||||
}
|
||||
size_t non_lsa_memory_in_use() const {
|
||||
return _non_lsa_memory_in_use;
|
||||
}
|
||||
@@ -1630,17 +1638,18 @@ public:
|
||||
memory::on_alloc_point();
|
||||
shard_segment_pool.on_memory_allocation(size);
|
||||
if (size > max_managed_object_size) {
|
||||
auto ptr = standard_allocator().alloc(migrator, size, alignment);
|
||||
auto ptr = standard_allocator().alloc(migrator, size + sizeof(non_lsa_object_cookie), alignment);
|
||||
// This isn't very acurrate, the correct free_space value would be
|
||||
// malloc_usable_size(ptr) - size, but there is no way to get
|
||||
// the exact object size at free.
|
||||
auto allocated_size = malloc_usable_size(ptr);
|
||||
new ((char*)ptr + allocated_size - sizeof(non_lsa_object_cookie)) non_lsa_object_cookie();
|
||||
_non_lsa_occupancy += occupancy_stats(0, allocated_size);
|
||||
if (_group) {
|
||||
_evictable_space += allocated_size;
|
||||
_group->increase_usage(_heap_handle, allocated_size);
|
||||
}
|
||||
shard_segment_pool.update_non_lsa_memory_in_use(allocated_size);
|
||||
shard_segment_pool.add_non_lsa_memory_in_use(allocated_size);
|
||||
return ptr;
|
||||
} else {
|
||||
auto ptr = alloc_small(object_descriptor(migrator), (segment::size_type) size, alignment);
|
||||
@@ -1652,12 +1661,14 @@ public:
|
||||
private:
|
||||
void on_non_lsa_free(void* obj) noexcept {
|
||||
auto allocated_size = malloc_usable_size(obj);
|
||||
auto cookie = (non_lsa_object_cookie*)((char*)obj + allocated_size) - 1;
|
||||
assert(cookie->value == non_lsa_object_cookie().value);
|
||||
_non_lsa_occupancy -= occupancy_stats(0, allocated_size);
|
||||
if (_group) {
|
||||
_evictable_space -= allocated_size;
|
||||
_group->decrease_usage(_heap_handle, allocated_size);
|
||||
}
|
||||
shard_segment_pool.update_non_lsa_memory_in_use(-allocated_size);
|
||||
shard_segment_pool.subtract_non_lsa_memory_in_use(allocated_size);
|
||||
}
|
||||
public:
|
||||
virtual void free(void* obj) noexcept override {
|
||||
|
||||
@@ -60,6 +60,9 @@ private:
|
||||
throw std::out_of_range("chunked_managed_vector out of range access");
|
||||
}
|
||||
}
|
||||
chunk_ptr& back_chunk() {
|
||||
return _chunks[_size / max_chunk_capacity()];
|
||||
}
|
||||
static void migrate(T* begin, T* end, managed_vector<T>& result);
|
||||
public:
|
||||
using value_type = T;
|
||||
@@ -106,24 +109,24 @@ public:
|
||||
|
||||
void push_back(const T& x) {
|
||||
reserve_for_push_back();
|
||||
_chunks.back().emplace_back(x);
|
||||
back_chunk().emplace_back(x);
|
||||
++_size;
|
||||
}
|
||||
void push_back(T&& x) {
|
||||
reserve_for_push_back();
|
||||
_chunks.back().emplace_back(std::move(x));
|
||||
back_chunk().emplace_back(std::move(x));
|
||||
++_size;
|
||||
}
|
||||
template <typename... Args>
|
||||
T& emplace_back(Args&&... args) {
|
||||
reserve_for_push_back();
|
||||
auto& ret = _chunks.back().emplace_back(std::forward<Args>(args)...);
|
||||
auto& ret = back_chunk().emplace_back(std::forward<Args>(args)...);
|
||||
++_size;
|
||||
return ret;
|
||||
}
|
||||
void pop_back() {
|
||||
--_size;
|
||||
_chunks.back().pop_back();
|
||||
back_chunk().pop_back();
|
||||
}
|
||||
const T& back() const {
|
||||
return *addr(_size - 1);
|
||||
@@ -381,7 +384,9 @@ chunked_managed_vector<T>::make_room(size_t n, bool stop_after_one) {
|
||||
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
|
||||
// FIXME: realloc? maybe not worth the complication; only works for PODs
|
||||
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk);
|
||||
if (_size > _capacity - last_chunk_capacity) {
|
||||
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk);
|
||||
}
|
||||
_chunks.back() = std::move(new_last_chunk);
|
||||
_capacity += capacity_increase;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user