Compare commits


21 Commits

Author SHA1 Message Date
Beni Peled
542394c82f release: prepare for 4.6.rc1 2021-12-08 11:08:45 +02:00
Avi Kivity
018ad3f6f4 test: refine test suite names exposed via xunit format
The test suite names seen by Jenkins are suboptimal: there is
no distinction between modes, and the ".cc" suffix of file names
is interpreted as a class name, which is converted to a tree node
that must be clicked to expand. Massage the names to remove
unnecessary information and add the mode.

Closes #9696

(cherry picked from commit ef3edcf848)

Fixes #9738.
2021-12-05 19:58:22 +02:00
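The name massaging described above can be sketched as a small helper (the regex steps mirror the test.py change further down in this diff; the standalone function shape here is illustrative):

```python
import re

def adjust_suite_name(name: str, mode: str) -> str:
    """Normalize "test/path/to/file.cc" to "path.to.file.<mode>" so Jenkins
    shows a flat, mode-qualified suite name instead of a clickable tree."""
    name = re.sub(r'^test/', '', name)   # the "test/" prefix adds no information
    name = re.sub(r'\.cc$', '', name)    # ".cc" would be misread as a class name
    name = name.replace('/', '.')
    return f'{name}.{mode}'
```

For example, `adjust_suite_name('test/boost/sstable_test.cc', 'dev')` yields `boost.sstable_test.dev`.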
Avi Kivity
9b8b7efb54 tests: consolidate boost xunit result files
The recent parallelization of boost unit tests caused an increase
in XML result files. This is challenging for Jenkins, since it
appears to use rpc-over-ssh to read the result files; as a result,
it takes more than an hour to read them all when the Jenkins
main node is not on the same continent as the agent.

To fix this, merge the result files in test.py and leave one result
file per mode. Later we can leave one result file overall (integrating
the mode into the testsuite name), but that can wait.

Tested on a local Jenkins instance (just reading the result files,
not the entire build).

Closes #9668

(cherry picked from commit b23af15432)

Fixes #9738
2021-12-05 19:57:39 +02:00
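The consolidation amounts to lifting every `<TestSuite>` out of the per-file Boost `<TestLog>` roots into one merged document, roughly (function name illustrative; the real logic lives in test.py's `write_consolidated_boost_junit_xml`):

```python
import xml.etree.ElementTree as ET

def consolidate_boost_xml(result_files, out_path):
    """Merge several Boost <TestLog> result files into a single file,
    so CI only has to fetch one XML per mode."""
    merged = ET.Element('TestLog')
    for path in result_files:
        tree = ET.parse(path)
        # Lift every <TestSuite> out of the per-file <TestLog> root.
        merged.extend(tree.getroot().findall('.//TestSuite'))
    ET.ElementTree(merged).write(out_path, encoding='unicode')
```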
Botond Dénes
1c3e63975f Merge 'Backport of #9348 (Exceptions in commitlog::segment_manager::delete_segments could cause footprint counters to lose track)' from Calle Wilund
Backport of series to 4.6
Upstream merge commit: e2c27ee743.
Refs #9348

Closes #9702

* github.com:scylladb/scylla:
  commitlog: Recalculate footprint on delete_segment exceptions
  commitlog_test: Add test for exception in alloc w. deleted underlying file
  commitlog: Ensure failed-to-create-segment is re-deleted
  commitlog::allocate_segment_ex: Don't re-throw out of function
2021-12-02 09:22:19 +02:00
Calle Wilund
11bb03e46d commitlog: Recalculate footprint on delete_segment exceptions
Fixes #9348

If we get exceptions in delete_segments, we can, and probably will, lose
track of the footprint counters. We need to recompute the used disk footprint,
otherwise we will flush too often, and may even block indefinitely in new_seg
when using hard limits.
2021-11-29 14:56:48 +00:00
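The recovery idea is simple: once an exception may have corrupted the incremental accounting, rebuild the total from the ground truth — the actual on-disk sizes. A minimal sketch (names illustrative; the real recalculation also handles reserve and recycled segment queues):

```python
import os

def recalculate_footprint(segment_files):
    """Rebuild the disk-footprint counter from actual file sizes, instead
    of trusting incremental bookkeeping that an exception may have broken."""
    total = 0
    for path in segment_files:
        try:
            total += os.path.getsize(path)
        except OSError:
            pass  # file already deleted; it contributes nothing
    return total
```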
Calle Wilund
810e410c5d commitlog_test: Add test for exception in alloc w. deleted underlying file
Tests that we can handle exception-in-alloc cleanup when the file does not
actually exist. This, however, uncovers another weakness (addressed in the
next patch): we can lose track of the disk footprint here and, with hard
limits, end up waiting for disk space that never comes. The test therefore
does not use a hard limit.
2021-11-29 14:56:43 +00:00
Calle Wilund
97f6da0c3e commitlog: Ensure failed-to-create-segment is re-deleted
Fixes #9343

If we fail in allocate_segment_ex, we should push the file we opened/created
to the delete set to ensure we reclaim the disk space. We should also
ensure that if we did not recycle a file in delete_segments, we still
wake up any recycle waiters if we deleted a file instead.

Included a small unit test.
2021-11-29 14:51:39 +00:00
Calle Wilund
c229fe9694 commitlog::allocate_segment_ex: Don't re-throw out of function
Fixes #9342

commitlog_error_handler rethrows, but here we do not want that: we need to
run the post-handler cleanup (co_await) as well.
2021-11-29 14:51:39 +00:00
Tomasz Grabiec
ee1ca8ae4d lsa: Add sanity checks around lsa_buffer operations
We've been observing hard-to-explain crashes recently around
lsa_buffer destruction, where the containing segment is absent from
_segment_descs, which causes log_heap::adjust_up to abort. Add more
checks to catch, sooner, certain impossible scenarios that can lead
to this.

Refs #9192.
Message-Id: <20211116122346.814437-1-tgrabiec@scylladb.com>

(cherry picked from commit bf6898a5a0)
2021-11-24 15:17:37 +01:00
Tomasz Grabiec
6bfd322e3b lsa: Mark compact_segment_locked() as noexcept
We cannot recover from a failure in this method. The implementation
makes sure it never happens. Invariants will be broken if this
throws. Detect violations early by marking as noexcept.

We could make it exception safe and try to leave the data structures
in a consistent state, but the reclaimer cannot make progress if this
throws, so it's pointless.

Refs #9192
Message-Id: <20211116122019.813418-1-tgrabiec@scylladb.com>

(cherry picked from commit 4d627affc3)
2021-11-24 15:17:35 +01:00
Tomasz Grabiec
afc18d5070 cql: Fix missing data in indexed queries with base table short reads
Indexed queries use paging over the materialized view
table. Results of the view read are then used to issue reads of the
base table. If a base table read is a short read, the page is returned
to the user and the paging state is adjusted accordingly, so that when
paging resumes it will query the view starting from the row
corresponding to the next base-table row that was not yet
returned. However, the paging state's "remaining" count was not reset, so
if the view read was exhausted, reading stopped even though the
base table read was short.

Fix by restoring the "remaining" count when adjusting the paging state
on short read.

Tests:

  - index_with_paging_test
  - secondary_index_test

Fixes #9198
Message-Id: <20210818131840.1160267-1-tgrabiec@scylladb.com>

(cherry picked from commit 1e4da2dcce)
2021-11-23 11:22:00 +02:00
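The fix boils down to one adjustment of the saved paging state on a short read. A hypothetical sketch (class and field names are illustrative; the real code is the `set_remaining(internal_paging_size)` call in the diff below):

```python
from dataclasses import dataclass

@dataclass
class PagingState:
    """Toy stand-in for the saved paging state of an indexed query."""
    remaining: int
    partition_key: object = None
    clustering_key: object = None

def adjust_for_short_read(state: PagingState, internal_paging_size: int) -> PagingState:
    """On a short base-table read, restore 'remaining' so that resuming
    does not stop early just because the current view page was exhausted."""
    state.remaining = internal_paging_size
    return state
```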
Tomasz Grabiec
2ec22c2404 sstables: partition_index_cache: Avoid abort due to benign bad_alloc inside allocating section
shared_promise::get_shared_future() is marked noexcept, but can
allocate memory. It is invoked by the sstable partition index cache inside
an allocating section, which means the allocation can throw
bad_alloc even though there is memory to reclaim, i.e. under normal
conditions.

Fix by allocating the shared_promise in stable memory, in the
standard allocator via lw_shared_ptr<>, so that it can be accessed outside
the allocating section.

Fixes #9666

Tests:

  - build/dev/test/boost/sstable_partition_index_cache_test

Message-Id: <20211122165100.1606854-1-tgrabiec@scylladb.com>
(cherry picked from commit 1d84bc6c3b)
2021-11-23 11:21:27 +02:00
Avi Kivity
19da778271 Merge "Run gossiper message handlers in a gate" from Pavel E
"
When gossiper processes its messages in the background, some of
the continuations may pop up after the gossiper is shut down.
This, in turn, may result in unwanted code executing when it is
not expected to.

In particular, storage_service notification hooks may try to
update the system keyspace (with "fresh" peer info/state/tokens/etc).
This update doesn't work after drain, because drain shuts down the
commitlog. The intention was that the gossiper would _not_ notify
anyone after drain, because it is shut down during drain too.
But since there are background continuations left, it does not
work as expected.

refs: #9567
tests: unit(dev), dtest.concurrent_schema_changes.snapshot(dev)
"

* 'br-gossiper-background-messages-2' of https://github.com/xemul/scylla:
  gossiper: Guard background processing with gate
  gossiper: Helper for background messaging processing

(cherry picked from commit 9e2b6176a2)
2021-11-19 07:25:26 +02:00
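The gate pattern used here — track background work, reject new entries after close, and let shutdown await the stragglers — can be sketched as a minimal asyncio analogue of `seastar::gate` (this is a conceptual sketch, not Seastar's implementation):

```python
import asyncio

class Gate:
    """Minimal analogue of seastar::gate: counts in-flight background
    tasks; close() rejects new entries and awaits the pending ones."""
    def __init__(self):
        self._pending = 0
        self._closed = False
        self._drained = asyncio.Event()
        self._drained.set()

    def enter(self):
        if self._closed:
            raise RuntimeError('gate closed')
        self._pending += 1
        self._drained.clear()

    def leave(self):
        self._pending -= 1
        if self._pending == 0:
            self._drained.set()

    async def close(self):
        self._closed = True
        await self._drained.wait()

async def with_gate(gate, coro):
    """Run coro under the gate, mirroring seastar::with_gate."""
    gate.enter()
    try:
        return await coro
    finally:
        gate.leave()
```

With this in place, shutdown simply calls `await gate.close()` and no background message handler can outlive it.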
Avi Kivity
cbd4c13ba6 Merge 'Revert "scylla_util.py: return bool value on systemd_unit.is_active()"' from Takuya ASADA
In scylla_util.py, we provide `systemd_unit.is_active()` to return the `systemctl is-active` output.
When we introduced the systemd_unit class, we returned the `systemctl is-active` output as a string, but we later changed the return value to bool (2545d7fd43).
This was because `if unit.is_active():` was always True, even when the command printed "failed" or "inactive"; the change was meant to avoid such scripting bugs.
However, this was probably a mistake.
systemd unit state is not a two-state "start"/"stop" value; there are many states.

And we are already using multiple unit states ("activating", "failed", "inactive", "active") in our Cloud image login prompt:
https://github.com/scylladb/scylla-machine-image/blob/next/common/scylla_login#L135
After we merged 2545d7fd43, the login prompt broke, because is_active() no longer returns the string the script expects (https://github.com/scylladb/scylla-machine-image/issues/241).

I think we should revert 2545d7fd43; it should return exactly the same value that `systemctl is-active` prints.

Fixes #9627
Fixes scylladb/scylla-machine-image#241

Closes #9628

* github.com:scylladb/scylla:
  scylla_ntp_setup: use string in systemd_unit.is_active()
  Revert "scylla_util.py: return bool value on systemd_unit.is_active()"

(cherry picked from commit c17101604f)
2021-11-18 11:44:11 +02:00
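The reverted behaviour — pass the raw `systemctl is-active` output through, and make each caller compare against the state it actually means — looks like this (a sketch of the scylla_util.py helper, simplified from the diff below):

```python
import subprocess

def is_active(unit: str) -> str:
    """Return exactly what `systemctl is-active` prints ('active',
    'inactive', 'failed', 'activating', ...), not a collapsed bool."""
    return subprocess.run(['systemctl', 'is-active', unit],
                          shell=False, capture_output=True,
                          encoding='utf-8').stdout.strip()

# Callers must then compare against the state they mean:
#     if is_active('chronyd') == 'active': ...
```

The key point: a truthiness check like `if is_active(u):` is wrong for non-empty strings such as `'failed'`, which is exactly the scripting bug both versions of this code were wrestling with.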
Pavel Emelyanov
338871802d generic_server: Keep server alive during conn background processing
There's at least one tiny race in the generic_server code. The trailing
.handle_exception after conn->process() captures this, but since the
whole continuation chain runs in the background, the server can be
released, causing the lambda to execute on a freed generic_server
instance. This, in turn, is a problem because the captured this is used
to obtain the _logger.

The fix is based on the observation that all connections pin the server
in memory until all of them (the connections) are destructed. That said,
to keep the server alive in the aforementioned lambda it's enough to make
sure the conn variable (an lw_shared_ptr to the connection) is alive in
it. To avoid generating a bunch of tiny continuations with identical
captures, tail the chain with a single .then_wrapped() and do whatever is
needed to wrap up the connection processing there.

tests: unit(dev)
fixes: #9316

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20211115105818.11348-1-xemul@scylladb.com>
(cherry picked from commit ba16318457)
2021-11-17 10:21:11 +02:00
Yaron Kaikov
8b5b1b8af6 dist/docker/debian/build_docker.sh: debian version fix for rc releases
When building a Docker image we rely on the `VERSION` value from
`SCYLLA-VERSION-GEN`. For `rc` releases only, there is a difference
between the configured version (X.X.rcX) and the actual Debian package
version we generate (X.X~rcX).

Using a similar solution as I did in dcb10374a5.

Fixes: #9616

Closes #9617

(cherry picked from commit 060a91431d)
2021-11-12 20:07:19 +02:00
Takuya ASADA
ea89eff95d dist/docker: fix bashrc filename for Ubuntu
For Debian variants, the correct filename is /etc/bash.bashrc.

Fixes #9588

Closes #9589

(cherry picked from commit 201a97e4a4)
2021-11-10 14:25:27 +02:00
Michał Radwański
96421e7779 memtable: fix gcc function argument evaluation order induced use after move
clang evaluates function arguments from left to right, while gcc does so
in reverse (the order is unspecified, so both are conforming). Therefore,
this code can be correct on clang and incorrect on gcc:
```
f(x.sth(), std::move(x))
```

This patch fixes one such instance of this bug, in memtable.cc.

Fixes #9605.

Closes #9606

(cherry picked from commit eff392073c)
2021-11-10 08:58:09 +02:00
Botond Dénes
142336ca53 mutation_writer/feed_writer: don't drop readers with small amount of content
Due to an error in transforming the above routine, readers that have at
most a buffer's worth of content are dropped without being consumed.
This is because the outer consume loop is conditioned on
`is_end_of_stream()`, which is already set for readers that eagerly
pre-fill their buffer and have no more data than what is in their
buffer.
Change the condition to also check `is_buffer_empty()`, and only drop
the reader when both of these are true.

Fixes: #9594

Tests: unit(mutation_writer_test --repeat=200, dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20211108092923.104504-1-bdenes@scylladb.com>
(cherry picked from commit 4b6c0fe592)
2021-11-09 14:13:21 +02:00
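The bug and fix can be illustrated with a toy reader (class and method names mirror the reader API named above, but this is a simplified sketch, not Scylla's code): with the old condition `while not eos`, a pre-filled, already-at-EOS reader is never consumed; checking the buffer too fixes it.

```python
class BufferedReader:
    """Toy reader that eagerly pre-fills its buffer at construction,
    mimicking readers with <= one buffer of content already at EOS."""
    def __init__(self, items):
        self._buf = list(items)   # eagerly pre-filled
        self._eos = True          # no more data beyond the buffer
    def is_end_of_stream(self): return self._eos
    def is_buffer_empty(self): return not self._buf
    def fill_buffer(self): pass
    def pop_fragment(self): return self._buf.pop(0)

def feed_writer(reader, write):
    """Drain the reader: loop while EITHER more data can arrive OR the
    buffer still holds fragments (the fixed condition)."""
    while not reader.is_end_of_stream() or not reader.is_buffer_empty():
        reader.fill_buffer()
        while not reader.is_buffer_empty():
            write(reader.pop_fragment())
```

With the pre-fix condition (`while not reader.is_end_of_stream():` alone), the loop body would never run for this reader and both fragments would be silently dropped.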
Calle Wilund
492f12248c commitlog: Add explicit track var for "wasted space" to avoid double counting
Refs #9331

In segment::close() we add space to the manager's "wasted" counter. In the
destructor, if we can cleanly delete/recycle the file, we remove it. However,
if we never went through close() (shutdown - ok; exception in batch_cycle -
not ok), we can end up subtracting numbers that were never added in the
first place. Just keep track of the bytes added in a variable.

The observed behaviour in the above issue is timeouts in batch_cycle, where
we declare the segment closed early (because we cannot safely add anything
more - chunks could get partial/misplaced). The exception will propagate to
the caller(s), but the segment will not go through the actual close() call
-> the destructor should not assume it did.

Closes #9598

(cherry picked from commit 3929b7da1f)
2021-11-09 14:07:04 +02:00
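The accounting discipline is: remember exactly how many "wasted" bytes this segment contributed, and subtract only that, so a destructor running without a prior close() subtracts zero. A toy sketch (names mirror the diff below, but this is illustrative, not the C++ code):

```python
class Segment:
    """Track the wasted bytes we actually added, so teardown never
    subtracts an amount that close() never contributed."""
    def __init__(self, totals):
        self.totals = totals
        self._waste = 0          # bytes this segment added to 'wasted'
        self.size_on_disk = 0
        self.file_position = 0

    def close(self):
        self._waste = self.size_on_disk - self.file_position
        self.totals['wasted'] += self._waste

    def destroy(self):
        # Safe whether or not close() ran: subtract only what was added.
        self.totals['wasted'] -= self._waste
```

The pre-fix destructor instead recomputed `size_on_disk - file_position` and subtracted it unconditionally, going negative when close() had never run.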
Yaron Kaikov
7eb7a0e5fe release: prepare for 4.6.rc0 2021-11-08 09:18:26 +02:00
16 changed files with 351 additions and 49 deletions

View File

@@ -60,7 +60,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=4.6.dev
VERSION=4.6.rc1
if test -f version
then

View File

@@ -995,6 +995,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
}
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
paging_state_copy->set_remaining(internal_paging_size);
paging_state_copy->set_partition_key(std::move(index_pk));
paging_state_copy->set_clustering_key(std::move(index_ck));
return std::move(paging_state_copy);

View File

@@ -428,6 +428,8 @@ private:
void abort_recycled_list(std::exception_ptr);
void abort_deletion_promise(std::exception_ptr);
future<> recalculate_footprint();
future<> rename_file(sstring, sstring) const;
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
@@ -444,6 +446,7 @@ private:
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
seastar::semaphore _reserve_recalculation_guard;
};
template<typename T>
@@ -512,6 +515,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
uint64_t _file_pos = 0;
uint64_t _flush_pos = 0;
uint64_t _size_on_disk = 0;
uint64_t _waste = 0;
size_t _alignment;
@@ -598,7 +602,7 @@ public:
clogger.debug("Segment {} is no longer active and will be submitted for delete now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.active_size_on_disk -= file_position();
_segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position());
_segment_manager->totals.wasted_size_on_disk -= _waste;
_segment_manager->add_file_to_delete(_file_name, _desc);
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
clogger.warn("Segment {} is dirty and is left on disk.", *this);
@@ -725,7 +729,8 @@ public:
auto s = co_await sync();
co_await flush();
co_await terminate();
_segment_manager->totals.wasted_size_on_disk += (_size_on_disk - file_position());
_waste = _size_on_disk - file_position();
_segment_manager->totals.wasted_size_on_disk += _waste;
co_return s;
}
future<sseg_ptr> do_flush(uint64_t pos) {
@@ -1223,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
, _reserve_recalculation_guard(1)
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
@@ -1248,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
}
try {
gate::holder g(_gate);
auto guard = get_units(_reserve_recalculation_guard, 1);
if (_reserve_segments.full()) {
// can happen if we recalculate
continue;
}
// note: if we were strict with disk size, we would refuse to do this
// unless disk footprint is lower than threshold. but we cannot (yet?)
// trust that flush logic will absolutely free up an existing
@@ -1519,7 +1530,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) {
for (auto * ext : cfg.extensions->commitlog_file_extensions()) {
auto nf = co_await ext->wrap_file(std::move(filename), f, flags);
auto nf = co_await ext->wrap_file(filename, f, flags);
if (nf) {
f = std::move(nf);
align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment();
@@ -1529,13 +1540,17 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
f = make_checked_file(commit_error_handler, std::move(f));
} catch (...) {
ep = std::current_exception();
commit_error_handler(ep);
try {
commit_error_handler(std::current_exception());
} catch (...) {
ep = std::current_exception();
}
}
if (ep && f) {
co_await f.close();
}
if (ep) {
add_file_to_delete(filename, d);
co_return coroutine::exception(std::move(ep));
}
@@ -1865,6 +1880,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
std::exception_ptr recycle_error;
size_t num_deleted = 0;
bool except = false;
while (!files.empty()) {
auto filename = std::move(files.back());
files.pop_back();
@@ -1914,8 +1931,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
}
}
co_await delete_file(filename);
++num_deleted;
} catch (...) {
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
except = true;
}
}
@@ -1928,6 +1947,16 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
if (recycle_error && _recycled_segments.empty()) {
abort_recycled_list(recycle_error);
}
// If recycle failed and turned into a delete, we should fake-wakeup waiters
// since we might still have cleaned up disk space.
if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) {
abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files")));
}
// #9348 - if we had an exception, we can't trust our bookkeeping any more. recalculate.
if (except) {
co_await recalculate_footprint();
}
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
@@ -1942,6 +1971,63 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e
std::exchange(_disk_deletions, {}).set_exception(ep);
}
future<> db::commitlog::segment_manager::recalculate_footprint() {
try {
co_await do_pending_deletes();
auto guard = get_units(_reserve_recalculation_guard, 1);
auto segments_copy = _segments;
std::vector<sseg_ptr> reserves;
std::vector<sstring> recycles;
// this causes haywire things while we steal stuff, but...
while (!_reserve_segments.empty()) {
reserves.push_back(_reserve_segments.pop());
}
while (!_recycled_segments.empty()) {
recycles.push_back(_recycled_segments.pop());
}
// first, guesstimate sizes
uint64_t recycle_size = recycles.size() * max_size;
auto old = totals.total_size_on_disk;
totals.total_size_on_disk = recycle_size;
for (auto& s : _segments) {
totals.total_size_on_disk += s->_size_on_disk;
}
for (auto& s : reserves) {
totals.total_size_on_disk += s->_size_on_disk;
}
// now we need to adjust the actual sizes of recycled files
uint64_t actual_recycled_size = 0;
try {
for (auto& filename : recycles) {
auto s = co_await seastar::file_size(filename);
actual_recycled_size += s;
}
} catch (...) {
clogger.error("Exception reading disk footprint ({}).", std::current_exception());
actual_recycled_size = recycle_size; // best we got
}
for (auto&& filename : recycles) {
_recycled_segments.push(std::move(filename));
}
for (auto&& s : reserves) {
_reserve_segments.push(std::move(s)); // you can have it back now.
}
totals.total_size_on_disk += actual_recycled_size - recycle_size;
// pushing things to reserve/recycled queues will have resumed any
// waiters, so we should be done.
} catch (...) {
clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
}
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftc = std::exchange(_files_to_close, {});
auto ftd = std::exchange(_files_to_delete, {});

View File

@@ -66,18 +66,18 @@ if __name__ == '__main__':
target = None
if os.path.exists('/lib/systemd/systemd-timesyncd'):
if systemd_unit('systemd-timesyncd').is_active():
if systemd_unit('systemd-timesyncd').is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
target = 'systemd-timesyncd'
if shutil.which('chronyd'):
if get_chrony_unit().is_active():
if get_chrony_unit().is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
if not target:
target = 'chrony'
if shutil.which('ntpd'):
if get_ntp_unit().is_active():
if get_ntp_unit().is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
if not target:

View File

@@ -1041,7 +1041,7 @@ class systemd_unit:
return run('systemctl {} disable {}'.format(self.ctlparam, self._unit), shell=True, check=True)
def is_active(self):
return True if run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip() == 'active' else False
return run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip()
def mask(self):
return run('systemctl {} mask {}'.format(self.ctlparam, self._unit), shell=True, check=True)

View File

@@ -25,6 +25,10 @@ product="$(<build/SCYLLA-PRODUCT-FILE)"
version="$(<build/SCYLLA-VERSION-FILE)"
release="$(<build/SCYLLA-RELEASE-FILE)"
if [[ "$version" = *rc* ]]; then
version=$(echo $version |sed 's/\(.*\)\.)*/\1~/')
fi
mode="release"
if uname -m | grep x86_64 ; then
@@ -93,7 +97,7 @@ run apt-get -y install hostname supervisor openssh-server openssh-client openjdk
run locale-gen en_US.UTF-8
run bash -ec "dpkg -i packages/*.deb"
run apt-get -y clean all
run bash -ec "cat /scylla_bashrc >> /etc/bashrc"
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
run mkdir -p /etc/supervisor.conf.d
run mkdir -p /var/log/scylla
run chown -R scylla:scylla /var/lib/scylla

View File

@@ -184,14 +184,18 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
_logger.info("exception while advertising new connection: {}", std::current_exception());
}
// Block while monitoring for lifetime/errors.
return conn->process().finally([this, conn] {
return unadvertise_connection(conn);
}).handle_exception([this] (std::exception_ptr ep) {
if (is_broken_pipe_or_connection_reset(ep)) {
// expected if another side closes a connection or we're shutting down
return;
return conn->process().then_wrapped([this, conn] (auto f) {
try {
f.get();
} catch (...) {
auto ep = std::current_exception();
if (!is_broken_pipe_or_connection_reset(ep)) {
// some exceptions are expected if another side closes a connection
// or we're shutting down
_logger.info("exception while processing connection: {}", ep);
}
}
_logger.info("exception while processing connection: {}", ep);
return unadvertise_connection(conn);
});
});
return stop_iteration::no;

View File

@@ -477,49 +477,42 @@ gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request requ
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
}
rpc::no_wait_type gossiper::background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn) {
(void)with_gate(_background_msg, [this, type = std::move(type), fn = std::move(fn)] () mutable {
return container().invoke_on(0, std::move(fn)).handle_exception([type = std::move(type)] (auto ep) {
logger.warn("Failed to handle {}: {}", type, ep);
});
});
return messaging_service::no_wait();
}
void gossiper::init_messaging_service_handler() {
_messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_SYN", [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_syn_msg(from, std::move(syn_msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_ACK", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_ACK2", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack2_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return handle_echo_msg(from, generation_number_opt);
});
_messaging.register_gossip_shutdown([this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
// In a new fiber.
(void)container().invoke_on(0, [from, generation_number_opt] (gms::gossiper& gossiper) {
return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
return gossiper.handle_shutdown_msg(from, generation_number_opt);
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
@@ -2178,6 +2171,9 @@ future<> gossiper::start() {
}
future<> gossiper::shutdown() {
if (!_background_msg.is_closed()) {
co_await _background_msg.close();
}
if (this_shard_id() == 0) {
co_await do_stop_gossiping();
}

View File

@@ -41,7 +41,9 @@
#include "unimplemented.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/print.hh>
#include <seastar/rpc/rpc_types.hh>
#include "utils/atomic_vector.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
@@ -138,12 +140,16 @@ private:
bool _enabled = false;
semaphore _callback_running{1};
semaphore _apply_state_locally_semaphore{100};
seastar::gate _background_msg;
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
bool _advertise_myself = true;
// Map ip address and generation number
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
future<> _failure_detector_loop_done{make_ready_future<>()} ;
rpc::no_wait_type background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);
public:
// Get current generation number for the given nodes
future<std::unordered_map<gms::inet_address, int32_t>>

View File

@@ -613,7 +613,8 @@ static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
} else {
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
schema_ptr snp_schema = snp->schema();
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
}
}

View File

@@ -54,7 +54,7 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
auto rd = std::move(rd_ref);
std::exception_ptr ex;
try {
while (!rd.is_end_of_stream()) {
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
co_await rd.fill_buffer();
while (!rd.is_buffer_empty()) {
co_await rd.pop_mutation_fragment().consume(wr);

View File

@@ -49,12 +49,13 @@ private:
public:
partition_index_cache* _parent;
key_type _key;
std::variant<shared_promise<>, partition_index_page> _page;
std::variant<lw_shared_ptr<shared_promise<>>, partition_index_page> _page;
size_t _size_in_allocator = 0;
public:
entry(partition_index_cache* parent, key_type key)
: _parent(parent)
, _key(key)
, _page(make_lw_shared<shared_promise<>>())
{ }
void set_page(partition_index_page&& page) noexcept {
@@ -76,7 +77,7 @@ private:
// Always returns the same value for a given state of _page.
size_t size_in_allocator() const { return _size_in_allocator; }
shared_promise<>& promise() { return std::get<shared_promise<>>(_page); }
lw_shared_ptr<shared_promise<>> promise() { return std::get<lw_shared_ptr<shared_promise<>>>(_page); }
bool ready() const { return std::holds_alternative<partition_index_page>(_page); }
partition_index_page& page() { return std::get<partition_index_page>(_page); }
const partition_index_page& page() const { return std::get<partition_index_page>(_page); }
@@ -207,9 +208,7 @@ public:
return make_ready_future<entry_ptr>(std::move(ptr));
} else {
++_shard_stats.blocks;
return _as(_region, [ptr] () mutable {
return ptr.get_entry().promise().get_shared_future();
}).then([ptr] () mutable {
return ptr.get_entry().promise()->get_shared_future().then([ptr] () mutable {
return std::move(ptr);
});
}
@@ -238,12 +237,12 @@ public:
entry& e = ptr.get_entry();
try {
partition_index_page&& page = f.get0();
e.promise().set_value();
e.promise()->set_value();
e.set_page(std::move(page));
_shard_stats.used_bytes += e.size_in_allocator();
++_shard_stats.populations;
} catch (...) {
e.promise().set_exception(std::current_exception());
e.promise()->set_exception(std::current_exception());
with_allocator(_region.allocator(), [&] {
_cache.erase(key);
});

test.py
View File

@@ -291,6 +291,8 @@ class Test:
def print_summary(self):
pass
def get_junit_etree(self):
return None
def check_log(self, trim):
"""Check and trim logs and xml output for tests which have it"""
@@ -338,9 +340,36 @@ class BoostTest(UnitTest):
boost_args += ['--color_output=false']
boost_args += ['--']
self.args = boost_args + self.args
self.casename = casename
self.__junit_etree = None
def get_junit_etree(self):
def adjust_suite_name(name):
# Normalize "path/to/file.cc" to "path.to.file" to conform to
# Jenkins expectations that the suite name is a class name. ".cc"
# doesn't add any information. Add the mode, otherwise failures
# in different modes are indistinguishable. The "test/" prefix adds
# no information, so remove it.
import re
name = re.sub(r'^test/', '', name)
name = re.sub(r'\.cc$', '', name)
name = re.sub(r'/', '.', name)
name = f'{name}.{self.mode}'
return name
if self.__junit_etree is None:
self.__junit_etree = ET.parse(self.xmlout)
root = self.__junit_etree.getroot()
suites = root.findall('.//TestSuite')
for suite in suites:
suite.attrib['name'] = adjust_suite_name(suite.attrib['name'])
skipped = suite.findall('./TestCase[@reason="disabled"]')
for e in skipped:
suite.remove(e)
os.unlink(self.xmlout)
return self.__junit_etree
def check_log(self, trim):
ET.parse(self.xmlout)
self.get_junit_etree()
super().check_log(trim)
@@ -800,6 +829,17 @@ def write_junit_report(tmpdir, mode):
     with open(junit_filename, "w") as f:
         ET.ElementTree(xml_results).write(f, encoding="unicode")
+
+def write_consolidated_boost_junit_xml(tmpdir, mode):
+    xml = ET.Element("TestLog")
+    for suite in TestSuite.suites.values():
+        for test in suite.tests:
+            if test.mode != mode:
+                continue
+            test_xml = test.get_junit_etree()
+            if test_xml is not None:
+                xml.extend(test_xml.getroot().findall('.//TestSuite'))
+    et = ET.ElementTree(xml)
+    et.write(f'{tmpdir}/{mode}/xml/boost.xunit.xml', encoding='unicode')
 
 def open_log(tmpdir):
     pathlib.Path(tmpdir).mkdir(parents=True, exist_ok=True)
@@ -839,6 +879,7 @@ async def main():
     for mode in options.modes:
         write_junit_report(options.tmpdir, mode)
+        write_consolidated_boost_junit_xml(options.tmpdir, mode)
 
     if 'coverage' in options.modes:
         coverage.generate_coverage_report("build/coverage", "tests")
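The consolidation step boils down to lifting every `TestSuite` out of each per-test result tree into one shared `TestLog` root, so Jenkins reads one file per mode instead of one per test. A minimal sketch with hypothetical suite names:

```python
import xml.etree.ElementTree as ET

# Two hypothetical per-test Boost result documents (names are illustrative).
per_test_docs = [
    '<TestLog><TestSuite name="commitlog_test.release"><TestCase name="t1"/></TestSuite></TestLog>',
    '<TestLog><TestSuite name="lsa_test.release"><TestCase name="t2"/></TestSuite></TestLog>',
]

# One consolidated <TestLog> root, as in write_consolidated_boost_junit_xml:
# every TestSuite from every per-test tree is moved under the single document.
merged = ET.Element('TestLog')
for doc in per_test_docs:
    merged.extend(ET.fromstring(doc).findall('.//TestSuite'))

print(len(merged.findall('./TestSuite')))  # prints 2
```

This is why the slow rpc-over-ssh reads go away: the agent exposes one `boost.xunit.xml` per mode rather than hundreds of small files.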


@@ -44,7 +44,9 @@
 #include "test/lib/tmpdir.hh"
 #include "db/commitlog/commitlog.hh"
 #include "db/commitlog/commitlog_replayer.hh"
+#include "db/commitlog/commitlog_extensions.hh"
 #include "db/commitlog/rp_set.hh"
+#include "db/extensions.hh"
 #include "log.hh"
 #include "service/priority_manager.hh"
 #include "test/lib/exception_utils.hh"
@@ -947,3 +949,113 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) {
     co_await log.clear();
 }
 }
+
+static future<> do_test_exception_in_allocate_ex(bool do_file_delete, bool reuse = true) {
+    commitlog::config cfg;
+
+    constexpr auto max_size_mb = 1;
+    cfg.commitlog_segment_size_in_mb = max_size_mb;
+    cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count;
+    cfg.commitlog_sync_period_in_ms = 10;
+    cfg.reuse_segments = reuse;
+    cfg.allow_going_over_size_limit = false; // #9348 - we can now always enforce the size limit
+    cfg.use_o_dsync = true; // make sure we pre-allocate.
+
+    // not using cl_test, because we need to be able to abandon
+    // the log.
+    tmpdir tmp;
+    cfg.commit_log_location = tmp.path().string();
+
+    class myfail : public std::exception {
+    public:
+        using std::exception::exception;
+    };
+
+    struct myext : public db::commitlog_file_extension {
+    public:
+        bool fail = false;
+        bool thrown = false;
+        bool do_file_delete;
+
+        myext(bool dd)
+            : do_file_delete(dd)
+        {}
+
+        seastar::future<seastar::file> wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override {
+            if (fail && !thrown) {
+                thrown = true;
+                if (do_file_delete) {
+                    co_await f.close();
+                    co_await seastar::remove_file(filename);
+                }
+                throw myfail{};
+            }
+            co_return f;
+        }
+        seastar::future<> before_delete(const seastar::sstring&) override {
+            co_return;
+        }
+    };
+
+    auto ep = std::make_unique<myext>(do_file_delete);
+    auto& mx = *ep;
+
+    db::extensions myexts;
+    myexts.add_commitlog_file_extension("hufflepuff", std::move(ep));
+
+    cfg.extensions = &myexts;
+
+    auto log = co_await commitlog::create_commitlog(cfg);
+
+    rp_set rps;
+    // uncomment for verbosity
+    // logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
+
+    auto uuid = utils::UUID_gen::get_time_UUID();
+    auto size = log.max_record_size();
+
+    auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) {
+        log.discard_completed_segments(id, rps);
+        mx.fail = true;
+    });
+
+    try {
+        while (!mx.thrown) {
+            rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
+                dst.fill('1', size);
+            });
+            rps.put(std::move(h));
+        }
+    } catch (...) {
+        BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
+    }
+
+    co_await log.shutdown();
+    co_await log.clear();
+}
+/**
+ * Test generating an exception in segment file allocation
+ */
+SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) {
+    co_await do_test_exception_in_allocate_ex(false);
+}
+
+SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_no_recycle) {
+    co_await do_test_exception_in_allocate_ex(false, false);
+}
+
+/**
+ * Test generating an exception in segment file allocation, but also
+ * delete the file, which in turn should cause follow-up exceptions
+ * in the cleanup delete, which the commitlog should handle.
+ */
+SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file) {
+    co_await do_test_exception_in_allocate_ex(true);
+}
+
+SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file_no_recycle) {
+    co_await do_test_exception_in_allocate_ex(true, false);
+}


@@ -22,6 +22,8 @@
 #include <seastar/testing/test_case.hh>
 #include "test/lib/cql_test_env.hh"
 #include "test/lib/cql_assertions.hh"
+#include "cql3/untyped_result_set.hh"
+#include "cql3/query_processor.hh"
 #include "transport/messages/result_message.hh"
 
 SEASTAR_TEST_CASE(test_index_with_paging) {
@@ -56,3 +58,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
         });
     });
 }
+
+SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
+    return do_with_cql_env_thread([] (auto& e) {
+        e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
+        e.execute_cql("CREATE INDEX ON tab (v)").get();
+
+        // Enough to trigger a short read on the base table during scan
+        sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
+        const int row_count = 67;
+        for (int i = 0; i < row_count; ++i) {
+            e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
+        }
+
+        eventually([&] {
+            uint64_t count = 0;
+            e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
+                ++count;
+                return make_ready_future<stop_iteration>(stop_iteration::no);
+            }).get();
+            BOOST_REQUIRE_EQUAL(count, row_count);
+        });
+    });
+}
+
+SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
+    return do_with_cql_env_thread([] (auto& e) {
+        e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
+        e.execute_cql("CREATE INDEX ON tab (v)").get();
+
+        // Enough to trigger a short read on the base table during scan
+        sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
+        const int row_count = 67;
+        for (int i = 0; i < row_count; ++i) {
+            e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
+        }
+
+        eventually([&] {
+            uint64_t count = 0;
+            e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
+                ++count;
+                return make_ready_future<stop_iteration>(stop_iteration::no);
+            }).get();
+            BOOST_REQUIRE_EQUAL(count, row_count);
+        });
+    });
+}
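The `eventually(...)` wrapper in these tests retries the enclosed assertions until they pass, since the indexed rows become visible asynchronously. A minimal Python analogue of that retry shape (a sketch, not the test framework's implementation):

```python
import time

def eventually(assertion, attempts=10, delay=0.01):
    # Retry a throwing check until it passes or attempts run out;
    # re-raise the final failure so the test still reports it.
    for i in range(attempts):
        try:
            assertion()
            return
        except AssertionError:
            if i == attempts - 1:
                raise
            time.sleep(delay)

# Toy usage: a condition that only becomes true after a few polls.
polls = {'n': 0}
def check():
    polls['n'] += 1
    assert polls['n'] >= 3
eventually(check)
print(polls['n'])  # prints 3
```

The key design point is that only the last failure propagates; intermediate failures are treated as "not yet", which is exactly what a short-read-then-retry scan needs.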


@@ -1447,6 +1447,7 @@ private:
         if (seg != _buf_active) {
             if (desc.is_empty()) {
+                assert(desc._buf_pointers.empty());
                 _segment_descs.erase(desc);
                 desc._buf_pointers = std::vector<entangled>();
                 free_segment(seg, desc);
@@ -1457,7 +1458,7 @@ private:
         }
     }
 
-    void compact_segment_locked(segment* seg, segment_descriptor& desc) {
+    void compact_segment_locked(segment* seg, segment_descriptor& desc) noexcept {
         auto seg_occupancy = desc.occupancy();
         llogger.debug("Compacting segment {} from region {}, {}", fmt::ptr(seg), id(), seg_occupancy);
@@ -1472,6 +1473,7 @@ private:
         for (entangled& e : _buf_ptrs_for_compact_segment) {
             if (e) {
                 lsa_buffer* old_ptr = e.get(&lsa_buffer::_link);
+                assert(&desc == old_ptr->_desc);
                 lsa_buffer dst = alloc_buf(old_ptr->_size);
                 memcpy(dst._buf, old_ptr->_buf, dst._size);
                 old_ptr->_link = std::move(dst._link);