Compare commits
26 Commits
debug_form
...
scylla-5.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f92622e0de | ||
|
|
3bca608db5 | ||
|
|
a93b72d5dd | ||
|
|
d58ca2edbd | ||
|
|
75740ace2a | ||
|
|
d7a1bf6331 | ||
|
|
bbd7d657cc | ||
|
|
f5bf4c81d1 | ||
|
|
02e8336659 | ||
|
|
601812e11b | ||
|
|
ea466320d2 | ||
|
|
25ea831a15 | ||
|
|
8648c79c9e | ||
|
|
7ae4d0e6f8 | ||
|
|
f3564db941 | ||
|
|
97caf12836 | ||
|
|
839d9ef41a | ||
|
|
782bd50f92 | ||
|
|
0a4d971b4a | ||
|
|
22562f767f | ||
|
|
eb80dd1db5 | ||
|
|
51d699ee21 | ||
|
|
83a33bff8c | ||
|
|
273563b9ad | ||
|
|
891990ec09 | ||
|
|
da0cd2b107 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.0.dev
|
||||
VERSION=5.0.rc3
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2577,8 +2577,8 @@ static bool hierarchy_actions(
|
||||
// attr member so we can use add()
|
||||
rjson::add_with_string_name(v, attr, std::move(*newv));
|
||||
} else {
|
||||
throw api_error::validation(format("Can't remove document path {} - not present in item",
|
||||
subh.get_value()._path));
|
||||
// Removing a.b when a is a map but a.b doesn't exist
|
||||
// is silently ignored. It's not considered an error.
|
||||
}
|
||||
} else {
|
||||
throw api_error::validation(format("UpdateExpression: document paths not valid for this item:{}", h));
|
||||
|
||||
@@ -87,19 +87,24 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
|
||||
// prefer expiring cells.
|
||||
return left.is_live_and_has_ttl() ? std::strong_ordering::greater : std::strong_ordering::less;
|
||||
}
|
||||
if (left.is_live_and_has_ttl() && left.expiry() != right.expiry()) {
|
||||
return left.expiry() <=> right.expiry();
|
||||
if (left.is_live_and_has_ttl()) {
|
||||
if (left.expiry() != right.expiry()) {
|
||||
return left.expiry() <=> right.expiry();
|
||||
} else {
|
||||
// prefer the cell that was written later,
|
||||
// so it survives longer after it expires, until purged.
|
||||
return right.ttl() <=> left.ttl();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Both are deleted
|
||||
if (left.deletion_time() != right.deletion_time()) {
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
|
||||
}
|
||||
|
||||
// Origin compares big-endian serialized deletion time. That's because it
|
||||
// delegates to AbstractCell.reconcile() which compares values after
|
||||
// comparing timestamps, which in case of deleted cells will hold
|
||||
// serialized expiry.
|
||||
return (uint64_t) left.deletion_time().time_since_epoch().count()
|
||||
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
|
||||
}
|
||||
return std::strong_ordering::equal;
|
||||
}
|
||||
|
||||
@@ -353,32 +353,50 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t)
|
||||
: _cm(cm)
|
||||
, _table(t)
|
||||
, _compaction_state(cm.get_compaction_state(_table))
|
||||
, _holder(_compaction_state.gate.hold())
|
||||
{
|
||||
_compaction_state.compaction_disabled_counter++;
|
||||
cmlog.debug("Temporarily disabled compaction for {}.{}. compaction_disabled_counter={}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), _compaction_state.compaction_disabled_counter);
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept
|
||||
: _cm(o._cm)
|
||||
, _table(std::exchange(o._table, nullptr))
|
||||
, _compaction_state(o._compaction_state)
|
||||
, _holder(std::move(o._holder))
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_reenabler::~compaction_reenabler() {
|
||||
// submit compaction request if we're the last holder of the gate which is still opened.
|
||||
if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) {
|
||||
cmlog.debug("Reenabling compaction for {}.{}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name());
|
||||
try {
|
||||
_cm.submit(_table);
|
||||
} catch (...) {
|
||||
cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<compaction_manager::compaction_reenabler>
|
||||
compaction_manager::stop_and_disable_compaction(replica::table* t) {
|
||||
compaction_reenabler cre(*this, t);
|
||||
co_await stop_ongoing_compactions("user-triggered operation", t);
|
||||
co_return cre;
|
||||
}
|
||||
|
||||
future<>
|
||||
compaction_manager::run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func) {
|
||||
auto& c_state = _compaction_state[t];
|
||||
auto holder = c_state.gate.hold();
|
||||
compaction_reenabler cre = co_await stop_and_disable_compaction(t);
|
||||
|
||||
c_state.compaction_disabled_counter++;
|
||||
|
||||
std::exception_ptr err;
|
||||
try {
|
||||
co_await stop_ongoing_compactions("user-triggered operation", t);
|
||||
co_await func();
|
||||
} catch (...) {
|
||||
err = std::current_exception();
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
assert(_compaction_state.contains(t));
|
||||
#endif
|
||||
// submit compaction request if we're the last holder of the gate which is still opened.
|
||||
if (--c_state.compaction_disabled_counter == 0 && !c_state.gate.is_closed()) {
|
||||
submit(t);
|
||||
}
|
||||
if (err) {
|
||||
std::rethrow_exception(err);
|
||||
}
|
||||
co_return;
|
||||
co_await func();
|
||||
}
|
||||
|
||||
void compaction_manager::task::setup_new_compaction() {
|
||||
@@ -810,7 +828,8 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
auto sstable_level = sst->get_sstable_level();
|
||||
auto run_identifier = sst->run_identifier();
|
||||
auto sstable_set_snapshot = can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), _maintenance_sg.io,
|
||||
// FIXME: this compaction should run with maintenance priority.
|
||||
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
|
||||
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
||||
|
||||
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
||||
@@ -819,8 +838,9 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
};
|
||||
|
||||
auto maintenance_permit = co_await seastar::get_units(_maintenance_ops_sem, 1);
|
||||
// Take write lock for table to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard.
|
||||
auto write_lock_holder = co_await _compaction_state[&t].lock.hold_write_lock();
|
||||
// FIXME: acquiring the read lock is not needed after acquiring the _maintenance_ops_sem
|
||||
// only major compaction needs to acquire the write lock to synchronize with regular compaction.
|
||||
auto lock_holder = co_await _compaction_state[&t].lock.hold_read_lock();
|
||||
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
@@ -852,7 +872,7 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
|
||||
};
|
||||
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
||||
completed = co_await with_scheduling_group(_maintenance_sg.cpu, std::ref(perform_rewrite));
|
||||
completed = co_await with_scheduling_group(_compaction_controller.sg(), std::ref(perform_rewrite));
|
||||
} while (!completed);
|
||||
};
|
||||
|
||||
|
||||
@@ -269,6 +269,31 @@ public:
|
||||
// parameter job is a function that will carry the operation
|
||||
future<> run_custom_job(replica::table* t, sstables::compaction_type type, noncopyable_function<future<>(sstables::compaction_data&)> job);
|
||||
|
||||
class compaction_reenabler {
|
||||
compaction_manager& _cm;
|
||||
replica::table* _table;
|
||||
compaction_state& _compaction_state;
|
||||
gate::holder _holder;
|
||||
|
||||
public:
|
||||
compaction_reenabler(compaction_manager&, replica::table*);
|
||||
compaction_reenabler(compaction_reenabler&&) noexcept;
|
||||
|
||||
~compaction_reenabler();
|
||||
|
||||
replica::table* compacting_table() const noexcept {
|
||||
return _table;
|
||||
}
|
||||
|
||||
const compaction_state& compaction_state() const noexcept {
|
||||
return _compaction_state;
|
||||
}
|
||||
};
|
||||
|
||||
// Disable compaction temporarily for a table t.
|
||||
// Caller should call the compaction_reenabler::reenable
|
||||
future<compaction_reenabler> stop_and_disable_compaction(replica::table* t);
|
||||
|
||||
// Run a function with compaction temporarily disabled for a table T.
|
||||
future<> run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func);
|
||||
|
||||
|
||||
@@ -103,7 +103,13 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
|
||||
if (!col_type->is_map()) {
|
||||
throw exceptions::invalid_request_exception(format("subscripting non-map column {}", cdef->name_as_text()));
|
||||
}
|
||||
const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[data.sel.index_of(*cdef)]));
|
||||
int32_t index = data.sel.index_of(*cdef);
|
||||
if (index == -1) {
|
||||
throw std::runtime_error(
|
||||
format("Column definition {} does not match any column in the query selection",
|
||||
cdef->name_as_text()));
|
||||
}
|
||||
const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[index]));
|
||||
const auto& data_map = value_cast<map_type_impl::native_type>(deserialized);
|
||||
const auto key = evaluate(*col.sub, options);
|
||||
auto&& key_type = col_type->name_comparator();
|
||||
@@ -121,8 +127,16 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
|
||||
case column_kind::clustering_key:
|
||||
return managed_bytes(data.clustering_key[cdef->id]);
|
||||
case column_kind::static_column:
|
||||
case column_kind::regular_column:
|
||||
return managed_bytes_opt(data.other_columns[data.sel.index_of(*cdef)]);
|
||||
[[fallthrough]];
|
||||
case column_kind::regular_column: {
|
||||
int32_t index = data.sel.index_of(*cdef);
|
||||
if (index == -1) {
|
||||
throw std::runtime_error(
|
||||
format("Column definition {} does not match any column in the query selection",
|
||||
cdef->name_as_text()));
|
||||
}
|
||||
return managed_bytes_opt(data.other_columns[index]);
|
||||
}
|
||||
default:
|
||||
throw exceptions::unsupported_operation_exception("Unknown column kind");
|
||||
}
|
||||
|
||||
@@ -514,7 +514,7 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
|
||||
}
|
||||
|
||||
if (!_nonprimary_key_restrictions->empty()) {
|
||||
if (_has_queriable_regular_index) {
|
||||
if (_has_queriable_regular_index && _partition_range_is_simple) {
|
||||
_uses_secondary_indexing = true;
|
||||
} else if (!allow_filtering) {
|
||||
throw exceptions::invalid_request_exception("Cannot execute this query as it might involve data filtering and "
|
||||
|
||||
@@ -165,7 +165,7 @@ public:
|
||||
|
||||
template<typename RowComparator>
|
||||
void sort(const RowComparator& cmp) {
|
||||
std::sort(_rows.begin(), _rows.end(), std::ref(cmp));
|
||||
std::sort(_rows.begin(), _rows.end(), cmp);
|
||||
}
|
||||
|
||||
metadata& get_metadata();
|
||||
|
||||
6
dist/common/supervisor/scylla_util.sh
vendored
6
dist/common/supervisor/scylla_util.sh
vendored
@@ -6,12 +6,16 @@ is_nonroot() {
|
||||
[ -f "$scylladir"/SCYLLA-NONROOT-FILE ]
|
||||
}
|
||||
|
||||
is_container() {
|
||||
[ -f "$scylladir"/SCYLLA-CONTAINER-FILE ]
|
||||
}
|
||||
|
||||
is_privileged() {
|
||||
[ ${EUID:-${UID}} = 0 ]
|
||||
}
|
||||
|
||||
execsudo() {
|
||||
if is_nonroot; then
|
||||
if is_nonroot || is_container; then
|
||||
exec "$@"
|
||||
else
|
||||
exec sudo -u scylla -g scylla "$@"
|
||||
|
||||
3
dist/docker/debian/build_docker.sh
vendored
3
dist/docker/debian/build_docker.sh
vendored
@@ -82,7 +82,7 @@ run bash -ec "echo 'debconf debconf/frontend select Noninteractive' | debconf-se
|
||||
run bash -ec "rm -rf /etc/rsyslog.conf"
|
||||
run apt-get -y install hostname supervisor openssh-server openssh-client openjdk-11-jre-headless python python-yaml curl rsyslog locales sudo
|
||||
run locale-gen en_US.UTF-8
|
||||
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF_8
|
||||
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
|
||||
run bash -ec "dpkg -i packages/*.deb"
|
||||
run apt-get -y clean all
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
|
||||
@@ -91,6 +91,7 @@ run mkdir -p /var/log/scylla
|
||||
run chown -R scylla:scylla /var/lib/scylla
|
||||
|
||||
run mkdir -p /opt/scylladb/supervisor
|
||||
run touch /opt/scylladb/SCYLLA-CONTAINER-FILE
|
||||
bcp dist/common/supervisor/scylla-server.sh /opt/scylladb/supervisor/scylla-server.sh
|
||||
bcp dist/common/supervisor/scylla-jmx.sh /opt/scylladb/supervisor/scylla-jmx.sh
|
||||
bcp dist/common/supervisor/scylla-node-exporter.sh /opt/scylladb/supervisor/scylla-node-exporter.sh
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
[program:scylla-server]
|
||||
[program:scylla]
|
||||
command=/opt/scylladb/supervisor/scylla-server.sh
|
||||
stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
|
||||
@@ -508,8 +508,13 @@ relocate_python3 "$rprefix"/scripts fix_system_distributed_tables.py
|
||||
if $supervisor; then
|
||||
install -d -m755 `supervisor_dir $retc`
|
||||
for service in scylla-server scylla-jmx scylla-node-exporter; do
|
||||
if [ "$service" = "scylla-server" ]; then
|
||||
program="scylla"
|
||||
else
|
||||
program=$service
|
||||
fi
|
||||
cat << EOS > `supervisor_conf $retc $service`
|
||||
[program:$service]
|
||||
[program:$program]
|
||||
directory=$rprefix
|
||||
command=/bin/bash -c './supervisor/$service.sh'
|
||||
EOS
|
||||
|
||||
33
main.cc
33
main.cc
@@ -367,11 +367,38 @@ static auto defer_verbose_shutdown(const char* what, Func&& func) {
|
||||
startlog.info("Shutting down {}", what);
|
||||
try {
|
||||
func();
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
} catch (...) {
|
||||
startlog.error("Unexpected error shutting down {}: {}", what, std::current_exception());
|
||||
throw;
|
||||
auto ex = std::current_exception();
|
||||
bool do_abort = true;
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (const std::system_error& e) {
|
||||
// System error codes we consider "environmental",
|
||||
// i.e. not scylla's fault, therefore there is no point in
|
||||
// aborting and dumping core.
|
||||
for (int i : {EIO, EACCES, ENOSPC}) {
|
||||
if (e.code() == std::error_code(i, std::system_category())) {
|
||||
do_abort = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
}
|
||||
auto msg = fmt::format("Unexpected error shutting down {}: {}", what, ex);
|
||||
if (do_abort) {
|
||||
startlog.error("{}: aborting", msg);
|
||||
abort();
|
||||
} else {
|
||||
startlog.error("{}: exiting, at {}", msg, current_backtrace());
|
||||
|
||||
// Call _exit() rather than exit() to exit immediately
|
||||
// without calling exit handlers, avoiding
|
||||
// boost::intrusive::detail::destructor_impl assert failure
|
||||
// from ~segment_pool exit handler.
|
||||
_exit(255);
|
||||
}
|
||||
}
|
||||
startlog.info("Shutting down {} was successful", what);
|
||||
};
|
||||
|
||||
auto ret = deferred_action(std::move(vfunc));
|
||||
|
||||
@@ -96,7 +96,7 @@ void range_tombstone_list::insert_from(const schema& s,
|
||||
if (cmp(end, it->position()) < 0) {
|
||||
// not overlapping
|
||||
if (it->tombstone().tomb == tomb && cmp(end, it->position()) == 0) {
|
||||
rev.update(it, {std::move(start), std::move(start), tomb});
|
||||
rev.update(it, {std::move(start), std::move(end), tomb});
|
||||
} else {
|
||||
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), tomb);
|
||||
rev.insert(it, *rt);
|
||||
|
||||
@@ -2062,80 +2062,77 @@ future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf)
|
||||
|
||||
future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf, bool with_snapshot) {
|
||||
dblog.debug("Truncating {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name());
|
||||
return with_gate(cf.async_gate(), [this, &ks, &cf, tsf = std::move(tsf), with_snapshot] () mutable -> future<> {
|
||||
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
|
||||
const auto should_flush = auto_snapshot;
|
||||
auto holder = cf.async_gate().hold();
|
||||
|
||||
// Force mutations coming in to re-acquire higher rp:s
|
||||
// This creates a "soft" ordering, in that we will guarantee that
|
||||
// any sstable written _after_ we issue the flush below will
|
||||
// only have higher rp:s than we will get from the discard_sstable
|
||||
// call.
|
||||
auto low_mark = cf.set_low_replay_position_mark();
|
||||
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
|
||||
const auto should_flush = auto_snapshot;
|
||||
|
||||
const auto uuid = cf.schema()->id();
|
||||
// Force mutations coming in to re-acquire higher rp:s
|
||||
// This creates a "soft" ordering, in that we will guarantee that
|
||||
// any sstable written _after_ we issue the flush below will
|
||||
// only have higher rp:s than we will get from the discard_sstable
|
||||
// call.
|
||||
auto low_mark = cf.set_low_replay_position_mark();
|
||||
|
||||
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
|
||||
future<> f = make_ready_future<>();
|
||||
bool did_flush = false;
|
||||
if (should_flush && cf.can_flush()) {
|
||||
// TODO:
|
||||
// this is not really a guarantee at all that we've actually
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
f = cf.flush();
|
||||
did_flush = true;
|
||||
} else {
|
||||
f = cf.clear();
|
||||
}
|
||||
return f.then([this, &cf, auto_snapshot, tsf = std::move(tsf), low_mark, should_flush, did_flush] {
|
||||
dblog.debug("Discarding sstable data for truncated CF + indexes");
|
||||
// TODO: notify truncation
|
||||
const auto uuid = cf.schema()->id();
|
||||
|
||||
return tsf().then([this, &cf, auto_snapshot, low_mark, should_flush, did_flush](db_clock::time_point truncated_at) {
|
||||
future<> f = make_ready_future<>();
|
||||
if (auto_snapshot) {
|
||||
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
|
||||
f = cf.snapshot(*this, name);
|
||||
}
|
||||
return f.then([this, &cf, truncated_at, low_mark, should_flush, did_flush] {
|
||||
return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush, did_flush](db::replay_position rp) {
|
||||
// TODO: indexes.
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
|
||||
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
|
||||
// the low_mark assertion does not hold, because we maybe/probably never got around to
|
||||
// creating the sstables that would create them.
|
||||
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
return truncate_views(cf, truncated_at, should_flush).then([&cf, truncated_at, rp] {
|
||||
// save_truncation_record() may actually fail after we cached the truncation time
|
||||
// but this is not be worse that if failing without caching: at least the correct time
|
||||
// will be available until next reboot and a client will have to retry truncation anyway.
|
||||
cf.cache_truncation_record(truncated_at);
|
||||
return db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([this, uuid] {
|
||||
drop_repair_history_map_for_table(uuid);
|
||||
});
|
||||
});
|
||||
}
|
||||
std::vector<compaction_manager::compaction_reenabler> cres;
|
||||
cres.reserve(1 + cf.views().size());
|
||||
|
||||
future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) {
|
||||
return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) {
|
||||
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&cf));
|
||||
co_await parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> {
|
||||
auto& vcf = find_column_family(v);
|
||||
return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] {
|
||||
return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] {
|
||||
return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) {
|
||||
return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
|
||||
});
|
||||
});
|
||||
});
|
||||
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&vcf));
|
||||
});
|
||||
|
||||
bool did_flush = false;
|
||||
if (should_flush && cf.can_flush()) {
|
||||
// TODO:
|
||||
// this is not really a guarantee at all that we've actually
|
||||
// gotten all things to disk. Again, need queue-ish or something.
|
||||
co_await cf.flush();
|
||||
did_flush = true;
|
||||
} else {
|
||||
co_await cf.clear();
|
||||
}
|
||||
|
||||
dblog.debug("Discarding sstable data for truncated CF + indexes");
|
||||
// TODO: notify truncation
|
||||
|
||||
db_clock::time_point truncated_at = co_await tsf();
|
||||
|
||||
if (auto_snapshot) {
|
||||
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
|
||||
co_await cf.snapshot(*this, name);
|
||||
}
|
||||
|
||||
db::replay_position rp = co_await cf.discard_sstables(truncated_at);
|
||||
// TODO: indexes.
|
||||
// Note: since discard_sstables was changed to only count tables owned by this shard,
|
||||
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
|
||||
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
|
||||
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
|
||||
// the low_mark assertion does not hold, because we maybe/probably never got around to
|
||||
// creating the sstables that would create them.
|
||||
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
|
||||
rp = std::max(low_mark, rp);
|
||||
co_await parallel_for_each(cf.views(), [this, truncated_at, should_flush] (view_ptr v) -> future<> {
|
||||
auto& vcf = find_column_family(v);
|
||||
if (should_flush) {
|
||||
co_await vcf.flush();
|
||||
} else {
|
||||
co_await vcf.clear();
|
||||
}
|
||||
db::replay_position rp = co_await vcf.discard_sstables(truncated_at);
|
||||
co_await db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
|
||||
});
|
||||
// save_truncation_record() may actually fail after we cached the truncation time
|
||||
// but this is not be worse that if failing without caching: at least the correct time
|
||||
// will be available until next reboot and a client will have to retry truncation anyway.
|
||||
cf.cache_truncation_record(truncated_at);
|
||||
co_await db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
|
||||
|
||||
drop_repair_history_map_for_table(uuid);
|
||||
}
|
||||
|
||||
const sstring& database::get_snitch_name() const {
|
||||
|
||||
@@ -1568,7 +1568,6 @@ public:
|
||||
/** Truncates the given column family */
|
||||
future<> truncate(sstring ksname, sstring cfname, timestamp_func);
|
||||
future<> truncate(const keyspace& ks, column_family& cf, timestamp_func, bool with_snapshot = true);
|
||||
future<> truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush);
|
||||
|
||||
bool update_column_family(schema_ptr s);
|
||||
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 0d250d15ac...4a30c44c4c
@@ -1030,6 +1030,20 @@ def test_nested_attribute_remove_from_missing_item(test_table_s):
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x[0]')
|
||||
|
||||
# Though in an above test (test_nested_attribute_update_bad_path_dot) we
|
||||
# showed that DynamoDB does not allow REMOVE x.y if attribute x doesn't
|
||||
# exist - and generates a ValidationException, if x *does* exist but y
|
||||
# doesn't, it's fine and the removal should just be silently ignored.
|
||||
def test_nested_attribute_remove_missing_leaf(test_table_s):
|
||||
p = random_string()
|
||||
item = {'p': p, 'a': {'x': 3}, 'b': ['hi']}
|
||||
test_table_s.put_item(Item=item)
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE a.y')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE b[7]')
|
||||
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE c')
|
||||
# The above UpdateItem calls didn't change anything...
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == item
|
||||
|
||||
# Similarly for other types of bad paths - using [0] on something which
|
||||
# doesn't exist or isn't an array.
|
||||
def test_nested_attribute_update_bad_path_array(test_table_s):
|
||||
|
||||
@@ -207,7 +207,9 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
|
||||
}
|
||||
|
||||
{
|
||||
cf_lru.evict_all();
|
||||
with_allocator(region.allocator(), [] {
|
||||
cf_lru.evict_all();
|
||||
});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.cached_bytes); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, cf.cached_bytes()); // change here
|
||||
@@ -215,6 +217,8 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions); // change here
|
||||
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
|
||||
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(region.occupancy().used_space(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@@ -690,6 +690,7 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
};
|
||||
|
||||
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
|
||||
testlog.trace("Expected {} == {}", c1, c2);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0);
|
||||
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
|
||||
};
|
||||
@@ -711,9 +712,11 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2));
|
||||
|
||||
// Origin doesn't compare ttl (is it wise?)
|
||||
assert_equal(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2));
|
||||
// But we do. See https://github.com/scylladb/scylla/issues/10156
|
||||
// and https://github.com/scylladb/scylla/issues/10173
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2),
|
||||
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
|
||||
|
||||
assert_order(
|
||||
atomic_cell::make_live(*bytes_type, 0, bytes("value1")),
|
||||
|
||||
@@ -24,11 +24,13 @@ static void add_entry(logalloc::region& r,
|
||||
{
|
||||
logalloc::allocating_section as;
|
||||
as(r, [&] {
|
||||
sstables::key sst_key = sstables::key::from_partition_key(s, key);
|
||||
page._entries.push_back(make_managed<index_entry>(
|
||||
managed_bytes(sst_key.get_bytes()),
|
||||
position,
|
||||
managed_ref<promoted_index>()));
|
||||
with_allocator(r.allocator(), [&] {
|
||||
sstables::key sst_key = sstables::key::from_partition_key(s, key);
|
||||
page._entries.push_back(make_managed<index_entry>(
|
||||
managed_bytes(sst_key.get_bytes()),
|
||||
position,
|
||||
managed_ref<promoted_index>()));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -115,3 +115,16 @@ def test_operator_ne_not_supported(cql, table1):
|
||||
cql.execute(f'SELECT a FROM {table1} WHERE a != 0')
|
||||
with pytest.raises(InvalidRequest, match='Unsupported.*!='):
|
||||
cql.execute(f'SELECT a FROM {table1} WHERE token(a) != 0')
|
||||
|
||||
# Test that the fact that a column is indexed does not cause us to fetch
|
||||
# incorrect results from a filtering query (issue #10300).
|
||||
def test_index_with_in_relation(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int, c int, v boolean, primary key (p,c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"create index on {table}(v)")
|
||||
for p, c, v in [(0,0,True),(0,1,False),(0,2,True),(0,3,False),
|
||||
(1,0,True),(1,1,False),(1,2,True),(1,3,False),
|
||||
(2,0,True),(2,1,False),(2,2,True),(2,3,False)]:
|
||||
cql.execute(f"insert into {table} (p,c,v) values ({p}, {c}, {v})")
|
||||
res = cql.execute(f"select * from {table} where p in (0,1) and v = False ALLOW FILTERING")
|
||||
assert set(res) == set([(0,1,False),(0,3,False),(1,1,False), (1,3,False)])
|
||||
|
||||
@@ -326,6 +326,7 @@ public:
|
||||
}
|
||||
|
||||
size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
|
||||
return with_allocator(standard_allocator(), [&] {
|
||||
size_t count = 0;
|
||||
auto disposer = [] (auto* p) noexcept {};
|
||||
while (start != end) {
|
||||
@@ -338,6 +339,7 @@ public:
|
||||
}
|
||||
}
|
||||
return count;
|
||||
});
|
||||
}
|
||||
public:
|
||||
/// \brief Constructs a cached_file.
|
||||
@@ -464,8 +466,10 @@ public:
|
||||
inline
|
||||
void cached_file::cached_page::on_evicted() noexcept {
|
||||
parent->on_evicted(*this);
|
||||
cached_file::cache_type::iterator it(this);
|
||||
it.erase(page_idx_less_comparator());
|
||||
with_allocator(standard_allocator(), [this] {
|
||||
cached_file::cache_type::iterator it(this);
|
||||
it.erase(page_idx_less_comparator());
|
||||
});
|
||||
}
|
||||
|
||||
class cached_file_impl : public file_impl {
|
||||
|
||||
@@ -584,6 +584,10 @@ static constexpr auto max_used_space_ratio_for_compaction = 0.85;
|
||||
static constexpr size_t max_used_space_for_compaction = segment_size * max_used_space_ratio_for_compaction;
|
||||
static constexpr size_t min_free_space_for_compaction = segment_size - max_used_space_for_compaction;
|
||||
|
||||
struct [[gnu::packed]] non_lsa_object_cookie {
|
||||
uint64_t value = 0xbadcaffe;
|
||||
};
|
||||
|
||||
static_assert(min_free_space_for_compaction >= max_managed_object_size,
|
||||
"Segments which cannot fit max_managed_object_size must not be considered compactible for the sake of forward progress of compaction");
|
||||
|
||||
@@ -827,9 +831,13 @@ public:
|
||||
void clear_allocation_failure_flag() { _allocation_failure_flag = false; }
|
||||
bool allocation_failure_flag() { return _allocation_failure_flag; }
|
||||
void refill_emergency_reserve();
|
||||
void update_non_lsa_memory_in_use(ssize_t n) {
|
||||
void add_non_lsa_memory_in_use(size_t n) {
|
||||
_non_lsa_memory_in_use += n;
|
||||
}
|
||||
void subtract_non_lsa_memory_in_use(size_t n) {
|
||||
assert(_non_lsa_memory_in_use >= n);
|
||||
_non_lsa_memory_in_use -= n;
|
||||
}
|
||||
size_t non_lsa_memory_in_use() const {
|
||||
return _non_lsa_memory_in_use;
|
||||
}
|
||||
@@ -1630,17 +1638,18 @@ public:
|
||||
memory::on_alloc_point();
|
||||
shard_segment_pool.on_memory_allocation(size);
|
||||
if (size > max_managed_object_size) {
|
||||
auto ptr = standard_allocator().alloc(migrator, size, alignment);
|
||||
auto ptr = standard_allocator().alloc(migrator, size + sizeof(non_lsa_object_cookie), alignment);
|
||||
// This isn't very acurrate, the correct free_space value would be
|
||||
// malloc_usable_size(ptr) - size, but there is no way to get
|
||||
// the exact object size at free.
|
||||
auto allocated_size = malloc_usable_size(ptr);
|
||||
new ((char*)ptr + allocated_size - sizeof(non_lsa_object_cookie)) non_lsa_object_cookie();
|
||||
_non_lsa_occupancy += occupancy_stats(0, allocated_size);
|
||||
if (_group) {
|
||||
_evictable_space += allocated_size;
|
||||
_group->increase_usage(_heap_handle, allocated_size);
|
||||
}
|
||||
shard_segment_pool.update_non_lsa_memory_in_use(allocated_size);
|
||||
shard_segment_pool.add_non_lsa_memory_in_use(allocated_size);
|
||||
return ptr;
|
||||
} else {
|
||||
auto ptr = alloc_small(object_descriptor(migrator), (segment::size_type) size, alignment);
|
||||
@@ -1652,12 +1661,14 @@ public:
|
||||
private:
|
||||
void on_non_lsa_free(void* obj) noexcept {
|
||||
auto allocated_size = malloc_usable_size(obj);
|
||||
auto cookie = (non_lsa_object_cookie*)((char*)obj + allocated_size) - 1;
|
||||
assert(cookie->value == non_lsa_object_cookie().value);
|
||||
_non_lsa_occupancy -= occupancy_stats(0, allocated_size);
|
||||
if (_group) {
|
||||
_evictable_space -= allocated_size;
|
||||
_group->decrease_usage(_heap_handle, allocated_size);
|
||||
}
|
||||
shard_segment_pool.update_non_lsa_memory_in_use(-allocated_size);
|
||||
shard_segment_pool.subtract_non_lsa_memory_in_use(allocated_size);
|
||||
}
|
||||
public:
|
||||
virtual void free(void* obj) noexcept override {
|
||||
|
||||
Reference in New Issue
Block a user