Compare commits

...

32 Commits

Author SHA1 Message Date
Yaron Kaikov
e8a1cfb6f8 release: prepare for 4.6.rc2 2022-01-02 09:15:47 +02:00
Tomasz Grabiec
fc312b3021 lsa: Fix segment leak on memory reclamation during alloc_buf
alloc_buf() calls new_buf_active() when there is no active segment to
allocate a new active segment. new_buf_active() allocates memory
(e.g. a new segment) so may cause memory reclamation, which may cause
segment compaction, which may call alloc_buf() and re-enter
new_buf_active(). The first call to new_buf_active() would then
override _buf_active and cause the segment allocated during segment
compaction to be leaked.

This then causes abort when objects from the leaked segment are freed
because the segment is expected to be present in _closed_segments, but
isn't. boost::intrusive::list::erase() will fail on assertion that the
object being erased is linked.

Introduced in b5ca0eb2a2.

Fixes #9821
Fixes #9192
Fixes #9825
Fixes #9544
Fixes #9508
Refs #9573

Message-Id: <20211229201443.119812-1-tgrabiec@scylladb.com>
(cherry picked from commit 7038dc7003)
2021-12-30 18:56:28 +02:00
Nadav Har'El
7b82aaf939 alternator: fix error on UpdateTable for non-existent table
When the UpdateTable operation is called for a non-existent table, the
appropriate error is ResourceNotFoundException, but before this patch
we ran into an exception, which resulted in an ugly "internal server
error".

In this patch we use the existing get_table() function which most other
operations use, and which does all the appropriate verifications and
generates the appropriate Alternator api_error instead of letting
internal Scylla exceptions escape to the user.

This patch also includes a test for UpdateTable on a non-existent table,
which used to fail before this patch and pass afterwards. We also add a
test for DeleteTable in the same scenario, and see it didn't have this
bug. As usual, both tests pass on DynamoDB, which confirms we generate
the right error codes.

Fixes #9747.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211206181605.1182431-1-nyh@scylladb.com>
(cherry picked from commit 31eeb44d28)
2021-12-29 22:59:25 +02:00
Nadav Har'El
894a4abfae commitlog: fix missing wait for semaphore units
Commit dcc73c5d4e introduced a semaphore
for excluding concurrent recalculations - _reserve_recalculation_guard.

Unfortunately, the two places in the code which tried to take this
guard just called get_units() - which returns a future<units>, not
units - and never waited for this future to become available.

So this patch adds the missing "co_await" needed to wait for the
units to become available.

Fixes #9770.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211214122612.1462436-1-nyh@scylladb.com>
(cherry picked from commit b8786b96f4)
2021-12-29 13:18:59 +02:00
Takuya ASADA
4dcf023470 scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8
On CentOS8, mdmonitor.service does not works correctly when using
mdadm-4.1-15.el8.x86_64 and later versions.
Until we find a solution, let's pinning the package version to older one
which does not cause the issue (4.1-14.el8.x86_64).

Fixes #9540

Closes #9782

(cherry picked from commit 0d8f932f0b)
2021-12-28 11:38:04 +02:00
Benny Halevy
283788828e compaction: scrub_validate_mode_validate_reader: throw compaction_stopped_exception if stop is requested
Currently when scrub/validate is stopped (e.g. via the api),
scrub_validate_mode_validate_reader co_return:s without
closing the reader passed to it - causing a crash due
to internal error check, see #9766.

Throwing a compaction_stopped_exception rather than co_return:ing
an exception will be handled as any other exeption, including closing
the reader.

Fixes #9766

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20211213125528.2422745-1-bhalevy@scylladb.com>
(cherry picked from commit c89876c975)
2021-12-15 15:03:59 +02:00
Pavel Emelyanov
730a147ba6 row-cache: Handle exception (un)safety of rows_entry insertion
The B-tree's insert_before() is throwing operation, its caller
must account for that. When the rows_entry's collection was
switched on B-tree all the risky places were fixed by ee9e1045,
but few places went under the radar.

In the cache_flat_mutation_reader there's a place where a C-pointer
is inserted into the tree, thus potentially leaking the entry.

In the partition_snapshot_row_cursor there are two places that not
only leak the entry, but also leave it in the LRU list. The latter
it quite nasty, because those entry can be evicted, eviction code
tries to get rows_entry iterator from "this", but the hook happens
to be unattached (because insertion threw) and fails the assert.

fixes: #9728

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit ee103636ac)
2021-12-14 15:53:42 +02:00
Pavel Emelyanov
9897e83029 partition_snapshot_row_cursor: Shuffle ensure_result creation
Both places get the C-pointer on the freshly allocated rows_entry,
insert it where needed and return back the dereferenced pointer.

The C-pointer is going to become smart-pointer that would go out
of scope before return. This change prepares for that by constructing
the ensure_result from the iterator, that's returned from insertion
of the entry.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit 9fd8db318d)

Ref #9728
2021-12-14 15:52:37 +02:00
Asias He
1a9b64e6f6 storage_service: Wait for seastar::get_units in node_ops
The seastar::get_units returns a future, we have to wait for it.

Fixes #9767

Closes #9768

(cherry picked from commit 9859c76de1)
2021-12-12 18:42:20 +02:00
Takuya ASADA
49fe9e2c8e dist: allow running scylla-housekeeping with strict umask setting
To avoid failing scylla-housekeeping in strict umask environment,
we need to chmod a+r on repository file and housekeeping.uuid.

Fixes #9683

Closes #9739

(cherry picked from commit ea20f89c56)
2021-12-12 14:25:57 +02:00
Takuya ASADA
d0580c41ee dist: add support im4gn/is4gen instance on AWS
Add support next-generation, storage-optimized ARM64 instance types.

Fixes #9711

Closes #9730

(cherry picked from commit 097a6ee245)
2021-12-08 14:29:44 +02:00
Beni Peled
542394c82f release: prepare for 4.6.rc1 2021-12-08 11:08:45 +02:00
Avi Kivity
018ad3f6f4 test: refine test suite names exposed via xunit format
The test suite names seen by Jenkins are suboptimal: there is
no distinction between modes, and the ".cc" suffix of file names
is interpreted as a class name, which is converted to a tree node
that must be clicked to expand. Massage the names to remove
unnecessary information and add the mode.

Closes #9696

(cherry picked from commit ef3edcf848)

Fixes #9738.
2021-12-05 19:58:22 +02:00
Avi Kivity
9b8b7efb54 tests: consolidate boost xunit result files
The recent parallelization of boost unit tests caused an increase
in xml result files. This is challenging to Jenkins, since it
appears to use rpc-over-ssh to read the result files, and as a result
it takes more than an hour to read all result files when the Jenkins
main node is not on the same continent as the agent.

To fix this, merge the result files in test.py and leave one result
file per mode. Later we can leave one result file overall (integrating
the mode into the testsuite name), but that can wait.

Tested on a local Jenkins instance (just reading the result files,
not the entire build).

Closes #9668

(cherry picked from commit b23af15432)

Fixes #9738
2021-12-05 19:57:39 +02:00
Botond Dénes
1c3e63975f Merge 'Backport of #9348 (xceptions in commitlog::segment_manager::delete_segments could cause footprint counters to loose track)' from Calle Wilund
Backport of series to 4.6
Upstream merge commit: e2c27ee743.
Refs #9348

Closes #9702

* github.com:scylladb/scylla:
  commitlog: Recalculate footprint on delete_segment exceptions
  commitlog_test: Add test for exception in alloc w. deleted underlying file
  commitlog: Ensure failed-to-create-segment is re-deleted
  commitlog::allocate_segment_ex: Don't re-throw out of function
2021-12-02 09:22:19 +02:00
Calle Wilund
11bb03e46d commitlog: Recalculate footprint on delete_segment exceptions
Fixes #9348

If we get exceptions in delete_segments, we can, and probably will, loose
track of footprint counters. We need to recompute the used disk footprint,
otherwise we will flush too often, and even block indefinately on new_seg
iff using hard limits.
2021-11-29 14:56:48 +00:00
Calle Wilund
810e410c5d commitlog_test: Add test for exception in alloc w. deleted underlying file
Tests that we can handle exception-in-alloc cleanup if the file actually
does not exist. This however uncovers another weakness (addressed in next
patch) - that we can loose track of disk footprint here, and w. hard limits
end up waiting for disk space that never comes. Thus test does not use hard
limit.
2021-11-29 14:56:43 +00:00
Calle Wilund
97f6da0c3e commitlog: Ensure failed-to-create-segment is re-deleted
Fixes #9343

If we fail in allocate_segment_ex, we should push the file opened/created
to the delete set to ensure we reclaim the disk space. We should also
ensure that if we did not recycle a file in delete_segments, we still
wake up any recycle waiters iff we made a file delete instead.

Included a small unit test.
2021-11-29 14:51:39 +00:00
Calle Wilund
c229fe9694 commitlog::allocate_segment_ex: Don't re-throw out of function
Fixes #9342

commitlog_error_handler rethrows. But we want to not. And run post-handler
cleanup (co_await)
2021-11-29 14:51:39 +00:00
Tomasz Grabiec
ee1ca8ae4d lsa: Add sanity checks around lsa_buffer operations
We've been observing hard to explain crashes recently around
lsa_buffer destruction, where the containing segment is absent in
_segment_descs which causes log_heap::adjust_up to abort. Add more
checks to catch certain impossible senarios which can lead to this
sooner.

Refs #9192.
Message-Id: <20211116122346.814437-1-tgrabiec@scylladb.com>

(cherry picked from commit bf6898a5a0)
2021-11-24 15:17:37 +01:00
Tomasz Grabiec
6bfd322e3b lsa: Mark compact_segment_locked() as noexcept
We cannot recover from a failure in this method. The implementation
makes sure it never happens. Invariants will be broken if this
throws. Detect violations early by marking as noexcept.

We could make it exception safe and try to leave the data structures
in a consistent state but the reclaimer cannot make progress if this throws, so
it's pointless.

Refs #9192
Message-Id: <20211116122019.813418-1-tgrabiec@scylladb.com>

(cherry picked from commit 4d627affc3)
2021-11-24 15:17:35 +01:00
Tomasz Grabiec
afc18d5070 cql: Fix missing data in indexed queries with base table short reads
Indexed queries are using paging over the materialized view
table. Results of the view read are then used to issue reads of the
base table. If base table reads are short reads, the page is returned
to the user and paging state is adjusted accordingly so that when
paging is resumed it will query the view starting from the row
corresponding to the next row in the base which was not yet
returned. However, paging state's "remaining" count was not reset, so
if the view read was exhausted the reading will stop even though the
base table read was short.

Fix by restoring the "remaining" count when adjusting the paging state
on short read.

Tests:

  - index_with_paging_test
  - secondary_index_test

Fixes #9198
Message-Id: <20210818131840.1160267-1-tgrabiec@scylladb.com>

(cherry picked from commit 1e4da2dcce)
2021-11-23 11:22:00 +02:00
Tomasz Grabiec
2ec22c2404 sstables: partition_index_cache: Avoid abort due to benign bad_alloc inside allocating section
shared_promise::get_shared_future() is marked noexcept, but can
allocate memory. It is invoked by sstable partition index cache inside
an allocating section, which means that allocations can throw
bad_alloc even though there is memory to reclaim, so under normal
conditions.

Fix by allocating the shared_promise in a stable memory, in the
standard allocator via lw_shared_ptr<>, so that it can be accessed outside
allocating section.

Fixes #9666

Tests:

  - build/dev/test/boost/sstable_partition_index_cache_test

Message-Id: <20211122165100.1606854-1-tgrabiec@scylladb.com>
(cherry picked from commit 1d84bc6c3b)
2021-11-23 11:21:27 +02:00
Avi Kivity
19da778271 Merge "Run gossiper message handlers in a gate" from Pavel E
"
When gossiper processes its messages in the background some of
the continuations may pop up after the gossiper is shutdown.
This, in turn, may result in unwanted code to be executed when
it doesn't expect.

In particular, storage_service notification hooks may try to
update system keyspace (with "fresh" peer info/state/tokens/etc).
This update doesn't work after drain because drain shuts down
commitlog. The intention was that gossiper did _not_ notify
anyone after drain, because it's shut down during drain too.
But since there are background continuations left, it's not
working as expected.

refs: #9567
tests: unit(dev), dtest.concurrent_schema_changes.snapshot(dev)
"

* 'br-gossiper-background-messages-2' of https://github.com/xemul/scylla:
  gossiper: Guard background processing with gate
  gossiper: Helper for background messaging processing

(cherry picked from commit 9e2b6176a2)
2021-11-19 07:25:26 +02:00
Avi Kivity
cbd4c13ba6 Merge 'Revert "scylla_util.py: return bool value on systemd_unit.is_active()"' from Takuya ASADA
On scylla_unit.py, we provide `systemd_unit.is_active()` to return `systemctl is-active` output.
When we introduced systemd_unit class, we just returned `systemctl is-active` output as string, but we changed the return value to bool after that (2545d7fd43).
This was because `if unit.is_active():` always becomes True even it returns "failed" or "inactive", to avoid such scripting bug.
However, probably this was mistake.
Because systemd unit state is not 2 state, like "start" / "stop", there are many state.

And we already using multiple unit state ("activating", "failed", "inactive", "active") in our Cloud image login prompt:
https://github.com/scylladb/scylla-machine-image/blob/next/common/scylla_login#L135
After we merged 2545d7fd43, the login prompt is broken, because it does not return string as script expected (https://github.com/scylladb/scylla-machine-image/issues/241).

I think we should revert 2545d7fd43, it should return exactly same value as `systemctl is-active` says.

Fixes #9627
Fixes scylladb/scylla-machine-image#241

Closes #9628

* github.com:scylladb/scylla:
  scylla_ntp_setup: use string in systemd_unit.is_active()
  Revert "scylla_util.py: return bool value on systemd_unit.is_active()"

(cherry picked from commit c17101604f)
2021-11-18 11:44:11 +02:00
Pavel Emelyanov
338871802d generic_server: Keep server alive during conn background processing
There's at least one tiny race in generic_server code. The trailing
.handle_exception after the conn->process() captures this, but since the
whole continuation chain happens in the background, that this can be
released thus causing the whole lambda to execute on freed generic_server
instance. This, in turn, is not nice because captured this is used to get
a _logger from.

The fix is based on the observation that all connections pin the server
in memory until all of them (connections) are destructed. Said that, to
keep the server alive in the aforementioned lambda it's enough to make
sure the conn variable (it's lw_shared_ptr on the connection) is alive in
it. Not to generate a bunch of tiny continuations with identical set of
captures -- tail the single .then_wrapped() one and do whatever is needed
to wrap up the connection processing in it.

tests: unit(dev)
fixes: #9316

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20211115105818.11348-1-xemul@scylladb.com>
(cherry picked from commit ba16318457)
2021-11-17 10:21:11 +02:00
Yaron Kaikov
8b5b1b8af6 dist/docker/debian/build_docker.sh: debian version fix for rc releases
When building a docker we relay on `VERSION` value from
`SCYLLA-VERSION-GEN` . For `rc` releases only there is a different
between the configured version (X.X.rcX) and the actualy debian package
we generate (X.X~rcX)

Using a similar solution as i did in dcb10374a5

Fixes: #9616

Closes #9617

(cherry picked from commit 060a91431d)
2021-11-12 20:07:19 +02:00
Takuya ASADA
ea89eff95d dist/docker: fix bashrc filename for Ubuntu
For Debian variants, correct filename is /etc/bash.bashrc.

Fixes #9588

Closes #9589

(cherry picked from commit 201a97e4a4)
2021-11-10 14:25:27 +02:00
Michał Radwański
96421e7779 memtable: fix gcc function argument evaluation order induced use after move
clang evaluates function arguments from left to right, while gcc does so
in reverse. Therefore, this code can be correct on clang and incorrect
on gcc:
```
f(x.sth(), std::move(x))
```

This patch fixes one such instance of this bug, in memtable.cc.

Fixes #9605.

Closes #9606

(cherry picked from commit eff392073c)
2021-11-10 08:58:09 +02:00
Botond Dénes
142336ca53 mutation_writer/feed_writer: don't drop readers with small amount of content
Due to an error in transforming the above routine, readers who have <= a
buffer worth of content are dropped without consuming them.
This is due to the outer consume loop being conditioned on
`is_end_of_stream()`, which will be set for readers that eagerly
pre-fill their buffer and also have no more data then what is in their
buffer.
Change the condition to also check for `is_buffer_empty()` and only drop
the reader if both of these are true.

Fixes: #9594

Tests: unit(mutation_writer_test --repeat=200, dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20211108092923.104504-1-bdenes@scylladb.com>
(cherry picked from commit 4b6c0fe592)
2021-11-09 14:13:21 +02:00
Calle Wilund
492f12248c commitlog: Add explicit track var for "wasted space" to avoid double counting
Refs #9331

In segment::close() we add space to managers "wasted" counter. In destructor,
if we can cleanly delete/recycle the file we remove it. However, if we never
went through close (shutdown - ok, exception in batch_cycle - not ok), we can
end up subtracting numbers that were never added in the first place.
Just keep track of the bytes added in a var.

Observed behaviour in above issue is timeouts in batch_cycle, where we
declare the segment closed early (because we cannot add anything more safely
- chunks could get partial/misplaced). Exception will propagate to caller(s),
but the segment will not go through actual close() call -> destructor should
not assume such.

Closes #9598

(cherry picked from commit 3929b7da1f)
2021-11-09 14:07:04 +02:00
Yaron Kaikov
7eb7a0e5fe release: prepare for 4.6.rc0 2021-11-08 09:18:26 +02:00
26 changed files with 481 additions and 72 deletions

View File

@@ -60,7 +60,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=4.6.dev
VERSION=4.6.rc2
if test -f version
then

View File

@@ -1017,18 +1017,16 @@ future<executor::request_return_type> executor::update_table(client_state& clien
_stats.api_operations.update_table++;
elogger.trace("Updating table {}", request);
std::string table_name = get_table_name(request);
if (table_name.find(INTERNAL_TABLE_PREFIX) == 0) {
schema_ptr tab = get_table(_proxy, request);
// the ugly but harmless conversion to string_view here is because
// Seastar's sstring is missing a find(std::string_view) :-()
if (std::string_view(tab->cf_name()).find(INTERNAL_TABLE_PREFIX) == 0) {
return make_ready_future<request_return_type>(api_error::validation(
format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX)));
}
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
tracing::add_table_name(trace_state, keyspace_name, table_name);
tracing::add_table_name(trace_state, tab->ks_name(), tab->cf_name());
auto& db = _proxy.get_db().local();
auto& cf = db.find_column_family(keyspace_name, table_name);
schema_builder builder(cf.schema());
schema_builder builder(tab);
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {

View File

@@ -593,8 +593,8 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
clogger.trace("csm {}: insert dummy at {}", fmt::ptr(this), _lower_bound);
auto it = with_allocator(_lsa_manager.region().allocator(), [&] {
auto& rows = _snp->version()->partition().mutable_clustered_rows();
auto new_entry = current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no);
return rows.insert_before(_next_row.get_iterator_in_latest_version(), *new_entry);
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no));
return rows.insert_before(_next_row.get_iterator_in_latest_version(), std::move(new_entry));
});
_snp->tracker()->insert(*it);
_last_row = partition_snapshot_row_weakref(*_snp, it, true);

View File

@@ -1634,7 +1634,7 @@ future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader reader, co
while (auto mf_opt = co_await reader()) {
if (cdata.is_stop_requested()) [[unlikely]] {
// Compaction manager will catch this exception and re-schedule the compaction.
co_return coroutine::make_exception(compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested));
throw compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested);
}
const auto& mf = *mf_opt;

View File

@@ -995,6 +995,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
}
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
paging_state_copy->set_remaining(internal_paging_size);
paging_state_copy->set_partition_key(std::move(index_pk));
paging_state_copy->set_clustering_key(std::move(index_ck));
return std::move(paging_state_copy);

View File

@@ -428,6 +428,8 @@ private:
void abort_recycled_list(std::exception_ptr);
void abort_deletion_promise(std::exception_ptr);
future<> recalculate_footprint();
future<> rename_file(sstring, sstring) const;
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
@@ -444,6 +446,7 @@ private:
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
seastar::semaphore _reserve_recalculation_guard;
};
template<typename T>
@@ -512,6 +515,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
uint64_t _file_pos = 0;
uint64_t _flush_pos = 0;
uint64_t _size_on_disk = 0;
uint64_t _waste = 0;
size_t _alignment;
@@ -598,7 +602,7 @@ public:
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.active_size_on_disk -= file_position();
_segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position());
_segment_manager->totals.wasted_size_on_disk -= _waste;
_segment_manager->add_file_to_delete(_file_name, _desc);
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
clogger.warn("Segment {} is dirty and is left on disk.", *this);
@@ -725,7 +729,8 @@ public:
auto s = co_await sync();
co_await flush();
co_await terminate();
_segment_manager->totals.wasted_size_on_disk += (_size_on_disk - file_position());
_waste = _size_on_disk - file_position();
_segment_manager->totals.wasted_size_on_disk += _waste;
co_return s;
}
future<sseg_ptr> do_flush(uint64_t pos) {
@@ -1223,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
, _reserve_recalculation_guard(1)
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
@@ -1248,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
}
try {
gate::holder g(_gate);
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
if (_reserve_segments.full()) {
// can happen if we recalculate
continue;
}
// note: if we were strict with disk size, we would refuse to do this
// unless disk footprint is lower than threshold. but we cannot (yet?)
// trust that flush logic will absolutely free up an existing
@@ -1519,7 +1530,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) {
for (auto * ext : cfg.extensions->commitlog_file_extensions()) {
auto nf = co_await ext->wrap_file(std::move(filename), f, flags);
auto nf = co_await ext->wrap_file(filename, f, flags);
if (nf) {
f = std::move(nf);
align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment();
@@ -1529,13 +1540,17 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
f = make_checked_file(commit_error_handler, std::move(f));
} catch (...) {
ep = std::current_exception();
commit_error_handler(ep);
try {
commit_error_handler(std::current_exception());
} catch (...) {
ep = std::current_exception();
}
}
if (ep && f) {
co_await f.close();
}
if (ep) {
add_file_to_delete(filename, d);
co_return coroutine::exception(std::move(ep));
}
@@ -1865,6 +1880,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
std::exception_ptr recycle_error;
size_t num_deleted = 0;
bool except = false;
while (!files.empty()) {
auto filename = std::move(files.back());
files.pop_back();
@@ -1914,8 +1931,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
}
}
co_await delete_file(filename);
++num_deleted;
} catch (...) {
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
except = true;
}
}
@@ -1928,6 +1947,16 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
if (recycle_error && _recycled_segments.empty()) {
abort_recycled_list(recycle_error);
}
// If recycle failed and turned into a delete, we should fake-wakeup waiters
// since we might still have cleaned up disk space.
if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) {
abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files")));
}
// #9348 - if we had an exception, we can't trust our bookeep any more. recalculate.
if (except) {
co_await recalculate_footprint();
}
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
@@ -1942,6 +1971,63 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e
std::exchange(_disk_deletions, {}).set_exception(ep);
}
future<> db::commitlog::segment_manager::recalculate_footprint() {
try {
co_await do_pending_deletes();
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
auto segments_copy = _segments;
std::vector<sseg_ptr> reserves;
std::vector<sstring> recycles;
// this causes haywire things while we steal stuff, but...
while (!_reserve_segments.empty()) {
reserves.push_back(_reserve_segments.pop());
}
while (!_recycled_segments.empty()) {
recycles.push_back(_recycled_segments.pop());
}
// first, guesstimate sizes
uint64_t recycle_size = recycles.size() * max_size;
auto old = totals.total_size_on_disk;
totals.total_size_on_disk = recycle_size;
for (auto& s : _segments) {
totals.total_size_on_disk += s->_size_on_disk;
}
for (auto& s : reserves) {
totals.total_size_on_disk += s->_size_on_disk;
}
// now we need to adjust the actual sizes of recycled files
uint64_t actual_recycled_size = 0;
try {
for (auto& filename : recycles) {
auto s = co_await seastar::file_size(filename);
actual_recycled_size += s;
}
} catch (...) {
clogger.error("Exception reading disk footprint ({}).", std::current_exception());
actual_recycled_size = recycle_size; // best we got
}
for (auto&& filename : recycles) {
_recycled_segments.push(std::move(filename));
}
for (auto&& s : reserves) {
_reserve_segments.push(std::move(s)); // you can have it back now.
}
totals.total_size_on_disk += actual_recycled_size - recycle_size;
// pushing things to reserve/recycled queues will have resumed any
// waiters, so we should be done.
} catch (...) {
clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
}
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftc = std::exchange(_files_to_close, {});
auto ftd = std::exchange(_files_to_delete, {});

View File

@@ -100,6 +100,7 @@ def version_compare(a, b):
def create_uuid_file(fl):
with open(args.uuid_file, 'w') as myfile:
myfile.write(str(uuid.uuid1()) + "\n")
os.fchmod(myfile, 0o644)
def sanitize_version(version):

View File

@@ -278,6 +278,66 @@ if __name__ == "__main__":
disk_properties["read_bandwidth"] = 2527296683 * nr_disks
disk_properties["write_iops"] = 156326 * nr_disks
disk_properties["write_bandwidth"] = 1063657088 * nr_disks
elif idata.instance() == "im4gn.large":
disk_properties["read_iops"] = 33943
disk_properties["read_bandwidth"] = 288433525
disk_properties["write_iops"] = 27877
disk_properties["write_bandwidth"] = 126864680
elif idata.instance() == "im4gn.xlarge":
disk_properties["read_iops"] = 68122
disk_properties["read_bandwidth"] = 576603520
disk_properties["write_iops"] = 55246
disk_properties["write_bandwidth"] = 254534954
elif idata.instance() == "im4gn.2xlarge":
disk_properties["read_iops"] = 136422
disk_properties["read_bandwidth"] = 1152663765
disk_properties["write_iops"] = 92184
disk_properties["write_bandwidth"] = 508926453
elif idata.instance() == "im4gn.4xlarge":
disk_properties["read_iops"] = 273050
disk_properties["read_bandwidth"] = 1638427264
disk_properties["write_iops"] = 92173
disk_properties["write_bandwidth"] = 1027966826
elif idata.instance() == "im4gn.8xlarge":
disk_properties["read_iops"] = 250241 * nr_disks
disk_properties["read_bandwidth"] = 1163130709 * nr_disks
disk_properties["write_iops"] = 86374 * nr_disks
disk_properties["write_bandwidth"] = 977617664 * nr_disks
elif idata.instance() == "im4gn.16xlarge":
disk_properties["read_iops"] = 273030 * nr_disks
disk_properties["read_bandwidth"] = 1638211413 * nr_disks
disk_properties["write_iops"] = 92607 * nr_disks
disk_properties["write_bandwidth"] = 1028340266 * nr_disks
elif idata.instance() == "is4gen.medium":
disk_properties["read_iops"] = 33965
disk_properties["read_bandwidth"] = 288462506
disk_properties["write_iops"] = 27876
disk_properties["write_bandwidth"] = 126954200
elif idata.instance() == "is4gen.large":
disk_properties["read_iops"] = 68131
disk_properties["read_bandwidth"] = 576654869
disk_properties["write_iops"] = 55257
disk_properties["write_bandwidth"] = 254551002
elif idata.instance() == "is4gen.xlarge":
disk_properties["read_iops"] = 136413
disk_properties["read_bandwidth"] = 1152747904
disk_properties["write_iops"] = 92180
disk_properties["write_bandwidth"] = 508889546
elif idata.instance() == "is4gen.2xlarge":
disk_properties["read_iops"] = 273038
disk_properties["read_bandwidth"] = 1628982613
disk_properties["write_iops"] = 92182
disk_properties["write_bandwidth"] = 1027983530
elif idata.instance() == "is4gen.4xlarge":
disk_properties["read_iops"] = 260493 * nr_disks
disk_properties["read_bandwidth"] = 1217396928 * nr_disks
disk_properties["write_iops"] = 83169 * nr_disks
disk_properties["write_bandwidth"] = 1000390784 * nr_disks
elif idata.instance() == "is4gen.8xlarge":
disk_properties["read_iops"] = 273021 * nr_disks
disk_properties["read_bandwidth"] = 1656354602 * nr_disks
disk_properties["write_iops"] = 92233 * nr_disks
disk_properties["write_bandwidth"] = 1028010325 * nr_disks
properties_file = open(etcdir() + "/scylla.d/io_properties.yaml", "w")
yaml.dump({ "disks": [ disk_properties ] }, properties_file, default_flow_style=False)
ioconf = open(etcdir() + "/scylla.d/io.conf", "w")

View File

@@ -66,18 +66,18 @@ if __name__ == '__main__':
target = None
if os.path.exists('/lib/systemd/systemd-timesyncd'):
if systemd_unit('systemd-timesyncd').is_active():
if systemd_unit('systemd-timesyncd').is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
target = 'systemd-timesyncd'
if shutil.which('chronyd'):
if get_chrony_unit().is_active():
if get_chrony_unit().is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
if not target:
target = 'chrony'
if shutil.which('ntpd'):
if get_ntp_unit().is_active():
if get_ntp_unit().is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
if not target:

View File

@@ -30,6 +30,8 @@ import distro
from pathlib import Path
from scylla_util import *
from subprocess import run
import distro
from pkg_resources import parse_version
if __name__ == '__main__':
if os.getuid() > 0:
@@ -117,6 +119,25 @@ if __name__ == '__main__':
pkg_install('xfsprogs')
if not shutil.which('mdadm'):
pkg_install('mdadm')
# XXX: Workaround for mdmonitor.service issue on CentOS8
if is_redhat_variant() and distro.version() == '8':
mdadm_rpm = run('rpm -q mdadm', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
match = re.match(r'^mdadm-([0-9]+\.[0-9]+-[a-zA-Z0-9]+)\.', mdadm_rpm)
mdadm_version = match.group(1)
if parse_version('4.1-14') < parse_version(mdadm_version):
repo_data = '''
[BaseOS_8_3_2011]
name=CentOS8.3.2011 - Base
baseurl=http://vault.centos.org/8.3.2011/BaseOS/$basearch/os/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-centosofficial
'''[1:-1]
with open('/etc/yum.repos.d/CentOS-Vault-8.3.repo', 'w') as f:
f.write(repo_data)
run('dnf downgrade --enablerepo=BaseOS_8_3_2011 -y mdadm', shell=True, check=True)
run('dnf install -y python3-dnf-plugin-versionlock', shell=True, check=True)
run('dnf versionlock add mdadm', shell=True, check=True)
try:
md_service = systemd_unit('mdmonitor.service')
except SystemdException:

View File

@@ -370,6 +370,10 @@ if __name__ == '__main__':
version_check = interactive_ask_service('Do you want to enable Scylla to check if there is a newer version of Scylla available?', 'Yes - start the Scylla-housekeeping service to check for a newer version. This check runs periodically. No - skips this step.', version_check)
args.no_version_check = not version_check
if version_check:
cfg = sysconfig_parser(sysconfdir_p() / 'scylla-housekeeping')
repo_files = cfg.get('REPO_FILES')
for f in glob.glob(repo_files):
os.chmod(f, 0o644)
with open('/etc/scylla.d/housekeeping.cfg', 'w') as f:
f.write('[housekeeping]\ncheck-version: True\n')
os.chmod('/etc/scylla.d/housekeeping.cfg', 0o644)

View File

@@ -674,7 +674,7 @@ class aws_instance:
return self._type.split(".")[0]
def is_supported_instance_class(self):
if self.instance_class() in ['i2', 'i3', 'i3en', 'c5d', 'm5d', 'm5ad', 'r5d', 'z1d', 'c6gd', 'm6gd', 'r6gd', 'x2gd']:
if self.instance_class() in ['i2', 'i3', 'i3en', 'c5d', 'm5d', 'm5ad', 'r5d', 'z1d', 'c6gd', 'm6gd', 'r6gd', 'x2gd', 'im4gn', 'is4gen']:
return True
return False
@@ -683,7 +683,7 @@ class aws_instance:
instance_size = self.instance_size()
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
return 'ixgbevf'
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5b', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d', 'c6g', 'c6gd', 'm6g', 'm6gd', 't4g', 'r6g', 'r6gd', 'x2gd']:
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5b', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d', 'c6g', 'c6gd', 'm6g', 'm6gd', 't4g', 'r6g', 'r6gd', 'x2gd', 'im4gn', 'is4gen']:
return 'ena'
if instance_class == 'm4':
if instance_size == '16xlarge':
@@ -1041,7 +1041,7 @@ class systemd_unit:
return run('systemctl {} disable {}'.format(self.ctlparam, self._unit), shell=True, check=True)
def is_active(self):
return True if run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip() == 'active' else False
return run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip()
def mask(self):
return run('systemctl {} mask {}'.format(self.ctlparam, self._unit), shell=True, check=True)

View File

@@ -25,6 +25,10 @@ product="$(<build/SCYLLA-PRODUCT-FILE)"
version="$(<build/SCYLLA-VERSION-FILE)"
release="$(<build/SCYLLA-RELEASE-FILE)"
if [[ "$version" = *rc* ]]; then
version=$(echo $version |sed 's/\(.*\)\.)*/\1~/')
fi
mode="release"
if uname -m | grep x86_64 ; then
@@ -93,7 +97,7 @@ run apt-get -y install hostname supervisor openssh-server openssh-client openjdk
run locale-gen en_US.UTF-8
run bash -ec "dpkg -i packages/*.deb"
run apt-get -y clean all
run bash -ec "cat /scylla_bashrc >> /etc/bashrc"
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
run mkdir -p /etc/supervisor.conf.d
run mkdir -p /var/log/scylla
run chown -R scylla:scylla /var/lib/scylla

View File

@@ -184,14 +184,18 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
_logger.info("exception while advertising new connection: {}", std::current_exception());
}
// Block while monitoring for lifetime/errors.
return conn->process().finally([this, conn] {
return unadvertise_connection(conn);
}).handle_exception([this] (std::exception_ptr ep) {
if (is_broken_pipe_or_connection_reset(ep)) {
// expected if another side closes a connection or we're shutting down
return;
return conn->process().then_wrapped([this, conn] (auto f) {
try {
f.get();
} catch (...) {
auto ep = std::current_exception();
if (!is_broken_pipe_or_connection_reset(ep)) {
// some exceptions are expected if another side closes a connection
// or we're shutting down
_logger.info("exception while processing connection: {}", ep);
}
}
_logger.info("exception while processing connection: {}", ep);
return unadvertise_connection(conn);
});
});
return stop_iteration::no;

View File

@@ -477,49 +477,42 @@ gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request requ
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
}
rpc::no_wait_type gossiper::background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn) {
(void)with_gate(_background_msg, [this, type = std::move(type), fn = std::move(fn)] () mutable {
return container().invoke_on(0, std::move(fn)).handle_exception([type = std::move(type)] (auto ep) {
logger.warn("Failed to handle {}: {}", type, ep);
});
});
return messaging_service::no_wait();
}
void gossiper::init_messaging_service_handler() {
_messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_SYN", [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_syn_msg(from, std::move(syn_msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_ACK", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_ACK2", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack2_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return handle_echo_msg(from, generation_number_opt);
});
_messaging.register_gossip_shutdown([this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
// In a new fiber.
(void)container().invoke_on(0, [from, generation_number_opt] (gms::gossiper& gossiper) {
return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
return gossiper.handle_shutdown_msg(from, generation_number_opt);
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
@@ -2178,6 +2171,9 @@ future<> gossiper::start() {
}
future<> gossiper::shutdown() {
if (!_background_msg.is_closed()) {
co_await _background_msg.close();
}
if (this_shard_id() == 0) {
co_await do_stop_gossiping();
}

View File

@@ -41,7 +41,9 @@
#include "unimplemented.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/print.hh>
#include <seastar/rpc/rpc_types.hh>
#include "utils/atomic_vector.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
@@ -138,12 +140,16 @@ private:
bool _enabled = false;
semaphore _callback_running{1};
semaphore _apply_state_locally_semaphore{100};
seastar::gate _background_msg;
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
bool _advertise_myself = true;
// Map ip address and generation number
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
future<> _failure_detector_loop_done{make_ready_future<>()} ;
rpc::no_wait_type background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);
public:
// Get current generation number for the given nodes
future<std::unordered_map<gms::inet_address, int32_t>>

View File

@@ -613,7 +613,8 @@ static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
} else {
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
schema_ptr snp_schema = snp->schema();
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
}
}

View File

@@ -54,7 +54,7 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
auto rd = std::move(rd_ref);
std::exception_ptr ex;
try {
while (!rd.is_end_of_stream()) {
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
co_await rd.fill_buffer();
while (!rd.is_buffer_empty()) {
co_await rd.pop_mutation_fragment().consume(wr);

View File

@@ -411,11 +411,11 @@ public:
} else {
// Copy row from older version because rows in evictable versions must
// hold values which are independently complete to be consistent on eviction.
auto e = current_allocator().construct<rows_entry>(_schema, *_current_row[0].it);
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, *_current_row[0].it));
e->set_continuous(latest_i && latest_i->continuous());
_snp.tracker()->insert(*e);
rows.insert_before(latest_i, *e);
return {*e, true};
auto e_i = rows.insert_before(latest_i, std::move(e));
return ensure_result{*e_i, true};
}
}
@@ -447,11 +447,11 @@ public:
}
auto&& rows = _snp.version()->partition().mutable_clustered_rows();
auto latest_i = get_iterator_in_latest_version();
auto e = current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
is_continuous(latest_i && latest_i->continuous()));
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
is_continuous(latest_i && latest_i->continuous())));
_snp.tracker()->insert(*e);
rows.insert_before(latest_i, *e);
return ensure_result{*e, true};
auto e_i = rows.insert_before(latest_i, std::move(e));
return ensure_result{*e_i, true};
}
// Brings the entry pointed to by the cursor to the front of the LRU

View File

@@ -3670,7 +3670,7 @@ shared_ptr<abort_source> node_ops_meta_data::get_abort_source() {
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3680,7 +3680,7 @@ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
void storage_service::node_ops_done(utils::UUID ops_uuid) {
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3691,7 +3691,7 @@ void storage_service::node_ops_done(utils::UUID ops_uuid) {
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;

View File

@@ -49,12 +49,13 @@ private:
public:
partition_index_cache* _parent;
key_type _key;
std::variant<shared_promise<>, partition_index_page> _page;
std::variant<lw_shared_ptr<shared_promise<>>, partition_index_page> _page;
size_t _size_in_allocator = 0;
public:
entry(partition_index_cache* parent, key_type key)
: _parent(parent)
, _key(key)
, _page(make_lw_shared<shared_promise<>>())
{ }
void set_page(partition_index_page&& page) noexcept {
@@ -76,7 +77,7 @@ private:
// Always returns the same value for a given state of _page.
size_t size_in_allocator() const { return _size_in_allocator; }
shared_promise<>& promise() { return std::get<shared_promise<>>(_page); }
lw_shared_ptr<shared_promise<>> promise() { return std::get<lw_shared_ptr<shared_promise<>>>(_page); }
bool ready() const { return std::holds_alternative<partition_index_page>(_page); }
partition_index_page& page() { return std::get<partition_index_page>(_page); }
const partition_index_page& page() const { return std::get<partition_index_page>(_page); }
@@ -207,9 +208,7 @@ public:
return make_ready_future<entry_ptr>(std::move(ptr));
} else {
++_shard_stats.blocks;
return _as(_region, [ptr] () mutable {
return ptr.get_entry().promise().get_shared_future();
}).then([ptr] () mutable {
return ptr.get_entry().promise()->get_shared_future().then([ptr] () mutable {
return std::move(ptr);
});
}
@@ -238,12 +237,12 @@ public:
entry& e = ptr.get_entry();
try {
partition_index_page&& page = f.get0();
e.promise().set_value();
e.promise()->set_value();
e.set_page(std::move(page));
_shard_stats.used_bytes += e.size_in_allocator();
++_shard_stats.populations;
} catch (...) {
e.promise().set_exception(std::current_exception());
e.promise()->set_exception(std::current_exception());
with_allocator(_region.allocator(), [&] {
_cache.erase(key);
});

43
test.py
View File

@@ -291,6 +291,8 @@ class Test:
def print_summary(self):
pass
def get_junit_etree(self):
return None
def check_log(self, trim):
"""Check and trim logs and xml output for tests which have it"""
@@ -338,9 +340,36 @@ class BoostTest(UnitTest):
boost_args += ['--color_output=false']
boost_args += ['--']
self.args = boost_args + self.args
self.casename = casename
self.__junit_etree = None
def get_junit_etree(self):
def adjust_suite_name(name):
# Normalize "path/to/file.cc" to "path.to.file" to conform to
# Jenkins expectations that the suite name is a class name. ".cc"
# doesn't add any infomation. Add the mode, otherwise failures
# in different modes are indistinguishable. The "test/" prefix adds
# no information, so remove it.
import re
name = re.sub(r'^test/', '', name)
name = re.sub(r'\.cc$', '', name)
name = re.sub(r'/', '.', name)
name = f'{name}.{self.mode}'
return name
if self.__junit_etree is None:
self.__junit_etree = ET.parse(self.xmlout)
root = self.__junit_etree.getroot()
suites = root.findall('.//TestSuite')
for suite in suites:
suite.attrib['name'] = adjust_suite_name(suite.attrib['name'])
skipped = suite.findall('./TestCase[@reason="disabled"]')
for e in skipped:
suite.remove(e)
os.unlink(self.xmlout)
return self.__junit_etree
def check_log(self, trim):
ET.parse(self.xmlout)
self.get_junit_etree()
super().check_log(trim)
@@ -800,6 +829,17 @@ def write_junit_report(tmpdir, mode):
with open(junit_filename, "w") as f:
ET.ElementTree(xml_results).write(f, encoding="unicode")
def write_consolidated_boost_junit_xml(tmpdir, mode):
xml = ET.Element("TestLog")
for suite in TestSuite.suites.values():
for test in suite.tests:
if test.mode != mode:
continue
test_xml = test.get_junit_etree()
if test_xml is not None:
xml.extend(test_xml.getroot().findall('.//TestSuite'))
et = ET.ElementTree(xml)
et.write(f'{tmpdir}/{mode}/xml/boost.xunit.xml', encoding='unicode')
def open_log(tmpdir):
pathlib.Path(tmpdir).mkdir(parents=True, exist_ok=True)
@@ -839,6 +879,7 @@ async def main():
for mode in options.modes:
write_junit_report(options.tmpdir, mode)
write_consolidated_boost_junit_xml(options.tmpdir, mode)
if 'coverage' in options.modes:
coverage.generate_coverage_report("build/coverage", "tests")

View File

@@ -16,6 +16,9 @@
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
# Tests for basic table operations: CreateTable, DeleteTable, ListTables.
# Also some basic tests for UpdateTable - although UpdateTable usually
# enables more elaborate features (such as GSI or Streams) and those are
# tested elsewhere.
import pytest
from botocore.exceptions import ClientError
@@ -311,3 +314,17 @@ def test_table_sse_off(dynamodb):
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
table.delete();
# Test that trying to delete a table that doesn't exist fails in the
# appropriate way (ResourceNotFoundException)
def test_delete_table_non_existent(dynamodb, test_table):
client = dynamodb.meta.client
with pytest.raises(ClientError, match='ResourceNotFoundException'):
client.delete_table(TableName=random_string(20))
# Test that trying to update a table that doesn't exist fails in the
# appropriate way (ResourceNotFoundException)
def test_update_table_non_existent(dynamodb, test_table):
client = dynamodb.meta.client
with pytest.raises(ClientError, match='ResourceNotFoundException'):
client.update_table(TableName=random_string(20), BillingMode='PAY_PER_REQUEST')

View File

@@ -44,7 +44,9 @@
#include "test/lib/tmpdir.hh"
#include "db/commitlog/commitlog.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog_extensions.hh"
#include "db/commitlog/rp_set.hh"
#include "db/extensions.hh"
#include "log.hh"
#include "service/priority_manager.hh"
#include "test/lib/exception_utils.hh"
@@ -947,3 +949,113 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) {
co_await log.clear();
}
}
static future<> do_test_exception_in_allocate_ex(bool do_file_delete, bool reuse = true) {
commitlog::config cfg;
constexpr auto max_size_mb = 1;
cfg.commitlog_segment_size_in_mb = max_size_mb;
cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count;
cfg.commitlog_sync_period_in_ms = 10;
cfg.reuse_segments = reuse;
cfg.allow_going_over_size_limit = false; // #9348 - now can enforce size limit always
cfg.use_o_dsync = true; // make sure we pre-allocate.
// not using cl_test, because we need to be able to abandon
// the log.
tmpdir tmp;
cfg.commit_log_location = tmp.path().string();
class myfail : public std::exception {
public:
using std::exception::exception;
};
struct myext: public db::commitlog_file_extension {
public:
bool fail = false;
bool thrown = false;
bool do_file_delete;
myext(bool dd)
: do_file_delete(dd)
{}
seastar::future<seastar::file> wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override {
if (fail && !thrown) {
thrown = true;
if (do_file_delete) {
co_await f.close();
co_await seastar::remove_file(filename);
}
throw myfail{};
}
co_return f;
}
seastar::future<> before_delete(const seastar::sstring&) override {
co_return;
}
};
auto ep = std::make_unique<myext>(do_file_delete);
auto& mx = *ep;
db::extensions myexts;
myexts.add_commitlog_file_extension("hufflepuff", std::move(ep));
cfg.extensions = &myexts;
auto log = co_await commitlog::create_commitlog(cfg);
rp_set rps;
// uncomment for verbosity
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
auto uuid = utils::UUID_gen::get_time_UUID();
auto size = log.max_record_size();
auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) {
log.discard_completed_segments(id, rps);
mx.fail = true;
});
try {
while (!mx.thrown) {
rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.fill('1', size);
});
rps.put(std::move(h));
}
} catch (...) {
BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
}
co_await log.shutdown();
co_await log.clear();
}
/**
* Test generating an exception in segment file allocation
*/
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) {
co_await do_test_exception_in_allocate_ex(false);
}
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_no_recycle) {
co_await do_test_exception_in_allocate_ex(false, false);
}
/**
* Test generating an exception in segment file allocation, but also
* delete the file, which in turn should cause follow-up exceptions
* in cleanup delete. Which CL should handle
*/
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file) {
co_await do_test_exception_in_allocate_ex(true, false);
}
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file_no_recycle) {
co_await do_test_exception_in_allocate_ex(true);
}

View File

@@ -22,6 +22,8 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
SEASTAR_TEST_CASE(test_index_with_paging) {
@@ -56,3 +58,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}

View File

@@ -1395,6 +1395,8 @@ private:
}
lsa_buffer alloc_buf(size_t buf_size) {
// Note: Can be re-entered from allocation sites below due to memory reclamation which
// invokes segment compaction.
static_assert(segment::size % buf_align == 0);
if (buf_size > segment::size) {
throw_with_backtrace<std::runtime_error>(format("Buffer size {} too large", buf_size));
@@ -1447,6 +1449,7 @@ private:
if (seg != _buf_active) {
if (desc.is_empty()) {
assert(desc._buf_pointers.empty());
_segment_descs.erase(desc);
desc._buf_pointers = std::vector<entangled>();
free_segment(seg, desc);
@@ -1457,7 +1460,7 @@ private:
}
}
void compact_segment_locked(segment* seg, segment_descriptor& desc) {
void compact_segment_locked(segment* seg, segment_descriptor& desc) noexcept {
auto seg_occupancy = desc.occupancy();
llogger.debug("Compacting segment {} from region {}, {}", fmt::ptr(seg), id(), seg_occupancy);
@@ -1472,6 +1475,7 @@ private:
for (entangled& e : _buf_ptrs_for_compact_segment) {
if (e) {
lsa_buffer* old_ptr = e.get(&lsa_buffer::_link);
assert(&desc == old_ptr->_desc);
lsa_buffer dst = alloc_buf(old_ptr->_size);
memcpy(dst._buf, old_ptr->_buf, dst._size);
old_ptr->_link = std::move(dst._link);
@@ -1502,6 +1506,10 @@ private:
std::vector<entangled> ptrs;
ptrs.reserve(segment::size / buf_align);
segment* new_active = new_segment();
if (_buf_active) [[unlikely]] {
// Memory allocation above could allocate active buffer during segment compaction.
close_buf_active();
}
assert((uintptr_t)new_active->at(0) % buf_align == 0);
segment_descriptor& desc = shard_segment_pool.descriptor(new_active);
desc._buf_pointers = std::move(ptrs);