Compare commits

...

44 Commits

Author SHA1 Message Date
Avi Kivity
d112a230c0 Merge 'Fix hang in multishard_writer' from Asias
"
This series fixes a hang in multishard_writer when an error happens. It contains
- multishard_writer: Abort the queue attached to consumers when producer fails
- repair: Fix hang when the writer is dead

Fixes #6241
Refs: #6248
"

* asias-stream_fix_multishard_writer_hang:
  repair: Fix hang when the writer is dead
  mutation_writer_test: Add test_multishard_writer_producer_aborts
  multishard_writer: Abort the queue attached to consumers when producer fails

(cherry picked from commit 8925e00e96)
2020-05-02 07:35:46 +03:00
Raphael S. Carvalho
4371cb41d0 api/service: fix segfault when taking a snapshot without keyspace specified
If no keyspace is specified when taking a snapshot, there will be a segfault
because keynames is unconditionally dereferenced. Let's return an error,
because a keyspace must be specified when column families are specified.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20200427195634.99940-1-raphaelsc@scylladb.com>
(cherry picked from commit 02e046608f)

Fixes #6336.
2020-04-30 12:57:39 +03:00
Botond Dénes
a8b9f94dcb schema: schema(): use std::stable_sort() to sort key columns
When multiple key columns (clustering or partition) are passed to
the schema constructor, all having the same column id, the expectation
is that these columns will retain the order in which they were passed to
`schema_builder::with_column()`. Currently, however, this is not
guaranteed, as the schema constructor sorts key columns by column id with
`std::sort()`, which doesn't guarantee that equally comparing elements
retain their order. This can be an issue for indexes, whose schemas
are built independently on each node. If there is any room for
variance in the key column order, this can result in different
nodes having incompatible schemas for the same index.
The fix is to use `std::stable_sort()`, which guarantees that the order
of equally comparing elements won't change.
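The difference is easy to demonstrate in isolation. A minimal sketch (the `column` struct and helper below are illustrative stand-ins, not Scylla's types; the commit note observed `std::sort()` reordering equal elements once there are 17 or more):

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical stand-in for a key column: only `id` takes part in ordering.
struct column {
    int id;
    std::string name;
};

// Sort key columns by id while keeping insertion order among equal ids;
// this is the guarantee std::stable_sort() adds over std::sort().
std::vector<std::string> sorted_names(std::vector<column> cols) {
    std::stable_sort(cols.begin(), cols.end(),
                     [](const column& a, const column& b) { return a.id < b.id; });
    std::vector<std::string> names;
    for (const auto& c : cols) {
        names.push_back(c.name);
    }
    return names;
}
```

With `std::sort()` the relative order of the 23 equal-id columns would be unspecified; with `std::stable_sort()` it is exactly the insertion order.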

This is a suspected cause of #5856, although we don't have hard proof.

Fixes: #5856
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
[avi: upgraded "Refs" to "Fixes", since we saw that std::sort() becomes
      unstable at 17 elements, and the failing schema had a
      clustering key with 23 elements]
Message-Id: <20200417121848.1456817-1-bdenes@scylladb.com>
(cherry picked from commit a4aa753f0f)
2020-04-19 18:25:09 +03:00
Hagit Segev
77500f9171 release: prepare for 3.2.5 2020-04-18 18:57:59 +03:00
Kamil Braun
13328e7253 sstables: freeze types nested in collection types in legacy sstables
Some legacy `mc` SSTables (created in Scylla 3.0) may contain incorrect
serialization headers, which don't wrap frozen UDTs nested inside collections
with the FrozenType<...> tag. When reading such an SSTable,
Scylla would detect a mismatch between the schema saved in schema
tables (which correctly wraps UDTs in the FrozenType<...> tag) and the schema
from the serialization header (which doesn't have these tags).

SSTables created in Scylla versions 3.1 and above, in particular in
Scylla versions that contain this commit, create correct serialization
headers (which wrap UDTs in the FrozenType<...> tag).

This commit does two things:
1. for all SSTables created after this commit, include a new feature
   flag, CorrectUDTsInCollections, presence of which implies that frozen
   UDTs inside collections have the FrozenType<...> tag.
2. when reading a Scylla SSTable without the feature flag, we assume that UDTs
   nested inside collections are always frozen, even if they don't have
   the tag. This assumption is safe to make, because at the time of
   this commit, Scylla does not allow non-frozen (multi-cell) types inside
   collections or UDTs, and because of point 1 above.

There is one edge case not covered: when we don't know whether the SSTable
comes from Scylla or from C*. In that case we won't make the assumption
described in point 2. Therefore, if we get a mismatch between the schema and
the serialization header of a table which we couldn't confirm to come from
Scylla, we will still reject the table. If any user encounters such an
issue (unlikely), we will have to use another solution, e.g. a
separate tool to rewrite the SSTable.
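The resulting behavior can be summarized as a small decision table; a sketch with illustrative names (not Scylla's actual enum or function):

```cpp
// Decision table for point 2 above, as a sketch. The enum and function
// names here are illustrative, not Scylla's actual identifiers.
enum class sstable_origin {
    scylla_with_flag,     // has CorrectUDTsInCollections: header tags are trusted
    scylla_without_flag,  // older Scylla: assume nested UDTs are frozen
    unknown               // possibly Cassandra: make no assumption, may reject
};

// Returns true when the reader should treat untagged UDTs nested inside
// collections as frozen despite the schema/header mismatch.
bool assume_nested_udts_frozen(sstable_origin o) {
    return o == sstable_origin::scylla_without_flag;
}
```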

Fixes #6130.

[avi: adjusted sstable file paths]
(cherry picked from commit 3d811e2f95)
2020-04-17 09:53:17 +03:00
Kamil Braun
79b58f89f1 sstables: move definition of column_translation::state::build to a .cc file
Ref #6130
2020-04-17 09:16:28 +03:00
Asias He
ba2821ec70 gossip: Add an option to force gossip generation
Consider 3 nodes in the cluster, n1, n2, n3 with gossip generation
number g1, g2, g3.

n1, n2, n3 running scylla version with commit
0a52ecb6df (gossip: Fix max generation
drift measure)

One year later, the user wants to upgrade n1, n2, n3 to a new version.

When n3 does a rolling restart with the new version, n3 will use a
generation number g3'. Because g3' - g2 > MAX_GENERATION_DIFFERENCE and
g3' - g1 > MAX_GENERATION_DIFFERENCE, n1 and n2 will reject n3's
gossip update and mark n3 as down.

Such unnecessary marking of a node as down can cause availability issues.
For example:

DC1: n1, n2
DC2: n3, n4

When n3 and n4 restart, n1 and n2 will mark n3 and n4 as down, which
causes the whole DC2 to be unavailable.

To fix, we can start the new node with a gossip generation that is within
MAX_GENERATION_DIFFERENCE of the generations the other nodes expect.

Once all the nodes run a version with commit
0a52ecb6df, the option is no longer
needed.
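The rejection logic described above can be sketched as follows. The constant's value follows Cassandra's gossiper (one year in seconds) and the function name is made up for illustration:

```cpp
#include <cstdint>

// Illustrative sketch of the drift check; not Scylla's actual code.
constexpr int64_t MAX_GENERATION_DIFFERENCE = int64_t(86400) * 365;

// A peer accepts a remote generation only if it hasn't jumped too far
// ahead of the generation it last saw; a node restarting long after its
// last gossip exchange trips this check, as described above.
bool accept_generation(int64_t local_gen, int64_t remote_gen) {
    return remote_gen - local_gen <= MAX_GENERATION_DIFFERENCE;
}
```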

Fixes #5164

(cherry picked from commit 743b529c2b)
2020-03-27 12:50:23 +01:00
Asias He
d72555e786 gossiper: Always use the new generation number
A user reported an issue where, after a node restart, the restarted node
is marked as DOWN by other nodes in the cluster while the node is up
and running normally.

Consider the following:

- n1, n2, n3 in the cluster
- n3 shutdown itself
- n3 sends the shutdown verb to n1 and n2
- n1 and n2 set n3 in SHUTDOWN status and force the heartbeat version to
  INT_MAX
- n3 restarts
- n3 sends gossip shadow rounds to n1 and n2, in
  storage_service::prepare_to_join,
- n3 receives a response from n1 in gossiper::handle_ack_msg; since
  _enabled = false and _in_shadow_round == false, n3 will apply the
  application state in fiber 1; fiber 1 finishes before fiber 2 and
  sets _in_shadow_round = false
- n3 receives a response from n2 in gossiper::handle_ack_msg; since
  _enabled = false and _in_shadow_round == false, n3 will apply the
  application state in fiber 2; fiber 2 yields
- n3 finishes the shadow round and continues
- n3 resets gossip endpoint_state_map with
  gossiper.reset_endpoint_state_map()
- n3 resumes fiber 2, applying application state about n3 into
  endpoint_state_map; at this point endpoint_state_map contains
  information about n3 itself, learned from n2
- n3 calls gossiper.start_gossiping(generation_number, app_states, ...)
  with new generation number generated correctly in
  storage_service::prepare_to_join, but in
  maybe_initialize_local_state(generation_nbr) will not set the new
  generation and heartbeat if endpoint_state_map contains the node itself
- n3 continues with the old generation and heartbeat learned in fiber 2
- n3 continues the gossip loop; in gossiper::run,
  hbs.update_heart_beat() sets the heartbeat to a number starting
  from 0
- n1 and n2 will not get updates from n3 because they use the same
  generation number but n1 and n2 have a larger heartbeat version
- n1 and n2 will mark n3 as down even though n3 is alive.

To fix, always use the new generation number.

Fixes: #5800
Backports: 3.0 3.1 3.2
(cherry picked from commit 62774ff882)
2020-03-27 12:50:20 +01:00
Hagit Segev
4c38534f75 release: prepare for 3.2.4 2020-03-25 10:12:29 +02:00
Gleb Natapov
a092f5d1f4 transport: pass tracing state explicitly instead of relying on it been in the client_state
Multiple requests can use the same client_state simultaneously, so it is
not safe to use it as a container for the tracing state, which is per-request.
Currently, the next request may overwrite the tracing state of the previous
one, causing, in the best case, the wrong trace to be taken, or a crash
if the overwritten pointer is freed prematurely.

Fixes #6014

(cherry picked from commit 866c04dd64)

Message-Id: <20200324144003.GA20781@scylladb.com>
2020-03-24 16:55:46 +02:00
Piotr Sarna
723fd50712 cql: fix qualifying indexed columns for filtering
When qualifying columns to be fetched for filtering, we also check
whether the target column is used as an index - in which case there's
no need to fetch it. However, the check incorrectly assumed
that any restriction is eligible for indexing, while currently that's
only true for EQ. The fix makes a more specific check and contains
many dynamic casts, but these will hopefully be gone once our
long-planned "restrictions rewrite" is done.
This commit comes with a test.

Fixes #5708
Tests: unit(dev)

(cherry picked from commit 767ff59418)
2020-03-22 09:47:12 +01:00
Konstantin Osipov
89deac7795 locator: correctly select endpoints if RF=0
SimpleStrategy creates a list of endpoints by iterating over the set of
all configured endpoints for the given token, until we reach the keyspace
replication factor.
There is a trivial coding bug: we first add at least one endpoint
to the list, and only then compare the list size to the replication factor.
If RF=0 this comparison never yields true.
Fix by moving the RF check before any endpoint is added to the
list.
Cassandra never had this bug since it uses a less fancy while()
loop.
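The fix amounts to reordering a check and an insertion; a minimal sketch with illustrative names (not Scylla's actual code):

```cpp
#include <cstddef>
#include <vector>

// Sketch of the endpoint-selection loop. The buggy version pushed an
// endpoint first and compared sizes afterwards, so `result.size() == rf`
// could never be true for rf == 0.
std::vector<int> select_endpoints(const std::vector<int>& ring, std::size_t rf) {
    std::vector<int> result;
    for (int ep : ring) {
        if (result.size() >= rf) {  // check *before* inserting: rf == 0 -> empty
            break;
        }
        result.push_back(ep);
    }
    return result;
}
```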

Fixes #5962
Message-Id: <20200306193729.130266-1-kostja@scylladb.com>

(cherry picked from commit ac6f64a885)
2020-03-12 12:10:27 +02:00
Avi Kivity
3843e5233c logalloc: increase capacity of _regions vector outside reclaim lock
Reclaim consults the _regions vector, so we don't want it moving around while
allocating more capacity. For that we take the reclaim lock. However, that
can cause a false-positive OOM during startup:

1. all memory is allocated to LSA as part of priming (2baa16b371)
2. the _regions vector is resized from 64k to 128k, requiring a segment
   to be freed (plenty are free)
3. but reclaiming_lock is taken, so we cannot reclaim anything.

To fix, resize the _regions vector outside the lock.
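The shape of the fix, sketched with a plain std::vector and std::mutex standing in for logalloc's internals (names are illustrative, not Scylla's):

```cpp
#include <mutex>
#include <vector>

// Sketch: grow the vector's capacity *before* taking the reclaim lock,
// so that if the allocation needs memory to be reclaimed, reclaim is
// still possible during the resize.
void add_region(std::vector<int>& regions, int r, std::mutex& reclaim_lock) {
    if (regions.size() == regions.capacity()) {
        regions.reserve(regions.capacity() * 2 + 1);  // allocates outside the lock
    }
    std::lock_guard<std::mutex> guard(reclaim_lock);
    regions.push_back(r);  // cannot reallocate now: capacity was pre-grown
}
```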

Fixes #6003.
Message-Id: <20200311091217.1112081-1-avi@scylladb.com>

(cherry picked from commit c020b4e5e2)
2020-03-12 11:25:34 +02:00
Benny Halevy
1b3c78480c dist/redhat: scylla.spec.mustache: set _no_recompute_build_ids
By default, `/usr/lib/rpm/find-debuginfo.sh` will tamper with
the binary's build-id when stripping its debug info, as it is passed
the `--build-id-seed <version>.<release>` option.

To prevent that we need to set the following macros as follows:
  unset `_unique_build_ids`
  set `_no_recompute_build_ids` to 1
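In RPM spec macro syntax, this could be expressed as (a sketch; the macro names are the ones from the commit message):

```
%undefine _unique_build_ids
%global _no_recompute_build_ids 1
```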

Fixes #5881

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 25a763a187)
2020-03-09 15:22:09 +02:00
Yaron Kaikov
48253eb183 release: prepare for 3.2.3 2020-03-04 14:18:38 +02:00
Takuya ASADA
5d60522c81 dist/debian: fix "unable to open node-exporter.service.dpkg-new" error
It seems like *.service files conflict at install time because the file
is installed twice, both via debian/*.service and debian/scylla-server.install.

We don't need to use *.install, so we can just drop the line.

Fixes #5640

(cherry picked from commit 29285b28e2)
2020-03-02 11:32:30 +02:00
Benny Halevy
63e93110d1 gossiper: do_stop_gossiping: copy live endpoints vector
It can be resized asynchronously by mark_dead.

Fixes #5701

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20200203091344.229518-1-bhalevy@scylladb.com>
(cherry picked from commit f45fabab73)
2020-02-26 13:00:22 +02:00
Gleb Natapov
83105efba8 commitlog: use commitlog IO scheduling class for segment zeroing
There may be other commitlog writes waiting for zeroing to complete, so
not using the proper scheduling class causes priority inversion.

Fixes #5858.

Message-Id: <20200220102939.30769-2-gleb@scylladb.com>
(cherry picked from commit 6a78cc9e31)
2020-02-26 12:51:29 +02:00
Benny Halevy
5840eb602a storage_service: drain_on_shutdown: unregister storage_proxy subscribers from local_storage_service
Match the subscription done in main() and avoid cross-shard access
to the _lifecycle_subscribers vector.

Fixes #5385

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Acked-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20200123092817.454271-1-bhalevy@scylladb.com>
(cherry picked from commit 5b0ea4c114)
2020-02-25 16:40:12 +02:00
Avi Kivity
61738999ea Revert "streaming: Do not invalidate cache if no sstable is added in flush_streaming_mutations"
This reverts commit dbf72c72b3. It exposes a data
resurrection bug (#5838).
2020-02-24 10:04:06 +02:00
Piotr Dulikowski
0b23e7145d hh: handle counter update hints correctly
This patch fixes a bug that appears because of an incorrect interaction
between counters and hinted handoff.

When a counter is updated on the leader, it sends mutations to other
replicas that contain all counter shards from the leader. If consistency
level is achieved but some replicas are unavailable, a hint with
mutation containing counter shards is stored.

When a hint's destination node is no longer a replica for it, the hint
is attempted to be sent to all of its current replicas. Previously,
storage_proxy::mutate was used for that purpose. That was incorrect
because that function treats mutations for counter tables as mutations
containing only a delta (by how much to increase/decrease the counter).
These two types of mutations have different serialization formats, so in
this case a "shards" mutation is reinterpreted as a "delta" mutation,
which can cause data corruption.

This patch backports `storage_proxy::mutate_hint_from_scratch`
function, which bypasses special handling of counter mutations and
treats them as regular mutations - which is the correct behavior for
"shards" mutations.

Refs #5833.
Backports: 3.1, 3.2, 3.3
Tests: unit(dev)
(cherry picked from commit ec513acc49)
2020-02-19 16:51:24 +02:00
Hagit Segev
3374aa20bb release: prepare for 3.2.2 2020-02-18 15:19:57 +02:00
Avi Kivity
c4e89ea1b0 Merge "cql3: time_uuid_fcts: validate time UUID" from Benny
"
Throw an error in case we hit an invalid time UUID
rather than hitting an assert.

Fixes #5552

(Ref #5588 that was dequeued and fixed here)

Test: UUID_test, cql_query_test(debug)
"

* 'validate-time-uuid' of https://github.com/bhalevy/scylla:
  cql3: abstract_function_selector: provide assignment_testable_source_context
  test: cql_query_test: add time uuid validation tests
  cql3: time_uuid_fcts: validate timestamp arg
  cql3: make_max_timeuuid_fct: delete outdated FIXME comment
  cql3: time_uuid_fcts: validate time UUID
  test: UUID_test: add tests for time uuid
  utils: UUID: create_time assert nanos_since validity
  utils/UUID_gen: make_nanos_since
  utils: UUID: assert UUID.is_timestamp

(cherry picked from commit 3343baf159)

Conflicts:
	cql3/functions/time_uuid_fcts.hh
	tests/cql_query_test.cc
2020-02-17 20:05:38 +02:00
Piotr Sarna
26d9ce6b98 db,view: fix generating view updates for partition tombstones
The update generation path must track and apply all tombstones,
both from the existing base row (if read-before-write was needed)
and for the new row. One such path contained an error: it
assumed that if the existing row is empty, then the update
can simply be generated from the new row. However, the lack of an
existing row can also be the result of a partition/range tombstone.
If that's the case, it needs to be applied, because it's entirely
possible that this partition tombstone also hides the new row.
Without taking the partition tombstone into account, creating
a future tombstone and inserting an out-of-order write before it
in the base table can result in ghost rows in the view table.
This patch comes with a test which was proven to fail before the
changes.

Branches 3.1,3.2,3.3
Fixes #5793

Tests: unit(dev)
Message-Id: <8d3b2abad31572668693ab585f37f4af5bb7577a.1581525398.git.sarna@scylladb.com>
(cherry picked from commit e93c54e837)
2020-02-16 18:19:28 +02:00
Avi Kivity
6d1a4e2c0b Update seastar submodule
* seastar dab9f10e76...c8668e98bd (1):
  > config: Do not allow zero rates

Fixes #5360.
2020-02-16 17:01:59 +02:00
Benny Halevy
fad143a441 repair: initialize row_level_repair: _zero_rows
Avoid following UBSAN error:
repair/row_level.cc:2141:7: runtime error: load of value 240, which is not a valid value for type 'bool'

Fixes #5531

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 474ffb6e54)
2020-02-16 16:11:17 +02:00
Rafael Ávila de Espíndola
bc07b877a5 main: Explicitly allow scylla core dumps
I have not looked into the security reason for disabling it when
a program has file capabilities.

Fixes #5560

[avi: remove extraneous semicolon]
Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200106231836.99052-1-espindola@scylladb.com>
(cherry picked from commit b80852c447)
2020-02-16 16:03:51 +02:00
Gleb Natapov
09ad011f98 lwt: fix write timeout exception reporting
CQL transport code relies on an exception's C++ type to create the correct
reply, but in LWT we converted some mutation_timeout exceptions to the more
generic request_timeout while forwarding them, which broke the protocol.
Do not drop type information.
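Why the C++ type matters can be shown with a toy hierarchy (illustrative stand-ins, not Scylla's exception classes or transport code):

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Illustrative stand-ins for the exception hierarchy.
struct request_timeout : std::runtime_error {
    using std::runtime_error::runtime_error;
};
struct mutation_write_timeout : request_timeout {
    using request_timeout::request_timeout;
};

// The transport picks the wire reply by catching concrete types, so a
// forwarding layer must rethrow the original exception rather than
// converting it to a base-class copy.
std::string classify(std::exception_ptr ep) {
    try {
        std::rethrow_exception(ep);
    } catch (const mutation_write_timeout&) {
        return "WRITE_TIMEOUT";  // the specific, correct reply
    } catch (const request_timeout&) {
        return "SERVER_ERROR";   // what dropping the type information yields
    } catch (...) {
        return "OTHER";
    }
}
```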

Fixes #5598.

Message-Id: <20200115180313.GQ9084@scylladb.com>
(cherry picked from commit 51281bc8ad)
2020-02-16 16:00:38 +02:00
Avi Kivity
b34973df4e tools: toolchain: dbuild: relax process limit in container
Docker restricts the number of processes in a container to some
limit it calculates. This limit turns out to be too low on large
machines, since we run multiple links in parallel, and each link
runs many threads.

Remove the limit by specifying --pids-limit -1. Since dbuild is
meant to provide a build environment, not a security barrier,
this is okay (the container is still restricted by host limits).

I checked that --pids-limit is supported by old versions of
docker and by podman.

Fixes #5651.
Message-Id: <20200127090807.3528561-1-avi@scylladb.com>

(cherry picked from commit 897320f6ab)
2020-02-16 15:41:18 +02:00
Pavel Solodovnikov
d65e2ac6af lwt: fix handling of nulls in parameter markers for LWT queries
This patch affects LWT queries with IF conditions of the
following form: `IF col IN :value`, i.e. where a parameter
marker is used.

When executing a prepared query with a bound value
of `(None,)` (a tuple with null; the example is for the Python driver), it is
serialized not as NULL but as an "empty" value (the serialization
format differs in each case).

Therefore, Scylla deserializes the parameters in the request as
empty `data_value` instances, which are, in turn, later translated
to non-empty `bytes_opt` with an empty byte-string value.

Account for this case too in the CAS condition evaluation code.

Example of a problem this patch aims to fix:

Suppose we have a table `tbl` with a boolean field `test` and
INSERT a row with NULL value for the `test` column.

Then the following update query fails to apply due to an
error in the IF condition evaluation code (assume `v=(null)`):
`UPDATE tbl SET test=false WHERE key=0 IF test IN :v`
returns false in the `[applied]` column, but is expected to succeed.
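The core of the fix is visible in the condition-evaluation predicate (it mirrors the one-line change in the diff below); a self-contained sketch using std::optional as a stand-in for bytes_opt:

```cpp
#include <optional>
#include <string>

using bytes_opt = std::optional<std::string>;

// The fixed IN-condition check: a disengaged optional (NULL) and an
// engaged-but-empty serialized value must both count as "no value"
// when evaluating IF ... IN :v.
bool matches_null(const bytes_opt& value) {
    return !value.has_value() || value->empty();
}
```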

Tests: unit(debug, dev), dtest(prepared stmt LWT tests at https://github.com/scylladb/scylla-dtest/pull/1286)

Fixes: #5710

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
Message-Id: <20200205102039.35851-1-pa.solodovnikov@scylladb.com>
(cherry picked from commit bcc4647552)
2020-02-16 15:37:44 +02:00
Asias He
dbf72c72b3 streaming: Do not invalidate cache if no sstable is added in flush_streaming_mutations
The table::flush_streaming_mutations function dates from the days when
streaming data went to memtables. After switching to the new streaming,
data goes directly to sstables, so the sstable set produced by
table::flush_streaming_mutations will be empty.

It is unnecessary to invalidate the cache if no sstables are added. To
avoid unnecessary cache invalidation, which pokes holes in the cache, skip
calling _cache.invalidate() if the sstable set is empty.

The steps are:

- STREAM_MUTATION_DONE verb is sent when streaming is done with old or
  new streaming
- table::flush_streaming_mutations is called in the verb handler
- cache is invalidated for the streaming ranges

In summary, this patch avoids a lot of cache invalidation during
streaming.

Backports: 3.0 3.1 3.2
Fixes: #5769
(cherry picked from commit 5e9925b9f0)
2020-02-16 15:16:37 +02:00
Botond Dénes
b542b9c89a row: append(): downgrade assert to on_internal_error()
This assert, added by 060e3f8, is supposed to ensure the invariant of
append() is respected, in order to prevent building an invalid row.
The assert however proved too harsh, as it converts any bug
causing out-of-order clustering rows into cluster unavailability.
Downgrade it to on_internal_error(). This will still prevent corrupt
data from spreading in the cluster, without the unavailability caused by
the assert.
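The downgrade can be sketched as follows; `on_internal_error` here is an illustrative stand-in that throws instead of aborting (and the invariant check is simplified to integer ids):

```cpp
#include <stdexcept>
#include <string>

// Illustrative stand-in: fail only the current operation with an
// exception, instead of assert() aborting the whole node.
void on_internal_error(const std::string& msg) {
    throw std::logic_error(msg);
}

// Guard the append() invariant: rows must arrive in clustering order.
void check_append_order(int last_id, int next_id) {
    if (next_id <= last_id) {
        on_internal_error("row::append(): out-of-order clustering row");
    }
}
```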

Fixes: #5786
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20200211083829.915031-1-bdenes@scylladb.com>
(cherry picked from commit 3164456108)
2020-02-16 15:12:22 +02:00
Takuya ASADA
c0e493edcc dist/debian: keep /etc/systemd .conf files on 'remove'
Since dpkg does not re-install conffiles when they were removed by the user,
we are currently missing dependencies.conf and sysconfdir.conf on rollback.
To prevent this, we need to stop running
'rm -rf /etc/systemd/system/scylla-server.service.d/' on 'remove'.

Fixes #5734

(cherry picked from commit 43097854a5)
2020-02-12 14:27:00 +02:00
Takuya ASADA
88718996ed scylla_post_install.sh: fix 'integer expression expected' error
awk returns a float value on Debian, which causes a postinst script failure
since we compare it as an integer.
Replace it with sed + bash.

Fixes #5569

(cherry picked from commit 5627888b7c)
2020-02-04 14:30:28 +02:00
Gleb Natapov
97236a2cee db/system_keyspace: use user memory limits for local.paxos table
Treat writes to local.paxos as user memory, as the number of writes is
dependent on the amount of user data written with LWT.

Fixes #5682

Message-Id: <20200130150048.GW26048@scylladb.com>
(cherry picked from commit b08679e1d3)
2020-02-02 17:37:04 +02:00
Rafael Ávila de Espíndola
6c272b48f5 types: Fix encoding of negative varint
We would sometimes produce an unnecessary extra 0xff prefix byte.

The new encoding matches what Cassandra does.

This was both an efficiency and a correctness issue, as using a varint in a
key could produce different tokens.
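A minimal sketch of the intended encoding: big-endian two's complement with redundant leading bytes stripped. This mirrors the format described above, not Scylla's actual implementation, and the bug was emitting one spare 0xff prefix byte for some negative values:

```cpp
#include <cstdint>
#include <vector>

// Minimal big-endian two's-complement encoding: strip leading 0x00 bytes
// while the value stays non-negative, and leading 0xff bytes while it
// stays negative. E.g. -1 encodes as a single 0xff, not 0xff 0xff.
std::vector<uint8_t> encode_varint(int64_t v) {
    std::vector<uint8_t> out;
    for (int shift = 56; shift >= 0; shift -= 8) {
        out.push_back(static_cast<uint8_t>(v >> shift));
    }
    std::size_t i = 0;
    while (i + 1 < out.size() &&
           ((out[i] == 0x00 && !(out[i + 1] & 0x80)) ||
            (out[i] == 0xff && (out[i + 1] & 0x80)))) {
        ++i;  // this byte carries no information; drop it
    }
    out.erase(out.begin(), out.begin() + i);
    return out;
}
```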

Fixes #5656

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
(cherry picked from commit c89c90d07f)
2020-02-02 16:46:27 +02:00
Avi Kivity
6a8ae87efa test: make eventually() more patient
We use eventually() in tests to wait for eventually consistent data
to become consistent. However, we see spurious failures indicating
that we wait too little.

Increasing the timeout has a negative side effect in that tests that
fail will now take longer to do so. However, this negative side effect
is negligible compared to the cost of false-positive failures, since those
throw away large test efforts and sometimes require a person to investigate
the problem, only to conclude it is a false positive.

This patch therefore makes eventually() more patient, by a factor of
32.
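The retry-loop idea can be sketched as follows (illustrative, not seastar's implementation; a real helper sleeps between attempts):

```cpp
#include <functional>

// Poll a condition up to a bounded number of attempts; the patch
// multiplies the previous budget by 32.
bool eventually(const std::function<bool()>& cond, int attempts) {
    for (int i = 0; i < attempts; ++i) {
        if (cond()) {
            return true;  // the data became consistent in time
        }
        // real code waits here to let background work make progress
    }
    return false;  // budget exhausted: report the (possibly spurious) failure
}
```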

Fixes #4707.
Message-Id: <20200130162745.45569-1-avi@scylladb.com>

(cherry picked from commit ec5b721db7)
2020-02-01 13:21:38 +02:00
Pekka Enberg
d24d9d037e Update seastar submodule
* seastar acd63c47...dab9f10e (1):
  > perftune.py: Use safe_load() for fix arbitrary code execution
Fixes #5630
2020-01-30 16:42:51 +02:00
Dejan Mircevski
43766bd453 config: Remove UDF from experimental_features_t
Scylla 3.2 doesn't support UDF, so do not accept UDF as a valid option
to experimental_features.

Fixes #5645.

No fix is needed on master, which does support UDF.

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>
2020-01-28 19:28:20 +02:00
Takuya ASADA
ddd8f9b1d1 dist/debian: Use tilde for release candidate builds
We need to add '~' to handle rcX versions correctly on Debian variants
(merged at ae33e9f), but when we moved to the relocatable package we
mistakenly dropped the code, so add it back.

Fixes #5641

(cherry picked from commit dd81fd3454)
2020-01-28 18:35:17 +02:00
Yaron Kaikov
e3e301906d release: prepare for 3.2.1 2020-01-22 17:02:42 +02:00
Piotr Sarna
2c822d4c1f db,view: fix checking for secondary index special columns
A mistake in handling legacy checks for the special 'idx_token' column
resulted in not properly recognizing materialized views backing secondary
indexes. The mistake is really a typo, but with bad
consequences - instead of checking the view schema for being an index,
we checked the base schema, which is definitely not an index of
itself.

Branches 3.1,3.2 (asap)
Fixes #5621
Fixes #4744

(cherry picked from commit 9b379e3d63)
2020-01-21 23:23:24 +02:00
Avi Kivity
04f8800b5b Update seastar submodule
* seastar 8e236efda...acd63c479 (1):
  > inet_address: Make inet_address == operator ignore scope (again)

Fixes #5225.
2020-01-21 13:44:41 +02:00
Asias He
a72a06d3b7 repair: Avoid duplicated partition_end write
Consider this:

1) Write partition_start of p1
2) Write clustering_row of p1
3) Write partition_end of p1
4) Repair is stopped due to an error before writing partition_start of p2
5) Repair calls repair_row_level_stop() to tear down, which calls
   wait_for_writer_done(). A duplicate partition_end is written.

To fix, track the partition_start and partition_end written, and avoid
unpaired writes.

Backports: 3.1 and 3.2
Fixes: #5527
(cherry picked from commit 401854dbaf)
2020-01-21 13:38:57 +02:00
63 changed files with 969 additions and 250 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
-VERSION=3.2.0
+VERSION=3.2.5
if test -f version
then

View File

@@ -254,6 +254,9 @@ void set_storage_service(http_context& ctx, routes& r) {
if (column_family.empty()) {
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
} else {
if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified");
}
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}

View File

@@ -244,7 +244,6 @@ batch_size_fail_threshold_in_kb: 50
# experimental_features:
# - cdc
# - lwt
# - udf
# The directory where hints files are stored if hinted handoff is enabled.
# hints_directory: /var/lib/scylla/hints

View File

@@ -266,7 +266,7 @@ bool column_condition::applies_to(const data_value* cell_value, const query_opti
return value.has_value() && is_satisfied_by(operator_type::EQ, *cell_value->type(), *column.type, *cell_value, *value);
});
} else {
-return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return value.has_value() == false; });
+return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return !value.has_value() || value->empty(); });
}
}

View File

@@ -61,6 +61,16 @@ make_now_fct() {
});
}
static int64_t get_valid_timestamp(const data_value& ts_obj) {
auto ts = value_cast<db_clock::time_point>(ts_obj);
int64_t ms = ts.time_since_epoch().count();
auto nanos_since = utils::UUID_gen::make_nanos_since(ms);
if (!utils::UUID_gen::is_valid_nanos_since(nanos_since)) {
throw exceptions::server_exception(format("{}: timestamp is out of range. Must be in milliseconds since epoch", ms));
}
return ms;
}
inline
shared_ptr<function>
make_min_timeuuid_fct() {
@@ -74,8 +84,7 @@ make_min_timeuuid_fct() {
if (ts_obj.is_null()) {
return {};
}
-auto ts = value_cast<db_clock::time_point>(ts_obj);
-auto uuid = utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count());
+auto uuid = utils::UUID_gen::min_time_UUID(get_valid_timestamp(ts_obj));
return {timeuuid_type->decompose(uuid)};
});
}
@@ -85,7 +94,6 @@ shared_ptr<function>
make_max_timeuuid_fct() {
return make_native_scalar_function<true>("maxtimeuuid", timeuuid_type, { timestamp_type },
[] (cql_serialization_format sf, const std::vector<bytes_opt>& values) -> bytes_opt {
-// FIXME: should values be a vector<optional<bytes>>?
auto& bb = values[0];
if (!bb) {
return {};
@@ -94,12 +102,22 @@ make_max_timeuuid_fct() {
if (ts_obj.is_null()) {
return {};
}
-auto ts = value_cast<db_clock::time_point>(ts_obj);
-auto uuid = utils::UUID_gen::max_time_UUID(ts.time_since_epoch().count());
+auto uuid = utils::UUID_gen::max_time_UUID(get_valid_timestamp(ts_obj));
return {timeuuid_type->decompose(uuid)};
});
}
inline utils::UUID get_valid_timeuuid(bytes raw) {
if (!utils::UUID_gen::is_valid_UUID(raw)) {
throw exceptions::server_exception(format("invalid timeuuid: size={}", raw.size()));
}
auto uuid = utils::UUID_gen::get_UUID(raw);
if (!uuid.is_timestamp()) {
throw exceptions::server_exception(format("{}: Not a timeuuid: version={}", uuid, uuid.version()));
}
return uuid;
}
inline
shared_ptr<function>
make_date_of_fct() {
@@ -110,7 +128,7 @@ make_date_of_fct() {
if (!bb) {
return {};
}
-auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
+auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
return {timestamp_type->decompose(ts)};
});
}
@@ -125,7 +143,7 @@ make_unix_timestamp_of_fct() {
if (!bb) {
return {};
}
-return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
+return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
});
}
@@ -176,7 +194,7 @@ make_timeuuidtodate_fct() {
if (!bb) {
return {};
}
-auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
+auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
auto to_simple_date = get_castas_fctn(simple_date_type, timestamp_type);
return {simple_date_type->decompose(to_simple_date(ts))};
});
@@ -211,7 +229,7 @@ make_timeuuidtotimestamp_fct() {
if (!bb) {
return {};
}
-auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
+auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
return {timestamp_type->decompose(ts)};
});
}
@@ -245,10 +263,14 @@ make_timeuuidtounixtimestamp_fct() {
if (!bb) {
return {};
}
-return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
+return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
});
}
inline bytes time_point_to_long(const data_value& v) {
return data_value(get_valid_timestamp(v)).serialize();
}
inline
shared_ptr<function>
make_timestamptounixtimestamp_fct() {
@@ -263,7 +285,7 @@ make_timestamptounixtimestamp_fct() {
if (ts_obj.is_null()) {
return {};
}
-return {long_type->decompose(ts_obj)};
+return time_point_to_long(ts_obj);
});
}
@@ -282,7 +304,7 @@ make_datetounixtimestamp_fct() {
return {};
}
auto from_simple_date = get_castas_fctn(timestamp_type, simple_date_type);
-return {long_type->decompose(from_simple_date(simple_date_obj))};
+return time_point_to_long(from_simple_date(simple_date_obj));
});
}

View File

@@ -390,28 +390,45 @@ std::vector<const column_definition*> statement_restrictions::get_column_defs_fo
if (need_filtering()) {
auto& sim = db.find_column_family(_schema).get_index_manager();
auto [opt_idx, _] = find_idx(sim);
-auto column_uses_indexing = [&opt_idx] (const column_definition* cdef) {
-return opt_idx && opt_idx->depends_on(*cdef);
+auto column_uses_indexing = [&opt_idx] (const column_definition* cdef, ::shared_ptr<single_column_restriction> restr) {
+return opt_idx && restr && restr->is_supported_by(*opt_idx);
};
auto single_pk_restrs = dynamic_pointer_cast<single_column_partition_key_restrictions>(_partition_key_restrictions);
if (_partition_key_restrictions->needs_filtering(*_schema)) {
for (auto&& cdef : _partition_key_restrictions->get_column_defs()) {
if (!column_uses_indexing(cdef)) {
::shared_ptr<single_column_restriction> restr;
if (single_pk_restrs) {
auto it = single_pk_restrs->restrictions().find(cdef);
if (it != single_pk_restrs->restrictions().end()) {
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
}
}
if (!column_uses_indexing(cdef, restr)) {
column_defs_for_filtering.emplace_back(cdef);
}
}
}
auto single_ck_restrs = dynamic_pointer_cast<single_column_clustering_key_restrictions>(_clustering_columns_restrictions);
const bool pk_has_unrestricted_components = _partition_key_restrictions->has_unrestricted_components(*_schema);
if (pk_has_unrestricted_components || _clustering_columns_restrictions->needs_filtering(*_schema)) {
column_id first_filtering_id = pk_has_unrestricted_components ? 0 : _schema->clustering_key_columns().begin()->id +
_clustering_columns_restrictions->num_prefix_columns_that_need_not_be_filtered();
for (auto&& cdef : _clustering_columns_restrictions->get_column_defs()) {
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef)) {
::shared_ptr<single_column_restriction> restr;
if (single_pk_restrs) {
auto it = single_ck_restrs->restrictions().find(cdef);
if (it != single_ck_restrs->restrictions().end()) {
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
}
}
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef, restr)) {
column_defs_for_filtering.emplace_back(cdef);
}
}
}
for (auto&& cdef : _nonprimary_key_restrictions->get_column_defs()) {
if (!column_uses_indexing(cdef)) {
auto restr = dynamic_pointer_cast<single_column_restriction>(_nonprimary_key_restrictions->get_restriction(*cdef));
if (!column_uses_indexing(cdef, restr)) {
column_defs_for_filtering.emplace_back(cdef);
}
}


@@ -92,6 +92,14 @@ public:
: abstract_function_selector(fun, std::move(arg_selectors))
, _tfun(dynamic_pointer_cast<T>(fun)) {
}
const functions::function_name& name() const {
return _tfun->name();
}
virtual sstring assignment_testable_source_context() const override {
return format("{}", this->name());
}
};
}


@@ -79,11 +79,6 @@ public:
dynamic_pointer_cast<functions::aggregate_function>(func), std::move(arg_selectors))
, _aggregate(fun()->new_aggregate()) {
}
virtual sstring assignment_testable_source_context() const override {
// FIXME:
return "FIXME";
}
};
}


@@ -82,12 +82,6 @@ public:
: abstract_function_selector_for<functions::scalar_function>(
dynamic_pointer_cast<functions::scalar_function>(std::move(fun)), std::move(arg_selectors)) {
}
virtual sstring assignment_testable_source_context() const override {
// FIXME:
return "FIXME";
}
};
}


@@ -1316,7 +1316,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
v.emplace_back(iovec{ buf.get_write(), s});
m += s;
}
return f.dma_write(max_size - rem, std::move(v)).then([&rem](size_t s) {
return f.dma_write(max_size - rem, std::move(v), service::get_local_commitlog_priority()).then([&rem](size_t s) {
rem -= s;
return stop_iteration::no;
});


@@ -691,8 +691,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt' and 'cdc'). Can be repeated.")
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable")
, prometheus_address(this, "prometheus_address", value_status::Used, "0.0.0.0", "Prometheus listening address")
@@ -855,7 +856,7 @@ const db::extensions& db::config::extensions() const {
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
// We decided against using the construct-on-first-use idiom here:
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
return {{"lwt", LWT}, {"cdc", CDC}};
}
template struct utils::config_file::named_value<seastar::log_level>;


@@ -78,7 +78,7 @@ namespace db {
/// Enumeration of all valid values for the `experimental` config entry.
struct experimental_features_t {
enum feature { LWT, UDF, CDC };
enum feature { LWT, CDC };
static std::unordered_map<sstring, feature> map(); // See enum_option.
};
@@ -269,6 +269,7 @@ public:
named_value<uint32_t> shutdown_announce_in_ms;
named_value<bool> developer_mode;
named_value<int32_t> skip_wait_for_gossip_to_settle;
named_value<int32_t> force_gossip_generation;
named_value<bool> experimental;
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
named_value<size_t> lsa_reclamation_step;


@@ -405,11 +405,8 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no);
} else {
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
// unavailable exception.
auto timeout = db::timeout_clock::now() + 1h;
//FIXME: Add required frozen_mutation overloads
return _proxy.mutate({m.fm.unfreeze(m.s)}, consistency_level::ALL, timeout, nullptr, empty_service_permit());
return _proxy.mutate_hint_from_scratch(std::move(m));
}
});
}


@@ -1748,7 +1748,7 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) {
}
static bool maybe_write_in_user_memory(schema_ptr s, database& db) {
return (s.get() == batchlog().get())
return (s.get() == batchlog().get()) || (s.get() == paxos().get())
|| s == v3::scylla_views_builds_in_progress();
}


@@ -307,7 +307,7 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
if (!cdef.is_computed()) {
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
// once "computed_columns feature" is supported by every node
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_base)) {
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
}
computed_value = token_column_computation().compute_value(*_base, base_key, update);
@@ -879,7 +879,11 @@ future<stop_iteration> view_update_builder::on_results() {
if (_update && !_update->is_end_of_partition()) {
if (_update->is_clustering_row()) {
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
generate_update(std::move(*_update).as_clustering_row(), { });
auto existing_tombstone = _existing_tombstone_tracker.current_tombstone();
auto existing = existing_tombstone
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
: std::nullopt;
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
}
return advance_updates();
}


@@ -125,7 +125,7 @@ if [ -z "$TARGET" ]; then
fi
RELOC_PKG_FULLPATH=$(readlink -f $RELOC_PKG)
RELOC_PKG_BASENAME=$(basename $RELOC_PKG)
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE)
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE | sed 's/\.rc/~rc/')
SCYLLA_RELEASE=$(cat SCYLLA-RELEASE-FILE)
ln -fv $RELOC_PKG_FULLPATH ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz


@@ -4,7 +4,6 @@ etc/security/limits.d/scylla.conf
etc/scylla.d/*.conf
opt/scylladb/share/doc/scylla/*
opt/scylladb/share/doc/scylla/licenses/
usr/lib/systemd/system/*.service
usr/lib/systemd/system/*.timer
usr/lib/systemd/system/*.slice
usr/bin/scylla


@@ -6,8 +6,12 @@ case "$1" in
purge|remove)
rm -rf /etc/systemd/system/scylla-housekeeping-daily.service.d/
rm -rf /etc/systemd/system/scylla-housekeeping-restart.service.d/
rm -rf /etc/systemd/system/scylla-server.service.d/
rm -rf /etc/systemd/system/scylla-helper.slice.d/
# We need to keep dependencies.conf and sysconfdir.conf on 'remove',
# otherwise it will be missing after rollback.
if [ "$1" = "purge" ]; then
rm -rf /etc/systemd/system/scylla-server.service.d/
fi
;;
esac


@@ -17,6 +17,10 @@ Obsoletes: scylla-server < 1.1
%undefine _find_debuginfo_dwz_opts
# Prevent find-debuginfo.sh from tempering with scylla's build-id (#5881)
%undefine _unique_build_ids
%global _no_recompute_build_ids 1
%description
Scylla is a highly scalable, eventually consistent, distributed,
partitioned row DB.


@@ -76,6 +76,9 @@ Scylla with issue #4139 fixed)
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
Scylla with issue #4363 fixed)
bit 5: CorrectUDTsInCollections (if set, indicates that the sstable was generated
by Scylla with issue #6130 fixed)
## extension_attributes subcomponent
extension_attributes = extension_attribute_count extension_attribute*


@@ -98,6 +98,13 @@ public:
sstring get_message() const { return what(); }
};
class server_exception : public cassandra_exception {
public:
server_exception(sstring msg) noexcept
: exceptions::cassandra_exception{exceptions::exception_code::SERVER_ERROR, std::move(msg)}
{ }
};
class protocol_exception : public cassandra_exception {
public:
protocol_exception(sstring msg) noexcept


@@ -1622,11 +1622,15 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
// message on all cpus and forard them to cpu0 to process.
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
g.init_messaging_service_handler(do_bind);
}).then([this, generation_nbr, preload_local_states] {
}).then([this, generation_nbr, preload_local_states] () mutable {
build_seeds_list();
/* initialize the heartbeat state for this localEndpoint */
maybe_initialize_local_state(generation_nbr);
if (_cfg.force_gossip_generation() > 0) {
generation_nbr = _cfg.force_gossip_generation();
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
local_state.mark_alive();
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
}
@@ -1831,7 +1835,8 @@ future<> gossiper::do_stop_gossiping() {
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
logger.info("Announcing shutdown");
add_local_application_state(application_state::STATUS, _value_factory.shutdown(true)).get();
for (inet_address addr : _live_endpoints) {
auto live_endpoints = _live_endpoints;
for (inet_address addr : live_endpoints) {
msg_addr id = get_msg_addr(addr);
logger.trace("Sending a GossipShutdown to {}", id);
ms().send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
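Taking a copy of `_live_endpoints` before the loop guards against the set being mutated while the shutdown messages are in flight, which would invalidate the loop's iterators. A minimal illustration of the hazard and the fix with a plain `std::set` (hypothetical names; the real container and mutation path are Scylla's):

```cpp
#include <cassert>
#include <set>
#include <string>

// Erasing from a std::set invalidates iterators to the erased element,
// so a loop over `live` that also mutates `live` is undefined behavior.
// Iterating a snapshot copy, as the patch does, is safe.
int notify_all(std::set<std::string>& live) {
    int notified = 0;
    const auto live_snapshot = live;  // mirrors `auto live_endpoints = _live_endpoints;`
    for (const auto& ep : live_snapshot) {
        live.erase(ep);               // mutating `live` is now harmless to the loop
        ++notified;
    }
    return notified;
}
```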


@@ -53,13 +53,13 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
endpoints.reserve(replicas);
for (auto& token : tm.ring_range(t)) {
if (endpoints.size() == replicas) {
break;
}
auto ep = tm.get_endpoint(token);
assert(ep);
endpoints.push_back(*ep);
if (endpoints.size() == replicas) {
break;
}
}
return std::move(endpoints.get_vector());
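The reordered loop above checks the replica count right after each `push_back` instead of at the top of the next iteration, so the ring range is never advanced (and `tm.get_endpoint()` never called) once enough endpoints have been collected. A plain-container sketch of the pattern (hypothetical names, ints standing in for endpoints):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Collect at most `replicas` endpoints, breaking right after the
// insertion that fills the quota -- no extra iteration at the top.
std::vector<int> take_replicas(const std::vector<int>& ring, size_t replicas) {
    std::vector<int> endpoints;
    if (replicas == 0) {
        return endpoints;  // quota already met (the original relies on replicas >= 1)
    }
    endpoints.reserve(replicas);
    for (int ep : ring) {
        endpoints.push_back(ep);
        if (endpoints.size() == replicas) {
            break;
        }
    }
    return endpoints;
}
```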

main.cc

@@ -54,6 +54,7 @@
#include <seastar/core/file.hh>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/prctl.h>
#include "disk-error-handler.hh"
#include "tracing/tracing.hh"
#include "tracing/tracing_backend_registry.hh"
@@ -464,6 +465,15 @@ inline auto defer_with_log_on_error(Func&& func) {
}
int main(int ac, char** av) {
// Allow core dumps. The would be disabled by default if
// CAP_SYS_NICE was added to the binary, as is suggested by the
// epoll backend.
int r = prctl(PR_SET_DUMPABLE, 1, 0, 0, 0);
if (r) {
std::cerr << "Could not make scylla dumpable\n";
exit(1);
}
int return_value = 0;
try {
// early check to avoid triggering


@@ -39,6 +39,9 @@
#include <seastar/core/execution_stage.hh>
#include "types/map.hh"
#include "compaction_garbage_collector.hh"
#include "utils/exceptions.hh"
logging::logger mplog("mutation_partition");
template<bool reversed>
struct reversal_traits;
@@ -1236,7 +1239,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
void
row::append_cell(column_id id, atomic_cell_or_collection value) {
if (_type == storage_type::vector && id < max_vector_size) {
assert(_storage.vector.v.size() <= id);
if (_storage.vector.v.size() > id) {
on_internal_error(mplog, format("Attempted to append cell#{} to row already having {} cells", id, _storage.vector.v.size()));
}
_storage.vector.v.resize(id);
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
_storage.vector.present.set(id);


@@ -177,6 +177,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
return handle_end_of_stream();
}
});
}).handle_exception([this] (std::exception_ptr ep) {
for (auto& q : _queue_reader_handles) {
if (q) {
q->abort(ep);
}
}
return make_exception_future<>(std::move(ep));
});
}


@@ -444,10 +444,14 @@ class repair_writer {
uint64_t _estimated_partitions;
size_t _nr_peer_nodes;
// Needs more than one for repair master
std::vector<std::optional<future<uint64_t>>> _writer_done;
std::vector<std::optional<future<>>> _writer_done;
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
// Current partition written to disk
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
// Is current partition still open. A partition is opened when a
// partition_start is written and is closed when a partition_end is
// written.
std::vector<bool> _partition_opened;
public:
repair_writer(
schema_ptr schema,
@@ -462,10 +466,13 @@ public:
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf, unsigned node_idx) {
_current_dk_written_to_sstable[node_idx] = dk;
if (mf.is_partition_start()) {
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] {
_partition_opened[node_idx] = true;
});
} else {
auto start = mutation_fragment(partition_start(dk->dk, tombstone()));
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable {
_partition_opened[node_idx] = true;
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
});
}
@@ -475,6 +482,7 @@ public:
_writer_done.resize(_nr_peer_nodes);
_mq.resize(_nr_peer_nodes);
_current_dk_written_to_sstable.resize(_nr_peer_nodes);
_partition_opened.resize(_nr_peer_nodes, false);
}
void create_writer(unsigned node_idx) {
@@ -516,7 +524,24 @@ public:
return consumer(std::move(reader));
});
},
t.stream_in_progress());
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
_schema->ks_name(), _schema->cf_name(), partitions);
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
_mq[node_idx]->abort(ep);
return make_exception_future<>(std::move(ep));
});
}
future<> write_partition_end(unsigned node_idx) {
if (_partition_opened[node_idx]) {
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] {
_partition_opened[node_idx] = false;
});
}
return make_ready_future<>();
}
future<> do_write(unsigned node_idx, lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
@@ -524,7 +549,7 @@ public:
if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) {
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
} else {
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this,
return write_partition_end(node_idx).then([this,
node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable {
return write_start_and_mf(std::move(dk), std::move(mf), node_idx);
});
@@ -534,21 +559,33 @@ public:
}
}
future<> write_end_of_stream(unsigned node_idx) {
if (_mq[node_idx]) {
// Partition_end is never sent on wire, so we have to write one ourselves.
return write_partition_end(node_idx).then([this, node_idx] () mutable {
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
});
} else {
return make_ready_future<>();
}
}
future<> do_wait_for_writer_done(unsigned node_idx) {
if (_writer_done[node_idx]) {
return std::move(*(_writer_done[node_idx]));
} else {
return make_ready_future<>();
}
}
future<> wait_for_writer_done() {
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
if (_writer_done[node_idx] && _mq[node_idx]) {
// Partition_end is never sent on wire, so we have to write one ourselves.
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] () mutable {
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
rlogger.debug("Managed to write partitions={} to sstable", partitions);
return make_ready_future<>();
});
});
});
}
return make_ready_future<>();
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
}).handle_exception([this] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
return make_exception_future<>(std::move(ep));
});
}
};
@@ -2174,7 +2211,7 @@ class row_level_repair {
// If the total size of the `_row_buf` on either of the nodes is zero,
// we set this flag, which is an indication that rows are not synced.
bool _zero_rows;
bool _zero_rows = false;
// Sum of estimated_partitions on all peers
uint64_t _estimated_partitions = 0;


@@ -288,10 +288,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
+ column_offset(column_kind::regular_column),
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
std::sort(_raw._columns.begin(),
std::stable_sort(_raw._columns.begin(),
_raw._columns.begin() + column_offset(column_kind::clustering_key),
[] (auto x, auto y) { return x.id < y.id; });
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
_raw._columns.begin() + column_offset(column_kind::static_column),
[] (auto x, auto y) { return x.id < y.id; });
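The switch to `std::stable_sort` above matters because `std::sort` may reorder elements that compare equal. A minimal sketch of the guarantee, using a hypothetical `col` struct rather than Scylla's real column types:

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a key column: equal ids, distinct names.
struct col {
    int id;
    std::string name;
};

// Sorts by id only; with std::stable_sort, columns sharing an id keep
// the order in which they were appended (e.g. via with_column()).
std::vector<col> sort_columns(std::vector<col> cols) {
    std::stable_sort(cols.begin(), cols.end(),
                     [](const col& x, const col& y) { return x.id < y.id; });
    return cols;
}
```

With `std::sort` the relative order of equal-id entries would be unspecified, which is exactly the cross-node schema divergence the commit message describes.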


@@ -58,7 +58,8 @@ EOS
# For systems with not a lot of memory, override default reservations for the slices
# seastar has a minimum reservation of 1.5GB that kicks in, and 21GB * 0.07 = 1.5GB.
# So for anything smaller than that we will not use percentages in the helper slice
MEMTOTAL_BYTES=$(cat /proc/meminfo | grep MemTotal | awk '{print $2 * 1024}')
MEMTOTAL=$(cat /proc/meminfo |grep -e "^MemTotal:"|sed -s 's/^MemTotal:\s*\([0-9]*\) kB$/\1/')
MEMTOTAL_BYTES=$(($MEMTOTAL * 1024))
if [ $MEMTOTAL_BYTES -lt 23008753371 ]; then
mkdir -p /etc/systemd/system/scylla-helper.slice.d/
cat << EOS > /etc/systemd/system/scylla-helper.slice.d/memory.conf
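The revised shell pipeline anchors the match to lines beginning with `MemTotal:` and performs the kB-to-bytes conversion in shell arithmetic. For reference, the same anchored extraction sketched in C++ (an assumption about layout: `/proc/meminfo` lines look like `MemTotal:       16384 kB`):

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Parse the MemTotal line of meminfo-style text and return the value in
// bytes, or -1 if the line is absent or malformed.
int64_t memtotal_bytes(const std::string& meminfo) {
    std::istringstream in(meminfo);
    std::string line;
    while (std::getline(in, line)) {
        if (line.rfind("MemTotal:", 0) == 0) {  // anchored match, like ^MemTotal:
            std::istringstream fields(line.substr(9));
            int64_t kb = -1;
            if (fields >> kb) {
                return kb * 1024;
            }
        }
    }
    return -1;
}
```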

Submodule seastar updated: 8e236efda9...c8668e98bd


@@ -38,7 +38,12 @@ private:
public:
query_state(client_state& client_state, service_permit permit)
: _client_state(client_state)
, _trace_state_ptr(_client_state.get_trace_state())
, _trace_state_ptr(tracing::trace_state_ptr())
, _permit(std::move(permit))
{ }
query_state(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
: _client_state(client_state)
, _trace_state_ptr(std::move(trace_state))
, _permit(std::move(permit))
{ }


@@ -2183,6 +2183,14 @@ future<> storage_proxy::send_to_endpoint(
allow_hints);
}
future<> storage_proxy::mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s) {
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
// unavailable exception.
const auto timeout = db::timeout_clock::now() + 1h;
std::array<mutation, 1> ms{fm_a_s.fm.unfreeze(fm_a_s.s)};
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, empty_service_permit(), timeout);
}
/**
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
* is not available.
@@ -3935,7 +3943,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
return make_ready_future<storage_proxy::coordinator_query_result>(f.get0());
} catch (request_timeout_exception& ex) {
_stats.cas_read_timeouts.mark();
return make_exception_future<storage_proxy::coordinator_query_result>(std::move(ex));
return make_exception_future<storage_proxy::coordinator_query_result>(std::current_exception());
} catch (exceptions::unavailable_exception& ex) {
_stats.cas_read_unavailables.mark();
return make_exception_future<storage_proxy::coordinator_query_result>(std::move(ex));
@@ -4062,7 +4070,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
return make_ready_future<bool>(f.get0());
} catch (request_timeout_exception& ex) {
_stats.cas_write_timeouts.mark();
return make_exception_future<bool>(std::move(ex));
return make_exception_future<bool>(std::current_exception());
} catch (exceptions::unavailable_exception& ex) {
_stats.cas_write_unavailables.mark();
return make_exception_future<bool>(std::move(ex));
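The change to `std::current_exception()` above matters because, inside a catch block, `std::current_exception()` captures the in-flight exception with its original dynamic type, whereas forwarding the caught reference copies it as the handler's static type and can slice a derived exception. A minimal illustration with standard exception types (not Scylla's):

```cpp
#include <exception>
#include <stdexcept>

// Returns an exception_ptr captured inside a handler for the base class.
std::exception_ptr capture(bool use_current) {
    try {
        throw std::out_of_range("derived");  // dynamic type: out_of_range
    } catch (std::logic_error& ex) {         // handler's static type: logic_error
        if (use_current) {
            return std::current_exception();             // keeps out_of_range
        }
        return std::make_exception_ptr(std::move(ex));   // sliced to logic_error
    }
}

// True iff the stored exception still has the derived type.
bool is_out_of_range(std::exception_ptr ep) {
    try {
        std::rethrow_exception(ep);
    } catch (std::out_of_range&) {
        return true;
    } catch (std::exception&) {
        return false;
    }
}
```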


@@ -459,6 +459,8 @@ public:
*/
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit);
future<> mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s);
// Send a mutation to one specific remote target.
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also


@@ -1440,7 +1440,8 @@ future<> storage_service::drain_on_shutdown() {
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
slogger.info("Drain on shutdown: system distributed keyspace stopped");
get_storage_proxy().invoke_on_all([&ss] (storage_proxy& local_proxy) mutable {
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
auto& ss = service::get_local_storage_service();
ss.unregister_subscriber(&local_proxy);
return local_proxy.drain_on_shutdown();
}).get();


@@ -72,47 +72,8 @@ private:
static std::vector<column_info> build(
const schema& s,
const utils::chunked_vector<serialization_header::column_desc>& src,
bool is_static) {
std::vector<column_info> cols;
if (s.is_dense()) {
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
cols.push_back(column_info{
&col.name(),
col.type,
col.id,
col.type->value_length_if_fixed(),
col.is_multi_cell(),
col.is_counter(),
false
});
} else {
cols.reserve(src.size());
for (auto&& desc : src) {
const bytes& type_name = desc.type_name.value;
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
const column_definition* def = s.get_column_definition(desc.name.value);
std::optional<column_id> id;
bool schema_mismatch = false;
if (def) {
id = def->id;
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
def->is_counter() != type->is_counter() ||
!def->type->is_value_compatible_with(*type);
}
cols.push_back(column_info{
&desc.name.value,
type,
id,
type->value_length_if_fixed(),
type->is_multi_cell(),
type->is_counter(),
schema_mismatch
});
}
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
}
return cols;
}
const sstable_enabled_features& features,
bool is_static);
utils::UUID schema_uuid;
std::vector<column_info> regular_schema_columns_from_sstable;
@@ -125,10 +86,10 @@ private:
state(state&&) = default;
state& operator=(state&&) = default;
state(const schema& s, const serialization_header& header)
state(const schema& s, const serialization_header& header, const sstable_enabled_features& features)
: schema_uuid(s.version())
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, false))
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, true))
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, features, false))
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, features, true))
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
{}
};
@@ -136,9 +97,10 @@ private:
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
public:
column_translation get_for_schema(const schema& s, const serialization_header& header) {
column_translation get_for_schema(
const schema& s, const serialization_header& header, const sstable_enabled_features& features) {
if (s.version() != _state->schema_uuid) {
_state = make_lw_shared(state(s, header));
_state = make_lw_shared(state(s, header, features));
}
return *this;
}


@@ -38,6 +38,8 @@
*/
#include "mp_row_consumer.hh"
#include "column_translation.hh"
#include "concrete_types.hh"
namespace sstables {
@@ -79,4 +81,86 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
return ccb.build(timestamp);
}
// See #6130.
static data_type freeze_types_in_collections(data_type t) {
return ::visit(*t, make_visitor(
[] (const map_type_impl& typ) -> data_type {
return map_type_impl::get_instance(
freeze_types_in_collections(typ.get_keys_type()->freeze()),
freeze_types_in_collections(typ.get_values_type()->freeze()),
typ.is_multi_cell());
},
[] (const set_type_impl& typ) -> data_type {
return set_type_impl::get_instance(
freeze_types_in_collections(typ.get_elements_type()->freeze()),
typ.is_multi_cell());
},
[] (const list_type_impl& typ) -> data_type {
return list_type_impl::get_instance(
freeze_types_in_collections(typ.get_elements_type()->freeze()),
typ.is_multi_cell());
},
[&] (const abstract_type& typ) -> data_type {
return std::move(t);
}
));
}
/* If this function returns false, the caller cannot assume that the SSTable comes from Scylla.
* It might, if for some reason a table was created using Scylla that didn't contain any feature bit,
* but that should never happen. */
static bool is_certainly_scylla_sstable(const sstable_enabled_features& features) {
return features.enabled_features;
}
std::vector<column_translation::column_info> column_translation::state::build(
const schema& s,
const utils::chunked_vector<serialization_header::column_desc>& src,
const sstable_enabled_features& features,
bool is_static) {
std::vector<column_info> cols;
if (s.is_dense()) {
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
cols.push_back(column_info{
&col.name(),
col.type,
col.id,
col.type->value_length_if_fixed(),
col.is_multi_cell(),
col.is_counter(),
false
});
} else {
cols.reserve(src.size());
for (auto&& desc : src) {
const bytes& type_name = desc.type_name.value;
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
if (!features.is_enabled(CorrectUDTsInCollections) && is_certainly_scylla_sstable(features)) {
// See #6130.
type = freeze_types_in_collections(std::move(type));
}
const column_definition* def = s.get_column_definition(desc.name.value);
std::optional<column_id> id;
bool schema_mismatch = false;
if (def) {
id = def->id;
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
def->is_counter() != type->is_counter() ||
!def->type->is_value_compatible_with(*type);
}
cols.push_back(column_info{
&desc.name.value,
type,
id,
type->value_length_if_fixed(),
type->is_multi_cell(),
type->is_counter(),
schema_mismatch
});
}
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
}
return cols;
}
}
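The `boost::range::stable_partition` call above moves every non-collection column ahead of the collection columns while preserving the relative order within each group. The standard-library equivalent gives the same guarantee; a sketch with ints standing in for columns (even = "non-collection"):

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Partition so "non-collection" (even) entries come first, keeping the
// original relative order inside each group -- unlike std::partition,
// which makes no ordering promise within the two halves.
std::vector<int> order_columns(std::vector<int> cols) {
    std::stable_partition(cols.begin(), cols.end(),
                          [](int c) { return c % 2 == 0; });
    return cols;
}
```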


@@ -1344,7 +1344,7 @@ public:
, _consumer(consumer)
, _sst(sst)
, _header(sst->get_serialization_header())
, _column_translation(sst->get_column_translation(s, _header))
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
{
setup_columns(_regular_row, _column_translation.regular_columns());


@@ -780,8 +780,9 @@ public:
const serialization_header& get_serialization_header() const {
return get_mutable_serialization_header(*_components);
}
column_translation get_column_translation(const schema& s, const serialization_header& h) {
return _column_translation.get_for_schema(s, h);
column_translation get_column_translation(
const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
return _column_translation.get_for_schema(s, h, f);
}
const std::vector<unsigned>& get_shards_for_this_sstable() const {
return _shards;


@@ -459,7 +459,8 @@ enum sstable_feature : uint8_t {
ShadowableTombstones = 2, // See #3885
CorrectStaticCompact = 3, // See #4139
CorrectEmptyCounters = 4, // See #4363
End = 5,
CorrectUDTsInCollections = 5, // See #6130
End = 6,
};
// Scylla-specific features enabled for a particular sstable.


@@ -77,3 +77,45 @@ BOOST_AUTO_TEST_CASE(test_make_random_uuid) {
std::sort(uuids.begin(), uuids.end());
BOOST_CHECK(std::unique(uuids.begin(), uuids.end()) == uuids.end());
}
BOOST_AUTO_TEST_CASE(test_get_time_uuid) {
using namespace std::chrono;
auto uuid = utils::UUID_gen::get_time_UUID();
BOOST_CHECK(uuid.is_timestamp());
auto tp = system_clock::now();
uuid = utils::UUID_gen::get_time_UUID(tp);
BOOST_CHECK(uuid.is_timestamp());
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
uuid = utils::UUID_gen::get_time_UUID(millis);
BOOST_CHECK(uuid.is_timestamp());
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
BOOST_CHECK(unix_timestamp == millis);
}
BOOST_AUTO_TEST_CASE(test_min_time_uuid) {
using namespace std::chrono;
auto tp = system_clock::now();
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
auto uuid = utils::UUID_gen::min_time_UUID(millis);
BOOST_CHECK(uuid.is_timestamp());
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
BOOST_CHECK(unix_timestamp == millis);
}
BOOST_AUTO_TEST_CASE(test_max_time_uuid) {
using namespace std::chrono;
auto tp = system_clock::now();
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
auto uuid = utils::UUID_gen::max_time_UUID(millis);
BOOST_CHECK(uuid.is_timestamp());
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
BOOST_CHECK(unix_timestamp == millis);
}
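The round-trips these tests check rest on the version-1 UUID timestamp encoding: 100-nanosecond ticks counted from the Gregorian epoch (1582-10-15). A self-contained sketch of the millisecond conversion (hypothetical helper names; the real code lives in `utils::UUID_gen`):

```cpp
#include <cassert>
#include <cstdint>

// 100-ns ticks between 1582-10-15 (UUID v1 epoch) and 1970-01-01 (Unix epoch).
constexpr uint64_t gregorian_offset_ticks = 0x01B21DD213814000ULL;

// Unix-epoch milliseconds -> UUID v1 timestamp ticks.
constexpr uint64_t millis_to_uuid_ticks(uint64_t millis) {
    return millis * 10000ULL + gregorian_offset_ticks;
}

// UUID v1 timestamp ticks -> Unix-epoch milliseconds; this is the inverse
// direction that unix_timestamp() computes in the tests above.
constexpr uint64_t uuid_ticks_to_millis(uint64_t ticks) {
    return (ticks - gregorian_offset_ticks) / 10000ULL;
}
```

Because the conversion is exact at millisecond granularity, `unix_timestamp(get_time_UUID(millis)) == millis` holds, which is precisely what the tests assert.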


@@ -933,7 +933,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -943,17 +942,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -963,7 +951,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -976,7 +963,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
});
return make_ready_future();
}
@@ -986,7 +972,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
cfg.read_from_yaml("experimental: true", throw_on_error);
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -995,6 +980,5 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
cfg.read_from_yaml("experimental: false", throw_on_error);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
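These tests exercise two paths: mapping individual YAML feature names onto an enum set (rejecting unknown names with a parse error), and the legacy `experimental: true` catch-all that enables everything. A minimal sketch of that mapping (hypothetical names; the real logic lives in `db::config`):

```cpp
#include <cassert>
#include <map>
#include <set>
#include <stdexcept>
#include <string>

enum class ef { CDC, LWT, UDF }; // the feature names used in the tests above

// Hypothetical parser: one lowercase feature name -> enum value,
// unknown names rejected (as the "invalid" test expects).
ef parse_feature(const std::string& name) {
    static const std::map<std::string, ef> names{
        {"cdc", ef::CDC}, {"lwt", ef::LWT}, {"udf", ef::UDF}};
    auto it = names.find(name);
    if (it == names.end()) {
        throw std::invalid_argument("unknown experimental feature: " + name);
    }
    return it->second;
}

// check_experimental() then reduces to set membership.
bool check_experimental(const std::set<ef>& enabled, ef f) {
    return enabled.count(f) != 0;
}
```

A duplicate name in the YAML list is harmless under this model, since inserting into a set is idempotent.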


@@ -4263,3 +4263,272 @@ SEASTAR_TEST_CASE(test_rf_expand) {
});
});
}
// Test that tombstones with future timestamps work correctly
// when a write with lower timestamp arrives - in such case,
// if the base row is covered by such a tombstone, a view update
// needs to take it into account. Refs #5793
SEASTAR_TEST_CASE(test_views_with_future_tombstones) {
return do_with_cql_env_thread([] (auto& e) {
cquery_nofail(e, "CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY (a,b,c));");
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT * FROM t"
" WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (b,a,c);");
// Partition tombstone
cquery_nofail(e, "delete from t using timestamp 10 where a=1;");
auto msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "insert into t (a,b,c,d,e) values (1,2,3,4,5) using timestamp 8;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
msg = cquery_nofail(e, "select * from tv;");
assert_that(msg).is_rows().with_size(0);
// Range tombstone
cquery_nofail(e, "delete from t using timestamp 16 where a=2 and b > 1 and b < 4;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "insert into t (a,b,c,d,e) values (2,3,4,5,6) using timestamp 12;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
msg = cquery_nofail(e, "select * from tv;");
assert_that(msg).is_rows().with_size(0);
// Row tombstone
cquery_nofail(e, "delete from t using timestamp 24 where a=3 and b=4 and c=5;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "insert into t (a,b,c,d,e) values (3,4,5,6,7) using timestamp 18;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
msg = cquery_nofail(e, "select * from tv;");
assert_that(msg).is_rows().with_size(0);
});
}
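All three scenarios in this test hinge on one rule: a write is shadowed by any covering tombstone with an equal or higher timestamp, regardless of which arrived later in wall-clock time. A minimal sketch of that comparison (stand-in types, not Scylla's):

```cpp
#include <cassert>
#include <cstdint>

struct tombstone_sketch {
    int64_t timestamp; // deletion timestamp; may be "in the future"
};

// A cell is live only if it was written strictly after the covering tombstone.
bool is_live(int64_t write_timestamp, tombstone_sketch covering) {
    return write_timestamp > covering.timestamp;
}
```

This is why the inserts at timestamps 8, 12, and 18 above stay invisible under tombstones at timestamps 10, 16, and 24, and why a view update must consult the base row's tombstones rather than arrival order.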
shared_ptr<cql_transport::messages::result_message> cql_func_require_nofail(
cql_test_env& env,
const seastar::sstring& fct,
const seastar::sstring& inp,
std::unique_ptr<cql3::query_options>&& qo = nullptr,
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
auto res = shared_ptr<cql_transport::messages::result_message>(nullptr);
auto query = format("SELECT {}({}) FROM t;", fct, inp);
try {
if (qo) {
res = env.execute_cql(query, std::move(qo)).get0();
} else {
res = env.execute_cql(query).get0();
}
BOOST_TEST_MESSAGE(format("Query '{}' succeeded as expected", query));
} catch (...) {
BOOST_ERROR(format("query '{}' failed unexpectedly with error: {}\n{}:{}: originally from here",
query, std::current_exception(),
loc.file_name(), loc.line()));
}
return res;
}
// FIXME: should be in cql_assertions, but we don't want to call boost from cql_assertions.hh
template <typename Exception>
void cql_func_require_throw(
cql_test_env& env,
const seastar::sstring& fct,
const seastar::sstring& inp,
std::unique_ptr<cql3::query_options>&& qo = nullptr,
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
auto query = format("SELECT {}({}) FROM t;", fct, inp);
try {
if (qo) {
env.execute_cql(query, std::move(qo)).get();
} else {
env.execute_cql(query).get();
}
BOOST_ERROR(format("query '{}' succeeded unexpectedly\n{}:{}: originally from here", query,
loc.file_name(), loc.line()));
} catch (Exception& e) {
BOOST_TEST_MESSAGE(format("Query '{}' failed as expected with error: {}", query, e));
} catch (...) {
BOOST_ERROR(format("query '{}' failed with unexpected error: {}\n{}:{}: originally from here",
query, std::current_exception(),
loc.file_name(), loc.line()));
}
}
static void create_time_uuid_fcts_schema(cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE t (id int primary key, t timestamp, l bigint, f float, u timeuuid, d date)");
cquery_nofail(e, "INSERT INTO t (id, t, l, f, u, d) VALUES "
"(1, 1579072460606, 1579072460606000, 1579072460606, a66525e0-3766-11ea-8080-808080808080, '2020-01-13')");
cquery_nofail(e, "SELECT * FROM t;");
}
SEASTAR_TEST_CASE(test_basic_time_uuid_fcts) {
return do_with_cql_env_thread([] (auto& e) {
create_time_uuid_fcts_schema(e);
cql_func_require_nofail(e, "currenttime", "");
cql_func_require_nofail(e, "currentdate", "");
cql_func_require_nofail(e, "now", "");
cql_func_require_nofail(e, "currenttimeuuid", "");
cql_func_require_nofail(e, "currenttimestamp", "");
});
}
SEASTAR_TEST_CASE(test_time_uuid_fcts_input_validation) {
return do_with_cql_env_thread([] (auto& e) {
create_time_uuid_fcts_schema(e);
// test timestamp arg
auto require_timestamp = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "t");
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "u");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "now()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimeuuid()");
cql_func_require_nofail(e, fct, "currenttimestamp()");
};
require_timestamp("mintimeuuid");
require_timestamp("maxtimeuuid");
// test timeuuid arg
auto require_timeuuid = [&e] (const sstring& fct) {
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
};
require_timeuuid("dateof");
require_timeuuid("unixtimestampof");
// test timeuuid or date arg
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_nofail(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_nofail(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
};
require_timeuuid_or_date("totimestamp");
// test timestamp or timeuuid arg
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "t");
cql_func_require_throw<std::exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_nofail(e, fct, "currenttimestamp()");
};
require_timestamp_or_timeuuid("todate");
// test timestamp, timeuuid, or date arg
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "t");
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_nofail(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_nofail(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_nofail(e, fct, "currenttimestamp()");
};
require_timestamp_timeuuid_or_date("tounixtimestamp");
});
}
SEASTAR_TEST_CASE(test_time_uuid_fcts_result) {
return do_with_cql_env_thread([] (auto& e) {
create_time_uuid_fcts_schema(e);
// test timestamp arg
auto require_timestamp = [&e] (const sstring& fct) {
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "mintimeuuid(t)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "maxtimeuuid(t)");
cql_func_require_nofail(e, fct, "dateof(u)");
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
cql_func_require_nofail(e, fct, "totimestamp(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
};
require_timestamp("mintimeuuid");
require_timestamp("maxtimeuuid");
// test timeuuid arg
auto require_timeuuid = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
};
require_timeuuid("dateof");
require_timeuuid("unixtimestampof");
// test timeuuid or date arg
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
cql_func_require_nofail(e, fct, "todate(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
};
require_timeuuid_or_date("totimestamp");
// test timestamp or timeuuid arg
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
};
require_timestamp_or_timeuuid("todate");
// test timestamp, timeuuid, or date arg
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
cql_func_require_nofail(e, fct, "dateof(u)");
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
cql_func_require_nofail(e, fct, "totimestamp(u)");
cql_func_require_nofail(e, fct, "todate(u)");
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
};
require_timestamp_timeuuid_or_date("tounixtimestamp");
});
}


@@ -25,7 +25,7 @@
#include <seastar/util/noncopyable_function.hh>
inline
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 17) {
size_t attempts = 0;
while (true) {
try {
@@ -43,7 +43,7 @@ void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
inline
bool eventually_true(noncopyable_function<bool ()> f) {
const unsigned max_attempts = 10;
const unsigned max_attempts = 15;
unsigned attempts = 0;
while (true) {
if (f()) {
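The bumped attempt counts (12 to 17, 10 to 15) widen the total retry window of these helpers. A bare-bones sketch of the retry loop in `eventually()` (the real helper also sleeps between attempts, which is omitted here along with everything Seastar-specific):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Keep calling f until it stops throwing; rethrow the last error once
// max_attempts attempts have failed.
void eventually_sketch(std::function<void()> f, size_t max_attempts = 17) {
    for (size_t attempts = 0; ; ++attempts) {
        try {
            f();
            return;
        } catch (...) {
            if (attempts + 1 >= max_attempts) {
                throw; // give up: condition never became true
            }
        }
    }
}
```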


@@ -118,6 +118,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
});
}
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto test_random_streams = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
auto muts = gen(partition_nr);
schema_ptr s = gen.schema();
auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);
int mf_produced = 0;
auto get_next_mutation_fragment = [&source_reader, &mf_produced] () mutable {
if (mf_produced++ > 800) {
return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
} else {
return source_reader(db::no_timeout);
}
};
auto& partitioner = dht::global_partitioner();
try {
distribute_reader_and_consume_on_shards(s, partitioner,
make_generating_reader(s, std::move(get_next_mutation_fragment)),
[&partitioner, error] (flat_mutation_reader reader) mutable {
if (error) {
return make_exception_future<>(std::runtime_error("Failed to write"));
}
return repeat([&partitioner, reader = std::move(reader), error] () mutable {
return reader(db::no_timeout).then([&partitioner, error] (mutation_fragment_opt mf_opt) mutable {
if (mf_opt) {
if (mf_opt->is_partition_start()) {
auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
BOOST_REQUIRE_EQUAL(shard, this_shard_id());
}
return make_ready_future<stop_iteration>(stop_iteration::no);
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}
).get0();
} catch (...) {
// The distribute_reader_and_consume_on_shards is expected to fail and not block forever
}
};
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::no);
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::yes);
});
}
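The fix this test covers is about failure propagation: when the producer dies, the per-shard queues feeding the consumers must be aborted so every consumer observes the error instead of waiting forever for fragments that will never arrive. A toy single-threaded sketch of that abort contract (a stand-in queue, not Seastar's `queue<T>`):

```cpp
#include <cassert>
#include <deque>
#include <exception>
#include <optional>
#include <stdexcept>

// Toy queue: once aborted, popping from an empty queue rethrows the
// producer's error instead of reporting "nothing available yet".
struct aborting_queue_sketch {
    std::deque<int> items;
    std::exception_ptr abort_ex;

    void push(int v) { items.push_back(v); }
    void abort(std::exception_ptr ex) { abort_ex = ex; }

    std::optional<int> pop() {
        if (!items.empty()) {
            int v = items.front();
            items.pop_front();
            return v;
        }
        if (abort_ex) {
            std::rethrow_exception(abort_ex); // consumer sees the producer's failure
        }
        return std::nullopt; // a real queue would wait here
    }
};
```

Items already queued before the abort still drain normally; only the point where a consumer would otherwise block turns into an error.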
namespace {
class bucket_writer {


@@ -586,7 +586,6 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
auto db_cfg_ptr = make_shared<db::config>();
auto& db_cfg = *db_cfg_ptr;
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
if (regenerate) {
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
} else {


@@ -5262,3 +5262,131 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
test_sstable_log_too_many_rows_f(random, (random + 1), false);
test_sstable_log_too_many_rows_f((random + 1), random, true);
}
// The following test runs on tests/sstables/3.x/uncompressed/legacy_udt_in_collection
// It was created using Scylla 3.0.x using the following CQL statements:
//
// CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
// CREATE TYPE ks.ut (a int, b int);
// CREATE TABLE ks.t ( pk int PRIMARY KEY,
// m map<int, frozen<ut>>,
// fm frozen<map<int, frozen<ut>>>,
// mm map<int, frozen<map<int, frozen<ut>>>>,
// fmm frozen<map<int, frozen<map<int, frozen<ut>>>>>,
// s set<frozen<ut>>,
// fs frozen<set<frozen<ut>>>,
// l list<frozen<ut>>,
// fl frozen<list<frozen<ut>>>
// ) WITH compression = {};
// UPDATE ks.t USING TIMESTAMP 1525385507816568 SET
// m[0] = {a: 0, b: 0},
// fm = {0: {a: 0, b: 0}},
// mm[0] = {0: {a: 0, b: 0}},
// fmm = {0: {0: {a: 0, b: 0}}},
// s = s + {{a: 0, b: 0}},
// fs = {{a: 0, b: 0}},
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
// fl = [{a: 0, b: 0}]
// WHERE pk = 0;
//
// It checks whether a SSTable containing UDTs nested in collections, which contains incorrect serialization headers
// (doesn't wrap nested UDTs in the FrozenType<...> tag) can be loaded by new versions of Scylla.
static const sstring LEGACY_UDT_IN_COLLECTION_PATH =
"tests/sstables/3.x/uncompressed/legacy_udt_in_collection";
SEASTAR_THREAD_TEST_CASE(test_legacy_udt_in_collection_table) {
auto abj = defer([] { await_background_jobs().get(); });
auto ut = user_type_impl::get_instance("ks", to_bytes("ut"),
{to_bytes("a"), to_bytes("b")},
{int32_type, int32_type}, false);
auto m_type = map_type_impl::get_instance(int32_type, ut, true);
auto fm_type = map_type_impl::get_instance(int32_type, ut, false);
auto mm_type = map_type_impl::get_instance(int32_type, fm_type, true);
auto fmm_type = map_type_impl::get_instance(int32_type, fm_type, false);
auto s_type = set_type_impl::get_instance(ut, true);
auto fs_type = set_type_impl::get_instance(ut, false);
auto l_type = list_type_impl::get_instance(ut, true);
auto fl_type = list_type_impl::get_instance(ut, false);
auto s = schema_builder("ks", "t")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("m", m_type)
.with_column("fm", fm_type)
.with_column("mm", mm_type)
.with_column("fmm", fmm_type)
.with_column("s", s_type)
.with_column("fs", fs_type)
.with_column("l", l_type)
.with_column("fl", fl_type)
.set_compressor_params(compression_parameters::no_compression())
.build();
auto m_cdef = s->get_column_definition(to_bytes("m"));
auto fm_cdef = s->get_column_definition(to_bytes("fm"));
auto mm_cdef = s->get_column_definition(to_bytes("mm"));
auto fmm_cdef = s->get_column_definition(to_bytes("fmm"));
auto s_cdef = s->get_column_definition(to_bytes("s"));
auto fs_cdef = s->get_column_definition(to_bytes("fs"));
auto l_cdef = s->get_column_definition(to_bytes("l"));
auto fl_cdef = s->get_column_definition(to_bytes("fl"));
BOOST_REQUIRE(m_cdef && fm_cdef && mm_cdef && fmm_cdef && s_cdef && fs_cdef && l_cdef && fl_cdef);
auto ut_val = make_user_value(ut, {int32_t(0), int32_t(0)});
auto fm_val = make_map_value(fm_type, {{int32_t(0), ut_val}});
auto fmm_val = make_map_value(fmm_type, {{int32_t(0), fm_val}});
auto fs_val = make_set_value(fs_type, {ut_val});
auto fl_val = make_list_value(fl_type, {ut_val});
mutation mut{s, partition_key::from_deeply_exploded(*s, {0})};
auto ckey = clustering_key::make_empty();
// m[0] = {a: 0, b: 0}
{
collection_mutation_description desc;
desc.cells.emplace_back(int32_type->decompose(0),
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *m_cdef, desc.serialize(*m_type));
}
// fm = {0: {a: 0, b: 0}}
mut.set_clustered_cell(ckey, *fm_cdef, atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val)));
// mm[0] = {0: {a: 0, b: 0}},
{
collection_mutation_description desc;
desc.cells.emplace_back(int32_type->decompose(0),
atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val), atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *mm_cdef, desc.serialize(*mm_type));
}
// fmm = {0: {0: {a: 0, b: 0}}},
mut.set_clustered_cell(ckey, *fmm_cdef, atomic_cell::make_live(*fmm_type, write_timestamp, fmm_type->decompose(fmm_val)));
// s = s + {{a: 0, b: 0}},
{
collection_mutation_description desc;
desc.cells.emplace_back(ut->decompose(ut_val),
atomic_cell::make_live(*bytes_type, write_timestamp, bytes{}, atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *s_cdef, desc.serialize(*s_type));
}
// fs = {{a: 0, b: 0}},
mut.set_clustered_cell(ckey, *fs_cdef, atomic_cell::make_live(*fs_type, write_timestamp, fs_type->decompose(fs_val)));
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
{
collection_mutation_description desc;
desc.cells.emplace_back(timeuuid_type->decompose(utils::UUID("7fb27e80-7b12-11ea-9fad-f4d108a9e4a3")),
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *l_cdef, desc.serialize(*l_type));
}
// fl = [{a: 0, b: 0}]
mut.set_clustered_cell(ckey, *fl_cdef, atomic_cell::make_live(*fl_type, write_timestamp, fl_type->decompose(fl_val)));
sstable_assertions sst(s, LEGACY_UDT_IN_COLLECTION_PATH);
sst.load();
assert_that(sst.read_rows_flat()).produces(mut).produces_end_of_stream();
}


@@ -0,0 +1 @@
3519784297


@@ -0,0 +1,9 @@
Scylla.db
CRC.db
Filter.db
Statistics.db
TOC.txt
Digest.crc32
Index.db
Summary.db
Data.db


@@ -402,6 +402,8 @@ BOOST_AUTO_TEST_CASE(test_varint) {
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00deadbeef"))), boost::multiprecision::cpp_int("0xdeadbeef"));
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00ffffffffffffffffffffffffffffffff"))), boost::multiprecision::cpp_int("340282366920938463463374607431768211455"));
BOOST_REQUIRE_EQUAL(from_hex("80000000"), varint_type->decompose(boost::multiprecision::cpp_int(-2147483648)));
test_parsing_fails(varint_type, "1A");
}


@@ -108,6 +108,7 @@ MAVEN_LOCAL_REPO="$HOME/.m2"
mkdir -p "$MAVEN_LOCAL_REPO"
docker_common_args=(
--pids-limit -1 \
--network host \
-u "$(id -u):$(id -g)" \
"${group_args[@]}" \


@@ -347,6 +347,7 @@ future<std::unique_ptr<cql_server::response>>
trace_props.set_if<tracing::trace_state_props::log_slow_query>(tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled());
trace_props.set_if<tracing::trace_state_props::full_tracing>(tracing_request != tracing_request_type::not_requested);
tracing::trace_state_ptr trace_state;
if (trace_props) {
if (cqlop == cql_binary_opcode::QUERY ||
@@ -354,15 +355,15 @@ future<std::unique_ptr<cql_server::response>>
cqlop == cql_binary_opcode::EXECUTE ||
cqlop == cql_binary_opcode::BATCH) {
trace_props.set_if<tracing::trace_state_props::write_on_close>(tracing_request == tracing_request_type::write_on_close);
client_state.create_tracing_session(tracing::trace_type::QUERY, trace_props);
trace_state = tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, trace_props);
}
}
tracing::set_request_size(client_state.get_trace_state(), fbuf.bytes_left());
tracing::set_request_size(trace_state, fbuf.bytes_left());
auto linearization_buffer = std::make_unique<bytes_ostream>();
auto linearization_buffer_ptr = linearization_buffer.get();
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit)] () mutable {
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable {
// When using authentication, we need to ensure we are doing proper state transitions,
// i.e. we cannot simply accept any query/exec ops unless auth is complete
switch (client_state.get_auth_state()) {
@@ -393,23 +394,23 @@ future<std::unique_ptr<cql_server::response>>
return *user;
}();
tracing::set_username(client_state.get_trace_state(), user);
tracing::set_username(trace_state, user);
auto in = request_reader(std::move(fbuf), *linearization_buffer_ptr);
switch (cqlop) {
case cql_binary_opcode::STARTUP: return process_startup(stream, std::move(in), client_state);
case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(stream, std::move(in), client_state);
case cql_binary_opcode::OPTIONS: return process_options(stream, std::move(in), client_state);
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit));
case cql_binary_opcode::PREPARE: return process_prepare(stream, std::move(in), client_state);
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit));
case cql_binary_opcode::BATCH: return process_batch(stream, std::move(in), client_state, std::move(permit));
case cql_binary_opcode::REGISTER: return process_register(stream, std::move(in), client_state);
case cql_binary_opcode::STARTUP: return process_startup(stream, std::move(in), client_state, trace_state);
case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(stream, std::move(in), client_state, trace_state);
case cql_binary_opcode::OPTIONS: return process_options(stream, std::move(in), client_state, trace_state);
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit), trace_state);
case cql_binary_opcode::PREPARE: return process_prepare(stream, std::move(in), client_state, trace_state);
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit), trace_state);
case cql_binary_opcode::BATCH: return process_batch(stream, std::move(in), client_state, std::move(permit), trace_state);
case cql_binary_opcode::REGISTER: return process_register(stream, std::move(in), client_state, trace_state);
default: throw exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop)));
}
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer)] (future<std::unique_ptr<cql_server::response>> f) -> std::unique_ptr<cql_server::response> {
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<std::unique_ptr<cql_server::response>> f) -> std::unique_ptr<cql_server::response> {
auto stop_trace = defer([&] {
tracing::stop_foreground(client_state.get_trace_state());
tracing::stop_foreground(trace_state);
});
--_server._requests_serving;
try {
@@ -440,28 +441,28 @@ future<std::unique_ptr<cql_server::response>>
break;
}
tracing::set_response_size(client_state.get_trace_state(), response->size());
tracing::set_response_size(trace_state, response->size());
return response;
} catch (const exceptions::unavailable_exception& ex) {
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, client_state.get_trace_state());
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, trace_state);
} catch (const exceptions::read_timeout_exception& ex) {
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, client_state.get_trace_state());
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, trace_state);
} catch (const exceptions::read_failure_exception& ex) {
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, client_state.get_trace_state());
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, trace_state);
} catch (const exceptions::mutation_write_timeout_exception& ex) {
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, client_state.get_trace_state());
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, trace_state);
} catch (const exceptions::mutation_write_failure_exception& ex) {
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, client_state.get_trace_state());
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, trace_state);
} catch (const exceptions::already_exists_exception& ex) {
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, client_state.get_trace_state());
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, trace_state);
} catch (const exceptions::prepared_query_not_found_exception& ex) {
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, client_state.get_trace_state());
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
} catch (const exceptions::cassandra_exception& ex) {
return make_error(stream, ex.code(), ex.what(), client_state.get_trace_state());
return make_error(stream, ex.code(), ex.what(), trace_state);
} catch (std::exception& ex) {
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), client_state.get_trace_state());
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), trace_state);
} catch (...) {
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", client_state.get_trace_state());
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
}
});
}
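The pattern of this change is worth spelling out: `client_state` is shared across pipelined requests on a connection, so re-reading the trace session from it at response time can pick up a later request's session. Threading `trace_state` through as an explicit per-request parameter pins each response to its own session. A toy illustration (stand-in types, not the actual tracing API):

```cpp
#include <cassert>
#include <string>

struct client_state_sketch {
    std::string trace_session; // shared field, overwritten by each new request
};

// Before: the helper re-reads the shared state when building the response.
std::string response_session_via_shared(const client_state_sketch& cs) {
    return cs.trace_session;
}

// After: the helper uses the per-request value it was handed.
std::string response_session_via_param(std::string trace_session) {
    return trace_session;
}
```

The diff applies exactly this shape: every `process_*()` and error-response call site switches from `client_state.get_trace_state()` to the `trace_state` captured when the session was created.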
@@ -661,8 +662,8 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
return _buffer_reader.read_exactly(_read_buf, length);
}
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state)
{
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
auto options = in.read_string_map();
auto compression_opt = options.find("COMPRESSION");
if (compression_opt != options.end()) {
@@ -678,33 +679,31 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
     }
     auto& a = client_state.get_auth_service()->underlying_authenticator();
     if (a.require_authentication()) {
-        return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), client_state.get_trace_state()));
+        return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), trace_state));
     }
-    return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
+    return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, trace_state));
 }

-future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state)
-{
+future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
+        tracing::trace_state_ptr trace_state) {
     auto sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
     auto buf = in.read_raw_bytes_view(in.bytes_left());
     auto challenge = sasl_challenge->evaluate_response(buf);
     if (sasl_challenge->is_complete()) {
-        return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge)](auth::authenticated_user user) mutable {
+        return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable {
             client_state.set_login(::make_shared<auth::authenticated_user>(std::move(user)));
             auto f = client_state.check_user_can_login();
-            return f.then([this, stream, &client_state, challenge = std::move(challenge)]() mutable {
-                auto tr_state = client_state.get_trace_state();
-                return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), tr_state));
+            return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
+                return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
             });
         });
     }
-    auto tr_state = client_state.get_trace_state();
-    return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), tr_state));
+    return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), trace_state));
 }

-future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state)
-{
-    return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, client_state.get_trace_state()));
+future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state,
+        tracing::trace_state_ptr trace_state) {
+    return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, std::move(trace_state)));
 }

 void
@@ -712,10 +711,10 @@ cql_server::connection::init_cql_serialization_format() {
     _cql_serialization_format = cql_serialization_format(_version);
 }

-future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
+future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
 {
     auto query = in.read_long_string_view();
-    auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
+    auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
     auto& query_state = q_state->query_state;
     q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config);
     auto& options = *q_state->options;
@@ -735,12 +734,12 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_qu
     }).finally([q_state = std::move(q_state)] {});
 }

-future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state)
-{
+future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state,
+        tracing::trace_state_ptr trace_state) {
     auto query = sstring(in.read_long_string_view());

-    tracing::add_query(client_state.get_trace_state(), query);
-    tracing::begin(client_state.get_trace_state(), "Preparing CQL3 query", client_state.get_client_address());
+    tracing::add_query(trace_state, query);
+    tracing::begin(trace_state, "Preparing CQL3 query", client_state.get_client_address());

     auto cpu_id = engine().cpu_id();
     auto cpus = boost::irange(0u, smp::count);
@@ -752,19 +751,19 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
         } else {
             return make_ready_future<>();
         }
-    }).then([this, query, stream, &client_state] () mutable {
-        tracing::trace(client_state.get_trace_state(), "Done preparing on remote shards");
-        return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state] (auto msg) {
-            tracing::trace(client_state.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
+    }).then([this, query, stream, &client_state, trace_state] () mutable {
+        tracing::trace(trace_state, "Done preparing on remote shards");
+        return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state, trace_state] (auto msg) {
+            tracing::trace(trace_state, "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
                 return messages::result_message::prepared::cql::get_id(msg);
             }));
-            return this->make_result(stream, msg, client_state.get_trace_state());
+            return this->make_result(stream, msg, trace_state);
         });
     });
 }

-future<std::unique_ptr<cql_server::response>> cql_server::connection::process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
-{
+future<std::unique_ptr<cql_server::response>> cql_server::connection::process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit,
+        tracing::trace_state_ptr trace_state) {
     cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
     auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
     bool needs_authorization = false;
@@ -781,7 +780,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_ex
         throw exceptions::prepared_query_not_found_exception(id);
     }

-    auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
+    auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
     auto& query_state = q_state->query_state;
     if (_version == 1) {
         std::vector<cql3::raw_value_view> values;
@@ -795,22 +794,22 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_ex
     auto& options = *q_state->options;
     auto skip_metadata = options.skip_metadata();

-    tracing::set_page_size(client_state.get_trace_state(), options.get_page_size());
-    tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
-    tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
-    tracing::add_query(client_state.get_trace_state(), prepared->raw_cql_statement);
-    tracing::add_prepared_statement(client_state.get_trace_state(), prepared);
+    tracing::set_page_size(trace_state, options.get_page_size());
+    tracing::set_consistency_level(trace_state, options.get_consistency());
+    tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
+    tracing::add_query(trace_state, prepared->raw_cql_statement);
+    tracing::add_prepared_statement(trace_state, prepared);

-    tracing::begin(client_state.get_trace_state(), seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
+    tracing::begin(trace_state, seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
             client_state.get_client_address());
     auto stmt = prepared->statement;
-    tracing::trace(query_state.get_trace_state(), "Checking bounds");
+    tracing::trace(trace_state, "Checking bounds");
     if (stmt->get_bound_terms() != options.get_values_count()) {
         const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}",
                 stmt->get_bound_terms(),
                 options.get_values_count());
-        tracing::trace(query_state.get_trace_state(), msg);
+        tracing::trace(trace_state, msg);
         throw exceptions::invalid_request_exception(msg);
     }
@@ -828,7 +827,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_ex
 }

 future<std::unique_ptr<cql_server::response>>
-cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
+cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
 {
     if (_version == 1) {
         throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
@@ -844,7 +843,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
     modifications.reserve(n);
     values.reserve(n);

-    tracing::begin(client_state.get_trace_state(), "Execute batch of CQL3 queries", client_state.get_client_address());
+    tracing::begin(trace_state, "Execute batch of CQL3 queries", client_state.get_client_address());

     for ([[gnu::unused]] auto i : boost::irange(0u, n)) {
         const auto kind = in.read_byte();
@@ -858,7 +857,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
             auto query = in.read_long_string_view();
             stmt_ptr = _server._query_processor.local().get_statement(query, client_state);
             ps = stmt_ptr->checked_weak_from_this();
-            tracing::add_query(client_state.get_trace_state(), query);
+            tracing::add_query(trace_state, query);
             break;
         }
         case 1: {
@@ -877,7 +876,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
                 needs_authorization = pending_authorization_entries.emplace(std::move(cache_key), ps->checked_weak_from_this()).second;
             }

-            tracing::add_query(client_state.get_trace_state(), ps->raw_cql_statement);
+            tracing::add_query(trace_state, ps->raw_cql_statement);
             break;
         }
         default:
@@ -891,8 +890,8 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
         }

         ::shared_ptr<cql3::statements::modification_statement> modif_statement_ptr = static_pointer_cast<cql3::statements::modification_statement>(ps->statement);
-        tracing::add_table_name(client_state.get_trace_state(), modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
-        tracing::add_prepared_statement(client_state.get_trace_state(), ps);
+        tracing::add_table_name(trace_state, modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
+        tracing::add_prepared_statement(trace_state, ps);

         modifications.emplace_back(std::move(modif_statement_ptr), needs_authorization);
@@ -907,15 +906,15 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
         values.emplace_back(std::move(tmp));
     }

-    auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
+    auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
     auto& query_state = q_state->query_state;
     // #563. CQL v2 encodes query_options in v1 format for batch requests.
     q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values)));
     auto& options = *q_state->options;

-    tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
-    tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
-    tracing::trace(client_state.get_trace_state(), "Creating a batch statement");
+    tracing::set_consistency_level(trace_state, options.get_consistency());
+    tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
+    tracing::trace(trace_state, "Creating a batch statement");

     auto batch = ::make_shared<cql3::statements::batch_statement>(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), _server._query_processor.local().get_cql_stats());
     return _server._query_processor.local().process_batch(batch, query_state, options, std::move(pending_authorization_entries)).then([this, stream, batch, &query_state] (auto msg) {
@@ -928,15 +927,15 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
 }

 future<std::unique_ptr<cql_server::response>>
-cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state)
-{
+cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state,
+        tracing::trace_state_ptr trace_state) {
     std::vector<sstring> event_types;
     in.read_string_list(event_types);
     for (auto&& event_type : event_types) {
         auto et = parse_event_type(event_type);
         _server._notifier->register_event(et, this);
     }
-    return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
+    return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, std::move(trace_state)));
 }

 std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state)


@@ -94,8 +94,8 @@ struct cql_query_state {
     service::query_state query_state;
     std::unique_ptr<cql3::query_options> options;

-    cql_query_state(service::client_state& client_state, service_permit permit)
-        : query_state(client_state, std::move(permit))
+    cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
+        : query_state(client_state, std::move(trace_state), std::move(permit))
     { }
 };
@@ -186,14 +186,14 @@ private:
     cql_binary_frame_v3 parse_frame(temporary_buffer<char> buf);
     future<fragmented_temporary_buffer> read_and_decompress_frame(size_t length, uint8_t flags);
     future<std::optional<cql_binary_frame_v3>> read_frame();
-    future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state);
-    future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state);
-    future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state);
-    future<std::unique_ptr<cql_server::response>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
-    future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state);
-    future<std::unique_ptr<cql_server::response>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
-    future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
-    future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state);
+    future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
+    future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
+    future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
+    future<std::unique_ptr<cql_server::response>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
+    future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
+    future<std::unique_ptr<cql_server::response>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
+    future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
+    future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
     std::unique_ptr<cql_server::response> make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state);
     std::unique_ptr<cql_server::response> make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state);


@@ -2253,13 +2253,20 @@ static size_t concrete_serialized_size(const string_type_impl::native_type& v) {
 static size_t concrete_serialized_size(const bytes_type_impl::native_type& v) { return v.size(); }
 static size_t concrete_serialized_size(const inet_addr_type_impl::native_type& v) { return v.get().size(); }
-static size_t concrete_serialized_size(const varint_type_impl::native_type& v) {
-    const auto& num = v.get();
-    if (!num) {
+static size_t concrete_serialized_size_aux(const boost::multiprecision::cpp_int& num) {
+    if (num) {
+        return align_up(boost::multiprecision::msb(num) + 2, 8u) / 8;
+    } else {
         return 1;
     }
-    auto pnum = abs(num);
-    return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
+}
+
+static size_t concrete_serialized_size(const varint_type_impl::native_type& v) {
+    const auto& num = v.get();
+    if (num < 0) {
+        return concrete_serialized_size_aux(-num - 1);
+    }
+    return concrete_serialized_size_aux(num);
 }
static size_t concrete_serialized_size(const decimal_type_impl::native_type& v) {


@@ -59,11 +59,15 @@ public:
         return (most_sig_bits >> 12) & 0xf;
     }

+    bool is_timestamp() const {
+        return version() == 1;
+    }
+
     int64_t timestamp() const {
         //if (version() != 1) {
         //     throw new UnsupportedOperationException("Not a time-based UUID");
         //}
-        assert(version() == 1);
+        assert(is_timestamp());

         return ((most_sig_bits & 0xFFF) << 48) |
                (((most_sig_bits >> 16) & 0xFFFF) << 32) |


@@ -77,7 +77,7 @@ private:
     // placement of this singleton is important. It needs to be instantiated *AFTER* the other statics.
     static thread_local const std::unique_ptr<UUID_gen> instance;

-    int64_t last_nanos = 0;
+    uint64_t last_nanos = 0;

     UUID_gen()
     {
@@ -93,7 +93,9 @@ public:
      */
     static UUID get_time_UUID()
     {
-        return UUID(instance->create_time_safe(), clock_seq_and_node);
+        auto uuid = UUID(instance->create_time_safe(), clock_seq_and_node);
+        assert(uuid.is_timestamp());
+        return uuid;
     }

     /**
@@ -103,7 +105,9 @@ public:
      */
     static UUID get_time_UUID(int64_t when)
     {
-        return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
+        auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
+        assert(uuid.is_timestamp());
+        return uuid;
     }

     /**
@@ -117,12 +121,16 @@ public:
         // "nanos" needs to be in 100ns intervals since the adoption of the Gregorian calendar in the West.
         uint64_t nanos = duration_cast<nanoseconds>(tp.time_since_epoch()).count() / 100;
         nanos -= (10000ULL * START_EPOCH);
-        return UUID(create_time(nanos), clock_seq_and_node);
+        auto uuid = UUID(create_time(nanos), clock_seq_and_node);
+        assert(uuid.is_timestamp());
+        return uuid;
     }

     static UUID get_time_UUID(int64_t when, int64_t clock_seq_and_node)
     {
-        return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
+        auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
+        assert(uuid.is_timestamp());
+        return uuid;
     }

     /**
      * Similar to get_time_UUID, but randomize the clock and sequence.
@@ -142,7 +150,14 @@ public:
         int64_t when_in_millis = when_in_micros / 1000;
         int64_t nanos = (when_in_micros - (when_in_millis * 1000)) * 10;

-        return UUID(create_time(from_unix_timestamp(when_in_millis) + nanos), rand_dist(rand_gen));
+        auto uuid = UUID(create_time(from_unix_timestamp(when_in_millis) + nanos), rand_dist(rand_gen));
+        assert(uuid.is_timestamp());
+        return uuid;
     }

+    /** validates uuid from raw bytes. */
+    static bool is_valid_UUID(bytes raw) {
+        return raw.size() == 16;
+    }
+
     /** creates uuid from raw bytes. */
@@ -198,7 +213,9 @@ public:
      */
     static UUID min_time_UUID(int64_t timestamp)
     {
-        return UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
+        auto uuid = UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
+        assert(uuid.is_timestamp());
+        return uuid;
     }

     /**
/**
@@ -214,7 +231,9 @@ public:
// timestamp 1ms, then we should not extend 100's nanoseconds
// precision by taking 10000, but rather 19999.
int64_t uuid_tstamp = from_unix_timestamp(timestamp + 1) - 1;
return UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
auto uuid = UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
assert(uuid.is_timestamp());
return uuid;
}
/**
@@ -308,6 +327,15 @@ public:
         return (uuid.timestamp() / 10000) + START_EPOCH;
     }

+    static uint64_t make_nanos_since(int64_t millis) {
+        return (static_cast<uint64_t>(millis) - static_cast<uint64_t>(START_EPOCH)) * 10000;
+    }
+
+    // nanos_since must fit in 60 bits
+    static bool is_valid_nanos_since(uint64_t nanos_since) {
+        return !(0xf000000000000000UL & nanos_since);
+    }
+
 private:
     // needs to return two different values for the same when.
@@ -319,7 +347,7 @@ private:
         using namespace std::chrono;
         int64_t millis = duration_cast<milliseconds>(
                 system_clock::now().time_since_epoch()).count();
-        int64_t nanos_since = (millis - START_EPOCH) * 10000;
+        uint64_t nanos_since = make_nanos_since(millis);
         if (nanos_since > last_nanos)
             last_nanos = nanos_since;
         else
@@ -330,16 +358,17 @@ private:
     int64_t create_time_unsafe(int64_t when, int nanos)
     {
-        uint64_t nanos_since = ((when - START_EPOCH) * 10000) + nanos;
+        uint64_t nanos_since = make_nanos_since(when) + static_cast<uint64_t>(static_cast<int64_t>(nanos));
         return create_time(nanos_since);
     }

     static int64_t create_time(uint64_t nanos_since)
     {
         uint64_t msb = 0L;
+        assert(is_valid_nanos_since(nanos_since));
         msb |= (0x00000000ffffffffL & nanos_since) << 32;
         msb |= (0x0000ffff00000000UL & nanos_since) >> 16;
-        msb |= (0xffff000000000000UL & nanos_since) >> 48;
+        msb |= (0x0fff000000000000UL & nanos_since) >> 48;
         msb |= 0x0000000000001000L; // sets the version to 1.
         return msb;
     }


@@ -2064,6 +2064,17 @@ bool segment_pool::migrate_segment(segment* src, segment* dst)
 }

 void tracker::impl::register_region(region::impl* r) {
+    // If needed, increase capacity of regions before taking the reclaim lock,
+    // to avoid failing an allocation when push_back() tries to increase
+    // capacity.
+    //
+    // The capacity increase is atomic (wrt _regions) so it cannot be
+    // observed
+    if (_regions.size() == _regions.capacity()) {
+        auto copy = _regions;
+        copy.reserve(copy.capacity() * 2);
+        _regions = std::move(copy);
+    }
     reclaiming_lock _(*this);
     _regions.push_back(r);
     llogger.debug("Registered region @{} with id={}", r, r->id());