Compare commits
21 Commits
copilot/re
...
scylla-4.6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
542394c82f | ||
|
|
018ad3f6f4 | ||
|
|
9b8b7efb54 | ||
|
|
1c3e63975f | ||
|
|
11bb03e46d | ||
|
|
810e410c5d | ||
|
|
97f6da0c3e | ||
|
|
c229fe9694 | ||
|
|
ee1ca8ae4d | ||
|
|
6bfd322e3b | ||
|
|
afc18d5070 | ||
|
|
2ec22c2404 | ||
|
|
19da778271 | ||
|
|
cbd4c13ba6 | ||
|
|
338871802d | ||
|
|
8b5b1b8af6 | ||
|
|
ea89eff95d | ||
|
|
96421e7779 | ||
|
|
142336ca53 | ||
|
|
492f12248c | ||
|
|
7eb7a0e5fe |
@@ -60,7 +60,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=4.6.dev
|
||||
VERSION=4.6.rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -995,6 +995,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
|
||||
}
|
||||
|
||||
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
|
||||
paging_state_copy->set_remaining(internal_paging_size);
|
||||
paging_state_copy->set_partition_key(std::move(index_pk));
|
||||
paging_state_copy->set_clustering_key(std::move(index_ck));
|
||||
return std::move(paging_state_copy);
|
||||
|
||||
@@ -428,6 +428,8 @@ private:
|
||||
void abort_recycled_list(std::exception_ptr);
|
||||
void abort_deletion_promise(std::exception_ptr);
|
||||
|
||||
future<> recalculate_footprint();
|
||||
|
||||
future<> rename_file(sstring, sstring) const;
|
||||
size_t max_request_controller_units() const;
|
||||
segment_id_type _ids = 0;
|
||||
@@ -444,6 +446,7 @@ private:
|
||||
seastar::gate _gate;
|
||||
uint64_t _new_counter = 0;
|
||||
std::optional<size_t> _disk_write_alignment;
|
||||
seastar::semaphore _reserve_recalculation_guard;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
@@ -512,6 +515,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
|
||||
uint64_t _file_pos = 0;
|
||||
uint64_t _flush_pos = 0;
|
||||
uint64_t _size_on_disk = 0;
|
||||
uint64_t _waste = 0;
|
||||
|
||||
size_t _alignment;
|
||||
|
||||
@@ -598,7 +602,7 @@ public:
|
||||
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
|
||||
++_segment_manager->totals.segments_destroyed;
|
||||
_segment_manager->totals.active_size_on_disk -= file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position());
|
||||
_segment_manager->totals.wasted_size_on_disk -= _waste;
|
||||
_segment_manager->add_file_to_delete(_file_name, _desc);
|
||||
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
|
||||
clogger.warn("Segment {} is dirty and is left on disk.", *this);
|
||||
@@ -725,7 +729,8 @@ public:
|
||||
auto s = co_await sync();
|
||||
co_await flush();
|
||||
co_await terminate();
|
||||
_segment_manager->totals.wasted_size_on_disk += (_size_on_disk - file_position());
|
||||
_waste = _size_on_disk - file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk += _waste;
|
||||
co_return s;
|
||||
}
|
||||
future<sseg_ptr> do_flush(uint64_t pos) {
|
||||
@@ -1223,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c)
|
||||
, _recycled_segments(std::numeric_limits<size_t>::max())
|
||||
, _reserve_replenisher(make_ready_future<>())
|
||||
, _background_sync(make_ready_future<>())
|
||||
, _reserve_recalculation_guard(1)
|
||||
{
|
||||
assert(max_size > 0);
|
||||
assert(max_mutation_size < segment::multi_entry_size_magic);
|
||||
@@ -1248,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
|
||||
}
|
||||
try {
|
||||
gate::holder g(_gate);
|
||||
auto guard = get_units(_reserve_recalculation_guard, 1);
|
||||
if (_reserve_segments.full()) {
|
||||
// can happen if we recalculate
|
||||
continue;
|
||||
}
|
||||
// note: if we were strict with disk size, we would refuse to do this
|
||||
// unless disk footprint is lower than threshold. but we cannot (yet?)
|
||||
// trust that flush logic will absolutely free up an existing
|
||||
@@ -1519,7 +1530,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
|
||||
if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) {
|
||||
for (auto * ext : cfg.extensions->commitlog_file_extensions()) {
|
||||
auto nf = co_await ext->wrap_file(std::move(filename), f, flags);
|
||||
auto nf = co_await ext->wrap_file(filename, f, flags);
|
||||
if (nf) {
|
||||
f = std::move(nf);
|
||||
align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment();
|
||||
@@ -1529,13 +1540,17 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
|
||||
f = make_checked_file(commit_error_handler, std::move(f));
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
commit_error_handler(ep);
|
||||
try {
|
||||
commit_error_handler(std::current_exception());
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
}
|
||||
}
|
||||
if (ep && f) {
|
||||
co_await f.close();
|
||||
}
|
||||
if (ep) {
|
||||
add_file_to_delete(filename, d);
|
||||
co_return coroutine::exception(std::move(ep));
|
||||
}
|
||||
|
||||
@@ -1865,6 +1880,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
||||
|
||||
std::exception_ptr recycle_error;
|
||||
|
||||
size_t num_deleted = 0;
|
||||
bool except = false;
|
||||
while (!files.empty()) {
|
||||
auto filename = std::move(files.back());
|
||||
files.pop_back();
|
||||
@@ -1914,8 +1931,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
||||
}
|
||||
}
|
||||
co_await delete_file(filename);
|
||||
++num_deleted;
|
||||
} catch (...) {
|
||||
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
|
||||
except = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1928,6 +1947,16 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
||||
if (recycle_error && _recycled_segments.empty()) {
|
||||
abort_recycled_list(recycle_error);
|
||||
}
|
||||
// If recycle failed and turned into a delete, we should fake-wakeup waiters
|
||||
// since we might still have cleaned up disk space.
|
||||
if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) {
|
||||
abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files")));
|
||||
}
|
||||
|
||||
// #9348 - if we had an exception, we can't trust our bookkeeping any more. recalculate.
|
||||
if (except) {
|
||||
co_await recalculate_footprint();
|
||||
}
|
||||
}
|
||||
|
||||
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
|
||||
@@ -1942,6 +1971,63 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e
|
||||
std::exchange(_disk_deletions, {}).set_exception(ep);
|
||||
}
|
||||
|
||||
/**
 * Rebuilds totals.total_size_on_disk from the segments, reserve segments and
 * recycled files currently known to the manager.
 *
 * Pending deletes are flushed first. Recycled files are first accounted at
 * max_size each and then corrected to their actual on-disk size; if the sizes
 * cannot be read, the max_size estimate is kept. Never propagates an
 * exception: failures are logged and the totals may then be inaccurate.
 */
future<> db::commitlog::segment_manager::recalculate_footprint() {
    try {
        co_await do_pending_deletes();

        // get_units() returns a future; the units are only held once it has
        // resolved, so it must be awaited before touching the queues.
        auto guard = co_await get_units(_reserve_recalculation_guard, 1);
        // NOTE(review): never read below - presumably kept so the segments
        // stay referenced while we suspend on file_size(); confirm before removing.
        auto segments_copy = _segments;
        std::vector<sseg_ptr> reserves;
        std::vector<sstring> recycles;
        // this causes haywire things while we steal stuff, but...
        while (!_reserve_segments.empty()) {
            reserves.push_back(_reserve_segments.pop());
        }
        while (!_recycled_segments.empty()) {
            recycles.push_back(_recycled_segments.pop());
        }

        // first, guesstimate sizes
        uint64_t recycle_size = recycles.size() * max_size;

        totals.total_size_on_disk = recycle_size;
        for (auto& s : _segments) {
            totals.total_size_on_disk += s->_size_on_disk;
        }
        for (auto& s : reserves) {
            totals.total_size_on_disk += s->_size_on_disk;
        }

        // now we need to adjust the actual sizes of recycled files

        uint64_t actual_recycled_size = 0;

        try {
            for (auto& filename : recycles) {
                auto s = co_await seastar::file_size(filename);
                actual_recycled_size += s;
            }
        } catch (...) {
            clogger.error("Exception reading disk footprint ({}).", std::current_exception());
            actual_recycled_size = recycle_size; // best we got
        }

        // Hand the stolen entries back before applying the correction.
        for (auto&& filename : recycles) {
            _recycled_segments.push(std::move(filename));
        }
        for (auto&& s : reserves) {
            _reserve_segments.push(std::move(s)); // you can have it back now.
        }

        // Replace the max_size estimate with the measured size (unsigned
        // arithmetic, so a smaller actual size still nets out correctly).
        totals.total_size_on_disk += actual_recycled_size - recycle_size;
        // pushing things to reserve/recycled queues will have resumed any
        // waiters, so we should be done.
    } catch (...) {
        clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
    }
}
|
||||
|
||||
future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
auto ftc = std::exchange(_files_to_close, {});
|
||||
auto ftd = std::exchange(_files_to_delete, {});
|
||||
|
||||
6
dist/common/scripts/scylla_ntp_setup
vendored
6
dist/common/scripts/scylla_ntp_setup
vendored
@@ -66,18 +66,18 @@ if __name__ == '__main__':
|
||||
|
||||
target = None
|
||||
if os.path.exists('/lib/systemd/systemd-timesyncd'):
|
||||
if systemd_unit('systemd-timesyncd').is_active():
|
||||
if systemd_unit('systemd-timesyncd').is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
target = 'systemd-timesyncd'
|
||||
if shutil.which('chronyd'):
|
||||
if get_chrony_unit().is_active():
|
||||
if get_chrony_unit().is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
if not target:
|
||||
target = 'chrony'
|
||||
if shutil.which('ntpd'):
|
||||
if get_ntp_unit().is_active():
|
||||
if get_ntp_unit().is_active() == 'active':
|
||||
print('ntp is already configured, skip setup')
|
||||
sys.exit(0)
|
||||
if not target:
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -1041,7 +1041,7 @@ class systemd_unit:
|
||||
return run('systemctl {} disable {}'.format(self.ctlparam, self._unit), shell=True, check=True)
|
||||
|
||||
def is_active(self):
|
||||
return True if run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip() == 'active' else False
|
||||
return run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
|
||||
def mask(self):
|
||||
return run('systemctl {} mask {}'.format(self.ctlparam, self._unit), shell=True, check=True)
|
||||
|
||||
6
dist/docker/debian/build_docker.sh
vendored
6
dist/docker/debian/build_docker.sh
vendored
@@ -25,6 +25,10 @@ product="$(<build/SCYLLA-PRODUCT-FILE)"
|
||||
version="$(<build/SCYLLA-VERSION-FILE)"
|
||||
release="$(<build/SCYLLA-RELEASE-FILE)"
|
||||
|
||||
if [[ "$version" = *rc* ]]; then
|
||||
version=$(echo $version |sed 's/\(.*\)\.)*/\1~/')
|
||||
fi
|
||||
|
||||
mode="release"
|
||||
|
||||
if uname -m | grep x86_64 ; then
|
||||
@@ -93,7 +97,7 @@ run apt-get -y install hostname supervisor openssh-server openssh-client openjdk
|
||||
run locale-gen en_US.UTF-8
|
||||
run bash -ec "dpkg -i packages/*.deb"
|
||||
run apt-get -y clean all
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bashrc"
|
||||
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
|
||||
run mkdir -p /etc/supervisor.conf.d
|
||||
run mkdir -p /var/log/scylla
|
||||
run chown -R scylla:scylla /var/lib/scylla
|
||||
|
||||
@@ -184,14 +184,18 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
|
||||
_logger.info("exception while advertising new connection: {}", std::current_exception());
|
||||
}
|
||||
// Block while monitoring for lifetime/errors.
|
||||
return conn->process().finally([this, conn] {
|
||||
return unadvertise_connection(conn);
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
if (is_broken_pipe_or_connection_reset(ep)) {
|
||||
// expected if another side closes a connection or we're shutting down
|
||||
return;
|
||||
return conn->process().then_wrapped([this, conn] (auto f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
auto ep = std::current_exception();
|
||||
if (!is_broken_pipe_or_connection_reset(ep)) {
|
||||
// some exceptions are expected if another side closes a connection
|
||||
// or we're shutting down
|
||||
_logger.info("exception while processing connection: {}", ep);
|
||||
}
|
||||
}
|
||||
_logger.info("exception while processing connection: {}", ep);
|
||||
return unadvertise_connection(conn);
|
||||
});
|
||||
});
|
||||
return stop_iteration::no;
|
||||
|
||||
@@ -477,49 +477,42 @@ gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request requ
|
||||
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
|
||||
}
|
||||
|
||||
// Runs `fn` on shard 0 as a background task tracked by the _background_msg
// gate. A failure of the task is logged as a warning tagged with `type`
// instead of being propagated; the caller always gets no_wait() back.
rpc::no_wait_type gossiper::background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn) {
    auto run_on_shard_zero = [this, type = std::move(type), fn = std::move(fn)] () mutable {
        auto log_failure = [type = std::move(type)] (auto ep) {
            logger.warn("Failed to handle {}: {}", type, ep);
        };
        return container().invoke_on(0, std::move(fn)).handle_exception(std::move(log_failure));
    };
    // The returned future is dropped on purpose.
    (void)with_gate(_background_msg, std::move(run_on_shard_zero));
    return messaging_service::no_wait();
}
|
||||
|
||||
void gossiper::init_messaging_service_handler() {
|
||||
_messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_SYN", [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_syn_msg(from, std::move(syn_msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_ACK", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_ack_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return background_msg("GOSSIP_DIGEST_ACK2", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
|
||||
return gossiper.handle_ack2_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return handle_echo_msg(from, generation_number_opt);
|
||||
});
|
||||
_messaging.register_gossip_shutdown([this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
|
||||
// In a new fiber.
|
||||
(void)container().invoke_on(0, [from, generation_number_opt] (gms::gossiper& gossiper) {
|
||||
return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
|
||||
return gossiper.handle_shutdown_msg(from, generation_number_opt);
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
_messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
|
||||
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
|
||||
@@ -2178,6 +2171,9 @@ future<> gossiper::start() {
|
||||
}
|
||||
|
||||
future<> gossiper::shutdown() {
|
||||
if (!_background_msg.is_closed()) {
|
||||
co_await _background_msg.close();
|
||||
}
|
||||
if (this_shard_id() == 0) {
|
||||
co_await do_stop_gossiping();
|
||||
}
|
||||
|
||||
@@ -41,7 +41,9 @@
|
||||
#include "unimplemented.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/print.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include "utils/atomic_vector.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
@@ -138,12 +140,16 @@ private:
|
||||
bool _enabled = false;
|
||||
semaphore _callback_running{1};
|
||||
semaphore _apply_state_locally_semaphore{100};
|
||||
seastar::gate _background_msg;
|
||||
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
|
||||
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
|
||||
bool _advertise_myself = true;
|
||||
// Map ip address and generation number
|
||||
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
|
||||
future<> _failure_detector_loop_done{make_ready_future<>()} ;
|
||||
|
||||
rpc::no_wait_type background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);
|
||||
|
||||
public:
|
||||
// Get current generation number for the given nodes
|
||||
future<std::unordered_map<gms::inet_address, int32_t>>
|
||||
|
||||
@@ -613,7 +613,8 @@ static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
|
||||
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
|
||||
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
} else {
|
||||
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
schema_ptr snp_schema = snp->schema();
|
||||
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
|
||||
auto rd = std::move(rd_ref);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
while (!rd.is_end_of_stream()) {
|
||||
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
|
||||
co_await rd.fill_buffer();
|
||||
while (!rd.is_buffer_empty()) {
|
||||
co_await rd.pop_mutation_fragment().consume(wr);
|
||||
|
||||
@@ -49,12 +49,13 @@ private:
|
||||
public:
|
||||
partition_index_cache* _parent;
|
||||
key_type _key;
|
||||
std::variant<shared_promise<>, partition_index_page> _page;
|
||||
std::variant<lw_shared_ptr<shared_promise<>>, partition_index_page> _page;
|
||||
size_t _size_in_allocator = 0;
|
||||
public:
|
||||
entry(partition_index_cache* parent, key_type key)
|
||||
: _parent(parent)
|
||||
, _key(key)
|
||||
, _page(make_lw_shared<shared_promise<>>())
|
||||
{ }
|
||||
|
||||
void set_page(partition_index_page&& page) noexcept {
|
||||
@@ -76,7 +77,7 @@ private:
|
||||
// Always returns the same value for a given state of _page.
|
||||
size_t size_in_allocator() const { return _size_in_allocator; }
|
||||
|
||||
shared_promise<>& promise() { return std::get<shared_promise<>>(_page); }
|
||||
lw_shared_ptr<shared_promise<>> promise() { return std::get<lw_shared_ptr<shared_promise<>>>(_page); }
|
||||
bool ready() const { return std::holds_alternative<partition_index_page>(_page); }
|
||||
partition_index_page& page() { return std::get<partition_index_page>(_page); }
|
||||
const partition_index_page& page() const { return std::get<partition_index_page>(_page); }
|
||||
@@ -207,9 +208,7 @@ public:
|
||||
return make_ready_future<entry_ptr>(std::move(ptr));
|
||||
} else {
|
||||
++_shard_stats.blocks;
|
||||
return _as(_region, [ptr] () mutable {
|
||||
return ptr.get_entry().promise().get_shared_future();
|
||||
}).then([ptr] () mutable {
|
||||
return ptr.get_entry().promise()->get_shared_future().then([ptr] () mutable {
|
||||
return std::move(ptr);
|
||||
});
|
||||
}
|
||||
@@ -238,12 +237,12 @@ public:
|
||||
entry& e = ptr.get_entry();
|
||||
try {
|
||||
partition_index_page&& page = f.get0();
|
||||
e.promise().set_value();
|
||||
e.promise()->set_value();
|
||||
e.set_page(std::move(page));
|
||||
_shard_stats.used_bytes += e.size_in_allocator();
|
||||
++_shard_stats.populations;
|
||||
} catch (...) {
|
||||
e.promise().set_exception(std::current_exception());
|
||||
e.promise()->set_exception(std::current_exception());
|
||||
with_allocator(_region.allocator(), [&] {
|
||||
_cache.erase(key);
|
||||
});
|
||||
|
||||
43
test.py
43
test.py
@@ -291,6 +291,8 @@ class Test:
|
||||
def print_summary(self):
|
||||
pass
|
||||
|
||||
def get_junit_etree(self):
    """Return the parsed JUnit XML tree for this test; the base class has none."""
    return None
|
||||
|
||||
def check_log(self, trim):
|
||||
"""Check and trim logs and xml output for tests which have it"""
|
||||
@@ -338,9 +340,36 @@ class BoostTest(UnitTest):
|
||||
boost_args += ['--color_output=false']
|
||||
boost_args += ['--']
|
||||
self.args = boost_args + self.args
|
||||
self.casename = casename
|
||||
self.__junit_etree = None
|
||||
|
||||
def get_junit_etree(self):
    """Parse this test's XML output once and return the cached tree.

    On the first call the file at self.xmlout is parsed, every TestSuite
    element is renamed, its direct TestCase children marked
    reason="disabled" are dropped, and the file is deleted. Later calls
    return the same tree.
    """
    if self.__junit_etree is not None:
        return self.__junit_etree

    import re

    # Normalize "path/to/file.cc" to "path.to.file" to conform to
    # Jenkins expectations that the suite name is a class name. ".cc"
    # doesn't add any information. Add the mode, otherwise failures
    # in different modes are indistinguishable. The "test/" prefix adds
    # no information, so remove it.
    rewrites = ((r'^test/', ''), (r'\.cc$', ''), (r'/', '.'))

    self.__junit_etree = ET.parse(self.xmlout)
    for suite in self.__junit_etree.getroot().findall('.//TestSuite'):
        suite_name = suite.attrib['name']
        for pattern, replacement in rewrites:
            suite_name = re.sub(pattern, replacement, suite_name)
        suite.attrib['name'] = f'{suite_name}.{self.mode}'
        for disabled_case in suite.findall('./TestCase[@reason="disabled"]'):
            suite.remove(disabled_case)
    os.unlink(self.xmlout)
    return self.__junit_etree
|
||||
|
||||
def check_log(self, trim):
|
||||
ET.parse(self.xmlout)
|
||||
self.get_junit_etree()
|
||||
super().check_log(trim)
|
||||
|
||||
|
||||
@@ -800,6 +829,17 @@ def write_junit_report(tmpdir, mode):
|
||||
with open(junit_filename, "w") as f:
|
||||
ET.ElementTree(xml_results).write(f, encoding="unicode")
|
||||
|
||||
def write_consolidated_boost_junit_xml(tmpdir, mode):
    """Merge the TestSuite elements of all tests run in `mode` into one file.

    Tests whose get_junit_etree() returns None contribute nothing. The result
    is written to {tmpdir}/{mode}/xml/boost.xunit.xml under a TestLog root.
    """
    consolidated = ET.Element("TestLog")
    tests_in_mode = (test
                     for suite in TestSuite.suites.values()
                     for test in suite.tests
                     if test.mode == mode)
    for test in tests_in_mode:
        tree = test.get_junit_etree()
        if tree is None:
            continue
        consolidated.extend(tree.getroot().findall('.//TestSuite'))
    ET.ElementTree(consolidated).write(f'{tmpdir}/{mode}/xml/boost.xunit.xml', encoding='unicode')
|
||||
|
||||
def open_log(tmpdir):
|
||||
pathlib.Path(tmpdir).mkdir(parents=True, exist_ok=True)
|
||||
@@ -839,6 +879,7 @@ async def main():
|
||||
|
||||
for mode in options.modes:
|
||||
write_junit_report(options.tmpdir, mode)
|
||||
write_consolidated_boost_junit_xml(options.tmpdir, mode)
|
||||
|
||||
if 'coverage' in options.modes:
|
||||
coverage.generate_coverage_report("build/coverage", "tests")
|
||||
|
||||
@@ -44,7 +44,9 @@
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/commitlog/commitlog_extensions.hh"
|
||||
#include "db/commitlog/rp_set.hh"
|
||||
#include "db/extensions.hh"
|
||||
#include "log.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "test/lib/exception_utils.hh"
|
||||
@@ -947,3 +949,113 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) {
|
||||
co_await log.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Shared body of the "exception in segment allocation" tests.
 *
 * Registers a commitlog file extension whose wrap_file() throws exactly once
 * after the flush handler has armed it, then keeps adding mutations until
 * that throw has happened. Finally shuts the log down and clears it.
 *
 * do_file_delete: the extension also closes and removes the file before throwing.
 * reuse: value assigned to cfg.reuse_segments.
 */
static future<> do_test_exception_in_allocate_ex(bool do_file_delete, bool reuse = true) {
    constexpr auto max_size_mb = 1;

    commitlog::config cfg;
    cfg.commitlog_segment_size_in_mb = max_size_mb;
    cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count;
    cfg.commitlog_sync_period_in_ms = 10;
    cfg.reuse_segments = reuse;
    cfg.allow_going_over_size_limit = false; // #9348 - now can enforce size limit always
    cfg.use_o_dsync = true; // make sure we pre-allocate.

    // not using cl_test, because we need to be able to abandon
    // the log.
    tmpdir tmp;
    cfg.commit_log_location = tmp.path().string();

    // Exception type thrown by the extension below.
    class myfail : public std::exception {
    public:
        using std::exception::exception;
    };

    // Extension that fails a single wrap_file() call once `fail` is set.
    struct myext : public db::commitlog_file_extension {
        bool fail = false;
        bool thrown = false;
        bool do_file_delete;

        myext(bool dd)
            : do_file_delete(dd)
        {}

        seastar::future<seastar::file> wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override {
            // Pass the file through unless armed and not yet fired.
            if (!fail || thrown) {
                co_return f;
            }
            thrown = true;
            if (do_file_delete) {
                co_await f.close();
                co_await seastar::remove_file(filename);
            }
            throw myfail{};
        }
        seastar::future<> before_delete(const seastar::sstring&) override {
            co_return;
        }
    };

    auto failing_ext = std::make_unique<myext>(do_file_delete);
    // Keep a reference: ownership moves into the extensions registry below.
    auto& mx = *failing_ext;

    db::extensions myexts;
    myexts.add_commitlog_file_extension("hufflepuff", std::move(failing_ext));
    cfg.extensions = &myexts;

    auto log = co_await commitlog::create_commitlog(cfg);

    rp_set rps;
    // uncomment for verbosity
    // logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);

    auto uuid = utils::UUID_gen::get_time_UUID();
    auto size = log.max_record_size();

    // On a flush request: discard completed segments for `rps` and arm the extension.
    auto flush_registration = log.add_flush_handler([&](cf_id_type id, replay_position) {
        log.discard_completed_segments(id, rps);
        mx.fail = true;
    });

    try {
        // Write max-size records until the extension has thrown.
        while (!mx.thrown) {
            rp_handle handle = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
                dst.fill('1', size);
            });
            rps.put(std::move(handle));
        }
    } catch (...) {
        BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
    }

    co_await log.shutdown();
    co_await log.clear();
}
|
||||
|
||||
/**
|
||||
* Test generating an exception in segment file allocation
|
||||
*/
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) {
|
||||
co_await do_test_exception_in_allocate_ex(false);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_no_recycle) {
|
||||
co_await do_test_exception_in_allocate_ex(false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test generating an exception in segment file allocation, but also
|
||||
* delete the file, which in turn should cause follow-up exceptions
|
||||
* in cleanup delete. Which CL should handle
|
||||
*/
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file) {
|
||||
co_await do_test_exception_in_allocate_ex(true, false);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file_no_recycle) {
|
||||
co_await do_test_exception_in_allocate_ex(true);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
@@ -56,3 +58,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Indexed query over a table with a clustering key whose rows are large
// enough to trigger short reads on the base table: every inserted row must
// still be returned by the internal paged query.
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
    return do_with_cql_env_thread([] (auto& e) {
        e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
        e.execute_cql("CREATE INDEX ON tab (v)").get();

        // Enough to trigger a short read on the base table during scan
        sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');

        const int row_count = 67;
        for (int row = 0; row != row_count; ++row) {
            // Rows are spread over three partitions (pk = row % 3).
            const auto insert_stmt = format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", row % 3, row, row, big_string);
            e.execute_cql(insert_stmt).get();
        }

        eventually([&] {
            uint64_t count = 0;
            auto scan = e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
                ++count;
                return make_ready_future<stop_iteration>(stop_iteration::no);
            });
            scan.get();
            BOOST_REQUIRE_EQUAL(count, row_count);
        });
    });
}
|
||||
|
||||
// Variant of the short-read paging test for a table without a clustering
// key: one large row per partition, all of which must be returned.
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
    return do_with_cql_env_thread([] (auto& e) {
        e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
        e.execute_cql("CREATE INDEX ON tab (v)").get();

        // Enough to trigger a short read on the base table during scan
        sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');

        const int row_count = 67;
        for (int row = 0; row != row_count; ++row) {
            const auto insert_stmt = format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", row, row, big_string);
            e.execute_cql(insert_stmt).get();
        }

        eventually([&] {
            uint64_t count = 0;
            auto scan = e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
                ++count;
                return make_ready_future<stop_iteration>(stop_iteration::no);
            });
            scan.get();
            BOOST_REQUIRE_EQUAL(count, row_count);
        });
    });
}
|
||||
|
||||
@@ -1447,6 +1447,7 @@ private:
|
||||
|
||||
if (seg != _buf_active) {
|
||||
if (desc.is_empty()) {
|
||||
assert(desc._buf_pointers.empty());
|
||||
_segment_descs.erase(desc);
|
||||
desc._buf_pointers = std::vector<entangled>();
|
||||
free_segment(seg, desc);
|
||||
@@ -1457,7 +1458,7 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void compact_segment_locked(segment* seg, segment_descriptor& desc) {
|
||||
void compact_segment_locked(segment* seg, segment_descriptor& desc) noexcept {
|
||||
auto seg_occupancy = desc.occupancy();
|
||||
llogger.debug("Compacting segment {} from region {}, {}", fmt::ptr(seg), id(), seg_occupancy);
|
||||
|
||||
@@ -1472,6 +1473,7 @@ private:
|
||||
for (entangled& e : _buf_ptrs_for_compact_segment) {
|
||||
if (e) {
|
||||
lsa_buffer* old_ptr = e.get(&lsa_buffer::_link);
|
||||
assert(&desc == old_ptr->_desc);
|
||||
lsa_buffer dst = alloc_buf(old_ptr->_size);
|
||||
memcpy(dst._buf, old_ptr->_buf, dst._size);
|
||||
old_ptr->_link = std::move(dst._link);
|
||||
|
||||
Reference in New Issue
Block a user