Compare commits


44 Commits

Author SHA1 Message Date
Hagit Segev
7ae2cdf46c release: prepare for 3.3.0 2020-03-19 21:46:44 +02:00
Hagit Segev
863f88c067 release: prepare for 3.3.rc3 2020-03-15 22:45:30 +02:00
Avi Kivity
90b4e9e595 Update seastar submodule
* seastar f54084c08f...a0bdc6cd85 (1):
  > tls: Fix race and stale memory use in delayed shutdown

Fixes #5759 (maybe)
2020-03-12 19:41:50 +02:00
Konstantin Osipov
434ad4548f locator: correctly select endpoints if RF=0
SimpleStrategy creates a list of endpoints by iterating over the set of
all configured endpoints for the given token until we reach the keyspace
replication factor.
There is a trivial coding bug: we first add at least one endpoint to the
list, and only then compare the list size with the replication factor.
If RF=0, this comparison never yields true.
Fix by moving the RF check before at least one endpoint is added to the
list.
Cassandra never had this bug since it uses a less fancy while()
loop.

Fixes #5962
Message-Id: <20200306193729.130266-1-kostja@scylladb.com>

(cherry picked from commit ac6f64a885)
2020-03-12 12:09:46 +02:00
Avi Kivity
cbbb15af5c logalloc: increase capacity of _regions vector outside reclaim lock
Reclaim consults the _regions vector, so we don't want it moving around while
allocating more capacity. For that we take the reclaim lock. However, that
can cause a false-positive OOM during startup:

1. all memory is allocated to LSA as part of priming (2baa16b371)
2. the _regions vector is resized from 64k to 128k, requiring a segment
   to be freed (plenty are free)
3. but reclaiming_lock is taken, so we cannot reclaim anything.

To fix, resize the _regions vector outside the lock.

Fixes #6003.
Message-Id: <20200311091217.1112081-1-avi@scylladb.com>

(cherry picked from commit c020b4e5e2)
2020-03-12 11:25:20 +02:00
Benny Halevy
3231580c05 dist/redhat: scylla.spec.mustache: set _no_recompute_build_ids
By default, `/usr/lib/rpm/find-debuginfo.sh` will tamper with
the binary's build-id when stripping its debug info, as it is passed
the `--build-id-seed <version>.<release>` option.

To prevent that, we need to set the following macros:
  unset `_unique_build_ids`
  set `_no_recompute_build_ids` to 1
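In spec-file terms, the two macro settings above amount to something like the following fragment (a sketch assuming standard rpm macro syntax; the exact placement inside scylla.spec.mustache may differ):

```
%undefine _unique_build_ids
%global _no_recompute_build_ids 1
```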

Fixes #5881

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 25a763a187)
2020-03-09 15:21:50 +02:00
Piotr Sarna
62364d9dcd Merge 'cql3: do_execute_base_query: fix null deref ...
... when clustering key is unavailable' from Benny

This series fixes null pointer dereference seen in #5794

efd7efe cql3: generate_base_key_from_index_pk; support optional index_ck
7af1f9e cql3: do_execute_base_query: generate open-ended slice when clustering key is unavailable
7fe1a9e cql3: do_execute_base_query: fixup indentation

Fixes #5794

Branches: 3.3

Test: unit(dev) secondary_indexes_test:TestSecondaryIndexes.test_truncate_base(debug)

* bhalevy/fix-5794-generate_base_key_from_index_pk:
  cql3: do_execute_base_query: fixup indentation
  cql3: do_execute_base_query: generate open-ended slice when clustering key is unavailable
  cql3: generate_base_key_from_index_pk; support optional index_ck

(cherry picked from commit 4e95b67501)
2020-03-09 15:20:01 +02:00
Takuya ASADA
3bed8063f6 dist/debian: fix "unable to open node-exporter.service.dpkg-new" error
It seems like *.service conflicts at install time because the file is
installed twice: once via debian/*.service and once via debian/scylla-server.install.

We don't need to use *.install, so we can just drop the line.

Fixes #5640

(cherry picked from commit 29285b28e2)
2020-03-03 12:40:39 +02:00
Yaron Kaikov
413fcab833 release: prepare for 3.3.rc2 2020-02-27 14:45:18 +02:00
Juliusz Stasiewicz
9f3c3036bf cdc: set TTLs on CDC log cells
Cells in CDC logs used to be created while completely neglecting
TTLs (the TTLs from `cdc = {...'ttl':600}`). This patch adds TTLs
to all cells; there are no row markers, so we need not set a TTL
there.

Fixes #5688

(cherry picked from commit 67b92c584f)
2020-02-26 18:12:55 +02:00
Benny Halevy
ff2e108a6d gossiper: do_stop_gossiping: copy live endpoints vector
It can be resized asynchronously by mark_dead.

Fixes #5701

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20200203091344.229518-1-bhalevy@scylladb.com>
(cherry picked from commit f45fabab73)
2020-02-26 13:00:11 +02:00
Gleb Natapov
ade788ffe8 commitlog: use commitlog IO scheduling class for segment zeroing
There may be other commitlog writes waiting for zeroing to complete, so
not using the proper scheduling class causes a priority inversion.

Fixes #5858.

Message-Id: <20200220102939.30769-2-gleb@scylladb.com>
(cherry picked from commit 6a78cc9e31)
2020-02-26 12:51:10 +02:00
Benny Halevy
1f8bb754d9 storage_service: drain_on_shutdown: unregister storage_proxy subscribers from local_storage_service
Match the subscription done in main() and avoid cross-shard access
to the _lifecycle_subscribers vector.

Fixes #5385

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Acked-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20200123092817.454271-1-bhalevy@scylladb.com>
(cherry picked from commit 5b0ea4c114)
2020-02-25 16:39:49 +02:00
Tomasz Grabiec
7b2eb09225 Merge fixes for use-after-frees related to shutdown of services
Backport of 884d5e2bcb and
4839ca8491.

Fixes crashes when scylla is stopped early during boot.

Merged from https://github.com/xemul/scylla/tree/br-mm-combined-fixes-for-3.3

Fixes #5765.
2020-02-25 13:34:01 +01:00
Pavel Emelyanov
d2293f9fd5 migration_manager: Abort and wait cluster upgrade waiters
maybe_schedule_schema_pull waits for schema_tables_v3 to
become available. This is unsafe in case the migration manager
goes away before the feature is enabled.

Fix this by subscribing to the feature with a feature::listener and
waiting on a condition variable in maybe_schedule_schema_pull.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2020-02-24 14:18:15 +03:00
Pavel Emelyanov
25b31f6c23 migration_manager: Abort and wait delayed schema pulls
The sleep is interrupted with the abort source; the "wait" part
is done with the existing _background_tasks gate. We also need
to make sure the gate stays alive until the end of the function,
so make use of async_sharded_service (migration manager is
already such a service).

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2020-02-24 14:18:15 +03:00
Pavel Emelyanov
742a1ce7d6 storage_service: Unregister from gossiper notifications ... at all
This unregistration doesn't currently happen, but that doesn't seem to
cause any problems in general, as on stop the gossiper is stopped and
nothing from it hits the storage_service.

However (!) if an exception pops up between the point where the
storage_service is subscribed to the gossiper and the point where the
drain_on_shutdown defer action is set up, then we _may_ get into the
following situation:

- main's stuff gets unrolled back
- gossiper is not stopped (drain_on_shutdown defer is not set up)
- migration manager is stopped (with deferred action in main)
- a notification comes from gossiper
    -> storage_service::on_change might want to pull schema with
       the help of local migration manager
    -> assert(local_is_initialized) strikes

Fix this by registering the storage_service with the gossiper a bit
earlier (both are already initialized by that time) and setting up the
unregister defer right afterwards.

Test: unit(dev), manual start-stop
Bug: #5628

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20200130190343.25656-1-xemul@scylladb.com>
2020-02-24 14:18:15 +03:00
Avi Kivity
4ca9d23b83 Revert "streaming: Do not invalidate cache if no sstable is added in flush_streaming_mutations"
This reverts commit bdc542143e. Exposes a data resurrection
bug (#5838).
2020-02-24 10:02:58 +02:00
Avi Kivity
9e97f3a9b3 Update seastar submodule
* seastar dd686552ff...f54084c08f (2):
  > reactor: fallback to epoll backend when fs.aio-max-nr is too small
  > util: move read_sys_file_as() from iotune to seastar header, rename read_first_line_as()

Fixes #5638.
2020-02-20 10:25:00 +02:00
Piotr Dulikowski
183418f228 hh: handle counter update hints correctly
This patch fixes a bug that appears because of an incorrect interaction
between counters and hinted handoff.

When a counter is updated on the leader, it sends mutations to other
replicas that contain all counter shards from the leader. If consistency
level is achieved but some replicas are unavailable, a hint with
mutation containing counter shards is stored.

When a hint's destination node is no longer a replica for it, the hint
is sent to all of its current replicas. Previously,
storage_proxy::mutate was used for that purpose. That was incorrect
because the function treats mutations for counter tables as mutations
containing only a delta (by how much to increase/decrease the counter).
These two types of mutations have different serialization formats, so in
this case a "shards" mutation is reinterpreted as a "delta" mutation,
which can cause data corruption.

This patch backports `storage_proxy::mutate_hint_from_scratch`
function, which bypasses special handling of counter mutations and
treats them as regular mutations - which is the correct behavior for
"shards" mutations.

Refs #5833.
Backports: 3.1, 3.2, 3.3
Tests: unit(dev)
(cherry picked from commit ec513acc49)
2020-02-19 16:49:12 +02:00
Piotr Sarna
756574d094 db,view: fix generating view updates for partition tombstones
The update generation path must track and apply all tombstones,
both from the existing base row (if read-before-write was needed)
and for the new row. One such path contained an error, because
it assumed that if the existing row is empty, then the update
can simply be generated from the new row. However, the lack of an
existing row can also be the result of a partition/range tombstone.
If that's the case, the tombstone needs to be applied, because it
may well hide the new row too.
Without taking the partition tombstone into account, creating
a future tombstone and then inserting an out-of-order write before it
in the base table can result in ghost rows in the view table.
This patch comes with a test which was proven to fail before the
changes.

Branches 3.1,3.2,3.3
Fixes #5793

Tests: unit(dev)
Message-Id: <8d3b2abad31572668693ab585f37f4af5bb7577a.1581525398.git.sarna@scylladb.com>
(cherry picked from commit e93c54e837)
2020-02-16 20:26:28 +02:00
Rafael Ávila de Espíndola
a348418918 service: Add a lock around migration_notifier::_listeners
Before this patch the iterations over migration_notifier::_listeners
could race with listeners being added and removed.

The addition side is not modified, since it is common to add a
listener during construction and it would require a fairly big
refactoring. Instead, the iteration is modified to use indexes instead
of iterators so that it is still valid if another listener is added
concurrently.

For removal we use a rw lock, since removing an element invalidates
indexes too. There are only a few places that needed refactoring to
handle unregister_listener returning a future<>, so this is probably
OK.

Fixes #5541.

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200120192819.136305-1-espindola@scylladb.com>
(cherry picked from commit 27bd3fe203)
2020-02-16 20:13:42 +02:00
Avi Kivity
06c0bd0681 Update seastar submodule
* seastar 3f3e117de3...dd686552ff (1):
  > perftune.py: Use safe_load() for fix arbitrary code execution

Fixes #5630.
2020-02-16 15:53:16 +02:00
Avi Kivity
223c300435 Point seastar submodule at scylla-seastar.git branch-3.3
This allows us to backport seastar patches to Scylla 3.3.
2020-02-16 15:51:46 +02:00
Gleb Natapov
ac8bef6781 commitlog: fix flushing an entry marked as "sync" in periodic mode
After 546556b71b we can have mixed writes into the commitlog:
some flush immediately, some do not. If a non-flushing write races with
a flushing one and becomes responsible for writing its buffer back into
the file, the flush will be skipped, which causes the assert in
batch_cycle() to trigger since the flush position is not advanced. Fix
that by checking whether the flush was skipped and, in that case,
explicitly flushing our file position.

Fixes #5670

Message-Id: <20200128145103.GI26048@scylladb.com>
(cherry picked from commit c654ffe34b)
2020-02-16 15:48:40 +02:00
Pavel Solodovnikov
68691907af lwt: fix handling of nulls in parameter markers for LWT queries
This patch affects the LWT queries with IF conditions of the
following form: `IF col in :value`, i.e. if the parameter
marker is used.

When executing a prepared query with a bound value
of `(None,)` (tuple with null, example for Python driver), it is
serialized not as NULL but as "empty" value (serialization
format differs in each case).

Therefore, Scylla deserializes the parameters in the request as
empty `data_value` instances, which are, in turn, later translated
to non-empty `bytes_opt` values holding an empty byte string.

Account for this case too in the CAS condition evaluation code.

Example of a problem this patch aims to fix:

Suppose we have a table `tbl` with a boolean field `test` and
INSERT a row with NULL value for the `test` column.

Then the following update query fails to apply due to the
error in IF condition evaluation code (assume `v=(null)`):
`UPDATE tbl SET test=false WHERE key=0 IF test IN :v`
returns false in `[applied]` column, but is expected to succeed.

Tests: unit(debug, dev), dtest(prepared stmt LWT tests at https://github.com/scylladb/scylla-dtest/pull/1286)

Fixes: #5710

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
Message-Id: <20200205102039.35851-1-pa.solodovnikov@scylladb.com>
(cherry picked from commit bcc4647552)
2020-02-16 15:29:28 +02:00
Avi Kivity
f59d2fcbf1 Merge "stop passing tracing state pointer in client_state" from Gleb
"
client_state is used simultaneously by many requests running in parallel,
while the tracing state pointer is per request. Those two facts do not sit
well together: as a result, the tracing state is sometimes overwritten
while still being used by an active request, which may cause an incorrect
trace or even a crash.
"

Fixes #5700.

Backported from 9f1f60fc38

* 'gleb/trace_fix_3.3_backport' of ssh://github.com/scylladb/seastar-dev:
  client_state: drop the pointer to a tracing state from client_state
  transport: pass tracing state explicitly instead of relying on it been in the client_state
  alternator: pass tracing state explicitly instead of relying on it been in the client_state
2020-02-16 15:23:41 +02:00
Asias He
bdc542143e streaming: Do not invalidate cache if no sstable is added in flush_streaming_mutations
The table::flush_streaming_mutations is used in the days when streaming
data goes to memtable. After switching to the new streaming, data goes
to sstables directly in streaming, so the sstables generated in
table::flush_streaming_mutations will be empty.

It is unnecessary to invalidate the cache if no sstables are added. To
avoid unnecessary cache invalidating which pokes hole in the cache, skip
calling _cache.invalidate() if the sstables is empty.

The steps are:

- STREAM_MUTATION_DONE verb is sent when streaming is done with old or
  new streaming
- table::flush_streaming_mutations is called in the verb handler
- cache is invalidated for the streaming ranges

In summary, this patch will avoid a lot of cache invalidation for
streaming.

Backports: 3.0 3.1 3.2
Fixes: #5769
(cherry picked from commit 5e9925b9f0)
2020-02-16 15:16:24 +02:00
Botond Dénes
061a02237c row: append(): downgrade assert to on_internal_error()
This assert, added by 060e3f8, is supposed to make sure the invariant of
append() is respected, in order to prevent building an invalid row.
The assert however proved to be too harsh, as it converts any bug
causing out-of-order clustering rows into cluster unavailability.
Downgrade it to on_internal_error(). This still prevents corrupt
data from spreading in the cluster, without the unavailability caused by
the assert.

Fixes: #5786
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20200211083829.915031-1-bdenes@scylladb.com>
(cherry picked from commit 3164456108)
2020-02-16 15:12:46 +02:00
Gleb Natapov
35b6505517 client_state: drop the pointer to a tracing state from client_state
client_state is shared between requests while the tracing state is per
request. It is not safe to use the former as a container for the latter,
since a state can be overwritten prematurely by subsequent requests.

(cherry picked from commit 31cf2434d6)
2020-02-13 13:45:56 +02:00
Gleb Natapov
866c04dd64 transport: pass tracing state explicitly instead of relying on it been in the client_state
Multiple requests can use the same client_state simultaneously, so it is
not safe to use it as a container for a tracing state, which is per request.
Currently the next request may overwrite the tracing state of the previous
one, causing, in the best case, a wrong trace to be taken, or a crash if
the overwritten pointer is freed prematurely.

Fixes #5700

(cherry picked from commit 9f1f60fc38)
2020-02-13 13:45:56 +02:00
Gleb Natapov
dc588e6e7b alternator: pass tracing state explicitly instead of relying on it been in the client_state
Multiple requests can use the same client_state simultaneously, so it is
not safe to use it as a container for a tracing state, which is per
request. This is not yet an issue for alternator, since it creates a
new client_state object for each request, but first of all it should not
have to, and second, the trace state will be dropped from client_state
by a later patch.

(cherry picked from commit 38fcab3db4)
2020-02-13 13:45:56 +02:00
Takuya ASADA
f842154453 dist/debian: keep /etc/systemd .conf files on 'remove'
Since dpkg does not re-install conffiles when they are removed by the user,
we are currently missing dependencies.conf and sysconfdir.conf on rollback.
To prevent this, we need to stop running
'rm -rf /etc/systemd/system/scylla-server.service.d/' on 'remove'.

Fixes #5734

(cherry picked from commit 43097854a5)
2020-02-12 14:26:40 +02:00
Yaron Kaikov
b38193f71d dist/docker: Switch to 3.3 release repository (#5756)
Change the SCYLLA_REPO_URL variable to point to branch-3.3 instead of
master. This ensures that Docker image builds that don't specify the
variable build from the right repository by default.
2020-02-10 11:11:38 +02:00
Rafael Ávila de Espíndola
f47ba6dc06 lua: Handle nil returns correctly
This is a minimal backport to 3.3.

With this patch lua nil values are mapped to CQL null values instead
of producing an error.

Fixes #5667

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200203164918.70450-1-espindola@scylladb.com>
2020-02-09 18:55:42 +02:00
Hagit Segev
0d0c1d4318 release: prepare for 3.3.rc1 2020-02-09 15:55:24 +02:00
Takuya ASADA
9225b17b99 scylla_post_install.sh: fix 'integer expression expected' error
awk returns a float value on Debian, which causes a postinst script
failure since we compare it as an integer value.
Replace it with sed + bash.

Fixes #5569

(cherry picked from commit 5627888b7c)
2020-02-04 14:30:04 +02:00
Gleb Natapov
00b3f28199 db/system_keyspace: use user memory limits for local.paxos table
Treat writes to local.paxos as user memory, as the number of writes is
dependent on the amount of user data written with LWT.

Fixes #5682

Message-Id: <20200130150048.GW26048@scylladb.com>
(cherry picked from commit b08679e1d3)
2020-02-02 17:36:52 +02:00
Rafael Ávila de Espíndola
1bbe619689 types: Fix encoding of negative varint
We would sometimes produce an unnecessary extra 0xff prefix byte.

The new encoding matches what Cassandra does.

This was both an efficiency and a correctness issue, as using a varint
in a key could produce different tokens.

Fixes #5656

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
(cherry picked from commit c89c90d07f)
2020-02-02 16:00:58 +02:00
Avi Kivity
c36f71c783 test: make eventually() more patient
We use eventually() in tests to wait for eventually-consistent data
to become consistent. However, we see spurious failures indicating
that we wait too little.

Increasing the timeout has a negative side effect: tests that fail
will now take longer to do so. However, this cost is negligible compared
to that of false-positive failures, since those throw away large test
efforts and sometimes require a person to investigate the problem,
only to conclude it is a false positive.

This patch therefore makes eventually() more patient, by a factor of
32.

Fixes #4707.
Message-Id: <20200130162745.45569-1-avi@scylladb.com>

(cherry picked from commit ec5b721db7)
2020-02-01 13:20:22 +02:00
Pekka Enberg
f5471d268b release: prepare for 3.3.rc0 2020-01-30 14:00:51 +02:00
Takuya ASADA
fd5c65d9dc dist/debian: Use tilde for release candidate builds
We need to add '~' to handle rcX versions correctly on Debian variants
(merged at ae33e9f), but when we moved to the relocatable package we
mistakenly dropped the code, so add it back.

Fixes #5641

(cherry picked from commit dd81fd3454)
2020-01-28 18:34:48 +02:00
Avi Kivity
3aa406bf00 tools: toolchain: dbuild: relax process limit in container
Docker restricts the number of processes in a container to some
limit it calculates. This limit turns out to be too low on large
machines, since we run multiple links in parallel, and each link
runs many threads.

Remove the limit by specifying --pids-limit -1. Since dbuild is
meant to provide a build environment, not a security barrier,
this is okay (the container is still restricted by host limits).

I checked that --pids-limit is supported by old versions of
docker and by podman.

Fixes #5651.
Message-Id: <20200127090807.3528561-1-avi@scylladb.com>

(cherry picked from commit 897320f6ab)
2020-01-28 18:14:01 +02:00
Piotr Sarna
c0253d9221 db,view: fix checking for secondary index special columns
A mistake in handling the legacy checks for the special 'idx_token'
column resulted in not properly recognizing materialized views backing
secondary indexes. The mistake is really a typo, but with bad
consequences: instead of checking whether the view schema is an index,
we asked the base schema, which is definitely not an index of
itself.

Branches 3.1,3.2 (asap)
Fixes #5621
Fixes #4744

(cherry picked from commit 9b379e3d63)
2020-01-21 23:32:11 +02:00
47 changed files with 523 additions and 328 deletions

.gitmodules

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 	path = seastar
-	url = ../seastar
+	url = ../scylla-seastar
 	ignore = dirty
 [submodule "swagger-ui"]
 	path = swagger-ui


@@ -1,7 +1,7 @@
 #!/bin/sh
 PRODUCT=scylla
-VERSION=666.development
+VERSION=3.3.0
 if test -f version
 then


@@ -299,14 +299,14 @@ static void describe_key_schema(rjson::value& parent, const schema& schema, std:
 }
-future<json::json_return_type> executor::describe_table(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
     _stats.api_operations.describe_table++;
     rjson::value request = rjson::parse(content);
     elogger.trace("Describing table {}", request);
     schema_ptr schema = get_table(_proxy, request);
-    tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
+    tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
     rjson::value table_description = rjson::empty_object();
     rjson::set(table_description, "TableName", rjson::from_string(schema->cf_name()));
@@ -378,13 +378,13 @@ future<json::json_return_type> executor::describe_table(client_state& client_sta
     return make_ready_future<json::json_return_type>(make_jsonable(std::move(response)));
 }
-future<json::json_return_type> executor::delete_table(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
     _stats.api_operations.delete_table++;
     rjson::value request = rjson::parse(content);
     elogger.trace("Deleting table {}", request);
     std::string table_name = get_table_name(request);
-    tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, table_name);
+    tracing::add_table_name(trace_state, KEYSPACE_NAME, table_name);
     if (!_proxy.get_db().local().has_schema(KEYSPACE_NAME, table_name)) {
         throw api_error("ResourceNotFoundException",
@@ -481,14 +481,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
 }
-future<json::json_return_type> executor::create_table(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
     _stats.api_operations.create_table++;
     rjson::value table_info = rjson::parse(content);
     elogger.trace("Creating table {}", table_info);
     std::string table_name = get_table_name(table_info);
     const rjson::value& attribute_definitions = table_info["AttributeDefinitions"];
-    tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, table_name);
+    tracing::add_table_name(trace_state, KEYSPACE_NAME, table_name);
     schema_builder builder(KEYSPACE_NAME, table_name);
     auto [hash_key, range_key] = parse_key_schema(table_info);
@@ -749,14 +749,14 @@ static future<std::unique_ptr<rjson::value>> maybe_get_previous_item(
     bool need_read_before_write,
     alternator::stats& stats);
-future<json::json_return_type> executor::put_item(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::put_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
     _stats.api_operations.put_item++;
     auto start_time = std::chrono::steady_clock::now();
     rjson::value update_info = rjson::parse(content);
     elogger.trace("Updating value {}", update_info);
     schema_ptr schema = get_table(_proxy, update_info);
-    tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
+    tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
     if (rjson::find(update_info, "ConditionExpression")) {
         throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator");
@@ -766,6 +766,7 @@ future<json::json_return_type> executor::put_item(client_state& client_state, st
     // FIXME: Need to support also the ALL_OLD option. See issue #5053.
     throw api_error("ValidationException", format("Unsupported ReturnValues={} for PutItem operation", return_values));
 }
+    const bool has_expected = update_info.HasMember("Expected");
     const rjson::value& item = update_info["Item"];
@@ -774,11 +775,11 @@ future<json::json_return_type> executor::put_item(client_state& client_state, st
return maybe_get_previous_item(_proxy, client_state, schema, item, has_expected, _stats).then(
[this, schema, has_expected, update_info = rjson::copy(update_info), m = std::move(m),
&client_state, start_time] (std::unique_ptr<rjson::value> previous_item) mutable {
&client_state, start_time, trace_state] (std::unique_ptr<rjson::value> previous_item) mutable {
if (has_expected) {
verify_expected(update_info, previous_item);
}
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () {
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () {
_stats.api_operations.put_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.put_item_latency._count + 1);
// Without special options on what to return, PutItem returns nothing.
return make_ready_future<json::json_return_type>(json_string(""));
@@ -806,13 +807,13 @@ static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr sc
return m;
}
future<json::json_return_type> executor::delete_item(client_state& client_state, std::string content) {
future<json::json_return_type> executor::delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
_stats.api_operations.delete_item++;
auto start_time = std::chrono::steady_clock::now();
rjson::value update_info = rjson::parse(content);
schema_ptr schema = get_table(_proxy, update_info);
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
if (rjson::find(update_info, "ConditionExpression")) {
throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator");
@@ -831,11 +832,11 @@ future<json::json_return_type> executor::delete_item(client_state& client_state,
return maybe_get_previous_item(_proxy, client_state, schema, key, has_expected, _stats).then(
[this, schema, has_expected, update_info = rjson::copy(update_info), m = std::move(m),
&client_state, start_time] (std::unique_ptr<rjson::value> previous_item) mutable {
&client_state, start_time, trace_state] (std::unique_ptr<rjson::value> previous_item) mutable {
if (has_expected) {
verify_expected(update_info, previous_item);
}
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () {
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () {
_stats.api_operations.delete_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.delete_item_latency._count + 1);
// Without special options on what to return, DeleteItem returns nothing.
return make_ready_future<json::json_return_type>(json_string(""));
@@ -868,7 +869,7 @@ struct primary_key_equal {
}
};
future<json::json_return_type> executor::batch_write_item(client_state& client_state, std::string content) {
future<json::json_return_type> executor::batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
_stats.api_operations.batch_write_item++;
rjson::value batch_info = rjson::parse(content);
rjson::value& request_items = batch_info["RequestItems"];
@@ -878,7 +879,7 @@ future<json::json_return_type> executor::batch_write_item(client_state& client_s
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
schema_ptr schema = get_table_from_batch_request(_proxy, it);
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(1, primary_key_hash{schema}, primary_key_equal{schema});
for (auto& request : it->value.GetArray()) {
if (!request.IsObject() || request.MemberCount() != 1) {
@@ -911,7 +912,7 @@ future<json::json_return_type> executor::batch_write_item(client_state& client_s
}
}
return _proxy.mutate(std::move(mutations), db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([] () {
return _proxy.mutate(std::move(mutations), db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([] () {
// Without special options on what to return, BatchWriteItem returns nothing,
// unless there are UnprocessedItems - it's possible to just stop processing a batch
// due to throttling. TODO(sarna): Consider UnprocessedItems when returning.
@@ -1410,13 +1411,13 @@ static future<std::unique_ptr<rjson::value>> maybe_get_previous_item(
}
future<json::json_return_type> executor::update_item(client_state& client_state, std::string content) {
future<json::json_return_type> executor::update_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
_stats.api_operations.update_item++;
auto start_time = std::chrono::steady_clock::now();
rjson::value update_info = rjson::parse(content);
elogger.trace("update_item {}", update_info);
schema_ptr schema = get_table(_proxy, update_info);
-tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
+tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
if (rjson::find(update_info, "ConditionExpression")) {
throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator");
@@ -1472,7 +1473,7 @@ future<json::json_return_type> executor::update_item(client_state& client_state,
return maybe_get_previous_item(_proxy, client_state, schema, pk, ck, has_update_expression, expression, has_expected, _stats).then(
[this, schema, expression = std::move(expression), has_update_expression, ck = std::move(ck), has_expected,
update_info = rjson::copy(update_info), m = std::move(m), attrs_collector = std::move(attrs_collector),
-attribute_updates = rjson::copy(attribute_updates), ts, &client_state, start_time] (std::unique_ptr<rjson::value> previous_item) mutable {
+attribute_updates = rjson::copy(attribute_updates), ts, &client_state, start_time, trace_state] (std::unique_ptr<rjson::value> previous_item) mutable {
if (has_expected) {
verify_expected(update_info, previous_item);
}
@@ -1603,7 +1604,7 @@ future<json::json_return_type> executor::update_item(client_state& client_state,
row.apply(row_marker(ts));
elogger.trace("Applying mutation {}", m);
-return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () {
+return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () {
// Without special options on what to return, UpdateItem returns nothing.
_stats.api_operations.update_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.update_item_latency._count + 1);
return make_ready_future<json::json_return_type>(json_string(""));
@@ -1630,7 +1631,7 @@ static db::consistency_level get_read_consistency(const rjson::value& request) {
return consistent_read ? db::consistency_level::LOCAL_QUORUM : db::consistency_level::LOCAL_ONE;
}
-future<json::json_return_type> executor::get_item(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
_stats.api_operations.get_item++;
auto start_time = std::chrono::steady_clock::now();
rjson::value table_info = rjson::parse(content);
@@ -1638,7 +1639,7 @@ future<json::json_return_type> executor::get_item(client_state& client_state, st
schema_ptr schema = get_table(_proxy, table_info);
-tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
+tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value& query_key = table_info["Key"];
db::consistency_level cl = get_read_consistency(table_info);
@@ -1673,7 +1674,7 @@ future<json::json_return_type> executor::get_item(client_state& client_state, st
});
}
-future<json::json_return_type> executor::batch_get_item(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
// FIXME: In this implementation, an unbounded batch size can cause
// unbounded response JSON object to be buffered in memory, unbounded
// parallelism of the requests, and unbounded amount of non-preemptable
@@ -1701,7 +1702,7 @@ future<json::json_return_type> executor::batch_get_item(client_state& client_sta
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
table_requests rs;
rs.schema = get_table_from_batch_request(_proxy, it);
-tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, rs.schema->cf_name());
+tracing::add_table_name(trace_state, KEYSPACE_NAME, rs.schema->cf_name());
rs.cl = get_read_consistency(it->value);
rs.attrs_to_get = calculate_attrs_to_get(it->value);
auto& keys = (it->value)["Keys"];
@@ -1867,10 +1868,11 @@ static future<json::json_return_type> do_query(schema_ptr schema,
db::consistency_level cl,
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions,
service::client_state& client_state,
-cql3::cql_stats& cql_stats) {
+cql3::cql_stats& cql_stats,
+tracing::trace_state_ptr trace_state) {
::shared_ptr<service::pager::paging_state> paging_state = nullptr;
-tracing::trace(client_state.get_trace_state(), "Performing a database query");
+tracing::trace(trace_state, "Performing a database query");
if (exclusive_start_key) {
partition_key pk = pk_from_json(*exclusive_start_key, schema);
@@ -1887,7 +1889,7 @@ static future<json::json_return_type> do_query(schema_ptr schema,
auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, query::max_partitions);
-auto query_state_ptr = std::make_unique<service::query_state>(client_state, empty_service_permit());
+auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, empty_service_permit());
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto query_options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, std::vector<cql3::raw_value>{});
@@ -1919,7 +1921,7 @@ static future<json::json_return_type> do_query(schema_ptr schema,
// 2. Filtering - by passing appropriately created restrictions to pager as a last parameter
// 3. Proper timeouts instead of gc_clock::now() and db::no_timeout
// 4. Implement parallel scanning via Segments
-future<json::json_return_type> executor::scan(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::scan(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
_stats.api_operations.scan++;
rjson::value request_info = rjson::parse(content);
elogger.trace("Scanning {}", request_info);
@@ -1956,7 +1958,7 @@ future<json::json_return_type> executor::scan(client_state& client_state, std::s
partition_ranges = filtering_restrictions->get_partition_key_ranges(query_options);
ck_bounds = filtering_restrictions->get_clustering_bounds(query_options);
}
-return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats);
+return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, trace_state);
}
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) {
@@ -2079,14 +2081,14 @@ calculate_bounds(schema_ptr schema, const rjson::value& conditions) {
return {std::move(partition_ranges), std::move(ck_bounds)};
}
-future<json::json_return_type> executor::query(client_state& client_state, std::string content) {
+future<json::json_return_type> executor::query(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) {
_stats.api_operations.query++;
rjson::value request_info = rjson::parse(content);
elogger.trace("Querying {}", request_info);
schema_ptr schema = get_table_or_view(_proxy, request_info);
-tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
+tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value* exclusive_start_key = rjson::find(request_info, "ExclusiveStartKey");
db::consistency_level cl = get_read_consistency(request_info);
@@ -2129,7 +2131,7 @@ future<json::json_return_type> executor::query(client_state& client_state, std::
throw api_error("ValidationException", format("QueryFilter can only contain non-primary key attributes: Primary key attribute: {}", ck_defs.front()->name_as_text()));
}
}
-return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats);
+return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, std::move(trace_state));
}
static void validate_limit(int limit) {
@@ -2238,18 +2240,20 @@ future<> executor::maybe_create_keyspace() {
});
}
-static void create_tracing_session(executor::client_state& client_state) {
+static tracing::trace_state_ptr create_tracing_session() {
tracing::trace_state_props_set props;
props.set<tracing::trace_state_props::full_tracing>();
-client_state.create_tracing_session(tracing::trace_type::QUERY, props);
+return tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, props);
}
-void executor::maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query) {
+tracing::trace_state_ptr executor::maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query) {
+tracing::trace_state_ptr trace_state;
if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
-create_tracing_session(client_state);
-tracing::add_query(client_state.get_trace_state(), query);
-tracing::begin(client_state.get_trace_state(), format("Alternator {}", op), client_state.get_client_address());
+trace_state = create_tracing_session();
+tracing::add_query(trace_state, query);
+tracing::begin(trace_state, format("Alternator {}", op), client_state.get_client_address());
}
+return trace_state;
}
future<> executor::start() {


@@ -46,26 +46,26 @@ public:
executor(service::storage_proxy& proxy, service::migration_manager& mm) : _proxy(proxy), _mm(mm) {}
-future<json::json_return_type> create_table(client_state& client_state, std::string content);
-future<json::json_return_type> describe_table(client_state& client_state, std::string content);
-future<json::json_return_type> delete_table(client_state& client_state, std::string content);
-future<json::json_return_type> put_item(client_state& client_state, std::string content);
-future<json::json_return_type> get_item(client_state& client_state, std::string content);
-future<json::json_return_type> delete_item(client_state& client_state, std::string content);
-future<json::json_return_type> update_item(client_state& client_state, std::string content);
+future<json::json_return_type> create_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> put_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> update_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
future<json::json_return_type> list_tables(client_state& client_state, std::string content);
-future<json::json_return_type> scan(client_state& client_state, std::string content);
+future<json::json_return_type> scan(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
future<json::json_return_type> describe_endpoints(client_state& client_state, std::string content, std::string host_header);
-future<json::json_return_type> batch_write_item(client_state& client_state, std::string content);
-future<json::json_return_type> batch_get_item(client_state& client_state, std::string content);
-future<json::json_return_type> query(client_state& client_state, std::string content);
+future<json::json_return_type> batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
+future<json::json_return_type> query(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content);
future<> start();
future<> stop() { return make_ready_future<>(); }
future<> maybe_create_keyspace();
-static void maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query);
+static tracing::trace_state_ptr maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query);
};
}


@@ -231,9 +231,9 @@ future<json::json_return_type> server::handle_api_request(std::unique_ptr<reques
// We use unique_ptr because client_state cannot be moved or copied
return do_with(std::make_unique<executor::client_state>(executor::client_state::internal_tag()), [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] (std::unique_ptr<executor::client_state>& client_state) mutable {
client_state->set_raw_keyspace(executor::KEYSPACE_NAME);
-executor::maybe_trace_query(*client_state, op, req->content);
-tracing::trace(client_state->get_trace_state(), op);
-return callback_it->second(_executor.local(), *client_state, std::move(req));
+tracing::trace_state_ptr trace_state = executor::maybe_trace_query(*client_state, op, req->content);
+tracing::trace(trace_state, op);
+return callback_it->second(_executor.local(), *client_state, trace_state, std::move(req)).finally([trace_state] {});
});
});
}
@@ -253,21 +253,21 @@ void server::set_routes(routes& r) {
server::server(seastar::sharded<executor>& e)
: _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false)
, _callbacks{
-{"CreateTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) {
-return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req)] { return e.create_table(client_state, req->content); }); }
+{"CreateTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) {
+return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req), trace_state = std::move(trace_state)] () mutable { return e.create_table(client_state, std::move(trace_state), req->content); }); }
},
-{"DescribeTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_table(client_state, req->content); }},
-{"DeleteTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_table(client_state, req->content); }},
-{"PutItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.put_item(client_state, req->content); }},
-{"UpdateItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.update_item(client_state, req->content); }},
-{"GetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.get_item(client_state, req->content); }},
-{"DeleteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_item(client_state, req->content); }},
-{"ListTables", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.list_tables(client_state, req->content); }},
-{"Scan", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.scan(client_state, req->content); }},
-{"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }},
-{"BatchWriteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_write_item(client_state, req->content); }},
-{"BatchGetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_get_item(client_state, req->content); }},
-{"Query", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.query(client_state, req->content); }},
+{"DescribeTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.describe_table(client_state, std::move(trace_state), req->content); }},
+{"DeleteTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.delete_table(client_state, std::move(trace_state), req->content); }},
+{"PutItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.put_item(client_state, std::move(trace_state), req->content); }},
+{"UpdateItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.update_item(client_state, std::move(trace_state), req->content); }},
+{"GetItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.get_item(client_state, std::move(trace_state), req->content); }},
+{"DeleteItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.delete_item(client_state, std::move(trace_state), req->content); }},
+{"ListTables", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.list_tables(client_state, req->content); }},
+{"Scan", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.scan(client_state, std::move(trace_state), req->content); }},
+{"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }},
+{"BatchWriteItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.batch_write_item(client_state, std::move(trace_state), req->content); }},
+{"BatchGetItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.batch_get_item(client_state, std::move(trace_state), req->content); }},
+{"Query", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.query(client_state, std::move(trace_state), req->content); }},
} {
}


@@ -31,7 +31,7 @@
namespace alternator {
class server {
-using alternator_callback = std::function<future<json::json_return_type>(executor&, executor::client_state&, std::unique_ptr<request>)>;
+using alternator_callback = std::function<future<json::json_return_type>(executor&, executor::client_state&, tracing::trace_state_ptr, std::unique_ptr<request>)>;
using alternator_callbacks_map = std::unordered_map<std::string_view, alternator_callback>;
seastar::httpd::http_server_control _control;


@@ -193,9 +193,9 @@ future<> service::start(::service::migration_manager& mm) {
future<> service::stop() {
// Only one of the shards has the listener registered, but let's try to
// unregister on each one just to make sure.
-_mnotifier.unregister_listener(_migration_listener.get());
-return _permissions_cache->stop().then([this] {
+return _mnotifier.unregister_listener(_migration_listener.get()).then([this] {
+return _permissions_cache->stop();
+}).then([this] {
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop());
});
}


@@ -73,6 +73,7 @@ static future<> populate_desc(db_context ctx, const schema& s);
class cdc::cdc_service::impl : service::migration_listener::empty_listener {
friend cdc_service;
db_context _ctxt;
+bool _stopped = false;
public:
impl(db_context ctxt)
: _ctxt(std::move(ctxt))
@@ -80,7 +81,13 @@ public:
_ctxt._migration_notifier.register_listener(this);
}
~impl() {
-_ctxt._migration_notifier.unregister_listener(this);
+assert(_stopped);
}
+future<> stop() {
+return _ctxt._migration_notifier.unregister_listener(this).then([this] {
+_stopped = true;
+});
+}
void on_before_create_column_family(const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
@@ -198,6 +205,10 @@ cdc::cdc_service::cdc_service(db_context ctxt)
_impl->_ctxt._proxy.set_cdc_service(this);
}
+future<> cdc::cdc_service::stop() {
+return _impl->stop();
+}
cdc::cdc_service::~cdc_service() = default;
cdc::options::options(const std::map<sstring, sstring>& map) {
@@ -260,7 +271,6 @@ sstring desc_name(const sstring& table_name) {
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.set_default_time_to_live(gc_clock::duration{s.cdc_options().ttl()});
-b.set_comment(sprint("CDC log for %s.%s", s.ks_name(), s.cf_name()));
b.with_column("stream_id", uuid_type, column_kind::partition_key);
b.with_column("time", timeuuid_type, column_kind::clustering_key);
@@ -405,6 +415,7 @@ private:
bytes _decomposed_time;
::shared_ptr<const transformer::streams_type> _streams;
const column_definition& _op_col;
+ttl_opt _cdc_ttl_opt;
clustering_key set_pk_columns(const partition_key& pk, int batch_no, mutation& m) const {
const auto log_ck = clustering_key::from_exploded(
@@ -416,7 +427,8 @@ private:
auto cdef = m.schema()->get_column_definition(to_bytes("_" + column.name()));
auto value = atomic_cell::make_live(*column.type,
_time.timestamp(),
-bytes_view(pk_value[pos]));
+bytes_view(pk_value[pos]),
+_cdc_ttl_opt);
m.set_cell(log_ck, *cdef, std::move(value));
++pos;
}
@@ -424,7 +436,7 @@ private:
}
void set_operation(const clustering_key& ck, operation op, mutation& m) const {
-m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _time.timestamp(), _op_col.type->decompose(operation_native_type(op))));
+m.set_cell(ck, _op_col, atomic_cell::make_live(*_op_col.type, _time.timestamp(), _op_col.type->decompose(operation_native_type(op)), _cdc_ttl_opt));
}
partition_key stream_id(const net::inet_address& ip, unsigned int shard_id) const {
@@ -443,7 +455,11 @@ public:
, _decomposed_time(timeuuid_type->decompose(_time))
, _streams(std::move(streams))
, _op_col(*_log_schema->get_column_definition(to_bytes("operation")))
-{}
+{
+if (_schema->cdc_options().ttl()) {
+_cdc_ttl_opt = std::chrono::seconds(_schema->cdc_options().ttl());
+}
+}
// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
// more details like tombstones/ttl? Probably not but keep in mind.
@@ -475,7 +491,8 @@ public:
auto cdef = _log_schema->get_column_definition(to_bytes("_" + column.name()));
auto value = atomic_cell::make_live(*column.type,
_time.timestamp(),
-bytes_view(exploded[pos]));
+bytes_view(exploded[pos]),
+_cdc_ttl_opt);
res.set_cell(log_ck, *cdef, std::move(value));
++pos;
}
@@ -531,11 +548,11 @@ public:
for (const auto& column : _schema->clustering_key_columns()) {
assert (pos < ck_value.size());
auto cdef = _log_schema->get_column_definition(to_bytes("_" + column.name()));
-res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos])));
+res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos]), _cdc_ttl_opt));
if (pirow) {
assert(pirow->has(column.name_as_text()));
-res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos])));
+res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, _time.timestamp(), bytes_view(ck_value[pos]), _cdc_ttl_opt));
}
++pos;
@@ -564,7 +581,7 @@ public:
}
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(op)));
-res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values)));
+res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values), _cdc_ttl_opt));
if (pirow && pirow->has(cdef.name_as_text())) {
values[0] = data_type_for<column_op_native_type>()->decompose(data_value(static_cast<column_op_native_type>(column_op::set)));
@@ -573,7 +590,7 @@ public:
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
assert(pikey->explode() != log_ck.explode());
-res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values)));
+res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, _time.timestamp(), tuple_type_impl::build_value(values), _cdc_ttl_opt));
}
} else {
cdc_log.warn("Non-atomic cell ignored {}.{}:{}", _schema->ks_name(), _schema->cf_name(), cdef.name_as_text());


@@ -81,6 +81,7 @@ class cdc_service {
class impl;
std::unique_ptr<impl> _impl;
public:
+future<> stop();
cdc_service(service::storage_proxy&);
cdc_service(db_context);
~cdc_service();


@@ -266,7 +266,7 @@ bool column_condition::applies_to(const data_value* cell_value, const query_opti
return value.has_value() && is_satisfied_by(operator_type::EQ, *cell_value->type(), *column.type, *cell_value, *value);
});
} else {
-return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return value.has_value() == false; });
+return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return !value.has_value() || value->empty(); });
}
}


@@ -450,8 +450,9 @@ query_processor::~query_processor() {
}
future<> query_processor::stop() {
-_mnotifier.unregister_listener(_migration_subscriber.get());
-return _authorized_prepared_cache.stop().finally([this] { return _prepared_cache.stop(); });
+return _mnotifier.unregister_listener(_migration_subscriber.get()).then([this] {
+return _authorized_prepared_cache.stop().finally([this] { return _prepared_cache.stop(); });
+});
}
future<::shared_ptr<result_message>>


@@ -423,7 +423,7 @@ GCC6_CONCEPT(
requires (std::is_same_v<KeyType, partition_key> || std::is_same_v<KeyType, clustering_key_prefix>)
)
static KeyType
-generate_base_key_from_index_pk(const partition_key& index_pk, const clustering_key& index_ck, const schema& base_schema, const schema& view_schema) {
+generate_base_key_from_index_pk(const partition_key& index_pk, const std::optional<clustering_key>& index_ck, const schema& base_schema, const schema& view_schema) {
const auto& base_columns = std::is_same_v<KeyType, partition_key> ? base_schema.partition_key_columns() : base_schema.clustering_key_columns();
std::vector<bytes_view> exploded_base_key;
exploded_base_key.reserve(base_columns.size());
@@ -432,8 +432,8 @@ generate_base_key_from_index_pk(const partition_key& index_pk, const clustering_
const column_definition* view_col = view_schema.view_info()->view_column(base_col);
if (view_col->is_partition_key()) {
exploded_base_key.push_back(index_pk.get_component(view_schema, view_col->id));
-} else {
-exploded_base_key.push_back(index_ck.get_component(view_schema, view_col->id));
+} else if (index_ck) {
+exploded_base_key.push_back(index_ck->get_component(view_schema, view_col->id));
}
}
return KeyType::from_range(exploded_base_key);
@@ -497,11 +497,16 @@ indexed_table_select_statement::do_execute_base_query(
auto old_paging_state = options.get_paging_state();
if (old_paging_state && concurrency == 1) {
auto base_pk = generate_base_key_from_index_pk<partition_key>(old_paging_state->get_partition_key(),
-*old_paging_state->get_clustering_key(), *_schema, *_view_schema);
-auto base_ck = generate_base_key_from_index_pk<clustering_key>(old_paging_state->get_partition_key(),
-*old_paging_state->get_clustering_key(), *_schema, *_view_schema);
-command->slice.set_range(*_schema, base_pk,
-std::vector<query::clustering_range>{query::clustering_range::make_starting_with(range_bound<clustering_key>(base_ck, false))});
+old_paging_state->get_clustering_key(), *_schema, *_view_schema);
+if (_schema->clustering_key_size() > 0) {
+assert(old_paging_state->get_clustering_key().has_value());
+auto base_ck = generate_base_key_from_index_pk<clustering_key>(old_paging_state->get_partition_key(),
+old_paging_state->get_clustering_key(), *_schema, *_view_schema);
+command->slice.set_range(*_schema, base_pk,
+std::vector<query::clustering_range>{query::clustering_range::make_starting_with(range_bound<clustering_key>(base_ck, false))});
+} else {
+command->slice.set_range(*_schema, base_pk, std::vector<query::clustering_range>{query::clustering_range::make_open_ended_both_sides()});
+}
}
concurrency *= 2;
return proxy.query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})


@@ -807,7 +807,12 @@ public:
// (Note: wait_for_pending(pos) waits for operation _at_ pos (and before),
replay_position rp(me->_desc.id, position_type(fp));
return me->_pending_ops.wait_for_pending(rp, timeout).then([me, fp] {
-assert(me->_flush_pos > fp);
+assert(me->_segment_manager->cfg.mode != sync_mode::BATCH || me->_flush_pos > fp);
+if (me->_flush_pos <= fp) {
+// previous op we were waiting for was not sync one, so it did not flush
+// force flush here
+return me->do_flush(fp);
+}
return make_ready_future<sseg_ptr>(me);
});
}
@@ -1323,7 +1328,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
v.emplace_back(iovec{ buf.get_write(), s});
m += s;
}
-return f.dma_write(max_size - rem, std::move(v)).then([&rem](size_t s) {
+return f.dma_write(max_size - rem, std::move(v), service::get_local_commitlog_priority()).then([&rem](size_t s) {
rem -= s;
return stop_iteration::no;
});


@@ -405,11 +405,8 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
return _proxy.send_hint_to_endpoint(std::move(m), end_point_key());
} else {
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
-// FIXME: using 1h as infinite timeout. If a node is down, we should get an
-// unavailable exception.
-auto timeout = db::timeout_clock::now() + 1h;
-return _proxy.mutate({m.fm.unfreeze(m.s)}, consistency_level::ALL, timeout, nullptr, empty_service_permit());
+//FIXME: Add required frozen_mutation overloads
+return _proxy.mutate_hint_from_scratch(std::move(m));
}
});
}


@@ -1792,7 +1792,7 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) {
}
static bool maybe_write_in_user_memory(schema_ptr s, database& db) {
-return (s.get() == batchlog().get())
+return (s.get() == batchlog().get()) || (s.get() == paxos().get())
|| s == v3::scylla_views_builds_in_progress();
}


@@ -311,7 +311,7 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
if (!cdef.is_computed()) {
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
// once "computed_columns feature" is supported by every node
-if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_base)) {
+if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
}
computed_value = token_column_computation().compute_value(*_base, base_key, update);
@@ -894,7 +894,11 @@ future<stop_iteration> view_update_builder::on_results() {
if (_update && !_update->is_end_of_partition()) {
if (_update->is_clustering_row()) {
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
generate_update(std::move(*_update).as_clustering_row(), { });
auto existing_tombstone = _existing_tombstone_tracker.current_tombstone();
auto existing = existing_tombstone
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
: std::nullopt;
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
}
return advance_updates();
}
@@ -1187,8 +1191,9 @@ future<> view_builder::stop() {
vlogger.info("Stopping view builder");
_as.request_abort();
return _started.finally([this] {
_mnotifier.unregister_listener(this);
return _sem.wait().then([this] {
return _mnotifier.unregister_listener(this).then([this] {
return _sem.wait();
}).then([this] {
_sem.broken();
return _build_step.join();
}).handle_exception_type([] (const broken_semaphore&) {


@@ -125,7 +125,7 @@ if [ -z "$TARGET" ]; then
fi
RELOC_PKG_FULLPATH=$(readlink -f $RELOC_PKG)
RELOC_PKG_BASENAME=$(basename $RELOC_PKG)
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE)
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE | sed 's/\.rc/~rc/')
SCYLLA_RELEASE=$(cat SCYLLA-RELEASE-FILE)
ln -fv $RELOC_PKG_FULLPATH ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz


@@ -4,7 +4,6 @@ etc/security/limits.d/scylla.conf
etc/scylla.d/*.conf
opt/scylladb/share/doc/scylla/*
opt/scylladb/share/doc/scylla/licenses/
usr/lib/systemd/system/*.service
usr/lib/systemd/system/*.timer
usr/lib/systemd/system/*.slice
usr/bin/scylla


@@ -6,8 +6,12 @@ case "$1" in
purge|remove)
rm -rf /etc/systemd/system/scylla-housekeeping-daily.service.d/
rm -rf /etc/systemd/system/scylla-housekeeping-restart.service.d/
rm -rf /etc/systemd/system/scylla-server.service.d/
rm -rf /etc/systemd/system/scylla-helper.slice.d/
# We need to keep dependencies.conf and sysconfdir.conf on 'remove',
# otherwise they will be missing after rollback.
if [ "$1" = "purge" ]; then
rm -rf /etc/systemd/system/scylla-server.service.d/
fi
;;
esac


@@ -5,7 +5,7 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-3.3/latest/scylla.repo
ADD scylla_bashrc /scylla_bashrc


@@ -17,6 +17,10 @@ Obsoletes: scylla-server < 1.1
%undefine _find_debuginfo_dwz_opts
# Prevent find-debuginfo.sh from tampering with scylla's build-id (#5881)
%undefine _unique_build_ids
%global _no_recompute_build_ids 1
%description
Scylla is a highly scalable, eventually consistent, distributed,
partitioned row DB.


@@ -1935,7 +1935,8 @@ future<> gossiper::do_stop_gossiping() {
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
logger.info("Announcing shutdown");
add_local_application_state(application_state::STATUS, _value_factory.shutdown(true)).get();
for (inet_address addr : _live_endpoints) {
auto live_endpoints = _live_endpoints;
for (inet_address addr : live_endpoints) {
msg_addr id = get_msg_addr(addr);
logger.trace("Sending a GossipShutdown to {}", id);
ms().send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
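The gossiper hunk above iterates a copy of `_live_endpoints` instead of the member itself, because sending shutdown messages can re-enter the gossiper and mutate the live set mid-loop. A minimal standalone sketch of the pattern (hypothetical names, plain `std::set` in place of Scylla's endpoint containers):

```cpp
#include <cassert>
#include <set>
#include <vector>

// Sketch: notifying a peer may re-enter and erase from the live set
// (e.g. a reply marks the peer down). Iterating a snapshot keeps the
// loop's iterators valid no matter what the callback does to `live`.
std::vector<int> notify_all(std::set<int>& live,
                            void (*on_notify)(std::set<int>&, int)) {
    std::vector<int> notified;
    auto snapshot = live;          // copy first, as in the gossiper fix above
    for (int ep : snapshot) {
        on_notify(live, ep);       // may erase from `live`; snapshot unaffected
        notified.push_back(ep);
    }
    return notified;
}
```

Iterating `live` directly here would be undefined behavior once the callback erases an element.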


@@ -53,13 +53,13 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
endpoints.reserve(replicas);
for (auto& token : tm.ring_range(t)) {
if (endpoints.size() == replicas) {
break;
}
auto ep = tm.get_endpoint(token);
assert(ep);
endpoints.push_back(*ep);
if (endpoints.size() == replicas) {
break;
}
}
return std::move(endpoints.get_vector());

lua.cc

@@ -888,10 +888,13 @@ db_clock::time_point timestamp_return_visitor::operator()(const lua_table&) {
}
static data_value convert_from_lua(lua_slice_state &l, const data_type& type) {
if (lua_isnil(l, -1)) {
return data_value::make_null(type);
}
return ::visit(*type, from_lua_visitor{l});
}
static bytes convert_return(lua_slice_state &l, const data_type& return_type) {
static bytes_opt convert_return(lua_slice_state &l, const data_type& return_type) {
int num_return_vals = lua_gettop(l);
if (num_return_vals != 1) {
throw exceptions::invalid_request_exception(
@@ -901,7 +904,11 @@ static bytes convert_return(lua_slice_state &l, const data_type& return_type) {
// FIXME: It should be possible to avoid creating the data_value,
// or even better, change the function::execute interface to
// return a data_value instead of bytes_opt.
return convert_from_lua(l, return_type).serialize();
data_value ret = convert_from_lua(l, return_type);
if (ret.is_null()) {
return {};
}
return ret.serialize();
}
static void push_sstring(lua_slice_state& l, const sstring& v) {
@@ -1065,7 +1072,7 @@ lua::runtime_config lua::make_runtime_config(const db::config& config) {
}
// run the script for at most max_instructions
future<bytes> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_value>& values, data_type return_type, const lua::runtime_config& cfg) {
future<bytes_opt> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_value>& values, data_type return_type, const lua::runtime_config& cfg) {
lua_slice_state l = load_script(cfg, bitcode);
unsigned nargs = values.size();
if (!lua_checkstack(l, nargs)) {
@@ -1088,7 +1095,7 @@ future<bytes> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_
auto start = ::now();
switch (lua_resume(l, nullptr, nargs)) {
case LUA_OK:
return make_ready_future<bytes_opt>(convert_return(l, return_type));
return make_ready_future<std::optional<bytes_opt>>(convert_return(l, return_type));
case LUA_YIELD: {
nargs = 0;
elapsed += ::now() - start;
@@ -1096,7 +1103,7 @@ future<bytes> lua::run_script(lua::bitcode_view bitcode, const std::vector<data_
millisecond ms = elapsed;
throw exceptions::invalid_request_exception(format("lua execution timeout: {}ms elapsed", ms.count()));
}
return make_ready_future<bytes_opt>(bytes_opt());
return make_ready_future<std::optional<bytes_opt>>(std::nullopt);
}
default:
throw exceptions::invalid_request_exception(std::string("lua execution failed: ") +
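The lua.cc changes thread a null through the return path: `convert_return` now yields `bytes_opt`, so a script returning nil produces an empty optional instead of being forced through serialization. A reduced sketch of that null-propagation shape, with `std::string` standing in for Scylla's `bytes` (hypothetical, not the real Lua C API plumbing):

```cpp
#include <cassert>
#include <optional>
#include <string>

using bytes_opt = std::optional<std::string>;  // stand-in for Scylla's bytes_opt

// Sketch: convert a possibly-null script result. An empty optional lets
// callers represent "the function returned null" directly, rather than
// having to serialize a null value.
bytes_opt convert_return(const std::optional<int>& lua_result) {
    if (!lua_result) {
        return {};                 // nil from the script -> null result
    }
    return std::to_string(*lua_result);
}
```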

lua.hh

@@ -41,6 +41,6 @@ struct runtime_config {
runtime_config make_runtime_config(const db::config& config);
sstring compile(const runtime_config& cfg, const std::vector<sstring>& arg_names, sstring script);
seastar::future<bytes> run_script(bitcode_view bitcode, const std::vector<data_value>& values,
data_type return_type, const runtime_config& cfg);
seastar::future<bytes_opt> run_script(bitcode_view bitcode, const std::vector<data_value>& values,
data_type return_type, const runtime_config& cfg);
}


@@ -946,6 +946,12 @@ int main(int ac, char** av) {
ss.init_messaging_service_part().get();
api::set_server_messaging_service(ctx).get();
api::set_server_storage_service(ctx).get();
gossiper.local().register_(ss.shared_from_this());
auto stop_listening = defer_verbose_shutdown("storage service notifications", [&gossiper, &ss] {
gossiper.local().unregister_(ss.shared_from_this());
});
ss.init_server_without_the_messaging_service_part().get();
supervisor::notify("starting batchlog manager");
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {


@@ -39,6 +39,9 @@
#include <seastar/core/execution_stage.hh>
#include "types/map.hh"
#include "compaction_garbage_collector.hh"
#include "utils/exceptions.hh"
logging::logger mplog("mutation_partition");
template<bool reversed>
struct reversal_traits;
@@ -1238,7 +1241,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
void
row::append_cell(column_id id, atomic_cell_or_collection value) {
if (_type == storage_type::vector && id < max_vector_size) {
assert(_storage.vector.v.size() <= id);
if (_storage.vector.v.size() > id) {
on_internal_error(mplog, format("Attempted to append cell#{} to row already having {} cells", id, _storage.vector.v.size()));
}
_storage.vector.v.resize(id);
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
_storage.vector.present.set(id);
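The `append_cell` hunk above replaces a hard `assert` with `on_internal_error()`, which logs and throws rather than aborting the whole node on a logic bug. A simplified sketch of the same check (hypothetical: a plain `std::vector<int>` and `std::logic_error` stand in for the real cell storage and error helper):

```cpp
#include <cassert>
#include <stdexcept>
#include <vector>

// Sketch: append cell `id` to vector storage. A recoverable exception
// replaces the old assert, so a single bad row fails the operation
// instead of crashing the process.
void append_cell(std::vector<int>& cells, size_t id, int value) {
    if (cells.size() > id) {
        throw std::logic_error("attempted to append an already-present cell");
    }
    cells.resize(id);              // fill the gap before `id` with empty cells
    cells.push_back(value);        // the cell now lives at index `id`
}
```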


@@ -58,7 +58,8 @@ EOS
# For systems with not a lot of memory, override default reservations for the slices
# seastar has a minimum reservation of 1.5GB that kicks in, and 21GB * 0.07 = 1.5GB.
# So for anything smaller than that we will not use percentages in the helper slice
MEMTOTAL_BYTES=$(cat /proc/meminfo | grep MemTotal | awk '{print $2 * 1024}')
MEMTOTAL=$(cat /proc/meminfo |grep -e "^MemTotal:"|sed -s 's/^MemTotal:\s*\([0-9]*\) kB$/\1/')
MEMTOTAL_BYTES=$(($MEMTOTAL * 1024))
if [ $MEMTOTAL_BYTES -lt 23008753371 ]; then
mkdir -p /etc/systemd/system/scylla-helper.slice.d/
cat << EOS > /etc/systemd/system/scylla-helper.slice.d/memory.conf

Submodule seastar updated: 3f3e117de3...a0bdc6cd85


@@ -75,25 +75,22 @@ public:
class client_state_for_another_shard {
private:
const client_state* _cs;
tracing::global_trace_state_ptr _trace_state;
seastar::sharded<auth::service>* _auth_service;
client_state_for_another_shard(const client_state* cs, tracing::global_trace_state_ptr gt,
seastar::sharded<auth::service>* auth_service) : _cs(cs), _trace_state(gt), _auth_service(auth_service) {}
client_state_for_another_shard(const client_state* cs, seastar::sharded<auth::service>* auth_service) : _cs(cs), _auth_service(auth_service) {}
friend client_state;
public:
client_state get() const {
return client_state(_cs, _trace_state, _auth_service);
return client_state(_cs, _auth_service);
}
};
private:
client_state(const client_state* cs, tracing::global_trace_state_ptr gt, seastar::sharded<auth::service>* auth_service)
: _keyspace(cs->_keyspace), _trace_state_ptr(gt), _user(cs->_user), _auth_state(cs->_auth_state),
client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service)
: _keyspace(cs->_keyspace), _user(cs->_user), _auth_state(cs->_auth_state),
_is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address),
_auth_service(auth_service ? &auth_service->local() : nullptr) {}
friend client_state_for_another_shard;
private:
sstring _keyspace;
tracing::trace_state_ptr _trace_state_ptr;
#if 0
private static final Logger logger = LoggerFactory.getLogger(ClientState.class);
public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION;
@@ -138,18 +135,6 @@ public:
struct internal_tag {};
struct external_tag {};
void create_tracing_session(tracing::trace_type type, tracing::trace_state_props_set props) {
_trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(type, props);
}
tracing::trace_state_ptr& get_trace_state() {
return _trace_state_ptr;
}
const tracing::trace_state_ptr& get_trace_state() const {
return _trace_state_ptr;
}
auth_state get_auth_state() const noexcept {
return _auth_state;
}
@@ -344,7 +329,7 @@ public:
}
client_state_for_another_shard move_to_other_shard() {
return client_state_for_another_shard(this, _trace_state_ptr, _auth_service ? &_auth_service->container() : nullptr);
return client_state_for_another_shard(this, _auth_service ? &_auth_service->container() : nullptr);
}
#if 0


@@ -42,8 +42,10 @@
#pragma once
#include <vector>
#include <seastar/core/rwlock.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/defer.hh>
class keyspace_metadata;
class view_ptr;
@@ -129,14 +131,24 @@ public:
class migration_notifier {
private:
std::vector<migration_listener*> _listeners;
class listener_vector {
std::vector<migration_listener*> _vec;
rwlock _vec_lock;
public:
void add(migration_listener* listener);
future<> remove(migration_listener* listener);
// This must be called on a thread.
void for_each(noncopyable_function<void(migration_listener*)> func);
};
listener_vector _listeners;
public:
/// Register a migration listener on current shard.
void register_listener(migration_listener* listener);
/// Unregister a migration listener on current shard.
void unregister_listener(migration_listener* listener);
future<> unregister_listener(migration_listener* listener);
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
future<> create_column_family(const schema_ptr& cfm);


@@ -70,6 +70,11 @@ migration_manager::migration_manager(migration_notifier& notifier) : _notifier(n
future<> migration_manager::stop()
{
mlogger.info("stopping migration service");
_as.request_abort();
if (!_cluster_upgraded) {
_wait_cluster_upgraded.broken();
}
return uninit_messaging_service().then([this] {
return parallel_for_each(_schema_pulls.begin(), _schema_pulls.end(), [] (auto&& e) {
serialized_action& sp = e.second;
@@ -97,6 +102,10 @@ void migration_manager::init_messaging_service()
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
_feature_listeners.push_back(ss.cluster_supports_cdc().when_enabled(update_schema));
}
_feature_listeners.push_back(ss.cluster_supports_schema_tables_v3().when_enabled([this] {
_cluster_upgraded = true;
_wait_cluster_upgraded.broadcast();
}));
auto& ms = netw::get_local_messaging_service();
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
@@ -161,14 +170,37 @@ future<> migration_manager::uninit_messaging_service()
);
}
void migration_notifier::register_listener(migration_listener* listener)
{
_listeners.emplace_back(listener);
void migration_notifier::listener_vector::add(migration_listener* listener) {
_vec.push_back(listener);
}
void migration_notifier::unregister_listener(migration_listener* listener)
future<> migration_notifier::listener_vector::remove(migration_listener* listener) {
return with_lock(_vec_lock.for_write(), [this, listener] {
_vec.erase(std::remove(_vec.begin(), _vec.end(), listener), _vec.end());
});
}
void migration_notifier::listener_vector::for_each(noncopyable_function<void(migration_listener*)> func) {
_vec_lock.for_read().lock().get();
auto unlock = defer([this] {
_vec_lock.for_read().unlock();
});
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
func(_vec[i]);
}
}
void migration_notifier::register_listener(migration_listener* listener)
{
_listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end());
_listeners.add(listener);
}
future<> migration_notifier::unregister_listener(migration_listener* listener)
{
return _listeners.remove(listener);
}
future<> migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state)
@@ -220,7 +252,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
{
auto& proxy = get_local_storage_proxy();
auto& db = proxy.get_db().local();
auto& ss = get_storage_service().local();
if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) {
mlogger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return make_ready_future<>();
@@ -228,21 +260,23 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
// Disable pulls during rolling upgrade from 1.7 to 2.0 to avoid
// schema version inconsistency. See https://github.com/scylladb/scylla/issues/2802.
if (!ss.cluster_supports_schema_tables_v3()) {
if (!_cluster_upgraded) {
mlogger.debug("Delaying pull with {} until cluster upgrade is complete", endpoint);
return ss.cluster_supports_schema_tables_v3().when_enabled().then([this, their_version, endpoint] {
return _wait_cluster_upgraded.wait().then([this, their_version, endpoint] {
return maybe_schedule_schema_pull(their_version, endpoint);
});
}).finally([me = shared_from_this()] {});
}
if (db.get_version() == database::empty_version || runtime::get_uptime() < migration_delay) {
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
mlogger.debug("Submitting migration task for {}", endpoint);
return submit_migration_task(endpoint);
} else {
}
return with_gate(_background_tasks, [this, &db, endpoint] {
// Include a delay to make sure we have a chance to apply any changes being
// pushed out simultaneously. See CASSANDRA-5025
return sleep(migration_delay).then([this, &proxy, endpoint] {
return sleep_abortable(migration_delay, _as).then([this, &db, endpoint] {
// grab the latest version of the schema since it may have changed again since the initial scheduling
auto& gossiper = gms::get_local_gossiper();
auto* ep_state = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
@@ -256,7 +290,6 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
return make_ready_future<>();
}
utils::UUID current_version{value->value};
auto& db = proxy.get_db().local();
if (db.get_version() == current_version) {
mlogger.debug("not submitting migration task for {} because our versions match", endpoint);
return make_ready_future<>();
@@ -264,7 +297,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
mlogger.debug("submitting migration task for {}", endpoint);
return submit_migration_task(endpoint);
});
}
}).finally([me = shared_from_this()] {});
}
future<> migration_manager::submit_migration_task(const gms::inet_address& endpoint)
@@ -357,56 +390,56 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
future<> migration_notifier::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
const auto& name = ksm->name();
_listeners.for_each([&name] (migration_listener* listener) {
try {
listener->on_create_keyspace(name);
} catch (...) {
mlogger.warn("Create keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
});
}
future<> migration_notifier::create_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
for (auto&& listener : _listeners) {
const auto& ks_name = cfm->ks_name();
const auto& cf_name = cfm->cf_name();
_listeners.for_each([&ks_name, &cf_name] (migration_listener* listener) {
try {
listener->on_create_column_family(ks_name, cf_name);
} catch (...) {
mlogger.warn("Create column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
});
}
future<> migration_notifier::create_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
for (auto&& listener : _listeners) {
const auto& ks_name = type->_keyspace;
const auto& type_name = type->get_name_as_string();
_listeners.for_each([&ks_name, &type_name] (migration_listener* listener) {
try {
listener->on_create_user_type(ks_name, type_name);
} catch (...) {
mlogger.warn("Create user type notification failed {}.{}: {}", ks_name, type_name, std::current_exception());
}
}
});
});
}
future<> migration_notifier::create_view(const view_ptr& view) {
return seastar::async([this, view] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
for (auto&& listener : _listeners) {
const auto& ks_name = view->ks_name();
const auto& view_name = view->cf_name();
_listeners.for_each([&ks_name, &view_name] (migration_listener* listener) {
try {
listener->on_create_view(ks_name, view_name);
} catch (...) {
mlogger.warn("Create view notification failed {}.{}: {}", ks_name, view_name, std::current_exception());
}
}
});
});
}
@@ -426,56 +459,56 @@ public void notifyCreateAggregate(UDAggregate udf)
future<> migration_notifier::update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
const auto& name = ksm->name();
_listeners.for_each([&name] (migration_listener* listener) {
try {
listener->on_update_keyspace(name);
} catch (...) {
mlogger.warn("Update keyspace notification failed {}: {}", name, std::current_exception());
}
}
});
});
}
future<> migration_notifier::update_column_family(const schema_ptr& cfm, bool columns_changed) {
return seastar::async([this, cfm, columns_changed] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
for (auto&& listener : _listeners) {
const auto& ks_name = cfm->ks_name();
const auto& cf_name = cfm->cf_name();
_listeners.for_each([&ks_name, &cf_name, columns_changed] (migration_listener* listener) {
try {
listener->on_update_column_family(ks_name, cf_name, columns_changed);
} catch (...) {
mlogger.warn("Update column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
});
}
future<> migration_notifier::update_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
for (auto&& listener : _listeners) {
const auto& ks_name = type->_keyspace;
const auto& type_name = type->get_name_as_string();
_listeners.for_each([&ks_name, &type_name] (migration_listener* listener) {
try {
listener->on_update_user_type(ks_name, type_name);
} catch (...) {
mlogger.warn("Update user type notification failed {}.{}: {}", ks_name, type_name, std::current_exception());
}
}
});
});
}
future<> migration_notifier::update_view(const view_ptr& view, bool columns_changed) {
return seastar::async([this, view, columns_changed] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
for (auto&& listener : _listeners) {
const auto& ks_name = view->ks_name();
const auto& view_name = view->cf_name();
_listeners.for_each([&ks_name, &view_name, columns_changed] (migration_listener* listener) {
try {
listener->on_update_view(ks_name, view_name, columns_changed);
} catch (...) {
mlogger.warn("Update view notification failed {}.{}: {}", ks_name, view_name, std::current_exception());
}
}
});
});
}
@@ -495,27 +528,27 @@ public void notifyUpdateAggregate(UDAggregate udf)
future<> migration_notifier::drop_keyspace(const sstring& ks_name) {
return seastar::async([this, ks_name] {
for (auto&& listener : _listeners) {
_listeners.for_each([&ks_name] (migration_listener* listener) {
try {
listener->on_drop_keyspace(ks_name);
} catch (...) {
mlogger.warn("Drop keyspace notification failed {}: {}", ks_name, std::current_exception());
}
}
});
});
}
future<> migration_notifier::drop_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& cf_name = cfm->cf_name();
auto&& ks_name = cfm->ks_name();
for (auto&& listener : _listeners) {
const auto& cf_name = cfm->cf_name();
const auto& ks_name = cfm->ks_name();
_listeners.for_each([&ks_name, &cf_name] (migration_listener* listener) {
try {
listener->on_drop_column_family(ks_name, cf_name);
} catch (...) {
mlogger.warn("Drop column family notification failed {}.{}: {}", ks_name, cf_name, std::current_exception());
}
}
});
});
}
@@ -523,13 +556,13 @@ future<> migration_notifier::drop_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
for (auto&& listener : _listeners) {
_listeners.for_each([&ks_name, &type_name] (migration_listener* listener) {
try {
listener->on_drop_user_type(ks_name, type_name);
} catch (...) {
mlogger.warn("Drop user type notification failed {}.{}: {}", ks_name, type_name, std::current_exception());
}
}
});
});
}
@@ -537,38 +570,38 @@ future<> migration_notifier::drop_view(const view_ptr& view) {
return seastar::async([this, view] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
for (auto&& listener : _listeners) {
_listeners.for_each([&ks_name, &view_name] (migration_listener* listener) {
try {
listener->on_drop_view(ks_name, view_name);
} catch (...) {
mlogger.warn("Drop view notification failed {}.{}: {}", ks_name, view_name, std::current_exception());
}
}
});
});
}
void migration_notifier::before_create_column_family(const schema& schema,
std::vector<mutation>& mutations, api::timestamp_type timestamp) {
for (auto&& listener : _listeners) {
_listeners.for_each([&mutations, &schema, timestamp] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a create-table
listener->on_before_create_column_family(schema, mutations, timestamp);
}
});
}
void migration_notifier::before_update_column_family(const schema& new_schema,
const schema& old_schema, std::vector<mutation>& mutations, api::timestamp_type ts) {
for (auto&& listener : _listeners) {
_listeners.for_each([&mutations, &new_schema, &old_schema, ts] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill an update-column
listener->on_before_update_column_family(new_schema, old_schema, mutations, ts);
}
});
}
void migration_notifier::before_drop_column_family(const schema& schema,
std::vector<mutation>& mutations, api::timestamp_type ts) {
for (auto&& listener : _listeners) {
_listeners.for_each([&mutations, &schema, ts] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a drop-column
listener->on_before_drop_column_family(schema, mutations, ts);
}
});
}
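The `listener_vector` introduced above combines two protections: removal takes the write lock so a listener is never freed mid-notification, while iteration goes by index because `add()` takes no lock and a `push_back()` from inside a callback may reallocate the vector. A synchronous sketch of that design (hypothetical: `std::shared_mutex` in place of seastar's futureful `rwlock`, `int` in place of `migration_listener*`):

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Sketch of a listener registry: remove() is exclusive, for_each() shared,
// add() unlocked (mirroring the diff, which relies on single-shard
// execution for add()).
class listener_vector {
    std::vector<int> _vec;
    std::shared_mutex _lock;
public:
    void add(int listener) { _vec.push_back(listener); }
    void remove(int listener) {
        std::unique_lock guard(_lock);   // waits for in-flight iteration
        _vec.erase(std::remove(_vec.begin(), _vec.end(), listener), _vec.end());
    }
    void for_each(const std::function<void(int)>& func) {
        std::shared_lock guard(_lock);   // blocks concurrent remove()
        // Index-based loop: still valid if a callback add() reallocates.
        for (size_t i = 0, n = _vec.size(); i < n; ++i) {
            func(_vec[i]);
        }
    }
};
```

Note the sketch inherits the diff's constraint: calling `remove()` from inside `for_each()` would self-deadlock here, just as it would block in the futureful version.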


@@ -62,6 +62,9 @@ private:
std::vector<gms::feature::listener_registration> _feature_listeners;
seastar::gate _background_tasks;
static const std::chrono::milliseconds migration_delay;
seastar::abort_source _as;
bool _cluster_upgraded = false;
seastar::condition_variable _wait_cluster_upgraded;
public:
explicit migration_manager(migration_notifier&);


@@ -38,7 +38,7 @@ private:
public:
query_state(client_state& client_state, service_permit permit)
: _client_state(client_state)
, _trace_state_ptr(_client_state.get_trace_state())
, _trace_state_ptr(tracing::trace_state_ptr())
, _permit(std::move(permit))
{ }


@@ -2319,6 +2319,14 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
allow_hints::no);
}
future<> storage_proxy::mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s) {
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
// unavailable exception.
const auto timeout = db::timeout_clock::now() + 1h;
std::array<mutation, 1> ms{fm_a_s.fm.unfreeze(fm_a_s.s)};
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, empty_service_permit(), timeout);
}
/**
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
* is not available.


@@ -471,6 +471,8 @@ public:
*/
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit);
future<> mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s);
// Send a mutation to one specific remote target.
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also


@@ -555,7 +555,6 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
}
slogger.info("Starting up server gossip");
_gossiper.register_(this->shared_from_this());
auto generation_number = db::system_keyspace::increment_and_get_generation().get0();
_gossiper.start_gossiping(generation_number, app_states, gms::bind_messaging_port(bool(do_bind))).get();
@@ -1431,7 +1430,8 @@ future<> storage_service::drain_on_shutdown() {
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
slogger.info("Drain on shutdown: system distributed keyspace stopped");
get_storage_proxy().invoke_on_all([&ss] (storage_proxy& local_proxy) mutable {
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
auto& ss = service::get_local_storage_service();
ss.unregister_subscriber(&local_proxy);
return local_proxy.drain_on_shutdown();
}).get();
@@ -1445,7 +1445,7 @@ future<> storage_service::drain_on_shutdown() {
}).get();
slogger.info("Drain on shutdown: shutdown commitlog done");
ss._mnotifier.local().unregister_listener(&ss);
ss._mnotifier.local().unregister_listener(&ss).get();
slogger.info("Drain on shutdown: done");
});


@@ -4702,3 +4702,45 @@ SEASTAR_TEST_CASE(test_describe_view_schema) {
}
});
}
// Test that tombstones with future timestamps work correctly
// when a write with lower timestamp arrives - in such case,
// if the base row is covered by such a tombstone, a view update
// needs to take it into account. Refs #5793
SEASTAR_TEST_CASE(test_views_with_future_tombstones) {
return do_with_cql_env_thread([] (auto& e) {
cquery_nofail(e, "CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY (a,b,c));");
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT * FROM t"
" WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (b,a,c);");
// Partition tombstone
cquery_nofail(e, "delete from t using timestamp 10 where a=1;");
auto msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "insert into t (a,b,c,d,e) values (1,2,3,4,5) using timestamp 8;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
msg = cquery_nofail(e, "select * from tv;");
assert_that(msg).is_rows().with_size(0);
// Range tombstone
cquery_nofail(e, "delete from t using timestamp 16 where a=2 and b > 1 and b < 4;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "insert into t (a,b,c,d,e) values (2,3,4,5,6) using timestamp 12;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
msg = cquery_nofail(e, "select * from tv;");
assert_that(msg).is_rows().with_size(0);
// Row tombstone
cquery_nofail(e, "delete from t using timestamp 24 where a=3 and b=4 and c=5;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "insert into t (a,b,c,d,e) values (3,4,5,6,7) using timestamp 18;");
msg = cquery_nofail(e, "select * from t;");
assert_that(msg).is_rows().with_size(0);
msg = cquery_nofail(e, "select * from tv;");
assert_that(msg).is_rows().with_size(0);
});
}


@@ -415,6 +415,8 @@ BOOST_AUTO_TEST_CASE(test_varint) {
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00deadbeef"))), boost::multiprecision::cpp_int("0xdeadbeef"));
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00ffffffffffffffffffffffffffffffff"))), boost::multiprecision::cpp_int("340282366920938463463374607431768211455"));
BOOST_REQUIRE_EQUAL(from_hex("80000000"), varint_type->decompose(boost::multiprecision::cpp_int(-2147483648)));
test_parsing_fails(varint_type, "1A");
}


@@ -79,6 +79,20 @@ SEASTAR_TEST_CASE(test_user_function_out_of_memory) {
});
}
SEASTAR_TEST_CASE(test_user_function_use_null) {
return with_udf_enabled([] (cql_test_env& e) {
e.execute_cql("CREATE TABLE my_table (key text PRIMARY KEY, val int);").get();
e.execute_cql("INSERT INTO my_table (key, val) VALUES ('foo', null);").get();
e.execute_cql("CREATE FUNCTION my_func1(val int) CALLED ON NULL INPUT RETURNS int LANGUAGE Lua AS 'return val + 1';").get();
e.execute_cql("CREATE FUNCTION my_func2(val int) CALLED ON NULL INPUT RETURNS int LANGUAGE Lua AS 'return val';").get();
BOOST_REQUIRE_EXCEPTION(e.execute_cql("SELECT my_func1(val) FROM my_table;").get0(), ire, message_equals("lua execution failed: ?:-1: attempt to perform arithmetic on a nil value"));
auto res = e.execute_cql("SELECT my_func2(val) FROM my_table;").get0();
assert_that(res).is_rows().with_rows({{std::nullopt}});
res = e.execute_cql("SELECT val FROM my_table;").get0();
assert_that(res).is_rows().with_rows({{std::nullopt}});
});
}
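The test exercises `CALLED ON NULL INPUT`: the function body runs even when the argument is null, so `my_func1` fails on nil arithmetic while `my_func2` passes the null through. The two CQL null-handling policies can be sketched with `std::optional` standing in for a nullable int (hypothetical helper names, not Scylla's API):

```cpp
#include <functional>
#include <optional>

// RETURNS NULL ON NULL INPUT: the body is skipped entirely when any
// argument is null, and the result is null.
std::optional<int> call_returns_null_on_null(
        const std::function<int(int)>& body, std::optional<int> arg) {
    if (!arg) {
        return std::nullopt;  // body never runs
    }
    return body(*arg);
}

// CALLED ON NULL INPUT: the body runs and must cope with null itself,
// like my_func2 above, which simply returns the (possibly null) value.
std::optional<int> call_called_on_null(
        const std::function<std::optional<int>(std::optional<int>)>& body,
        std::optional<int> arg) {
    return body(arg);  // body sees the null
}
```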
SEASTAR_TEST_CASE(test_user_function_wrong_return_type) {
return with_udf_enabled([] (cql_test_env& e) {
e.execute_cql("CREATE TABLE my_table (key text PRIMARY KEY, val int);").get();


@@ -25,7 +25,7 @@
#include <seastar/util/noncopyable_function.hh>
inline
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 17) {
size_t attempts = 0;
while (true) {
try {
@@ -43,7 +43,7 @@ void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
inline
bool eventually_true(noncopyable_function<bool ()> f) {
const unsigned max_attempts = 10;
const unsigned max_attempts = 15;
unsigned attempts = 0;
while (true) {
if (f()) {

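The hunks above raise `max_attempts` (12 to 17, 10 to 15) in the test helpers, which matters because each failed attempt doubles the sleep, so five extra attempts multiply the total wait budget by 2^5. A synchronous sketch of the pattern (illustrative, not the harness's exact code):

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Retry a condition with exponentially growing sleeps until it holds
// or the attempt budget runs out.
bool eventually_true_sketch(const std::function<bool()>& cond,
                            unsigned max_attempts = 15,
                            std::chrono::milliseconds base = std::chrono::milliseconds(1)) {
    for (unsigned attempt = 0; attempt < max_attempts; ++attempt) {
        if (cond()) {
            return true;
        }
        // back off: 1ms, 2ms, 4ms, ... doubling per failed attempt
        std::this_thread::sleep_for(base * (1u << attempt));
    }
    return cond();  // one last try after the final sleep
}
```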

@@ -133,6 +133,7 @@ fi
tmpdir=$(mktemp -d)
docker_common_args+=(
--pids-limit -1 \
--network host \
--cap-add SYS_PTRACE \
-v "$PWD:$PWD:z" \


@@ -38,7 +38,13 @@ cql_server::event_notifier::event_notifier(service::migration_notifier& mn) : _m
cql_server::event_notifier::~event_notifier()
{
service::get_local_storage_service().unregister_subscriber(this);
_mnotifier.unregister_listener(this);
assert(_stopped);
}
future<> cql_server::event_notifier::stop() {
return _mnotifier.unregister_listener(this).then([this]{
_stopped = true;
});
}
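This hunk moves `unregister_listener` (now returning a future) out of the destructor into an explicit `stop()`, and the destructor merely asserts that `stop()` already ran. The two-phase shutdown pattern can be sketched synchronously, with a plain callback standing in for the future-returning unregistration (hypothetical class, not the real notifier):

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Teardown that can block or fail moves out of the destructor into an
// explicit stop(); the destructor only checks that stop() was called.
class notifier_sketch {
    bool _stopped = false;
    std::function<void()> _unregister;  // stands in for unregister_listener
public:
    explicit notifier_sketch(std::function<void()> unregister)
        : _unregister(std::move(unregister)) {}

    void stop() {
        _unregister();    // the work that used to live in the destructor
        _stopped = true;  // mark only after teardown finished
    }

    ~notifier_sketch() {
        assert(_stopped);  // the destructor must not do async work itself
    }

    bool stopped() const { return _stopped; }
};
```

Note that `cql_server::stop()` below is what guarantees the assertion holds, by chaining `_notifier->stop()` before the server finishes shutting down.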
void cql_server::event_notifier::register_event(event::event_type et, cql_server::connection* conn)


@@ -197,6 +197,8 @@ future<> cql_server::stop() {
return c.shutdown().then([nr_conn, nr_conn_total] {
clogger.debug("cql_server: shutdown connection {} out of {} done", ++(*nr_conn), nr_conn_total);
});
}).then([this] {
return _notifier->stop();
}).then([this] {
return std::move(_stopped);
});
@@ -372,6 +374,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
trace_props.set_if<tracing::trace_state_props::log_slow_query>(tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled());
trace_props.set_if<tracing::trace_state_props::full_tracing>(tracing_request != tracing_request_type::not_requested);
tracing::trace_state_ptr trace_state;
if (trace_props) {
if (cqlop == cql_binary_opcode::QUERY ||
@@ -379,15 +382,15 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
cqlop == cql_binary_opcode::EXECUTE ||
cqlop == cql_binary_opcode::BATCH) {
trace_props.set_if<tracing::trace_state_props::write_on_close>(tracing_request == tracing_request_type::write_on_close);
client_state.create_tracing_session(tracing::trace_type::QUERY, trace_props);
trace_state = tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, trace_props);
}
}
tracing::set_request_size(client_state.get_trace_state(), fbuf.bytes_left());
tracing::set_request_size(trace_state, fbuf.bytes_left());
auto linearization_buffer = std::make_unique<bytes_ostream>();
auto linearization_buffer_ptr = linearization_buffer.get();
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit)] () mutable {
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable {
// When using authentication, we need to ensure we are doing proper state transitions,
// i.e. we cannot simply accept any query/exec ops unless auth is complete
switch (client_state.get_auth_state()) {
@@ -409,7 +412,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
break;
}
tracing::set_username(client_state.get_trace_state(), client_state.user());
tracing::set_username(trace_state, client_state.user());
auto wrap_in_foreign = [] (future<std::unique_ptr<cql_server::response>> f) {
return f.then([] (std::unique_ptr<cql_server::response> p) {
@@ -418,19 +421,19 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
};
auto in = request_reader(std::move(fbuf), *linearization_buffer_ptr);
switch (cqlop) {
case cql_binary_opcode::STARTUP: return wrap_in_foreign(process_startup(stream, std::move(in), client_state));
case cql_binary_opcode::AUTH_RESPONSE: return wrap_in_foreign(process_auth_response(stream, std::move(in), client_state));
case cql_binary_opcode::OPTIONS: return wrap_in_foreign(process_options(stream, std::move(in), client_state));
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit));
case cql_binary_opcode::PREPARE: return wrap_in_foreign(process_prepare(stream, std::move(in), client_state));
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit));
case cql_binary_opcode::BATCH: return wrap_in_foreign(process_batch(stream, std::move(in), client_state, std::move(permit)));
case cql_binary_opcode::REGISTER: return wrap_in_foreign(process_register(stream, std::move(in), client_state));
case cql_binary_opcode::STARTUP: return wrap_in_foreign(process_startup(stream, std::move(in), client_state, trace_state));
case cql_binary_opcode::AUTH_RESPONSE: return wrap_in_foreign(process_auth_response(stream, std::move(in), client_state, trace_state));
case cql_binary_opcode::OPTIONS: return wrap_in_foreign(process_options(stream, std::move(in), client_state, trace_state));
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit), trace_state);
case cql_binary_opcode::PREPARE: return wrap_in_foreign(process_prepare(stream, std::move(in), client_state, trace_state));
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit), trace_state);
case cql_binary_opcode::BATCH: return wrap_in_foreign(process_batch(stream, std::move(in), client_state, std::move(permit), trace_state));
case cql_binary_opcode::REGISTER: return wrap_in_foreign(process_register(stream, std::move(in), client_state, trace_state));
default: throw exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop)));
}
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> f) -> foreign_ptr<std::unique_ptr<cql_server::response>> {
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> f) -> foreign_ptr<std::unique_ptr<cql_server::response>> {
auto stop_trace = defer([&] {
tracing::stop_foreground(client_state.get_trace_state());
tracing::stop_foreground(trace_state);
});
--_server._requests_serving;
try {
@@ -461,28 +464,28 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
break;
}
tracing::set_response_size(client_state.get_trace_state(), response->size());
tracing::set_response_size(trace_state, response->size());
return response;
} catch (const exceptions::unavailable_exception& ex) {
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, client_state.get_trace_state());
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, trace_state);
} catch (const exceptions::read_timeout_exception& ex) {
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, client_state.get_trace_state());
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, trace_state);
} catch (const exceptions::read_failure_exception& ex) {
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, client_state.get_trace_state());
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, trace_state);
} catch (const exceptions::mutation_write_timeout_exception& ex) {
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, client_state.get_trace_state());
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, trace_state);
} catch (const exceptions::mutation_write_failure_exception& ex) {
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, client_state.get_trace_state());
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, trace_state);
} catch (const exceptions::already_exists_exception& ex) {
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, client_state.get_trace_state());
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, trace_state);
} catch (const exceptions::prepared_query_not_found_exception& ex) {
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, client_state.get_trace_state());
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
} catch (const exceptions::cassandra_exception& ex) {
return make_error(stream, ex.code(), ex.what(), client_state.get_trace_state());
return make_error(stream, ex.code(), ex.what(), trace_state);
} catch (std::exception& ex) {
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), client_state.get_trace_state());
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), trace_state);
} catch (...) {
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", client_state.get_trace_state());
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
}
});
}
@@ -695,8 +698,8 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
return _buffer_reader.read_exactly(_read_buf, length);
}
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state)
{
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
auto options = in.read_string_map();
auto compression_opt = options.find("COMPRESSION");
if (compression_opt != options.end()) {
@@ -712,33 +715,31 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
}
auto& a = client_state.get_auth_service()->underlying_authenticator();
if (a.require_authentication()) {
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), client_state.get_trace_state()));
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), trace_state));
}
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, trace_state));
}
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state)
{
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
auto sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
auto buf = in.read_raw_bytes_view(in.bytes_left());
auto challenge = sasl_challenge->evaluate_response(buf);
if (sasl_challenge->is_complete()) {
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge)](auth::authenticated_user user) mutable {
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable {
client_state.set_login(std::move(user));
auto f = client_state.check_user_can_login();
return f.then([this, stream, &client_state, challenge = std::move(challenge)]() mutable {
auto tr_state = client_state.get_trace_state();
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), tr_state));
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
});
});
}
auto tr_state = client_state.get_trace_state();
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), tr_state));
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), trace_state));
}
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state)
{
return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, client_state.get_trace_state()));
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, std::move(trace_state)));
}
void
@@ -753,22 +754,23 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, bool init_trace) {
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state,
bool init_trace) {
auto query = in.read_long_string_view();
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state;
q_state->options = in.read_options(version, serialization_format, timeout_config, cql_config);
auto& options = *q_state->options;
auto skip_metadata = options.skip_metadata();
if (init_trace) {
tracing::set_page_size(query_state.get_trace_state(), options.get_page_size());
tracing::set_consistency_level(query_state.get_trace_state(), options.get_consistency());
tracing::set_optional_serial_consistency_level(query_state.get_trace_state(), options.get_serial_consistency());
tracing::add_query(query_state.get_trace_state(), query);
tracing::set_user_timestamp(query_state.get_trace_state(), options.get_specific_options().timestamp);
tracing::set_page_size(trace_state, options.get_page_size());
tracing::set_consistency_level(trace_state, options.get_consistency());
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
tracing::add_query(trace_state, query);
tracing::set_user_timestamp(trace_state, options.get_specific_options().timestamp);
tracing::begin(query_state.get_trace_state(), "Execute CQL3 query", client_state.get_client_address());
tracing::begin(trace_state, "Execute CQL3 query", client_state.get_client_address());
}
return qp.local().process(query, query_state, options).then([q_state = std::move(q_state), stream, skip_metadata, version] (auto msg) {
@@ -783,16 +785,18 @@ process_query_internal(service::client_state& client_state, distributed<cql3::qu
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
cql_server::connection::process_query_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
service::client_state& cs, service_permit permit) {
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state) {
return smp::submit_to(shard, _server._config.bounce_request_smp_service_group,
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit)] () {
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit),
gt = tracing::global_trace_state_ptr(std::move(trace_state))] () {
service::client_state client_state = cs.get();
cql_server& server = s.get().local();
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream]
tracing::trace_state_ptr trace_state = gt;
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream, trace_state]
(bytes_ostream& linearization_buffer, service::client_state& client_state) {
request_reader in(is, linearization_buffer);
return process_query_internal(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), false).then([] (auto msg) {
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), trace_state, false).then([] (auto msg) {
// result here has to be foreign ptr
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
});
@@ -801,27 +805,27 @@ cql_server::connection::process_query_on_shard(unsigned shard, uint16_t stream,
}
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
{
fragmented_temporary_buffer::istream is = in.get_stream();
return process_query_internal(client_state, _server._query_processor, in, stream,
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, true)
.then([stream, &client_state, this, is, permit] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, trace_state, true)
.then([stream, &client_state, this, is, permit, trace_state] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
unsigned* shard = std::get_if<unsigned>(&msg);
if (shard) {
return process_query_on_shard(*shard, stream, is, client_state, std::move(permit));
return process_query_on_shard(*shard, stream, is, client_state, std::move(permit), std::move(trace_state));
}
return make_ready_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg)));
});
}
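The recurring change through these hunks is that the per-request trace state is created once in `process_request` and threaded explicitly down every handler, instead of being read back from the shared `client_state`, so pipelined requests on one connection cannot observe each other's tracing session. A minimal sketch of the idea (hypothetical types, not Scylla's tracing API):

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Per-request tracing session; a null pointer means tracing is off.
struct trace_session {
    std::vector<std::string> events;
};
using trace_state_ptr = std::shared_ptr<trace_session>;

void trace(const trace_state_ptr& ts, std::string msg) {
    if (ts) {  // disabled tracing is a cheap no-op
        ts->events.push_back(std::move(msg));
    }
}

// Each handler takes the request's own trace state as a parameter
// rather than fetching ambient state from the connection.
void process_request_sketch(const trace_state_ptr& ts, const std::string& query) {
    trace(ts, "Execute CQL3 query: " + query);
}
```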
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state)
{
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
auto query = sstring(in.read_long_string_view());
tracing::add_query(client_state.get_trace_state(), query);
tracing::begin(client_state.get_trace_state(), "Preparing CQL3 query", client_state.get_client_address());
tracing::add_query(trace_state, query);
tracing::begin(trace_state, "Preparing CQL3 query", client_state.get_client_address());
auto cpu_id = engine().cpu_id();
auto cpus = boost::irange(0u, smp::count);
@@ -833,13 +837,13 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
} else {
return make_ready_future<>();
}
}).then([this, query, stream, &client_state] () mutable {
tracing::trace(client_state.get_trace_state(), "Done preparing on remote shards");
return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state] (auto msg) {
tracing::trace(client_state.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
}).then([this, query, stream, &client_state, trace_state] () mutable {
tracing::trace(trace_state, "Done preparing on remote shards");
return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state, trace_state] (auto msg) {
tracing::trace(trace_state, "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
return messages::result_message::prepared::cql::get_id(msg);
}));
return make_result(stream, *msg, client_state.get_trace_state(), _version);
return make_result(stream, *msg, trace_state, _version);
});
});
}
@@ -847,7 +851,8 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, bool init_trace) {
const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit,
tracing::trace_state_ptr trace_state, bool init_trace) {
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
bool needs_authorization = false;
@@ -864,7 +869,7 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
throw exceptions::prepared_query_not_found_exception(id);
}
auto q_state = std::make_unique<cql_query_state>(client_state, client_state.get_trace_state(), std::move(permit));
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state;
if (version == 1) {
std::vector<cql3::raw_value_view> values;
@@ -879,33 +884,33 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
auto skip_metadata = options.skip_metadata();
if (init_trace) {
tracing::set_page_size(client_state.get_trace_state(), options.get_page_size());
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
tracing::add_query(client_state.get_trace_state(), prepared->raw_cql_statement);
tracing::add_prepared_statement(client_state.get_trace_state(), prepared);
tracing::set_page_size(trace_state, options.get_page_size());
tracing::set_consistency_level(trace_state, options.get_consistency());
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
tracing::add_query(trace_state, prepared->raw_cql_statement);
tracing::add_prepared_statement(trace_state, prepared);
tracing::begin(client_state.get_trace_state(), seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
tracing::begin(trace_state, seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
client_state.get_client_address());
}
auto stmt = prepared->statement;
tracing::trace(query_state.get_trace_state(), "Checking bounds");
tracing::trace(trace_state, "Checking bounds");
if (stmt->get_bound_terms() != options.get_values_count()) {
const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}",
stmt->get_bound_terms(),
options.get_values_count());
tracing::trace(query_state.get_trace_state(), msg);
tracing::trace(trace_state, msg);
throw exceptions::invalid_request_exception(msg);
}
options.prepare(prepared->bound_names);
if (init_trace) {
tracing::add_prepared_query_options(client_state.get_trace_state(), options);
tracing::add_prepared_query_options(trace_state, options);
}
tracing::trace(query_state.get_trace_state(), "Processing a statement");
tracing::trace(trace_state, "Processing a statement");
return qp.local().process_statement_prepared(std::move(prepared), std::move(cache_key), query_state, options, needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version] (auto msg) {
if (msg->move_to_shard()) {
@@ -919,16 +924,18 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
cql_server::connection::process_execute_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
service::client_state& cs, service_permit permit) {
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state) {
return smp::submit_to(shard, _server._config.bounce_request_smp_service_group,
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit)] () {
[this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit),
gt = tracing::global_trace_state_ptr(std::move(trace_state))] () {
service::client_state client_state = cs.get();
tracing::trace_state_ptr trace_state = gt;
cql_server& server = s.get().local();
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream]
return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream, trace_state]
(bytes_ostream& linearization_buffer, service::client_state& client_state) {
request_reader in(is, linearization_buffer);
return process_execute_internal(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), false).then([] (auto msg) {
server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), trace_state, false).then([] (auto msg) {
// result here has to be foreign ptr
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
});
@@ -937,22 +944,22 @@ cql_server::connection::process_execute_on_shard(unsigned shard, uint16_t stream
}
future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connection::process_execute(uint16_t stream, request_reader in,
service::client_state& client_state, service_permit permit) {
service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) {
fragmented_temporary_buffer::istream is = in.get_stream();
return process_execute_internal(client_state, _server._query_processor, in, stream,
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, true)
.then([stream, &client_state, this, is, permit] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
_version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, trace_state, true)
.then([stream, &client_state, this, is, permit, trace_state] (std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
unsigned* shard = std::get_if<unsigned>(&msg);
if (shard) {
return process_execute_on_shard(*shard, stream, is, client_state, std::move(permit));
return process_execute_on_shard(*shard, stream, is, client_state, std::move(permit), trace_state);
}
return make_ready_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg)));
});
}
future<std::unique_ptr<cql_server::response>>
cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
{
if (_version == 1) {
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
@@ -968,7 +975,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
modifications.reserve(n);
values.reserve(n);
tracing::begin(client_state.get_trace_state(), "Execute batch of CQL3 queries", client_state.get_client_address());
tracing::begin(trace_state, "Execute batch of CQL3 queries", client_state.get_client_address());
for ([[gnu::unused]] auto i : boost::irange(0u, n)) {
const auto kind = in.read_byte();
@@ -982,7 +989,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
auto query = in.read_long_string_view();
stmt_ptr = _server._query_processor.local().get_statement(query, client_state);
ps = stmt_ptr->checked_weak_from_this();
tracing::add_query(client_state.get_trace_state(), query);
tracing::add_query(trace_state, query);
break;
}
case 1: {
@@ -1001,7 +1008,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
needs_authorization = pending_authorization_entries.emplace(std::move(cache_key), ps->checked_weak_from_this()).second;
}
tracing::add_query(client_state.get_trace_state(), ps->raw_cql_statement);
tracing::add_query(trace_state, ps->raw_cql_statement);
break;
}
default:
@@ -1015,8 +1022,8 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
}
::shared_ptr<cql3::statements::modification_statement> modif_statement_ptr = static_pointer_cast<cql3::statements::modification_statement>(ps->statement);
tracing::add_table_name(client_state.get_trace_state(), modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
tracing::add_prepared_statement(client_state.get_trace_state(), ps);
tracing::add_table_name(trace_state, modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
tracing::add_prepared_statement(trace_state, ps);
modifications.emplace_back(std::move(modif_statement_ptr), needs_authorization);
@@ -1031,16 +1038,16 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
values.emplace_back(std::move(tmp));
}
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state;
// #563. CQL v2 encodes query_options in v1 format for batch requests.
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values)));
auto& options = *q_state->options;
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
tracing::add_prepared_query_options(client_state.get_trace_state(), options);
tracing::trace(client_state.get_trace_state(), "Creating a batch statement");
tracing::set_consistency_level(trace_state, options.get_consistency());
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
tracing::add_prepared_query_options(trace_state, options);
tracing::trace(trace_state, "Creating a batch statement");
auto batch = ::make_shared<cql3::statements::batch_statement>(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), _server._query_processor.local().get_cql_stats());
return _server._query_processor.local().process_batch(batch, query_state, options, std::move(pending_authorization_entries))
@@ -1050,15 +1057,15 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
}
future<std::unique_ptr<cql_server::response>>
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state)
{
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state,
tracing::trace_state_ptr trace_state) {
std::vector<sstring> event_types;
in.read_string_list(event_types);
for (auto&& event_type : event_types) {
auto et = parse_event_type(event_type);
_server._notifier->register_event(et, this);
}
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, std::move(trace_state)));
}
std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const
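The recurring change in the hunks above replaces `client_state.get_trace_state()` with a `tracing::trace_state_ptr` passed explicitly into each `process_*` handler. A minimal, hypothetical sketch of that threading pattern (types and names are simplified for illustration, not Scylla's actual API):

```cpp
#include <memory>
#include <string>

// Stand-in for tracing::trace_state; a null pointer means tracing is
// disabled for this request.
struct trace_state {
    std::string events;
};
using trace_state_ptr = std::shared_ptr<trace_state>;

// Records an event only when tracing is enabled, like tracing::trace().
void trace(const trace_state_ptr& ts, const std::string& msg) {
    if (ts) {
        ts->events += msg + ";";
    }
}

// The handler receives the trace state from its caller instead of
// re-fetching it from client_state on every call.
std::string process_register(trace_state_ptr ts) {
    trace(ts, "register");
    return "ready";
}
```

Passing the pointer explicitly makes the handler's tracing dependency visible in its signature and lets the caller obtain the trace state once per request.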


@@ -96,9 +96,6 @@ struct cql_query_state {
service::query_state query_state;
std::unique_ptr<cql3::query_options> options;
cql_query_state(service::client_state& client_state, service_permit permit)
: query_state(client_state, std::move(permit))
{ }
cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit)
: query_state(client_state, std::move(trace_state_ptr), std::move(permit))
{ }
@@ -197,14 +194,14 @@ private:
cql_binary_frame_v3 parse_frame(temporary_buffer<char> buf) const;
future<fragmented_temporary_buffer> read_and_decompress_frame(size_t length, uint8_t flags);
future<std::optional<cql_binary_frame_v3>> read_frame();
future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state);
future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state);
future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state);
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state);
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state);
future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
std::unique_ptr<cql_server::response> make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const;
@@ -224,11 +221,12 @@ private:
std::unique_ptr<cql_server::response> make_auth_challenge(int16_t, bytes, const tracing::trace_state_ptr& tr_state) const;
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
process_execute_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
service::client_state& cs, service_permit permit);
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state);
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
process_query_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is,
service::client_state& cs, service_permit permit);
service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state);
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
@@ -266,7 +264,9 @@ class cql_server::event_notifier : public service::migration_listener,
std::set<cql_server::connection*> _schema_change_listeners;
std::unordered_map<gms::inet_address, event::status_change::status_type> _last_status_change;
service::migration_notifier& _mnotifier;
bool _stopped = false;
public:
future<> stop();
event_notifier(service::migration_notifier& mn);
~event_notifier();
void register_event(cql_transport::event::event_type et, cql_server::connection* conn);


@@ -2082,12 +2082,19 @@ static size_t concrete_serialized_size(const string_type_impl::native_type& v) {
static size_t concrete_serialized_size(const bytes_type_impl::native_type& v) { return v.size(); }
static size_t concrete_serialized_size(const inet_addr_type_impl::native_type& v) { return v.get().size(); }
static size_t concrete_serialized_size(const boost::multiprecision::cpp_int& num) {
if (!num) {
static size_t concrete_serialized_size_aux(const boost::multiprecision::cpp_int& num) {
if (num) {
return align_up(boost::multiprecision::msb(num) + 2, 8u) / 8;
} else {
return 1;
}
auto pnum = abs(num);
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
}
static size_t concrete_serialized_size(const boost::multiprecision::cpp_int& num) {
if (num < 0) {
return concrete_serialized_size_aux(-num - 1);
}
return concrete_serialized_size_aux(num);
}
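The fix above splits the varint size computation in two: a helper for non-negative values, and a wrapper that maps a negative `num` to `-num - 1` (i.e. `~num`), which in two's complement needs exactly as many bits. A standalone sketch of the same computation, using plain 64-bit integers instead of `boost::multiprecision::cpp_int`:

```cpp
#include <cstddef>
#include <cstdint>

// Position of the most significant set bit (0-based), mirroring
// boost::multiprecision::msb() for plain unsigned integers.
static size_t msb64(uint64_t v) {
    size_t pos = 0;
    while (v >>= 1) {
        ++pos;
    }
    return pos;
}

// msb(num) + 1 value bits plus one sign bit, rounded up to whole bytes;
// zero still occupies one byte.
static size_t serialized_size_aux(uint64_t num) {
    if (num) {
        return (msb64(num) + 2 + 7) / 8;
    }
    return 1;
}

// Two's complement: -num - 1 needs the same number of bits as num,
// so negative inputs reuse the non-negative helper.
static size_t serialized_size(int64_t num) {
    if (num < 0) {
        return serialized_size_aux(static_cast<uint64_t>(-(num + 1)));
    }
    return serialized_size_aux(static_cast<uint64_t>(num));
}
```

For example, 127 and -128 both fit in one signed byte, while 128 and -129 need two, matching the boundaries of two's-complement encoding.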
static size_t concrete_serialized_size(const varint_type_impl::native_type& v) {


@@ -2068,6 +2068,17 @@ bool segment_pool::migrate_segment(segment* src, segment* dst)
}
void tracker::impl::register_region(region::impl* r) {
// If needed, increase capacity of regions before taking the reclaim lock,
// to avoid failing an allocation when push_back() tries to increase
// capacity.
//
// The capacity increase is atomic (wrt _regions) so it cannot be
// observed by reclaim in a partially-updated state.
if (_regions.size() == _regions.capacity()) {
auto copy = _regions;
copy.reserve(copy.capacity() * 2);
_regions = std::move(copy);
}
reclaiming_lock _(*this);
_regions.push_back(r);
llogger.debug("Registered region @{} with id={}", r, r->id());
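The hunk above grows `_regions` on a copy and installs it with a move before taking the reclaim lock, so the subsequent `push_back()` under the lock never needs to allocate. A standalone sketch of that grow-before-lock idiom (the function name is illustrative, not Scylla's):

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Ensure the next push_back() cannot trigger a reallocation. Growth
// happens on a copy, which is then swapped in wholesale, so a concurrent
// reader never observes the vector mid-resize.
template <typename T>
void grow_if_full(std::vector<T>& v) {
    if (v.size() == v.capacity()) {
        auto copy = v;
        // "* 2 + 1" so an empty vector (capacity 0) also grows; the
        // original code doubles a capacity that is already non-zero.
        copy.reserve(copy.capacity() * 2 + 1);
        v = std::move(copy);
    }
}
```

The caller invokes `grow_if_full()` first, then takes the lock and appends; because capacity now strictly exceeds size, the append is allocation-free and cannot deadlock against a reclaimer that holds the lock.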