Compare commits

...

49 Commits

Author SHA1 Message Date
Yaron Kaikov
5694ec189f release: prepare for 4.6.rc5 2022-02-03 16:19:46 +02:00
Calle Wilund
34d470967a commitlog: Fix double clearing of _segment_allocating shared_future.
Fixes #10020

Previous fix 445e1d3 tried to close one double invocation,  but added
another, since it failed to ensure all potential nullings of the opt
shared_future happened before a new allocator could reset it.

This simplifies the code by making clearing the shared_future a
pre-requisite for resolving its contents (as read by waiters).

Also removes any need for try-catch etc.

Closes #10024

(cherry picked from commit 1e66043412)
2022-02-03 07:43:18 +02:00
Calle Wilund
61db571a44 commitlog: Ensure we never have more than one new_segment call at a time
Refs #9896

Found by @eliransin. Call to new_segment was wrapped in with_timeout.
This means that if primary caller timed out, we would leave new_segment
calls running, but potentially issue new ones for next caller.

This could lead to reserve segment queue being read simultanously. And
it is not what we want.

Change to always use the shared_future wait, all callers, and clear it
only on result (exception or segment)

Closes #10001

(cherry picked from commit 445e1d3e41)
2022-02-01 09:10:27 +02:00
Tomasz Grabiec
5b5a300a9e util: cached_file: Fix corruption after memory reclamation was triggered from population
If memory reclamation is triggered inside _cache.emplace(), the _cache
btree can get corrupted. Reclaimers erase from it, and emplace()
assumes that the tree is not modified during its execution. It first
locates the target node and then does memory allocation.

Fix by running emplace() under allocating section, which disables
memory reclamation.

The bug manifests with assert failures, e.g:

./utils/bptree.hh:1699: void bplus::node<unsigned long, cached_file::cached_page, cached_file::page_idx_less_comparator, 12, bplus::key_search::linear, bplus::with_debug::no>::refill(Less) [Key = unsigned long, T = cached_file::cached_page, Less = cached_file::page_idx_less_comparator, NodeSize = 12, Search = bplus::key_search::linear, Debug = bplus::with_debug::no]: Assertion `p._kids[i].n == this' failed.

Fixes #9915

Message-Id: <20220130175639.15258-1-tgrabiec@scylladb.com>
(cherry picked from commit b734615f51)
2022-01-31 01:24:47 +02:00
Avi Kivity
148a65d0d6 Update seastar submodule (gratuitous exceptions on allocation failure)
* seastar a189cdc45d...a375681303 (1):
  > core: memory: Avoid current_backtrace() on alloc failure when logging suppressed

Fixes #9982.
2022-01-30 20:02:24 +02:00
Avi Kivity
e3ad14d55f Point seastar submodule at scylla-seastar.git
This allows us to backport fixes to seastar selectively.
2022-01-30 20:01:12 +02:00
Calle Wilund
2b506c2d4a commitlog: Ensure we don't run continuation (task switch) with queues modified
Fixes #9955

In #9348 we handled the problem of failing to delete segment files on disk, and
the need to recompute disk footprint to keep data flow consistent across intermittent
failures. However, because _reserve_segments and _recycled_segments are queues, we
have to empty them to inspect the contents. One would think it is ok for these
queues to be empty for a while, whilst we do some recaclulating, including
disk listing -> continuation switching. But then one (i.e. I) misses the fact
that these queues use the pop_eventually mechanism, which does _not_ handle
a scenario where we push something into an empty queue, thus triggering the
future that resumes a waiting task, but then pop the element immediately, before
the waiting task is run. In fact, _iff_ one does this, not only will things break,
they will in fact start creating undefined behaviour, because the underlying
std::queue<T, circular_buffer> will _not_ do any bounds checks on the pop/push
operations -> we will pop an empty queue, immediately making it non-empty, but
using undefined memory (with luck null/zeroes).

Strictly speakging, seastar::queue::pop_eventually should be fixed to handle
the scenario, but nontheless we can fix the usage here as well, by simply copy
objects and do the calculation "in background" while we potentially start
popping queue again.

Closes #9966

(cherry picked from commit 43f51e9639)
2022-01-27 10:24:03 +02:00
Avi Kivity
50aad1c668 Merge 'scylla_raid_setup: use mdmonitor only when RAID level > 0' from Takuya ASADA
We found that monitor mode of mdadm does not work on RAID0, and it is
not a bug, expected behavior according to RHEL developer.
Therefore, we should stop enabling mdmonitor when RAID0 is specified.

Fixes #9540

----

This reverts 0d8f932 and introduce correct fix.

Closes #9970

* github.com:scylladb/scylla:
  scylla_raid_setup: use mdmonitor only when RAID level > 0
  Revert "scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8"

(cherry picked from commit df22396a34)
2022-01-27 10:21:25 +02:00
Yaron Kaikov
7bf3f37cd1 release: prepare for 4.6.rc4 2022-01-23 10:44:09 +02:00
Botond Dénes
0f7f8585f2 reader_permit: release_base_resources(): also update _resources
If the permit was admitted, _base_resources was already accounted in
_resource and therefore has to be deducted from it, otherwise the permit
will think it leaked some resources on destruction.

Test:
dtest(repair_additional_test.py.test_repair_one_missing_row_diff_shard_count)

Refs: #9751
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20220119132550.532073-1-bdenes@scylladb.com>
(cherry picked from commit a65b38a9f7)
2022-01-20 18:39:25 +02:00
Pavel Emelyanov
2c65c4a569 Merge 'db: range_tombstone_list: Deoverlap empty range tombstones' from Tomasz Grabiec
Appending an empty range adjacent to an existing range tombstone would
not deoverlap (by dropping the empty range tombstone) resulting in
different (non canoncial) result depending on the order of appending.

Suppose that range tombstone [a, b] covers range tombstone [x, x), and [a, x) and [x, b) are range tombstones which correspond to [a, b] split around position x.

Appending [a, x) then [x, b) then [x, x) would give [a, b)
Appending [a, x) then [x, x) then [x, b) would give [a, x), [x, x), [x, b)

The fix is to drop empty range tombstones in range_tombstone_list so that the result is canonical.

Fixes #9661

Closes #9764

* github.com:scylladb/scylla:
  range_tombstone_list: Deoverlap adjacent empty ranges
  range_tombstone_list: Convert to work in terms of position_in_partition

(cherry picked from commit b2a62d2b59)
2022-01-20 12:35:21 +02:00
Avi Kivity
f85cd289bc Merge "repair: make sure there is one permit per repair with count res" from Botond
"
Repair obtains a permit for each repair-meta instance it creates. This
permit is supposed to track all resources consumed by that repair as
well as ensure concurrency limit is respected. However when the
non-local reader path is used (shard config of master != shard config of
follower), a second permit will be obtained -- for the shard reader of
the multishard reader. This creates a situation where the repair-meta's
permit can block the shard permit, creating a deadlock situation.
This patch solves this by dropping the count resource on the
repair-meta's permit when a non-local reader path is executed -- that is
a multishard reader is created.

Fixes: #9751
"

* 'repair-double-permit-block/v4' of https://github.com/denesb/scylla:
  repair: make sure there is one permit per repair with count res
  reader_permit: add release_base_resource()

(cherry picked from commit 52b7778ae6)
2022-01-17 16:02:55 +02:00
Beni Peled
5e661af9a4 release: prepare for 4.6.rc3 2022-01-17 13:11:54 +02:00
Calle Wilund
5629b67d25 messaging_service: Make dc/rack encryption check for connection more strict
Fixes #9653

When doing an outgoing connection, in a internode_encryption=dc/rack situation
we should not use endpoint/local broadcast solely to determine if we can
downgrade a connection.

If gossip/message_service determines that we will connect to a different
address than the "official" endpoint address, we should use this to determine
association of target node, and similarly, if we bind outgoing connection
to interface != bc we need to use this to decide local one.

Note: This will effectively _disable_ internode_encryption=dc/rack on ec2 etc
until such time that gossip can give accurate info on dc/rack for "internal"
ip addresses of nodes.

(cherry picked from commit 4778770814)
2022-01-16 19:10:57 +02:00
Takuya ASADA
ad632cf7fc dist: fix scylla-housekeeping uuid file chmod call
Should use chmod() on a file, not fchmod()

Fixes #9683

Closes #9802

(cherry picked from commit 7064ae3d90)
2022-01-10 16:57:34 +02:00
Botond Dénes
ca24bebcf2 sstables/partition_index_cache: destroy entry ptr on error
The error-handling code removes the cache entry but this leads to an
assertion because the entry is still referenced by the entry pointer
instance which is returned on the normal path. To avoid this clear the
pointer on the error path and make sure there are no additional
references kept to it.

Fixes #9887

Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20220105140859.586234-2-bdenes@scylladb.com>
(cherry picked from commit 92727ac36c)
2022-01-07 21:21:44 +01:00
Calle Wilund
7dc5abb6f8 commitlog: Don't allow error_handler to swallow exception
Fixes #9798

If an exception in allocate_segment_ex is (sub)type of std::system_error,
commit_error_handler might _not_ cause throw (doh), in which case the error
handling code would forget the current exception and return an unusable
segment.

Now only used as an exception pointer replacer.

Closes #9870

(cherry picked from commit 3c02cab2f7)
2022-01-06 14:10:18 +02:00
Yaron Kaikov
e8a1cfb6f8 release: prepare for 4.6.rc2 2022-01-02 09:15:47 +02:00
Tomasz Grabiec
fc312b3021 lsa: Fix segment leak on memory reclamation during alloc_buf
alloc_buf() calls new_buf_active() when there is no active segment to
allocate a new active segment. new_buf_active() allocates memory
(e.g. a new segment) so may cause memory reclamation, which may cause
segment compaction, which may call alloc_buf() and re-enter
new_buf_active(). The first call to new_buf_active() would then
override _buf_active and cause the segment allocated during segment
compaction to be leaked.

This then causes abort when objects from the leaked segment are freed
because the segment is expected to be present in _closed_segments, but
isn't. boost::intrusive::list::erase() will fail on assertion that the
object being erased is linked.

Introduced in b5ca0eb2a2.

Fixes #9821
Fixes #9192
Fixes #9825
Fixes #9544
Fixes #9508
Refs #9573

Message-Id: <20211229201443.119812-1-tgrabiec@scylladb.com>
(cherry picked from commit 7038dc7003)
2021-12-30 18:56:28 +02:00
Nadav Har'El
7b82aaf939 alternator: fix error on UpdateTable for non-existent table
When the UpdateTable operation is called for a non-existent table, the
appropriate error is ResourceNotFoundException, but before this patch
we ran into an exception, which resulted in an ugly "internal server
error".

In this patch we use the existing get_table() function which most other
operations use, and which does all the appropriate verifications and
generates the appropriate Alternator api_error instead of letting
internal Scylla exceptions escape to the user.

This patch also includes a test for UpdateTable on a non-existent table,
which used to fail before this patch and pass afterwards. We also add a
test for DeleteTable in the same scenario, and see it didn't have this
bug. As usual, both tests pass on DynamoDB, which confirms we generate
the right error codes.

Fixes #9747.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211206181605.1182431-1-nyh@scylladb.com>
(cherry picked from commit 31eeb44d28)
2021-12-29 22:59:25 +02:00
Nadav Har'El
894a4abfae commitlog: fix missing wait for semaphore units
Commit dcc73c5d4e introduced a semaphore
for excluding concurrent recalculations - _reserve_recalculation_guard.

Unfortunately, the two places in the code which tried to take this
guard just called get_units() - which returns a future<units>, not
units - and never waited for this future to become available.

So this patch adds the missing "co_await" needed to wait for the
units to become available.

Fixes #9770.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211214122612.1462436-1-nyh@scylladb.com>
(cherry picked from commit b8786b96f4)
2021-12-29 13:18:59 +02:00
Takuya ASADA
4dcf023470 scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8
On CentOS8, mdmonitor.service does not works correctly when using
mdadm-4.1-15.el8.x86_64 and later versions.
Until we find a solution, let's pinning the package version to older one
which does not cause the issue (4.1-14.el8.x86_64).

Fixes #9540

Closes #9782

(cherry picked from commit 0d8f932f0b)
2021-12-28 11:38:04 +02:00
Benny Halevy
283788828e compaction: scrub_validate_mode_validate_reader: throw compaction_stopped_exception if stop is requested
Currently when scrub/validate is stopped (e.g. via the api),
scrub_validate_mode_validate_reader co_return:s without
closing the reader passed to it - causing a crash due
to internal error check, see #9766.

Throwing a compaction_stopped_exception rather than co_return:ing
an exception will be handled as any other exeption, including closing
the reader.

Fixes #9766

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20211213125528.2422745-1-bhalevy@scylladb.com>
(cherry picked from commit c89876c975)
2021-12-15 15:03:59 +02:00
Pavel Emelyanov
730a147ba6 row-cache: Handle exception (un)safety of rows_entry insertion
The B-tree's insert_before() is throwing operation, its caller
must account for that. When the rows_entry's collection was
switched on B-tree all the risky places were fixed by ee9e1045,
but few places went under the radar.

In the cache_flat_mutation_reader there's a place where a C-pointer
is inserted into the tree, thus potentially leaking the entry.

In the partition_snapshot_row_cursor there are two places that not
only leak the entry, but also leave it in the LRU list. The latter
it quite nasty, because those entry can be evicted, eviction code
tries to get rows_entry iterator from "this", but the hook happens
to be unattached (because insertion threw) and fails the assert.

fixes: #9728

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit ee103636ac)
2021-12-14 15:53:42 +02:00
Pavel Emelyanov
9897e83029 partition_snapshot_row_cursor: Shuffle ensure_result creation
Both places get the C-pointer on the freshly allocated rows_entry,
insert it where needed and return back the dereferenced pointer.

The C-pointer is going to become smart-pointer that would go out
of scope before return. This change prepares for that by constructing
the ensure_result from the iterator, that's returned from insertion
of the entry.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit 9fd8db318d)

Ref #9728
2021-12-14 15:52:37 +02:00
Asias He
1a9b64e6f6 storage_service: Wait for seastar::get_units in node_ops
The seastar::get_units returns a future, we have to wait for it.

Fixes #9767

Closes #9768

(cherry picked from commit 9859c76de1)
2021-12-12 18:42:20 +02:00
Takuya ASADA
49fe9e2c8e dist: allow running scylla-housekeeping with strict umask setting
To avoid failing scylla-housekeeping in strict umask environment,
we need to chmod a+r on repository file and housekeeping.uuid.

Fixes #9683

Closes #9739

(cherry picked from commit ea20f89c56)
2021-12-12 14:25:57 +02:00
Takuya ASADA
d0580c41ee dist: add support im4gn/is4gen instance on AWS
Add support next-generation, storage-optimized ARM64 instance types.

Fixes #9711

Closes #9730

(cherry picked from commit 097a6ee245)
2021-12-08 14:29:44 +02:00
Beni Peled
542394c82f release: prepare for 4.6.rc1 2021-12-08 11:08:45 +02:00
Avi Kivity
018ad3f6f4 test: refine test suite names exposed via xunit format
The test suite names seen by Jenkins are suboptimal: there is
no distinction between modes, and the ".cc" suffix of file names
is interpreted as a class name, which is converted to a tree node
that must be clicked to expand. Massage the names to remove
unnecessary information and add the mode.

Closes #9696

(cherry picked from commit ef3edcf848)

Fixes #9738.
2021-12-05 19:58:22 +02:00
Avi Kivity
9b8b7efb54 tests: consolidate boost xunit result files
The recent parallelization of boost unit tests caused an increase
in xml result files. This is challenging to Jenkins, since it
appears to use rpc-over-ssh to read the result files, and as a result
it takes more than an hour to read all result files when the Jenkins
main node is not on the same continent as the agent.

To fix this, merge the result files in test.py and leave one result
file per mode. Later we can leave one result file overall (integrating
the mode into the testsuite name), but that can wait.

Tested on a local Jenkins instance (just reading the result files,
not the entire build).

Closes #9668

(cherry picked from commit b23af15432)

Fixes #9738
2021-12-05 19:57:39 +02:00
Botond Dénes
1c3e63975f Merge 'Backport of #9348 (xceptions in commitlog::segment_manager::delete_segments could cause footprint counters to loose track)' from Calle Wilund
Backport of series to 4.6
Upstream merge commit: e2c27ee743.
Refs #9348

Closes #9702

* github.com:scylladb/scylla:
  commitlog: Recalculate footprint on delete_segment exceptions
  commitlog_test: Add test for exception in alloc w. deleted underlying file
  commitlog: Ensure failed-to-create-segment is re-deleted
  commitlog::allocate_segment_ex: Don't re-throw out of function
2021-12-02 09:22:19 +02:00
Calle Wilund
11bb03e46d commitlog: Recalculate footprint on delete_segment exceptions
Fixes #9348

If we get exceptions in delete_segments, we can, and probably will, loose
track of footprint counters. We need to recompute the used disk footprint,
otherwise we will flush too often, and even block indefinately on new_seg
iff using hard limits.
2021-11-29 14:56:48 +00:00
Calle Wilund
810e410c5d commitlog_test: Add test for exception in alloc w. deleted underlying file
Tests that we can handle exception-in-alloc cleanup if the file actually
does not exist. This however uncovers another weakness (addressed in next
patch) - that we can loose track of disk footprint here, and w. hard limits
end up waiting for disk space that never comes. Thus test does not use hard
limit.
2021-11-29 14:56:43 +00:00
Calle Wilund
97f6da0c3e commitlog: Ensure failed-to-create-segment is re-deleted
Fixes #9343

If we fail in allocate_segment_ex, we should push the file opened/created
to the delete set to ensure we reclaim the disk space. We should also
ensure that if we did not recycle a file in delete_segments, we still
wake up any recycle waiters iff we made a file delete instead.

Included a small unit test.
2021-11-29 14:51:39 +00:00
Calle Wilund
c229fe9694 commitlog::allocate_segment_ex: Don't re-throw out of function
Fixes #9342

commitlog_error_handler rethrows. But we want to not. And run post-handler
cleanup (co_await)
2021-11-29 14:51:39 +00:00
Tomasz Grabiec
ee1ca8ae4d lsa: Add sanity checks around lsa_buffer operations
We've been observing hard to explain crashes recently around
lsa_buffer destruction, where the containing segment is absent in
_segment_descs which causes log_heap::adjust_up to abort. Add more
checks to catch certain impossible senarios which can lead to this
sooner.

Refs #9192.
Message-Id: <20211116122346.814437-1-tgrabiec@scylladb.com>

(cherry picked from commit bf6898a5a0)
2021-11-24 15:17:37 +01:00
Tomasz Grabiec
6bfd322e3b lsa: Mark compact_segment_locked() as noexcept
We cannot recover from a failure in this method. The implementation
makes sure it never happens. Invariants will be broken if this
throws. Detect violations early by marking as noexcept.

We could make it exception safe and try to leave the data structures
in a consistent state but the reclaimer cannot make progress if this throws, so
it's pointless.

Refs #9192
Message-Id: <20211116122019.813418-1-tgrabiec@scylladb.com>

(cherry picked from commit 4d627affc3)
2021-11-24 15:17:35 +01:00
Tomasz Grabiec
afc18d5070 cql: Fix missing data in indexed queries with base table short reads
Indexed queries are using paging over the materialized view
table. Results of the view read are then used to issue reads of the
base table. If base table reads are short reads, the page is returned
to the user and paging state is adjusted accordingly so that when
paging is resumed it will query the view starting from the row
corresponding to the next row in the base which was not yet
returned. However, paging state's "remaining" count was not reset, so
if the view read was exhausted the reading will stop even though the
base table read was short.

Fix by restoring the "remaining" count when adjusting the paging state
on short read.

Tests:

  - index_with_paging_test
  - secondary_index_test

Fixes #9198
Message-Id: <20210818131840.1160267-1-tgrabiec@scylladb.com>

(cherry picked from commit 1e4da2dcce)
2021-11-23 11:22:00 +02:00
Tomasz Grabiec
2ec22c2404 sstables: partition_index_cache: Avoid abort due to benign bad_alloc inside allocating section
shared_promise::get_shared_future() is marked noexcept, but can
allocate memory. It is invoked by sstable partition index cache inside
an allocating section, which means that allocations can throw
bad_alloc even though there is memory to reclaim, so under normal
conditions.

Fix by allocating the shared_promise in a stable memory, in the
standard allocator via lw_shared_ptr<>, so that it can be accessed outside
allocating section.

Fixes #9666

Tests:

  - build/dev/test/boost/sstable_partition_index_cache_test

Message-Id: <20211122165100.1606854-1-tgrabiec@scylladb.com>
(cherry picked from commit 1d84bc6c3b)
2021-11-23 11:21:27 +02:00
Avi Kivity
19da778271 Merge "Run gossiper message handlers in a gate" from Pavel E
"
When gossiper processes its messages in the background some of
the continuations may pop up after the gossiper is shutdown.
This, in turn, may result in unwanted code to be executed when
it doesn't expect.

In particular, storage_service notification hooks may try to
update system keyspace (with "fresh" peer info/state/tokens/etc).
This update doesn't work after drain because drain shuts down
commitlog. The intention was that gossiper did _not_ notify
anyone after drain, because it's shut down during drain too.
But since there are background continuations left, it's not
working as expected.

refs: #9567
tests: unit(dev), dtest.concurrent_schema_changes.snapshot(dev)
"

* 'br-gossiper-background-messages-2' of https://github.com/xemul/scylla:
  gossiper: Guard background processing with gate
  gossiper: Helper for background messaging processing

(cherry picked from commit 9e2b6176a2)
2021-11-19 07:25:26 +02:00
Avi Kivity
cbd4c13ba6 Merge 'Revert "scylla_util.py: return bool value on systemd_unit.is_active()"' from Takuya ASADA
On scylla_unit.py, we provide `systemd_unit.is_active()` to return `systemctl is-active` output.
When we introduced systemd_unit class, we just returned `systemctl is-active` output as string, but we changed the return value to bool after that (2545d7fd43).
This was because `if unit.is_active():` always becomes True even it returns "failed" or "inactive", to avoid such scripting bug.
However, probably this was mistake.
Because systemd unit state is not 2 state, like "start" / "stop", there are many state.

And we already using multiple unit state ("activating", "failed", "inactive", "active") in our Cloud image login prompt:
https://github.com/scylladb/scylla-machine-image/blob/next/common/scylla_login#L135
After we merged 2545d7fd43, the login prompt is broken, because it does not return string as script expected (https://github.com/scylladb/scylla-machine-image/issues/241).

I think we should revert 2545d7fd43, it should return exactly same value as `systemctl is-active` says.

Fixes #9627
Fixes scylladb/scylla-machine-image#241

Closes #9628

* github.com:scylladb/scylla:
  scylla_ntp_setup: use string in systemd_unit.is_active()
  Revert "scylla_util.py: return bool value on systemd_unit.is_active()"

(cherry picked from commit c17101604f)
2021-11-18 11:44:11 +02:00
Pavel Emelyanov
338871802d generic_server: Keep server alive during conn background processing
There's at least one tiny race in generic_server code. The trailing
.handle_exception after the conn->process() captures this, but since the
whole continuation chain happens in the background, that this can be
released thus causing the whole lambda to execute on freed generic_server
instance. This, in turn, is not nice because captured this is used to get
a _logger from.

The fix is based on the observation that all connections pin the server
in memory until all of them (connections) are destructed. Said that, to
keep the server alive in the aforementioned lambda it's enough to make
sure the conn variable (it's lw_shared_ptr on the connection) is alive in
it. Not to generate a bunch of tiny continuations with identical set of
captures -- tail the single .then_wrapped() one and do whatever is needed
to wrap up the connection processing in it.

tests: unit(dev)
fixes: #9316

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20211115105818.11348-1-xemul@scylladb.com>
(cherry picked from commit ba16318457)
2021-11-17 10:21:11 +02:00
Yaron Kaikov
8b5b1b8af6 dist/docker/debian/build_docker.sh: debian version fix for rc releases
When building a docker we relay on `VERSION` value from
`SCYLLA-VERSION-GEN` . For `rc` releases only there is a different
between the configured version (X.X.rcX) and the actualy debian package
we generate (X.X~rcX)

Using a similar solution as i did in dcb10374a5

Fixes: #9616

Closes #9617

(cherry picked from commit 060a91431d)
2021-11-12 20:07:19 +02:00
Takuya ASADA
ea89eff95d dist/docker: fix bashrc filename for Ubuntu
For Debian variants, correct filename is /etc/bash.bashrc.

Fixes #9588

Closes #9589

(cherry picked from commit 201a97e4a4)
2021-11-10 14:25:27 +02:00
Michał Radwański
96421e7779 memtable: fix gcc function argument evaluation order induced use after move
clang evaluates function arguments from left to right, while gcc does so
in reverse. Therefore, this code can be correct on clang and incorrect
on gcc:
```
f(x.sth(), std::move(x))
```

This patch fixes one such instance of this bug, in memtable.cc.

Fixes #9605.

Closes #9606

(cherry picked from commit eff392073c)
2021-11-10 08:58:09 +02:00
Botond Dénes
142336ca53 mutation_writer/feed_writer: don't drop readers with small amount of content
Due to an error in transforming the above routine, readers who have <= a
buffer worth of content are dropped without consuming them.
This is due to the outer consume loop being conditioned on
`is_end_of_stream()`, which will be set for readers that eagerly
pre-fill their buffer and also have no more data then what is in their
buffer.
Change the condition to also check for `is_buffer_empty()` and only drop
the reader if both of these are true.

Fixes: #9594

Tests: unit(mutation_writer_test --repeat=200, dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20211108092923.104504-1-bdenes@scylladb.com>
(cherry picked from commit 4b6c0fe592)
2021-11-09 14:13:21 +02:00
Calle Wilund
492f12248c commitlog: Add explicit track var for "wasted space" to avoid double counting
Refs #9331

In segment::close() we add space to managers "wasted" counter. In destructor,
if we can cleanly delete/recycle the file we remove it. However, if we never
went through close (shutdown - ok, exception in batch_cycle - not ok), we can
end up subtracting numbers that were never added in the first place.
Just keep track of the bytes added in a var.

Observed behaviour in above issue is timeouts in batch_cycle, where we
declare the segment closed early (because we cannot add anything more safely
- chunks could get partial/misplaced). Exception will propagate to caller(s),
but the segment will not go through actual close() call -> destructor should
not assume such.

Closes #9598

(cherry picked from commit 3929b7da1f)
2021-11-09 14:07:04 +02:00
Yaron Kaikov
7eb7a0e5fe release: prepare for 4.6.rc0 2021-11-08 09:18:26 +02:00
37 changed files with 752 additions and 163 deletions

2
.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -60,7 +60,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=4.6.dev
VERSION=4.6.rc5
if test -f version
then

View File

@@ -1017,18 +1017,16 @@ future<executor::request_return_type> executor::update_table(client_state& clien
_stats.api_operations.update_table++;
elogger.trace("Updating table {}", request);
std::string table_name = get_table_name(request);
if (table_name.find(INTERNAL_TABLE_PREFIX) == 0) {
schema_ptr tab = get_table(_proxy, request);
// the ugly but harmless conversion to string_view here is because
// Seastar's sstring is missing a find(std::string_view) :-()
if (std::string_view(tab->cf_name()).find(INTERNAL_TABLE_PREFIX) == 0) {
return make_ready_future<request_return_type>(api_error::validation(
format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX)));
}
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
tracing::add_table_name(trace_state, keyspace_name, table_name);
tracing::add_table_name(trace_state, tab->ks_name(), tab->cf_name());
auto& db = _proxy.get_db().local();
auto& cf = db.find_column_family(keyspace_name, table_name);
schema_builder builder(cf.schema());
schema_builder builder(tab);
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {

View File

@@ -593,8 +593,8 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
clogger.trace("csm {}: insert dummy at {}", fmt::ptr(this), _lower_bound);
auto it = with_allocator(_lsa_manager.region().allocator(), [&] {
auto& rows = _snp->version()->partition().mutable_clustered_rows();
auto new_entry = current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no);
return rows.insert_before(_next_row.get_iterator_in_latest_version(), *new_entry);
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no));
return rows.insert_before(_next_row.get_iterator_in_latest_version(), std::move(new_entry));
});
_snp->tracker()->insert(*it);
_last_row = partition_snapshot_row_weakref(*_snp, it, true);

View File

@@ -1634,7 +1634,7 @@ future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader reader, co
while (auto mf_opt = co_await reader()) {
if (cdata.is_stop_requested()) [[unlikely]] {
// Compaction manager will catch this exception and re-schedule the compaction.
co_return coroutine::make_exception(compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested));
throw compaction_stopped_exception(schema->ks_name(), schema->cf_name(), cdata.stop_requested);
}
const auto& mf = *mf_opt;

View File

@@ -995,6 +995,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
}
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
paging_state_copy->set_remaining(internal_paging_size);
paging_state_copy->set_partition_key(std::move(index_pk));
paging_state_copy->set_clustering_key(std::move(index_ck));
return std::move(paging_state_copy);

View File

@@ -428,6 +428,8 @@ private:
void abort_recycled_list(std::exception_ptr);
void abort_deletion_promise(std::exception_ptr);
future<> recalculate_footprint();
future<> rename_file(sstring, sstring) const;
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
@@ -444,6 +446,7 @@ private:
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
seastar::semaphore _reserve_recalculation_guard;
};
template<typename T>
@@ -512,6 +515,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
uint64_t _file_pos = 0;
uint64_t _flush_pos = 0;
uint64_t _size_on_disk = 0;
uint64_t _waste = 0;
size_t _alignment;
@@ -598,7 +602,7 @@ public:
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.active_size_on_disk -= file_position();
_segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position());
_segment_manager->totals.wasted_size_on_disk -= _waste;
_segment_manager->add_file_to_delete(_file_name, _desc);
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
clogger.warn("Segment {} is dirty and is left on disk.", *this);
@@ -725,7 +729,8 @@ public:
auto s = co_await sync();
co_await flush();
co_await terminate();
_segment_manager->totals.wasted_size_on_disk += (_size_on_disk - file_position());
_waste = _size_on_disk - file_position();
_segment_manager->totals.wasted_size_on_disk += _waste;
co_return s;
}
future<sseg_ptr> do_flush(uint64_t pos) {
@@ -1223,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
, _reserve_recalculation_guard(1)
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
@@ -1248,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
}
try {
gate::holder g(_gate);
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
if (_reserve_segments.full()) {
// can happen if we recalculate
continue;
}
// note: if we were strict with disk size, we would refuse to do this
// unless disk footprint is lower than threshold. but we cannot (yet?)
// trust that flush logic will absolutely free up an existing
@@ -1519,7 +1530,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) {
for (auto * ext : cfg.extensions->commitlog_file_extensions()) {
auto nf = co_await ext->wrap_file(std::move(filename), f, flags);
auto nf = co_await ext->wrap_file(filename, f, flags);
if (nf) {
f = std::move(nf);
align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment();
@@ -1530,12 +1541,21 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
f = make_checked_file(commit_error_handler, std::move(f));
} catch (...) {
ep = std::current_exception();
commit_error_handler(ep);
}
if (ep) {
// do this early, so iff we are to fast-fail server,
// we do it before anything else can go wrong.
try {
commit_error_handler(ep);
} catch (...) {
ep = std::current_exception();
}
}
if (ep && f) {
co_await f.close();
}
if (ep) {
add_file_to_delete(filename, d);
co_return coroutine::exception(std::move(ep));
}
@@ -1594,6 +1614,8 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::new_segment() {
gate::holder g(_gate);
if (_shutdown) {
co_return coroutine::make_exception(std::runtime_error("Commitlog has been shut down. Cannot add data"));
}
@@ -1628,22 +1650,23 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
co_return _segments.back();
}
if (_segment_allocating) {
co_await _segment_allocating->get_future(timeout);
continue;
}
promise<> p;
_segment_allocating.emplace(p.get_future());
auto finally = defer([&] () noexcept { _segment_allocating = std::nullopt; });
try {
gate::holder g(_gate);
auto s = co_await with_timeout(timeout, new_segment());
p.set_value();
} catch (...) {
p.set_exception(std::current_exception());
throw;
// #9896 - we don't want to issue a new_segment call until
// the old one has terminated with either result or exception.
// Do all waiting through the shared_future
if (!_segment_allocating) {
auto f = new_segment();
// must check that we are not already done.
if (f.available()) {
f.get(); // maybe force exception
continue;
}
_segment_allocating.emplace(f.discard_result().finally([this] {
// clear the shared_future _before_ resolving its contents
// (i.e. with result of this finally)
_segment_allocating = std::nullopt;
}));
}
co_await _segment_allocating->get_future(timeout);
}
}
@@ -1865,6 +1888,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
std::exception_ptr recycle_error;
size_t num_deleted = 0;
bool except = false;
while (!files.empty()) {
auto filename = std::move(files.back());
files.pop_back();
@@ -1914,8 +1939,10 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
}
}
co_await delete_file(filename);
++num_deleted;
} catch (...) {
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
except = true;
}
}
@@ -1928,6 +1955,16 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
if (recycle_error && _recycled_segments.empty()) {
abort_recycled_list(recycle_error);
}
// If recycle failed and turned into a delete, we should fake-wakeup waiters
// since we might still have cleaned up disk space.
if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) {
abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files")));
}
// #9348 - if we had an exception, we can't trust our bookeep any more. recalculate.
if (except) {
co_await recalculate_footprint();
}
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
@@ -1942,6 +1979,67 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e
std::exchange(_disk_deletions, {}).set_exception(ep);
}
future<> db::commitlog::segment_manager::recalculate_footprint() {
try {
co_await do_pending_deletes();
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
auto segments_copy = _segments;
std::vector<sseg_ptr> reserves;
std::vector<sstring> recycles;
// this causes haywire things while we steal stuff, but...
while (!_reserve_segments.empty()) {
reserves.push_back(_reserve_segments.pop());
}
while (!_recycled_segments.empty()) {
recycles.push_back(_recycled_segments.pop());
}
// #9955 - must re-stock the queues before we do anything
// interruptable/continuation. Because both queues are
// used with push/pop eventually which _waits_ for signal
// but does _not_ verify that the condition is true once
// we return. So copy the objects and look at instead.
for (auto& filename : recycles) {
_recycled_segments.push(sstring(filename));
}
for (auto& s : reserves) {
_reserve_segments.push(sseg_ptr(s)); // you can have it back now.
}
// first, guesstimate sizes
uint64_t recycle_size = recycles.size() * max_size;
auto old = totals.total_size_on_disk;
totals.total_size_on_disk = recycle_size;
for (auto& s : _segments) {
totals.total_size_on_disk += s->_size_on_disk;
}
for (auto& s : reserves) {
totals.total_size_on_disk += s->_size_on_disk;
}
// now we need to adjust the actual sizes of recycled files
uint64_t actual_recycled_size = 0;
try {
for (auto& filename : recycles) {
auto s = co_await seastar::file_size(filename);
actual_recycled_size += s;
}
} catch (...) {
clogger.error("Exception reading disk footprint ({}).", std::current_exception());
actual_recycled_size = recycle_size; // best we got
}
totals.total_size_on_disk += actual_recycled_size - recycle_size;
// pushing things to reserve/recycled queues will have resumed any
// waiters, so we should be done.
} catch (...) {
clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
}
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftc = std::exchange(_files_to_close, {});
auto ftd = std::exchange(_files_to_delete, {});

View File

@@ -100,6 +100,7 @@ def version_compare(a, b):
def create_uuid_file(fl):
with open(args.uuid_file, 'w') as myfile:
myfile.write(str(uuid.uuid1()) + "\n")
os.chmod(args.uuid_file, 0o644)
def sanitize_version(version):

View File

@@ -278,6 +278,66 @@ if __name__ == "__main__":
disk_properties["read_bandwidth"] = 2527296683 * nr_disks
disk_properties["write_iops"] = 156326 * nr_disks
disk_properties["write_bandwidth"] = 1063657088 * nr_disks
elif idata.instance() == "im4gn.large":
disk_properties["read_iops"] = 33943
disk_properties["read_bandwidth"] = 288433525
disk_properties["write_iops"] = 27877
disk_properties["write_bandwidth"] = 126864680
elif idata.instance() == "im4gn.xlarge":
disk_properties["read_iops"] = 68122
disk_properties["read_bandwidth"] = 576603520
disk_properties["write_iops"] = 55246
disk_properties["write_bandwidth"] = 254534954
elif idata.instance() == "im4gn.2xlarge":
disk_properties["read_iops"] = 136422
disk_properties["read_bandwidth"] = 1152663765
disk_properties["write_iops"] = 92184
disk_properties["write_bandwidth"] = 508926453
elif idata.instance() == "im4gn.4xlarge":
disk_properties["read_iops"] = 273050
disk_properties["read_bandwidth"] = 1638427264
disk_properties["write_iops"] = 92173
disk_properties["write_bandwidth"] = 1027966826
elif idata.instance() == "im4gn.8xlarge":
disk_properties["read_iops"] = 250241 * nr_disks
disk_properties["read_bandwidth"] = 1163130709 * nr_disks
disk_properties["write_iops"] = 86374 * nr_disks
disk_properties["write_bandwidth"] = 977617664 * nr_disks
elif idata.instance() == "im4gn.16xlarge":
disk_properties["read_iops"] = 273030 * nr_disks
disk_properties["read_bandwidth"] = 1638211413 * nr_disks
disk_properties["write_iops"] = 92607 * nr_disks
disk_properties["write_bandwidth"] = 1028340266 * nr_disks
elif idata.instance() == "is4gen.medium":
disk_properties["read_iops"] = 33965
disk_properties["read_bandwidth"] = 288462506
disk_properties["write_iops"] = 27876
disk_properties["write_bandwidth"] = 126954200
elif idata.instance() == "is4gen.large":
disk_properties["read_iops"] = 68131
disk_properties["read_bandwidth"] = 576654869
disk_properties["write_iops"] = 55257
disk_properties["write_bandwidth"] = 254551002
elif idata.instance() == "is4gen.xlarge":
disk_properties["read_iops"] = 136413
disk_properties["read_bandwidth"] = 1152747904
disk_properties["write_iops"] = 92180
disk_properties["write_bandwidth"] = 508889546
elif idata.instance() == "is4gen.2xlarge":
disk_properties["read_iops"] = 273038
disk_properties["read_bandwidth"] = 1628982613
disk_properties["write_iops"] = 92182
disk_properties["write_bandwidth"] = 1027983530
elif idata.instance() == "is4gen.4xlarge":
disk_properties["read_iops"] = 260493 * nr_disks
disk_properties["read_bandwidth"] = 1217396928 * nr_disks
disk_properties["write_iops"] = 83169 * nr_disks
disk_properties["write_bandwidth"] = 1000390784 * nr_disks
elif idata.instance() == "is4gen.8xlarge":
disk_properties["read_iops"] = 273021 * nr_disks
disk_properties["read_bandwidth"] = 1656354602 * nr_disks
disk_properties["write_iops"] = 92233 * nr_disks
disk_properties["write_bandwidth"] = 1028010325 * nr_disks
properties_file = open(etcdir() + "/scylla.d/io_properties.yaml", "w")
yaml.dump({ "disks": [ disk_properties ] }, properties_file, default_flow_style=False)
ioconf = open(etcdir() + "/scylla.d/io.conf", "w")

View File

@@ -66,18 +66,18 @@ if __name__ == '__main__':
target = None
if os.path.exists('/lib/systemd/systemd-timesyncd'):
if systemd_unit('systemd-timesyncd').is_active():
if systemd_unit('systemd-timesyncd').is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
target = 'systemd-timesyncd'
if shutil.which('chronyd'):
if get_chrony_unit().is_active():
if get_chrony_unit().is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
if not target:
target = 'chrony'
if shutil.which('ntpd'):
if get_ntp_unit().is_active():
if get_ntp_unit().is_active() == 'active':
print('ntp is already configured, skip setup')
sys.exit(0)
if not target:

View File

@@ -117,10 +117,11 @@ if __name__ == '__main__':
pkg_install('xfsprogs')
if not shutil.which('mdadm'):
pkg_install('mdadm')
try:
md_service = systemd_unit('mdmonitor.service')
except SystemdException:
md_service = systemd_unit('mdadm.service')
if args.raid_level != '0':
try:
md_service = systemd_unit('mdmonitor.service')
except SystemdException:
md_service = systemd_unit('mdadm.service')
print('Creating {type} for scylla using {nr_disk} disk(s): {disks}'.format(type='fRAID{args.raid_level}' if raid else 'XFS volume', nr_disk=len(disks), disks=args.disks))
procs=[]
@@ -164,14 +165,15 @@ if __name__ == '__main__':
uuid = run(f'blkid -s UUID -o value {fsdev}', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
after = 'local-fs.target'
if raid:
wants = ''
if raid and args.raid_level != '0':
after += f' {md_service}'
wants = f'\nWants={md_service}'
unit_data = f'''
[Unit]
Description=Scylla data directory
Before=scylla-server.service
After={after}
Wants={md_service}
After={after}{wants}
DefaultDependencies=no
[Mount]
@@ -195,7 +197,8 @@ WantedBy=multi-user.target
f.write(f'RequiresMountsFor={mount_at}\n')
systemd_unit.reload()
md_service.start()
if args.raid_level != '0':
md_service.start()
mount = systemd_unit(mntunit_bn)
mount.start()
if args.enable_on_nextboot:

View File

@@ -370,6 +370,10 @@ if __name__ == '__main__':
version_check = interactive_ask_service('Do you want to enable Scylla to check if there is a newer version of Scylla available?', 'Yes - start the Scylla-housekeeping service to check for a newer version. This check runs periodically. No - skips this step.', version_check)
args.no_version_check = not version_check
if version_check:
cfg = sysconfig_parser(sysconfdir_p() / 'scylla-housekeeping')
repo_files = cfg.get('REPO_FILES')
for f in glob.glob(repo_files):
os.chmod(f, 0o644)
with open('/etc/scylla.d/housekeeping.cfg', 'w') as f:
f.write('[housekeeping]\ncheck-version: True\n')
os.chmod('/etc/scylla.d/housekeeping.cfg', 0o644)

View File

@@ -674,7 +674,7 @@ class aws_instance:
return self._type.split(".")[0]
def is_supported_instance_class(self):
if self.instance_class() in ['i2', 'i3', 'i3en', 'c5d', 'm5d', 'm5ad', 'r5d', 'z1d', 'c6gd', 'm6gd', 'r6gd', 'x2gd']:
if self.instance_class() in ['i2', 'i3', 'i3en', 'c5d', 'm5d', 'm5ad', 'r5d', 'z1d', 'c6gd', 'm6gd', 'r6gd', 'x2gd', 'im4gn', 'is4gen']:
return True
return False
@@ -683,7 +683,7 @@ class aws_instance:
instance_size = self.instance_size()
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
return 'ixgbevf'
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5b', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d', 'c6g', 'c6gd', 'm6g', 'm6gd', 't4g', 'r6g', 'r6gd', 'x2gd']:
if instance_class in ['a1', 'c5', 'c5a', 'c5d', 'c5n', 'c6g', 'c6gd', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'm6gd', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5b', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d', 'c6g', 'c6gd', 'm6g', 'm6gd', 't4g', 'r6g', 'r6gd', 'x2gd', 'im4gn', 'is4gen']:
return 'ena'
if instance_class == 'm4':
if instance_size == '16xlarge':
@@ -1041,7 +1041,7 @@ class systemd_unit:
return run('systemctl {} disable {}'.format(self.ctlparam, self._unit), shell=True, check=True)
def is_active(self):
return True if run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip() == 'active' else False
return run('systemctl {} is-active {}'.format(self.ctlparam, self._unit), shell=True, capture_output=True, encoding='utf-8').stdout.strip()
def mask(self):
return run('systemctl {} mask {}'.format(self.ctlparam, self._unit), shell=True, check=True)

View File

@@ -25,6 +25,10 @@ product="$(<build/SCYLLA-PRODUCT-FILE)"
version="$(<build/SCYLLA-VERSION-FILE)"
release="$(<build/SCYLLA-RELEASE-FILE)"
if [[ "$version" = *rc* ]]; then
version=$(echo $version |sed 's/\(.*\)\.)*/\1~/')
fi
mode="release"
if uname -m | grep x86_64 ; then
@@ -93,7 +97,7 @@ run apt-get -y install hostname supervisor openssh-server openssh-client openjdk
run locale-gen en_US.UTF-8
run bash -ec "dpkg -i packages/*.deb"
run apt-get -y clean all
run bash -ec "cat /scylla_bashrc >> /etc/bashrc"
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
run mkdir -p /etc/supervisor.conf.d
run mkdir -p /var/log/scylla
run chown -R scylla:scylla /var/lib/scylla

View File

@@ -184,14 +184,18 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
_logger.info("exception while advertising new connection: {}", std::current_exception());
}
// Block while monitoring for lifetime/errors.
return conn->process().finally([this, conn] {
return unadvertise_connection(conn);
}).handle_exception([this] (std::exception_ptr ep) {
if (is_broken_pipe_or_connection_reset(ep)) {
// expected if another side closes a connection or we're shutting down
return;
return conn->process().then_wrapped([this, conn] (auto f) {
try {
f.get();
} catch (...) {
auto ep = std::current_exception();
if (!is_broken_pipe_or_connection_reset(ep)) {
// some exceptions are expected if another side closes a connection
// or we're shutting down
_logger.info("exception while processing connection: {}", ep);
}
}
_logger.info("exception while processing connection: {}", ep);
return unadvertise_connection(conn);
});
});
return stop_iteration::no;

View File

@@ -477,49 +477,42 @@ gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request requ
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
}
rpc::no_wait_type gossiper::background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn) {
(void)with_gate(_background_msg, [this, type = std::move(type), fn = std::move(fn)] () mutable {
return container().invoke_on(0, std::move(fn)).handle_exception([type = std::move(type)] (auto ep) {
logger.warn("Failed to handle {}: {}", type, ep);
});
});
return messaging_service::no_wait();
}
void gossiper::init_messaging_service_handler() {
_messaging.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_SYN", [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_syn_msg(from, std::move(syn_msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_ACK", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack2([this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)container().invoke_on(0, [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return background_msg("GOSSIP_DIGEST_ACK2", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack2_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_echo([this] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return handle_echo_msg(from, generation_number_opt);
});
_messaging.register_gossip_shutdown([this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
// In a new fiber.
(void)container().invoke_on(0, [from, generation_number_opt] (gms::gossiper& gossiper) {
return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
return gossiper.handle_shutdown_msg(from, generation_number_opt);
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
});
return messaging_service::no_wait();
});
_messaging.register_gossip_get_endpoint_states([this] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
@@ -2178,6 +2171,9 @@ future<> gossiper::start() {
}
future<> gossiper::shutdown() {
if (!_background_msg.is_closed()) {
co_await _background_msg.close();
}
if (this_shard_id() == 0) {
co_await do_stop_gossiping();
}

View File

@@ -41,7 +41,9 @@
#include "unimplemented.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/print.hh>
#include <seastar/rpc/rpc_types.hh>
#include "utils/atomic_vector.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
@@ -138,12 +140,16 @@ private:
bool _enabled = false;
semaphore _callback_running{1};
semaphore _apply_state_locally_semaphore{100};
seastar::gate _background_msg;
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
bool _advertise_myself = true;
// Map ip address and generation number
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
future<> _failure_detector_loop_done{make_ready_future<>()} ;
rpc::no_wait_type background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);
public:
// Get current generation number for the given nodes
future<std::unordered_map<gms::inet_address, int32_t>>

View File

@@ -613,7 +613,8 @@ static flat_mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
} else {
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(snp->schema(), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
schema_ptr snp_schema = snp->schema();
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
}
}

View File

@@ -628,7 +628,12 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
remove_error_rpc_client(verb, id);
}
auto must_encrypt = [&id, &verb, this] {
auto addr = get_preferred_ip(id.addr);
auto broadcast_address = utils::fb_utilities::get_broadcast_address();
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != broadcast_address;
auto laddr = socket_address(listen_to_bc ? broadcast_address : _cfg.ip, 0);
auto must_encrypt = [&] {
if (_cfg.encrypt == encrypt_what::none) {
return false;
}
@@ -646,13 +651,27 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
// either rack/dc need to be in same dc to use non-tls
if (snitch_ptr->get_datacenter(id.addr) != snitch_ptr->get_datacenter(utils::fb_utilities::get_broadcast_address())) {
auto my_dc = snitch_ptr->get_datacenter(broadcast_address);
if (snitch_ptr->get_datacenter(addr) != my_dc) {
return true;
}
// #9653 - if our idea of dc for bind address differs from our official endpoint address,
// we cannot trust downgrading. We need to ensure either (local) bind address is same as
// broadcast or that the dc info we get for it is the same.
if (broadcast_address != laddr && snitch_ptr->get_datacenter(laddr) != my_dc) {
return true;
}
// if cross-rack tls, check rack.
return _cfg.encrypt == encrypt_what::rack &&
snitch_ptr->get_rack(id.addr) != snitch_ptr->get_rack(utils::fb_utilities::get_broadcast_address())
;
if (_cfg.encrypt == encrypt_what::dc) {
return false;
}
auto my_rack = snitch_ptr->get_rack(broadcast_address);
if (snitch_ptr->get_rack(addr) != my_rack) {
return true;
}
// See above: We need to ensure either (local) bind address is same as
// broadcast or that the rack info we get for it is the same.
return broadcast_address != laddr && snitch_ptr->get_rack(laddr) != my_rack;
}();
auto must_compress = [&id, this] {
@@ -681,7 +700,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
return true;
}();
auto remote_addr = socket_address(get_preferred_ip(id.addr), must_encrypt ? _cfg.ssl_port : _cfg.port);
auto remote_addr = socket_address(addr, must_encrypt ? _cfg.ssl_port : _cfg.port);
rpc::client_options opts;
// send keepalive messages each minute if connection is idle, drop connection after 10 failures
@@ -696,8 +715,6 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
}
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != utils::fb_utilities::get_broadcast_address();
auto laddr = socket_address(listen_to_bc ? utils::fb_utilities::get_broadcast_address() : _cfg.ip, 0);
auto client = must_encrypt ?
::make_shared<rpc_protocol_client_wrapper>(_rpc->protocol(), std::move(opts),
remote_addr, laddr, _credentials) :

View File

@@ -54,7 +54,7 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
auto rd = std::move(rd_ref);
std::exception_ptr ex;
try {
while (!rd.is_end_of_stream()) {
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
co_await rd.fill_buffer();
while (!rd.is_buffer_empty()) {
co_await rd.pop_mutation_fragment().consume(wr);

View File

@@ -411,11 +411,11 @@ public:
} else {
// Copy row from older version because rows in evictable versions must
// hold values which are independently complete to be consistent on eviction.
auto e = current_allocator().construct<rows_entry>(_schema, *_current_row[0].it);
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, *_current_row[0].it));
e->set_continuous(latest_i && latest_i->continuous());
_snp.tracker()->insert(*e);
rows.insert_before(latest_i, *e);
return {*e, true};
auto e_i = rows.insert_before(latest_i, std::move(e));
return ensure_result{*e_i, true};
}
}
@@ -447,11 +447,11 @@ public:
}
auto&& rows = _snp.version()->partition().mutable_clustered_rows();
auto latest_i = get_iterator_in_latest_version();
auto e = current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
is_continuous(latest_i && latest_i->continuous()));
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
is_continuous(latest_i && latest_i->continuous())));
_snp.tracker()->insert(*e);
rows.insert_before(latest_i, *e);
return ensure_result{*e, true};
auto e_i = rows.insert_before(latest_i, std::move(e));
return ensure_result{*e_i, true};
}
// Brings the entry pointed to by the cursor to the front of the LRU

View File

@@ -42,28 +42,34 @@ static auto construct_range_tombstone_entry(Args&&... args) {
}
void range_tombstone_list::apply_reversibly(const schema& s,
clustering_key_prefix start, bound_kind start_kind,
clustering_key_prefix end,
clustering_key_prefix start_key, bound_kind start_kind,
clustering_key_prefix end_key,
bound_kind end_kind,
tombstone tomb,
reverter& rev)
{
position_in_partition::less_compare less(s);
position_in_partition start(position_in_partition::range_tag_t(), bound_view(std::move(start_key), start_kind));
position_in_partition end(position_in_partition::range_tag_t(), bound_view(std::move(end_key), end_kind));
if (!less(start, end)) {
return;
}
if (!_tombstones.empty()) {
bound_view::compare less(s);
bound_view start_bound(start, start_kind);
auto last = --_tombstones.end();
range_tombstones_type::iterator it;
if (less(start_bound, last->end_bound())) {
it = _tombstones.upper_bound(start_bound, [less](auto&& sb, auto&& rt) {
return less(sb, rt.end_bound());
if (less(start, last->end_position())) {
it = _tombstones.upper_bound(start, [less](auto&& sb, auto&& rt) {
return less(sb, rt.end_position());
});
} else {
it = _tombstones.end();
}
insert_from(s, std::move(it), std::move(start), start_kind, std::move(end), end_kind, std::move(tomb), rev);
insert_from(s, std::move(it), std::move(start), std::move(end), std::move(tomb), rev);
return;
}
auto rt = construct_range_tombstone_entry(std::move(start), start_kind, std::move(end), end_kind, std::move(tomb));
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), std::move(tomb));
rev.insert(_tombstones.end(), *rt);
rt.release();
}
@@ -81,35 +87,31 @@ void range_tombstone_list::apply_reversibly(const schema& s,
*/
void range_tombstone_list::insert_from(const schema& s,
range_tombstones_type::iterator it,
clustering_key_prefix start,
bound_kind start_kind,
clustering_key_prefix end,
bound_kind end_kind,
position_in_partition start,
position_in_partition end,
tombstone tomb,
reverter& rev)
{
bound_view::compare less(s);
bound_view end_bound(end, end_kind);
position_in_partition::tri_compare cmp(s);
if (it != _tombstones.begin()) {
auto prev = std::prev(it);
if (prev->tombstone().tomb == tomb && prev->end_bound().adjacent(s, bound_view(start, start_kind))) {
start = prev->tombstone().start;
start_kind = prev->tombstone().start_kind;
if (prev->tombstone().tomb == tomb && cmp(prev->end_position(), start) == 0) {
start = prev->position();
rev.erase(prev);
}
}
while (it != _tombstones.end()) {
bound_view start_bound(start, start_kind);
if (less(end_bound, start_bound)) {
if (cmp(end, start) <= 0) {
return;
}
if (less(end_bound, it->start_bound())) {
if (cmp(end, it->position()) < 0) {
// not overlapping
if (it->tombstone().tomb == tomb && end_bound.adjacent(s, it->start_bound())) {
rev.update(it, {std::move(start), start_kind, it->tombstone().end, it->tombstone().end_kind, tomb});
if (it->tombstone().tomb == tomb && cmp(end, it->position()) == 0) {
rev.update(it, {std::move(start), std::move(start), tomb});
} else {
auto rt = construct_range_tombstone_entry(std::move(start), start_kind, std::move(end), end_kind, tomb);
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), tomb);
rev.insert(it, *rt);
rt.release();
}
@@ -119,34 +121,29 @@ void range_tombstone_list::insert_from(const schema& s,
auto c = tomb <=> it->tombstone().tomb;
if (c == 0) {
// same timestamp, overlapping or adjacent, so merge.
if (less(it->start_bound(), start_bound)) {
start = it->tombstone().start;
start_kind = it->tombstone().start_kind;
if (cmp(it->position(), start) < 0) {
start = it->position();
}
if (less(end_bound, it->end_bound())) {
end = it->tombstone().end;
end_kind = it->tombstone().end_kind;
end_bound = bound_view(end, end_kind);
if (cmp(end, it->end_position()) < 0) {
end = it->end_position();
}
it = rev.erase(it);
} else if (c > 0) {
// We overwrite the current tombstone.
if (less(it->start_bound(), start_bound)) {
auto new_end = bound_view(start, invert_kind(start_kind));
if (!less(new_end, it->start_bound())) {
// Here it->start < start
auto rt = construct_range_tombstone_entry(it->start_bound(), new_end, it->tombstone().tomb);
rev.update(it, {start_bound, it->end_bound(), it->tombstone().tomb});
if (cmp(it->position(), start) < 0) {
{
auto rt = construct_range_tombstone_entry(it->position(), start, it->tombstone().tomb);
rev.update(it, {start, it->end_position(), it->tombstone().tomb});
rev.insert(it, *rt);
rt.release();
}
}
if (less(end_bound, it->end_bound())) {
if (cmp(end, it->end_position()) < 0) {
// Here start <= it->start and end < it->end.
auto rt = construct_range_tombstone_entry(std::move(start), start_kind, end, end_kind, std::move(tomb));
rev.update(it, {std::move(end), invert_kind(end_kind), it->tombstone().end, it->tombstone().end_kind, it->tombstone().tomb});
auto rt = construct_range_tombstone_entry(std::move(start), end, std::move(tomb));
rev.update(it, {std::move(end), it->end_position(), it->tombstone().tomb});
rev.insert(it, *rt);
rt.release();
return;
@@ -157,30 +154,28 @@ void range_tombstone_list::insert_from(const schema& s,
} else {
// We don't overwrite the current tombstone.
if (less(start_bound, it->start_bound())) {
if (cmp(start, it->position()) < 0) {
// The new tombstone starts before the current one.
if (less(it->start_bound(), end_bound)) {
if (cmp(it->position(), end) < 0) {
// Here start < it->start and it->start < end.
auto new_end_kind = invert_kind(it->tombstone().start_kind);
if (!less(bound_view(it->tombstone().start, new_end_kind), start_bound)) {
auto rt = construct_range_tombstone_entry(std::move(start), start_kind, it->tombstone().start, new_end_kind, tomb);
{
auto rt = construct_range_tombstone_entry(std::move(start), it->position(), tomb);
it = rev.insert(it, *rt);
rt.release();
++it;
}
} else {
// Here start < it->start and end <= it->start, so just insert the new tombstone.
auto rt = construct_range_tombstone_entry(std::move(start), start_kind, std::move(end), end_kind, std::move(tomb));
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), std::move(tomb));
rev.insert(it, *rt);
rt.release();
return;
}
}
if (less(it->end_bound(), end_bound)) {
if (cmp(it->end_position(), end) < 0) {
// Here the current tombstone overwrites a range of the new one.
start = it->tombstone().end;
start_kind = invert_kind(it->tombstone().end_kind);
start = it->end_position();
++it;
} else {
// Here the current tombstone completely overwrites the new one.
@@ -190,7 +185,7 @@ void range_tombstone_list::insert_from(const schema& s,
}
// If we got here, then just insert the remainder at the end.
auto rt = construct_range_tombstone_entry(std::move(start), start_kind, std::move(end), end_kind, std::move(tomb));
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), std::move(tomb));
rev.insert(it, *rt);
rt.release();
}

View File

@@ -297,7 +297,13 @@ public:
private:
void apply_reversibly(const schema& s, clustering_key_prefix start, bound_kind start_kind,
clustering_key_prefix end, bound_kind end_kind, tombstone tomb, reverter& rev);
void insert_from(const schema& s, range_tombstones_type::iterator it, clustering_key_prefix start,
bound_kind start_kind, clustering_key_prefix end, bound_kind end_kind, tombstone tomb, reverter& rev);
void insert_from(const schema& s,
range_tombstones_type::iterator it,
position_in_partition start,
position_in_partition end,
tombstone tomb,
reverter& rev);
range_tombstones_type::iterator find(const schema& s, const range_tombstone_entry& rt);
};

View File

@@ -249,6 +249,14 @@ public:
return _base_resources;
}
void release_base_resources() noexcept {
if (_base_resources_consumed) {
_resources -= _base_resources;
_base_resources_consumed = false;
}
_semaphore.signal(std::exchange(_base_resources, {}));
}
sstring description() const {
return format("{}.{}:{}",
_schema ? _schema->ks_name() : "*",
@@ -394,6 +402,10 @@ reader_resources reader_permit::base_resources() const {
return _impl->base_resources();
}
void reader_permit::release_base_resources() noexcept {
return _impl->release_base_resources();
}
sstring reader_permit::description() const {
return _impl->description();
}

View File

@@ -161,6 +161,8 @@ public:
reader_resources base_resources() const;
void release_base_resources() noexcept;
sstring description() const;
db::timeout_clock::time_point timeout() const noexcept;

View File

@@ -407,6 +407,10 @@ public:
{},
mutation_reader::forwarding::no);
} else {
// We can't have two permits with count resource for 1 repair.
// So we release the one on _permit so the only one is the one the
// shard reader will obtain.
_permit.release_base_resources();
_reader = make_multishard_streaming_reader(db, _schema, _permit, [this] {
auto shard_range = _sharder.next();
if (shard_range) {

Submodule seastar updated: a189cdc45d...a375681303

View File

@@ -3670,7 +3670,7 @@ shared_ptr<abort_source> node_ops_meta_data::get_abort_source() {
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3680,7 +3680,7 @@ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
void storage_service::node_ops_done(utils::UUID ops_uuid) {
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3691,7 +3691,7 @@ void storage_service::node_ops_done(utils::UUID ops_uuid) {
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;

View File

@@ -49,12 +49,13 @@ private:
public:
partition_index_cache* _parent;
key_type _key;
std::variant<shared_promise<>, partition_index_page> _page;
std::variant<lw_shared_ptr<shared_promise<>>, partition_index_page> _page;
size_t _size_in_allocator = 0;
public:
entry(partition_index_cache* parent, key_type key)
: _parent(parent)
, _key(key)
, _page(make_lw_shared<shared_promise<>>())
{ }
void set_page(partition_index_page&& page) noexcept {
@@ -76,7 +77,7 @@ private:
// Always returns the same value for a given state of _page.
size_t size_in_allocator() const { return _size_in_allocator; }
shared_promise<>& promise() { return std::get<shared_promise<>>(_page); }
lw_shared_ptr<shared_promise<>> promise() { return std::get<lw_shared_ptr<shared_promise<>>>(_page); }
bool ready() const { return std::holds_alternative<partition_index_page>(_page); }
partition_index_page& page() { return std::get<partition_index_page>(_page); }
const partition_index_page& page() const { return std::get<partition_index_page>(_page); }
@@ -207,9 +208,7 @@ public:
return make_ready_future<entry_ptr>(std::move(ptr));
} else {
++_shard_stats.blocks;
return _as(_region, [ptr] () mutable {
return ptr.get_entry().promise().get_shared_future();
}).then([ptr] () mutable {
return ptr.get_entry().promise()->get_shared_future().then([ptr] () mutable {
return std::move(ptr);
});
}
@@ -234,23 +233,23 @@ public:
// No exceptions before then_wrapped() is installed so that ptr will be eventually populated.
return futurize_invoke(loader, key).then_wrapped([this, key, ptr] (auto&& f) mutable {
return futurize_invoke(loader, key).then_wrapped([this, key, ptr = std::move(ptr)] (auto&& f) mutable {
entry& e = ptr.get_entry();
try {
partition_index_page&& page = f.get0();
e.promise().set_value();
e.promise()->set_value();
e.set_page(std::move(page));
_shard_stats.used_bytes += e.size_in_allocator();
++_shard_stats.populations;
return ptr;
} catch (...) {
e.promise().set_exception(std::current_exception());
e.promise()->set_exception(std::current_exception());
ptr = {};
with_allocator(_region.allocator(), [&] {
_cache.erase(key);
});
throw;
}
}).then([ptr] {
return ptr;
});
}

43
test.py
View File

@@ -291,6 +291,8 @@ class Test:
def print_summary(self):
pass
def get_junit_etree(self):
return None
def check_log(self, trim):
"""Check and trim logs and xml output for tests which have it"""
@@ -338,9 +340,36 @@ class BoostTest(UnitTest):
boost_args += ['--color_output=false']
boost_args += ['--']
self.args = boost_args + self.args
self.casename = casename
self.__junit_etree = None
def get_junit_etree(self):
def adjust_suite_name(name):
# Normalize "path/to/file.cc" to "path.to.file" to conform to
# Jenkins expectations that the suite name is a class name. ".cc"
# doesn't add any infomation. Add the mode, otherwise failures
# in different modes are indistinguishable. The "test/" prefix adds
# no information, so remove it.
import re
name = re.sub(r'^test/', '', name)
name = re.sub(r'\.cc$', '', name)
name = re.sub(r'/', '.', name)
name = f'{name}.{self.mode}'
return name
if self.__junit_etree is None:
self.__junit_etree = ET.parse(self.xmlout)
root = self.__junit_etree.getroot()
suites = root.findall('.//TestSuite')
for suite in suites:
suite.attrib['name'] = adjust_suite_name(suite.attrib['name'])
skipped = suite.findall('./TestCase[@reason="disabled"]')
for e in skipped:
suite.remove(e)
os.unlink(self.xmlout)
return self.__junit_etree
def check_log(self, trim):
ET.parse(self.xmlout)
self.get_junit_etree()
super().check_log(trim)
@@ -800,6 +829,17 @@ def write_junit_report(tmpdir, mode):
with open(junit_filename, "w") as f:
ET.ElementTree(xml_results).write(f, encoding="unicode")
def write_consolidated_boost_junit_xml(tmpdir, mode):
xml = ET.Element("TestLog")
for suite in TestSuite.suites.values():
for test in suite.tests:
if test.mode != mode:
continue
test_xml = test.get_junit_etree()
if test_xml is not None:
xml.extend(test_xml.getroot().findall('.//TestSuite'))
et = ET.ElementTree(xml)
et.write(f'{tmpdir}/{mode}/xml/boost.xunit.xml', encoding='unicode')
def open_log(tmpdir):
pathlib.Path(tmpdir).mkdir(parents=True, exist_ok=True)
@@ -839,6 +879,7 @@ async def main():
for mode in options.modes:
write_junit_report(options.tmpdir, mode)
write_consolidated_boost_junit_xml(options.tmpdir, mode)
if 'coverage' in options.modes:
coverage.generate_coverage_report("build/coverage", "tests")

View File

@@ -16,6 +16,9 @@
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
# Tests for basic table operations: CreateTable, DeleteTable, ListTables.
# Also some basic tests for UpdateTable - although UpdateTable usually
# enables more elaborate features (such as GSI or Streams) and those are
# tested elsewhere.
import pytest
from botocore.exceptions import ClientError
@@ -311,3 +314,17 @@ def test_table_sse_off(dynamodb):
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
table.delete();
# Test that trying to delete a table that doesn't exist fails in the
# appropriate way (ResourceNotFoundException)
def test_delete_table_non_existent(dynamodb, test_table):
client = dynamodb.meta.client
with pytest.raises(ClientError, match='ResourceNotFoundException'):
client.delete_table(TableName=random_string(20))
# Test that trying to update a table that doesn't exist fails in the
# appropriate way (ResourceNotFoundException)
def test_update_table_non_existent(dynamodb, test_table):
client = dynamodb.meta.client
with pytest.raises(ClientError, match='ResourceNotFoundException'):
client.update_table(TableName=random_string(20), BillingMode='PAY_PER_REQUEST')

View File

@@ -19,6 +19,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/irange.hpp>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/iostream.hh>
@@ -49,6 +50,15 @@ static sstring read_to_string(cached_file::stream& s, size_t limit = std::numeri
return b.substr(0, limit);
}
static void read_to_void(cached_file::stream& s, size_t limit = std::numeric_limits<size_t>::max()) {
while (auto buf = s.next().get0()) {
if (buf.size() >= limit) {
break;
}
limit -= buf.size();
}
}
static sstring read_to_string(file& f, size_t start, size_t len) {
file_input_stream_options opt;
auto in = make_file_input_stream(f, start, len, opt);
@@ -61,6 +71,12 @@ static sstring read_to_string(cached_file& cf, size_t off, size_t limit = std::n
return read_to_string(s, limit);
}
[[gnu::unused]]
static void read_to_void(cached_file& cf, size_t off, size_t limit = std::numeric_limits<size_t>::max()) {
auto s = cf.read(off, default_priority_class(), std::nullopt);
read_to_void(s, limit);
}
struct test_file {
tmpdir dir;
file f;
@@ -255,6 +271,88 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
}
}
// A file which serves garbage but is very fast.
class garbage_file_impl : public file_impl {
private:
[[noreturn]] void unsupported() {
throw_with_backtrace<std::logic_error>("unsupported operation");
}
public:
// unsupported
virtual future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) override { unsupported(); }
virtual future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) override { unsupported(); }
virtual future<> flush(void) override { unsupported(); }
virtual future<> truncate(uint64_t length) override { unsupported(); }
virtual future<> discard(uint64_t offset, uint64_t length) override { unsupported(); }
virtual future<> allocate(uint64_t position, uint64_t length) override { unsupported(); }
virtual subscription<directory_entry> list_directory(std::function<future<>(directory_entry)>) override { unsupported(); }
virtual future<struct stat> stat(void) override { unsupported(); }
virtual future<uint64_t> size(void) override { unsupported(); }
virtual std::unique_ptr<seastar::file_handle_impl> dup() override { unsupported(); }
virtual future<> close() override { return make_ready_future<>(); }
virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t size, const io_priority_class& pc) override {
return make_ready_future<temporary_buffer<uint8_t>>(temporary_buffer<uint8_t>(size));
}
virtual future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) override {
unsupported(); // FIXME
}
virtual future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) override {
unsupported(); // FIXME
}
};
#ifndef SEASTAR_DEFAULT_ALLOCATOR // Eviction works only with the seastar allocator
SEASTAR_THREAD_TEST_CASE(test_stress_eviction) {
auto page_size = cached_file::page_size;
auto n_pages = 8'000'000 / page_size;
auto file_size = page_size * n_pages;
auto cached_size = 4'000'000;
cached_file::metrics metrics;
logalloc::region region;
auto f = file(make_shared<garbage_file_impl>());
cached_file cf(f, metrics, cf_lru, region, file_size);
region.make_evictable([&] {
testlog.trace("Evicting");
cf.invalidate_at_most_front(file_size / 2);
return cf_lru.evict();
});
for (int i = 0; i < (cached_size / page_size); ++i) {
read_to_string(cf, page_size * i, page_size);
}
testlog.debug("Saturating memory...");
// Disable background reclaiming which will prevent bugs from reproducing
// We want reclamation to happen synchronously with page cache population in read_to_void()
seastar::memory::set_min_free_pages(0);
// Saturate std memory
chunked_fifo<bytes> blobs;
auto rc = region.reclaim_counter();
while (region.reclaim_counter() == rc) {
blobs.emplace_back(bytes(bytes::initialized_later(), 1024));
}
testlog.debug("Memory: allocated={}, free={}", seastar::memory::stats().allocated_memory(), seastar::memory::stats().free_memory());
testlog.debug("Starting test...");
for (int j = 0; j < n_pages * 16; ++j) {
testlog.trace("Allocating");
auto stride = tests::random::get_int(1, 20);
auto page_idx = tests::random::get_int(n_pages - stride);
read_to_void(cf, page_idx * page_size, page_size * stride);
}
}
#endif
SEASTAR_THREAD_TEST_CASE(test_invalidation) {
auto page_size = cached_file::page_size;
test_file tf = make_test_file(page_size * 2);

View File

@@ -44,7 +44,9 @@
#include "test/lib/tmpdir.hh"
#include "db/commitlog/commitlog.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog_extensions.hh"
#include "db/commitlog/rp_set.hh"
#include "db/extensions.hh"
#include "log.hh"
#include "service/priority_manager.hh"
#include "test/lib/exception_utils.hh"
@@ -947,3 +949,113 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) {
co_await log.clear();
}
}
static future<> do_test_exception_in_allocate_ex(bool do_file_delete, bool reuse = true) {
commitlog::config cfg;
constexpr auto max_size_mb = 1;
cfg.commitlog_segment_size_in_mb = max_size_mb;
cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count;
cfg.commitlog_sync_period_in_ms = 10;
cfg.reuse_segments = reuse;
cfg.allow_going_over_size_limit = false; // #9348 - now can enforce size limit always
cfg.use_o_dsync = true; // make sure we pre-allocate.
// not using cl_test, because we need to be able to abandon
// the log.
tmpdir tmp;
cfg.commit_log_location = tmp.path().string();
class myfail : public std::exception {
public:
using std::exception::exception;
};
struct myext: public db::commitlog_file_extension {
public:
bool fail = false;
bool thrown = false;
bool do_file_delete;
myext(bool dd)
: do_file_delete(dd)
{}
seastar::future<seastar::file> wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override {
if (fail && !thrown) {
thrown = true;
if (do_file_delete) {
co_await f.close();
co_await seastar::remove_file(filename);
}
throw myfail{};
}
co_return f;
}
seastar::future<> before_delete(const seastar::sstring&) override {
co_return;
}
};
auto ep = std::make_unique<myext>(do_file_delete);
auto& mx = *ep;
db::extensions myexts;
myexts.add_commitlog_file_extension("hufflepuff", std::move(ep));
cfg.extensions = &myexts;
auto log = co_await commitlog::create_commitlog(cfg);
rp_set rps;
// uncomment for verbosity
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
auto uuid = utils::UUID_gen::get_time_UUID();
auto size = log.max_record_size();
auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) {
log.discard_completed_segments(id, rps);
mx.fail = true;
});
try {
while (!mx.thrown) {
rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.fill('1', size);
});
rps.put(std::move(h));
}
} catch (...) {
BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
}
co_await log.shutdown();
co_await log.clear();
}
/**
* Test generating an exception in segment file allocation
*/
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) {
co_await do_test_exception_in_allocate_ex(false);
}
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_no_recycle) {
co_await do_test_exception_in_allocate_ex(false, false);
}
/**
* Test generating an exception in segment file allocation, but also
* delete the file, which in turn should cause follow-up exceptions
* in cleanup delete. Which CL should handle
*/
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file) {
co_await do_test_exception_in_allocate_ex(true, false);
}
SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex_deleted_file_no_recycle) {
co_await do_test_exception_in_allocate_ex(true);
}

View File

@@ -22,6 +22,8 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
SEASTAR_TEST_CASE(test_index_with_paging) {
@@ -56,3 +58,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}

View File

@@ -210,6 +210,35 @@ BOOST_AUTO_TEST_CASE(test_overlapping_addition) {
BOOST_REQUIRE(it == l.end());
}
BOOST_AUTO_TEST_CASE(test_adjacent_empty_range_tombstone) {
range_tombstone_list l(*s);
l.apply(*s, rtie(1, 1, 2));
l.apply(*s, rt(1, 2, 3));
l.apply(*s, rtei(2, 2, 2));
l.apply(*s, rtei(2, 4, 3));
auto it = l.begin();
assert_rt(rt(1, 4, 3), *it++);
BOOST_REQUIRE(it == l.end());
}
BOOST_AUTO_TEST_CASE(test_empty_range_tombstones_are_dropped) {
range_tombstone_list l(*s);
l.apply(*s, rtei(0, 0, 1));
l.apply(*s, rtie(0, 0, 1));
l.apply(*s, rt(1, 2, 1));
l.apply(*s, rtei(4, 4, 1));
l.apply(*s, rtie(5, 5, 1));
l.apply(*s, rt(7, 8, 1));
auto it = l.begin();
assert_rt(rt(1, 2, 1), *it++);
assert_rt(rt(7, 8, 1), *it++);
BOOST_REQUIRE(it == l.end());
}
BOOST_AUTO_TEST_CASE(test_simple_overlap) {
range_tombstone_list l1(*s);
@@ -473,6 +502,23 @@ static std::vector<range_tombstone> make_random() {
rts.emplace_back(std::move(start_b), std::move(end_b), tombstone(dist(gen), gc_now));
}
int32_t size_empty = dist(gen) / 2;
for (int32_t i = 0; i < size_empty; ++i) {
clustering_key_prefix key = make_random_ckey();
bool start_incl = dist(gen) > 25;
if (start_incl) {
rts.emplace_back(
position_in_partition::before_key(key),
position_in_partition::before_key(key),
tombstone(dist(gen), gc_now));
} else {
rts.emplace_back(
position_in_partition::after_key(key),
position_in_partition::after_key(key),
tombstone(dist(gen), gc_now));
}
}
return rts;
}

View File

@@ -157,6 +157,7 @@ private:
metrics& _metrics;
lru& _lru;
logalloc::region& _region;
logalloc::allocating_section _as;
using cache_type = bplus::tree<page_idx_type, cached_page, page_idx_less_comparator, 12, bplus::key_search::linear>;
cache_type _cache;
@@ -187,10 +188,15 @@ private:
.then([this, idx] (temporary_buffer<char>&& buf) mutable {
cached_page::ptr_type first_page;
while (buf.size()) {
auto this_buf = buf.share();
this_buf.trim(std::min(page_size, buf.size()));
buf.trim_front(this_buf.size());
auto it_and_flag = _cache.emplace(idx, this, idx, std::move(this_buf));
auto this_size = std::min(page_size, buf.size());
// _cache.emplace() needs to run under allocating section even though it lives in the std space
// because bplus::tree operations are not reentrant, so we need to prevent memory reclamation.
auto it_and_flag = _as(_region, [&] {
auto this_buf = buf.share();
this_buf.trim(this_size);
return _cache.emplace(idx, this, idx, std::move(this_buf));
});
buf.trim_front(this_size);
++idx;
cached_page &cp = *it_and_flag.first;
if (it_and_flag.second) {

View File

@@ -1395,6 +1395,8 @@ private:
}
lsa_buffer alloc_buf(size_t buf_size) {
// Note: Can be re-entered from allocation sites below due to memory reclamation which
// invokes segment compaction.
static_assert(segment::size % buf_align == 0);
if (buf_size > segment::size) {
throw_with_backtrace<std::runtime_error>(format("Buffer size {} too large", buf_size));
@@ -1447,6 +1449,7 @@ private:
if (seg != _buf_active) {
if (desc.is_empty()) {
assert(desc._buf_pointers.empty());
_segment_descs.erase(desc);
desc._buf_pointers = std::vector<entangled>();
free_segment(seg, desc);
@@ -1457,7 +1460,7 @@ private:
}
}
void compact_segment_locked(segment* seg, segment_descriptor& desc) {
void compact_segment_locked(segment* seg, segment_descriptor& desc) noexcept {
auto seg_occupancy = desc.occupancy();
llogger.debug("Compacting segment {} from region {}, {}", fmt::ptr(seg), id(), seg_occupancy);
@@ -1472,6 +1475,7 @@ private:
for (entangled& e : _buf_ptrs_for_compact_segment) {
if (e) {
lsa_buffer* old_ptr = e.get(&lsa_buffer::_link);
assert(&desc == old_ptr->_desc);
lsa_buffer dst = alloc_buf(old_ptr->_size);
memcpy(dst._buf, old_ptr->_buf, dst._size);
old_ptr->_link = std::move(dst._link);
@@ -1502,6 +1506,10 @@ private:
std::vector<entangled> ptrs;
ptrs.reserve(segment::size / buf_align);
segment* new_active = new_segment();
if (_buf_active) [[unlikely]] {
// Memory allocation above could allocate active buffer during segment compaction.
close_buf_active();
}
assert((uintptr_t)new_active->at(0) % buf_align == 0);
segment_descriptor& desc = shard_segment_pool.descriptor(new_active);
desc._buf_pointers = std::move(ptrs);