Compare commits


64 Commits

Asias He
9b46b9f1a8 gossip: Add an option to force gossip generation
Consider 3 nodes in the cluster, n1, n2, n3, with gossip generation
numbers g1, g2, g3.

n1, n2, n3 are running a scylla version with commit
0a52ecb6df (gossip: Fix max generation
drift measure).

One year later, the user wants to upgrade n1, n2, n3 to a new version.

When n3 does a rolling restart with the new version, n3 will use a new
generation number g3'. Because g3' - g2 > MAX_GENERATION_DIFFERENCE and
g3' - g1 > MAX_GENERATION_DIFFERENCE, n1 and n2 will reject n3's
gossip update and mark n3 as down.

Such unnecessary marking of nodes as down can cause availability issues.
For example:

DC1: n1, n2
DC2: n3, n4

When n3 and n4 restart, n1 and n2 will mark n3 and n4 as down, which
causes the whole DC2 to be unavailable.

To fix, we add an option to start the node with a gossip generation
within MAX_GENERATION_DIFFERENCE of the previous one.

Once all the nodes run a version with commit
0a52ecb6df, the option is no longer
needed.

Fixes #5164

(cherry picked from commit 743b529c2b)

[tgrabiec: resolved major conflicts in config.hh]
2020-03-27 13:08:26 +01:00
Asias He
93da2e2ff0 gossiper: Always use the new generation number
A user reported an issue that after a node restart, the restarted node
is marked as DOWN by other nodes in the cluster while the node is up
and running normally.

Consider the following:

- n1, n2, n3 in the cluster
- n3 shutdown itself
- n3 sends the shutdown verb to n1 and n2
- n1 and n2 set n3 to SHUTDOWN status and force the heartbeat version to
  INT_MAX
- n3 restarts
- n3 sends gossip shadow rounds to n1 and n2, in
  storage_service::prepare_to_join,
- n3 receives a response from n1; in gossiper::handle_ack_msg, since
  _enabled = false and _in_shadow_round == false, n3 will apply the
  application state in fiber 1; fiber 1 finishes faster than fiber 2 and
  sets _in_shadow_round = false
- n3 receives a response from n2; in gossiper::handle_ack_msg, since
  _enabled = false and _in_shadow_round == false, n3 will apply the
  application state in fiber 2; fiber 2 yields
- n3 finishes the shadow round and continues
- n3 resets gossip endpoint_state_map with
  gossiper.reset_endpoint_state_map()
- n3 resumes fiber 2, applying the application state about n3 into
  endpoint_state_map; at this point endpoint_state_map contains
  information about n3 itself, learned from n2
- n3 calls gossiper.start_gossiping(generation_number, app_states, ...)
  with new generation number generated correctly in
  storage_service::prepare_to_join, but in
  maybe_initialize_local_state(generation_nbr), it will not set the new
  generation and heartbeat if endpoint_state_map already contains the node itself
- n3 continues with the old generation and heartbeat learned in fiber 2
- n3 continues the gossip loop; in gossiper::run,
  hbs.update_heart_beat() sets the heartbeat to a number starting
  from 0
- n1 and n2 will not get updates from n3 because it uses the same
  generation number while n1 and n2 have a larger heartbeat version
- n1 and n2 will mark n3 as down even though n3 is alive.

To fix, always use the new generation number.

Fixes: #5800
Backports: 3.0 3.1 3.2
(cherry picked from commit 62774ff882)
2020-03-27 12:53:26 +01:00
Piotr Sarna
b764db3f1c cql: fix qualifying indexed columns for filtering
When qualifying columns to be fetched for filtering, we also check
whether the target column is used as an index - in which case there's
no need to fetch it. However, the check was incorrectly assuming
that any restriction is eligible for indexing, while it's currently
only true for EQ. The fix makes a more specific check and contains
many dynamic casts, but these will hopefully be gone once our
long-planned "restrictions rewrite" is done.
This commit comes with a test.

Fixes #5708
Tests: unit(dev)

(cherry picked from commit 767ff59418)
2020-03-22 10:08:48 +01:00
Konstantin Osipov
304d339193 locator: correctly select endpoints if RF=0
SimpleStrategy creates a list of endpoints by iterating over the set of
all configured endpoints for the given token, until we reach keyspace
replication factor.
There is a trivial coding bug when we first add at least one endpoint
to the list, and then compare list size and replication factor.
If RF=0 this never yields true.
Fix by moving the RF check before at least one endpoint is added to the
list.
Cassandra never had this bug since it uses a less fancy while()
loop.

Fixes #5962
Message-Id: <20200306193729.130266-1-kostja@scylladb.com>

(cherry picked from commit ac6f64a885)
2020-03-12 12:10:45 +02:00
Avi Kivity
9f7ba4203d logalloc: increase capacity of _regions vector outside reclaim lock
Reclaim consults the _regions vector, so we don't want it moving around while
allocating more capacity. For that we take the reclaim lock. However, that
can cause a false-positive OOM during startup:

1. all memory is allocated to LSA as part of priming (2baa16b371)
2. the _regions vector is resized from 64k to 128k, requiring a segment
   to be freed (plenty are free)
3. but reclaiming_lock is taken, so we cannot reclaim anything.

To fix, resize the _regions vector outside the lock.

Fixes #6003.
Message-Id: <20200311091217.1112081-1-avi@scylladb.com>

(cherry picked from commit c020b4e5e2)
2020-03-12 11:25:50 +02:00
Benny Halevy
8b6a792f81 dist/redhat: scylla.spec.mustache: set _no_recompute_build_ids
By default, `/usr/lib/rpm/find-debuginfo.sh` will tamper with
the binary's build-id when stripping its debug info, as it is passed
the `--build-id-seed <version>.<release>` option.

To prevent that, we need to set the following macros:
  unset `_unique_build_ids`
  set `_no_recompute_build_ids` to 1
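In spec-file terms, the two macros above would be set roughly like this (a sketch; the exact placement inside scylla.spec.mustache may differ):

```
%undefine _unique_build_ids
%global _no_recompute_build_ids 1
```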

Fixes #5881

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 25a763a187)
2020-03-09 15:22:58 +02:00
Benny Halevy
8a94f6b180 gossiper: do_stop_gossiping: copy live endpoints vector
It can be resized asynchronously by mark_dead.

Fixes #5701

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20200203091344.229518-1-bhalevy@scylladb.com>
(cherry picked from commit f45fabab73)
2020-02-26 13:00:33 +02:00
Benny Halevy
27209a5b2e storage_service: drain_on_shutdown: unregister storage_proxy subscribers from local_storage_service
Match subscription done in main() and avoid cross shard access
to _lifecycle_subscribers vector.

Fixes #5385

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Acked-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20200123092817.454271-1-bhalevy@scylladb.com>
(cherry picked from commit 5b0ea4c114)
2020-02-25 16:40:31 +02:00
Hagit Segev
c25f627a6e release: prepare for 3.1.4 2020-02-24 23:53:03 +02:00
Avi Kivity
58b1bdc20c Revert "streaming: Do not invalidate cache if no sstable is added in flush_streaming_mutations"
This reverts commit 4791b726a0. It exposes a data
resurrection bug (#5858).
2020-02-24 11:41:37 +02:00
Piotr Dulikowski
e1a7df174c hh: handle counter update hints correctly
This patch fixes a bug that appears because of an incorrect interaction
between counters and hinted handoff.

When a counter is updated on the leader, it sends mutations to other
replicas that contain all counter shards from the leader. If consistency
level is achieved but some replicas are unavailable, a hint with
mutation containing counter shards is stored.

When a hint's destination node is no longer a replica for it, the hint
is sent to all of its current replicas instead. Previously,
storage_proxy::mutate was used for that purpose. This was incorrect
because that function treats mutations for counter tables as mutations
containing only a delta (by how much to increase/decrease the counter).
These two types of mutations have different serialization format, so in
this case a "shards" mutation is reinterpreted as "delta" mutation,
which can cause data corruption to occur.

This patch backports `storage_proxy::mutate_hint_from_scratch`
function, which bypasses special handling of counter mutations and
treats them as regular mutations - which is the correct behavior for
"shards" mutations.

Refs #5833.
Backports: 3.1, 3.2, 3.3
Tests: unit(dev)
(cherry picked from commit ec513acc49)
2020-02-19 18:04:06 +02:00
Avi Kivity
6c39e17838 Merge "cql3: time_uuid_fcts: validate time UUID" from Benny
"
Throw an error in case we hit an invalid time UUID
rather than hitting an assert.

Fixes #5552

(Ref #5588 that was dequeued and fixed here)

Test: UUID_test, cql_query_test(debug)
"

* 'validate-time-uuid' of https://github.com/bhalevy/scylla:
  cql3: abstract_function_selector: provide assignment_testable_source_context
  test: cql_query_test: add time uuid validation tests
  cql3: time_uuid_fcts: validate timestamp arg
  cql3: make_max_timeuuid_fct: delete outdated FIXME comment
  cql3: time_uuid_fcts: validate time UUID
  test: UUID_test: add tests for time uuid
  utils: UUID: create_time assert nanos_since validity
  utils/UUID_gen: make_nanos_since
  utils: UUID: assert UUID.is_timestamp

(cherry picked from commit 3343baf159)

Conflicts:
	cql3/functions/time_uuid_fcts.hh
	tests/cql_query_test.cc
2020-02-17 20:09:09 +02:00
Avi Kivity
507d763f45 Update seastar submodule
* seastar a5312ab85a...a51bd8b91a (1):
  > config: Do not allow zero rates

Fixes #5360.
2020-02-16 17:02:54 +02:00
Benny Halevy
b042e27f0a repair: initialize row_level_repair: _zero_rows
Avoid the following UBSAN error:
repair/row_level.cc:2141:7: runtime error: load of value 240, which is not a valid value for type 'bool'

Fixes #5531

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 474ffb6e54)
2020-02-16 16:11:36 +02:00
Rafael Ávila de Espíndola
375ce345a3 main: Explicitly allow scylla core dumps
I have not looked into the security reason for disabling it when
a program has file capabilities.

Fixes #5560

[avi: remove extraneous semicolon]
Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200106231836.99052-1-espindola@scylladb.com>
(cherry picked from commit b80852c447)
2020-02-16 16:04:24 +02:00
Avi Kivity
493b821dfa Update seastar submodule
* seastar cfc082207c...a5312ab85a (1):
  > perftune.py: Use safe_load() for fix arbitrary code execution

Fixes #5630.
2020-02-16 15:55:05 +02:00
Avi Kivity
e1999c76b2 tools: toolchain: dbuild: relax process limit in container
Docker restricts the number of processes in a container to some
limit it calculates. This limit turns out to be too low on large
machines, since we run multiple links in parallel, and each link
runs many threads.

Remove the limit by specifying --pids-limit -1. Since dbuild is
meant to provide a build environment, not a security barrier,
this is okay (the container is still restricted by host limits).

I checked that --pids-limit is supported by old versions of
docker and by podman.

Fixes #5651.
Message-Id: <20200127090807.3528561-1-avi@scylladb.com>

(cherry picked from commit 897320f6ab)
2020-02-16 15:42:12 +02:00
Asias He
4791b726a0 streaming: Do not invalidate cache if no sstable is added in flush_streaming_mutations
table::flush_streaming_mutations dates from the days when streaming
data went to memtables. After switching to the new streaming, data goes
to sstables directly, so the sstables generated in
table::flush_streaming_mutations will be empty.

It is unnecessary to invalidate the cache if no sstables are added. To
avoid unnecessary cache invalidation, which pokes holes in the cache, skip
calling _cache.invalidate() if the set of sstables is empty.

The steps are:

- STREAM_MUTATION_DONE verb is sent when streaming is done with old or
  new streaming
- table::flush_streaming_mutations is called in the verb handler
- cache is invalidated for the streaming ranges

In summary, this patch will avoid a lot of cache invalidation for
streaming.

Backports: 3.0 3.1 3.2
Fixes: #5769
(cherry picked from commit 5e9925b9f0)
2020-02-16 15:16:50 +02:00
Botond Dénes
d7354a5b8d row: append(): downgrade assert to on_internal_error()
This assert, added by 060e3f8 is supposed to make sure the invariant of
the append() is respected, in order to prevent building an invalid row.
The assert however proved to be too harsh, as it converts any bug
causing out-of-order clustering rows into cluster unavailability.
Downgrade it to on_internal_error(). This will still prevent corrupt
data from spreading in the cluster, without the unavailability caused by
the assert.

Fixes: #5786
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20200211083829.915031-1-bdenes@scylladb.com>
(cherry picked from commit 3164456108)
2020-02-16 15:14:03 +02:00
Takuya ASADA
6815b72b06 dist/debian: keep /etc/systemd .conf files on 'remove'
Since dpkg does not re-install conffiles when they are removed by the user,
we are currently missing dependencies.conf and sysconfdir.conf on rollback.
To prevent this, we need to stop running
'rm -rf /etc/systemd/system/scylla-server.service.d/' on 'remove'.

Fixes #5734

(cherry picked from commit 43097854a5)
2020-02-12 14:29:30 +02:00
Rafael Ávila de Espíndola
efc2df8ca3 types: Fix encoding of negative varint
We would sometimes produce an unnecessary extra 0xff prefix byte.

The new encoding matches what cassandra does.

This was both an efficiency and a correctness issue, as using varint in a
key could produce different tokens.

Fixes #5656

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
(cherry picked from commit c89c90d07f)
2020-02-02 17:35:02 +02:00
Avi Kivity
dbe90f131f test: make eventually() more patient
We use eventually() in tests to wait for eventually consistent data
to become consistent. However, we see spurious failures indicating
that we wait too little.

Increasing the timeout has a negative side effect in that tests that
fail will now take longer to do so. However, this negative side effect
is negligible compared to false-positive failures, since they throw away large
test efforts and sometimes require a person to investigate the problem,
only to conclude it is a false positive.

This patch therefore makes eventually() more patient, by a factor of
32.

Fixes #4707.
Message-Id: <20200130162745.45569-1-avi@scylladb.com>

(cherry picked from commit ec5b721db7)
2020-02-01 13:22:03 +02:00
Takuya ASADA
31d5d16c3d dist/debian: Use tilde for release candidate builds
We need to add '~' to handle rcX versions correctly on Debian variants
(merged at ae33e9f), but when we moved to the relocatable package we
mistakenly dropped the code, so add it again.

Fixes #5641

(cherry picked from commit dd81fd3454)
2020-01-28 18:35:40 +02:00
Hagit Segev
b0d122f9c5 release: prepare for 3.1.3 2020-01-28 14:09:57 +02:00
Asias He
9a10e4a245 repair: Avoid duplicated partition_end write
Consider this:

1) Write partition_start of p1
2) Write clustering_row of p1
3) Write partition_end of p1
4) Repair is stopped due to error before writing partition_start of p2
5) Repair calls repair_row_level_stop() to tear down which calls
   wait_for_writer_done(). A duplicate partition_end is written.

To fix, track the partition_start and partition_end writes, and avoid
unpaired writes.

Backports: 3.1 and 3.2
Fixes: #5527
(cherry picked from commit 401854dbaf)
2020-01-21 13:39:19 +02:00
Piotr Sarna
871d1ebdd5 view: ignore duplicated key entries in progress virtual reader
The build progress virtual reader uses the Scylla-specific
scylla_views_builds_in_progress table in order to represent
legacy views_builds_in_progress rows. The Scylla-specific table contains
an additional cpu_id clustering key part, which is trimmed before returning
it to the user. That may cause duplicated clustering row fragments to be
emitted by the reader, which may cause undefined behaviour in consumers.
The solution is to keep track of previous clustering keys for each
partition and drop fragments that would cause duplication. That way if
any shard is still building a view, its progress will be returned,
and if many shards are still building, the returned value will indicate
the progress of a single arbitrary shard.

Fixes #4524
Tests:
unit(dev) + custom monotonicity checks from <tgrabiec@scylladb.com>

(cherry picked from commit 85a3a4b458)
2020-01-16 12:07:40 +01:00
Tomasz Grabiec
bff996959d cql: alter type: Format field name as text instead of hex
Fixes #4841

Message-Id: <1565702635-26214-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 64ff1b6405)
2020-01-05 18:51:53 +02:00
Gleb Natapov
1bdc83540b cache_hitrate_calculator: do not ignore a future returned from gossiper::add_local_application_state
We should wait for the future returned from add_local_application_state() to
resolve before issuing a new calculation; otherwise two
add_local_application_state() calls may run simultaneously for the same state.

Fixes #4838.

Message-Id: <20190812082158.GE17984@scylladb.com>
(cherry picked from commit 00c4078af3)
2020-01-05 18:50:13 +02:00
Takuya ASADA
478c35e07a dist/debian: fix missing scyllatop files
The Debian package build script runs relocate_python_scripts.py for scyllatop,
but mistakenly forgets to install tools/scyllatop/*.py.
We need to install them using scylla-server.install.

Fixes #5518

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <20191227025750.434407-1-syuu@scylladb.com>
2019-12-30 19:38:34 +02:00
Benny Halevy
ba968ab9ec tracing: one_session_records: keep local tracing ptr
Similar to trace_state, keep a shared_ptr<tracing> _local_tracing_ptr
in one_session_records when it is constructed, so it can be used
during shutdown.

Fixes #5243

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 7aef39e400)
2019-12-24 18:42:21 +02:00
Avi Kivity
883b5e8395 database: fix schema use-after-move in make_multishard_streaming_reader
On aarch64, asan detected a use-after-move. It doesn't happen on x86_64,
likely due to different argument evaluation order.

Fix by evaluating full_slice before moving the schema.

Note: I used "auto&&" and "std::move()" even though full_slice()
returns a reference. I think this is safer in case full_slice()
changes, and works just as well with a reference.

Fixes #5419.

(cherry picked from commit 85822c7786)
2019-12-24 18:35:01 +02:00
Rafael Ávila de Espíndola
b47033676a types: recreate dependent user types.
In the system.types table a user type refers to another by name. When
a user type is modified, only its entry in the table is changed.

At runtime a user type has a direct pointer to the types it uses. To
handle the discrepancy we need to recreate any dependent types when an
entry in system.types changes.

Fixes #5049

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
(cherry picked from commit 5af8b1e4a3)
2019-12-23 17:58:26 +02:00
Tomasz Grabiec
67e45b73f0 types: Fix abort on type alter which affects a compact storage table with no regular columns
Fixes #4837

Message-Id: <1565702247-23800-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 34cff6ed6b)
2019-12-23 17:34:06 +02:00
Rafael Ávila de Espíndola
37eac75b6f cql: Fix use of UDT in reversed columns
We were missing calls to underlying_type in a few locations and so the
insert would think the given literal was invalid and the select would
refuse to fetch a UDT field.

Fixes #4672

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20190708200516.59841-1-espindola@scylladb.com>
(cherry picked from commit 4e7ffb80c0)
2019-12-23 15:54:36 +02:00
Piotr Sarna
e8431a3474 table: Reduce read amplification in view update generation
This commit makes sure that single-partition readers for
read-before-write do not have fast-forwarding enabled,
as it may lead to huge read amplification. The observed case was:
1. Creating an index.
  CREATE INDEX index1  ON myks2.standard1 ("C1");
2. Running cassandra-stress in order to generate view updates.
cassandra-stress write no-warmup n=1000000 cl=ONE -schema \
  'replication(factor=2) compaction(strategy=LeveledCompactionStrategy)' \
  keyspace=myks2 -pop seq=4000000..8000000 -rate threads=100 -errors
  skip-read-validation -node 127.0.0.1;

Without disabling fast-forwarding, single-partition readers
were turned into scanning readers in cache, which resulted
in reading 36GB (sic!) on a workload which generates less
than 1GB of view updates. After applying the fix, the number
dropped down to less than 1GB, as expected.

Refs #5409
Fixes #4615
Fixes #5418

(cherry picked from commit 79c3a508f4)
2019-12-05 22:36:20 +02:00
Yaron Kaikov
9d78d848e6 release: prepare for 3.1.2 2019-11-27 10:24:43 +02:00
Rafael Ávila de Espíndola
32aa6ddd7e commitlog: make sure a file is closed
If allocate or truncate throws, we have to close the file.

Fixes #4877

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20191114174810.49004-1-espindola@scylladb.com>
(cherry picked from commit 6160b9017d)
2019-11-24 17:48:24 +02:00
Tomasz Grabiec
74cc9477af row_cache: Fix abort on bad_alloc during cache update
Since 90d6c0b, cache will abort when trying to detach partition
entries while they're updated. This should never happen. It can happen
though, when the update fails on bad_alloc, because the cleanup guard
invalidates the cache before it releases partition snapshots (held by
"update" coroutine).

Fix by destroying the coroutine first.

Fixes #5327.

Tests:
  - row_cache_test (dev)

Message-Id: <1574360259-10132-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit e3d025d014)
2019-11-24 17:44:07 +02:00
Nadav Har'El
95acf71680 merge: row_marker: correct row expiry condition
Merged patch set by Piotr Dulikowski:

This change corrects condition on which a row was considered expired by its
TTL.

The logic that decides when a row becomes expired was inconsistent with the
logic that decides if a single cell is expired. A single cell becomes expired
when expiry_timestamp <= now, while a row became expired when
expiry_timestamp < now (notice the strict inequality). For rows inserted
with TTL, this caused non-key cells to expire (change their values to null)
one second before the row disappeared. Now, row expiry logic uses non-strict
inequality.

Fixes #4263,
Fixes #5290.

Tests:

    unit(dev)
    python test described in issue #5290

(cherry picked from commit 9b9609c65b)
2019-11-20 21:40:11 +02:00
Asias He
921f8baf00 gossip: Fix max generation drift measure
Assume n1 and n2 in a cluster with generation numbers g1, g2. The
cluster runs for more than 1 year (MAX_GENERATION_DIFFERENCE). When n1
reboots with generation g1' which is time based, n2 will see
g1' > g2 + MAX_GENERATION_DIFFERENCE and reject n1's gossip update.

To fix, check the generation drift against the generation value this node
would get if it were restarted.

This is a backport of CASSANDRA-10969.

Fixes #5164

(cherry picked from commit 0a52ecb6df)
2019-11-20 11:39:16 +02:00
Avi Kivity
071d7d9210 reloc: do not install dependencies when building the relocatable package
The dependencies are provided by the frozen toolchain. If a dependency
is missing, we must update the toolchain rather than rely on build-time
installation, which is not reproducible (as different package versions
are available at different times).

Luckily "dnf install" does not update an already-installed package. Had
that been the case, none of our builds would have been reproducible, since
packages would be updated to the latest version as of the build time rather
than the version selected by the frozen toolchain.

So, to prevent missing packages in the frozen toolchain translating to
an unreproducible build, remove the support for installing dependencies
from reloc/build_reloc.sh. We still parse the --nodeps option in case some
script uses it.

Fixes #5222.

Tests: reloc/build_reloc.sh.
(cherry picked from commit cd075e9132)
2019-11-18 14:58:24 +02:00
Kamil Braun
769b9bbe59 view: fix bug in virtual columns.
When creating a virtual column of non-frozen map type,
the wrong type was used for the map's keys.

Fixes #5165.

(cherry picked from commit ef9d5750c8)
2019-11-18 14:55:17 +02:00
Benny Halevy
d4e553c153 sstables: delete_atomically: fix misplaced parenthesis in pending_delete_log warning message
Fixes #4861.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20190818064637.9207-1-bhalevy@scylladb.com>
(cherry picked from commit 20083be9f6)
2019-11-17 17:57:00 +02:00
Avi Kivity
d983411488 build: adjust libthread_db file name to match gdb expectations
gdb searches for libthread_db.so using its canonical name of libthread_db.so.1 rather
than the file name of libthread_db-1.0.so, so use that name to store the file in the
archive.

Fixes #4996.

(cherry picked from commit d77171e10e)
2019-11-17 17:57:00 +02:00
Avi Kivity
27de1bb8e6 reconcilable_result: use chunked_vector to hold partitions
Usually, a reconcilable_result holds very few partitions (1 is common),
since the page size is limited to 1MB. But if we have paging disabled or
if we are reconciling a range full of tombstones, we may see many more.
This can cause large allocations.

Change to chunked_vector to prevent those large allocations, as they
can be quite expensive.

Fixes #4780.

(cherry picked from commit 093d2cd7e5)
2019-11-17 17:57:00 +02:00
Avi Kivity
854f8ccb40 utils::chunked_vector: add rbegin() and related iterators
Needed as an std::vector replacement.

(cherry picked from commit eaa9a5b0d7)

Prerequisite for #4780.
2019-11-17 17:57:00 +02:00
Avi Kivity
a68170c9a3 utils: chunked_vector: make begin()/end() const correct
begin() of a const vector should return a const_iterator, to avoid
giving the caller the ability to mutate it.

This slipped through since iterator's constructor does a const_cast.

Noticed by code inspection.

(cherry picked from commit df6faae980)

Prerequisite for #4780.
2019-11-17 17:57:00 +02:00
Glauber Costa
7e4bcf2c0f do not crash in user-defined operations if the controller is disabled
Scylla currently crashes if we run manual operations like nodetool
compact with the controller disabled. While we neither like nor
recommend running with the controller disabled, due to some corner cases
in the controller algorithm we are not yet at the point in which we can
deprecate this and are sometimes forced to disable it.

The reason for the crash is that manual operations will invoke
_backlog_of_shares, which returns what is the backlog needed to
create a certain number of shares. That scan the existing control
points, but when we run without the controller there are no control
points and we crash.

Backlog doesn't matter if the controller is disabled, and the return
value of this function will be immaterial in this case. So to avoid the
crash, we return something right away if the controller is disabled.

Fixes #5016

Signed-off-by: Glauber Costa <glauber@scylladb.com>
(cherry picked from commit c9f2d1d105)
2019-11-17 12:33:23 +02:00
Avi Kivity
a74b3a182e Merge "Add proper aggregation for paged indexing" from Piotr
"
Fixes #4540

This series adds proper handling of aggregation for paged indexed queries.
Before this series returned results were presented to the user in per-page
partial manner, while they should have been returned as a single aggregated
value.

Tests: unit(dev)
"

* 'add_proper_aggregation_for_paged_indexing_for_3.1' of https://github.com/psarna/scylla:
  test: add 'eventually' block to index paging test
  tests: add indexing+paging test case for clustering keys
  tests: add indexing + paging + aggregation test case
  tests: add query_options to cquery_nofail
  cql3: make DEFAULT_COUNT_PAGE_SIZE constant public
  cql3: add proper aggregation to paged indexing
  cql3: add a query options constructor with explicit page size
  cql3: enable explicit copying of query_options
  cql3: split execute_base_query implementation
2019-11-17 12:23:30 +02:00
Piotr Sarna
e9bc579565 test: add 'eventually' block to index paging test
Without 'eventually', the test is flaky because the index may still
not be up to date when its conditions are checked.

Fixes #4670

(cherry picked from commit ebbe038d19)
2019-11-15 09:12:40 +01:00
Piotr Sarna
ad46bf06a7 tests: add indexing+paging test case for clustering keys
Indexing a non-prefix part of the clustering key has a separate
code path (see issue #3405), so it deserves a separate test case.
2019-11-14 10:26:49 +01:00
Piotr Sarna
1ff21a28b7 tests: add indexing + paging + aggregation test case
Indexed queries used to erroneously return partial per-page results
for aggregation queries. This test case used to reproduce the problem
and now guards against regressions.

Refs #4540
2019-11-14 10:26:49 +01:00
Piotr Sarna
fb3dfaa736 tests: add query_options to cquery_nofail
The cquery_nofail utility is extended, so it can accept custom
query options, just like execute_cql does.
2019-11-14 10:26:49 +01:00
Piotr Sarna
5a02e6976f cql3: make DEFAULT_COUNT_PAGE_SIZE constant public
The constant will be later used in test scenarios.
2019-11-14 10:26:49 +01:00
Piotr Sarna
5202eea7a7 cql3: add proper aggregation to paged indexing
Aggregated and paged filtering needs to aggregate the results
from all pages in order to avoid returning partial per-page
results. It's a little bit more complicated than regular aggregation,
because each paging state needs to be translated between the base
table and the underlying view. The routine keeps fetching pages
from the underlying view, which are then used to fetch base rows,
which go straight to the result set builder.

Fixes #4540
2019-11-14 10:26:48 +01:00
Gleb Natapov
038733f1a5 storage_proxy: do not release mutation if not all replies were received
MV backpressure code frees the mutation before delayed client replies
arrive, in order to save memory. The commit 2d7c026d6e that
introduced the logic claimed to do it only when all replies are received,
but this is not the case. Fix the code to free only when all replies
are received for real.

Fixes #5242

Message-Id: <20191113142117.GA14484@scylladb.com>
(cherry picked from commit 552c56633e)
2019-11-14 11:04:27 +02:00
Piotr Sarna
0ed2e90925 cql3: add a query options constructor with explicit page size
For internal use, there already exists a query_options constructor
that copies data from another query_options with overwritten paging
state. This commit adds an option to overwrite page size as well.
2019-11-14 09:58:35 +01:00
Piotr Sarna
9ee6d2bc15 cql3: enable explicit copying of query_options 2019-11-14 09:58:28 +01:00
Piotr Sarna
23582a2ce9 cql3: split execute_base_query implementation
In order to handle aggregation queries correctly, the function that
returns base query results is split into two, so it's possible to
access raw query results, before they're converted into end-user
CQL message.
2019-11-14 09:58:05 +01:00
Takuya ASADA
5ddf0ec1df dist/common/scripts/scylla_setup: don't proceed with empty NIC name
Currently, the NIC selection prompt in scylla_setup just proceeds with setup
when the user presses the Enter key at the prompt.
The prompt should ask for the NIC name again until the user inputs a correct NIC name.

Fixes #4517
Message-Id: <20190617124925.11559-1-syuu@scylladb.com>

(cherry picked from commit 7320c966bc)
2019-11-13 17:27:21 +02:00
Avi Kivity
e6eb54af90 Update seastar submodule
* seastar 75488f6ef2...cfc082207c (2):
  > core: fix a race in execution stages
  > execution_stage: prevent unbounded growth

Fixes #4749.
Fixes #4856.
2019-11-13 13:14:27 +02:00
Piotr Sarna
f5a869966a view: fix view_info select statement for local indexes
Calculating the select statement for a given view_info structure
used to work fine, but once local indexes were introduced, a subtle
bug appeared: the legacy token column does not exist in local indexes
and a valid clustering key column was omitted instead.
That results in potentially incorrect partition slices being used later
in read-before-write.
There's a long term plan for removing select_statement from
view info altogether, but nonetheless the bug needs to be fixed first.

cherry picked from commit 9e98b51aaa

Fixes #5241
Message-Id: <cb2e863e8e993e00ec7329505f737a9ce4b752ae.1572432826.git.sarna@scylladb.com>
2019-11-01 08:06:30 +02:00
Piotr Sarna
0c70cd626b index: add is_global_index() utility
The helper function is useful for determining whether a given schema
represents a global index.

cherry picked from commit 2ee8c6f595
Message-Id: <db5c9383e426fb2e55e5dbeebc7b8127afc91158.1572432826.git.sarna@scylladb.com>
2019-11-01 08:06:25 +02:00
Botond Dénes
0928aa4791 repair: repair_cf_range(): extract result of local checksum calculation only once
The loop collects the results of the checksum calculations and logs
any errors. The error logging includes `checksums[0]`, which corresponds
to the checksum calculation on the local node. This violates the
assumption of the code following the loop, which expects the future
of `checksums[0]` to be intact after the loop terminates. That is
only true when the checksum calculation succeeds; when it fails,
the loop extracts the error and logs it. When the code after the loop
checks again whether said calculation failed, it gets a false negative,
goes ahead and attempts to extract the value, and triggers an assert
failure.
Fix by making sure that even in the case of a failed checksum calculation,
the result of `checksums[0]` is extracted only once.

Fixes: #5238
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20191029151709.90986-1-bdenes@scylladb.com>
(cherry picked from commit e48f301e95)
2019-10-29 20:43:30 +02:00
69 changed files with 997 additions and 147 deletions


@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=3.1.1
VERSION=3.1.4
if test -f version
then


@@ -61,6 +61,16 @@ make_now_fct() {
});
}
static int64_t get_valid_timestamp(const data_value& ts_obj) {
auto ts = value_cast<db_clock::time_point>(ts_obj);
int64_t ms = ts.time_since_epoch().count();
auto nanos_since = utils::UUID_gen::make_nanos_since(ms);
if (!utils::UUID_gen::is_valid_nanos_since(nanos_since)) {
throw exceptions::server_exception(format("{}: timestamp is out of range. Must be in milliseconds since epoch", ms));
}
return ms;
}
inline
shared_ptr<function>
make_min_timeuuid_fct() {
@@ -74,8 +84,7 @@ make_min_timeuuid_fct() {
if (ts_obj.is_null()) {
return {};
}
auto ts = value_cast<db_clock::time_point>(ts_obj);
auto uuid = utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count());
auto uuid = utils::UUID_gen::min_time_UUID(get_valid_timestamp(ts_obj));
return {timeuuid_type->decompose(uuid)};
});
}
@@ -85,7 +94,6 @@ shared_ptr<function>
make_max_timeuuid_fct() {
return make_native_scalar_function<true>("maxtimeuuid", timeuuid_type, { timestamp_type },
[] (cql_serialization_format sf, const std::vector<bytes_opt>& values) -> bytes_opt {
// FIXME: should values be a vector<optional<bytes>>?
auto& bb = values[0];
if (!bb) {
return {};
@@ -94,12 +102,22 @@ make_max_timeuuid_fct() {
if (ts_obj.is_null()) {
return {};
}
auto ts = value_cast<db_clock::time_point>(ts_obj);
auto uuid = utils::UUID_gen::max_time_UUID(ts.time_since_epoch().count());
auto uuid = utils::UUID_gen::max_time_UUID(get_valid_timestamp(ts_obj));
return {timeuuid_type->decompose(uuid)};
});
}
inline utils::UUID get_valid_timeuuid(bytes raw) {
if (!utils::UUID_gen::is_valid_UUID(raw)) {
throw exceptions::server_exception(format("invalid timeuuid: size={}", raw.size()));
}
auto uuid = utils::UUID_gen::get_UUID(raw);
if (!uuid.is_timestamp()) {
throw exceptions::server_exception(format("{}: Not a timeuuid: version={}", uuid, uuid.version()));
}
return uuid;
}
inline
shared_ptr<function>
make_date_of_fct() {
@@ -110,7 +128,7 @@ make_date_of_fct() {
if (!bb) {
return {};
}
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
return {timestamp_type->decompose(ts)};
});
}
@@ -125,7 +143,7 @@ make_unix_timestamp_of_fct() {
if (!bb) {
return {};
}
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
});
}
@@ -176,7 +194,7 @@ make_timeuuidtodate_fct() {
if (!bb) {
return {};
}
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
auto to_simple_date = get_castas_fctn(simple_date_type, timestamp_type);
return {simple_date_type->decompose(to_simple_date(ts))};
});
@@ -211,7 +229,7 @@ make_timeuuidtotimestamp_fct() {
if (!bb) {
return {};
}
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
return {timestamp_type->decompose(ts)};
});
}
@@ -245,10 +263,14 @@ make_timeuuidtounixtimestamp_fct() {
if (!bb) {
return {};
}
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
});
}
inline bytes time_point_to_long(const data_value& v) {
return data_value(get_valid_timestamp(v)).serialize();
}
inline
shared_ptr<function>
make_timestamptounixtimestamp_fct() {
@@ -263,7 +285,7 @@ make_timestamptounixtimestamp_fct() {
if (ts_obj.is_null()) {
return {};
}
return {long_type->decompose(ts_obj)};
return time_point_to_long(ts_obj);
});
}
@@ -282,7 +304,7 @@ make_datetounixtimestamp_fct() {
return {};
}
auto from_simple_date = get_castas_fctn(timestamp_type, simple_date_type);
return {long_type->decompose(from_simple_date(simple_date_obj))};
return time_point_to_long(from_simple_date(simple_date_obj));
});
}


@@ -130,6 +130,18 @@ query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<ser
}
query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
: query_options(qo->_consistency,
qo->get_timeout_config(),
std::move(qo->_names),
std::move(qo->_values),
std::move(qo->_value_views),
qo->_skip_metadata,
std::move(query_options::specific_options{page_size, paging_state, qo->_options.serial_consistency, qo->_options.timestamp}),
qo->_cql_serialization_format) {
}
query_options::query_options(std::vector<cql3::raw_value> values)
: query_options(
db::consistency_level::ONE, infinite_timeout_config, std::move(values))


@@ -102,7 +102,7 @@ private:
public:
query_options(query_options&&) = default;
query_options(const query_options&) = delete;
explicit query_options(const query_options&) = default;
explicit query_options(db::consistency_level consistency,
const timeout_config& timeouts,
@@ -155,6 +155,7 @@ public:
explicit query_options(db::consistency_level, const timeout_config& timeouts,
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state);
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);
const timeout_config& get_timeout_config() const { return _timeout_config; }


@@ -380,28 +380,45 @@ std::vector<const column_definition*> statement_restrictions::get_column_defs_fo
if (need_filtering()) {
auto& sim = db.find_column_family(_schema).get_index_manager();
auto [opt_idx, _] = find_idx(sim);
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef) {
return opt_idx && opt_idx->depends_on(*cdef);
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef, ::shared_ptr<single_column_restriction> restr) {
return opt_idx && restr && restr->is_supported_by(*opt_idx);
};
auto single_pk_restrs = dynamic_pointer_cast<single_column_partition_key_restrictions>(_partition_key_restrictions);
if (_partition_key_restrictions->needs_filtering(*_schema)) {
for (auto&& cdef : _partition_key_restrictions->get_column_defs()) {
if (!column_uses_indexing(cdef)) {
::shared_ptr<single_column_restriction> restr;
if (single_pk_restrs) {
auto it = single_pk_restrs->restrictions().find(cdef);
if (it != single_pk_restrs->restrictions().end()) {
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
}
}
if (!column_uses_indexing(cdef, restr)) {
column_defs_for_filtering.emplace_back(cdef);
}
}
}
auto single_ck_restrs = dynamic_pointer_cast<single_column_clustering_key_restrictions>(_clustering_columns_restrictions);
const bool pk_has_unrestricted_components = _partition_key_restrictions->has_unrestricted_components(*_schema);
if (pk_has_unrestricted_components || _clustering_columns_restrictions->needs_filtering(*_schema)) {
column_id first_filtering_id = pk_has_unrestricted_components ? 0 : _schema->clustering_key_columns().begin()->id +
_clustering_columns_restrictions->num_prefix_columns_that_need_not_be_filtered();
for (auto&& cdef : _clustering_columns_restrictions->get_column_defs()) {
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef)) {
::shared_ptr<single_column_restriction> restr;
if (single_ck_restrs) {
auto it = single_ck_restrs->restrictions().find(cdef);
if (it != single_ck_restrs->restrictions().end()) {
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
}
}
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef, restr)) {
column_defs_for_filtering.emplace_back(cdef);
}
}
}
for (auto&& cdef : _nonprimary_key_restrictions->get_column_defs()) {
if (!column_uses_indexing(cdef)) {
auto restr = dynamic_pointer_cast<single_column_restriction>(_nonprimary_key_restrictions->get_restriction(*cdef));
if (!column_uses_indexing(cdef, restr)) {
column_defs_for_filtering.emplace_back(cdef);
}
}


@@ -92,6 +92,14 @@ public:
: abstract_function_selector(fun, std::move(arg_selectors))
, _tfun(dynamic_pointer_cast<T>(fun)) {
}
const functions::function_name& name() const {
return _tfun->name();
}
virtual sstring assignment_testable_source_context() const override {
return format("{}", this->name());
}
};
}


@@ -79,11 +79,6 @@ public:
dynamic_pointer_cast<functions::aggregate_function>(func), std::move(arg_selectors))
, _aggregate(fun()->new_aggregate()) {
}
virtual sstring assignment_testable_source_context() const override {
// FIXME:
return "FIXME";
}
};
}


@@ -82,12 +82,6 @@ public:
: abstract_function_selector_for<functions::scalar_function>(
dynamic_pointer_cast<functions::scalar_function>(std::move(fun)), std::move(arg_selectors)) {
}
virtual sstring assignment_testable_source_context() const override {
// FIXME:
return "FIXME";
}
};
}


@@ -142,7 +142,7 @@ shared_ptr<selector::factory>
selectable::with_field_selection::new_selector_factory(database& db, schema_ptr s, std::vector<const column_definition*>& defs) {
auto&& factory = _selected->new_selector_factory(db, s, defs);
auto&& type = factory->new_instance()->get_type();
auto&& ut = dynamic_pointer_cast<const user_type_impl>(std::move(type));
auto&& ut = dynamic_pointer_cast<const user_type_impl>(type->underlying_type());
if (!ut) {
throw exceptions::invalid_request_exception(
format("Invalid field selection: {} of type {} is not a user type",


@@ -166,7 +166,8 @@ alter_type_statement::add_or_alter::add_or_alter(const ut_name& name, bool is_ad
user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_update) const
{
if (get_idx_of_field(to_update, _field_name)) {
throw exceptions::invalid_request_exception(format("Cannot add new field {} to type {}: a field of the same name already exists", _field_name->name(), _name.to_string()));
throw exceptions::invalid_request_exception(format("Cannot add new field {} to type {}: a field of the same name already exists",
_field_name->to_string(), _name.to_string()));
}
std::vector<bytes> new_names(to_update->field_names());
@@ -174,7 +175,7 @@ user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_
std::vector<data_type> new_types(to_update->field_types());
auto&& add_type = _field_type->prepare(db, keyspace()).get_type();
if (add_type->references_user_type(to_update->_keyspace, to_update->_name)) {
throw exceptions::invalid_request_exception(format("Cannot add new field {} of type {} to type {} as this would create a circular reference", _field_name->name(), _field_type->to_string(), _name.to_string()));
throw exceptions::invalid_request_exception(format("Cannot add new field {} of type {} to type {} as this would create a circular reference", _field_name->to_string(), _field_type->to_string(), _name.to_string()));
}
new_types.push_back(std::move(add_type));
return user_type_impl::get_instance(to_update->_keyspace, to_update->_name, std::move(new_names), std::move(new_types));
@@ -184,13 +185,14 @@ user_type alter_type_statement::add_or_alter::do_alter(database& db, user_type t
{
std::optional<uint32_t> idx = get_idx_of_field(to_update, _field_name);
if (!idx) {
throw exceptions::invalid_request_exception(format("Unknown field {} in type {}", _field_name->name(), _name.to_string()));
throw exceptions::invalid_request_exception(format("Unknown field {} in type {}", _field_name->to_string(), _name.to_string()));
}
auto previous = to_update->field_types()[*idx];
auto new_type = _field_type->prepare(db, keyspace()).get_type();
if (!new_type->is_compatible_with(*previous)) {
throw exceptions::invalid_request_exception(format("Type {} in incompatible with previous type {} of field {} in user type {}", _field_type->to_string(), previous->as_cql3_type().to_string(), _field_name->name(), _name.to_string()));
throw exceptions::invalid_request_exception(format("Type {} is incompatible with previous type {} of field {} in user type {}",
_field_type->to_string(), previous->as_cql3_type().to_string(), _field_name->to_string(), _name.to_string()));
}
std::vector<data_type> new_types(to_update->field_types());


@@ -440,8 +440,8 @@ indexed_table_select_statement::prepare_command_for_base_query(const query_optio
return cmd;
}
future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
indexed_table_select_statement::do_execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
@@ -492,22 +492,27 @@ indexed_table_select_statement::execute_base_query(
}).then([&merger]() {
return merger.get();
});
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
});
}
// Function for fetching the selected columns from a list of clustering rows.
// It is currently used only in our Secondary Index implementation - ordinary
// CQL SELECT statements do not have the syntax to request a list of rows.
// FIXME: The current implementation is very inefficient - it requests each
// row separately (and, incrementally, in parallel). Even multiple rows from a single
// partition are requested separately. This last case can be easily improved,
// but to implement the general case (multiple rows from multiple partitions)
// efficiently, we will need more support from other layers.
// Keys are ordered in token order (see #3423)
future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state) {
return do_execute_base_query(proxy, std::move(partition_ranges), state, options, now, paging_state).then(
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
});
}
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
indexed_table_select_statement::do_execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
@@ -562,9 +567,23 @@ indexed_table_select_statement::execute_base_query(
});
}).then([&merger] () {
return merger.get();
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
});
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
});
}
future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state) {
return do_execute_base_query(proxy, std::move(primary_keys), state, options, now, paging_state).then(
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
});
}
@@ -868,6 +887,60 @@ indexed_table_select_statement::do_execute(service::storage_proxy& proxy,
}
}
// Aggregated and paged filtering needs to aggregate the results from all pages
// in order to avoid returning partial per-page results (issue #4540).
// It's a little bit more complicated than regular aggregation, because each paging state
// needs to be translated between the base table and the underlying view.
// The routine below keeps fetching pages from the underlying view, which are then
// used to fetch base rows, which go straight to the result set builder.
// A local, internal copy of query_options is kept in order to keep updating
// the paging state between requesting data from replicas.
const bool aggregate = _selection->is_aggregate();
if (aggregate) {
const bool restrictions_need_filtering = _restrictions->need_filtering();
return do_with(cql3::selection::result_set_builder(*_selection, now, options.get_cql_serialization_format()), std::make_unique<cql3::query_options>(cql3::query_options(options)),
[this, &options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] (cql3::selection::result_set_builder& builder, std::unique_ptr<cql3::query_options>& internal_options) {
// page size is set to the internal count page size, regardless of the user-provided value
internal_options.reset(new cql3::query_options(std::move(internal_options), options.get_paging_state(), DEFAULT_COUNT_PAGE_SIZE));
return repeat([this, &builder, &options, &internal_options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] () {
auto consume_results = [this, &builder, &options, &internal_options, restrictions_need_filtering] (foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
if (restrictions_need_filtering) {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection,
cql3::selection::result_set_builder::restrictions_filter(_restrictions, options, cmd->row_limit, _schema, cmd->slice.partition_row_limit())));
} else {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection));
}
};
if (whole_partitions || partition_slices) {
return find_index_partition_ranges(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (dht::partition_range_vector partition_ranges, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return do_execute_base_query(proxy, std::move(partition_ranges), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
} else {
return find_index_clustering_rows(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (std::vector<primary_key> primary_keys, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return this->do_execute_base_query(proxy, std::move(primary_keys), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
}
}).then([this, &builder, restrictions_need_filtering] () {
auto rs = builder.build();
update_stats_rows_read(rs->size());
_stats.filtered_rows_matched_total += restrictions_need_filtering ? rs->size() : 0;
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
});
});
}
if (whole_partitions || partition_slices) {
// In this case, can use our normal query machinery, which retrieves
// entire partitions or the same slice for many partitions.


@@ -68,8 +68,8 @@ class select_statement : public cql_statement {
public:
using parameters = raw::select_statement::parameters;
using ordering_comparator_type = raw::select_statement::ordering_comparator_type;
protected:
static constexpr int DEFAULT_COUNT_PAGE_SIZE = 10000;
protected:
static thread_local const ::shared_ptr<parameters> _default_parameters;
schema_ptr _schema;
uint32_t _bound_terms;
@@ -229,6 +229,14 @@ private:
lw_shared_ptr<query::read_command>
prepare_command_for_base_query(const query_options& options, service::query_state& state, gc_clock::time_point now, bool use_paging);
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
do_execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
future<shared_ptr<cql_transport::messages::result_message>>
execute_base_query(
service::storage_proxy& proxy,
@@ -238,6 +246,23 @@ private:
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
// Function for fetching the selected columns from a list of clustering rows.
// It is currently used only in our Secondary Index implementation - ordinary
// CQL SELECT statements do not have the syntax to request a list of rows.
// FIXME: The current implementation is very inefficient - it requests each
// row separately (and, incrementally, in parallel). Even multiple rows from a single
// partition are requested separately. This last case can be easily improved,
// but to implement the general case (multiple rows from multiple partitions)
// efficiently, we will need more support from other layers.
// Keys are ordered in token order (see #3423)
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
do_execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
future<shared_ptr<cql_transport::messages::result_message>>
execute_base_query(
service::storage_proxy& proxy,


@@ -32,7 +32,7 @@ tuples::component_spec_of(shared_ptr<column_specification> column, size_t compon
column->ks_name,
column->cf_name,
::make_shared<column_identifier>(format("{}[{:d}]", column->name, component), true),
static_pointer_cast<const tuple_type_impl>(column->type)->type(component));
static_pointer_cast<const tuple_type_impl>(column->type->underlying_type())->type(component));
}
shared_ptr<term>


@@ -70,7 +70,7 @@ public:
private:
void validate_assignable_to(database& db, const sstring& keyspace, shared_ptr<column_specification> receiver) {
auto tt = dynamic_pointer_cast<const tuple_type_impl>(receiver->type);
auto tt = dynamic_pointer_cast<const tuple_type_impl>(receiver->type->underlying_type());
if (!tt) {
throw exceptions::invalid_request_exception(format("Invalid tuple type literal for {} of type {}", receiver->name, receiver->type->as_cql3_type()));
}


@@ -260,6 +260,10 @@ void backlog_controller::adjust() {
float backlog_controller::backlog_of_shares(float shares) const {
size_t idx = 1;
// No control points means the controller is disabled.
if (_control_points.size() == 0) {
return 1.0f;
}
while ((idx < _control_points.size() - 1) && (_control_points[idx].output < shares)) {
idx++;
}
@@ -1963,7 +1967,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), partitioner, std::move(s), pr, ps, pc,
std::move(trace_state), fwd_mr);
});
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), schema->full_slice(),
auto&& full_slice = schema->full_slice();
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), std::move(full_slice),
service::get_local_streaming_read_priority(), {}, mutation_reader::forwarding::no);
}


@@ -1236,6 +1236,34 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
}
}
/// \brief Helper for ensuring a file is closed if an exception is thrown.
///
/// The file provided by the file_fut future is passed to func.
/// * If func throws an exception E, the file is closed and we return
/// a failed future with E.
/// * If func returns a value V, the file is not closed and we return
/// a future with V.
/// Note that when an exception is not thrown, it is the
/// responsibility of func to make sure the file will be closed. It
/// can close the file itself, return it, or store it somewhere.
///
/// \tparam Func The type of function this wraps
/// \param file_fut A future that produces a file
/// \param func A function that uses a file
/// \return A future that passes the file produced by file_fut to func
/// and closes it if func fails
template <typename Func>
static auto close_on_failure(future<file> file_fut, Func func) {
return file_fut.then([func = std::move(func)](file f) {
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
using futurator = futurize<std::result_of_t<Func(file)>>;
return futurator::make_exception_future(e);
});
});
});
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(const descriptor& d, sstring filename, open_flags flags, bool active) {
file_open_options opt;
opt.extent_allocation_size_hint = max_size;
@@ -1262,7 +1290,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
return fut;
});
return fut.then([this, d, active, filename](file f) {
return close_on_failure(std::move(fut), [this, d, active, filename] (file f) {
f = make_checked_file(commit_error_handler, f);
// xfs doesn't like files extended beyond eof, so enlarge the file
return f.truncate(max_size).then([this, d, active, f, filename] () mutable {


@@ -735,6 +735,7 @@ public:
val(shutdown_announce_in_ms, uint32_t, 2 * 1000, Used, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.") \
val(developer_mode, bool, false, Used, "Relax environment checks. Setting to true can reduce performance and reliability significantly.") \
val(skip_wait_for_gossip_to_settle, int32_t, -1, Used, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.") \
val(force_gossip_generation, int32_t, -1, Used, "Force gossip to use the generation number provided by user") \
val(experimental, bool, false, Used, "Set to true to unlock experimental features.") \
val(lsa_reclamation_step, size_t, 1, Used, "Minimum number of segments to reclaim in a single step") \
val(prometheus_port, uint16_t, 9180, Used, "Prometheus port, set to zero to disable") \


@@ -405,11 +405,8 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no);
} else {
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
// unavailable exception.
auto timeout = db::timeout_clock::now() + 1h;
//FIXME: Add required frozen_mutation overloads
return _proxy.mutate({m.fm.unfreeze(m.s)}, consistency_level::ALL, timeout, nullptr);
return _proxy.mutate_hint_from_scratch(std::move(m));
}
});
}


@@ -1092,10 +1092,31 @@ static std::vector<V> get_list(const query::result_set_row& row, const sstring&
// Create types for a given keyspace. This takes care of topologically sorting user defined types.
template <typename T> static std::vector<user_type> create_types(keyspace_metadata& ks, T&& range) {
cql_type_parser::raw_builder builder(ks);
std::unordered_set<bytes> names;
for (const query::result_set_row& row : range) {
builder.add(row.get_nonnull<sstring>("type_name"),
get_list<sstring>(row, "field_names"),
get_list<sstring>(row, "field_types"));
auto name = row.get_nonnull<sstring>("type_name");
names.insert(to_bytes(name));
builder.add(std::move(name), get_list<sstring>(row, "field_names"), get_list<sstring>(row, "field_types"));
}
// Add user types that use any of the above types. From the
// database point of view they haven't changed, since the content
// of system.types is the same for them. The runtime objects, on
// the other hand, now point to out-of-date types, so we need to
// recreate them.
for (const auto& p : ks.user_types()->get_all_types()) {
const user_type& t = p.second;
if (names.count(t->_name) != 0) {
continue;
}
for (const auto& name : names) {
if (t->references_user_type(t->_keyspace, name)) {
std::vector<sstring> field_types;
for (const data_type& f : t->field_types()) {
field_types.push_back(f->as_cql3_type().to_string());
}
builder.add(t->get_name_as_string(), t->string_field_names(), std::move(field_types));
}
}
}
return builder.build();
}


@@ -44,6 +44,11 @@ namespace db::view {
// columns. When reading the results from the scylla_views_builds_in_progress
// table, we adjust the clustering key (we shed the cpu_id column) and map
// back the regular columns.
// Since mutation fragment consumers expect clustering_row fragments
// not to be duplicated for a given primary key, the previous clustering key
// is stored between mutation fragments. If the clustering key becomes
// the same as the previous one (as a result of trimming cpu_id),
// the duplicated fragment is ignored.
class build_progress_virtual_reader {
database& _db;
@@ -55,6 +60,7 @@ class build_progress_virtual_reader {
const query::partition_slice& _legacy_slice;
query::partition_slice _slice;
flat_mutation_reader _underlying;
std::optional<clustering_key> _previous_clustering_key;
build_progress_reader(
schema_ptr legacy_schema,
@@ -79,7 +85,8 @@ class build_progress_virtual_reader {
pc,
std::move(trace_state),
fwd,
fwd_mr)) {
fwd_mr))
, _previous_clustering_key() {
}
const schema& underlying_schema() const {
@@ -127,8 +134,13 @@ class build_progress_virtual_reader {
legacy_in_progress_row.append_cell(_legacy_generation_number_col, std::move(c));
}
});
auto ck = adjust_ckey(scylla_in_progress_row.key());
if (_previous_clustering_key && ck.equal(*_schema, *_previous_clustering_key)) {
continue;
}
_previous_clustering_key = ck;
mf = clustering_row(
adjust_ckey(scylla_in_progress_row.key()),
std::move(ck),
std::move(scylla_in_progress_row.tomb()),
std::move(scylla_in_progress_row.marker()),
std::move(legacy_in_progress_row));
@@ -140,6 +152,8 @@ class build_progress_virtual_reader {
adjust_ckey(scylla_in_progress_rt.end),
scylla_in_progress_rt.end_kind,
scylla_in_progress_rt.tomb);
} else if (mf.is_end_of_partition()) {
_previous_clustering_key.reset();
}
push_mutation_fragment(std::move(mf));
}
@@ -192,4 +206,4 @@ public:
}
};
}
}
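The deduplication in the reader above boils down to remembering the last emitted clustering key and skipping fragments whose key compares equal to it. A toy sketch of that pattern, using `int` keys in place of `clustering_key`:

```cpp
#include <cassert>
#include <optional>
#include <vector>

// Simplified model of the reader above: clustering keys that become equal
// after shedding the cpu_id column must not be emitted twice in a row, so
// the previously emitted key is remembered and compared against.
std::vector<int> drop_consecutive_duplicates(const std::vector<int>& keys) {
    std::vector<int> out;
    std::optional<int> previous;
    for (int k : keys) {
        if (previous && *previous == k) {
            continue; // duplicate produced by trimming; ignore it
        }
        previous = k;
        out.push_back(k);
    }
    return out;
}
```

As in the diff, the remembered key would be reset at each partition end so keys are only deduplicated within one partition.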


@@ -83,7 +83,7 @@ view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
cql3::statements::select_statement& view_info::select_statement() const {
if (!_select_statement) {
shared_ptr<cql3::statements::raw::select_statement> raw;
if (is_index()) {
if (service::get_local_storage_service().db().local().find_column_family(base_id()).get_index_manager().is_global_index(_schema)) {
// Token column is the first clustering column
auto token_column_it = boost::range::find_if(_schema.all_columns(), std::mem_fn(&column_definition::is_clustering_key));
auto real_columns = _schema.all_columns() | boost::adaptors::filtered([this, token_column_it](const column_definition& cdef) {
@@ -449,7 +449,7 @@ void create_virtual_column(schema_builder& builder, const bytes& name, const dat
// A map has keys and values. We don't need these values,
// and can use empty values instead.
auto mtype = dynamic_pointer_cast<const map_type_impl>(type);
builder.with_column(name, map_type_impl::get_instance(mtype->get_values_type(), empty_type, true), column_kind::regular_column, column_view_virtual::yes);
builder.with_column(name, map_type_impl::get_instance(mtype->get_keys_type(), empty_type, true), column_kind::regular_column, column_view_virtual::yes);
} else if (ctype->is_set()) {
// A set's cell has nothing beyond the keys, so the
// virtual version of a set is, unfortunately, a complete


@@ -476,6 +476,8 @@ def create_perftune_conf(nic='eth0'):
def is_valid_nic(nic):
if len(nic) == 0:
return False
return os.path.exists('/sys/class/net/{}'.format(nic))
# Remove this when we do not support SET_NIC configuration value anymore


@@ -125,7 +125,7 @@ if [ -z "$TARGET" ]; then
fi
RELOC_PKG_FULLPATH=$(readlink -f $RELOC_PKG)
RELOC_PKG_BASENAME=$(basename $RELOC_PKG)
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE)
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE | sed 's/\.rc/~rc/')
SCYLLA_RELEASE=$(cat SCYLLA-RELEASE-FILE)
ln -fv $RELOC_PKG_FULLPATH ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz


@@ -4,7 +4,11 @@ set -e
case "$1" in
purge|remove)
rm -rf /etc/systemd/system/scylla-server.service.d/
# We need to keep dependencies.conf and sysconfdir.conf on 'remove',
# otherwise they will be missing after rollback.
if [ "$1" = "purge" ]; then
rm -rf /etc/systemd/system/scylla-server.service.d/
fi
;;
esac


@@ -15,3 +15,4 @@ dist/common/systemd/scylla-housekeeping-restart.timer /lib/systemd/system
dist/common/systemd/scylla-fstrim.timer /lib/systemd/system
dist/debian/scripts/scylla_save_coredump usr/lib/scylla
dist/debian/scripts/scylla_delay_fstrim usr/lib/scylla
tools/scyllatop usr/lib/scylla


@@ -15,6 +15,10 @@ Obsoletes: scylla-server < 1.1
%global __brp_python_bytecompile %{nil}
%global __brp_mangle_shebangs %{nil}
# Prevent find-debuginfo.sh from tampering with scylla's build-id (#5881)
%undefine _unique_build_ids
%global _no_recompute_build_ids 1
%description
Scylla is a highly scalable, eventually consistent, distributed,
partitioned row DB.


@@ -98,6 +98,13 @@ public:
sstring get_message() const { return what(); }
};
class server_exception : public cassandra_exception {
public:
server_exception(sstring msg) noexcept
: exceptions::cassandra_exception{exceptions::exception_code::SERVER_ERROR, std::move(msg)}
{ }
};
class protocol_exception : public cassandra_exception {
public:
protocol_exception(sstring msg) noexcept


@@ -481,8 +481,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
int local_generation = local_ep_state_ptr.get_heart_beat_state().get_generation();
int remote_generation = remote_state.get_heart_beat_state().get_generation();
logger.trace("{} local generation {}, remote generation {}", ep, local_generation, remote_generation);
// A node was removed with nodetool removenode can have a generation of 2
if (local_generation > 2 && remote_generation > local_generation + MAX_GENERATION_DIFFERENCE) {
if (remote_generation > service::get_generation_number() + MAX_GENERATION_DIFFERENCE) {
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
ep, local_generation, remote_generation);
@@ -1613,11 +1612,15 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
// message on all cpus and forward them to cpu0 to process.
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
g.init_messaging_service_handler(do_bind);
}).then([this, generation_nbr, preload_local_states] {
}).then([this, generation_nbr, preload_local_states] () mutable {
build_seeds_list();
/* initialize the heartbeat state for this localEndpoint */
maybe_initialize_local_state(generation_nbr);
if (_cfg.force_gossip_generation() > 0) {
generation_nbr = _cfg.force_gossip_generation();
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
local_state.mark_alive();
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
}
@@ -1821,7 +1824,8 @@ future<> gossiper::do_stop_gossiping() {
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
logger.info("Announcing shutdown");
add_local_application_state(application_state::STATUS, _value_factory.shutdown(true)).get();
for (inet_address addr : _live_endpoints) {
auto live_endpoints = _live_endpoints;
for (inet_address addr : live_endpoints) {
msg_addr id = get_msg_addr(addr);
logger.trace("Sending a GossipShutdown to {}", id);
ms().send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
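The `_live_endpoints` change above follows a common pattern: when the loop body may suspend and other fibers can mutate the container being walked, iterate over a copy so the iterators stay valid. A minimal non-seastar sketch, with a hypothetical `notify_all` helper standing in for the shutdown loop:

```cpp
#include <cassert>
#include <vector>

// Minimal illustration of the pattern above: when the body of the loop can
// yield and other code may mutate the container (_live_endpoints in the
// gossiper), iterating over a snapshot copy keeps the iteration valid.
int notify_all(std::vector<int>& live, void (*on_notify)(std::vector<int>&)) {
    int notified = 0;
    auto snapshot = live;          // copy; 'live' may change underneath us
    for (int ep : snapshot) {
        (void)ep;
        on_notify(live);           // may erase from 'live'; snapshot is safe
        ++notified;
    }
    return notified;
}
```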


@@ -160,7 +160,9 @@ public:
static constexpr std::chrono::milliseconds INTERVAL{1000};
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
/** Maximum difference in generation and version values we are willing to accept about a peer */
// Maximum difference between a remote generation value and the generation
// value this node would get if this node were restarted that we are
// willing to accept about a peer.
static constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365;
std::chrono::milliseconds fat_client_timeout;
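Gossip generation numbers are derived from wall-clock seconds at startup, so the fixed check above compares a remote generation against the generation this node would pick if it restarted right now, rather than against a possibly year-old local generation. A small sketch of the relaxed predicate, with simplified names:

```cpp
#include <cassert>
#include <cstdint>

// One year worth of seconds, matching MAX_GENERATION_DIFFERENCE above.
constexpr int64_t max_generation_difference = 86400LL * 365;

// 'now_generation' stands in for service::get_generation_number(): the
// generation this node itself would choose on a restart at this moment.
bool generation_is_believable(int64_t remote_generation, int64_t now_generation) {
    return remote_generation <= now_generation + max_generation_difference;
}
```

Under the old check, a node that had been up for over a year rejected any freshly restarted peer; under this one, only a generation more than a year in the future is treated as corrupt.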


@@ -26,6 +26,6 @@ class partition {
class reconcilable_result {
uint32_t row_count();
std::vector<partition> partitions();
utils::chunked_vector<partition> partitions();
query::short_read is_short_read() [[version 1.6]] = query::short_read::no;
};


@@ -181,4 +181,10 @@ bool secondary_index_manager::is_index(const schema& s) const {
});
}
bool secondary_index_manager::is_global_index(const schema& s) const {
return boost::algorithm::any_of(_indices | boost::adaptors::map_values, [&s] (const index& i) {
return !i.metadata().local() && s.cf_name() == index_table_name(i.metadata().name());
});
}
}


@@ -77,6 +77,7 @@ public:
std::vector<index> list_indexes() const;
bool is_index(view_ptr) const;
bool is_index(const schema& s) const;
bool is_global_index(const schema& s) const;
private:
void add_index(const index_metadata& im);
};


@@ -53,13 +53,13 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
endpoints.reserve(replicas);
for (auto& token : tm.ring_range(t)) {
if (endpoints.size() == replicas) {
break;
}
auto ep = tm.get_endpoint(token);
assert(ep);
endpoints.push_back(*ep);
if (endpoints.size() == replicas) {
break;
}
}
return std::move(endpoints.get_vector());

main.cc

@@ -54,6 +54,7 @@
#include <seastar/core/file.hh>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/prctl.h>
#include "disk-error-handler.hh"
#include "tracing/tracing.hh"
#include "tracing/tracing_backend_registry.hh"
@@ -323,6 +324,15 @@ static std::optional<std::vector<sstring>> parse_hinted_handoff_enabled(sstring
}
int main(int ac, char** av) {
// Allow core dumps. They would be disabled by default if
// CAP_SYS_NICE was added to the binary, as is suggested by the
// epoll backend.
int r = prctl(PR_SET_DUMPABLE, 1, 0, 0, 0);
if (r) {
std::cerr << "Could not make scylla dumpable\n";
exit(1);
}
int return_value = 0;
try {
// early check to avoid triggering


@@ -39,6 +39,9 @@
#include "mutation_cleaner.hh"
#include <seastar/core/execution_stage.hh>
#include "types/map.hh"
#include "utils/exceptions.hh"
logging::logger mplog("mutation_partition");
template<bool reversed>
struct reversal_traits;
@@ -1227,7 +1230,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
void
row::append_cell(column_id id, atomic_cell_or_collection value) {
if (_type == storage_type::vector && id < max_vector_size) {
assert(_storage.vector.v.size() <= id);
if (_storage.vector.v.size() > id) {
on_internal_error(mplog, format("Attempted to append cell#{} to row already having {} cells", id, _storage.vector.v.size()));
}
_storage.vector.v.resize(id);
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
_storage.vector.present.set(id);


@@ -397,7 +397,7 @@ public:
if (is_missing() || _ttl == dead) {
return false;
}
if (_ttl != no_ttl && _expiry < now) {
if (_ttl != no_ttl && _expiry <= now) {
return false;
}
return _timestamp > t.timestamp;
@@ -407,7 +407,7 @@ public:
if (_ttl == dead) {
return true;
}
return _ttl != no_ttl && _expiry < now;
return _ttl != no_ttl && _expiry <= now;
}
// Can be called only when is_live().
bool is_expiring() const {
@@ -447,7 +447,7 @@ public:
_timestamp = api::missing_timestamp;
return false;
}
if (_ttl > no_ttl && _expiry < now) {
if (_ttl > no_ttl && _expiry <= now) {
_expiry -= _ttl;
_ttl = dead;
}
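The `<` to `<=` changes above fix a boundary condition: a cell whose expiry point is exactly the current time must already count as dead, so liveness and deadness agree at the boundary. A simplified sketch, with plain integer stand-ins for the `gc_clock`-based types in the original:

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-ins for the TTL sentinels used above (hypothetical
// values; the originals are gc_clock durations).
constexpr int32_t no_ttl = 0;
constexpr int32_t dead = -1;

// A cell is dead if it was explicitly deleted, or if it has a TTL and its
// expiry point has been reached -- '<=' rather than '<', per the fix above.
bool is_dead(int32_t ttl, int64_t expiry, int64_t now) {
    if (ttl == dead) {
        return true;
    }
    return ttl != no_ttl && expiry <= now;
}
```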


@@ -31,7 +31,7 @@ reconcilable_result::reconcilable_result()
: _row_count(0)
{ }
reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partition> p, query::short_read short_read,
reconcilable_result::reconcilable_result(uint32_t row_count, utils::chunked_vector<partition> p, query::short_read short_read,
query::result_memory_tracker memory_tracker)
: _row_count(row_count)
, _short_read(short_read)
@@ -39,11 +39,11 @@ reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partiti
, _partitions(std::move(p))
{ }
const std::vector<partition>& reconcilable_result::partitions() const {
const utils::chunked_vector<partition>& reconcilable_result::partitions() const {
return _partitions;
}
std::vector<partition>& reconcilable_result::partitions() {
utils::chunked_vector<partition>& reconcilable_result::partitions() {
return _partitions;
}


@@ -27,6 +27,7 @@
#include "frozen_mutation.hh"
#include "db/timeout_clock.hh"
#include "querier.hh"
#include "utils/chunked_vector.hh"
#include <seastar/core/execution_stage.hh>
class reconcilable_result;
@@ -72,17 +73,17 @@ class reconcilable_result {
uint32_t _row_count;
query::short_read _short_read;
query::result_memory_tracker _memory_tracker;
std::vector<partition> _partitions;
utils::chunked_vector<partition> _partitions;
public:
~reconcilable_result();
reconcilable_result();
reconcilable_result(reconcilable_result&&) = default;
reconcilable_result& operator=(reconcilable_result&&) = default;
reconcilable_result(uint32_t row_count, std::vector<partition> partitions, query::short_read short_read,
reconcilable_result(uint32_t row_count, utils::chunked_vector<partition> partitions, query::short_read short_read,
query::result_memory_tracker memory_tracker = { });
const std::vector<partition>& partitions() const;
std::vector<partition>& partitions();
const utils::chunked_vector<partition>& partitions() const;
utils::chunked_vector<partition>& partitions();
uint32_t row_count() const {
return _row_count;
@@ -112,7 +113,7 @@ class reconcilable_result_builder {
const schema& _schema;
const query::partition_slice& _slice;
std::vector<partition> _result;
utils::chunked_vector<partition> _result;
uint32_t _live_rows{};
bool _has_ck_selector{};


@@ -8,7 +8,6 @@ print_usage() {
echo " --clean clean build directory"
echo " --compiler C++ compiler path"
echo " --c-compiler C compiler path"
echo " --nodeps skip installing dependencies"
exit 1
}
@@ -16,7 +15,6 @@ JOBS=
CLEAN=
COMPILER=
CCOMPILER=
NODEPS=
while [ $# -gt 0 ]; do
case "$1" in
"--jobs")
@@ -36,7 +34,6 @@ while [ $# -gt 0 ]; do
shift 2
;;
"--nodeps")
NODEPS=yes
shift 1
;;
*)
@@ -66,10 +63,6 @@ if [ -f build/release/scylla-package.tar.gz ]; then
rm build/release/scylla-package.tar.gz
fi
if [ -z "$NODEPS" ]; then
sudo ./install-dependencies.sh
fi
NINJA=$(which ninja-build) &&:
if [ -z "$NINJA" ]; then
NINJA=$(which ninja) &&:


@@ -780,8 +780,10 @@ static future<> repair_cf_range(repair_info& ri,
// still do our best to repair available replicas.
std::vector<gms::inet_address> live_neighbors;
std::vector<partition_checksum> live_neighbors_checksum;
bool local_checksum_failed = false;
for (unsigned i = 0; i < checksums.size(); i++) {
if (checksums[i].failed()) {
local_checksum_failed |= (i == 0);
rlogger.warn(
"Checksum of ks={}, table={}, range={} on {} failed: {}",
ri.keyspace, cf, range,
@@ -797,7 +799,7 @@ static future<> repair_cf_range(repair_info& ri,
live_neighbors_checksum.push_back(checksums[i].get0());
}
}
if (checksums[0].failed() || live_neighbors.empty()) {
if (local_checksum_failed || live_neighbors.empty()) {
return make_ready_future<>();
}
// If one of the available checksums is different, repair


@@ -371,6 +371,10 @@ class repair_writer {
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
// Current partition written to disk
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
// Is the current partition still open? A partition is opened when a
// partition_start is written and is closed when a partition_end is
// written.
std::vector<bool> _partition_opened;
public:
repair_writer(
schema_ptr schema,
@@ -385,10 +389,13 @@ public:
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf, unsigned node_idx) {
_current_dk_written_to_sstable[node_idx] = dk;
if (mf.is_partition_start()) {
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] {
_partition_opened[node_idx] = true;
});
} else {
auto start = mutation_fragment(partition_start(dk->dk, tombstone()));
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable {
_partition_opened[node_idx] = true;
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
});
}
@@ -398,6 +405,7 @@ public:
_writer_done.resize(_nr_peer_nodes);
_mq.resize(_nr_peer_nodes);
_current_dk_written_to_sstable.resize(_nr_peer_nodes);
_partition_opened.resize(_nr_peer_nodes, false);
}
void create_writer(unsigned node_idx) {
@@ -434,12 +442,21 @@ public:
t.stream_in_progress());
}
future<> write_partition_end(unsigned node_idx) {
if (_partition_opened[node_idx]) {
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] {
_partition_opened[node_idx] = false;
});
}
return make_ready_future<>();
}
future<> do_write(unsigned node_idx, lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
if (_current_dk_written_to_sstable[node_idx]) {
if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) {
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
} else {
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this,
return write_partition_end(node_idx).then([this,
node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable {
return write_start_and_mf(std::move(dk), std::move(mf), node_idx);
});
@@ -453,7 +470,7 @@ public:
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
if (_writer_done[node_idx] && _mq[node_idx]) {
// Partition_end is never sent on wire, so we have to write one ourselves.
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] () mutable {
return write_partition_end(node_idx).then([this, node_idx] () mutable {
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
@@ -1458,7 +1475,7 @@ class row_level_repair {
// If the total size of the `_row_buf` on either of the nodes is zero,
// we set this flag, which is an indication that rows are not synced.
bool _zero_rows;
bool _zero_rows = false;
// Sum of estimated_partitions on all peers
uint64_t _estimated_partitions = 0;
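The `_partition_opened` bookkeeping above guarantees a `partition_end` is only emitted when a matching `partition_start` was actually written for that peer. A minimal sketch of the per-node state machine, with a hypothetical `partition_tracker` in place of `repair_writer`:

```cpp
#include <cassert>
#include <vector>

// Sketch of the bookkeeping above: a partition_end is written only if a
// partition_start was actually written first, tracked per peer node.
struct partition_tracker {
    std::vector<bool> opened;
    explicit partition_tracker(unsigned nodes) : opened(nodes, false) {}
    void write_start(unsigned node) { opened[node] = true; }
    // Returns true if an end fragment was actually emitted.
    bool write_end(unsigned node) {
        if (!opened[node]) {
            return false; // nothing open; emitting an end would be invalid
        }
        opened[node] = false;
        return true;
    }
};
```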


@@ -925,7 +925,6 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
});
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
coroutine update;
size_t size_entry;
// In case updater fails, we must bring the cache to consistency without deferring.
auto cleanup = defer([&m, this] {
@@ -933,6 +932,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
_prev_snapshot_pos = {};
_prev_snapshot = {};
});
coroutine update; // Destroy before cleanup to release snapshots before invalidating.
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
while (!m.partitions.empty()) {
with_allocator(_tracker.allocator(), [&] () {


@@ -78,7 +78,7 @@ for exe in executables:
libs.update(ldd(exe))
# manually add libthread_db for debugging thread
libs.update({'libthread_db-1.0.so': '/lib64/libthread_db-1.0.so'})
libs.update({'libthread_db.so.1': '/lib64/libthread_db-1.0.so'})
ld_so = libs['ld.so']

Submodule seastar updated: 75488f6ef2...a51bd8b91a


@@ -162,13 +162,14 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
auto& g = gms::get_local_gossiper();
auto& ss = get_local_storage_service();
_slen = _gstate.size();
g.add_local_application_state(gms::application_state::CACHE_HITRATES, ss.value_factory.cache_hitrates(_gstate));
// if max difference during this round is big schedule next recalculate earlier
if (_diff < 0.01) {
return std::chrono::milliseconds(2000);
} else {
return std::chrono::milliseconds(500);
}
return g.add_local_application_state(gms::application_state::CACHE_HITRATES, ss.value_factory.cache_hitrates(_gstate)).then([this] {
// if max difference during this round is big schedule next recalculate earlier
if (_diff < 0.01) {
return std::chrono::milliseconds(2000);
} else {
return std::chrono::milliseconds(500);
}
});
}).finally([this] {
_gstate = std::string(); // free memory, do not trust clear() to do that for string
_rates.clear();


@@ -350,7 +350,9 @@ public:
}
void on_released() {
_expire_timer.cancel();
_mutation_holder->release_mutation();
if (_targets.size() == 0) {
_mutation_holder->release_mutation();
}
}
void timeout_cb() {
if (_cl_achieved || _cl == db::consistency_level::ANY) {
@@ -1558,6 +1560,14 @@ future<> storage_proxy::send_to_endpoint(
allow_hints);
}
future<> storage_proxy::mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s) {
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
// unavailable exception.
const auto timeout = db::timeout_clock::now() + 1h;
std::array<mutation, 1> ms{fm_a_s.fm.unfreeze(fm_a_s.s)};
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, timeout);
}
/**
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
* is not available.
@@ -2305,8 +2315,8 @@ public:
// build reconcilable_result from reconciled data
// traverse backwards since large keys are at the start
std::vector<partition> vec;
auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::ref(vec), [] (std::vector<partition>& a, const mutation_and_live_row_count& m_a_rc) {
utils::chunked_vector<partition> vec;
auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::ref(vec), [] (utils::chunked_vector<partition>& a, const mutation_and_live_row_count& m_a_rc) {
a.emplace_back(partition(m_a_rc.live_row_count, freeze(m_a_rc.mut)));
return std::ref(a);
});


@@ -387,6 +387,8 @@ public:
*/
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
future<> mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s);
// Send a mutation to one specific remote target.
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also


@@ -1440,7 +1440,8 @@ future<> storage_service::drain_on_shutdown() {
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
slogger.info("Drain on shutdown: system distributed keyspace stopped");
get_storage_proxy().invoke_on_all([&ss] (storage_proxy& local_proxy) mutable {
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
auto& ss = service::get_local_storage_service();
ss.unregister_subscriber(&local_proxy);
return local_proxy.drain_on_shutdown();
}).get();


@@ -3285,7 +3285,7 @@ delete_atomically(std::vector<shared_sstable> ssts) {
dir_f.close().get();
sstlog.debug("{} written successfully.", pending_delete_log);
} catch (...) {
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log), std::current_exception();
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
}
parallel_for_each(ssts, [] (shared_sstable sst) {


@@ -2518,7 +2518,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
std::move(slice),
std::move(m),
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
auto reader = source.make_reader(base, pk, slice, io_priority);
auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this


@@ -77,3 +77,45 @@ BOOST_AUTO_TEST_CASE(test_make_random_uuid) {
std::sort(uuids.begin(), uuids.end());
BOOST_CHECK(std::unique(uuids.begin(), uuids.end()) == uuids.end());
}
BOOST_AUTO_TEST_CASE(test_get_time_uuid) {
using namespace std::chrono;
auto uuid = utils::UUID_gen::get_time_UUID();
BOOST_CHECK(uuid.is_timestamp());
auto tp = system_clock::now();
uuid = utils::UUID_gen::get_time_UUID(tp);
BOOST_CHECK(uuid.is_timestamp());
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
uuid = utils::UUID_gen::get_time_UUID(millis);
BOOST_CHECK(uuid.is_timestamp());
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
BOOST_CHECK(unix_timestamp == millis);
}
BOOST_AUTO_TEST_CASE(test_min_time_uuid) {
using namespace std::chrono;
auto tp = system_clock::now();
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
auto uuid = utils::UUID_gen::min_time_UUID(millis);
BOOST_CHECK(uuid.is_timestamp());
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
BOOST_CHECK(unix_timestamp == millis);
}
BOOST_AUTO_TEST_CASE(test_max_time_uuid) {
using namespace std::chrono;
auto tp = system_clock::now();
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
auto uuid = utils::UUID_gen::max_time_UUID(millis);
BOOST_CHECK(uuid.is_timestamp());
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
BOOST_CHECK(unix_timestamp == millis);
}


@@ -180,9 +180,13 @@ rows_assertions rows_assertions::with_serialized_columns_count(size_t columns_co
}
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
cql_test_env& env, const char* query, const std::experimental::source_location& loc) {
cql_test_env& env, const char* query, std::unique_ptr<cql3::query_options>&& qo, const std::experimental::source_location& loc) {
try {
return env.execute_cql(query).get0();
if (qo) {
return env.execute_cql(query, std::move(qo)).get0();
} else {
return env.execute_cql(query).get0();
}
} catch (...) {
BOOST_FAIL(format("query '{}' failed: {}\n{}:{}: originally from here",
query, std::current_exception(), loc.file_name(), loc.line()));


@@ -86,4 +86,5 @@ void assert_that_failed(future<T...>&& f)
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
cql_test_env& env,
const char* query,
std::unique_ptr<cql3::query_options>&& qo = nullptr,
const std::experimental::source_location& loc = std::experimental::source_location::current());


@@ -1526,6 +1526,18 @@ SEASTAR_TEST_CASE(test_user_type_nested) {
});
}
SEASTAR_TEST_CASE(test_user_type_reversed) {
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create type my_type (a int);").get();
e.execute_cql("create table tbl (a int, b frozen<my_type>, primary key ((a), b)) with clustering order by (b desc);").get();
e.execute_cql("insert into tbl (a, b) values (1, (2));").get();
assert_that(e.execute_cql("select a,b.a from tbl;").get0())
.is_rows()
.with_size(1)
.with_row({int32_type->decompose(1), int32_type->decompose(2)});
});
}
SEASTAR_TEST_CASE(test_user_type) {
return do_with_cql_env([] (cql_test_env& e) {
return e.execute_cql("create type ut1 (my_int int, my_bigint bigint, my_text text);").discard_result().then([&e] {
@@ -3577,3 +3589,239 @@ SEASTAR_TEST_CASE(test_describe_varchar) {
});
});
}
SEASTAR_TEST_CASE(test_alter_type_on_compact_storage_with_no_regular_columns_does_not_crash) {
return do_with_cql_env_thread([] (cql_test_env& e) {
cquery_nofail(e, "CREATE TYPE my_udf (first text);");
cquery_nofail(e, "create table z (pk int, ck frozen<my_udf>, primary key(pk, ck)) with compact storage;");
cquery_nofail(e, "alter type my_udf add test_int int;");
});
}
shared_ptr<cql_transport::messages::result_message> cql_func_require_nofail(
cql_test_env& env,
const seastar::sstring& fct,
const seastar::sstring& inp,
std::unique_ptr<cql3::query_options>&& qo = nullptr,
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
auto res = shared_ptr<cql_transport::messages::result_message>(nullptr);
auto query = format("SELECT {}({}) FROM t;", fct, inp);
try {
if (qo) {
res = env.execute_cql(query, std::move(qo)).get0();
} else {
res = env.execute_cql(query).get0();
}
BOOST_TEST_MESSAGE(format("Query '{}' succeeded as expected", query));
} catch (...) {
BOOST_ERROR(format("query '{}' failed unexpectedly with error: {}\n{}:{}: originally from here",
query, std::current_exception(),
loc.file_name(), loc.line()));
}
return res;
}
// FIXME: should be in cql_assertions, but we don't want to call boost from cql_assertions.hh
template <typename Exception>
void cql_func_require_throw(
cql_test_env& env,
const seastar::sstring& fct,
const seastar::sstring& inp,
std::unique_ptr<cql3::query_options>&& qo = nullptr,
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
auto query = format("SELECT {}({}) FROM t;", fct, inp);
try {
if (qo) {
env.execute_cql(query, std::move(qo)).get();
} else {
env.execute_cql(query).get();
}
BOOST_ERROR(format("query '{}' succeeded unexpectedly\n{}:{}: originally from here", query,
loc.file_name(), loc.line()));
} catch (Exception& e) {
BOOST_TEST_MESSAGE(format("Query '{}' failed as expected with error: {}", query, e));
} catch (...) {
BOOST_ERROR(format("query '{}' failed with unexpected error: {}\n{}:{}: originally from here",
query, std::current_exception(),
loc.file_name(), loc.line()));
}
}
static void create_time_uuid_fcts_schema(cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE t (id int primary key, t timestamp, l bigint, f float, u timeuuid, d date)");
cquery_nofail(e, "INSERT INTO t (id, t, l, f, u, d) VALUES "
"(1, 1579072460606, 1579072460606000, 1579072460606, a66525e0-3766-11ea-8080-808080808080, '2020-01-13')");
cquery_nofail(e, "SELECT * FROM t;");
}
SEASTAR_TEST_CASE(test_basic_time_uuid_fcts) {
return do_with_cql_env_thread([] (auto& e) {
create_time_uuid_fcts_schema(e);
cql_func_require_nofail(e, "currenttime", "");
cql_func_require_nofail(e, "currentdate", "");
cql_func_require_nofail(e, "now", "");
cql_func_require_nofail(e, "currenttimeuuid", "");
cql_func_require_nofail(e, "currenttimestamp", "");
});
}
SEASTAR_TEST_CASE(test_time_uuid_fcts_input_validation) {
return do_with_cql_env_thread([] (auto& e) {
create_time_uuid_fcts_schema(e);
// test timestamp arg
auto require_timestamp = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "t");
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "u");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "now()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimeuuid()");
cql_func_require_nofail(e, fct, "currenttimestamp()");
};
require_timestamp("mintimeuuid");
require_timestamp("maxtimeuuid");
// test timeuuid arg
auto require_timeuuid = [&e] (const sstring& fct) {
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
};
require_timeuuid("dateof");
require_timeuuid("unixtimestampof");
// test timeuuid or date arg
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_nofail(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_nofail(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
};
require_timeuuid_or_date("totimestamp");
// test timestamp or timeuuid arg
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "t");
cql_func_require_throw<std::exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_nofail(e, fct, "currenttimestamp()");
};
require_timestamp_or_timeuuid("todate");
// test timestamp, timeuuid, or date arg
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "t");
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
cql_func_require_nofail(e, fct, "u");
cql_func_require_nofail(e, fct, "d");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
cql_func_require_nofail(e, fct, "currentdate()");
cql_func_require_nofail(e, fct, "now()");
cql_func_require_nofail(e, fct, "currenttimeuuid()");
cql_func_require_nofail(e, fct, "currenttimestamp()");
};
require_timestamp_timeuuid_or_date("tounixtimestamp");
});
}
SEASTAR_TEST_CASE(test_time_uuid_fcts_result) {
return do_with_cql_env_thread([] (auto& e) {
create_time_uuid_fcts_schema(e);
// test timestamp arg
auto require_timestamp = [&e] (const sstring& fct) {
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "mintimeuuid(t)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "maxtimeuuid(t)");
cql_func_require_nofail(e, fct, "dateof(u)");
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
cql_func_require_nofail(e, fct, "totimestamp(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
};
require_timestamp("mintimeuuid");
require_timestamp("maxtimeuuid");
// test timeuuid arg
auto require_timeuuid = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
};
require_timeuuid("dateof");
require_timeuuid("unixtimestampof");
// test timeuuid or date arg
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
cql_func_require_nofail(e, fct, "todate(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
};
require_timeuuid_or_date("totimestamp");
// test timestamp or timeuuid arg
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
cql_func_require_nofail(e, fct, "dateof(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
cql_func_require_nofail(e, fct, "totimestamp(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
};
require_timestamp_or_timeuuid("todate");
// test timestamp, timeuuid, or date arg
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
cql_func_require_nofail(e, fct, "dateof(u)");
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
cql_func_require_nofail(e, fct, "totimestamp(u)");
cql_func_require_nofail(e, fct, "todate(u)");
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
};
require_timestamp_timeuuid_or_date("tounixtimestamp");
});
}

View File

@@ -25,7 +25,7 @@
#include <seastar/util/noncopyable_function.hh>
inline
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 17) {
size_t attempts = 0;
while (true) {
try {
@@ -43,7 +43,7 @@ void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
inline
bool eventually_true(noncopyable_function<bool ()> f) {
const unsigned max_attempts = 10;
const unsigned max_attempts = 15;
unsigned attempts = 0;
while (true) {
if (f()) {
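The retry pattern in this hunk can be sketched as a minimal standalone helper (the real `eventually` also sleeps between attempts via Seastar, which is omitted here):

```cpp
#include <cstddef>
#include <functional>

// Minimal sketch of the eventually() helper above: run f, and on any
// exception retry, rethrowing once max_attempts is exhausted.
inline void eventually_sketch(std::function<void()> f, size_t max_attempts = 17) {
    for (size_t attempt = 1; ; ++attempt) {
        try {
            f();
            return;
        } catch (...) {
            if (attempt >= max_attempts) {
                throw;
            }
        }
    }
}
```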

View File

@@ -1253,6 +1253,104 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
assert_that(m).is_equal_to(m2);
}
// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
// to TTL.
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
can_gc_fn never_gc = [] (tombstone) { return false; };
auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
BOOST_TEST_MESSAGE(format("must_be_alive({}, {})", mark, t));
BOOST_REQUIRE(mark.is_live(tombstone(), t));
BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
};
auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
BOOST_TEST_MESSAGE(format("must_be_dead({}, {})", mark, t));
BOOST_REQUIRE(!mark.is_live(tombstone(), t));
BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
};
const auto timestamp = api::timestamp_type(1);
const auto t0 = gc_clock::now();
const auto t1 = t0 + 1s;
const auto t2 = t0 + 2s;
const auto t3 = t0 + 3s;
// Without timestamp the marker is missing (doesn't exist)
const row_marker m1;
must_be_dead(m1, t0);
must_be_dead(m1, t1);
must_be_dead(m1, t2);
must_be_dead(m1, t3);
// With timestamp and without ttl, a row_marker is always alive
const row_marker m2(timestamp);
must_be_alive(m2, t0);
must_be_alive(m2, t1);
must_be_alive(m2, t2);
must_be_alive(m2, t3);
// A row_marker becomes dead exactly at the moment of expiry
// Reproduces #4263, #5290
const auto ttl = 1s;
const row_marker m3(timestamp, ttl, t2);
must_be_alive(m3, t0);
must_be_alive(m3, t1);
must_be_dead(m3, t2);
must_be_dead(m3, t3);
}
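The boundary behavior the test pins down ("dead exactly at the moment of expiry") reduces to a strict comparison against the expiry point. A toy model (hypothetical, not the real `row_marker`):

```cpp
// Toy model (hypothetical, not the real row_marker) of the expiry rule
// verified above: a marker whose TTL runs out at time e is live at any
// t < e and dead at any t >= e.
struct marker_model {
    long expiry; // expiry time, in arbitrary ticks
    bool is_live(long t) const { return t < expiry; }
    bool is_dead(long t) const { return !is_live(t); }
};
```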
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("ck", bytes_type, column_kind::clustering_key)
.build();
auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));
auto ttl = 1s;
auto t0 = gc_clock::now();
auto t1 = t0 + 1s;
auto t2 = t0 + 2s;
auto t3 = t0 + 3s;
auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
auto slice = partition_slice_builder(*s)
.without_partition_key_columns()
.build();
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
};
mutation m(s, pk);
m.partition().clustered_row(*m.schema(), ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
m.partition().clustered_row(*m.schema(), ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
m.partition().clustered_row(*m.schema(), ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));
assert_that(results_at_time(m, t0))
.has_size(3)
.has(a_row().with_column("ck", data_value(bytes("A"))))
.has(a_row().with_column("ck", data_value(bytes("B"))))
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t1))
.has_size(2)
.has(a_row().with_column("ck", data_value(bytes("B"))))
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t2))
.has_size(1)
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t3)).is_empty();
}
SEASTAR_TEST_CASE(test_querying_expired_cells) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")

View File

@@ -421,6 +421,47 @@ public:
virtual void on_drop_view(const sstring&, const sstring&) override { ++drop_view_count; }
};
SEASTAR_TEST_CASE(test_alter_nested_type) {
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("CREATE TYPE foo (foo_k int);").get();
e.execute_cql("CREATE TYPE bar (bar_k frozen<foo>);").get();
e.execute_cql("alter type foo add zed_v int;").get();
e.execute_cql("CREATE TABLE tbl (key int PRIMARY KEY, val frozen<bar>);").get();
e.execute_cql("insert into tbl (key, val) values (1, {bar_k: {foo_k: 2, zed_v: 3} });").get();
});
}
SEASTAR_TEST_CASE(test_nested_type_mutation_in_update) {
// ALTER TYPE always creates a mutation with a single type. This
// creates a mutation with 2 types, one nested in the other, to
// show that we can handle that.
return do_with_cql_env_thread([](cql_test_env& e) {
counting_migration_listener listener;
service::get_local_migration_manager().register_listener(&listener);
e.execute_cql("CREATE TYPE foo (foo_k int);").get();
e.execute_cql("CREATE TYPE bar (bar_k frozen<foo>);").get();
BOOST_REQUIRE_EQUAL(listener.create_user_type_count, 2);
service::migration_manager& mm = service::get_local_migration_manager();
auto&& keyspace = e.db().local().find_keyspace("ks").metadata();
auto type1 = user_type_impl::get_instance("ks", to_bytes("foo"), {"foo_k", "extra"}, {int32_type, int32_type});
auto muts1 = db::schema_tables::make_create_type_mutations(keyspace, type1, api::new_timestamp());
auto type2 = user_type_impl::get_instance("ks", to_bytes("bar"), {"bar_k", "extra"}, {type1, int32_type});
auto muts2 = db::schema_tables::make_create_type_mutations(keyspace, type2, api::new_timestamp());
auto muts = muts1;
muts.insert(muts.end(), muts2.begin(), muts2.end());
mm.announce(std::move(muts), false).get();
BOOST_REQUIRE_EQUAL(listener.create_user_type_count, 2);
BOOST_REQUIRE_EQUAL(listener.update_user_type_count, 2);
});
}
SEASTAR_TEST_CASE(test_notifications) {
return do_with_cql_env([](cql_test_env& e) {
return seastar::async([&] {

View File

@@ -27,6 +27,7 @@
#include "types/map.hh"
#include "types/list.hh"
#include "types/set.hh"
#include "cql3/statements/select_statement.hh"
SEASTAR_TEST_CASE(test_secondary_index_regular_column_query) {
@@ -1101,3 +1102,65 @@ SEASTAR_TEST_CASE(test_secondary_index_on_partition_key_with_filtering) {
});
});
}
SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
static constexpr int row_count = 2 * cql3::statements::select_statement::DEFAULT_COUNT_PAGE_SIZE + 120;
return do_with_cql_env_thread([] (cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE fpa (id int primary key, v int)");
cquery_nofail(e, "CREATE INDEX ON fpa(v)");
for (int i = 0; i < row_count; ++i) {
cquery_nofail(e, format("INSERT INTO fpa (id, v) VALUES ({}, {})", i + 1, i % 2).c_str());
}
eventually([&] {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function.
// Also, instead of the user-provided page size, internal DEFAULT_COUNT_PAGE_SIZE is expected to be used.
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4)},
});
// Even if paging is not explicitly used, the query will be internally paged to avoid OOM.
msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 1;");
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count / 2 + 1)},
});
});
// Similar, but this time a non-prefix clustering key part is indexed (wrt. issue 3405, after which we have
// a special code path for indexing composite non-prefix clustering keys).
cquery_nofail(e, "CREATE TABLE fpa2 (id int, c1 int, c2 int, primary key (id, c1, c2))");
cquery_nofail(e, "CREATE INDEX ON fpa2(c2)");
eventually([&] {
for (int i = 0; i < row_count; ++i) {
cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str());
}
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4)},
});
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count / 2 + 1)},
});
});
});
}
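The expected values asserted above follow from closed forms over the inserted ids; a quick arithmetic check (even `row_count` assumed, as in the test, with an illustrative page size):

```cpp
// ids are 1..row_count and v = (id - 1) % 2, so v == 0 selects the odd
// ids and v == 1 the even ones. For even row_count:
//   sum of odd ids  1,3,...,row_count-1 = (row_count/2)^2          = row_count^2/4
//   sum of even ids 2,4,...,row_count   = (row_count/2)*(row_count/2 + 1)
//   avg of even ids                     = row_count/2 + 1
inline long sum_ids_with_v(int row_count, int v) {
    long sum = 0;
    for (int id = 1; id <= row_count; ++id) {
        if ((id - 1) % 2 == v) {
            sum += id;
        }
    }
    return sum;
}
```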

View File

@@ -385,6 +385,8 @@ BOOST_AUTO_TEST_CASE(test_varint) {
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00deadbeef"))), boost::multiprecision::cpp_int("0xdeadbeef"));
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00ffffffffffffffffffffffffffffffff"))), boost::multiprecision::cpp_int("340282366920938463463374607431768211455"));
BOOST_REQUIRE_EQUAL(from_hex("80000000"), varint_type->decompose(boost::multiprecision::cpp_int(-2147483648)));
test_parsing_fails(varint_type, "1A");
}

View File

@@ -515,7 +515,7 @@ SEASTAR_TEST_CASE(test_update_column_not_in_view_with_flush) {
}
void test_partial_update_with_unselected_collections(cql_test_env& e, std::function<void()>&& maybe_flush) {
e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<int>, m map<int,int>, primary key (p, c))").get();
e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<int>, m map<int,text>, primary key (p, c))").get();
e.execute_cql("create materialized view vcf as select a, b from cf "
"where p is not null and c is not null "
"primary key (c, p)").get();
@@ -563,7 +563,7 @@ e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<i
assert_that(msg).is_rows().is_empty();
});
e.execute_cql("update cf set m=m+{3:3}, l=l-[1], s=s-{2} where p = 1 and c = 1").get();
e.execute_cql("update cf set m=m+{3:'text'}, l=l-[1], s=s-{2} where p = 1 and c = 1").get();
maybe_flush();
eventually([&] {
auto msg = e.execute_cql("select * from vcf").get0();

View File

@@ -38,6 +38,7 @@ if [[ "$1" = -* ]]; then
fi
docker_common_args=(
--pids-limit -1 \
--network host \
-u "$(id -u):$(id -g)" \
"${group_args[@]}" \

View File

@@ -206,8 +206,9 @@ void tracing::set_trace_probability(double p) {
}
one_session_records::one_session_records()
: backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state())
, budget_ptr(tracing::get_local_tracing_instance().get_cached_records_ptr()) {}
: _local_tracing_ptr(tracing::get_local_tracing_instance().shared_from_this())
, backend_state_ptr(_local_tracing_ptr->allocate_backend_session_state())
, budget_ptr(_local_tracing_ptr->get_cached_records_ptr()) {}
std::ostream& operator<<(std::ostream& os, const span_id& id) {
return os << id.get_id();

View File

@@ -240,6 +240,8 @@ public:
};
class one_session_records {
private:
shared_ptr<tracing> _local_tracing_ptr;
public:
utils::UUID session_id;
session_record session_rec;
@@ -665,7 +667,7 @@ private:
void one_session_records::set_pending_for_write() {
_is_pending_for_write = true;
budget_ptr = tracing::get_local_tracing_instance().get_pending_records_ptr();
budget_ptr = _local_tracing_ptr->get_pending_records_ptr();
}
void one_session_records::data_consumed() {
@@ -674,7 +676,7 @@ void one_session_records::data_consumed() {
}
_is_pending_for_write = false;
budget_ptr = tracing::get_local_tracing_instance().get_cached_records_ptr();
budget_ptr = _local_tracing_ptr->get_cached_records_ptr();
}
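The lifetime fix above keeps the tracing instance alive for as long as any session record points into it, by capturing `shared_from_this()` at construction. A sketch of the idiom with hypothetical names:

```cpp
#include <memory>

// Sketch of the lifetime fix above (names hypothetical): a dependent
// record stores a shared_ptr to its owner obtained via
// shared_from_this(), so the owner outlives every record that holds
// pointers into it.
struct owner : std::enable_shared_from_this<owner> {
    int budget = 42;
};

struct record {
    std::shared_ptr<owner> _owner;
    explicit record(owner& o) : _owner(o.shared_from_this()) {}
    int* budget_ptr() { return &_owner->budget; }
};
```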
inline span_id span_id::make_span_id() {

View File

@@ -1558,6 +1558,13 @@ public:
}
out = std::copy(b.crbegin(), b.crend(), out);
}
static size_t serialized_size_aux(const boost::multiprecision::cpp_int& num) {
if (num) {
return align_up(boost::multiprecision::msb(num) + 2, 8u) / 8;
} else {
return 1;
}
}
virtual size_t serialized_size(const void* value) const override {
if (!value) {
return 0;
@@ -1570,8 +1577,10 @@ public:
if (!num) {
return 1;
}
auto pnum = abs(num);
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
if (num < 0) {
return serialized_size_aux(-num - 1);
}
return serialized_size_aux(num);
}
virtual int32_t compare(bytes_view v1, bytes_view v2) const override {
if (v1.empty()) {
@@ -2087,8 +2096,7 @@ struct empty_type_impl : abstract_type {
return false;
}
virtual std::optional<data_type> update_user_type(const shared_ptr<const user_type_impl> updated) const {
// Can't happen
abort();
return std::nullopt;
}
};
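The varint sizing rule introduced above can be sketched on plain 64-bit integers instead of `cpp_int` (a simplification, not the real implementation): a non-negative `n` needs `msb(n) + 2` bits (value bits plus one sign bit) rounded up to whole bytes, zero needs one byte, and a negative `v` is exactly as wide as its one's complement `-v - 1`.

```cpp
#include <cstddef>
#include <cstdint>

// Sketch of serialized_size for a two's-complement big-endian varint,
// on int64_t instead of boost::multiprecision::cpp_int.
inline size_t varint_serialized_size(int64_t v) {
    uint64_t n = v < 0 ? static_cast<uint64_t>(-(v + 1)) : static_cast<uint64_t>(v);
    if (n == 0) {
        return 1;
    }
    unsigned msb = 63 - __builtin_clzll(n); // index of the highest set bit
    return (msb + 2 + 7) / 8;               // align_up(msb + 2, 8) / 8
}
```

The last assertion below mirrors the `80000000` case added to `test_varint`: -2147483648 fits in exactly four bytes.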

View File

@@ -51,6 +51,7 @@ public:
bytes_view field_name(size_t i) const { return _field_names[i]; }
sstring field_name_as_string(size_t i) const { return _string_field_names[i]; }
const std::vector<bytes>& field_names() const { return _field_names; }
const std::vector<sstring>& string_field_names() const { return _string_field_names; }
sstring get_name_as_string() const;
virtual sstring cql3_type_name_impl() const override;
virtual bool is_native() const override { return false; }

View File

@@ -59,11 +59,15 @@ public:
return (most_sig_bits >> 12) & 0xf;
}
bool is_timestamp() const {
return version() == 1;
}
int64_t timestamp() const {
//if (version() != 1) {
// throw new UnsupportedOperationException("Not a time-based UUID");
//}
assert(version() == 1);
assert(is_timestamp());
return ((most_sig_bits & 0xFFF) << 48) |
(((most_sig_bits >> 16) & 0xFFFF) << 32) |

View File

@@ -75,7 +75,7 @@ private:
// placement of this singleton is important. It needs to be instantiated *AFTER* the other statics.
static thread_local const std::unique_ptr<UUID_gen> instance;
int64_t last_nanos = 0;
uint64_t last_nanos = 0;
UUID_gen()
{
@@ -91,7 +91,9 @@ public:
*/
static UUID get_time_UUID()
{
return UUID(instance->create_time_safe(), clock_seq_and_node);
auto uuid = UUID(instance->create_time_safe(), clock_seq_and_node);
assert(uuid.is_timestamp());
return uuid;
}
/**
@@ -101,7 +103,9 @@ public:
*/
static UUID get_time_UUID(int64_t when)
{
return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
assert(uuid.is_timestamp());
return uuid;
}
/**
@@ -115,12 +119,21 @@ public:
// "nanos" needs to be in 100ns intervals since the adoption of the Gregorian calendar in the West.
uint64_t nanos = duration_cast<nanoseconds>(tp.time_since_epoch()).count() / 100;
nanos -= (10000ULL * START_EPOCH);
return UUID(create_time(nanos), clock_seq_and_node);
auto uuid = UUID(create_time(nanos), clock_seq_and_node);
assert(uuid.is_timestamp());
return uuid;
}
static UUID get_time_UUID(int64_t when, int64_t clock_seq_and_node)
{
return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
assert(uuid.is_timestamp());
return uuid;
}
/** validates uuid from raw bytes. */
static bool is_valid_UUID(bytes raw) {
return raw.size() == 16;
}
/** creates uuid from raw bytes. */
@@ -176,7 +189,9 @@ public:
*/
static UUID min_time_UUID(int64_t timestamp)
{
return UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
auto uuid = UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
assert(uuid.is_timestamp());
return uuid;
}
/**
@@ -192,7 +207,9 @@ public:
// timestamp 1ms, then we should not extend 100's nanoseconds
// precision by taking 10000, but rather 19999.
int64_t uuid_tstamp = from_unix_timestamp(timestamp + 1) - 1;
return UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
auto uuid = UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
assert(uuid.is_timestamp());
return uuid;
}
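The `max_time_UUID` comment above can be checked arithmetically: a Unix millisecond spans 10000 ticks of 100ns, so the largest tick belonging to millisecond `m` is `from_unix_timestamp(m + 1) - 1`, which is 9999 past the smallest. The `START_EPOCH` value below is assumed for illustration (the Gregorian epoch, 1582-10-15, in Unix milliseconds):

```cpp
#include <cstdint>

// START_EPOCH: offset of the Gregorian epoch in Unix milliseconds,
// value assumed for illustration.
constexpr int64_t START_EPOCH = -12219292800000LL;

// 100ns ticks since the Gregorian epoch for a Unix-millisecond value.
constexpr int64_t from_unix_timestamp(int64_t ms) {
    return (ms - START_EPOCH) * 10000;
}
```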
/**
@@ -276,6 +293,15 @@ public:
return (uuid.timestamp() / 10000) + START_EPOCH;
}
static uint64_t make_nanos_since(int64_t millis) {
return (static_cast<uint64_t>(millis) - static_cast<uint64_t>(START_EPOCH)) * 10000;
}
// nanos_since must fit in 60 bits
static bool is_valid_nanos_since(uint64_t nanos_since) {
return !(0xf000000000000000UL & nanos_since);
}
private:
// needs to return two different values for the same when.
@@ -287,7 +313,7 @@ private:
using namespace std::chrono;
int64_t millis = duration_cast<milliseconds>(
system_clock::now().time_since_epoch()).count();
int64_t nanos_since = (millis - START_EPOCH) * 10000;
uint64_t nanos_since = make_nanos_since(millis);
if (nanos_since > last_nanos)
last_nanos = nanos_since;
else
@@ -298,16 +324,17 @@ private:
int64_t create_time_unsafe(int64_t when, int nanos)
{
uint64_t nanos_since = ((when - START_EPOCH) * 10000) + nanos;
uint64_t nanos_since = make_nanos_since(when) + static_cast<uint64_t>(static_cast<int64_t>(nanos));
return create_time(nanos_since);
}
static int64_t create_time(uint64_t nanos_since)
{
uint64_t msb = 0L;
assert(is_valid_nanos_since(nanos_since));
msb |= (0x00000000ffffffffL & nanos_since) << 32;
msb |= (0x0000ffff00000000UL & nanos_since) >> 16;
msb |= (0xffff000000000000UL & nanos_since) >> 48;
msb |= (0x0fff000000000000UL & nanos_since) >> 48;
msb |= 0x0000000000001000L; // sets the version to 1.
return msb;
}
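The bit shuffle in `create_time` above (and its inverse in `timestamp()`) packs a 60-bit tick count into the version-1 layout `time_low | time_mid | version | time_hi`. A self-contained round-trip sketch:

```cpp
#include <cstdint>

// Pack a 60-bit count of 100ns ticks into the most-significant half of
// a version-1 UUID, mirroring create_time() above.
inline uint64_t pack_v1_msb(uint64_t ticks) {
    uint64_t msb = 0;
    msb |= (ticks & 0x00000000ffffffffULL) << 32; // time_low  -> bits 32..63
    msb |= (ticks & 0x0000ffff00000000ULL) >> 16; // time_mid  -> bits 16..31
    msb |= (ticks & 0x0fff000000000000ULL) >> 48; // time_hi   -> bits 0..11
    msb |= 0x0000000000001000ULL;                 // version 1 -> bits 12..15
    return msb;
}

// Recover the tick count, mirroring timestamp() above.
inline uint64_t unpack_v1_ticks(uint64_t msb) {
    return ((msb & 0xfffULL) << 48)
         | (((msb >> 16) & 0xffffULL) << 32)
         | (msb >> 32);
}
```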

View File

@@ -246,10 +246,18 @@ public:
public:
const T& front() const { return *cbegin(); }
T& front() { return *begin(); }
iterator begin() const { return iterator(_chunks.data(), 0); }
iterator end() const { return iterator(_chunks.data(), _size); }
iterator begin() { return iterator(_chunks.data(), 0); }
iterator end() { return iterator(_chunks.data(), _size); }
const_iterator begin() const { return const_iterator(_chunks.data(), 0); }
const_iterator end() const { return const_iterator(_chunks.data(), _size); }
const_iterator cbegin() const { return const_iterator(_chunks.data(), 0); }
const_iterator cend() const { return const_iterator(_chunks.data(), _size); }
std::reverse_iterator<iterator> rbegin() { return std::reverse_iterator(end()); }
std::reverse_iterator<iterator> rend() { return std::reverse_iterator(begin()); }
std::reverse_iterator<const_iterator> rbegin() const { return std::reverse_iterator(end()); }
std::reverse_iterator<const_iterator> rend() const { return std::reverse_iterator(begin()); }
std::reverse_iterator<const_iterator> crbegin() const { return std::reverse_iterator(cend()); }
std::reverse_iterator<const_iterator> crend() const { return std::reverse_iterator(cbegin()); }
public:
bool operator==(const chunked_vector& x) const {
return boost::equal(*this, x);
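The `rbegin`/`rend` family added above is just `std::reverse_iterator` wrapped around the existing forward iterators; the adapter works over any bidirectional container, as in this sketch:

```cpp
#include <iterator>
#include <vector>

// std::reverse_iterator adapts any bidirectional iterator; the
// crbegin()/crend() members added above are thin wrappers like these,
// enabling e.g. std::copy(b.crbegin(), b.crend(), out) in the varint
// serializer.
template <typename Container>
auto crbegin_of(const Container& c) {
    return std::reverse_iterator(c.cend());
}

template <typename Container>
auto crend_of(const Container& c) {
    return std::reverse_iterator(c.cbegin());
}
```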

View File

@@ -2065,6 +2065,17 @@ bool segment_pool::migrate_segment(segment* src, segment* dst)
#endif
void tracker::impl::register_region(region::impl* r) {
// If needed, increase capacity of regions before taking the reclaim lock,
// to avoid failing an allocation when push_back() tries to increase
// capacity.
//
// The capacity increase is atomic (wrt _regions) so it cannot be
// observed in a partially-updated state by concurrent reclaim.
if (_regions.size() == _regions.capacity()) {
auto copy = _regions;
copy.reserve(copy.capacity() * 2);
_regions = std::move(copy);
}
reclaiming_lock _(*this);
_regions.push_back(r);
llogger.debug("Registered region @{} with id={}", r, r->id());
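The pre-grow idiom above (grow a copy, then move-assign) can be sketched on a plain `std::vector` (a simplification of the lock-free concerns in the real tracker):

```cpp
#include <vector>

// Sketch of the pre-grow idiom above: ensure the vector has spare
// capacity *before* entering a section that must not allocate, by
// growing a copy and move-assigning it in, so the live vector is
// swapped in one step rather than reallocated in place.
template <typename T>
void reserve_for_one_more(std::vector<T>& v) {
    if (v.size() == v.capacity()) {
        auto copy = v;
        copy.reserve(copy.capacity() == 0 ? 1 : copy.capacity() * 2);
        v = std::move(copy);
    }
}
```

After the call, a single `push_back` is guaranteed not to reallocate.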